当前位置: 首页 > 面试题库 >

带有迭代器的多处理池

徐翔
2023-03-14
问题内容

我想将多处理池与迭代器一起使用,以便在将迭代器拆分为N个元素的线程中执行函数,直到迭代器完成为止。

import arcpy
from multiprocessing import Pool

def insert(rows):
    with arcpy.da.InsertCursor("c:\temp2.gdb\test" fields=["*"]) as i_cursor:
        #i_cursor is an iterator
        for row in rows:
            i_cursor.insertRow(row)

input_rows = []
count = 0
pool = Pool(4)
with arcpy.da.SearchCursor("c:\temp.gdb\test", fields=["*"]) as s_cursor:
    #s_cursor is an iterator
    for row in s_cursor:
        if (count < 100):
            input_rows.append(row)
            count += 1
        else:
            #send 100 rows to the insert function in a new thread
            pool.apply_async(insert, input_rows)
            #reset count and input_rows
            count = 1
            input_rows = [row]


pool.join()
pool.close()

我的问题是,此脚本是正确的方法吗?有没有更好的办法?

该脚本可能出了点问题,因为我在 pool.join()

Traceback (most recent call last):
  File "G:\Maxime\truncate_append_pool.py", line 50, in <module>
    pool.join()
  File "C:\App\Python27\ArcGIS10.3\lib\multiprocessing\pool.py", line 460, in join
    assert self._state in (CLOSE, TERMINATE)
AssertionError

问题答案:

如果我不得不猜测代码的主要问题,那是因为将您的代码传递input_rows给了流程函数insert()-multiprocessing.Pool.apply_async()工作方式是解压缩传递给它的参数,因此您的insert()函数实际上是获取100参数,而不是列出一个参数的100元素。这会在过程功能甚至没有机会启动之前立即导致错误。如果更改pool.apply_async(insert, [input_rows])对它的调用,它可能会开始起作用…您也将违反迭代器的目的,您可能会将整个输入迭代器转换为一个列表,并馈给100to的切片multiprocessing.Pool.map()并使用它完成。

但是您问是否有一种“更好”的方法。尽管“更好”是一个相对类别,但在理想情况下,multiprocessing.Pool它提供了一种方便的imap()(和imap_unordered())方法来使用可迭代对象,并以懒惰的方式将其散布到选定的池中(因此在处理之前不会在整个迭代器上运行),因此您需要构建的只是迭代器切片,并将其传递给它进行处理,即:

import arcpy
import itertools
import multiprocessing

# a utility function to get us a slice of an iterator, as an iterator
# when working with iterators maximum lazyness is preferred 
def iterator_slice(iterator, length):
    iterator = iter(iterator)
    while True:
        res = tuple(itertools.islice(iterator, length))
        if not res:
            break
        yield res

def insert(rows):
    with arcpy.da.InsertCursor("c:\temp2.gdb\test" fields=["*"]) as i_cursor:
        for row in rows:
            i_cursor.insertRow(row)

if __name__ == "__main__":  # guard for multi-platform use
    with arcpy.da.SearchCursor("c:\temp.gdb\test", fields=["*"]) as s_cursor:
        pool = multiprocessing.Pool(processes=4)  # lets use 4 workers
        for result in pool.imap_unordered(insert, iterator_slice(s_cursor, 100)):
            pass  # do whatever you want with your result (return from your process function)
        pool.close()  # all done, close cleanly

(顺便说一句,对于所有s_cursor不是100的倍数的尺寸,您的代码都不会给出最后一个切片)

但是…如果它确实如广告所宣传的那样,那就太好了。尽管多年来已经修复了很多问题,但在生成自己的迭代器时imap_unordered()仍会抽取
大量 的迭代器 样本
(远远大于实际池进程的数量),因此,如果您对此感到担忧,则必须放弃并且弄脏自己,并且您处在正确的轨道上—这apply_async()是您想要控制如何喂食池的方法,您只需要确保正确
喂食 池即可:


if __name__ == "__main__":
    with arcpy.da.SearchCursor("c:\temp.gdb\test", fields=["*"]) as s_cursor:
        pool = multiprocessing.Pool(processes=4)  # lets use 4 workers
        cursor_iterator = iterator_slice(s_cursor, 100)  # slicer from above, for convinience
        queue = []  # a queue for our current worker async results, a deque would be faster
        while cursor_iterator or queue:  # while we have anything to do...
            try:
                # add our next slice to the pool:
                queue.append(pool.apply_async(insert, [next(cursor_iterator)])) 
            except (StopIteration, TypeError):  # no more data, clear out the slice iterator
                cursor_iterator = None
            # wait for a free worker or until all remaining finish
            while queue and (len(queue) >= pool._processes or not cursor_iterator):
                process = queue.pop(0)  # grab a process response from the top
                process.wait(0.1)  # let it breathe a little, 100ms should be enough
                if not process.ready():  # a sub-process has not finished execution
                    queue.append(process)  # add it back to the queue
                else:
                    # you can use process.get() to get the result if needed
                    pass
        pool.close()

现在,s_cursor仅当需要下一个100个结果时(无论insert()流程函数是否干净退出),才调用迭代器。

