当前位置: 首页 > 知识库问答 >
问题:

Kafka中的提交偏移量

阎祖鹤
2023-03-14

我有一个ReactorKafka项目,它消耗来自Kafka主题的消息,转换消息,然后写入到另一个主题。

public Flux<String> consume(String destTopic) {
        return kafkaConsumerTemplate
                .receiveAutoAck()
                .doOnNext(consumerRecord -> log.info("received key={}, value={} from topic={}, offset={}",
                        consumerRecord.key(),
                        consumerRecord.value(),
                        consumerRecord.topic(),
                        consumerRecord.offset())
                )
                .doOnNext(s-> sendToKafka(s,destTopic))
                .map(ConsumerRecord::value)
                .doOnError(throwable -> log.error("Error while consuming : {}", throwable.getMessage()));
    }

我的理解是,只有在Reactor中成功完成所有顺序步骤后,才会提交偏移量。对吗?我想确保不会处理下一条记录,除非当前记录成功发送到目标Kafka主题。

共有2个答案

滕璞瑜
2023-03-14

如果你想控制提交行为,你需要像这样禁用自动提交:

ReceiverOptions.create()
    .commitInterval(Duration.ZERO)             
    .commitBatchSize(0)

然后,一旦记录被处理,您需要提交:

final ReceiverOptions<String, String> receiverOptions = ReceiverOptions.<String, String>create()
                .commitInterval(Duration.ZERO)
                .commitBatchSize(0)
                .subscription(List.of("mytopic"));
sender.send(KafkaReceiver.create(receiverOptions)
                        .receive()
                        .map(m -> SenderRecord.create(transform(m.key(), m.value()), m.receiverOffset()))) // transform the data
                .doOnNext(m -> m.correlationMetadata().commit().block()); // Synchronous commit after record is successfully delivered
端木涵润
2023-03-14

实现如下:

@Override
public Flux<Flux<ConsumerRecord<K, V>>> receiveAutoAck(Integer prefetch) {
    return withHandler(AckMode.AUTO_ACK, (scheduler, handler) -> handler
        .receive()
        .filter(it -> !it.isEmpty())
        .publishOn(scheduler, preparePublishOnQueueSize(prefetch))
        .map(consumerRecords -> Flux.fromIterable(consumerRecords)
            .doAfterTerminate(() -> {
                for (ConsumerRecord<K, V> r : consumerRecords) {
                    handler.acknowledge(r);
                }
            })));
}

因此,只有当它的Flux被完全处理时(成功或出错),才会对每个消费者记录进行ack'ed。因此,它不是每次记录提交。从技术上讲,它无论如何都不能是每条记录,因为我们只需要提交我们的消费者应用程序失败,我们需要从之前留下的偏移量继续。当前活动的Kafka消费者将光标保留在内存中,并且不关心您是否提交。

如果确实需要“每记录”,请参见ReactiveKafkaConsumerTemplate。receive()及其KafkaReceiver。receive()委托:

/**
 * Starts a Kafka consumer that consumes records from the subscriptions or partition
 * assignments configured for this receiver. Records are consumed from Kafka and delivered
 * on the returned Flux when requests are made on the Flux. The Kafka consumer is closed
 * when the returned Flux terminates.
 * <p>
 * Every record must be acknowledged using {@link ReceiverOffset#acknowledge()} in order
 * to commit the offset corresponding to the record. Acknowledged records are committed
 * based on the configured commit interval and commit batch size in {@link ReceiverOptions}.
 * Records may also be committed manually using {@link ReceiverOffset#commit()}.
 *
 * @return Flux of inbound receiver records that are committed only after acknowledgement
 */
default Flux<ReceiverRecord<K, V>> receive() {
 类似资料:
  • 我目前正在从具有特定偏移量的主题中获取消息。我正在使用寻求()来实现它。但是当我将enable.auto.commit设置为true或使用手动同步(委托同步()/委托同步())时,Seek()不起作用,因为它没有轮询来自特定偏移量的消息,而是从最后提交的偏移量中选择。 因此,在使用Seek()时,是否必须将偏移量存储在外部DB中,而不提交给Kafka?Seek和Commit不能并行工作吗? 客户端

  • 我正在设计一个ApacheStorm拓扑(使用streamparse),它由一个喷口(ApacheKafka喷口)和一个具有并行性的螺栓构建 螺栓分批读取信息。如果批量成功完成,我手动提交apache kafka偏移。 当mysql上的螺栓插入失败时,我不会在Kafka中提交偏移量,但是一些消息已经在喷口发送到螺栓的消息队列中。 应该删除队列中已经存在的消息,因为我无法在不丢失先前失败消息的情况下

  • 我有一个Kafka消费者,我从它消费数据从一个特定的主题,我看到下面的例外。我使用的是Kafka版本。 我添加了这两个额外的消费者属性,但仍然没有帮助: 那个错误意味着什么?我该如何解决它?我需要添加一些其他消费者属性吗?

  • Kafka 服务器和客户端 JAR 移至最新库:0.10.0.1 我的消费者和生产者代码使用如上所述的最新kafka jars,但仍然使用旧的消费者API(0 . 8 . 2)。 我在调用commit offset时在消费者端遇到问题。 kafka服务器端配置: 以下 Kafka 消费者的配置: 要创建消费者,我使用以下api: 和提交调用 在从 Kafka 读取消息时,我们使用以下方法来处理超时

  • 我对SpringBoot中的Kafka批处理侦听器有问题。 这是@KafkaListener 对于我的问题,这个解决方案不起作用,因为提交批处理。对于我的解决方案,我需要提交单个消息的偏移量。 我尝试使用

  • null 当侦听器处理记录后返回时提交偏移量。 如果侦听器方法抛出异常,我会认为偏移量不会增加。但是,当我使用下面的code/config/command组合对其进行测试时,情况并非如此。偏移量仍然会得到更新,并且继续处理下一条消息。 我的配置: 验证偏移量的命令: 我使用的是kafka2.12-0.10.2.0和org.springframework.kafka:spring-kafka:1.1