Spring Boot 集成 RabbitMQ (spring-boot-starter-amqp)

柯清野
2023-12-01

Maven 依赖

 	<parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.3.1.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>priv.gitonline</groupId>
    <artifactId>rabbitmq</artifactId>
    <version>1.0</version>
    <name>rabbitmq</name>
    <description>Rabbitmq Demo project for Spring Boot</description>

    <properties>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <!-- rabbitmq依赖-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        
		<dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
    </dependencies>

application.yml配置

# HTTP port of the embedded web server.
server:
  port: 8081

spring:
  application:
    name: spring-boot-rabbitmq

  # RabbitMQ connection and listener settings.
  rabbitmq:
    # Comma-separated list of broker host:port addresses.
    addresses: 192.168.100.120:5672,192.168.100.121:5672,192.168.100.122:5672
    username: admin
    password: admin
    publisher-returns: true
    publisher-confirm-type: correlated
    virtual-host: /
    listener:
      type: simple
      simple:
        acknowledge-mode: auto # acknowledgement mode
        prefetch: 1 # limit delivery to one message at a time
        concurrency: 3 # number of consumers started for the same queue
        max-concurrency: 3 # maximum number of consumers started
        # Retry policy settings
        retry:
          # Enable retry when the consumer (application code) throws an exception.
          # NOTE(review): the original comment said retry is "enabled by default and
          # retries forever"; Spring Boot documents this property as defaulting to
          # false - confirm against the reference documentation.
          enabled: true
          # Maximum number of attempts
          max-attempts: 5
          # Interval between retries (milliseconds)
          initial-interval: 3000

RabbitMQ配置

一般队列配置

/**
 * Declares one queue, one direct exchange, and the binding that connects them.
 */
@Configuration
public class DirectRabbitConfig {

    /**
     * Queue named "TestDirectQueue", created with the durable flag set to {@code true}.
     */
    @Bean
    public Queue TestDirectQueue() {
        final boolean durable = true;
        return new Queue("TestDirectQueue", durable);
    }

    /**
     * Direct exchange named "TestDirectExchange".
     */
    @Bean
    DirectExchange TestDirectExchange() {
        DirectExchange exchange = new DirectExchange("TestDirectExchange");
        return exchange;
    }

    /**
     * Binds the queue to the exchange using the routing key "TestDirectRouting".
     */
    @Bean
    Binding bindingDirect() {
        Queue queue = TestDirectQueue();
        DirectExchange exchange = TestDirectExchange();
        return BindingBuilder.bind(queue).to(exchange).with("TestDirectRouting");
    }

}

死信队列配置(通过 TTL + DLX 实现延迟队列)

/**
 * Delay queue implemented with a message TTL and a dead-letter exchange (DLX),
 * using direct exchanges.
 *
 * <p>The TTL queue is declared with dead-letter arguments that name the DLX
 * exchange and routing key; the DLX queue is bound to that exchange with the
 * same routing key.
 */
@Configuration
public class DelayQueueDirectConfig {

    // ---- Names used on the TTL side ----

    public static final String TTL_QUEUE = "ttl.queue.demo";

    public static final String TTL_ROUTING_KEY = "ttl.queue.demo.key";

    public static final String TTL_EXCHANGE = "ttl.queue.demo.exchange";

    // ---- Names used on the dead-letter side ----

    public static final String DLX_QUEUE = "dlx.queue.demo";

    public static final String DLX_ROUTING_KEY = "dlx.queue.demo.key";

    public static final String DLX_EXCHANGE = "dlx.queue.demo.exchange";

    /** Value passed as the {@code x-message-ttl} argument of the TTL queue. */
    public static final int QUEUE_EXPIRATION = 3000;

    /**
     * Durable TTL queue carrying the dead-letter exchange, dead-letter routing
     * key and message TTL arguments.
     */
    @Bean
    Queue ttlQueue() {
        QueueBuilder builder = QueueBuilder.durable(TTL_QUEUE);
        // Exchange that receives dead letters.
        builder = builder.withArgument("x-dead-letter-exchange", DLX_EXCHANGE);
        // Routing key carried by dead letters.
        builder = builder.withArgument("x-dead-letter-routing-key", DLX_ROUTING_KEY);
        // TTL applied to messages in this queue.
        builder = builder.withArgument("x-message-ttl", QUEUE_EXPIRATION);
        return builder.build();
    }

    /** Durable queue that is bound to the dead-letter exchange. */
    @Bean
    Queue dlxQueue() {
        QueueBuilder builder = QueueBuilder.durable(DLX_QUEUE);
        return builder.build();
    }

    /** Direct exchange on the TTL side. */
    @Bean
    DirectExchange ttlExchange() {
        DirectExchange exchange = new DirectExchange(TTL_EXCHANGE);
        return exchange;
    }

    /** Direct exchange on the dead-letter side. */
    @Bean
    DirectExchange dlxExchange() {
        DirectExchange exchange = new DirectExchange(DLX_EXCHANGE);
        return exchange;
    }

    /** Binds the DLX queue to the DLX exchange with {@link #DLX_ROUTING_KEY}. */
    @Bean
    Binding dlxBinding(Queue dlxQueue, DirectExchange dlxExchange) {
        return BindingBuilder.bind(dlxQueue).to(dlxExchange).with(DLX_ROUTING_KEY);
    }

    /** Binds the TTL queue to the TTL exchange with {@link #TTL_ROUTING_KEY}. */
    @Bean
    Binding ttlBinding(Queue ttlQueue, DirectExchange ttlExchange) {
        return BindingBuilder.bind(ttlQueue).to(ttlExchange).with(TTL_ROUTING_KEY);
    }

}

消息发送

/**
 * Web endpoint that publishes a sample {@code User} message through the
 * injected {@code RabbitTemplate}.
 */
@Controller
public class Sender {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * Handles requests to {@code /send}: builds a user whose id is a random
     * UUID with the dashes removed, then sends it to exchange
     * "TestDirectExchange" with routing key "TestDirectRouting".
     */
    @RequestMapping("/send")
    @ResponseBody
    public void send() {
        String id = UUID.randomUUID().toString().replace("-", "").toLowerCase();

        User user = new User();
        user.setId(id);
        user.setName("小明");
        user.setDetail("6666666666");

        // Arguments: exchange name, routing key, message payload.
        rabbitTemplate.convertAndSend("TestDirectExchange", "TestDirectRouting", user);
    }

}

消费者

/**
 * Consumer for messages arriving on the "TestDirectQueue" queue.
 */
@Component
public class Receiver {

    /**
     * Prints the received user's detail followed by the message headers.
     *
     * <p>The listener is declared at method level, so {@code @RabbitListener}
     * alone marks this method as the handler; {@code @RabbitHandler} is meant
     * for methods of a class annotated with {@code @RabbitListener} and was
     * redundant here.
     *
     * @param user    the message payload
     * @param channel the channel supplied to the listener; not used by this method
     * @param headers the message headers
     */
    @RabbitListener(queues = "TestDirectQueue")
    public void process(@Payload User user, Channel channel, @Headers Map<String, Object> headers) {
        System.out.println(user.getDetail() + "," + headers);
    }
}
 类似资料: