当前位置: 首页 > 知识库问答 >
问题:

Spring Boot Kafka消费者在循环中抛出错误

蓬弘
2023-03-14

我是Kafka的新手,在尝试一个示例场景时,Kafka生产者以JSON格式向消费者发送用户详细信息。我访问过类似的问题,但我无法得到我需要的答案。

如果我在终端中运行任何一个生产者或消费者,在spring boot中运行另一个生产者或消费者,我不会面临任何问题。错误发生在无限循环中(当生产者和消费者都从不同的spring boot项目启动时):

Consumer exception

java.lang.IllegalStateException: This error handler cannot process 'SerializationException's directly; please consider configuring an 'ErrorHandlingDeserializer' in the value and/or key deserializer
    at org.springframework.kafka.listener.SeekUtils.seekOrRecover(SeekUtils.java:145) ~[spring-kafka-2.6.7.jar:2.6.7]
    at org.springframework.kafka.listener.SeekToCurrentErrorHandler.handle(SeekToCurrentErrorHandler.java:113) ~[spring-kafka-2.6.7.jar:2.6.7]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.handleConsumerException(KafkaMessageListenerContainer.java:1427) ~[spring-kafka-2.6.7.jar:2.6.7]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1124) ~[spring-kafka-2.6.7.jar:2.6.7]
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[na:na]
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[na:na]
    at java.base/java.lang.Thread.run(Thread.java:832) ~[na:na]
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition Example3-0 at offset 0. If needed, please seek past the record to continue consumption.
Caused by: java.lang.IllegalArgumentException: The class 'edu.kafka.producer.model.User' is not in the trusted packages: [java.util, java.lang, edu.consumer.test.model, edu.consumer.test.model.*]. If you believe this class is safe to deserialize, please provide its name. If the serialization is only done by a trusted source, you can also enable trust all (*).
    at org.springframework.kafka.support.converter.DefaultJackson2JavaTypeMapper.getClassIdType(DefaultJackson2JavaTypeMapper.java:126) ~[spring-kafka-2.6.7.jar:2.6.7]
    at org.springframework.kafka.support.converter.DefaultJackson2JavaTypeMapper.toJavaType(DefaultJackson2JavaTypeMapper.java:100) ~[spring-kafka-2.6.7.jar:2.6.7]
    at org.springframework.kafka.support.serializer.JsonDeserializer.deserialize(JsonDeserializer.java:504) ~[spring-kafka-2.6.7.jar:2.6.7]
    at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:1365) ~[kafka-clients-2.6.0.jar:na]
    at org.apache.kafka.clients.consumer.internals.Fetcher.access$3400(Fetcher.java:130) ~[kafka-clients-2.6.0.jar:na]
    at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.fetchRecords(Fetcher.java:1596) ~[kafka-clients-2.6.0.jar:na]
    at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.access$1700(Fetcher.java:1432) ~[kafka-clients-2.6.0.jar:na]
    at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:684) ~[kafka-clients-2.6.0.jar:na]
    at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:635) ~[kafka-clients-2.6.0.jar:na]
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1283) ~[kafka-clients-2.6.0.jar:na]
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1237) ~[kafka-clients-2.6.0.jar:na]
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1210) ~[kafka-clients-2.6.0.jar:na]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doPoll(KafkaMessageListenerContainer.java:1271) ~[spring-kafka-2.6.7.jar:2.6.7]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1162) ~[spring-kafka-2.6.7.jar:2.6.7]
    at org.springframework.kafka.html" target="_blank">listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1075) ~[spring-kafka-2.6.7.jar:2.6.7]
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[na:na]
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[na:na]
    at java.base/java.lang.Thread.run(Thread.java:832) ~[na:na]

我在下面提到了消费者配置中的反序列化和受信任包:

@EnableKafka
@Configuration
public class TestConfig {

