通过rocketmq-spring-boot-starter可以快速的搭建rocketmq生产者和消费者服务。
1.引入依赖:
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.0.4</version>
</dependency>
2.yml 配置
rocketmq:
name-server: 172.22.64.1:9876 #rocketmq服务地址
producer:
group: rocketmq_test #自定义的组名称
如果需要其他配置可以参考 org.apache.rocketmq.spring.autoconfigure.RocketMQProperties 类中的属性
3.发送消息
使用RocketMQTemplate实现消息的发送
@RestController
public class RocketMqController {
@Autowired
private RocketMQTemplate rocketMQTemplate;
/**
* @return
*/
@GetMapping("sendMq")
public Object sendMq() {
MqMessage message = MqMessage.builder().name("普通消息").msg("这是普通消息").build();
SendResult sendResult = rocketMQTemplate.syncSend(MqUtil.common_topic, message);
return sendResult;
}
}
MqUtil是定义的常量类
public class MqUtil {
public static final String common_topic = "common_topic";
}
MqMessage是定义的消息体
@AllArgsConstructor
@NoArgsConstructor
@Builder
@Data
public class MqMessage implements Serializable {
private String name;
private String msg;
}
4.消费消息
@Slf4j
@Component
@RocketMQMessageListener(
topic = MqUtil.common_topic,
consumerGroup = "common_consumer_a_group")
public class CommonListenerA implements RocketMQListener<MqMessage> {
@Override
public void onMessage(MqMessage message) {
log.info("{}收到消息:{}", this.getClass().getSimpleName(), message);
}
}
启动项目,访问http://127.0.0.1:8080/sendMq,控制台打印日志:
CommonListenerA收到消息:MqMessage(name=普通消息, msg=这是普通消息)
需要注意的是:我们在消费者中配置的consumerGroup属性,在集群消费模式中,在同一个topic下,相同的ConsumerGroup只会有一个Consumer收到消息。即如果我们将该消费者实例部署了多台,那么只会有一个实例消费到消息。如果我们有另一个该topic下的消费者设置的consumerGroup和该属性值不一样,那么另一个消费者仍然可以消费到该topic的消息。
@RocketMQMessageListener默认的消费模式是集群消费