Question:

Dataflow template "Pub/Sub Avro to BigQuery" fails to decode messages

锺离森
2023-03-14

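I am trying to stream data from Pub/Sub to BigQuery using the Dataflow template "Pub/Sub Avro to BigQuery". The data in Pub/Sub is in Avro format and comes from a Kafka topic. I got the corresponding schema file from the Schema Registry. It looks like this: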

{"type":"record","name":"KsqlDataSourceSchema","namespace":"io.confluent.ksql.avro_schemas","fields":[{"name":"ID","type":["null","string"],"default":null},{"name":"TIMESTAMP","type":["null","string"],"default":null}]}

The saved schema.avsc contains no line breaks, and I get this error in Dataflow:

2021-01-22 10:31:28.231 MEZ Error message from worker: java.lang.RuntimeException: Could not decode Pubsub message
    org.apache.beam.sdk.io.gcp.pubsub.PubsubIO.lambda$parsePayloadUsingCoder$839baa85$1(PubsubIO.java:1139)
    org.apache.beam.sdk.transforms.Contextful.lambda$fn$36334a93$1(Contextful.java:112)
    org.apache.beam.sdk.transforms.MapElements$1.processElement(MapElements.java:140)
Caused by: org.apache.beam.sdk.coders.CoderException: 47 unexpected extra bytes after decoding {"ID": null, "TIMESTAMP": null}
    org.apache.beam.sdk.util.CoderUtils.decodeFromByteArray(CoderUtils.java:101)
    org.apache.beam.sdk.util.CoderUtils.decodeFromByteArray(CoderUtils.java:92)
    org.apache.beam.sdk.io.gcp.pubsub.PubsubIO.lambda$parsePayloadUsingCoder$839baa85$1(PubsubIO.java:1137)
    org.apache.beam.sdk.transforms.Contextful.lambda$fn$36334a93$1(Contextful.java:112)
    org.apache.beam.sdk.transforms.MapElements$1.processElement(MapElements.java:140)
    org.apache.beam.sdk.transforms.MapElements$1$DoFnInvoker.invokeProcessElement(Unknown Source)
    org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:227)
    org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:186)
    org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processElement(SimpleParDoFn.java:335)
    org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:44)
    org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:49)
    org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:201)
    org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:159)
    org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:77)
    org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1400)
    org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1100(StreamingDataflowWorker.java:156)
    org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$7.run(StreamingDataflowWorker.java:1101)
    java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    java.lang.Thread.run(Thread.java:748)

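When I consume messages from the topic manually, I can decode them with exactly the same schema, but I have to handle five extra bytes at the front of each message. A raw message from Pub/Sub looks like this: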

b'\x00\x00\x00\x00\x0c\x02\x1656173684800\x02:2021-01-22T10:21:40.384+01:00'

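I suspect I need to change my schema.avsc file so that Dataflow handles the extra bytes correctly, but I am not sure how, or whether that is even the right approach.

I hope someone can point me in the right direction. Thanks in advance.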

1 answer

葛俊
2023-03-14

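Based on the exception and the information you provided, the problem appears to lie in the input data you are consuming from the Pub/Sub topic.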

The template ultimately reads the data with the readAvroGenericRecords method. https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java#L515

This in turn uses Beam's AvroCoder to binary-decode the data, producing Avro GenericRecord objects with the Java Avro library. https://github.com/apache/beam/blob/9e0920c401258ec38bddbd9298747378126b8fe4/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java#L327
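The "47 unexpected extra bytes" in the exception can be reproduced from the raw message quoted in the question. The sketch below is a hand-written decoder for just this two-field schema, in plain Python; it is not Beam's AvroCoder or the Avro library, and the function names are made up for illustration. Decoding from byte 0 reads the two leading zero bytes as the null branch of each union and stops, leaving 47 of the 49 bytes unread. Decoding from byte 5 yields the real values.

```python
# Raw Pub/Sub payload copied from the question (49 bytes).
RAW = b'\x00\x00\x00\x00\x0c\x02\x1656173684800\x02:2021-01-22T10:21:40.384+01:00'


def read_long(buf, pos):
    """Read one Avro long (zigzag varint). Returns (value, next position)."""
    acc = 0
    shift = 0
    while True:
        byte = buf[pos]
        pos += 1
        acc |= (byte & 0x7F) << shift
        if not byte & 0x80:
            break
        shift += 7
    return (acc >> 1) ^ -(acc & 1), pos


def read_nullable_string(buf, pos):
    """Read a ["null", "string"] union. Branch 0 is null, branch 1 is a string."""
    branch, pos = read_long(buf, pos)
    if branch == 0:
        return None, pos
    length, pos = read_long(buf, pos)
    return buf[pos:pos + length].decode("utf-8"), pos + length


def decode_record(buf, start=0):
    """Decode the ID and TIMESTAMP fields. Returns (record, unread byte count)."""
    pos = start
    record = {}
    for name in ("ID", "TIMESTAMP"):
        record[name], pos = read_nullable_string(buf, pos)
    return record, len(buf) - pos


# From byte 0: ({'ID': None, 'TIMESTAMP': None}, 47), matching the exception.
print(decode_record(RAW))
# From byte 5: both fields are populated and no bytes are left over.
print(decode_record(RAW, start=5))
```

This suggests the schema file itself is fine and the five leading bytes are what the coder trips over.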

So, as a manual test, you could try decoding the data with the Java Avro library to determine whether it can produce an Avro GenericRecord object.
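For such a manual test, the five leading bytes would have to be separated from the Avro body first. The sketch below assumes they are the Confluent Schema Registry wire-format header: one zero "magic" byte followed by a 4-byte big-endian schema ID. The Kafka origin and the leading \x00 in the question's raw message point that way, but the post does not confirm it. The helper name split_confluent_header is hypothetical, and the example is in Python rather than Java only because the raw message in the question is a Python bytes literal.

```python
import struct

# Raw Pub/Sub payload copied from the question.
RAW = b'\x00\x00\x00\x00\x0c\x02\x1656173684800\x02:2021-01-22T10:21:40.384+01:00'


def split_confluent_header(message):
    """Split a message into (schema_id, avro_body).

    Assumes the Confluent wire format: magic byte 0, then a 4-byte
    big-endian schema ID, then the Avro binary body.
    """
    if len(message) < 5:
        raise ValueError("message is shorter than the 5-byte header")
    magic, schema_id = struct.unpack(">bI", message[:5])
    if magic != 0:
        raise ValueError("unexpected magic byte: %d" % magic)
    return schema_id, message[5:]


schema_id, body = split_confluent_header(RAW)
print(schema_id)   # 12
print(len(body))   # 44
```

The remaining body is what an Avro binary decoder with the schema.avsc from the question should accept. If that works, stripping the header before the data reaches the template (for example in whatever publishes from Kafka to Pub/Sub) is likely a more promising direction than changing the schema file.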
