当前位置: 首页 > 知识库问答 >
问题:

如何使用Kafka制作人将消息发送到同一主题?

钦耀
2023-03-14

我已经设置了kafka客户端,它可以产生和消费消息,当我们把有效载荷从生产者发送到主题时,它可以正常工作,所以我有问题生产者现在第一个消息我可以发送到主题,我也可以从kafka主题中消费,现在我尝试发送第二个消息,但是消费者没有从kafka主题中读取第二个消息,知道这里发生了什么吗?

Producer.js

var config = require('./config.js');
var zk = require('node-zookeeper-client');
var kafkaConn = "130.8";
var kafka = require('kafka-node'),
    HighLevelProducer = kafka.HighLevelProducer,
    client = new kafka.Client(kafkaConn),
    producer = new HighLevelProducer(client),
    payloads = [
        { topic: 'test', messages: 'second message' }
    ];
producer.on('ready', function () {
    producer.send(payloads, function (err, data) {
        console.log(data);
    });
});

consumer.js

function start () {
    topics = [{topic: 'test'}];
    var groupId = 'ulogGroup';
    var clientId = "consumer-" + Math.floor(Math.random() * 10000);
    var options = {autoCommit: true, fetchMaxWaitMs: 100, fetchMaxBytes: 10 * 1024 * 1024, groupId: groupId};
    console.log("Started consumer: ", clientId);
    var consumer_client = new kafka.Client(kafkaConn,clientId);
    var client = new Client(consumer_client.connectionString,clientId);
    var consumer = new HighLevelConsumer(client, topics, options);
    console.log("Consumer topics:", getConsumerTopics(consumer).toString());
   // startConsumer(consumer);
    consumer.on('message', function (message) {
        //var topic = message.data;
        console.log('Message',message);
    });
};
start();

共有1个答案

颜文昌
2023-03-14

可能是最微不足道的建议,但你确定你的第二条消息被发送到Kafaka队列吗?除了您的使用者之外,您可能希望在生产者发布消息时使用内置的命令行使用者,以确保所有消息都已发布。

bin/kafka-console-consumer.sh --zookeeper host:2181 --topic test --from-beginning
 类似资料:
  • 我在向我的Kafka主题发送序列化XML时遇到问题。每当我运行我的代码时,我都不会收到任何异常或错误消息,但我仍然无法在Kafka主题中看到我的任何消息。 我的Kafka制作人设置如下: 当我运行代码时,我得到: 知道怎么做吗?提前谢谢!

  • 我是斯卡拉和Kafka的新手,遇到了一些麻烦。 我正在尝试将scala kafka producer连接到安装在cloudera express服务器上的kafka服务器。我已经用这些指令在VMs中这样做过一次了,没有任何问题。 当我运行producer时,所需的主题被创建,但没有任何消息被发送,或者我是这样认为的。 Kafka制作人 当我执行run方法时,我看到“producer-send:#”

  • 我试图用wifi接口从一台电脑上的Kafka制作人发送消息到另一台电脑上的Kafka经纪人,但消息不出现在Kafka经纪人的指定主题中。 我用华硕无线路由器连接了两台PC机,并禁用了PC机和路由器上的所有防火墙。两台PC都成功地ping了对方。当我转向有线连接时,它工作了,消息被摄取到kafka broker PC上的指定主题。 Kafka制片人: null Listeners=明文://:909

  • 在我的本地系统中,我已经启动了一个单独的Kafka实例,旁边还有动物园管理员。Zookeper和kafka服务器都运行在默认端口上。 我创建了一个主题“test”,复制因子为1,因为我只有一个kafka实例正在运行。 同时,我还创建了两个分区。 但是当我使用java kafka-client jar创建一个生产者时,即使我对消息使用不同的键,生产者也会将所有消息推送到同一个分区,因为所有消息都是在

  • 如何在Kafka中发送同步消息 实现这一点的一种方法是设置properties参数 。 但是我想知道是否有一种甚至直接或替代的方式在Kafka中发送同步消息。(比如producer.sync发送(...)等等)。

  • 我在用Kafka。 我有10k个jsons列表, 我该怎么做呢? 谢谢