当前位置: 首页 > 知识库问答 >
问题:

Apache分束DoFn不会在数据流上将工作分散到多个工作人员

鞠凌龙
2023-03-14

我正在学习使用可拆分DOFN。我预计我的工作将分配给500名员工,但Dataflow只运行了1或2名员工。我是否错误地理解或实现了可拆分DoFn?

我的beam版本是2.16.0

    class Calculate extends DoFn<String, String> {

    private static final long serialVersionUID = 1L;

    @ProcessElement
    public void process(ProcessContext c, RestrictionTracker<OffsetRange, Long> tracker) {
        for (long i = tracker.currentRestriction().getFrom(); tracker.tryClaim(i); ++i) {
            try {
                c.output(i + "_" + InetAddress.getLocalHost().getHostName() + "_" + Math.random() + "_" +  c.element());
                for (int j = 0; j < 10000000; j++ ) {
                    Math.sqrt(j);
                }
            } catch (UnknownHostException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
    }
    @GetInitialRestriction
    public OffsetRange getInitialRange(String element) {
        return new OffsetRange(1L, 50000000L);
    }
    @SplitRestriction
    public void splitRestriction(String element, OffsetRange restriction, OutputReceiver<OffsetRange> receiver) {

        for (long i = restriction.getFrom(); i < restriction.getTo(); i += 100000) {
            receiver.output(new OffsetRange(i, i + 100000));
        }

    }
   }
public static void main(String[] args) {

    //PipelineOptions options = PipelineOptionsFactory.create();
    PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(PipelineOptions.class);
    Pipeline p = Pipeline.create(options);

    p.apply(TextIO.read().from("gs://mybucket/EMEA/20200525/singleposition.csv"))
     .apply(ParDo.of(new Calculate()))
     .apply(TextIO.write().to("gs://mybucket/EMEA/20200525/obligor.csv"));

    p.run().waitUntilFinish();
  }

共有1个答案

乐正宏深
2023-03-14

我在你的理解中发现了两个主要的差距。

  1. 如何控制云数据流中的工人数量(例如,您希望为您的Calculate()DoFn()生成500个工人)?

数据流提供2种缩放算法。

b)None(--autoscaling_algorithm=None):在此中,您可以手动提供希望为您的作业生成的员工数量。您可以使用--num_worker参数指定员工数量。

在您的用例中,您使用的是默认模式。因此,您只能得到2个工人取决于您的负荷。

请阅读此链接以获得更详细的信息。

-读取kafka主题的所有分区列表,并从每个分区读取数据(有界源)

上面的用例在使用ParDo(阅读旧ParDo)实现时非常棘手,因为

ParDo fucknitions是单块的--单个processElement()调用,只能输出有限数量的元素,但不能提供进度或大小调整信息,不能接受并行化提示、拆分或检查点。

现在,在您的场景中,我看不到任何非单片处理的需求。在这里,使用SDF不会获得任何额外的性能好处。

 类似资料:
  • 我的目标是使用Kafka作为源设置一个高吞吐量集群 我在主服务器和辅助服务器上设置了一个2节点集群,配置如下。 flink-conf.yaml大师 Worker flink-conf.yaml 主节点上的文件如下所示: 两个节点上的 flink 设置位于具有相同名称的文件夹中。我通过运行 这将启动Worker节点上的任务管理器。 我的输入源是Kafka。以下是片段。 这是我的水槽功能 这是我的po

  • 在正常情况下,一个工作流是否会由多个工作流工作人员同时执行?因为多个工作流工作者可以投票决定任务来执行,如果没有,他该怎么做?

  • 到目前为止,我们还没有在我们项目上使用过分支。然而你并不知道,我们实际已经工作在了一个分支上了。这是因为在 Git 上的分支功能并不是可选的,你永远会工作在一个分支中的(当前的 “active”,或者 “checked out”,或者 “HEAD”分支)。 那么 Git 是如何知道你当前在哪个分支上工作的呢? “git status” 命令输出的第一行会向我们显示出 “在主分支(branch ma

  • Spring CSRF不适用于带有文件上传的多部分表单。它适用于其他请求。我的web.xml如下所示- 我已经在appcontext servlet中定义了这个bean。xml- 我还定义了csrfFilter。定义了filterMultipartResolver bean后,我被重定向到默认的404页面。如果没有bean,我会得到一个错误页面,说我发送了一个空令牌。当我在chrome devel

  • 问题内容: 即使发布订阅队列不断增加(现在有100k未送达消息),我使用的流数据流job()也不会超过1个Worker-您有什么想法吗? 目前与和一起运行。 问题答案: 数据流工程师在这里。我在后端查看了该工作,发现它没有扩大规模,因为CPU利用率低,这意味着其他一些因素会限制管道的性能,例如外部限制。在这些情况下,升级很少有帮助。 我发现某些捆绑包可能要花费数小时才能处理。我建议调查您的管道逻辑

  • 对于我的拓扑--我在一个任务上的负载为90%,而第二个任务的负载为0%。 为什么对多个工人来说这是不同的。