// With publish/subscribe: publish to a named exchange together with a routing key.
channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, null, msg.getBytes("UTF-8"));
// Without publish/subscribe: publish to the default exchange (""), passing the
// queue name in the routing-key position.
channel.basicPublish("", QUEUE_NAME, null, msg.getBytes("UTF-8"));
// Declare the exchange (fanout type); the third argument marks it durable.
channel.exchangeDeclare(EXCHANGE_NAME, "fanout", true);
// Declare the exchange (direct type); the third argument marks it durable.
// NOTE(review): this and the fanout declaration above are alternatives for the same
// exchange name -- presumably only one of them is used at a time; confirm.
channel.exchangeDeclare(EXCHANGE_NAME, "direct", true);
aa.bb.cc
# 以下匹配规则都能匹配到上边的语法案例:aa.bb.cc
*.bb.*
aa.bb.*
*.*.cc
aa.#
#.cc
#
aa.bb.cc
String queueName = channel.queueDeclare().getQueue()
package cn.cnyasin.rabbit.utils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
 * Helper used by the demo publishers and subscribers: opens a RabbitMQ connection
 * with fixed, hard-coded settings and returns a channel on it.
 */
public class RabbitMqUtils {
    /**
     * Opens a new connection to the broker and creates a channel on it.
     *
     * <p>Every call opens its own connection. The connection is not returned, so a
     * caller that wants to release it has to go through
     * {@code channel.getConnection()}.
     *
     * @return a channel on a newly opened connection
     * @throws Exception if the connection cannot be opened or the channel cannot be created
     */
    public static Channel getChannel() throws Exception {
        // Connection factory with the demo broker's settings.
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.3.202");
        factory.setPort(5672);
        factory.setUsername("admin");
        factory.setPassword("123456");
        factory.setVirtualHost("/");

        Connection connection = factory.newConnection();
        try {
            return connection.createChannel();
        } catch (Exception e) {
            // The caller never sees the connection, so close it here instead of
            // leaking it when channel creation fails.
            try {
                connection.close();
            } catch (Exception closeFailure) {
                e.addSuppressed(closeFailure);
            }
            throw e;
        }
    }
}
package cn.cnyasin.rabbit.publish.fanout;
import cn.cnyasin.rabbit.utils.RabbitMqUtils;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.client.Delivery;
/**
 * Fanout demo, subscriber 1: declares the fanout exchange and its own queue, binds
 * them, and prints every message delivered to the queue.
 */
public class Subscribe01 {
    // Exchange name
    public static final String EXCHANGE_NAME = "publish_subscribe_exchange";
    // Queue name
    public static final String QUEUE_NAME = "publish_subscribe_queue_01";
    // Routing key name (not used by the fanout demo)
    // public static final String ROUTING_KEY = "publish_subscribe_routing_01";
    // Channel, created on first use by getChannel()
    public static Channel channel = null;

    /**
     * Sets up the exchange/queue/binding and starts consuming with auto-ack.
     * Nothing is closed here; the consumer is meant to keep running.
     *
     * @param args unused
     * @throws Exception if any broker operation fails
     */
    public static void main(String[] args) throws Exception {
        init();
        Channel consumerChannel = getChannel();

        // Delivery callback. The publisher encodes messages as UTF-8, so decode with
        // the same charset instead of the platform default.
        DeliverCallback deliverCallback = (String consumerTag, Delivery message) -> {
            System.out.println("[*] 成功收到消息:" + new String(message.getBody(), "UTF-8"));
        };
        // Cancel callback: invoked when the consumer is cancelled by the broker
        // (for example because the queue was deleted), not on a per-message failure.
        CancelCallback cancelCallback = (String consumerTag) -> {
            System.out.println("[*] 接收消息失败");
        };

        System.out.println("[*] Sub1正在等待接收消息。。。");
        // Second argument true = automatic acknowledgement.
        consumerChannel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);
    }

    /**
     * Declares the durable fanout exchange and the durable queue, then binds them.
     *
     * @throws Exception if any declaration or the binding fails
     */
    public static void init() throws Exception {
        Channel setupChannel = getChannel();
        setupChannel.exchangeDeclare(EXCHANGE_NAME, "fanout", true);
        // durable = true, exclusive = false, autoDelete = false, no extra arguments
        setupChannel.queueDeclare(QUEUE_NAME, true, false, false, null);
        // A fanout exchange ignores the routing key, so an empty one is bound.
        setupChannel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
    }

    /**
     * Returns the cached channel, creating it through {@code RabbitMqUtils} on first call.
     *
     * @return the channel held in the static field
     * @throws Exception if the channel has to be created and that fails
     */
    public static Channel getChannel() throws Exception {
        if (channel == null) {
            setChannel(RabbitMqUtils.getChannel());
        }
        return channel;
    }

    /**
     * Replaces the cached channel.
     *
     * @param channel the channel to store in the static field
     */
    public static void setChannel(Channel channel) {
        Subscribe01.channel = channel;
    }
}
package cn.cnyasin.rabbit.publish.fanout;
import cn.cnyasin.rabbit.utils.RabbitMqUtils;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.client.Delivery;
/**
 * Fanout demo, subscriber 2: declares the fanout exchange and its own queue, binds
 * them, and prints every message delivered to the queue.
 */
