根据Kafka的文件:
kafka保证主题分区只分配给组中的一个消费者。
但我在服务中观察到了不同的行为。以下是一些细节:
我用的是Kafka2.8和SpringKafka2.2.13。
最初我有一个Kafka主题主题。1
包含5个分区,这个主题在我的服务中使用了Spring和ConcurrentKafkAlisterContainerFactory中的@KafkaListener
注释,并发性=5。这个配置对我来说很好。
后来,我开始使用topic.2
,在同一个服务中使用相同的CafkaListenerContainerFactory和相同的组ID,其中包含3个分区。
在一段时间内,它也正常工作,但在其中一个重新平衡过程中,一个使用者线程没有包括在重新平衡过程中,并且由于某些原因,继续处理以前分配的分区,即,一个新的使用者被分配到该分区,而旧的使用者继续处理同一分区,因此相同的记录被处理了两次。在日志中,我看到来自该分区的记录在几天内在两个不同的使用者线程中被消耗和处理了两次。
服务重启后,消费者再次被正确分配。
以下是我的代码,ConcurrentKafkaListenerContainerFactory
bean创建:
@Configuration
public class Config {
@Autowired
private KafkaProperties kafkaProperties;
@Bean
public ConcurrentKafkaListenerContainerFactory<Object, Object> transactionalKafkaContainerFactory(
ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
KafkaTransactionManager kafkaTransactionManager,
ConsumerFactory<Object, Object> consumerFactory) {
ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
configurer.configure(factory, consumerFactory);
//If an exception is thrown, then we want to seek back for the whole batch
factory.setBatchErrorHandler(new FixedBackoffSeekToCurrentBatchErrorHandler());
//Enable transactional consumer for exactly-once transitivity
factory.getContainerProperties().setTransactionManager(kafkaTransactionManager);
//Enable batch-processing
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.BATCH);
factory.setBatchListener(true);
factory.setConcurrency(5);
return factory;
}
@Bean
public KafkaTransactionManager<?, ?> kafkaTransactionManager(
@Qualifier("transactionalProducerFactory") ProducerFactory producerFactory) {
return new KafkaTransactionManager(producerFactory);
}
@Bean
public ProducerFactory<?, ?> transactionalProducerFactory() {
DefaultKafkaProducerFactory<?, ?> producerFactory = new DefaultKafkaProducerFactory<>(
kafkaProperties.buildProducerProperties()
);
producerFactory.setTransactionIdPrefix("tx-");
return producerFactory;
}
}
@kafkaListener
-s:
@Service
public class Processor {
@KafkaListener(topics = "topic.1",
groupId = "my-group",
containerFactory = "transactionalKafkaContainerFactory")
public void listener1(List<ConsumerRecord<String, MyObject>> records) {
// process records
}
// Listener for topic.2, added later
@KafkaListener(topics = "topic.2",
groupId = "my-group",
containerFactory = "transactionalKafkaContainerFactory")
public void listener2(List<ConsumerRecord<String, MyObjec>> records) {
// process records
}
}
以下是一些日志,其中一个消费者(消费者-7)未包括在重新平衡过程中,并继续使用他的旧分区(主题1-3),而新消费者(消费者-4)被分配到同一分区:
03/30/2022 10:23:47.484 [Consumer clientId=consumer-8, groupId=my-group] Setting newly assigned partitions [topic.1-1, topic.1-0]
03/30/2022 10:23:47.484 [Consumer clientId=consumer-4, groupId=my-group] Setting newly assigned partitions [topic.1-3]
03/30/2022 10:23:47.484 [Consumer clientId=consumer-2, groupId=my-group] Setting newly assigned partitions [topic.2-0]
03/30/2022 10:23:47.484 [Consumer clientId=consumer-6, groupId=my-group] Setting newly assigned partitions [topic.1-4]
03/30/2022 10:23:47.483 [Consumer clientId=consumer-5, groupId=my-group] Setting newly assigned partitions [topic.2-2]
03/30/2022 10:23:47.483 [Consumer clientId=consumer-1, groupId=my-group] Setting newly assigned partitions [topic.1-2]
03/30/2022 10:23:47.483 [Consumer clientId=consumer-3, groupId=my-group] Setting newly assigned partitions [topic.2-1]
...
03/30/2022 10:53:55.728 [Consumer clientId=consumer-7, groupId=my-group] Discovered group coordinator ... (id: ... rack: null)
03/30/2022 10:53:55.627 [Consumer clientId=consumer-7, groupId=my-group] Group coordinator ... (id: ... rack: null) is unavailable or invalid, will attempt rediscovery
03/30/2022 10:53:55.627 [Consumer clientId=consumer-7, groupId=my-group] Discovered group coordinator ... (id: ... rack: null)
03/30/2022 10:53:55.507 [Consumer clientId=consumer-7, groupId=my-group] Group coordinator ... (id: ... rack: null) is unavailable or invalid, will attempt rediscovery
03/30/2022 10:53:55.507 [Consumer clientId=consumer-7, groupId=my-group] Discovered group coordinator ... (id: ... rack: null)
注意:业务逻辑细节在这里并不重要,所以我使用了简单的名称:topic.1
,Procencer
,my-group
等
我的问题是如何解释这种行为?为什么consumer-7能够在重新平衡后使用其旧分区中的消息?使用相同的组ID添加新的@KafkaListener
是否会导致此问题?(至少在我只有一个@KafkaListener
的时候,我没有看到这种行为)
在那些日志中我看不到任何关于你假设的证据。
不过,有一种可能性是,consumer-7花了太长时间来处理他的民意调查记录,他被强行驱逐出该群体(直到下一次民意调查),所以,是的,这是可能的;他将继续处理上一次投票的所有记录,直到下一次投票时,他将检测到重新平衡。您应该确保可以处理max.poll。在
max.poll中记录
。间隔ms
以避免此类情况。
在任何情况下,在这种情况下使用同一组都不是好的做法,因为一个主题上的再平衡会导致另一个主题上不必要的再平衡,这是不可取的。
您还应该升级到受支持的Spring kafka版本;2.2. x长期不支持。
https://spring.io/projects/spring-kafka#support
目前的版本是2.8.5;2.7.x很快就不再支持OSS了。
我有1个消费者群体和5个消费者。也有5个分区,因此每个消费者得到1个分区。 CLI还显示 bin/Kafka-console-consumer . sh-bootstrap-server localhost:9092-Topic Topic-1-from-beginning-partition { n }正确显示每个分区的不同消息。 然而,我经常看到两个或两个以上的消费者在处理同一条信息,而且对于
我有一个微服务应用程序,它是使用camel kafka开发的,用于消费来自kafka的消息。我用4个并发实例运行这个服务,其中每个实例都有1个消费者,他们消费的主题有20个分区。 在维护期间(每天凌晨1-2点),应用程序将通过停止消费者来暂停消费。 当应用程序暂停消费时,kafka重新平衡发生,一些实例将在停止消费之前分配更多分区。 e、 由instanceA(partition1-5)使用的g分
我正在编写一个概念验证应用程序来使用Apache Kafka0.9.0.0中的消息,看看是否可以使用它而不是通用的JMS消息代理,因为Kafka提供了好处。这是我的基本代码,使用新的消费者API: 我使用默认设置启动了一个kafka服务器,并使用shell工具启动了一个kafka生产者,以便将消息写入我的主题。然后,我使用这段代码与两个使用者连接,发送正确的服务器来连接,发送主题来订阅,其他一切都
生产者发送消息到一个有四个分区的主题。我们有一个消费者在消费来自这个主题的消息。应用程序在工作日一直运行周末例外:它不会在周末期间调用poll方法。 使用者配置:自动提交,自动提交时间为5s(默认)。 应用程序一直运行良好,直到一个星期天,当它重新开始调用poll方法。我们看到有数百万条消息从这个话题中被轮询出来。消费者基本上是轮询来自主题的所有消息。将新的偏移量与它在周末停止之前的偏移量进行比较
我是Kafka的新手,我有一个使用Java Apache Camel库实现的Kafka消费者。我发现的问题是-消费者花了很长的时间(>15分钟)来处理很少的消息-这对于我们的用例来说是很好的。 需要一些配置帮助,因为相同的消息会在15分钟后重新发送,如果在15分钟内没有处理(我相信线程控制不会返回)。我想这可能是默认间隔,不确定这是哪一个属性。 那么,我必须在哪里修复配置 生产者级别,以便它不重新
我们运行一个集群工作线程应用程序,该应用程序依赖于 Kafka 使用高级消费者 API 使用消息。群集中的所有节点共享同一个使用者组。现在我们想要的是将该逻辑的一部分迁移到 Kafka 流处理器 API。这里的方法是什么?如果分配了相同的 groupId/clientId,流拓扑是否会与现有使用者就消息进行斗争?我们应该分配不同的 groupId/clientId 吗?流式传输拓扑?说“组”。 “