我正在学习使用可拆分DOFN。我预计我的工作将分配给500名员工,但Dataflow只运行了1或2名员工。我是否错误地理解或实现了可拆分DoFn?
我的beam版本是2.16.0
class Calculate extends DoFn<String, String> {
private static final long serialVersionUID = 1L;
@ProcessElement
public void process(ProcessContext c, RestrictionTracker<OffsetRange, Long> tracker) {
for (long i = tracker.currentRestriction().getFrom(); tracker.tryClaim(i); ++i) {
try {
c.output(i + "_" + InetAddress.getLocalHost().getHostName() + "_" + Math.random() + "_" + c.element());
for (int j = 0; j < 10000000; j++ ) {
Math.sqrt(j);
}
} catch (UnknownHostException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
@GetInitialRestriction
public OffsetRange getInitialRange(String element) {
return new OffsetRange(1L, 50000000L);
}
@SplitRestriction
public void splitRestriction(String element, OffsetRange restriction, OutputReceiver<OffsetRange> receiver) {
for (long i = restriction.getFrom(); i < restriction.getTo(); i += 100000) {
receiver.output(new OffsetRange(i, i + 100000));
}
}
}
public static void main(String[] args) {
//PipelineOptions options = PipelineOptionsFactory.create();
PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(PipelineOptions.class);
Pipeline p = Pipeline.create(options);
p.apply(TextIO.read().from("gs://mybucket/EMEA/20200525/singleposition.csv"))
.apply(ParDo.of(new Calculate()))
.apply(TextIO.write().to("gs://mybucket/EMEA/20200525/obligor.csv"));
p.run().waitUntilFinish();
}
我在你的理解中发现了两个主要的差距。
云数据流提供2种缩放算法。
b)None(--autoscaling_algorithm=None):在此中,您可以手动提供希望为您的作业生成的员工数量。您可以使用--num_worker参数指定员工数量。
在您的用例中,您使用的是默认模式。因此,您只能得到2个工人取决于您的负荷。
请阅读此链接以获得更详细的信息。
-读取kafka主题的所有分区列表,并从每个分区读取数据(有界源)
上面的用例在使用ParDo(阅读旧ParDo)实现时非常棘手,因为
ParDo fucknitions是单块的--单个processElement()调用,只能输出有限数量的元素,但不能提供进度或大小调整信息,不能接受并行化提示、拆分或检查点。
现在,在您的场景中,我看不到任何非单片处理的需求。在这里,使用SDF不会获得任何额外的性能好处。
我的目标是使用Kafka作为源设置一个高吞吐量集群 我在主服务器和辅助服务器上设置了一个2节点集群,配置如下。 flink-conf.yaml大师 Worker flink-conf.yaml 主节点上的文件如下所示: 两个节点上的 flink 设置位于具有相同名称的文件夹中。我通过运行 这将启动Worker节点上的任务管理器。 我的输入源是Kafka。以下是片段。 这是我的水槽功能 这是我的po
在正常情况下,一个工作流是否会由多个工作流工作人员同时执行?因为多个工作流工作者可以投票决定任务来执行,如果没有,他该怎么做?
到目前为止,我们还没有在我们项目上使用过分支。然而你并不知道,我们实际已经工作在了一个分支上了。这是因为在 Git 上的分支功能并不是可选的,你永远会工作在一个分支中的(当前的 “active”,或者 “checked out”,或者 “HEAD”分支)。 那么 Git 是如何知道你当前在哪个分支上工作的呢? “git status” 命令输出的第一行会向我们显示出 “在主分支(branch ma
Spring CSRF不适用于带有文件上传的多部分表单。它适用于其他请求。我的web.xml如下所示- 我已经在appcontext servlet中定义了这个bean。xml- 我还定义了csrfFilter。定义了filterMultipartResolver bean后,我被重定向到默认的404页面。如果没有bean,我会得到一个错误页面,说我发送了一个空令牌。当我在chrome devel
问题内容: 即使发布订阅队列不断增加(现在有100k未送达消息),我使用的流数据流job()也不会超过1个Worker-您有什么想法吗? 目前与和一起运行。 问题答案: 数据流工程师在这里。我在后端查看了该工作,发现它没有扩大规模,因为CPU利用率低,这意味着其他一些因素会限制管道的性能,例如外部限制。在这些情况下,升级很少有帮助。 我发现某些捆绑包可能要花费数小时才能处理。我建议调查您的管道逻辑
对于我的拓扑--我在一个任务上的负载为90%,而第二个任务的负载为0%。 为什么对多个工人来说这是不同的。