RocketMQ-Spring Boot 整合RocketMQ

琴修为
2023-12-01

在实际工作中主要使用SpringBoot集成开发,SpringBoot集成RocketMQ需要引入依赖:

 <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-boot-starter</artifactId>
            <version>2.1.1</version>
            <exclusions>
                <exclusion>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-starter</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.springframework</groupId>
                    <artifactId>spring-core</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.springframework</groupId>
                    <artifactId>spring-webmvc</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

引入依赖后之后需要做到在配置文件application.yml里配置rocketMQ生产者:

rocketmq:
  name-server: 192.168.36.131:9876
  producer:
    group: poc-mq-product

生产者可以在业务层创建发送消息的方法:

 @Autowired
    private RocketMQTemplate rocketMQTemplate;


    @Override
    public void sendMessage(String topic, String msg) {
        this.rocketMQTemplate.convertAndSend(topic, msg);
    }

    @Override
    public void sendMessageInTransaction(String topic, String msg)  {
        String[] tags = new String[]{"TagA", "TagB", "TagC", "TagD", "TagE"};
        for (int i = 0; i < 10; i++) {
            Message<String> message = MessageBuilder.withPayload(msg).build();
            String destination = topic + ":" + tags[i % tags.length];
            SendResult sendResult = rocketMQTemplate.sendMessageInTransaction(destination, message, destination);
            System.out.printf("%s%n", sendResult);
        }
    }

    public void sendAsynMessag(String topic,String tag, String msg) {
        String destination = topic + ":" + tag;
        Message message = MessageBuilder.withPayload(msg).build();
        rocketMQTemplate.sendAndReceive(destination,message,new RocketMQLocalRequestCallback(){

            @Override
            public void onSuccess(Object o) {

            }

            @Override
            public void onException(Throwable throwable) {

            }
        });
    }

    @Override
    public void syncSendOrderly(String topic, String tag, String msg) {
        String destination = topic + ":" + tag;
        Message message = MessageBuilder.withPayload(msg).build();
        rocketMQTemplate.sendOneWay(destination, message);
    }

    @Override
    public void send(String topic, String tag, String msg) {
        String destination = topic + ":" + tag;
        Message message = MessageBuilder.withPayload(msg).build();
        rocketMQTemplate.send(destination, message);
    }

消费者监听消费

@Slf4j
@Component
@RocketMQMessageListener(consumerGroup = Constant.consumerGroup, topic = Constant.topic, consumeMode = ConsumeMode.CONCURRENTLY)
public class MqConsumer implements RocketMQListener<String>, RocketMQPushConsumerLifecycleListener {
    @Autowired
    private RedisUtils redisUtils;

    @Override
    public void onMessage(String message) {
        log.info("消费端  Received message : " + message);

        String businessCode = JsonUtil.jsonFromJsonPath(message, "$.mqBody.businessCode");
        String businessType = JsonUtil.jsonFromJsonPath(message, "$.businessName");
        String mobile = JsonUtil.jsonFromJsonPath(message, "$.mqBody.mobile");
        //1:直接存储为JSON格式数据
        JSONObject json = JSON.parseObject(message);

        //2:存储到redis里hash里面
        json.put("reciveTime", DataUtils.format(new Date()));
        message = json.toJSONString();

        Object ob = redisUtils.hmGet(Constant.mqBusinessName, businessType);
        dealMqData(ob, message, businessType);
    }


    @Override
    public void prepareStart(DefaultMQPushConsumer defaultMQPushConsumer) {
        // 每次拉取的间隔,单位为毫秒
        defaultMQPushConsumer.setPullInterval(1000);
        // 设置每次从队列中拉取的消息数为1
        defaultMQPushConsumer.setPullBatchSize(10);
        defaultMQPushConsumer.setConsumeTimeout(TimeUnit.SECONDS.toSeconds(10));

        defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
    }

事务监听:

/**
 * 关于@RocketMQTransactionListener 这个注解,有点奇怪。2.0.4版本中,是需要指定txProducerGroup指向一个消息发送者组。不同的组可以有不同的事务消息逻辑。
 * 但是到了2.1.1版本,只能指定rocketMQTemplateBeanMame,也就是说如果你有多个发送者组需要有不同的事务消息逻辑,那就需要定义多个RocketMQTemplate。
 * 而且这个版本中,虽然重现了我们在原生API中的事务消息逻辑,但是测试过程中还是发现一些奇怪的特性,用的时候要注意点。
 **/
//@RocketMQTransactionListener(txProducerGroup = "springBootGroup2")
@RocketMQTransactionListener(rocketMQTemplateBeanName = "rocketMQTemplate")
public class MyTransactionImpl implements RocketMQLocalTransactionListener {

    private ConcurrentHashMap<Object, Message> localTrans = new ConcurrentHashMap<>();
    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        Object transId = msg.getHeaders().get(RocketMQHeaders.PREFIX+RocketMQHeaders.TRANSACTION_ID);
        String destination = arg.toString();
        localTrans.put(transId,msg);
        //这个msg的实现类是GenericMessage,里面实现了toString方法
        //在Header中自定义的RocketMQHeaders.TAGS属性,到这里就没了。但是RocketMQHeaders.TRANSACTION_ID这个属性就还在。
        //而message的Header里面会默认保存RocketMQHeaders里的属性,但是都会加上一个RocketMQHeaders.PREFIX前缀
        System.out.println("executeLocalTransaction msg = "+msg);
        //转成RocketMQ的Message对象
        org.apache.rocketmq.common.message.Message message = RocketMQUtil.convertToRocketMessage(new StringMessageConverter(),"UTF-8",destination, msg);
        String tags = message.getTags();
        if(StringUtils.contains(tags,"TagA")){
            return RocketMQLocalTransactionState.COMMIT;
        }else if(StringUtils.contains(tags,"TagB")){
            return RocketMQLocalTransactionState.ROLLBACK;
        }else{
            return RocketMQLocalTransactionState.UNKNOWN;
        }
    }
    //延迟检查的时间间隔要有点奇怪。
    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
        String transId = msg.getHeaders().get(RocketMQHeaders.PREFIX+RocketMQHeaders.TRANSACTION_ID).toString();
        Message originalMessage = localTrans.get(transId);
        //这里能够获取到自定义的transaction_id属性
        System.out.println("checkLocalTransaction msg = "+originalMessage);
        //获取标签时,自定义的RocketMQHeaders.TAGS拿不到,但是框架会封装成一个带RocketMQHeaders.PREFIX的属性
//        String tags = msg.getHeaders().get(RocketMQHeaders.TAGS).toString();
        String tags = msg.getHeaders().get(RocketMQHeaders.PREFIX+RocketMQHeaders.TAGS).toString();
        if(StringUtils.contains(tags,"TagC")){
            return RocketMQLocalTransactionState.COMMIT;
        }else if(StringUtils.contains(tags,"TagD")){
            return RocketMQLocalTransactionState.ROLLBACK;
        }else{
            return RocketMQLocalTransactionState.UNKNOWN;
        }
    }
}

 类似资料: