当前位置: 首页 > 知识库问答 >
问题:

对使用新的Kafka幂等生产者API防止重复感到困惑

罗毅
2023-03-14

我的应用html" target="_blank">程序有5个以上的消费者在使用一个kafka主题的5个分区。(使用kafka版本11)我的消费者每个都产生一个消息到另一个主题,然后保存一些状态到数据库,然后做一个手动立即确认,并移动到下一个消息。

我试图解决当他们向出站主题发出成功时的场景。那么我们就会失败/失去消费者。当另一个使用者接管分区时,它将向出站主题发出另一条消息。这很糟糕:(

我发现Kafka现在有了幂等制作人,但从我读到的内容来看,它只保证了一个制作人会话。

“当生产者重新启动时,将分配新的PID。因此只对单个生产者会话保证幂等性”-(blog)-https://hevodata.com/blog/kafka-精确-只对一次

这在我看来很大程度上是无用的。在我的用例中,重点是当我在另一个消费者上重播消息时,它不会重复出站消息。

是不是我漏了什么?

共有1个答案

唐信瑞
2023-03-14

当使用事务时,您不应该使用任何基于使用者的机制(手动或其他方式)来提交偏移。

相反,您使用生成器将偏移量发送到事务,因此偏移量提交是事务的一部分。

如果配置了KafkatransactionManagerChainedKafkatransactionManager,Spring侦听器容器将在侦听器正常退出时向事务发送偏移量。

null

使用消费者提交偏移量不是事务的一部分,因此事情不会像预期的那样工作。

当使用事务管理器时,侦听器容器将生产者绑定到线程,因此任何下游的kafkatemplate操作都参与到使用者启动的事务中。请参阅文档。

 类似资料:
  • Kafka文件说,幂等生产者是可能的,与相同的生产者会话,我无法理解这一点。 比方说,Kafka为每条消息添加序列号,最后一个序列号在Kafka中维护(不确定它维护在哪里)。 它是如何生成序列号的,它保存在哪里? 为什么当制作人崩溃并再次出现时,它不能保持序列? 我怎样才能使它在制作人会话之间真正幂等?

  • 我在应用程序中使用了Kafka 1.0.1,我已经开始使用0.11中引入的幂等生产者功能,在使用幂等生产者功能时,我很难理解排序保证。 我的生产者的配置是: 重试50次 根据文件: 重试 设置一个大于零的值将导致客户端重新发送任何记录,如果该记录的发送失败,可能会出现暂时性错误。请注意,此重试与客户端在收到错误后重新发送记录没有什么不同。允许在不设置最大值的情况下重试。航班请求。每连接到1可能会改

  • 所以我一直在读Kafka的语义学,我对它的工作原理有点困惑。 我理解生产者如何避免发送重复的消息(以防代理的ack失败),但我不明白的是,在消费者处理消息但在提交偏移量之前崩溃的情况下,一次是如何工作的。Kafka不会在这种情况下重试吗?

  • 我想利用Kafka 0.11中引入的幂等生产者。根据这篇融合的博客文章,添加了一个新属性来支持这一点: 幂等性:每个分区仅一次有序语义 要启用此功能并在每个分区中准确获取一次语义,即没有重复,没有数据丢失,并且为了语义,请将生产者配置为设置“enable.idemponence=true”。 这一点既不是Spring Cloud Stream,也不是Spring Kafka文档对该属性的使用。我们

  • 我使用带有幂等生产者配置的spring kafka: 这是我的配置道具: 我的Kafka制作人抛出OutOfOrderSequence异常: 2019-03-06 21:25:47发送者[ERROR][生产者clientId=生产者-1]代理返回org.apache.kafka.common.errors.OutOfOrderSequence异常:代理在偏移-1处收到主题分区主题-1的乱序序列号。

  • ActiveMQ是否支持幂等生产者?我知道Camel有一个幂等消费者模式来检测和处理重复消息,但我想知道是否可以从源头(生产者)防止这种情况。 这里有一点背景。我有水平扩展的应用程序访问同一个数据库。有一个特定的表维护特定进程的状态。这些水平应用程序应该能够读取状态并调用另一个进程,但是只有一个应用程序能够调用它。一旦满足所需条件,该应用程序会定期轮询数据库并将消息发布到消息代理。但我希望其中一个