public class Subscribe02 {
    // Exchange name
    public static final String EXCHANGE_NAME = "publish_subscribe_exchange";
    // Queue name
    public static final String QUEUE_NAME = "publish_subscribe_queue_02";
    // Routing key name (not used by the fanout demo)
    // public static final String ROUTING_KEY = "publish_subscribe_routing_02";
    // Channel, created on first use by getChannel()
    public static Channel channel = null;

    /**
     * Sets up the exchange/queue/binding and starts consuming with auto-ack.
     * Nothing is closed here; the consumer is meant to keep running.
     *
     * @param args unused
     * @throws Exception if any broker operation fails
     */
    public static void main(String[] args) throws Exception {
        init();
        Channel consumerChannel = getChannel();

        // Delivery callback. The publisher encodes messages as UTF-8, so decode with
        // the same charset instead of the platform default.
        DeliverCallback deliverCallback = (String consumerTag, Delivery message) -> {
            System.out.println("[*] 成功收到消息:" + new String(message.getBody(), "UTF-8"));
        };
        // Cancel callback: invoked when the consumer is cancelled by the broker
        // (for example because the queue was deleted), not on a per-message failure.
        CancelCallback cancelCallback = (String consumerTag) -> {
            System.out.println("[*] 接收消息失败");
        };

        System.out.println("[*] Sub2正在等待接收消息。。。");
        // Second argument true = automatic acknowledgement.
        consumerChannel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);
    }

    /**
     * Declares the durable fanout exchange and the durable queue, then binds them.
     *
     * @throws Exception if any declaration or the binding fails
     */
    public static void init() throws Exception {
        Channel setupChannel = getChannel();
        setupChannel.exchangeDeclare(EXCHANGE_NAME, "fanout", true);
        // durable = true, exclusive = false, autoDelete = false, no extra arguments
        setupChannel.queueDeclare(QUEUE_NAME, true, false, false, null);
        // A fanout exchange ignores the routing key, so an empty one is bound.
        setupChannel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
    }

    /**
     * Returns the cached channel, creating it through {@code RabbitMqUtils} on first call.
     *
     * @return the channel held in the static field
     * @throws Exception if the channel has to be created and that fails
     */
    public static Channel getChannel() throws Exception {
        if (channel == null) {
            setChannel(RabbitMqUtils.getChannel());
        }
        return channel;
    }

    /**
     * Replaces the cached channel.
     *
     * @param channel the channel to store in the static field
     */
    public static void setChannel(Channel channel) {
        Subscribe02.channel = channel;
    }
}
package cn.cnyasin.rabbit.publish.fanout;
import cn.cnyasin.rabbit.utils.RabbitMqUtils;
import com.rabbitmq.client.Channel;
import java.util.Scanner;
/**
 * Fanout demo publisher: reads whitespace-separated tokens from standard input and
 * publishes each one to the fanout exchange.
 */
public class Publish {
    // Exchange name
    public static final String EXCHANGE_NAME = "publish_subscribe_exchange";
    // Routing key name (not used by the fanout demo)
    // public static final String ROUTING_KEY = "publish_subscribe_routing_01";
    // Channel, created on first use by getChannel()
    public static Channel channel = null;

