当前位置: 首页 > 知识库问答 >
问题:

Apache Beam/Google Dataflow PubSub到BigQuery管道:处理插入错误和意外重试行为

司徒鸿文
2023-03-14

我已经从Google的github存储库中下载了pub/sub到BigQuery数据流模板的副本。我正在本地机器上使用直接运行程序运行它。

在测试中,我确认只有在UDF处理或从JSON到TableRow的转换过程中发生错误时,模板才会将失败写入“deadletter”表。

我还希望通过将它们发送到单独的TupleTag中来更优雅地处理在插入BigQuery时发生的失败,这样它们也可以发送到deadleter表或其他输出以进行检查和处理。当前,当使用dataflow-runner执行时,这些错误只会写入Stackdriver日志,并且会无限期地重试,直到问题解决为止。

问题一:在本地测试并发布一条格式与目标表模式不匹配的消息时,insert被重试5次,然后管道崩溃,出现RuntimeException以及从Google API的HTTP响应返回的错误。我认为BigQueryServices.Impl中设置了此行为:

private static final FluentBackoff INSERT_BACKOFF_FACTORY =
        FluentBackoff.DEFAULT.withInitialBackoff(Duration.millis(200)).withMaxRetries(5);

然而,根据谷歌的文档,

“在流模式下运行时,包含失败项的包将被无限期重试,这可能导致您的管道永久停顿。”

作为Beam Pub/Io,

创建和使用无界的PCollections

我的印象是,当从pub/sub读取时,应该默认启用流模式。我甚至在对writeTableRows()的调用中添加了Streaming_Inserts方法,但它并没有影响这种行为。

.apply(
            "WriteSuccessfulRecords",      
            BigQueryIO.writeTableRows()
                .withMethod(Method.STREAMING_INSERTS)
  1. 这种行为是不是受到了我使用的跑步者的某种影响?如果不是,我的理解有什么缺陷?

问题二:

我这样问是因为我不知道如何捕获与插入相关的错误,而不创建我自己的静态类,这个静态类重写了expand方法,并使用ParDo和DoFn,我可以在其中添加我自己的自定义逻辑,为成功记录和失败记录创建单独的tupleTag,类似于在JavascriptTextTransformer中为FailSafeJavaScriptUDF所做的操作。

更新:

public static PipelineResult run(DirectOptions options) {

options.setRunner(DirectRunner.class);

    Pipeline pipeline = Pipeline.create(options);

    // Register the coder for pipeline
    FailsafeElementCoder<PubsubMessage, String> coder =
        FailsafeElementCoder.of(PubsubMessageWithAttributesCoder.of(), StringUtf8Coder.of());

    CoderRegistry coderRegistry = pipeline.getCoderRegistry();
    coderRegistry.registerCoderForType(coder.getEncodedTypeDescriptor(), coder);

     PCollectionTuple transformOut =
        pipeline
             //Step #1: Read messages in from Pub/Sub
            .apply(
                "ReadPubsubMessages",
  PubsubIO.readMessagesWithAttributes().fromTopic(options.getInputTopic()))

             //Step #2: Transform the PubsubMessages into TableRows
            .apply("ConvertMessageToTableRow", new PubsubMessageToTableRow(options));

    WriteResult writeResult = null;

    try {
      writeResult = 
            transformOut
        .get(TRANSFORM_OUT)
        .apply(
            "WriteSuccessfulRecords",      
            BigQueryIO.writeTableRows()
                .withMethod(Method.STREAMING_INSERTS)
                .withoutValidation()
                .withCreateDisposition(CreateDisposition.CREATE_NEVER)
                .withWriteDisposition(WriteDisposition.WRITE_APPEND)
                .withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors())
                .to("myproject:MyDataSet.MyTable"));
    } catch (Exception e) {
        System.out.print("Cause of the Standard Insert Failure is: ");
        System.out.print(e.getCause());
    }

    try {
        writeResult
            .getFailedInserts()
            .apply(
                    "WriteFailedInsertsToDeadLetter",
                    BigQueryIO.writeTableRows()
                        .to(options.getOutputDeadletterTable())
                        .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
                        .withWriteDisposition(WriteDisposition.WRITE_APPEND));
    } catch (Exception e) {
        System.out.print("Cause of the Error Insert Failure is: ");
        System.out.print(e.getCause());
    }

     PCollectionList.of(transformOut.get(UDF_DEADLETTER_OUT))
        .and(transformOut.get(TRANSFORM_DEADLETTER_OUT))
        .apply("Flatten", Flatten.pCollections())
        .apply(
            "WriteFailedRecords",
            WritePubsubMessageErrors.newBuilder()
                .setErrorRecordsTable(
                    maybeUseDefaultDeadletterTable(
                        options.getOutputDeadletterTable(),
                        options.getOutputTableSpec(),
                        DEFAULT_DEADLETTER_TABLE_SUFFIX))
                .setErrorRecordsTableSchema(getDeadletterTableSchemaJson())
                .build());

    return pipeline.run();
  }

错误:

Cause of the Error Insert Failure is: null[WARNING] 
java.lang.NullPointerException: Outputs for non-root node WriteFailedInsertsToDeadLetter are null
    at org.apache.beam.repackaged.beam_sdks_java_core.com.google.common.base.Preconditions.checkNotNull(Preconditions.java:864)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:672)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:660)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.access$600(TransformHierarchy.java:311)
    at org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:245)
    at org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:458)
    at org.apache.beam.sdk.Pipeline.validate(Pipeline.java:575)
    at org.apache.beam.sdk.Pipeline.run(Pipeline.java:310)
    at org.apache.beam.sdk.Pipeline.run(Pipeline.java:297)
    at com.google.cloud.teleport.templates.PubSubToBigQuery.run(PubSubToBigQuery.java:312)
    at com.google.cloud.teleport.templates.PubSubToBigQuery.main(PubSubToBigQuery.java:186)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.codehaus.mojo.exec.ExecJavaMojo$1.run(ExecJavaMojo.java:282)
    at java.lang.Thread.run(Thread.java:748)

共有1个答案

施永宁
2023-03-14

在Beam的最新版本中,BigQueryIO.Write转换返回WriteResult对象,该对象使您能够检索未能输出到BigQuery的TableRows的PCollection。使用这一点,您可以轻松地检索失败,将它们格式化为死信输出的结构,并将记录重新提交给BigQuery。这样就不需要单独的类来管理成功和失败的记录。

下面是一个例子,说明你的管道可能是什么样子的。

// Attempt to write the table rows to the output table.
WriteResult writeResult =
    pipeline.apply(
        "WriteRecordsToBigQuery",
        BigQueryIO.writeTableRows()
            .to(options.getOutputTable())
            .withCreateDisposition(CreateDisposition.CREATE_NEVER)
            .withWriteDisposition(WriteDisposition.WRITE_APPEND)
            .withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors()));

/*
 * 1) Get the failed inserts
 * 2) Transform to the deadletter table format.
 * 3) Output to the deadletter table.
*/
writeResult
  .getFailedInserts()
    .apply("FormatFailedInserts", ParDo.of(new FailedInsertFormatter()))
    .apply(
        "WriteFailedInsertsToDeadletter",
        BigQueryIO.writeTableRows()
            .to(options.getDeadletterTable())
            .withCreateDisposition(CreateDisposition.CREATE_NEVER)
            .withWriteDisposition(WriteDisposition.WRITE_APPEND));

此外,要回答您的问题:

  1. 根据beam文档,必须将DirectRunner的Streaming选项设置为true
  2. 不应存在性能差异。无论哪种情况,都需要将输入记录转换为TableRow对象。如果您事先在ParDo中或在使用BigQueryIO.Write.WithFormatFunction的可序列化函数中这样做,则不会有任何区别。
 类似资料:
  • 大家好,我们使用的是Spring kafka 1.3.3,我们的应用程序是消耗-进程-发布管道。 如果在生产阶段流水线出现任何故障,我们如何处理重试并寻求返回。例如:应用程序正在消耗消息,处理它们并以异步方式发布到另一个主题中。但如果在发布中有任何错误

  • 问题内容: 有谁知道bash如何通过管道发送数据? 此命令是否将file.txt的所有内容打印到缓冲区中,然后由tail读取?还是说,此命令是逐行打印file.txt的内容,然后在每一行停顿以进行尾部处理,然后请求更多数据? 我问的原因是我要在嵌入式设备上编写程序,该程序基本上对某些数据块执行一系列操作,其中一个操作的输出作为下一个操作的输入发出。我想知道linux(bash)是如何处理的,因此请

  • 我有一个索引php页面,其中包含一个打印数组值的代码

  • 我正在制作一个专用于电子邮件订阅者的组件...喜欢这个 然而,我只得到这样的回应 你知道我如何解决这个问题吗?我让它变得像人们输入他们的电子邮件一样简单,只需单击按钮,就这样。 非常感谢您的任何帮助!

  • 作为我正在构建的应用程序的一部分,我正在使用csv-parse读取和操作大型(约5.5GB,800万行)csv文件。我让这个过程运行得相对平稳,但我被困在一个项目上——捕捉由不一致的列数引发的错误。 我之所以使用管道函数,是因为它与应用程序的其余部分配合得很好,但我的问题是,如何将解析器抛出的错误重定向到日志并允许该过程继续? 我认识到,我可以使用选项跳过列数不一致的记录,该选项几乎就足够了。问题

  • 我们经常会看到很多程序员大部分的"编程"时间都花费在检查bug和修复bug上。无论你是在编写修改代码还是重构系统,几乎都是花费大量的时间在进行故障排除和测试,外界都觉得我们程序员是设计师,能够把一个系统从无做到有,是一项很伟大的工作,而且是相当有趣的工作,但事实上我们每天都是徘徊在排错、调试、测试之间。当然如果你有良好的习惯和技术方案来直面这些问题,那么你就有可能将排错时间减到最少,而尽可能的将时