maven依赖
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.3.1.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>priv.gitonline</groupId>
<artifactId>rabbitmq</artifactId>
<version>1.0</version>
<name>rabbitmq</name>
<description>Rabbitmq Demo project for Spring Boot</description>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<!-- rabbitmq依赖-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
</dependencies>
application.yml配置
server:
port: 8081
spring:
application:
name: spring-boot-rabbitmq
rabbitmq:
addresses: 192.168.100.120:5672,192.168.100.121:5672,192.168.100.122:5672
username: admin
password: admin
publisher-returns: true
publisher-confirm-type: correlated
virtual-host: /
listener:
type: simple
simple:
acknowledge-mode: auto #确认模式
prefetch: 1 #限制每次发送一条数据。
concurrency: 3 #同一个队列启动几个消费者
max-concurrency: 3 #启动消费者最大数量
#重试策略相关配置
retry:
# 开启消费者(程序出现异常)重试机制,默认开启并一直重试
enabled: true
# 最大重试次数
max-attempts: 5
# 重试间隔时间(毫秒)
initial-interval: 3000
RabbitMQ配置
一般队列配置
@Configuration
public class DirectRabbitConfig {
//队列 起名:TestDirectQueue
@Bean
public Queue TestDirectQueue() {
return new Queue("TestDirectQueue",true);
}
//Direct交换机 起名:TestDirectExchange
@Bean
DirectExchange TestDirectExchange() {
return new DirectExchange("TestDirectExchange");
}
//绑定 将队列和交换机绑定, 并设置用于匹配键:TestDirectRouting
@Bean
Binding bindingDirect() {
return BindingBuilder.bind(TestDirectQueue()).to(TestDirectExchange()).with("TestDirectRouting");
}
}
死信队列配置
/**
* 延迟队列
* 使用ttl和dlx实现
* Direct模式
*/
@Configuration
public class DelayQueueDirectConfig {
//-------------Direct模式--------------------------------
public final static String TTL_QUEUE = "ttl.queue.demo";
public final static String TTL_ROUTING_KEY = "ttl.queue.demo.key";
public final static String TTL_EXCHANGE = "ttl.queue.demo.exchange";
public final static String DLX_QUEUE = "dlx.queue.demo";
public final static String DLX_ROUTING_KEY = "dlx.queue.demo.key";
public final static String DLX_EXCHANGE = "dlx.queue.demo.exchange";
public final static int QUEUE_EXPIRATION = 3000;
@Bean
Queue ttlQueue() {
return QueueBuilder.durable(TTL_QUEUE)
.withArgument("x-dead-letter-exchange", DLX_EXCHANGE) // DLX
.withArgument("x-dead-letter-routing-key", DLX_ROUTING_KEY) // dead letter携带的routing key
.withArgument("x-message-ttl", QUEUE_EXPIRATION) // 设置队列的过期时间
.build();
}
@Bean
Queue dlxQueue() {
return QueueBuilder.durable(DLX_QUEUE)
.build();
}
@Bean
DirectExchange ttlExchange() {
return new DirectExchange(TTL_EXCHANGE);
}
@Bean
DirectExchange dlxExchange() {
return new DirectExchange(DLX_EXCHANGE);
}
@Bean
Binding dlxBinding(Queue dlxQueue, DirectExchange dlxExchange) {
return BindingBuilder.bind(dlxQueue)
.to(dlxExchange)
.with(DLX_ROUTING_KEY);
}
@Bean
Binding ttlBinding(Queue ttlQueue, DirectExchange ttlExchange) {
return BindingBuilder.bind(ttlQueue)
.to(ttlExchange)
.with(TTL_ROUTING_KEY);
}
}
消息发送
@Controller
public class Sender {
@Autowired
private RabbitTemplate rabbitTemplate;
@RequestMapping("/send")
@ResponseBody
public void send() {
//convertAndSend(exchange:交换机名称,routingKey:路由关键字,object:发送的消息内容)
User user = new User();
user.setId(UUID.randomUUID().toString().replaceAll("-","").toLowerCase());
user.setName("小明");
user.setDetail("6666666666");
rabbitTemplate.convertAndSend("TestDirectExchange","TestDirectRouting", user);
}
}
消费者
@Component
public class Receiver {
@RabbitListener(queues= "TestDirectQueue")
@RabbitHandler
public void process(@Payload User user, Channel channel, @Headers Map<String,Object> headers){
System.out.println(user.getDetail()+","+headers);
}
}