当前位置: 首页 > 知识库问答 >
问题:

为什么同一组id的Kafka消费者不平衡?

端木乐语
2023-03-14

我正在编写一个概念验证应用程序来使用Apache Kafka0.9.0.0中的消息,看看是否可以使用它而不是通用的JMS消息代理,因为Kafka提供了好处。这是我的基本代码,使用新的消费者API:

public class Main implements Runnable {

    public static final long DEFAULT_POLL_TIME = 300;
    public static final String DEFAULT_GROUP_ID = "ltmjTest";

    volatile boolean keepRunning = true;
    private KafkaConsumer<String, Object> consumer;
    private String servers;
    private String groupId = DEFAULT_GROUP_ID;
    private long pollTime = DEFAULT_POLL_TIME;
    private String[] topics;

    public Main() {
    }

    //getters and setters...

    public void createConsumer() {
        Map<String, Object> configs = new HashMap<>();
        configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        configs.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);

        configs.put("enable.auto.commit", "true");
        configs.put("auto.commit.interval.ms", "1000");
        configs.put("session.timeout.ms", "30000");

        configs.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        configs.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        consumer = new KafkaConsumer<>(configs);
        consumer.subscribe(asList(topics));
    }

    public static void main(String[] args) {
        Main main = new Main();
        if (args != null && args.length > 0) {
            for (String arg : args) {
                String[] realArg = arg.trim().split("=", 2);
                String argKey = realArg[0].toLowerCase();
                String argValue = realArg[1];
                switch (argKey) {
                case "polltime":
                    main.setPollTime(Long.parseLong(argValue));
                    break;
                case "groupid":
                    main.setGroupId(argValue);
                    break;
                case "servers":
                    main.setServers(argValue);
                    break;
                case "topics":
                    main.setTopics(argValue.split(","));
                    break;
            }
        }
        main.createConsumer();
        new Thread(main).start();
        try (Scanner scanner = new Scanner(System.in)) {
            while(true) {
                String line = scanner.nextLine();
                if (line.equals("stop")) {
                    main.setKeepRunning(false);
                    break;
                }
            }
        }
    }
}

我使用默认设置启动了一个kafka服务器,并使用shell工具kafka-console-producer.sh启动了一个kafka生产者,以便将消息写入我的主题。然后,我使用这段代码与两个使用者连接,发送正确的服务器来连接,发送主题来订阅,其他一切都使用默认值,这意味着两个使用者都有相同的组ID。我注意到只有我的一个消费者使用所有的数据。我从官方教程中读到默认行为应该是服务器必须平衡消费者:

如果所有的使用者实例都有相同的使用者组,那么这就像传统的队列平衡使用者的负载一样。

我如何修复消费者的行为像默认的?也许我漏掉了什么?

共有1个答案

微生俊名
2023-03-14

有一个trait kafka.consumer.PartitionAssignor说明如何为每个消费者分配分区。它有两个模块:RoundRobinAssignor和RangeSignor。默认的是RangeAsSignor。

可以通过设置参数“Partition.Assignment.Strategy”来更改。

循环赛文档:

范围分配器在每个主题的基础上工作。对于每个主题,我们按数字顺序列出可用分区,按词典顺序列出使用者。然后,我们将分区数除以使用者总数,以确定分配给每个使用者的分区数。如果它没有均匀划分,那么前几个消费者将有一个额外的分区。例如,假设有两个使用者C0和C1,两个主题t0和t1,每个主题有3个分区,产生分区t0p0、t0p1、t0p2、t1p0、t1p1和T1P2。赋值将是:C0:[t0p0,t0p1,t1p0,t1p1]C1:[t0p2,t1p2]

所以,如果我们所有的主题只有一个分区,那么只有一个使用者可以工作

 类似资料:
  • 我以前认为设置我的消费者将始终收到他们尚未收到的消息,但最近我发现情况并非如此。这只在使用者尚未提交抵消时才起作用。在任何其他情况下,使用者将继续接收偏移大于其提交的最后偏移的消息。 由于我总是使用随机的组ID创建新的使用者,我意识到我的使用者“没有内存”,他们是新的使用者,并且他们永远不会提交偏移,因此策略将始终适用。我的疑虑就从这里开始了。假设以下场景: 我有两个客户端应用程序,A和B,每个客

  • 本文向大家介绍什么是kafka消费者组?相关面试题,主要包含被问及什么是kafka消费者组?时的应答技巧和注意事项,需要的朋友参考一下 答:消费者组的概念是Apache Kafka独有的。基本上,每个Kafka消费群体都由一个或多个共同消费一组订阅主题的消费者组成。

  • 由于消息需求的排序,我们有一个主题和一个分区。我们有两个消费者运行在不同的服务器上,具有相同的配置集,即groupId、consumerId和consumerGroup。即 1主题- 当我们部署消费者时,相同的代码会部署在两台服务器上。当消息到来时,我们会注意到两个消费者都在消费消息,而不是只有一个处理。让消费者在两台独立的服务器上运行的原因是,如果一台服务器崩溃,至少其他服务器可以继续处理消息。

  • 我看不出我做错了什么。如有任何帮助,不胜感激。

  • 我对Kafka是陌生的。我用spring boot创建了一个kafka消费者(spring-kafka dependency)。在我的应用程序中,我使用了consumerFactory和producerfactory beans进行配置。所以在我的应用程序中,我创建了如下的kafka消费者。 我的配置如下 所以我想并行消费,因为我可能会收到更多的消息。关于使用并行主题,我发现我需要为一个主题创建多

  • 根据Kafka的文件: kafka保证主题分区只分配给组中的一个消费者。 但我在服务中观察到了不同的行为。以下是一些细节: 我用的是Kafka2.8和SpringKafka2.2.13。 最初我有一个Kafka主题包含5个分区,这个主题在我的服务中使用了Spring和ConcurrentKafkAlisterContainerFactory中的注释,并发性=5。这个配置对我来说很好。 后来,我开始