我正在尝试从同一个Kafka主题反序列化不同的JSON有效负载。这里问的其他问题引导我进行了第一次尝试,但我无法让它运行。
正如Gary所提到的(这里),有一些提示(JsonSerializer.ADD\u TYPE\u INFO\u HEADERS),但当我发送和接收这两条消息时,我会收到一个异常。
org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method could not be invoked with the incoming message
Endpoint handler details:
Method [public void com.foo.message.ConsumerImpl.consumeSelf(java.lang.String,java.lang.String,java.lang.String,java.lang.String,java.util.Map<java.lang.String, java.lang.Object>,com.foo.message.KafkaMessage,org.apache.kafka.clients.consumer.ConsumerRecord<java.lang.String, java.lang.Object>)]
Bean [com.foo.message.ConsumerImpl@6df2a206]; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot handle message; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot convert from [com.foo.message.KafkaMessageWithAdditionalField] to [com.foo.message.KafkaMessage] for GenericMessage [payload=com.foo.message.KafkaMessageWithAdditionalField@4e3168f7, headers={kafka_offset=22, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@c0e2fcf, kafka_timestampType=CREATE_TIME, kafka_receivedMessageKey=null, kafka_receivedPartitionId=0, kafka_receivedTopic=fromBar, kafka_receivedTimestamp=1548310583481}], failedMessage=GenericMessage [payload=com.foo.message.KafkaMessageWithAdditionalField@4e3168f7, headers={kafka_offset=22, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@c0e2fcf, kafka_timestampType=CREATE_TIME, kafka_receivedMessageKey=null, kafka_receivedPartitionId=0, kafka_receivedTopic=fromBar, kafka_receivedTimestamp=1548310583481}]
at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:292) ~[spring-kafka-2.2.2.RELEASE.jar:2.2.2.RELEASE]
at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:79) ~[spring-kafka-2.2.2.RELEASE.jar:2.2.2.RELEASE]
at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:50) ~[spring-kafka-2.2.2.RELEASE.jar:2.2.2.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:1207) [spring-kafka-2.2.2.RELEASE.jar:2.2.2.RELEASE]
...
LoggingErrorHandler在ConsumerRecord中已经提到了一个(正确的)值。
2019-01-24 07:16:27.630 ERROR 27204 --- [ntainer#2-0-C-1] o.s.kafka.listener.LoggingErrorHandler : Error while processing: ConsumerRecord(topic = fromBar, partition = 0, offset = 22, CreateTime = 1548310583481, serialized key size = -1, serialized value size = 196, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = com.foo.bar.message.KafkaMessageWithAdditionalField@4e3168f7)
第一个我的配置:
@EnableKafka
@Configuration
public class KafkaConsumerConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Bean
public ConsumerFactory<String, KafkaMessage> consumerFactoryMessage()
{
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(),
new JsonDeserializer<>(KafkaMessage.class));
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, KafkaMessage> kafkaListenerMessageContainerFactory()
{
ConcurrentKafkaListenerContainerFactory<String, KafkaMessage> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactoryMessage());
return factory;
}
@Bean
public ConsumerFactory<String, KafkaMessageWithAdditionalField> consumerFactoryMessageWithAdditionalField()
{
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(),
new JsonDeserializer<>(KafkaMessageWithAdditionalField.class));
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, KafkaMessageWithAdditionalField> kafkaListenerMessageWithAdditionalFieldContainerFactory()
{
ConcurrentKafkaListenerContainerFactory<String, KafkaMessageWithAdditionalField> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactoryMessageWithAdditionalField());
return factory;
}
}
以下是听众:
@KafkaListener(topicPartitions = @TopicPartition(partitions = "0", topic = "${foo.kafka.topic-springBoot}"), containerFactory = "kafkaListenerMessageContainerFactory")
public void consumeSelf(@Headers Map<String, Object> map, KafkaMessage message, ConsumerRecord<String, Object> cr)
{
log.info("message received %s", message);
}
@KafkaListener(topicPartitions = @TopicPartition(partitions = "0", topic = "${foo.kafka.topic-springBoot}"), containerFactory = "kafkaListenerMessageWithAdditionalFieldContainerFactory")
public void consumeSelfAdd(@Headers Map<String, Object> map, KafkaMessageWithAdditionalField message, ConsumerRecord<String, Object> cr)
{
log.info("messageKafkaMessageWithAdditionalField received %s", message);
}
你不能那样做;您有两个不同的侦听器容器,其中的侦听器需要不同的对象。
对于接收不同类型的多个侦听器方法,您需要在类级别使用@KafkaListener
,在方法级别使用@KafkaHandler
。
请参阅@KafkaListener上的类。
在类级别使用@KafkaListener时,您可以在方法级别指定@KafkaHandler。传递消息时,转换后的消息有效负载类型用于确定调用哪个方法。
@KafkaListener(id = "multi", topics = "myTopic")
static class MultiListenerBean {
@KafkaHandler
public void listen(String foo) {
...
}
@KafkaHandler
public void listen(Integer bar) {
...
}
@KafkaHandler(isDefault = true`)
public void listenDefault(Object object) {
...
}
}
默认方法是可选的,用于未知的负载类型。
但这只适用于智能反序列化器(知道如何转换为不同的有效负载)。
或者,您可以将RecordFilterStrategy
添加到侦听器容器工厂以跳过每个侦听器中的其他记录。
JSON消息正在被字符串使用者消费。My Products发送两种类型的消息字符串和序列化JSON 消费者方面我有两个消费者1。正在侦听字符串消息%2。监听json并反序列化为对象 即使是json消息也由String Listener使用。
我的微服务需要通过HTTP与2个不同的服务进行通信。1个与snake_caseJSON有API契约,而另一个使用camelCase。如何配置WebFlux在一组功能endpoint上使用某个Jackson反序列化和序列化JSON,而在不同endpoint上使用另一个? WebFlux文档展示了如何连接另一个ObjectMapper,但这适用于我的API的所有endpoint。所以现在要么我所有的J
我有一个这样的JSON: 所有类型都可以包含可变时间,或者只能有一种类型只有一次。 在SpringBoot应用程序中,我有三个类,扩展了公共类。 我有两种方法来序列化和反序列化它。 虽然测试序列化是可以的,但是反序列化在默认情况下不起作用。所以我修改了<code>CommonObject</code>如下: 现在,反序列化工作正常,但序列化不行。json值中包含属性< code>type两次。 有
我使用Kafka Consumer API来构建Consumer。为了构建反序列化器,我实现了Deserializer类并提供了必要的实现。我收到此错误“Exception Raisedorg.apache.kafka.Common.Errors.SerializationException:错误反序列化分区staging.DataFeeds.PartnerHotel-0的键/值,偏移量为1920
问题内容: 我正在尝试使用Gson 反序列化a ,值的类型可以变化,值“ in_wanted”可以是a 或a 。 in_wanted为: in_wanted为: 每当“ in_wanted”的值为布尔值时,我都需要该对象,并且需要一个反序列化器以返回null。用Gson做到这一点的最佳方法是什么? 问题答案: 您可以使用自定义解串器来完成此操作。首先,我们应该创建可以表示您的JSON的数据模型。
我们的拓扑使用从kafka主题获取消息。我们有约150个主题,包含约12个分区、8个storm执行器和2个storm节点上的任务。Storm版本1.0.5,Kafka经纪人版本10.0.2,Kafka客户端版本0.9.0.1。我们不会删除Kafka主题。 在某个时刻,我在worker中观察到大量重复的警告消息。日志 2018-05-29 14:36:57.928 o.a.s.k.KafkaUtil