当前位置: 首页 > 面试题库 >

如何将asyncio与现有的阻止库一起使用?

丁理
2023-03-14
问题内容

我的阻塞功能很少foobar我无法更改(某些我无法控制的内部库。与一个或多个网络服务进行对话)。如何将其用作异步?例如,我将不执行以下操作。

results = []
for inp in inps:
    val = foo(inp)
    result = bar(val)
    results.append(result)

这将是低效的,因为我foo在等待第一个输入的同时可以调用第二个输入bar。我如何包装它们以便它们可与asyncio一起使用(即new
asyncawait语法)?

让我们假设函数是可重入的。也就是说,foo如果已经foo处理过先前的事件,则可以再次调用。

更新资料

使用可重用的装饰器扩展答案。单击此处为例。

def run_in_executor(f):
    @functools.wraps(f)
    def inner(*args, **kwargs):
        loop = asyncio.get_running_loop()
        return loop.run_in_executor(None, functools.partial(f, *args, **kwargs))

    return inner

问题答案:

这里有两个问题:首先,如何异步运行阻塞代码,其次,如何并行运行异步代码(异步是单线程的,因此GIL仍然适用,因此不是 真正的 并发,但我离题了。

可以使用asyncio.ensure_future创建并行任务,如此处所述。

要运行同步代码,您将需要在executor中运行阻塞代码。例:

import concurrent.futures
import asyncio
import time

def blocking(delay):
    time.sleep(delay)
    print('Completed.')

async def non_blocking(loop, executor):
    # Run three of the blocking tasks concurrently. asyncio.wait will
    # automatically wrap these in Tasks. If you want explicit access
    # to the tasks themselves, use asyncio.ensure_future, or add a
    # "done, pending = asyncio.wait..." assignment
    await asyncio.wait(
        fs={
            # Returns after delay=12 seconds
            loop.run_in_executor(executor, blocking, 12),

            # Returns after delay=14 seconds
            loop.run_in_executor(executor, blocking, 14),

            # Returns after delay=16 seconds
            loop.run_in_executor(executor, blocking, 16)
        },
        return_when=asyncio.ALL_COMPLETED
    )

loop = asyncio.get_event_loop()
executor = concurrent.futures.ThreadPoolExecutor(max_workers=5)
loop.run_until_complete(non_blocking(loop, executor))

如果要使用for循环调度这些任务(如您的示例中所示),则有几种不同的策略,但是基本方法是使用for循环(或列表理解等) 调度
任务,并通过asyncio等待它们。等待, 然后 检索结果。例:

done, pending = await asyncio.wait(
    fs=[loop.run_in_executor(executor, blocking_foo, *args) for args in inps],
    return_when=asyncio.ALL_COMPLETED
)

# Note that any errors raise during the above will be raised here; to
# handle errors you will need to call task.exception() and check if it
# is not None before calling task.result()
results = [task.result() for task in done]


 类似资料:
  • 问题内容: 我已经使用Python asyncio和aiohttp成功构建了一个RESTful微服务,该服务可侦听POST事件以收集来自各种供料器的实时事件。 然后,它构建一个内存结构,以将事件的最后24小时缓存在嵌套的defaultdict / deque结构中。 现在,我想定期检查该结构到磁盘的位置,最好使用pickle。 由于内存结构可以大于100MB,因此我希望避免在检查点结构所需的时间上

  • 我试图在我的codeigniter应用程序中包含甘特图。接下来的教程一是:https://docs.dhtmlx.com/tutorials__connector_codeigniter__step6.html 当我深入研究这个问题并直接点击控制器时,我发现了一个问题: Sub_文件夹是实际的项目文件夹,其中有复制粘贴dhtmlx、dhtmlxGantt和dhtmlxScheduler文件夹。我不

  • 我正在创建一个Android库(.aar文件),我需要使用JNI。(如果可能的话,我很清楚谷歌不鼓励使用JNI/NDK,但在这种情况下,这是不可能的)。 我从一个独立的hello jni示例应用程序开始(首先学习jni),其中包含以下文件: HelloJni.java Android.mk 应用程序.mk 你好,jni.c 以下构建良好的应用程序(apk),我能够在我的设备上运行它,它打印“Hel

  • 问题内容: GitLab是一种免费的开放源代码方式,用于托管私有存储库,但它似乎不适用于Go。创建项目时,它将生成以下形式的URL: 哪里: 是gitlab服务器的IP地址 是有权访问私有存储库的用户组 Golang 1.2.1似乎不了解这种语法。 结果是: 有办法让它工作吗? 问题答案: 现在,此问题已在Gitlab 8. *中解决,但仍然不直观。确实,最困难的挑战是,以下步骤将使您克服这些挑战

  • 问题内容: 如何创建使芹菜任务看起来像的包装器?还是有更好的方法与Celery集成? Celery的创建者@asksol这样说: 将Celery用作异步I / O框架之上的分布式层是很常见的(提示:将CPU绑定的任务路由到prefork worker意味着它们不会阻塞事件循环)。 但是我找不到任何专门针对框架的代码示例。 问题答案: 如官方网站上所述,这可以通过Celery 5.0版实现: htt

  • 问题内容: 是否有将javascript转换为java的工具,所以我可以使用GWT处理该项目? 更新 对于那些不知道的人,GWT(Google Web工具包)是编写Java并获取Javascript的工具包,所以我的问题是。 问题答案: 您到底在想什么?如果您正在寻找某种可以从Javascript生成GWT Java代码的自动工具,那么恐怕没有这种东西了。 您可以(并且应该)使用JavaScrip