我对SpringBoot中的Kafka批处理侦听器有问题。
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.bootstrapServers);
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, this.maxPollRecords);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, this.autoOffsetReset);
props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, this.maxFetchBytesMaxPartition);
props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, this.maxFetchBytesMax);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer2.class);
props.put(ErrorHandlingDeserializer2.VALUE_DESERIALIZER_CLASS, ByteArrayDeserializer.class);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.RECEIVE_BUFFER_CONFIG, receiveBuffer);
props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, heartbeatInterval);
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, pollInterval);
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout);
props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, minFetch);
props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, maxWaitFetch);
return props;
}
@Bean
public DefaultKafkaConsumerFactory<String, byte[]> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
@Bean
public KafkaListenerContainerFactory<?> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, byte[]> factory = new ConcurrentKafkaListenerContainerFactory<>();
try {
factory.setConsumerFactory(consumerFactory());
factory.getContainerProperties().setAckOnError(false);
factory.setBatchListener(true);
factory.getContainerProperties().setSyncCommits(false);
factory.setBatchErrorHandler(new SeekToCurrentBatchErrorHandler());
factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);
} catch(Exception e) {
logger.error("Error KafkaListenerContainerFactory: {}", e.getMessage());
}
return factory;
}
这是@KafkaListener
@KafkaListener(autoStartup = "${kafka-startup}", groupId = "${kafka-group}", topics = "${queue}",
containerFactory = "kafkaListenerContainerFactory", concurrency = "${concurrency}")
public void listen(@Payload List<byte[]> messages,
@Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) List<String> keys,
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) List<Integer> partitions,
@Header(KafkaHeaders.RECEIVED_TIMESTAMP) List<Long> timestamps,
@Header(KafkaHeaders.OFFSET) List<Long> offsets,
Acknowledgment ack) throws Exception {
int indexQueue = new Random().nextInt(queues.size());
for (int i = 0; i < messages.size(); i++) {
//Do somethings
ack.acknowledge();
}
}
对于我的问题,这个解决方案不起作用,因为ack。确认()
提交批处理。对于我的解决方案,我需要提交单个消息的偏移量。
我尝试使用Kafka消费者
例如:信息1-
脚本正在阅读:
信息1(偏移量10)-
在下一个循环中,脚本重新读取偏移量为10的消息1,但我希望看到偏移量为12的消息3。
你有什么想法吗?你能帮我吗?
非常感谢。
批处理侦听器的确认
只应调用一次。
现在(从2.3开始)可以调用确认。nack(Thisone失败,睡眠)
看见https://docs.spring.io/spring-kafka/docs/current/reference/html/#committing-抵消
从2.3版开始,
确认
界面有两个附加方法nack(长睡眠)
和nack(int-index,long-sleep)
。第一个用于记录侦听器,第二个用于批处理侦听器。为侦听器类型调用错误的方法将引发IllegalStateException。
我已经将enable.auto.commit设置为true,并将auto.commit.interval.ms设置为10,000(即10秒)。现在我的问题是--消费者是每个记录的提交偏移量,还是根据10秒内消耗的记录数提交并提前偏移量?
null 当侦听器处理记录后返回时提交偏移量。 如果侦听器方法抛出异常,我会认为偏移量不会增加。但是,当我使用下面的code/config/command组合对其进行测试时,情况并非如此。偏移量仍然会得到更新,并且继续处理下一条消息。 我的配置: 验证偏移量的命令: 我使用的是kafka2.12-0.10.2.0和org.springframework.kafka:spring-kafka:1.1
我有Kafka流应用程序。我的应用程序正在成功处理事件。 如何使用重新处理/跳过事件所需的偏移量更改Kafka committed consumer offset。我试过如何更改topic?的起始偏移量?。但我得到了“节点不存在”错误。请帮帮我。
试图理解消费者补偿和消费者群体补偿之间的关系。 下面的堆栈溢出链接提供了对消费群体补偿管理的极好理解<什么决定Kafka消费补偿?现在问题来了, 情节: 我们在一个消费者组组1中有消费者(c1)。 偏移值是否将存储在消费者(c1)和组(group1)两个级别?或者如果消费者属于任何消费者组,偏移量将存储在仅消费者组级别? 如果偏移值将存储在两个级别中,它是否是消费者级别偏移值将覆盖消费者组级别偏移
我正在使用事务性KafkaProducer向主题发送消息。这个很管用。我使用的是具有read_committed隔离级别的KafkaConsumer,而我的seek和seekToEnd方法存在问题。根据文档,seek和seekToEnd方法给出了LSO(上次稳定偏移量)。但这有点让人摸不着头脑。因为它给我的价值总是一样的,主题结束了。无论最后一个条目是(由生产者提交的)还是中止的事务的一部分。例如
我正在使用Kafka2.0版和java消费者API来消费来自一个主题的消息。我们使用的是一个单节点Kafka服务器,每个分区有一个使用者。我注意到消费者正在丢失一些消息。场景是:消费者投票主题。我为每个线程创建了一个消费者。获取消息并将其交给处理程序来处理消息。然后使用“至少一次”的Kafka消费者语义来提交Kafka偏移量来提交偏移量。同时,我有另一个消费者使用不同的group-id运行。在这个