我已经设置了kafka客户端,它可以产生和消费消息,当我们把有效载荷从生产者发送到主题时,它可以正常工作,所以我有问题生产者现在第一个消息我可以发送到主题,我也可以从kafka主题中消费,现在我尝试发送第二个消息,但是消费者没有从kafka主题中读取第二个消息,知道这里发生了什么吗?
Producer.js
var config = require('./config.js');
var zk = require('node-zookeeper-client');
var kafkaConn = "130.8";
var kafka = require('kafka-node'),
HighLevelProducer = kafka.HighLevelProducer,
client = new kafka.Client(kafkaConn),
producer = new HighLevelProducer(client),
payloads = [
{ topic: 'test', messages: 'second message' }
];
producer.on('ready', function () {
producer.send(payloads, function (err, data) {
console.log(data);
});
});
consumer.js
function start () {
topics = [{topic: 'test'}];
var groupId = 'ulogGroup';
var clientId = "consumer-" + Math.floor(Math.random() * 10000);
var options = {autoCommit: true, fetchMaxWaitMs: 100, fetchMaxBytes: 10 * 1024 * 1024, groupId: groupId};
console.log("Started consumer: ", clientId);
var consumer_client = new kafka.Client(kafkaConn,clientId);
var client = new Client(consumer_client.connectionString,clientId);
var consumer = new HighLevelConsumer(client, topics, options);
console.log("Consumer topics:", getConsumerTopics(consumer).toString());
// startConsumer(consumer);
consumer.on('message', function (message) {
//var topic = message.data;
console.log('Message',message);
});
};
start();
可能是最微不足道的建议,但你确定你的第二条消息被发送到Kafaka队列吗?除了您的使用者之外,您可能希望在生产者发布消息时使用内置的命令行使用者,以确保所有消息都已发布。
bin/kafka-console-consumer.sh --zookeeper host:2181 --topic test --from-beginning
我在向我的Kafka主题发送序列化XML时遇到问题。每当我运行我的代码时,我都不会收到任何异常或错误消息,但我仍然无法在Kafka主题中看到我的任何消息。 我的Kafka制作人设置如下: 当我运行代码时,我得到: 知道怎么做吗?提前谢谢!
我是斯卡拉和Kafka的新手,遇到了一些麻烦。 我正在尝试将scala kafka producer连接到安装在cloudera express服务器上的kafka服务器。我已经用这些指令在VMs中这样做过一次了,没有任何问题。 当我运行producer时,所需的主题被创建,但没有任何消息被发送,或者我是这样认为的。 Kafka制作人 当我执行run方法时,我看到“producer-send:#”
我试图用wifi接口从一台电脑上的Kafka制作人发送消息到另一台电脑上的Kafka经纪人,但消息不出现在Kafka经纪人的指定主题中。 我用华硕无线路由器连接了两台PC机,并禁用了PC机和路由器上的所有防火墙。两台PC都成功地ping了对方。当我转向有线连接时,它工作了,消息被摄取到kafka broker PC上的指定主题。 Kafka制片人: null Listeners=明文://:909
在我的本地系统中,我已经启动了一个单独的Kafka实例,旁边还有动物园管理员。Zookeper和kafka服务器都运行在默认端口上。 我创建了一个主题“test”,复制因子为1,因为我只有一个kafka实例正在运行。 同时,我还创建了两个分区。 但是当我使用java kafka-client jar创建一个生产者时,即使我对消息使用不同的键,生产者也会将所有消息推送到同一个分区,因为所有消息都是在
如何在Kafka中发送同步消息 实现这一点的一种方法是设置properties参数 。 但是我想知道是否有一种甚至直接或替代的方式在Kafka中发送同步消息。(比如producer.sync发送(...)等等)。
我在用Kafka。 我有10k个jsons列表, 我该怎么做呢? 谢谢