当前位置: 首页 > 知识库问答 >
问题:

避免重复Kafka生产者消息

柯耀
2023-03-14

我正在使用Spring Boot中的Kafka模板。Java 8

我的主要目的是,消费者不应重复使用信息。

1)调用表获取100行并将其发送到kafka

2) 假设我处理了70行(我得到了成功确认),然后Kafka宕机了(Kafka在RETRY机制计时内无法恢复)

因此,当我重新启动Spring启动应用程序时,我如何确保不再发送这70条消息。

一种选择是我可以在数据库表消息 is_sent = Y 或 N 中使用标志。

还有其他有效的方法吗?

共有2个答案

孙明德
2023-03-14

我将使用JDBC源连接器(取决于您当前使用的数据库)和Kafka Connect来正确处理此场景。

如果你仍然想编写自己的制作人,Kafka FAQ 的这一部分应该很有用:

我如何从Kafka那里得到一次信息?

恰好一次语义有两个部分:避免数据生产期间的重复和避免数据消费期间的重复。

有两种方法可以在数据生产过程中获得一次语义:

    < li >对每个分区使用单写入程序,并且每次出现网络错误时,检查该分区中的最后一条消息,以查看您的最后一次写入是否成功 < li >在消息中包含一个主键(UUID或其他),并在消费者端进行重复数据删除。

如果您执行其中一项操作,那么Kafka所承载的日志将不会重复。然而,没有重复的阅读也取决于消费者的一些合作。如果消费者定期检查其位置,那么如果它失败并重新启动,它将从检查点位置重新启动。因此,如果数据输出和检查点不是以原子方式写入的,那么这里也可能会出现重复。此问题是存储系统特有的。例如,如果您正在使用数据库,则可以在事务中一起提交这些数据库。LinkedIn编写的HDFS加载器Camus为Hadoop加载提供了类似的功能。另一种不需要事务的方法是使用主题/分区/偏移量组合来存储偏移量和加载的数据并进行重复数据消除。

我认为有两个改进会让这变得容易得多:

  1. 通过可选地在服务器上集成对生产者幂等的支持,生产者幂等可以自动完成,而且成本更低
  2. 现有的高级消费者没有暴露出很多更精细的偏移控制(例如,重置您的位置)。我们很快就会着手
墨承泽
2023-03-14

对于Kafka,我看到了存储指向id的指针以跟踪您在主题中的位置的实现,以及使用某种分布式存储以在集群级别跟踪这一点的实现。我还没有做太多的工作,所以我将尝试提供一个我们与SQS一起用于重复检测的解决方案。很可能Kafka有一个更好的解决方案,这是一个解决重复问题的方案,只是想添加到那里,以便您也可以查看其他解决方案。

我在与AWS SQS合作点对点消息传递用例时遇到了同样的问题,因为它提供了至少一次交付保证,而不是一次且仅一次。

我们最终使用 Redis 及其分布式锁定策略来解决这个问题。我 https://angularthinking.blogspot.com/ 这里有一篇文章。

高级方法是创建一个分布式锁,将一个条目放入缓存中,并根据您的用例使用适当的TTL。我们使用LUA脚本执行putIfNotExists()方法,如上面的博客所示。规模是我们关注的问题之一,通过上述实现,我们能够每秒处理10万条消息,而SQS和redis的规模都非常好。我们必须根据吞吐量和缓存增长将TTL调整为最佳值。我们确实有24小时或更短的复制窗口的好处,因此,根据redis进行此决定是可以的。如果您有更长的窗口,重复可能会在几天或几个月内发生,redis选项可能不适用。

我们还研究了DynamoDB来实现putIfNotExists(),但redis似乎在这个用例中性能更好,尤其是它使用LUA脚本实现了本机putIfNotExists。

祝你的搜索好运。

 类似资料:
  • 我们有一个制作人 在开发过程中,我重新部署了producer应用程序,并做了一些更改。但在此之后,我的消费者没有收到任何消息。我尝试重新启动消费者,但没有成功。问题可能是什么和/或如何解决? 消费者配置: 生产者配置: 编辑2: 5分钟后,消费者应用程序死亡,但以下情况除外:

  • 我目前正在开发Kafka模块,我正在使用Kafka通信的抽象。我能够集成生产者 Spring Boot测试类 监听器类 我的问题是:在测试类中,我断言分区、有效负载等是从BlockingQueue轮询的,然而,我的问题是如何验证用KafkaListener注释的类中的业务逻辑是否得到正确执行,并根据错误处理和其他业务场景将消息路由到不同的主题。在一些示例中,我看到了CountDownLatch的断

  • 向Kafka推送100,000条消息 在使用者使用所有100,000条消息之前,使用Ctrl-C关闭zookeeper和kafka服务(这是通过在consumer方法中使用来模拟的)。 发现 在zookeeper和kafka服务被关闭后,消费者继续在控制台上写消息。 问题 我如何使消费者从上次消费的消息的索引+1继续。 向Kafka推送100,000条消息 在使用者使用所有100,000条消息之前

  • 我很感激你在这方面的帮助。 我正在构建一个ApacheKafka消费者,以订阅另一个已经运行的Kafka。现在,我的问题是,当我的制作人将消息推送到服务器时。。。我的消费者没有收到。。我在打印的日志中得到以下信息: 我不确定我是否遗漏了任何重要的配置。。。但是,我可以使用WireShark看到一些来自我的服务器的消息,但是我的消费者没有消费这些消息。。。。 我的代码是示例消费者示例的精确副本:ht

  • 问题内容: 有没有一种方法可以抑制ActiveMQ服务器上定义的队列上的重复消息? 我尝试手动定义JMSMessageID((message.setJMSMessageID(“ uniqueid”)),但是服务器忽略此修改并使用内置的JMSMessageID传递消息。 根据规范,我没有找到有关如何删除邮件重复数据的参考。 在HornetQ中,要解决此问题,我们需要在消息定义中声明HQ特定的属性or

  • 在这种情况下,我是否需要求助于Kafka事务API来在消费者轮询循环中创建事务生产者,在该循环中,我在事务中执行:(1)处理消耗的记录和(2)在关闭事务之前提交它们的偏移量。在这种情况下,普通的commitsync/commitasync是否有效?