当前位置: 首页 > 面试题库 >

我将如何在实时场景中使用并发。期货和队列?

景轶
2023-03-14
问题内容

concurrent.futures如下所示,使用Python 3的模块进行并行工作非常容易。

with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
    future_to = {executor.submit(do_work, input, 60): input for input in dictionary}
    for future in concurrent.futures.as_completed(future_to):
        data = future.result()

将项目插入和检索到队列中也非常方便。

q = queue.Queue()
for task in tasks:
q.put(task)
while not q.empty():
   q.get()

我有一个脚本在后台运行,以监听更新。现在,理论上假设,随着这些更新的到来,我将对它们进行排队,并使用进行并发处理ThreadPoolExecutor

现在,单独地,所有这些组件都是独立工作的,并且很有意义,但是我如何一起使用它们呢?我不知道是否有可能ThreadPoolExecutor实时从队列中馈送工作,除非预先确定要工作的数据?

简而言之,我要做的就是,每秒接收4条消息的更新,将它们推送到队列中,并让我的current.futures对其进行处理。如果我不这样做,那我就会陷入缓慢的顺序方法中。

让我们以下面的Python文档中的规范示例为例:

with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
    for future in concurrent.futures.as_completed(future_to_url):
        url = future_to_url[future]
        try:
            data = future.result()
        except Exception as exc:
            print('%r generated an exception: %s' % (url, exc))
        else:
            print('%r page is %d bytes' % (url, len(data)))

的列表URLS是固定的。是否可以实时提供此列表,并让工作人员从列表中进行处理,也许出于管理目的而从队列中进行处理?我有点困惑我的方法是否 真能


问题答案:

Python文档中的示例已扩展为可以从队列中进行工作。需要注意的更改是,此代码使用concurrent.futures.wait而不是concurrent.futures.as_completed允许在等待其他工作完成时开始新工作。

import concurrent.futures
import urllib.request
import time
import queue

q = queue.Queue()

URLS = ['http://www.foxnews.com/',
        'http://www.cnn.com/',
        'http://europe.wsj.com/',
        'http://www.bbc.co.uk/',
        'http://some-made-up-domain.com/']

def feed_the_workers(spacing):
    """ Simulate outside actors sending in work to do, request each url twice """
    for url in URLS + URLS:
        time.sleep(spacing)
        q.put(url)
    return "DONE FEEDING"

def load_url(url, timeout):
    """ Retrieve a single page and report the URL and contents """
    with urllib.request.urlopen(url, timeout=timeout) as conn:
        return conn.read()

# We can use a with statement to ensure threads are cleaned up promptly
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:

    # start a future for a thread which sends work in through the queue
    future_to_url = {
        executor.submit(feed_the_workers, 0.25): 'FEEDER DONE'}

    while future_to_url:
        # check for status of the futures which are currently working
        done, not_done = concurrent.futures.wait(
            future_to_url, timeout=0.25,
            return_when=concurrent.futures.FIRST_COMPLETED)

        # if there is incoming work, start a new future
        while not q.empty():

            # fetch a url from the queue
            url = q.get()

            # Start the load operation and mark the future with its URL
            future_to_url[executor.submit(load_url, url, 60)] = url

        # process any completed futures
        for future in done:
            url = future_to_url[future]
            try:
                data = future.result()
            except Exception as exc:
                print('%r generated an exception: %s' % (url, exc))
            else:
                if url == 'FEEDER DONE':
                    print(data)
                else:
                    print('%r page is %d bytes' % (url, len(data)))

            # remove the now completed future
            del future_to_url[future]

url两次获取的输出:

'http://www.foxnews.com/' page is 67574 bytes
'http://www.cnn.com/' page is 136975 bytes
'http://www.bbc.co.uk/' page is 193780 bytes
'http://some-made-up-domain.com/' page is 896 bytes
'http://www.foxnews.com/' page is 67574 bytes
'http://www.cnn.com/' page is 136975 bytes
DONE FEEDING
'http://www.bbc.co.uk/' page is 193605 bytes
'http://some-made-up-domain.com/' page is 896 bytes
'http://europe.wsj.com/' page is 874649 bytes
'http://europe.wsj.com/' page is 874649 bytes


 类似资料:
  • 本文向大家介绍消息队列的作用和使用场景相关面试题,主要包含被问及消息队列的作用和使用场景时的应答技巧和注意事项,需要的朋友参考一下 通过异步处理提高响应时间,削峰填谷: 场景:数据比较集中且实时要求不是太高,如果同步处理,假如业务高峰需要4台服务支撑,那么在业务高峰过了之后,就会出现资源闲置,如果引入消息队列的话,将数据放到消息队列后直接返回成功,提升了响应时间,真正的业务在消息队列后面消费处理,

  • 我有一种情况,我需要在网站(Selenium)中执行一些步骤,在那里我执行一些步骤来创建一些数据,并将它们推送到移动应用程序。我想在移动端(Appium)上工作,并将流量领先。一旦我在手机上完成操作。我必须再次在网站上验证相同的内容。这一切都需要在一个场景中完成,因为我不希望我的场景依赖于其他场景。基本上,web流将出现在我将要写的每个场景中。 现在,当我尝试使用Background或来完成此操作

  • 如何将JavaFX中的线程与FXML以及任务或服务类一起使用? 我在我的程序中需要并发,因为我使用了一个很长的循环。如果我“手动”(没有FXML)编写这个代码,那么它就可以工作。但使用FXML它不起作用(JavaFX Scene Builder)。而不是在控制台中打印出“字符串缓冲区”变量,我想把它写在文本区域(这是一个可更新的组件)中。但在这种情况下,当然会出现错误消息,因为它不再属于JavaF

  • 问题内容: 从JavaDocs: 一的ConcurrentLinkedQueue是当许多线程共享访问一个共同的集合一个合适的选择。此队列不允许空元素。 ArrayBlockingQueue是一个经典的“有界缓冲区”,其中固定大小的数组保存由生产者插入并由消费者提取的元素。此类支持可选的公平性策略,用于订购正在等待的生产者和使用者线程 与基于阵列的队列相比,LinkedBlockingQueue通常

  • 本篇为Powershell攻击指南——黑客后渗透之道系列之实战篇,主要介绍的一些实用的利用方式与利用场景和一些实用工具。 在实际的渗透环境中我们利用Powershell的方式无非两种: 使用编码的方式对Powershell命令进行编码之后运行 远程下载Powershell代码之后直接运行 两种方式各有利弊,第一种比较方便直接编码即可执行,第二种需要到一台拥有公网IP的机器,那么在实际的渗透环境中如

  • cucumber场景使用JUnit运行。 我有一个用例,需要根据Jira票据的状态跳过特定的Cumber场景。然后,我需要将该场景标记为在html报告中传递。 我可以使用PendingException()或AssumptionViolatedException()轻松跳过场景,两者都可以正常跳过场景执行并将步骤标记为跳过。但是场景本身将被标记为失败,我希望它被通过。 无论如何,我可以做到这一点?