当前位置: 首页 > 知识库问答 >
问题:

Spring的 Kafka 消费者可以通过编程方式重新分配分区吗?

施敏达
2023-03-14

我是Kafka的新手,用< code > @ KafkaListener (spring)来定义Kafka消费者。我想检查是否有可能在运行时将分区手动分配给用户。

例如,当应用程序启动时,我不想“消耗”任何数据。我正在使用当前@KafkaListener(自动启动=假...)来实现此目的。

在某个时刻,我应该会收到一个通知(来自应用程序的另一部分),其中包含一个要处理的分区ID,所以我想“跳过”到该分区的最新可用偏移量,因为我不需要使用碰巧已经存在的数据,并将KafkaConsumer与该通知中的分区ID“关联”。

稍后,我可能会收到通知“停止监听此分区”,尽管存在于其他地方的生产者一直在向该主题和该分区写入,因此我应该从分区“解除关联”消费者并停止接收消息。

我看到有一个< code > org . spring framework . Kafka . annotation . topic partition ,但它提供了一种指定“静态”关联的方法,所以我正在寻找一种“动态”的方法来这样做。

我想我可以求助于底层的Kafka客户端API,但是我更喜欢在这里使用spring

更新

我使用主题<代码>cnp_multi_partition_test_topic有3个分区。

我的当前代码尝试从消费者动态管理分区,如下所示:

@Slf4j
public class SampleKafkaConsumer {   
    @KafkaListener(id = Constants.CONSUMER_ID, topics = Constants.TEST_TOPIC, autoStartup = "false")
    public void consumePartition(@Payload String data, @Headers MessageHeaders messageHeaders) {
        Object partitionId = messageHeaders.get(KafkaHeaders.RECEIVED_PARTITION_ID);
        Object sessionId    = messageHeaders.get(KafkaHeaders.RECEIVED_MESSAGE_KEY);
        log.info("Consuming from partition: [ {} ] message: Key = [ {} ], content = [ {} ]",partitionId, sessionId,  data);
    }
}
@RequiredArgsConstructor
public class MultiPartitionKafkaConsumerManager {

    private final KafkaListenerEndpointRegistry registry;
    private final ConcurrentKafkaListenerContainerFactory<String, String> factory;
    private final UUIDProvider uuidProvider;
    private ConcurrentMessageListenerContainer<String, String> container;

    public void assignPartitions(List<Integer> partitions) {
        if(container != null) {
            container.stop();
            container = null;
        }
        if(partitions.isEmpty()) {
            return;
        }
        var newTopicPartitionOffsets = prepareTopicPartitionOffsets(partitions);
        container =
                factory.createContainer(newTopicPartitionOffsets);
        container.getContainerProperties().setMessageListener(
                registry.getListenerContainer(Constants.CONSUMER_ID).getContainerProperties().getMessageListener());
        // random group
        container.getContainerProperties().setGroupId("sampleGroup-" + uuidProvider.getUUID().toString());
        container.setConcurrency(1);
        container.start();
    }

    private TopicPartitionOffset[] prepareTopicPartitionOffsets(List<Integer> partitions) {
        return partitions.stream()
                .map(p -> new TopicPartitionOffset(TEST_TOPIC, p, 0L, TopicPartitionOffset.SeekPosition.END))
                .collect(Collectors.toList())
                .toArray(new TopicPartitionOffset[] {});
    }
}

两者都是通过java配置管理的Spring bean(singleton)。

生产者每秒生成3条消息,并将其发送到测试主题的3个分区中。我使用了kafka UI工具来确保所有消息都按预期到达。我使用@EventListener@Async使其同时发生。

以下是我如何尝试模拟工作:


@SpringBootTest // kafka is available, omitted for brevity
public class MyTest {
    @Autowired
    MultiPartitionKafkaConsumerManager manager;
    
    @Test
    public void test_create_kafka_consumer_with_manual_partition_management() throws InterruptedException {
        log.info("Starting the test");
        sleep(5_000);
        log.info("Start listening on partition 0");
        manager.assignPartitions(List.of(0));
        sleep(10_000);
        log.info("Start listening on partition 0,2");
        manager.assignPartitions(List.of(0,2));
        sleep(10_000);
        log.info("Do not listen on partition 0 anymore");
        manager.assignPartitions(List.of(2));
        sleep(10_000);
        log.info("Do not listen on partition 2 anymore - 0 partitions to listen");
        manager.assignPartitions(Collections.emptyList());
        sleep(10_000);

日志显示以下内容:

06:34:20.164 [main] INFO  c.h.c.p.g.m.SamplePartitioningTest - Starting the test
06:34:25.169 [main] INFO  c.h.c.p.g.m.SamplePartitioningTest - Start listening on partition 0
06:34:25.360 [main] INFO  o.a.kafka.common.utils.AppInfoParser - Kafka version: 2.5.1
06:34:25.360 [main] INFO  o.a.kafka.common.utils.AppInfoParser - Kafka commitId: 0efa8fb0f4c73d92
06:34:25.361 [main] INFO  o.a.kafka.common.utils.AppInfoParser - Kafka startTimeMs: 1633664065360
06:34:25.405 [main] INFO  o.a.k.clients.consumer.KafkaConsumer - [Consumer clientId=consumer-sampleGroup-96640bc4-e34f-4ade-9ff9-7a2d0bdf38c9-1, groupId=sampleGroup-96640bc4-e34f-4ade-9ff9-7a2d0bdf38c9] Subscribed to partition(s): cnp_multi_partition_test_topic-0
06:34:25.422 [main] INFO  o.s.s.c.ThreadPoolTaskScheduler - Initializing ExecutorService
06:34:25.429 [consumer-0-C-1] INFO  o.a.k.c.c.i.SubscriptionState - [Consumer clientId=consumer-sampleGroup-96640bc4-e34f-4ade-9ff9-7a2d0bdf38c9-1, groupId=sampleGroup-96640bc4-e34f-4ade-9ff9-7a2d0bdf38c9] Seeking to LATEST offset of partition cnp_multi_partition_test_topic-0
06:34:35.438 [main] INFO  c.h.c.p.g.m.SamplePartitioningTest - Start listening on partition 0,2
06:34:35.445 [consumer-0-C-1] INFO  o.a.k.clients.consumer.KafkaConsumer - [Consumer clientId=consumer-sampleGroup-96640bc4-e34f-4ade-9ff9-7a2d0bdf38c9-1, groupId=sampleGroup-96640bc4-e34f-4ade-9ff9-7a2d0bdf38c9] Unsubscribed all topics or patterns and assigned partitions
06:34:35.445 [consumer-0-C-1] INFO  o.s.s.c.ThreadPoolTaskScheduler - Shutting down ExecutorService
06:34:35.453 [consumer-0-C-1] INFO  o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - sampleGroup-96640bc4-e34f-4ade-9ff9-7a2d0bdf38c9: Consumer stopped
06:34:35.467 [main] INFO  o.a.kafka.common.utils.AppInfoParser - Kafka version: 2.5.1
06:34:35.467 [main] INFO  o.a.kafka.common.utils.AppInfoParser - Kafka commitId: 0efa8fb0f4c73d92
06:34:35.467 [main] INFO  o.a.kafka.common.utils.AppInfoParser - Kafka startTimeMs: 1633664075467
06:34:35.486 [main] INFO  o.a.k.clients.consumer.KafkaConsumer - [Consumer clientId=consumer-sampleGroup-05fb12f3-aba1-4918-bcf6-a1f840de13eb-2, groupId=sampleGroup-05fb12f3-aba1-4918-bcf6-a1f840de13eb] Subscribed to partition(s): cnp_multi_partition_test_topic-0, cnp_multi_partition_test_topic-2
06:34:35.487 [main] INFO  o.s.s.c.ThreadPoolTaskScheduler - Initializing ExecutorService
06:34:35.489 [consumer-0-C-1] INFO  o.a.k.c.c.i.SubscriptionState - [Consumer clientId=consumer-sampleGroup-05fb12f3-aba1-4918-bcf6-a1f840de13eb-2, groupId=sampleGroup-05fb12f3-aba1-4918-bcf6-a1f840de13eb] Seeking to LATEST offset of partition cnp_multi_partition_test_topic-0
06:34:35.489 [consumer-0-C-1] INFO  o.a.k.c.c.i.SubscriptionState - [Consumer clientId=consumer-sampleGroup-05fb12f3-aba1-4918-bcf6-a1f840de13eb-2, groupId=sampleGroup-05fb12f3-aba1-4918-bcf6-a1f840de13eb] Seeking to LATEST offset of partition cnp_multi_partition_test_topic-2
06:34:45.502 [main] INFO  c.h.c.p.g.m.SamplePartitioningTest - Do not listen on partition 0 anymore
06:34:45.503 [consumer-0-C-1] INFO  o.a.k.clients.consumer.KafkaConsumer - [Consumer clientId=consumer-sampleGroup-05fb12f3-aba1-4918-bcf6-a1f840de13eb-2, groupId=sampleGroup-05fb12f3-aba1-4918-bcf6-a1f840de13eb] Unsubscribed all topics or patterns and assigned partitions
06:34:45.503 [consumer-0-C-1] INFO  o.s.s.c.ThreadPoolTaskScheduler - Shutting down ExecutorService
06:34:45.510 [consumer-0-C-1] INFO  o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - sampleGroup-05fb12f3-aba1-4918-bcf6-a1f840de13eb: Consumer stopped
06:34:45.527 [main] INFO  o.a.kafka.common.utils.AppInfoParser - Kafka version: 2.5.1
06:34:45.527 [main] INFO  o.a.kafka.common.utils.AppInfoParser - Kafka commitId: 0efa8fb0f4c73d92
06:34:45.527 [main] INFO  o.a.kafka.common.utils.AppInfoParser - Kafka startTimeMs: 1633664085527
06:34:45.551 [main] INFO  o.a.k.clients.consumer.KafkaConsumer - [Consumer clientId=consumer-sampleGroup-5e12d8c7-5900-434a-959f-98b14adda698-3, groupId=sampleGroup-5e12d8c7-5900-434a-959f-98b14adda698] Subscribed to partition(s): cnp_multi_partition_test_topic-2
06:34:45.551 [main] INFO  o.s.s.c.ThreadPoolTaskScheduler - Initializing ExecutorService
06:34:45.554 [consumer-0-C-1] INFO  o.a.k.c.c.i.SubscriptionState - [Consumer clientId=consumer-sampleGroup-5e12d8c7-5900-434a-959f-98b14adda698-3, groupId=sampleGroup-5e12d8c7-5900-434a-959f-98b14adda698] Seeking to LATEST offset of partition cnp_multi_partition_test_topic-2
06:34:55.560 [main] INFO  c.h.c.p.g.m.SamplePartitioningTest - Do not listen on partition 2 anymore - 0 partitions to listen
06:34:55.561 [consumer-0-C-1] INFO  o.a.k.clients.consumer.KafkaConsumer - [Consumer clientId=consumer-sampleGroup-5e12d8c7-5900-434a-959f-98b14adda698-3, groupId=sampleGroup-5e12d8c7-5900-434a-959f-98b14adda698] Unsubscribed all topics or patterns and assigned partitions
06:34:55.562 [consumer-0-C-1] INFO  o.s.s.c.ThreadPoolTaskScheduler - Shutting down ExecutorService
06:34:55.576 [consumer-0-C-1] INFO  o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - sampleGroup-5e12d8c7-5900-434a-959f-98b14adda698: Consumer stopped

因此,我确实看到消费者已经启动,它甚至试图在内部轮询记录,但我认为我看到WakeupException被代理抛出并“吞噬”。我不确定我是否理解为什么会发生这种情况?

共有1个答案

隗锐进
2023-03-14

您不能在运行时更改手动分配。有几种方法可以达到你想要的结果。

您可以在原型bean中声明侦听器;请参阅我可以在运行时将主题添加到我的@kafkalistener吗?

您可以使用侦听器容器工厂创建具有适当主题配置的新容器,并从静态声明的容器中复制侦听器。

如果需要,我可以提供后者的例子。

编辑

这是第二种技术的一个例子…

@SpringBootApplication
public class So69465733Application {

    public static void main(String[] args) {
        SpringApplication.run(So69465733Application.class, args);
    }

    @KafkaListener(id = "dummy", topics = "dummy", autoStartup = "false")
    void listen(String in) {
        System.out.println(in);
    }

    @Bean
    ApplicationRunner runner(KafkaListenerEndpointRegistry registry,
            ConcurrentKafkaListenerContainerFactory<String, String> factory) {

        return args -> {
            System.out.println("Hit Enter to create a container for topic1, partition0");
            System.in.read();
            ConcurrentMessageListenerContainer<String, String> container1 =
                    factory.createContainer(new TopicPartitionOffset("topic1", 0, SeekPosition.END));
            container1.getContainerProperties().setMessageListener(
                    registry.getListenerContainer("dummy").getContainerProperties().getMessageListener());
            container1.getContainerProperties().setGroupId("topic1-0-group2");
            container1.start();

            System.out.println("Hit Enter to create a container for topic2, partition0");
            System.in.read();
            ConcurrentMessageListenerContainer<String, String> container2 =
                    factory.createContainer(new TopicPartitionOffset("topic2", 0, SeekPosition.END));
            container2.getContainerProperties().setMessageListener(
                    registry.getListenerContainer("dummy").getContainerProperties().getMessageListener());
            container2.getContainerProperties().setGroupId("topic2-0-group2");
            container2.start();

            System.in.read();
            container1.stop();
            container2.stop();
        };
    }

}

编辑

从命令行创建器将记录发送到主题 1、topic2 后进行日志。

Hit Enter to create a container for topic1, partition0

ConsumerConfig values: 
...

Kafka version: 2.7.1
Kafka commitId: 61dbce85d0d41457
Kafka startTimeMs: 1633622966736
[Consumer clientId=consumer-topic1-0-group2-1, groupId=topic1-0-group2] Subscribed to partition(s): topic1-0

Hit Enter to create a container for topic2, partition0
[Consumer clientId=consumer-topic1-0-group2-1, groupId=topic1-0-group2] Seeking to LATEST offset of partition topic1-0
[Consumer clientId=consumer-topic1-0-group2-1, groupId=topic1-0-group2] Cluster ID: ppGfIGsZTUWRTNmRXByfZg
[Consumer clientId=consumer-topic1-0-group2-1, groupId=topic1-0-group2] Resetting offset for partition topic1-0 to position FetchPosition{offset=2, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[localhost:9092 (id: 0 rack: null)], epoch=0}}.

ConsumerConfig values: 
...

Kafka version: 2.7.1
Kafka commitId: 61dbce85d0d41457
Kafka startTimeMs: 1633622969071
[Consumer clientId=consumer-topic2-0-group2-2, groupId=topic2-0-group2] Subscribed to partition(s): topic2-0

Hit Enter to stop containers
[Consumer clientId=consumer-topic2-0-group2-2, groupId=topic2-0-group2] Seeking to LATEST offset of partition topic2-0
[Consumer clientId=consumer-topic2-0-group2-2, groupId=topic2-0-group2] Cluster ID: ppGfIGsZTUWRTNmRXByfZg
[Consumer clientId=consumer-topic2-0-group2-2, groupId=topic2-0-group2] Resetting offset for partition topic2-0 to position FetchPosition{offset=2, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[localhost:9092 (id: 0 rack: null)], epoch=0}}.
record from topic1
[Consumer clientId=consumer-topic1-0-group2-1, groupId=topic1-0-group2] Discovered group coordinator localhost:9092 (id: 2147483647 rack: null)
record from topic2
[Consumer clientId=consumer-topic2-0-group2-2, groupId=topic2-0-group2] Discovered group coordinator localhost:9092 (id: 2147483647 rack: null)
Application shutdown requested.
 类似资料:
  • 场景: 运行从名为“test”的具有10个分区的分区中消耗的Spring Boot项目。分区分配发生在13:00:00 在~13:00:30使用: 在~13:05:30触发分区重新分配。 我运行了几次这些步骤,看起来每5分钟就有一次重新分配 是否有办法更改重新分配检查操作频率 编辑: 我的用例如下:我们有引导微服务的集成测试。当主题的使用者首先引导时,如果主题不存在并且它创建的分区数等于配置的并发

  • 我知道kafka将一个主题的数据安排在许多分区上,一个消费者组中的消费者被分配到不同的分区,从那里他们可以接收数据: 我的问题是: 术语,它们是由主机/IP标识的,还是由客户端连接标识的? 换句话说,如果我启动两个线程或进程,使用相同的消费者组运行相同的Kafka客户端代码,它们被认为是一个消费者还是两个消费者?

  • 我有一个多分区主题,由多个使用者(同一组)使用。我的目标是最大化消费处理,即任何消费者都可以消费来自任何分区的消息。 我知道这看起来是不可能的,因为只有一个消费者可以从一个分区中消费。 有没有可能使用REST代理来实现这一点?例如,轮询所有代理消费者实例。 谢了。

  • 本文向大家介绍Kafka 消费者是否可以消费指定分区消息?相关面试题,主要包含被问及Kafka 消费者是否可以消费指定分区消息?时的应答技巧和注意事项,需要的朋友参考一下 Kafa consumer消费消息时,向broker发出fetch请求去消费特定分区的消息,consumer指定消息在日志中的偏移量(offset),就可以消费从这个位置开始的消息,customer拥有了offset的控制权,可

  • TL;DR;我试图理解一个被分配了多个分区的单个使用者是如何处理reach分区的消费记录的。 例如: 在移动到下一个分区之前,会完全处理一个分区。 每次处理每个分区中的可用记录块。 从第一个可用分区处理一批N条记录 以循环旋转方式处理来自分区的N条记录 我找到了或分配程序的配置,但这只决定了使用者如何分配分区,而不是它如何从分配给它的分区中使用。 我开始深入研究KafkaConsumer源代码,#

  • 我正在创建一个系统,其中前端服务将消息推送到Kafka请求主题,并为一些下游后端消费者(实际上是一个最终推送回Kafka的复杂系统)监听另一个响应主题,以处理请求消息并最终推进到“回应”话题。 我试图找出最优雅的方法来确保消费者监听适当的分区并收到响应,并且后端推送到前端消费者正在监听的分区。我们总是需要确保响应到达产生初始消息的同一个消费者。 到目前为止,我有两种解决方案,但都不是特别令人满意的