    /**
     * Publishes every token typed on the console until standard input ends, then
     * closes the broker connection so the process can exit.
     *
     * @param args unused
     * @throws Exception if obtaining the channel, publishing, or closing fails
     */
    public static void main(String[] args) throws Exception {
        Channel publishChannel = getChannel();
        try {
            System.out.println("[*] 正在等待控制台输入消息");
            Scanner scanner = new Scanner(System.in);
            while (scanner.hasNext()) {
                String input = scanner.next();
                // Publish with an empty routing key.
                publishChannel.basicPublish(EXCHANGE_NAME, "", null, input.getBytes("UTF-8"));
                // With a fanout exchange every bound queue receives the message even
                // when a non-empty routing key is used:
                // publishChannel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, null, input.getBytes("UTF-8"));
                System.out.println("[*] 消息发送成功");
            }
        } finally {
            // Drop the cached channel and release the connection; an open connection
            // keeps client threads running after main returns.
            setChannel(null);
            if (publishChannel.getConnection().isOpen()) {
                publishChannel.getConnection().close();
            }
        }
    }

    /**
     * Returns the cached channel, creating it through {@code RabbitMqUtils} on first call.
     *
     * @return the channel held in the static field
     * @throws Exception if the channel has to be created and that fails
     */
    public static Channel getChannel() throws Exception {
        if (channel == null) {
            setChannel(RabbitMqUtils.getChannel());
        }
        return channel;
    }

    /**
     * Replaces the cached channel.
     *
     * @param channel the channel to store in the static field
     */
    public static void setChannel(Channel channel) {
        Publish.channel = channel;
    }
}
package cn.cnyasin.rabbit.publish.direct;
import cn.cnyasin.rabbit.utils.RabbitMqUtils;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.client.Delivery;
/**
 * Direct demo, subscriber 1: declares the direct exchange and its own queue, binds
 * the queue with {@link #ROUTING_KEY}, and prints every message delivered to it.
 */
public class Subscribe01 {
    // Exchange name
    public static final String EXCHANGE_NAME = "publish_subscribe_exchange";
    // Queue name
    public static final String QUEUE_NAME = "publish_subscribe_queue_01";
    // Routing key name
    public static final String ROUTING_KEY = "publish_subscribe_routing_01";
    // Channel, created on first use by getChannel()
    public static Channel channel = null;

    /**
     * Sets up the exchange/queue/binding and starts consuming with auto-ack.
     * Nothing is closed here; the consumer is meant to keep running.
     *
     * @param args unused
     * @throws Exception if any broker operation fails
     */
    public static void main(String[] args) throws Exception {
        init();
        Channel consumerChannel = getChannel();

        // Delivery callback. The publisher encodes messages as UTF-8, so decode with
        // the same charset instead of the platform default.
        DeliverCallback deliverCallback = (String consumerTag, Delivery message) -> {
            System.out.println("[*] 成功收到消息:" + new String(message.getBody(), "UTF-8"));
        };
        // Cancel callback: invoked when the consumer is cancelled by the broker
        // (for example because the queue was deleted), not on a per-message failure.
        CancelCallback cancelCallback = (String consumerTag) -> {
            System.out.println("[*] 接收消息失败");
        };

        System.out.println("[*] Sub1正在等待接收消息。。。");
        // Second argument true = automatic acknowledgement.
        consumerChannel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);
    }

    /**
     * Declares the durable direct exchange and the durable queue, then binds the
     * queue to the exchange under {@link #ROUTING_KEY}.
     *
     * @throws Exception if any declaration or the binding fails
     */
    public static void init() throws Exception {
        Channel setupChannel = getChannel();
        setupChannel.exchangeDeclare(EXCHANGE_NAME, "direct", true);
        // durable = true, exclusive = false, autoDelete = false, no extra arguments
        setupChannel.queueDeclare(QUEUE_NAME, true, false, false, null);
        setupChannel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);
    }

    /**
     * Returns the cached channel, creating it through {@code RabbitMqUtils} on first call.
     *
     * @return the channel held in the static field
     * @throws Exception if the channel has to be created and that fails
     */
    public static Channel getChannel() throws Exception {
        if (channel == null) {
            setChannel(RabbitMqUtils.getChannel());
        }
        return channel;
    }

    /**
     * Replaces the cached channel.
     *
     * @param channel the channel to store in the static field
     */
    public static void setChannel(Channel channel) {
        Subscribe01.channel = channel;
    }
}
package cn.cnyasin.rabbit.publish.direct;
import cn.cnyasin.rabbit.utils.RabbitMqUtils;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.client.Delivery;
/**
 * Direct demo, subscriber 2: declares the direct exchange and its own queue, binds
 * the queue with {@link #ROUTING_KEY}, and prints every message delivered to it.
 */
