当前位置: 首页 > 工具软件 > rmq > 使用案例 >

SpringBoot 集成RMQ

那绪
2023-12-01

SpringBoot 集成RMQ

因为集合了业务,所以Queue,Exchange都做了持久化处理
配置rmq Exchange发送成功回调,和找不到Queue错误回调
项目启动自动创建Exchange,Queue和绑定RoutingKey
ok 直接上图

pom.xml文件引入


<!-- MQ依赖 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
        </dependency>

配置yml文件

// An highlighted block
  # rabbitmq配置
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest
    # 开启发送确认
    publisher-confirms: true
    #确认消息已发送到队列(Queue)
    publisher-returns: true
    #开启手动确认
    listener:
      direct:
        acknowledge-mode: manual
      simple:
        acknowledge-mode: manual
        retry:
          enabled: true  #开启消费者 程序异常情况下会进行重试
          max-attempts: 3 #重试次数
          initial-interval: 2000 #消费者重试间隔次数 2s

RMQ配置文件

为了项目启动自动创建 Queue Exchange 和绑定RoutingKey
统一写到枚举里,枚举类下面图

// An highlighted block
/**
 * @title:
 * @author: sya
 * @date: 2022年07月29日 10:06
 * @description:
 */
@Configuration
@Slf4j
public class RabbitConfig {

    @Resource
    RabbitAdmin rabbitAdmin;


    @Bean
    public void createExchangeQueue() {

        /**
         * 初始化枚举 自动创建
         */
        for (RabbitmqInitEnum rabbitmqInitEnum : RabbitmqInitEnum.values()) {

            DirectExchange directExchange = new DirectExchange(rabbitmqInitEnum.rabbitmqEntity.getExchange(), true, false);
            Queue queue = new Queue(rabbitmqInitEnum.rabbitmqEntity.getQueue(), true, false, false);

            rabbitAdmin.declareExchange(directExchange);
            rabbitAdmin.declareQueue(queue);

            if (!StringUtil.isNullorEmpty(rabbitmqInitEnum.rabbitmqEntity.getRoutingKey())) {
                rabbitAdmin.declareBinding(BindingBuilder.bind(queue).to(directExchange).with(rabbitmqInitEnum.rabbitmqEntity.getRoutingKey()));
            }

        }
    }


    /**
     * 创建初始化RabbitAdmin对象
     *
     * @param connectionFactory
     * @return
     */
    @Bean
    public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
        RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
        // 只有设置为 true,spring 才会加载 RabbitAdmin 这个类
        rabbitAdmin.setAutoStartup(true);
        return rabbitAdmin;
    }


    /**
     * 初始化配置 发送回调和消费失败回调
     *
     * @param connectionFactory
     * @return
     */
    @Bean
    public RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate();
        rabbitTemplate.setConnectionFactory(connectionFactory);
        // Mandatory为true时,消息通过交换器无法匹配到队列会返回给生产者,为false时匹配不到会直接被丢弃
        rabbitTemplate.setMandatory(true);

        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            /**
             *  ConfirmCallback机制只确认消息是否到达exchange(交换器),不保证消息可以路由到正确的queue;
             *  需要设置:publisher-confirm-type: CORRELATED;
             *  springboot版本较低 参数设置改成:publisher-confirms: true
             *
             *  以实现方法confirm中ack属性为标准,true到达
             *  config : 需要开启rabbitmq得ack publisher-confirm-type
             */
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                log.info("ConfirmCallback  确认结果 (true代表发送成功) : {}  消息唯一标识 : {} 失败原因 :{}", ack, correlationData, cause);

            }

        });

        /**
         * 路由不到发队列时触发,成功则不触;
         */
        rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
            log.error("路由不到发队列时触发 {}{}{}{}", message, replyCode, replyText, exchange, routingKey);
        });

        return rabbitTemplate;
    }
}

枚举类

需要创建的Exchange,Queue和绑定RoutingKey 就可以项目启动创建,无需手创

// An highlighted block
/**
 * @title:
 * @author: sya
 * @date: 2022年07月29日 17:45
 * @description: rabbit初始化 枚举
 */
public enum RabbitmqInitEnum {

    /**
     * 测试创建Rabbitmq DEMO
     */
    TEST(
            new RabbitmqEntity.Builder()
                    .setExchange("autoE")
                    .setQueue("autoQ")
                    .setRoutingKey("autoR")
                    .create()
    );


    public final RabbitmqEntity rabbitmqEntity;


    RabbitmqInitEnum(RabbitmqEntity rabbitmqEntity) {
        this.rabbitmqEntity = rabbitmqEntity;
    }


}

