当前位置: 首页 > 知识库问答 >
问题:

Spring Boot、原子和ActiveMQ的Jms队列重新传递问题

吴才俊
2023-03-14

我正在尝试设置一个jms队列,并在事务失败时进行重新交付。出现的情况是,一条消息被多次重新传递(和处理),因为在消息处理代码之后,会话会关闭两次。第二次关闭尝试抛出错误,因为它已经关闭,所以事务被回滚,消息被重新传递。

以下是我的配置:

@Configuration
public class MyJtaConfiguration {

private static final Logger LOGGER = Logger.getLogger(MyJtaConfiguration.class);

@Bean(name = "atomikosUserTransaction")
public UserTransaction atomikosUserTransaction() throws Throwable {
    UserTransactionImp userTransactionImp = new UserTransactionImp();
    userTransactionImp.setTransactionTimeout(10000);

    AtomikosJtaPlatform.transaction = userTransactionImp;

    return userTransactionImp;
}

@Bean(name = "atomikosTransactionManager", initMethod = "init", destroyMethod = "close")
public TransactionManager atomikosTransactionManager() throws Throwable {
    UserTransactionManager userTransactionManager = new UserTransactionManager();
    userTransactionManager.setForceShutdown(false);

    AtomikosJtaPlatform.transactionManager = userTransactionManager;

    return userTransactionManager;
}

@Bean(name = "transactionManager")
@DependsOn({ "atomikosUserTransaction", "atomikosTransactionManager" })
public PlatformTransactionManager transactionManager() throws Throwable {
    UserTransaction userTransaction = atomikosUserTransaction();
    TransactionManager atomikosTransactionManager = atomikosTransactionManager();
    return new JtaTransactionManager(userTransaction, atomikosTransactionManager);
}

@Bean
public QueueConnectionFactory connectionFactory() {     
    Map<String, Object> parameters = new HashMap<>();
    parameters.put("dataDirectory", "activeMqDataDirectory");
    String brokerUrl = Interpolator.getString("vm://MyBroker?broker.persistent=true&broker.dataDirectory=${dataDirectory}&broker.useJmx=true", parameters );


    ActiveMQXAConnectionFactory activeMQXAConnectionFactory = new ActiveMQXAConnectionFactory(brokerUrl);
    activeMQXAConnectionFactory.setTrustedPackages(
            new ArrayList<>(Arrays.asList("java.lang,javax.security,java.util,org.apache.activemq,org.fusesource.hawtbuf,com.thoughtworks.xstream.mapper,com.brabo".split(","))));

    RedeliveryPolicy redeliveryPolicy = activeMQXAConnectionFactory.getRedeliveryPolicy();
    redeliveryPolicy.setInitialRedeliveryDelay(10 * 1000);
    redeliveryPolicy.setMaximumRedeliveryDelay(10 * 1000);
    redeliveryPolicy.setMaximumRedeliveries(2);

    AtomikosConnectionFactoryBean atomikosConnectionFactoryBean = new AtomikosConnectionFactoryBean();
    atomikosConnectionFactoryBean.setUniqueResourceName("xamq");
    atomikosConnectionFactoryBean.setLocalTransactionMode(false);
    atomikosConnectionFactoryBean.setXaConnectionFactory(activeMQXAConnectionFactory);

    try {
        atomikosConnectionFactoryBean.init();
    } catch (JMSException e) {
        throw new RuntimeException(e);
    }
    CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory();
    cachingConnectionFactory.setTargetConnectionFactory(atomikosConnectionFactoryBean);
    cachingConnectionFactory.setSessionCacheSize(50);
    cachingConnectionFactory.setExceptionListener(new ExceptionListener() {

        @Override
        public void onException(JMSException exception) {
            LOGGER.error(exception);
        }
    });
    return cachingConnectionFactory;
}


@Bean(name = "myTestQueue")
public Queue backgroundTaskQueue() {
    ActiveMQQueue queue = new ActiveMQQueue("myTestQueue");
    return queue;
}


@Bean
public DefaultMessageListenerContainer backgroundTaskQueueListenerContainer(@Autowired ConnectionFactory connectionFactory,
        @Autowired @Qualifier("myServiceProcessorBean") MessageListener messageListener, @Autowired PlatformTransactionManager txManager) {
    return createListenerContainer(connectionFactory, messageListener, txManager, QueueManager.BACKGROUND_TASK_QUEUE_JNDI);
}

private DefaultMessageListenerContainer createListenerContainer(ConnectionFactory connectionFactory, MessageListener messageListener, PlatformTransactionManager txManager,
        String destinationName) {       
    DefaultMessageListenerContainer listenerContainer = new DefaultMessageListenerContainer();
    listenerContainer.setConnectionFactory(connectionFactory);
    listenerContainer.setDestinationName(destinationName);
    listenerContainer.setMessageListener(messageListener);
    listenerContainer.setTransactionManager(txManager);
    listenerContainer.setSessionTransacted(true); 

    listenerContainer.setConcurrentConsumers(1);
    listenerContainer.setReceiveTimeout(3000);
    return listenerContainer;
}

}

@Configuration
@DependsOn("transactionManager")
@EnableJpaRepositories(basePackages = {"com.myapp.common"}, 
    entityManagerFactoryRef = "myEntityManager", transactionManagerRef = "transactionManager")
@ConfigurationProperties("myapp.ds")
@Validated
public class MyDataSourceConfiguration {

@Bean
public DataSource dataSource() throws SQLException {
    OracleXADataSource mysqlXaDataSource = new OracleXADataSource();
    mysqlXaDataSource.setURL(url);
    mysqlXaDataSource.setPassword(password);
    mysqlXaDataSource.setUser(username);

    AtomikosDataSourceBean xaDataSource = new AtomikosDataSourceBean();
    xaDataSource.setXaDataSource(mysqlXaDataSource);
    xaDataSource.setUniqueResourceName("xads");
    xaDataSource.setPoolSize(10);
    xaDataSource.setMaxPoolSize(70);

    xaDataSource.init();
    return xaDataSource;
}

@Bean(name = "myEntityManager")
@DependsOn("transactionManager")
public LocalContainerEntityManagerFactoryBean customerEntityManager() throws Throwable {

    HashMap<String, Object> properties = new HashMap<String, Object>();     
    properties.put("javax.persistence.transactionType",     "JTA");
    properties.put("hibernate.archive.autodetection",       "hbm");
    properties.put("hibernate.dialect",                     "org.hibernate.dialect.Oracle10gDialect");
    properties.put("hibernate.transaction.jta.platform",    AtomikosJtaPlatform.class.getName());
    properties.put("hibernate.generate_statistics",         "false");
    properties.put("hibernate.jdbc_fetch_size",             "2");
    properties.put("hibernate.jdbc.batch_size",             "20");
    properties.put("hibernate.show_sql",                    "false");
    properties.put("hibernate.format_sql",                  "false");
    properties.put("hibernate.hbm2ddl.auto",                "none");
    properties.put("hibernate.id.new_generator_mappings",   "false"); 

    LocalContainerEntityManagerFactoryBean entityManager = new LocalContainerEntityManagerFactoryBean();
    entityManager.setJtaDataSource(dataSource());
    entityManager.setPackagesToScan("com.myapp.common");
    entityManager.setPersistenceUnitName("myPersistenceUnit");
    entityManager.setJpaPropertyMap(properties);
    entityManager.setPersistenceProvider(new HibernatePersistenceProvider());
    return entityManager;
}

}

怎么了?上面提到的错误是:ExceptionApperstandardImpl:hh000346:托管刷新期间出错[会话/实体管理器关闭]

共有1个答案

常温文
2023-03-14

我会尽量确保datasource connectionFactory在事务管理器之前初始化,然后关闭。这本身就是一种进步,不管它是否解决了问题。

 类似资料:
  • 如果ActiveMQ Artemis配置了>0,并且JMS侦听器使用或,那么代理将按预期重新传递消息。但是,如果生产者在重新传递期间将消息推送到队列中,那么接收者将获得无序的消息。 例如: 队列:1->消息1按预期重新传递 重新交付阶段的推送 当为时,一切都没问题,但是用户端的重新交付频率太高了。我的期望是,在未确认的消息从队列中清除或确认之前,应该停止向使用者的每一次传递。我们正在使用一个队列来

  • 因此,我使用Spring integration链接JMS和ActiveMQ,如下所示:- 如何使其工作,以便发送到此队列并从中接收消息?请帮忙。

  • 我正在使用ActiveMQ Artemis 2.17和Spring Boot 2.5.7。我正在发布关于主题和队列的消息并使用它。所有这些都是通过JMS完成的。所有队列(选播或多播)都是耐用的。我的主题(多播地址)有两个持久队列,以便有两个独立的使用者。在我的主题中,这两个消费者使用持久和共享订阅(JMS 2.0)。所有处理都是事务性的,通过Atomikos事务管理器进行管理(我需要它来提交数据库

  • 我正在读一条来自Solace的信息。我能够成功地阅读信息。假设我正在阅读一条消息,在侦听器线程上读取/处理消息时,应用程序崩溃。那我怎么能在那上面再读一遍那条信息呢。使用下面的代码,我无法再次阅读该消息。下面是我的配置

  • spring XML中的jmsTemplate定义: 有人对问题有什么建议吗/关于如何实现延迟消息传递的其他想法?谢了!

  • 我正在使用spring boot连接到solace队列。我使用下面的教程连接到solace JMS队列。https://www.devglan.com/spring-boot/spring-jms-solace-example它能够连接到solace队列。当应用程序启动并且与solace队列的连接正常工作时,我们遇到了一个问题,但在solace队列停止运行一段时间后,spring boot应用程序