Scrapy-Redis源码解析及应用

秦景同
2023-12-01
版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/qq_1290259791/article/details/82393363

Scrapy-Redis的使用

GitHub地址核心源码的目录在scrapy-redis/src/scrapy_redis/

安装

pip install scrapy-redis

源码分析

picklecompat.py文件

try:
    import cPickle as pickle  # PY2
except ImportError:
    import pickle

def loads(s):
return pickle.loads(s)

def dumps(obj):
return pickle.dumps(obj, protocol=-1)

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

使用了pickle库。
**dumps方法:**实现了序列化
**loads方法:**实现了反序列化

queue.py文件

Base类

功能:爬取队列的实现,有三个队列实现,首先实现了一个Base类,提供基础方法和属性。

数据库无法存储Requets对象,所以先将Request序列化为字符串。
**_encode_requests:**将Request对象转化为存储对象
**_decode_requests:**将Request反序列化转换为对象
push、pop、__len__需要子类来重写方法,所以直接抛出异常

FifoQueue类

class FifoQueue(Base):
    """Per-spider FIFO queue"""
def __len__(self):
    """Return the length of the queue"""
    return self.server.llen(self.key)

def push(self, request):
    """Push a request"""
    self.server.lpush(self.key, self._encode_request(request))

def pop(self, timeout=0):
    """Pop a request"""
    if timeout > 0:
        data = self.server.brpop(self.key, timeout)
        if isinstance(data, tuple):
            data = data[1]
    else:
        data = self.server.rpop(self.key)
    if data:
        return self._decode_request(data)

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21

重写了push、pop、__len__方法都是对Redis中的**列表(List)**操作,其中self.server就是Redis连接对象。

  1. __len__方法:获取列表的长度。
  2. push方法:将Requests对象序列化后存储到列表中。
  3. pop方法:调用的rpop()方法,从列表右侧取出数据,然后反序列化为Request对象。

Request在列表中存取的顺序是左侧进、右侧出,有序的进出,先进先出(FIFO)。

LifoQueue类

    def pop(self, timeout=0):
        """Pop a request"""
        if timeout > 0:
            data = self.server.blpop(self.key, timeout)
            if isinstance(data, tuple):
                data = data[1]
        else:
            data = self.server.lpop(self.key)
    if data:
        return self._decode_request(data)

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

LifoQueue和FifoQueue的区别是在pop()方法使用的是lpop,也就是左侧出去。效果就是先进后出,后进先出(LIFO),类似于栈的操作也称作为StackQueue。

PriorityQueue类

class PriorityQueue(Base):
    """Per-spider priority queue abstraction using redis' sorted set"""
def __len__(self):
    """Return the length of the queue"""
    return self.server.zcard(self.key)

def push(self, request):
    """Push a request"""
    data = self._encode_request(request)
    score = -request.priority
    self.server.execute_command('ZADD', self.key, score, data)

def pop(self, timeout=0):
    """
    Pop a request
    timeout not support in this queue class
    """
    # use atomic range/remove using multi/exec
    pipe = self.server.pipeline()
    pipe.multi()
    pipe.zrange(self.key, 0, 0).zremrangebyrank(self.key, 0, 0)
    results, count = pipe.execute()
    if results:
        return self._decode_request(results[0])

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25

这里使用Redis中的有序集合(zset),集合中的每个元素都可以设置一个分数,分数就代表优先级。

  1. __len__方法:调用zcard()操作返回有序集合的大小,也就是队列的长度。
  2. push方法:调用了zadd()操作向集合中添加元素,这里的分数设置为Request优先级的相反数,分数低的会排在集合前面,优先级高的Request就会在集合最前面。
  3. pop方法:调用了zrange()取出集合中的第一个元素,第一个元素就是优先级最高的Rquest,然后调用zremrangebyrank将这个元素删除。

改队列是默认使用的队列,默认使用有序集合来存储。

dupefilter.py文件

RFPDupeFilter类继承来自Scrapy中的BaseDupeFilter类。

Scrapy去重采用的是集合实现的,Scrapy分布式中去重就要利用共享集合,采用Redis的集合数据结构。

request_seen()方法和Scrapy中的request_seen()方法相似。这里的集合操作的是server对象的sadd()方法操作。Scrapy中的是数据结构,这里换成了数据库的存储方式。

