上一篇 RabbitMQ 已对RabbitMQ和AMQP的概念进行了学习
这一篇的目的主要是使用Spring Boot的spring-boot-starter-amqp整合RabbitMQ,以达到加深理解的目的。
同步异步(wx.request+ajax)
消息队列的学习与理解
消息队列之RabbitMQ的学习
RabbitMQ的应用之spring-boot-starter-amqp
2020.09.14-2020.09.15——完成初稿
Springboot 整合RabbitMq ,用心看完这一篇就够了
引入依赖->配置->编码
使用的依赖是spring-boot-starter-amqp
spring-boot-starter-amqp官方参考链接
maven Repository—spring-boot-starter-amqp
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
Spring Boot整合RabbitMQ为什么会使用spring-boot-starter-amqp?
The Advanced Message Queuing Protocol (AMQP) is a platform-neutral, wire-level protocol for message-oriented middleware. The Spring AMQP project applies core Spring concepts to the development of AMQP-based messaging solutions. Spring Boot offers several conveniences for working with AMQP through RabbitMQ, including the
spring-boot-starter-amqp
“Starter”.翻译:高级消息队列协议(AMQP)是一种平台无关的、用于面向消息中间件的线级协议。Spring AMQP项目将Spring的核心概念应用到基于AMQP的消息传递解决方案的开发中。Spring Boot为通过RabbitMQ使用AMQP提供了一些便利,包括
spring-boot-starter-amqp
。Spring uses
RabbitMQ
to communicate through the AMQP protocol.翻译:RabbitMQ是一个轻量级、可靠、可伸缩、可移植的消息代理,基于AMQP协议。Spring使用RabbitMQ通过AMQP协议进行通信。
因为,Spring Boot为通过RabbitMQ使用AMQP提供了一些便利。所以,与其说是Spring Boot整合RabbitMQ,即Spring Boot使用AMQP
spring.rabbitmq.host=localhost
# 默认端口
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=secret
# 或者
spring.rabbitmq.addresses=amqp://admin:secret@localhost
# 开启生产者的消息确认
spring.rabbitmq.publisher-confirm-type=correlated
# 开启消费者的消息确认
spring.rabbitmq.publisher-returns=true
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @description:
* @projectName:MQ_RabbitMQ_Provider
* @see:zhj.pro.mq.config
* @author:末_枭
* @createTime:2020/9/14 18:40
* @version:1.0
*/
@Configuration
public class DirectRabbitMqConfig {
/**
* description 配置交换机
* param durable:持久性 autoDelete:自动删除
* return org.springframework.amqp.core.DirectExchange
* createTime 2020/9/14 18:45
**/
@Bean
DirectExchange myDirectExchange(){
// name:交换机的名称
// durable:交换机有持久(durable)和暂存(transient)两个状态。
// autoDelete:当所有与之绑定的消息队列都完成了对此交换机的使用后,删掉它。默认是true
return new DirectExchange("myDirectExchange",true,false);
}
/**
* description 配置队列
* param durable,autoDeletet,exclusive
* return org.springframework.amqp.core.Queue
* createTime 2020/9/14 18:54
**/
@Bean
Queue myDirectQueue(){
//durable,autoDeletet:
//exclusive:只被一个连接使用,当连接关闭后,队列即将被删除
return new Queue("myDirectQueue",true,false,false);
}
/**
* description 绑定 同时配置路由键
* param []
* return org.springframework.amqp.core.Binding
* createTime 2020/9/14 19:03
**/
@Bean
Binding myDirectBinding(){
// 队列 -> 交换机 -> 路由键
return BindingBuilder.bind(myDirectQueue()).to(myDirectExchange()).with("myDirectRoutingKey");
}
}
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import java.time.Instant;
import java.util.UUID;
/**
* @description:
* @projectName:MQ_RabbitMQ_Provider
* @see:zhj.pro.controller
* @author:末_枭
* @createTime:2020/9/14 19:04
* @version:1.0
*/
@RestController
public class WebController {
@Autowired
RabbitTemplate rabbitTemplate;
@GetMapping("directSendMsg")
public String directSendMsg(){
String message = "发送成功,时间为:"+Instant.now().toString();
//为什么不绑定队列,因为交换机+routingkey 就等于匹配队列
rabbitTemplate.convertAndSend("myDirectExchange","myDirectRoutingKey",message);
return "ok,发送成功";
}
}
@Component
@RabbitListener(queues = "myDirectQueue")
public class DirectReceiver {
@RabbitHandler
public void process(String msg){
System.out.println("消费者收到消息:"+msg);
}
}
启动生产者MQ_RabbitMQ_Provider,访问http://localhost:8083/directSendMsg
,可以看到发送成功
但是,由于没有启动消费者,所以消息就存储在消息队列中
启动消费者MQ_RabbitMQ_Consumer,查看控制台可以看到以下内容
消费者收到消息:发送成功,时间为:2020-09-14T11:30:57.597Z
消费者从消息队列中取出消息并消费
再次访问 http://localhost:8083/directSendMsg
查看消费者MQ_RabbitMQ_Consumer,查看控制台可以看到以下内容
消费者收到消息:发送成功,时间为:2020-09-14T11:30:57.597Z
消费者收到消息:发送成功,时间为:2020-09-14T11:33:38.936Z
之后的描述 全部使用生产者和消费者表示
在消费者MQ_RabbitMQ_Consumer原来的基础上再创建一个接收方法,并修改之前的接收者,添加标识
// 第1个消费者
@Component
@RabbitListener(queues = "myDirectQueue")
public class DirectReceiver {
@RabbitHandler
public void process(String msg){
System.out.println("第1个消费者收到消息:"+msg);
}
}
// 第2个消费者
@Component
@RabbitListener(queues = "myDirectQueue")
public class NewDirectReceiver {
@RabbitHandler
public void process(String msg){
System.out.println("第2个消费者收到消息:"+msg);
}
}
结果
第1个消费者收到消息:发送成功,时间为:2020-09-15T13:20:44.409Z
第2个消费者收到消息:发送成功,时间为:2020-09-15T13:20:48.927Z
第1个消费者收到消息:发送成功,时间为:2020-09-15T13:20:49.745Z
第2个消费者收到消息:发送成功,时间为:2020-09-15T13:20:50.477Z
第1个消费者收到消息:发送成功,时间为:2020-09-15T13:20:51.227Z
第2个消费者收到消息:发送成功,时间为:2020-09-15T13:20:51.900Z
第1个消费者收到消息:发送成功,时间为:2020-09-15T13:20:52.619Z
第2个消费者收到消息:发送成功,时间为:2020-09-15T13:20:53.421Z
结论:可以看到发生了循环分发
由于流程一样,不再详细描述
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @description:
* @projectName:MQ_RabbitMQ_Provider
* @see:zhj.pro.mq.config
* @author:末_枭
* @createTime:2020/9/14 19:52
* @version:1.0
*/
@Configuration
public class TopicRabbitMqConfig {
private final static String student = "people.student";
private final static String teacher = "people.teacher";
@Bean
TopicExchange myTopicExchange(){
return new TopicExchange("myTopicExchange",true,false);
}
// 创建3个队列
// 队列名分别是 people.student,people.teacher,people
@Bean
Queue myQueueOfStudent(){
return new Queue(TopicRabbitMqConfig.student);
}
@Bean
Queue myQueueOfTeacher(){
return new Queue(TopicRabbitMqConfig.teacher);
}
@Bean
Queue myQueueOfPeople(){
return new Queue("people");
}
//将主题交换机与队列进行绑定
// 其中myBindingofStudent只能收到路由键为routing key==people.student的消息
// myBindingofteacher只能收到路由键为routing key == people.teacher的消息
//而 myBingingofPeople由于routing key == people.# 所以能够收到以上队列的所有消息
@Bean
Binding myBindingofStudent(){
return BindingBuilder.bind(myQueueOfStudent()).to(myTopicExchange()).with(TopicRabbitMqConfig.student);
}
@Bean
Binding myBindingOfTeacher(){
return BindingBuilder.bind(myQueueOfTeacher()).to(myTopicExchange()).with(TopicRabbitMqConfig.teacher);
}
@Bean
Binding myBindingPeople(){
return BindingBuilder.bind(myQueueOfPeople()).to(myTopicExchange()).with("people.#");
}
}
Route
@GetMapping("topicSendMsgtoStudent")
public String topicSendMsgToStudent(){
String message = "给学生的消息发送成功,时间为:"+Instant.now().toString();
rabbitTemplate.convertAndSend("myTopicExchange","people.student",message);
return "ok,发送成功";
}
@GetMapping("topicSendMsgToTeacher")
public String topicSendMsgToTeacher(){
String messgae = "给全部人的消息发送成功,时间为:"+Instant.now().toString();
rabbitTemplate.convertAndSend("myTopicExchange","people.teacher",messgae);
return "ok,发送成功";
}
//3个接收者
//学生
@Component
@RabbitListener(queues = "people.student")
public class StudentTopicReceiver {
@RabbitHandler
public void receiverMsg(String msg){
System.out.println("学生接收到消息:"+msg);
}
}
//老师
@Component
@RabbitListener(queues = "people.teacher")
public class TeacherTopicReceiver {
@RabbitHandler
public void receiverMsg(String msg){
System.out.println("老师接收到消息,内容为:"+msg);
}
}
//所有人
@Component
@RabbitListener(queues = "people")
public class PeopleTopicReceiver {
@RabbitHandler
public void receiverMsg(String msg){
System.out.println("管理员收到所有信息:"+msg);
}
}
启动生产者,消费者
访问 http://localhost:8083/topicSendMsgToTeacher
消费者处的控制台输出以下内容:
管理员收到所有信息:给学生的消息发送成功,时间为:2020-09-14T12:33:21.717Z
学生接收到消息:给学生的消息发送成功,时间为:2020-09-14T12:33:21.717Z
访问 http://localhost:8083/topicSendMsgToStudent
消费者处的控制台输出以下内容:
管理员收到所有信息:给全部人的消息发送成功,时间为:2020-09-14T12:33:58.715Z
老师接收到消息,内容为:给全部人的消息发送成功,时间为:2020-09-14T12:33:58.715Z
@Configuration
public class FanoutRabbitMqConfig {
//创建1个扇形交换机
@Bean
FanoutExchange myFanoutExchange(){
return new FanoutExchange("myFanoutExchange",true,false);
}
//创建3个队列
@Bean
Queue myFanoutQueueA(){
return new Queue("myFanoutQueueA");
}
@Bean
Queue myFanoutQueueB(){
return new Queue("myFanoutQueueB");
}
@Bean
Queue myFanoutQueueC(){
return new Queue("myFanoutQueueC");
}
//创建 3个绑定
@Bean
Binding bindingOfQueueA(){
return BindingBuilder.bind(myFanoutQueueA()).to(myFanoutExchange());
}
@Bean
Binding bindingOfQueueB(){
return BindingBuilder.bind(myFanoutQueueB()).to(myFanoutExchange());
}
@Bean
Binding bindingOfQueueC(){
return BindingBuilder.bind(myFanoutQueueC()).to(myFanoutExchange());
}
}
根据上一篇知道,扇形交换机有个特点——不需要路由键,就会将消息发送到与扇形交换机绑定的队列中。
上面的代码就是很好的证明
//其它的交换机绑定都是以下格式 return BindingBuilder.bind(/*队列*/).to(/*交换机*/).with(/*路由键*/); //而扇形交换机是以下格式 return BindingBuilder.bind(myFanoutQueueC()).to(myFanoutExchange());
验证成功
@GetMapping("fanoutSendMsg")
public String fanoutSendMsg(){
String messgae = "消息发送成功,时间为:"+Instant.now().toString();
rabbitTemplate.convertAndSend("myFanoutExchange",null,messgae);
return "ok,发送成功";
}
//3个接收者
@Component
@RabbitListener(queues = "myFanoutQueueA")
public class FanoutReceiverA {
@RabbitHandler
public void receiverMsg(String msg){
System.out.println("队列myFanoutQueueA收到消息:"+msg);
}
}
@Component
@RabbitListener(queues = "myFanoutQueueB")
public class FanoutReceiverB {
@RabbitHandler
public void receiverMsg(String msg){
System.out.println("队列myFanoutQueueB收到消息:"+msg);
}
}
@Component
@RabbitListener(queues = "myFanoutQueueC")
public class FanoutRecevierC {
@RabbitHandler
public void receiverMsg(String msg){
System.out.println("队列myFanoutQueueC收到消息:"+msg);
}
}
启动生产者,启动消费者
访问路由
http://localhost:8083/fanoutSendMsg
查看消费者控制台输出
队列myFanoutQueueC收到消息:消息发送成功,时间为:2020-09-14T13:14:46.930Z
队列myFanoutQueueB收到消息:消息发送成功,时间为:2020-09-14T13:14:46.930Z
队列myFanoutQueueA收到消息:消息发送成功,时间为:2020-09-14T13:14:46.930Z
上一篇 生产者中的动作:消息确认 已描述
# 在配置文件中开启生产者的消息确认
spring.rabbitmq.publisher-confirm-type=correlated
@Configuration
public class RabbitConfig {
//ConfirmCallback//重写confirm方法
public RabbitTemplate.ConfirmCallback myconfirmCallback(){
return new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
System.out.println("ConfirmCallback");
System.out.println("相关数据:"+correlationData);
System.out.println("确认情况:"+ack);
System.out.println("原因:"+cause);
}
};
}
//ReturnCallback//重写returnedMessage
public RabbitTemplate.ReturnCallback myReturnCallback(){
return new RabbitTemplate.ReturnCallback() {
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
System.out.println("ReturnCallback");
System.out.println("消息:"+message);
System.out.println("回应码:"+replyCode);
System.out.println("回应信息:"+replyText);
System.out.println("交换机:"+exchange);
System.out.println("路由键:"+routingKey);
}
};
}
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){
RabbitTemplate rabbitTemplate = new RabbitTemplate();
rabbitTemplate.setConnectionFactory(connectionFactory);
rabbitTemplate.setMandatory(true);
// 设置 confirmCallback
rabbitTemplate.setConfirmCallback(myconfirmCallback());
// 设置 returnCallback
rabbitTemplate.setReturnCallback(myReturnCallback());
return rabbitTemplate;
}
}
通过上面的内容,可以看到重写了RabbitTemplate.ConfirmCallback和RabbitTemplate.ReturnCallback两个方法,增加了打印内容。并将这两个配置设置进去。
在生产者处增加一个路由,并将消息发送到一个不存在的交换机no_exists_exchange
//测试 不存在的交换机
@GetMapping("testProviderAck")
public String testProviderAck(){
String msg = "发送成功,时间为:"+ Instant.now().toString();
rabbitTemplate.convertAndSend("no_exists_exchange","myDirectRoutingKey",msg);
return "ok,发送成功";
}
输出
ConfirmCallback
相关数据:null
确认情况:false
原因:channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'no_exists_exchange' in vhost '/', class-id=60, method-id=40)
分析
触发了ConfirmCallback
可以看到里面已经说好了NOT_FOUND - no exchange 'no_exists_exchange' in vhost '/'
创建一个交换机,不绑定任何队列
@Bean
DirectExchange directExchangeNoBindQueue(){
return new DirectExchange("directExchangeNoBindQueue",true,false);
}
创建一个路由,将消息发送到该交换机中
//测试 不存在的队列
@GetMapping("testProviderAckNoQueue")
public String testProviderAckNoQueue(){
String msg = "发送成功,时间为:"+Instant.now().toString();
rabbitTemplate.convertAndSend("directExchangeNoBindQueue","myDirectRoutingKey",msg);
return "ok,发送成功";
}
输出
ReturnCallback
消息:(Body:'发送成功,时间为:2020-09-14T15:48:19.555Z' MessageProperties [headers={}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0])
回应码:312
回应信息:NO_ROUTE
交换机:directExchangeNoBindQueue
路由键:myDirectRoutingKey
ConfirmCallback
相关数据:null
确认情况:true
原因:null
分析
触发了ReturnCallback和ConfirmCallback,ConfirmCallback中,成功,未发现异常;
ReturnCallback中,其中回应信息为NO_ROUTE表示没有匹配的路由。
创建一个路由即可
//测试 不存在的队列找不到交换机
@GetMapping("testProviderAckNoQueueAndNoRKey")
public String testProviderAckNoQueueAndNoRKey(){
String msg = "发送成功,时间为:"+Instant.now().toString();
rabbitTemplate.convertAndSend("no_exists_exchange","no-exists-routingkey",msg);
return "ok,发送成功";
}
输出
ConfirmCallback
相关数据:null
确认情况:false
原因:channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'no_exists_exchange' in vhost '/', class-id=60, method-id=40)
分析
触发了ConfirmCallbace,和情况1是相同的。因为消息在发送给交换机的时候,就找不到交换机了。
使用前面的直连交换机测试即可
输出
ConfirmCallback
相关数据:null
确认情况:true
原因:null
分析
触发了ConfirmCallback,可以看到没输出什么异常。
spring.rabbitmq.publisher-returns=true
在上一篇中的消费者—动作:消息确认已经有描述,分别是2种:自动确认和手动确认。
而手动确认也有3种情况:
第3种情况消息被处理完成又有2种情况,即:
消息没有被正确处理
basic.nack
basic.reject
两者的区别在于reject只能拒绝单条消息
消息被正确处理
basic.ack
同上一篇所讲——拒绝消费当前消息,有2种情况,直接丢弃或者重新放入队列
channel.basicReject(deliveryTag,true);//true 对应重新放入队列 而false对应直接丢弃
正常的处理逻辑应该是——发生异常后,拒绝入列,再选择是否重新入列。
如果处理不当,会导致死循环,从而导致消息积压。
channel.basicNack(deliveryTag,false,true);
// 第1个参数是当前消息的唯一Id
// 第2个参数指是否针对多条消息 true表示一次性针对当前通道的消息tagID小于当前消息的。
// 第3个参数同basic.reject
由于是消费者的手动消息监听,所以以下内容都是在消费者处进行
@Component
public class MyAckReceiver implements ChannelAwareMessageListener {
@Override
public void onMessage(Message message, Channel channel) throws Exception {
long deliverTag = message.getMessageProperties().getDeliveryTag();
System.out.println("MyAckReceiver");
try {
String msg = message.toString();
System.out.println(msg);
System.out.println("消息的内容为:"+message.getMessageProperties());
System.out.println("消息的内容为:"+message.getBody());
System.out.println("消息来自队列:"+message.getMessageProperties().getConsumerQueue());
channel.basicAck(deliverTag,true);
} catch (Exception e){
System.out.println("发生了异常");
channel.basicReject(deliverTag,false);
e.printStackTrace();
}
}
}
@Configuration
public class MessageListenerConfig {
@Autowired
CachingConnectionFactory cachingConnectionFactory;
@Autowired
MyAckReceiver myAckReceiver;
@Bean
public SimpleMessageListenerContainer simpleMessageListenerContainer(){
SimpleMessageListenerContainer simpleMessageListenerContainer =
new SimpleMessageListenerContainer(cachingConnectionFactory);
// simpleMessageListenerContainer.setConcurrentConsumers(1);
simpleMessageListenerContainer.setMaxConcurrentConsumers(1);
simpleMessageListenerContainer.setAcknowledgeMode(AcknowledgeMode.MANUAL);
simpleMessageListenerContainer.setQueueNames("myDirectQueue");
simpleMessageListenerContainer.setMessageListener(myAckReceiver);
return simpleMessageListenerContainer;
}
}
MyAckReceiver
(Body:'发送成功,时间为:2020-09-15T03:03:09.679Z' MessageProperties [headers={spring_listener_return_correlation=bea69d52-a12d-48b4-a2a5-ff188b25aa07}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=myDirectExchange, receivedRoutingKey=myDirectRoutingKey, deliveryTag=1, consumerTag=amq.ctag-9QciU0w71GFfFHUtwbZgqw, consumerQueue=myDirectQueue])
消息的内容为:MessageProperties [headers={spring_listener_return_correlation=bea69d52-a12d-48b4-a2a5-ff188b25aa07}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=myDirectExchange, receivedRoutingKey=myDirectRoutingKey, deliveryTag=1, consumerTag=amq.ctag-9QciU0w71GFfFHUtwbZgqw, consumerQueue=myDirectQueue]
消息的内容为:[B@32fffa43
消息来自队列:myDirectQueue
首先看输出的内容,由于对象序列化,但是结构没有变化,可以看到以下结构
MessageProperties [
headers={spring_listener_return_correlation=bea69d52-a12d-48b4-a2a5-ff188b25aa07},
contentType=text/plain,
contentEncoding=UTF-8,
contentLength=0,
receivedDeliveryMode=PERSISTENT,
priority=0,
redelivered=false,
receivedExchange=myDirectExchange,
receivedRoutingKey=myDirectRoutingKey,
deliveryTag=1,
consumerTag=amq.ctag-9QciU0w71GFfFHUtwbZgqw,
consumerQueue=myDirectQueue
]
有2个地方需要修改
// MessageListenerConfig 完成 监听多队列
simpleMessageListenerContainer.setQueueNames("myDirectQueue");
// 如果想要添加 添加队列people,则有
simpleMessageListenerContainer.setQueueNames("myDirectQueue","people");
// MyAckReceiver 完成 根据不同的队列进行不同的逻辑
// 根据上面的分析,字段consumerQueue=myDirectQueue可以知道消息从哪个队列而来
// 所以进行逻辑处理即可
String queueName = message.getMessageProperties().getConsumerQueue();
if("myDirectQueue".equals(queueName)){
System.out.println("消息来自队列"+queueName);
System.out.println("对来自队列myDirectQueue进行处理");
}else if("people".equals(queueName)){
System.out.println("消息来自队列"+queueName);
System.out.println("对来自队列myDirectQueue进行处理");
}
运行
访问http://localhost:8083/directSendMsg
时,有以下输出:
MyAckReceiver
消息来自队列myDirectQueue
对来自队列myDirectQueue进行处理
访问http://localhost:8083/topicSendMsgtoStudent
时,有以下输出:
MyAckReceiver
消息来自队列people
对来自队列myDirectQueue进行处理
在生产者创建直连交换机testADirectExchange
和队列testAQueue
,并相互绑定
@Configuration
public class TestARabbitMqConfig {
@Bean
DirectExchange directExchange(){
return new DirectExchange("testADirectExchange",true,false);
}
@Bean
Queue testAQueue(){
return new Queue("testAQueue",true,false,false);
}
@Bean
Binding testABinding(){
return BindingBuilder.bind(testAQueue()).to(directExchange()).with("testARoutingkey");
}
}
启动生产者
在消费者处创建一个监听器,监听队列testAQueue
启动消费者
报错
org.springframework.amqp.rabbit.listener.BlockingQueueConsumer$DeclarationException: Failed to declare queue(s):[testAQueue]
at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.attemptPassiveDeclarations(BlockingQueueConsumer.java:700) [spring-rabbit-2.2.10.RELEASE.jar:2.2.10.RELEASE]
at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.passiveDeclarations(BlockingQueueConsumer.java:584) [spring-rabbit-2.2.10.RELEASE.jar:2.2.10.RELEASE]
at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.start(BlockingQueueConsumer.java:571) [spring-rabbit-2.2.10.RELEASE.jar:2.2.10.RELEASE]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.initialize(SimpleMessageListenerContainer.java:1350) [spring-rabbit-2.2.10.RELEASE.jar:2.2.10.RELEASE]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1195) [spring-rabbit-2.2.10.RELEASE.jar:2.2.10.RELEASE]
at java.lang.Thread.run(Thread.java:748) [na:1.8.0_181]
Caused by: java.io.IOException: null
//原
@Component
@RabbitListener(queues = "testAQueue")
public class TestAReceiver {
@RabbitHandler
public void receiverMsg(String msg){
System.out.println("队列myFanoutQueueB收到消息:"+msg);
}
}
//修改后
@Component
public class TestAReceiver {
@RabbitListener(bindings = {@QueueBinding(value = @Queue(value = "testAQueue",durable = "true"),
exchange =@Exchange(value = "testADirectExchange",durable = "true"),key = "testARoutingkey")})
public void receiverMsg(String msg){
System.out.println("队列testAQueue收到消息:"+msg);
}
}
注解@RabbitListener的@queues绑定的必须是存在的队列,且bindings和queues是不可以共同使用。
开始理解Spring Boot的开箱即用,约定大于配置的特点,
开箱即用——Spring Boot的spring-boot-starter-amqp
已经对AMQP进行了封装,引入依赖拿来使用即可;
约定大于配置——Spring Boot存在大量的注解,这些注解把开发需要进行使用或修改的配置的都实现了,大大方便开发的效率,但是有一些特定的逻辑是不能实现了。例如实现手动消息确认将一些内容打印出来,这些都需要对组件或函数进行重写,而注解的存在,只能配置交换机和队列等必须自定义的组件。
越来越觉得,学习一种中间件,往往不应该从实践开始的。
固然直接动手可以完成一些逻辑,但仅仅只是为了实现某个需求,一旦遇到问题,或者新的需求,就束手无策了。
同时对整个中间件是如何运行的一概不知。
Spring Boot整合RabbitMQ之前,对AMQP协议的模型以及各种组件等内容进行了学习,
所以在整合过程中,整个逻辑非常清晰明了。
想要什么到应该怎么做,再到实现的整个流程可以说是把一些学习到的内容都复习了一遍。
当然,RabbitMQ的内容不仅仅只有这些,后面还会进行补充。