我有一个大文本文件,我想在其中处理每一行(执行一些操作)并将它们存储在数据库中。由于单个简单程序花费的时间太长,我希望它可以通过多个进程或线程来完成。每个线程/进程应从单个文件中读取不同的数据(不同的行),并对它们的数据(行)进行一些操作,然后将它们放入数据库中,以便最后我可以处理所有数据,数据库随我需要的数据一起转储了。
但是我无法弄清楚如何解决这个问题。
您正在寻找的是生产者/消费者模式
基本线程示例
这是使用线程模块的基本示例(而不是多处理)
import threading
import Queue
import sys
def do_work(in_queue, out_queue):
while True:
item = in_queue.get()
# process
result = item
out_queue.put(result)
in_queue.task_done()
if __name__ == "__main__":
work = Queue.Queue()
results = Queue.Queue()
total = 20
# start for workers
for i in xrange(4):
t = threading.Thread(target=do_work, args=(work, results))
t.daemon = True
t.start()
# produce data
for i in xrange(total):
work.put(i)
work.join()
# get the results
for i in xrange(total):
print results.get()
sys.exit()
您不会与线程共享文件对象。您可以通过为队列提供数据行来为他们工作。然后,每个线程将接起一行,对其进行处理,然后将其返回到队列中。
在多处理模块中内置了一些更高级的功能来共享数据,例如列表和特殊的Queue。在使用多处理与线程时需要权衡取舍,这取决于您的工作是CPU约束还是IO约束。
基本的多处理池示例
这是一个多处理池的真正基本示例
from multiprocessing import Pool
def process_line(line):
return "FOO: %s" % line
if __name__ == "__main__":
pool = Pool(4)
with open('file.txt') as source_file:
# chunk the work into batches of 4 lines at a time
results = pool.map(process_line, source_file, 4)
print results
池是管理其自身进程的便捷对象。由于打开的文件可以遍历其行,因此可以将其传递到pool.map()
,该文件将循环遍历并将行传递给worker函数。映射将阻止并在完成后返回整个结果。请注意,这是一个过于简化的示例,pool.map()
在进行工作之前,它将立即将整个文件读入内存。如果您希望有大文件,请记住这一点。有更多高级方法可以设计生产者/消费者设置。
手动“池”,具有限制和行重新排序
这是Pool.map的手动示例,但是您可以设置队列大小,以使只按其可以处理的最快速度逐个喂入,而不是一次性消耗整个可迭代对象。我还添加了行号,以便以后可以跟踪它们并引用它们。
from multiprocessing import Process, Manager
import time
import itertools
def do_work(in_queue, out_list):
while True:
item = in_queue.get()
line_no, line = item
# exit signal
if line == None:
return
# fake work
time.sleep(.5)
result = (line_no, line)
out_list.append(result)
if __name__ == "__main__":
num_workers = 4
manager = Manager()
results = manager.list()
work = manager.Queue(num_workers)
# start for workers
pool = []
for i in xrange(num_workers):
p = Process(target=do_work, args=(work, results))
p.start()
pool.append(p)
# produce data
with open("source.txt") as f:
iters = itertools.chain(f, (None,)*num_workers)
for num_and_line in enumerate(iters):
work.put(num_and_line)
for p in pool:
p.join()
# get the results
# example: [(1, "foo"), (10, "bar"), (0, "start")]
print sorted(results)
我想在同一个Java过程中使用KCL处理多个Kinesis流。 想法很简单:为每个流创建一个新的KCL实例,然后并发运行worker。 我的问题是,在这种情况下,所有KCL实例是否都使用相同的线程池,以及在处理流处理时,这种想法是否是一种好的/常见的做法。 非常感谢。
问题内容: 有没有办法在单个函数调用上做到这一点? 就像是: 我知道这是一个语法混乱,但是只是为了给我一个我想实现的目标一个思路,一系列路由就很棒了! 有人知道怎么做吗? 问题答案: 我在寻找相同功能时遇到了这个问题。 @Jonathan Ong在上面的评论中提到,不建议将数组用于路径,但已在Express 4中对其进行了明确描述,并且它在Express 3.x中有效。这是尝试的示例: 从对象内部
我的问题陈述。读取包含1000万数据的csv文件,并将其存储在数据库中。用尽可能少的时间 我使用java的简单多线程执行器实现了它,其逻辑几乎与spring batch的chunk相似。从csv文件中读取预配置数量的数据,然后创建一个线程,并将数据传递给线程,该线程验证数据,然后写入多线程运行的文件。完成所有任务后,我将调用sql loader来加载每个文件。现在我想把这段代码移到spring b
嗯,我做了这个脚本,它支持日志一些击键一段时间保存在一个文件中,然后擦除文件,如果用户想要然而当脚本tryes删除文件我得到这个错误。 Traceback(最近一次调用最后一次):文件"C:\用户\Tormenter\Desktop\S. D. A. K. L\pregunta.py",第34行,os.remove(path 2"\"name)PermissionError:[WinError 3
我在视图布局中使用了三个EditText小部件,用于三个不同的过滤器。如果我输入其中一条,另一条文本不应该是空白的吗? 下面是我的片段: GenericTextWatcher方法: 当我运行这个程序并输入EditText时,logcat看起来是这样的: 03-03 15:25:39.616 25952-25952/com.xyz.abcI/art:显式并发标记扫描GC释放23671(1194KB)
问题内容: 试图使用es6在没有多个处理程序的情况下创建动态状态,但我陷入了困境。我不知道下面的代码有什么问题 我检查了我的其他函数,handleAdvancePrice是罪魁祸首,但是出了什么问题? 问题答案: 罪魁祸首是后面的多余迹象。另外,也不需要使用单独的; y,因为您可以直接使用
我知道子进程是进程,而不是线程。我使用了错误的语义,因为大多数人在谈到“多线程”时都知道您的意图。所以我会把它保留在标题中。 想象一下这样一个场景:使用一个自定义函数或模块,您连续有多个类似和复杂的事情要做。使用所有可用的核心/线程(例如8/16)非常有意义,这就是的目的。 理想情况下,您需要多个同时工作的人员,并向一个控制器发送/从一个控制器发送/回调消息。 node cpool、fork po
我有一个开关声明,它检查提交的文本中是否存在特定的字符串。 通常,我会创建专用的文件来处理每个的POST数据。 但是如何在一个中处理多个POST。 对于每个ajax数据,我都包含了一个唯一的标识符,这将作为PHP接收到的ajax的参考 但我不确定如何正确地编写PHP代码来处理提交的$_POST类型。