当前位置: 首页 > 知识库问答 >
问题:

Spring kafka-多个消费者收到相同消息

卫高明
2023-03-14

我正在使用spring kafka来消费来自kafka的消息。消费者监听器如下。

@KafkaListener(topics = "topicName",
            groupId = "groupId",
            containerFactory = "kafkaListenerFactory")
    public void onMessage(ConsumerRecord record) {
       logger.info("Received Message from kafka topic " + record.topic() + " with record key " + kafkaRecordKey + " partition " + record.partition() + " offset " +record.offset());

   }

应用程序的单个实例,并发数为6。

该主题有6个分区。

Time: 5/27/22 6:28:52.864 PM
message: Received Message from kafka topic payment-topic with record key ti9:a1956769-28d2-4329-a0ff-9003003a3cde partition 4 offset 325
thread: org.springframework.kafka.KafkaListenerEndpointContainer#0-4-C-1
threadId: 69

Time: 5/27/22 6:28:52.864 PM
message: Received Message from kafka topic payment-topic with record key ti9:a1956769-28d2-4329-a0ff-9003003a3cde partition 4 offset 325
thread: org.springframework.kafka.KafkaListenerEndpointContainer#0-3-C-1
threadId: 66

从上面的日志中可以清楚地看到,两个用户在完全相同的时间收到了来自分区和偏移量的相同消息。

每个线程继续处理消息。最后,其中一个消费者失败了,错误如下

Time: 5/27/22 6:28:52.887 PM
message: [Consumer clientId=consumer-payment-consumer-5, groupId=payment-consumer] Offset commit failed on partition payment-topic-4 at offset 326: The coordinator is not aware of this member.
thread: org.springframework.kafka.KafkaListenerEndpointContainer#0-4-C-1
threadId: 69 

Time: 5/27/22 6:28:53.902 PM
message: Error handler threw an exception
thread: org.springframework.kafka.KafkaListenerEndpointContainer#0-4-C-1
threadId: 69
   threadPriority: 5
   thrown: { [-]
     cause: { [+]
     }
     commonElementCount: 0
     extendedStackTrace: [ [+]
     ]
     localizedMessage: Seek to current after exception; nested exception is org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.
     message: Seek to current after exception; nested exception is org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.
     name: org.springframework.kafka.KafkaException

我知道上面的错误会在有负载或消息处理需要时间时出现。在这种情况下,处理不到一秒钟,kafka主题中的消息不到10条。

请告知为什么多个消费者收到相同的消息。

此外,错误日志显示“偏移量为 326 处的分区付款主题 4 上的偏移量提交失败”,用于偏移量为 325 处的消息

库版本

Spring boot - 2.5.7
org.springframework.kafka.spring-kafka - 2.7.8
org.apache.kafka.kafka-clients - 2.8.1

共有1个答案

井嘉胜
2023-03-14

记录的处理时间必须小于max.poll.interval。ms</code>否则会发生重新平衡,很可能当前处理的记录偏移量未提交,因此另一个分配的使用者仅从该分区的先前提交的偏移量中提取。

https://docs.confluent.io/platform/current/installation/configuration/consumer-configs.html#consumerconfigs_max.poll.interval.ms

 类似资料:
  • 我是Kafka的新手,我有一个使用Java Apache Camel库实现的Kafka消费者。我发现的问题是-消费者花了很长的时间(>15分钟)来处理很少的消息-这对于我们的用例来说是很好的。 需要一些配置帮助,因为相同的消息会在15分钟后重新发送,如果在15分钟内没有处理(我相信线程控制不会返回)。我想这可能是默认间隔,不确定这是哪一个属性。 那么,我必须在哪里修复配置 生产者级别,以便它不重新

  • 我有一个主应用程序将消息发送到SQS队列,希望4个消费者应用程序使用相同的消息,并按自己的意愿进行处理 我不确定用于此目的的队列体系结构。 我看到标准SQS、SQS FIFO、(SQS SNSTopic)的选项 对于我想要的功能,似乎(SQS SNS主题)或Kenesis将是一条可行的道路。 但是我也有一个关于标准SQS的问题 我想我是混淆之间的所有选项和压倒了所有的信息可用的队列但仍然感到困惑哪

  • 我正在使用Spring Kafka consumer,它从主题中获取消息并将其保存到数据库中。如果满足故障条件,例如db不可用,kafka消费者库是否提供重试机制?如果是,是否有方法设置不同的重试间隔,如第1次重试应在5分钟后进行,第2次重试应在30分钟后进行,第3次重试应在1小时后进行等。

  • 问题内容: 我有一个JMS客户端,它正在生成消息并通过JMS队列发送到其唯一的使用者。 我想要的是不止一个消费者收到这些消息。我想到的第一件事是将队列转换为主题,以便现有用户和新用户都可以订阅并将相同的消息传递给他们。 显然,这将涉及在生产者和消费者方面修改当前的客户代码。 我还要查看其他选项,例如创建第二个队列,这样就不必修改现有的使用者。我相信这种方法有很多优点,例如(如果我错了,请纠正我)在

  • 在使用Spring Kafka Consumer时,我有时会收到以下错误消息。如代码片段所示,我至少实现了一次语义 1)我的疑问是,我是否错过了来自消费者的任何信息? 2) 我需要处理这个错误吗。由于 org.apache.kafka.clients.consumer.提交失败异常:无法完成偏移提交,因为消费者不是自动分区分配的活动组的一部分;消费者很可能被踢出组。 我的SpringKafka消费

  • 由于消息需求的排序,我们有一个主题和一个分区。我们有两个消费者运行在不同的服务器上,具有相同的配置集,即groupId、consumerId和consumerGroup。即 1主题- 当我们部署消费者时,相同的代码会部署在两台服务器上。当消息到来时,我们会注意到两个消费者都在消费消息,而不是只有一个处理。让消费者在两台独立的服务器上运行的原因是,如果一台服务器崩溃,至少其他服务器可以继续处理消息。