当前位置: 首页 > 知识库问答 >
问题:

Spring AMQP-@RabbitListener轮询是否在兜帽下?

牟正真
2023-03-14

我想异步处理来自AMQP/RabbitMQ队列的消息。我已经为此实现了一个RabbitListener方法(来自spring rabbit),但似乎这个侦听器实际上是在我的队列中进行轮询。这是意料之中的吗?我本以为监听器会以某种方式收到RabbitMQ的通知,而不必进行轮询。

如果这是意料之中的,我是否也可以在不进行轮询的情况下使用Spring AMQP异步消费消息?

当我发送消息时,听众会正确地接收到它。我仍然看到连续的日志消息流,表明侦听器继续轮询空队列:

…
15:41:10.543 [pool-1-thread-3] DEBUG o.s.a.r.l.BlockingQueueConsumer - ConsumeOK : Consumer: tags=[{amq.ctag-bUsK4KQN6_QHzf8DoDC_ww=myQueue}], channel=Cached Rabbit Channel: AMQChannel(amqp://guest@127.0.1.1:5672/,1), acknowledgeMode=MANUAL local queue size=0
15:41:10.544 [main] DEBUG o.s.a.r.c.CachingConnectionFactory - Creating cached Rabbit Channel from AMQChannel(amqp://guest@127.0.1.1:5672/,2)
15:41:10.545 [main] DEBUG o.s.amqp.rabbit.core.RabbitTemplate - Executing callback on RabbitMQ Channel: Cached Rabbit Channel: AMQChannel(amqp://guest@127.0.1.1:5672/,2)
15:41:10.545 [main] DEBUG o.s.amqp.rabbit.core.RabbitTemplate - Publishing message on exchange [], routingKey = [myQueue]
Sent: Hello World
15:41:10.559 [pool-1-thread-4] DEBUG o.s.a.r.l.BlockingQueueConsumer - Storing delivery for Consumer: tags=[{amq.ctag-bUsK4KQN6_QHzf8DoDC_ww=myQueue}], channel=Cached Rabbit Channel: AMQChannel(amqp://guest@127.0.1.1:5672/,1), acknowledgeMode=MANUAL local queue size=0
15:41:10.560 [SimpleAsyncTaskExecutor-1] DEBUG o.s.a.r.l.BlockingQueueConsumer - Received message: (Body:'Hello World'MessageProperties [headers={}, timestamp=null, messageId=null, userId=null, appId=null, clusterId=null, type=null, correlationId=null, replyTo=null, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, deliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=, receivedRoutingKey=myQueue, deliveryTag=1, messageCount=0])
15:41:10.571 [SimpleAsyncTaskExecutor-1] DEBUG o.s.a.r.l.a.MessagingMessageListenerAdapter - Processing [GenericMessage [payload=Hello World, headers={timestamp=1435844470571, id=018f39f6-ebca-aabf-7fe3-a095e959f65d, amqp_receivedRoutingKey=myQueue, amqp_deliveryMode=PERSISTENT, amqp_consumerQueue=myQueue, amqp_consumerTag=amq.ctag-bUsK4KQN6_QHzf8DoDC_ww, amqp_contentEncoding=UTF-8, contentType=text/plain, amqp_deliveryTag=1, amqp_redelivered=false}]]
Received: Hello World
15:41:10.579 [SimpleAsyncTaskExecutor-1] DEBUG o.s.a.r.l.BlockingQueueConsumer - Retrieving delivery for Consumer: tags=[{amq.ctag-bUsK4KQN6_QHzf8DoDC_ww=myQueue}], channel=Cached Rabbit Channel: AMQChannel(amqp://guest@127.0.1.1:5672/,1), acknowledgeMode=MANUAL local queue size=0
15:41:11.579 [SimpleAsyncTaskExecutor-1] DEBUG o.s.a.r.l.BlockingQueueConsumer - Retrieving delivery for Consumer: tags=[{amq.ctag-bUsK4KQN6_QHzf8DoDC_ww=myQueue}], channel=Cached Rabbit Channel: AMQChannel(amqp://guest@127.0.1.1:5672/,1), acknowledgeMode=MANUAL local queue size=0
15:41:12.583 [SimpleAsyncTaskExecutor-1] DEBUG o.s.a.r.l.BlockingQueueConsumer - Retrieving delivery for Consumer: tags=[{amq.ctag-bUsK4KQN6_QHzf8DoDC_ww=myQueue}], channel=Cached Rabbit Channel: AMQChannel(amqp://guest@127.0.1.1:5672/,1), acknowledgeMode=MANUAL local queue size=0
…

最后一条日志消息基本上每秒无限重复。

前两种方法可能是最有趣的部分;其余主要为Spring配置:

@Configuration
@EnableRabbit
public class MyTest {

    public static void main(String[] args) throws InterruptedException {
        try (ConfigurableApplicationContext appCtxt =
                new AnnotationConfigApplicationContext(MyTest.class)) {
            // send a test message
            RabbitTemplate template = appCtxt.getBean(RabbitTemplate.class);
            Queue queue = appCtxt.getBean(Queue.class);
            template.convertAndSend(queue.getName(), "Hello World");
            System.out.println("Sent: Hello World");

            // Now that the application with its message listeners is running,
            // block this thread forever; make sure, though, that the
            // application context can sanely be closed.
            appCtxt.registerShutdownHook();
            Object blockingObj = new Object();
            synchronized (blockingObj) {
                blockingObj.wait();
            }
        }
    }

    @RabbitListener(queues = "#{ @myQueue }")
    private void processHello(@Payload String msg,
            @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag, Channel channel)
            throws IOException {
        System.out.println("Received: " + msg);
        channel.basicAck(deliveryTag, false);
    }

    @Bean
    public RabbitTemplate rabbitTemplate() {
        return new RabbitTemplate(rabbitConnFactory());
    }

    @Bean
    public ConnectionFactory rabbitConnFactory() {
        return new CachingConnectionFactory();
    }

    @Bean
    public SimpleRabbitListenerContainerFactory
            rabbitListenerContainerFactory() {
        SimpleRabbitListenerContainerFactory result =
                new SimpleRabbitListenerContainerFactory();
        result.setConnectionFactory(rabbitConnFactory());
        result.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        return result;
    }

    @Bean
    public Queue myQueue() {
        return new Queue("myQueue", false);
    }

    @Bean
    public AmqpAdmin amqpAdmin() {
        return new RabbitAdmin(rabbitConnFactory());
    }
}

共有1个答案

瞿博易
2023-03-14

它不是轮询Rabbitmq;当消息从Rabbit异步到达时,它被放置在消费者的内部队列中;将消息移交给被阻塞的侦听器线程,等待到达。

您看到的调试消息是在侦听器线程等待来自rabbitmq的新消息超时之后。

您可以增加receiveTimeout来减少日志,或者只需为BlockingQueueConsumer禁用调试日志。

增加超时将使容器对容器Stop()请求的响应性降低。

编辑:

为了回应你下面的评论...

是的,我们可以中断线程,但它比这要复杂得多。接收超时还用于在以下情况下确认消息

假设您只想每20条消息(而不是每条消息)进行一次访问。人们这样做是为了提高在大容量环境中的性能。超时也用于访问(txSize实际上是每n条消息或超时)。

现在,假设有19条消息到达,然后在60秒内没有消息,您的超时时间是30秒。

这意味着这19条消息将被长时间取消确认。在默认配置下,ack将在第19条消息到达后1秒发送。

在这个超时中确实没有什么开销(我们只是简单地返回并再次等待),所以增加它是不寻常的。

此外,当上下文关闭时容器停止,人们总是停止和启动容器。

 类似资料:
  • 一、 我与CachingConnectionFactory有一个SpringAMQP项目。我需要从AMQP连接获取一些属性,例如:状态、连接时间、通道和一些运行时度量。CachingConnectionFactory是否有任何指标支持(例如:https://www.rabbitmq.com/blog/2016/11/30/metrics-support-in-rabbitmq-java-clien

  • 汉兜即汉字 Wordle,是一款填字游戏。 《Wordle》是一款网页文字游戏。 在《Wordle》中,玩家要在一天内用六次机会内猜中某个有五字英文字母的词汇。每次尝试后,玩家可能得到三种反馈:绿色表示字母位置正确;黄色表示答案包含该字母但位置错误;灰色表示答案没有该字母。

  • 问题内容: 使连接保持打开状态,直到发生事件。 python django http rest 问题答案: 显然,最常见的方法不是直接在django中进行,而是借助附加的守护程序(可能是因为,例如Apache在许多长寿命连接方面做得不好)。如今,nodejs + socketio对此非常流行(它甚至可以使用WebSockets)-您只需要找到一种在两种方法之间传递数据的好方法。如果它是单向的(例如

  • 我一直在尝试SpringAMQP。我有几个问题: 我想知道什么是Publisher退货,它与Publisher确认有什么不同。据我所知,我们有一个Publisher Confirm回调,用于检查ACK的状态。现在我看了Spring AMQP和Rabbit MQ中的文档。在这件事上我真的没有发现或理解太多。 还有为什么如果消息试图发送到一个不存在的队列,我不会得到任何类型的确认(ack/nack),

  • 问题内容: 我最近在StackOverflow上问了一个有关我的功能的问题,人们建议我使用Ajax Long Polling。我花了几天的时间研究该主题,并尝试编写基本的长轮询代码,但是这些代码都没有起作用,而且我什么也做不了。 这是我的基本功能: 有人能够告诉我如何将其转变为基本的长轮询功能,或者甚至直接指向我需要到达的路径。很感谢任何形式的帮助。谢谢! 问题答案: 通常(即,当不使用长时间轮询

  • 长轮询在GCP PubSub JS SDK上可用吗? 我希望能够同时处理多个PubSub消息,例如: 这是它将如何在AWS上工作的一个示例: SQS队列包含超过5条消息。 侦听器将在单个中一次获得5条消息。事件