当侦听器处理记录后返回时提交偏移量。
如果侦听器方法抛出异常,我会认为偏移量不会增加。但是,当我使用下面的code/config/command组合对其进行测试时,情况并非如此。偏移量仍然会得到更新,并且继续处理下一条消息。
我的配置:
private Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.0.1:9092");
props.put(ProducerConfig.RETRIES_CONFIG, 0);
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return props;
}
@Bean
ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerConfigs()));
factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.RECORD);
return factory;
}
@Component
public class KafkaMessageListener{
@KafkaListener(topicPartitions = {@TopicPartition( topic = "my-replicated-topic", partitionOffsets = @PartitionOffset(partition = "0", initialOffset = "0", relativeToCurrent = "true"))})
public void onReplicatedTopicMessage(ConsumerRecord<Integer, String> data) throws InterruptedException {
throw new RuntimeException("Oops!");
}
验证偏移量的命令:
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group test-group
我使用的是kafka2.12-0.10.2.0和org.springframework.kafka:spring-kafka:1.1.3.发行版
容器(通过ContainerProperties
)有一个属性AckonError
,默认情况下该属性为true...
/**
* Set whether or not the container should commit offsets (ack messages) where the
* listener throws exceptions. This works in conjunction with {@link #ackMode} and is
* effective only when the kafka property {@code enable.auto.commit} is {@code false};
* it is not applicable to manual ack modes. When this property is set to {@code true}
* (the default), all messages handled will have their offset committed. When set to
* {@code false}, offsets will be committed only for successfully handled messages.
* Manual acks will be always be applied. Bear in mind that, if the next message is
* successfully handled, its offset will be committed, effectively committing the
* offset of the failed message anyway, so this option has limited applicability.
* Perhaps useful for a component that starts throwing exceptions consistently;
* allowing it to resume when restarted from the last successfully processed message.
* @param ackOnError whether the container should acknowledge messages that throw
* exceptions.
*/
public void setAckOnError(boolean ackOnError) {
this.ackOnError = ackOnError;
}
但是请记住,如果下一个消息成功,它的偏移量无论如何都将被提交,这实际上也会提交失败的偏移量。
编辑
我对SpringBoot中的Kafka批处理侦听器有问题。 这是@KafkaListener 对于我的问题,这个解决方案不起作用,因为提交批处理。对于我的解决方案,我需要提交单个消息的偏移量。 我尝试使用
试图理解消费者补偿和消费者群体补偿之间的关系。 下面的堆栈溢出链接提供了对消费群体补偿管理的极好理解<什么决定Kafka消费补偿?现在问题来了, 情节: 我们在一个消费者组组1中有消费者(c1)。 偏移值是否将存储在消费者(c1)和组(group1)两个级别?或者如果消费者属于任何消费者组,偏移量将存储在仅消费者组级别? 如果偏移值将存储在两个级别中,它是否是消费者级别偏移值将覆盖消费者组级别偏移
我正在使用Kafka2.0版和java消费者API来消费来自一个主题的消息。我们使用的是一个单节点Kafka服务器,每个分区有一个使用者。我注意到消费者正在丢失一些消息。场景是:消费者投票主题。我为每个线程创建了一个消费者。获取消息并将其交给处理程序来处理消息。然后使用“至少一次”的Kafka消费者语义来提交Kafka偏移量来提交偏移量。同时,我有另一个消费者使用不同的group-id运行。在这个
我有Kafka流应用程序。我的应用程序正在成功处理事件。 如何使用重新处理/跳过事件所需的偏移量更改Kafka committed consumer offset。我试过如何更改topic?的起始偏移量?。但我得到了“节点不存在”错误。请帮帮我。
为什么实际主题中的偏移值与同一主题中的偏移值不同?PFB偏移位置以及使用的命令。 我错过了什么?
我已经将enable.auto.commit设置为true,并将auto.commit.interval.ms设置为10,000(即10秒)。现在我的问题是--消费者是每个记录的提交偏移量,还是根据10秒内消耗的记录数提交并提前偏移量?