当前位置: 首页 > 知识库问答 >
问题:

Apache波束在数据流上的部署

何建中
2023-03-14

嗨,我已经创建了一个apache beam管道,测试了它,并在eclipse内部运行了它,包括本地和使用dataflow Runner。我可以在eclipse控制台中看到管道正在运行。e.控制台上的日志。

 SLF4J: Class path contains multiple SLF4J bindings.
    SLF4J: Found binding in 
    [jar:file:/C:/Users/ThakurG/.m2/repository/org/slf4j/slf4j-
    jdk14/1.7.14/slf4j-jdk14-1.7.14.jar!/org/slf4j/impl/StaticLoggerBinder.class]
    SLF4J: Found binding in [jar:file:/C:/Users/ThakurG/.m2/repository/org/slf4j/slf4j-nop/1.7.25/slf4j-nop-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
    SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
    SLF4J: Actual binding is of type [org.slf4j.impl.JDK14LoggerFactory]
    Jan 08, 2018 5:33:22 PM com.trial.apps.gcp.df.ReceiveAndPersistToBQ main
    INFO: starting the process...
    Jan 08, 2018 5:33:25 PM com.trial.apps.gcp.df.ReceiveAndPersistToBQ 
   createStream
    INFO: pipeline created::Pipeline#73387971
    Jan 08, 2018 5:33:27 PM com.trial.apps.gcp.df.ReceiveAndPersistToBQ main
    INFO: pie crated::Pipeline#73387971
    Jan 08, 2018 5:54:57 PM com.trial.apps.gcp.df.ReceiveAndPersistToBQ$1 apply
    INFO: Message received::1884408,16/09/2017,A,2007156,CLARK RUBBER FRANCHISING PTY LTD,A ,5075,6,Y,296,40467910,-34.868095,138.683535,66 SILKES RD,,,PARADISE,5075,0,7.4,5.6,18/09/2017 2:09,0.22
    Jan 08, 2018 5:54:57 PM com.trial.apps.gcp.df.ReceiveAndPersistToBQ$1 apply
    INFO: Payload from msg::1884408,16/09/2017,A,2007156,CLARK RUBBER FRANCHISING PTY LTD,A ,5075,6,Y,296,40467910,-34.868095,138.683535,66 SILKES RD,,,PARADISE,5075,0,7.4,5.6,18/09/2017 2:09,0.22
    Jan 08, 2018 5:54:57 PM com.trial.apps.gcp.df.ReceiveAndPersistToBQ$1 apply

这是我在cmd提示符中使用的maven命令,

`mvn compile exec:java -Dexec.mainClass=com.trial.apps.gcp.df.ReceiveAndPersistToBQ -Dexec.args="--project=analyticspoc-XXX --stagingLocation=gs://analytics_poc_staging --runner=DataflowRunner --streaming=true"`

这是我用来创建管道和设置选项的代码段。

java prettyprint-override">PipelineOptions options = PipelineOptionsFactory.create();

DataflowPipelineOptions dfOptions = options.as(DataflowPipelineOptions.class);
dfOptions.setRunner(DataflowRunner.class);
dfOptions.setJobName("gcpgteclipse");
dfOptions.setStreaming(true);

// Then create the pipeline.
Pipeline pipeL = Pipeline.create(dfOptions);

共有1个答案

顾靖
2023-03-14

你能澄清一下“控制台显示它正在运行”和“无法使用Dataflow UI找到作业”到底是什么意思吗?

如果程序的输出打印消息

To access the Dataflow monitoring console, please navigate to https://console.developers.google.com/project/.../dataflow/job/....

那么您的作业就在Dataflow服务上运行。一旦运行,关闭主程序并不会停止作业--主程序所做的只是周期性地轮询Dataflow服务以获取作业的状态和新的日志消息。按照打印的链接应该会把你带到Dataflow UI。

如果没有打印此消息,那么可能您的程序在实际启动数据流作业之前在某个地方卡住了。如果包含程序的输出,这将有助于调试。

 类似资料:
  • 这与这个问题最为相似。 我正在Dataflow 2.x中创建一个管道,它从Pubsub队列获取流式输入。进入的每一条消息都需要通过来自Google BigQuery的一个非常大的数据集进行流式传输,并且在写入数据库之前附加了所有相关的值(基于一个键)。 问题是来自BigQuery的映射数据集非常大--任何将其用作侧输入的尝试都失败了,数据流运行程序会抛出错误“java.lang.IllegalAr

  • 我对GCP、Dataflow、Apache Beam、Python和一般的OOP都是新手。我来自函数式javascript领域,对于上下文。 现在,我已经用Apache Beam python sdk构建了一个流管道,并将其部署到GCP的数据流中。管道的源是pubsub订阅,接收器是数据存储。 管道从pubsub订阅中获取消息,根据配置对象+消息内容做出决定,然后根据做出的决定将其放在数据存储中的

  • 我们有一个波束/数据流管道(使用数据流SDK 2.0.0-beta3 但是,我们正在设置 参数,我们可以看到所有二进制文件/jar 等都已上传到我们在 参数中指定的存储桶。 但是,Beam/Dataflow 随后会在我们项目的 GCS 中创建以下僵尸存储桶: 为什么会发生这种情况,如果我们清楚地设置参数?

  • 我们在实验中发现,在DataFlow/Apache Beam管道中设置显式的输出碎片#会导致更差的性能。我们的证据表明,Dataflow在最后秘密地做了另一个GroupBy。我们已经转向让Dataflow自动选择碎片数(shards=0)。但是,对于某些管道,这会导致大量相对较小的输出文件(~15K文件,每个<1MB)。

  • null 我注意到这太慢了--CPU资源只被利用了几%。我怀疑每个节点都得到了一个zip文件,但是工作不是在本地CPU之间分配的--所以每个节点只有一个CPU在工作。我不明白为什么会这样,因为我使用了平面地图。

  • 我没有找到任何文档允许将错误处理应用于此步骤,也没有找到将其重写为DOFN的方法。对此应用错误处理有什么建议吗?谢谢