当前位置: 首页 > 知识库问答 >
问题:

使用幂等Kafka生产者时的订购保证

充栋
2023-03-14

我在应用程序中使用了Kafka 1.0.1,我已经开始使用0.11中引入的幂等生产者功能,在使用幂等生产者功能时,我很难理解排序保证。

我的生产者的配置是:

启用。幂等性=真

max.in。航班请求。每连接=5

重试50次

acks=all

根据文件:

重试

设置一个大于零的值将导致客户端重新发送任何记录,如果该记录的发送失败,可能会出现暂时性错误。请注意,此重试与客户端在收到错误后重新发送记录没有什么不同。允许在不设置最大值的情况下重试。航班请求。每连接到1可能会改变记录的顺序,因为如果两个批发送到单个分区,第一个失败并重试,但第二个成功,那么第二个批中的记录可能会首先出现。

使可能幂等性

当设置为“true”时,生产者将确保流中写入每条消息的一个副本。如果为“false”,则由于代理失败等原因导致的生产者重试可能会在流中写入重试消息的副本。请注意,启用幂等性需要max.in。航班请求。每连接数小于或等于5,重试次数大于0,确认数必须为“全部”。如果用户未明确设置这些值,将选择合适的值。如果设置了不兼容的值,将引发ConfigException。

我的配置似乎是按照要求的,但它们似乎不对齐。

我还有一个问题与OutOfOrderSequenceException有关:根据文档,如果我得到这个例外,这意味着生产商有发生故障的风险。但如果我的制作人配置了max.in。航班请求。每connection=5假设第二个请求得到了无序异常,那么下面已经在运行的所有请求会发生什么情况?这是否意味着我肯定会出问题?

共有1个答案

简景焕
2023-03-14

在KafkaProducer中启用幂等性时,可以保证排序。

即使你有max.in。航班请求。每连接大于或等于1幂等KafkaProducer仍将确保主题分区内的排序。与max.in.相关的“重试次数”说明。航班仅在幂等性被禁用时适用。

幂等KafkaProducer使用一个内部递增序列号,以确保最多5英寸的顺序。拉取请求#3743中描述的航班请求:

"[...] 我们保留了5个旧批次的记录元数据。"

此外,还提供了有关启用的文档。幂等性通知最多有5个飞行请求。否则会出现ConfigException

更多细节见Kafka-5494和max.in的相关设计文件。航班

根据上述假设,解决办法如下:

>

  • 我们跟踪发送到分区的最后确认序列。这会在每次成功确认时更新,因此应始终增加。

    我们跟踪给定分区的批处理绑定的下一个序列。

    当批处理被耗尽时,我们分配了nextSequence。我们还通过批处理的记录计数来增加nextSequence。

    如果生产请求成功,我们将最后一个确认序列设置为批次的最后一个序列。

    如果生产请求失败,成功的飞行批次也将失败,出现OutOfOrderSequenceException。

    这样,如果批的序列号不是最后一个ACK序列的继承者,并且如果它与OutOfOrthRealStEngExcess失败,我们认为这是可重复的。

    当一个批次被重新排队时,我们会在将其插入队列之前删除生产者id和序列号。

    当第一批飞行中失败时(无论出于何种原因),我们将下一个序列重置为lastAckdSequence 1。

    因此,如果一个批次致命失败,那么在重试时,后续批次的序列号将不同。这很好,因为之前的失败是一个OutOfSequence异常,这绝对意味着请求被拒绝。

    "[...] 如果我的制作人配置了max.in。航班请求。每connection=5,假设第二个请求得到了无序异常,那么下面已经在运行的所有请求会发生什么情况?这是否意味着我肯定会出问题?"

    如第5步和第6步所述,所有后续的飞行中请求也将失败,并出现可重试的OutofOrderSequenceException。由于重试次数大于0,幂等KafkaProducer将能够保持顺序。

  •  类似资料:
    • Kafka文件说,幂等生产者是可能的,与相同的生产者会话,我无法理解这一点。 比方说,Kafka为每条消息添加序列号,最后一个序列号在Kafka中维护(不确定它维护在哪里)。 它是如何生成序列号的,它保存在哪里? 为什么当制作人崩溃并再次出现时,它不能保持序列? 我怎样才能使它在制作人会话之间真正幂等?

    • 我想利用Kafka 0.11中引入的幂等生产者。根据这篇融合的博客文章,添加了一个新属性来支持这一点: 幂等性:每个分区仅一次有序语义 要启用此功能并在每个分区中准确获取一次语义,即没有重复,没有数据丢失,并且为了语义,请将生产者配置为设置“enable.idemponence=true”。 这一点既不是Spring Cloud Stream,也不是Spring Kafka文档对该属性的使用。我们

    • 我使用带有幂等生产者配置的spring kafka: 这是我的配置道具: 我的Kafka制作人抛出OutOfOrderSequence异常: 2019-03-06 21:25:47发送者[ERROR][生产者clientId=生产者-1]代理返回org.apache.kafka.common.errors.OutOfOrderSequence异常:代理在偏移-1处收到主题分区主题-1的乱序序列号。

    • 我的应用程序有5个以上的消费者在使用一个kafka主题的5个分区。(使用kafka版本11)我的消费者每个都产生一个消息到另一个主题,然后保存一些状态到数据库,然后做一个手动立即确认,并移动到下一个消息。 我试图解决当他们向出站主题发出成功时的场景。那么我们就会失败/失去消费者。当另一个使用者接管分区时,它将向出站主题发出另一条消息。这很糟糕:( 我发现Kafka现在有了幂等制作人,但从我读到的内

    • 我想使用Mongo变更流,使用kafka Connect将变更事件从mongoDB推送到kafka Topic中。好消息是: > Kafka在分区内维持排序。 Mongo使用全局时钟维护排序。 但是,中间呢?kafka connect怎么样?它维持订购吗?这种订购是如何运作的?我找不到他们说Kafka维持秩序的地方。 这是一个场景: 在Mongo中-更新用户Bob以获得授权 在Mongo中-将用户

    • 我有一个Spring-boot应用程序,可以听Kafka。为了避免重复处理,我尝试手动提交。为此,我在阅读主题后异步提交了一条消息。但是我被困在如何实现消费者幂等,这样记录就不会被处理两次。