我的节点应用程序使用kafka节点模块。
我有一个Kafka主题,有三个分区,如下所示:
Topic: NotifierTemporarye3df:/opPartitionCount: 3in$ kafReplicationFactor: 3ibe Configs: segment.bytes=1073741824 --topic NotifierTemporary
Topic: NotifierTemporary Partition: 0 Leader: 1001 Replicas: 1001,1003,1002 Isr: 1001,1003,1002
Topic: NotifierTemporary Partition: 1 Leader: 1002 Replicas: 1002,1001,1003 Isr: 1002,1001,1003
Topic: NotifierTemporary Partition: 2 Leader: 1003 Replicas: 1003,1002,1001 Isr: 1003,1002,1001
当我向主题写入一系列键控消息时,它们似乎都被写入同一分区。我希望我的一些不同的密钥消息被发送到分区1和分区2。
这是我从消费者onMessage事件函数中为几条消息输出的日志:
the message is: {"topic":"NotifierTemporary","value":"{\"recipient\":66,\"subject\":\"download complete\",\"message\":\"s3/123.jpg\"}","offset":345,"partition":0,"highWaterOffset":346,"key":"66","timestamp":"2020-03-19T00:16:57.783Z"}
the message is: {"topic":"NotifierTemporary","value":"{\"recipient\":222,\"subject\":\"download complete\",\"message\":\"s3/123.jpg\"}","offset":346,"partition":0,"highWaterOffset":347,"key":"222","timestamp":"2020-03-19T00:16:57.786Z"}
the message is: {"topic":"NotifierTemporary","value":"{\"recipient\":13,\"subject\":\"download complete\",\"message\":\"s3/123.jpg\"}","offset":347,"partition":0,"highWaterOffset":348,"key":"13","timestamp":"2020-03-19T00:16:57.791Z"}
the message is: {"topic":"NotifierTemporary","value":"{\"recipient\":316,\"subject\":\"download complete\",\"message\":\"s3/123.jpg\"}","offset":348,"partition":0,"highWaterOffset":349,"key":"316","timestamp":"2020-03-19T00:16:57.798Z"}
the message is: {"topic":"NotifierTemporary","value":"{\"recipient\":446,\"subject\":\"download complete\",\"message\":\"s3/123.jpg\"}","offset":349,"partition":0,"highWaterOffset":350,"key":"446","timestamp":"2020-03-19T00:16:57.806Z"}
the message is: {"topic":"NotifierTemporary","value":"{\"recipient\":66,\"subject\":\"download complete\",\"message\":\"s3/123.jpg\"}","offset":350,"partition":0,"highWaterOffset":351,"key":"66","timestamp":"2020-03-19T00:17:27.918Z"}
the message is: {"topic":"NotifierTemporary","value":"{\"recipient\":222,\"subject\":\"download complete\",\"message\":\"s3/123.jpg\"}","offset":351,"partition":0,"highWaterOffset":352,"key":"222","timestamp":"2020-03-19T00:17:27.920Z"}
the message is: {"topic":"NotifierTemporary","value":"{\"recipient\":13,\"subject\":\"download complete\",\"message\":\"s3/123.jpg\"}","offset":352,"partition":0,"highWaterOffset":353,"key":"13","timestamp":"2020-03-19T00:17:27.929Z"}
the message is: {"topic":"NotifierTemporary","value":"{\"recipient\":316,\"subject\":\"download complete\",\"message\":\"s3/123.jpg\"}","offset":353,"partition":0,"highWaterOffset":354,"key":"316","timestamp":"2020-03-19T00:17:27.936Z"}
以下是发送消息的kafka节点生产者代码:
* @description Adds a notification message to the Kafka topic that is not saved in a database.
* @param {Int} recipientId - accountId of recipient of notification message
* @param {Object} message - message payload to send to recipient
*/
async sendTemporaryNotification(recipientId, subject, message) {
const notificationMessage = {
recipient: recipientId,
subject,
message,
};
// we need to validate this message schema - this will throw if invalid
Joi.assert(notificationMessage, NotificationMessage);
// partition based on the recipient
const payloads = [
{ topic: KAFKA_TOPIC_TEMPORARY, messages: JSON.stringify(notificationMessage), key: notificationMessage.recipient },
];
if (this.isReady) {
await this.producer.sendAsync(payloads);
}
else {
throw new ProducerNotReadyError('Notifier Producer not ready');
}
}
}
如您所见,它们都不是来自分区1
Scarysize说我没有指定分区器类型是正确的。对于任何想知道完整的分区生产者是什么样子的人,您可以参考以下代码。我已经验证过,它根据提供的密钥分发消息。我在这里使用了HighLevelProducer,因为kafka节点库的主要贡献者之一建议其他人使用它来解决分区问题。我还没有验证这个解决方案是否适用于常规生产商,而不是高级生产商。
在本例中,我根据用户的用户ID向用户发送通知消息。这是划分消息的关键。
const { KafkaClient, HighLevelProducer, KeyedMessage } = require('kafka-node');
const Promise = require('bluebird');
const NotificationMessage = require(__dirname + '/../models/notificationMessage.js');
const ProducerNotReadyError = require(__dirname + '/../errors/producerNotReadyError.js');
const Joi = require('@hapi/joi');
const KAFKA_TOPIC_TEMPORARY = 'NotifierTemporary';
/**
* @classdesc Producer that sends notification messages to Kafka.
* @class
*/
class NotifierProducer {
/**
* Create NotifierProducer.
* @constructor
* @param {String} kafkaHost - address of kafka server
*/
constructor(kafkaHost) {
const client = Promise.promisifyAll(new KafkaClient({kafkaHost}));
const producerOptions = {
partitionerType: HighLevelProducer.PARTITIONER_TYPES.keyed, // this is a keyed partitioner
};
this.producer = Promise.promisifyAll(new HighLevelProducer(client, producerOptions));
this.isReady = false;
this.producer.on('ready', async () => {
await client.refreshMetadataAsync([KAFKA_TOPIC_TEMPORARY]);
console.log('Notifier Producer is operational');
this.isReady = true;
});
this.producer.on('error', err => {
console.error('Notifier Producer error: ', err);
this.isReady = false;
});
}
/**
* @description Adds a notification message to the Kafka topic that is not saved in a database.
* @param {Int} recipientId - accountId of recipient of notification message
* @param {String} subject - subject header of the message
* @param {Object} message - message payload to send to recipient
*/
async sendTemporaryNotification(recipientId, subject, message) {
const notificationMessage = {
recipient: recipientId,
subject,
message,
};
// we need to validate this message schema - this will throw if invalid
Joi.assert(notificationMessage, NotificationMessage);
// partition based on the recipient
const messageKM = new KeyedMessage(notificationMessage.recipient, JSON.stringify(notificationMessage));
const payloads = [
{ topic: KAFKA_TOPIC_TEMPORARY, messages: messageKM, key: notificationMessage.recipient },
];
if (this.isReady) {
await this.producer.sendAsync(payloads);
}
else {
throw new ProducerNotReadyError('Notifier Producer not ready');
}
}
}
/**
* Kafka topic that the producer and corresponding consumer will use to send temporary messages.
* @type {string}
*/
NotifierProducer.KAFKA_TOPIC_TEMPORARY = KAFKA_TOPIC_TEMPORARY;
module.exports = NotifierProducer;
创建制作人时,需要配置正确的partitionerType
:
// Partitioner type (default = 0, random = 1, cyclic = 2, keyed = 3, custom = 4), default 0
new Producer(client, {paritionerType: 3});
参见文档:https://www.npmjs.com/package/kafka-node#producerkafkaclient-选项自定义分区器
我在使用Discord。js和一个长时间运行的Discord bot最近由于消息而突然停止工作。成员对象始终为空。 例如: 这始终会产生:TypeError:无法读取null的属性“roles” 在我们查看消息的代码库中,这种错误也随处可见。成员 即使控制台完全记录消息对象,我们也可以肯定地看到成员属性为空。 Discord.js?有什么变化吗?
我有两个组id相同的消费者服务器订阅了相同的主题。kafka服务器仅使用一个分区运行。据我所知,消息应该在这两个消费者服务器中随机使用。但现在似乎总是同一个消费者服务器A消费消息,另一个不消费消息。如果我停止消费者服务器A,另一个将正常工作。我所期望的是,他们可以随机消费信息。
我想知道,在什么情况下,具有相同分区键的消息会进入不同的分区。 我使用下面给出的命令运行了属于同一组的两个消费者在控制台中监听一个主题: 我使用“纳米/Kafka-php”库将消息放入带有键 的主题。当我发送多个这样的消息时,我发现很少有消息转到第二个消费者,而大多数消息都发送给消费者1。 由于我对所有消息使用相同的密钥,因此我希望所有消息都由同一个使用者使用。每个使用者都绑定到每个分区。 我使用
我试图用不同的密钥将消息存储到不同的分区。 例如: 但是当我尝试运行我的Producer类时,它总是存储在单个分区中。 根据文档,使用查找分区。我还看到这个问题Kafka分区键工作不正常, 但我在Kafka Client库的0.9.x版本中找不到<code>ByteArrayPartitioner</code>类。 更新:我正在使用代码动态创建主题。 如果我手动创建一个带有分区的主题,那么它可以
我在Kafka Topic内部有500万条消息。 我必须加入具有相同分区密钥的消息作为单个消息的一部分,并发送给消费者主题[例如:对于密钥1234-Messge1,消费者应该收到单个消息而不是100万消息] Kafka端是否有可用的Kafka API,使用它我可以读取组中具有相同Partition键的所有消息,而不是像传统的spring boot Kafka Listener那样一次读取单个消息。
我用下面的代码给Kafka写信: 我们使用0.8.1.1版本的Kafka。 当多个线程正在写入时,其中一些线程(具有不同的负载)是否使用相同的分区键进行写入,因此Kafka会覆盖这些消息(由于相同的分区密钥)? 让我们朝这个方向思考的文献是:http://kafka.apache.org/documentation.html#compaction