当前位置: 首页 > 知识库问答 >
问题:

RabbitMQ java客户端停止消费消息

常朗
2023-03-14

我的应用程序使用来自RabbitMQ的一些消息并对其进行处理。我有大约10个队列,每个队列最多有10个消费者(线程)。我有5次预回迁。我正在Heroku中使用CloudAMQP插件(RabbitMQ作为服务)运行安装程序。

我使用默认心跳和连接超时设置(60秒)运行。

我的java应用程序是一个使用sping-Rabbit库的Spring Boot应用程序。

版本:

RabbitMQ 3.5.3 
Erlang 17.5.3
Java 1.8
Spring boot 1.3.2.RELEASE
Spring rabbit 1.5.3.RELEASE

问题是对于一个特定队列的消费者来说,一段时间后会停止消费消息。当我重新启动我的java应用程序时,一切正常。不过,其他队列正在正常消费。应用程序方面没有错误。在兔子方面的日志流中,我看到一些条目,例如

= REPORT==== 2016-08-02 15:53:32 UTC ===
closing AMQP connection <SOMETHING> (SOMETHING_ELSE -> SOMETHING_ELSE_ELSE):
{heartbeat_timeout,running}

我无法在本地或Heroku的测试环境中复制。

使现代化

下面的代码可以在AMQConnection.class中找到

int heartbeat = negotiatedMaxValue(this.requestedHeartbeat,
                                   connTune.getHeartbeat());


private static int negotiatedMaxValue(int clientValue, int serverValue) {
        return (clientValue == 0 || serverValue == 0) ?
            Math.max(clientValue, serverValue) :
            Math.min(clientValue, serverValue);
}

我不能将心跳的值增加到60秒以上(这是我从服务器得到的)。

共有1个答案

白嘉石
2023-03-14

不幸的是,这似乎是一个网络问题。这可能是由于以下几点:

  • CloudAMQP服务出现一些问题,正在终止您的连接(不太可能,因为您的其他用户工作正常)
  • 您的CloudAMQP计划不允许有您想要的那么多并发连接。您是否检查过以确保您的计划足够高,足以支持所有消费者?https://elements.heroku.com/addons/cloudamqp
  • 您的Heroku dyno和相关消费者正在重新启动,这将中断您的连接。Heroku dynos定期重启。如果您的dynos无法正常重启,您可能需要调查原因
  • 您的Heroku dynos之一出现网络问题(在这种情况下,它可能会在没有您干预的情况下自行重新启动)

强制所有dyno重新启动的一种方法是运行$heroku ps:restart。这将迫使Heroku重新启动dynos,这通常意味着将它们移动到新的EC2主机。如果是一次性问题,这可能会有所帮助。

 类似资料:
  • 我有一个简单的Kafka设置。生成器正在以较高的速率向单个分区生成具有单个主题的消息。单个使用者正在使用来自此分区的消息。在此过程中,使用者可能会多次暂停处理消息。停顿可以持续几分钟。生产者停止产生消息后,所有排队的消息都将由使用者处理。生产者产生的消息似乎不会立即被消费者看到。我使用的是Kafka0.10.1.0。这里会发生什么?下面是使用消息的代码部分: 代理上的所有配置都保留为kafka默认

  • 我正在节点上使用stomp客户端。js与ActiveMQ。当前,生产者将消息推送到队列中,如果客户端(消费者)已连接,则它会使用消息。若客户端未连接,则消息在队列中挂起,直到或除非连接了某个消费者。 在我的例子中,消费者执行的每个操作都很昂贵,最多需要2个小时。我想消费一条消息,然后停止消费队列中的消息,直到该操作完成。目前,一旦消息被推入队列,客户端就会自动从队列中提取所有消息。我想要的是提取一

  • 我在kafka消费者文档中看到了这个注释-

  • 我正尝试使用Camel以事务方式从JMS队列中消费一条消息。特别是在这样的流程中: 等待消息在JMS队列上发布 尝试消费和处理单个消息 如果处理失败(发生异常),回滚消耗 如果处理通过,确认并停止使用更多消息 在应用程序生命周期的后期,另一个进程触发消费从(1)重新开始 起初,我试图使用轮询消费者,使用ConsumerTemplate来做这件事,但是我不知道是否可以通过事务来做这件事——似乎事务是

  • 我想这个话题发生了什么...偏移坏了还是我不知道... 有人知道会发生什么吗?谢谢

  • 我有以下情况:我使用Kafka模板,以便与Kafka交互,但我希望能够以编程方式防止Kafka客户端从某个主题读取消息,但仍然能够产生到其他主题。有没有办法实现这一点?