当前位置: 首页 > 知识库问答 >
问题:

为什么Kafka Streams应用程序(Spring Cloud Stream)会忽略自定义SerDe?

黄永怡
2023-03-14

因此,我实现了一个自定义SerDe,它从Confluent提供的SpecificAvroSerde扩展到每当与模式注册表通信超时时,都会尝试重试。我已将Spring Cloud Streams Kafka binders配置为默认使用:

spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde=com.test.RetrySpecificAvroSerde

今天我在日志中看到了这个错误:

2020-12-14 01:31:53.006 ERROR 1 --- [-StreamThread-1] o.a.k.s.p.internals.StreamThread         : stream-thread [de7ait1214-x07-baseline-pc-data-s
torage-earning-factors-3bb21ce3-c620-4e6b-8cd2-00059a5c6326-StreamThread-1] Encountered the following unexpected Kafka exception during processing, this usually indicate Streams internal errors:

org.apache.kafka.streams.errors.StreamsException: stream-thread [de7ait1214-x07-baseline-pc-data-storage-earning-factors-3bb21ce3-c620-4e6b-8cd2-00059a5c6326-StreamThread-1] task [0_0] Exception caught while punctuating processor 'KSTREAM-TRANSFORM-0000000001'
        at org.apache.kafka.streams.processor.internals.StreamTask.punctuate(StreamTask.java:449) ~[kafka-streams-2.5.0.jar:na]
        at org.apache.kafka.streams.processor.internals.PunctuationQueue.mayPunctuate(PunctuationQueue.java:54) ~[kafka-streams-2.5.0.jar:na]
        at org.apache.kafka.streams.processor.internals.StreamTask.maybePunctuateSystemTime(StreamTask.java:868) ~[kafka-streams-2.5.0.jar:na]
        at org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.punctuate(AssignedStreamsTasks.java:502) ~[kafka-streams-2.5.0.jar:na]
        at org.apache.kafka.streams.processor.internals.TaskManager.punctuate(TaskManager.java:557) ~[kafka-streams-2.5.0.jar:na]
        at org.apache.kafka.streams.processor.internals.StreamThread.maybePunctuate(StreamThread.java:951) ~[kafka-streams-2.5.0.jar:na]
        at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:823) ~[kafka-streams-2.5.0.jar:na]
        at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:697) ~[kafka-streams-2.5.0.jar:na]
        at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:670) ~[kafka-streams-2.5.0.jar:na]
Caused by: org.apache.kafka.common.errors.SerializationException: Error registering Avro schema: {...avro json...}
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Register operation timed out; error code: 50002
        at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:236) ~[kafka-schema-registry-client-5.3.3.jar:na]
        at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:265) ~[kafka-schema-registry-client-5.3.3.jar:na]
        at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:365) ~[kafka-schema-registry-client-5.3.3.ja
r:na]
        at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:357) ~[kafka-schema-registry-client-5.3.3.ja
r:na]
        at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:343) ~[kafka-schema-registry-client-5.3.3.jar:na]
        at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.registerAndGetId(CachedSchemaRegistryClient.java:168) ~[kafka-schema-registry-client-5.3.3.jar:na]
        at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.register(CachedSchemaRegistryClient.java:222) ~[kafka-schema-registry-client-5.3.3.jar:na]
        at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.register(CachedSchemaRegistryClient.java:198) ~[kafka-schema-registry-client-5.3.3.jar:na]
        at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:70) ~[kafka-avro-serializer-5.3.3.jar:na]
        at io.confluent.kafka.serializers.KafkaAvroSerializer.serialize(KafkaAvroSerializer.java:53) ~[kafka-avro-serializer-5.3.3.jar:na]
        at io.confluent.kafka.streams.serdes.avro.SpecificAvroSerializer.serialize(SpecificAvroSerializer.java:65) ~[kafka-streams-avro-serde-5.3.3.jar:na]
        at io.confluent.kafka.streams.serdes.avro.SpecificAvroSerializer.serialize(SpecificAvroSerializer.java:38) ~[kafka-streams-avro-serde-5.3.3.jar:na]
        at org.apache.kafka.common.serialization.Serializer.serialize(Serializer.java:62) ~[kafka-clients-2.5.0.jar:na]
        at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:175) ~[kafka-streams-2.5.0.jar:na]
        at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:111) ~[kafka-streams-2.5.0.jar:na]
...

这告诉我Kafka Streams使用的SerDe不是我上面定义的SerDe,而是基类SpecificAvroSerde(它包装SpecificAvroSerializer)。

这与Spring Cloud Stream Kafka库尝试自动推断SerDe以使用的某种方式有关吗?覆盖和设置SerDe的“正确”方式是什么?

共有1个答案

孟品
2023-03-14

我在您的配置中看到了这一点:spring。云流动Kafka。流。粘合剂配置违约钥匙serde。这是默认键Serde。你的意思是把它用作值吗。serde。那么,这需要改变。

也就是说,您可以在单个绑定上设置Serde(具有更高的优先级)。

如果您的Kafka Streams函数是强类型的(即KStream泛型参数使用正确的类型),您还可以在您的应用程序中定义RetrySpecificAvroSerde类型的bean。此方法在绑定器中具有最高优先级。

更正后,如果仍然失败,请与我们共享一个小样本,然后我们可以查看。

 类似资料:
  • 我正在使用Spring Boot 2.0.1和WebFlux路由器函数(不是基于注释的!)编写一个应用程序。对于我的一些数据对象,我编写了扩展的自定义序列化程序。这些我在中注册并将该模块公开为bean。 当我运行应用程序时,这个设置就像一个魅力。bean被实例化,REST响应使用正确的序列化器被序列化。 现在我想编写一个测试来验证路由器功能及其背后的处理程序是否按预期工作。我想模拟处理程序背后的服

  • 问题内容: 我正在尝试在包级别使用Hibernate @TypeDef批注,这与Hibernate文档中所描述的完全相同。我正在使用和。代码可以编译,并且在类路径中,但是Hibernate仍然看不到它。 如果我上课,那是行得通的,但是如果我把放在那,那是没有用的。我试图用Google搜索,但找不到任何有用的信息。 谢谢! 问题答案: 您可能需要添加一个 到您的Hibernate配置文件,或调用co

  • 问题内容: 我有一个带有关系的实体,我想通过一个查询来检索它,因此使用。有时,Hibernate不尊重它,而是发出N + 1 秒。随着 有时 我的意思是,因为我不知道是什么触发它,我有案件对不同的查询,这可能发生,或者不一样的类。 这是带有我使用的注释的简化实体: 用 我希望单个查询能够同时获取其及其内容,例如 相反,我得到了第一选择所有N S和那么N 献给所有S(考虑没有缓存)。 我发现了许多类

  • 我正在使用JPA2.1(由Hibernate4.2.11支持)和Spring4.0.2开发一个应用程序。我们正在使用Envers审核项目实体中的更改。这很好。当我们尝试使用自定义修订实体时,问题就出现了,正如Envers文档所述:http://docs.jboss.org/hibernate/core/4.1/devguide/en-US/html/ch15.html#envers-修正日志 正如

  • 问题内容: 我在Spring 4 MVC + Security + Boot项目中设置了一个自定义身份验证过滤器。过滤器可以很好地完成工作,现在我想禁用某些URI(例如)的安全性。这是我的配置 不幸的是,当我在/ api / …下调用resource时,过滤器仍然被链接。我在过滤器中添加了println,并在每次调用时将其写入控制台。你知道我的配置有什么问题吗? 更新 过滤器代码: 问题答案: 删

  • 我对iOS布局约束的机制有误解。请参阅下面列出的我放在viewDidLoad中的代码。 在我看来,我的意图是明确的。我想在设备屏幕的中央看到一个按钮。但我只能看到下面的图片。 我在项目控制台中有一个输出,非常可怕,我无法从中理解任何东西。 无法同时满足约束。可能以下列表中至少有一个约束是您不想要的。尝试以下方法:(1)查看每个约束,并尝试找出您不期望的约束;(2) 查找添加了一个或多个不需要的约束