当前位置: 首页 > 面试题库 >

通过多处理写入文件

籍光熙
2023-03-14
问题内容

我在python中遇到以下问题。

我需要并行执行一些计算,这些计算的结果需要按顺序写入文件中。因此,我创建了一个函数,该函数接收multiprocessing.Queue和文件句柄,进行计算并将结果打印到文件中:

import multiprocessing
from multiprocessing import Process, Queue
from mySimulation import doCalculation

# doCalculation(pars) is a function I must run for many different sets of parameters and collect the results in a file

def work(queue, fh):
while True:
    try:
        parameter = queue.get(block = False)
        result = doCalculation(parameter) 
        print >>fh, string
    except:
        break


if __name__ == "__main__":
    nthreads = multiprocessing.cpu_count()
    fh = open("foo", "w")
    workQueue = Queue()
    parList = # list of conditions for which I want to run doCalculation()
    for x in parList:
        workQueue.put(x)
    processes = [Process(target = writefh, args = (workQueue, fh)) for i in range(nthreads)]
    for p in processes:
       p.start()
    for p in processes:
       p.join()
    fh.close()

但是脚本运行后文件最终为空。我试图将worker()函数更改为:

def work(queue, filename):
while True:
    try:
        fh = open(filename, "a")
        parameter = queue.get(block = False)
        result = doCalculation(parameter) 
        print >>fh, string
        fh.close()
    except:
        break

并将文件名作为参数传递。然后它按我的预期工作。当我尝试按顺序执行相同的操作而不进行多处理时,它也可以正常工作。

为什么它在第一个版本中不起作用?我看不到问题。

另外:我可以保证两个进程不会尝试同时写入文件吗?

编辑:

谢谢。我知道了 这是工作版本:

import multiprocessing
from multiprocessing import Process, Queue
from time import sleep
from random import uniform

def doCalculation(par):
    t = uniform(0,2)
    sleep(t)
    return par * par  # just to simulate some calculation

def feed(queue, parlist):
    for par in parlist:
            queue.put(par)

def calc(queueIn, queueOut):
    while True:
        try:
            par = queueIn.get(block = False)
            print "dealing with ", par, "" 
            res = doCalculation(par)
            queueOut.put((par,res))
        except:
            break

def write(queue, fname):
    fhandle = open(fname, "w")
    while True:
        try:
            par, res = queue.get(block = False)
            print >>fhandle, par, res
        except:
            break
    fhandle.close()

if __name__ == "__main__":
    nthreads = multiprocessing.cpu_count()
    fname = "foo"
    workerQueue = Queue()
    writerQueue = Queue()
    parlist = [1,2,3,4,5,6,7,8,9,10]
    feedProc = Process(target = feed , args = (workerQueue, parlist))
    calcProc = [Process(target = calc , args = (workerQueue, writerQueue)) for i in range(nthreads)]
    writProc = Process(target = write, args = (writerQueue, fname))


    feedProc.start()
    for p in calcProc:
        p.start()
    writProc.start()

    feedProc.join ()
    for p in calcProc:
        p.join()
    writProc.join ()

问题答案:

您确实应该使用两个队列和三种单独的处理。

  1. 将内容放入队列#1。

  2. 从队列1中取出内容并进行计算,然后将内容放入队列2中。您可以有许多这样的应用程序,因为它们从一个队列中安全地放入另一个队列中。

  3. 从队列2中获取内容并将其写入文件。您必须恰好具有这些中的1个,并且不能再有其他。它“拥有”文件,保证原子访问,并绝对确保文件被整洁且一致地写入。



 类似资料:
  • 我是Spring批处理的新手,目前正在处理一个新的批处理作业。我的批处理作业: 读取器:读取数据库中的记录(即5家公司的员工) 处理器:处理记录 5个writer:过滤并写入每个文件(即A公司的员工在A路径写入A公司文件,B公司的员工在B路径写入B公司文件),最后一个writer将A公司的所有离职员工写入D公司。 任务:将文件保存到数据库中。 目前,我正在使用CompositeItemWriter

  • 我目前正在编写spring batch,在这里我读取一个XML数据,对其进行处理,然后将处理结果作为< code>map传递 生成用于编组和解组的JAXB xjc类。JAXB生成的类如下所示。 JAXB员工类 Spring XML配置文件 处理器和写入器: 问题: XML文件对于每个固定的提交间隔都是重写的,很明显,XML文件是在每个提交级别之后创建的。但我必须附加所有

  • 问题内容: 我正在使用opencsv,并希望通过多个会话写入文件。但是,每次我启动新的CSVWriter时,旧文件都会被删除。我可以更改CSVWriter的行为以在文件末尾写入而不是替换文件吗? 问题答案: FileWriter中有一个选项而不是CSVWriter可以附加在文件末尾。 此代码使其工作:

  • 我有从多个文件读取并写入多个文件的Spring批处理配置。是否可以只写入从多个读取的一个文件。假设我收到巨大的XML文件,我将XML拆分为小文件并使用分区器并行读取小文件。但我需要将从不同的小xml文件读取的所有数据写入一个输出文件。Spring批处理是否可以做到这一点?我知道通过使写入器同步是可能的,但我正在寻找任何其他可能的方式作业配置 我得到错误组织。springframework。一批项目

  • 当通过java将列名写入CSV文件时,列中两个单词之间的空格将显示为双引号。。。 例如,测试数据[0][36]=“True Accept”;在csv文件中显示为True“Accept”。。 有没有办法解决这个问题? 代码如下: 编辑:这是解决方案: CSVWriter writer=new CSVWriter(new FileWriter(“list\u of_churers.csv”)、'、'、

  • 我有几个不同的Spring批处理作业需要写入同一个平面文件。在平面文件中写入数据的顺序并不重要。这些批处理作业可能同时运行。 FlatFileItemWriter是否合适?我担心的是,如果多个作业同时尝试写入平面文件,数据可能会混合在一起。