在实际工作中主要使用SpringBoot集成开发,SpringBoot集成RocketMQ需要引入依赖:
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.1.1</version>
<exclusions>
<exclusion>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</exclusion>
<exclusion>
<groupId>org.springframework</groupId>
<artifactId>spring-core</artifactId>
</exclusion>
<exclusion>
<groupId>org.springframework</groupId>
<artifactId>spring-webmvc</artifactId>
</exclusion>
</exclusions>
</dependency>
引入依赖后之后需要做到在配置文件application.yml里配置rocketMQ生产者:
rocketmq:
name-server: 192.168.36.131:9876
producer:
group: poc-mq-product
生产者可以在业务层创建发送消息的方法:
@Autowired
private RocketMQTemplate rocketMQTemplate;
@Override
public void sendMessage(String topic, String msg) {
this.rocketMQTemplate.convertAndSend(topic, msg);
}
@Override
public void sendMessageInTransaction(String topic, String msg) {
String[] tags = new String[]{"TagA", "TagB", "TagC", "TagD", "TagE"};
for (int i = 0; i < 10; i++) {
Message<String> message = MessageBuilder.withPayload(msg).build();
String destination = topic + ":" + tags[i % tags.length];
SendResult sendResult = rocketMQTemplate.sendMessageInTransaction(destination, message, destination);
System.out.printf("%s%n", sendResult);
}
}
public void sendAsynMessag(String topic,String tag, String msg) {
String destination = topic + ":" + tag;
Message message = MessageBuilder.withPayload(msg).build();
rocketMQTemplate.sendAndReceive(destination,message,new RocketMQLocalRequestCallback(){
@Override
public void onSuccess(Object o) {
}
@Override
public void onException(Throwable throwable) {
}
});
}
@Override
public void syncSendOrderly(String topic, String tag, String msg) {
String destination = topic + ":" + tag;
Message message = MessageBuilder.withPayload(msg).build();
rocketMQTemplate.sendOneWay(destination, message);
}
@Override
public void send(String topic, String tag, String msg) {
String destination = topic + ":" + tag;
Message message = MessageBuilder.withPayload(msg).build();
rocketMQTemplate.send(destination, message);
}
消费者监听消费
@Slf4j
@Component
@RocketMQMessageListener(consumerGroup = Constant.consumerGroup, topic = Constant.topic, consumeMode = ConsumeMode.CONCURRENTLY)
public class MqConsumer implements RocketMQListener<String>, RocketMQPushConsumerLifecycleListener {
@Autowired
private RedisUtils redisUtils;
@Override
public void onMessage(String message) {
log.info("消费端 Received message : " + message);
String businessCode = JsonUtil.jsonFromJsonPath(message, "$.mqBody.businessCode");
String businessType = JsonUtil.jsonFromJsonPath(message, "$.businessName");
String mobile = JsonUtil.jsonFromJsonPath(message, "$.mqBody.mobile");
//1:直接存储为JSON格式数据
JSONObject json = JSON.parseObject(message);
//2:存储到redis里hash里面
json.put("reciveTime", DataUtils.format(new Date()));
message = json.toJSONString();
Object ob = redisUtils.hmGet(Constant.mqBusinessName, businessType);
dealMqData(ob, message, businessType);
}
@Override
public void prepareStart(DefaultMQPushConsumer defaultMQPushConsumer) {
// 每次拉取的间隔,单位为毫秒
defaultMQPushConsumer.setPullInterval(1000);
// 设置每次从队列中拉取的消息数为1
defaultMQPushConsumer.setPullBatchSize(10);
defaultMQPushConsumer.setConsumeTimeout(TimeUnit.SECONDS.toSeconds(10));
defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
}
事务监听:
/**
* 关于@RocketMQTransactionListener 这个注解,有点奇怪。2.0.4版本中,是需要指定txProducerGroup指向一个消息发送者组。不同的组可以有不同的事务消息逻辑。
* 但是到了2.1.1版本,只能指定rocketMQTemplateBeanMame,也就是说如果你有多个发送者组需要有不同的事务消息逻辑,那就需要定义多个RocketMQTemplate。
* 而且这个版本中,虽然重现了我们在原生API中的事务消息逻辑,但是测试过程中还是发现一些奇怪的特性,用的时候要注意点。
**/
//@RocketMQTransactionListener(txProducerGroup = "springBootGroup2")
@RocketMQTransactionListener(rocketMQTemplateBeanName = "rocketMQTemplate")
public class MyTransactionImpl implements RocketMQLocalTransactionListener {
private ConcurrentHashMap<Object, Message> localTrans = new ConcurrentHashMap<>();
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
Object transId = msg.getHeaders().get(RocketMQHeaders.PREFIX+RocketMQHeaders.TRANSACTION_ID);
String destination = arg.toString();
localTrans.put(transId,msg);
//这个msg的实现类是GenericMessage,里面实现了toString方法
//在Header中自定义的RocketMQHeaders.TAGS属性,到这里就没了。但是RocketMQHeaders.TRANSACTION_ID这个属性就还在。
//而message的Header里面会默认保存RocketMQHeaders里的属性,但是都会加上一个RocketMQHeaders.PREFIX前缀
System.out.println("executeLocalTransaction msg = "+msg);
//转成RocketMQ的Message对象
org.apache.rocketmq.common.message.Message message = RocketMQUtil.convertToRocketMessage(new StringMessageConverter(),"UTF-8",destination, msg);
String tags = message.getTags();
if(StringUtils.contains(tags,"TagA")){
return RocketMQLocalTransactionState.COMMIT;
}else if(StringUtils.contains(tags,"TagB")){
return RocketMQLocalTransactionState.ROLLBACK;
}else{
return RocketMQLocalTransactionState.UNKNOWN;
}
}
//延迟检查的时间间隔要有点奇怪。
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
String transId = msg.getHeaders().get(RocketMQHeaders.PREFIX+RocketMQHeaders.TRANSACTION_ID).toString();
Message originalMessage = localTrans.get(transId);
//这里能够获取到自定义的transaction_id属性
System.out.println("checkLocalTransaction msg = "+originalMessage);
//获取标签时,自定义的RocketMQHeaders.TAGS拿不到,但是框架会封装成一个带RocketMQHeaders.PREFIX的属性
// String tags = msg.getHeaders().get(RocketMQHeaders.TAGS).toString();
String tags = msg.getHeaders().get(RocketMQHeaders.PREFIX+RocketMQHeaders.TAGS).toString();
if(StringUtils.contains(tags,"TagC")){
return RocketMQLocalTransactionState.COMMIT;
}else if(StringUtils.contains(tags,"TagD")){
return RocketMQLocalTransactionState.ROLLBACK;
}else{
return RocketMQLocalTransactionState.UNKNOWN;
}
}
}