当前位置: 首页 > 工具软件 > rabbit-ex > 使用案例 >

RabbitMQ的Java应用(1) -- Rabbit Java Client使用

邬博涉
2023-12-01

Java环境下使用RabbitMQ客户端需要导入ampq-client库(RabbitMQ的Java Client库,这里我们使用3.6.5版本) ,RabbitMQ服务器使用的是本地RabbitMQ 3.6.6版本。

Maven环境配置

<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>3.6.5</version>
</dependency>

Gradle环境

compile group: 'com.rabbitmq', name: 'amqp-client', version: '3.6.5'
RabbitMQ使用的结构示意图如下:


从示意图可以看出消息生产者并没有直接将消息发送给消息队列,而是通过建立与Exchange的Channel,将消息发送给Exchange,Exchange根据规则,将消息转发给指定的消息队列。消费者通过建立与消息队列相连的Channel,从消息队列中获取消息。

这里谈到的Channel可以理解为建立在生产者/消费者和RabbitMQ服务器之间的TCP连接上的虚拟连接,一个TCP连接上可以建立多个Channel。 RabbitMQ服务器的Exchange对象可以理解为生产者发送消息的邮局,消息队列可以理解为消费者的邮箱。Exchange对象根据它定义的规则和消息包含的routing key以及header信息将消息转发到消息队列。

根据转发消息的规则不同,RabbitMQ服务器中使用的Exchange对象有四种,Direct Exchange, Fanout Exchange, Topic Exchange, Header Exchange,如果定义Exchange时没有指定类型和名称, RabbitMQ将会为每个消息队列设定一个Default Exchange,它的Routing Key是消息队列名称。

RabbitMQ Java Client的官网示例有6个,本篇只使用三个例程,分别是使用默认Default Exchange的消息生产/消费,使用Direct Exchange的消息生产/消费,以及RPC方式的消息生产/消费。

为了测试方便,我们新定义了一个virutal host,名字是test_vhosts,定义了两个用户rabbitmq_producer和rabbitmq_consumer, 设置其user_tag为administrator(可以进行远程连接), 为它们设置了访问test_vhosts下所有资源的权限。

使用默认Default Exchange的消息生产/消费

我们定义一个生产者程序,一个消费者程序。

生产者程序代码如下:


public class ProducerApp
{
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = null;
        Channel channel = null;
        try
        {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            factory.setPort(5672);
            factory.setUsername("rabbitmq_producer");
            factory.setPassword("123456");
            factory.setVirtualHost("test_vhosts");
 
            //创建与RabbitMQ服务器的TCP连接
            connection  = factory.newConnection();
            channel = connection.createChannel();
            channel.queueDeclare("firstQueue", true, false, false, null);
            String message = "First Message";           
            channel.basicPublish("", "firstQueue", null, message.getBytes());
            System.out.println("Send Message is:'" + message + "'");            
        }
        catch(Exception ex)
        {
            ex.printStackTrace();
        }
        finally
        {
            if(channel != null)
            {
                channel.close();
            }
            if(connection != null)
            {
                connection.close();
            }
        }
    }

关于生产者的代码有几点说明:

1) RabbitMQ Java Client示例提供的ConnectionFactory属性设置的代码只有一句:

factory.setHost("localhost");


这句代码表示使用rabbitmq服务器默认的virutal host(“/”),默认的用户guest/guest进行连接,但是如果这段代码运行在远程机器上时, 将因为guest用户不能用于远程连接RabbitMQ服务器而运行失败,上面提供的代码是可以进行建立远程连接的代码。
2)Channel建立后,调用Channel.queueDeclare方法创建消息队列firstQueue。

Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,
                 Map<String, Object> arguments) throws IOException;   
这个方法的第二个参数durable表示建立的消息队列是否是持久化(RabbitMQ重启后仍然存在,并不是指消息的持久化),第三个参数exclusive 表示建立的消息队列是否只适用于当前TCP连接,第四个参数autoDelete表示当队列不再被使用时,RabbitMQ是否可以自动删除这个队列。 第五个参数arguments定义了队列的一些参数信息,主要用于Headers Exchange进行消息匹配时。
3)生产者发送消息使用Channel.basicPublish方法。

void basicPublish(String exchange, String routingKey, 
 BasicProperties props, byte[] body) throws IOException;

第一个参数exchange是消息发送的Exchange名称,如果没有指定,则使用Default Exchange。 第二个参数routingKey是消息的路由Key,是用于Exchange将消息路由到指定的消息队列时使用(如果Exchange是Fanout Exchange,这个参数会被忽略), 第三个参数props是消息包含的属性信息。RabbitMQ的消息属性和消息体是分开的,不像JMS消息那样同时包含在javax.jms.Message对象中,这一点需要特别注意。 第四个参数body是RabbitMQ消息体。 我们这里调用basicPublish方法发送消息时,props参数为null,因而我们发送的消息是非持久化消息,如果要发送持久化消息,我们需要进行如下设置:
AMQP.BasicProperties props =
                    new AMQP.BasicProperties("text/plain",
                            "UTF-8",
                            null,
                            2,
                            0, null, null, null,
                            null, null, null, null,
                            null, null);
 channel.basicPublish("", "firstQueue", props, message.getBytes());   


定义props时的参数2表示消息的类型为持久化消息。 运行生产者程序后,我们可以执行rabbitmqctl命令查看队列消息,我们看到firstQueue队列有一条消息。


消费者代码如下:

public class ConsumerApp
{
    public static void main(String[] args)
    {
        Connection connection = null;
        Channel channel = null;
        try
        {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            factory.setPort(5672);
            factory.setUsername("rabbitmq_consumer");
            factory.setPassword("123456");
            factory.setVirtualHost("test_vhosts");
            connection = factory.newConnection();
            channel = connection.createChannel();
 
            Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
                        throws IOException {
                    String message = new String(body, "UTF-8");
                    System.out.println(" Consumer have received '" + message + "'");
                }
            };
            channel.basicConsume("firstQueue", true, consumer);
        }
        catch(Exception ex)
        {
            ex.printStackTrace();
        }
    }
}

消费者代码中,建立Connection,Channel的代码和生产者程序类似。它主要定义了一个Consumer对象,这个对象重载了DefaultCustomer类 的handleDelivery方法:
void handleDelivery(String consumerTag,
                        Envelope envelope,
                        AMQP.BasicProperties properties,
                        byte[] body) 
handleDelivery方法的第一个参数consumerTag是接收到消息时的消费者Tag,如果我们没有在basicConsume方法中指定Consumer Tag,RabbitMQ将使用随机生成的Consumer Tag(如下图所示)


第二个参数envelope是消息的打包信息,包含了四个属性:

1._deliveryTag,消息发送的编号,表示这条消息是RabbitMQ发送的第几条消息,我们可以看到这条消息是发送的 第一条消息。

2._redeliver,重传标志,确认在收到对消息的失败确认后,是否需要重发这条消息,我们这里的值是false,不需要重发。

3._exchange,消息发送到的Exchange名称,正如我们上面发送消息时一样,exchange名称为空,使用的是Default Exchange。

4._routingKey,消息发送的路由Key,我们这里是发送消息时设置的“firstQueue”。

第三个参数properties就是上面使用basicPublish方法发送消息时的props参数,由于我们上面设置它为null,这里接收到的properties 是默认的Properties,只有bodySize,其他全是null。

第四个参数body是消息体.

我们这里重载的handleDelivery方法仅仅打印出了生产者发送的消息内容,实际使用时可以转发给后台程序进行处理。

在Consumer对象定义后,我们调用了Channel.basicConsume方法将Consumer与消息队列绑定,否则Consumer无法从消息队列获取消息。

String basicConsume(String queue, boolean autoAck, Consumer callback) throws IOException

basicConsume方法的第一个参数是Consumer绑定的队列名,第二个参数是自动确认标志,如果为true,表示Consumer接受到消息后,会自动发确认消息(Ack消息)给消息队列,消息队列会将这条消息从消息队列里删除,第三个参数就是Consumer对象,用于处理接收到的消息。

如果我们想让消费者接收到消息后对消息进行手动确认(Manual Ack),我们需要对代码进行两处改动:

1)在调用basicConsume方法时,将autoAck属性设置为false。

channel.basicConsume("firstQueue", false, consumer);

2)在handleDelivery方法中调用Channel.basicAck方法,发送手动确认消息给消息队列。
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
                        throws IOException
{
      this.getChannel().basicAck(envelope.getDeliveryTag(), false);
}
basicAck方法有两个参数,第一个参数deliverTag是消息的发送编号,第二个参数multiple是消息确认方式,如果值为true,表示对消息队列里所有编号小于或等于当前消息编号的未确认消息进行手动确认,如果为false,表示仅确认当前消息。

消费者代码执行后,我们可以看到消费者程序的控制台输出了这条消息的内容,而且使用rabbitmqctl命令查看队列消息时,队列里的消息数为0。


使用Direct Exchange的消息生产/消费

使用Direct Exchange的生产者/消费者代码与Default Exchange比较类似,不过生产者程序的代码需要添加创建Direct Exchange和 将Exchange和消息队列绑定的代码,具体添加和修改的代码如下:

channel.exchangeDeclare("directExchange", "direct");
channel.queueDeclare("directQueue", true, false, false, null);
channel.queueBind("directQueue", "directExchange", "directMessage");
String message = "First Direct Message";
 
channel.basicPublish("directExchange", "directMessage", null, message.getBytes());
System.out.println("Send Direct Message is:'" + message + "'");


首先我们调用Channel.exchangeDeclare方法创建名为“directExchange”的Direct Exchange。

Exchange.DeclareOk exchangeDeclare(String exchange, String type,boolean durable) throws IOException

exchangeDeclare方法的第一个参数exchange是exchange名称,第二个参数type是Exchange类型,有“direct”,“fanout”,“topic”,“headers”四种,分别对应RabbitMQ的四种Exchange。第三个参数durable是设置Exchange是否持久化( 即在RabbitMQ服务器重启后Exchange是否仍存在,如果没有设置,默认是非持久化的)

创建“directQueue”消息队列后,我们再调用Channel.queueBind方法,将我们创建的Direct Exchange和消息队列绑定。

Queue.BindOk queueBind(String queue, String exchange, String routingKey) throws IOException;
queueBind方法第一个参数queue是消息队列的名称,第二个参数exchange是Exchange的名称,第三个参数routingKey是消息队列和Exchange之间绑定的路由key,我们这里绑定的路由key是“directMessage”。从Exchange过来的消息,只有routing key为“directMessage”的消息会被转到消息队列“directQueue”,其他消息将不会被转发,下面将证实这一点。

运行ProducerApp程序,使用rabbitmq_producer用户登录管理页面,我们可以看到名为“directExchange”的Direct Exchange被创建出来。


消息队列directQueue与它绑定,routing key为directMessage。


消息队列directQueue里有一条消息

我们修改ProducerApp的程序,将消息的routing key改为“indirectMessage”

 String message = "First Indirect Message";
 channel.basicPublish("directExchange", "indirectMessage", null, message.getBytes());
 System.out.println("Send Indirect Message is:'" + message + "'");

再次运行程序后,打开管理页面,我们看到“directQueue”队列里仍然只有一条消息。


我们向Exchange发送的第二条消息由于和绑定的routing key不一致,没有被转发到“directQueue”消息队列,被RabbitMQ丢弃了。

我们通过管理界面再创建一个消息队列“indirectQueue”,在它和“directExchange”之间建立bind关系,routingkey为“indirectMessage” 。

再次运行ProducerApp程序,我们可以看到“directQueue”消息队列消息数仍是1,但“indirectQueue”消息队列接收到了从Exchange转发来的消息。


使用RPC方式的消息生产/消费

RPC方式的消息生产和消费示意图如下:

在这种方式下,生产者和消费者之间的消息发送/接收流程如下:

1)生产者在发送消息的同时,将返回消息的消息队列名(replyTo中指定)以及消息关联Id(correlationId)附带在消息Properties中发送给消费者。

2)消费者在接收到消息,处理完成后,将结果作为返回消息发送到replyTo指定的返回消息队列中,同时附带接收消息中的corrleationId, 以便让生产者接收到到返回消息后,根据corrleationId确认是针对1)中发送消息的返回消息,如果correlationId确认一致,则将返回消息 取出,进行后续处理。

示意图中的生产者和消费者在发送消息时使用的都是Default Exchange,我们接下来的程序做一点改动,使用Direct Exchange。

