有时,kafka节点消费者从偏移量0开始消费,而其默认行为是仅消费较新的消息。然后它将不会切换回默认行为。你知道如何解决这个问题吗?会发生什么,它的行为会突然改变?代码非常简单,而且不会改变代码。
var kafka = require("kafka-node") ;
Consumer = kafka.Consumer;
client = new kafka.KafkaClient();
consumer = new Consumer(client, [{ topic: "Topic_23", partition: 0}
]);
consumer.on("message", function(message) {
console.log(message)
});
到目前为止,我找到的唯一解决办法是改变Kafka话题。然后一切又正常了。有什么想法吗?
在Kafka,补偿与特定消费者无关,而是与消费者群体相关。在代码中,您不提供消费者组,因此,每次启动消费者时,它都会被分配到不同的消费者组,因此,偏移量从0
开始。
以下内容应该能起到作用(很明显,这是你第一次阅读所有的信息):
var kafka = require("kafka-node") ;
Consumer = kafka.Consumer;
client = new kafka.KafkaClient();
payload = [{
topic: "Topic_23",
partition: 0
}]
var options = {
groupId: 'test-consumer-group',
fromOffset: 'latest'
};
consumer = new Consumer(client, payload, options);
consumer.on("message", function(message) {
console.log(message)
});
我是流媒体代理(如Kafka)的新手,来自排队消息系统(如JMS、Rabbit MQ)。 我从Kafka文档中读到,消息作为记录存储在Kafka分区的偏移量中。消费者从偏移量读取。 消息和记录有什么区别[多个/部分消息是否构成记录?] 当消费者从偏移量读取时,消费者是否有可能读取部分消息?消费者是否需要基于某种逻辑将这些对等消息串起来? 或 1条消息=1条记录=1个偏移量 之所以会出现这个问题,是
我正在使用Kafka2.0版和java消费者API来消费来自一个主题的消息。我们使用的是一个单节点Kafka服务器,每个分区有一个使用者。我注意到消费者正在丢失一些消息。场景是:消费者投票主题。我为每个线程创建了一个消费者。获取消息并将其交给处理程序来处理消息。然后使用“至少一次”的Kafka消费者语义来提交Kafka偏移量来提交偏移量。同时,我有另一个消费者使用不同的group-id运行。在这个
为什么实际主题中的偏移值与同一主题中的偏移值不同?PFB偏移位置以及使用的命令。 我错过了什么?
相反,我需要做的是将更改为新的内容,然后它将从最早的偏移量恢复。 会不会有其他的犯罪行为? 更新 根据我的理解,这看起来像是每次auto commit enable为false时,它都将提交偏移量。这是Camel Kafka组件的一个特性,因为即使启用了自动提交,它也将在x条消息之后同步
我对非常陌生,我们正在使用。 我需要做的是使用来自主题的消息。为此,我必须用Java编写一个消费者,它将消费来自主题的消息,然后将该消息保存到数据库。保存消息后,将向Java消费者发送一些确认。如果确认为true,则应使用主题中的下一条消息。如果AcknowlDement为false(这意味着由于某些错误消息,从主题读取的信息无法保存到数据库中),则应再次读取该消息。 我认为我需要使用<code>
简而言之,我想从一开始就对Kafka的数据重新运行Flink管道。 Flink0.10.2,Kafka0.8.2。 我在Kafka中有一个保留2小时的推文主题,以及Flink中的一个管道,该管道以每10秒5分钟的滑动窗口计算推文。 如果我中断管道并重新运行它,我希望它重新读取旧推文,从而发出价值5分钟的推文计数。相反,它似乎从新到达的推文重新开始,因此需要5分钟才能计数为“处于状态”。 我已经尝试