我在python中遇到以下问题。
我需要并行执行一些计算,这些计算的结果需要按顺序写入文件中。因此,我创建了一个函数,该函数接收multiprocessing.Queue
和文件句柄,进行计算并将结果打印到文件中:
import multiprocessing
from multiprocessing import Process, Queue
from mySimulation import doCalculation
# doCalculation(pars) is a function I must run for many different sets of parameters and collect the results in a file
def work(queue, fh):
while True:
try:
parameter = queue.get(block = False)
result = doCalculation(parameter)
print >>fh, string
except:
break
if __name__ == "__main__":
nthreads = multiprocessing.cpu_count()
fh = open("foo", "w")
workQueue = Queue()
parList = # list of conditions for which I want to run doCalculation()
for x in parList:
workQueue.put(x)
processes = [Process(target = writefh, args = (workQueue, fh)) for i in range(nthreads)]
for p in processes:
p.start()
for p in processes:
p.join()
fh.close()
但是脚本运行后文件最终为空。我试图将worker()函数更改为:
def work(queue, filename):
while True:
try:
fh = open(filename, "a")
parameter = queue.get(block = False)
result = doCalculation(parameter)
print >>fh, string
fh.close()
except:
break
并将文件名作为参数传递。然后它按我的预期工作。当我尝试按顺序执行相同的操作而不进行多处理时,它也可以正常工作。
为什么它在第一个版本中不起作用?我看不到问题。
另外:我可以保证两个进程不会尝试同时写入文件吗?
编辑:
谢谢。我知道了 这是工作版本:
import multiprocessing
from multiprocessing import Process, Queue
from time import sleep
from random import uniform
def doCalculation(par):
t = uniform(0,2)
sleep(t)
return par * par # just to simulate some calculation
def feed(queue, parlist):
for par in parlist:
queue.put(par)
def calc(queueIn, queueOut):
while True:
try:
par = queueIn.get(block = False)
print "dealing with ", par, ""
res = doCalculation(par)
queueOut.put((par,res))
except:
break
def write(queue, fname):
fhandle = open(fname, "w")
while True:
try:
par, res = queue.get(block = False)
print >>fhandle, par, res
except:
break
fhandle.close()
if __name__ == "__main__":
nthreads = multiprocessing.cpu_count()
fname = "foo"
workerQueue = Queue()
writerQueue = Queue()
parlist = [1,2,3,4,5,6,7,8,9,10]
feedProc = Process(target = feed , args = (workerQueue, parlist))
calcProc = [Process(target = calc , args = (workerQueue, writerQueue)) for i in range(nthreads)]
writProc = Process(target = write, args = (writerQueue, fname))
feedProc.start()
for p in calcProc:
p.start()
writProc.start()
feedProc.join ()
for p in calcProc:
p.join()
writProc.join ()
您确实应该使用两个队列和三种单独的处理。
将内容放入队列#1。
从队列1中取出内容并进行计算,然后将内容放入队列2中。您可以有许多这样的应用程序,因为它们从一个队列中安全地放入另一个队列中。
从队列2中获取内容并将其写入文件。您必须恰好具有这些中的1个,并且不能再有其他。它“拥有”文件,保证原子访问,并绝对确保文件被整洁且一致地写入。
我是Spring批处理的新手,目前正在处理一个新的批处理作业。我的批处理作业: 读取器:读取数据库中的记录(即5家公司的员工) 处理器:处理记录 5个writer:过滤并写入每个文件(即A公司的员工在A路径写入A公司文件,B公司的员工在B路径写入B公司文件),最后一个writer将A公司的所有离职员工写入D公司。 任务:将文件保存到数据库中。 目前,我正在使用CompositeItemWriter
我目前正在编写spring batch,在这里我读取一个XML数据,对其进行处理,然后将处理结果作为< code>map传递 生成用于编组和解组的JAXB xjc类。JAXB生成的类如下所示。 JAXB员工类 Spring XML配置文件 处理器和写入器: 问题: XML文件对于每个固定的提交间隔都是重写的,很明显,XML文件是在每个提交级别之后创建的。但我必须附加所有
问题内容: 我正在使用opencsv,并希望通过多个会话写入文件。但是,每次我启动新的CSVWriter时,旧文件都会被删除。我可以更改CSVWriter的行为以在文件末尾写入而不是替换文件吗? 问题答案: FileWriter中有一个选项而不是CSVWriter可以附加在文件末尾。 此代码使其工作:
我有从多个文件读取并写入多个文件的Spring批处理配置。是否可以只写入从多个读取的一个文件。假设我收到巨大的XML文件,我将XML拆分为小文件并使用分区器并行读取小文件。但我需要将从不同的小xml文件读取的所有数据写入一个输出文件。Spring批处理是否可以做到这一点?我知道通过使写入器同步是可能的,但我正在寻找任何其他可能的方式作业配置 我得到错误组织。springframework。一批项目
当通过java将列名写入CSV文件时,列中两个单词之间的空格将显示为双引号。。。 例如,测试数据[0][36]=“True Accept”;在csv文件中显示为True“Accept”。。 有没有办法解决这个问题? 代码如下: 编辑:这是解决方案: CSVWriter writer=new CSVWriter(new FileWriter(“list\u of_churers.csv”)、'、'、
我有几个不同的Spring批处理作业需要写入同一个平面文件。在平面文件中写入数据的顺序并不重要。这些批处理作业可能同时运行。 FlatFileItemWriter是否合适?我担心的是,如果多个作业同时尝试写入平面文件,数据可能会混合在一起。