mq的ack 主要是确认消息被消费者消费完成后通知服务器将队列里面的消息清除。
而如果不配置Ack的话呢,我测试过他会自动的忽略,也就是说此时的服务是no_ack=true的模式,就是说只要我发现你是消费了这个数据,至于异常不异常的,我不管了。通知Ack机制就是这么来的,更加灵活的,我们需要Ack不自动,而是手动,这样做的好处,就是使得我们开发人员更加人性化或者灵活的来处理我们的业务罗杰代码,更加方便的处理异常的问题以及数据的返回处理等。下面是通话机制的四条原则:
Basic.Ack 发回给 RabbitMQ 以告知,可以将相应 message 从 RabbitMQ 的消息缓存中移除。
Basic.Ack 未被 consumer 发回给 RabbitMQ 前出现了异常,RabbitMQ 发现与该 consumer 对应的连接被断开,之后将该 message 以轮询方式发送给其他 consumer (假设存在多个 consumer 订阅同一个 queue)。
在 no_ack=true 的情况下,RabbitMQ 认为 message 一旦被 deliver 出去了,就已被确认了,所以会立即将缓存中的 message 删除。所以在 consumer 异常时会导致消息丢失。
来自 consumer 侧的 Basic.Ack 与 发送给 Producer 侧的 Basic.Ack 没有直接关系。
正题部分(配置手动Ack,实现异常消息回滚)
A. 在消费者端的mq配置文件上添加,配置 关键代码为 acknowledeg = "manual",意为表示该消费者的ack方式为手动(此时的queue已经和生产者的exchange通过某个routeKey绑定了)
<rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual"> <rabbit:listener queues="queue_xxx" ref="MqConsumer"/> <rabbit:listener queues="queue_xxx" ref="MqConsumer2"/> </rabbit:listener-container>
B. 新建一个类 MqConsumer ,并实现接口 ChannelAwareMessageListener ,实现onMessage方法,不需要指定方法。
springAMQP中已经实现了一个功能,如果该监听器已经实现了下面2个接口,则直接调用onMessage方法
C. 关键点在实现了ChannelAwareMessageListener的onMessage方法后,会有2个参数。
一个是message(消息实体),一个是channel就是当前的通道,很多地方都没有说清楚怎么去手动ack,其实手动ack就是在当前channel里面调用basicAsk的方法,并传入当前消息的tagId就可以了。
消息监听接口实现
1.MessageListener消费者消息监听(自动进行任务完成确认)
基于实现MessageListener的消费者监听消息时,如果xml里没有配置acknowledge,则是默认如同xml配置acknowledge="auto" ,是自动确认消费者完成任务(消息ack), 如果此时消费者抛出异常 ,消息会返回该队列并发送给其他消费者 ,如没有其他消费者 则会继续发到该消费者
如果xml配置中acknowledge="manual",则无法收到消息。该消息会停留在服务器,然后会发给可以收到消息的消费者。
2.ChannelAwareMessageListener消费者消息监听(手动进行任务完成确认)
基于实现ChannelAwareMessageListener的消费者监听消息时,xml配置中acknowledge="auto"或不配置acknowledge时,调用方法进行消费者任务完成确认时会报如下异常(com.rabbitmq.client.ShutdownSignalException: channel error;)
所以若要实现手动消费则任务完成确认,xml的监听标签中需要配置acknowledge="manual" 手动确认消费者任务完成(消息ack)
消息确认 如未调用如下方法确认,则消息不再发到该消费者(如有其它的消费者,则轮询到其他的消费者,否则返回队列保留在服务器),multiple 为 false只确认当前一个消息收到,true确认所有consumer获得的消息
(1) 消息确认 如未确认则消息不再发到该消费者,multiple 为 false只确认当前一个消息收到,true确认所有consumer获得的消息
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
(2) 消息确认并返回队列 如未确认则消息不在发到该消费者,multiple 为 false只确认当前一个消息收到,true确认所有 consumer获得的消息;requeue 为true该消息重新回到队列,并发到该队列的其他消费者,为false则直接丢掉该消息
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
(3) 拒绝消息 requeue 为true该消息重新回到队列,并发到该队列的其他消费者,如没有其他消费者,则会一直发到该消费者,为false则直接丢掉该消息
channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
同样的,如果要Nack或者拒绝消息(reject)的时候,也是调用channel里面的basicXXX方法就可以了(当然要制定tagId)。注意如果抛异常或Nack(并且requeue为true),消息会一直重新入队列,一不小心就会重复一大堆消息不断出现~。
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); // 消息的标识,false只确认当前一个消息收到,true确认所有consumer获得的消息
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true); // ack返回false,并重新回到队列,api里面解释得很清楚 channel.basicReject(message.getMessageProperties().getDeliveryTag(), true); // 拒绝消息
D. 针对上面所描述的情况,我们在搭建一个消息队列时候,我们的思路应该是这样的,首先,我们要启动ack的手动方式,紧接着,我们处理代码逻辑,如果发生了异常信息,我们首先通知到ack,我已经表示接受到这条数据了,你可以进行删除了,不需要让他自动的重新进入队列中,然后,我们启用一个错误处理,手动将其重新插入队列中,在此之前,有几个类和Api一起来看一下。
1. SimpleMessageListenerContainer
这个是我们的基础监听,他的作用就是队列的总监听,可以为其配置ack模式,异常处理类等。。
2. org.springframework.amqp.support.converter.SimpleMessageConverter
这个类和下面的Converter类很容易搞混淆,这个类的作用是可以解析队列中的 message 转 obj
3. org.springframework.amqp.rabbit.retry.MessageRecoverer
这个接口,需要我们开发者自定义实现,其中的一个方法recover(Message message, Throwable cause),就可以看出来他是干嘛的,就是说在监听出错,也就是没有抓取的异常而是抛出的异常会触发该方法,我们就会在这个接口的实现中,将消息重新入队列
4. org.springframework.util.ErrorHandler
这个接口也是在出现异常时候,会触发他的方法
案例。。。。。。。。。。。。。。。。。。。
生产者
import java.io.IOException;
import javax.annotation.Resource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.stereotype.Component;
@Component
public class MessageProducer{
private Logger logger = LoggerFactory.getLogger(MessageProducer.class);
@Resource(name="amqpTemplate")
private AmqpTemplate amqpTemplate;
@Resource(name="amqpTemplate2")
private AmqpTemplate amqpTemplate2;
public void sendMessage(Object message) throws IOException {
logger.info("to send message:{}", message);
amqpTemplate.convertAndSend("queue.Test.admin", message);
// implements ConfirmCallback
// Message re = (Message)amqpTemplate.convertSendAndReceive("queue.Test.admin", message);
// amqpTemplate2.convertAndSend("queue.Test.admin", message);
}
消费者 1 及其配置文件.xml
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
import org.springframework.stereotype.Component;
@Component
public class MessageConsumer implements MessageListener {
private Logger logger = LoggerFactory.getLogger(MessageConsumer.class);
@Override
public void onMessage(Message message) {
logger.info("admin MessageConsumer consumer receive message------->:{}", message);
// try {
// Thread.sleep(3000);
// } catch (InterruptedException e) {
// // TODO Auto-generated catch block
// e.printStackTrace();
// }
xml配置中acknowledge="auto" 时 是自动确认ack 如果此时消费者抛出异常 消息会发到该队列其他消费者 如没有其他消费者 则会一直发到该消费者
// throw new NullPointerException(".....admin.....消费者异常。。。。。。。。");
}
}
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit-1.2.xsd">
<!--配置connection-factory,指定连接rabbit server参数 -->
<rabbit:connection-factory id="connectionFactory" virtual-host="/"
username="homy" password="homy" host="120.25.212.10" port="5672" channel-cache-size="5"/>
<!--配置connection-factory,指定连接rabbit server参数
<rabbit:connection-factory id="connectionFactory" virtual-host="hymn"
username="hy" password="hy2018627" host="120.25.212.10" port="5672" />
-->
<!--通过指定下面的admin信息,当前producer中的exchange和queue会在rabbitmq服务器上自动生成 -->
<rabbit:admin id="connectAdmin" connection-factory="connectionFactory" />
<!--定义queue -->
<rabbit:queue name="queueTest" durable="true" auto-delete="false"
exclusive="false" declared-by="connectAdmin" >
</rabbit:queue>
<!-- 定义direct exchange,绑定queueTest -->
<rabbit:direct-exchange name="exchangeTest"
durable="true" auto-delete="false" declared-by="connectAdmin">
<rabbit:bindings>
<rabbit:binding queue="queueTest" key="queue.Test.admin"></rabbit:binding>
</rabbit:bindings>
</rabbit:direct-exchange>
<!--定义rabbit template用于数据的接收和发送 -->
<rabbit:template id="amqpTemplate" connection-factory="connectionFactory"
exchange="exchangeTest"/>
<!-- queue litener 观察 监听模式 当有消息到达时会通知监听在对应的队列上的监听对象 acknowledge="manual" -->
<rabbit:listener-container connection-factory="connectionFactory">
<rabbit:listener queues="queueTest" ref="messageConsumer" method="onMessage"/>
</rabbit:listener-container>
<!--定义rabbit template用于数据的接收和发送 -->
<rabbit:template id="amqpTemplate2" connection-factory="connectionFactory" exchange="exchangeTopic" />
<!--定义queue -->
<rabbit:queue name="queueTest2" durable="true" auto-delete="false"
exclusive="false" declared-by="connectAdmin" />
<!-- 定义direct exchange,绑定queueTest -->
<rabbit:topic-exchange name="exchangeTopic"
durable="true" auto-delete="false" declared-by="connectAdmin">
<rabbit:bindings>
<rabbit:binding queue="queueTest2" pattern="queue.#"></rabbit:binding>
<rabbit:binding queue="queueTest" pattern="queue.Test.*"></rabbit:binding>
</rabbit:bindings>
</rabbit:topic-exchange>
<!-- queue litener 观察 监听模式 当有消息到达时会通知监听在对应的队列上的监听对象-->
<rabbit:listener-container connection-factory="connectionFactory" acknowledge="auto">
<rabbit:listener queues="queueTest2" ref="messageConsumer" method="onMessage"/>
</rabbit:listener-container>
</beans>
消费者 2 及其配置文件.xml(另外一个项目)
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
import org.springframework.stereotype.Component;
import com.rabbitmq.client.Channel;
@Component
public class MessageConsumer implements ChannelAwareMessageListener {
private Logger logger = LoggerFactory.getLogger(MessageConsumer.class);
@Override
public void onMessage(Message message, Channel channel) throws Exception {
// TODO Auto-generated method stub
// 消息监听接口实现
// 1.MessageListener消费者消息监听(自动进行任务完成确认)
// 基于实现MessageListener的消费者监听消息时,如果xml里没有配置acknowledge,则是默认如同xml配置acknowledge="auto" ,是自动确认消费者完成任务(消息ack), 如果此时消费者抛出异常 ,消息会返回该队列并发送给其他消费者 ,如没有其他消费者 则会继续发到该消费者
// 如果xml配置中acknowledge="manual",则无法收到消息。该消息会停留在服务器,然后会发给可以收到消息的消费者。
//
// 2.ChannelAwareMessageListener消费者消息监听(手动进行任务完成确认)
// 基于实现ChannelAwareMessageListener的消费者监听消息时,xml配置中acknowledge="auto"或不配置acknowledge时,调用方法进行消费者任务完成确认时会报如下异常(com.rabbitmq.client.ShutdownSignalException: channel error;)
// 所以若要实现手动消费则任务完成确认,xml的监听标签中需要配置acknowledge="manual" 手动确认消费者任务完成(消息ack)
//
// 消息确认 如未调用如下方法确认,则消息不再发到该消费者(如有其它的消费者,则轮询到其他的消费者),multiple 为 false只确认当前一个消息收到,true确认所有consumer获得的消息
// 消息确认 如未确认则消息不在发到该消费者,multiple 为 false只确认当前一个消息收到,true确认所有consumer获得的消息
// (1)channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
// 消息确认并返回队列 如未确认则消息不在发到该消费者,multiple 为 false只确认当前一个消息收到,true确认所有consumer获得的消息;requeue 为true该消息重新回到队列,并发到该队列的其他消费者,为false则直接丢掉该消息
// (2)channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
// 拒绝消息 requeue 为true该消息重新回到队列,并发到该队列的其他消费者,如没有其他消费者,则会一直发到该消费者,为false则直接丢掉该消息
// (3)channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
//..........手动消息确认。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。
// try {
// Thread.sleep(3000);
// } catch (InterruptedException e) {
// // TODO Auto-generated catch block
// e.printStackTrace();
// }
xml配置中acknowledge="auto" 时 是自动确认ack 如果此时消费者抛出异常 消息会发到该队列其他消费者 如没有其他消费者 则会一直发到该消费者
// if(true){
// throw new NullPointerException(".....admin.....消费者异常。。。。。。。。");
// }
logger.error("收到");
//消息确认 如未确认则消息不在发到该消费者,multiple 为 false只确认当前一个消息收到,true确认所有consumer获得的消息
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
logger.info("business-admin MessageConsumer receive message 出现异常 并将该消息重新入队列------->:{}", message);
logger.info("messageid:"+message.getMessageProperties().getDeliveryTag()+" ...messageBody:"+message.getBody());
//消息确认并返回队列 如未确认则消息不在发到该消费者,multiple 为 false只确认当前一个消息收到,true确认所有consumer获得的消息;requeue 为true该消息重新回到队列,并发到该队列的其他消费者,为false则直接丢掉该消息
// channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
//拒绝消息 requeue 为true该消息重新回到队列,并发到该队列的其他消费者,如没有其他消费者,则会一直发到该消费者,为false则直接丢掉该消息
// channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
//........................手动通知消息生产者。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。
}
}
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit-1.2.xsd">
<!--配置connection-factory,指定连接rabbit server参数 -->
<rabbit:connection-factory id="connectionFactory" virtual-host="/"
username="homy" password="homy" host="120.25.212.10" port="5672" channel-cache-size="5"/>
<!--配置connection-factory,指定连接rabbit server参数
<rabbit:connection-factory id="connectionFactory2" virtual-host="hymn"
username="hy" password="hy2018627" host="120.25.212.10" port="5672" publisher-confirms="true"/>
-->
<!--通过指定下面的admin信息,当前producer中的exchange和queue会在rabbitmq服务器上自动生成 -->
<rabbit:admin id="connectAdmin" connection-factory="connectionFactory" />
<!--定义queue -->
<rabbit:queue name="queueTest" durable="true" auto-delete="false"
exclusive="false" declared-by="connectAdmin" />
<rabbit:queue name="queueTest3" durable="true" auto-delete="false"
exclusive="false" declared-by="connectAdmin" />
<!-- 定义direct exchange,绑定queueTest -->
<rabbit:direct-exchange name="exchangeTest"
durable="true" auto-delete="false" declared-by="connectAdmin">
<rabbit:bindings>
<rabbit:binding queue="queueTest" key="queue.Test.admin"></rabbit:binding>
</rabbit:bindings>
</rabbit:direct-exchange>
<!--定义rabbit template用于数据的接收和发送 -->
<rabbit:template id="amqpTemplate" connection-factory="connectionFactory"
exchange="exchangeTest" />
<!-- queue litener 观察 监听模式 当有消息到达时会通知监听在对应的队列上的监听对象 acknowledge="manual"-->
<rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual">
<rabbit:listener queues="queueTest" ref="messageConsumer" method="onMessage" />
</rabbit:listener-container>
<!-- .............................................................................. -->
<!--定义rabbit template用于数据的接收和发送 -->
<rabbit:template id="amqpTemplate2" connection-factory="connectionFactory" exchange="exchangeTopic" />
<!--定义queue -->
<rabbit:queue name="queueTest2" durable="true" auto-delete="false" exclusive="false" declared-by="connectAdmin" />
<!-- 定义direct exchange,绑定queueTest -->
<rabbit:topic-exchange name="exchangeTopic" durable="true" auto-delete="false" declared-by="connectAdmin">
<rabbit:bindings>
<rabbit:binding queue="queueTest2" pattern="queue.#"></rabbit:binding>
<rabbit:binding queue="queueTest" pattern="queue.Test.*"></rabbit:binding>
</rabbit:bindings>
</rabbit:topic-exchange>
<!-- queue litener 观察 监听模式 当有消息到达时会通知监听在对应的队列上的监听对象 -->
<rabbit:listener-container connection-factory="connectionFactory" acknowledge="auto">
<rabbit:listener queues="queueTest2" ref="topicConsumer" method="onMessage"/>
</rabbit:listener-container>
</beans>