当前位置: 首页 > 知识库问答 >
问题:

ApacheKafka:0.10版中只有一次

丌官积厚
2023-03-14

为了实现Kafka消费者对消息的一次处理,我一次提交一条消息,如下所示

public void commitOneRecordConsumer(long seconds) {
        KafkaConsumer<String, String> consumer = consumerConfigFactory.getConsumerConfig();

        try {

            while (running) {
                ConsumerRecords<String, String> records = consumer.poll(1000);
                try {
                    for (ConsumerRecord<String, String> record : records) {

                        processingService.process(record);

                        consumer.commitSync(Collections.singletonMap(new TopicPartition(record.topic(),record.partition()), new OffsetAndMetadata(record.offset() + 1)));

                        System.out.println("Committed Offset" + ": " + record.offset());

                    }
                } catch (CommitFailedException e) {
                    // application specific failure handling
                }
            }
        } finally {
            consumer.close();
        }
    }

上面的代码将消息的处理异步委托给下面的另一个类。

@Service
public class ProcessingService {

    @Async
    public void process(ConsumerRecord<String, String> record) throws InterruptedException {
        Thread.sleep(5000L);
        Map<String, Object> map = new HashMap<>();
        map.put("partition", record.partition());
        map.put("offset", record.offset());
        map.put("value", record.value());
        System.out.println("Processed" + ": " + map);
    }

}

但是,这仍然不能保证只发送一次,因为如果处理失败,它可能仍会提交其他消息,并且以前的消息将永远不会被处理和提交,我在这里的选项是什么?

共有3个答案

齐昆
2023-03-14

我认为使用kafka 0.10. x本身可以实现精确的一次处理。但是有一些问题。我正在分享这本书的高级思想。相关内容可以在第4章中的“寻找和精确的一次处理”部分找到:Kafka消费者-从Kafka读取数据。您可以使用(免费)safaribooksonline帐户查看那本书的内容,或者在它出来后购买,或者可能从其他来源获得它,我们不会谈论它。

想法:

想想这个常见的场景:您的应用程序从Kafka中读取事件,处理数据,然后将结果存储在数据库中。假设我们真的不想丢失任何数据,也不想将相同的结果存储在数据库中两次。

如果有办法在一个原子动作中同时存储记录和偏移量,这是可行的。要么记录和偏移都已提交,要么两者都未提交。为了实现这一点,我们需要在一个事务中将记录和偏移量写入数据库。然后我们就知道要么我们处理完了记录,偏移量被提交了,要么我们没有,记录将被重新处理。

现在唯一的问题是:如果记录存储在数据库中而不是Kafka中,那么当分配给它一个分区时,我们的消费者如何知道从哪里开始读取?这正是seek()。当使用者启动或分配新分区时,它可以在数据库中查找偏移量并seek()到该位置。

书中的示例代码:

public class SaveOffsetsOnRebalance implements ConsumerRebalanceListener {
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        commitDBTransaction(); 
    }

    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        for(TopicPartition partition: partitions)
        consumer.seek(partition, getOffsetFromDB(partition)); 
    }
}

consumer.subscribe(topics, new SaveOffsetOnRebalance(consumer));
consumer.poll(0);

for (TopicPartition partition: consumer.assignment())
    consumer.seek(partition, getOffsetFromDB(partition));   

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records)
    {
        processRecord(record);
        storeRecordInDB(record);
        storeOffsetInDB(record.topic(), record.partition(), record.offset()); 
    }
    commitDBTransaction();
}

燕宏胜
2023-03-14

ApacheKafka 0.11.0.0刚刚发布,现在它只支持一次交付。

http://kafka.apache.org/documentation/#upgrade_11_exactly_once_semantics

https://cwiki.apache.org/confluence/display/KAFKA/KIP-98-精确一次交付和事务消息传递

东郭承业
2023-03-14

0.10.2及更高版本的原始答案(0.11及更高版本请参阅答案)

目前,Kafka无法提供开箱即用的精确一次处理。如果您在成功处理消息后提交消息,您可以进行至少一次处理,或者如果您在开始处理之前直接在投票()之后提交消息,您可以进行最多一次处理。