在我们的程序中,生产者发送一个数字给消费者,消费者接收到消息后,计算这个数字的阶乘结果,返回给生产者。 生产者程序的主要代码如下:

   //创建RPC发送消息的Direct Exchange,消息队列和绑定关系。
   channel.exchangeDeclare("rpcSendExchange", "direct",true);
   channel.queueDeclare("rpcSendQueue", true, false, false, null);
   channel.queueBind("rpcSendQueue", "rpcSendExchange", "rpcSendMessage");
 
   //建立RPC返回消息的Direct Exchange, 消息队列和绑定关系         
   channel.exchangeDeclare("rpcReplyExchange", "direct",true);
   channel.queueDeclare("rpcReplyQueue", true, false, false, null);
   channel.queueBind("rpcReplyQueue", "rpcReplyExchange", "rpcReplyMessage");
 
   //创建接收RPC返回消息的消费者,并将它与RPC返回消息队列相关联。
   QueueingConsumer replyCustomer = new QueueingConsumer(channel);
   channel.basicConsume("rpcReplyQueue", true,replyCustomer);
 
   String number = "10";
 
   //生成RPC请求消息的CorrelationId
   String correlationId = UUID.randomUUID().toString();
   //在RabbitMQ消息的Properties中设置RPC请求消息的CorrelationId以及
   //ReplyTo名称(我们这里使用的是Exchange名称,
   //而不是消息队列名称)
   BasicProperties props = new BasicProperties
                          .Builder()
                          .correlationId(correlationId)
                          .replyTo("rpcReplyExchange")
                          .build();
 
   System.out.println("The send message's correlation id is:" + correlationId);            
   channel.basicPublish("rpcSendExchange", "rpcSendMessage", props, number.getBytes());
 
   String response = null;
 
   while(true)
   {
           //从返回消息中取一条消息
       Delivery delivery = replyCustomer.nextDelivery();
       //如果消息的CorrelationId与发送消息的CorrleationId一致,表示这条消息是
           //发送消息对应的返回消息,是阶乘运算的计算结果。
           System.out.println("The received reply message's correlation id is:" + messageCorrelationId);
           String messageCorrelationId = delivery.getProperties().getCorrelationId();
       if (!Strings.isNullOrEmpty(messageCorrelationId) && messageCorrelationId.equals(correlationId)) 
           {
        response = new String(delivery.getBody());
        break;
       }
   }
 
   //输出阶乘运算结果
   if(!Strings.isNullOrEmpty(response))
   {
    System.out.println("Factorial(" + number + ") = " + response);
   }
消费者程序的主要代码如下:
 Consumer consumer = new DefaultConsumer(channel)
 {
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException
    {
       //获取返回消息发送到的Exchange名称
       String replyExchange = properties.getReplyTo();
 
       //设置返回消息的Properties,附带发送消息的CorrelationId.
       AMQP.BasicProperties replyProps = new AMQP.BasicProperties.Builder()
                            .correlationId(properties.getCorrelationId())
                            .build();
 
       String message = new String(body,"UTF-8");
       System.out.println("The received message is:" + message);
       System.out.println("The received message's correlation id is:" + properties.getCorrelationId());
 
       //计算阶乘,factorial方法是计算阶乘的方法。
       int number = Integer.parseInt(message);
       String response = factorial(number);
 
       //将阶乘消息发送到Reply Exchange
       this.getChannel().basicPublish(replyExchange, "rpcReplyMessage",replyProps, response.getBytes());
   }
};
 
channel.basicConsume("rpcSendQueue", true, consumer);

先运行生产者程序,发送请求消息到Send Exchange,然后等待消费者发送的返回消息。 
再启动消费者程序,计算阶乘并返回结果给Reply Exchange。 两个程序的控制台信息如下图所示
生产者程序控制台
消费者程序控制台
从控制台信息可以看出生产者端根据返回消息中包含的Correlation Id判断出这是发送消息对应的返回消息,获取了阶乘的计算结果。

这个例子只是简单的生产者和消费者之间的方法调用,实际使用时,我们可以基于这个实例,实现更为复杂的操作。

RabbitMQ Client的重连机制

RabbitMQ Java Client提供了重连机制,不过在RabbitMQ Java Client 4.0版本之前,自动重连默认是关闭的。从Rabbit Client 4.0版本开始,自动重连默认是打开的。控制自动重连的属性是com.rabbitmq.client.ConnectionFactory类的automaticRecovery和topologyRecovery属性。

设置automaticRecovery属性为true时,会执行以下recovery:

1)Connection的重连。

2)侦听Connection的Listener的恢复。

3)重新建立在Connection基础上的Channel。

4)侦听Channel的Listener的恢复。

5)Channel上的设置,如basicQos,publisher confirm以及事务属性等的恢复。

当设置topologyRecovery属性为true时,会执行以下recovery:

1)exchange的重新定义(不包含预定义的exchange)

2)queue的重新定义(不包含预定义的queue)

3)binding的重新定义(不包含预定义的binding)

4)所有Consumer的恢复

