
Scrapy: configuring multiple pipelines to save a CSV file and write to a MySQL database

呼延河
2023-12-01

Another article packed with practical content. I've recorded all the code here as my notes.

First, the spider.
One annoyance: PyCharm may flag the item import as an error even though it works fine at runtime, so just be aware of that. Otherwise there is nothing special here, the spider only needs to package the scraped data into an item.

First, define an item.
item

import scrapy


class LotterySpiderItem(scrapy.Item):
    # define the fields for your item here like:
    # name = scrapy.Field()
    qihao = scrapy.Field()
    red = scrapy.Field()
    blue = scrapy.Field()

spider

import scrapy

from lottery_spider.items import LotterySpiderItem


class LearnLotterySpider(scrapy.Spider):
    # name = 'learn_lottery'
    name = 'ssq'
    allowed_domains = ['500.com']
    start_urls = ['http://datachart.500.com/ssq/']

    def parse(self, resp, **kwargs):
        trs = resp.xpath("//tbody[@id='tdata']/tr")
        for tr in trs:
            red_ball = tr.xpath("./td[@class='chartBall01']/text()").extract()  # extract() returns a list of all matches
            blue_ball = tr.xpath("./td[@class='chartBall02']/text()").extract_first()  # extract_first() returns only the first match
            qihao = tr.xpath("./td[1]/text()").extract_first()
            # For this particular spider, none of qihao, red, blue should be empty
            if qihao and red_ball and blue_ball:
                item = LotterySpiderItem()
                item['qihao'] = qihao
                item['red'] = red_ball
                item['blue'] = blue_ball
                yield item
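Assuming a standard Scrapy project layout, the spider can then be started from the project root with scrapy crawl ssq (ssq is the name defined above); once the pipelines below are registered in settings, they run automatically for every yielded item.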

Now for the pipelines. A few points here.

settings

The MySQL connection configuration can be placed here, which keeps code and configuration separate.
Multiple pipelines can be registered in settings; the key is the dotted path to the pipeline class and the value is its priority: the smaller the number, the earlier the pipeline runs.

ITEM_PIPELINES = {
    'lottery_spider.pipelines.LotterySpiderPipeline': 300,
    'lottery_spider.pipelines.LotterySpiderPipeline_mysql': 301,
}

MYSQL_CONFIG = {
    "HOST": "127.0.0.1",
    "PORT": 3306,
    "USER": "root",
    "PASSWORD": "password",
    "DATABASE": "database"
}
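As a side note, instead of importing the settings module directly in pipelines.py (which is what the code further down does), a Scrapy pipeline can also receive its configuration through from_crawler(). A minimal sketch of that pattern, using a hypothetical MysqlConfiguredPipeline and the MYSQL_CONFIG dict above:

# Sketch only: pulling MYSQL_CONFIG out of the project settings at runtime.
class MysqlConfiguredPipeline:
    def __init__(self, mysql_config):
        self.mysql_config = mysql_config

    @classmethod
    def from_crawler(cls, crawler):
        # crawler.settings exposes everything defined in settings.py
        return cls(crawler.settings.get("MYSQL_CONFIG"))

    def process_item(self, item, spider):
        # self.mysql_config["HOST"], self.mysql_config["PORT"], ... are available here
        return item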

For the MySQL part I used my old approach: wrap the database access in my own helper classes. That works fine too, but pay attention to the package import paths, otherwise the code looks correct yet Python cannot find the module.
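For reference, the project layout implied by the imports in this article is roughly the following (file names such as MysqlHelper.py, LotteryInfo.py and learn_lottery.py are inferred from the import statements and class names, not confirmed by the original project):

lottery_spider/
    scrapy.cfg
    lottery_spider/
        __init__.py
        items.py
        settings.py
        pipelines.py
        MysqlHelper.py       # class MysqlHelper
        LotteryInfo.py       # class LotteryInfoHelper
        spiders/
            __init__.py
            learn_lottery.py # class LearnLotterySpider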

mysql

#!/usr/bin/python3
# -*- coding: utf-8 -*-

import pymysql


class MysqlHelper:
    """Thin pymysql wrapper: each query method opens its own connection via connect()."""

    def __init__(self, host, port, db, user, passwd, charset='utf8'):
        self.host = host
        self.port = port
        self.db = db
        self.user = user
        self.passwd = passwd
        self.charset = charset

    def connect(self):
        # Open a connection and a DictCursor (rows come back as dicts)
        self.conn = pymysql.connect(host=self.host, port=self.port, db=self.db, user=self.user, passwd=self.passwd,
                                    charset=self.charset)
        self.cursor = self.conn.cursor(cursor=pymysql.cursors.DictCursor)

    def close(self):
        self.cursor.close()
        self.conn.close()

    def get_all(self, sql):
        res = ()
        try:
            self.connect()
            self.cursor.execute(sql)
            res = self.cursor.fetchall()
        except Exception as e:
            print(e)
        return res

    def insert(self, sql):
        try:
            self.connect()
            self.cursor.execute(sql)
            self.conn.commit()
        except Exception as e:
            self.conn.rollback()
            print(e)

    def update(self, sql):
        try:
            self.connect()
            self.cursor.execute(sql)
            self.conn.commit()
        except Exception as e:
            self.conn.rollback()
            print(e)

    def delete(self, sql):
        try:
            self.connect()
            self.cursor.execute(sql)
            self.conn.commit()
        except Exception as e:
            self.conn.rollback()
            print(e)
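A quick standalone check of the helper might look like this (the connection values are the placeholders from MYSQL_CONFIG, and the query/table is only an example):

helper = MysqlHelper("127.0.0.1", 3306, "database", "root", "password")
rows = helper.get_all("select * from test_code")  # get_all() opens its own connection
print(rows)
helper.close()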

savedata

#!/usr/bin/python3
# -*- coding: utf-8 -*-

from lottery_spider.MysqlHelper import MysqlHelper


class LotteryInfoHelper:

    def __init__(self, ip, port, db_name, db_user, db_password):
        self.ip = ip
        self.port = port
        self.db_name = db_name
        self.db_user = db_user
        self.db_password = db_password

    def save_data(self, **dict_info):
        mysqlhelper = MysqlHelper(self.ip, self.port, self.db_name, self.db_user, self.db_password)
        # Note: the SQL is built with string formatting here for simplicity;
        # parameterized queries (cursor.execute(sql, params)) are the safer choice.
        sql = "insert into test_code (`number`,blue,red) values ('%s','%s','%s')" % (
            dict_info['number'], dict_info['blue'], dict_info['red'])
        mysqlhelper.insert(sql)
        mysqlhelper.close()
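The inserts assume a test_code table already exists; the original post never shows its schema, so the one below is only an assumption that matches the columns being written (a one-off script, adjust column types and lengths as needed):

# One-off sketch to create the assumed test_code table (schema is a guess, not from the original post).
import pymysql

conn = pymysql.connect(host="127.0.0.1", port=3306, user="root",
                       password="password", database="database", charset="utf8")
with conn.cursor() as cursor:
    cursor.execute(
        "CREATE TABLE IF NOT EXISTS test_code ("
        "  `number` VARCHAR(16),"
        "  red VARCHAR(64),"
        "  blue VARCHAR(8)"
        ")"
    )
conn.commit()
conn.close()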

Now the key part: the pipelines.

# Define your item pipelines here
#
# Don't forget to add your pipeline to the ITEM_PIPELINES setting
# See: https://docs.scrapy.org/en/latest/topics/item-pipeline.html


# useful for handling different item types with a single interface
from itemadapter import ItemAdapter

import pymysql  # needed by the alternative MySQL pipeline at the bottom of this file

from lottery_spider import settings
from lottery_spider.LotteryInfo import LotteryInfoHelper

# The variables from settings.py can now be used directly
lottery = LotteryInfoHelper(
            settings.MYSQL_CONFIG['HOST'],
            settings.MYSQL_CONFIG['PORT'],
            settings.MYSQL_CONFIG['DATABASE'],
            settings.MYSQL_CONFIG['USER'],
            settings.MYSQL_CONFIG['PASSWORD']
        )

"""
关于文件这个
我们希望的是, 能不能打开一个文件, 然后就用这一个文件句柄来完成数据的保存. 答案是可以的. 我们可以在pipeline中创建两个方法, 一个是open_spider(), 另一个是close_spider(). 看名字也能明白其含义: 

​		open_spider(), 在爬虫开始的时候执行一次
​		close_spider(), 在爬虫结束的时候执行一次

​		有了这俩货, 我们就可以很简单的去处理这个问题
"""
class LotterySpiderPipeline:

    def open_spider(self, spider):  # runs automatically when a spider starts
        self.f = open("shuangseqiu.csv", mode="w", encoding='utf-8')

    def close_spider(self, spider):  # runs automatically when a spider closes
        self.f.close()

    # The signature of this method must not be changed!
    # Items yielded by the spider are automatically passed to process_item();
    # rename it and the pipeline chain is broken.
    def process_item(self, item, spider):
        if spider.name == 'ssq':
            self.f.write(f"{item['qihao'].strip()},{','.join(item['red'])},{item['blue']}\n")
        return item


# Store the data in MySQL
class LotterySpiderPipeline_mysql:

    def process_item(self, item, spider):
        data_info = {}
        data_info['number'] = item['qihao']
        data_info['blue'] = item['blue']
        data_info['red'] = ','.join(item['red'])
        lottery.save_data(**data_info)
        # The item must be returned, otherwise the next pipeline receives no data
        return item

"""
还有一种存MySQL的方法,供参考,这个就跟上面文件那个套路差不多,上面的封装MySQL套路是我自创的。
"""
# Note: this class reuses the name above on purpose; keep only one of the two
# definitions in pipelines.py (the later one would silently override the earlier).
class LotterySpiderPipeline_mysql:

    def open_spider(self, spider):
        cfg = settings.MYSQL_CONFIG
        self.conn = pymysql.connect(host=cfg["HOST"], port=cfg["PORT"], user=cfg["USER"],
                                    password=cfg["PASSWORD"], database=cfg["DATABASE"])

    def close_spider(self, spider):
        self.conn.close()

    def process_item(self, item, spider):
        # Write the item into MySQL
        try:
            cursor = self.conn.cursor()
            sql = "insert into test_code(`number`, red, blue) values(%s, %s, %s)"
            red = ",".join(item['red'])
            cursor.execute(sql, (item['qihao'], red, item['blue']))
            self.conn.commit()
            spider.logger.info(f"saved item {item}")
        except Exception as e:
            self.conn.rollback()
            spider.logger.error(f"failed to save to MySQL: {e}; item was: {item}")  # error log
        return item
