当前位置: 首页 > 工具软件 > Scraper > 使用案例 >

scrapy源码3:scraper的源码分析

何睿范
2023-12-01

我们看看scraper.py文件吧。
从注释中我们可以看出这个scraper模块是实现爬虫组件去解析响应流并且提取数据的。


import logging
from collections import deque
# 这2个就是日志的deque队列的导入。

from twisted.python.failure import Failure
from twisted.internet import defer
# 这2句,导入了一个Failure和一个defer延迟。

from scrapy.utils.defer import defer_result, defer_succeed, parallel, iter_errback
# 这个从自己的工具类defer里面导入了好几个方法,基本看是并行,错误,成功,结果的一些相关方法。 我们定位过去一个一个分析下。
def defer_result(result):
    if isinstance(result, defer.Deferred):
        return result
    elif isinstance(result, failure.Failure):
        return defer_fail(result)
    else:
        return defer_succeed(result)
    # 这里对result进行判定, 如果是Deferred对象,那就返回,如果是Failure结果 返回defer_fail(result) 这个defer_fail应该是对这个错误的result解析的吧。先放放
    # 其他的情况就直接使用defer_succeed去解析。既然这里提到了defer_fail,defer_succeed 2个方法, 那我们干脆就先看看这2个方法做了啥。
    def defer_fail(_failure):
    """Same as twisted.internet.defer.fail but delay calling errback until
    next reactor loop

    It delays by 100ms so reactor has a chance to go trough readers and writers
    before attending pending delayed calls, so do not set delay to zero.
    """
    d = defer.Deferred()
    reactor.callLater(0.1, d.errback, _failure)
    return d
    # 这个是google翻译了下twisted.internet.defer.fail相同,但延迟调用errback直到下一个反应器回路它延迟了100ms,所以反应堆有机会通过读写在等待延迟呼叫之前,所以不要将延迟设置为零
    # 我们从上面可以知道,他故意延迟0.1为了读写操作的。
    def defer_succeed(result):
    """Same as twisted.internet.defer.succeed but delay calling callback until
    next reactor loop

    It delays by 100ms so reactor has a chance to go trough readers and writers
    before attending pending delayed calls, so do not set delay to zero.
    """
    d = defer.Deferred()
    reactor.callLater(0.1, d.callback, result)
    return d
    # 这个方法不多解释了。 和上面的那个是一样的。 

def parallel(iterable, count, callable, *args, **named):
    """Execute a callable over the objects in the given iterable, in parallel,
    using no more than ``count`` concurrent calls.

    Taken from: http://jcalderone.livejournal.com/24285.html
    """
    coop = task.Cooperator()
    work = (callable(elem, *args, **named) for elem in iterable)
    return defer.DeferredList([coop.coiterate(work) for _ in range(count)])
    # 这个看的不太懂。 就是解决并发问题的。 去了http://jcalderone.livejournal.com/24285.html 也没有看懂。

def iter_errback(iterable, errback, *a, **kw):
    """Wraps an iterable calling an errback if an error is caught while
    iterating it.
    """
    it = iter(iterable)
    while True:
        try:
            yield next(it)
        except StopIteration:
            break
        except:
            errback(failure.Failure(), *a, **kw)
    # 迭代处理iterable这个对象,google翻译下吧: 如果在迭代时捕获到错误,则可以包装一个可调用的回调函数。这个方法一直迭代,如果错误就使用errback包装一个可回调的函数。 

from scrapy.utils.spider import iterate_spider_output

def iterate_spider_output(result):
    return arg_to_iter(result)
    # 这个方法定位到另一个位置了。 我们追踪过去看看
 def arg_to_iter(arg):
    """Convert an argument to an iterable. The argument can be a None, single
    value, or an iterable.

    Exception: if arg is a dict, [arg] will be returned
    """
    if arg is None:
        return []
    elif not isinstance(arg, _ITERABLE_SINGLE_VALUES) and hasattr(arg, '__iter__'):
        return arg
    else:
        return [arg]   
    # 将对象转化为可迭代的对象。
    # 如果是none返回[], 如果是不可迭代就返回[arg] 本身迭代的就返回自身。
    _ITERABLE_SINGLE_VALUES = dict, BaseItem, six.text_type, bytes
    # 这里是不是需要还需要判定下arg是不是有__next__呢。 
from scrapy.utils.misc import load_object 这个01,02的文档都提到了就是将str转对象的。

from scrapy.utils.log import logformatter_adapter, failure_to_exc_info

