当前位置: 首页 > 知识库问答 >
问题:

Spring cloud stream-Kafka消费者使用StreamListener消费重复消息

仲俊豪
2023-03-14

在我们的spring boot应用程序中,我们注意到Kafka消费者偶尔会在prod env中随机消费两次消息。我们在PCF中部署了6个实例和6个分区。我们发现在同一主题中收到两次具有相同偏移量和分区的消息,这会导致重复,对我们来说是业务关键。我们在非生产环境中没有注意到这一点,在非生产环境中很难复制。我们最近转向Kafka,但我们无法找到根本问题。

我们使用的是spring cloud stream/spring cloud stream活页夹kafka-2.1.2,配置如下:

spring:
  cloud:
    stream:
      default.consumer.concurrency: 1 
      default-binder: kafka
      bindings:
        channel:
          destination: topic
          content_type: application/json
          autoCreateTopics: false
          group: group
          consumer:
            maxAttempts: 1
      kafka:
        binder:
          autoCreateTopics: false
          autoAddPartitions: false
          brokers: brokers list
        bindings:
          channel:
            consumer:
              autoCommitOnError: true
              autoCommitOffset: true
              configuration:
                max.poll.interval.ms: 1000000
                max.poll.records: 1 
                group.id: group

我们使用@Streamlisteners来使用消息。

下面是我们收到的重复实例和服务器日志中捕获的错误消息。

错误46---[container-0-C-1]o.a.k.C.C.内部构件。ConsumerCoordinator:[Consumer clientId=Consumer-3,groupId=group]在偏移量1291358处对分区主题0的偏移量提交失败:协调器不知道此成员。错误46---[container-0-C-1]o.s.kafka。听众。LoggingErrorHandler:处理时出错:清空组织。阿帕奇。Kafka。客户。消费者CommitFailedException:无法完成提交,因为组已重新平衡并将分区分配给另一个成员。这意味着后续调用poll()的间隔时间比配置的max.poll长。间隔ms,这通常意味着轮询循环花费了太多时间来处理消息。您可以通过增加会话超时或使用max.poll减少poll()中返回的批处理的最大大小来解决此问题。记录。在org。阿帕奇。Kafka。客户。消费者内部。消费者协调员$OffsetCommitResponseHandler。handle(ConsumerCoordinator.java:871)~[kafka-clients-2.0.1.jar!/:na]

没有崩溃,所有实例在重复时都是健康的。此外,由于消息被成功处理了两次,错误日志-处理时错误:null也很混乱。max.poll.interval.ms: 100000大约是16分钟,应该有足够的时间来处理系统的任何消息,会话超时和心位配置是默认的。在大多数实例中,重复在2秒内收到。我们缺少什么配置?非常感谢任何建议/帮助。

共有2个答案

鲍向笛
2023-03-14

此外,请记住StreamListener和基于注释的编程模型已经被弃用了3年,并且已经从当前的main中删除,这意味着下一个版本将不会有它。请将您的解决方案迁移到基于函数的编程模型

孔弘盛
2023-03-14

提交无法完成,因为组已经重新平衡

重新平衡是因为您的侦听器花费了太长时间;您应该调整max.poll.recordsmax.poll.interval.ms,以确保您总是能够在时限内处理收到的记录。

在任何情况下,Kafka都不保证完全一次交付,只保证至少一次交付。您需要在应用程序中添加幂等性,并检测/忽略重复。

 类似资料:
  • 我有一个简单的Kafka设置。生成器正在以较高的速率向单个分区生成具有单个主题的消息。单个使用者正在使用来自此分区的消息。在此过程中,使用者可能会多次暂停处理消息。停顿可以持续几分钟。生产者停止产生消息后,所有排队的消息都将由使用者处理。生产者产生的消息似乎不会立即被消费者看到。我使用的是Kafka0.10.1.0。这里会发生什么?下面是使用消息的代码部分: 代理上的所有配置都保留为kafka默认

  • 是否有一种方法以编程方式访问和打印使用者滞后偏移,或者说使用者读取的最后一条记录的偏移与某个生产者写入该使用者分区的最后一条记录的偏移之间的位置差。 要知道我的最终目标是将这个值发送到prometheus进行监视,我应该在上面添加哪些语句来得到滞后偏移值?

  • 我刚接触Kafka,很少阅读教程。我无法理解使用者和分区之间的关系。 请回答我下面的问题。 > 消费者是否由ZK分配到单个分区,如果是,如果生产者将消息发送到不同的分区,那么其他分区的消费者将如何使用该消息? 我有一个主题,它有3个分区。我发布消息,它会转到P0。我有5个消费者(不同的消费者群体)。所有消费者都会阅读P0的信息吗?若我增加了许多消费者,他们会从相同的P0中阅读信息吗?如果所有消费者

  • Flink kafka消费者有两种类型的消费者,例如: 这两个消费者层次结构扩展了相同的类。我想知道维护编号类背后的设计决策是什么?我们什么时候应该使用其中一种? 我注意到带有数字后缀的类有更多的特性(例如ratelimiting)。 https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka

  • 我正在尝试让 kafka 消费者获取在 Java 中生成并发布到主题的消息。我的消费者如下。 consumer.java 当我运行上面的代码时,我在控制台中什么也看不到,屏幕后面的java producer程序正在‘AATest’主题下不断地发布数据。另外,在动物园管理员控制台中,当我尝试运行上面的consumer.java时,我得到了以下行 此外,当我运行指向 AATest 主题的单独控制台使用

  • 有什么方法可以阻止Kafka的消费者在一段时间内消费信息吗?我希望消费者停止一段时间,然后开始消费最后一条未消费的消息。