读完这个问题后,我仍然对DataFlow/Apache Beam如何分配工作负载有一些疑问。我遇到的问题可以用下面的代码演示:
package debug;
import java.io.IOException;
import org.apache.beam.runners.dataflow.DataflowRunner;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.runners.dataflow.options.DataflowPipelineWorkerPoolOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
public class DebugPipeline {
@SuppressWarnings("serial")
public static PipelineResult main(String[] args) throws IOException {
/*******************************************
* SETUP - Build options.
********************************************/
DataflowPipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation()
.as(DataflowPipelineOptions.class);
options.setRunner(DataflowRunner.class);
options.setAutoscalingAlgorithm(
DataflowPipelineWorkerPoolOptions.AutoscalingAlgorithmType.THROUGHPUT_BASED);
// Autoscaling will scale between n/15 and n workers, so from 1-15 here
options.setMaxNumWorkers(15);
// Default of 250GB is absurdly high and we don't need that much on every worker
options.setDiskSizeGb(32);
// Manually configure scaling (i.e. 1 vs 5 for comparison)
options.setNumWorkers(5);
// Debug Pipeline
Pipeline pipeline = Pipeline.create(options);
pipeline
.apply(PubsubIO.readStrings()
.fromSubscription("your subscription"))
// this is the transform that I actually care about. In production code, this will
// send a REST request to some 3rd party endpoint.
.apply("sleep", ParDo.of(new DoFn<String, String>() {
@ProcessElement
public void processElement(ProcessContext c) throws InterruptedException {
Thread.sleep(500);
c.output(c.element());
}
}));
return pipeline.run();
}
}
比较使用1个worker和5个worker时的最大吞吐量,而不是后者的效率高5倍,它只是稍微高一点。这让我对以下问题产生了疑问:
异步“作业”
。这是否意味着每个DoFn实例都是异步处理的?thread.sleep
被替换为对第三方API的同步http请求。异步进程是否意味着它将把同步客户机转换为异步?更新
还有一个额外的问题:数据流文档对PubSubIO有一个评论:
在极端情况下(例如,具有大发布批次的云发布/子订阅或具有非常高延迟的接收器),自动伸缩已知会变得粗粒度。
你能详细说明一下吗:
>
大的出版批量意味着什么?即批量大还是批量多?
高延迟接收器是否包括接收器之前转换中的高延迟?
不好意思,之前描述不准确,现更正下: 数据集如上,想要结果为:每个RQ对应的WERKS 列都有:2021 2022 2023 想自动扩展(新增)行 对应JE 新增出来的赋值0 比如: 2023-05-08 2021 0 2023-05-08 2022 106.57 2023-05-08 2023 0 请问下sql如何编写
问题内容: 即使发布订阅队列不断增加(现在有100k未送达消息),我使用的流数据流job()也不会超过1个Worker-您有什么想法吗? 目前与和一起运行。 问题答案: 数据流工程师在这里。我在后端查看了该工作,发现它没有扩大规模,因为CPU利用率低,这意味着其他一些因素会限制管道的性能,例如外部限制。在这些情况下,升级很少有帮助。 我发现某些捆绑包可能要花费数小时才能处理。我建议调查您的管道逻辑
我正在尝试扩展com.day.cq.dam.core.process.ThumbnailProcess类。 我使用的是Java JDK 1.8.0_65、Maven 3.0.5和Adobe AEM6.1 当我编译代码时,我会得到以下错误: [错误]无法在项目my.pack.common.process上执行目标org.apache.felix:maven-scr-plugin:1.20.0:scr
在设计和开发扩展包时,我们总是希望扩展包在我们给予一定的输入时,完成特定的工作并返回结果。这个过程可以由以下几种工作模式来完成: 入口程序完成全部工作 如果我们的插件不需要任何用户输入,而且只要一次性的执行一些主进程逻辑,我们可以将所有工作放在 main.js 的 load 生命周期回调里: // main.js module.exports = { load () { let fs
下面的章节展示了 Sublime Text 各种各样被扩展的附加功能。 指令 Sublime Text 中指令是无处不在的:按键绑定、菜单项、宏等都可以通过指令来工作。有些指令是在 Sublime Text 编辑器的核心实现的,但是他们当中的很多都是作为 Python 插件,每一个指令都可以由一个通过 Python 插 件来调用。 指令调度 通常情况,指令是被绑定到应用程序对象、一个窗口对象或是一
我希望能够在我的应用程序中使用此颜色选择器: http://wpftoolkit.codeplex.com/wikipage?title=ColorPicker 我正在使用安装了. NET 4的Visual Studio 2010 Ultimate。我正在用C#和WPF(XAML)编码。 到目前为止我所做的: > 试图使用 谷歌搜索解决方案、教程或示例,但没有取得太大成功。 请解释扩展WPF工具包