RocketMQ代码实战(一):使用rocketmq-spring-boot-starter发送和消费消息

陶唯
2023-12-01

通过rocketmq-spring-boot-starter可以快速的搭建rocketmq生产者和消费者服务。

1.引入依赖:

<dependency>
	<groupId>org.apache.rocketmq</groupId>
	<artifactId>rocketmq-spring-boot-starter</artifactId>
	<version>2.0.4</version>
</dependency>

2.yml 配置

rocketmq:
  name-server: 172.22.64.1:9876 #rocketmq服务地址
  producer:
    group: rocketmq_test #自定义的组名称

如果需要其他配置可以参考 org.apache.rocketmq.spring.autoconfigure.RocketMQProperties 类中的属性

3.发送消息

使用RocketMQTemplate实现消息的发送

@RestController
public class RocketMqController {

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    /**
     * @return
     */
    @GetMapping("sendMq")
    public Object sendMq() {
        MqMessage message = MqMessage.builder().name("普通消息").msg("这是普通消息").build();
        SendResult sendResult = rocketMQTemplate.syncSend(MqUtil.common_topic, message);
        return sendResult;
    }
}

MqUtil是定义的常量类

public class MqUtil {
    public static final String common_topic = "common_topic";
}

MqMessage是定义的消息体

@AllArgsConstructor
@NoArgsConstructor
@Builder
@Data
public class MqMessage implements Serializable {

    private String name;

    private String msg;

}

4.消费消息

@Slf4j
@Component
@RocketMQMessageListener(
        topic = MqUtil.common_topic,
        consumerGroup = "common_consumer_a_group")
public class CommonListenerA implements RocketMQListener<MqMessage> {
    @Override
    public void onMessage(MqMessage message) {
        log.info("{}收到消息:{}", this.getClass().getSimpleName(), message);
    }
}

启动项目,访问http://127.0.0.1:8080/sendMq,控制台打印日志:

CommonListenerA收到消息:MqMessage(name=普通消息, msg=这是普通消息)

需要注意的是:我们在消费者中配置的consumerGroup属性,在集群消费模式中,在同一个topic下,相同的ConsumerGroup只会有一个Consumer收到消息。即如果我们将该消费者实例部署了多台,那么只会有一个实例消费到消息。如果我们有另一个该topic下的消费者设置的consumerGroup和该属性值不一样,那么另一个消费者仍然可以消费到该topic的消息。

@RocketMQMessageListener默认的消费模式是集群消费

 类似资料: