当前位置: 首页 > 知识库问答 >
问题:

获取到达两个队列之一的第一条消息

陶树
2023-03-14

我有一个应用程序,我想使用队列A中的消息,但是如果队列A是空的,我希望它使用队列B中的消息,而不是闲置。我只希望它一次处理一条消息,如果使用者正忙于处理一条消息(来自任一队列),那么其他挂起的消息(在任一队列中)应该保留在他们的队列中,直到使用者再次空闲(这给其他使用者一个机会来处理新的消息)。

例如,如果一条消息在使用者空闲(等待)时到达队列a或队列B,那么使用者就会拾取它。如果在使用者当前正在处理消息时消息到达任一队列,则在使用者完成当前消息的处理之前不会发生任何事情。如果消息在队列a和队列B上都可用,则使用者更喜欢从队列a中选择消息。

Edit:实际上,我同样乐意得知消息可以在A和B上使用,并实现我自己的逻辑来选择我想从队列中提取的消息。

在单独的线程中同时订阅两个队列似乎不起作用,原因是:

  • 如果消息M到达队列a,则使用者必须在处理消息M时从队列B退订,然后在消息M处理完毕后重新订阅。否则,如果消息到达队列B,它可能会被同时拾取和分派--我不想一次处理多条消息。
  • 如果一条消息同时到达两个队列(或者当使用者启动或完成处理前一条消息时,两个队列中都有一条消息可用),那么在使用者有机会从队列B中退订之前,这两条消息都有可能被拾取。如果发生这种情况,我必须将其中一条消息回滚,但如果经常发生这种情况,则消息将超过回滚计数并被移到失败队列。

理想情况下,我想将其推广到N个队列的情况。我对理想情况下通过JMS工作的解决方案很感兴趣,但是如果我必须使用IBM MQ特定的API来实现这一点,也可以。是否有一个使用模式或库,将有助于实现这一点?是否有可供选择的排队方法/技术/技术允许它?

将所有消息从所有队列中取出并放入单个队列是不可行的;我们需要能够独立清除单个队列,不同的消费者可能订阅这N个队列的不同子集。

共有1个答案

祁高格
2023-03-14

我不知道这对您的代码是否会是一个很大的改变,但是您有没有想过在您的项目中加入Spring框架,特别是Spring集成?您所描述的这些问题已经在那里得到了处理。

通过两个endpoint和两个队列上的事件驱动使用者以及一个线程池,可以解决这个问题。

http://www.eaipatterns.com/eventdrivenconsumer.html

http://en.wikipedia.org/wiki/thread_pool_pattern

[编辑]

 类似资料:
  • 我通过命名的JMS队列将JMS请求发送到Weblogic 10.3服务器,并通过临时队列接收回复。 客户(裸体): 服务器MDB(消息驱动bean): 问题是第一个服务器回复经常丢失!也就是说,每4-5个回复都以超时结束。当消费者收到第一个答案时,它将继续收到所有剩余的答案,所以问题只出在临时队列创建后通过临时队列发送的第一条消息上。 我的问题是:我是否必须为临时队列设置一些特殊的设置,以便它在创

  • 问题内容: 我有一个包含以下各列的模型: 我有索引和。 现在,我想获取由排序的每个对象的第一个日志。 一种方法是对每个用户名运行以下查询: 但这显然会查询数据库很多次(等于用户名的数量)。有什么方法可以只执行一个数据库查询吗? 问题答案: 另一种情况: 返回每个“第一”条目 的整行。 这种Ruby语法应该可以工作:

  • 我主要在RPC模式下使用rabbitMq,但我还想将请求和响应消息复制到另一个队列。 最后,我想实现的是,外部消费者可以通过听一个队列来查看所有流量,我们称之为“日志队列”。 复制传入消息是可以的,我只需要使用扇出交换,或者使用与RPC调用使用的路由密钥相同的路由密钥将日志队列绑定到使用过的交换。 但我无法找到通过直接回复功能“扇出”发送的消息的方法。 到目前为止,我了解到响应消息以amqp的形式

  • 问题内容: 我一般只是开始使用RabbitMQ和AMQP。 我有一条消息队列 我有多个消费者,我想用 同一条消息 做不同的事情。 RabbitMQ的大多数文档似乎都集中在循环上,即单个消息由单个使用者使用,而负载则分散在每个使用者之间。我确实是这种行为。 例如:生产者只有一个队列,每2秒发送一次消息: 这是一个消费者: 如果我启动使用者两次,则 可以看到每个使用者都以循环方式使用替代消息。 例如,

  • 问题内容: 如何获取与流中的条件匹配的第一个元素?我已经尝试过了但是没用 该条件不起作用,在除Stop之外的其他类中调用filter方法。 问题答案: 这可能是您要寻找的: 一个例子: 输出为:

  • 我对ActiveMQ有一个奇怪的问题。我有一个队列,似乎有一个挂起的消息,但当我打开队列时,没有消息。 这里怎么了?真的有消息等待处理吗?我怎样才能把信息带回来,或者至少能看到内容? 编辑:刚刚发现ActiveMQ 5.6.0的这两个错误。这可能是那个问题的根源吗? 不正确的报告挂起QueueSize的持久子后重新连接与未破解 OrderPendingList中的问题可能导致在持久子重新连接后无法