public class Subscribe02 {
    // Exchange name
    public static final String EXCHANGE_NAME = "publish_subscribe_exchange";
    // Queue name
    public static final String QUEUE_NAME = "publish_subscribe_queue_02";
    // Routing key name
    public static final String ROUTING_KEY = "publish_subscribe_routing_02";
    // Channel, created on first use by getChannel()
    public static Channel channel = null;

    /**
     * Sets up the exchange/queue/binding and starts consuming with auto-ack.
     * Nothing is closed here; the consumer is meant to keep running.
     *
     * @param args unused
     * @throws Exception if any broker operation fails
     */
    public static void main(String[] args) throws Exception {
        init();
        Channel consumerChannel = getChannel();

        // Delivery callback. The publisher encodes messages as UTF-8, so decode with
        // the same charset instead of the platform default.
        DeliverCallback deliverCallback = (String consumerTag, Delivery message) -> {
            System.out.println("[*] 成功收到消息:" + new String(message.getBody(), "UTF-8"));
        };
        // Cancel callback: invoked when the consumer is cancelled by the broker
        // (for example because the queue was deleted), not on a per-message failure.
        CancelCallback cancelCallback = (String consumerTag) -> {
            System.out.println("[*] 接收消息失败");
        };

        System.out.println("[*] Sub2正在等待接收消息。。。");
        // Second argument true = automatic acknowledgement.
        consumerChannel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);
    }

    /**
     * Declares the durable direct exchange and the durable queue, then binds the
     * queue to the exchange under {@link #ROUTING_KEY}.
     *
     * @throws Exception if any declaration or the binding fails
     */
    public static void init() throws Exception {
        Channel setupChannel = getChannel();
        setupChannel.exchangeDeclare(EXCHANGE_NAME, "direct", true);
        // durable = true, exclusive = false, autoDelete = false, no extra arguments
        setupChannel.queueDeclare(QUEUE_NAME, true, false, false, null);
        setupChannel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);
    }

    /**
     * Returns the cached channel, creating it through {@code RabbitMqUtils} on first call.
     *
     * @return the channel held in the static field
     * @throws Exception if the channel has to be created and that fails
     */
    public static Channel getChannel() throws Exception {
        if (channel == null) {
            setChannel(RabbitMqUtils.getChannel());
        }
        return channel;
    }

    /**
     * Replaces the cached channel.
     *
     * @param channel the channel to store in the static field
     */
    public static void setChannel(Channel channel) {
        Subscribe02.channel = channel;
    }
}
package cn.cnyasin.rabbit.publish.direct;
import cn.cnyasin.rabbit.utils.RabbitMqUtils;
import com.rabbitmq.client.Channel;
import java.util.Scanner;
/**
 * Direct demo publisher: reads whitespace-separated tokens from standard input and
 * publishes each one to the exchange under {@link #ROUTING_KEY}.
 */
public class Publish {
    // Exchange name
    public static final String EXCHANGE_NAME = "publish_subscribe_exchange";
    // Routing key name; swap with the commented-out value to target the other binding
    public static final String ROUTING_KEY = "publish_subscribe_routing_01";
    // public static final String ROUTING_KEY = "publish_subscribe_routing_02";
    // Channel, created on first use by getChannel()
    public static Channel channel = null;

    /**
     * Publishes every token typed on the console until standard input ends, then
     * closes the broker connection so the process can exit.
     *
     * @param args unused
     * @throws Exception if obtaining the channel, publishing, or closing fails
     */
    public static void main(String[] args) throws Exception {
        Channel publishChannel = getChannel();
        try {
            System.out.println("[*] 正在等待控制台输入消息");
            Scanner scanner = new Scanner(System.in);
            while (scanner.hasNext()) {
                String input = scanner.next();
                publishChannel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, null, input.getBytes("UTF-8"));
                System.out.println("[*] 消息发送成功");
            }
        } finally {
            // Drop the cached channel and release the connection; an open connection
            // keeps client threads running after main returns.
            setChannel(null);
            if (publishChannel.getConnection().isOpen()) {
                publishChannel.getConnection().close();
            }
        }
    }

    /**
     * Returns the cached channel, creating it through {@code RabbitMqUtils} on first call.
     *
     * @return the channel held in the static field
     * @throws Exception if the channel has to be created and that fails
     */
    public static Channel getChannel() throws Exception {
        if (channel == null) {
            setChannel(RabbitMqUtils.getChannel());
        }
        return channel;
    }

    /**
     * Replaces the cached channel.
     *
     * @param channel the channel to store in the static field
     */
    public static void setChannel(Channel channel) {
        Publish.channel = channel;
    }
}
package cn.cnyasin.rabbit.publish.topic;
import cn.cnyasin.rabbit.utils.RabbitMqUtils;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.client.Delivery;
/**
 * Topic demo, subscriber 1: declares the topic exchange and its own queue, binds
 * the queue under two patterns, and prints every message delivered to it.
 */
public class Subscribe01 {
    // Exchange name
    public static final String EXCHANGE_NAME = "publish_subscribe_exchange";
    // Queue name
    public static final String QUEUE_NAME = "publish_subscribe_queue_01";
    // Binding patterns
    public static final String ROUTING_KEY_01 = "*.bb.*";
    public static final String ROUTING_KEY_02 = "dd.#";
    // Channel, created on first use by getChannel()
    public static Channel channel = null;

    /**
     * Sets up the exchange/queue/bindings and starts consuming with auto-ack.
     * Nothing is closed here; the consumer is meant to keep running.
     *
     * @param args unused
     * @throws Exception if any broker operation fails
     */
    public static void main(String[] args) throws Exception {
        init();
        Channel consumerChannel = getChannel();

        // Delivery callback. The publisher encodes messages as UTF-8, so decode with
        // the same charset instead of the platform default.
        DeliverCallback deliverCallback = (String consumerTag, Delivery message) -> {
            System.out.println("[*] 成功收到消息:" + new String(message.getBody(), "UTF-8"));
        };
        // Cancel callback: invoked when the consumer is cancelled by the broker
        // (for example because the queue was deleted), not on a per-message failure.
        CancelCallback cancelCallback = (String consumerTag) -> {
            System.out.println("[*] 接收消息失败");
        };

        System.out.println("[*] Sub1正在等待接收消息。。。");
        // Second argument true = automatic acknowledgement.
        consumerChannel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);
    }

    /**
     * Declares the durable topic exchange and the durable queue, then binds the
     * queue once per pattern ({@link #ROUTING_KEY_01} and {@link #ROUTING_KEY_02}).
     *
     * @throws Exception if any declaration or binding fails
     */
    public static void init() throws Exception {
        Channel setupChannel = getChannel();
        setupChannel.exchangeDeclare(EXCHANGE_NAME, "topic", true);
        // durable = true, exclusive = false, autoDelete = false, no extra arguments
        setupChannel.queueDeclare(QUEUE_NAME, true, false, false, null);
        setupChannel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY_01);
        setupChannel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY_02);
    }

    /**
     * Returns the cached channel, creating it through {@code RabbitMqUtils} on first call.
     *
     * @return the channel held in the static field
     * @throws Exception if the channel has to be created and that fails
     */
    public static Channel getChannel() throws Exception {
        if (channel == null) {
            setChannel(RabbitMqUtils.getChannel());
        }
        return channel;
    }

    /**
     * Replaces the cached channel.
     *
     * @param channel the channel to store in the static field
     */
    public static void setChannel(Channel channel) {
        Subscribe01.channel = channel;
    }
}
package cn.cnyasin.rabbit.publish.topic;
import cn.cnyasin.rabbit.utils.RabbitMqUtils;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.client.Delivery;
/**
 * Topic demo, subscriber 2: declares the topic exchange and its own queue, binds
 * the queue under {@link #ROUTING_KEY}, and prints every message delivered to it.
 */
public class Subscribe02 {
    // Exchange name
    public static final String EXCHANGE_NAME = "publish_subscribe_exchange";
    // Queue name
    public static final String QUEUE_NAME = "publish_subscribe_queue_02";
    // Binding pattern
    public static final String ROUTING_KEY = "aa.#";
    // Channel, created on first use by getChannel()
    public static Channel channel = null;

