我有一个自定义的Kafka消费者,我用它向REST API发送一些请求。根据API的响应,我要么提交偏移量,要么跳过没有提交的消息。
while (true) {
ConsumerRecords<String, Object> records = consumer.poll(200);
for (ConsumerRecord<String, Object> record : records) {
// Sending a POST request and retrieving the answer
// ...
if (responseCode.startsWith("2")) {
try {
consumer.commitSync();
} catch(CommitFailedException ex) {
ex.printStackTrace();
}
} else {
// Do Nothing
}
}
}
现在,当来自REST API的响应没有以2
开头时,偏移量不会提交,但消息不会被重新使用。如何强制使用者重新使用未提交的偏移信息?
如果计划使用seek(),请确保您的数据是幂等的。由于您是有选择地提交偏移量,所以被遗漏的记录可能会在提交(成功处理)记录之前。如果您执行seek()--它将GroupID的指针移动到未提交的偏移量并开始重播,您也将得到那些成功处理的消息。它也有可能成为一个无限循环。
或者,您可以将不成功记录的元数据保存在内存或db中,并从“poll(retention.ms)”开始重播主题,以便重播所有记录,但添加一个筛选器,只处理那些通过API进行的元数据与您先前保存的数据匹配的记录。每小时或几小时进行一次批处理。
我有一个用户轮询从订阅的主题。它消耗每条消息并进行一些处理(在几秒内),推送到不同的主题并提交偏移量。 总共有5000条信息, 重新启动前-消耗2900条消息和提交的偏移量 kafka版本(strimzi)>2.0.0 kafka-python==2.0.1
我有一个Kafka消费者,我从它消费数据从一个特定的主题,我看到下面的例外。我使用的是Kafka版本。 我添加了这两个额外的消费者属性,但仍然没有帮助: 那个错误意味着什么?我该如何解决它?我需要添加一些其他消费者属性吗?
我有一个ReactorKafka项目,它消耗来自Kafka主题的消息,转换消息,然后写入到另一个主题。 我的理解是,只有在Reactor中成功完成所有顺序步骤后,才会提交偏移量。对吗?我想确保不会处理下一条记录,除非当前记录成功发送到目标Kafka主题。
我有Kafka流应用程序。我的应用程序正在成功处理事件。 如何使用重新处理/跳过事件所需的偏移量更改Kafka committed consumer offset。我试过如何更改topic?的起始偏移量?。但我得到了“节点不存在”错误。请帮帮我。
我对SpringBoot中的Kafka批处理侦听器有问题。 这是@KafkaListener 对于我的问题,这个解决方案不起作用,因为提交批处理。对于我的解决方案,我需要提交单个消息的偏移量。 我尝试使用
null 当侦听器处理记录后返回时提交偏移量。 如果侦听器方法抛出异常,我会认为偏移量不会增加。但是,当我使用下面的code/config/command组合对其进行测试时,情况并非如此。偏移量仍然会得到更新,并且继续处理下一条消息。 我的配置: 验证偏移量的命令: 我使用的是kafka2.12-0.10.2.0和org.springframework.kafka:spring-kafka:1.1