当前位置: 首页 > 面试题库 >

处理多个进程中的单个文件

仲孙向明
2023-03-14
问题内容

我有一个大文本文件,我想在其中处理每一行(执行一​​些操作)并将它们存储在数据库中。由于单个简单程序花费的时间太长,我希望它可以通过多个进程或线程来完成。每个线程/进程应从单个文件中读取不同的数据(不同的行),并对它们的数据(行)进行一些操作,然后将它们放入数据库中,以便最后我可以处理所有数据,数据库随我需要的数据一起转储了。

但是我无法弄清楚如何解决这个问题。


问题答案:

您正在寻找的是生产者/消费者模式

基本线程示例

这是使用线程模块的基本示例(而不是多处理)

import threading
import Queue
import sys

def do_work(in_queue, out_queue):
    while True:
        item = in_queue.get()
        # process
        result = item
        out_queue.put(result)
        in_queue.task_done()

if __name__ == "__main__":
    work = Queue.Queue()
    results = Queue.Queue()
    total = 20

    # start for workers
    for i in xrange(4):
        t = threading.Thread(target=do_work, args=(work, results))
        t.daemon = True
        t.start()

    # produce data
    for i in xrange(total):
        work.put(i)

    work.join()

    # get the results
    for i in xrange(total):
        print results.get()

    sys.exit()

您不会与线程共享文件对象。您可以通过为队列提供数据行来为他们工作。然后,每个线程将接起一行,对其进行处理,然后将其返回到队列中。

在多处理模块中内置了一些更高级的功能来共享数据,例如列表和特殊的Queue。在使用多处理与线程时需要权衡取舍,这取决于您的工作是CPU约束还是IO约束。

基本的多处理池示例

这是一个多处理池的真正基本示例

from multiprocessing import Pool

def process_line(line):
    return "FOO: %s" % line

if __name__ == "__main__":
    pool = Pool(4)
    with open('file.txt') as source_file:
        # chunk the work into batches of 4 lines at a time
        results = pool.map(process_line, source_file, 4)

    print results

池是管理其自身进程的便捷对象。由于打开的文件可以遍历其行,因此可以将其传递到pool.map(),该文件将循环遍历并将行传递给worker函数。映射将阻止并在完成后返回整个结果。请注意,这是一个过于简化的示例,pool.map()在进行工作之前,它将立即将整个文件读入内存。如果您希望有大文件,请记住这一点。有更多高级方法可以设计生产者/消费者设置。

手动“池”,具有限制和行重新排序

这是Pool.map的手动示例,但是您可以设置队列大小,以使只按其可以处理的最快速度逐个喂入,而不是一次性消耗整个可迭代对象。我还添加了行号,以便以后可以跟踪它们并引用它们。

from multiprocessing import Process, Manager
import time
import itertools

def do_work(in_queue, out_list):
    while True:
        item = in_queue.get()
        line_no, line = item

        # exit signal 
        if line == None:
            return

        # fake work
        time.sleep(.5)
        result = (line_no, line)

        out_list.append(result)


if __name__ == "__main__":
    num_workers = 4

    manager = Manager()
    results = manager.list()
    work = manager.Queue(num_workers)

    # start for workers    
    pool = []
    for i in xrange(num_workers):
        p = Process(target=do_work, args=(work, results))
        p.start()
        pool.append(p)

    # produce data
    with open("source.txt") as f:
        iters = itertools.chain(f, (None,)*num_workers)
        for num_and_line in enumerate(iters):
            work.put(num_and_line)

    for p in pool:
        p.join()

    # get the results
    # example:  [(1, "foo"), (10, "bar"), (0, "start")]
    print sorted(results)


 类似资料:
  • 我想在同一个Java过程中使用KCL处理多个Kinesis流。 想法很简单:为每个流创建一个新的KCL实例,然后并发运行worker。 我的问题是,在这种情况下,所有KCL实例是否都使用相同的线程池,以及在处理流处理时,这种想法是否是一种好的/常见的做法。 非常感谢。

  • 问题内容: 有没有办法在单个函数调用上做到这一点? 就像是: 我知道这是一个语法混乱,但是只是为了给我一个我想实现的目标一个思路,一系列路由就很棒了! 有人知道怎么做吗? 问题答案: 我在寻找相同功能时遇到了这个问题。 @Jonathan Ong在上面的评论中提到,不建议将数组用于路径,但已在Express 4中对其进行了明确描述,并且它在Express 3.x中有效。这是尝试的示例: 从对象内部

  • 我的问题陈述。读取包含1000万数据的csv文件,并将其存储在数据库中。用尽可能少的时间 我使用java的简单多线程执行器实现了它,其逻辑几乎与spring batch的chunk相似。从csv文件中读取预配置数量的数据,然后创建一个线程,并将数据传递给线程,该线程验证数据,然后写入多线程运行的文件。完成所有任务后,我将调用sql loader来加载每个文件。现在我想把这段代码移到spring b

  • 嗯,我做了这个脚本,它支持日志一些击键一段时间保存在一个文件中,然后擦除文件,如果用户想要然而当脚本tryes删除文件我得到这个错误。 Traceback(最近一次调用最后一次):文件"C:\用户\Tormenter\Desktop\S. D. A. K. L\pregunta.py",第34行,os.remove(path 2"\"name)PermissionError:[WinError 3

  • 我在视图布局中使用了三个EditText小部件,用于三个不同的过滤器。如果我输入其中一条,另一条文本不应该是空白的吗? 下面是我的片段: GenericTextWatcher方法: 当我运行这个程序并输入EditText时,logcat看起来是这样的: 03-03 15:25:39.616 25952-25952/com.xyz.abcI/art:显式并发标记扫描GC释放23671(1194KB)

  • 问题内容: 试图使用es6在没有多个处理程序的情况下创建动态状态,但我陷入了困境。我不知道下面的代码有什么问题 我检查了我的其他函数,handleAdvancePrice是罪魁祸首,但是出了什么问题? 问题答案: 罪魁祸首是后面的多余迹象。另外,也不需要使用单独的; y,因为您可以直接使用

  • 我知道子进程是进程,而不是线程。我使用了错误的语义,因为大多数人在谈到“多线程”时都知道您的意图。所以我会把它保留在标题中。 想象一下这样一个场景:使用一个自定义函数或模块,您连续有多个类似和复杂的事情要做。使用所有可用的核心/线程(例如8/16)非常有意义,这就是的目的。 理想情况下,您需要多个同时工作的人员,并向一个控制器发送/从一个控制器发送/回调消息。 node cpool、fork po

  • 我有一个开关声明,它检查提交的文本中是否存在特定的字符串。 通常,我会创建专用的文件来处理每个的POST数据。 但是如何在一个中处理多个POST。 对于每个ajax数据,我都包含了一个唯一的标识符,这将作为PHP接收到的ajax的参考 但我不确定如何正确地编写PHP代码来处理提交的$_POST类型。