当前位置: 首页 > 文档资料 > Python 全栈 >

7. ItemPipeline的使用

优质
小牛编辑
133浏览
2023-12-01
  • 当Item在Spider中被收集之后,它将会被传递到Item Pipeline,一些组件会按照一定的顺序执行对Item的处理。

  • 每个item pipeline组件(有时称之为“Item Pipeline”)是实现了简单方法的Python类。他们接收到Item并通过它执行一些行为,同时也决定此Item是否继续通过pipeline,或是被丢弃而不再进行处理。

  • 以下是item pipeline的一些典型应用:

    • 清理HTML数据
    • 验证爬取的数据(检查item包含某些字段)
    • 查重(并丢弃)
    • 将爬取结果保存到数据库中

7.1 如何编写你自己的item pipeline

  • 编写你自己的item pipeline很简单,每个item pipiline组件是一个独立的Python类,同时必须实现以下方法:

process_item(item, spider)

  • 每个item pipeline组件都需要调用该方法,这个方法必须返回一个 Item (或任何继承类)对象, 或是抛出 DropItem 异常,被丢弃的item将不会被之后的pipeline组件所处理。
  • 参数:

    • item (Item 对象) – 被爬取的item
    • spider (Spider 对象) – 爬取该item的spider
  • 此外,他们也可以实现以下方法:

open_spider(spider)

  • 当spider被开启时,这个方法被调用。
  • 参数: spider (Spider 对象) – 被开启的spider

close_spider(spider)

  • 当spider被关闭时,这个方法被调用
  • 参数: spider (Spider 对象) – 被关闭的spider

7.2 样例:

验证价格,同时丢弃没有价格的item

  • 让我们来看一下以下这个假设的pipeline,它为那些不含税(price_excludes_vat 属性)的item调整了 price 属性,同时丢弃了那些没有价格的item:
from scrapy.exceptions import DropItem

class PricePipeline(object):

    vat_factor = 1.15

    def process_item(self, item, spider):
        if item['price']:
            if item['price_excludes_vat']:
                item['price'] = item['price'] * self.vat_factor
            return item
        else:
            raise DropItem("Missing price in %s" % item)

将item写入JSON文件:

  • 以下pipeline将所有(从所有spider中)爬取到的item,存储到一个独立地 items.jl 文件,每行包含一个序列化为JSON格式的item:
import json

class JsonWriterPipeline(object):

    def __init__(self):
        self.file = open('items.jl', 'wb')

    def process_item(self, item, spider):
        line = json.dumps(dict(item)) + "\n"
        self.file.write(line)
        return item
  • 注解:JsonWriterPipeline的目的只是为了介绍怎样编写item pipeline,如果你想要将所有爬取的item都保存到同一个JSON文件, 你需要使用 Feed exports 。

去重

  • 一个用于去重的过滤器,丢弃那些已经被处理过的item。让我们假设我们的item有一个唯一的id,但是我们spider返回的多个item中包含有相同的id:
from scrapy.exceptions import DropItem

class DuplicatesPipeline(object):

    def __init__(self):
        self.ids_seen = set()

    def process_item(self, item, spider):
        if item['id'] in self.ids_seen:
            raise DropItem("Duplicate item found: %s" % item)
        else:
            self.ids_seen.add(item['id'])
            return item

启用一个Item Pipeline组件:

  • 为了启用一个Item Pipeline组件,你必须将它的类添加到 ITEM_PIPELINES 配置,就像下面这个例子:
ITEM_PIPELINES = {
    'myproject.pipelines.PricePipeline': 300,
    'myproject.pipelines.JsonWriterPipeline': 800,
}
  • 分配给每个类的整型值,确定了他们运行的顺序,item按数字从低到高的顺序,通过pipeline,通常将这些数字定义在0-1000范围内。

7.3 Scrapy框架案例实战:

  • 任务:爬取csdn学院中的课程信息(编程语言的)

  • 网址:https://edu.csdn.net/courses/o280/p1 (第一页)

  • https://edu.csdn.net/courses/o280/p2 (第二页)

① 创建项目

  • 在命令行编写下面命令,创建项目demo
scrapy startproject  educsdn
  • 项目目录结构:
educsdn
├── educsdn
│   ├── __init__.py
│   ├── __pycache__
│   ├── items.py        # Items的定义,定义抓取的数据结构
│   ├── middlewares.py  # 定义Spider和DownLoader的Middlewares中间件实现。 
│   ├── pipelines.py    # 它定义Item Pipeline的实现,即定义数据管道
│   ├── settings.py     # 它定义项目的全局配置
│   └── spiders         # 其中包含一个个Spider的实现,每个Spider都有一个文件
│       ├── __init__.py
│       └── __pycache__
└── scrapy.cfg    #Scrapy部署时的配置文件,定义了配置文件路径、部署相关信息等内容

