当前位置: 首页 > 知识库问答 >
问题:

当application server突然停止时,丢失带有Spring JMS和ActiveMQ的JMS消息

马天逸
2023-03-14

我有一个Spring JMS应用程序,它有一个JMS侦听器,在应用程序启动时连接到活动的MQ队列。这个JMS侦听器是an应用程序的一部分,该应用程序接收消息,用内容丰富消息,然后将消息传递给同一个ActiveMQ代理上的主题。

SessionTransact设置为true。我没有执行任何数据库事务,所以我没有@Transactional设置。从我所读到的内容来看,SessionTransact属性围绕JMS侦听器的receive方法设置本地事务,因此在事务完成之前不会将消息从队列中拉出。我已经使用本地ActiveMQ实例和本地tomcat容器对此进行了测试,它按预期工作。

但是,当我部署到我们的PERF环境并重试相同的测试时,我注意到服务器关闭时当前正在运行的消息是在完成receive方法之前从队列中提取的。

我想知道的是,是否有任何明显的东西,我应该寻找?是否有某些JMS标头会导致这种行为发生?请让我知道,如果有更多的信息,我可以提供。

我在运行Java8的Tomcat7容器上使用Spring 4.1.2和Apache ActiveMQ 5.8.0版本。

更新-添加我的Java JMS配置。请注意,为了清晰起见,我将PERF属性文件中的内容替换到了相关区域。

    @Bean
public DefaultJmsListenerContainerFactory jmsListenerContainerFactory() throws Throwable {
    DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
    factory.setConnectionFactory(connectionFactory());
    factory.setMaxMessagesPerTask(-1);
    factory.setConcurrency(1);
    factory.setSessionTransacted(Boolean.TRUE);
    return factory;
}

@Bean
public CachingConnectionFactory connectionFactory(){
    RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();

    redeliveryPolicy.setInitialRedeliveryDelay(1000);
    redeliveryPolicy.setRedeliveryDelay(1000);
    redeliveryPolicy.setMaximumRedeliveries(6);
    redeliveryPolicy.setUseExponentialBackOff(Boolean.TRUE);
    redeliveryPolicy.setBackOffMultiplier(5);

    ActiveMQConnectionFactory activeMQ = new ActiveMQConnectionFactory(environment.getProperty("queue.username"), environment.getProperty("queue.password"), environment.getProperty("jms.broker.endpoint"));
    activeMQ.setRedeliveryPolicy(redeliveryPolicy);
    activeMQ.setPrefetchPolicy(prefetchPolicy());

    CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory(activeMQ);
    cachingConnectionFactory.setCacheConsumers(Boolean.FALSE);
    cachingConnectionFactory.setSessionCacheSize(1);
    return cachingConnectionFactory;
}

@Bean
public JmsMessagingTemplate jmsMessagingTemplate(){
    ActiveMQTopic activeMQ = new ActiveMQTopic(environment.getProperty("queue.out"));

    JmsMessagingTemplate jmsMessagingTemplate = new JmsMessagingTemplate(connectionFactory());
    jmsMessagingTemplate.setDefaultDestination(activeMQ);

    return jmsMessagingTemplate;
}

protected ActiveMQPrefetchPolicy prefetchPolicy(){
    ActiveMQPrefetchPolicy prefetchPolicy = new ActiveMQPrefetchPolicy();
    int prefetchValue = 0; 
    prefetchPolicy.setQueuePrefetch(prefetchValue);
    return prefetchPolicy;
}

谢谢,

胡安

暂时还没有答案

 类似资料:
  • 我错过了什么? AMQ版本5.13.2 Java 1.8.0\u 74 Windows 10 给定一个简单的测试用例,传输两条Object消息,一条带有数据,另一条是数据结束标记。只有数据结束标记被接收。 队列在作业开始时创建,并在作业完成后销毁。 如果我运行更多的事务,我会看到大约50%的接收率。 日志清楚地显示接收器在第一条消息被放入队列之前就已启动,两条消息都被放入队列,但实际上只有第二条消

  • 我有一个稍微令人困惑的问题,我认为由于一个愚蠢的疏忽,这将是一个容易解决的问题。 我的主要任务是上传图像和录音文件到我的服务器上的一个位置。我通过FTP这样做。 活动通过startService(intentName)调用服务 onHandleIntent()创建一个新线程 在新线程中,需要上传的文件被放入一个列表数组 在列表数组中循环。在这个循环中,将文件名传递给FTP服务器。如果添加成功,我会

  • 版本: SpringBoot: 2.3.12。发布 SpringCloud:Hoxton。SR12 SpringCloud Starter Sleuth: 3.0.3 骆驼: 3.4.6 我想将Sleuth添加到一个预先存在的项目中,该项目现在使用ActiveMQ,以前它使用JMS。当我这样做时,ActiceMQ消息中的值会被阻止/删除(其中一个是“filename”,它是S2请求的键值)。其他J

  • 我是一个新的Kafka和使用Apache kafka消费者读取消息从生产者。但当我停下来开始一段时间。之间产生的所有消息都将丢失。如何处理这种情况。我正在使用这些属性“auto.offset.reset”、“latest”和“enable.auto.commit”、“false”。 这是我正在使用的代码。任何帮助都是感激的。

  • 我的应用程序在服务器上发生某种事件后接收推送通知。 除了在一小段时间内有许多事件一个接一个地发生的情况外,一切都很正常。 设备上的Firebase令牌也是正常的,因为应用程序从Firebase控制台接收通知。

  • 根据使用JMS 1.1的官方文档,ActiveMQ 5.15是否可以与JMS 2.0配合使用。将ActiveMQ 5.15与JMS 2.0配合使用有何意义。如果ActiveMQ 5.15 JMS 2.0不是一种可行的方法,那么Artemis是一个很好的替代品吗?Artemis是否足够稳定,可以用于生产级企业应用程序?