(另请参见本节“交货保证”一段。)http://docs.confluent.io/3.0.0/clients/consumer.html#synchronous-(提交)

但是,如果您的处理是幂等的,则至少一次保证“足够好”,即即使您处理两次记录,最终结果也将是相同的。幂等处理的示例是将消息添加到键值存储中。即使您两次添加相同的记录,第二次插入也只会替换第一个当前键值对,KV存储中仍然会有正确的数据

在上面的示例代码中,您更新了一个HashMap,这将是一个幂等运算。即使在发生故障的情况下,如果在崩溃之前只执行了两个put调用,也可能会出现不一致的状态。然而,这种不一致的状态将被修复为再次重新处理同一记录。

println()的调用不是幂等的,因为这是一个具有“副作用”的操作。但我猜打印仅用于调试目的。

作为替代方案,您需要在用户代码中实现事务语义,在出现故障时需要“撤消”(部分执行)操作。总的来说,这是一个难题。

Apache Kafka 0.11的更新(关于0.11之前的版本,请参见上面的答案)

从0.11开始,Apache Kafka支持幂等生产者、事务性生产者和使用Kafka Streams的精确一次处理。它还向消费者添加了"read_committed"模式,以仅读取已提交的消息(并删除/过滤中止的消息)。

  • https://kafka.apache.org/documentation/#semantics
  • https://www.confluent.io/blog/exactly-once-semantics-are-possible-heres-how-apache-kafka-does-it/
  • https://www.confluent.io/blog/transactions-apache-kafka/
  • https://www.confluent.io/blog/enabling-exactly-kafka-streams/
 类似资料:
  • 我已经在kafka上工作了相当长的六个月,我对用户延迟和存储到主题分区中的数据有一些疑问。 问题1:最初,当我开始阅读Kafka并了解如何使用Kafka的功能时,我被教导说,一个只有一部分和一个复制因子的主题会创造奇迹。经过相当长的六个月的工作,将我的项目迁移到live之后,使用我的主题消息的消费者开始给我一个延迟。我阅读了许多关于消费者延迟的堆栈溢出答案,得出结论,如果我增加某个主题的分区和复制

  • 项目中总是有很多图标,一般情况下都是切成很小的图片,但是有多个颜色就不好处理了,这时候字体图标就很有优势了,能够像修改文字那样修改图标的大小和颜色,而且还不会失真变模糊,简直是简化前端开发的一大工具。 如果想要直接使用它们自带的官方图标,选择我们想要的图标之后,点击右下角的 Generate Font F 这时会看到我们已经选择的图标,在这里可以修改名称等,然后点击右下角的下载,解压文件,可以查看

  • 我正在制作一个小的浏览器游戏,我有一个数据库,里面存储了用户的高分。这里是数据库的映像(name是用户名,M1_CPM是分数) 使用下面的代码,我试图获得前10个值,以便以后在排行榜上显示它们: 问题是,总是只呼应最高分,不呼应前十名。为什么?谢谢^^

  • 我需要在JavaFX中执行如下操作: 你能告诉我如何在GridPane布局中只改变一列(中间的一列)的大小吗?这些黑色边框的矩形是VBoxs。基本上,VBoxs的宽度必须保持不变,这个红色标记区域的宽度应该调整。有什么想法或建议吗?

  • 我们有一个应用程序,它使用来自Kafka主题(3个分区)的消息,丰富数据,并将记录保存在DB(Spring JPA)中,然后将消息发布到另一个Kafka主题(在同一个代理上),所有这些都通过使用Camel 2.4.1和Spring Boot 2.1.7进行编排。释放 我们想为 kafka 消费者-生产者组合实现“exactly-once”语义。 消费者设置: 生产者设置: 豆接线: 骆驼路线: 但

  • 我想使用Avro来序列化我的Kafka消息的数据,并想将其与Avro模式存储库一起使用,这样我就不必将模式包含在每条消息中。 将Avro与Kafka结合使用似乎是一件很流行的事情,许多博客/堆栈溢出问题/用户组等都提到了将模式Id与消息一起发送,但我找不到一个实际的示例来说明它应该去哪里。 我想它应该放在Kafka消息头的某个地方,但我找不到一个明显的地方。如果它在Avro消息中,则必须根据模式对