我已经从Google的github存储库中下载了pub/sub到BigQuery数据流模板的副本。我正在本地机器上使用直接运行程序运行它。
在测试中,我确认只有在UDF处理或从JSON到TableRow的转换过程中发生错误时,模板才会将失败写入“deadletter”表。
我还希望通过将它们发送到单独的TupleTag中来更优雅地处理在插入BigQuery时发生的失败,这样它们也可以发送到deadleter表或其他输出以进行检查和处理。当前,当使用dataflow-runner执行时,这些错误只会写入Stackdriver日志,并且会无限期地重试,直到问题解决为止。
问题一:在本地测试并发布一条格式与目标表模式不匹配的消息时,insert被重试5次,然后管道崩溃,出现RuntimeException以及从Google API的HTTP响应返回的错误。我认为BigQueryServices.Impl中设置了此行为:
private static final FluentBackoff INSERT_BACKOFF_FACTORY =
FluentBackoff.DEFAULT.withInitialBackoff(Duration.millis(200)).withMaxRetries(5);
然而,根据谷歌的文档,
“在流模式下运行时,包含失败项的包将被无限期重试,这可能导致您的管道永久停顿。”
作为Beam Pub/Io,
创建和使用无界的PCollections
我的印象是,当从pub/sub读取时,应该默认启用流模式。我甚至在对writeTableRows()的调用中添加了Streaming_Inserts方法,但它并没有影响这种行为。
.apply(
"WriteSuccessfulRecords",
BigQueryIO.writeTableRows()
.withMethod(Method.STREAMING_INSERTS)
问题二:
我这样问是因为我不知道如何捕获与插入相关的错误,而不创建我自己的静态类,这个静态类重写了expand方法,并使用ParDo和DoFn,我可以在其中添加我自己的自定义逻辑,为成功记录和失败记录创建单独的tupleTag,类似于在JavascriptTextTransformer中为FailSafeJavaScriptUDF所做的操作。
更新:
public static PipelineResult run(DirectOptions options) {
options.setRunner(DirectRunner.class);
Pipeline pipeline = Pipeline.create(options);
// Register the coder for pipeline
FailsafeElementCoder<PubsubMessage, String> coder =
FailsafeElementCoder.of(PubsubMessageWithAttributesCoder.of(), StringUtf8Coder.of());
CoderRegistry coderRegistry = pipeline.getCoderRegistry();
coderRegistry.registerCoderForType(coder.getEncodedTypeDescriptor(), coder);
PCollectionTuple transformOut =
pipeline
//Step #1: Read messages in from Pub/Sub
.apply(
"ReadPubsubMessages",
PubsubIO.readMessagesWithAttributes().fromTopic(options.getInputTopic()))
//Step #2: Transform the PubsubMessages into TableRows
.apply("ConvertMessageToTableRow", new PubsubMessageToTableRow(options));
WriteResult writeResult = null;
try {
writeResult =
transformOut
.get(TRANSFORM_OUT)
.apply(
"WriteSuccessfulRecords",
BigQueryIO.writeTableRows()
.withMethod(Method.STREAMING_INSERTS)
.withoutValidation()
.withCreateDisposition(CreateDisposition.CREATE_NEVER)
.withWriteDisposition(WriteDisposition.WRITE_APPEND)
.withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors())
.to("myproject:MyDataSet.MyTable"));
} catch (Exception e) {
System.out.print("Cause of the Standard Insert Failure is: ");
System.out.print(e.getCause());
}
try {
writeResult
.getFailedInserts()
.apply(
"WriteFailedInsertsToDeadLetter",
BigQueryIO.writeTableRows()
.to(options.getOutputDeadletterTable())
.withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
.withWriteDisposition(WriteDisposition.WRITE_APPEND));
} catch (Exception e) {
System.out.print("Cause of the Error Insert Failure is: ");
System.out.print(e.getCause());
}
PCollectionList.of(transformOut.get(UDF_DEADLETTER_OUT))
.and(transformOut.get(TRANSFORM_DEADLETTER_OUT))
.apply("Flatten", Flatten.pCollections())
.apply(
"WriteFailedRecords",
WritePubsubMessageErrors.newBuilder()
.setErrorRecordsTable(
maybeUseDefaultDeadletterTable(
options.getOutputDeadletterTable(),
options.getOutputTableSpec(),
DEFAULT_DEADLETTER_TABLE_SUFFIX))
.setErrorRecordsTableSchema(getDeadletterTableSchemaJson())
.build());
return pipeline.run();
}
错误:
Cause of the Error Insert Failure is: null[WARNING]
java.lang.NullPointerException: Outputs for non-root node WriteFailedInsertsToDeadLetter are null
at org.apache.beam.repackaged.beam_sdks_java_core.com.google.common.base.Preconditions.checkNotNull(Preconditions.java:864)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:672)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:660)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.access$600(TransformHierarchy.java:311)
at org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:245)
at org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:458)
at org.apache.beam.sdk.Pipeline.validate(Pipeline.java:575)
at org.apache.beam.sdk.Pipeline.run(Pipeline.java:310)
at org.apache.beam.sdk.Pipeline.run(Pipeline.java:297)
at com.google.cloud.teleport.templates.PubSubToBigQuery.run(PubSubToBigQuery.java:312)
at com.google.cloud.teleport.templates.PubSubToBigQuery.main(PubSubToBigQuery.java:186)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.codehaus.mojo.exec.ExecJavaMojo$1.run(ExecJavaMojo.java:282)
at java.lang.Thread.run(Thread.java:748)
在Beam的最新版本中,BigQueryIO.Write转换返回WriteResult对象,该对象使您能够检索未能输出到BigQuery的TableRows的PCollection。使用这一点,您可以轻松地检索失败,将它们格式化为死信输出的结构,并将记录重新提交给BigQuery。这样就不需要单独的类来管理成功和失败的记录。
下面是一个例子,说明你的管道可能是什么样子的。
// Attempt to write the table rows to the output table.
WriteResult writeResult =
pipeline.apply(
"WriteRecordsToBigQuery",
BigQueryIO.writeTableRows()
.to(options.getOutputTable())
.withCreateDisposition(CreateDisposition.CREATE_NEVER)
.withWriteDisposition(WriteDisposition.WRITE_APPEND)
.withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors()));
/*
* 1) Get the failed inserts
* 2) Transform to the deadletter table format.
* 3) Output to the deadletter table.
*/
writeResult
.getFailedInserts()
.apply("FormatFailedInserts", ParDo.of(new FailedInsertFormatter()))
.apply(
"WriteFailedInsertsToDeadletter",
BigQueryIO.writeTableRows()
.to(options.getDeadletterTable())
.withCreateDisposition(CreateDisposition.CREATE_NEVER)
.withWriteDisposition(WriteDisposition.WRITE_APPEND));
此外,要回答您的问题:
Streaming
选项设置为true
。大家好,我们使用的是Spring kafka 1.3.3,我们的应用程序是消耗-进程-发布管道。 如果在生产阶段流水线出现任何故障,我们如何处理重试并寻求返回。例如:应用程序正在消耗消息,处理它们并以异步方式发布到另一个主题中。但如果在发布中有任何错误
问题内容: 有谁知道bash如何通过管道发送数据? 此命令是否将file.txt的所有内容打印到缓冲区中,然后由tail读取?还是说,此命令是逐行打印file.txt的内容,然后在每一行停顿以进行尾部处理,然后请求更多数据? 我问的原因是我要在嵌入式设备上编写程序,该程序基本上对某些数据块执行一系列操作,其中一个操作的输出作为下一个操作的输入发出。我想知道linux(bash)是如何处理的,因此请
我有一个索引php页面,其中包含一个打印数组值的代码
我正在制作一个专用于电子邮件订阅者的组件...喜欢这个 然而,我只得到这样的回应 你知道我如何解决这个问题吗?我让它变得像人们输入他们的电子邮件一样简单,只需单击按钮,就这样。 非常感谢您的任何帮助!
作为我正在构建的应用程序的一部分,我正在使用csv-parse读取和操作大型(约5.5GB,800万行)csv文件。我让这个过程运行得相对平稳,但我被困在一个项目上——捕捉由不一致的列数引发的错误。 我之所以使用管道函数,是因为它与应用程序的其余部分配合得很好,但我的问题是,如何将解析器抛出的错误重定向到日志并允许该过程继续? 我认识到,我可以使用选项跳过列数不一致的记录,该选项几乎就足够了。问题
我们经常会看到很多程序员大部分的"编程"时间都花费在检查bug和修复bug上。无论你是在编写修改代码还是重构系统,几乎都是花费大量的时间在进行故障排除和测试,外界都觉得我们程序员是设计师,能够把一个系统从无做到有,是一项很伟大的工作,而且是相当有趣的工作,但事实上我们每天都是徘徊在排错、调试、测试之间。当然如果你有良好的习惯和技术方案来直面这些问题,那么你就有可能将排错时间减到最少,而尽可能的将时