当前位置: 首页 > 知识库问答 >
问题:

spring-AMQP-消息处理延迟

邢项禹
2023-03-14

我们在RHEL 7.0 VM上部署了一个Java/spring/Tomcat应用程序,它使用AlejandRorivera/Embedded-RabbitMQ,一旦部署了war,它就启动Rabbitmq服务器,并连接到它。我们有多个队列用来处理和过滤事件。

流程如下所示:

我们接收到的事件->发布事件队列->侦听器类筛选事件->发布到另一个队列进行处理->我们发布到另一个队列进行日志记录。

问题是:

  • 处理正常启动,我们可以看到消息通过队列流动,但一段时间后,listener类停止接收事件。似乎我们能够将它发布到RabbitMQ通道,但它从未离开队列到侦听器。这似乎开始降级,导致事件在一段时间后被处理,直到几分钟。负载不是那么高,它像大约200个事件,从我们关心的只是其中的一小部分。

我们所尝试的:

  • 最初队列将pre-fetch设置为1,消费者的最小值为2,最大值为5,我们删除了pre-fetch,并将更多消费者添加为最大并发设置,但问题仍然存在,延迟只是需要更长的时间来呈现,但几分钟后,处理开始需要大约20/30秒。

我们可以在日志中看到我们将事件发布到队列中,我们可以看到我们在延迟的情况下将事件从队列中取出的日志。所以我们的代码中间没有运行任何东西来产生这种延迟。

据我们所知,其余的队列似乎都能正确地处理消息,但就是这个队列陷入了这种卡顿模式。

我看到的错误有以下几个,但我想知道它的含义以及它是否相关:

Jun  4 11:16:04  server: [pool-3-thread-10] ERROR com.rabbitmq.client.impl.ForgivingExceptionHandler - Consumer org.springframework.amqp.rabbit.listener.BlockingQueueConsumer$InternalConsumer@70dfa413 (amq.ctag-VaWc-hv-VwcUPh9mTQTj7A) method handleDelivery for channel AMQChannel(amqp://agent@127.0.0.1:5672/,198) threw an exception for channel AMQChannel(amqp://agent@127.0.0.1:5672/,198)
Jun  4 11:16:04  server: java.io.IOException: Unknown consumerTag
Jun  4 11:16:04  server: at com.rabbitmq.client.impl.ChannelN.basicCancel(ChannelN.java:1266)
Jun  4 11:16:04  server: at sun.reflect.GeneratedMethodAccessor180.invoke(Unknown Source)
Jun  4 11:16:04  server: at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
Jun  4 11:16:04  server: at java.lang.reflect.Method.invoke(Method.java:498)
Jun  4 11:16:04  server: at org.springframework.amqp.rabbit.connection.CachingConnectionFactory$CachedChannelInvocationHandler.invoke(CachingConnectionFactory.java:955)
Jun  4 11:16:04  server: at com.sun.proxy.$Proxy119.basicCancel(Unknown Source)
Jun  4 11:16:04  server: at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer$InternalConsumer.handleDelivery(BlockingQueueConsumer.java:846)
Jun  4 11:16:04  server: at com.rabbitmq.client.impl.ConsumerDispatcher$5.run(ConsumerDispatcher.java:149)
Jun  4 11:16:04  server: at com.rabbitmq.client.impl.ConsumerWorkService$WorkPoolRunnable.run(ConsumerWorkService.java:100)
Jun  4 11:16:04  server: at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
Jun  4 11:16:04  server: at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
Jun  4 11:16:04  server: at java.lang.Thread.run(Thread.java:748)

这种情况发生在应用程序关闭时,但我看到它发生在应用程序仍在运行时。

2018-06-05 13:22:45,443 ERROR CachingConnectionFactory$DefaultChannelCloseLogger - Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - unknown delivery tag 109, class-id=60, method-id=120)

我不知道如何解决这两个错误,也不知道它们是否相关。

这是我的spring配置:

<!-- Queues -->
<rabbit:queue id="monitorIncomingEventsQueue" name="MonitorIncomingEventsQueue"/>
<rabbit:queue id="interestingEventsQueue" name="InterestingEventsQueue"/>
<rabbit:queue id="textCallsEventsQueue" name="TextCallsEventsQueue"/>
<rabbit:queue id="callDisconnectedEventQueue" name="CallDisconnectedEventQueue"/>
<rabbit:queue id="incomingCallEventQueue" name="IncomingCallEventQueue"/>
<rabbit:queue id="eventLoggingQueue" name="EventLoggingQueue"/>

<!-- listeners -->
<bean id="monitorListener" class="com.example.rabbitmq.listeners.monitorListener"/>
<bean id="interestingEventsListener" class="com.example.rabbitmq.listeners.InterestingEventsListener"/>
<bean id="textCallsEventListener" class="com.example.rabbitmq.listeners.TextCallsEventListener"/>
<bean id="callDisconnectedEventListener" class="com.example.rabbitmq.listeners.CallDisconnectedEventListener"/>
<bean id="incomingCallEventListener" class="com.example.rabbitmq.listeners.IncomingCallEventListener"/>
<bean id="eventLoggingEventListener" class="com.example.rabbitmq.listeners.EventLoggingListener"/>

<rabbit:listener-container connection-factory="connectionFactory" message-converter="defaultMessageConverter" concurrency="5" max-concurrency="40" acknowledge="none">
    <rabbit:listener queues="interestingEventsQueue" ref="interestingEventsListener" method="handleIncomingMessage"/>
</rabbit:listener-container>

<rabbit:listener-container connection-factory="connectionFactory" message-converter="defaultMessageConverter" concurrency="5" max-concurrency="20" acknowledge="none">
    <rabbit:listener queues="textCallsEventsQueue" ref="textCallsEventListener" method="handleIncomingMessage"/>
</rabbit:listener-container>

<rabbit:listener-container connection-factory="connectionFactory" message-converter="defaultMessageConverter" concurrency="5" max-concurrency="20" acknowledge="none">
    <rabbit:listener queues="callDisconnectedEventQueue" ref="callDisconnectedEventListener" method="handleIncomingMessage"/>
</rabbit:listener-container>

<rabbit:listener-container connection-factory="connectionFactory" message-converter="defaultMessageConverter" concurrency="5" max-concurrency="30" acknowledge="none">
    <rabbit:listener queues="incomingCallEventQueue" ref="incomingCallEventListener" method="handleIncomingMessage"/>
</rabbit:listener-container>

<rabbit:listener-container connection-factory="connectionFactory" message-converter="defaultMessageConverter" concurrency="1" max-concurrency="3" acknowledge="none">
    <rabbit:listener queues="monitorIncomingEventsQueue" ref="monitorListener" method="handleIncomingMessage"/>
</rabbit:listener-container>

<rabbit:listener-container connection-factory="connectionFactory" message-converter="defaultMessageConverter" concurrency="5" max-concurrency="10"  acknowledge="none">
    <rabbit:listener queues="EventLoggingQueue" ref="eventLoggingEventListener" method="handleLoggingEvent"/>
</rabbit:listener-container>

<rabbit:connection-factory id="connectionFactory" host="${host.name}" port="${port.number}" username="${user.name}" password="${user.password}" connection-timeout="20000"/>

我在这里读到,处理延迟可能是由网络问题引起的,但在这种情况下,服务器和应用程序在同一个VM上。这是一个封闭的环境,所以大多数港口都不开放,但我怀疑这是什么错。

更多日志:https://pastebin.com/4qmfdt7a

感谢任何帮助,

谢谢,

共有1个答案

姬奇思
2023-03-14

我需要看更多的原木--这是确凿的证据:

Storing...Storing delivery for Consumer@a2ce092: tags=[{}]

(consumer)tags为空,这意味着使用者当时已经被取消(出于某种原因,应该出现在日志的前面)。

如果您有可能使用1.7.9.build-snapshot,我添加了一些跟踪级别的日志记录,这将有助于诊断这个问题。

编辑

回复你最近对RabbitMQ的评论-用户...

你能尝试固定并发吗?spring AMQP的容器中的可变并发常常不是很有用,因为通常只有在整个容器空闲一段时间时,消费者才会减少。

然而,这也许可以解释为什么你看到消费者被取消。

也许在这种逻辑中存在着某种竞争条件;使用固定数量的消费者(不要指定最大...)会避免那样做;如果你能尝试一下,它至少会消除这种可能性。

这就是说,我很困惑(我在你的堆栈溢出配置中没有注意到这一点);使用acknowledge=“NONE”时,不应向代理发送ACK(NONE用于设置自动ACK)

String consumerTag = this.channel.basicConsume(queue, this.acknowledgeMode.isAutoAck(), ...

而且

public boolean isAutoAck() {

    return this == NONE;

}

是否从代码中发送ACK?如果是,则ack模式应为手动。我看不到容器为无ack模式发送ack的场景。

 类似资料:
  • 使用Spring AMQP(使用RabbitMQ作为消息代理),我正在准备一个消息,并且我希望我的消息在有时之后被使用。在此之前,它可以在某个队列中等待,比如等待队列,然后移动到我们的主队列,在那里我们有消费者,它正在等待处理来自主队列的消息。 p.s>如果没有rabbitmq_delayed_message_exchange插件也可以。

  • 我需要解决这个场景。我有两个amqp消费者设置来获取一条消息。 taskChannel是queuechannel,但一次只允许使用一条消息,因此没有并行处理。如果另一条消息花了太长时间才继续,我如何在超时后拒绝一条消息。那么这个消息将返回到队列,由另一个节点继续?我的意思是,这两个消费者预取了两条消息,但一次只能处理一条,所以如果第一条预取消息需要很长时间才能处理,那么如何释放第二条预取消息呢。

  • 我需要在一定的持续时间后将消息发送给MessageListener,所以有没有任何方法可以使用SpringAMQP实现。 如。Producer生成消息并将消息发送到RabbitMQ Q,该消息立即被侦听器接收到,我想延迟消费者端接收到的消息,比如说在一些配置参数(比如1000ms)之后

  • 我正在使用Spring Kafka1.0.3来消费kafka消息。Kafka的2个主题,每个主题有1个分区。在java代码中,有2@KafKalistener来消费每个主题消息。ConcurrentKafkaListenerContainerFactory的并发设置为1。但消息有时会延迟20秒以上。 有人知道为什么吗? 添加调试日志,并且延迟不是每次都可以,有时也可以:

  • 我开发了spring批处理应用程序,该应用程序生成由json对象列表组成的amqp(rabbitmq)消息。消息具有包含一些元数据的标头。Spring cloud stream应用程序正在消费消息,我使用了功能性方法。如何访问标题<将消息头用于除路由之外的任何内容,这是一种糟糕的方法吗?

  • 我正在使用Spring AMQP侦听RabbitMQ队列。在侦听队列时,根据业务逻辑,我的服务可以引发RuntimeException,在这种情况下,消息将重试多次。在最大次数重试后,消息将保留在DLQ中。我想知道,在DLQ中处理这些消息的最佳方法是什么?我从博客上读到我可以使用停车场队列。但在这种情况下,如何监控队列并通知人们死信消息?P、 对不起,我的英语不好。希望我能够解释我的问题:)