const myProducerStream = ({ kafkaHost, highWaterMark, topic }) => {
const kafkaClient = new KafkaClient({ kafkaHost });
const producer = new HighLevelProducer(kafkaClient);
const options = {
highWaterMark,
kafkaClient,
producer
};
kafkaClient.refreshMetadata([topic], err => {
if (err) throw err;
});
return new ProducerStream(options);
};
const transfrom = topic => new Transform({
objectMode: true,
decodeStrings: true,
transform(obj, encoding, cb) {
console.log(`pushing message ${JSON.stringify(obj)} to topic "${topic}"`);
cb(null, {
topic,
messages: JSON.stringify(obj)
});
}
});
const publisher = (topic, kafkaHost, highWaterMark) => {
const myTransfrom = transfrom(topic);
const producer = myProducerStream({ kafkaHost, highWaterMark, topic });
myTransfrom.pipe(producer);
return myTransform;
};
const createConsumerStream = (sourceTopic, kafkaHost, groupId) => {
const consumerOptions = {
kafkaHost,
groupId,
protocol: ['roundrobin'],
encoding: 'utf8',
id: uuidv4(),
fromOffset: 'latest',
outOfRangeOffset: 'earliest',
};
const consumerGroupStream = new ConsumerGroupStream(consumerOptions, sourceTopic);
consumerGroupStream.on('connect', () => {
console.log(`Consumer id: "${consumerOptions.id}" is connected!`);
});
consumerGroupStream.on('error', (err) => {
console.error(`Consumer id: "${consumerOptions.id}" encountered an error: ${err}`);
});
return consumerGroupStream;
};
const publisher = (func, destTopic, consumerGroupStream, kafkaHost, highWaterMark) => {
const messageTransform = new AsyncMessageTransform(func, destTopic);
const resultProducerStream = myProducerStream({ kafkaHost, highWaterMark, topic: destTopic })
consumerGroupStream.pipe(messageTransform).pipe(resultProducerStream);
};
对于第一个问题:一个组中最大的工作使用者等于分区的数量。
因此,如果您有一个分区的TopicA,而您的消费者组中有5个消费者,其中4个将是空闲的。
如果您有5个分区的TopicA,并且您的消费者组中有5个消费者,那么他们都将是活跃的,并且消费来自您的主题的消息。
bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 3 --partitions 3 --topic test
要更改已存在主题中的分区数,请执行以下操作:
bin/kafka-topics.sh --zookeeper zk_host:port/chroot --alter --topic test
--partitions 40
请注意,只能增加分区数,不能减少分区数。
请参阅Kafka Docs https://Kafka.apache.org/documentation.html
此外,如果您想了解更多关于Kafaka的信息,请查阅免费书籍https://www.confluent.io/resources/kafka-the-definitive-guide/
使用命令时: /usr/local/kafka/bin/kafka-topics.sh--创建--zookeeper localhost:2181--复制-因子1--分区1--主题测试 提前道谢。
null camel-kafka中是否有任何配置,我们可以使用它来增加kafka主题分区计数?
问题内容: 我想通过Java在Kafka(kafka_2.8.0-0.8.1.1)中创建一个主题。如果我在命令提示符下创建一个主题,并且如果我通过java api推送消息,它也可以正常工作。但是我想通过java api创建一个主题。经过长时间的搜索,我发现了以下代码, 我尝试了上面的代码,它表明创建了主题,但是无法在该主题中推送消息。我的代码有什么问题吗?还是通过其他方式实现以上目标? 问题答案:
按需运行时,我可以以编程方式(使用java)从Kafka Topic创建/删除分区吗? 我正在使用Java Apache Kafka客户端(0.10)和Apache Kafka安装(0.10)
我正在学习使用Kafka在科特林的SpringKafka。我知道,当一个新主题发布时,如果不存在,它就会被创建。所以,当我向从Spring创建的新/旧主题发送一个值时,默认分区是0,但我想在另一个分区上写一条消息,比如分区1。 当我创建/写一个主题时,它是有效的: 但是,当我使用以下选项选择分区和密钥时: 我得到了以下错误: 我试着把钥匙换成,但也没用。显然,当我从Spring客户端创建一个主题时
Kafka主题分区偏移位置始终从0或随机值开始,如何确保使用者记录是分区中的第一条记录?有没有办法找出答案?如果有的话,请让我知道。谢谢。