当前位置: 首页 > 工具软件 > RabbitMQ-CN > 使用案例 >

RabbitMQ-Java-04-发布订阅模式

彭鸿彩
2023-12-01

说明

  • RabbitMQ-Java-04-发布订阅模式
  • 本案例是一个Maven项目
  • 假设你已经实现了上一节工作队列
  • 官方文档已包含绝大多数本案例内容。请移步:https://docs.spring.io/spring-amqp/docs/current/reference/html/

核心概念

》原理

  • 发布订阅模式核心是交换机Exchanges
  • 当通过信道(channel)绑定了交换机(exchange)、队列(queue)、路由key(routing_key),就实现了发布订阅模式
  • 之前章节默认我已经使用了发布订阅模式,正式使用往往都是使用发布订阅模式。如果不想使用发布订阅模式可以在发送消息的时候交换机传空字符串,路由key传队列名即可,这样消息默认走的都是默认交换机。
    # 使用发布订阅模式
    channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, null, msg.getBytes("UTF-8"));
    # 不使用发布订阅模式
    channel.basicPublish("", QUEUE_NAME, null, msg.getBytes("UTF-8"));
    
  • 交换机的类型
    • 扇出模式:fanout
      • 交换机消息转发规则
        • 广播到跟他绑定的所有队列
      • 注意:这里是所有不同的队列。如果是同一个队列多个线程同时run(),最终还是会轮询分发到每个线程。
      • 扇出交换机无视路由key。只要是该扇出交换机绑定的队列,不管有没有绑定路由key,所有消息都能收到。
      • 核心代码提取
        // 声明交换机(扇出模式)
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout", true);
        
    • 直接模式:direct
      • 交换机消息转发规则
        • 匹配跟交换机绑定的队列
        • 匹配队列绑定的路由key等于消息指定的路由key
      • 直接交换机会严格判断消息绑定的路由和队列绑定的路由是否匹配
      • 一个队列可以绑定多个路由key
      • 核心代码抽取
        // 声明交换机(直接模式)
        channel.exchangeDeclare(EXCHANGE_NAME, "direct", true);
        
    • 主题模式:topic
      • 交换机消息转发规则
        • 匹配跟交换机绑定的队列
        • 匹配队列绑定的路由key经过模糊匹配后等于消息指定的路由key
      • 主题模式路由模糊匹配规则
        • 语法案例
          aa.bb.cc
          
          • 英文句号表示词语连接符
          • 英文句号连接的字符表示一个词语,词语可以是字母、数字、中文,每个词语最少1个字符
        • 匹配符号
          • *:星号匹配一个词语
          • #:井号匹配零个或多个词语
        • 语法案例匹配规则举例
          # 以下匹配规则都能匹配到上边的语法案例:aa.bb.cc
          *.bb.*
          aa.bb.*
          *.*.cc
          aa.#
          #.cc
          #
          aa.bb.cc
          
        • 注意:路由长度最长255个字节
        • 当一个队列绑定的路由key是#,那么这个队列将接受所有数据,等价于fanout
        • 当一个队列绑定的路由key没有#和*,那么这个队列等价于direct
    • 标题模式:headers / match
  • 如何生成一个临时队列?
    String queueName = channel.queueDeclare().getQueue()
    

操作步骤

》完整代码

  • 工具类:RabbitMqUtils
    package cn.cnyasin.rabbit.utils;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    
    public class RabbitMqUtils {
        public static Channel getChannel() throws Exception {
            // 创建连接工厂
            ConnectionFactory factory = new ConnectionFactory();
    
            // 配置
            factory.setHost("192.168.3.202");
            factory.setPort(5672);
            factory.setUsername("admin");
            factory.setPassword("123456");
            factory.setVirtualHost("/");
    
            // 获取连接
            Connection connection = factory.newConnection();
    
            // 获取信道
            Channel channel = connection.createChannel();
    
            return channel;
        }
    }
    
    
  • 扇出模式:fanout
    • Subscribe01
      package cn.cnyasin.rabbit.publish.fanout;
      
      import cn.cnyasin.rabbit.utils.RabbitMqUtils;
      import com.rabbitmq.client.CancelCallback;
      import com.rabbitmq.client.Channel;
      import com.rabbitmq.client.DeliverCallback;
      import com.rabbitmq.client.Delivery;
      
      public class Subscribe01 {
          // 交换机名
          public static final String EXCHANGE_NAME = "publish_subscribe_exchange";
      
          // 队列名
          public static final String QUEUE_NAME = "publish_subscribe_queue_01";
      
          // 路由key名
          // public static final String ROUTING_KEY = "publish_subscribe_routing_01";
      
          // 信道
          public static Channel channel = null;
      
          public static void main(String[] args) throws Exception {
              // 初始化
              init();
      
              Channel channel = getChannel();
      
              // 接收消息成功回调
              DeliverCallback deliverCallback = (String consumerTag, Delivery message) -> {
                  System.out.println("[*] 成功收到消息:" + new String(message.getBody()));
              };
              // 接收消息失败回调
              CancelCallback cancelCallback = (String consumerTag) -> {
                  System.out.println("[*] 接收消息失败");
              };
      
              System.out.println("[*] Sub1正在等待接收消息。。。");
      
              // 接收消息
              channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);
          }
      
          /**
           * 初始化
           *
           * @throws Exception
           */
          public static void init() throws Exception {
              Channel channel = getChannel();
      
              // 声明交换机
              channel.exchangeDeclare(EXCHANGE_NAME, "fanout", true);
      
              // 声明队列
              channel.queueDeclare(QUEUE_NAME, true, false, false, null);
      
              // 绑定队列、交换机、路由key
              channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ""); // 扇出模式路由key无效,随便绑
          }
      
          public static Channel getChannel() throws Exception {
              if (channel == null) {
                  setChannel(RabbitMqUtils.getChannel());
              }
      
              return channel;
          }
      
          public static void setChannel(Channel channel) {
              Subscribe01.channel = channel;
          }
      }
      
      
    • Subscribe02
      package cn.cnyasin.rabbit.publish.fanout;
      
      import cn.cnyasin.rabbit.utils.RabbitMqUtils;
      import com.rabbitmq.client.CancelCallback;
      import com.rabbitmq.client.Channel;
      import com.rabbitmq.client.DeliverCallback;
      import com.rabbitmq.client.Delivery;
      
      public class Subscribe02 {
          // 交换机名
          public static final String EXCHANGE_NAME = "publish_subscribe_exchange";
      
          // 队列名
          public static final String QUEUE_NAME = "publish_subscribe_queue_02";
      
          // 路由key名
          // public static final String ROUTING_KEY = "publish_subscribe_routing_02";
      
          // 信道
          public static Channel channel = null;
      
          public static void main(String[] args) throws Exception {
              // 初始化
              init();
      
              Channel channel = getChannel();
      
              // 接收消息成功回调
              DeliverCallback deliverCallback = (String consumerTag, Delivery message) -> {
                  System.out.println("[*] 成功收到消息:" + new String(message.getBody()));
              };
              // 接收消息失败回调
              CancelCallback cancelCallback = (String consumerTag) -> {
                  System.out.println("[*] 接收消息失败");
              };
      
              System.out.println("[*] Sub2正在等待接收消息。。。");
      
              // 接收消息
              channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);
          }
      
          /**
           * 初始化
           *
           * @throws Exception
           */
          public static void init() throws Exception {
              Channel channel = getChannel();
      
              // 声明交换机
              channel.exchangeDeclare(EXCHANGE_NAME, "fanout", true);
      
              // 声明队列
              channel.queueDeclare(QUEUE_NAME, true, false, false, null);
      
              // 绑定队列、交换机、路由key
              channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ""); // 扇出模式路由key无效,随便绑
          }
      
          public static Channel getChannel() throws Exception {
              if (channel == null) {
                  setChannel(RabbitMqUtils.getChannel());
              }
      
              return channel;
          }
      
          public static void setChannel(Channel channel) {
              Subscribe02.channel = channel;
          }
      }
      
      
    • Publish
      package cn.cnyasin.rabbit.publish.fanout;
      
      import cn.cnyasin.rabbit.utils.RabbitMqUtils;
      import com.rabbitmq.client.Channel;
      
      import java.util.Scanner;
      
      public class Publish {
          // 交换机名
          public static final String EXCHANGE_NAME = "publish_subscribe_exchange";
      
          // 路由key名
          // public static final String ROUTING_KEY = "publish_subscribe_routing_01";
      
          // 信道
          public static Channel channel = null;
      
          public static void main(String[] args) throws Exception {
              // 信道
              Channel channel = getChannel();
      
              System.out.println("[*] 正在等待控制台输入消息");
      
              Scanner scanner = new Scanner(System.in);
              while (scanner.hasNext()) {
                  String input = scanner.next();
                  // 发消息
                  channel.basicPublish(EXCHANGE_NAME, "", null, input.getBytes("UTF-8"));
      
                  // 扇出模式就算绑定了非空路由key,所有队列也能接收到消息
                  // channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, null, input.getBytes("UTF-8"));
      
                  System.out.println("[*] 消息发送成功");
              }
          }
      
          public static Channel getChannel() throws Exception {
              if (channel == null) {
                  setChannel(RabbitMqUtils.getChannel());
              }
      
              return channel;
          }
      
          public static void setChannel(Channel channel) {
              Publish.channel = channel;
          }
      }
      
      
  • 直接模式:direct
    • Subscribe01
      package cn.cnyasin.rabbit.publish.direct;
      
      import cn.cnyasin.rabbit.utils.RabbitMqUtils;
      import com.rabbitmq.client.CancelCallback;
      import com.rabbitmq.client.Channel;
      import com.rabbitmq.client.DeliverCallback;
      import com.rabbitmq.client.Delivery;
      
      public class Subscribe01 {
          // 交换机名
          public static final String EXCHANGE_NAME = "publish_subscribe_exchange";
      
          // 队列名
          public static final String QUEUE_NAME = "publish_subscribe_queue_01";
      
          // 路由key名
          public static final String ROUTING_KEY = "publish_subscribe_routing_01";
      
          // 信道
          public static Channel channel = null;
      
          public static void main(String[] args) throws Exception {
              // 初始化
              init();
      
              Channel channel = getChannel();
      
              // 接收消息成功回调
              DeliverCallback deliverCallback = (String consumerTag, Delivery message) -> {
                  System.out.println("[*] 成功收到消息:" + new String(message.getBody()));
              };
              // 接收消息失败回调
              CancelCallback cancelCallback = (String consumerTag) -> {
                  System.out.println("[*] 接收消息失败");
              };
      
              System.out.println("[*] Sub1正在等待接收消息。。。");
      
              // 接收消息
              channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);
          }
      
          /**
           * 初始化
           *
           * @throws Exception
           */
          public static void init() throws Exception {
              Channel channel = getChannel();
      
              // 声明交换机
              channel.exchangeDeclare(EXCHANGE_NAME, "direct", true);
      
              // 声明队列
              channel.queueDeclare(QUEUE_NAME, true, false, false, null);
      
              // 绑定队列、交换机、路由key
              channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);
          }
      
          public static Channel getChannel() throws Exception {
              if (channel == null) {
                  setChannel(RabbitMqUtils.getChannel());
              }
      
              return channel;
          }
      
          public static void setChannel(Channel channel) {
              Subscribe01.channel = channel;
          }
      }
      
      
    • Subscribe02
      package cn.cnyasin.rabbit.publish.direct;
      
      import cn.cnyasin.rabbit.utils.RabbitMqUtils;
      import com.rabbitmq.client.CancelCallback;
      import com.rabbitmq.client.Channel;
      import com.rabbitmq.client.DeliverCallback;
      import com.rabbitmq.client.Delivery;
      
      public class Subscribe02 {
          // 交换机名
          public static final String EXCHANGE_NAME = "publish_subscribe_exchange";
      
          // 队列名
          public static final String QUEUE_NAME = "publish_subscribe_queue_02";
      
          // 路由key名
          public static final String ROUTING_KEY = "publish_subscribe_routing_02";
      
          // 信道
          public static Channel channel = null;
      
          public static void main(String[] args) throws Exception {
              // 初始化
              init();
      
              Channel channel = getChannel();
      
              // 接收消息成功回调
              DeliverCallback deliverCallback = (String consumerTag, Delivery message) -> {
                  System.out.println("[*] 成功收到消息:" + new String(message.getBody()));
              };
              // 接收消息失败回调
              CancelCallback cancelCallback = (String consumerTag) -> {
                  System.out.println("[*] 接收消息失败");
              };
      
              System.out.println("[*] Sub2正在等待接收消息。。。");
      
              // 接收消息
              channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);
          }
      
          /**
           * 初始化
           *
           * @throws Exception
           */
          public static void init() throws Exception {
              Channel channel = getChannel();
      
              // 声明交换机
              channel.exchangeDeclare(EXCHANGE_NAME, "direct", true);
      
              // 声明队列
              channel.queueDeclare(QUEUE_NAME, true, false, false, null);
      
              // 绑定队列、交换机、路由key
              channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);
          }
      
          public static Channel getChannel() throws Exception {
              if (channel == null) {
                  setChannel(RabbitMqUtils.getChannel());
              }
      
              return channel;
          }
      
          public static void setChannel(Channel channel) {
              Subscribe02.channel = channel;
          }
      }
      
      
    • Publish
      package cn.cnyasin.rabbit.publish.direct;
      
      import cn.cnyasin.rabbit.utils.RabbitMqUtils;
      import com.rabbitmq.client.Channel;
      
      import java.util.Scanner;
      
      public class Publish {
          // 交换机名
          public static final String EXCHANGE_NAME = "publish_subscribe_exchange";
      
          // 路由key名
          public static final String ROUTING_KEY = "publish_subscribe_routing_01";
          // public static final String ROUTING_KEY = "publish_subscribe_routing_02";
      
          // 信道
          public static Channel channel = null;
      
          public static void main(String[] args) throws Exception {
              // 信道
              Channel channel = getChannel();
      
              System.out.println("[*] 正在等待控制台输入消息");
      
              Scanner scanner = new Scanner(System.in);
              while (scanner.hasNext()) {
                  String input = scanner.next();
                  // 发消息
                  channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, null, input.getBytes("UTF-8"));
      
                  System.out.println("[*] 消息发送成功");
              }
          }
      
          public static Channel getChannel() throws Exception {
              if (channel == null) {
                  setChannel(RabbitMqUtils.getChannel());
              }
      
              return channel;
          }
      
          public static void setChannel(Channel channel) {
              Publish.channel = channel;
          }
      }
      
      
  • 主题模式:topic
    • Subscribe01
      package cn.cnyasin.rabbit.publish.topic;
      
      import cn.cnyasin.rabbit.utils.RabbitMqUtils;
      import com.rabbitmq.client.CancelCallback;
      import com.rabbitmq.client.Channel;
      import com.rabbitmq.client.DeliverCallback;
      import com.rabbitmq.client.Delivery;
      
      public class Subscribe01 {
          // 交换机名
          public static final String EXCHANGE_NAME = "publish_subscribe_exchange";
      
          // 队列名
          public static final String QUEUE_NAME = "publish_subscribe_queue_01";
      
          // 路由key名
          public static final String ROUTING_KEY_01 = "*.bb.*";
          public static final String ROUTING_KEY_02 = "dd.#";
      
          // 信道
          public static Channel channel = null;
      
          public static void main(String[] args) throws Exception {
              // 初始化
              init();
      
              Channel channel = getChannel();
      
              // 接收消息成功回调
              DeliverCallback deliverCallback = (String consumerTag, Delivery message) -> {
                  System.out.println("[*] 成功收到消息:" + new String(message.getBody()));
              };
              // 接收消息失败回调
              CancelCallback cancelCallback = (String consumerTag) -> {
                  System.out.println("[*] 接收消息失败");
              };
      
              System.out.println("[*] Sub1正在等待接收消息。。。");
      
              // 接收消息
              channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);
          }
      
          /**
           * 初始化
           *
           * @throws Exception
           */
          public static void init() throws Exception {
              Channel channel = getChannel();
      
              // 声明交换机
              channel.exchangeDeclare(EXCHANGE_NAME, "topic", true);
      
              // 声明队列
              channel.queueDeclare(QUEUE_NAME, true, false, false, null);
      
              // 绑定队列、交换机、路由key
              channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY_01);
              channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY_02);
          }
      
          public static Channel getChannel() throws Exception {
              if (channel == null) {
                  setChannel(RabbitMqUtils.getChannel());
              }
      
              return channel;
          }
      
          public static void setChannel(Channel channel) {
              Subscribe01.channel = channel;
          }
      }
      
      
    • Subscribe02
      package cn.cnyasin.rabbit.publish.topic;
      
      import cn.cnyasin.rabbit.utils.RabbitMqUtils;
      import com.rabbitmq.client.CancelCallback;
      import com.rabbitmq.client.Channel;
      import com.rabbitmq.client.DeliverCallback;
      import com.rabbitmq.client.Delivery;
      
      public class Subscribe02 {
          // 交换机名
          public static final String EXCHANGE_NAME = "publish_subscribe_exchange";
      
          // 队列名
          public static final String QUEUE_NAME = "publish_subscribe_queue_02";
      
          // 路由key名
          public static final String ROUTING_KEY = "aa.#";
      
          // 信道
          public static Channel channel = null;
      
          public static void main(String[] args) throws Exception {
              // 初始化
              init();
      
              Channel channel = getChannel();
      
              // 接收消息成功回调
              DeliverCallback deliverCallback = (String consumerTag, Delivery message) -> {
                  System.out.println("[*] 成功收到消息:" + new String(message.getBody()));
              };
              // 接收消息失败回调
              CancelCallback cancelCallback = (String consumerTag) -> {
                  System.out.println("[*] 接收消息失败");
              };
      
              System.out.println("[*] Sub2正在等待接收消息。。。");
      
              // 接收消息
              channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);
          }
      
          /**
           * 初始化
           *
           * @throws Exception
           */
          public static void init() throws Exception {
              Channel channel = getChannel();
      
              // 声明交换机
              channel.exchangeDeclare(EXCHANGE_NAME, "topic", true);
      
              // 声明队列
              channel.queueDeclare(QUEUE_NAME, true, false, false, null);
      
              // 绑定队列、交换机、路由key
              channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);
          }
      
          public static Channel getChannel() throws Exception {
              if (channel == null) {
                  setChannel(RabbitMqUtils.getChannel());
              }
      
              return channel;
          }
      
          public static void setChannel(Channel channel) {
              Subscribe02.channel = channel;
          }
      }
      
      
    • Publish
      package cn.cnyasin.rabbit.publish.topic;
      
      import cn.cnyasin.rabbit.utils.RabbitMqUtils;
      import com.rabbitmq.client.Channel;
      
      import java.util.Scanner;
      
      public class Publish {
          // 交换机名
          public static final String EXCHANGE_NAME = "publish_subscribe_exchange";
      
          // 路由key名
          public static final String ROUTING_KEY = "aa.bb.cc";
          // public static final String ROUTING_KEY = "dd.ee.ff";
      
          // 信道
          public static Channel channel = null;
      
          public static void main(String[] args) throws Exception {
              // 信道
              Channel channel = getChannel();
      
              System.out.println("[*] 正在等待控制台输入消息");
      
              Scanner scanner = new Scanner(System.in);
              while (scanner.hasNext()) {
                  String input = scanner.next();
                  // 发消息
                  channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, null, input.getBytes("UTF-8"));
      
                  System.out.println("[*] 消息发送成功");
              }
          }
      
          public static Channel getChannel() throws Exception {
              if (channel == null) {
                  setChannel(RabbitMqUtils.getChannel());
              }
      
              return channel;
          }
      
          public static void setChannel(Channel channel) {
              Publish.channel = channel;
          }
      }
      
      

备注

  • 该教程部分内容收集自网络,感谢原作者。

附录

 类似资料: