当前位置: 首页 > 知识库问答 >
问题:

使用Kafka Connect读取AVRO编码消息(由KSQL流创建)时出现问题

潘辰龙
2023-03-14

当我们通过KSQL创建AVRO消息并试图使用Kafka Connect来消费这些消息时,会发生一些奇怪的事情。有点上下文:

源数据一个第三方提供商在我们的一个Kafka集群上以JSON的形式生成数据(到目前为止,还算不错)。我们实际上看到了数据。

数据转换由于我们的内部系统要求在AVRO中对数据进行编码,我们创建了一个KSQL集群,通过在KSQL中创建以下流将传入数据转换为AVRO:

{
    "ksql": "
        CREATE STREAM src_stream (browser_name VARCHAR)
        WITH (KAFKA_TOPIC='json_topic', VALUE_FORMAT='JSON');

        CREATE STREAM sink_stream WITH (KAFKA_TOPIC='avro_topic',VALUE_FORMAT='AVRO',  PARTITIONS=1, REPLICAS=3) AS
        SELECT * FROM src_stream;
    ",
    "streamsProperties": {
        "ksql.streams.auto.offset.reset": "earliest"
    }
}

随着偏移量的增加,我们看到数据从JSON主题生成到AVRO主题。

然后我们在(新的)Kafka Connect集群中创建一个Kafka连接器。在某些上下文中,我们使用了多个Kafka Connect集群(这些集群具有相同的属性),因此,我们有一个Kafka Connect集群为该数据运行,但为其他AVRO数据运行该集群的精确副本(1用于分析,1用于我们的业务数据)。

这个连接器的接收器是BigQuery,我们使用的是Webay BigQuery接收器连接器1.2.0。再说一遍,到目前为止,还不错。我们的业务集群使用该连接器运行良好,并且业务集群上的AVRO主题流式传输到BigQuery中。

但是,当我们尝试使用前面由KSQL语句创建的AVRO主题时,我们看到抛出了一个异常:/

例外情况如下:

org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
 at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178)
 at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
 at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:510)
 at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:490)
 at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:321)
 at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:225)
 at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:193)
 at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
 at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
 at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
 at java.util.concurrent.FutureTask.run(FutureTask.java:266)
 at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
 at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
 at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.connect.errors.DataException: dpt_video_event-created_v2
 at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:98)
 at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$0(WorkerSinkTask.java:510)
 at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
 at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
 ... 13 more
Caused by: org.apache.kafka.common.errors.SerializationException: Error retrieving Avro schema for id 0
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Schema not found; error code: 40403
 at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:209)
 at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:235)
 at io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:415)
 at io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:408)
 at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getSchemaByIdFromRegistry(CachedSchemaRegistryClient.java:123)
 at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getBySubjectAndId(CachedSchemaRegistryClient.java:190)
 at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getById(CachedSchemaRegistryClient.java:169)
 at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:121)
 at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserializeWithSchemaAndVersion(AbstractKafkaAvroDeserializer.java:243)
 at io.confluent.connect.avro.AvroConverter$Deserializer.deserialize(AvroConverter.java:134)
 at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:85)
 at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$0(WorkerSinkTask.java:510)
 at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
 at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
 at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
 at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:510)
 at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:490)
 at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:321)
 at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:225)
 at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:193)
 at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
 at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
 at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
 at java.util.concurrent.FutureTask.run(FutureTask.java:266)
 at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
 at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
 at java.lang.Thread.run(Thread.java:748)

这表明Kafka Connect正在读取消息,对AVRO进行解码,并尝试从模式注册表中获取ID为0的模式。显然,模式注册表中的模式ID总是>0。

帕特里克·布雷

更新:我们已经为AVRO消息实现了一个基本的消费者,并且该消费者正确地识别了AVRO消息中的模式(ID:3),因此它似乎被重新定义为Kafka Connect,而不是实际的KSQL/AVRO消息。

共有1个答案

罗允晨
2023-03-14

显然,架构注册表中的架构ID总是>0....看起来KSQL正在对模式ID为0的消息进行编码,但我们无法找到原因

AvroConverter执行一个“哑检查”,仅显示所消耗的字节以0x0的神奇字节开始。接下来的4个字节是ID。

如果您使用的是key.converter=avroconverter,并且您的键以十六进制开头,如0x00000,那么在日志中ID将显示为0,查找将失败。

 类似资料:
  • 我们正在将我们的事件系统迁移到函数式编程模型。我们遵循了下一个“指南”,它对消费者非常有效,但使用StreamBridge的生产者没有正确创建消息。 我们有下一个错误: 我们正在使用< code > 2021 . 0 . 0 spring-cloud版本。 通过简单的配置: 这是我们的自定义,它用于我们所有的微服务,因此,保持消息的格式很重要: 我们还使用< code>StreamBridge来生

  • 我一直在用Scala 2.11阅读spark structured streaming(2.4.4)中Kafka的avro序列化消息。为此,我使用了spark avro(下面的dependency)。我使用合流Kafka库从python生成Kafka消息。Spark streaming能够使用模式来使用消息,但无法正确读取字段的值。我准备了一个简单的例子来说明这个问题,代码在这里可用:https:

  • 我有一个 spark 2.0 应用程序,它使用火花流(使用火花流-kafka-0-10_2.11)从 kafka 读取消息。 结构化流看起来很酷,所以我想尝试迁移代码,但我不知道如何使用它。 在常规流中,我使用kafkaUtils创建Dstrean,在我传递的参数中,它是值deserializer。 在结构化流中,文档说我应该使用DataFrame函数进行反序列化,但我不知道这到底是什么意思。 我

  • 有没有解决这个问题的方法???我无法读取KAFKA-AVRO架构消息。我正在尝试将消息从logstash发送到KAFKA到hdfs。 以下是技术堆栈: LogStash 2.3-当前生产版本 汇流3.0。 插件:A。Logstash-kafka-Output插件B。logstash-codec-avro。 动物园管理员:3.4.6 Kafka:0.10.0.0 Logstash配置文件如下所示:

  • 问题内容: 我正在尝试使用PySpark 2.4.0从Kafka读取avro消息。 spark-avro外部模块可以为读取avro文件提供以下解决方案: 但是,我需要阅读流式Avro消息。库文档建议使用 from_avro() 函数,该函数仅适用于Scala和Java。 是否有其他模块支持读取从Kafka流式传输的Avro消息? 问题答案: 您可以包括spark-avro软件包,例如使用(调整版本

  • 根据业务功能,我们需要在多线程环境中的不同位置读取多个excel文件(包括.xls和.xlsx格式)。每个线程负责读取一个文件。为了测试性能,我们用。xls和。xlsx格式创建了2个文件集。一个文件集只有20行数据,而另一个文件集包含300,000行数据。我们能够成功地读取。xls格式的两个文件,并将数据加载到表中。即使对于20行的data.xlsx文件,我们的源代码也运行良好。但是当执行流开始读