当前位置: 首页 > 知识库问答 >
问题:

camel-rabbitMQ:在单个camel消费者中从多个rabbitmq队列进行消费

田鹤轩
2023-03-14

我有以下场景:有3个rabbitmq队列,生产者根据消息的优先级将消息推送到这些队列。(myqueue_high,myqueue_medium,myqueue_low)我希望有一个可以按顺序或优先级从这些队列中提取的单一使用者,即只要消息在那里,它就一直从高队列中提取。它是从介质中拉出来的。如果medium也是空的,它从Low拉出。

我如何实现这一点?我需要编写自定义组件吗?

共有1个答案

东郭宏朗
2023-03-14

将所有消息放入一个队列,但具有不同的优先级会更容易。这样,优先级排序将在代理中完成,并且Camel使用者将获得已经按优先级排序的消息。但是,RabbitMQ实现了FIFO原则,不支持优先级处理(目前为止)。

解决方案1

Camel允许您使用重新排序器根据某个比较器重新组织消息:https://Camel.apache.org/resequencer.html

from("rabbitmq://hostname[:port]/myqueue_high")
    .setHeader("priority", constant(9))
    .to("direct:messageProcessing");

from("rabbitmq://hostname[:port]/myqueue_medium")
    .setHeader("priority", constant(5))
    .to("direct:messageProcessing");

from("rabbitmq://hostname[:port]/myqueue_low")
    .setHeader("priority", constant(1))
    .to("direct:messageProcessing");

// sort by priority by allowing duplicates (message can have same priority)
// and use reverse ordering so 9 is first output (most important), and 0 is last
// (of course we could have set the priority the other way around, but this way
// we keep align with the JMS specification...)
// use batch mode and fire every 3th second
from("direct:messageProcessing")
    .resequence(header("priority")).batch().timeout(3000).allowDuplicates().reverse()
    .to("mock:result");

这样,所有传入消息都被路由到相同的子路由(direction:messageprocessing),在该子路由中,消息根据传入路由设置的priority头重新排序。

解决方案2

将SEDA与优先级队列一起使用:

final PriorityBlockingQueueFactory<Exchange> priorityQueueFactory = new PriorityBlockingQueueFactory<Exchange>();
priorityQueueFactory.setComparator(new Comparator<Exchange>() {
    @Override
    public int compare(final Exchange exchange1, final Exchange exchange2) {
        final Integer prio1 = (Integer) exchange1.getIn().getHeader("priority");
        final Integer prio2 = (Integer) exchange2.getIn().getHeader("priority");
        return -prio1.compareTo(prio2);  // 9 has higher priority then 0
    }
});

final SimpleRegistry registry = new SimpleRegistry();
registry.put("priorityQueueFactory", priorityQueueFactory);

final ModelCamelContext context = new DefaultCamelContext(registry);
// configure and start your context here...
from("rabbitmq://hostname[:port]/myqueue_high")
    .setHeader("priority", constant(9))
    .to("seda:priority?queueFactory=#priorityQueueFactory");  // reference queue in registry

from("rabbitmq://hostname[:port]/myqueue_medium")
    .setHeader("priority", constant(5))
    .to("seda:priority?queueFactory=#priorityQueueFactory");

from("rabbitmq://hostname[:port]/myqueue_low")
    .setHeader("priority", constant(1))
    .to("seda:priority?queueFactory=#priorityQueueFactory");

from("seda:priority")
    .to("direct:messageProcessing");

解决方案3

如果在发生故障时需要持久性,请使用Camel的ActiveMQ组件等JMS而不是SEDA。只需通过设置jmspriperation头,将来自RabbitMQ的传入消息转发到JMS目的地。

解决方案4

完全跳过RabbitMQ,只使用支持优先级的JMS代理,如ActiveMQ。

 类似资料:
  • 我运行生产者,它生成N条消息,我在仪表板上看到它们。当我运行接收器时,它会接收来自队列的所有消息,并且队列为空。 我需要有多个生产者生成消息到同一个队列。多个客户从队列中接收消息。消息将被队列TTL删除。但是现在第一个接收者从队列中获取所有消息。我怎么能做到这一点?

  • 我有一个场景,我想“拉”RabbitMQ队列/主题的消息,并一次处理一个。特别是当消费者启动时,队列中已经有消息。我尝试了以下方法,但没有成功(这意味着,这些选项中的每一个都会读取队列,直到队列为空,或者直到另一个线程关闭上下文)。 1.第一次处理后立即停止路由 与1类似,但使用闩锁而不是while loop和sleep。 使用轮询消费者 使用ConsumerTemplate()-类似于上面的代码

  • 我刚刚开始使用RabbitMQ和AMQP。 我有一个消息队列 我有多个消费者,我想用相同的消息做不同的事情。 RabbitMQ的大部分文档似乎都集中在循环(round-robin)上,即单个消息由单个消费者使用,负载在每个消费者之间分散。这的确是我目击的行为。 例如:生产者只有一个队列,每2秒发送一次消息: 这里有一个消费者: 如果我启动消费者两次,我可以看到每个消费者都在以循环行为消费交替消息。

  • 使用RabbitMQ,有没有一种方法可以将消息从队列“推送”给使用者,而不是让使用者从队列“轮询并拉出”消息? 这也是我目前正在进行的一个项目引起一些争论的原因。一个方面的论点是,让使用者(即windows服务)“轮询”队列直到新消息到达,与将消息从队列自动“推送”到订户/使用者的想法相比,这种想法有些低效,也不太理想。 我似乎只能找到支持消费者从队列中“轮询并拉出”消息的信息(例如,使用wind

  • 问题内容: 我一般只是开始使用RabbitMQ和AMQP。 我有一条消息队列 我有多个消费者,我想用 同一条消息 做不同的事情。 RabbitMQ的大多数文档似乎都集中在循环上,即单个消息由单个使用者使用,而负载则分散在每个使用者之间。我确实是这种行为。 例如:生产者只有一个队列,每2秒发送一次消息: 这是一个消费者: 如果我启动使用者两次,则 可以看到每个使用者都以循环方式使用替代消息。 例如,

  • 我有一个RabbitMQ代理,它设置了多个队列。在客户端(Java ),我有多个消费者,他们都像这样监听他们的队列: 队列_1- 它们都使用一个连接但不同的通道。发生的情况是,当我加载所有队列并启动应用程序代理服务时,首先服务一个队列,而不是另一个队列,依此类推。因此,消息一次由各自的消费者一个队列接收。我还想提一下,我正在使用预取计数1来实现消费者流量的公平分配。 我怎样才能让它发生,让所有的队