当前位置: 首页 > 面试题库 >

How to parallelize a sum calculation in python numpy?

姚海
2023-03-14
问题内容

I have a sum that I’m trying to compute, and I’m having difficulty
parallelizing the code. The calculation I’m trying to parallelize is kind of
complex (it uses both numpy arrays and scipy sparse matrices). It spits out a
numpy array, and I want to sum the output arrays from about 1000 calculations.
Ideally, I would keep a running sum over all the iterations. However, I
haven’t been able to figure out how to do this.

So far, I’ve tried using joblib’s Parallel function and the pool.map function
with python’s multiprocessing package. For both of these, I use an inner
function that returns a numpy array. These functions return a list, which I
convert to a numpy array and then sum over.

However, after the joblib Parallel function completes all iterations, the main
program never continues running (it looks like the original job is in a
suspended state, using 0% CPU). When I use pool.map, I get memory errors after
all the iterations are complete.

Is there a way to simply parallelize a running sum of arrays?

Edit : The goal is to do something like the following, except in parallel.

def summers(num_iters):

    sumArr = np.zeros((1,512*512)) #initialize sum
    for index in range(num_iters):
        sumArr = sumArr + computation(index) #computation returns a 1 x 512^2 numpy array

    return sumArr

问题答案:

I figured out how to do parallelize a sum of arrays with multiprocessing,
apply_async, and callbacks, so I’m posting this here for other people. I used
the example page for Parallel
Python for the Sum
callback class, although I did not actually use that package for
implementation. It gave me the idea of using callbacks, though. Here’s the
simplified code for what I ended up using, and it does what I wanted it to do.

import multiprocessing
import numpy as np
import thread

class Sum: #again, this class is from ParallelPython's example code (I modified for an array and added comments)
    def __init__(self):
        self.value = np.zeros((1,512*512)) #this is the initialization of the sum
        self.lock = thread.allocate_lock()
        self.count = 0

    def add(self,value):
        self.count += 1
        self.lock.acquire() #lock so sum is correct if two processes return at same time
        self.value += value #the actual summation
        self.lock.release()

def computation(index):
    array1 = np.ones((1,512*512))*index #this is where the array-returning computation goes
    return array1

def summers(num_iters):
    pool = multiprocessing.Pool(processes=8)

    sumArr = Sum() #create an instance of callback class and zero the sum
    for index in range(num_iters):
        singlepoolresult = pool.apply_async(computation,(index,),callback=sumArr.add)

    pool.close()
    pool.join() #waits for all the processes to finish

    return sumArr.value

I was also able to get this working using a parallelized map, which was
suggested in another answer. I had tried this earlier, but I wasn’t
implementing it correctly. Both ways work, and I think this
answer explains the issue of which method
to use (map or apply.async) pretty well. For the map version, you don’t need
to define the class Sum and the summers function becomes

def summers(num_iters):
    pool = multiprocessing.Pool(processes=8)

    outputArr = np.zeros((num_iters,1,512*512)) #you wouldn't have to initialize these
    sumArr = np.zeros((1,512*512))              #but I do to make sure I have the memory

    outputArr = np.array(pool.map(computation, range(num_iters)))
    sumArr = outputArr.sum(0)

    pool.close() #not sure if this is still needed since map waits for all iterations

    return sumArr


 类似资料:

相关阅读

相关文章

相关问答