当前位置: 首页 > 面试题库 >

如何使用Python asyncio限制并发性?

虞航
2023-03-14
问题内容

假设我们有很多链接可供下载,并且每个链接可能花费不同的时间来下载。而且我只能使用最多3个连接进行下载。现在,我想确保使用asyncio有效地做到这一点。

这是我要实现的目标:在任何时间点,请尝试确保至少运行3个下载。

Connection 1: 1---------7---9---
Connection 2: 2---4----6-----
Connection 3: 3-----5---8-----

数字代表下载链接,连字符代表等待下载。

这是我现在正在使用的代码

from random import randint
import asyncio

count = 0


async def download(code, permit_download, no_concurrent, downloading_event):
    global count
    downloading_event.set()
    wait_time = randint(1, 3)
    print('downloading {} will take {} second(s)'.format(code, wait_time))
    await asyncio.sleep(wait_time)  # I/O, context will switch to main function
    print('downloaded {}'.format(code))
    count -= 1
    if count < no_concurrent and not permit_download.is_set():
        permit_download.set()


async def main(loop):
    global count
    permit_download = asyncio.Event()
    permit_download.set()
    downloading_event = asyncio.Event()
    no_concurrent = 3
    i = 0
    while i < 9:
        if permit_download.is_set():
            count += 1
            if count >= no_concurrent:
                permit_download.clear()
            loop.create_task(download(i, permit_download, no_concurrent, downloading_event))
            await downloading_event.wait()  # To force context to switch to download function
            downloading_event.clear()
            i += 1
        else:
            await permit_download.wait()
    await asyncio.sleep(9)

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    try:
        loop.run_until_complete(main(loop))
    finally:
        loop.close()

输出是预期的:

downloading 0 will take 2 second(s)
downloading 1 will take 3 second(s)
downloading 2 will take 1 second(s)
downloaded 2
downloading 3 will take 2 second(s)
downloaded 0
downloading 4 will take 3 second(s)
downloaded 1
downloaded 3
downloading 5 will take 2 second(s)
downloading 6 will take 2 second(s)
downloaded 5
downloaded 6
downloaded 4
downloading 7 will take 1 second(s)
downloading 8 will take 1 second(s)
downloaded 7
downloaded 8

但是这是我的问题:

  1. 目前,我只是在等待9秒钟以使主要功能保持运行状态,直到下载完成。在退出主要功能之前,是否有一种有效的方式等待上一次下载完成?(我知道有一个asyncio.wait,但是我需要存储所有任务引用才能使其正常工作)

  2. 做这种任务的好图书馆是什么?我知道javascript有很多异步库,但是Python呢?

编辑:2.有什么好的库可以处理常见的异步模式?(类似https://www.npmjs.com/package/async)


问题答案:

在阅读本答案的其余部分之前,请注意,惯用的方法是使用asyncio来限制并行任务的数量asyncio.Semaphore,优雅地抽象了该方法。这个答案包含有效的方法,但要实现这一点则要复杂得多。我留下答案的原因是,在某些情况下,这种方法比信号量具有优势,特别是当要完成的工作量很大或不受限制时,并且您无法提前创建所有协程。在这种情况下,第二个(基于队列的)解决方案就是您想要的答案。但是在大多数常规情况下,例如通过aiohttp并行下载,您应该使用信号量。

基本上,您需要一个固定大小的下载任务
asyncio虽然没有预先创建的任务池,但是创建一个任务池很容易:只需保留一组任务,不要让它超出限制。尽管这个问题表明您不愿意这样做,但是代码的结尾却更加优雅:

async def download(code):
    wait_time = randint(1, 3)
    print('downloading {} will take {} second(s)'.format(code, wait_time))
    await asyncio.sleep(wait_time)  # I/O, context will switch to main function
    print('downloaded {}'.format(code))

async def main(loop):
    no_concurrent = 3
    dltasks = set()
    i = 0
    while i < 9:
        if len(dltasks) >= no_concurrent:
            # Wait for some download to finish before adding a new one
            _done, dltasks = await asyncio.wait(
                dltasks, return_when=asyncio.FIRST_COMPLETED)
        dltasks.add(loop.create_task(download(i)))
        i += 1
    # Wait for the remaining downloads to finish
    await asyncio.wait(dltasks)

一种替代方法是创建一定数量的协程进行下载,就像固定大小的线程池一样,并使用来喂它们工作asyncio.Queue。这消除了手动限制下载数量的需要,下载数量将自动受到协程调用数量的限制download()

# download() defined as above

async def download_worker(q):
    while True:
        code = await q.get()
        await download(code)
        q.task_done()

async def main(loop):
    q = asyncio.Queue()
    workers = [loop.create_task(download_worker(q)) for _ in range(3)]
    i = 0
    while i < 9:
        await q.put(i)
        i += 1
    await q.join()  # wait for all tasks to be processed
    for worker in workers:
        worker.cancel()
    await asyncio.gather(*workers, return_exceptions=True)

至于您的其他问题,显而易见的选择是aiohttp



 类似资料:
  • 如何编写限制Qpromise并发的方法? 例如,我有一个方法。 我希望一次生成不超过5个进程,但对调用代码是透明的。 我需要实现的是一个带有签名的函数 我可以这样称呼他 我已经开始编写我的版本,但我想知道是否有人有一个简洁的实现,我可以对照它进行检查。

  • 我需要限制使用weblogic登录我的web应用的用户数量。 找到的解决方案是oracle documents“weblogic.http.session.maxConcurrentRequest属性限制会话的并发请求数。如果给定会话的并发请求数超过指定值,servlet容器将开始拒绝请求。默认情况下,此属性设置为-1,这表示servlet容器不施加任何限制。” 但不知道在哪里或如何设置这些设置。

  • 问题内容: 如何编写限制Q许诺并发的方法? 例如,我有一个方法。它返回一个Q承诺。 我希望一次生成的进程不超过5个,但是对于调用代码是透明的。 我需要实现的是带有签名的功能 我可以这样称呼 我已经开始处理我的版本,但是我想知道是否有人可以检查一个简洁的实现。 问题答案: 我有一个库可以为您做到这一点https://github.com/ForbesLindesay/throat 您可以通过brow

  • 我们的团队正在开始学习fp-ts,我们从一些基本的异步示例开始(大部分是从这里拉出来的)。按顺序运行一组任务很棒,看起来像问题是,在fp-ts中执行并行任务时限制并发的惯用方法是什么?例如,Promise.map(在蓝鸟中)允许您设置像这样的并发限制。 一种解决方案可能是将数组拆分为块,然后使用序列和平面图迭代这些块。然而,这意味着每个区块中的每个任务都必须完成,然后才能进入下一个区块-一个长时间

  • 问题内容: 我想获取当前地图的边界,以便可以使用Overpass API搜索这些边界。 对于传单,我知道该方法只是map.getBounds(),但我不知道如何在react-leaflet中实现该方法。 这就是我尝试过的。错误说那不是功能。 问题答案: 尝试。 根据文档: 您可以使用此组件中的this.leafletElement直接访问该组件创建的Leaflet元素。该传单元素通常在compon

  • 问题内容: 我得到了这个servlet,它将pdf文件返回到客户端Web浏览器。我们不希望冒任何风险,即当请求数量过多时,服务器将瘫痪。 我们希望采用一种应用程序级别(程序)的方式来设置并发请求数的限制,并在达到限制时向浏览器返回错误消息。我们需要在审批级别进行操作,因为我们在开发级别(tomcat)和生产级别(websphere)具有不同的servlet容器。 我必须强调,我想控制最大请求数,而