注意下文中出现的大多都是由作者公司封装过一层
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.0.1</version>
</dependency>
公司项目中的sedinbj已经包含mq的核心包,我们直接依赖于sedinbj即可,不需要重新依赖
/**
* 向队列发送消息接口实现
*/
@Bean(name = "sendMessageApi")
public SendMessageApi<BizMessage> sendMessageApi() {
RocketProducerConfig rocketProducerConfig = new RocketProducerConfig(this.serverAddr, this.TOPIC_NAME, "attendanceRecord", this.workerId, this.datacenterId);
rocketProducerConfig.setInstanceName("real-name-service-attendanceRecord");
rocketProducerConfig.setGroupName("real-name-service-attendanceRecord-producers");
rocketProducerConfig.setQueueName("real-name-service-attendanceRecord");
return new RocketProducer(rocketProducerConfig);
}
其中需要了解:
以下这些是封装的rocketMQ生产者RocketProducerConfig的参数含义,
serverAddr:队列服务器地址;
topic_name:队列主题;
tags1:标签;
workerId:工作机器Id,大于0不大于32的整数;
datacenterId:数据中心id。
我们是这样定义的:
/**
* 队列主题名称
*/
private final String TOPIC_NAME = "real-name-service-sync";
/**
* 队列服务地址
*/
@Value("${mq.serverAddr}")
private String serverAddr;
/**
* 工作机器ID
*/
private final long workerId= WorkIdUtils.getWorkId();
/**
* 数据中心ID
*/
private final Long datacenterId=1l;
最后需要自由设置InstanceName
GroupName
QueueName
三个名称参数,将rocketProducerConfig作为参数返回RocketProducer对象即可
/**
* 监听队列消息接口实现
*/
@Bean(name = "receiveMessageApi")
public ReceiveMessageApi<MessageListener> receiveMessageApi() {
RocketConsumerConfig rocketConsumerConfig = new RocketConsumerConfig(this.serverAddr, this.TOPIC_NAME, "attendanceRecord", this.workerId, this.datacenterId);
rocketConsumerConfig.setInstanceName("real-name-service-attendanceRecord");
rocketConsumerConfig.setGroupName("real-name-service-attendanceRecord-consumers");
rocketConsumerConfig.setQueueName("real-name-service-attendanceRecord");
rocketConsumerConfig.setConsumeMessageBatchMaxSize(100);
rocketConsumerConfig.setPullBatchSize(100);
rocketConsumerConfig.setMessageModel(MessageModel.CLUSTERING);
return new RocketConsumer(rocketConsumerConfig);
}
其中serverAddr
,topic_name
,tags1
,workerId
,datacenterId
,InstanceName
,GroupName
,QueueName
这几个参数和生产者一致
还有另外两个参数:
ConsumeMessageBatchMaxSize
:指批量消费数量的多少,也就是说当MQ消息拉取之后每次派发给我们每个消费者消息的多少;该参数默认值是1,最大值是1024,这个具体设置多少,需要根据自己的业务来定,假如我们每个消息的处理耗时很长,那么这个参数就应该设置的偏小一点,不要让单个消息的消费慢影响同一批中的其他消息。
PullBatchSize
:代表每批次拉取消息的最大个数,默认值是32
rocketConsumerConfig.setMessageModel(MessageModel.CLUSTERING);
这行代码是在设置MQ的模式;Rocket-MQ默认是以集群模式来消费的也是就如代码中的MessageModel.CLUSTERING
;还有一个广播模式可以使用MessageModel.BROADCASTING
集群消费也就是消息的负载均衡消费;广播消息,类似于ActiveMQ中的发布订阅模式,消息会发给Consume Group中的每一个消费者进行消费。
@Autowired
private SendMessageApi<BizMessage> sendMessageApi;
sendMessageApi.safeSend(new BizMessage(IdWorker.getId(), JSON.toJSONString(result)));
先在业务类中注入封装的生产者(com.sedinbj.kernel.mq.api.SendMessageApi),再通过safeSend方法发送消息
@Component
public class AttendanceRecordMqHandler extends AbstractHandler<BizMessage> {
@Autowired
AttendanceRecordMapper attendanceRecordMapper;
@Autowired
IProLabourPersonService proLabourPersonService;
@Override
@Transactional
public void handler(List<BizMessage> entity) {
//业务代码
}
@Override
public void errorHandlerCallback(List<BizMessage> entity) {
//业务代码
}
}
定义一个类,需要继承AbstractHandler这个类(com.sedinbj.kernel.mq.rocketmq.handler.AbstractHandler),并重写handler方法(处理消息)和errorHandlerCallback方法(错误回调方法)
这两个方法都根据业务需求自行编写即可
需要注意的是handler方法所传入的List参数的多少就是有上边所说的ConsumeMessageBatchMaxSize
参数控制的
@Autowired
private ReceiveMessageApi<MessageListener> receiveMessageApi;
receiveMessageApi.register(new MessageListener(redisTemplate, attendanceRecordMqHandler));
先注入ReceiveMessageApi后(com.sedinbj.kernel.mq.api.ReceiveMessageApi),通过register注册方法,传入一个监听器(com.sedinbj.kernel.mq.rocketmq.listener.MessageListener),监听器中再传入redis(org.springframework.data.redis.core.RedisTemplate)以及上一步中写好的消费者类即可