枚举内置实体类

// An highlighted block
/**
 * @title:
 * @author: sya
 * @date: 2022年07月29日 17:56
 * @description:
 */

@Data
public class RabbitmqEntity {

    private String exchange;
    private String queue;
    private String routingKey;


    private RabbitmqEntity(Builder builder) {
        exchange = builder.exchange;
        queue = builder.queue;
        routingKey = builder.routingKey;
    }

    public static class Builder {
        private String exchange;
        private String queue;
        private String routingKey;


        public Builder setExchange(String exchange) {
            this.exchange = exchange;
            return this;
        }

        public Builder setQueue(String queue) {
            this.queue = queue;
            return this;
        }

        public Builder setRoutingKey(String routingKey) {
            this.routingKey = routingKey;
            return this;
        }

        public RabbitmqEntity create() {
            return new RabbitmqEntity(this);
        }

    }

}

封装的RMQ发送方法类

// An highlighted block
/**
 * @title:
 * @author: sya
 * @date: 2022年07月29日 16:39
 * @description:
 */
@Component
public class RabbitmqUtil {


    @Resource
    private RabbitTemplate rabbitTemplate;

    public RabbitmqUtil(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
    }


    /**
     * 发送信息
     *
     * @param sendData   发送内容
     * @param exchange
     * @param routingKey
     */
    public void send(Object sendData, String exchange, String routingKey) {
        CorrelationData correlationData = new CorrelationData();

        RmqMessageVO rmqMessageVO = new RmqMessageVO();
        rmqMessageVO.setData(sendData);
        rmqMessageVO.setTimestamp(DateTimeUI.getCurrentDateTimeLong());


        Message message = MessageBuilder.withBody(rmqMessageVO.toString().getBytes()).build();
        message.getMessageProperties().setMessageId(RandomGUID.getDatetUUID());


        correlationData.setId(RandomGUID.getDatetUUID());
        correlationData.setReturnedMessage(message);
        rabbitTemplate.convertAndSend(exchange, routingKey, sendData, correlationData);


    }


}

消费者

对象接收需要 实体类 implements Serializable

// An highlighted block
/**
 * @title:
 * @author: sya
 * @date: 2022年07月28日 15:47
 * @description: 消费者
 */
@Component
public class Consumer {


    /**
     *
     * 消息确认回复方法
     * 重回队列(个人不建议容易死循环,可以直接主动抛出异常,利用rabbitmq重试机制重新消费)
     *
     * channel.basicAck(msg.getMessageProperties().getDeliveryTag(),false);
     * ack表示确认消息。multiple:false只确认该delivery_tag的消息,true确认该delivery_tag的所有消息
     *
     * channel.basicReject(msg.getMessageProperties().getDeliveryTag(),false);
     * Reject表示拒绝消息。requeue:false表示被拒绝的消息是丢弃;true表示重回队列
     *
     * channel.basicNack(msg.getMessageProperties().getDeliveryTag(),false,false);
     * nack表示拒绝消息。multiple表示拒绝指定了delivery_tag的所有未确认的消息,requeue表示不是重回队列
     * 
     * 注意: 对象接收需要 实体类implements Serializable
     */


    /**
     * 消息手动回复
     *
     * @param msg     消息内容
     * @param message
     * @param channel
     * @throws InterruptedException
     */
    @RabbitListener(queues = "autoQ")
    public void listenSimpleQueueMessage(JSONObject msg, Message message, Channel channel) throws InterruptedException, IOException {
        System.out.println(message);
        System.out.println("spring 消费者1接收到消息:【" + msg + "】");
        //手动ack确认
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);

    }
    
}

发送者

发送对象的话,对象接收需要 实体类implements Serializable

// An highlighted block
/**
 * @author Galen
 * @Date 2021/12/9 0009 16:42
 */
@Slf4j
@Controller
@RequestMapping("/test")
public class TestController {

    @Resource
    private RabbitmqUtil rabbitmqUtil;
    @Resource
    private RabbitTemplate rabbitTemplate;


    @GetMapping(value = "/test")
    @ResponseBody
    public void test() {
    	
        JSONObject jsonObject = new JSONObject();
        JSONObject jsonObject1 = new JSONObject();
        jsonObject.put("aaa","aa");
        jsonObject.put("bbb",jsonObject1);
        rabbitmqUtil.send(jsonObject, RabbitmqInitEnum.TEST.rabbitmqEntity.getExchange(), RabbitmqInitEnum.TEST.rabbitmqEntity.getRoutingKey());
    }


}

一个在学习的开发者,勿喷,欢迎交流

 类似资料: