当前位置: 首页 > 知识库问答 >
问题:

RabbitMQ批量消费消息解决方案

西门良才
2023-03-14

我使用RabbitMQ作为不同消息的队列。当我使用来自一个队列的两个不同消费者的消息时,我会处理它们并将处理结果插入数据库:

def consumer_callback(self, channel, delivery_tag, properties, message):
    result = make_some_processing(message)
    insert_to_db(result)
    channel.basic_ack(delivery_tag)

我想大量使用队列中的消息,这将减少数据库负载。由于RabbitMQ不支持消费者批量读取消息,我将这样做smth:

some_messages_list = []
def consumer_callback(self, channel, delivery_tag, properties, message):
    some_messages_list.append({delivery_tag: message})
    if len(some_messages_list) > 1000:
        results_list = make_some_processing_bulk(some_messages_list)
        insert_to_db_bulk(results_list)
        
        for tag in some_messages_list:
            channel.basic_ack(tag)
        some_messages_list.clear()
  1. 消息在全部完全处理之前处于队列中
  2. 如果消费者跌倒或断开连接 - 消息保持安全

你认为这个解决方案怎么样?如果可以的话,如果消费者摔倒了,我怎样才能重新得到所有未识别的信息?

共有1个答案

谢清野
2023-03-14

我已经测试这个解决方案几个月了,可以说它相当不错。直到AMPQ不提供批量消费的功能,我们不得不使用这样的一些漫游。

注意:如果您决定使用此解决方案,请注意使用多个消费者(线程)进行并发消费,或者使用一些锁(我使用过pythonthreading.Lock模块)来保证没有竞争条件发生。

 类似资料:
  • 我是一个新的学习者,试图理解拉雷维尔的拉比MQ。我已找到驱动程序vyuldashev/laravel队列rabbitmq 我已经配置应用程序/queue.php,并运行驱动程序与此语法"php工匠队列:工作Rabbitmq"。控制器。我不会在我的控制器中调度作业,因为laravel只是监听消息并处理消息。谁能帮我解释一下这是怎么回事?谢啦

  • 我的应用程序使用来自RabbitMQ的一些消息并对其进行处理。我有大约10个队列,每个队列最多有10个消费者(线程)。我有5次预回迁。我正在Heroku中使用CloudAMQP插件(RabbitMQ作为服务)运行安装程序。 我使用默认心跳和连接超时设置(60秒)运行。 我的java应用程序是一个使用sping-Rabbit库的Spring Boot应用程序。 版本: 问题是对于一个特定队列的消费者

  • 我有一个场景,我想“拉”RabbitMQ队列/主题的消息,并一次处理一个。特别是当消费者启动时,队列中已经有消息。我尝试了以下方法,但没有成功(这意味着,这些选项中的每一个都会读取队列,直到队列为空,或者直到另一个线程关闭上下文)。 1.第一次处理后立即停止路由 与1类似,但使用闩锁而不是while loop和sleep。 使用轮询消费者 使用ConsumerTemplate()-类似于上面的代码

  • 我对RabbitMQ很陌生,所以如果我的问题听起来很琐碎,请原谅。我想在RabbitMQ上发布消息,它将由RabbitMQ消费者处理。 我的消费者机器是一个多核机器(最好是azure上的工作者角色)。但QueueBasicConsumer一次推送一条消息。我如何编程来利用我可以同时处理多个消息的所有核心。 一种解决方案是在多个线程中打开多个通道,然后在那里处理消息。但在这种情况下,我将如何决定线程

  • 我在ActiveMQ中使用异步消息使用者。我的制作人工作正常,向队列发送消息。现在,我的异步消息消费者正在等待调用onMessage(),但这从未发生过。因此,问题是: 异步使用者不会使用消息 ActiveMQ日志的快照还显示了许多刚刚堆积在挂起状态中的消息: 我想不出问题到底出在哪里。 计数: toPageIn 78 只是不断增加,信息仍然无法传递给消费者。 是服务器端问题还是客户端问题?

  • 我正在构建一个使用来自Kafka主题的消息并执行数据库更新任务的Kafka消费者应用程序。消息是每天一次大批量生产的--所以该主题在10分钟内加载了大约100万条消息。主题有8个分区。 Spring Kafka消费者(使用@KafKalistener注释并使用ConcurrentKafkaListenerContainerFactory)在非常短的批处理中被触发。 批处理大小有时仅为1或2条消息。