RabbitMQ之spring-boot-starter-amqp的应用

严知
2023-12-01

RabbitMQ的应用之spring-boot-starter-amqp

前言

上一篇 RabbitMQ 已对RabbitMQ和AMQP的概念进行了学习

这一篇的目的主要是使用Spring Boot的spring-boot-starter-amqp整合RabbitMQ,以达到加深理解的目的。

系列

同步异步(wx.request+ajax)
消息队列的学习与理解
消息队列之RabbitMQ的学习
RabbitMQ的应用之spring-boot-starter-amqp

时间线

2020.09.14-2020.09.15——完成初稿

参考链接

RabbitMQ系列

Springboot 整合RabbitMq ,用心看完这一篇就够了

流程

引入依赖->配置->编码

引入依赖

使用的依赖是spring-boot-starter-amqp

spring-boot-starter-amqp官方参考链接

maven Repository—spring-boot-starter-amqp

<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

Spring Boot整合RabbitMQ为什么会使用spring-boot-starter-amqp?

The Advanced Message Queuing Protocol (AMQP) is a platform-neutral, wire-level protocol for message-oriented middleware. The Spring AMQP project applies core Spring concepts to the development of AMQP-based messaging solutions. Spring Boot offers several conveniences for working with AMQP through RabbitMQ, including the spring-boot-starter-amqp “Starter”.

翻译:高级消息队列协议(AMQP)是一种平台无关的、用于面向消息中间件的线级协议。Spring AMQP项目将Spring的核心概念应用到基于AMQP的消息传递解决方案的开发中。Spring Boot为通过RabbitMQ使用AMQP提供了一些便利,包括spring-boot-starter-amqp

Spring uses RabbitMQ to communicate through the AMQP protocol.

翻译:RabbitMQ是一个轻量级、可靠、可伸缩、可移植的消息代理,基于AMQP协议。Spring使用RabbitMQ通过AMQP协议进行通信。

因为,Spring Boot为通过RabbitMQ使用AMQP提供了一些便利。所以,与其说是Spring Boot整合RabbitMQ,即Spring Boot使用AMQP

配置

spring.rabbitmq.host=localhost
# 默认端口
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=secret
# 或者
spring.rabbitmq.addresses=amqp://admin:secret@localhost

# 开启生产者的消息确认
spring.rabbitmq.publisher-confirm-type=correlated
# 开启消费者的消息确认
spring.rabbitmq.publisher-returns=true

编码

使用直连交换机

生产者:配置直连交换机,队列和连接

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @description:
 * @projectName:MQ_RabbitMQ_Provider
 * @see:zhj.pro.mq.config
 * @author:末_枭
 * @createTime:2020/9/14 18:40
 * @version:1.0
 */
@Configuration
public class DirectRabbitMqConfig {


    /**
     * description 配置交换机
     * param durable:持久性 autoDelete:自动删除
     * return org.springframework.amqp.core.DirectExchange
     * createTime 2020/9/14 18:45
     **/
    @Bean
    DirectExchange myDirectExchange(){
        // name:交换机的名称
        // durable:交换机有持久(durable)和暂存(transient)两个状态。
        // autoDelete:当所有与之绑定的消息队列都完成了对此交换机的使用后,删掉它。默认是true
        return new DirectExchange("myDirectExchange",true,false);
    }

    /**
     * description 配置队列
     * param durable,autoDeletet,exclusive
     * return org.springframework.amqp.core.Queue
     * createTime 2020/9/14 18:54
     **/
    @Bean
    Queue myDirectQueue(){
        //durable,autoDeletet:
        //exclusive:只被一个连接使用,当连接关闭后,队列即将被删除
        return new Queue("myDirectQueue",true,false,false);
    }

    /**
     * description 绑定 同时配置路由键
     * param []
     * return org.springframework.amqp.core.Binding
     * createTime 2020/9/14 19:03
     **/
    @Bean
    Binding myDirectBinding(){
        // 队列 -> 交换机 -> 路由键
        return BindingBuilder.bind(myDirectQueue()).to(myDirectExchange()).with("myDirectRoutingKey");
    }

}
生产者:编写生产者
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import java.time.Instant;
import java.util.UUID;

/**
 * @description:
 * @projectName:MQ_RabbitMQ_Provider
 * @see:zhj.pro.controller
 * @author:末_枭
 * @createTime:2020/9/14 19:04
 * @version:1.0
 */
@RestController
public class WebController {

    @Autowired
    RabbitTemplate rabbitTemplate;

    @GetMapping("directSendMsg")
    public String directSendMsg(){
        String message = "发送成功,时间为:"+Instant.now().toString();
        //为什么不绑定队列,因为交换机+routingkey 就等于匹配队列
        rabbitTemplate.convertAndSend("myDirectExchange","myDirectRoutingKey",message);
        return "ok,发送成功";
    }
}

消费者:编写接收方法
@Component
@RabbitListener(queues = "myDirectQueue")
public class DirectReceiver {
    
    @RabbitHandler
    public void process(String msg){
        System.out.println("消费者收到消息:"+msg);
    }
    
}
启动
  • 启动生产者MQ_RabbitMQ_Provider,访问http://localhost:8083/directSendMsg,可以看到发送成功

    但是,由于没有启动消费者,所以消息就存储在消息队列

  • 启动消费者MQ_RabbitMQ_Consumer,查看控制台可以看到以下内容

    消费者收到消息:发送成功,时间为:2020-09-14T11:30:57.597Z

    消费者从消息队列中取出消息并消费

  • 再次访问 http://localhost:8083/directSendMsg

  • 查看消费者MQ_RabbitMQ_Consumer,查看控制台可以看到以下内容

    消费者收到消息:发送成功,时间为:2020-09-14T11:30:57.597Z
    消费者收到消息:发送成功,时间为:2020-09-14T11:33:38.936Z

之后的描述 全部使用生产者和消费者表示

验证:当配置多个消费者到1个队列会怎么样?

在消费者MQ_RabbitMQ_Consumer原来的基础上再创建一个接收方法,并修改之前的接收者,添加标识

// 第1个消费者
@Component
@RabbitListener(queues = "myDirectQueue")
public class DirectReceiver {

    @RabbitHandler
    public void process(String msg){
        System.out.println("第1个消费者收到消息:"+msg);
    }
}
// 第2个消费者
@Component
@RabbitListener(queues = "myDirectQueue")
public class NewDirectReceiver {
    @RabbitHandler
    public void process(String msg){
        System.out.println("第2个消费者收到消息:"+msg);
    }
}

结果

第1个消费者收到消息:发送成功,时间为:2020-09-15T13:20:44.409Z
第2个消费者收到消息:发送成功,时间为:2020-09-15T13:20:48.927Z
第1个消费者收到消息:发送成功,时间为:2020-09-15T13:20:49.745Z
第2个消费者收到消息:发送成功,时间为:2020-09-15T13:20:50.477Z
第1个消费者收到消息:发送成功,时间为:2020-09-15T13:20:51.227Z
第2个消费者收到消息:发送成功,时间为:2020-09-15T13:20:51.900Z
第1个消费者收到消息:发送成功,时间为:2020-09-15T13:20:52.619Z
第2个消费者收到消息:发送成功,时间为:2020-09-15T13:20:53.421Z

结论:可以看到发生了循环分发

验证:当直连交换机绑定多个队列会怎么样?

使用主题交换机

由于流程一样,不再详细描述

生产者
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @description:
 * @projectName:MQ_RabbitMQ_Provider
 * @see:zhj.pro.mq.config
 * @author:末_枭
 * @createTime:2020/9/14 19:52
 * @version:1.0
 */
@Configuration
public class TopicRabbitMqConfig {

    private final  static String student = "people.student";
    private  final  static  String teacher = "people.teacher";


    @Bean
    TopicExchange myTopicExchange(){
        return new TopicExchange("myTopicExchange",true,false);
    }

    // 创建3个队列
    // 队列名分别是 people.student,people.teacher,people
    @Bean
    Queue myQueueOfStudent(){
        return new Queue(TopicRabbitMqConfig.student);
    }

    @Bean
    Queue myQueueOfTeacher(){
        return new Queue(TopicRabbitMqConfig.teacher);
    }

    @Bean
    Queue myQueueOfPeople(){
        return new Queue("people");
    }
    //将主题交换机与队列进行绑定
    // 其中myBindingofStudent只能收到路由键为routing key==people.student的消息
    // myBindingofteacher只能收到路由键为routing key == people.teacher的消息
    //而 myBingingofPeople由于routing key == people.# 所以能够收到以上队列的所有消息
    @Bean
    Binding myBindingofStudent(){
        return BindingBuilder.bind(myQueueOfStudent()).to(myTopicExchange()).with(TopicRabbitMqConfig.student);
    }

    @Bean
    Binding myBindingOfTeacher(){
        return BindingBuilder.bind(myQueueOfTeacher()).to(myTopicExchange()).with(TopicRabbitMqConfig.teacher);
    }
    @Bean
    Binding myBindingPeople(){
        return BindingBuilder.bind(myQueueOfPeople()).to(myTopicExchange()).with("people.#");
    }
 
}

Route

    @GetMapping("topicSendMsgtoStudent")
    public String topicSendMsgToStudent(){
        String message = "给学生的消息发送成功,时间为:"+Instant.now().toString();
        rabbitTemplate.convertAndSend("myTopicExchange","people.student",message);
        return "ok,发送成功";
    }

    @GetMapping("topicSendMsgToTeacher")
    public String topicSendMsgToTeacher(){
        String messgae = "给全部人的消息发送成功,时间为:"+Instant.now().toString();
        rabbitTemplate.convertAndSend("myTopicExchange","people.teacher",messgae);
        return "ok,发送成功";
    }
消费者
//3个接收者
//学生
@Component
@RabbitListener(queues = "people.student")
public class StudentTopicReceiver {

    @RabbitHandler
    public void receiverMsg(String msg){
        System.out.println("学生接收到消息:"+msg);
    }
}
//老师
@Component
@RabbitListener(queues = "people.teacher")
public class TeacherTopicReceiver {

    @RabbitHandler
    public void receiverMsg(String msg){
        System.out.println("老师接收到消息,内容为:"+msg);
    }
}
//所有人
@Component
@RabbitListener(queues = "people")
public class PeopleTopicReceiver {

    @RabbitHandler
    public void receiverMsg(String msg){
        System.out.println("管理员收到所有信息:"+msg);
    }
}
启动
  • 启动生产者,消费者

  • 访问 http://localhost:8083/topicSendMsgToTeacher

  • 消费者处的控制台输出以下内容:

    管理员收到所有信息:给学生的消息发送成功,时间为:2020-09-14T12:33:21.717Z
    学生接收到消息:给学生的消息发送成功,时间为:2020-09-14T12:33:21.717Z

  • 访问 http://localhost:8083/topicSendMsgToStudent

  • 消费者处的控制台输出以下内容:

    管理员收到所有信息:给全部人的消息发送成功,时间为:2020-09-14T12:33:58.715Z
    老师接收到消息,内容为:给全部人的消息发送成功,时间为:2020-09-14T12:33:58.715Z

验证

使用扇形交换机

生产者
@Configuration
public class FanoutRabbitMqConfig {


    //创建1个扇形交换机
    @Bean
    FanoutExchange myFanoutExchange(){
        return new FanoutExchange("myFanoutExchange",true,false);
    }

    //创建3个队列
    @Bean
    Queue myFanoutQueueA(){
        return new Queue("myFanoutQueueA");
    }

    @Bean
    Queue myFanoutQueueB(){
        return new Queue("myFanoutQueueB");
    }

    @Bean
    Queue myFanoutQueueC(){
        return new Queue("myFanoutQueueC");
    }

    //创建 3个绑定
    @Bean
    Binding bindingOfQueueA(){
        return BindingBuilder.bind(myFanoutQueueA()).to(myFanoutExchange());
    }

    @Bean
    Binding bindingOfQueueB(){
        return BindingBuilder.bind(myFanoutQueueB()).to(myFanoutExchange());
    }

    @Bean
    Binding bindingOfQueueC(){
        return BindingBuilder.bind(myFanoutQueueC()).to(myFanoutExchange());
    }
}

根据上一篇知道,扇形交换机有个特点——不需要路由键,就会将消息发送到与扇形交换机绑定的队列中。

上面的代码就是很好的证明

//其它的交换机绑定都是以下格式
return BindingBuilder.bind(/*队列*/).to(/*交换机*/).with(/*路由键*/);
//而扇形交换机是以下格式
return BindingBuilder.bind(myFanoutQueueC()).to(myFanoutExchange());

验证成功

    @GetMapping("fanoutSendMsg")
    public String fanoutSendMsg(){
        String messgae = "消息发送成功,时间为:"+Instant.now().toString();
        rabbitTemplate.convertAndSend("myFanoutExchange",null,messgae);
        return "ok,发送成功";
    }
消费者
//3个接收者
@Component
@RabbitListener(queues = "myFanoutQueueA")
public class FanoutReceiverA {

    @RabbitHandler
    public void receiverMsg(String msg){
        System.out.println("队列myFanoutQueueA收到消息:"+msg);
    }
}

@Component
@RabbitListener(queues = "myFanoutQueueB")
public class FanoutReceiverB {

    @RabbitHandler
    public void receiverMsg(String msg){
        System.out.println("队列myFanoutQueueB收到消息:"+msg);
    }
}

@Component
@RabbitListener(queues = "myFanoutQueueC")
public class FanoutRecevierC {

    @RabbitHandler
    public void receiverMsg(String msg){
        System.out.println("队列myFanoutQueueC收到消息:"+msg);
    }

}
启动
  • 启动生产者,启动消费者

  • 访问路由

    http://localhost:8083/fanoutSendMsg

  • 查看消费者控制台输出

    队列myFanoutQueueC收到消息:消息发送成功,时间为:2020-09-14T13:14:46.930Z

    队列myFanoutQueueB收到消息:消息发送成功,时间为:2020-09-14T13:14:46.930Z
    队列myFanoutQueueA收到消息:消息发送成功,时间为:2020-09-14T13:14:46.930Z

生产者的消息确认

上一篇 生产者中的动作:消息确认 已描述

配置
# 在配置文件中开启生产者的消息确认
spring.rabbitmq.publisher-confirm-type=correlated
重写生产者消息确认回调函数
@Configuration
public class RabbitConfig {

    //ConfirmCallback//重写confirm方法
    public RabbitTemplate.ConfirmCallback myconfirmCallback(){
        return new RabbitTemplate.ConfirmCallback() {
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                System.out.println("ConfirmCallback");
                System.out.println("相关数据:"+correlationData);
                System.out.println("确认情况:"+ack);
                System.out.println("原因:"+cause);
            }
        };
    }

    //ReturnCallback//重写returnedMessage
    public RabbitTemplate.ReturnCallback myReturnCallback(){
        return new RabbitTemplate.ReturnCallback() {
            @Override
            public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
                System.out.println("ReturnCallback");
                System.out.println("消息:"+message);
                System.out.println("回应码:"+replyCode);
                System.out.println("回应信息:"+replyText);
                System.out.println("交换机:"+exchange);
                System.out.println("路由键:"+routingKey);
            }
        };
    }

    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){
        RabbitTemplate rabbitTemplate = new RabbitTemplate();
        rabbitTemplate.setConnectionFactory(connectionFactory);
        rabbitTemplate.setMandatory(true);
        // 设置 confirmCallback
        rabbitTemplate.setConfirmCallback(myconfirmCallback());
        // 设置 returnCallback
        rabbitTemplate.setReturnCallback(myReturnCallback());
        return rabbitTemplate;
    }
}

通过上面的内容,可以看到重写了RabbitTemplate.ConfirmCallbackRabbitTemplate.ReturnCallback两个方法,增加了打印内容。并将这两个配置设置进去。

情况1:交换机找不到

在生产者处增加一个路由,并将消息发送到一个不存在的交换机no_exists_exchange

    //测试 不存在的交换机
    @GetMapping("testProviderAck")
    public String testProviderAck(){
        String msg = "发送成功,时间为:"+ Instant.now().toString();
        rabbitTemplate.convertAndSend("no_exists_exchange","myDirectRoutingKey",msg);
        return "ok,发送成功";
    }

输出

ConfirmCallback
相关数据:null
确认情况:false
原因:channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'no_exists_exchange' in vhost '/', class-id=60, method-id=40)

分析

触发了ConfirmCallback

可以看到里面已经说好了NOT_FOUND - no exchange 'no_exists_exchange' in vhost '/'


情况2:交换机找到了,队列找不到

创建一个交换机,不绑定任何队列

    @Bean
    DirectExchange directExchangeNoBindQueue(){
        return new DirectExchange("directExchangeNoBindQueue",true,false);
    }

创建一个路由,将消息发送到该交换机中

    //测试 不存在的队列
    @GetMapping("testProviderAckNoQueue")
    public String testProviderAckNoQueue(){
        String msg = "发送成功,时间为:"+Instant.now().toString();
        rabbitTemplate.convertAndSend("directExchangeNoBindQueue","myDirectRoutingKey",msg);
        return "ok,发送成功";
    }

输出

ReturnCallback
消息:(Body:'发送成功,时间为:2020-09-14T15:48:19.555Z' MessageProperties [headers={}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0])
回应码:312
回应信息:NO_ROUTE
交换机:directExchangeNoBindQueue
    
路由键:myDirectRoutingKey
ConfirmCallback
相关数据:null
确认情况:true
原因:null

分析

触发了ReturnCallback和ConfirmCallback,ConfirmCallback中,成功,未发现异常;

ReturnCallback中,其中回应信息为NO_ROUTE表示没有匹配的路由。


情况3:交换机和队列都找不到

创建一个路由即可

    //测试 不存在的队列找不到交换机
    @GetMapping("testProviderAckNoQueueAndNoRKey")
    public String testProviderAckNoQueueAndNoRKey(){
        String msg = "发送成功,时间为:"+Instant.now().toString();
        rabbitTemplate.convertAndSend("no_exists_exchange","no-exists-routingkey",msg);
        return "ok,发送成功";
    }

输出

ConfirmCallback
相关数据:null
确认情况:false
原因:channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'no_exists_exchange' in vhost '/', class-id=60, method-id=40)

分析

触发了ConfirmCallbace,和情况1是相同的。因为消息在发送给交换机的时候,就找不到交换机了。


情况4:交换机和队列都找到

使用前面的直连交换机测试即可

输出

ConfirmCallback
相关数据:null
确认情况:true
原因:null

分析

触发了ConfirmCallback,可以看到没输出什么异常。


消费者的消息确认

配置
spring.rabbitmq.publisher-returns=true
回顾与分析

在上一篇中的消费者—动作:消息确认已经有描述,分别是2种:自动确认和手动确认。

而手动确认也有3种情况:

  • 收到消息后立即发送确认回执
  • 将未处理的消息存储后发送确认回执
  • 处理完成后发送确认回执

第3种情况消息被处理完成又有2种情况,即:

  • 消息没有被正确处理

    basic.nack basic.reject

    两者的区别在于reject只能拒绝单条消息

  • 消息被正确处理

    basic.ack

basic.reject拒绝单条消息

同上一篇所讲——拒绝消费当前消息,有2种情况,直接丢弃或者重新放入队列

channel.basicReject(deliveryTag,true);//true 对应重新放入队列 而false对应直接丢弃

正常的处理逻辑应该是——发生异常后,拒绝入列,再选择是否重新入列。

如果处理不当,会导致死循环,从而导致消息积压。

basic.nack设置不消费某条消息
channel.basicNack(deliveryTag,false,true);
// 第1个参数是当前消息的唯一Id
// 第2个参数指是否针对多条消息 true表示一次性针对当前通道的消息tagID小于当前消息的。
// 第3个参数同basic.reject
手动实现消息监听类

由于是消费者的手动消息监听,所以以下内容都是在消费者处进行

创建一个手动消息监听类
@Component
public class MyAckReceiver implements ChannelAwareMessageListener {

    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        long deliverTag = message.getMessageProperties().getDeliveryTag();
        System.out.println("MyAckReceiver");
        try {
            String msg = message.toString();
            System.out.println(msg);
            System.out.println("消息的内容为:"+message.getMessageProperties());
            System.out.println("消息的内容为:"+message.getBody());
            System.out.println("消息来自队列:"+message.getMessageProperties().getConsumerQueue());
            channel.basicAck(deliverTag,true);
        } catch (Exception e){
            System.out.println("发生了异常");
            channel.basicReject(deliverTag,false);
            e.printStackTrace();
        }

    }
}
创建一个消息手动确认的配置
@Configuration
public class MessageListenerConfig {

    @Autowired
    CachingConnectionFactory cachingConnectionFactory;

    @Autowired
    MyAckReceiver myAckReceiver;

    @Bean
    public SimpleMessageListenerContainer simpleMessageListenerContainer(){
        SimpleMessageListenerContainer simpleMessageListenerContainer =
                new SimpleMessageListenerContainer(cachingConnectionFactory);
        // simpleMessageListenerContainer.setConcurrentConsumers(1);
        simpleMessageListenerContainer.setMaxConcurrentConsumers(1);
        simpleMessageListenerContainer.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        simpleMessageListenerContainer.setQueueNames("myDirectQueue");

        simpleMessageListenerContainer.setMessageListener(myAckReceiver);

        return simpleMessageListenerContainer;
    }

}
输出
MyAckReceiver
(Body:'发送成功,时间为:2020-09-15T03:03:09.679Z' MessageProperties [headers={spring_listener_return_correlation=bea69d52-a12d-48b4-a2a5-ff188b25aa07}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=myDirectExchange, receivedRoutingKey=myDirectRoutingKey, deliveryTag=1, consumerTag=amq.ctag-9QciU0w71GFfFHUtwbZgqw, consumerQueue=myDirectQueue])
消息的内容为:MessageProperties [headers={spring_listener_return_correlation=bea69d52-a12d-48b4-a2a5-ff188b25aa07}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=myDirectExchange, receivedRoutingKey=myDirectRoutingKey, deliveryTag=1, consumerTag=amq.ctag-9QciU0w71GFfFHUtwbZgqw, consumerQueue=myDirectQueue]
消息的内容为:[B@32fffa43
消息来自队列:myDirectQueue
分析

首先看输出的内容,由于对象序列化,但是结构没有变化,可以看到以下结构

MessageProperties [
    headers={spring_listener_return_correlation=bea69d52-a12d-48b4-a2a5-ff188b25aa07}, 
    contentType=text/plain, 
    contentEncoding=UTF-8, 
    contentLength=0, 
    receivedDeliveryMode=PERSISTENT, 
    priority=0, 
    redelivered=false, 
    receivedExchange=myDirectExchange, 
    receivedRoutingKey=myDirectRoutingKey, 
    deliveryTag=1, 
    consumerTag=amq.ctag-9QciU0w71GFfFHUtwbZgqw, 
    consumerQueue=myDirectQueue
]
改进:监听多队列,并根据不同的队列进行不同的逻辑

有2个地方需要修改

// MessageListenerConfig 完成 监听多队列
simpleMessageListenerContainer.setQueueNames("myDirectQueue");
// 如果想要添加 添加队列people,则有
simpleMessageListenerContainer.setQueueNames("myDirectQueue","people");

// MyAckReceiver 完成 根据不同的队列进行不同的逻辑
// 根据上面的分析,字段consumerQueue=myDirectQueue可以知道消息从哪个队列而来
// 所以进行逻辑处理即可
            String queueName = message.getMessageProperties().getConsumerQueue();
            if("myDirectQueue".equals(queueName)){
                System.out.println("消息来自队列"+queueName);
                System.out.println("对来自队列myDirectQueue进行处理");
            }else if("people".equals(queueName)){
                System.out.println("消息来自队列"+queueName);
                System.out.println("对来自队列myDirectQueue进行处理");
            }

运行

访问http://localhost:8083/directSendMsg时,有以下输出:

MyAckReceiver
消息来自队列myDirectQueue
对来自队列myDirectQueue进行处理

访问http://localhost:8083/topicSendMsgtoStudent时,有以下输出:

MyAckReceiver
消息来自队列people
对来自队列myDirectQueue进行处理

问题

当消费者启动时,消息代理处没有队列时,启动会报错;

还原
  • 在生产者创建直连交换机testADirectExchange和队列testAQueue,并相互绑定

    @Configuration
    public class TestARabbitMqConfig {
    
        @Bean
        DirectExchange directExchange(){
            return new DirectExchange("testADirectExchange",true,false);
        }
    
        @Bean
        Queue testAQueue(){
            return new Queue("testAQueue",true,false,false);
        }
    
        @Bean
        Binding testABinding(){
            return BindingBuilder.bind(testAQueue()).to(directExchange()).with("testARoutingkey");
        }
    }
    
  • 启动生产者

  • 在消费者处创建一个监听器,监听队列testAQueue

  • 启动消费者

  • 报错

org.springframework.amqp.rabbit.listener.BlockingQueueConsumer$DeclarationException: Failed to declare queue(s):[testAQueue]
	at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.attemptPassiveDeclarations(BlockingQueueConsumer.java:700) [spring-rabbit-2.2.10.RELEASE.jar:2.2.10.RELEASE]
	at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.passiveDeclarations(BlockingQueueConsumer.java:584) [spring-rabbit-2.2.10.RELEASE.jar:2.2.10.RELEASE]
	at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.start(BlockingQueueConsumer.java:571) [spring-rabbit-2.2.10.RELEASE.jar:2.2.10.RELEASE]
	at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.initialize(SimpleMessageListenerContainer.java:1350) [spring-rabbit-2.2.10.RELEASE.jar:2.2.10.RELEASE]
	at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1195) [spring-rabbit-2.2.10.RELEASE.jar:2.2.10.RELEASE]
	at java.lang.Thread.run(Thread.java:748) [na:1.8.0_181]
Caused by: java.io.IOException: null
猜想:消息代理处未生成队列,所以启动消费者报错
原因:@RabbitListener指定的队列是已经存在且绑定了交换机的
解决
方案1:启动生产者,访问Route生成队列后,消息代理会产生Queue。
方案2:修改监听类
//原
@Component
@RabbitListener(queues = "testAQueue")
public class TestAReceiver {

    @RabbitHandler
    public void receiverMsg(String msg){
        System.out.println("队列myFanoutQueueB收到消息:"+msg);
    }
}
//修改后
@Component
public class TestAReceiver {

    @RabbitListener(bindings = {@QueueBinding(value = @Queue(value = "testAQueue",durable = "true"),
            exchange =@Exchange(value = "testADirectExchange",durable = "true"),key = "testARoutingkey")})
    public void receiverMsg(String msg){
        System.out.println("队列testAQueue收到消息:"+msg);
    }
}
总结

注解@RabbitListener的@queues绑定的必须是存在的队列,且bindings和queues是不可以共同使用。

总结

开始理解Spring Boot的开箱即用,约定大于配置的特点,

开箱即用——Spring Boot的spring-boot-starter-amqp已经对AMQP进行了封装,引入依赖拿来使用即可;

约定大于配置——Spring Boot存在大量的注解,这些注解把开发需要进行使用或修改的配置的都实现了,大大方便开发的效率,但是有一些特定的逻辑是不能实现了。例如实现手动消息确认将一些内容打印出来,这些都需要对组件或函数进行重写,而注解的存在,只能配置交换机和队列等必须自定义的组件。


越来越觉得,学习一种中间件,往往不应该从实践开始的。

固然直接动手可以完成一些逻辑,但仅仅只是为了实现某个需求,一旦遇到问题,或者新的需求,就束手无策了。

同时对整个中间件是如何运行的一概不知。

Spring Boot整合RabbitMQ之前,对AMQP协议的模型以及各种组件等内容进行了学习,

所以在整合过程中,整个逻辑非常清晰明了。

想要什么到应该怎么做,再到实现的整个流程可以说是把一些学习到的内容都复习了一遍。

当然,RabbitMQ的内容不仅仅只有这些,后面还会进行补充。

 类似资料: