我正在尝试为节点中的应用程序设置Kafka队列。我在一个消费者组中有3个消费者,并且它订阅了一个主题,但是当生产者发送消息到一个有3个分区的主题时,消费者组接收到重复的消息,例如,consumer1从partition1读取并获取消息,而consumer2也从相同的分区读取并获取相同的消息。
我给出了到git存储库的链接,其中有所有的可执行代码。我找不到它是设置问题吗?或节点包问题。
为创建使用者组提供的选项如下
consumerOptions = {
'kafkahost:'localhost:9092',
groupId: groupName,
autoCommit: true,
autoCommitIntervalMs: 1000,
sessionTimeout: 15000,
fetchMaxBytes: 10 * 1024 * 1024, // 10 MB
protocol: ['roundrobin'],
fromOffset: 'earliest'
};
var consumer = new kafka.ConsumerGroup(Object.assign({ id: 'consumer1' }, consumerOptions), topicName);
请参阅此链接了解可执行代码、broker配置和zookeeper配置https://github.com/raghavendralacharya/kafka-node.git
尝试将fromoffset
配置更改为:
fromOffset: 'latest'
想要从使用的Spring启动应用程序的不同集群上创建同质。 即想要为已经定义的类创建一个 Kafka Consumer 对象,该对象侦听动态定义的多个集群。 例如:假设一个Spring启动应用程序S,其中包含kafkaconsumer的
我正在运行一个简单的Kafka streams应用程序,它将使用Node JS记录的信息带到一个Kafka主题。 还需要注意的是,时间戳只是一个数字,表示自1970年6月以来的秒数。 我使用scala中的Kafka流来使用这些数据。 例如。 然而,我不确定如何将时间戳(我从nodeJS发送的)提取到这个流中。 例如,如果我尝试做这样的事情 这会导致错误“无法解析符号流”。我在想我该怎么解决这个问题
我创建了一个制作人和一个消费者,使用“Kafka节点”包发送和消费Kafka主题的消息。生产者和消费者通过API进行调用。POST方法用于向主题发送消息,而GET方法用于在消费者处从主题获取消息。 当我向KAFKA发送消息后调用consumer API时,之前的所有消息都会在。 我只需要最后一条消息,这是生产者发送的。 如何在不使用任何数组或任何东西的情况下获取最后一条消息。 有没有办法删除这个话
我需要在不同的机器上配置一个Kafka集群,但它不起作用,当我启动生产者和消费者时,将显示以下错误: 你能帮帮我吗。
我是Kafka的新手,正在学习Kafka内部知识。请根据需要随时更正我的理解。。 这是我的实时场景..感谢所有的回复: 我有一个接收数据文件的实时FTP服务器…比如索赔文件。 我将把这些数据发布到一个主题中.让我们把这个主题称为claims_topic(2个分区). 我需要订阅这个claims_topic,阅读消息并将它们写入Oracle和Postgres表。让我们将oracle表称为Otable
有一个基本示例,它对1个消费者起作用。它接收消息。但是添加一个额外的消费者将被忽略。 consumer2的“22”事件从未引发问题。如果我使用命令行工具检查该主题,则该主题的数据存在