@Configuration
@EnableKafka
public class KafkaConsumerConfig {
@Value("${kafka.boot.server}")
private String kafkaServer;
@Value("${kafka.consumer.group.id}")
private String kafkaGroupId;
@Bean
public ConsumerFactory<String, String> consumerConfig() {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaGroupId);
props.put("message.assembler.buffer.capacity", 33554432);
props.put("max.tracked.messages.per.partition", 24);
props.put("exception.on.message.dropped", true);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put("segment.deserializer.class", DefaultSegmentDeserializer.class.getName());
return new DefaultKafkaConsumerFactory(props, null, new StringDeserializer());
}
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> listener = new ConcurrentKafkaListenerContainerFactory<>();
listener.setConsumerFactory(consumerConfig());
return listener;
}
}
@Service
public class KafkaReciever {
private static final Logger LOGGER = LoggerFactory.getLogger(KafkaReciever.class);
@KafkaListener(topics = "${kafka.topic.name}", group = "${kafka.consumer.group.id}")
public void recieveData(@Payload Student student, @Header(KafkaHeaders.MESSAGE_KEY) String messageKey) throws Exception{
LOGGER.info("Data - " + student + " recieved");
}
}
[{
"studentId": "Q45678123",
"firstName": "Anderson",
"lastName": "John",
"age": "12",
"address": {
"apartment": "apt 123",
"street": "street Info",
"state": "state",
"city": "city",
"postCode": "12345"
}
},
{
"studentId": "Q45678123",
"firstName": "abc",
"lastName": "xyz",
"age": "12",
"address": {
"apartment": "apt 123",
"street": "street Info",
"state": "state",
"city": "city",
"postCode": "12345"
}
}]
[com.springboot.model.Student@5e40dc31, com.springboot.model.Student@235e68b8]
无法反序列化START_ARRAY中的com.springboot.model.student实例
如果使用json反序列化程序,则只有一个列表,而不是一个学生
@Payload List<Student> student
或者如果使用string desailizer,则您有一个JSON字符串,并且必须手动解析它
@Payload String student
我是Apache Kafka的新手,能够从发送方发送消息(以JSON格式),但不能在消费者服务中消费。 有人能帮我吗?
我有课: 配置类:公共类RabbitConfiguration{ 听众: a仅启动应用程序有错误 2017-08-08 12:58:02.128警告5024---[cTaskExecutor-1]S.A.R.L.ConditionalRejectingErrorHandler:Rabbit消息侦听器执行失败。 原因:org.SpringFramework.Messaging.Handler.Ann
这是创建ListenerContainerFactory的类 这是我用@KafKalistener注释的Listener类 这是KafkaListenerConfig类,它接受引导服务器、主题名称等。
我在学Kafka春靴。我想在我的consumer类中添加第二个consumer,它订阅了与第一个主题相同的主题,但具有不同的groupID。这些类不是很复杂,当我只有第一个消费Json的消费者(或者至少输出是Json?)时,它们就可以工作。还有一点需要说明的是,我从一个生产者和消费者开始,他们使用@EnableBindings方法,但这种方法不受欢迎,所以我正在学习正确/新的方法。 任何提示!请让
下面是我对kafka侦听器的方法定义,如果接收空或空的有效负载字符串,我想我会得到下面的错误...你能帮帮我吗。
我一直在Kafka消费者方面面临下面的异常。令人惊讶的是,这个问题不一致,旧版本的代码(具有完全相同的配置,但有一些新的不相关功能)按预期工作。有人能帮助确定是什么导致了这种情况吗? 我的应用程序使用以下内容: 自定义侦听器类com。我的公司。听众。Kafka巴奇列斯特纳 附加查询:即使设置了,异常堆栈跟踪仍然包含我省略的完整有效负载。知道为什么吗? 提前感谢! 更新: KafkaBatchLis