我们面临着ActiveMQ及其消费者的随机问题。我们观察到,很少有消费者不接收消息,即使他们连接到ActiveMQ队列。但在消费者重启后,它工作正常。
我们在ActiveMQ端有一个名为testQueue的队列。消费者正试图将消息从该队列中解列。为此,我们正在使用Spring的DefaultMessageListenerContainer。消息正在从ActiveMQ代理传递到使用者节点。从tcpdump也可以明显看出,消息正在到达使用者节点,但实际的使用者代码无法看到消息。换句话说,消息似乎卡在ActiveMQ使用者代码或Spring的DefaultMessageListenerContainer中。
有关该问题的更多信息,请参阅下图。消息正在到达消费者节点,但没有到达“实际消费者类”,这意味着消息被卡在AMQ消费者代码或Spring DMLC中。
以下是从ActiveMQ管理员获取的详细信息。
队列名称/挂起消息计数/消费者计数/消息排队/消息排队测试队列/9/1/9/0
以下是更多细节。
连接ID/Sesshtml" target="_blank">ionId/Selector/Enqueues/Dequeues/Dispatched/Dispatched Queue/Prefetch ID:bearsvir52-45176-1375519181268-3:5/1//9/0/9/9/250
从第二个表中可以明显看出,消息正在传递给消费者,但消费者没有确认消息。因此,消息被卡在代理端的调度队列中。
请注意以下几点:
1) 代理节点和消费者节点没有时差。
2) 观察用户侧的tcpdump。我们可以看到MessageDispatch(Openwire)数据包被传输到消费者节点,但找不到相同的MessageAck(Openwire)。
3)有时它在一个节点上工作,有时它在同一个节点上制造问题。
解决方案花了很多时间。org.apache.activemq.ActiveMQConnection.java类似乎有一些问题,以防AMQ故障转移。在这种情况下,连接对象不会在消费者端启动。
以下是我在ActiveMQConnection中添加的修复。并编译源代码以创建activemq-core-x.x.x.jar
private final Object startMutex = new Object();
添加了一个签入create会话方法
public Session createSession(boolean transacted, int acknowledgeMode) throws JMSException {
synchronized (startMutex) {
if(!isStarted()) {
start();
}
}
造成这种情况的一个原因可能是错误地使用CachingConnectionFactory
(带有缓存的消费者)和动态调整消费者(最大消费者)的侦听器容器
对于这样的问题,我通常建议使用跟踪日志记录,您可以看到所有消费者的活动。
我有一个JMS生产者和一个消费者,代理是ActiveMQ,参考下面的代码: 寄件人代码 接收码 问题是 ActiveMQ 队列无法接收来自发送方的消息(请参阅屏幕截图): 当我从 Web 控制台发送消息时,该消息在队列中收到,但来自创建者的消息不会进入队列。 另一个有趣的行为是(如队列接收器代码中所示,接收器在收到第一条消息后退出),同样,当我启动接收器时,它会收到相同的消息,并继续执行,直到我关
我的代码中有一个Spring JmsListener。它接收和消费消息2天,但突然在这2天后,它没有收到来自外部Activemq的消息。然而,它的队列中有一些挂起的消息。当我重置Activemq和消费者时,消费者会收到大量消息。当消息挂起时,连接到Activemq的消费者(代表Spring致动器日志)。日志和配置显示Activemq没有将消息推送给消费者。我有另一个像这个消费者一样的服务从其他队列
我已经用C语言编写了kafka消费者和生产者,使用的库是Library dkafka库。kafka代理版本是kafka_2.12-2.3.0。生产者正在成功生成消息,dr_msg_cb函数确认成功传递。但是,消费者没有收到来自代理的消息。有人能帮助进一步调试吗? 我可以看到,从消费者到代理的TCP连接已经建立。但TCPdump显示代理并没有向消费者发送任何数据。我在消费者代码上启用了调试,下面是消
我在使用时遇到了困难,无法从开头或其他任何显式偏移量读取它。 为同一主题的使用者运行命令行工具,我确实看到带有选项的消息,否则它将挂起 我使用的是kafka-python 0.9.5,而代理运行的是Kafka8.2。不确定确切的问题是什么。 按照dpkp的建议设置_group_id=none_以模拟控制台使用者的行为。
我很感激你在这方面的帮助。 我正在构建一个ApacheKafka消费者,以订阅另一个已经运行的Kafka。现在,我的问题是,当我的制作人将消息推送到服务器时。。。我的消费者没有收到。。我在打印的日志中得到以下信息: 我不确定我是否遗漏了任何重要的配置。。。但是,我可以使用WireShark看到一些来自我的服务器的消息,但是我的消费者没有消费这些消息。。。。 我的代码是示例消费者示例的精确副本:ht
如果我创建上面的类并尝试在tomcat7上部署war,我会看到以下错误。