当前位置: 首页 > 知识库问答 >
问题:

Spring AMQP RabbitMQ实现优先级队列

彭浩穰
2023-03-14

在谷歌搜索了几天之后,我相信我完全迷路了。我想实现一种优先级队列,它大约有3个队列:

  1. 高优先级队列(每日),需要先处理
  2. 中等优先级队列(每周),如果队列#1中没有项目,将进行处理。(此队列中的ok消息根本不处理)
  3. 低优先级队列(每月),如果队列#1中没有项目,将进行处理

最初,我有以下流程,让消费者使用所有三个队列中的消息,并检查队列#1、#2和#3中是否有任何项目。然后我意识到这是错误的,因为:

  1. 我完全被一个问题弄糊涂了:“我怎么知道它来自哪个队列?”
  2. 我已经在使用来自任何队列的消息,所以如果我从低优先级队列中获取一个对象,如果我发现高优先级队列中有消息,我是否要将其放回队列

下面是我目前的配置,可见我是多么的白痴。

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.1.xsd
http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd">

<rabbit:connection-factory id="connectionFactory" host="localhost" />

<rabbit:template id="amqpTemplatead_daily" connection-factory="connectionFactory"
    exchange="" routing-key="daily_queue"/>

<rabbit:template id="amqpTemplatead_weekly" connection-factory="connectionFactory"
    exchange="" routing-key="weekly_queue"/>

<rabbit:template id="amqpTemplatead_monthly" connection-factory="connectionFactory"
    exchange="" routing-key="monthly_queue"/>

<rabbit:admin connection-factory="connectionFactory" />

<rabbit:listener-container connection-factory="connectionFactory">
    <rabbit:listener ref="Consumer" method="consume" queue-names="daily_queue" />
</rabbit:listener-container>

<rabbit:listener-container connection-factory="connectionFactory">
    <rabbit:listener ref="Consumer" method="consume" queue-names="weekly_queue" />
</rabbit:listener-container>    

<rabbit:listener-container connection-factory="connectionFactory">
    <rabbit:listener ref="Consumer" method="consume" queue-names="monthly_queue" />
</rabbit:listener-container>    

<bean id="Consumer" class="com.test.Consumer" />

</beans>

任何想法我应该如何解决这个与优先级队列?

ps:我还想知道,ApacheCamel是否有我可以依赖的东西?

更新1:我刚刚从Apache Camel中看到了这一点:https://issues.apache.org/jira/browse/CAMEL-2537“JMSPriority上的音序器似乎就是我要找的,以前有人试过吗?

更新2:假设我使用基于@Gary Russell推荐的RabbitMQ插件,我有以下spring RabbitMQ上下文XML配置,这似乎是有意义的(由guest..):

<rabbit:queue name="ad_google_dfa_reporting_queue">
    <rabbit:queue-arguments>
            <entry key="x-max-priority" value="10"/>
    </rabbit:queue-arguments>
</rabbit:queue>

<rabbit:listener-container connection-factory="connectionFactory">
    <rabbit:listener ref="adGoogleDfaReporting" method="consume" queue-names="ad_google_dfa_reporting_queue" />
</rabbit:listener-container>

<bean id="Consumer" class="com.test.Consumer" />

上述xml配置已成功创建了一个队列,名称为:“ad_google_dfa_reporting_Queue”,参数参数为:x-max-priority:10

但当涉及到优先发送消息的代码时,我完全失去了它。如何在示例URL中定义优先级:https://github.com/rabbitmq/rabbitmq-priority-queue/blob/master/examples/java/src/com/rabbitmq/examples/PriorityQueue.java

AmqpTemplate amqpTemplateGoogleDfaReporting = (AmqpTemplate) applicationContext.getBean("amqpTemplateadGoogleDfaReporting");
amqpTemplateGoogleDfaReporting.convertAndSend("message"); // how to define message priority?

    Random random = new Random();
    for (int i=0; i< 1000; i++){
        final int priority = random.nextInt(10 - 1 + 1) + 1;

        DfaReportingModel model = new DfaReportingModel();
        model.setReportType(DfaReportingModel.ReportType.FACT);
        model.setUserProfileId(0l + priority);
        amqpTemplateGoogleDfaReporting.convertAndSend(model, new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                message.getMessageProperties().setPriority(priority);
                return message;
            }
        });
    }

以下是消息使用者的代码:

    public void consume(DfaReportingModel message) {
        System.out.println(message.getUserProfileId());

        Thread.sleep(500);
    }

我得到的结果是:

9, 10, 7, 9, 6, 4, 10, 10, 3, 10, 6, 1, 5, 6, 6, 3, 4, 7, 6, 8, 3, 1, 4, 5, 5, 3, 10, 9, 5, 1, 8, 9, 6, 9, 3, 10, 7, 4, 8, 7, 3, 4, 8, 2, 6, 9, 6, 4, 7, 7, 2, 8, 4, 4, 1,

更新4:问题解决了!从中了解示例代码https://github.com/rabbitmq/rabbitmq-priority-queue在我的环境中工作,我认为问题是围绕着spring上下文的。因此,经过无数次的尝试和错误与不同类型的配置,我引脚点的确切组合,将使这项工作!具体如下:

    <rabbit:queue name="ad_google_dfa_reporting_queue">
    <rabbit:queue-arguments>
        <entry key="x-max-priority">
            <value type="java.lang.Integer">10</value> <!-- MUST specifically define java.lang.Integer to get it to work -->
        </entry>
    </rabbit:queue-arguments>
</rabbit:queue>

如果没有具体定义该值是整数类型,则优先级队列不起作用。终于解决了。耶!

共有2个答案

南门焱
2023-03-14

RabbitMQ在3.5.0版本的核心中有优先级队列实现。

您可以使用x-max-priority参数声明优先级队列。此参数应为整数,指示队列应支持的最大优先级。例如,使用Java客户端:

Channel ch = ...;
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-max-priority", 10);
ch.queueDeclare("my-priority-queue", true, false, false, args);

然后,您可以使用basic的优先级字段发布按优先级排序的消息。属性。数字越大表示优先级越高。

淳于熙云
2023-03-14

RabbitMQ现在有一个优先级队列插件,其中消息按优先级顺序传递。最好使用这种方法,而不是使用在运行时非常昂贵的低优先级消息重新排队方案。

编辑:

使用rabbitTemplate时。convertAndSend(…)方法,如果要设置消息的优先级属性,则需要在模板中实现自定义的消息属性转换程序(将DefaultMessagePropertiesConverter子类化),或者使用接受消息后处理程序的convertAnSend变体;例如。:

template.convertAndSend("exchange", "routingKey", "message", new MessagePostProcessor() {

    @Override
    public Message postProcessMessage(Message message) throws AmqpException {
        message.getMessageProperties().setPriority(5);
        return message;
    }
});
 类似资料:
  • 我正在编写一个涉及堆实现的代码,在我的bubbleUp方法中,在我的while循环行中,我似乎遇到了一个取消引用的错误。这可能是一个相当基本的问题,但解决这个问题的最佳方法是什么?我在实现removeHigh方法时也遇到了一些问题,该方法旨在从队列中移除最高的元素。

  • 问题 怎样实现一个按优先级排序的队列? 并且在这个队列上面每次 pop 操作总是返回优先级最高的那个元素 解决方案 下面的类利用 heapq 模块实现了一个简单的优先级队列: import heapq class PriorityQueue: def __init__(self): self._queue = [] self._index = 0

  • 我目前正在尝试实现min heap PQ,但是我在实现的正确性方面遇到了一些问题,我似乎无法找出我做错了什么——它没有输出最低优先级,也没有对它们进行正确排序。 使用以下测试数据: 我得到以下结果: 我希望结果是按升序排列的——起初我认为这可能是因为交换了错误的孩子,但最后一个输出是最大的优先级,所以这没有意义。我花了几个小时试图研究堆优先级队列,但我找不到任何帮助。 以下是CMP要求的更好的代码

  • 问题内容: 总体而言,我正在尝试使用优先级队列来实现Dijkstra的算法。 根据golang-nuts的成员所述,Go中惯用的方法是将堆接口与自定义的基础数据结构一起使用。所以我像这样创建了Node.go和PQueue.go: 和PQueue.go: 和main.go :(动作在SolveMatrix中) 问题是,在编译时我收到错误消息: 注释掉PQ.Push(firstNode)行确实使编译器

  • 我正在使用优先级队列实现Dijkstra的算法,我想要一个函数从堆中删除一个元素,但我只能从Dijkstra的主节点索引中向它发送顶点索引,我找不到它在堆上的位置,我负担不起进行二进制搜索。有什么想法吗?

  • 我需要一个优先级队列,它首先获得具有最高优先级值的项目。我当前正在使用队列库中的PriorityQueue类。但是,这个函数只先返回值最小的项。我尝试了一些很难看的解决方案,比如(sys.maxint-priority)作为优先级,但我只是想知道是否存在更优雅的解决方案。