我正在尝试获得骆驼路线JMS-
下面的例子说明了如果REST服务的服务器出现故障而无法交付route时会发生什么情况。
我得到了正确的例外:
2018-01-18 12:30:50:962-[Camel (LRM-Relay) thread #5 - JmsConsumer[myIncomingQueue]] WARN o.a.c.s.s.TransactionErrorHandler - Transaction rollback (0x30a1c779) redelivered(false) for (MessageId: ID:MGR-MacBook-Pro.local-51837-1516262355358-4:2:1:1:16 on ExchangeId: ID-MGR-MacBook-Pro-local-1516275047663-0-1) caught: java.net.ConnectException: Cannot connect to CORE REST
2018-01-18 12:30:50:965-[Camel (LRM-Relay) thread #5 - JmsConsumer[myIncomingQueue]] WARN o.a.c.c.j.EndpointMessageListener - Execution of JMS message listener failed. Caused by: [org.apache.camel.RuntimeCamelException - java.net.ConnectException: Cannot connect to CORE REST]
org.apache.camel.RuntimeCamelException: java.net.ConnectException: Cannot connect to CORE REST …
但是消息被消费并从队列中删除。我的假设是使用事务/事务骆驼和AMQ可以解决这个问题并将消息移动到ActiveMQ.DLQ.
我已经阅读了《骆驼行动》第一版的第9章,并在谷歌上搜索,但没有找到任何解决我问题的方法。
我知道我可以创建/定义自己的TransactionErrorHandler()并将消息存储在我选择的队列中,但我的印象是,在使用事务处理时,这是默认的…
我正在使用独立的ActiveMQ 5.15.2 vanilla安装和配置。
Camel 2.20.1
MacOS 10.13.2上的Java8_144
我的配置:
@Configuration
public class Config {
/**
* The Camel context.
*/
final CamelContext camelContext;
/**
* The Broker url.
*/
@Value("${jms.broker.url}")
private String brokerURL;
/**
* Instantiates a new Config.
*
* @param camelContext the sisyfos context
* @param metricRegistry the metric registry
*/
@Autowired
public Config(final CamelContext camelContext, final MetricRegistry metricRegistry) {
this.camelContext = camelContext;
this.metricRegistry = metricRegistry;
}
@Bean
public ActiveMQConnectionFactory activeMQConnectionFactory() {
final ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory();
activeMQConnectionFactory.setBrokerURL(brokerURL);
return activeMQConnectionFactory;
}
/**
* Pooled connection factory pooled connection factory.
*
* @return the pooled connection factory
*/
@Bean
@Primary
public PooledConnectionFactory pooledConnectionFactory() {
final PooledConnectionFactory pooledConnectionFactory = new PooledConnectionFactory();
pooledConnectionFactory.setMaxConnections(8);
pooledConnectionFactory.setMaximumActiveSessionPerConnection(500);
pooledConnectionFactory.setConnectionFactory(activeMQConnectionFactory());
return pooledConnectionFactory;
}
/**
* Jms configuration jms configuration.
*
* @return the jms configuration
*/
@Bean
public JmsConfiguration jmsConfiguration() {
final JmsConfiguration jmsConfiguration = new JmsConfiguration();
jmsConfiguration.setConnectionFactory(pooledConnectionFactory());
jmsConfiguration.setTransacted(true);
jmsConfiguration.setTransactionManager(transactionManager());
jmsConfiguration.setConcurrentConsumers(10);
return jmsConfiguration;
}
/**
* Transaction manager jms transaction manager.
*
* @return the jms transaction manager
*/
@Bean
public JmsTransactionManager transactionManager() {
final JmsTransactionManager transactionManager = new JmsTransactionManager();
transactionManager.setConnectionFactory(pooledConnectionFactory());
return transactionManager;
}
/**
* Active mq component active mq component.
*
* @return the active mq component
*/
@Bean
public ActiveMQComponent activeMQComponent(JmsConfiguration jmsConfiguration,
PooledConnectionFactory pooledConnectionFactory,
JmsTransactionManager transactionManager) {
final ActiveMQComponent activeMQComponent = new ActiveMQComponent();
activeMQComponent.setConfiguration(jmsConfiguration);
activeMQComponent.setTransacted(true);
activeMQComponent.setUsePooledConnection(true);
activeMQComponent.setConnectionFactory(pooledConnectionFactory);
activeMQComponent.setTransactionManager(transactionManager);
return activeMQComponent;
}
}
我的路线:
@Component
public class SendToCore extends SpringRouteBuilder {
@Override
public void configure() throws Exception {
Logger.getLogger(SendToCore.class).info("Sending to CORE");
//No retries if first fails due to connection error
interceptSendToEndpoint("http4:*")
.choice()
.when(header("JMSRedelivered").isEqualTo("false"))
.throwException(new ConnectException("Cannot connect to CORE REST"))
.end();
from("activemq:queue:myIncomingQueue")
.transacted()
.setHeader(Exchange.CONTENT_TYPE, constant("application/xml"))
.to("http4:localhost/myRESTservice")
.log("${header.CamelHttpResponseCode}")
.end();
}
}
您可能会在某些豆子中找到多余的声明,这就是我试图解决问题......
在我的Github repo中添加一个链接,用一个小的测试项目来说明这一点:< br > https://github.com/hakuseki/transacted
刚刚注意到,如果您希望Spring Boot处理这些池和配置的生命周期,那么您不应该直接调用它们的方法,而是让它们作为参数提供在方法签名中
例如这个
public ActiveMQComponent activeMQComponent() {
应该是
public ActiveMQComponent activeMQComponent(JmsConfiguration config, ConnectionFactory cf, ...) {
然后 Spring Boot 会为你提供这些豆子。
关于为什么你的交易不起作用,那么你可以看看骆驼在行动第二版书中的一些交易示例:https://github.com/camelinaction/camelinaction2/tree/master/chapter12
这可能是SpringBoot自动配置的问题。
如果消息丢失而不是发送到DLQ,Camel的ActiveMQ组件会自动提交消息,而不是等待工作完成。
注意:我的配置没有事务管理器,因为您的情况不需要它。相反,只需在ActiveMQComponent
中设置事务管理器为
true
和lazyCreateTransactionManager
为false
。然后您与代理进行了“本地”事务,这就是您所需要的。
< li >我删除了< code >。transactioned()
(需要事务管理器,但不需要有“JMS本地事务处理”路由) < li >我在route类中注释掉了您的错误处理程序(需要一个事务管理器,您可以使用默认的错误处理程序) < li >禁用< code>MainApplication中JMS和ActiveMQ的自动配置:< code > @ spring boot application(exclude = { jmsautoconfiguration . class,activemqautoconfiguration . class }) < li >用下面的配置替换您的Java配置(改编自这个问题:ConnectionFactory在camel之前被销毁)
Java配置:
@Value("${jms.broker.url}")
String brokerURL;
@Bean
public ActiveMQConnectionFactory connectionFactory() {
final ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory();
activeMQConnectionFactory.setBrokerURL(brokerURL);
return activeMQConnectionFactory;
}
@Bean
@Primary
public PooledConnectionFactory pooledConnectionFactory(ConnectionFactory cf) {
final PooledConnectionFactory pooledConnectionFactory = new PooledConnectionFactory();
pooledConnectionFactory.setMaxConnections(1);
pooledConnectionFactory.setConnectionFactory(cf);
return pooledConnectionFactory;
}
@Bean(name = "activemq")
@ConditionalOnClass(ActiveMQComponent.class)
public ActiveMQComponent activeMQComponent(ConnectionFactory connectionFactory) {
ActiveMQComponent activeMQComponent = new ActiveMQComponent();
activeMQComponent.setConnectionFactory(connectionFactory);
activeMQComponent.setTransacted(true);
activeMQComponent.setLazyCreateTransactionManager(false);
return activeMQComponent;
}
最后,只是为了“运行”路线,我添加了一个小的骆驼路线测试
@RunWith(CamelSpringBootRunner.class)
@SpringBootTest(classes = MainApplication.class)
public class SampleCamelApplicationTest {
@Produce(uri = "activemq:queue:myIncomingQueue")
protected ProducerTemplate template;
@Test
public void shouldProduceMessages() throws Exception {
template.sendBody("test");
Thread.sleep(20000); //wait for ActiveMQ redeliveries
}
}
如果我运行这个测试,消息将会发送到< code>ActiveMQ。DLQ
希望这有帮助
当使用事务处理时,需要创建 Session 对象。在进行事务处理时,可以混用 ORM 方法和 RAW 方法,如下代码所示: func MyTransactionOps() error { session := engine.NewSession() defer session.Close() // add Begin() before any action
启动事务 $this->db->start(); Swoole::$php->db('slave2')->start(); 提交事务 $this->db->commit(); Swoole::$php->db('slave2')->commit(); 回滚事务 $this->db->rollback(); Swoole::$php->db('slave2')->rollback();
在2.0.0之后我们已经支持事务嵌套了,是通过事务等级去实现的。 1. 开始事务 $model->beginTransaction(); 2. 事务提交 $model->commit(); 3. 事务回滚 $model->rollback();
事务处理(transaction processing) 可以用来维护数据的完整性,保证SQL的操作要么完全执行,要么完全不执行,如果发生错误就进行撤销。 保证数据的完整性。 保证数据不受外影响。 事务处理的几道术语 事务(transaction) 一组SQL语句 退回(rollback)撤销执行SQL语句的过程 提交(commit) 将为执行的SQL语句写入数据库表 保留点(savepoint)
问题出在@Transactional中,在我的配置中spring应用程序没有使用它。我怎么能修好它? ...REST控制器没有任何事务性方法,它只使用specifiedServices加载实体。依赖集合(如果未加载到服务中)应为空。 应用程序启动程序类: 我还尝试将@Transactional添加到存储库接口中,但对我来说并不起作用 所以我从存储库中删除了@Transactional,创建了其他服
ORM 可以简单的进行事务操作 o := NewOrm() err := o.Begin() // 事务处理过程 ... ... // 此过程中的所有使用 o Ormer 对象的查询都在事务处理范围内 if SomeError { err = o.Rollback() } else { err = o.Commit() }