编程开源技术交流,分享技术与知识

网站首页 > 开源技术 正文

Scrapy源码剖析(五)Scrapy如何完成抓取任务(下)

wxchong 2024-08-19 02:14:23 开源技术 8 ℃ 0 评论

微信搜索关注「水滴与银弹」公众号,第一时间获取优质技术干货。7年资深后端研发,用简单的方式把技术讲清楚。

接上篇,继续剖析 Scrapy 核心的抓取流程。

下载请求

请求第一次进来后,肯定是不重复的,那么则会正常进入调度器队列。之后下一次调度,再次调用 _next_request_from_scheduler 方法,此时调用调度器的 next_request 方法,就是从调度器队列中取出一个请求,这次就要开始进行网络下载了,也就是调用 _download:

def _download(self, request, spider):
    # 下载请求
    slot = self.slot
    slot.add_request(request)
    def _on_success(response):
        # 成功回调 结果必须是Request或Response
        assert isinstance(response, (Response, Request))
        if isinstance(response, Response):
            # 如果下载后结果为Response 返回Response
            response.request = request
            logkws = self.logformatter.crawled(request, response, spider)
            logger.log(*logformatter_adapter(logkws), extra={'spider': spider})
            self.signals.send_catch_log(signal=signals.response_received, \
                response=response, request=request, spider=spider)
        return response

    def _on_complete(_):
        # 此次下载完成后 继续进行下一次调度
        slot.nextcall.schedule()
        return _

    # 调用Downloader进行下载
    dwld = self.downloader.fetch(request, spider)
    # 注册成功回调
    dwld.addCallbacks(_on_success)
    # 结束回调
    dwld.addBoth(_on_complete)
    return dwld

在进行网络下载时,调用了 Downloader 的 fetch:

def fetch(self, request, spider):
    def _deactivate(response):
        # 下载结束后删除此记录
        self.active.remove(request)
        return response
    # 下载前记录处理中的请求
    self.active.add(request)
    # 调用下载器中间件download 并注册下载成功的回调方法是self._enqueue_request
    dfd = self.middleware.download(self._enqueue_request, request, spider)
    # 注册结束回调
    return dfd.addBoth(_deactivate)

这里调用下载器中间件的 download,并注册下载成功的回调方法是 _enqueue_request,来看下载方法:

def download(self, download_func, request, spider):
    @defer.inlineCallbacks
    def process_request(request):
        # 如果下载器中间件有定义process_request 则依次执行
        for method in self.methods['process_request']:
            response = yield method(request=request, spider=spider)
            assert response is None or isinstance(response, (Response, Request)), \
                    'Middleware %s.process_request must return None, Response or Request, got %s' % \
                    (six.get_method_self(method).__class__.__name__, response.__class__.__name__)
            # 如果下载器中间件有返回值 直接返回此结果
            if response:
                defer.returnValue(response)
        # 如果下载器中间件没有返回值,则执行注册进来的方法 也就是Downloader的_enqueue_request
        defer.returnValue((yield download_func(request=request,spider=spider)))

    @defer.inlineCallbacks
    def process_response(response):
        assert response is not None, 'Received None in process_response'
        if isinstance(response, Request):
            defer.returnValue(response)

        # 如果下载器中间件有定义process_response 则依次执行
        for method in self.methods['process_response']:
            response = yield method(request=request, response=response,
                                    spider=spider)
            assert isinstance(response, (Response, Request)), \
                'Middleware %s.process_response must return Response or Request, got %s' % \
                (six.get_method_self(method).__class__.__name__, type(response))
            if isinstance(response, Request):
                defer.returnValue(response)
        defer.returnValue(response)

    @defer.inlineCallbacks
    def process_exception(_failure):
        exception = _failure.value
        # 如果下载器中间件有定义process_exception 则依次执行
        for method in self.methods['process_exception']:
            response = yield method(request=request, exception=exception,
                                    spider=spider)
            assert response is None or isinstance(response, (Response, Request)), \
                'Middleware %s.process_exception must return None, Response or Request, got %s' % \
                (six.get_method_self(method).__class__.__name__, type(response))
            if response:
                defer.returnValue(response)
        defer.returnValue(_failure)

    # 注册执行、错误、回调方法
    deferred = mustbe_deferred(process_request, request)
    deferred.addErrback(process_exception)
    deferred.addCallback(process_response)
    return deferred

在下载过程中,首先找到所有定义好的下载器中间件,包括内置定义好的,也可以自己扩展下载器中间件,下载前先依次执行 process_request,可对 Request 进行加工、处理、校验等操作,然后发起真正的网络下载,也就是第一个参数 download_func,在这里是 Downloader 的 _enqueue_request 方法:

下载成功后回调 Downloader的 _enqueue_request:

def _enqueue_request(self, request, spider):
    # 加入下载请求队列
    key, slot = self._get_slot(request, spider)
    request.meta['download_slot'] = key

    def _deactivate(response):
        slot.active.remove(request)
        return response

    slot.active.add(request)
    deferred = defer.Deferred().addBoth(_deactivate)
    # 下载队列
    slot.queue.append((request, deferred))
    # 处理下载队列
    self._process_queue(spider, slot)
    return deferred
    
def _process_queue(self, spider, slot):
    if slot.latercall and slot.latercall.active():
        return

    # 如果延迟下载参数有配置 则延迟处理队列
    now = time()
    delay = slot.download_delay()
    if delay:
        penalty = delay - now + slot.lastseen
        if penalty > 0:
            slot.latercall = reactor.callLater(penalty, self._process_queue, spider, slot)
            return

    # 处理下载队列
    while slot.queue and slot.free_transfer_slots() > 0:
        slot.lastseen = now
        # 从下载队列中取出下载请求
        request, deferred = slot.queue.popleft()
        # 开始下载
        dfd = self._download(slot, request, spider)
        dfd.chainDeferred(deferred)
        # 延迟
        if delay:
            self._process_queue(spider, slot)
            break
            
def _download(self, slot, request, spider):
    # 注册方法 调用handlers的download_request
    dfd = mustbe_deferred(self.handlers.download_request, request, spider)

    # 注册下载完成回调方法
    def _downloaded(response):
        self.signals.send_catch_log(signal=signals.response_downloaded,
                                    response=response,
                                    request=request,
                                    spider=spider)
        return response
    dfd.addCallback(_downloaded)

    slot.transferring.add(request)

    def finish_transferring(_):
        slot.transferring.remove(request)
        # 下载完成后调用_process_queue
        self._process_queue(spider, slot)
        return _

    return dfd.addBoth(finish_transferring)

这里也维护了一个下载队列,可根据配置达到延迟下载的要求。真正发起下载请求是调用了 self.handlers.download_request:

def download_request(self, request, spider):
    # 获取请求的scheme
    scheme = urlparse_cached(request).scheme
    # 根据scheeme获取下载处理器
    handler = self._get_handler(scheme)
    if not handler:
        raise NotSupported("Unsupported URL scheme '%s': %s" %
                           (scheme, self._notconfigured[scheme]))
    # 开始下载 并返回结果
    return handler.download_request(request, spider)
    
def _get_handler(self, scheme):
    # 根据scheme获取对应的下载处理器
    # 配置文件中定义好了http、https、ftp等资源的下载处理器
    if scheme in self._handlers:
        return self._handlers[scheme]
    if scheme in self._notconfigured:
        return None
    if scheme not in self._schemes:
        self._notconfigured[scheme] = 'no handler available for that scheme'
        return None

    path = self._schemes[scheme]
    try:
        # 实例化下载处理器
        dhcls = load_object(path)
        dh = dhcls(self._crawler.settings)
    except NotConfigured as ex:
        self._notconfigured[scheme] = str(ex)
        return None
    except Exception as ex:
        logger.error('Loading "%(clspath)s" for scheme "%(scheme)s"',
                     {"clspath": path, "scheme": scheme},
                     exc_info=True,  extra={'crawler': self._crawler})
        self._notconfigured[scheme] = str(ex)
        return None
    else:
        self._handlers[scheme] = dh
    return self._handlers[scheme]

下载前,先通过解析 request 的 scheme 来获取对应的下载处理器,默认配置文件中定义的下载处理器如下:

DOWNLOAD_HANDLERS_BASE = {
    'file': 'scrapy.core.downloader.handlers.file.FileDownloadHandler',
    'http': 'scrapy.core.downloader.handlers.http.HTTPDownloadHandler',
    'https': 'scrapy.core.downloader.handlers.http.HTTPDownloadHandler',
    's3': 'scrapy.core.downloader.handlers.s3.S3DownloadHandler',
    'ftp': 'scrapy.core.downloader.handlers.ftp.FTPDownloadHandler',
}

然后调用 download_request 方法,完成网络下载,这里不再详细讲解每个处理器的实现,简单来说,你可以把它想象成封装好的网络下载库,输入URL,它会给你输出下载结果,这样方便理解。

在下载过程中,如果发生异常情况,则会依次调用下载器中间件的 process_exception 方法,每个中间件只需定义自己的异常处理逻辑即可。

如果下载成功,则会依次执行下载器中间件的 process_response 方法,每个中间件可以进一步处理下载后的结果,最终返回。

这里值得提一下,process_request 方法是每个中间件顺序执行的,而 process_response 和 process_exception 方法是每个中间件倒序执行的,具体可看一下 DownaloderMiddlewareManager 的 _add_middleware 方法,就可以明白是如何注册这个方法链的。

拿到最终的下载结果后,再回到 ExecuteEngine 的 _next_request_from_scheduler 中,会看到调用了 _handle_downloader_output,也就是处理下载结果的逻辑:

def _handle_downloader_output(self, response, request, spider):
    # 下载结果必须是Request、Response、Failure其一
    assert isinstance(response, (Request, Response, Failure)), response
    # 如果是Request 则再次调用crawl 执行Scheduler的入队逻辑
    if isinstance(response, Request):
        self.crawl(response, spider)
        return
    # 如果是Response或Failure 则调用scraper的enqueue_scrape进一步处理
    # 主要是和Spiders和Pipeline交互
    d = self.scraper.enqueue_scrape(response, request, spider)
    d.addErrback(lambda f: logger.error('Error while enqueuing downloader output',
                                        exc_info=failure_to_exc_info(f),
                                        extra={'spider': spider}))
    return d

拿到下载结果后,主要分 2 个逻辑:

  • 如果返回的是 Request 实例,则直接再次放入 Scheduler 请求队列
  • 如果返回的是是 Response 或 Failure 实例,则调用 Scraper 的 enqueue_scrape 方法,做进一步处理

处理下载结果

请求入队逻辑不用再说,前面已经讲过。现在主要看 Scraper 的 enqueue_scrape,看Scraper 组件是如何处理后续逻辑的:

def enqueue_scrape(self, response, request, spider):
    # 加入Scrape处理队列
    slot = self.slot
    dfd = slot.add_response_request(response, request)
    def finish_scraping(_):
        slot.finish_response(response, request)
        self._check_if_closing(spider, slot)
        self._scrape_next(spider, slot)
        return _
    dfd.addBoth(finish_scraping)
    dfd.addErrback(
        lambda f: logger.error('Scraper bug processing %(request)s',
                               {'request': request},
                               exc_info=failure_to_exc_info(f),
                               extra={'spider': spider}))
    self._scrape_next(spider, slot)
    return dfd

def _scrape_next(self, spider, slot):
    while slot.queue:
        # 从Scraper队列中获取一个待处理的任务
        response, request, deferred = slot.next_response_request_deferred()
        self._scrape(response, request, spider).chainDeferred(deferred)

def _scrape(self, response, request, spider):
    assert isinstance(response, (Response, Failure))
    # 调用_scrape2继续处理
    dfd = self._scrape2(response, request, spider)
    # 注册异常回调
    dfd.addErrback(self.handle_spider_error, request, response, spider)
    # 出口回调
    dfd.addCallback(self.handle_spider_output, request, response, spider)
    return dfd

def _scrape2(self, request_result, request, spider):
    # 如果结果不是Failure实例 则调用爬虫中间件管理器的scrape_response
    if not isinstance(request_result, Failure):
        return self.spidermw.scrape_response(
            self.call_spider, request_result, request, spider)
    else:
        # 直接调用call_spider
        dfd = self.call_spider(request_result, request, spider)
        return dfd.addErrback(
            self._log_download_errors, request_result, request, spider)

首先把请求和响应加入到 Scraper 的处理队列中,然后从队列中获取到任务,如果不是异常结果,则调用爬虫中间件管理器的 scrape_response 方法:

def scrape_response(self, scrape_func, response, request, spider):
    fname = lambda f:'%s.%s' % (
            six.get_method_self(f).__class__.__name__,
            six.get_method_function(f).__name__)

    def process_spider_input(response):
        # 执行一系列爬虫中间件的process_spider_input
        for method in self.methods['process_spider_input']:
            try:
                result = method(response=response, spider=spider)
                assert result is None, \
                        'Middleware %s must returns None or ' \
                        'raise an exception, got %s ' \
                        % (fname(method), type(result))
            except:
                return scrape_func(Failure(), request, spider)
        # 执行完中间件的一系列process_spider_input方法后 执行call_spider
        return scrape_func(response, request, spider)

    def process_spider_exception(_failure):
        # 执行一系列爬虫中间件的process_spider_exception
        exception = _failure.value
        for method in self.methods['process_spider_exception']:
            result = method(response=response, exception=exception, spider=spider)
            assert result is None or _isiterable(result), \
                'Middleware %s must returns None, or an iterable object, got %s ' % \
                (fname(method), type(result))
            if result is not None:
                return result
        return _failure

    def process_spider_output(result):
        # 执行一系列爬虫中间件的process_spider_output
        for method in self.methods['process_spider_output']:
            result = method(response=response, result=result, spider=spider)
            assert _isiterable(result), \
                'Middleware %s must returns an iterable object, got %s ' % \
                (fname(method), type(result))
        return result

    # 执行process_spider_input
    dfd = mustbe_deferred(process_spider_input, response)
    # 注册异常回调
    dfd.addErrback(process_spider_exception)
    # 注册出口回调
    dfd.addCallback(process_spider_output)
    return dfd

有没有感觉套路很熟悉?与上面下载器中间件调用方式非常相似,也调用一系列的前置方法,再执行真正的处理逻辑,最后执行一系列的后置方法。

回调爬虫

接下来看一下,Scrapy 是如何执行我们写好的爬虫逻辑的,也就是 call_spider 方法,这里回调我们写好的爬虫类:

def call_spider(self, result, request, spider):
    # 回调爬虫模块
    result.request = request
    dfd = defer_result(result)
    # 注册回调方法 取得request.callback 如果未定义则调用爬虫模块的parse方法
    dfd.addCallbacks(request.callback or spider.parse, request.errback)
    return dfd.addCallback(iterate_spider_output)

看到这里,你应该更熟悉,平时我们写的最多的爬虫代码,parse 则是第一个回调方法。之后爬虫类拿到下载结果,就可以定义下载后的 callback 方法,也是在这里进行回调执行的。

处理输出

在与爬虫类交互完成之后,Scraper 调用了 handle_spider_output 方法处理爬虫的输出结果:

def handle_spider_output(self, result, request, response, spider):
    # 处理爬虫输出结果
    if not result:
        return defer_succeed(None)
    it = iter_errback(result, self.handle_spider_error, request, response, spider)
    # 注册_process_spidermw_output
    dfd = parallel(it, self.concurrent_items,
        self._process_spidermw_output, request, response, spider)
    return dfd

def _process_spidermw_output(self, output, request, response, spider):
    # 处理Spider模块返回的每一个Request/Item
    if isinstance(output, Request):
        # 如果结果是Request 再次入Scheduler的请求队列
        self.crawler.engine.crawl(request=output, spider=spider)
    elif isinstance(output, (BaseItem, dict)):
        # 如果结果是BaseItem/dict
        self.slot.itemproc_size += 1
        # 调用Pipeline的process_item
        dfd = self.itemproc.process_item(output, spider)
        dfd.addBoth(self._itemproc_finished, output, response, spider)
        return dfd
    elif output is None:
        pass
    else:
        typename = type(output).__name__
        logger.error('Spider must return Request, BaseItem, dict or None, '
                     'got %(typename)r in %(request)s',
                     {'request': request, 'typename': typename},
                     extra={'spider': spider})

执行完我们自定义的解析逻辑后,解析方法可返回新的 Request 或 BaseItem 实例。

