当前位置: 首页 > 知识库问答 >
问题:

什么可以限制数据流管道只能使用单个工作人员?

夹谷英奕
2023-03-14

我一直试图调试一个管道,它接受一个驱动后续ParDo操作的输入参数。由于我无法理解的原因,即使我禁用了自动伸缩并设置了工人数量,管道也不会伸缩到单个工人之外。可悲的是,GCP上糟糕的数据流接口几乎没有说明无法伸缩。有谁能告诉你可能是什么问题或者如何有效地调试?

with beam.Pipeline(options=opts) as p:
  result = (
      p | "Initialize Pipeline" >> beam.Create(
          [(f'gs://data/']) |
      "Scan for extraction tasks" >> beam.ParDo(scanner.ScanForTasks()) |
      "Extract data" >> beam.ParDo(worker.TaskToData()))

共有1个答案

程峻
2023-03-14

这个问题被证明与数据流中一种名为“融合”的优化有关,在这种优化中,相邻的操作被融合在一起,大概是为了让它们在同一个工作人员上无缝地运行。问题是,如果一个流水线由一个生成大量下游任务的单个项播种,那么所有这些任务都将在处理初始播种任务的同一个worker上处理。

解决方案是直接在管道中添加任务,以防止这种“优化”降低性能

def scan_for_tasks():
  tasks = []
  # Build your task list here
  return tasks

with beam.Pipeline(options=opts) as p:
  result = (
    p | "Initialize Pipeline" >> beam.Create(scan_for_tasks()) |
    "Extract data" >> beam.ParDo(worker.TaskToData()))
 类似资料:
  • 问题内容: 我想在一个定义管道构建作业的框架中利用Jenkins 的现有Mailer插件。给定以下简单的失败脚本,我希望每个构建版本都会收到一封电子邮件。 构建的输出为: 如您所见,它确实记录了它在失败后立即执行管道的过程,但是没有生成电子邮件。 利用自由工作的其他自由式工作中的电子邮件,只是通过管道工作来调用。 这与Jenkins 2.2和mailer 1.17一起运行。 是否有其他机制可以用来

  • 问题内容: 通常,建议将RSA用于加密对称密钥,然后再将其用于加密“有效负载”。 可以使用RSA加密的数据量的实际(或理论)限制是多少(我使用的是2048位RSA密钥大小)。 特别是,我想知道使用(不同的)RSA公钥加密RSA公钥(256字节)是否安全?我在Java中使用Bouncy Castle加密库。 问题答案: 对于 n 位RSA密钥,直接加密(使用PKCS#1 “旧式”填充)适用于最大 下

  • 问题内容: 如果我做 是一样的吗 ? 我想要我所有的东西,就像在8023端口上监听一样。 问题答案: 不,您需要将nohup分别添加到命令中。 建议这样的事情: 或者:

  • 在以下Java代码中 我在控制台中得到以下输出 在使用next()或nextFoo()后,从扫描仪查看答案时跳过nextLine()?,我们必须添加Cmd8的原因是,nextInt()一直读取输入,直到将输入发送到程序,并将输入放回输入流的前面。现在,当“Cmd8”中的nextLine()开始读取它时,它会在输入流的开始处发现前面没有任何字符串,它假定用户没有输入任何内容,因此将空字符串作为输入并

  • 我有两个Jenkins工作流作业,它们以不同的参数启动同一个作业,即它们构建的分支。后一项工作是在多个平台上构建项目。“头”作业,即Workflow作业可以在不同的机器上启动。此外,设置中还有两台linux机器。 有时会发生这样的情况,其中一个(比如master)在其中一台linux机器上启动,另一个在另一台上启动。他们都必须在linux机器上构建一个目标,因为他们都很忙,所以两个作业都停滞不前。

  • 问题内容: 有什么办法可以限制芹菜的工人数量?我的服务器很小,芹菜总是在1个核心处理器上创建10个进程。我想将此数目限制为3个进程。 问题答案: 我尝试在settings.py文件中将并发设置为1,将max_tasks_per_child设置为1,并同时运行3个任务。它只是以用户的身份生成1个进程,而以芹菜的形式生成其他2个进程。它应该只运行1个进程,然后等待其完成再运行另一个进程。 我正在使用d