当前位置: 首页 > 面试题库 >

具有有限CPU /端口的Python多线程处理

漆雕正奇
2023-03-14
问题内容

我有一个要 并行 处理的文件夹名称字典。在每个文件夹,里面是文件名的数组,我想在加工 系列

folder_file_dict = {
         folder_name : {
                         file_names_key : [file_names_array]
                       }
        }

最终,我将创建一个名为folder_name的文件夹,其中包含名称为的文件len(folder_file_dict[folder_name][file_names_key])。我有这样的方法:

def process_files_in_series(file_names_array, udp_port):
    for file_name in file_names_array:
         time_consuming_method(file_name, udp_port)
         # create "file_name"

udp_ports = [123, 456, 789]

请注意time_consuming_method()上面的内容,由于通过UDP端口进行的调用会花费很长时间。我也仅限于在上面的阵列中使用UDP端口。因此,我必须等待time_consuming_methodUDP端口完成操作,然后才能再次使用该UDP端口。这意味着我一次只能len(udp_ports)运行线程。

因此,我最终将len(folder_file_dict.keys())通过len(folder_file_dict.keys())调用来创建线程process_files_in_series。我也有MAX_THREAD个计数。我正在尝试使用QueueThreading模块,但是我不确定我需要哪种设计。如何使用队列和线程以及可能的条件来做到这一点?使用线程池的解决方案也可能会有所帮助。

注意

我没有试图提高读取/写入速度。我正在尝试并行调用time_consuming_methodunder
process_files_in_series。创建这些文件只是过程的一部分,而不是速率限制步骤。

另外,我要寻找一个解决方案,使用QueueThreading以及可能的Condition模块或相关于这些模块什么。线程池解决方案也可能会有所帮助。我不能使用进程,只能使用线程。

我也在寻找Python 2.7中的解决方案。


问题答案:

使用线程池:

#!/usr/bin/env python2
from multiprocessing.dummy import Pool, Queue # thread pool

folder_file_dict = {
    folder_name: {
        file_names_key: file_names_array
    }
}

def process_files_in_series(file_names_array, udp_port):
    for file_name in file_names_array:
         time_consuming_method(file_name, udp_port)
         # create "file_name"
         ...

def mp_process(filenames):
    udp_port = free_udp_ports.get() # block until a free udp port is available
    args = filenames, udp_port
    try:
        return args, process_files_in_series(*args), None
    except Exception as e:
        return args, None, str(e)
    finally:
        free_udp_ports.put_nowait(udp_port)

free_udp_ports = Queue() # in general, use initializer to pass it to children
for port in udp_ports:
    free_udp_ports.put_nowait(port)
pool = Pool(number_of_concurrent_jobs) #
for args, result, error in pool.imap_unordered(mp_process, get_files_arrays()):
    if error is not None:
       print args, error

我认为如果不同文件名数组的处理时间可能不同,则不需要将线程数绑定到udp端口数。

如果我folder_file_dict正确理解了结构,则生成文件名数组:

def get_files_arrays(folder_file_dict=folder_file_dict):
    for folder_name_dict in folder_file_dict.itervalues():
        for filenames_array in folder_name_dict.itervalues():
            yield filenames_array


 类似资料:
  • 问题内容: 我过去两天一直在尝试构建具有多线程功能的刮板。不知何故我仍然无法管理它。最初,我尝试使用带有线程模块的常规多线程方法,但这并不比使用单个线程快。后来我了解到请求正在阻塞,并且多线程方法并没有真正起作用。因此,我不断研究并发现有关grequests和gevent的信息。现在,我正在使用gevent运行测试,它仍然没有比使用单个线程快。我的编码有误吗? 这是我课程的相关部分: 问题答案:

  • 问题内容: 我已经看到了线程池执行程序的实现及其所提供的拒绝执行策略。但是,我有一个自定义要求- 我想拥有一个回调机制,在该机制中,当达到队列大小限制时,我会收到通知,并说何时队列大小减少到最大允许队列大小的80%。 我觉得可以通过子类化线程池执行程序来实现,但是已经有一个实现的版本吗?我很乐意在需要时提供更多详细信息和我的工作,以便提供清晰的信息。 问题答案: 我希望有一个回调机制,当达到队列大

  • 问题内容: 我正在使用的类创建用于运行Web服务器的请求处理程序的固定线程池: 并且说明是: 创建一个线程池,该线程池重用在共享的 无边界 队列上运行的一组固定线程。 但是,我正在寻找实现与缓冲池完全相同的线程池实现,除了使用有 界 队列。有这样的实现吗?还是我需要为固定线程池实现自己的包装器? 问题答案: 您想要做的是新建自己的ExecutorService,可能使用ThreadPoolExec

  • 问题内容: 在Linux中,当程序(可能具有多个线程)接收到诸如SIGTERM或SIGHUP之类的信号时会发生什么? 哪个线程拦截信号?多个线程可以得到相同的信号吗?有专门用于处理信号的特殊线程吗?如果不是,那么在处理信号的线程内部会发生什么?信号处理程序例程完成后,如何恢复执行? 问题答案: 根据您所使用的Linux内核版本,这会有些许细微差别。 假设有2.6个posix线程,并且如果您正在谈论

  • 我正在开发基于spring+Hibernate的web应用程序。在这个应用程序中,我必须对数据库中的50000个可用记录进行计算。当前逻辑:- 循环0到50000(所有50000记录彼此独立) 选择第i个元素 对第i个元素执行计算(删除CALCULATION_TEMP表(如果存在),创建新表CALCULATION_TEMP并在CALCULATION_TEMP表中插入计算) 在步骤3表上进行一些计算

  • 本文向大家介绍python实现多线程端口扫描,包括了python实现多线程端口扫描的使用技巧和注意事项,需要的朋友参考一下 一个简易的TCP端口扫描器,使用python3实现。 需求:扫描目标网站开放哪些端口号,将所有开放的端口号输出。 分析:使用socket连接,如果连接成功,认为端口开放,如果连接失败,认为端口关闭(有可能端口开放但连接失败,这里简单认为端口不开放) 使用到的库:socket,