当前位置: 首页 > 面试题库 >

Python多重处理-跟踪pool.map操作的过程

施令秋
2023-03-14
问题内容

我有一个执行一些模拟并以字符串格式返回数组的函数。

我想运行模拟(函数)以更改输入参数值(超过10000个可能的输入值),并将结果写入单个文件。

我正在使用多重处理,特别是pool.map函数来并行运行模拟。

由于运行仿真功能超过10000次的整个过程需要很长时间,因此我真的很想跟踪整个操作的过程。

我认为下面我当前代码中的问题是,pool.map运行该函数10000次,在这些操作过程中没有任何进程跟踪。一旦并行处理完成了10000个模拟的运行(可能是数小时到数天),那么我将继续跟踪10000个模拟结果何时保存到文件中。因此,这实际上并不是在跟踪pool.map操作的处理。

我的代码是否有简单的修复程序,可以进行流程跟踪?

def simFunction(input):
    # Does some simulation and outputs simResult
    return str(simResult)

# Parallel processing

inputs = np.arange(0,10000,1)

if __name__ == "__main__":
    numCores = multiprocessing.cpu_count()
    pool = multiprocessing.Pool(processes = numCores)
    t = pool.map(simFunction, inputs) 
    with open('results.txt','w') as out:
        print("Starting to simulate " + str(len(inputs)) + " input values...")
        counter = 0
        for i in t:
            out.write(i + '\n')
            counter = counter + 1
            if counter%100==0:
                print(str(counter) + " of " + str(len(inputs)) + " input values simulated")
    print('Finished!!!!')

问题答案:

如果使用迭代map函数,则跟踪进度非常容易。

>>> from pathos.multiprocessing import ProcessingPool as Pool
>>> def simFunction(x,y):
...   import time
...   time.sleep(2)
...   return x**2 + y
... 
>>> x,y = range(100),range(-100,100,2)
>>> res = Pool().imap(simFunction, x,y)
>>> with open('results.txt', 'w') as out:
...   for i in x:
...     out.write("%s\n" % res.next())
...     if i%10 is 0:
...       print "%s of %s simulated" % (i, len(x))
... 
0 of 100 simulated
10 of 100 simulated
20 of 100 simulated
30 of 100 simulated
40 of 100 simulated
50 of 100 simulated
60 of 100 simulated
70 of 100 simulated
80 of 100 simulated
90 of 100 simulated

或者,您可以使用异步map。在这里,我将做些不同的事情,只是将其混合在一起。

>>> import time
>>> res = Pool().amap(simFunction, x,y)
>>> while not res.ready():
...   print "waiting..."
...   time.sleep(5)
... 
waiting...
waiting...
waiting...
waiting...
>>> res.get()
[-100, -97, -92, -85, -76, -65, -52, -37, -20, -1, 20, 43, 68, 95, 124, 155, 188, 223, 260, 299, 340, 383, 428, 475, 524, 575, 628, 683, 740, 799, 860, 923, 988, 1055, 1124, 1195, 1268, 1343, 1420, 1499, 1580, 1663, 1748, 1835, 1924, 2015, 2108, 2203, 2300, 2399, 2500, 2603, 2708, 2815, 2924, 3035, 3148, 3263, 3380, 3499, 3620, 3743, 3868, 3995, 4124, 4255, 4388, 4523, 4660, 4799, 4940, 5083, 5228, 5375, 5524, 5675, 5828, 5983, 6140, 6299, 6460, 6623, 6788, 6955, 7124, 7295, 7468, 7643, 7820, 7999, 8180, 8363, 8548, 8735, 8924, 9115, 9308, 9503, 9700, 9899]

请注意,我使用pathos.multiprocessing而不是multiprocessing。这仅仅是一个分支multiprocessing,使您能够map使用多个输入来执行函数,具有更好的序列化,并允许您在map任何地方(不仅是in
__main__)执行html" target="_blank">调用。您也可以使用multiprocessing上面的方法,但是代码会稍有不同。

迭代的或异步的map都可以使您编写任何代码,以进行更好的过程跟踪。例如,将唯一的“
id”传递给每个作业,并观察返回的作业,或者让每个作业返回其进程ID。有很多跟踪进度和过程的方法……但是以上内容应该为您提供一个开始。

你可以在pathos这里找到:https :
//github.com/uqfoundation



 类似资料:
  • 问题内容: 我正在Ubuntu 14.04上使用Python 3.4进行开发。我试图做递归。在我调用之后,它挂在那里并且永远不会返回。 问题答案: 这是不可能的。所述对象本身不能安全进程之间被共享,所以相同的池不能在两者中使用和。即使您 可以 执行此操作,也很快会导致挂起,因为您的池仅限于并发工作程序。一旦开始递归地创建更多的工作人员,您最终将获得比工作人员更多的工作量,这将永远无法完成。正在运行

  • 问题内容: 我正在尝试使用的功能同时划分工作。当我使用以下代码时,它可以正常工作: 但是,当我以更加面向对象的方式使用它时,它将无法正常工作。它给出的错误信息是: 这是我的课: 任何人都知道问题可能是什么,或解决问题的简单方法? 问题答案: 问题在于,多处理必须使进程中的东西腌制,而绑定的方法却不能腌制。解决方法(无论你是否认为它“容易” 是向你的程序中添加基础结构,以允许对这些方法进行腌制,并使

  • 问题内容: 我想知道是否有人可以指向我简单地等效于java中python的多处理模块。 我有一个简单的并行处理场景(其中没有2个进程交互):取一个数据集并将其分成12个,然后将Java方法应用于12个数据集,收集结果并将它们按相同的顺序加入到某种列表中。 作为“专业”语言的Java似乎具有多个库和方法-谁能帮助这个Java新手入门? 我想用最少的编码做到这一点-正如我说的那样,我的要求非常简单。

  • 目前,我们使用三个嵌套的Foreach循环来获取运行批处理的信息。然而,我相当确定我们可以通过一个带有连接和子查询的MySQL语句来获取信息。 我们有大约30个类别,2000个用户。我们的目标是大约100个类别,拥有10万用户,尽管很明显Foreach循环并不理想(即使现在运行它们也需要大约一分钟)。 情况:用户希望得到通知,如果有工作可用于他们可以在某一领域做的贸易 目标:将批处理(每日、每周等

  • 关键点: 关键点:

  • 如果用户把浏览器的Cookie功能关闭,或者浏览器不支持Cookie功能,那么SessionId就不能通过Cookie向服务端发送了。Servlet规范为了弥补这个不足,允许通过URL请求参数来发送SessionId。这样当浏览器的Cookie功能关闭时,在浏览器中仍然可以通过由URL请求参数发送的SessionId在服务端找到相应的HttpSession对象。 在下面的例子演示了如何通过URL的