当前位置: 首页 > 面试题库 >

Python asyncio:处理潜在的无限列表

龙浩博
2023-03-14
问题内容

我有以下情况:

  • Python 3.6+
  • 从文件中逐行读取输入数据。
  • 协程将数据发送到API(使用aiohttp),并将调用结果保存到Mongo(使用motor)。因此,有很多IO正在进行。

该代码使用async/编写await,并且对于手动执行的单个调用也可以正常工作。

我不知道该怎么做,就是要大量使用输入数据。

asyncio我看到的所有示例都asyncio.wait通过发送有限列表作为参数进行了演示。但是我不能简单地向它发送任务列表,因为输入文件可能有数百万行。

我的情况是关于通过传送带将数据流传输到消费者。

我还可以做些什么?我希望程序使用它可以聚集的所有资源来处理文件中的数据,而又不会感到不知所措。


问题答案:

我的情况是关于通过传送带将数据流传输到消费者。我还可以做些什么?

您可以创建固定数量的任务,这些任务大致与传送带的容量相对应,然后将它们弹出队列。例如:

async def consumer(queue):
    while True:
        line = await queue.get()
        # connect to API, Mongo, etc.
        ...
        queue.task_done()

async def producer():
    N_TASKS = 10
    loop = asyncio.get_event_loop()
    queue = asyncio.Queue(N_TASKS)
    tasks = [loop.create_task(consume(queue)) for _ in range(N_TASKS)]
    try:
        with open('input') as f:
            for line in f:
                await queue.put(line)
        await queue.join()
    finally:
        for t in tasks:
            t.cancel()

由于与线程不同,任务是轻量级的,并且不占用操作系统资源,因此最好在创建“太多”任务时犯错。asyncio可以毫不费力地处理成千上万的任务,尽管这对于这些任务来说可能是过大的事-
几十就足够了。



 类似资料:
  • 问题内容: 我的问题很简短。我不明白为什么我的程序在捕获错误时会无限循环。我做了一个新的try- catch语句,但是它循环了,甚至复制,粘贴并修改了以前有效的程序中的适当变量。下面是语句本身,下面是整个程序。谢谢您的帮助! 程序: 问题答案: 您的程序将永远运行,因为在不更改扫描仪状态的情况下进行调用会一次又一次地引发异常:如果用户未输入,则调用不会更改扫描仪的外观,因此,当您在下一次迭代中进行

  • 你和我都想去听音乐会,但只剩下一张票了。我们都试图同时预订那张票。使用AWS SDK for PHP的PHP脚本运行以下updateItem API调用,该调用试图将“票据”的“状态”从0(可用)更改为1(保持)。 对于这个用例,这是正确的方法吗? 有人看过DynamoDB文档吗? 如果这是一个正确的实现,对测试方案有什么建议吗?

  • 问题内容: 为什么下面的代码不输出任何输出,而如果我们删除parallel,则输出0、1? 尽管我知道理想情况下应该将限制放在不同的位置,但是我的问题与添加并行处理导致的差异更多有关。 问题答案: 真正的原因是 有序并行 是完整的屏障操作,如文档中所述: 保持并行管道的稳定性是相对昂贵的(要求操作充当一个完整的屏障,并具有大量缓冲开销),并且通常不需要稳定性。 “完全屏障操作”是指必须先执行所有上

  • 我正在将数据仓库前端从Hadoop后端转换为雪花后端,我需要将其每个SQL函数转换为雪花等效函数。我遇到的一个麻烦是处理产生无穷大/-无穷大或NaN(不是一个数字)的公式。 在Hadoop中,这是一个使用IS_INF和is_nan检查公式的简单问题。在SQL Server(我们测试的另一个后端)中,这可以使用SET ARITHABORT off/SET ANSI_WARNINGS off来完成。H

  • 我知道对无限列表进行排序是不可能的,但我正试图为n个数的倍数的无限递增列表写一个定义。 我已经有这个功能了 它返回n的无限倍数列表。但现在我想构建一个函数,给定一个返回列表中所有数字的倍数的无限递增列表。所以函数

  • 我使用的是Amazon SQS队列,我有一个使用队列中的消息的类。我正在努力使消息尽可能接近实时地被消耗,所以我需要消耗代码被无休止地运行。队列中的消息将持续超过半天。 在web服务器上,使用delayed_job或sidekiq在后台连续运行进程。 备选方案2 有一个单独的服务器,有一个专用于使用消息的ruby应用程序。 备选方案3 将SQS使用者放置在rake任务中,并使用系统调用在后台启动该