当前位置: 首页 > 知识库问答 >
问题:

spring amqp事务语义

范承望
2023-03-14

我目前正在测试一个相当简单的示例,该示例涉及与spring amqp数据库事务相关的消息传递事务。

用例如下所示:

>

@Transactional
public void handleMessage(EventPayload event) {
    MyEntity entity = new MyEntity();
    entity.setName(event.getName());

    rabbitTemplate.convertAndSend("myExchange", "payload.create", payload);

    MyEntity savedEntity = entityRepository.save(entity);
}

数据库操作期间发生故障时的预期行为是,接收到的消息回滚到总线(DefaultRequeueRejected=false),并进入死信队列。此外,发送的消息也应回滚。

我可以通过以下配置实现这一点:

@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory, MessageConverter messageConverter) {
    RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
    rabbitTemplate.setMessageConverter(messageConverter);
    rabbitTemplate.setChannelTransacted(true);
    return rabbitTemplate;
}

@Bean
    SimpleMessageListenerContainer subscriberListenerContainer(ConnectionFactory connectionFactory,
                                                              MessageListenerAdapter listenerAdapter,
                                                              PlatformTransactionManager transactionManager) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.setQueueNames(SUBSCRIBER_QUEUE_NAME);
        container.setMessageListener(listenerAdapter);
        container.setChannelTransacted(true);
        container.setTransactionManager(transactionManager);
        container.setDefaultRequeueRejected(false);
        return container;
    }

所以这很好——我不明白的是,如果我不在SimpleMessageListenerContainer上设置事务管理器,观察到的行为是完全相同的。所以如果我配置以下内容,bebavior不会改变:

@Bean
        SimpleMessageListenerContainer subscriberListenerContainer(ConnectionFactory connectionFactory,
                                                                  MessageListenerAdapter listenerAdapter) {
            SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
            container.setConnectionFactory(connectionFactory);
            container.setQueueNames(SUBSCRIBER_QUEUE_NAME);
            container.setMessageListener(listenerAdapter);
            container.setDefaultRequeueRejected(false);
            return container;
        }

有人能解释一下那里发生了什么吗?为什么第二个案例也有效?如果在SimpleMessageListenerContainer上注册了PlatformTransactionManager,内部会有什么不同。

共有1个答案

郭业
2023-03-14

假设transactionManager是您的数据库tm,因为您的侦听器是@Transactional,所以这些场景没有太大区别。

在第一种情况下,事务在调用侦听器之前由容器启动(实际上是在从内部队列检索消息之前,因此即使没有消息,事务也会启动)。

在第二种情况下,当我们调用侦听器时,事务由事务拦截器启动。

假设侦听器不是事务性的,但一些下游组件是;假设侦听器成功调用了该组件,然后在抛出异常之前做了更多的工作。在这种情况下,数据库提交将成功,消息将被拒绝。这可能不是期望的行为,尤其是在消息被重新请求的情况下。在这种情况下,通过注入数据库tm来同步兔子事务和数据库事务通常更好。

在您的情况下,在db提交和Rabbit ack之间发生失败的可能性很小,因此这实际上不适用于您的情况,并且您不需要容器中的tm。

 类似资料:
  • 一、 我与CachingConnectionFactory有一个SpringAMQP项目。我需要从AMQP连接获取一些属性,例如:状态、连接时间、通道和一些运行时度量。CachingConnectionFactory是否有任何指标支持(例如:https://www.rabbitmq.com/blog/2016/11/30/metrics-support-in-rabbitmq-java-clien

  • 我一直在尝试SpringAMQP。我有几个问题: 我想知道什么是Publisher退货,它与Publisher确认有什么不同。据我所知,我们有一个Publisher Confirm回调,用于检查ACK的状态。现在我看了Spring AMQP和Rabbit MQ中的文档。在这件事上我真的没有发现或理解太多。 还有为什么如果消息试图发送到一个不存在的队列,我不会得到任何类型的确认(ack/nack),

  • 如果另一个客户机在我们调用watch之后更改了powerlevel的值,我们的事务将失败。如果没有客户端更改该值,则该集合将工作。我们可以在循环中执行这段代码,直到它起作用为止。 为什么不能在不能被其他命令打断的事务中执行增量?为什么我们需要迭代而不是等到没有人改变值才开始事务?

  • 本文向大家介绍Oracle中死事务的检查语句,包括了Oracle中死事务的检查语句的使用技巧和注意事项,需要的朋友参考一下 查询v$px_session和v$fast_start_servers,显示很多并行进程在rollback,根据以往的工程经验: 于是改为 之后,再次运行 使用如下脚本查看回滚完毕的预计时间(以天为单位): 24*0.21=5.04小时。即:预计5.04小时后回滚完毕。 另外

  • 问题内容: 最近,在一个用于mysql数据库的PHP脚本中,我需要在恰好位于另一个事务内部的某个位置使用事务。我所有的测试似乎都表明这很好,但是我找不到有关此用法的任何文档。 我想确定-交易中的交易在mysql中是否有效?如果是这样,是否有办法找出嵌套事务中的层数?(即恢复到正常状态需要多少回滚) 预先感谢,Brian 问题答案: 手册的本页可能使您感兴趣: 12.3.3。 导致隐性提交的陈述 ;

  • Django 为你提供几种方法来控制如何管理数据库事务。 管理数据库事务 Django’s default transaction behavior Django 的默认行为是运行在自动提交模式下。任何一个查询都立即被提交到数据库中,除非激活一个事务。具体细节看下面. Django 用事务或者保存点去自动的保证复杂ORM各种查询操作的统一性,尤其是 delete() 和update() 查询. D