我正在尝试设置一个jms队列,并在事务失败时进行重新交付。出现的情况是,一条消息被多次重新传递(和处理),因为在消息处理代码之后,会话会关闭两次。第二次关闭尝试抛出错误,因为它已经关闭,所以事务被回滚,消息被重新传递。
以下是我的配置:
@Configuration
public class MyJtaConfiguration {
private static final Logger LOGGER = Logger.getLogger(MyJtaConfiguration.class);
@Bean(name = "atomikosUserTransaction")
public UserTransaction atomikosUserTransaction() throws Throwable {
UserTransactionImp userTransactionImp = new UserTransactionImp();
userTransactionImp.setTransactionTimeout(10000);
AtomikosJtaPlatform.transaction = userTransactionImp;
return userTransactionImp;
}
@Bean(name = "atomikosTransactionManager", initMethod = "init", destroyMethod = "close")
public TransactionManager atomikosTransactionManager() throws Throwable {
UserTransactionManager userTransactionManager = new UserTransactionManager();
userTransactionManager.setForceShutdown(false);
AtomikosJtaPlatform.transactionManager = userTransactionManager;
return userTransactionManager;
}
@Bean(name = "transactionManager")
@DependsOn({ "atomikosUserTransaction", "atomikosTransactionManager" })
public PlatformTransactionManager transactionManager() throws Throwable {
UserTransaction userTransaction = atomikosUserTransaction();
TransactionManager atomikosTransactionManager = atomikosTransactionManager();
return new JtaTransactionManager(userTransaction, atomikosTransactionManager);
}
@Bean
public QueueConnectionFactory connectionFactory() {
Map<String, Object> parameters = new HashMap<>();
parameters.put("dataDirectory", "activeMqDataDirectory");
String brokerUrl = Interpolator.getString("vm://MyBroker?broker.persistent=true&broker.dataDirectory=${dataDirectory}&broker.useJmx=true", parameters );
ActiveMQXAConnectionFactory activeMQXAConnectionFactory = new ActiveMQXAConnectionFactory(brokerUrl);
activeMQXAConnectionFactory.setTrustedPackages(
new ArrayList<>(Arrays.asList("java.lang,javax.security,java.util,org.apache.activemq,org.fusesource.hawtbuf,com.thoughtworks.xstream.mapper,com.brabo".split(","))));
RedeliveryPolicy redeliveryPolicy = activeMQXAConnectionFactory.getRedeliveryPolicy();
redeliveryPolicy.setInitialRedeliveryDelay(10 * 1000);
redeliveryPolicy.setMaximumRedeliveryDelay(10 * 1000);
redeliveryPolicy.setMaximumRedeliveries(2);
AtomikosConnectionFactoryBean atomikosConnectionFactoryBean = new AtomikosConnectionFactoryBean();
atomikosConnectionFactoryBean.setUniqueResourceName("xamq");
atomikosConnectionFactoryBean.setLocalTransactionMode(false);
atomikosConnectionFactoryBean.setXaConnectionFactory(activeMQXAConnectionFactory);
try {
atomikosConnectionFactoryBean.init();
} catch (JMSException e) {
throw new RuntimeException(e);
}
CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory();
cachingConnectionFactory.setTargetConnectionFactory(atomikosConnectionFactoryBean);
cachingConnectionFactory.setSessionCacheSize(50);
cachingConnectionFactory.setExceptionListener(new ExceptionListener() {
@Override
public void onException(JMSException exception) {
LOGGER.error(exception);
}
});
return cachingConnectionFactory;
}
@Bean(name = "myTestQueue")
public Queue backgroundTaskQueue() {
ActiveMQQueue queue = new ActiveMQQueue("myTestQueue");
return queue;
}
@Bean
public DefaultMessageListenerContainer backgroundTaskQueueListenerContainer(@Autowired ConnectionFactory connectionFactory,
@Autowired @Qualifier("myServiceProcessorBean") MessageListener messageListener, @Autowired PlatformTransactionManager txManager) {
return createListenerContainer(connectionFactory, messageListener, txManager, QueueManager.BACKGROUND_TASK_QUEUE_JNDI);
}
private DefaultMessageListenerContainer createListenerContainer(ConnectionFactory connectionFactory, MessageListener messageListener, PlatformTransactionManager txManager,
String destinationName) {
DefaultMessageListenerContainer listenerContainer = new DefaultMessageListenerContainer();
listenerContainer.setConnectionFactory(connectionFactory);
listenerContainer.setDestinationName(destinationName);
listenerContainer.setMessageListener(messageListener);
listenerContainer.setTransactionManager(txManager);
listenerContainer.setSessionTransacted(true);
listenerContainer.setConcurrentConsumers(1);
listenerContainer.setReceiveTimeout(3000);
return listenerContainer;
}
}
和
@Configuration
@DependsOn("transactionManager")
@EnableJpaRepositories(basePackages = {"com.myapp.common"},
entityManagerFactoryRef = "myEntityManager", transactionManagerRef = "transactionManager")
@ConfigurationProperties("myapp.ds")
@Validated
public class MyDataSourceConfiguration {
@Bean
public DataSource dataSource() throws SQLException {
OracleXADataSource mysqlXaDataSource = new OracleXADataSource();
mysqlXaDataSource.setURL(url);
mysqlXaDataSource.setPassword(password);
mysqlXaDataSource.setUser(username);
AtomikosDataSourceBean xaDataSource = new AtomikosDataSourceBean();
xaDataSource.setXaDataSource(mysqlXaDataSource);
xaDataSource.setUniqueResourceName("xads");
xaDataSource.setPoolSize(10);
xaDataSource.setMaxPoolSize(70);
xaDataSource.init();
return xaDataSource;
}
@Bean(name = "myEntityManager")
@DependsOn("transactionManager")
public LocalContainerEntityManagerFactoryBean customerEntityManager() throws Throwable {
HashMap<String, Object> properties = new HashMap<String, Object>();
properties.put("javax.persistence.transactionType", "JTA");
properties.put("hibernate.archive.autodetection", "hbm");
properties.put("hibernate.dialect", "org.hibernate.dialect.Oracle10gDialect");
properties.put("hibernate.transaction.jta.platform", AtomikosJtaPlatform.class.getName());
properties.put("hibernate.generate_statistics", "false");
properties.put("hibernate.jdbc_fetch_size", "2");
properties.put("hibernate.jdbc.batch_size", "20");
properties.put("hibernate.show_sql", "false");
properties.put("hibernate.format_sql", "false");
properties.put("hibernate.hbm2ddl.auto", "none");
properties.put("hibernate.id.new_generator_mappings", "false");
LocalContainerEntityManagerFactoryBean entityManager = new LocalContainerEntityManagerFactoryBean();
entityManager.setJtaDataSource(dataSource());
entityManager.setPackagesToScan("com.myapp.common");
entityManager.setPersistenceUnitName("myPersistenceUnit");
entityManager.setJpaPropertyMap(properties);
entityManager.setPersistenceProvider(new HibernatePersistenceProvider());
return entityManager;
}
}
怎么了?上面提到的错误是:ExceptionApperstandardImpl:hh000346:托管刷新期间出错[会话/实体管理器关闭]
我会尽量确保datasource connectionFactory在事务管理器之前初始化,然后关闭。这本身就是一种进步,不管它是否解决了问题。
如果ActiveMQ Artemis配置了>0,并且JMS侦听器使用或,那么代理将按预期重新传递消息。但是,如果生产者在重新传递期间将消息推送到队列中,那么接收者将获得无序的消息。 例如: 队列:1->消息1按预期重新传递 重新交付阶段的推送 当为时,一切都没问题,但是用户端的重新交付频率太高了。我的期望是,在未确认的消息从队列中清除或确认之前,应该停止向使用者的每一次传递。我们正在使用一个队列来
因此,我使用Spring integration链接JMS和ActiveMQ,如下所示:- 如何使其工作,以便发送到此队列并从中接收消息?请帮忙。
我正在使用ActiveMQ Artemis 2.17和Spring Boot 2.5.7。我正在发布关于主题和队列的消息并使用它。所有这些都是通过JMS完成的。所有队列(选播或多播)都是耐用的。我的主题(多播地址)有两个持久队列,以便有两个独立的使用者。在我的主题中,这两个消费者使用持久和共享订阅(JMS 2.0)。所有处理都是事务性的,通过Atomikos事务管理器进行管理(我需要它来提交数据库
我正在读一条来自Solace的信息。我能够成功地阅读信息。假设我正在阅读一条消息,在侦听器线程上读取/处理消息时,应用程序崩溃。那我怎么能在那上面再读一遍那条信息呢。使用下面的代码,我无法再次阅读该消息。下面是我的配置
spring XML中的jmsTemplate定义: 有人对问题有什么建议吗/关于如何实现延迟消息传递的其他想法?谢了!
我正在使用spring boot连接到solace队列。我使用下面的教程连接到solace JMS队列。https://www.devglan.com/spring-boot/spring-jms-solace-example它能够连接到solace队列。当应用程序启动并且与solace队列的连接正常工作时,我们遇到了一个问题,但在solace队列停止运行一段时间后,spring boot应用程序