当前位置: 首页 > 面试题库 >

给定N个生成器,是否可以创建一个在并行过程中运行它们并生成这些生成器的zip的生成器?

西门梓
2023-03-14
问题内容

假设我有N个生成器gen_1, ..., gen_N,其中每个生成器将产生相同数量的值。我想要一个生成器gen,使其在N个并行进程中运行gen_1,…,gen_N(next(gen_1), next(gen_2), ... next(gen_N))

那就是我想要的:

def gen():
   yield (next(gen_1), next(gen_2), ... next(gen_N))

每个gen_i都在自己的进程上运行。是否有可能做到这一点?我在下面的虚拟示例中尝试执行此操作,但未成功:

A = range(4)

def gen(a):
    B = ['a', 'b', 'c']
    for b in B:
        yield b + str(a)

def target(g):
    return next(g)

processes = [Process(target=target, args=(gen(a),)) for a in A]

for p in processes:
    p.start()

for p in processes:
    p.join()

但是我得到了错误TypeError: cannot pickle 'generator' object

编辑:

我已经修改了@darkonaut答案,以适应我的需要。如果有人觉得有用,我会发布它。我们首先定义几个实用程序函数:

from itertools import zip_longest
from typing import List, Generator


def grouper(iterable, n, fillvalue=iter([])):
    "Collect data into fixed-length chunks or blocks"
    args = [iter(iterable)] * n
    return zip_longest(*args, fillvalue=fillvalue)

def split_generators_into_batches(generators: List[Generator], n_splits):
    chunks = grouper(generators, len(generators) // n_splits + 1)

    return [zip_longest(*chunk) for chunk in chunks]

下列类负责将任意数量的生成器拆分为n个(进程数)批处理,并对其进行处理以产生所需的结果:

import multiprocessing as mp

class GeneratorParallelProcessor:
SENTINEL = 'S'

def __init__(self, generators, n_processes = 2 * mp.cpu_count()):
    self.n_processes = n_processes
    self.generators = split_generators_into_batches(list(generators), n_processes)
    self.queue = mp.SimpleQueue()
    self.barrier = mp.Barrier(n_processes + 1)
    self.sentinels = [self.SENTINEL] * n_processes

    self.processes = [
        mp.Process(target=self._worker, args=(self.barrier, self.queue, gen)) for gen in self.generators
    ]

def process(self):
    for p in self.processes:
        p.start()

    while True:
        results = list(itertools.chain(*(self.queue.get() for _ in self.generators)))
        if results != self.sentinels:
            yield results
            self.barrier.wait()
        else:
            break

    for p in self.processes:
        p.join()

def _worker(self, barrier, queue, generator):
    for x in generator:
        queue.put(x)
        barrier.wait()
    queue.put(self.SENTINEL)

要使用它,只需执行以下操作:

parallel_processor = GeneratorParallelProcessor(generators)

    for grouped_generator in parallel_processor.process():
        output_handler(grouped_generator)

问题答案:

可以花些力气来获得这样的“ 统一并行生成器(UPG)
”(尝试命名),但是正如@jasonharper所述,您肯定需要在子进程内组装子生成器,因为运行发电机不能腌制。

下面的模式是可重用的,只有生成器函数gen()是针对此示例定制的。该设计multiprocessing.SimpleQueue用于将生成器结果返回给父级并
multiprocessing.Barrier进行同步。

调用Barrier.wait()将阻塞调用者(任何进程中的线程),直到指定的数目parties已调用.wait(),随后所有当前等待Barrier获取的线程同时释放。Barrier此处的用法可确保仅在父级从迭代接收到
所有 结果 之后 才开始计算进一步的生成器结果,这可能是控制总体内存消耗的理想方法。 __

使用的并行工作程序数量等于您在gen_args_tuples-iterable中提供的参数元组的数量,因此gen_args_tuples=zip(range(4))将使用四个工作程序。有关更多详细信息,请参见代码中的注释。

import multiprocessing as mp

SENTINEL = 'SENTINEL'


def gen(a):
    """Your individual generator function."""
    lst = ['a', 'b', 'c']
    for ch in lst:
        for _ in range(int(10e6)):  # some dummy computation
            pass
        yield ch + str(a)


def _worker(i, barrier, queue, gen_func, gen_args):
    for x in gen_func(*gen_args):
        print(f"WORKER-{i} sending item.")
        queue.put((i, x))
        barrier.wait()
    queue.put(SENTINEL)


def parallel_gen(gen_func, gen_args_tuples):
    """Construct and yield from parallel generators
     build from `gen_func(gen_args)`.
     """
    gen_args_tuples = list(gen_args_tuples)  # ensure list
    n_gens = len(gen_args_tuples)
    sentinels = [SENTINEL] * n_gens
    queue = mp.SimpleQueue()
    barrier = mp.Barrier(n_gens + 1)  # `parties`: + 1 for parent

    processes = [
        mp.Process(target=_worker, args=(i, barrier, queue, gen_func, args))
        for i, args in enumerate(gen_args_tuples)
    ]

    for p in processes:
        p.start()

    while True:
        results = [queue.get() for _ in range(n_gens)]
        if results != sentinels:
            results.sort()
            yield tuple(r[1] for r in results)  # sort and drop ids
            barrier.wait()  # all workers are waiting
            # already, so this will unblock immediately
        else:
            break

    for p in processes:
        p.join()


if __name__ == '__main__':

    for res in parallel_gen(gen_func=gen, gen_args_tuples=zip(range(4))):
        print(res)

输出:

WORKER-1 sending item.
WORKER-0 sending item.
WORKER-3 sending item.
WORKER-2 sending item.
('a0', 'a1', 'a2', 'a3')
WORKER-1 sending item.
WORKER-2 sending item.
WORKER-3 sending item.
WORKER-0 sending item.
('b0', 'b1', 'b2', 'b3')
WORKER-2 sending item.
WORKER-3 sending item.
WORKER-1 sending item.
WORKER-0 sending item.
('c0', 'c1', 'c2', 'c3')

Process finished with exit code 0


 类似资料:
  • 问题内容: 假设我有一台Koa Web服务器,其端点如下所示: 现在,在所有动作都明显执行之后,客户端将获得响应。但事情是每个动作都取决于前一个动作的完成。 有没有办法并行执行它们? 注意:除非我能以某种方式返回结果而不是resolve(),否则将它们变为Promises是不可行的。 问题答案: 将生成器功能转换为Promises,并异步执行它们。等待他们全部完成:

  • 问题内容: 作为一个完整的Python新手,它肯定是这样。运行以下… …我注意到:(a)确实有一种方法,这似乎是生成器所必需的,并且(b)只能迭代一次,这是著名标记答案中所强调的生成器的特征。 另一方面, 关于如何确定对象是否为生成器的这个问题,两个最受好评的答案似乎表明它 没有 返回生成器。 …虽然对该问题的第三次不良评价似乎表明实际上 确实 返回了一个生成器: 发生什么了?是发电机吗?从某种意

  • 块状应用程序通常生成JavaScript作为其输出语言,通常在网页(可能是同一网页或嵌入式WebView)中运行。 像任何生成器一样,第一步是导入javascript生成器。 对于Web Blockly,请在blockly_compressed.js之后添加javascript_compressed.js: <script src="blockly_compressed.js"></script>

  • 我正在编写一个通用组件,该组件封装了一个生成器并执行常规操作: 按键筛选 转换值 等等 为了尽可能接近地模拟包装生成器,如果生成器使用引用,我想使用引用。 当我尝试使用 构造的情况下迭代生成器对引用完全不起作用:

  • 我正在编写一个通用组件,该组件封装了一个生成器并执行常规操作: 按键筛选 转换值 等等 为了尽可能接近地模拟包装生成器,如果生成器使用引用,我想使用引用。 当我尝试使用 构造的情况下迭代生成器对引用完全不起作用:

  • 我最近为Java开发人员安装了日食IDE 版: 2019-03 (4.11.0) 内部版本号: 20190314-1200 创建项目时,我收到以下错误: '构建工作区'遇到问题。构建过程中发生错误。 生成期间发生错误。在项目“我的第一个项目”上运行生成器“Java生成器”时出错。java.lang.NullPointerException 请帮帮忙

  • 问题内容: 我有大量的数据(几场演出),我需要用Python编写一个zip文件。我无法一次全部将其加载到内存中以传递给ZipFile的.writestr方法,我真的不想使用临时文件将其全部馈入磁盘,然后再读回。 有没有办法将生成器或类似文件的对象提供给ZipFile库?还是由于某种原因似乎不支持此功能? 压缩文件是指压缩文件。如Python zipfile包中所支持。 问题答案: 唯一的解决方案是

  • Rust 的Future是懒惰的:除非是向着'完成'这一个目标积极前进,否则他们不会做任何事情。向 Future 完成前进的一种方法是,在async函数里面,对它.await,但这只会将问题升了个级:谁来管理,从顶层 async函数返回的 Futures ?答案是:我们需要一个Future执行者(executor)。 Future executor 获取一组顶层Future,并每当Future可以