Spring Kafka - multiple consumers receive the same message


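I am using Spring Kafka to consume messages from Kafka. The consumer listener is shown below, followed by the logs: two consumer threads receive the same record, and then the offset commit fails.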

@KafkaListener(topics = "topicName",
            groupId = "groupId",
            containerFactory = "kafkaListenerFactory")
    public void onMessage(ConsumerRecord<?, ?> record) {
       // Log the coordinates of every record handed to this listener thread.
       logger.info("Received Message from kafka topic " + record.topic() + " with record key " + record.key() + " partition " + record.partition() + " offset " + record.offset());
       // ... processing logic omitted ...
    }




Time: 5/27/22 6:28:52.864 PM
message: Received Message from kafka topic payment-topic with record key ti9:a1956769-28d2-4329-a0ff-9003003a3cde partition 4 offset 325
thread: org.springframework.kafka.KafkaListenerEndpointContainer#0-4-C-1
threadId: 69

Time: 5/27/22 6:28:52.864 PM
message: Received Message from kafka topic payment-topic with record key ti9:a1956769-28d2-4329-a0ff-9003003a3cde partition 4 offset 325
thread: org.springframework.kafka.KafkaListenerEndpointContainer#0-3-C-1
threadId: 66



Time: 5/27/22 6:28:52.887 PM
message: [Consumer clientId=consumer-payment-consumer-5, groupId=payment-consumer] Offset commit failed on partition payment-topic-4 at offset 326: The coordinator is not aware of this member.
thread: org.springframework.kafka.KafkaListenerEndpointContainer#0-4-C-1
threadId: 69 

Time: 5/27/22 6:28:53.902 PM
message: Error handler threw an exception
thread: org.springframework.kafka.KafkaListenerEndpointContainer#0-4-C-1
threadId: 69
   threadPriority: 5
   thrown: { [-]
     cause: { [+]
     commonElementCount: 0
     extendedStackTrace: [ [+]
     localizedMessage: Seek to current after exception; nested exception is org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.
     message: Seek to current after exception; nested exception is org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.
     name: org.springframework.kafka.KafkaException
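
The exception text above names two consumer settings, max.poll.interval.ms and max.poll.records. A minimal sketch of overriding them follows, in plain Java so that it runs without Kafka on the classpath. The class name, method name, and the values (10 minutes, 50 records) are assumptions for illustration and do not come from the post. Only the two property keys are real, and their Kafka defaults are 300000 ms and 500 records. The resulting entries would have to be merged into whatever consumer properties back the `kafkaListenerFactory` bean, which is not shown here.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper; the class and method names are illustrative only.
final class ConsumerPollTuning {

    // Builds the two consumer overrides named in the CommitFailedException text.
    static Map<String, Object> consumerProps(int maxPollIntervalMs, int maxPollRecords) {
        Map<String, Object> props = new HashMap<>();
        // Longest allowed gap between two poll() calls before the consumer
        // is removed from the group and its partitions are reassigned.
        props.put("max.poll.interval.ms", maxPollIntervalMs);
        // Largest number of records a single poll() may return.
        props.put("max.poll.records", maxPollRecords);
        return props;
    }

    public static void main(String[] args) {
        // Assumed values: 10 minutes between polls, at most 50 records per poll.
        System.out.println(consumerProps(600_000, 50));
    }
}
```

Whether raising the interval or shrinking the batch is the better choice depends on how long one record takes to process. A smaller batch keeps the rebalance timeout short, while a larger interval tolerates slow processing at the cost of slower failure detection.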



Also, the error log says "Offset commit failed on partition payment-topic-4 at offset 326" even though the message was at offset 325.
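
On the 325 versus 326 difference: a committed offset in Kafka is the position of the next record to read, so after the record at offset 325 is processed the consumer tries to commit 326. The sketch below only spells out that arithmetic; the class and method names are made up for illustration and are not part of any Kafka API.

```java
// Hypothetical illustration; not part of kafka-clients or spring-kafka.
final class CommitOffsets {

    // The offset to commit is the last processed offset plus one,
    // because it marks where the next poll() should resume.
    static long offsetToCommit(long lastProcessedOffset) {
        return lastProcessedOffset + 1;
    }

    public static void main(String[] args) {
        // Record from the log above: partition 4, offset 325.
        System.out.println(offsetToCommit(325)); // prints 326
    }
}
```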


Spring Boot - 2.5.7
org.springframework.kafka.spring-kafka - 2.7.8
org.apache.kafka.kafka-clients - 2.8.1





