当前位置: 首页 > 知识库问答 >
问题:

数据流如何自动扩展和分配工作负载?

秦昊穹
2023-03-14

读完这个问题后,我仍然对DataFlow/Apache Beam如何分配工作负载有一些疑问。我遇到的问题可以用下面的代码演示:

package debug;

import java.io.IOException;

import org.apache.beam.runners.dataflow.DataflowRunner;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.runners.dataflow.options.DataflowPipelineWorkerPoolOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;

public class DebugPipeline {
    @SuppressWarnings("serial")
    public static PipelineResult main(String[] args) throws IOException {

        /*******************************************
         * SETUP - Build options.
         ********************************************/

        DataflowPipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation()
                .as(DataflowPipelineOptions.class);
        options.setRunner(DataflowRunner.class);
        options.setAutoscalingAlgorithm(
                DataflowPipelineWorkerPoolOptions.AutoscalingAlgorithmType.THROUGHPUT_BASED);
        // Autoscaling will scale between n/15 and n workers, so from 1-15 here
        options.setMaxNumWorkers(15);
        // Default of 250GB is absurdly high and we don't need that much on every worker
        options.setDiskSizeGb(32);
        // Manually configure scaling (i.e. 1 vs 5 for comparison)
        options.setNumWorkers(5);

        // Debug Pipeline
        Pipeline pipeline = Pipeline.create(options);
        pipeline
            .apply(PubsubIO.readStrings()
                    .fromSubscription("your subscription"))
            // this is the transform that I actually care about. In production code, this will
            // send a REST request to some 3rd party endpoint.
            .apply("sleep", ParDo.of(new DoFn<String, String>() {
                @ProcessElement
                public void processElement(ProcessContext c) throws InterruptedException {
                    Thread.sleep(500);
                    c.output(c.element());
                }
            }));

        return pipeline.run();
    }
}

比较使用1个worker和5个worker时的最大吞吐量,而不是后者的效率高5倍,它只是稍微高一点。这让我对以下问题产生了疑问:

  1. 假设每个工作线程使用4个vCPU,那么每个线程是否绑定到特定的DoFn,或者如果需要提高性能,可以在给定时刻对所有线程调用相同的DoFns?
  2. 假设有多个worker,每个worker是否会获得完整的管道,即每个转换的至少一个实例,包括源?
  3. dataflow/apache Beam如何确定要更频繁地调用哪个转换?是否会创建更多占用更多CPU资源的DoFn实例?更长的墙时间?还是每个转换都复制相同的时间?
  4. 根据Apache编程指南,后端等同于异步“作业”。这是否意味着每个DoFn实例都是异步处理的?
  5. 同样注意,在提供的示例代码中,如何异步处理“睡眠”转换?
  6. 在生产代码中,thread.sleep被替换为对第三方API的同步http请求。异步进程是否意味着它将把同步客户机转换为异步?

更新

还有一个额外的问题:数据流文档对PubSubIO有一个评论:

在极端情况下(例如,具有大发布批次的云发布/子订阅或具有非常高延迟的接收器),自动伸缩已知会变得粗粒度。

你能详细说明一下吗:

>

  • 大的出版批量意味着什么?即批量大还是批量多?

    高延迟接收器是否包括接收器之前转换中的高延迟?

  • 共有1个答案

    令狐灿
    2023-03-14
    1. DOFNs可以在给定时刻在所有线程上调用,(请看蓝星)
    2. 是的,每个工人将处理一个完整的管道
    3. 云数据流服务执行各种优化:融合和组合
    4. 那些可以的步骤,没有数据依赖项
    5. “Hibernate”可以在不同的工人上同时处理,并按顺序发送到队列中。
    6. 取决于数据依赖关系
     类似资料:
    • 不好意思,之前描述不准确,现更正下: 数据集如上,想要结果为:每个RQ对应的WERKS 列都有:2021 2022 2023 想自动扩展(新增)行 对应JE 新增出来的赋值0 比如: 2023-05-08 2021 0 2023-05-08 2022 106.57 2023-05-08 2023 0 请问下sql如何编写

    • 问题内容: 即使发布订阅队列不断增加(现在有100k未送达消息),我使用的流数据流job()也不会超过1个Worker-您有什么想法吗? 目前与和一起运行。 问题答案: 数据流工程师在这里。我在后端查看了该工作,发现它没有扩大规模,因为CPU利用率低,这意味着其他一些因素会限制管道的性能,例如外部限制。在这些情况下,升级很少有帮助。 我发现某些捆绑包可能要花费数小时才能处理。我建议调查您的管道逻辑

    • 我正在尝试扩展com.day.cq.dam.core.process.ThumbnailProcess类。 我使用的是Java JDK 1.8.0_65、Maven 3.0.5和Adobe AEM6.1 当我编译代码时,我会得到以下错误: [错误]无法在项目my.pack.common.process上执行目标org.apache.felix:maven-scr-plugin:1.20.0:scr

    • 在设计和开发扩展包时,我们总是希望扩展包在我们给予一定的输入时,完成特定的工作并返回结果。这个过程可以由以下几种工作模式来完成: 入口程序完成全部工作 如果我们的插件不需要任何用户输入,而且只要一次性的执行一些主进程逻辑,我们可以将所有工作放在 main.js 的 load 生命周期回调里: // main.js module.exports = { load () { let fs

    • 下面的章节展示了 Sublime Text 各种各样被扩展的附加功能。 指令 Sublime Text 中指令是无处不在的:按键绑定、菜单项、宏等都可以通过指令来工作。有些指令是在 Sublime Text 编辑器的核心实现的,但是他们当中的很多都是作为 Python 插件,每一个指令都可以由一个通过 Python 插 件来调用。 指令调度 通常情况,指令是被绑定到应用程序对象、一个窗口对象或是一

    • 我希望能够在我的应用程序中使用此颜色选择器: http://wpftoolkit.codeplex.com/wikipage?title=ColorPicker 我正在使用安装了. NET 4的Visual Studio 2010 Ultimate。我正在用C#和WPF(XAML)编码。 到目前为止我所做的: > 试图使用 谷歌搜索解决方案、教程或示例,但没有取得太大成功。 请解释扩展WPF工具包