    /**
     * Sets up the exchange/queue/binding and starts consuming with auto-ack.
     * Nothing is closed here; the consumer is meant to keep running.
     *
     * @param args unused
     * @throws Exception if any broker operation fails
     */
    public static void main(String[] args) throws Exception {
        init();
        Channel consumerChannel = getChannel();

        // Delivery callback. The publisher encodes messages as UTF-8, so decode with
        // the same charset instead of the platform default.
        DeliverCallback deliverCallback = (String consumerTag, Delivery message) -> {
            System.out.println("[*] 成功收到消息:" + new String(message.getBody(), "UTF-8"));
        };
        // Cancel callback: invoked when the consumer is cancelled by the broker
        // (for example because the queue was deleted), not on a per-message failure.
        CancelCallback cancelCallback = (String consumerTag) -> {
            System.out.println("[*] 接收消息失败");
        };

        System.out.println("[*] Sub2正在等待接收消息。。。");
        // Second argument true = automatic acknowledgement.
        consumerChannel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);
    }

    /**
     * Declares the durable topic exchange and the durable queue, then binds the
     * queue to the exchange under {@link #ROUTING_KEY}.
     *
     * @throws Exception if any declaration or the binding fails
     */
    public static void init() throws Exception {
        Channel setupChannel = getChannel();
        setupChannel.exchangeDeclare(EXCHANGE_NAME, "topic", true);
        // durable = true, exclusive = false, autoDelete = false, no extra arguments
        setupChannel.queueDeclare(QUEUE_NAME, true, false, false, null);
        setupChannel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);
    }

    /**
     * Returns the cached channel, creating it through {@code RabbitMqUtils} on first call.
     *
     * @return the channel held in the static field
     * @throws Exception if the channel has to be created and that fails
     */
    public static Channel getChannel() throws Exception {
        if (channel == null) {
            setChannel(RabbitMqUtils.getChannel());
        }
        return channel;
    }

    /**
     * Replaces the cached channel.
     *
     * @param channel the channel to store in the static field
     */
    public static void setChannel(Channel channel) {
        Subscribe02.channel = channel;
    }
}
package cn.cnyasin.rabbit.publish.topic;
import cn.cnyasin.rabbit.utils.RabbitMqUtils;
import com.rabbitmq.client.Channel;
import java.util.Scanner;
/**
 * Topic demo publisher: reads whitespace-separated tokens from standard input and
 * publishes each one to the exchange under {@link #ROUTING_KEY}.
 */
public class Publish {
    // Exchange name
    public static final String EXCHANGE_NAME = "publish_subscribe_exchange";
    // Routing key name; swap with the commented-out value to exercise the other pattern
    public static final String ROUTING_KEY = "aa.bb.cc";
    // public static final String ROUTING_KEY = "dd.ee.ff";
    // Channel, created on first use by getChannel()
    public static Channel channel = null;

    /**
     * Publishes every token typed on the console until standard input ends, then
     * closes the broker connection so the process can exit.
     *
     * @param args unused
     * @throws Exception if obtaining the channel, publishing, or closing fails
     */
    public static void main(String[] args) throws Exception {
        Channel publishChannel = getChannel();
        try {
            System.out.println("[*] 正在等待控制台输入消息");
            Scanner scanner = new Scanner(System.in);
            while (scanner.hasNext()) {
                String input = scanner.next();
                publishChannel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, null, input.getBytes("UTF-8"));
                System.out.println("[*] 消息发送成功");
            }
        } finally {
            // Drop the cached channel and release the connection; an open connection
            // keeps client threads running after main returns.
            setChannel(null);
            if (publishChannel.getConnection().isOpen()) {
                publishChannel.getConnection().close();
            }
        }
    }

    /**
     * Returns the cached channel, creating it through {@code RabbitMqUtils} on first call.
     *
     * @return the channel held in the static field
     * @throws Exception if the channel has to be created and that fails
     */
    public static Channel getChannel() throws Exception {
        if (channel == null) {
            setChannel(RabbitMqUtils.getChannel());
        }
        return channel;
    }

    /**
     * Replaces the cached channel.
     *
     * @param channel the channel to store in the static field
     */
    public static void setChannel(Channel channel) {
        Publish.channel = channel;
    }
}