当前位置: 首页 > 知识库问答 >
问题:

RabbitMQ/AMQP:单个队列,同一消息的多个使用者?

孙泉
2023-03-14

我刚刚开始使用RabbitMQ和AMQP。

  • 我有一个消息队列
  • 我有多个消费者,我想用相同的消息做不同的事情。

RabbitMQ的大部分文档似乎都集中在循环(round-robin)上,即单个消息由单个消费者使用,负载在每个消费者之间分散。这的确是我目击的行为。

例如:生产者只有一个队列,每2秒发送一次消息:

var amqp = require('amqp');
var connection = amqp.createConnection({ host: "localhost", port: 5672 });
var count = 1;

connection.on('ready', function () {
  var sendMessage = function(connection, queue_name, payload) {
    var encoded_payload = JSON.stringify(payload);  
    connection.publish(queue_name, encoded_payload);
  }

  setInterval( function() {    
    var test_message = 'TEST '+count
    sendMessage(connection, "my_queue_name", test_message)  
    count += 1;
  }, 2000) 


})

这里有一个消费者:

var amqp = require('amqp');
var connection = amqp.createConnection({ host: "localhost", port: 5672 });
connection.on('ready', function () {
  connection.queue("my_queue_name", function(queue){
    queue.bind('#'); 
    queue.subscribe(function (message) {
      var encoded_payload = unescape(message.data)
      var payload = JSON.parse(encoded_payload)
      console.log('Recieved a message:')
      console.log(payload)
    })
  })
})

如果我启动消费者两次,我可以看到每个消费者都在以循环行为消费交替消息。我将在一个终端机看到1、3、5号信息,在另一个终端机看到2、4、6号信息。

我的问题是:

>

  • 我可以让每个消费者接收相同的消息吗?也就是说,两个消费者都得到消息1、2、3、4、5、6?这在AMQP/RabbitMQ Speak中叫什么?它通常是如何配置的?

    通常这样做吗?我是否应该让交换将消息路由到两个单独的队列中,并使用一个使用者?

  • 共有1个答案

    傅皓君
    2023-03-14

    我能让每个消费者收到相同的消息吗?也就是说,两个消费者都得到消息1、2、3、4、5、6?这在AMQP/RabbitMQ Speak中叫什么?它通常是如何配置的?

    不,如果消费者在同一队列中,则不是。摘自RabbitMQ的AMQP概念指南:

    在AMQP0-9-1中,消息在使用者之间是负载平衡的,这一点很重要。

    这似乎意味着队列中的循环行为是给定的,而不是可配置的。即,为了使同一消息ID由多个使用者处理,需要单独的队列。

    通常这样做吗?我是否应该让交换将消息路由到两个单独的队列中,并使用一个使用者?

    不,它不是,单个队列/多个消费者,每个消费者处理相同的消息ID是不可能的。让exchange将消息路由到两个单独的队列中确实更好。

    由于我不需要太复杂的路由,扇出交换可以很好地处理这一点。我之前没有过多地关注交换,因为node-amqp有一个“默认交换”的概念,允许您直接将消息发布到连接,然而大多数AMQP消息都发布到特定的交换。

    这是我的扇出交换,发送和接收:

    var amqp = require('amqp');
    var connection = amqp.createConnection({ host: "localhost", port: 5672 });
    var count = 1;
    
    connection.on('ready', function () {
      connection.exchange("my_exchange", options={type:'fanout'}, function(exchange) {   
    
        var sendMessage = function(exchange, payload) {
          console.log('about to publish')
          var encoded_payload = JSON.stringify(payload);
          exchange.publish('', encoded_payload, {})
        }
    
        // Recieve messages
        connection.queue("my_queue_name", function(queue){
          console.log('Created queue')
          queue.bind(exchange, ''); 
          queue.subscribe(function (message) {
            console.log('subscribed to queue')
            var encoded_payload = unescape(message.data)
            var payload = JSON.parse(encoded_payload)
            console.log('Recieved a message:')
            console.log(payload)
          })
        })
    
        setInterval( function() {    
          var test_message = 'TEST '+count
          sendMessage(exchange, test_message)  
          count += 1;
        }, 2000) 
     })
    })
    
     类似资料:
    • 问题内容: 我一般只是开始使用RabbitMQ和AMQP。 我有一条消息队列 我有多个消费者,我想用 同一条消息 做不同的事情。 RabbitMQ的大多数文档似乎都集中在循环上,即单个消息由单个使用者使用,而负载则分散在每个使用者之间。我确实是这种行为。 例如:生产者只有一个队列,每2秒发送一次消息: 这是一个消费者: 如果我启动使用者两次,则 可以看到每个使用者都以循环方式使用替代消息。 例如,

    • RabbitMQ的大部分文档似乎都集中在循环(round-robin)上,即单个消息由单个消费者使用。我有一个需求,其中希望从一个队列接收到多个订阅的消费者的相同消息。 下面是我的示例消费者代码。这里有两个侦听器在侦听同一个队列,但是只有一个使用者接收到消息。如何配置它,以便将相同的消息传递给两个消费者?(Consumer1和Consumer2)。任何帮助都将得到高度赞赏。

    • 我有一个关于RabbitMQ队列的问题。我想在一个队列上发送两种类型的消息。 我知道,我可以创建两个不同的队列,并使用路由键将不同的消息发送到不同的队列。 但是我希望在一个队列上有两个消费者,并以某种方式将消费者与消息类型绑定。它是通过兔子队列驱动的事件,当客户端和核心是发布者和消费者时。 有可能吗?或者我应该使用不同的队列吗? 数据交换

    • 我运行生产者,它生成N条消息,我在仪表板上看到它们。当我运行接收器时,它会接收来自队列的所有消息,并且队列为空。 我需要有多个生产者生成消息到同一个队列。多个客户从队列中接收消息。消息将被队列TTL删除。但是现在第一个接收者从队列中获取所有消息。我怎么能做到这一点?

    • 是否可以使用topic将消息发送到队列,并有2个消费者接收和处理相同的消息?目前,我已经创建了两个消费者,他们正在观察与一个exchage主题绑定的队列,但是第一个消费者使用了该消息并删除了该队列,第二个消费者没有接收到该消息。

    • 我们最近遇到了由RabbitMQ支持的应用程序的意外行为。RabbitMQ版本为3.6.12,我们使用的是.NET客户端5.0.1 应用程序订阅了两个队列,一个用于命令,另一个用于事件--我们还使用手动确认。我们的应用程序配置为有7个消费者。每个通道都有自己的通道(IModel),每个通道都有自己的EventingBasicConsumer,当EventingBasicConsumer.Recei