我正在尝试SRSpring AMQP关于事务性消息处理的功能。
我有以下设置-我有一个消息消费者,被注释为@Transactional
@Transactional
public void handleMessage(EventPayload event) {
Shop shop = new Shop();
shop.setName(event.getName());
Shop savedShop = shopService.create(shop);
log.info("Created shop {} from event {}", shop, event);
}
在商店服务中。创建
I保存店铺并发送另一条关于创建的消息:
@Transactional(propagation = REQUIRED)
@Component
public class ShopService {
...
public Shop create(Shop shop) {
eventPublisher.publish(new EventPayload(shop.getName()));
return shopRepository.save(shop);
}
}
我想实现以下目标——如果数据库操作成功,则在create
方法中发送的消息应该直接发送到代理。如果失败,则不发送消息,并回滚接收到的消息。
我还配置了重试功能,所以我希望每条消息在被拒绝之前都会重试3次:
@Bean
public RetryOperationsInterceptor retryOperationsInterceptor() {
return RetryInterceptorBuilder.stateless()
.maxAttempts(3)
.backOffOptions(1000, 2.0, 10000)
.build();
}
我观察到以下行为:
当我按照以下方式配置容器时,消息会重试3次,但每次都会在shopService中重试消息。创建
被发送到代理:
@Bean
SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory,
MessageListenerAdapter listenerAdapter) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.setQueueNames(testEventSubscriberQueue().getName());
container.setMessageListener(listenerAdapter);
container.setChannelTransacted(true);
container.setAdviceChain(new Advice[]{retryOperationsInterceptor()});
return container;
}
因此,我尝试将平台TransactionManager
传递给容器-
@Bean
SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory,
MessageListenerAdapter listenerAdapter,
PlatformTransactionManager transactionManager) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.setQueueNames(testEventSubscriberQueue().getName());
container.setMessageListener(listenerAdapter);
container.setChannelTransacted(true);
container.setTransactionManager(transactionManager);
container.setAdviceChain(new Advice[]{retryOperationsInterceptor()});
return container;
}
现在,在shop Service.create
中发送的消息只有在数据库事务成功的情况下才会发送给代理——这就是我想要的——但是现在消息会无限期重试——并且不会在3次按配置退出后丢弃。但是似乎应用了BackOff设置——所以重试之间有一段时间。
从业务的角度来看,所描述的设置没有真正意义——我试图理解和评估事务功能。
我正在使用sping-amqp 1.5.1。释放
谢谢你的提示。
我有同样的要求,一个用@Transactional
注释的@RabbitListener
,我想重试后退。它甚至可以在以下配置中无状态工作:
@Bean
public RetryOperationsInterceptor retryOperationsInterceptor( ) {
return RetryInterceptorBuilder.stateless()
.maxAttempts( 3 )
.recoverer( new RejectAndDontRequeueRecoverer() )
.backOffOptions(1000, 2, 10000)
.build();
}
@Bean
public Jackson2JsonMessageConverter producerJackson2MessageConverter( ObjectMapper objectMapper ) {
Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter( objectMapper );
jackson2JsonMessageConverter.setCreateMessageIds( true );
return jackson2JsonMessageConverter;
}
@Bean
SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory( ConnectionFactory connectionFactory,
PlatformTransactionManager transactionManager,
Jackson2JsonMessageConverter converter) {
SimpleRabbitListenerContainerFactory container = new SimpleRabbitListenerContainerFactory ();
container.setConnectionFactory(connectionFactory);
container.setChannelTransacted(true);
container.setTransactionManager(transactionManager);
container.setAdviceChain( retryOperationsInterceptor() );
container.setMessageConverter( converter );
return container;
}
要使用stateless()
,使用RejectAndDontRequeueRecoverer
非常重要,因为否则重试将有效,但消费者会在默认情况下将消息放回队列。然后消费者将再次检索它,应用重试策略,然后无限次地将其放回队列。
SQLAlchemy 1.4 / 2.0 Tutorial 此页是 SQLAlchemy 1.4/2.0教程 . 上一页: 建立连接-引擎 |下一步: |next| 处理事务和DBAPI 与 Engine 对象准备好了,我们现在可以开始深入研究一个 Engine 以及它的主要交互端点 Connection 和 Result . 我们还将介绍ORM facade 对于这些对象,称为 Session
当使用事务处理时,需要创建 Session 对象。在进行事务处理时,可以混用 ORM 方法和 RAW 方法,如下代码所示: func MyTransactionOps() error { session := engine.NewSession() defer session.Close() // add Begin() before any action
启动事务 $this->db->start(); Swoole::$php->db('slave2')->start(); 提交事务 $this->db->commit(); Swoole::$php->db('slave2')->commit(); 回滚事务 $this->db->rollback(); Swoole::$php->db('slave2')->rollback();
在2.0.0之后我们已经支持事务嵌套了,是通过事务等级去实现的。 1. 开始事务 $model->beginTransaction(); 2. 事务提交 $model->commit(); 3. 事务回滚 $model->rollback();
事务处理(transaction processing) 可以用来维护数据的完整性,保证SQL的操作要么完全执行,要么完全不执行,如果发生错误就进行撤销。 保证数据的完整性。 保证数据不受外影响。 事务处理的几道术语 事务(transaction) 一组SQL语句 退回(rollback)撤销执行SQL语句的过程 提交(commit) 将为执行的SQL语句写入数据库表 保留点(savepoint)
本文向大家介绍C#中事务处理和非事务处理方法实例分析,包括了C#中事务处理和非事务处理方法实例分析的使用技巧和注意事项,需要的朋友参考一下 本文实例讲述了C#中事务处理和非事务处理方法。分享给大家供大家参考。具体如下: C#代码如下: StringUtil.cs如下: DbUtils.cs如下: 希望本文所述对大家的C#程序设计有所帮助。