当前位置: 首页 > 知识库问答 >
问题:

使用外部命令多处理数千个文件

苗学民
2023-03-14

我想从Python中为大约8000个文件启动一个外部命令。每个文件都独立于其他文件进行处理。唯一的限制是在处理完所有文件后继续执行。我有4个物理核,每个物理核有2个逻辑核(multiprocessing.cpu\u count()返回8)。我的想法是使用一个由四个并行独立进程组成的池,这些进程将在8个内核中的4个上运行。这样我的机器就可以同时使用了。

以下是我一直在做的事情:

import multiprocessing
import subprocess
import os
from multiprocessing.pool import ThreadPool


def process_files(input_dir, output_dir, option):
    pool = ThreadPool(multiprocessing.cpu_count()/2)
    for filename in os.listdir(input_dir):  # about 8000 files
        f_in = os.path.join(input_dir, filename)
        f_out = os.path.join(output_dir, filename)
        cmd = ['molconvert', option, f_in, '-o', f_out]
        pool.apply_async(subprocess.Popen, (cmd,))
    pool.close()
    pool.join()


def main():
    process_files('dir1', 'dir2', 'mol:H')
    do_some_stuff('dir2')
    process_files('dir2', 'dir3', 'mol:a')
    do_more_stuff('dir3')

连续处理一批100个文件需要120秒。上述多处理版本(函数处理\u文件)只需20秒即可完成批处理。但是,当我对整套8000个文件运行process_files时,我的电脑会挂起,一小时后不会解除冻结。

我的问题是:

1) 我认为ThreadPool应该初始化一个进程池(确切地说是multiprocessing.cpu\u count()/2进程池)。然而,我的电脑挂起了8000个文件,而不是100个,这表明可能没有考虑池的大小。要么这样,要么我做错了什么。你能解释一下吗?

2) 当每个进程都必须启动一个外部命令时,这是在Python下启动独立进程的正确方法吗?并且以这样的方式,所有资源都不会被进程占用?

共有2个答案

裴翰学
2023-03-14

如果您使用Python 3,我会考虑使用<代码> map < /COD>方法>代码>并发。期货ThreadPoolExecutor。

或者,您可以自己管理子流程列表。

以下示例定义了一个函数,用于启动ffmpeg,将视频文件转换为Theora/Vorbis格式。它为每个启动的子流程返回一个Popen对象。

def startencoder(iname, oname, offs=None):
    args = ['ffmpeg']
    if offs is not None and offs > 0:
        args += ['-ss', str(offs)]
    args += ['-i', iname, '-c:v', 'libtheora', '-q:v', '6', '-c:a',
            'libvorbis', '-q:a', '3', '-sn', oname]
    with open(os.devnull, 'w') as bb:
        p = subprocess.Popen(args, stdout=bb, stderr=bb)
    return p

在主程序中,一个表示正在运行的子进程的Popen对象列表是这样维护的。

outbase = tempname()
ogvlist = []
procs = []
maxprocs = cpu_count()
for n, ifile in enumerate(argv):
    # Wait while the list of processes is full.
    while len(procs) == maxprocs:
        manageprocs(procs)
    # Add a new process
    ogvname = outbase + '-{:03d}.ogv'.format(n + 1)
    procs.append(startencoder(ifile, ogvname, offset))
    ogvlist.append(ogvname)
# All jobs have been submitted, wail for them to finish.
while len(procs) > 0:
    manageprocs(procs)

因此,只有当运行的子进程少于核心时,才会启动新进程。多次使用的代码被分为manageprocs功能。

def manageprocs(proclist):
    for pr in proclist:
        if pr.poll() is not None:
            proclist.remove(pr)
    sleep(0.5)

调用sleep可防止程序在循环中旋转。

谢英耀
2023-03-14

我认为您的基本问题是子流程的使用。Popen。该方法不会在返回之前等待命令完成。由于该函数立即返回(即使命令仍在运行),因此就线程池而言,该函数已完成,并且可以生成另一个。。。这意味着最终会产生8000个左右的进程。

使用子流程可能会更幸运。检查呼叫

Run command with arguments.  Wait for command to complete.  If
the exit code was zero then return, otherwise raise
CalledProcessError.  The CalledProcessError object will have the
return code in the returncode attribute.

因此:

def process_files(input_dir, output_dir, option):
    pool = ThreadPool(multiprocessing.cpu_count()/2)
    for filename in os.listdir(input_dir):  # about 8000 files
        f_in = os.path.join(input_dir, filename)
        f_out = os.path.join(output_dir, filename)
        cmd = ['molconvert', option, f_in, '-o', f_out]
        pool.apply_async(subprocess.check_call, (cmd,))
    pool.close()
    pool.join()

如果您真的不关心退出代码,那么您可能需要子流程。调用,如果流程中存在非零退出代码,则不会引发异常。

 类似资料:
  • 问题内容: 我想将一些文件复制到我的图像中,并希望使用ADD命令。我在Docker文档中阅读了有关ADD的正则表达式的信息,但是我不知道我可以使用哪种表达式? 我想要这样的东西 注意:该表达式是错误的,但我这样做是为了向您展示对ADD命令的期望。(我在shell正则表达式中进行了思考)。 那么,我该怎么做呢?我无法访问链接filepath.Match。如果有人有这些规则,请告诉我? 更新资料 我正

  • 实际付款将由外部合作伙伴完成。我想通过以下事件在我的系统中跟踪它:支付请求发出后,合作伙伴同意支付或拒绝支付。 命令发出的每个事件都应该登记在同一个数据库事务中。 给我在Axon 4的搭档打电话的最佳做法是什么? null 记住这句话,我就明白了我应该创建和事件一样多的命令?但在这种情况下,所有这些命令的意义是什么?我是不是应该得到这样的结果: 我的命令将生成事件。 然后从某个地方打电话给我的合作

  • 问题内容: 我正在尝试为Node.js应用程序设置环境。但我每次都会收到此错误。 无法将“ NODE_ENV”识别为内部或外部命令,可操作命令或批处理文件。 这是什么意思,我该如何解决? 我正在使用Windows,也尝试过但没有运气。 问题答案: 听起来您的错误是由于尝试运行类似命令(在Linux中有效)而引起的 Windows中的等效项是 在同一命令外壳中运行。您提到set NODE_ENV不起

  • Vim遵循UNIX哲学"做一件事,做好它"。 与其试图集成你可能想要的功能到编辑器自身,更好的办法是在适当时使用Vim来调用外部命令。 让我们在插件中添加一些跟Potion编译器交互的命令,来浅尝在Vim里面调用外部命令的方法。 编译 首先我们将加入一个命令来编译和执行当前Potion文件。 有很多方法可以实现这一点,不过我们暂且用外部命令简单地实现。 在你的插件的repo中创建potion/ft

  • 我是反应编程的新手,并尝试使用project reactor模拟下面的用例,但我发现将响应从一个服务调用传递到另一个依赖的服务有点困难。如有任何建议或参考,将不胜感激。 响应getDetails(Request inputRequest){ 我尝试了下面的示例,它对一个服务调用起作用,但不能将响应传递给其他依赖的服务调用。 null

  • 主要内容:GCC编译多文件项目通过前面几节的学习,读者已经了解了如何使用 gcc(g++)指令调用 GCC 编译器编译(包括预处理、编译、汇编和链接)C 或者 C++ 源代码,例如: [root@bogon demo]# ls demo1.c  demo2.c [root@bogon demo]# cat demo1.c #include<stdio.h> int main(){     printf("GCC:https:/