鉴别重复的方式还是使用指纹,指纹依靠request_fingerprint()方法来获取。获取指纹后直接向集合中添加指纹,添加成功返回1,判定结果返回False就是不重复。
这里完成利用Redis的集合完成指纹的记录和重复的验证。

scheduler.py文件

这里实现配合Queue、Dupefilter使用的调度器Scheduler,可以指定一些配置在Scrapy中的setting.py文件中设置。

**SCHEDULER_FLUSH_ON_START:**是否在爬取开始的时候清空爬取队列。
**SCHEDULER_PERSIST:**是否在爬取结束后保持爬取队列不清楚。

其中实现两个核心存取方法

    def enqueue_request(self, request):
        if not request.dont_filter and self.df.request_seen(request):
            self.df.log(request, self.spider)
            return False
        if self.stats:
            self.stats.inc_value('scheduler/enqueued/redis', spider=self.spider)
        self.queue.push(request)
        return True
def next_request(self):
    block_pop_timeout = self.idle_before_close
    request = self.queue.pop(block_pop_timeout)
    if request and self.stats:
        self.stats.inc_value('scheduler/dequeued/redis', spider=self.spider)
    return request

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15

enqueue_request方法:向队列中添加Request,核心操作就是调用queue的push操作,还有一些统计和日志操作。
next_request方法:从队列中取出Request,核心操作就是调用queue的pop操作,队列中如果存在Request则取出,如果
队列为空爬取就会从新开始。

总结

  1. 提供了三种队列,使用Redis的列表或有序集合来维护。
  2. 使用Redis集合来保存Request的指纹,提供重复过滤。
  3. 中断后Redis的队列没有清空,爬取再次启动后,调度器的next_request()会从队列中取到下一个Request,继续爬取。

Scrapy-Redis的配置

使用Scrapy-Redis只需要修改Scrapy项目下的setting.py配置文件就可以。

核心配置

将调度器的类和去重的类替换为Scrapy-Redis提供的类,在setting.py添加如下配置。

SCHEDULER = "scrapy_redis.scheduler.Scheduler"      # 调度器
DUPEFILTER_CLASS = "scrapy_redis.dupefilter.RFPDupeFilter"      # 去重

 
 
  • 1
  • 2

Redis连接配置

第一种方式

redis://[:password]@host:port/db
中括号内可有可无,host为IP地址,port为端口号,db是数据库代号默认0。
我的Redis数据库没设置密码在setting.py中配置如下。

REDIS_URL='redis://@127.0.0.1:6379'

 
 
  • 1

第二种方式

单独配置,根据Redis连接信息,在setting.py中配置如下。

REDIS_HOST='127.0.0.1'
REDIS_PORT=6379
REDIS_PASSWORD=''

 
 
  • 1
  • 2
  • 3

当两种方式都配置了,优先使用第一种方式。

调度队列

配置可选,默认使用PriorityQueue。

SCHEDULER_QUEUE_CLASS='scrapy_redis.queue.PriorityQueue
SCHEDULER_QUEUE_CLASS='scrapy_redis.queue.FifoQueue
SCHEDULER_QUEUE_CLASS='scrapy_redis.queue.LifoQueue

 
 
  • 1
  • 2
  • 3

配置持久化

配置可选,默认是False。Scrapy-redis默认在爬取完成后清空爬取队列和去重指纹集合。
SCHEDULER_PERSIST=True该设置就会在爬取完成后不清空。
**注意:**强制终端爬虫运行,爬取队列和去重指纹集合不会自动清空。

配置重爬

分布式爬虫不用开启。
配置可选,默认是Flask。如果配置了持久化或强制中断爬虫,那么爬虫队列和指纹集合不会被清空,会继续上次的爬取。

SCHEDULER_FLUSH_ON_START=True

 
 
  • 1

设置为True后爬虫每次启动时,爬取队列和指纹集合都会清空。做分布式爬虫不需要配置,因为每一个爬虫任务在启动的时候都会清空依次。

Pipline配置

配置可选,默认不启动。
实现了存储到Redis的Item Pipline,启用了后爬虫把生成的Item存储到Redis数据库中。
数据量大的时候不要开启,因为Redis基于内存,用来做存储太浪费了。

ITEM_PIPELINES = {
    'scrapy_redis.pipelines.RedisPipeline': 300
}

 
 
  • 1
  • 2
  • 3
        </div>
					<link href="https://csdnimg.cn/release/phoenix/mdeditor/markdown_views-448c2d19d9.css" rel="stylesheet">
            </div>
 类似资料: