当前位置: 首页 > 知识库问答 >
问题:

Kafka如何保证消费者不会将一条信息读两遍?

谭泳
2023-03-14

Kafka如何保证消费者不会将一条信息读两遍?

或者上述情况是否可能?同一条信息可以被单个消费者或多个消费者阅读两次吗?

共有3个答案

艾安和
2023-03-14

确切地说,Kafka保证:

  1. Kafka为分区中的消息提供顺序保证
  2. 当生成的消息被写入分区的所有同步副本时,它们被视为已提交
  3. 只要至少有一个副本还活着,提交的消息就不会丢失
  4. 消费者只能读取提交的消息

关于消费消息,消费者通过保存内部压缩的Kafka主题中读取的最后一个偏移量来跟踪分区中的进度。

如果启用了enable.auto.commit,kafka消费者可以自动提交偏移量。然而,这将给“最多一次”语义学。因此,通常标志被禁用,开发人员在处理完成后显式提交偏移量。

邓欣德
2023-03-14

如果不使消费过程幂等,这是绝对可能的。

例如您正在实现至少一个传递语义,首先处理消息,然后提交偏移量。由于服务器故障或重新平衡,可能无法提交偏移量。(可能你的消费者在那个时候被撤销了)所以当你投票时,你会收到两条相同的消息。

文心思
2023-03-14

有许多情况下,导致消费者消费重复的消息

  1. 生产者已成功发布消息,但未能确认重试同一消息的原因

为了保证不使用重复的消息,作业的执行和提交偏移量必须是原子的,以保证在消费者端精确地传递一次语义。您可以使用下面的参数来实现一个语义。但是请你明白这是与性能相妥协的。

  1. 在生产者端启用幂等性,这将保证不发布相同的消息两次enable.idempotence=true
  2. 定义交易(isolation.level)read_committedisolation.level=read_committed

在Kafka流中,可以通过将语义设置为一次true,使其成为单元事务来实现上述设置

幂等元

幂等传递使制作者能够在单个制作者的生命周期内将消息准确地写入Kafka到主题的特定分区一次,而不会丢失数据和每个分区的顺序。

交易(isolation.level)

事务使我们能够自动更新多个主题分区中的数据。事务中包含的所有记录都将被成功保存,或者没有记录。它允许您在同一交易中提交您的消费者补偿以及您处理的数据,从而允许端到端的完全一次语义学。

生产者不等待向Kafka写入消息,而生产者使用beginTransaction、commitTransaction和abortTransaction(如果失败),消费者使用隔离。级别为read_committed或read_uncommitted

  • read_committed:消费者将始终只读取已提交的数据。
  • read_uncommitted:按偏移顺序读取所有消息,而无需等待事务提交

请参阅更多详细参考资料

 类似资料:
  • 我们有一个Kafka制作人,偶尔会制作一些信息。 我写了一个消费者来消费这些消息。问题是,只有当两个消息叠加时,它们才会被使用。例如,如果消息是在13:00产生的,消费者不做任何事情。如果另一条消息是在13:01生成的,则消费者会使用这两条消息。在kafkaTool中,在消费者属性中有一个名为LAG的列,当消息未被消费时,该列为1。我缺少的这个东西有什么配置吗? 消费者配置:

  • 我看到一个问题,我的主题中的所有消息都被我的消费者重新阅读。我只有1个消费者,我在开发/测试时打开/关闭它。我注意到,有时在几天没有运行消费者之后,当我再次打开它时,它会突然重新阅读我的所有消息。 客户端 ID 和组 ID 始终保持不变。我显式调用提交同步,因为我的启用.我确实设置了 auto.offset.reset=最早,但据我所知,只有在服务器上删除了偏移量时,才应该启动。我正在使用 IBM

  • 有什么方法可以阻止Kafka的消费者在一段时间内消费信息吗?我希望消费者停止一段时间,然后开始消费最后一条未消费的消息。

  • 我有一个springboot消费者应用程序。当我第一次运行它时,它消耗了来自Kafka主题的信息。但当我再次运行它时,它停止了消耗。在日志中,我看到以下消息。 我知道消费者无法获得偏移量。在这种情况下,消费者将引用自动偏移重置属性。如您所见,我已将其设置为,希望消费者从头开始阅读。但它没有。 应用程序. yml 在我的Java课上 我尝试了一些东西。 我将值设置为。不出所料,它抛出了一个异常,抱怨

  • 我们使用Akka流Kafka来生成和消费消息和Strimzi Kafka集群。以下是相关版本: 重构消息发出后,消费者停止工作。我们在主题中确实有一些信息,但消费者只是在无休止地等待。 以下是日志片段: 还有一些要点: 架构注册表配置正确且良好(否则生产者将无法工作)。 主题(和组协调器)很好,我可以通过这样的普通消费者消费消息: 这就是代码卡住的地方——我使用阻塞调用获取2条消息(甚至无法获取1

  • 我有一个简单的Kafka消费者微服务应用程序,它使用来自某个主题的消息,同一个应用程序运行在两个不同的池中。 所以,当消息由制作人生成,而我的应用程序尝试使用来自主题的消息时,它只被一个池中的一个人使用。 如何停止从消费者Kafka读取并发消息。我想在两个池中使用相同的消息。 这种情况下可能的解决方案是什么