更新 -如果需要捕获的结果,则先前发布的代码最后在关闭队列中存在一个错误,此代码应能很好地完成工作。我们可以使用一些模拟功能轻松地对其进行测试:

import random
import time

# just an example generator to prove lazy access by printing when it generates
def get_counter(limit=100):
    for i in range(limit):
        if not i % 3:  # print every third generation to reduce verbosity
            print("Generated: {}".format(i))
        yield i

# our process function, just prints what's passed to it and waits for 1-6 seconds
def test_process(values):
    time_to_wait = 1 + random.random() * 5
    print("Processing: {}, waiting: {:0.2f} seconds".format(values, time_to_wait))
    time.sleep(time_to_wait)
    print("Processed: {}".format(values))

现在我们可以像这样缠绕它们:

if __name__ == "__main__":
    pool = multiprocessing.Pool(processes=2)  # lets use just 2 workers
    count = get_counter(30)  # get our counter iterator set to iterate from 0-29
    count_iterator = iterator_slice(count, 7)  # we'll process them in chunks of 7
    queue = []  # a queue for our current worker async results, a deque would be faster
    while count_iterator or queue:
        try:
            # add our next slice to the pool:
            queue.append(pool.apply_async(test_process, [next(count_iterator)]))
        except (StopIteration, TypeError):  # no more data, clear out the slice iterator
            count_iterator = None
        # wait for a free worker or until all remaining workers finish
        while queue and (len(queue) >= pool._processes or not count_iterator):
            process = queue.pop(0)  # grab a process response from the top
            process.wait(0.1)  # let it breathe a little, 100ms should be enough
            if not process.ready():  # a sub-process has not finished execution
                queue.append(process)  # add it back to the queue
            else:
                # you can use process.get() to get the result if needed
                pass
    pool.close()

结果是(当然,系统之间会有所不同):

Generated: 0
Generated: 3
Generated: 6
Generated: 9
Generated: 12
Processing: (0, 1, 2, 3, 4, 5, 6), waiting: 3.32 seconds
Processing: (7, 8, 9, 10, 11, 12, 13), waiting: 2.37 seconds
Processed: (7, 8, 9, 10, 11, 12, 13)
Generated: 15
Generated: 18
Processing: (14, 15, 16, 17, 18, 19, 20), waiting: 1.85 seconds
Processed: (0, 1, 2, 3, 4, 5, 6)
Generated: 21
Generated: 24
Generated: 27
Processing: (21, 22, 23, 24, 25, 26, 27), waiting: 2.55 seconds
Processed: (14, 15, 16, 17, 18, 19, 20)
Processing: (28, 29), waiting: 3.14 seconds
Processed: (21, 22, 23, 24, 25, 26, 27)
Processed: (28, 29)

证明我们的生成器/迭代器仅在池中有可用插槽进行工作时才可收集数据,以确保最小的内存使用(和/或如果迭代器最终做到了这一点,则I /
O繁重)。您不会比这更简化。您可以获得的唯一的(尽管是微不足道的)加速是减少等待时间(但是您的主进程将消耗更多的资源)并增加允许的queue大小(以内存为代价),该大小被锁定到进程数在上面的代码中-
如果使用while queue and (len(queue) >= pool._processes + 3 or not count_iterator):它,它将加载3个以上的迭代器片,以确保在进程结束且池中的插槽释放时的延迟减少。



 类似资料:
  • 给定这样一个对象: 用法如下: 我想用更面向对象的东西来代替这个while循环,比如: 因此,我可以很容易地,例如,使匹配流,坚持使用流畅的API等。 问题是,我不知道也找不到更简洁的方法来创建这个流或迭代器。像上面这样的匿名类对我来说太冗长了。 我曾希望在jdk中找到像或这样的东西,但到目前为止还没有运气。毫无疑问,像apache commons或番石榴这样的库为此提供了一些东西,但让我们说我不

  • 假设我有一个类< code>Point和一个函数来处理< code>Point实例 现在我想从一个文件中读取<code>点 我现在该怎么办?我可以编写一个函数将文件读取到< code>points列表中,并调用< code>handlePoints函数。 不幸的是,这个函数看起来并不特别优雅,因为它在内存中创建了一个不必要的点列表。 使用迭代器不是更好吗? 有意义吗?在Java,这段代码不会太“吵

  • 问题内容: 更新: 在2006年python.org上提出了使内置字符串不可迭代的想法。我的问题有所不同,因为我试图偶尔仅抑制一次此功能。整个线程还是很相关的。 这是Guido的批判性评论,他们在试用的基础上实施了不可重复的操作: […]我实现了这一点(这确实很简单),但是后来发现我不得不修复大量遍历字符串的地方。例如: sre解析器和编译器使用set(“ 0123456789”)之类的东西,并且

  • 我当时正在读一本名为《现代Java在行动》的书,其中一部分代码我无法理解。 作者表示,代码不会终止。原因是无法在过滤器中知道数字是否继续增加,因此它会无限地继续过滤它们! 我不明白原因。有人能解释一下为什么吗。

  • 我有以下python生成器: 而且我想在Java中实现一个迭代器,它的行为有点像以前的生成器。我试图使用两个内部迭代器,但它不起作用。想法?