    @Bean
    public ConsumerFactory<String, User> consumerFactory() {
        
        Map<String, Object> config = new HashMap<>();
        
        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        config.put(ConsumerConfig.GROUP_ID_CONFIG, "group_json");
        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        config.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, StringDeserializer.class);
        config.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class);
        config.put(JsonDeserializer.TRUSTED_PACKAGES, "edu.kafka.producer.model.User, java.util, java.lang, edu.consumer.test.model, edu.consumer.test.model.*" );
        
        return new DefaultKafkaConsumerFactory<String, User>(config, new StringDeserializer(), new JsonDeserializer<>(User.class));
    }
    
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, User> kafkaLister() {
        
        ConcurrentKafkaListenerContainerFactory<String, User> factory = new ConcurrentKafkaListenerContainerFactory<>();
        
        factory.setConsumerFactory(consumerFactory());
        
        return factory;
    }
    
}

我相信我在配置中遗漏了一些东西。我想把从Kafka收到的消息打印到我的Spring Boot控制台(我知道不建议在控制台中打印,这是一个实践项目),下面是消费者的监听器:

@Service
public class TestListener {

    @KafkaListener(topics = "Example3", groupId = "group_json", containerFactory = "kafkaLister")
    public void post(User user) {
        
        System.out.println("Consumed Message: " + user);
    }
    
}

我正在尝试使用的JSON:

{"name":"qaz","dept":"Aero"}

Spring版本:2.4.4

Kafka版本(根据控制台):2.6.7

提前非常感谢。

共有2个答案

柯波峻
2023-03-14

根据Gary Russell先生的回答,以下是解决问题的配置

生产者配置

@Bean
public ProducerFactory<String, User> producerFactory() {
    Map<String, Object> config = new HashMap<>();

    config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
    config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
    config.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, false);
    config.put(JsonSerializer.TYPE_MAPPINGS, "user:edu.kafka.test.model.User");

    return new DefaultKafkaProducerFactory<>(config);
}


@Bean
public KafkaTemplate<String, User> kafkaTemplate() {
    return new KafkaTemplate<>(producerFactory());
}

消费者配置:

@Configuration
@EnableKafka
public class TestConfig {

    @Bean
    public ConsumerFactory<String, User> consumerFactory() {
        
        Map<String, Object> config = new HashMap<>();
        
        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        config.put(ConsumerConfig.GROUP_ID_CONFIG, "group_json");
        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        config.put(JsonSerializer.TYPE_MAPPINGS, "user:edu.kafka.test.model.User");
        config.put(JsonDeserializer.VALUE_DEFAULT_TYPE, "edu.kafka.test.model.User");
        config.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, StringDeserializer.class);
        config.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class);
        
        return new DefaultKafkaConsumerFactory<String, User>(config, new StringDeserializer(), new JsonDeserializer<>(User.class));
    }
    
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, User> kafkaLister() {
        
        ConcurrentKafkaListenerContainerFactory<String, User> factory = new ConcurrentKafkaListenerContainerFactory<>();
        
        factory.setMissingTopicsFatal(false);
        
        factory.setConsumerFactory(consumerFactory());
        
        return factory;
    }
    
}
范振海
2023-03-14

原因:java。lang.IllegalArgumentException:班级的教育。Kafka。制作人模型“用户”不在受信任的包中:[java.util、java.lang、edu.consumer.test.model、edu.consumer.test.model.*]。如果您认为该类可以安全地反序列化,请提供其名称。如果序列化仅由受信任的源完成,则还可以启用trust all(*)

看起来反序列化程序正在从其他地方获取其属性。

config。put(JsonDeserializer.TRUSTED_包,“edu.kafka.producer.model.User,java.util,java.lang,edu.consumer.test.model,edu.consumer.test.model.*)

“埃杜。Kafka。制作人模型用户的

您正在尝试反序列化。。。制作人模型用户不是。。。消费者模型用户

