使用多个管道存储
在Scrapy-Redis项目中,同样可以定义多个管道,并让这些管道按照定义的顺序依次处理Item数据。打开创建的pipelines.py文件,其内部还没有设置任何内容,在该文件中,自定义多个管道类,每个独立的管道类代码如下:
1. AqiCSVPipeline类
scrapy.exporters提供了导出数据的功能,它使用ItemExporter来创建不同的输出格式,如XML、CSV。针对不同的文件格式,Scrapy专门提供相应的内置类分别进行处理,其中,CsvItemExporter类表示用于输出csv(逗号分隔值)文件格式的读写对象。
要想使用CsvItemExporter类,需要使用如下方法进行实例化:
__init__(self, file, include_headers_line=True, join_multivalued=',', **kwargs)
上述参数的含义如下:
file:文件。
include_headers_line:启动后,ItemExporter会输出第一行为列名,列名从 BaseItemExporter.fields_to_export 或第一个item fields获取。
join_multivalued:将用于连接多个值的fields。
创建完CsvItemExporter类对象后,必须按照如下三个步骤使用:
(1)调用 start_exporting() 方法以标识输出过程的开始;
(2)对要导出的每个项目调用 export_item() 方法;
(3)调用finish_exporting()方法以标识输出过程的结束。
按照上述要求编写AqiCSVipeline类,具体代码如下。
from scrapy.exporters import CsvItemExporter
class AqiCSVPipeline(object):
def open_spider(self, spider):
# 创建csv格式的文件
self.file = open("aqi.csv", "w")
# 创建csv文件读写对象,将数据写入到指定的文件中
self.csv_exporter = CsvItemExporter(self.file)
# 开始执行item数据读写
self.csv_exporter.start_exporting()
def process_item(self, item, spider):
# 将item数据写入到文件中
self.csv_exporter.export_item(item)
return item
def close_spider(self, spider):
# 结束文件读写操作
self.csv_exporter.finish_exporting()
# 关闭文件
self.file.close()
2. AqiRedisPipeline类
此管道负责将Item数据写入到Redis数据库中。在Python中,提供了用于操作Redis数据库的第三方模块redis,所以要先在pipelines.py文件中导入该模块:
import redis
redis模块提供了两个类:Redis和StrictRedis,用于实现操作Redis数据库的命令。其中,StrictRedis用于实现大部分官方的命令,Redis是StrictRedis的子类,用于向后兼容旧版本的redis-py。
在使用Redis之前,可以使用如下方式创建一个Redis数据库的连接:
redis_cli = redis.Redis(host="192.168.64.99", port=6379)
AqiRedisPipeline类的实现代码如下:
import redis
import json
class AqiRedisPipeline(object):
def open_spider(self, spider):
self.redis_cli = redis.Redis(host="192.168.64.99", port=6379)
def process_item(self, item, spider):
content = json.dumps(dict(item), ensure_ascii=False)
self.redis_cli.lpush("AQI_List", content)
return item
上述定义中,在open_spide(开始爬虫))方法中,创建了一个Redis数据库的连接redis_cli。在process_item方法中,调用json模块的dumps函数,将Item数据转换成字符串格式,之后调用lpush方法将数据写入到指定的Redis数据库中,并以列表的形式存储。
3. AqiMongoPipeline类
此管道用于将Item数据保存到MongoDB数据库中。因此,需要创建一个MongoDB数据库的连接,在这之前要先导入操作MongoDB数据库的模块,代码如下:
import pymongo
AqiMongoPipeline类的具体代码如下。
class AqiMongoPipeline(object):
def open_spider(self, spider):
self.mongo_cli = pymongo.MongoClient(host="127.0.0.1", port=27017)
self.db = self.mongo_cli["AQI"]
self.sheet = self.db['AQI_item']
def process_item(self, item, spider):
self.sheet.insert(dict(item))
return item
在上述代码中,开始爬虫时,创建了一个数据库连接mongo_cli,再创建了一个数据库db和列表sheet。在process_item方法中,调用insert方法将Item数据以字典的形式存入到sheet列表中。
打开settings.py文件,启动上述自定义的这些管道组件,ITEM_PIPELINES 配置项的设置如下:
ITEM_PIPELINES = {
'AQI.pipelines.AqiCSVPipeline':200,
'AQI.pipelines.AqiRedisPipeline':300,
'AQI.pipelines.AqiMongoPipeline':400,
'scrapy_redis.pipelines.RedisPipeline':900
}
数值越低,管道的优先级越高,所以上述这些管道的执行顺序是自上而下的(AqiUTCPipeline->AqiCSVPipeline->AqiRedisPipeline->AqiMongoPipeline)。需要注意的是,如果指定了scrapy_redis提供的管道,一般给定的数值会偏大,这样可以保证其是最后执行的。
执行程序,Item数据分别交由上述的每个管道进行处理,将爬虫数据分别保存到csv文件、Redis数据库和MongoDB数据库中。