rocketmq-spring-boot-starter 2.1.0 事务消息 txProducerGroup 移除解读

梁丘威
2023-12-01

为了体验rocketmq的request-reply模型,近日将demo中rocketmq-spring-boot-starter库升级到2.1.0版本

    <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-spring-boot-starter</artifactId>
        <!--
        2.0.2对应rocketmq 4.4.0
        2.1.0对应rocketmq 4.6.0
        -->
        <version>2.1.0</version>
    </dependency>

发现发送事务消息的方法参数个数从4个变成3个, 这可是不兼容性升级了,到底为什么会这样呢,我们来看一下

2.0.2中的事务消息方法:

    /**
     * Send Spring Message in Transaction
     *
     * @param txProducerGroup the validate txProducerGroup name, set null if using the default name
     * @param destination     destination formats: `topicName:tags`
     * @param message         message {@link org.springframework.messaging.Message}
     * @param arg             ext arg
     * @return TransactionSendResult
     * @throws MessagingException
     */
    public TransactionSendResult sendMessageInTransaction(final String txProducerGroup, final String destination, final Message<?> message, final Object arg) throws MessagingException {
        try {
            TransactionMQProducer txProducer = this.stageMQProducer(txProducerGroup);
            org.apache.rocketmq.common.message.Message rocketMsg = RocketMQUtil.convertToRocketMessage(objectMapper,
                charset, destination, message);
            return txProducer.sendMessageInTransaction(rocketMsg, arg);
        } catch (MQClientException e) {
            throw RocketMQUtil.convert(e);
        }
    }

2.1.0中的事务消息方法:

    /**
     * Send Spring Message in Transaction
     *
     * @param destination destination formats: `topicName:tags`
     * @param message message {@link org.springframework.messaging.Message}
     * @param arg ext arg
     * @return TransactionSendResult
     * @throws MessagingException
     */
    public TransactionSendResult sendMessageInTransaction(final String destination,
        final Message<?> message, final Object arg) throws MessagingException {
        try {
            if (((TransactionMQProducer) producer).getTransactionListener() == null) {
                throw new IllegalStateException("The rocketMQTemplate does not exist TransactionListener");
            }
            org.apache.rocketmq.common.message.Message rocketMsg = this.createRocketMqMessage(destination, message);
            return producer.sendMessageInTransaction(rocketMsg, arg);
        } catch (MQClientException e) {
            throw RocketMQUtil.convert(e);
        }
    }

去除了txProducerGroup, 同时发现@RocketMQTransactionListener注解中也同样移除了txProducerGroup

@Target({ElementType.TYPE, ElementType.ANNOTATION_TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Component
public @interface RocketMQTransactionListener {

    /**
     * Set ExecutorService params -- corePoolSize
     */
    int corePoolSize() default 1;

    /**
     * Set ExecutorService params -- maximumPoolSize
     */
    int maximumPoolSize() default 1;

    /**
     * Set ExecutorService params -- keepAliveTime
     */
    long keepAliveTime() default 1000 * 60; //60ms

    /**
     * Set ExecutorService params -- blockingQueueSize
     */
    int blockingQueueSize() default 2000;

    /**
     * Set rocketMQTemplate bean name, the default is rocketMQTemplate.
     * if use ExtRocketMQTemplate, can set ExtRocketMQTemplate bean name.
     */
    String rocketMQTemplateBeanName() default "rocketMQTemplate";

}

在rocketmq-spring-boot-starter < 2.1.0以前的项目中,我可以用多个@RocketMQTransactionListener来监听不同的txProducerGroup来发送不同类型的事务消息到topic, 但是现在在一个项目中,如果你在一个project中写了多个@RocketMQTransactionListener,项目将不能启动,启动会报

java.lang.IllegalStateException: rocketMQTemplate already exists RocketMQLocalTransactionListener

看源码中了,似乎表明了项目中只能有一个@RocketMQTransactionListener, 不能出现多个。

@Configuration
public class RocketMQTransactionConfiguration implements ApplicationContextAware, SmartInitializingSingleton {

    private final static Logger log = LoggerFactory.getLogger(RocketMQTransactionConfiguration.class);

    private ConfigurableApplicationContext applicationContext;

    @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.applicationContext = (ConfigurableApplicationContext) applicationContext;
    }

    @Override public void afterSingletonsInstantiated() {
        Map<String, Object> beans = this.applicationContext.getBeansWithAnnotation(RocketMQTransactionListener.class)
            .entrySet().stream().filter(entry -> !ScopedProxyUtils.isScopedTarget(entry.getKey()))
            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));

        beans.forEach(this::registerTransactionListener);
    }

    private void registerTransactionListener(String beanName, Object bean) {
        Class<?> clazz = AopProxyUtils.ultimateTargetClass(bean);

        if (!RocketMQLocalTransactionListener.class.isAssignableFrom(bean.getClass())) {
            throw new IllegalStateException(clazz + " is not instance of " + RocketMQLocalTransactionListener.class.getName());
        }
        RocketMQTransactionListener annotation = clazz.getAnnotation(RocketMQTransactionListener.class);
        RocketMQTemplate rocketMQTemplate = (RocketMQTemplate) applicationContext.getBean(annotation.rocketMQTemplateBeanName());
        if (((TransactionMQProducer) rocketMQTemplate.getProducer()).getTransactionListener() != null) {
            throw new IllegalStateException(annotation.rocketMQTemplateBeanName() + " already exists RocketMQLocalTransactionListener");
        }
        ((TransactionMQProducer) rocketMQTemplate.getProducer()).setExecutorService(new ThreadPoolExecutor(annotation.corePoolSize(), annotation.maximumPoolSize(),
            annotation.keepAliveTime(), TimeUnit.MILLISECONDS, new LinkedBlockingDeque<>(annotation.blockingQueueSize())));
        ((TransactionMQProducer) rocketMQTemplate.getProducer()).setTransactionListener(RocketMQUtil.convert((RocketMQLocalTransactionListener) bean));
        log.debug("RocketMQLocalTransactionListener {} register to {} success", clazz.getName(), annotation.rocketMQTemplateBeanName());
    }
}

wiki中的第9条有说明这个不兼容的更改

  1. 如何发送事务消息?

在客户端,首先用户需要实现RocketMQLocalTransactionListener接口,并在接口类上注解声明 @RocketMQTransactionListener,实现确认和回查方法;然后再使用资源模板 RocketMQTemplate, 调用方法sendMessageInTransaction()来进行消息的发布。 注意:从 RocketMQ-Spring 2.1.0版本之后,注解@RocketMQTransactionListener不能设置 txProducerGroup、ak、sk,这些值均与对应的RocketMQTemplate保持一致。

但是为什么要对事务消息的api进行重构呢, 似乎是由于下边的这个issue的原因,不过我没有遇到过

Serious bug in TransactionMQ Producer cache in RocketMQTemplate #110

When I execute transactions concurrently by using RocketMQTemplate, it's very easy to reproduce the Exception like "illegal state" The cause for this issue is that a TransactionProducer can't be shared during the execution of a transaction. So, we have to make sure a TransactionProducer instance must not be used concurrently.

当我使用RocketMQTemplate并发的执行事务时,非常容易重现类似"illegal state"的异常 这个问题的原因是一个TransactionProducer在执行事务时不能被共享 所以,我们不得不保证一个TransactionProducer实例必须不能被并发使用

[ISSUE #178] Refactor transaction message implementation #179

Refactor the implementation of transaction message

Brief changelog Use the same TransactionMQProducer to send all types of messages in implementation.

实现了使用同一个TransactionMQProducer来发送所有类型的事务消息

 类似资料: