当前位置: 首页 > 工具软件 > RabbitMQ-CN > 使用案例 >

RabbitMQ-Java-02-工作队列

马博学
2023-12-01

说明

  • RabbitMQ-Java-02-工作队列
  • 本案例是一个Maven项目
  • 假设你已经实现了上一节简单队列
  • 官方文档已包含绝大多数本案例内容。请移步:https://docs.spring.io/spring-amqp/docs/current/reference/html/

核心概念

》原理

  • 执行资源密集型任务时往往有多个队列,每个队列有多个工作线程去处理
  • 注意:一个消息必须保证只能被处理一次

操作步骤

》搭建环境

  • idea创建一个空项目
  • 创建一个Maven管理的module
  • pom.xml添加插件:指定JDK编译版本(为了支持lambda表达式,如果不手动添加后期idea报错根据提示会自动添加好)
    <!-- 指定JDK编译版本 -->
    <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <configuration>
            <source>8</source>
            <target>8</target>
        </configuration>
    </plugin>
    
  • pom.xml添加依赖:RabbitMQ相关
    <!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
    <dependency>
        <groupId>com.rabbitmq</groupId>
        <artifactId>amqp-client</artifactId>
        <version>5.13.1</version>
    </dependency>
    
    <!-- https://mvnrepository.com/artifact/commons-io/commons-io -->
    <dependency>
        <groupId>commons-io</groupId>
        <artifactId>commons-io</artifactId>
        <version>2.11.0</version>
    </dependency>
    
    

》工作队列案例(自动应答)

  • 说明
    • 主要是两步:提取工具类、分多线程同时处理一个队列
  • 代码组成
    • RabbitMQ工具类:RabbitMqUtils
      package cn.cnyasin.rabbit.utils;
      
      import com.rabbitmq.client.Channel;
      import com.rabbitmq.client.Connection;
      import com.rabbitmq.client.ConnectionFactory;
      
      public class RabbitMqUtils {
          public static Channel getChannel() throws Exception {
              // 创建连接工厂
              ConnectionFactory factory = new ConnectionFactory();
      
              // 配置
              factory.setHost("192.168.3.202");
              factory.setPort(5672);
              factory.setUsername("admin");
              factory.setPassword("123456");
              factory.setVirtualHost("/");
      
              // 获取连接
              Connection connection = factory.newConnection();
      
              // 获取信道
              Channel channel = connection.createChannel();
      
              return channel;
          }
      }
      
      
    • 初始化:Init
      package cn.cnyasin.rabbit.worker;
      
      import cn.cnyasin.rabbit.utils.RabbitMqUtils;
      import com.rabbitmq.client.Channel;
      
      public class Init {
          // 交换机名
          public static final String EXCHANGE_NAME = "exchange01";
      
          // 队列名
          public static final String QUEUE_NAME = "queue01";
      
          // 路由key
          public static final String ROUTING_KEY = "routing01";
      
          public static void main(String[] args) throws Exception {
              // 获取信道
              Channel channel = RabbitMqUtils.getChannel();
      
              // 声明交换机
              channel.exchangeDeclare(EXCHANGE_NAME, "direct", true);
      
              // 声明队列
              channel.queueDeclare(QUEUE_NAME, true, false, false, null);
      
              // 绑定队列、交换机、路由key
              channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);
      
              System.out.println("初始化成功。。。");
      
              System.exit(0);
          }
      }
      
      
    • 工作队列01(线程一):Worker01
      package cn.cnyasin.rabbit.worker;
      
      import cn.cnyasin.rabbit.utils.RabbitMqUtils;
      import com.rabbitmq.client.Channel;
      import com.rabbitmq.client.Delivery;
      
      /**
       * 工作队列01(消费者)
       */
      public class Worker01 {
      
          // 定义本工作队列要处理的队列名字
          public static final String QUEUE_NAME = "queue01";
      
          public static void main(String[] args) throws Exception {
              // 获取信道
              Channel channel = RabbitMqUtils.getChannel();
      
              System.out.println("[*] 工作队列01正在等待接收消息。。。");
      
              // 处理消息
              channel.basicConsume(
                      // 队列名
                      QUEUE_NAME,
                      // 自动应答
                      true,
                      // 成功处理消息回调
                      (String consumerTag, Delivery message) -> {
                          // TODO 业务逻辑代码在这里
                          System.out.println(" [*] 成功处理消息:" + new String(message.getBody()));
                      },
                      // 处理消息失败回调
                      (String consumerTag) -> {
                          System.out.println("处理消息失败");
                      }
              );
          }
      }
      
      
    • 工作队列02(线程二):Worker02
      • 方式一:将上面Worker01代码复制一份,名字改为Worker02,然后运行
      • 方式二:idea支持同一个run方法并行运行:
        • idea | Run | Edit Configurations | Allow parallel run
    • 生产者:Task
      package cn.cnyasin.rabbit.worker;
      
      import cn.cnyasin.rabbit.utils.RabbitMqUtils;
      import com.rabbitmq.client.Channel;
      
      import java.util.Scanner;
      
      public class Task {
          // 交换机名
          public static final String EXCHANGE_NAME = "exchange01";
      
          // 路由key
          public static final String ROUTING_KEY = "routing01";
      
          public static void main(String[] args) throws Exception {
              // 获取信道
              Channel channel = RabbitMqUtils.getChannel();
      
              // 接收控制台输入
              Scanner scanner = new Scanner(System.in);
      
              System.out.println("[*] 等待控制台输入消息内容。。。");
      
              while (scanner.hasNext()) {
                  String input = scanner.next();
      
                  if (input.equals("exit")) {
                      break;
                  }
      
                  // 发送消息
                  channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, null, input.getBytes());
      
                  System.out.println("[*] 消息发送成功。。。");
              }
      
              channel.close();
      
              System.out.println("[*] 用户退出。。。");
      
              System.exit(0);
          }
      }
      
      
  • 运行初始化:
    • Init -> main -> run
  • 开启两个工作队列:
    • Worker01 -> main -> run
    • Worker02 -> main -> run
  • 运行生产者:
    • Task -> main -> run

》工作队列案例(手动应答与自动重新入队)

  • 说明
    • 为了防止消息丢失,往往都使用手动应答机制
    • 如果消息处理失败,自动重新入队
    • 代码还是基于上一节:工作队列案例(自动应答)
  • 代码组成
    • RabbitMQ工具类:RabbitMqUtils
      • 代码同:工作队列案例(自动应答)
    • 初始化:Init
      • 代码同:工作队列案例(自动应答)
    • 工作队列01(线程一):Worker01
      package cn.cnyasin.rabbit.ack;
      
      import cn.cnyasin.rabbit.utils.RabbitMqUtils;
      import com.rabbitmq.client.Channel;
      import com.rabbitmq.client.Delivery;
      
      /**
       * 工作队列01(消费者)
       */
      public class Worker01 {
      
          // 定义本工作队列要处理的队列名字
          public static final String QUEUE_NAME = "queue_ack";
      
          public static void main(String[] args) throws Exception {
              // 获取信道
              Channel channel = RabbitMqUtils.getChannel();
      
              System.out.println("[*] 工作队列01正在等待接收消息。。。");
      
              // 处理消息
              channel.basicConsume(
                      // 队列名
                      QUEUE_NAME,
                      // 自动应答
                      false,
                      // 成功处理消息回调
                      (String consumerTag, Delivery message) -> {
                          try {
                          	// 沉睡1秒
                              Thread.sleep(1000 * 1);
                          } catch (InterruptedException e) {
                              e.printStackTrace();
                          }
      
                          // TODO 业务逻辑代码在这里
                          System.out.println(" [*] 工作队列01成功处理消息:" + new String(message.getBody()));
      
                          // TODO 手动应答
                          channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
                      },
                      // 处理消息失败回调
                      (String consumerTag) -> {
                          System.out.println("处理消息失败");
                      }
              );
          }
      }
      
      
    • 工作队列02(线程二):Worker02
      package cn.cnyasin.rabbit.ack;
      
      import cn.cnyasin.rabbit.utils.RabbitMqUtils;
      import com.rabbitmq.client.Channel;
      import com.rabbitmq.client.Delivery;
      
      /**
       * 工作队列02(消费者)
       */
      public class Worker02 {
      
          // 定义本工作队列要处理的队列名字
          public static final String QUEUE_NAME = "queue_ack";
      
          public static void main(String[] args) throws Exception {
              // 获取信道
              Channel channel = RabbitMqUtils.getChannel();
      
              System.out.println("[*] 工作队列02正在等待接收消息。。。");
      
              // 处理消息
              channel.basicConsume(
                      // 队列名
                      QUEUE_NAME,
                      // 自动应答
                      true,
                      // 成功处理消息回调
                      (String consumerTag, Delivery message) -> {
                          try {
                          	// 沉睡10秒
                              Thread.sleep(1000 * 10);
                          } catch (InterruptedException e) {
                              e.printStackTrace();
                          }
      
                          // TODO 业务逻辑代码在这里
                          System.out.println(" [*] 工作队列02成功处理消息:" + new String(message.getBody()));
      
                          // TODO 手动应答
                          channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
                      },
                      // 处理消息失败回调
                      (String consumerTag) -> {
                          System.out.println("处理消息失败");
                      }
              );
          }
      }
      
      
    • 生产者:Task
      • 代码同:工作队列案例(自动应答)
  • 运行初始化:
    • Init -> main -> run
  • 开启两个工作队列:
    • Worker01 -> main -> run
    • Worker02 -> main -> run
  • 运行生产者:
    • Task -> main -> run

》消息持久化

  • 说明
    • 代码借用上一节代码,在此基础上只需要在生产者(Task)代码中进行以下改动
      • 发送消息的时候,第三个参数改为:MessageProperties.PERSISTENT_TEXT_PLAIN,即可使消息持久化存储,即使RabbitMQ宕机消息也不会丢失。
  • 核心代码
    // 消息持久化
    AMQP.BasicProperties properties = MessageProperties.PERSISTENT_TEXT_PLAIN;
    
    // 发送消息
    channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, properties, input.getBytes());
    

》不公平分发

  • 说明

    • 代码借用上一节代码,在此基础上只需要在消费者(Worker01、Worker02)代码中进行以下改动
      • 处理消息之前,信道设置一下:channel.basicQos()
      • 该函数接收一个int类型值(0=公平分发,大于0=不公平分发,默认0),该值的意思是预取值,代表该工作线程一次取的消息数量
  • 核心代码

    // 设置不公平分发
    int prefetchCount = 1; // 预取值
    channel.basicQos(prefetchCount);
    

备注

  • 该教程部分内容收集自网络,感谢原作者。

附录

 类似资料: