当前位置: 首页 > 知识库问答 >
问题:

Spring AMQP事务处理和重试

江迪
2023-03-14

我正在尝试SRSpring AMQP关于事务性消息处理的功能。

我有以下设置-我有一个消息消费者,被注释为@Transactional

@Transactional
    public void handleMessage(EventPayload event) {
        Shop shop = new Shop();
        shop.setName(event.getName());

        Shop savedShop = shopService.create(shop);

        log.info("Created shop {} from event {}", shop, event);
    }

商店服务中。创建I保存店铺并发送另一条关于创建的消息

@Transactional(propagation = REQUIRED)
@Component
public class ShopService {

   ...

    public Shop create(Shop shop) {
        eventPublisher.publish(new EventPayload(shop.getName()));
        return shopRepository.save(shop);
    }
}

我想实现以下目标——如果数据库操作成功,则在create方法中发送的消息应该直接发送到代理。如果失败,则不发送消息,并回滚接收到的消息。

我还配置了重试功能,所以我希望每条消息在被拒绝之前都会重试3次:

@Bean
    public RetryOperationsInterceptor retryOperationsInterceptor() {
        return RetryInterceptorBuilder.stateless()
                .maxAttempts(3)
                .backOffOptions(1000, 2.0, 10000)
                .build();
    }

我观察到以下行为:

当我按照以下方式配置容器时,消息会重试3次,但每次都会在shopService中重试消息。创建被发送到代理:

@Bean
    SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory,
                                                            MessageListenerAdapter listenerAdapter) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.setQueueNames(testEventSubscriberQueue().getName());
        container.setMessageListener(listenerAdapter);
        container.setChannelTransacted(true);
        container.setAdviceChain(new Advice[]{retryOperationsInterceptor()});
        return container;
    }

因此,我尝试将平台TransactionManager传递给容器-

@Bean
    SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory,
                                                            MessageListenerAdapter listenerAdapter,
                                                            PlatformTransactionManager transactionManager) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.setQueueNames(testEventSubscriberQueue().getName());
        container.setMessageListener(listenerAdapter);
        container.setChannelTransacted(true);
        container.setTransactionManager(transactionManager);
        container.setAdviceChain(new Advice[]{retryOperationsInterceptor()});
        return container;
    }

现在,在shop Service.create中发送的消息只有在数据库事务成功的情况下才会发送给代理——这就是我想要的——但是现在消息会无限期重试——并且不会在3次按配置退出后丢弃。但是似乎应用了BackOff设置——所以重试之间有一段时间。

从业务的角度来看,所描述的设置没有真正意义——我试图理解和评估事务功能。

我正在使用sping-amqp 1.5.1。释放

谢谢你的提示。

共有1个答案

季城
2023-03-14

我有同样的要求,一个用@Transactional注释的@RabbitListener,我想重试后退。它甚至可以在以下配置中无状态工作:

   @Bean
   public RetryOperationsInterceptor retryOperationsInterceptor( ) {
      return RetryInterceptorBuilder.stateless()
            .maxAttempts( 3 )
            .recoverer( new RejectAndDontRequeueRecoverer() )
            .backOffOptions(1000, 2, 10000)
            .build();
   }

   @Bean
   public Jackson2JsonMessageConverter producerJackson2MessageConverter( ObjectMapper objectMapper ) {
      Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter( objectMapper );
      jackson2JsonMessageConverter.setCreateMessageIds( true );
      return jackson2JsonMessageConverter;
   }      

   @Bean
   SimpleRabbitListenerContainerFactory  rabbitListenerContainerFactory( ConnectionFactory connectionFactory,
                                                            PlatformTransactionManager transactionManager,
                                                                   Jackson2JsonMessageConverter converter) {
      SimpleRabbitListenerContainerFactory  container = new SimpleRabbitListenerContainerFactory ();
      container.setConnectionFactory(connectionFactory);

      container.setChannelTransacted(true);
      container.setTransactionManager(transactionManager);

      container.setAdviceChain( retryOperationsInterceptor() );

      container.setMessageConverter( converter );
      return container;
   }

要使用stateless(),使用RejectAndDontRequeueRecoverer非常重要,因为否则重试将有效,但消费者会在默认情况下将消息放回队列。然后消费者将再次检索它,应用重试策略,然后无限次地将其放回队列。

 类似资料:
  • SQLAlchemy 1.4 / 2.0 Tutorial 此页是 SQLAlchemy 1.4/2.0教程 . 上一页: 建立连接-引擎 |下一步: |next| 处理事务和DBAPI 与 Engine 对象准备好了,我们现在可以开始深入研究一个 Engine 以及它的主要交互端点 Connection 和 Result . 我们还将介绍ORM facade 对于这些对象,称为 Session

  • 当使用事务处理时,需要创建 Session 对象。在进行事务处理时,可以混用 ORM 方法和 RAW 方法,如下代码所示: func MyTransactionOps() error { session := engine.NewSession() defer session.Close() // add Begin() before any action

  • 启动事务 $this->db->start(); Swoole::$php->db('slave2')->start(); 提交事务 $this->db->commit(); Swoole::$php->db('slave2')->commit(); 回滚事务 $this->db->rollback(); Swoole::$php->db('slave2')->rollback();

  • 在2.0.0之后我们已经支持事务嵌套了,是通过事务等级去实现的。 1. 开始事务 $model->beginTransaction(); 2. 事务提交 $model->commit(); 3. 事务回滚 $model->rollback();

  • 事务处理(transaction processing) 可以用来维护数据的完整性,保证SQL的操作要么完全执行,要么完全不执行,如果发生错误就进行撤销。 保证数据的完整性。 保证数据不受外影响。 事务处理的几道术语 事务(transaction) 一组SQL语句 退回(rollback)撤销执行SQL语句的过程 提交(commit) 将为执行的SQL语句写入数据库表 保留点(savepoint)

  • 本文向大家介绍C#中事务处理和非事务处理方法实例分析,包括了C#中事务处理和非事务处理方法实例分析的使用技巧和注意事项,需要的朋友参考一下 本文实例讲述了C#中事务处理和非事务处理方法。分享给大家供大家参考。具体如下: C#代码如下: StringUtil.cs如下: DbUtils.cs如下: 希望本文所述对大家的C#程序设计有所帮助。