当前位置: 首页 > 知识库问答 >
问题:

Solace JMS消费者在重新连接后停止

子车俊哲
2023-03-14

我在项目中使用solace作为JMS提供者。我使用spring CachingConnectionFactory检索连接。在这个连接上,我创建了一个新会话。我在那个会话中创建了一个消费者的线程。

@Autowired
CachingConnectionFactory ccf;
Connection connection = ccf.createConnection();
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
MessageConsumer messageConsumer = new session.createConsumer(destination); // This is passed to new thread

我正在做一些故障转移测试。当我将服务器从网络连接上拔下时,它会失败。当我再次连接服务器时,仍会收到相同的异常

javax.jms.IllegalStateException: Error receiving message - already closed (Tried to call receive on a stopped message consumer.) ...

更重要的是,CachingConnectionFactory默认将reConnectOnExcture属性设置为true(我检查了是否有效)。

例外似乎很清楚。所以我的问题是:当连接丢失并再次建立时,如何处理这种情况/异常?消费者有可能再次活着吗?或者我应该创建新的消费者和新的线程(我想避免什么)?

共有2个答案

印劲
2023-03-14

一般来说,CachingConnectionFactory可以通过Connection的onExc0019()支持自动重新连接。但是一旦连接被重新连接,您使用的消费者也应该被重新创建。

您可以参考/使用spring的DefaultMessageListenerContainerSimpleMessageListenerContainer,例如,SimpleMessageListenerContainer

叶嘉颖
2023-03-14

ReconnexException属性将在收到异常后尝试重新连接。由于故障切换仍在进行中,此重新连接可能会失败。

您可以将Solace API配置为尝试使用JNDI连接工厂中的“重新连接重试”和“重新连接重试等待”属性重新连接多次。这可以在Solace路由器上通过SolAdmin或CLI进行配置。如果将应用程序配置为重新连接足够长的时间以建立连接,则不应该遇到任何异常。否则,您将需要在收到此异常时再次创建会话和消费者。

 类似资料:
  • 但是,即使我将节点显式设置到属性中,它们也不会尝试重新连接到有效的kafka节点。 如何使我的使用者在他们连接的kafka节点失败后重新连接到有效的kafka节点?

  • 我有一个简单的Kafka设置。生成器正在以较高的速率向单个分区生成具有单个主题的消息。单个使用者正在使用来自此分区的消息。在此过程中,使用者可能会多次暂停处理消息。停顿可以持续几分钟。生产者停止产生消息后,所有排队的消息都将由使用者处理。生产者产生的消息似乎不会立即被消费者看到。我使用的是Kafka0.10.1.0。这里会发生什么?下面是使用消息的代码部分: 代理上的所有配置都保留为kafka默认

  • 我们有一个制作人 在开发过程中,我重新部署了producer应用程序,并做了一些更改。但在此之后,我的消费者没有收到任何消息。我尝试重新启动消费者,但没有成功。问题可能是什么和/或如何解决? 消费者配置: 生产者配置: 编辑2: 5分钟后,消费者应用程序死亡,但以下情况除外:

  • 我需要帮助消费者在Spring启动。当断开连接时,我需要停止应用程序,例如10分钟。当断开连接时 或者当无法连接时 我使用ConsumerFactory和ConcurrentKafkaListenerContainerFactory进行消费者的所有配置

  • 我正在使用这个库来实现节点kafka与消费者暂停和恢复方法来处理背压。我已经创建了一个小演示,我可以在其中和,但问题是在后它停止了消费消息。 这是我的代码。 任何人都可以帮助我,我在恢复消费者时做错了什么?当我启动使用者时,它只接收一条消息,并且在恢复后仍然不消耗任何其他消息。

  • 是否有一种方法以编程方式访问和打印使用者滞后偏移,或者说使用者读取的最后一条记录的偏移与某个生产者写入该使用者分区的最后一条记录的偏移之间的位置差。 要知道我的最终目标是将这个值发送到prometheus进行监视,我应该在上面添加哪些语句来得到滞后偏移值?