当前位置: 首页 > 工具软件 > play-rabbitmq > 使用案例 >

java 连接 rabbitMq,并处理数据的例子

霍伟彦
2023-12-01

从我得另外一篇文章中摘抄出来: https://mp.csdn.net/postedit/103457479

rabbitMq jar版本: amqp-client-5.7.3.jar

例子:

说明一个rabbitmq 的例子,由于涉及隐私,某些数据已经脱敏,仅供展示

连接工厂,创建连接

package mq.util;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * rabbitmq 连接工厂
 */
public class AmqpConnectFactory {


    public static String username ="";
    public static String password ="";
    public static String inPlayPackageId = "";

    public static String inPlayHost = "";
    

    public static void main(String[] args) throws IOException, TimeoutException {
        ClientUitl.startInPlayMq(); // 启动mq
        inPlay();//连接mq 

    }


    /**
     *
     * @throws IOException
     * @throws TimeoutException
     */
    public static void inPlay() throws IOException, TimeoutException {
//        createConnection(inPlayHost,inPlayPackageId,"C:\\Users\\admin\\Desktop\\newRecord\\");
        ConnectionFactory connectioinFactory = getConnectioinFactory(inPlayHost);
        Connection connection = getConnection(connectioinFactory);

        Channel channel =  getChannel(connection,inPlayPackageId,"C:\\Users\\admin\\Desktop\\newRecord\\"); // 开启一个通道并设置 相关的消费者等信息
    }




    /**
     * 得到连接工厂
     * @param host
     * @return
     * @throws IOException
     * @throws TimeoutException
     */
    private static ConnectionFactory getConnectioinFactory(String host) throws IOException, TimeoutException {
        ConnectionFactory connectionFactory = new ConnectionFactory();

        connectionFactory.setHost(host);
        connectionFactory.setPort(5672);
        connectionFactory.setUsername(username);
        connectionFactory.setPassword(password);
        connectionFactory.setAutomaticRecoveryEnabled(true);
        connectionFactory.setVirtualHost("Customers");
        connectionFactory.setRequestedHeartbeat(580);
        connectionFactory.setNetworkRecoveryInterval(1000);// 设置连接间隔
        // 或者使用 url 进行连接
        // factory.setUri(“ amqp:// userName:password @ hostName:portNumber / virtualHost”);
//        connectionFactory.setUri("amqp://"+username+":"+password+"@"+ inPlayHost +":" + port +"/" + virtualHost);

//        final Connection conn = connectionFactory.newConnection(); // 新建一个连接
        return connectionFactory;
    }


    /**
     * 得到一个新的连接
     * @param factory
     * @return
     * @throws IOException
     * @throws TimeoutException
     */
    public static Connection getConnection(ConnectionFactory factory) throws IOException, TimeoutException {
        Connection connection = factory.newConnection();
        return connection;
    }


    /**
     * 得到一个新的通道
     * @param conn
     * @param packageId
     * @return
     * @throws IOException
     */
    public static Channel getChannel(Connection conn,String packageId,String path) throws IOException {
        final Channel channel = conn.createChannel(); // 打开一个通道(同文档中的模型)
//        model.BasicQos(prefetchSize: 0, prefetchCount: 1000, global: false);
        channel.basicQos(0, 1000, false);// 配置服务质量
        // 订阅主题, 设置消息自动确认,配置消费者
        channel.basicConsume("_" + packageId +"_", true, new TestConsumer(channel,conn,packageId,path));
        return channel;
    }













    @Override
    protected void finalize() throws Throwable {
        super.finalize();
    }
}

消费者配置

这里其实可以用 jar 包自带的DefaultConsumer, 但由于我这边处理比较复杂,因此自己实现了一个

说明: 该例实现了 Consumer 接口的handleConsumeOk() 方法,在消费者配置成功了,(调用定时任务)就调用上文代码中的线程池去队列中取消息, 具体消息处理方法handleDelivery () 方法不再处理具体事务,仅仅将消息放入异步队列中

package mq.util;

import com.rabbitmq.client.*;
import com.ws.com.util.TimerUtil;

import java.io.IOException;
import java.util.concurrent.TimeUnit;

/**
 *   mq 消费者
 *
 */
public class TestConsumer implements Consumer { //extends DefaultConsumer {


    private Channel channel;
    private Connection connection;
    private volatile String consumerTag;


    private String packageId;
    private String path;


    /**
     * Constructs a new instance and records its association to the passed-in channel.
     *
     * @param channel the channel to which this consumer is attached
     */
    public TestConsumer(Channel channel,Connection connection,String packageId,String path) {
//        super(channel);
        this.channel = channel;
        this.connection = connection;
        this.packageId = packageId;
        this.path = path;
    }

    /**
     * 接收到此使用者的basic.deliver时调用
     * @param consumerTag
     * @param envelope
     * @param properties
     * @param body
     * @throws IOException
     */
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//        super.handleDelivery(consumerTag, envelope, properties, body);
        String routingKey = envelope.getRoutingKey();
        String contentType = properties.getContentType();
        long deliveryTag = envelope.getDeliveryTag();
        // 处理消息,将消息放入队列中
        CacheUtil.queue.offer(new String(body));
//        handleMessage(body,path);

        // 由于前面设置通道时已经设置了消息消费后自动确认,所以这里不用再确认消息,否则 双重确认会导致通道异常关闭
//            channel.basicAck(deliveryTag, true);
//
    }

//
    /**
     * :当通道或基础连接被关闭时调用。
     * 通道关闭的问题
     * @param consumerTag
     * @param sig
     */
    @Override
    public void handleShutdownSignal(String consumerTag, ShutdownSignalException sig) {
        System.out.println("通道意外关闭,重新设置通道");
        System.out.println("关闭原因"+sig.getMessage());
//        this.chan
        try {
            Channel newChannel = connection.createChannel();
            channel = newChannel;// 将绑定的通道设置为新通道
            channel.basicQos(0, 1000, false);// 配置服务质量
            // 配置消费者
            channel.basicConsume("_" + packageId +"_", true, this);
        } catch (IOException e) {
            e.printStackTrace();
        }


    }

    /**
     * 当消费者由于除调用Channel.basicCancel之外的其他原因被取消时调用。
     * @param consumerTag
     * @throws IOException
     */
    @Override
    public void handleCancel(String consumerTag) throws IOException {
//        super.handleCancel(consumerTag);
        this.consumerTag = consumerTag;
    }

    /**
     * 当消费者被Channel.basicCancel的调用取消时调用。
     * @param consumerTag
     */
    @Override
    public void handleCancelOk(String consumerTag) {
//        super.handleCancelOk(consumerTag);
    }

    /**
     * 当使用者通过对任何通道的调用注册时调用 Channel.basicConsume方法。
     * @param consumerTag
     */
    @Override
    public void handleConsumeOk(String consumerTag) {
//        super.handleConsumeOk(consumerTag);
        System.out.println("注册通过成功");
        // 开启线程池读取消息,并保存, 每 500毫秒调用1次,拿取一个消息
        TimerUtil.setTimer(new SaveRunner(path),50,TimeUnit.MICROSECONDS);

    }


    /**
     *  当 basic.recover. 收到 回复 basic.recover-ok  时调用
     * @param consumerTag
     */
    @Override
    public void handleRecoverOk(String consumerTag) {
        System.out.println("接收消息成功");
//        super.handleRecoverOk(consumerTag);
    }


    // ==============================================================


    public Channel getChannel() {
        return channel;
    }

    public void setChannel(Channel channel) {
        this.channel = channel;
    }

    public Connection getConnection() {
        return connection;
    }

    public void setConnection(Connection connection) {
        this.connection = connection;
    }

    public String getConsumerTag() {
        return consumerTag;
    }


    // ================================================= 具体处理消息的逻辑 ==================================================================================================================




}

其他类,同上篇文章(https://mp.csdn.net/postedit/103457479) TimerUtil, CacheUtil, SaveRunner

 类似资料: