我的应用程序使用来自RabbitMQ的一些消息并对其进行处理。我有大约10个队列,每个队列最多有10个消费者(线程)。我有5次预回迁。我正在Heroku中使用CloudAMQP插件(RabbitMQ作为服务)运行安装程序。
我使用默认心跳和连接超时设置(60秒)运行。
我的java应用程序是一个使用sping-Rabbit库的Spring Boot应用程序。
版本:
RabbitMQ 3.5.3
Erlang 17.5.3
Java 1.8
Spring boot 1.3.2.RELEASE
Spring rabbit 1.5.3.RELEASE
问题是对于一个特定队列的消费者来说,一段时间后会停止消费消息。当我重新启动我的java应用程序时,一切正常。不过,其他队列正在正常消费。应用程序方面没有错误。在兔子方面的日志流中,我看到一些条目,例如
= REPORT==== 2016-08-02 15:53:32 UTC ===
closing AMQP connection <SOMETHING> (SOMETHING_ELSE -> SOMETHING_ELSE_ELSE):
{heartbeat_timeout,running}
我无法在本地或Heroku的测试环境中复制。
使现代化
下面的代码可以在AMQConnection.class
中找到
int heartbeat = negotiatedMaxValue(this.requestedHeartbeat,
connTune.getHeartbeat());
private static int negotiatedMaxValue(int clientValue, int serverValue) {
return (clientValue == 0 || serverValue == 0) ?
Math.max(clientValue, serverValue) :
Math.min(clientValue, serverValue);
}
我不能将心跳的值增加到60秒以上(这是我从服务器得到的)。
不幸的是,这似乎是一个网络问题。这可能是由于以下几点:
强制所有dyno重新启动的一种方法是运行$heroku ps:restart。这将迫使Heroku重新启动dynos,这通常意味着将它们移动到新的EC2主机。如果是一次性问题,这可能会有所帮助。
我有一个简单的Kafka设置。生成器正在以较高的速率向单个分区生成具有单个主题的消息。单个使用者正在使用来自此分区的消息。在此过程中,使用者可能会多次暂停处理消息。停顿可以持续几分钟。生产者停止产生消息后,所有排队的消息都将由使用者处理。生产者产生的消息似乎不会立即被消费者看到。我使用的是Kafka0.10.1.0。这里会发生什么?下面是使用消息的代码部分: 代理上的所有配置都保留为kafka默认
我正在节点上使用stomp客户端。js与ActiveMQ。当前,生产者将消息推送到队列中,如果客户端(消费者)已连接,则它会使用消息。若客户端未连接,则消息在队列中挂起,直到或除非连接了某个消费者。 在我的例子中,消费者执行的每个操作都很昂贵,最多需要2个小时。我想消费一条消息,然后停止消费队列中的消息,直到该操作完成。目前,一旦消息被推入队列,客户端就会自动从队列中提取所有消息。我想要的是提取一
我在kafka消费者文档中看到了这个注释-
我正尝试使用Camel以事务方式从JMS队列中消费一条消息。特别是在这样的流程中: 等待消息在JMS队列上发布 尝试消费和处理单个消息 如果处理失败(发生异常),回滚消耗 如果处理通过,确认并停止使用更多消息 在应用程序生命周期的后期,另一个进程触发消费从(1)重新开始 起初,我试图使用轮询消费者,使用ConsumerTemplate来做这件事,但是我不知道是否可以通过事务来做这件事——似乎事务是
我想这个话题发生了什么...偏移坏了还是我不知道... 有人知道会发生什么吗?谢谢
我有以下情况:我使用Kafka模板,以便与Kafka交互,但我希望能够以编程方式防止Kafka客户端从某个主题读取消息,但仍然能够产生到其他主题。有没有办法实现这一点?