如果是新的请求,则再次通过 Scheduler 进入请求队列,如果是 BaseItem 实例,则调用 Pipeline 管理器,依次执行 process_item。我们想输出结果时,只需要定义 Pepeline 类,然后重写这个方法就可以了。

ItemPipeManager 处理逻辑:

class ItemPipelineManager(MiddlewareManager):

    component_name = 'item pipeline'

    @classmethod
    def _get_mwlist_from_settings(cls, settings):
        return build_component_list(settings.getwithbase('ITEM_PIPELINES'))

    def _add_middleware(self, pipe):
        super(ItemPipelineManager, self)._add_middleware(pipe)
        if hasattr(pipe, 'process_item'):
            self.methods['process_item'].append(pipe.process_item)

    def process_item(self, item, spider):
        # 依次调用Pipeline的process_item
)

可以看到 ItemPipeManager 也是一个中间件,和之前下载器中间件管理器和爬虫中间件管理器类似,如果子类有定义 process_item,则依次执行它。

执行完之后,调用 _itemproc_finished:

def _itemproc_finished(self, output, item, response, spider):
    self.slot.itemproc_size -= 1
    if isinstance(output, Failure):
        ex = output.value
        # 如果在Pipeline处理中抛DropItem异常 忽略处理结果
        if isinstance(ex, DropItem):
            logkws = self.logformatter.dropped(item, ex, response, spider)
            logger.log(*logformatter_adapter(logkws), extra={'spider': spider})
            return self.signals.send_catch_log_deferred(
                signal=signals.item_dropped, item=item, response=response,
                spider=spider, exception=output.value)
        else:
            logger.error('Error processing %(item)s', {'item': item},
                         exc_info=failure_to_exc_info(output),
                         extra={'spider': spider})
    else:
        logkws = self.logformatter.scraped(output, response, spider)
        logger.log(*logformatter_adapter(logkws), extra={'spider': spider})
        return self.signals.send_catch_log_deferred(
            signal=signals.item_scraped, item=output, response=response,
            spider=spider)

这里可以看到,如果想在 Pipeline 中丢弃某个结果,直接抛出 DropItem 异常即可,Scrapy 会进行对应的处理。

到这里,抓取结果会根据自定义的输出类,然后输出到指定位置,而新的 Request 则会再次进入请求队列,等待引擎下一次调度,也就是再次调用 ExecutionEngine 的 _next_request,直至请求队列没有新的任务,整个程序退出。

CrawlerSpider

以上,基本上整个核心抓取流程就讲完了。

这里再简单说一下 CrawlerSpider 类,我们平时用得也比较多,它其实就是继承了 Spider 类,然后重写了 parse 方法(这也是继承此类不要重写此方法的原因),并结合 Rule 规则类,来完成 Request 的自动提取逻辑。

Scrapy 提供了这个类方便我们更快速地编写爬虫代码,我们也可以基于此类进行再次封装,让我们的爬虫代码写得更简单。

由此我们也可看出,Scrapy 的每个模块的实现都非常纯粹,每个组件都通过配置文件定义连接起来,如果想要扩展或替换,只需定义并实现自己的处理逻辑即可,其他模块均不受任何影响,所以我们也可以看到,业界有非常多的 Scrapy 插件,都是通过此机制来实现的。

总结

这篇文章的代码量较多,也是 Scrapy 最为核心的抓取流程,如果你能把这块逻辑搞清楚了,那对 Scrapy 开发新的插件,或者在它的基础上进行二次开发也非常简单了。

总结一下整个抓取流程,还是用这两张图表示再清楚不过:


Scrapy 整体给我的感觉是,虽然它只是个单机版的爬虫框架,但我们可以非常方便地编写插件,或者自定义组件替换默认的功能,从而定制化我们自己的爬虫,最终可以实现一个功能强大的爬虫框架,例如分布式、代理调度、并发控制、可视化、监控等功能,它的灵活度非常高。

近期文章推荐:

微信搜索关注「水滴与银弹」公众号,第一时间获取优质技术干货。7年资深后端研发,用简单的方式把技术讲清楚。

本文暂时没有评论,来添加一个吧(●'◡'●)

欢迎 发表评论:

最近发表
标签列表