当前位置: 首页 > 知识库问答 >
问题:

此错误处理程序无法直接处理“SerializationException”;请考虑配置“ErrorHandlingDeserializer”

应煌
2023-03-14

生产者属性

spring.kafka.producer.bootstrap-servers=127.0.0.1:9092
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer

消费者财产

spring.kafka.consumer.bootstrap-servers=127.0.0.1:9092
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties.spring.json.trusted.packages=*
spring.kafka.consumer.group-id=user-group
server.port=8085

消费者服务

@Service
public class UserConsumerService {

    @KafkaListener(topics = { "user-topic" })
    public void consumerUserData(User user) {
        System.out.println("Users Age Is: " + user.getAge() + " Fav Genre " + user.getFavGenre());
    }
}

生产性服务业

@Service
public class UserProducerService {

    @Autowired
    private KafkaTemplate<String, User> kafkaTemplate;

    public void sendUserData(User user) {
        kafkaTemplate.send("user-topic", user.getName(), user);
    }
}

创建主题的生产者配置

    @Configuration public class KafkaConfig {
    
        @Bean
        public NewTopic topicOrder() {
            return TopicBuilder.name("user-topic").partitions(2).replicas(1).build();
        } 
}

生产商工作正常,但消费者给出的错误如下

2021-12-06 21:45:50.299 ERROR 4936 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : Consumer exception

java.lang.IllegalStateException: This error handler cannot process 'Serializathtml" target="_blank">ionException's directly; please consider configuring an

组织中的值和/或键反序列化器中的“ErrorHandlingDeserializer”。springframework。Kafka。侦听器。DefaultErrorHandler。handleOtherException(DefaultErrorHandler.java:149)~[spring-kafka-2.8.0.jar:2.8.0]DefaultErrorHandler。java:149位于组织。springframework。Kafka。侦听器。KafkaMessageListenerContainer$ListenerConsumer。handleConsumerException(KafkaMessageListenerContainer.java:1760)~[spring-kafka-2.8.0.jar:2.8.0]KafkaMessageListenerContainer。java:1760,位于org。springframework。Kafka。侦听器。KafkaMessageListenerContainer$ListenerConsumer。运行(KafkaMessageListenerContainer.java:1283)~(spring-kafka-2.8.0.jar:2.8.0)KafkaMessageListenerContainer。java:1283在java。基本/java。util。同时发生的遗嘱执行人$RunnableAdapter。调用(Executors.java:539)~[na:na]Executors。爪哇:539在爪哇。基本/java。util。同时发生的未来任务。运行(FutureTask.java:264)~[na:na]FutureTask。java:264在java。基本/java。lang.Thread。运行(Thread.java:833)~[na:na]线程。java:833由:org引起。阿帕奇。Kafka。常见的错误。RecordDeserialization异常:在偏移量1处反序列化分区user-topic-0的键/值时出错。如果需要,请查看记录以继续消费。位于组织。阿帕奇。Kafka。客户。消费者内部构件。取数器。parseRecord(Fetcher.java:1429)~[kafka-clients-3.0.0.jar:na]Fetcher。java:1429 at org。阿帕奇。Kafka。客户。消费者内部构件。取数器。访问$3400(Fetcher.java:134)~[kafka-clients-3.0.0.jar:na]Fetcher。java:134位于组织。阿帕奇。Kafka。客户。消费者内部构件。Fetcher$CompletedFetch。fetchRecords(Fetcher.java:1652)~[kafka-clients-3.0.0.jar:na]Fetcher。java:1652,网址:org。阿帕奇。Kafka。客户。消费者内部构件。Fetcher$CompletedFetch。访问$1800(Fetcher.java:1488)~[kafka-clients-3.0.0.jar:na]Fetcher。java:1488 at org。阿帕奇。Kafka。客户。消费者内部构件。取数器。fetchRecords(Fetcher.java:721)~[kafka-clients-3.0.0.jar:na]Fetcher。java:721 at org。阿帕奇。Kafka。客户。消费者内部构件。取数器。fetchedRecords(Fetcher.java:672)~[kafka-clients-3.0.0.jar:na]Fetcher。java:672位于org。阿帕奇。Kafka。客户。消费者Kafka康萨默尔。pollForFetches(KafkaConsumer.java:1277)~[kafka-clients-3.0.0.jar:na]KafkaConsumer。java:1277,位于org。阿帕奇。Kafka。客户。消费者Kafka康萨默尔。投票(KafkaConsumer.java:1238)~[kafka-clients-3.0.0.jar:na]KafkaConsumer。java:1238 at org。阿帕奇。Kafka。客户。消费者Kafka康萨默尔。民意测验(KafkaConsumer.java:1211)~[kafka-clients-3.0.0.jar:na]KafkaConsumer。java:1211,网址:org。springframework。Kafka。侦听器。KafkaMessageListenerContainer$ListenerConsumer。pollConsumer(KafkaMessageListenerContainer.java:1507)~(spring-kafka-2.8.0.jar:2.8.0)KafkaMessageListenerContainer。java:1507 at org。springframework。Kafka。侦听器。KafkaMessageListenerContainer$ListenerConsumer。doPoll(KafkaMessageListenerContainer.java:1497)~(spring-kafka-2.8.0.jar:2.8.0)KafkaMessageListenerContainer。java:1497,网址:org。springframework。Kafka。侦听器。KafkaMessageListenerContainer$ListenerConsumer。pollAndInvoke(KafkaMessageListenerContainer.java:1325)~(spring-kafka-2.8.0.jar:2.8.0)KafkaMessage

我是Kafka的新手,如果你能帮助我,我会很高兴的,我想知道为什么会出现这个错误

共有1个答案

阎宾实
2023-03-14

错误消息是否没有告诉您任何信息?

此错误处理程序无法直接处理'SerializationException';请考虑在值和/或键反序列化器中配置'ErrorHandlingDeserializer'

请参阅文档:https://docs.spring.io/spring-kafka/docs/current/reference/html/#error-处理反序列化程序

当反序列化程序无法反序列化消息时,Spring无法处理该问题,因为它发生在poll()返回之前。为了解决这个问题,引入了ErrorHandlingDeserializer。此反序列化程序将委托给真正的反序列化程序(键或值)。如果委托未能反序列化记录内容,ErrorHandlingDeserializer将在包含原因和原始字节的标头中返回null值和反序列化异常。使用记录级MessageListener时,如果ConsumerRecord包含键或值的反序列化异常标头,则会使用失败的ConsumerRecord调用容器的ErrorHandler。记录不会传递给侦听器。

您可以使用DefaultKafkanConsumerFactory构造函数,该构造函数接受键和值反序列化器对象,并连接到使用适当委托配置的相应ErrorHandlingDeserializer实例中。或者,您可以使用使用者配置属性(ErrorHandlingDeserializer使用这些属性)实例化委托。属性名是ErrorHandlingDeserializer。KEY\u DESERIALIZER\u类和ErrorHandlingDeserializer。VALUE\u DESERIALIZER\u类。属性值可以是类或类名。以下示例显示了如何设置这些属性:

.. // other props
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
props.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, JsonDeserializer.class);
props.put(JsonDeserializer.KEY_DEFAULT_TYPE, "com.example.MyKey")
props.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class.getName());
props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, "com.example.MyValue")
props.put(JsonDeserializer.TRUSTED_PACKAGES, "com.example")
return new DefaultKafkaConsumerFactory<>(props);

带启动:

...
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
spring.kafka.consumer.properties.spring.deserializer.value.delegate.class=org.springframework.kafka.support.serializer.JsonDeserializer
...
 类似资料:
  • 我正在尝试为我的骆驼上下文中的所有路由抛出的所有异常配置一个异常处理程序。我的做法是这样的,没有运气: 实例化默认骆驼上下文 从spring上下文检索RouteDefinition列表 通过调用CTX.AddRouteDefinitions()将这些定义添加到camel上下文 从java dsl定义的RotueBuilder添加我的异常处理程序路由(ctx.addRoutes(new MyErro

  • 我正在尝试用Babel设置网页包来编译React文件。webpack配置是在VSCode中自动生成的,具有webpack扩展名。我尝试在预设部分添加“@babel/preset react”,但它不起作用,出现以下错误: 模块解析失败:意外令牌(11:16)您可能需要一个适当的加载程序来处理此文件类型。 似乎thar webpack忽略了“@Babel/预设反应”?配置如下。 非常感谢。

  • 我在Spring Kafka设置中使用Avro和模式注册表。 我想以某种方式处理,它可能在反序列化过程中引发。 我找到了以下两个资源: https://github.com/spring-projects/spring-kafka/issues/164 有人知道如何在Spring Boot中捕获吗? 我使用的是Spring Boot 2.0.2 编辑:我找到了解决办法。

  • 问题内容: 我正在发出ajax jsonp请求,但是失败错误处理无法正常工作。如果请求为404或500,则不会处理该错误。 我一直在四处寻找答案,但找不到任何东西。http://code.google.com/p/jquery- jsonp/ 似乎有一种解决方案,但是我找不到任何有关如何使用它的示例。 问题答案: 处理错误的两种方法 跨域JSONP请求没有错误处理。使用Github https:/

  • 0.3 新版功能. 应用会需要某种配置。你可能会需要根据应用环境更改不同的设置,比如切换调试模 式、设置密钥、或是别的设定环境的东西。 Flask 被设计为需要配置来启动应用。你可以在代码中硬编码配置,这对于小的应用 并不坏,但是有更好的方法。 跟你如何载入配置无关,会有一个可用的配置对象保存着载入的配置值: Flask 对象的 config 属性。这是 Flask 自己放置特定配置值的地方,也是

  • 我无法在我的Kotlin Spring Boot应用程序中正确地注入应用程序属性。在我的文件中定义并随后在文件中引用的属性(在resources->META-INF下)没有正确地添加到bean表达式上下文中。使用,当我将鼠标悬停在该属性上时,我会看到错误。试图运行应用程序(将配置属性值construction-inject到类中)会导致通过构造函数参数表示的