当前位置: 首页 > 编程笔记 >

RocketMq事务消息发送代码流程详解

欧阳骏俊
2023-03-14
本文向大家介绍RocketMq事务消息发送代码流程详解,包括了RocketMq事务消息发送代码流程详解的使用技巧和注意事项,需要的朋友参考一下

一、RocketMq事务消息流程:

1、首先会向broker发送一个预请求消息,消费者不可见

2、回调执行本地事务(比如操作数据库)

3、事务执行成功后,再次发送消息给broker,告诉broker事务执行成功这个消息要提交,让消费者可见。如果本地事务执行超时,会返回一个unknow,broker会发送一个消息回查,检查消息是否执行成功。

二、RocketMq事务消息实例:

1、引入rocketMq相关的依赖:

<dependency>
  <groupId>org.apache.rocketmq</groupId>
  <artifactId>rocketmq-client</artifactId>
  <version>4.4.0</version>
</dependency>

2、创建一个TransactionProducer类:

public class TransactionProducer {

  public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException, UnsupportedEncodingException {
    //创建生产者并制定组名
    TransactionMQProducer producer = new TransactionMQProducer("rocketMQ_transaction_producer_group");
    //2.指定Nameserver地址
    producer.setNamesrvAddr("192.168.***.***:9876");
    //3、指定消息监听对象用于执行本地事务和消息回查
    TransactionListener listener = new TransactionListenerImol();
    producer.setTransactionListener(listener);
    //4、线程池
    ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
      @Override
      public Thread newThread(Runnable r) {
        Thread thread = newThread(r);
        thread.setName("client-tanscation-msg-check-thread");
        return thread;
      }
    });
    producer.setExecutorService(executorService);
    //5、启动producer
    producer.start();

    //6.创建消息对象,指定主题Topic、Tag和消息体 String topic, String tags, String keys, byte[] body
    Message message = new Message("Topic_transaction_demo", //主题
        "Tags", //主要用于消息过滤
        "Key_1", //消息唯一值
        ("hello-transaction").getBytes(RemotingHelper.DEFAULT_CHARSET));

    //7、发送事务消息
    TransactionSendResult result = producer.sendMessageInTransaction(message, "hello-transaction");

    producer.shutdown();
  }
}

3、发送事务消息还需要一个事务监听对象,它实现TransactionListener 接口,其中有两个方法作用分别是执行本地事务和消息回查:

public class TransactionListenerImol implements TransactionListener {
  //存储事务状态信息 key:事务id value:当前事务执行的状态
  private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();
  //执行本地事务
  @Override
  public LocalTransactionState executeLocalTransaction(Message message, Object o) {
    //事务id
    String transactionId = message.getTransactionId();
    //0:执行中,状态未知 1:执行成功 2:执行失败
    localTrans.put(transactionId, 0);
    //业务执行,本地事务,service
    System.out.println("hello-demo-transaction");
    try {
      System.out.println("正在执行本地事务---");
      Thread.sleep(60000*2);
      System.out.println("本地事务执行成功---");
      localTrans.put(transactionId, 1);
    } catch (InterruptedException e) {
      e.printStackTrace();
      localTrans.put(transactionId, 2);
      return LocalTransactionState.ROLLBACK_MESSAGE;
    }
    return LocalTransactionState.COMMIT_MESSAGE;
  }

  //消息回查
  @Override
  public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
    //获取对应事务的状态信息
    String transactionId = messageExt.getTransactionId();
    //获取对应事务id执行状态
    Integer status = localTrans.get(transactionId);
    //消息回查
    System.out.println("消息回查---transactionId:" + transactionId + "状态:" + status);
    switch (status) {
      case 0:
        return LocalTransactionState.UNKNOW;
      case 1:
        return LocalTransactionState.COMMIT_MESSAGE;
      case 2:
        return LocalTransactionState.ROLLBACK_MESSAGE;
    }
    return LocalTransactionState.UNKNOW;
  }
}

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持呐喊html" target="_blank">教程。

 类似资料:
  • rocketMq代码写法上,是否支持这个操作呢?有没有什么坑?

  • 主要内容:1 send源码入口,1.1 同步消息,1.2 单向消息,1.3 异步消息,2 sendDefaultImpl发送消息实现,2.1 makeSureStateOK确定生产者服务状态,2.2 checkMessage校验消息的合法性,2.3 tryToFindTopicPublishInfo查找topic的发布信息,2.4 计算发送次数timesTotal,2.5 selectOneMessageQueue选择消息队列,,,基于RocketMQ 4.9.3,详细的介绍了Producer发

  • 本文向大家介绍微信开发 消息推送实现代码,包括了微信开发 消息推送实现代码的使用技巧和注意事项,需要的朋友参考一下 最近做微信公共号的开发,有个需求是这样的消息推送,以文本的形式把编辑的消息发送给微信企业号中的某一个应用组,这里做下笔记,以下是整理内容: 根据当前日期 判断Access_Token 是否超期 如果超期返回新的Access_Token 否则返回之前的Access_Token 感谢阅读

  • 如何使用新的Spring Cloud Stream Kafka功能模型发送消息? 不推荐的方式是这样的。 但是我如何以函数式风格发送消息呢? 应用yml公司 我会自动连接MessageChannel,但对于process、process-out-0、output或类似的东西,没有MessageChannel Bean。或者我可以用供应商Bean发送消息吗?谁能给我举个例子吗?谢谢!

  • 接口说明 轻推轻应用/订阅号支持发送文本、图片、文本卡片、图文、key-value、文件、待办等消息类型。本接口针对各种消息类型和发送的对象(单发、群发以及给部分人发送)进行了定义。 注:openid是用户关注某个轻应用/订阅号后生成的唯一id,单发和给部分人发送消息必须携带此参数,可以通过如下接口来获取: 根据qt_code获取用户基本信息 获取使用者列表 通过userId获取openid 消息

  • 主动发送消息 use EasyWeChat\Kernel\Messages\TextCard; // 获取 Messenger 实例 $messenger = $app->messenger; // 准备消息 $message = new TextCard([ 'title' => '你的请假单审批通过', 'description' => '单号:1928373, ...