- 初始化类:Initialization
package cn.cnyasin.rabbit.hello;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/*
* 初始化
* 作用:
* 创建交换机
* 创建队列
* 绑定队列、交换机、路由key
*/
public class Initialization {
// 交换机名
public static final String EXCHANGE_NAME = "exchange01";
// 队列名
public static final String QUEUE_NAME = "queue01";
// 路由key
public static final String ROUTING_KEY = "routing01";
public static void main(String[] args) throws Exception {
// 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 配置
factory.setHost("192.168.3.202");
factory.setPort(5672);
factory.setUsername("admin");
factory.setPassword("123456");
factory.setVirtualHost("/");
// 获取连接
Connection connection = factory.newConnection();
// 获取信道
Channel channel = connection.createChannel();
// 声明交换机
channel.exchangeDeclare(EXCHANGE_NAME, "direct", true);
// 声明队列
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
// 绑定队列、交换机、路由key
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);
connection.close();
System.out.println("初始化成功。。。");
}
}
- 消费者类:Consumer
package cn.cnyasin.rabbit.hello;
import com.rabbitmq.client.*;
/*
* 消费者
* 消费者消费消息流程
* 建立连接(connection)
* 获取信道(channel)
* 消费队列中的消息,自动应答
* 接收消息回调方法
* 拒绝消息回调方法
*/
public class Consumer {
// 队列名
public static final String QUEUE_NAME = "queue01";
public static void main(String[] args) throws Exception {
// 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.3.202");
factory.setPort(5672);
factory.setUsername("admin");
factory.setPassword("123456");
factory.setVirtualHost("/");
// 创建连接
Connection connection = factory.newConnection();
// 获取信道
Channel channel = connection.createChannel();
// 消费队列中的消息
channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);
}
// 接收消息回调方法
public static DeliverCallback deliverCallback = (String consumerTag, Delivery message) -> {
System.out.println(" [*] 成功处理消息:" + new String(message.getBody()));
};
// 拒绝消息回调方法
public static CancelCallback cancelCallback = (String consumerTag) -> {
System.out.println("消费消息失败");
};
}
- 生产者类:Producer
package cn.cnyasin.rabbit.hello;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/*
* 生产者
* 发送消息流程:
* 建立连接(connection)
* 获取信道(channel)
* 通过信道将消息发送到指定交换机(exchange),并绑定路由key(routingKey),路由key可以是多个
* 注意:
* 生产者不需要关心队列(queue)
* 生产者发送消息前需要准备好:
* 创建相关交换机
* 创建相关队列
* 绑定队列、交换机、路由key
*/
public class Producer {
// 交换机名
public static final String EXCHANGE_NAME = "exchange01";
// 路由key
public static final String ROUTING_KEY = "routing01";
public static void main(String[] args) throws Exception {
// 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 配置信息
factory.setHost("192.168.3.202");
factory.setPort(5672);
factory.setUsername("admin");
factory.setPassword("123456");
factory.setVirtualHost("/");
// 创建连接
Connection connection = factory.newConnection();
// 获取信道
Channel channel = connection.createChannel();
// 发送消息
channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, null, "hello.".getBytes());
connection.close();
System.out.println("消息发送成功。。。");
}
}