当前位置: 首页 > 知识库问答 >
问题:

如何限制DoFn线程数用于流式作业、apache beam、dataflow后端、python

越飞语
2023-03-14

我有一个问题,在apache beam中使用流作业(dataflow backend,python SDK)有大量并行工作人员

共有1个答案

夹谷英奕
2023-03-14

如果您使用的是runner v2,请通过以下方式启用:

--experiments=use_runner_v2

您可以使用以下参数定义每个进程的线程数:

--number_of_worker_harness_threads
 类似资料:
  • 我有一个批次处理作业在数据流运行在gcp下版本apache-梁[gcp]==2.19.0的数据流运行。我为作业创建了一个自定义模板。作业正在按预期运行,但我还想添加最大作业持续时间。我在wait_until_finish()方法中找到了持续时间(毫秒)参数,它应该是可用的。问题是:如何让模板化批处理作业在运行时间超过持续时间时自动停止?我不需要保存任何数据,我只希望工作运行时间过长时停止。我已经实

  • 我正在尝试创建一个具有一定数量线程的ThreadPoolExector,但同时,我想控制池队列的大小。所以我使用完整的构造函数创建了执行器: 然而,这给了我一个非法辩论例外。如果我将构造函数更改为 它起作用了。如果我希望理想的线程数和最大线程数相同,为什么它不起作用呢。

  • 是否有办法限制netty用于客户端连接的线程数(netty是连接到远程服务器的客户端)。我使用的是1个NioEventLoopGroup,它被传递到每个引导程序中。每个引导都得到相同的通道初始值设定项引用(我曾经为每个引导创建不同的初始值设定项,但共享相同的引用似乎效果很好)。我从Java应用程序连接到许多充当服务器的硬件设备。 我注意到,目前最多使用16个线程(我在一台8核机器上,据我所知,Ne

  • 如果我将MyPipelineOptions更改为PipelineOptions,错误就消失了,但是如果我试图在函数中强制转换回MyPipelineOptions,我会得到一个ClassCastException,所以我猜这不是正确的方法...知道如何将自定义选项类传递给元素处理器吗? 下面是代码结构: 注意文档只显示了一个非自定义PipelineOptions的示例: PipelineOption

  • 作为DataFlow/Apache Beam的一部分,我希望先从一个源读取,然后再写入一个源,然后再从一个源读取,然后再按此顺序写入。我如何确保下面R->W->R->W的顺序?我相信下面的运行就像一个r->w的并行管道。我不确定是否使用PDone对象来实现这一点。 (在下面的示例中,考虑BIGQUERYVIEWB是一个由TestDataSet1.Table2和其他几个表组成的大查询视图)

  • 问题内容: 我在实践中阅读Java Concurrency,并且有点与线程限制概念混淆。这本书说 当一个对象被限制在一个线程中时,即使该限制对象本身不是一个线程,这种使用也是自动的线程安全的 那么,当一个对象被限制在一个线程中时,没有其他线程可以访问它吗?那就是局限于线程吗?如何将对象限制在线程中? 编辑: 但是,如果我仍然想与另一个线程共享对象怎么办?假设在线程A完成对象O后,线程B想要访问O。