Question:

Problem integrating Hudi with Kafka using an Avro schema

樊熠彤
2023-03-14

I am trying to integrate Hudi with a Kafka topic.

The steps I followed are:

  1. Created a Kafka topic in Confluent and defined the schema in the Schema Registry.
  2. Producing data with kafka-avro-console-producer (a producer sketch follows this list).
  3. Running the Hudi DeltaStreamer in continuous mode to consume the data (a launch sketch follows the infrastructure list below).
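For reference, a minimal sketch of step 2, assuming a local Confluent stack; the broker address, registry URL, topic name, and record layout are placeholders rather than values from this question:

    kafka-avro-console-producer \
      --broker-list localhost:9092 \
      --topic hudi_test_topic \
      --property schema.registry.url=http://localhost:8081 \
      --property value.schema='{"type":"record","name":"TestRecord","fields":[{"name":"id","type":"string"},{"name":"ts","type":"long"}]}'
    # Each record is then typed on stdin as JSON matching the schema, e.g.:
    # {"id":"1","ts":1614171728000}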

Infrastructure:

  1. AWS EMR
  2. Spark 2.4.4
  3. Hudi utilities bundle (tried 0.6.0 and 0.7.0)
  4. Avro (tried avro-1.8.2, avro-1.9.2, and avro-1.10.0)
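For context, a sketch of how step 3 can be launched; the EMR bundle-jar path, S3 paths, table name, and field names below are assumptions for illustration, not values taken from this question:

    spark-submit \
      --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \
      --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
      /usr/lib/hudi/hudi-utilities-bundle.jar \
      --table-type COPY_ON_WRITE \
      --source-class org.apache.hudi.utilities.sources.AvroKafkaSource \
      --schemaprovider-class org.apache.hudi.utilities.schema.SchemaRegistryProvider \
      --source-ordering-field ts \
      --target-base-path s3://my-bucket/hudi/test_table \
      --target-table test_table \
      --props s3://my-bucket/config/kafka-source.properties \
      --continuous

with kafka-source.properties along these lines (again, hypothetical values):

    # Kafka source and Schema Registry wiring
    bootstrap.servers=<broker>:9092
    auto.offset.reset=earliest
    schema.registry.url=http://<schema-registry>:8081
    hoodie.deltastreamer.source.kafka.topic=hudi_test_topic
    hoodie.deltastreamer.schemaprovider.registry.url=http://<schema-registry>:8081/subjects/hudi_test_topic-value/versions/latest
    # Hudi write keys
    hoodie.datasource.write.recordkey.field=id
    hoodie.datasource.write.partitionpath.field=ts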

I am getting the error stack trace below. Can anyone help me resolve this issue?

21/02/24 13:02:08 ERROR TaskResultGetter: Exception while getting task result
org.apache.spark.SparkException: Error reading attempting to read avro data -- encountered an unknown fingerprint: 103427103938146401, not sure what schema to use.  This could happen if you registered additional schemas after starting your spark context.
    at org.apache.spark.serializer.GenericAvroSerializer$$anonfun$4.apply(GenericAvroSerializer.scala:141)
    at org.apache.spark.serializer.GenericAvroSerializer$$anonfun$4.apply(GenericAvroSerializer.scala:138)
    at scala.collection.mutable.HashMap.getOrElseUpdate(HashMap.scala:79)
    at org.apache.spark.serializer.GenericAvroSerializer.deserializeDatum(GenericAvroSerializer.scala:137)
    at org.apache.spark.serializer.GenericAvroSerializer.read(GenericAvroSerializer.scala:162)
    at org.apache.spark.serializer.GenericAvroSerializer.read(GenericAvroSerializer.scala:47)
    at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:731)
    at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:391)
    at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:302)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:813)
    at org.apache.spark.serializer.KryoSerializerInstance.deserialize(KryoSerializer.scala:371)
    at org.apache.spark.scheduler.DirectTaskResult.value(TaskResult.scala:88)
    at org.apache.spark.scheduler.TaskResultGetter$$anon$3$$anonfun$run$1.apply$mcV$sp(TaskResultGetter.scala:72)
    at org.apache.spark.scheduler.TaskResultGetter$$anon$3$$anonfun$run$1.apply(TaskResultGetter.scala:63)
    at org.apache.spark.scheduler.TaskResultGetter$$anon$3$$anonfun$run$1.apply(TaskResultGetter.scala:63)
    at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1945)
    at org.apache.spark.scheduler.TaskResultGetter$$anon$3.run(TaskResultGetter.scala:62)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
21/02/24 13:02:08 INFO YarnScheduler: Removed TaskSet 1.0, whose tasks have all completed, from pool
21/02/24 13:02:08 INFO YarnScheduler: Cancelling stage 1
21/02/24 13:02:08 INFO YarnScheduler: Killing all running tasks in stage 1: Stage cancelled
21/02/24 13:02:08 INFO DAGScheduler: ResultStage 1 (isEmpty at DeltaSync.java:380) failed in 1.415 s due to Job aborted due to stage failure: Exception while getting task result: org.apache.spark.SparkException: Error reading attempting to read avro data -- encountered an unknown fingerprint: 103427103938146401, not sure what schema to use.  This could happen if you registered additional schemas after starting your spark context.
21/02/24 13:02:08 INFO DAGScheduler: Job 5 failed: isEmpty at DeltaSync.java:380, took 1.422265 s
21/02/24 13:02:08 ERROR HoodieDeltaStreamer: Shutting down delta-sync due to exception
org.apache.spark.SparkException: Job aborted due to stage failure: Exception while getting task result: org.apache.spark.SparkException: Error reading attempting to read avro data -- encountered an unknown fingerprint: 103427103938146401, not sure what schema to use.  This could happen if you registered additional schemas after starting your spark context.
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:2041)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2029)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2028)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2028)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:966)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:966)
    at scala.Option.foreach(Option.scala:257)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:966)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2262)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2211)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2200)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:777)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
    at org.apache.spark.rdd.RDD$$anonfun$take$1.apply(RDD.scala:1364)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
    at org.apache.spark.rdd.RDD.take(RDD.scala:1337)
    at org.apache.spark.rdd.RDD$$anonfun$isEmpty$1.apply$mcZ$sp(RDD.scala:1472)
    at org.apache.spark.rdd.RDD$$anonfun$isEmpty$1.apply(RDD.scala:1472)
    at org.apache.spark.rdd.RDD$$anonfun$isEmpty$1.apply(RDD.scala:1472)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
    at org.apache.spark.rdd.RDD.isEmpty(RDD.scala:1471)
    at org.apache.spark.api.java.JavaRDDLike$class.isEmpty(JavaRDDLike.scala:544)
    at org.apache.spark.api.java.AbstractJavaRDDLike.isEmpty(JavaRDDLike.scala:45)
    at org.apache.hudi.utilities.deltastreamer.DeltaSync.readFromSource(DeltaSync.java:380)
    at org.apache.hudi.utilities.deltastreamer.DeltaSync.syncOnce(DeltaSync.java:255)
    at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer$DeltaSyncService.lambda$startService$0(HoodieDeltaStreamer.java:587)
    at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

2 Answers

祁权
2023-03-14

I was able to resolve the issue by specifying the correct versions of the jars in the spark-submit command:

--packages org.apache.spark:spark-avro_2.12:3.0.0,org.apache.hudi:hudi-utilities-bundle_2.12:0.7.0,org.apache.avro:avro:1.10.1

After adding the above to the spark-submit command, I no longer see the error.
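For anyone wondering where the packages go, a minimal sketch of the full launch command, reusing the same placeholder paths and DeltaStreamer options as the launch sketch earlier in this thread (only the --packages list comes from this answer):

    spark-submit \
      --packages org.apache.spark:spark-avro_2.12:3.0.0,org.apache.hudi:hudi-utilities-bundle_2.12:0.7.0,org.apache.avro:avro:1.10.1 \
      --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \
      --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
      /usr/lib/hudi/hudi-utilities-bundle.jar \
      --source-class org.apache.hudi.utilities.sources.AvroKafkaSource \
      --schemaprovider-class org.apache.hudi.utilities.schema.SchemaRegistryProvider \
      --props s3://my-bucket/config/kafka-source.properties \
      --continuous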

周麒
2023-03-14

Please open a GitHub issue (https://github.com/apache/hudi/issues) so you can get a timely response.
