嗨,我们一直在使用旧的spring版本和kafka 1.1,并有以下依赖项
+--- org.springframework.boot:spring-boot-starter-web: -> 1.5.4.RELEASE (*)
+--- org.springframework.cloud:spring-cloud-starter-config: -> 1.3.1.RELEASE (*)
+--- org.springframework.cloud:spring-cloud-starter-bus-kafka: -> 1.3.1.RELEASE (*)
+--- org.springframework.cloud:spring-cloud-stream-binder-kafka: -> 1.2.1.RELEASE (*)
+--- org.apache.kafka:kafka-clients:0.11.0.2 (*)
+--- org.apache.kafka:kafka_2.11:0.11.0.2 (*)
我在应用程序中有以下配置。yaml文件
spring:
kafka:
properties.security.protocol: SSL
ssl:
key-password: ${ABC_KEY_PASSWORD}
keystore-location: file:${ABC_KEYSTORE}
keystore-password: ${ABC_KEYSTORE_PASSWORD}
truststore-location: file:${ABC_TRUSTSTORE}
truststore-password: ${ABC_TRUSTSTORE_PASSWORD}
consumer:
enable-auto-commit: false
cloud:
stream:
bindings:
feed_fcpostprocessor_event:
destination: app-fc-notification
binder: abckafka
group: app-fc-group
consumer:
partitioned: true
concurrency: 2
kafka:
binder:
brokers: ${ABC_BROKER_HOST:abc-kafka.aws.local:9092}
autoCreateTopics: true
autoAddPartitions: true
replicationFactor: 3
partitionCount: 2
bindings:
feed_fcpostprocessor_event:
consumer:
autoCommitOffset: false
binders:
abckafka:
type: kafka
我在stackoverflow中找到了链接。答案有
首先,我不确定您对SpringApplication.ext(applicationContext,())的期望是什么-
这是否意味着Kafka不知道消费者已关闭,在这种情况下没有来自消费者的通信。我认为会发生重新平衡,此外,由于kafka级别的enable-自动提交和绑定级别的自动提交偏移设置为假。偏移量将不会提交,如果应用程序再次启动,它将从它错过的最后一个偏移量开始,或者如果重新平衡发生,那么其他消费者将从其他消费者失败的分区偏移量中读取。
我相信我的问题不是重复的,必须提出中肯的问题。如果有问题,请告诉我答案。否则,请澄清行为。谢谢
首先,即使Boot 1.5不再受支持;您应该使用Springkafka 1.3.11而不是1.1;由于KIP-62,它有一个更简单的线程模型。在1.3之前,为了避免重新平衡,事情非常复杂。
但是,更好的是,您应该升级到所有项目的受支持版本,但至少,将sping-kafka升级到1.3。
你的期望是正确的;具有未提交偏移量的记录将被重新提交。
我已经将enable.auto.commit设置为true,并将auto.commit.interval.ms设置为10,000(即10秒)。现在我的问题是--消费者是每个记录的提交偏移量,还是根据10秒内消耗的记录数提交并提前偏移量?
我有一个用户轮询从订阅的主题。它消耗每条消息并进行一些处理(在几秒内),推送到不同的主题并提交偏移量。 总共有5000条信息, 重新启动前-消耗2900条消息和提交的偏移量 kafka版本(strimzi)>2.0.0 kafka-python==2.0.1
我对SpringBoot中的Kafka批处理侦听器有问题。 这是@KafkaListener 对于我的问题,这个解决方案不起作用,因为提交批处理。对于我的解决方案,我需要提交单个消息的偏移量。 我尝试使用
我正在尝试找出使用Spring-Kafka(1.1.0. RELEASE)在Kafka消费者中手动提交偏移的方法。我明白,最好将这些偏移提交给健壮的客户端实现,这样其他消费者就不会处理重复的事件,这些事件最初可能是由现已死亡的消费者处理的,或者因为重新平衡被触发了。 我知道有两种方法可以解决这个问题- > 将ACK_MODE设置为MANUAL_IMMEDIATE,并在侦听器实现中调用ack.ack
null 当侦听器处理记录后返回时提交偏移量。 如果侦听器方法抛出异常,我会认为偏移量不会增加。但是,当我使用下面的code/config/command组合对其进行测试时,情况并非如此。偏移量仍然会得到更新,并且继续处理下一条消息。 我的配置: 验证偏移量的命令: 我使用的是kafka2.12-0.10.2.0和org.springframework.kafka:spring-kafka:1.1
谢了。