我正在尝试找出使用Spring-Kafka(1.1.0. RELEASE)在Kafka消费者中手动提交偏移的方法。我明白,最好将这些偏移提交给健壮的客户端实现,这样其他消费者就不会处理重复的事件,这些事件最初可能是由现已死亡的消费者处理的,或者因为重新平衡被触发了。
我知道有两种方法可以解决这个问题-
>
将ACK_MODE设置为MANUAL_IMMEDIATE,并在侦听器实现中调用ack.acknowledge()API
ConcurrentKafkaListenerContainerFactory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL_IMMEDIATE);
@KafkaListener(topics = "anotherBatchTopic", containerFactory = "batchContainerFactory")
public void listen(List<ConsumerRecord<byte[], String>> batch, Acknowledgment acknowledgment) throws Exception {
logger.info("===*** batch size : " + batch.size() + "***===");
batch.forEach(System.out::println);
acknowledgment.acknowledge();
}
实现消费者重新平衡监听器。
ConcurrentKafkaListenerContainerFactory.getContainerProperties().setConsumerRebalanceListener(new ConsumerRebalanceListener() {
@Override
public void onPartitionsRevoked(final Collection<TopicPartition> collection) {
}
@Override
public void onPartitionsAssigned(final Collection<TopicPartition> collection) {
}
});
然而,使用这种方法,我不知道如何获得消费者的引用,以调用消费者。commitSync()
或使用者。commitASync()
API。由于一些技术限制,我无法使用最新版本的spring kafka,该版本支持ConsumerAwareRebalanceListener
,并引用了Consumer
。
那么,如何利用ConsumerBalanceListener
向Kafka提交偏移量呢?
此外,我正在使用ConcurrentKafkAlisterContainerFactory启动多个消费者侦听器线程。setConcurrency()
API,那么如果特定的使用者线程死亡,它是否有自己的ConsumerBalanceListener实例?
消费者重新平衡侦听器不能用于提交偏移量;使用1. x;唯一的方法是通过确认
参数。
1.1.0非常古老;全部1。建议x用户至少升级到1.3.5;得益于KIP-62,它的线程模型简单多了——请参见项目页面。
当前版本2.1.6(2.1.7将于今天发布)有更多选项,包括ConsumerRawareMessageListener
,您可以在其中完全访问消费者。
您不能从带有版本的侦听器直接与消费者交互
我对SpringBoot中的Kafka批处理侦听器有问题。 这是@KafkaListener 对于我的问题,这个解决方案不起作用,因为提交批处理。对于我的解决方案,我需要提交单个消息的偏移量。 我尝试使用
null 当侦听器处理记录后返回时提交偏移量。 如果侦听器方法抛出异常,我会认为偏移量不会增加。但是,当我使用下面的code/config/command组合对其进行测试时,情况并非如此。偏移量仍然会得到更新,并且继续处理下一条消息。 我的配置: 验证偏移量的命令: 我使用的是kafka2.12-0.10.2.0和org.springframework.kafka:spring-kafka:1.1
我有Kafka流应用程序。我的应用程序正在成功处理事件。 如何使用重新处理/跳过事件所需的偏移量更改Kafka committed consumer offset。我试过如何更改topic?的起始偏移量?。但我得到了“节点不存在”错误。请帮帮我。
我的用例是使用kafka消费者api,这样我们就可以从kafka主题中手动读取最后一次成功处理的数据的偏移量,然后手动确认Kafka的成功处理数据。(这是为了减少数据丢失)。然而,在我当前的实现中,程序向前移动并从下一个偏移读取,即使我注释掉了“ack.acknowledge()”。我是新来的Kafka和实现我的消费者下面的方式(我们使用Spring引导) 问题是:即使我注释掉ack.acknow
我已经将enable.auto.commit设置为true,并将auto.commit.interval.ms设置为10,000(即10秒)。现在我的问题是--消费者是每个记录的提交偏移量,还是根据10秒内消耗的记录数提交并提前偏移量?
我有一个用户轮询从订阅的主题。它消耗每条消息并进行一些处理(在几秒内),推送到不同的主题并提交偏移量。 总共有5000条信息, 重新启动前-消耗2900条消息和提交的偏移量 kafka版本(strimzi)>2.0.0 kafka-python==2.0.1