我们定义一个带auto recovery的消费者程序,我们使用RabbitMQ Java Client 4.0.0版本,这个版本引入了AutorecoveringConnection和

AutorecoveringChannel类,可以添加RecoveryListener对Recovery过程进行监控。

public class RecoveryConsumerApp
{
    public static void main( String[] args ) throws IOException, TimeoutException {
            ConnectionFactory connectionFactory = new ConnectionFactory();
            ...................
 
            AutorecoveringConnection connection = (AutorecoveringConnection)connectionFactory.newConnection();
            String originalLocalAddress =
                    connection.getLocalAddress() + ":" + connection.getLocalPort();
            System.out.println("The origin connection's local address is:" + originalLocalAddress);
 
            AutorecoveringChannel  channel = (AutorecoveringChannel)connection.createChannel();
            System.out.println("The origin channel's channel number is:" + channel.getChannelNumber());
 
            channel.exchangeDeclare("recoveryExchange", BuiltinExchangeType.DIRECT, false, true ,null);
            channel.queueDeclare("recoveryQueue", false, false, true,null);
            channel.queueBind("recoveryQueue", "recoveryExchange", "recoveryMessage");
 
            connection.addRecoveryListener(new RecoveryListener() {
                public void handleRecovery(Recoverable recoverable) {
                    System.out.println("Connection handleRecovery method is called");
                    AutorecoveringConnection recoveredConnection =
                            (AutorecoveringConnection)recoverable;
                    String recoveredLocalAddress =
                            recoveredConnection.getLocalAddress() + ":" + recoveredConnection.getLocalPort();
                    System.out.println("The recovered connection's local address is:" + recoveredLocalAddress);
                }
 
                public void handleRecoveryStarted(Recoverable recoverable) {
                    System.out.println("Connection handleRecoveryStarted method is called");
                }
            });
 
            channel.addRecoveryListener(new RecoveryListener() {
                    public void handleRecovery(Recoverable recoverable) {
                        System.out.println("Channel handleRecovery method is called");
                        AutorecoveringChannel recoveryChannel =
                                (AutorecoveringChannel)recoverable;
                        System.out.println("The recovered Channel's number is:" + recoveryChannel.getChannelNumber());
                    }
 
                    public void handleRecoveryStarted(Recoverable recoverable) {
                        System.out.println("Channel handleRecoveryStarted method is called");
                    }
            });
 
    }
}
这个程序中Exchange, Queue都是非持久化并且自动删除的。 我们为Connection和Channel分别添加了Recovery Listener匿名对象,

便于确认他们确实进行了Recovery操作。

启动程序后,我们可以看到recoveryExchange和recoveryQueue都被创建出来,且Binding关系建立了。

连接的本地地址是0.0.0.0:8109,Channel编号是1

此时我们关闭RabbitMQ服务器,再重启RabbitMQ服务器,我们可以从控制台界面看到有连接超时的警告信

息以及重连信息。

从重连日志信息中我们可以看出Channel的编号还是1,但是Connection的本地地址已经变成了0.0.0.0:8470,证明进行了重连。

连接到recoveryQueue队列上的Consumer Tag也进行了恢复,而且Consumer Tag与之前的Consumer Tag一致,这是因为设置了

topologyRecovery属性为true。

我们再在生产者程序中使用重连机制,依然使用Rabbit Java Client 4.0版本 生产者程序的片段如下:


[java] view plain copy
<code class="language-java"><span style="font-size:17.5px;">  </span>factory.setAutomaticRecoveryEnabled(true);  
   factory.setNetworkRecoveryInterval(60000);  
   factory.setTopologyRecoveryEnabled(true);  
   
   AutorecoveringConnection connection = (AutorecoveringConnection)factory.newConnection();  
   AutorecoveringChannel channel = (AutorecoveringChannel)connection.createChannel();     
   //设置Channel为Publish Confirm模式  
   channel.confirmSelect();  <span style="font-size:17.5px;">  </span></code>  


登录管理界面,我们可以看到生产者建立的Channel是Confirm模式(图中Mode列用C表示)

我们关掉RabbitMQ服务器,再重启RabbitMQ服务器,可以看到生产者Channel被恢复,但是本地端口号已经从13684变成了13874,

说明这是重新创建的Channel,创建的Channel仍然是Confirm模式,和最初的Channel一致。

如果我们设置Channel为Transaction模式(调用Channel.txSelect()方法),重连后恢复的Channel的模式也仍然是Transaction模式
 

 类似资料: