当前位置: 首页 > 知识库问答 >
问题:

如何轮询RabbitMQ以连续获取优先级顺序的消息?

滕祯
2023-03-14

我们可以使RabbitMQ分布式优先级队列通过安装插件Rabbitmq-优先级队列从https://www.rabbitmq.com/community-plugins.html.我将元素推送到队列中(每个元素都有一个优先级),并且我能够根据需要在消费者中接收队列的内容-较高优先级的元素首先出现。

问题是,当这种情况持续发生时,优先级轮询概念不起作用:

  1. 运行发布服务器以填充队列中具有不同优先级的3个项目
  2. 使用队列中的消息-工作正常-按优先级使用。现在消费者等待队列中的任何消息,此时队列为空
  3. 我再次运行publisher来填充大约5个元素
  4. 使用者不会优先使用队列中的5项,而是按照步骤3发布者发布的顺序使用

我需要的是,在队列的所有内容中具有最大优先级的队列项目的每次轮询都应该首先出现。

有人能告诉我这里发生了什么错误吗?谢谢

以下是出版商和消费者(Java)的片段:

出版商

public class RabbitMQPublisher {
    private static final String QUEUE = "my-priority-queue-3";
    public static void main(String[] argv) throws Exception {
        final ConnectionFactory factory = new ConnectionFactory();
        final Connection conn = factory.newConnection();
        final Channel ch = conn.createChannel();
        final Map<String, Object> args = new HashMap<String, Object>();
        args.put("x-max-priority", 100);
        ch.queueDeclare(QUEUE, true, false, false, args);
        publish(ch, 24);
        publish(ch, 11);
        publish(ch, 75);
        //second run
        //publish(ch, 27);
        //publish(ch, 77);
        //publish(ch, 12);
        conn.close();
    }

    private static void publish(Channel ch, int priority) throws IOException {
        final BasicProperties props = MessageProperties.PERSISTENT_BASIC.builder().priority(priority).build();
        final String body = "message with priority " + priority;
        ch.basicPublish("", QUEUE, props, body.getBytes());
    }

消费者

while (true) {
        final QueueingConsumer.Delivery delivery = consumer.nextDelivery();
        final String message = new String(delivery.getBody());
        System.out.println(message);
    }

输出:

message with priority 75
message with priority 24
message with priority 11
message with priority 27
message with priority 77
message with priority 12

共有1个答案

冷吉星
2023-03-14

我能够使用basicGet来轮询队列而不是消费者来解决这个问题。nextDelivery()<代码>最终字符串消息=新字符串(channel.basicGet(QUEUE_NAME,true))。getBody()) 这将从队列中提取具有最高优先级的项目。

 类似资料:
  • RabbitMQ有消息优先级的概念吗?我有一个问题,一些更重要的消息由于队列中不太重要的消息而被拖慢。我希望高优先级的优先,并移动到队列的前面。 我知道我可以用两个队列来近似计算,一个是“快”队列,另一个是“慢”队列,但这看起来像是一个黑客。 有人知道使用RabbitMQ的更好的解决方案吗?

  • 在我的python应用程序中,我使用芹菜作为任务生产者和消费者,使用RabbitMQ作为代理。现在,我正在实施优先级排序。起初,它看起来根本不起作用,因为根据文档,我刚刚在队列中添加了参数。我更深入地研究了一下,发现了另一种优先级——消费者优先级和任务优先级。所以,现在,看起来有三种不同的优先顺序,我完全困惑了。你能给我解释一下区别吗? 队列最大优先级:即https://www.rabbitmq.

  • 如何轮询azure服务总线以持续检查消息?下面是我从队列接收消息的方式。 我想不断地寻找信息,然后处理它。

  • 问题内容: 我想了解express.js中的顺序优先级。例如,下面的代码 如果请求来自客户端localhost:3000 / api / abc和localhost:3000 / user / abc,则来自api和用户模块的响应。但是,如果我发出类似localhost:3000 / myName / xyz的请求,则应用程序模块将返回响应。此行为使我担心expressjs的优先顺序以及路由器模块

  • Netty中是否有任何嵌入式优先级机制可以帮助我决定哪些消息比其他消息发送得更频繁?

  • 我有2个RabbitMQ队列: = 正如您对其名称所设想的那样,队列使用死信交换功能,这意味着当消息过期时,它将被重新调用到我的。 我试图实现的是在每次处理失败并将消息推送到DLX队列时增加消息的。 问题是,即使消息过期,当它不在队列的底部(头部)时,它也不会请求我的。因此,如果DLX队列中有到期时间为7天的消息,并且我们将到期时间为5秒的新消息加入队列,则该消息将仅在7天5秒后请求到。... 我