。。。制片人 来自标题中的类型信息;如果你想映射一个。。。制片人 对象到。。。消费者 对象,您需要按照文档中的描述配置类型映射。

如果只反序列化用户对象,可以将“使用类型信息”设置为false,并设置默认值类型。查看配置选项。。。

https://docs.spring.io/spring-kafka/docs/current/reference/html/#serdes-json配置

配置属性

JsonSerializer。ADD_TYPE_INFO_HEADERS(默认为true):可以将其设置为false以禁用JsonSerializer上的此功能(设置addTypeInfo属性)。

JsonSerializer.TYPE_MAPPINGS(默认为空):请参见映射类型。

JsonDeserializer。使用_TYPE_INFO_HEADERS(默认为true):可以将其设置为false以忽略序列化程序设置的头。

JsonDeserializer。REMOVE_TYPE_INFO_HEADERS(默认为true):可以将其设置为false以保留序列化程序设置的头。

JsonDeserializer。KEY_DEFAULT_TYPE`:如果没有标题信息,则用于反序列化密钥的回退类型。

JsonDeserializer。VALUE_DEFAULT_TYPE:如果没有标题信息,用于反序列化值的回退类型。

JsonDeserializer。TRUSTED_PACKAGES(默认java.util,java.lang):允许反序列化的以逗号分隔的包模式列表。*意味着反序列化所有。

JsonDeserializer.TYPE_MAPPINGS(默认为空):请参见映射类型。

JsonDeserializer.KEY_TYPE_METHOD(默认为空):请参见使用方法确定类型。

JsonDeserializer。VALUE_TYPE_METHOD(默认为空):请参阅使用方法确定类型。

默认类型的包始终受信任。

config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
config.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, StringDeserializer.class);
config.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class);

键和值反序列化器必须是ErrorHandling反序列化器。你仍然有本地反序列化程序。

 类似资料:
  • 我有一个处于RPC模式的消费者(RabbitListner),我想知道是否有可能引发发布者可以处理的异常。 为了更清楚地说明我的情况如下: 发布者以RPC模式发送消息 消费者收到消息,检查消息的有效性,如果由于缺少参数,消息无法计数,那么我想抛出异常。异常可以是特定的业务异常,也可以是特定的AmqpException,但我希望发布者可以在未进入超时状态时处理该异常 我尝试使用AmqpRejectA

  • 我对kafka和kafka-python相当陌生。安装kafka-python后,我从这里尝试了一个简单的消费者代码实现-http://kafka-python.readthedocs.io/en/master/usage.html 我一直在kafka的bin目录中编写消费者代码,并尝试从那里运行python代码。但是,我遇到以下错误: 回溯(最近一次调用):文件 “KafkaConsumer.p

  • 我正在使用一个Kafka产品和一个SpringKafka消费者。我正在使用Json序列化器和反序列化器。每当我试图从主题中读取消费者中的消息时,我会得到以下错误: 我没有在生产者和消费者中配置任何关于头的内容。我错过了什么?

  • 我有5个独立的docker图像:1个用于kafka经纪人,1个动物园管理员,1个生产者和2个消费者。我通过生产者向主题发布消息。基本上,我希望消息将在循环算法中使用,因此,为此,我使用相同的< code>group.id定义了消费者,并将< code > partition . assignment . strategy 的配置添加为< code > org . Apache . Kafka .

  • 是否有一种方法以编程方式访问和打印使用者滞后偏移,或者说使用者读取的最后一条记录的偏移与某个生产者写入该使用者分区的最后一条记录的偏移之间的位置差。 要知道我的最终目标是将这个值发送到prometheus进行监视,我应该在上面添加哪些语句来得到滞后偏移值?

  • 我花了几个小时想弄清楚发生了什么,但没能找到解决办法。 这是我在一台机器上的设置: 1名zookeeper跑步 我正在使用kafka控制台生成器插入消息。如果我检查复制偏移量(