当前位置: 首页 > 知识库问答 >
问题:

RabbitMQ:预取消息处理

杨高翰
2023-03-14

我正在使用Spring AMQP与RabbitMQ一起工作。以下是我的配置:

<rabbit:connection-factory id="connectionFactory"
    host="${queue.host}" port="${queue.port}" />
<rabbit:template id="amqpTemplate" connection-factory="connectionFactory" />
<rabbit:admin connection-factory="connectionFactory" />
<rabbit:queue name="${queue.names}" durable="true"
    exclusive="false" />
<rabbit:listener-container
    connection-factory="connectionFactory" acknowledge="auto" error-handler="airbrakeHandler" prefetch="1000" >
    <rabbit:listener ref="consumer" queue-names="${queue.names}" />
</rabbit:listener-container>

正如您所看到的,prefetchCount是1000。

我想知道预取的消息是否在消费者中并行处理;也就是说,多个线程调用onMessage(消息消息)方法。或者消息是按顺序处理的;也就是说,一个线程迭代预取的消息,并以连续的方式调用每个消息的onMessage(消息消息消息)方法。

我应该注意到,处理的顺序对我来说并不重要。事实上,它们一次处理一个。

提前感谢。

共有1个答案

司马庆
2023-03-14

SimpleMessageListenerContainer配置包括一个concurrency参数,用于设置使用者线程的最大数量:

concurrency: The number of concurrent consumers to start for each listener.

对于单线程操作,可以将其设置为“1”-例如:

<rabbit:listener-container ... prefetch="1000" concurrency="1">
    <rabbit:listener ref="consumer" queue-names="${queue.names}" />
</rabbit:listener-container>

有关详细信息,请参阅Spring Docs和SimpleMessageListenerContainer的javadoc。

 类似资料:
  • 我有以下兔子听者: 我需要将listener配置为在它处理一条消息后等待15分钟,然后再接收下一条消息。不需要在此方法中等待。我所需要的只是在处理完一条后不接收任何消息。可以通过来完成,但我不确定这是否是实现这一点的最佳方法。对于这种情况有没有rabbitmq的配置?

  • 我为RabbitMQ制作了一个消费者,作为一个用C#.NET编写的控制台应用程序。它被编程为永久监听队列,每当它在队列中发现消息时,它就处理它。使用者平均每秒处理35条消息。使用者被安排在系统启动时在任务计划程序中运行。消费者运行良好的3-4天。但是,它们继续运行,但不处理任何消息,尽管队列中有消息。当使用者停止并再次启动时,它再次开始正确处理消息。但是,当您手动重新启动时,数以百万计的消息排在队

  • 我对RabbitMQ很陌生,所以如果我的问题听起来很琐碎,请原谅。我想在RabbitMQ上发布消息,它将由RabbitMQ消费者处理。 我的消费者机器是一个多核机器(最好是azure上的工作者角色)。但QueueBasicConsumer一次推送一条消息。我如何编程来利用我可以同时处理多个消息的所有核心。 一种解决方案是在多个线程中打开多个通道,然后在那里处理消息。但在这种情况下,我将如何决定线程

  • 当RabbitMq消息到达队列时,我目前正在使用IntegrationFlow来触发作业执行。IntegrationFlow的AmqpInFronChannelAdapter和作业的第一步的ItemReader都配置为从同一队列中读取消息。 我遇到的问题是IntegrationFlow的AmqpInboundChannelAdapter读取RabbitMQ消息,然后ItemReader再也找不到该

  • 我正在尝试让JMS消费者致力于ActiveMQ服务器,但是我遇到了一些问题,这些解决方案似乎只会导致更多问题。 我正在使用Spring的DMLC(cacheLeve设置为CACHE\u连接)和CachingConnectionFactory(cachingConsumers设置为true),将Activemq自己的连接工厂包装在部署在Jboss AS中的Webapp中。 据此http://acti

  • 在队列选项卡的rabbitMQ web界面上,我看到了“概述”面板,我在其中找到了以下内容: 排队消息: 准备好了 未确认 总数 我猜“总数”是多少。但什么是“准备就绪”和“未确认”?“准备好了”——传递给消费者的信息?“未确认”-? 消息费率: 发表 交付 重新交付 承认 这些信息是什么?尤其是“重新交付”和“确认”?这是什么意思?