Question:

aiohttp: rate limiting requests per second by domain

归松
2023-03-14

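I am writing a web crawler that fetches many different domains in parallel. I want to limit the number of requests per second made to each individual domain, but I do not care about the total number of open connections or the total requests per second across all domains. I want to maximize the number of open connections and requests per second overall, while limiting the requests per second made to any single domain.

All of the existing examples I can find either (1) limit the number of open connections or (2) limit the total number of requests per second made in the fetch loop. Examples include:

  • aiohttp: rate limiting parallel requests

Neither of them does what I am asking for, which is to limit requests per second on a per-domain basis. The first question only answers how to limit requests per second overall. The second one does not even have answers to the actual question (the OP asks about requests per second, and the answers are all about limiting the number of connections).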
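
For contrast, here is a minimal sketch of approach (1), capping open connections with a single shared semaphore. The names `fetch`, `crawl`, `max_connections` and `stats` are hypothetical, and `asyncio.sleep` stands in for the real HTTP call. Note that the cap applies to every request regardless of its domain, which is why it does not solve the per-domain problem:

```python
import asyncio


async def fetch(url, semaphore, stats):
    # The semaphore caps how many requests are in flight at once,
    # whatever domain they belong to.
    async with semaphore:
        stats['in_flight'] += 1
        stats['peak'] = max(stats['peak'], stats['in_flight'])
        await asyncio.sleep(0.01)  # stand-in for session.get(url)
        stats['in_flight'] -= 1
        return url


async def crawl(urls, max_connections=2):
    semaphore = asyncio.Semaphore(max_connections)
    stats = {'in_flight': 0, 'peak': 0}
    pages = await asyncio.gather(*(fetch(u, semaphore, stats) for u in urls))
    # Return the fetched URLs and the highest concurrency observed.
    return pages, stats['peak']
```

Calling `asyncio.run(crawl(urls))` with URLs from ten different domains would still only ever run two requests at a time, and two requests to the same domain could still go out back to back.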

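Here is the code I tried, using a simple rate limiter I made for a synchronous version. It does not work when the DomainTimer code is run in an async event loop: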

from collections import defaultdict
from datetime import datetime, timedelta
import asyncio
import async_timeout
import aiohttp
from urllib.parse import urlparse
from queue import Queue, Empty

from HTMLProcessing import processHTML
import URLFilters

SEED_URLS = ['http://www.bbc.co.uk', 'http://www.news.google.com']
url_queue = Queue()
for u in SEED_URLS:
    url_queue.put(u)

# number of pages to download per run of crawlConcurrent()
BATCH_SIZE = 100
DELAY = timedelta(seconds = 1.0) # delay between requests from single domain, in seconds

HTTP_HEADERS = {'Referer': 'http://www.google.com', 
                'User-Agent': 'Mozilla/5.0 (X11; Linux x86_64; rv:59.0) Gecko/20100101 Firefox/59.0'}


class DomainTimer():
    def __init__(self):
        self.timer = None

    def resetTimer(self):
        self.timer = datetime.now()

    def delayExceeded(self, delay):
        if not self.timer: #We haven't fetched this before
            return True
        if (datetime.now() - self.timer) >= delay:
            return True
        else:
            return False


crawl_history = defaultdict(dict) # given a URL, when is last time crawled?
domain_timers = defaultdict(DomainTimer)

async def fetch(session, url):
    domain = urlparse(url).netloc
    print('here fetching ' + url + "\n")
    dt = domain_timers[domain]

    if dt.delayExceeded(DELAY) or not dt:
        with async_timeout.timeout(10):
            try:
                dt.resetTimer() # reset domain timer
                async with session.get(url, headers=HTTP_HEADERS) as response:
                    if response.status == 200:
                        crawl_history[url] = datetime.now()
                        html = await response.text()
                        return {'url': url, 'html': html}
                    else:
                        # log HTTP response, put into crawl_history so
                        # we don't attempt to fetch again
                        print(url + " failed with response: " + str(response.status) + "\n")
                        return {'url': url, 'http_status': response.status}

            except aiohttp.ClientConnectionError as e:
                print("Connection failed " + str(e))

            except aiohttp.ClientPayloadError as e: 
                print("Received bad data from server @ " + url + "\n")

    else: # Delay hasn't passed yet: skip for now & put @ end of q
        url_queue.put(url);
        return None


async def fetch_all(urls):
    """Launch requests for all web pages."""
    tasks = []
    async with aiohttp.ClientSession() as session:
        for url in urls:
            task = asyncio.ensure_future(fetch(session, url))
            tasks.append(task) # create list of tasks
        return await asyncio.gather(*tasks) # gather task responses


def batch_crawl():
    """Launch requests for all web pages."""
    start_time = datetime.now()

    # Here we build the list of URLs to crawl for this batch
    urls = []
    for i in range(BATCH_SIZE):
        try:
            next_url = url_queue.get_nowait() # get next URL from queue
            urls.append(next_url)
        except Empty:
            print("Processed all items in URL queue.\n")
            break;

    loop = asyncio.get_event_loop()
    asyncio.set_event_loop(loop)  
    pages = loop.run_until_complete(fetch_all(urls))
    crawl_time = (datetime.now() - start_time).seconds
    print("Crawl completed. Fetched " + str(len(pages)) + " pages in " + str(crawl_time) + " seconds.\n")  
    return pages


def parse_html(pages):
    """ Parse the HTML for each page downloaded in this batch"""
    start_time = datetime.now()
    results = {}

    for p in pages:
        if not p or not p['html']:
            print("Received empty page")
            continue
        else:
            url, html = p['url'], p['html']
            results[url] = processHTML(html)

    processing_time = (datetime.now() - start_time).seconds
    print("HTML processing finished. Processed " + str(len(results)) + " pages in " + str(processing_time) + " seconds.\n")  
    return results


def extract_new_links(results):
    """Extract links from """
    # later we could track where links were from here, anchor text, etc, 
    # and weight queue priority  based on that
    links = []
    for k in results.keys():
        new_urls = [l['href'] for l in results[k]['links']]
        for u in new_urls:
            if u not in crawl_history.keys():
                links.append(u)
    return links

def filterURLs(urls):
    urls = URLFilters.filterDuplicates(urls)
    urls = URLFilters.filterBlacklistedDomains(urls)
    return urls

def run_batch():
    pages = batch_crawl()
    results = parse_html(pages)
    links = extract_new_links(results)
    for l in filterURLs(links):
        url_queue.put(l)

    return results

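No errors or exceptions are thrown, and the rate-limiting code works fine in synchronous fetches, but the DomainTimer has no apparent effect when run in an async event loop. The delay of one request per second per domain is not respected...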

How can I modify this synchronous rate-limiting code to work within an async event loop? Thanks!

2 answers

仲浩旷
2023-03-14

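I developed a library named octopus-api (https://pypi.org/project/octopus-api/) that lets you rate limit calls and set the number of connections to an endpoint, using aiohttp under the hood. Its goal is to simplify all the aiohttp setup needed.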

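Here is an example of how to use it, where get_ethereum is the user-defined request function. It could just as well be a web crawler request function or whatever fits: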

from octopus_api import TentacleSession, OctopusApi
from typing import Dict, List

if __name__ == '__main__':
    async def get_ethereum(session: TentacleSession, request: Dict):
        async with session.get(url=request["url"], params=request["params"]) as response:
            body = await response.json()
            return body

    client = OctopusApi(rate=50, resolution="sec", connections=6)
    result: List = client.execute(requests_list=[{
        "url": "https://api.pro.coinbase.com/products/ETH-EUR/candles?granularity=900&start=2021-12-04T00:00:00Z&end=2021-12-04T00:00:00Z",
        "params": {}}] * 1000, func=get_ethereum)
    print(result)

The TentacleSession works the same way you would write POST, GET, PUT and PATCH with aiohttp.ClientSession.

Let me know if it helps with your rate-limiting and crawler connection issues.

储臻
2023-03-14

Your code is hard to debug because it contains a lot of unrelated stuff, so it is easier to show the idea with a new, simple example.

Main idea:

  • use a class that accepts a url (domain)

Code:

import asyncio
import aiohttp
from urllib.parse import urlparse
from collections import defaultdict


class Limiter:
    # domain -> req/sec:
    _limits = {
        'httpbin.org': 4,
        'eu.httpbin.org': 1,
    }

    # domain -> its lock:
    _locks = defaultdict(lambda: asyncio.Lock())

    # domain -> its last request time
    _times = defaultdict(lambda: 0)

    def __init__(self, url):
        self._host = urlparse(url).hostname

    async def __aenter__(self):
        # "await lock" was removed in Python 3.9; acquire explicitly instead.
        await self._lock.acquire()

        to_wait = self._to_wait_before_request()
        print(f'Wait {to_wait} sec before next request to {self._host}')
        await asyncio.sleep(to_wait)

    async def __aexit__(self, *args):        
        print(f'Request to {self._host} just finished')

        self._update_request_time()
        self._lock.release()

    @property
    def _lock(self):
        """Lock that prevents multiple requests to same host."""
        return self._locks[self._host]

    def _to_wait_before_request(self):
        """What time we need to wait before request to host."""
        request_time = self._times[self._host]
        request_delay = 1 / self._limits[self._host]
        now = asyncio.get_event_loop().time()
        to_wait = request_time + request_delay - now
        to_wait = max(0, to_wait)
        return to_wait

    def _update_request_time(self):
        now = asyncio.get_event_loop().time()
        self._times[self._host] = now


# request that uses Limiter instead of Semaphore:
async def get(url):
    async with Limiter(url):
        async with aiohttp.ClientSession() as session:  # TODO reuse session for different requests.
            async with session.get(url) as resp:
                return await resp.text()


# main:
async def main():
    coros = [
        get('http://httpbin.org/get'),
        get('http://httpbin.org/get'),
        get('http://httpbin.org/get'),
        get('http://httpbin.org/get'),
        get('http://httpbin.org/get'),
        get('http://eu.httpbin.org/get'),
        get('http://eu.httpbin.org/get'),
        get('http://eu.httpbin.org/get'),
        get('http://eu.httpbin.org/get'),
        get('http://eu.httpbin.org/get'),
    ]

    await asyncio.gather(*coros)


if __name__ == '__main__':
    # asyncio.run() creates the loop, shuts down async generators and closes it.
    asyncio.run(main())
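
As a variation on the Limiter above, here is a minimal sketch that spaces out request starts per host instead of holding the lock for the whole request, so slow responses do not reduce the achieved rate. `DomainRateLimiter`, `wait`, `default_rps`, `per_host_rps` and the `*.example` hosts are hypothetical names introduced for illustration, and `asyncio.sleep(0.05)` stands in for the real `session.get` call, so the sketch runs without network access:

```python
import asyncio
from collections import defaultdict
from urllib.parse import urlparse


class DomainRateLimiter:
    """Spaces out request starts per host; hosts do not block each other."""

    def __init__(self, default_rps=1.0, per_host_rps=None):
        self._default_rps = default_rps
        self._per_host_rps = dict(per_host_rps or {})
        self._locks = defaultdict(asyncio.Lock)   # host -> its lock
        self._next_start = defaultdict(float)     # host -> earliest allowed start

    async def wait(self, url):
        host = urlparse(url).hostname
        interval = 1.0 / self._per_host_rps.get(host, self._default_rps)
        # Only callers for the same host queue up behind this lock.
        async with self._locks[host]:
            loop = asyncio.get_running_loop()
            delay = self._next_start[host] - loop.time()
            if delay > 0:
                await asyncio.sleep(delay)
            self._next_start[host] = loop.time() + interval


async def fetch(limiter, url, started):
    await limiter.wait(url)
    started.append((url, asyncio.get_running_loop().time()))
    await asyncio.sleep(0.05)  # stand-in for the HTTP request
    return url


async def demo():
    limiter = DomainRateLimiter(default_rps=10)  # 10 req/sec per host
    urls = ['http://a.example/1', 'http://a.example/2', 'http://b.example/1']
    started = []
    await asyncio.gather(*(fetch(limiter, u, started) for u in urls))
    return started
```

In a real crawler, one shared `aiohttp.ClientSession` would be created once and the `asyncio.sleep` in `fetch` replaced by `async with session.get(url)`. Whether to measure the delay from the start of a request (as here) or from its end (as in the Limiter above) is a design choice: the former keeps the rate steady, the latter is gentler on slow servers.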