当前位置: 首页 > 面试题库 >

与工作线程“线程”等效的asyncio.Queues

从阎宝
2023-03-14
问题内容

我试图弄清楚如何移植要使用的线程程序asyncio。我有很多代码可以围绕几个标准库进行同步Queues,基本上是这样的:

import queue, random, threading, time

q = queue.Queue()

def produce():
    while True:
        time.sleep(0.5 + random.random())  # sleep for .5 - 1.5 seconds
        q.put(random.random())

def consume():
    while True: 
        value = q.get(block=True)
        print("Consumed", value)

threading.Thread(target=produce).start()
threading.Thread(target=consume).start()

一个线程创建值(可能是用户输入),而另一个线程对它们执行某些操作。关键是这些线程在出现新数据之前一直处于空闲状态,此时它们将唤醒并对其进行处理。

我正在尝试使用asyncio实现此模式,但是我似乎无法弄清楚如何使其“运行”。

我的尝试或多或少看起来像这样(根本不做任何事情)。

import asyncio, random

q = asyncio.Queue()

@asyncio.coroutine
def produce():
    while True: 
        q.put(random.random())
        yield from asyncio.sleep(0.5 + random.random())

@asyncio.coroutine
def consume():
    while True:
        value = yield from q.get()
        print("Consumed", value)

# do something here to start the coroutines. asyncio.Task()?

loop = asyncio.get_event_loop()
loop.run_forever()

我尝试过使用协程,不使用协程,在Tasks中包装内容,试图使它们创建或返回期货等的变体。

我开始认为我对应该如何使用asyncio有错误的想法(也许应该以我不知道的其他方式来实现此模式)。任何指针将不胜感激。


问题答案:

对,就是这样。任务是您的朋友:

import asyncio, random

q = asyncio.Queue()

@asyncio.coroutine
def produce():
    while True:
        yield from q.put(random.random())
        yield from asyncio.sleep(0.5 + random.random())

@asyncio.coroutine
def consume():
    while True:
        value = yield from q.get()
        print("Consumed", value)


loop = asyncio.get_event_loop()
loop.create_task(produce())
loop.create_task(consume())
loop.run_forever()

asyncio.ensure_future 也可以用于任务创建。

并且请记住:q.put()协程 ,因此您应该使用yield from q.put(value)

UPD

转自asyncio.Task()/asyncio.async()新品牌的APIloop.create_task()asyncio.ensure_future()示例所示。



 类似资料:
  • 主线程通常被用于运行主循环,而主循环负责的都是 UI 相关的工作,所以也可以说主线程是 UI 线程。为了不影响 UI 线程的工作效率,我们会需要创建额外的线程来负责各种各样的工作,而这些线程就是工作线程。 在主循环的章节中,我们已经了解到主循环执行频率影响界面的流畅度,它的每一次循环都会按顺序执行处理定时器、处理事件队列、更新组件、渲染组件等任务,其中最容易影响到主循环的执行频率的任务是处理事件队

  • 我有4-5个工作线程处理大型消息队列。我还有另一段代码,它使用2-3个worker运行。我想在处理大型消息队列时阻止所有其他工作者。 我正在使用JDK6和Jms 编辑: 队列进程工作者从未终止。当没有消息时,它们阻塞队列。这些工作者由执行器线程池管理,如果我使用读写锁,其中一个工作者也会被阻塞。此外,如果使用循环屏障,那么我必须终止线程,以便重新传递阻塞的第二个进程。由于工作者是由线程池管理的,所

  • 问题内容: Python中的和模块之间有什么区别? 问题答案: 在Python 3中,已重命名为。它是用于实现的基础结构代码,普通的Python代码不应该靠近它。 公开了底层操作系统级别流程的原始视图。这几乎绝不是您想要的,因此在Py3k中重命名以表明它实际上只是实现细节。 添加了一些额外的自动记帐功能,以及一些便捷实用程序,所有这些使它成为标准Python代码的首选。

  • 问题内容: 我正在为我的ubuntu服务器(针对我的多客户端匿名聊天程序)实现一种简单的线程池机制,并且需要使我的工作线程进入睡眠状态,直到需要执行一项工作(以函数指针和参数的形式) 。 我当前的系统即将关闭。我(工人线程正在)问经理是否有工作可用,以及是否有5毫秒没有睡眠。如果存在,请将作业添加到工作队列中并运行该函数。糟糕的循环浪费。 什么我 喜欢 做的是做一个简单的事件性的系统。我正在考虑有

  • 问题内容: 在http://marcio.io/2015/07/handling-1-million-requests-per-minute-with- golang/ 提供的示例中,很多地方都引用了该示例。 分派服务完许多工作后,工人池(chan chan工作)会不会耗尽?因为从信道和信道工作拉出第一类型后没有被补充被调用的第一次?还是我想念/误读了什么?如何为WorkerPool补充可用的工作

  • 我有一个应用程序调用插件DLL。其中一些调用是从辅助线程(即,不是UI线程)执行的,并且可能会弹出一个带有MessageBox的对话框。根据这个(http://www.codeproject.com/articles/121226/messagebox-and-worker-threads),有效的UI线程被切换到调用MessageBox的线程。这会使应用程序“崩溃”,因为消息泵停止接收消息。有什