# 这个是基础日志的。我们也看看吧。 
def logformatter_adapter(logkws):
    """
    Helper that takes the dictionary output from the methods in LogFormatter
    and adapts it into a tuple of positional arguments for logger.log calls,
    handling backward compatibility as well.
    """
    if not {'level', 'msg', 'args'} <= set(logkws):
        warnings.warn('Missing keys in LogFormatter method',
                      ScrapyDeprecationWarning)

    if 'format' in logkws:
        warnings.warn('`format` key in LogFormatter methods has been '
                      'deprecated, use `msg` instead',
                      ScrapyDeprecationWarning)

    level = logkws.get('level', logging.INFO)
    message = logkws.get('format', logkws.get('msg'))
    # NOTE: This also handles 'args' being an empty dict, that case doesn't
    # play well in logger.log calls
    args = logkws if not logkws.get('args') else logkws['args']

    return (level, message, args)
# 这个方法是个适配器,将logkws字典对象最终返回一个元组,日志级别,信息,和详细参数。


def failure_to_exc_info(failure):
    """Extract exc_info from Failure instances"""
    if isinstance(failure, Failure):
        return (failure.type, failure.value, failure.getTracebackObject())
# 方法比较简单, 用于提取错误实例的信息。


from scrapy.exceptions import CloseSpider, DropItem, IgnoreRequest
# 这里从异常类导入几个异常吧。 我们也定位过去吧。 
class DropItem(Exception):
    """Drop item from the item pipeline"""
    pass
 class CloseSpider(Exception):
    """Raise this from callbacks to request the spider to be closed"""

    def __init__(self, reason='cancelled'):
        super(CloseSpider, self).__init__()
        self.reason = reason
class IgnoreRequest(Exception):
    """Indicates a decision was made not to process a request"""
# 这三个基本都是异常类而已,继承了exception,无实质代码。
# dropitem: item被丢弃的异常。 
# closespider:关闭爬虫的异常。
# IgnoreRequest:忽略请求的异常。

from scrapy import signals # 这里就是导入那几个信号了。 

from scrapy.http import Request, Response # 这个导入了请求和响应类。
from scrapy.core.spidermw import SpiderMiddlewareManager # 导入了爬虫中间件管理
from scrapy.utils.request import referer_str # 这个不清楚。 定位过去看下吧。 
def referer_str(request):
    """ Return Referer HTTP header suitable for logging. """
    referrer = request.headers.get('Referer')
    if referrer is None:
        return referrer
    return to_native_str(referrer, errors='replace')
    # 从请求头信息里面获取referer信息。

logger = logging.getLogger(__name__) # 全局的日志对象

# 上面的包分析完毕了。 我们看看下面的类吧,有2个一个是slot,一个是scraper。一个一个看吧。 
slot的方法看下
    def __init__(self, max_active_size=5000000):
        self.max_active_size = max_active_size
        self.queue = deque()
        self.active = set()
        self.active_size = 0
        self.itemproc_size = 0
        self.closing = None
    # 这里设置了,最大活动大小,默认值为5000000, 这个值为何不放到默认配置文件里面呢。疑惑下。
    # 构造一个deques,使用集合去存储活动的, 活动的大小开始为0,itemproc_size item 处理的大小,关闭状态为none.
    def add_response_request(self, response, request):
        deferred = defer.Deferred()
        self.queue.append((response, request, deferred))
        if isinstance(response, Response):
            self.active_size += max(len(response.body), self.MIN_RESPONSE_SIZE)
        else:
            self.active_size += self.MIN_RESPONSE_SIZE
        return deferred
        # 这个方法从名字上看 ,应该是添加响应请求吧,
        # 创建一个defer对象,队列里面添加一个(response,request,deferred)元祖,如果response shi REsposne的示例的话
        # 活动的大小就是原来活动的大小+ body的长度h或者最小响应的大小。
        # 否则,就设置为最小的响应大小。放回那个deferred.
     
    def next_response_request_deferred(self):
        response, request, deferred = self.queue.popleft()
        self.active.add(request)
        return response, request, deferred
    # 从队列中popleft一个元组,活动请求添加request,返回一个元组,response,request,deferred

    def finish_response(self, response, request):
    self.active.remove(request)
    if isinstance(response, Response):
        self.active_size -= max(len(response.body), self.MIN_RESPONSE_SIZE)
    else:
        self.active_size -= self.MIN_RESPONSE_SIZE
    # 完成响应的话, 就从active活动列表中移除这个请求,active_size 减去对应大小。
    def is_idle(self):
        return not (self.queue or self.active)   
        # 是否空闲的判断, 如果队列不为空, 或者active不为空。
    def needs_backout(self):
        return self.active_size > self.max_active_size    
    # 判断是否超限了。 


