我有一个ReactorKafka项目,它消耗来自Kafka主题的消息,转换消息,然后写入到另一个主题。
public Flux<String> consume(String destTopic) {
return kafkaConsumerTemplate
.receiveAutoAck()
.doOnNext(consumerRecord -> log.info("received key={}, value={} from topic={}, offset={}",
consumerRecord.key(),
consumerRecord.value(),
consumerRecord.topic(),
consumerRecord.offset())
)
.doOnNext(s-> sendToKafka(s,destTopic))
.map(ConsumerRecord::value)
.doOnError(throwable -> log.error("Error while consuming : {}", throwable.getMessage()));
}
我的理解是,只有在Reactor中成功完成所有顺序步骤后,才会提交偏移量。对吗?我想确保不会处理下一条记录,除非当前记录成功发送到目标Kafka主题。
如果你想控制提交行为,你需要像这样禁用自动提交:
ReceiverOptions.create()
.commitInterval(Duration.ZERO)
.commitBatchSize(0)
然后,一旦记录被处理,您需要提交:
final ReceiverOptions<String, String> receiverOptions = ReceiverOptions.<String, String>create()
.commitInterval(Duration.ZERO)
.commitBatchSize(0)
.subscription(List.of("mytopic"));
sender.send(KafkaReceiver.create(receiverOptions)
.receive()
.map(m -> SenderRecord.create(transform(m.key(), m.value()), m.receiverOffset()))) // transform the data
.doOnNext(m -> m.correlationMetadata().commit().block()); // Synchronous commit after record is successfully delivered
实现如下:
@Override
public Flux<Flux<ConsumerRecord<K, V>>> receiveAutoAck(Integer prefetch) {
return withHandler(AckMode.AUTO_ACK, (scheduler, handler) -> handler
.receive()
.filter(it -> !it.isEmpty())
.publishOn(scheduler, preparePublishOnQueueSize(prefetch))
.map(consumerRecords -> Flux.fromIterable(consumerRecords)
.doAfterTerminate(() -> {
for (ConsumerRecord<K, V> r : consumerRecords) {
handler.acknowledge(r);
}
})));
}
因此,只有当它的Flux
被完全处理时(成功或出错),才会对每个消费者记录
进行ack'ed。因此,它不是每次记录提交。从技术上讲,它无论如何都不能是每条记录,因为我们只需要提交我们的消费者应用程序失败,我们需要从之前留下的偏移量继续。当前活动的Kafka消费者
将光标保留在内存中,并且不关心您是否提交。
如果确实需要“每记录”,请参见ReactiveKafkaConsumerTemplate。receive()
及其KafkaReceiver。receive()
委托:
/**
* Starts a Kafka consumer that consumes records from the subscriptions or partition
* assignments configured for this receiver. Records are consumed from Kafka and delivered
* on the returned Flux when requests are made on the Flux. The Kafka consumer is closed
* when the returned Flux terminates.
* <p>
* Every record must be acknowledged using {@link ReceiverOffset#acknowledge()} in order
* to commit the offset corresponding to the record. Acknowledged records are committed
* based on the configured commit interval and commit batch size in {@link ReceiverOptions}.
* Records may also be committed manually using {@link ReceiverOffset#commit()}.
*
* @return Flux of inbound receiver records that are committed only after acknowledgement
*/
default Flux<ReceiverRecord<K, V>> receive() {
我目前正在从具有特定偏移量的主题中获取消息。我正在使用寻求()来实现它。但是当我将enable.auto.commit设置为true或使用手动同步(委托同步()/委托同步())时,Seek()不起作用,因为它没有轮询来自特定偏移量的消息,而是从最后提交的偏移量中选择。 因此,在使用Seek()时,是否必须将偏移量存储在外部DB中,而不提交给Kafka?Seek和Commit不能并行工作吗? 客户端
我正在设计一个ApacheStorm拓扑(使用streamparse),它由一个喷口(ApacheKafka喷口)和一个具有并行性的螺栓构建 螺栓分批读取信息。如果批量成功完成,我手动提交apache kafka偏移。 当mysql上的螺栓插入失败时,我不会在Kafka中提交偏移量,但是一些消息已经在喷口发送到螺栓的消息队列中。 应该删除队列中已经存在的消息,因为我无法在不丢失先前失败消息的情况下
我有一个Kafka消费者,我从它消费数据从一个特定的主题,我看到下面的例外。我使用的是Kafka版本。 我添加了这两个额外的消费者属性,但仍然没有帮助: 那个错误意味着什么?我该如何解决它?我需要添加一些其他消费者属性吗?
Kafka 服务器和客户端 JAR 移至最新库:0.10.0.1 我的消费者和生产者代码使用如上所述的最新kafka jars,但仍然使用旧的消费者API(0 . 8 . 2)。 我在调用commit offset时在消费者端遇到问题。 kafka服务器端配置: 以下 Kafka 消费者的配置: 要创建消费者,我使用以下api: 和提交调用 在从 Kafka 读取消息时,我们使用以下方法来处理超时
我对SpringBoot中的Kafka批处理侦听器有问题。 这是@KafkaListener 对于我的问题,这个解决方案不起作用,因为提交批处理。对于我的解决方案,我需要提交单个消息的偏移量。 我尝试使用
null 当侦听器处理记录后返回时提交偏移量。 如果侦听器方法抛出异常,我会认为偏移量不会增加。但是,当我使用下面的code/config/command组合对其进行测试时,情况并非如此。偏移量仍然会得到更新,并且继续处理下一条消息。 我的配置: 验证偏移量的命令: 我使用的是kafka2.12-0.10.2.0和org.springframework.kafka:spring-kafka:1.1