当前位置: 首页 > 面试题库 >

避免在Apache Kafka Consumer中重复消息的有效策略

滑文昌
2023-03-14
问题内容

我已经学习apache
kafka一个月了。但是,我现在陷入了困境。我的用例是,我有两个或多个在不同计算机上运行的使用者进程。我进行了一些测试,在其中我在kafka服务器中发布了10,000条消息。然后,在处理这些消息时,我杀死了一个使用者进程并重新启动了它。消费者正在将处理后的消息写入文件中。因此,使用结束后,文件显示了超过1万条消息。因此,某些消息是重复的。

在使用者过程中,我已禁用自动提交。消费者手动批量提交偏移量。因此,例如,如果将100条消息写入文件,使用者将提交偏移量。当单个使用者进程正在运行并且崩溃并恢复时,可以通过这种方式避免重复。但是,当一个以上的使用者运行时,其中一个崩溃并恢复时,它将重复的消息写入文件。

有没有有效的策略来避免这些重复的消息?


问题答案:

最简洁的答案是不。

您要查找的是一次精确的处理。尽管它似乎经常可行,但永远不要依赖它,因为总会有一些警告。

即使为了防止重复,您也需要使用简单的使用者。对于每个使用者,此方法的工作方式是:从某个分区使用一条消息时,将使用的消息的分区和偏移量写入磁盘。当使用者在故障后重新启动时,请从磁盘读取每个分区的上一个消耗的偏移量。

但是,即使采用这种模式,使用者也无法保证在失败后不会重新处理消息。如果使用者使用一条消息然后在将偏移量刷新到磁盘之前失败,该怎么办?如果在处理消息之前先写磁盘,如果在实际处理消息之前先写偏移量然后失败,该怎么办?即使您在每条消息之后将偏移量提交给ZooKeeper,也将存在相同的问题。

但是,在某些情况下,更精确的一次处理是可以实现的,但仅适用于某些用例。这仅要求将偏移量存储在与单元应用程序输出相同的位置。例如,如果编写一个对消息进行计数的使用者,则通过将最后计数的偏移量与每个计数一起存储,可以保证该偏移量与使用者的状态同时存储。当然,为了保证处理一次,这将要求您只消耗一条消息并为每条消息更新一次状态,这对于大多数Kafka消费者应用程序来说是完全不切实际的。从本质上来说,Kafka出于性能原因而批量使用消息。

通常,如果仅将其设计为幂等的,则您的时间将花费更多,并且应用程序将更加可靠。



 类似资料:
  • 问题内容: 有没有一种方法可以抑制ActiveMQ服务器上定义的队列上的重复消息? 我尝试手动定义JMSMessageID((message.setJMSMessageID(“ uniqueid”)),但是服务器忽略此修改并使用内置的JMSMessageID传递消息。 根据规范,我没有找到有关如何删除邮件重复数据的参考。 在HornetQ中,要解决此问题,我们需要在消息定义中声明HQ特定的属性or

  • 我正在使用Spring Boot中的。Java 8 我的主要目的是,消费者不应重复使用信息。 1)调用表获取100行并将其发送到kafka 2) 假设我处理了70行(我得到了成功确认),然后Kafka宕机了(Kafka在RETRY机制计时内无法恢复) 因此,当我重新启动Spring启动应用程序时,我如何确保不再发送这70条消息。 一种选择是我可以在数据库表消息 中使用标志。 还有其他有效的方法吗?

  • 本文向大家介绍避免在MongoDB中重复输入?,包括了避免在MongoDB中重复输入?的使用技巧和注意事项,需要的朋友参考一下 为了避免在MongoDB中重复输入,可以使用。语法如下- 让我们实现以上语法。避免在MongoDB中重复条目的查询如下- 现在在上面的集合中插入一些记录。插入记录的查询如下- 每当您尝试再次插入相同记录时,都会出现此错误- 让我们插入另一条记录。查询如下- 在method

  • 你好,我有一个简单的mysql查询,我需要显示唯一的文件名,现在查询显示重复的文件名,我想避免这种情况

  • 钱箱类: 商户类: 输入数据: 我的任务 计算每个商家的总金额并返回商家列表 我正在尝试使用Stream API解决这个任务。并编写了以下代码: 结果 但显然,流返回四个对象,而不是所需的两个对象。我意识到,地图(第二行)为每个cashBoxId创建了四个对象。而且我不知道如何通过进行过滤,也不知道如何获得没有重复的结果。

  • 我使用for循环将学生详细信息添加到ArrayList。当我给第二个学生提供详细信息时,它会覆盖第一个数据。螺柱类 将数据添加到ArrayList的主类。 输出:第二个数据[2,2]后的实际输出辊数[1]。预期输出应为卷号: A1姓名: F1 L1性别:男性年龄: 11体育赛事:标枪 报名号:A2姓名:F2 L2性别:女年龄:14体育项目:100米跑 报名号:A3姓名:F3 L3性别:男性年龄:1