当前位置: 首页 > 知识库问答 >
问题:

GenericRecord的Apache波束编码器

岳京
2023-03-14

我正在构建一个读取Avro通用记录的管道。为了在各个阶段之间传递GenericRecord,我需要注册avrocoder。文档表明,如果我使用通用记录,模式参数可以是任意的:https://beam.apache.org/releases/javadoc/2.2.0/org/apache/beam/sdk/coders/avrocoder.html#of-java.lang.class-org.apache.avro.schema-

但是,当我将空模式传递给方法avrocoder.of(Class,schema)时,它会在运行时引发异常。有没有一种方法可以为GenericRecord创建一个不需要模式的AvroCoder?在我的例子中,每个GenericRecord都有一个嵌入的模式。

异常和StackTrace:

Exception in thread "main" java.lang.NullPointerException
at org.apache.beam.sdk.coders.AvroCoder$AvroDeterminismChecker.checkIndexedRecord(AvroCoder.java:562)
at org.apache.beam.sdk.coders.AvroCoder$AvroDeterminismChecker.recurse(AvroCoder.java:430)
at org.apache.beam.sdk.coders.AvroCoder$AvroDeterminismChecker.check(AvroCoder.java:409)
at org.apache.beam.sdk.coders.AvroCoder.<init>(AvroCoder.java:260)
at org.apache.beam.sdk.coders.AvroCoder.of(AvroCoder.java:141)

共有1个答案

易弘亮
2023-03-14

在查看了avrocoder的代码之后,我认为这里的文档并不正确。您的avrocoder实例将需要一种方法来确定Avro记录的模式-可能唯一的方法是提供一种方法。

因此,我建议调用avrocoder.of(GenericRecord.class,schema),其中schema是pCollectionGenericRecord对象的正确模式。

 类似资料:
  • 在我的数据流作业中,我需要初始化配置工厂,并在实际处理开始之前将某些消息记录在审核日志中。 我将配置工厂初始化代码审计日志记录放在父类PlatformInitializer中,并在我的主管道类中扩展它。 因此,我还必须在我的管道类中实现可序列化接口,因为beam抛出了错误-<代码>java。io。NotSerializableException:组织。德维塔姆。自定义作业 在PlatformIni

  • 我正在将我的google dataflow java 1.9迁移到beam 2.0,并尝试使用BigtableIO。写 在大舞台前的巴黎公园里,我正在努力让它变得更容易接受。 上述代码引发以下异常InvalidProtocolBufferException:协议消息结束组标记与预期标记不匹配 v是对象列表(Vitals.class)。hbase api使用Put方法创建变异。如何创建将与Bigta

  • 嗨,我已经创建了一个apache beam管道,测试了它,并在eclipse内部运行了它,包括本地和使用dataflow Runner。我可以在eclipse控制台中看到管道正在运行。e.控制台上的日志。 这是我在cmd提示符中使用的maven命令, 这是我用来创建管道和设置选项的代码段。

  • 在spark中,如果我们必须重新洗牌数据,我们可以使用重新分区一个DataFrame。在apache beam中为pCollection做同样的事情的方法是什么? 在pyspark,

  • 我们正在为Apache Beam管道构建一个集成测试,并遇到了一些问题。有关上下文,请参见下文... 有关我们管道的详细信息: null null 这是一个简单的集成测试,它将验证我们的管道作为一个整体的行为是预期的。 我们目前面临的问题是,当我们运行管道时,它正在阻塞。我们使用的是和(而不是),但是测试似乎在运行管道后挂起。因为这是一个无界的(以流模式运行),所以管道不会终止,因此没有到达管道之

  • 这与这个问题最为相似。 我正在Dataflow 2.x中创建一个管道,它从Pubsub队列获取流式输入。进入的每一条消息都需要通过来自Google BigQuery的一个非常大的数据集进行流式传输,并且在写入数据库之前附加了所有相关的值(基于一个键)。 问题是来自BigQuery的映射数据集非常大--任何将其用作侧输入的尝试都失败了,数据流运行程序会抛出错误“java.lang.IllegalAr