当前位置: 首页 > 知识库问答 >
问题:

如何控制ConsumerGroup处理消息的并发性

周鸿云
2023-03-14

我正在使用kafka-node ConsumerGroup来消费来自主题的消息。ConsumerGroup在使用消息时需要调用外部API,这可能需要一秒钟的时间来响应。我希望控制从队列中消费下一条消息,直到我从API得到响应,这样消息就会被顺序地处理。

我该如何控制这种行为?

共有1个答案

钱和安
2023-03-14

这就是我们实现一次处理1条消息的方式:

var async = require('async'); //npm install async

//intialize a local worker queue with concurrency as 1 (only 1 event is processed at a time)
var q = async.queue(function(message, cb) {
          processMessage(message).then(function(ep) {
          cb(); //this marks the completion of the processing by the worker
        });
}, 1);

// a callback function, invoked when queue is empty. 
q.drain = function() {
    consumerGroup.resume(); //resume listening new messages from the Kafka consumer group
};

//on receipt of message from kafka, push the message to local queue, which then will be processed by worker
function onMessage(message) {
  q.push(message, function (err, result) {  
    if (err) { logger.error(err); return }      
  });
  consumerGroup.pause(); //Pause kafka consumer group to not receive any more new messages
}
 类似资料:
  • 我对RabbitMQ很陌生,所以如果我的问题听起来很琐碎,请原谅。我想在RabbitMQ上发布消息,它将由RabbitMQ消费者处理。 我的消费者机器是一个多核机器(最好是azure上的工作者角色)。但QueueBasicConsumer一次推送一条消息。我如何编程来利用我可以同时处理多个消息的所有核心。 一种解决方案是在多个线程中打开多个通道,然后在那里处理消息。但在这种情况下,我将如何决定线程

  • 我正在尝试使用spring integration设置我的应用程序,作为一名新手,需要以下用例的建议- 有一个队列,来自另一个应用程序的消息将被推送到该队列。我的应用程序使用队列中的消息,进行一些数据处理,然后将其推送到另一个出站队列。目标是以并发方式处理消息。 根据我的理解,我们可以有两种方法- 1.使用#轮询器 2.使用#调度器 从基于轮询器的配置来看,池中似乎有多个可用线程,可以同时获取消息

  • 我知道我可以通过以下方式将转换为。 我现在的问题是,我在那个序列中有700个未来,我希望能够控制并行解决其中的多少个,因为每个未来都将调用内部rest api,同时有700个请求就像对那个服务器发起dos攻击。 我宁愿一次只能解决10个期货。 我如何才能做到这一点? 尝试pamu的答案,我看到了错误:

  • 我试着为下一个spring cloud流版本准备我们的应用程序。(当前使用3.0.0.rc1)。用Kafka的活页夹。 现在我们收到一个消息,处理它,并将它重新发送到另一个主题。单独处理每个消息会导致对数据库的大量单个请求。 在3.0.0版本中,我们希望以批处理的方式处理消息,这样我们就可以在批更新中保存数据。 在当前版本中,我们使用了@enablebinding、@streamlistener

  • 在FLTK中是通过Fl_Widegt::handle(),虚拟函数来处理系统的消息。我们可以查看Fltk的源代码来分析系统是怎样处理一些系统消息的,如按钮的消息处理 /******************************************************* Fl_Button中处理消息的代码,省略了具体的处理代码 *******************************

  • 控制台-频道-聊天-发消息 接口URL {youke-url}/console/Index.php?c=live&a=setComment&timestamp=1607677497&access_key=abc&sign=5c734d046a244b27ffa74a4235a45a1b3ada5ec7 请求方式 POST Content-Type form-data 请求Query参数 参数 示例