[admin@host001 plugins]$ pwd
/usr/lib/rabbitmq/plugins
# 重命名
[admin@host001 plugins]$ sudo mv rabbitmq_delayed_message_exchange-3.9.0.ez rabbitmq_delayed_message_exchange.ez
[admin@host001 plugins]$ ll
total 36
-rw-r--r--. 1 root root 36358 Dec 26 08:52 rabbitmq_delayed_message_exchange.ez
# 开启
[admin@host001 plugins]$ sudo rabbitmq-plugins enable rabbitmq_delayed_message_exchange
Enabling plugins on node rabbit@host001:
rabbitmq_delayed_message_exchange
The following plugins have been configured:
rabbitmq_delayed_message_exchange
rabbitmq_management
rabbitmq_management_agent
rabbitmq_web_dispatch
Applying plugin configuration to rabbit@host001...
The following plugins have been enabled:
rabbitmq_delayed_message_exchange
started 1 plugins.
# 重启RabbitMq
[admin@host001 plugins]$ sudo service rabbitmq-server restart
Redirecting to /bin/systemctl restart rabbitmq-server.service
# 重新RabbitMq登录后台,新建交换机Type选项多了:x-delayed-message,表示插件开启成功
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.76</version>
</dependency>
spring.rabbitmq.host=192.168.3.202
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=123456
package cn.cnyasin.rabbit.config;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
/**
* TTL队列自动配置类
*/
@Configuration
public class DelayQueueConfig {
// 定义交换机名
public static final String EXCHANGE_NORMAL = "exchange_normal";
public static final String EXCHANGE_DEAD = "exchange_dead";
// 定义队列名
public static final String QUEUE_NORMAL_1 = "queue_normal_1";
public static final String QUEUE_NORMAL_2 = "queue_normal_2";
public static final String QUEUE_DEAD = "queue_dead";
// 定义路由key
public static final String ROUTING_NORMAL_1 = "routing_normal_1";
public static final String ROUTING_NORMAL_2 = "routing_normal_2";
public static final String ROUTING_DEAD = "routing_dead";
// 声明交换机
@Bean
public DirectExchange ExchangeNormal() {
return new DirectExchange(EXCHANGE_NORMAL);
}
@Bean
public DirectExchange ExchangeDead() {
return new DirectExchange(EXCHANGE_DEAD);
}
// 声明队列
@Bean
public Queue QueueNormal1() {
Map<String, Object> arguments = new HashMap<>();
arguments.put("x-dead-letter-exchange", EXCHANGE_DEAD);
arguments.put("x-dead-letter-routing-key", ROUTING_DEAD);
arguments.put("x-message-ttl", 10000);
return new Queue(QUEUE_NORMAL_1, true, false, false, arguments);
}
@Bean
public Queue QueueNormal2() {
Map<String, Object> arguments = new HashMap<>();
arguments.put("x-dead-letter-exchange", EXCHANGE_DEAD);
arguments.put("x-dead-letter-routing-key", ROUTING_DEAD);
arguments.put("x-message-ttl", 30000);
return new Queue(QUEUE_NORMAL_2, true, false, false, arguments);
}
@Bean
public Queue QueueDead() {
return new Queue(QUEUE_DEAD, true, false, false, null);
}
// 绑定队列、交换机、路由key
@Bean
public Binding QueueNormal1BindingExchangeNormal(
@Qualifier("QueueNormal1") Queue queueNormal1,
@Qualifier("ExchangeNormal") Exchange exchangeNormal
) {
return BindingBuilder.bind(queueNormal1).to(exchangeNormal).with(ROUTING_NORMAL_1).noargs();
}
@Bean
public Binding QueueNormal2BindingExchangeNormal(
@Qualifier("QueueNormal2") Queue queueNormal2,
@Qualifier("ExchangeNormal") Exchange exchangeNormal
) {
return BindingBuilder.bind(queueNormal2).to(exchangeNormal).with(ROUTING_NORMAL_2).noargs();
}
@Bean
public Binding QueueDeadBindingExchangeDead(
@Qualifier("QueueDead") Queue queueDead,
@Qualifier("ExchangeDead") Exchange exchangeDead
) {
return BindingBuilder.bind(queueDead).to(exchangeDead).with(ROUTING_DEAD).noargs();
}
}
package cn.cnyasin.rabbit.consumer;
import cn.cnyasin.rabbit.config.DelayQueueConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.util.Date;
/**
* 延迟队列消费者组件
*/
@Slf4j
@Component
public class DelayQueueConsumer {
/**
* 队列TTL-消费者
*
* @param data
*/
@RabbitListener(queues = DelayQueueConfig.QUEUE_DEAD)
public void ddlQueueConsumer(String data) {
log.info("[*] [{}] 死信队列接收到消息:{}", new Date().toString(), data);
}
}
package cn.cnyasin.rabbit.controller;
import cn.cnyasin.rabbit.config.DelayQueueConfig;
import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* 延迟队列控制器
*/
@Slf4j
@RestController
@RequestMapping("/delay/queue")
public class DelayQueueController {
@Autowired
RabbitTemplate rabbitTemplate;
/**
* 队列TTL-生产者
*
* @param msg
* @return
*/
@RequestMapping("/ttl/queue/producer/{msg}")
public String TtlQueueProducer(@PathVariable String msg) {
log.info("[*] 准备发送消息:{}", msg);
rabbitTemplate.convertAndSend(DelayQueueConfig.EXCHANGE_NORMAL, DelayQueueConfig.ROUTING_NORMAL_1, msg);
rabbitTemplate.convertAndSend(DelayQueueConfig.EXCHANGE_NORMAL, DelayQueueConfig.ROUTING_NORMAL_2, msg);
return JSON.toJSONString("消息发送成功");
}
}
// 定义队列名
public static final String QUEUE_NORMAL_3 = "queue_normal_3";
// 定义路由key
public static final String ROUTING_NORMAL_3 = "routing_normal_3";
@Bean
public Queue QueueNormal3() {
Map<String, Object> arguments = new HashMap<>();
arguments.put("x-dead-letter-exchange", EXCHANGE_DEAD);
arguments.put("x-dead-letter-routing-key", ROUTING_DEAD);
return new Queue(QUEUE_NORMAL_3, true, false, false, arguments);
}
@Bean
public Binding QueueNormal3BindingExchangeNormal(
@Qualifier("QueueNormal3") Queue queueNormal3,
@Qualifier("ExchangeNormal") Exchange exchangeNormal
) {
return BindingBuilder.bind(queueNormal3).to(exchangeNormal).with(ROUTING_NORMAL_3).noargs();
}
/**
* 消息TTL-生产者
*
* @param msg
* @return
*/
@RequestMapping("/ttl/message/producer/{msg}/{ttl}")
public String TtlMessageProducer(@PathVariable String msg, @PathVariable int ttl) throws Exception {
log.info("[*] [{}]准备发送消息:{}", new Date().toString(), msg);
Message message = MessageBuilder.withBody(msg.getBytes("UTF-8"))
.setExpiration(String.valueOf(ttl))
.build();
rabbitTemplate.convertAndSend(DelayQueueConfig.EXCHANGE_NORMAL, DelayQueueConfig.ROUTING_NORMAL_3, message);
return JSON.toJSONString("消息发送成功");
}
// 定义交换机名
public static final String EXCHANGE_DELAY = "exchange_delay";
// 定义队列名
public static final String QUEUE_DELAY = "queue_delay";
// 定义路由key
public static final String ROUTING_DELAY = "routing_delay";
// 声明交换机
@Bean
public CustomExchange ExchangeDelay() {
Map<String, Object> arguments = new HashMap<>();
arguments.put("x-delayed-type", "direct");
return new CustomExchange(EXCHANGE_DELAY, "x-delayed-message", true, false, arguments);
}
// 声明队列
@Bean
public Queue QueueDelay() {
return new Queue(QUEUE_DELAY, true, false, false, null);
}
// 绑定队列、交换机、路由key
@Bean
public Binding QueueDelayBindingExchangeDelay(
@Qualifier("QueueDelay") Queue queueDelay,
@Qualifier("ExchangeDelay") Exchange exchangeDelay
) {
return BindingBuilder.bind(queueDelay).to(exchangeDelay).with(ROUTING_DELAY).noargs();
}
/**
* delay队列-消费者
*
* @param data
*/
@RabbitListener(queues = DelayQueueConfig.QUEUE_DELAY)
public void delayQueueConsumer(String data) {
log.info("[*] [{}] 延迟队列接收到消息:{}", new Date().toString(), data);
}
/**
* delay消息-生产者
*
* @param msg
* @return
*/
@RequestMapping("/delay/message/producer/{msg}/{ttl}")
public String DelayMessageProducer(@PathVariable String msg, @PathVariable int ttl) throws Exception {
log.info("[*] [{}]准备发送消息:{}", new Date().toString(), msg);
Message message = MessageBuilder.withBody(msg.getBytes("UTF-8"))
.setHeader("x-delay", String.valueOf(ttl))
.build();
rabbitTemplate.convertAndSend(DelayQueueConfig.EXCHANGE_DELAY, DelayQueueConfig.ROUTING_DELAY, message);
return JSON.toJSONString("消息发送成功");
}