To try out RocketMQ's request-reply model, I recently upgraded the rocketmq-spring-boot-starter library in my demo to version 2.1.0:
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<!--
2.0.2 corresponds to rocketmq 4.4.0
2.1.0 corresponds to rocketmq 4.6.0
-->
<version>2.1.0</version>
</dependency>
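After the upgrade I found that the method for sending transactional messages had gone from 4 parameters to 3. That is a backward-incompatible change, so why was it made? Let's take a look.
The transactional message method in 2.0.2: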
/**
* Send Spring Message in Transaction
*
* @param txProducerGroup the validate txProducerGroup name, set null if using the default name
* @param destination destination formats: `topicName:tags`
* @param message message {@link org.springframework.messaging.Message}
* @param arg ext arg
* @return TransactionSendResult
* @throws MessagingException
*/
public TransactionSendResult sendMessageInTransaction(final String txProducerGroup, final String destination, final Message<?> message, final Object arg) throws MessagingException {
try {
TransactionMQProducer txProducer = this.stageMQProducer(txProducerGroup);
org.apache.rocketmq.common.message.Message rocketMsg = RocketMQUtil.convertToRocketMessage(objectMapper,
charset, destination, message);
return txProducer.sendMessageInTransaction(rocketMsg, arg);
} catch (MQClientException e) {
throw RocketMQUtil.convert(e);
}
}
The transactional message method in 2.1.0:
/**
* Send Spring Message in Transaction
*
* @param destination destination formats: `topicName:tags`
* @param message message {@link org.springframework.messaging.Message}
* @param arg ext arg
* @return TransactionSendResult
* @throws MessagingException
*/
public TransactionSendResult sendMessageInTransaction(final String destination,
final Message<?> message, final Object arg) throws MessagingException {
try {
if (((TransactionMQProducer) producer).getTransactionListener() == null) {
throw new IllegalStateException("The rocketMQTemplate does not exist TransactionListener");
}
org.apache.rocketmq.common.message.Message rocketMsg = this.createRocketMqMessage(destination, message);
return producer.sendMessageInTransaction(rocketMsg, arg);
} catch (MQClientException e) {
throw RocketMQUtil.convert(e);
}
}
The txProducerGroup parameter is gone, and txProducerGroup has likewise been removed from the @RocketMQTransactionListener annotation:
@Target({ElementType.TYPE, ElementType.ANNOTATION_TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Component
public @interface RocketMQTransactionListener {
/**
* Set ExecutorService params -- corePoolSize
*/
int corePoolSize() default 1;
/**
* Set ExecutorService params -- maximumPoolSize
*/
int maximumPoolSize() default 1;
/**
* Set ExecutorService params -- keepAliveTime
*/
long keepAliveTime() default 1000 * 60; //60ms
/**
* Set ExecutorService params -- blockingQueueSize
*/
int blockingQueueSize() default 2000;
/**
* Set rocketMQTemplate bean name, the default is rocketMQTemplate.
* if use ExtRocketMQTemplate, can set ExtRocketMQTemplate bean name.
*/
String rocketMQTemplateBeanName() default "rocketMQTemplate";
}
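The four pool attributes end up in a java.util.concurrent.ThreadPoolExecutor, and the configuration class quoted further down passes keepAliveTime together with TimeUnit.MILLISECONDS. The default of 1000 * 60 therefore means 60 seconds, not the 60 ms that the inline comment suggests. Here is a minimal sketch using only the JDK, with the annotation defaults copied in as constants; the class name ListenerPoolDefaults is made up for illustration and is not part of the starter.

```java
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical helper class: not part of rocketmq-spring-boot-starter.
class ListenerPoolDefaults {
    // Defaults copied from @RocketMQTransactionListener as quoted above.
    static final int CORE_POOL_SIZE = 1;
    static final int MAXIMUM_POOL_SIZE = 1;
    static final long KEEP_ALIVE_TIME = 1000 * 60;
    static final int BLOCKING_QUEUE_SIZE = 2000;

    // Builds an executor with the same constructor arguments and time unit
    // that the starter's registration code uses.
    static ThreadPoolExecutor newDefaultExecutor() {
        return new ThreadPoolExecutor(CORE_POOL_SIZE, MAXIMUM_POOL_SIZE,
            KEEP_ALIVE_TIME, TimeUnit.MILLISECONDS,
            new LinkedBlockingDeque<>(BLOCKING_QUEUE_SIZE));
    }

    public static void main(String[] args) {
        ThreadPoolExecutor executor = newDefaultExecutor();
        // Convert the configured keep-alive back to seconds to make the unit visible.
        System.out.println("keep-alive seconds: " + executor.getKeepAliveTime(TimeUnit.SECONDS));
        System.out.println("queue capacity: " + executor.getQueue().remainingCapacity());
        executor.shutdown();
    }
}
```

With these defaults the pool has a single worker thread and a bounded queue of 2000 tasks, so anything beyond that would be rejected by the executor's default policy.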
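In projects using rocketmq-spring-boot-starter before 2.1.0, I could declare several @RocketMQTransactionListener classes, each listening on a different txProducerGroup, to send different types of transactional messages to topics. Now, if you write more than one @RocketMQTransactionListener in a single project, the project no longer starts and fails with: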
java.lang.IllegalStateException: rocketMQTemplate already exists RocketMQLocalTransactionListener
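Judging from the source, a project apparently can have only one @RocketMQTransactionListener, not several. More precisely, the check is made against the RocketMQTemplate bean named by rocketMQTemplateBeanName, so each template accepts at most one listener: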
@Configuration
public class RocketMQTransactionConfiguration implements ApplicationContextAware, SmartInitializingSingleton {
private final static Logger log = LoggerFactory.getLogger(RocketMQTransactionConfiguration.class);
private ConfigurableApplicationContext applicationContext;
@Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this.applicationContext = (ConfigurableApplicationContext) applicationContext;
}
@Override public void afterSingletonsInstantiated() {
Map<String, Object> beans = this.applicationContext.getBeansWithAnnotation(RocketMQTransactionListener.class)
.entrySet().stream().filter(entry -> !ScopedProxyUtils.isScopedTarget(entry.getKey()))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
beans.forEach(this::registerTransactionListener);
}
private void registerTransactionListener(String beanName, Object bean) {
Class<?> clazz = AopProxyUtils.ultimateTargetClass(bean);
if (!RocketMQLocalTransactionListener.class.isAssignableFrom(bean.getClass())) {
throw new IllegalStateException(clazz + " is not instance of " + RocketMQLocalTransactionListener.class.getName());
}
RocketMQTransactionListener annotation = clazz.getAnnotation(RocketMQTransactionListener.class);
RocketMQTemplate rocketMQTemplate = (RocketMQTemplate) applicationContext.getBean(annotation.rocketMQTemplateBeanName());
if (((TransactionMQProducer) rocketMQTemplate.getProducer()).getTransactionListener() != null) {
throw new IllegalStateException(annotation.rocketMQTemplateBeanName() + " already exists RocketMQLocalTransactionListener");
}
((TransactionMQProducer) rocketMQTemplate.getProducer()).setExecutorService(new ThreadPoolExecutor(annotation.corePoolSize(), annotation.maximumPoolSize(),
annotation.keepAliveTime(), TimeUnit.MILLISECONDS, new LinkedBlockingDeque<>(annotation.blockingQueueSize())));
((TransactionMQProducer) rocketMQTemplate.getProducer()).setTransactionListener(RocketMQUtil.convert((RocketMQLocalTransactionListener) bean));
log.debug("RocketMQLocalTransactionListener {} register to {} success", clazz.getName(), annotation.rocketMQTemplateBeanName());
}
}
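The decisive part is the getTransactionListener() != null check: a listener is attached to the producer inside the RocketMQTemplate bean named by rocketMQTemplateBeanName, and a second attachment to the same template is rejected. The following is a simplified plain-Java model of that logic, not the starter's code; ListenerRegistry, TemplateModel, and the listener names are invented for illustration.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of the registration logic; not the starter's real classes.
class ListenerRegistry {
    private final Map<String, TemplateModel> templates = new HashMap<>();

    void addTemplate(String beanName) {
        templates.put(beanName, new TemplateModel());
    }

    // Mirrors registerTransactionListener: at most one listener per template bean.
    void register(String templateBeanName, Object listener) {
        TemplateModel template = templates.get(templateBeanName);
        if (template == null) {
            throw new IllegalArgumentException("no template bean named " + templateBeanName);
        }
        if (template.getTransactionListener() != null) {
            throw new IllegalStateException(
                templateBeanName + " already exists RocketMQLocalTransactionListener");
        }
        template.setTransactionListener(listener);
    }

    public static void main(String[] args) {
        ListenerRegistry registry = new ListenerRegistry();
        registry.addTemplate("rocketMQTemplate");
        registry.addTemplate("extRocketMQTemplate");

        registry.register("rocketMQTemplate", "orderListener");
        // A different template bean has its own free listener slot.
        registry.register("extRocketMQTemplate", "paymentListener");
        try {
            // Second listener on the same template: rejected, as at startup.
            registry.register("rocketMQTemplate", "refundListener");
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}

// Stand-in for a RocketMQTemplate and the single listener slot on its producer.
class TemplateModel {
    private Object transactionListener;

    Object getTransactionListener() {
        return transactionListener;
    }

    void setTransactionListener(Object listener) {
        this.transactionListener = listener;
    }
}
```

In the model, as in the quoted source, the limit applies per template rather than per project. That fits the annotation's mention of ExtRocketMQTemplate: a second listener would presumably need to point at a different template bean.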
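Item 9 of the wiki describes this incompatible change:
- How do I send transactional messages?
On the client side, you first implement the RocketMQLocalTransactionListener interface and annotate the implementing class with @RocketMQTransactionListener, implementing the confirmation and check-back methods; you then use the RocketMQTemplate and call sendMessageInTransaction() to publish the message. Note: from RocketMQ-Spring 2.1.0 onward, the @RocketMQTransactionListener annotation can no longer set txProducerGroup, ak, or sk; these values all follow the corresponding RocketMQTemplate.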
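Since txProducerGroup can no longer tell listeners apart, one way to keep handling several kinds of transactional messages on a single template might be to let the one listener route on a business-type key, carried for example in the arg parameter of sendMessageInTransaction. This is only a sketch of that idea in plain Java, not something the wiki prescribes; TxDispatcher, TxState (a stand-in for RocketMQLocalTransactionState), and the keys are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical dispatcher that a single transaction listener could delegate to.
class TxDispatcher {
    // Stand-in for RocketMQLocalTransactionState.
    enum TxState { COMMIT, ROLLBACK, UNKNOWN }

    private final Map<String, Function<Object, TxState>> handlers = new HashMap<>();

    // Registers the local-transaction logic for one business type.
    void on(String bizType, Function<Object, TxState> handler) {
        handlers.put(bizType, handler);
    }

    // Runs the handler for the given business type.
    TxState execute(String bizType, Object arg) {
        Function<Object, TxState> handler = handlers.get(bizType);
        if (handler == null) {
            // Assumption of this sketch: an unknown type must not be committed.
            return TxState.ROLLBACK;
        }
        return handler.apply(arg);
    }

    public static void main(String[] args) {
        TxDispatcher dispatcher = new TxDispatcher();
        dispatcher.on("order", arg -> TxState.COMMIT);
        dispatcher.on("payment", arg -> arg == null ? TxState.UNKNOWN : TxState.COMMIT);

        System.out.println(dispatcher.execute("order", 42));
        System.out.println(dispatcher.execute("payment", null));
        System.out.println(dispatcher.execute("refund", 1));
    }
}
```

Rolling back on an unknown key is a choice made for this sketch; returning UNKNOWN and letting the check-back decide would be another option.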
But why was the transactional message API refactored? It appears to be because of the issue below, although I have never run into it myself:
Serious bug in TransactionMQ Producer cache in RocketMQTemplate #110
When I execute transactions concurrently by using RocketMQTemplate, it's very easy to reproduce the Exception like "illegal state". The cause for this issue is that a TransactionProducer can't be shared during the execution of a transaction. So, we have to make sure a TransactionProducer instance must not be used concurrently.
[ISSUE #178] Refactor transaction message implementation #179
Refactor the implementation of transaction message
Brief changelog: Use the same TransactionMQProducer to send all types of messages in implementation.
In other words, a single TransactionMQProducer is now used to send every type of message, transactional messages included.