当前位置: 首页 > 知识库问答 >
问题:

Avro GenericRecord反序列化无法通过SpringKafka工作

相化
2023-03-14

我正在尽可能地简化我的消费者。问题是,当我看到Kafka监听器中的记录时:

<代码>列表

spring.kafka.properties.security.protocol=SASL_SSL
spring.kafka.properties.sasl.mechanism=SCRAM-SHA-256
spring.kafka.properties.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="${SASL_ACCESS_KEY}" password="${SASL_SECRET}";
spring.kafka.consumer.auto-offset-reset=earliest

#### Consumer Properties Configuration
spring.kafka.properties.key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.properties.value.deserializer=io.confluent.kafka.serializers.KafkaAvroDeserializer
spring.kafka.properties.value.subject.name.strategy=io.confluent.kafka.serializers.subject.TopicRecordNameStrategy

spring.kafka.bootstrap-servers=
spring.kafka.properties.schema.registry.url=
spring.kafka.properties.specific.avro.reader=true
spring.kafka.consumer.properties.spring.json.trusted.packages=*

logging.level.org.apache.kafka=TRACE
logging.level.io.confluent.kafka.schemaregistry=TRACE
    @KafkaListener(topics = "${topic}", groupId = "${group}")
    public void processMessageBatch(List<GenericRecord> incomingRecords,
                                    @Header(KafkaHeaders.RECEIVED_PARTITION_ID) List<Integer> partitions,
                                    @Header(KafkaHeaders.RECEIVED_TOPIC) List<String> topics,
                                    @Header(KafkaHeaders.OFFSET) List<Long> offsets) {
        currentMicroBatch = Stream.of(currentMicroBatch, incomingRecords).flatMap(List::stream).collect(Collectors.toList());
        if (currentMicroBatch.size() >= maxRecords || validatedElapsedDuration(durationMonitor)) {
            System.out.println("ETL processing logic will be done here");
        }
        clearBatch();
    }

我在使用时注意到:

SpringKafka。消费者值反序列化器=io。汇合的。Kafka。序列化程序。KafkaavroderializerSpring。Kafka。消费者键反序列化器=组织。阿帕奇。Kafka。常见的序列化。字符串反序列化器

我得到以下错误:

2020-12-02 17:04:42.745调试51910---[ntainer#0-0-C-1]i.C.k.s.客户端。RestRestService:将输入为null的GET发送到https://myschemaregistry.com

2020-12-02 17:04:42.852 ERROR 51910 --- [ntainer#0-0-C-1] o.s.kafka.listener.LoggingErrorHandler   : Error while processing: null

org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition my-topic-avro-32 at offset 7836. If needed, please seek past the record to continue consumption.
java.lang.IllegalArgumentException: argument "src" is null
    at com.fasterxml.jackson.databind.ObjectMapper._assertNotNull(ObjectMapper.java:4735)
    at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3502)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:270)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:334)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:573)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:557)
    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getSchemaByIdFromRegistry(CachedSchemaRegistryClient.java:149)
    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getBySubjectAndId(CachedSchemaRegistryClient.java:230)
    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getById(CachedSchemaRegistryClient.java:209)
    at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer$DeserializationContext.schemaFromRegistry(AbstractKafkaAvroDeserializer.java:241)
    at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:102)
    at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:81)
    at io.confluent.kafka.serializers.KafkaAvroDeserializer.deserialize(KafkaAvroDeserializer.java:55)
    at org.apache.kafka.common.serialization.Deserializer.deserialize(Deserializer.java:60)
    at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:1268)
    at org.apache.kafka.clients.consumer.internals.Fetcher.access$3600(Fetcher.java:124)
    at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.fetchRecords(Fetcher.java:1492)
    at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.access$1600(Fetcher.java:1332)
    at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:645)
    at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:606)
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1263)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1225)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1201)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doPoll(KafkaMessageListenerContainer.java:1062)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1018)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:949)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.lang.Thread.run(Thread.java:834)

共有2个答案

郝昊东
2023-03-14

有同样的问题
对我来说,我只需要将模式注册表url的协议从http:///code>设置为https:///code>,就可以了。

@Ryan的回答给了我一个线索。

陆俊迈
2023-03-14

我发现了问题。在为confluent深入调试rest客户端时,我遇到了一个401(糟糕的日志btw)

我需要添加以下内容:spring。Kafka。属性。基本的授权。资格证书source=SASL\u INHERIT

因为我使用SASL身份验证,需要注册表来继承我上面添加的SASL配置。有趣的东西...

 类似资料:
  • 问题内容: 我试图序列化和反序列化内部对象的数组列表: HairList对象也是一个可序列化的对象。 此代码执行返回以下错误: 排队 我不知道我在做什么错。你能给个小费吗? 更新: 解决: 仅使用HairBirt的本机数组而不是ArrayList即可工作: 代替 感谢大家的帮助。 问题答案: 不要使用-而是使用二进制数据并对它进行base64编码,以将其转换为字符串而不会丢失信息。 我强烈怀疑这是

  • 我试图通过CLI反序列化保存为protobuf的文件(似乎是最容易做到的事情)。我不希望使用protoc进行编译,将其导入编程语言,然后读取结果。 我的用例:一个TensorFlow lite工具以原型格式输出了一些数据。我也在TensorFlow存储库中找到了原型定义。我只想快速阅读输出。具体来说,我正在从工具中获取消息。

  • 问题内容: 作为一个小项目,我一直在尝试做一个小事,它可以读取序列化的lambda(从本地或从FTP)并调用它们的运行函数作为测试的一部分,以测试Windows中的文件关联(即打开某些文件类型)使用特定程序打开它们),但不管如何,无论如何,它似乎从未正确地反序列化。 lambda被这样声明 并使用由ObjectOutputStream包装的[n可选] BufferedOutputStream包装的

  • 问题内容: 在hibernate状态下执行条件查询时,出现以下异常: 可能是什么问题呢? PS:虽然可能不相关,但我的hibernate版本是hibernate-4.0.1 final。 问题答案: 问题在于被引用的实体对实体有另一个引用,并且该关系未由任何-like注释进行注释。

  • 问题内容: 我正在使用Hibernate的两个表,但我不明白为什么对于特定查询我有此问题。我希望有人意识到这个问题。 我有一个桌子用户 和一个桌子区域 日志说: 问题答案: 我建议仅在字段或getter上设置注释。我更喜欢田野,但那只是我的口味。 请参阅Hibernate中有关字段和属性访问的奇怪案例: 因此,要么将注释仅放置在字段上,要么仅放置在getters(properties)上。混合它们

  • 作为一个小项目,我一直在尝试制作一个小东西来读取序列化的lambda(本地或从FTP)并调用它们的run函数,作为测试的一部分,以在Windows中试验文件关联(即打开某些文件类型会用某个程序打开它们)等等,但无论我尝试什么,它似乎都无法正确地反序列化。 lambda是这样宣布的 并使用由ObjectOutputStream包装的[n可选]BufferedOutputStream包装的FileOu