当前位置: 首页 > 知识库问答 >
问题:

Kafka正在将同一分区分配给同一组中的使用者

杜楚
2023-03-14

Kafka-来自同一组的多个使用者分配了相同的分区

我刚刚开始学习Kafka和诺德。我已经写了一篇关于消费者的文章如下

// consumer.js
const kafka = require('kafka-node');
var client = new kafka.Client('localhost:2181');
var topics = [{
    topic: 'topic-4'
}];

var options = {
    groupId: 'kafka-node-group-2',
    autoCommit: true,
    fetchMaxWaitMs: 1000,
    fetchMaxBytes: 1024 * 1024,
    encoding: 'buffer'
};
var consumer = new kafka.HighLevelConsumer(client, topics, options);

// consumer.payloads has only one entry
console.log('Topic', consumer.payloads[0].topic);
console.log('Group', consumer.options.groupId);
console.log('Assigned Partition:', consumer.payloads[0].partition);

输出

Topic topic-4
Group kafka-node-group-2
Assigned Partition: 0

tope-4有四个分区。

./desc_topic.sh topic-4
Topic:topic-4   PartitionCount:4    ReplicationFactor:1 Configs:
    Topic: topic-4  Partition: 0    Leader: 0   Replicas: 0 Isr: 0
    Topic: topic-4  Partition: 1    Leader: 2   Replicas: 2 Isr: 2
    Topic: topic-4  Partition: 2    Leader: 0   Replicas: 0 Isr: 0
    Topic: topic-4  Partition: 3    Leader: 2   Replicas: 2 Isr: 2

编辑

我使用了ConsumerGroup,如下所示。

var options = {
    host: 'localhost:2181',  // zookeeper host omit if connecting directly to broker (see kafkaHost below)
    groupId: 'Group-1',
    sessionTimeout: 15000,
    // // An array of partition assignment protocols ordered by preference.
    // // 'roundrobin' or 'range' string for built ins (see below to pass in custom assignment protocol)
    protocol: ['roundrobin']
};
var consumer = new kafka.ConsumerGroup(options, ['topic-4']);

生产者正在发送100条消息,收到的消息如下。这就是我如何知道分配的分区(不是从consumer对象)。

{
    topic: 'topic-4',
    value: '{"subject":"Message Id 30 "}',
    offset: 172,
    partition: 0,
    highWaterOffset: 173,
    key: null
}

当我运行两个这样的使用者实例(相同的主题和组)时,其中只有一个接收来自分区0的所有内容。这不是问题吗?

这是生产商代码。

const kafka = require('kafka-node');
const Client = kafka.Client;
var client = new Client('localhost:2181', 'my-client-id', {
  sessionTimeout: 300,
  spinDelay: 100,
  retries: 2
});

// For this demo we just log client errors to the console.
client.on('error', function(error) {
  console.error(error);
});

var producer = new kafka.HighLevelProducer(client);

producer.on('ready', function() {
    for (var i = 0; i <= 30; i++) {
        let id = 'Message Id ' + i + ' ';
        let msg = {
            'subject': id
        };
        var messageBuffer = Buffer.from(JSON.stringify(msg));

        // Create a new payload
        var payload = [{
            // topic: 'topic-', + (i%2+2),
            topic: 'topic-4',
            messages: messageBuffer,
            timestamp: Date.now(),
            attributes: 1 /* Use GZip compression for the payload */
        }];

        //Send payload to Kafka and log result/error
        producer.send(payload, function(error, result) {
            console.info('Sent payload to Kafka: ', payload);
            if (error) {
                console.error('Error', error);
            } else {
                var formattedResult = result[0];
                console.log('result: ', result)
            }
        });
    }
});

// For this demo we just log producer errors to the console.
producer.on('error', function(error) {
    console.error(error);
});

共有1个答案

孙渝
2023-03-14

这是一个众所周知的问题。我也遇到过。如果您使用的Kafka版本比发布的版本更新,则可能值得重新检查并可能重新打开此问题。

https://issues.apache.org/jira/browse/KAFKA-6681

 类似资料:
  • 这是我拥有的数据帧的简化版本: 在这个 df 中,row.names 是唯一的 ID(我知道它打破了整洁数据的规则)。 在示例中,我们可以看到行id1和行id2是重复的。 我想做的是确定它们是重复的,并为这些重复项分配一个唯一的组名称。但请注意,将有多行彼此重复。 我希望的产出是: 有什么想法吗? 编辑: 我的原始数据示例:

  • 多台机器生成事件。这些事件被发送到我们的Kafka集群,其中每台机器都有自己的主题(app.machine-events.machine-name)。因为顺序在每台机器的基础上很重要,而分区大小现在不是问题,所以所有主题都由一个分区组成。因此,目前,N个主题也意味着N个分区。 消费/处理应用程序使用了kafka-streams,我们给出了/“machine-event-processor”,它对每

  • 根据Kafka的文件: kafka保证主题分区只分配给组中的一个消费者。 但我在服务中观察到了不同的行为。以下是一些细节: 我用的是Kafka2.8和SpringKafka2.2.13。 最初我有一个Kafka主题包含5个分区,这个主题在我的服务中使用了Spring和ConcurrentKafkAlisterContainerFactory中的注释,并发性=5。这个配置对我来说很好。 后来,我开始

  • 我试图用不同的密钥将消息存储到不同的分区。 例如: 但是当我尝试运行我的Producer类时,它总是存储在单个分区中。 根据文档,使用查找分区。我还看到这个问题Kafka分区键工作不正常‏, 但我在Kafka Client库的0.9.x版本中找不到<code>ByteArrayPartitioner</code>类。 更新:我正在使用代码动态创建主题。 如果我手动创建一个带有分区的主题,那么它可以

  • 我在分布式模式下使用 Kafka Connect。我现在多次观察到的一个奇怪行为是,一段时间后(可能是几个小时,可能是几天),似乎发生了平衡错误:相同的任务被分配给多个工人。因此,它们同时运行,并且根据连接器的性质,失败或产生“不可预测”的输出。 我能够用来重现该行为的最简单配置是:两个 Kafka Connect 工作线程,两个连接器,每个连接器只有一个任务。Kafka Connect 已部署到

  • 我有两个kafka consumer实例,配置了相同的消费者组,并监听相同主题中的分区0。问题是我发消息到题目的时候。消息由两个实例使用,这两个实例应该不会发生,因为它们在同一个组中。我使用Spring Boot配置类来配置它们。 以下是配置: 以下是听众: