为了实现Kafka消费者对消息的一次处理,我一次提交一条消息,如下所示
public void commitOneRecordConsumer(long seconds) {
KafkaConsumer<String, String> consumer = consumerConfigFactory.getConsumerConfig();
try {
while (running) {
ConsumerRecords<String, String> records = consumer.poll(1000);
try {
for (ConsumerRecord<String, String> record : records) {
processingService.process(record);
consumer.commitSync(Collections.singletonMap(new TopicPartition(record.topic(),record.partition()), new OffsetAndMetadata(record.offset() + 1)));
System.out.println("Committed Offset" + ": " + record.offset());
}
} catch (CommitFailedException e) {
// application specific failure handling
}
}
} finally {
consumer.close();
}
}
上面的代码将消息的处理异步委托给下面的另一个类。
@Service
public class ProcessingService {
@Async
public void process(ConsumerRecord<String, String> record) throws InterruptedException {
Thread.sleep(5000L);
Map<String, Object> map = new HashMap<>();
map.put("partition", record.partition());
map.put("offset", record.offset());
map.put("value", record.value());
System.out.println("Processed" + ": " + map);
}
}
但是,这仍然不能保证只发送一次,因为如果处理失败,它可能仍会提交其他消息,并且以前的消息将永远不会被处理和提交,我在这里的选项是什么?
我认为使用kafka 0.10. x本身可以实现精确的一次处理。但是有一些问题。我正在分享这本书的高级思想。相关内容可以在第4章中的“寻找和精确的一次处理”部分找到:Kafka消费者-从Kafka读取数据
。您可以使用(免费)safaribooksonline帐户查看那本书的内容,或者在它出来后购买,或者可能从其他来源获得它,我们不会谈论它。
想法:
想想这个常见的场景:您的应用程序从Kafka中读取事件,处理数据,然后将结果存储在数据库中。假设我们真的不想丢失任何数据,也不想将相同的结果存储在数据库中两次。
如果有办法在一个原子动作中同时存储记录和偏移量,这是可行的。要么记录和偏移都已提交,要么两者都未提交。为了实现这一点,我们需要在一个事务中将记录和偏移量写入数据库。然后我们就知道要么我们处理完了记录,偏移量被提交了,要么我们没有,记录将被重新处理。
现在唯一的问题是:如果记录存储在数据库中而不是Kafka中,那么当分配给它一个分区时,我们的消费者如何知道从哪里开始读取?这正是seek()。当使用者启动或分配新分区时,它可以在数据库中查找偏移量并
seek()
到该位置。
书中的示例代码:
public class SaveOffsetsOnRebalance implements ConsumerRebalanceListener {
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
commitDBTransaction();
}
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
for(TopicPartition partition: partitions)
consumer.seek(partition, getOffsetFromDB(partition));
}
}
consumer.subscribe(topics, new SaveOffsetOnRebalance(consumer));
consumer.poll(0);
for (TopicPartition partition: consumer.assignment())
consumer.seek(partition, getOffsetFromDB(partition));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records)
{
processRecord(record);
storeRecordInDB(record);
storeOffsetInDB(record.topic(), record.partition(), record.offset());
}
commitDBTransaction();
}
ApacheKafka 0.11.0.0刚刚发布,现在它只支持一次交付。
http://kafka.apache.org/documentation/#upgrade_11_exactly_once_semantics
https://cwiki.apache.org/confluence/display/KAFKA/KIP-98-精确一次交付和事务消息传递
0.10.2及更高版本的原始答案(0.11及更高版本请参阅答案)
目前,Kafka无法提供开箱即用的精确一次处理。如果您在成功处理消息后提交消息,您可以进行至少一次处理,或者如果您在开始处理之前直接在投票()
之后提交消息,您可以进行最多一次处理。
(另请参见本节“交货保证”一段。)http://docs.confluent.io/3.0.0/clients/consumer.html#synchronous-(提交)
但是,如果您的处理是幂等的,则至少一次保证“足够好”,即即使您处理两次记录,最终结果也将是相同的。幂等处理的示例是将消息添加到键值存储中。即使您两次添加相同的记录,第二次插入也只会替换第一个当前键值对,KV存储中仍然会有正确的数据。
在上面的示例代码中,您更新了一个HashMap
,这将是一个幂等运算。即使在发生故障的情况下,如果在崩溃之前只执行了两个put
调用,也可能会出现不一致的状态。然而,这种不一致的状态将被修复为再次重新处理同一记录。
对println()
的调用不是幂等的,因为这是一个具有“副作用”的操作。但我猜打印仅用于调试目的。
作为替代方案,您需要在用户代码中实现事务语义,在出现故障时需要“撤消”(部分执行)操作。总的来说,这是一个难题。
Apache Kafka 0.11的更新(关于0.11之前的版本,请参见上面的答案)
从0.11开始,Apache Kafka支持幂等生产者、事务性生产者和使用Kafka Streams的精确一次处理。它还向消费者添加了"read_committed"
模式,以仅读取已提交的消息(并删除/过滤中止的消息)。
我已经在kafka上工作了相当长的六个月,我对用户延迟和存储到主题分区中的数据有一些疑问。 问题1:最初,当我开始阅读Kafka并了解如何使用Kafka的功能时,我被教导说,一个只有一部分和一个复制因子的主题会创造奇迹。经过相当长的六个月的工作,将我的项目迁移到live之后,使用我的主题消息的消费者开始给我一个延迟。我阅读了许多关于消费者延迟的堆栈溢出答案,得出结论,如果我增加某个主题的分区和复制
项目中总是有很多图标,一般情况下都是切成很小的图片,但是有多个颜色就不好处理了,这时候字体图标就很有优势了,能够像修改文字那样修改图标的大小和颜色,而且还不会失真变模糊,简直是简化前端开发的一大工具。 如果想要直接使用它们自带的官方图标,选择我们想要的图标之后,点击右下角的 Generate Font F 这时会看到我们已经选择的图标,在这里可以修改名称等,然后点击右下角的下载,解压文件,可以查看
我正在制作一个小的浏览器游戏,我有一个数据库,里面存储了用户的高分。这里是数据库的映像(name是用户名,M1_CPM是分数) 使用下面的代码,我试图获得前10个值,以便以后在排行榜上显示它们: 问题是,总是只呼应最高分,不呼应前十名。为什么?谢谢^^
我需要在JavaFX中执行如下操作: 你能告诉我如何在GridPane布局中只改变一列(中间的一列)的大小吗?这些黑色边框的矩形是VBoxs。基本上,VBoxs的宽度必须保持不变,这个红色标记区域的宽度应该调整。有什么想法或建议吗?
我们有一个应用程序,它使用来自Kafka主题(3个分区)的消息,丰富数据,并将记录保存在DB(Spring JPA)中,然后将消息发布到另一个Kafka主题(在同一个代理上),所有这些都通过使用Camel 2.4.1和Spring Boot 2.1.7进行编排。释放 我们想为 kafka 消费者-生产者组合实现“exactly-once”语义。 消费者设置: 生产者设置: 豆接线: 骆驼路线: 但
我想使用Avro来序列化我的Kafka消息的数据,并想将其与Avro模式存储库一起使用,这样我就不必将模式包含在每条消息中。 将Avro与Kafka结合使用似乎是一件很流行的事情,许多博客/堆栈溢出问题/用户组等都提到了将模式Id与消息一起发送,但我找不到一个实际的示例来说明它应该去哪里。 我想它应该放在Kafka消息头的某个地方,但我找不到一个明显的地方。如果它在Avro消息中,则必须根据模式对