因为集合了业务,所以Queue,Exchange都做了持久化处理
配置rmq Exchange发送成功回调,和找不到Queue错误回调
项目启动自动创建Exchange,Queue和绑定RoutingKey
ok 直接上图
<!-- MQ依赖 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
</dependency>
// An highlighted block
# rabbitmq配置
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
# 开启发送确认
publisher-confirms: true
#确认消息已发送到队列(Queue)
publisher-returns: true
#开启手动确认
listener:
direct:
acknowledge-mode: manual
simple:
acknowledge-mode: manual
retry:
enabled: true #开启消费者 程序异常情况下会进行重试
max-attempts: 3 #重试次数
initial-interval: 2000 #消费者重试间隔次数 2s
为了项目启动自动创建 Queue Exchange 和绑定RoutingKey
统一写到枚举里,枚举类下面图
// An highlighted block
/**
* @title:
* @author: sya
* @date: 2022年07月29日 10:06
* @description:
*/
@Configuration
@Slf4j
public class RabbitConfig {
@Resource
RabbitAdmin rabbitAdmin;
@Bean
public void createExchangeQueue() {
/**
* 初始化枚举 自动创建
*/
for (RabbitmqInitEnum rabbitmqInitEnum : RabbitmqInitEnum.values()) {
DirectExchange directExchange = new DirectExchange(rabbitmqInitEnum.rabbitmqEntity.getExchange(), true, false);
Queue queue = new Queue(rabbitmqInitEnum.rabbitmqEntity.getQueue(), true, false, false);
rabbitAdmin.declareExchange(directExchange);
rabbitAdmin.declareQueue(queue);
if (!StringUtil.isNullorEmpty(rabbitmqInitEnum.rabbitmqEntity.getRoutingKey())) {
rabbitAdmin.declareBinding(BindingBuilder.bind(queue).to(directExchange).with(rabbitmqInitEnum.rabbitmqEntity.getRoutingKey()));
}
}
}
/**
* 创建初始化RabbitAdmin对象
*
* @param connectionFactory
* @return
*/
@Bean
public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
// 只有设置为 true,spring 才会加载 RabbitAdmin 这个类
rabbitAdmin.setAutoStartup(true);
return rabbitAdmin;
}
/**
* 初始化配置 发送回调和消费失败回调
*
* @param connectionFactory
* @return
*/
@Bean
public RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate();
rabbitTemplate.setConnectionFactory(connectionFactory);
// Mandatory为true时,消息通过交换器无法匹配到队列会返回给生产者,为false时匹配不到会直接被丢弃
rabbitTemplate.setMandatory(true);
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
/**
* ConfirmCallback机制只确认消息是否到达exchange(交换器),不保证消息可以路由到正确的queue;
* 需要设置:publisher-confirm-type: CORRELATED;
* springboot版本较低 参数设置改成:publisher-confirms: true
*
* 以实现方法confirm中ack属性为标准,true到达
* config : 需要开启rabbitmq得ack publisher-confirm-type
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
log.info("ConfirmCallback 确认结果 (true代表发送成功) : {} 消息唯一标识 : {} 失败原因 :{}", ack, correlationData, cause);
}
});
/**
* 路由不到发队列时触发,成功则不触;
*/
rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
log.error("路由不到发队列时触发 {}{}{}{}", message, replyCode, replyText, exchange, routingKey);
});
return rabbitTemplate;
}
}
需要创建的Exchange,Queue和绑定RoutingKey 就可以项目启动创建,无需手创
// An highlighted block
/**
* @title:
* @author: sya
* @date: 2022年07月29日 17:45
* @description: rabbit初始化 枚举
*/
public enum RabbitmqInitEnum {
/**
* 测试创建Rabbitmq DEMO
*/
TEST(
new RabbitmqEntity.Builder()
.setExchange("autoE")
.setQueue("autoQ")
.setRoutingKey("autoR")
.create()
);
public final RabbitmqEntity rabbitmqEntity;
RabbitmqInitEnum(RabbitmqEntity rabbitmqEntity) {
this.rabbitmqEntity = rabbitmqEntity;
}
}
// An highlighted block
/**
* @title:
* @author: sya
* @date: 2022年07月29日 17:56
* @description:
*/
@Data
public class RabbitmqEntity {
private String exchange;
private String queue;
private String routingKey;
private RabbitmqEntity(Builder builder) {
exchange = builder.exchange;
queue = builder.queue;
routingKey = builder.routingKey;
}
public static class Builder {
private String exchange;
private String queue;
private String routingKey;
public Builder setExchange(String exchange) {
this.exchange = exchange;
return this;
}
public Builder setQueue(String queue) {
this.queue = queue;
return this;
}
public Builder setRoutingKey(String routingKey) {
this.routingKey = routingKey;
return this;
}
public RabbitmqEntity create() {
return new RabbitmqEntity(this);
}
}
}
// An highlighted block
/**
* @title:
* @author: sya
* @date: 2022年07月29日 16:39
* @description:
*/
@Component
public class RabbitmqUtil {
@Resource
private RabbitTemplate rabbitTemplate;
public RabbitmqUtil(RabbitTemplate rabbitTemplate) {
this.rabbitTemplate = rabbitTemplate;
}
/**
* 发送信息
*
* @param sendData 发送内容
* @param exchange
* @param routingKey
*/
public void send(Object sendData, String exchange, String routingKey) {
CorrelationData correlationData = new CorrelationData();
RmqMessageVO rmqMessageVO = new RmqMessageVO();
rmqMessageVO.setData(sendData);
rmqMessageVO.setTimestamp(DateTimeUI.getCurrentDateTimeLong());
Message message = MessageBuilder.withBody(rmqMessageVO.toString().getBytes()).build();
message.getMessageProperties().setMessageId(RandomGUID.getDatetUUID());
correlationData.setId(RandomGUID.getDatetUUID());
correlationData.setReturnedMessage(message);
rabbitTemplate.convertAndSend(exchange, routingKey, sendData, correlationData);
}
}
对象接收需要 实体类 implements Serializable
// An highlighted block
/**
* @title:
* @author: sya
* @date: 2022年07月28日 15:47
* @description: 消费者
*/
@Component
public class Consumer {
/**
*
* 消息确认回复方法
* 重回队列(个人不建议容易死循环,可以直接主动抛出异常,利用rabbitmq重试机制重新消费)
*
* channel.basicAck(msg.getMessageProperties().getDeliveryTag(),false);
* ack表示确认消息。multiple:false只确认该delivery_tag的消息,true确认该delivery_tag的所有消息
*
* channel.basicReject(msg.getMessageProperties().getDeliveryTag(),false);
* Reject表示拒绝消息。requeue:false表示被拒绝的消息是丢弃;true表示重回队列
*
* channel.basicNack(msg.getMessageProperties().getDeliveryTag(),false,false);
* nack表示拒绝消息。multiple表示拒绝指定了delivery_tag的所有未确认的消息,requeue表示不是重回队列
*
* 注意: 对象接收需要 实体类implements Serializable
*/
/**
* 消息手动回复
*
* @param msg 消息内容
* @param message
* @param channel
* @throws InterruptedException
*/
@RabbitListener(queues = "autoQ")
public void listenSimpleQueueMessage(JSONObject msg, Message message, Channel channel) throws InterruptedException, IOException {
System.out.println(message);
System.out.println("spring 消费者1接收到消息:【" + msg + "】");
//手动ack确认
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
}
发送对象的话,对象接收需要 实体类implements Serializable
// An highlighted block
/**
* @author Galen
* @Date 2021/12/9 0009 16:42
*/
@Slf4j
@Controller
@RequestMapping("/test")
public class TestController {
@Resource
private RabbitmqUtil rabbitmqUtil;
@Resource
private RabbitTemplate rabbitTemplate;
@GetMapping(value = "/test")
@ResponseBody
public void test() {
JSONObject jsonObject = new JSONObject();
JSONObject jsonObject1 = new JSONObject();
jsonObject.put("aaa","aa");
jsonObject.put("bbb",jsonObject1);
rabbitmqUtil.send(jsonObject, RabbitmqInitEnum.TEST.rabbitmqEntity.getExchange(), RabbitmqInitEnum.TEST.rabbitmqEntity.getRoutingKey());
}
}
一个在学习的开发者,勿喷,欢迎交流