从我得另外一篇文章中摘抄出来: https://mp.csdn.net/postedit/103457479
rabbitMq jar版本: amqp-client-5.7.3.jar
说明一个rabbitmq 的例子,由于涉及隐私,某些数据已经脱敏,仅供展示
package mq.util;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* rabbitmq 连接工厂
*/
public class AmqpConnectFactory {
public static String username ="";
public static String password ="";
public static String inPlayPackageId = "";
public static String inPlayHost = "";
public static void main(String[] args) throws IOException, TimeoutException {
ClientUitl.startInPlayMq(); // 启动mq
inPlay();//连接mq
}
/**
*
* @throws IOException
* @throws TimeoutException
*/
public static void inPlay() throws IOException, TimeoutException {
// createConnection(inPlayHost,inPlayPackageId,"C:\\Users\\admin\\Desktop\\newRecord\\");
ConnectionFactory connectioinFactory = getConnectioinFactory(inPlayHost);
Connection connection = getConnection(connectioinFactory);
Channel channel = getChannel(connection,inPlayPackageId,"C:\\Users\\admin\\Desktop\\newRecord\\"); // 开启一个通道并设置 相关的消费者等信息
}
/**
* 得到连接工厂
* @param host
* @return
* @throws IOException
* @throws TimeoutException
*/
private static ConnectionFactory getConnectioinFactory(String host) throws IOException, TimeoutException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(host);
connectionFactory.setPort(5672);
connectionFactory.setUsername(username);
connectionFactory.setPassword(password);
connectionFactory.setAutomaticRecoveryEnabled(true);
connectionFactory.setVirtualHost("Customers");
connectionFactory.setRequestedHeartbeat(580);
connectionFactory.setNetworkRecoveryInterval(1000);// 设置连接间隔
// 或者使用 url 进行连接
// factory.setUri(“ amqp:// userName:password @ hostName:portNumber / virtualHost”);
// connectionFactory.setUri("amqp://"+username+":"+password+"@"+ inPlayHost +":" + port +"/" + virtualHost);
// final Connection conn = connectionFactory.newConnection(); // 新建一个连接
return connectionFactory;
}
/**
* 得到一个新的连接
* @param factory
* @return
* @throws IOException
* @throws TimeoutException
*/
public static Connection getConnection(ConnectionFactory factory) throws IOException, TimeoutException {
Connection connection = factory.newConnection();
return connection;
}
/**
* 得到一个新的通道
* @param conn
* @param packageId
* @return
* @throws IOException
*/
public static Channel getChannel(Connection conn,String packageId,String path) throws IOException {
final Channel channel = conn.createChannel(); // 打开一个通道(同文档中的模型)
// model.BasicQos(prefetchSize: 0, prefetchCount: 1000, global: false);
channel.basicQos(0, 1000, false);// 配置服务质量
// 订阅主题, 设置消息自动确认,配置消费者
channel.basicConsume("_" + packageId +"_", true, new TestConsumer(channel,conn,packageId,path));
return channel;
}
@Override
protected void finalize() throws Throwable {
super.finalize();
}
}
这里其实可以用 jar 包自带的DefaultConsumer, 但由于我这边处理比较复杂,因此自己实现了一个
说明: 该例实现了 Consumer 接口的handleConsumeOk() 方法,在消费者配置成功了,(调用定时任务)就调用上文代码中的线程池去队列中取消息, 具体消息处理方法handleDelivery () 方法不再处理具体事务,仅仅将消息放入异步队列中
package mq.util;
import com.rabbitmq.client.*;
import com.ws.com.util.TimerUtil;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
/**
* mq 消费者
*
*/
public class TestConsumer implements Consumer { //extends DefaultConsumer {
private Channel channel;
private Connection connection;
private volatile String consumerTag;
private String packageId;
private String path;
/**
* Constructs a new instance and records its association to the passed-in channel.
*
* @param channel the channel to which this consumer is attached
*/
public TestConsumer(Channel channel,Connection connection,String packageId,String path) {
// super(channel);
this.channel = channel;
this.connection = connection;
this.packageId = packageId;
this.path = path;
}
/**
* 接收到此使用者的basic.deliver时调用
* @param consumerTag
* @param envelope
* @param properties
* @param body
* @throws IOException
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
// super.handleDelivery(consumerTag, envelope, properties, body);
String routingKey = envelope.getRoutingKey();
String contentType = properties.getContentType();
long deliveryTag = envelope.getDeliveryTag();
// 处理消息,将消息放入队列中
CacheUtil.queue.offer(new String(body));
// handleMessage(body,path);
// 由于前面设置通道时已经设置了消息消费后自动确认,所以这里不用再确认消息,否则 双重确认会导致通道异常关闭
// channel.basicAck(deliveryTag, true);
//
}
//
/**
* :当通道或基础连接被关闭时调用。
* 通道关闭的问题
* @param consumerTag
* @param sig
*/
@Override
public void handleShutdownSignal(String consumerTag, ShutdownSignalException sig) {
System.out.println("通道意外关闭,重新设置通道");
System.out.println("关闭原因"+sig.getMessage());
// this.chan
try {
Channel newChannel = connection.createChannel();
channel = newChannel;// 将绑定的通道设置为新通道
channel.basicQos(0, 1000, false);// 配置服务质量
// 配置消费者
channel.basicConsume("_" + packageId +"_", true, this);
} catch (IOException e) {
e.printStackTrace();
}
}
/**
* 当消费者由于除调用Channel.basicCancel之外的其他原因被取消时调用。
* @param consumerTag
* @throws IOException
*/
@Override
public void handleCancel(String consumerTag) throws IOException {
// super.handleCancel(consumerTag);
this.consumerTag = consumerTag;
}
/**
* 当消费者被Channel.basicCancel的调用取消时调用。
* @param consumerTag
*/
@Override
public void handleCancelOk(String consumerTag) {
// super.handleCancelOk(consumerTag);
}
/**
* 当使用者通过对任何通道的调用注册时调用 Channel.basicConsume方法。
* @param consumerTag
*/
@Override
public void handleConsumeOk(String consumerTag) {
// super.handleConsumeOk(consumerTag);
System.out.println("注册通过成功");
// 开启线程池读取消息,并保存, 每 500毫秒调用1次,拿取一个消息
TimerUtil.setTimer(new SaveRunner(path),50,TimeUnit.MICROSECONDS);
}
/**
* 当 basic.recover. 收到 回复 basic.recover-ok 时调用
* @param consumerTag
*/
@Override
public void handleRecoverOk(String consumerTag) {
System.out.println("接收消息成功");
// super.handleRecoverOk(consumerTag);
}
// ==============================================================
public Channel getChannel() {
return channel;
}
public void setChannel(Channel channel) {
this.channel = channel;
}
public Connection getConnection() {
return connection;
}
public void setConnection(Connection connection) {
this.connection = connection;
}
public String getConsumerTag() {
return consumerTag;
}
// ================================================= 具体处理消息的逻辑 ==================================================================================================================
}
其他类,同上篇文章(https://mp.csdn.net/postedit/103457479) TimerUtil, CacheUtil, SaveRunner