spring.kafka.bootstrap-servers=${kafka.host}:${kafka.port}
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.group-id=postfenix
spring.kafka.consumer.enable-auto-commit=false
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Post> postKafkaListenerContainerFactory(KafkaProperties kafkaProperties) {
ConcurrentKafkaListenerContainerFactory<String, Post> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.getContainerProperties().setAckMode(AckMode.MANUAL);
factory.setConsumerFactory(postConsumerFactory(kafkaProperties));
return factory;
}
@KafkaListener(topics = "${kafka.topic.post.send}", containerFactory = "postKafkaListenerContainerFactory")
public void sendPost(ConsumerRecord<String, Post> consumerRecord, Acknowledgment ack) {
Post post = consumerRecord.value();
// do some logic here
ack.acknowledge();
}
当使用Kafka时,客户端需要自己提交抵消。这与其他消息代理(如AMQP代理)形成鲜明对比,在其他消息代理中,代理跟踪客户机已经接收到的消息。
在您的情况下,您不会自动提交偏移量,因此Kafka希望您手动提交这些偏移量(因为此设置:spring.Kafka.consumer.enable-auto-commit=false
)。如果不在程序中手动提交偏移量,则所描述的行为与预期的基本一致。卡夫卡根本不知道你的程序成功地处理了什么消息。每次重新启动程序时,Kafka都将看到程序尚未提交任何偏移,并将应用spring.Kafka.consumer.auto-offset-reset=aresty
中提供的策略,这意味着队列中的第一条消息。
如果这对您来说都是新的,我建议阅读关于Kafka的这篇文档和这篇Spring文档,因为Kafka与其他消息代理有很大的不同。
我们有kafka集群,包含3个kafka代理节点和3个zookeepers服务器 Kafka版本- 10.1 ( hortonworks) 根据我的理解,因为所有的元数据都位于zookeeper服务器上,kafka代理正在使用这些数据(kafka通过端口2181与zookeeper服务器对话) 我只是想知道是否每台kafka机器都与集群中的其他kafka交谈,或者kafka可能只在动物园管理员服务
由于它是一个Spring Boot应用程序,默认偏移量设置为Latest。我在这里做错了什么,请帮我弄明白。
我试图将Kafka偏移量保存到文件中,我使用Spring Boot,似乎偏移量在文件中写入,但没有读取,所以事实上骆驼将在重新启动时从Kafka主题的开头开始读取
我们正在应用程序中使用apache kafka streams 0.10.2.0。我们利用kafka streams拓扑将处理后的数据传递到下一个主题,直到处理结束。 此外,我们使用AWS ECS容器来部署消费者应用程序。我们观察到消费者正在拾取非常旧的消息进行处理,尽管它们已经在更早的时候处理过。这个问题在服务扩展/缩减或新部署时随机发生。我知道在消费者重新平衡时,有些消息可以重新处理。但在这种
在Spring MVC项目中,我试图通过Spring Websockets将使用过的Kafka数据发送到前端(JavaScript)。 为了建立服务器和客户端之间的通信,我有以下内容。 客户端(app.js) 服务器(KafkaController.java) 要使用来自特定Kafka主题的数据,我使用@KafkaListener注释如下: 我有一个适当的Kafkanconfig类,包含所有必要的
这个链接说:Apache Kafka 0.9.0.1 http://docs.spring.io/spring-kafka/docs/1.1.3.build-snapshot/reference/html/_integrate.html