当前位置: 首页 > 知识库问答 >
问题:

如何利用kafka-node控制已消费kafka消息的提交

潘向明
2023-03-14

我第一次在kafka中使用Node,使用Kafka-Node。使用消息需要调用外部API,这甚至可能需要一秒钟的时间来响应。我希望克服我的消费者的突然失败,这样,如果一个消费者失败了,另一个将替换它的消费者将收到相同的消息,即它的工作没有完成。

我正在使用Kafka0.10并尝试使用ConsumerGroup。

我想到了在options中设置autocommit:false,并且只在消息的工作完成后提交消息(就像我以前对一些Java代码所做的那样)。

但是,我似乎不能确定,只有在消息完成之后,我应该如何正确地提交消息。我该怎么犯呢?

我的另一个担心是,由于回调,下一个消息似乎在上一个消息完成之前就被读取了。而且我担心,如果消息x+2在消息x+1之前完成,那么偏移量将被设置为x+2,因此在失败的情况下,x+1将永远不会被重新执行。

这里基本上是我到目前为止所做的:

var options = {
    host: connectionString,
    groupId: consumerGroupName,
    id: clientId,
    autoCommit: false
};

var kafka = require("kafka-node");
var ConsumerGroup = kafka.ConsumerGroup;

var consumerGroup = new ConsumerGroup(options, topic);

consumerGroup.on('connect', function() {
    console.log("Consuming Kafka %s, topic=%s", JSON.stringify(options), topic);
});

consumerGroup.on('message', function(message) {
    console.log('%s read msg Topic="%s" Partition=%s Offset=%d', this.client.clientId, message.topic, message.partition, message.offset);
    console.log(message.value);
    doSomeStuff(function() {
        // HOW TO COMMIT????
        consumerGroup.commit(function(err, data) {
            console.log("------ Message done and committed ------");
        });
    });
});

consumerGroup.on('error', function(err) {
    console.log("Error in consumer: " + err);
    close();
});

process.once('SIGINT', function () {
    close();
});

var close = function() {
    // SHOULD SEND 'TRUE' TO CLOSE ???
    consumerGroup.close(true, function(error) {
        if (error) {
            console.log("Consuming closed with error", error);
        } else {
            console.log("Consuming closed");
        }
    });
};

共有1个答案

吴子昂
2023-03-14

在这里您可以做的一件事是为您处理的每个消息都有一个重试机制。

您可以在此线程上查阅我的答案:https://stackoverflow.com/a/44328233/2439404

我使用kafka-consumer使用来自Kafka的消息,使用async/cargo将它们批处理在一起,并将它们放入async/queue(内存队列)中。队列将一个工作函数作为一个争论点,我正在向该争论点传递async/retryable

对于您的问题,您可以使用retryable对您的消息进行处理。https://caolan.github.io/async/docs.html#可重试

这也许能解决你的问题。

 类似资料:
  • 我正在为Kafka0.9.0.0做Kafka快速入门。 我让zookeeper在监听,因为我运行了 只有一个代理在处侦听,因为我运行了 我有一个制作人在主题“测试”上发帖,因为我跑了 当我运行旧的API使用者时,它通过运行 但是,当我运行新的API使用者时,我在运行时没有得到任何东西 是否可以使用新的API从控制台使用者订阅主题?我该怎么修好它?

  • 我正在构建一个使用来自Kafka主题的消息并执行数据库更新任务的Kafka消费者应用程序。消息是每天一次大批量生产的--所以该主题在10分钟内加载了大约100万条消息。主题有8个分区。 Spring Kafka消费者(使用@KafKalistener注释并使用ConcurrentKafkaListenerContainerFactory)在非常短的批处理中被触发。 批处理大小有时仅为1或2条消息。

  • 所以首先,为了能够暂停/停止消费者,我必须访问MessageListenerContainer。这意味着,在配置中,我将创建:ConcurrentKafkaListenerContainerFactory并(从2.2开始)使用它创建ConcurrentMessageListenerContainer的托管bean。然后可以使用这个bean来启动/停止消费者。管用。一旦它是并发的...我假设,我传递

  • 由于它是一个Spring Boot应用程序,默认偏移量设置为Latest。我在这里做错了什么,请帮我弄明白。

  • 我要求从主题中读取消息,对它们进行批处理,然后将批处理推送到外部系统。如果批处理因任何原因失败,我需要再次使用同一组消息并重复该过程。因此,对于每个批处理,每个分区的 from 和 to 偏移量都存储在数据库中。为了实现这一点,我通过向读取器分配分区来为每个分区创建一个Kafka使用者,基于先前存储的偏移量,使用者寻求该位置并开始读取。我已关闭自动提交,并且不提交来自使用者的偏移量。对于每个批处理

  • 我有一个简单的Kafka设置。生成器正在以较高的速率向单个分区生成具有单个主题的消息。单个使用者正在使用来自此分区的消息。在此过程中,使用者可能会多次暂停处理消息。停顿可以持续几分钟。生产者停止产生消息后,所有排队的消息都将由使用者处理。生产者产生的消息似乎不会立即被消费者看到。我使用的是Kafka0.10.1.0。这里会发生什么?下面是使用消息的代码部分: 代理上的所有配置都保留为kafka默认