# 下面看看这个scraper类吧。 
    def __init__(self, crawler):
        self.slot = None
        self.spidermw = SpiderMiddlewareManager.from_crawler(crawler)
        itemproc_cls = load_object(crawler.settings['ITEM_PROCESSOR'])
        self.itemproc = itemproc_cls.from_crawler(crawler)
        self.concurrent_items = crawler.settings.getint('CONCURRENT_ITEMS')
        self.crawler = crawler
        self.signals = crawler.signals
        self.logformatter = crawler.logformatter
# 这个是初始化了,爬虫中间件从crawler获取,item处理类从crawler.settings获取。然后获取一个item处理类的对象。
# 并发item数量,信号和日志formatter设置都是从crawler获取。

  @defer.inlineCallbacks
    def open_spider(self, spider):
        """Open the given spider for scraping and allocate resources for it"""
        self.slot = Slot()
        yield self.itemproc.open_spider(spider)
    # 这个方法就是打开给定的爬虫,并分配指定的资源,
    # 创建一个slot,然后调用对应的itemproessor类创建的处理类去打开爬虫。
def close_spider(self, spider):
    """Close a spider being scraped and release its resources"""
    slot = self.slot
    slot.closing = defer.Deferred()
    slot.closing.addCallback(self.itemproc.close_spider)
    self._check_if_closing(spider, slot)
    return slot.closing  
    # 关闭爬虫并且释放资源。
    # 获取slot, 然后给slot添加一个closing的事件,然后放回方法。
def is_idle(self):
    """Return True if there isn't any more spiders to process"""
    return not self.slot 
    # 如果没有爬虫去处理了。 就返回true了。 

def _check_if_closing(self, spider, slot):
    if slot.closing and slot.is_idle():
        slot.closing.callback(spider)        
    # 如果closing不为空,不为空闲,  就调用指定spider的关闭回调。

def enqueue_scrape(self, response, request, spider):
    slot = self.slot
    dfd = slot.add_response_request(response, request)
    def finish_scraping(_):
        slot.finish_response(response, request)
        self._check_if_closing(spider, slot)
        self._scrape_next(spider, slot)
        return _
    dfd.addBoth(finish_scraping)
    dfd.addErrback(
        lambda f: logger.error('Scraper bug processing %(request)s',
                                {'request': request},
                                exc_info=failure_to_exc_info(f),
                                extra={'spider': spider}))
    self._scrape_next(spider, slot)
    return dfd
    # 调用add_response_request添加返回一个defferd对象,定义一个完成的方法,给成功和失败都添加一个finish_scraping的回调。
    # 给错误的在添加一个匿名的回调方法。
    # 调用_scrape_next 处理下一个。 
def _scrape_next(self, spider, slot):
    while slot.queue:
        response, request, deferred = slot.next_response_request_deferred()
        self._scrape(response, request, spider).chainDeferred(deferred)   
    # 这里如果slot的queue有内容的haunted, 就一直循环下去, 调用_scrape去处理。

def _scrape(self, response, request, spider):
    """Handle the downloaded response or failure through the spider
    callback/errback"""
    assert isinstance(response, (Response, Failure))

    dfd = self._scrape2(response, request, spider) # returns spiders processed output
    dfd.addErrback(self.handle_spider_error, request, response, spider)
    dfd.addCallback(self.handle_spider_output, request, response, spider)
    return dfd    
    # 这个方法就是处理下载响应或者失败,通过给爬虫指定的成功和错误的回调方法。
    # 先断言这个是响应流或者failure,调用_scrape2获取爬虫处理的输出
    # 添加错误回调和成功回调。
def _scrape2(self, request_result, request, spider):
    """Handle the different cases of request's result been a Response or a
    Failure"""
    if not isinstance(request_result, Failure):
        return self.spidermw.scrape_response(
            self.call_spider, request_result, request, spider)
    else:
        # FIXME: don't ignore errors in spider middleware
        dfd = self.call_spider(request_result, request, spider)
        return dfd.addErrback(
            self._log_download_errors, request_result, request, spider)
    # 如果响应是成功的的。调用自己的爬虫中间件去处理响应。如果是错误的,调用call_spider方法,给dfd添加一个错误的回调。 

def call_spider(self, result, request, spider):
    result.request = request
    dfd = defer_result(result)
    dfd.addCallbacks(request.callback or spider.parse, request.errback)
    return dfd.addCallback(iterate_spider_output)
    defer_result 这个方法我们上面已经看了。 主要是等100ms读写的。 添加成功的回调。
    # 这个地方注意了。 先使用request。callback , 如果没有指定的话,默认采用spider.parse方法。
    # 这就是我们的爬虫为何使用parse方法解析response的原因了。 
    # 添加一个成功回调。 iterate_spider_output 这个上面已经看过了, 就是返回一个可迭代的对象。 
def handle_spider_error(self, _failure, request, response, spider):
    exc = _failure.value
    if isinstance(exc, CloseSpider):
        self.crawler.engine.close_spider(spider, exc.reason or 'cancelled')
        return
    logger.error(
        "Spider error processing %(request)s (referer: %(referer)s)",
        {'request': request, 'referer': referer_str(request)},
        exc_info=failure_to_exc_info(_failure),
        extra={'spider': spider}
    )
    self.signals.send_catch_log(
        signal=signals.spider_error,
        failure=_failure, response=response,
        spider=spider
    )
    self.crawler.stats.inc_value(
        "spider_exceptions/%s" % _failure.value.__class__.__name__,
        spider=spider
    )
    # 这个方法就是处理爬虫错误的 , 如果是关闭爬虫异常, 调用对应引擎去关闭爬虫,返回
    # 其他情况,就记录下日志信息。 发送对应的爬虫错误信号, 统计信息的添加。
def handle_spider_output(self, result, request, response, spider):
    if not result:
        return defer_succeed(None)
    it = iter_errback(result, self.handle_spider_error, request, response, spider)
    dfd = parallel(it, self.concurrent_items,
        self._process_spidermw_output, request, response, spider)
    return dfd   
    # 处理爬虫的输出,如果结果不为空 调用defer_succeed,错误的话调用错误回调,平行处理, _process_spidermw_output去处理

def _process_spidermw_output(self, output, request, response, spider):
    """Process each Request/Item (given in the output parameter) returned
    from the given spider
    """
    if isinstance(output, Request):
        self.crawler.engine.crawl(request=output, spider=spider)
    elif isinstance(output, (BaseItem, dict)):
        self.slot.itemproc_size += 1
        dfd = self.itemproc.process_item(output, spider)
        dfd.addBoth(self._itemproc_finished, output, response, spider)
        return dfd
    elif output is None:
        pass
    else:
        typename = type(output).__name__
        logger.error('Spider must return Request, BaseItem, dict or None, '
                        'got %(typename)r in %(request)s',
                        {'request': request, 'typename': typename},
                        extra={'spider': spider})
    # 处理每个请求或者从给定的爬虫得到的item
    # 如果output是个请求的话, 调用engine.crawl抓取。 
    # 如果是baseitem或者dict的话, 处理个数加1,调用item处理类的process_item去处理item。
    # 添加处理完毕事件。 
    # 其他请求输出日志。 报告你的返回类型不是给定的item类或者字典类。 或者请求。 
    # 这里就是限定了。 我们爬虫里面的parse方法只能返回这3类的原因了。 

def _log_download_errors(self, spider_failure, download_failure, request, spider):
    """Log and silence errors that come from the engine (typically download
    errors that got propagated thru here)
    """
    if (isinstance(download_failure, Failure) and
            not download_failure.check(IgnoreRequest)):
        if download_failure.frames:
            logger.error('Error downloading %(request)s',
                            {'request': request},
                            exc_info=failure_to_exc_info(download_failure),
                            extra={'spider': spider})
        else:
            errmsg = download_failure.getErrorMessage()
            if errmsg:
                logger.error('Error downloading %(request)s: %(errmsg)s',
                                {'request': request, 'errmsg': errmsg},
                                extra={'spider': spider})

    if spider_failure is not download_failure:
        return spider_failure    
    # 这里定义个方法下载错误的, 如果是错误 并且不是ignorerequest的话进入if块。
    # 如果错误frames不为空,记录错误信息。
    # 否则调用geterrmessage方法,记录错误。
    # 如果错误不是下载错误,返回爬虫的错误。

def _itemproc_finished(self, output, item, response, spider):
    """ItemProcessor finished for the given ``item`` and returned ``output``
    """
    self.slot.itemproc_size -= 1
    if isinstance(output, Failure):
        ex = output.value
        if isinstance(ex, DropItem):
            logkws = self.logformatter.dropped(item, ex, response, spider)
            logger.log(*logformatter_adapter(logkws), extra={'spider': spider})
            return self.signals.send_catch_log_deferred(
                signal=signals.item_dropped, item=item, response=response,
                spider=spider, exception=output.value)
        else:
            logger.error('Error processing %(item)s', {'item': item},
                            exc_info=failure_to_exc_info(output),
                            extra={'spider': spider})
    else:
        logkws = self.logformatter.scraped(output, response, spider)
        logger.log(*logformatter_adapter(logkws), extra={'spider': spider})
        return self.signals.send_catch_log_deferred(
            signal=signals.item_scraped, item=output, response=response,
            spider=spider)
    # itemprocess处理类结束,如果输出㐊错误。 判定他是不是dropitem。  分别记录日志。  
    # 正常情况下,记录日志 。通过日志适配器将logkws 转出logger.log方法接受的参数。 
    # 发送itemscraped信号。 
    
 类似资料: