当前位置: 首页 > 知识库问答 >
问题:

从PubSub读写Google Cloud Storage的数据流作业耗尽时的数据丢失

岳正浩
2023-03-14

当将固定数量的字符串(用于测试的800,000个1KB)放入PubSub主题并在Dataflow中运行以下Apache Beam(2.1.0)作业时,只要按照预期保留了语义。

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.joda.time.Duration;

public class PubSubToGsSimpleJob {

    public static void main(String[] args) {
        PubSubToGsPipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation()
                .as(PubSubToGsPipelineOptions.class);
        Pipeline p = Pipeline.create(options);

        p.apply(PubsubIO.readStrings().fromSubscription(options.getInput()))
                .apply(Window.into(FixedWindows.of(Duration.standardMinutes(1))))
                .apply(TextIO.write().withWindowedWrites().withNumShards(1).to(options.getOutput()));
        p.run();
    }

}
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;

public interface PubSubToGsPipelineOptions extends PipelineOptions {
    @Description("PubSub subscription")
    String getInput();
    void setInput(String input);

    @Description("Google Cloud Storage output path")
    String getOutput();
    void setOutput(String output);
}

共有1个答案

常乐
2023-03-14

我的猜测是,在drain and replacement job用剩余窗口覆盖某个窗口之前,该窗口可能已经被部分写入。您可以在WriteFiles中检查已耗尽作业和替换作业中的工作日志。如果使用波束头,当最终目标被覆盖时,它也会记录下来。

从概念上讲,耗尽作业和替换作业是完全不同的管道。使用相同的输出位置与对其他两个不相关的作业使用相同的输出位置没有什么不同。您可以尝试的另一件事是为第二个作业使用不同的输出路径,并验证所有记录都存在于两个目录中。

 类似资料:
  • 我已成功地将数据保存在SQLite DB中。但我在从SQLite数据库读取数据时出现了一个错误,然后我的应用程序崩溃了。 错误信息 如果你需要任何其他信息,请告诉我。

  • 有一个在Dataflow中使用过DynamicDestination的人,他有一个简单的描述示例。在git(https://github.com/googleCloudPlatform/dataflowTemplates/blob/master/src/main/Java/com/google/cloud/teleport/templates/dlpTextToBigQueryStreaming.

  • 使用“file_loads”技术通过Apache Beam数据流作业写入BigQuery时出错。流式插入(else块)工作正常,符合预期。file_load(如果块)失败,错误在代码后面给出。bucket中GCS上的临时文件是有效的JSON对象。 来自pub/sub的原始事件示例: 数据流作业出错:

  • 例如,如果我有一个Java应用程序一直在运行,并且它在到达时从Kafka队列中读取数据,目的是获取数据并将其转发到数据库...无限循环通过一个批处理作业而不是通过Kafka Streaming/Kafka Connect来完成它有什么大的坏处吗?考虑到我没有在每个循环中无谓地创建或浪费资源,并且我正确地处理了多线程,有没有什么主要的缺点或不这样做的原因?对于长时间运行的应用程序,它是一个可行的选项

  • 在我当前的架构中,多个数据流作业在不同阶段被触发,作为ABC框架的一部分,我需要捕获这些作业的作业id作为数据流管道中的审计指标,并在BigQuery中更新它。 如何使用JAVA从管道中获取数据流作业的运行id?有没有我可以使用的现有方法,或者我是否需要在管道中使用google cloud的客户端库?

  • 当我在GCP中运行作业时,这工作很好,但如果没有任何更新,它将失败。如果我删除update标志,只要没有正在运行的作业,它就可以正常工作。 是否有一种方法来指定,如果作业存在,则更新它,而不仅仅是开始一个新的作业?