学科分类
目录
网络爬虫

使用多个管道存储

在Scrapy-Redis项目中,同样可以定义多个管道,并让这些管道按照定义的顺序依次处理Item数据。打开创建的pipelines.py文件,其内部还没有设置任何内容,在该文件中,自定义多个管道类,每个独立的管道类代码如下:

1. AqiCSVPipeline类

scrapy.exporters提供了导出数据的功能,它使用ItemExporter来创建不同的输出格式,如XML、CSV。针对不同的文件格式,Scrapy专门提供相应的内置类分别进行处理,其中,CsvItemExporter类表示用于输出csv(逗号分隔值)文件格式的读写对象。

要想使用CsvItemExporter类,需要使用如下方法进行实例化:

__init__(self, file, include_headers_line=True, join_multivalued=',', **kwargs)

上述参数的含义如下:

  • file:文件。

  • include_headers_line:启动后,ItemExporter会输出第一行为列名,列名从 BaseItemExporter.fields_to_export 或第一个item fields获取。

  • join_multivalued:将用于连接多个值的fields。

创建完CsvItemExporter类对象后,必须按照如下三个步骤使用:

(1)调用 start_exporting() 方法以标识输出过程的开始;

(2)对要导出的每个项目调用 export_item() 方法;

(3)调用finish_exporting()方法以标识输出过程的结束。

按照上述要求编写AqiCSVipeline类,具体代码如下。

from scrapy.exporters import CsvItemExporter
class AqiCSVPipeline(object):
   def open_spider(self, spider):
     # 创建csv格式的文件
     self.file = open("aqi.csv", "w")
     # 创建csv文件读写对象,将数据写入到指定的文件中
     self.csv_exporter = CsvItemExporter(self.file)
     # 开始执行item数据读写
     self.csv_exporter.start_exporting()
   def process_item(self, item, spider):
     # 将item数据写入到文件中
     self.csv_exporter.export_item(item)
     return item
   def close_spider(self, spider):
     # 结束文件读写操作
     self.csv_exporter.finish_exporting()
     # 关闭文件
     self.file.close()

2. AqiRedisPipeline类

此管道负责将Item数据写入到Redis数据库中。在Python中,提供了用于操作Redis数据库的第三方模块redis,所以要先在pipelines.py文件中导入该模块:

import redis

redis模块提供了两个类:Redis和StrictRedis,用于实现操作Redis数据库的命令。其中,StrictRedis用于实现大部分官方的命令,Redis是StrictRedis的子类,用于向后兼容旧版本的redis-py。

在使用Redis之前,可以使用如下方式创建一个Redis数据库的连接:

redis_cli = redis.Redis(host="192.168.64.99", port=6379)

AqiRedisPipeline类的实现代码如下:

import redis
import json
class AqiRedisPipeline(object):
   def open_spider(self, spider):
     self.redis_cli = redis.Redis(host="192.168.64.99", port=6379)
   def process_item(self, item, spider):
     content = json.dumps(dict(item), ensure_ascii=False)
     self.redis_cli.lpush("AQI_List", content)
     return item

上述定义中,在open_spide(开始爬虫))方法中,创建了一个Redis数据库的连接redis_cli。在process_item方法中,调用json模块的dumps函数,将Item数据转换成字符串格式,之后调用lpush方法将数据写入到指定的Redis数据库中,并以列表的形式存储。

3. AqiMongoPipeline类

此管道用于将Item数据保存到MongoDB数据库中。因此,需要创建一个MongoDB数据库的连接,在这之前要先导入操作MongoDB数据库的模块,代码如下:

import pymongo

AqiMongoPipeline类的具体代码如下。

class AqiMongoPipeline(object):
   def open_spider(self, spider):
     self.mongo_cli = pymongo.MongoClient(host="127.0.0.1", port=27017)
     self.db = self.mongo_cli["AQI"]
     self.sheet = self.db['AQI_item']
   def process_item(self, item, spider): 
     self.sheet.insert(dict(item))
     return item

在上述代码中,开始爬虫时,创建了一个数据库连接mongo_cli,再创建了一个数据库db和列表sheet。在process_item方法中,调用insert方法将Item数据以字典的形式存入到sheet列表中。

打开settings.py文件,启动上述自定义的这些管道组件,ITEM_PIPELINES 配置项的设置如下:

ITEM_PIPELINES = {
   'AQI.pipelines.AqiCSVPipeline':200,
   'AQI.pipelines.AqiRedisPipeline':300,
   'AQI.pipelines.AqiMongoPipeline':400,
   'scrapy_redis.pipelines.RedisPipeline':900
 }

数值越低,管道的优先级越高,所以上述这些管道的执行顺序是自上而下的(AqiUTCPipeline->AqiCSVPipeline->AqiRedisPipeline->AqiMongoPipeline)。需要注意的是,如果指定了scrapy_redis提供的管道,一般给定的数值会偏大,这样可以保证其是最后执行的。

执行程序,Item数据分别交由上述的每个管道进行处理,将爬虫数据分别保存到csv文件、Redis数据库和MongoDB数据库中。

点击此处
隐藏目录