当前位置: 首页 > 编程笔记 >

学习spring事务与消息队列

秦永望
2023-03-14
本文向大家介绍学习spring事务与消息队列,包括了学习spring事务与消息队列的使用技巧和注意事项,需要的朋友参考一下

在开发过程中,遇到一个bug,产生bug的原因是spring事务提交晚于消息队列的生产消息,导致消息队列消费消息时获取到的数据不正确。这篇文章介绍问题的产生和一步步的解决过程。

一.问题的产生:

场景还原:接口中的一个方法,首先修改订单状态,然后向消息队列中生产消息,消息队列的消费者获取到消息检测订单状态,发现订单状态未更改。

代码:

@Service(orderApi)
public class OrderApiImpl implements OrderApi {
  @Resource MqService mqService;
  @OrderDao orderDao;
   
  public void push(String orderId) {
    // 更新订单状态,之前的状态是1
    updateStatus(orderId, 3);
    // 产生消息
    mqService.produce(orderId);
  }
  public viod updateStatus(String orderId, Integer status) {
    orderDao.updateStatus(orderId, status);
  }
}

问题产生原因:orderApi中的所有方法都有事务,事务类型PROPAGATION_REQUIRED,所以push方法对数据的操作会在push代码全部执行之后提交,而在事务提交之前消息队列的消息已经产生所以消息队列中消费到的订单从数据库查询出的状态可能还为1。为了让bug现象更明显,可以在push方法最后添加:

try {
  Thread.sleep(10000);
} catch (InterruptedException e) {
  // TODO Auto-generated catch block
  e.printStackTrace();
}

这样就会发现消费消息时,订单状态一定是未修改的。 

二.问题的解决:

解决方案:在更新数据时,新建一个事物,保证更新代码执行完成后,更新数据库的事务已被提交。(确保消息产生前数据库操作已提交)

按照上述方案,我首先想到的是直接修改updateStatus方法的事务类型;我将此方法的事务类型改为PROPAGATION_REQUIRES_NEW(新建事务,如果当前存在事务,把当前事务挂起)。

但是这么做有两点不合适:

  1.强制修改了updateStaus的事务类型,可能影响其他流程。

  2.未起到作用,updateStaus方法中没有新建事务。

关于第二点的解释:spring添加事务是通过BeanNameAutoProxyCreator实现的动态代理,只是给bean对象添加了事务,现在在类内部调用方法,是不会触发新事物的创建的。

所以在经过以上尝试后,我创建了一个新的类:

@Service("orderExtApi")
public class OrderExtApiImpl {
  @Resource OrderApi orderApi;
   
  public void updateStatusNewPropagation(String orderId) {
    orderApi.updateStatus(orderId);
  }
}

并为updateStatusNewPropagation方法添加事务PROPAGATION_REQUIRES_NEW

这个类就只是为了给orderApi中的updateStaus方法新起一个事务。

ok,到此为止bug已经解决了。

但是代码中还是存在问题:对数据库的操作已经提交,如果生产消息出现异常对业务逻辑来说还是错误的。所以需要检测消息的产生是否完成。

最终orderApi中的代码如下:

@Service(orderApi)
public class OrderApiImpl implements OrderApi {
  @Resource MqService mqService;
  @Resource OrderDao orderDao;
  @Resource OrderExtApiImpl orderExtApi;
   
  public void push(String orderId) {
    // 更新订单状态,之前的状态是1
    orderExtApi.updateStatusNewPropagation(orderId, 3);
    // 产生消息--produce会检测是否出现异常 当返回1时表示生产消息成功
    Response response = mqService.produce(orderId);
    if (response.getCode() != 1) {
      log.info("消息队列生产消息异常:" + response.getErrorMsg())
      // 生产消息异常,重置状态 等待下次重新执行
      orderExtApi.updateStatusNewPropagation(orderId, 1);
    }
     
  }
  public viod updateStatus(String orderId, Integer status) {
    orderDao.updateStatus(orderId, status);
  }
}

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持小牛知识库。

 类似资料:
  • 我刚才看到了三个方法的文档,当我们在工作线程中工作时,它们可以用来在UI线程中执行一段代码。方法有: > public final void runOnUIThread(Runnable action)-在UI线程上运行指定的操作。如果当前线程是UI线程,则立即执行该操作。如果当前线程不是UI线程,则将操作发布到UI线程的事件队列中 public boolean post(Runnable act

  • 我们需要将JMS队列中的消息持久化到事务中的数据库中,以确保在DB持久化期间抛出任何错误时不会确认JMS消息。基于此处提供的解决方案——使用消息驱动通道适配器时的事务处理 请确认理解是否正确。此外,还有以下问题 在这种情况下是否需要TransactionAware ConnectionFactoryProxy JMS Queue和JDBC是两个独立的事务资源。如本例所示,将jdbc事务管理器注入J

  • 为什么已经拥有了共享内存时需要消息队列呢? 这将是多种原因,让我们将其分解为多个点来简化 - 据了解,一旦消息被一个进程接收到,它将不再可用于任何其他进程。 而在共享内存中,数据可供多个进程访问。 如果想使用小信息格式进行通信。 当多个进程同时进行通信时,共享内存数据需要同步保护。 使用共享内存的写入和读取频率很高,那么实现功能将会非常复杂。 在这种情况下不值得使用。 如果所有的进程不需要访问共享

  • 一、消息模型 点对点 发布/订阅 二、使用场景 异步处理 流量削锋 应用解耦 三、可靠性 发送端的可靠性 接收端的可靠性 参考资料 一、消息模型 点对点 消息生产者向消息队列中发送了一个消息之后,只能被一个消费者消费一次。 发布/订阅 消息生产者向频道发送一个消息之后,多个消费者可以从该频道订阅到这条消息并消费。 发布与订阅模式和观察者模式有以下不同: 观察者模式中,观察者和主题都知道对方的存在;

  • 一个线程会从消息队列中收取消息,另一个线程会定时给消息队列发送普通消息和紧急消息 一个线程会从消息队列中收取消息,另一个线程会定时给消息队列发送普通消息和紧急消息 源码/* * Copyright (c) 2006-2018, RT-Thread Development Team * * SPDX-License-Identifier: Apache-2.0 * * Change Logs: *

  • 消息队列接口 结构体 struct   rt_messagequeue   消息队列控制块 更多...   类型定义 typedef struct rt_messagequeue *  rt_mq_t   消息队列类型指针定义   函数 rt_err_t  rt_mq_init (rt_mq_t mq, const char *name, void *msgpool, rt_size_t msg_