当前位置: 首页 > 知识库问答 >
问题:

在Kafka节点中创建主题分区

饶曦之
2023-03-14
const myProducerStream = ({ kafkaHost, highWaterMark, topic }) => {
    const kafkaClient = new KafkaClient({ kafkaHost });
    const producer = new HighLevelProducer(kafkaClient);
    const options = {
        highWaterMark,
        kafkaClient,
        producer
    }; 

    kafkaClient.refreshMetadata([topic], err => {
        if (err) throw err; 
    }); 

    return new ProducerStream(options);
};

const transfrom = topic => new Transform({
    objectMode: true,
    decodeStrings: true,
    transform(obj, encoding, cb) {
        console.log(`pushing message ${JSON.stringify(obj)} to topic "${topic}"`);

        cb(null, {
            topic,
            messages: JSON.stringify(obj)
        });
    }
});

const publisher = (topic, kafkaHost, highWaterMark) => {
    const myTransfrom = transfrom(topic);
    const producer = myProducerStream({ kafkaHost, highWaterMark, topic });

    myTransfrom.pipe(producer);

    return myTransform;
};


const createConsumerStream = (sourceTopic, kafkaHost, groupId) => {
    const consumerOptions = {
        kafkaHost,
        groupId,
        protocol: ['roundrobin'],
        encoding: 'utf8',
        id: uuidv4(),
        fromOffset: 'latest',
        outOfRangeOffset: 'earliest',
    };

    const consumerGroupStream = new ConsumerGroupStream(consumerOptions, sourceTopic);

    consumerGroupStream.on('connect', () => {
        console.log(`Consumer id: "${consumerOptions.id}" is connected!`);
    });

    consumerGroupStream.on('error', (err) => {
        console.error(`Consumer id: "${consumerOptions.id}" encountered an error: ${err}`);
    });

    return consumerGroupStream; 
};

const publisher = (func, destTopic, consumerGroupStream, kafkaHost, highWaterMark) => { 
    const messageTransform = new AsyncMessageTransform(func, destTopic);

    const resultProducerStream = myProducerStream({ kafkaHost, highWaterMark, topic: destTopic })

    consumerGroupStream.pipe(messageTransform).pipe(resultProducerStream);
}; 

共有1个答案

闽朝
2023-03-14

对于第一个问题:一个组中最大的工作使用者等于分区的数量。

因此,如果您有一个分区的TopicA,而您的消费者组中有5个消费者,其中4个将是空闲的。

如果您有5个分区的TopicA,并且您的消费者组中有5个消费者,那么他们都将是活跃的,并且消费来自您的主题的消息。

bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 3 --partitions 3 --topic test

要更改已存在主题中的分区数,请执行以下操作:

bin/kafka-topics.sh --zookeeper zk_host:port/chroot --alter --topic test 
       --partitions 40 

请注意,只能增加分区数,不能减少分区数。

请参阅Kafka Docs https://Kafka.apache.org/documentation.html

此外,如果您想了解更多关于Kafaka的信息,请查阅免费书籍https://www.confluent.io/resources/kafka-the-definitive-guide/

 类似资料:
  • 使用命令时: /usr/local/kafka/bin/kafka-topics.sh--创建--zookeeper localhost:2181--复制-因子1--分区1--主题测试 提前道谢。

  • null camel-kafka中是否有任何配置,我们可以使用它来增加kafka主题分区计数?

  • 问题内容: 我想通过Java在Kafka(kafka_2.8.0-0.8.1.1)中创建一个主题。如果我在命令提示符下创建一个主题,并且如果我通过java api推送消息,它也可以正常工作。但是我想通过java api创建一个主题。经过长时间的搜索,我发现了以下代码, 我尝试了上面的代码,它表明创建了主题,但是无法在该主题中推送消息。我的代码有什么问题吗?还是通过其他方式实现以上目标? 问题答案:

  • 按需运行时,我可以以编程方式(使用java)从Kafka Topic创建/删除分区吗? 我正在使用Java Apache Kafka客户端(0.10)和Apache Kafka安装(0.10)

  • 我正在学习使用Kafka在科特林的SpringKafka。我知道,当一个新主题发布时,如果不存在,它就会被创建。所以,当我向从Spring创建的新/旧主题发送一个值时,默认分区是0,但我想在另一个分区上写一条消息,比如分区1。 当我创建/写一个主题时,它是有效的: 但是,当我使用以下选项选择分区和密钥时: 我得到了以下错误: 我试着把钥匙换成,但也没用。显然,当我从Spring客户端创建一个主题时

  • Kafka主题分区偏移位置始终从0或随机值开始,如何确保使用者记录是分区中的第一条记录?有没有办法找出答案?如果有的话,请让我知道。谢谢。