② 进入educsdn项目目录,创建爬虫spider类文件(courses课程)

  • 执行genspider命令,第一个参数是Spider的名称,第二个参数是网站域名。
scrapy genspider courses edu.csdn.net

$ tree 

├── demo
│   ├── __init__.py
│   ├── __pycache__
│   │   ├── __init__.cpython-36.pyc
│   │   └── settings.cpython-36.pyc
│   ├── items.py
│   ├── middlewares.py
│   ├── pipelines.py
│   ├── settings.py
│   └── spiders
│       ├── __init__.py
│       ├── __pycache__
│       │   └── __init__.cpython-36.pyc
│       └── courses.py  #在spiders目录下有了一个爬虫类文件courses.py
└── scrapy.cfg


# courses.py的文件代码如下:

# -*- coding: utf-8 -*-
import scrapy

class CoursesSpider(scrapy.Spider):
    name = 'courses'
    allowed_domains = ['edu.csdn.net']
    start_urls = ['http://edu.csdn.net/']

    def parse(self, response):
        pass

③ 创建Item

  • Item是保存爬取数据的容器,它的使用方法和字典类型,但相比字典多了些保护机制。

  • 创建Item需要继承scrapy.Item类,并且定义类型为scrapy.Field的字段:(课程标题、课程地址、图片、授课老师,视频时长、价格)

  • 具体代码如下:(修改类名为CoursesItem)

import scrapy

class CoursesItem(scrapy.Item):
    # define the fields for your item here like:
    # name = scrapy.Field()
    title = scrapy.Field()
    url = scrapy.Field()
    pic = scrapy.Field()
    teacher = scrapy.Field()
    time = scrapy.Field()
    price = scrapy.Field()
    #pass

④ 解析Response

  • 在fang.py文件中,parse()方法的参数response是start_urls里面的链接爬取后的结果。

  • 提取的方式可以是CSS选择器、XPath选择器或者是re正则表达式。

# -*- coding: utf-8 -*-
import scrapy
from educsdn.items import CoursesItem

class CoursesSpider(scrapy.Spider):
    name = 'courses'
    allowed_domains = ['edu.csdn.net']
    start_urls = ['https://edu.csdn.net/courses/o280/p1']
    p=1
    def parse(self, response):
        #解析并输出课程标题
        #print(response.selector.css("div.course_dl_list span.title::text").extract())
        #获取所有课程
        dlist = response.selector.css("div.course_dl_list")
        #遍历课程,并解析信息后封装到item容器中
        for dd in dlist:
            item = CoursesItem()
            item['title'] = dd.css("span.title::text").extract_first()
            item['url'] = dd.css("a::attr(href)").extract_first()
            item['pic'] = dd.css("img::attr(src)").extract_first()
            item['teacher'] = dd.re_first("<p>讲师:(.*?)</p>")
            item['time'] = dd.re_first("<em>([0-9]+)</em>课时")
            item['price'] = dd.re_first("¥([0-9\.]+)")
            #print(item)
            #print("="*70)
            yield item

        #获取前10页的课程信息 
        self.p += 1
        if self.p <= 10:
            next_url = 'https://edu.csdn.net/courses/o280/p'+str(self.p)
            url = response.urljoin(next_url) #构建绝对url地址(这里可省略)
            yield scrapy.Request(url=url,callback=self.parse)

⑤、创建数据库和表:

  • 在mysql中创建数据库csdndb和数据表courses
CREATE TABLE `courses` (                          
   `id` int(10) unsigned NOT NULL AUTO_INCREMENT,  
   `title` varchar(255) DEFAULT NULL,              
   `url` varchar(255) DEFAULT NULL,                
   `pic` varchar(255) DEFAULT NULL,                
   `teacher` varchar(32) DEFAULT NULL,             
   `time` varchar(16) DEFAULT NULL,                
   `price` varchar(16) DEFAULT NULL,               
   PRIMARY KEY (`id`)                              
 ) ENGINE=InnoDB DEFAULT CHARSET=utf8

⑥、使用Item Pipeline

  • Item Pipeline为项目管道,当Item生产后,他会自动被送到Item Pipeline进行处理:
  • 我们常用Item Pipeline来做如下操作:
    • 清理HTML数据
    • 验证抓取数据,检查抓取字段
    • 查重并丢弃重复内容
    • 将爬取结果保存到数据库里。
import pymysql
from scrapy.exceptions import DropItem

class EducsdnPipeline(object):
    def process_item(self, item, spider):
        if item['price'] == None:
            raise DropItem("Drop item found: %s" % item)
        else:
            return item


class MysqlPipeline(object):
    def __init__(self,host,database,user,password,port):
        self.host = host
        self.database = database
        self.user = user
        self.password = password
        self.port = port
        self.db=None
        self.cursor=None

    @classmethod
    def from_crawler(cls,crawler):
        return cls(
            host = crawler.settings.get("MYSQL_HOST"),
            database = crawler.settings.get("MYSQL_DATABASE"),
            user = crawler.settings.get("MYSQL_USER"),
            password = crawler.settings.get("MYSQL_PASS"),
            port = crawler.settings.get("MYSQL_PORT")
        )

    def open_spider(self,spider):
        self.db = pymysql.connect(self.host,self.user,self.password,self.database,charset='utf8',port=self.port)
        self.cursor = self.db.cursor()

    def process_item(self, item, spider):
        sql = "insert into courses(title,url,pic,teacher,time,price) values('%s','%s','%s','%s','%s','%s')"%(item['title'],item['url'],item['pic'],item['teacher'],str(item['time']),str(item['price']))
        #print(item)
        self.cursor.execute(sql)
        self.db.commit()
        return item

    def close_spider(self,spider):
        self.db.close()

⑦ 修改配置文件

  • 打开配置文件:settings.py 开启并配置ITEM_PIPELINES信息,配置数据库连接信息
ITEM_PIPELINES = {
    'educsdn.pipelines.EducsdnPipeline': 300,
    'educsdn.pipelines.MysqlPipeline': 301,
}
MYSQL_HOST = 'localhost'
MYSQL_DATABASE = 'csdndb'
MYSQL_USER = 'root'
MYSQL_PASS = ''
MYSQL_PORT = 3306

⑧、运行爬取:

  • 执行如下命令来启用数据爬取
    scrapy crawl courses
    

7.4 下载和处理文件和图像:

① ImagesPipeline介绍

  • Scrapy提供了专门处理下载的Pipeline,包含文件下载和图片下载,其原理与抓取页面的原理一样。

  • 下载过程支持异步和多线程,下载十分高效。

  • 网址:https://docs.scrapy.org/en/latest/topics/media-pipeline.html

  • 实现方式:定义一个ItemPipeline 类继承 scrapy.pipelines.images.ImagesPipeline

② 具体使用:

  • 首先定义存储文件的路径,在settings.py配置文件中添加代码:IMAGES_STORE = './images'

  • 并在项目目录下创建 images 文件夹 (与scrapy.cfg文件同级)。

  • 要在pipelines.py文件中定义一个‘ImagePipeline’类,其代码如下:


from scrapy import Request
from scrapy.exceptions import DropItem
from scrapy.pipelines.images import ImagesPipeline

class ImagePipeline(ImagesPipeline):
    '''自定义图片存储类'''

    def get_media_requests(self, item, info):
        '''通过抓取的item对象获取图片信息,并创建Request请求对象添加调度队列,等待调度执行下载'''
        yield Request(item['pic'])

    def file_path(self,request,response=None,info=None):
        '''返回图片下载后保存的名称,没有此方法Scrapy则自动给一个唯一值作为图片名称'''
        url = request.url
        file_name = url.split("/")[-1]
        return file_name

    def item_completed(self, results, item, info):
        ''' 下载完成后的处理方法,其中results内容结构如下说明'''
        image_paths = [x['path'] for ok, x in results if ok]
        if not image_paths:
            raise DropItem("Item contains no images")
        #item['image_paths'] = image_paths
        return item
  • 在item_completed()方法中,results参数内容结构如下:

      print(results)
      [(True, {'url': 'https://img-bss.csdn.net/201803191642534078.png', 
               'path': '201803191642534078.png', 
               'checksum': 'cc1368dbc122b6762f3e26ccef0dd105'}
      )]
    
  • 在settings.py文件中配置如下(启用下载):

...
ITEM_PIPELINES = {
    'educsdn.pipelines.EducsdnPipeline': 300,
    'educsdn.pipelines.ImagePipeline': 301,
    'educsdn.pipelines.MysqlPipeline': 302,
}
MYSQL_HOST = "localhost"
MYSQL_DATABASE = "csdndb"
MYSQL_USER = "root"
MYSQL_PASS = ""
MYSQL_PORT = 3306

IMAGES_STORE = "./images"
...