我第一次在kafka中使用Node,使用Kafka-Node。使用消息需要调用外部API,这甚至可能需要一秒钟的时间来响应。我希望克服我的消费者的突然失败,这样,如果一个消费者失败了,另一个将替换它的消费者将收到相同的消息,即它的工作没有完成。
我正在使用Kafka0.10并尝试使用ConsumerGroup。
我想到了在options中设置autocommit:false
,并且只在消息的工作完成后提交消息(就像我以前对一些Java代码所做的那样)。
但是,我似乎不能确定,只有在消息完成之后,我应该如何正确地提交消息。我该怎么犯呢?
我的另一个担心是,由于回调,下一个消息似乎在上一个消息完成之前就被读取了。而且我担心,如果消息x+2在消息x+1之前完成,那么偏移量将被设置为x+2,因此在失败的情况下,x+1将永远不会被重新执行。
这里基本上是我到目前为止所做的:
var options = {
host: connectionString,
groupId: consumerGroupName,
id: clientId,
autoCommit: false
};
var kafka = require("kafka-node");
var ConsumerGroup = kafka.ConsumerGroup;
var consumerGroup = new ConsumerGroup(options, topic);
consumerGroup.on('connect', function() {
console.log("Consuming Kafka %s, topic=%s", JSON.stringify(options), topic);
});
consumerGroup.on('message', function(message) {
console.log('%s read msg Topic="%s" Partition=%s Offset=%d', this.client.clientId, message.topic, message.partition, message.offset);
console.log(message.value);
doSomeStuff(function() {
// HOW TO COMMIT????
consumerGroup.commit(function(err, data) {
console.log("------ Message done and committed ------");
});
});
});
consumerGroup.on('error', function(err) {
console.log("Error in consumer: " + err);
close();
});
process.once('SIGINT', function () {
close();
});
var close = function() {
// SHOULD SEND 'TRUE' TO CLOSE ???
consumerGroup.close(true, function(error) {
if (error) {
console.log("Consuming closed with error", error);
} else {
console.log("Consuming closed");
}
});
};
在这里您可以做的一件事是为您处理的每个消息都有一个重试机制。
您可以在此线程上查阅我的答案:https://stackoverflow.com/a/44328233/2439404
我使用kafka-consumer
使用来自Kafka的消息,使用async/cargo
将它们批处理在一起,并将它们放入async/queue
(内存队列)中。队列将一个工作函数作为一个争论点,我正在向该争论点传递async/retryable
。
对于您的问题,您可以使用retryable对您的消息进行处理。https://caolan.github.io/async/docs.html#可重试
这也许能解决你的问题。
我正在为Kafka0.9.0.0做Kafka快速入门。 我让zookeeper在监听,因为我运行了 只有一个代理在处侦听,因为我运行了 我有一个制作人在主题“测试”上发帖,因为我跑了 当我运行旧的API使用者时,它通过运行 但是,当我运行新的API使用者时,我在运行时没有得到任何东西 是否可以使用新的API从控制台使用者订阅主题?我该怎么修好它?
我正在构建一个使用来自Kafka主题的消息并执行数据库更新任务的Kafka消费者应用程序。消息是每天一次大批量生产的--所以该主题在10分钟内加载了大约100万条消息。主题有8个分区。 Spring Kafka消费者(使用@KafKalistener注释并使用ConcurrentKafkaListenerContainerFactory)在非常短的批处理中被触发。 批处理大小有时仅为1或2条消息。
所以首先,为了能够暂停/停止消费者,我必须访问MessageListenerContainer。这意味着,在配置中,我将创建:ConcurrentKafkaListenerContainerFactory并(从2.2开始)使用它创建ConcurrentMessageListenerContainer的托管bean。然后可以使用这个bean来启动/停止消费者。管用。一旦它是并发的...我假设,我传递
由于它是一个Spring Boot应用程序,默认偏移量设置为Latest。我在这里做错了什么,请帮我弄明白。
我要求从主题中读取消息,对它们进行批处理,然后将批处理推送到外部系统。如果批处理因任何原因失败,我需要再次使用同一组消息并重复该过程。因此,对于每个批处理,每个分区的 from 和 to 偏移量都存储在数据库中。为了实现这一点,我通过向读取器分配分区来为每个分区创建一个Kafka使用者,基于先前存储的偏移量,使用者寻求该位置并开始读取。我已关闭自动提交,并且不提交来自使用者的偏移量。对于每个批处理
我有一个简单的Kafka设置。生成器正在以较高的速率向单个分区生成具有单个主题的消息。单个使用者正在使用来自此分区的消息。在此过程中,使用者可能会多次暂停处理消息。停顿可以持续几分钟。生产者停止产生消息后,所有排队的消息都将由使用者处理。生产者产生的消息似乎不会立即被消费者看到。我使用的是Kafka0.10.1.0。这里会发生什么?下面是使用消息的代码部分: 代理上的所有配置都保留为kafka默认