1、首先是rabbitmq的配置文件:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-1.4.xsd">

    <!--
        Spring AMQP wiring for the publisher-confirm / return-callback demo:
        one connection factory, one template with both callbacks, one queue bound
        to one direct exchange, and one manually-acknowledging listener.
    -->

    <!-- Mind the spring-rabbit.xsd version: many features are missing before 1.4,
         so use the schema version that matches the spring-rabbit jar on the classpath. -->
    <bean id="jsonMessageConverter" class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter" />

    <!-- publisher-confirms="true" turns on broker confirms, which the template's
         confirm-callback below relies on. -->
    <rabbit:connection-factory id="connectionFactory"
                               host="${rabbit.host}"
                               port="${rabbit.port}"
                               username="${rabbit.username}"
                               password="${rabbit.password}"
                               publisher-confirms="true" />

    <rabbit:admin connection-factory="connectionFactory" />

    <!-- Assign a converter to the template. -->
    <!-- NOTE(review): the template below carries no message-converter attribute, so the
         jsonMessageConverter bean declared above is not referenced anywhere in this
         file - confirm whether that is intended. -->
    <!-- mandatory must be set to true, otherwise the return callback never fires. -->
    <rabbit:template id="amqpTemplate"
                     connection-factory="connectionFactory"
                     confirm-callback="confirmCallBackListener"
                     return-callback="returnCallBackListener"
                     mandatory="true" />

    <rabbit:queue name="CONFIRM_TEST" />

    <!-- The binding declares no explicit key; presumably it defaults to the queue name
         (the demo publishes with routing key "CONFIRM_TEST") - verify against the
         spring-rabbit schema documentation. -->
    <rabbit:direct-exchange name="DIRECT_EX" id="DIRECT_EX" >
        <rabbit:bindings>
            <rabbit:binding queue="CONFIRM_TEST" />
        </rabbit:bindings>
    </rabbit:direct-exchange>

    <!-- Consumer configuration: maps listener beans to the queues they consume.
         acknowledge="manual" means the listener itself must ack/nack each delivery. -->
    <rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual" >
        <rabbit:listener queues="CONFIRM_TEST" ref="receiveConfirmTestListener" />
    </rabbit:listener-container>
</beans>
2、发送方:
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

/**
 * Publishing facade registered as the {@code publishService} bean.
 *
 * <p>It only forwards to the injected {@link AmqpTemplate}; it performs no
 * validation or error handling of its own.
 */
@Service("publishService")
public class PublishService {

    /** Template used for every send; injected by Spring. */
    @Autowired
    private AmqpTemplate amqpTemplate;

    /**
     * Converts the payload and sends it to the given exchange.
     *
     * @param exchange   name of the target exchange
     * @param routingKey routing key to publish with
     * @param message    payload handed to the template for conversion
     */
    public void send(String exchange, String routingKey, Object message) {
        this.amqpTemplate.convertAndSend(exchange, routingKey, message);
    }
}
3、消费方:
import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener; import org.springframework.stereotype.Service; import com.rabbitmq.client.Channel; @Service("receiveConfirmTestListener") public class ReceiveConfirmTestListener implements ChannelAwareMessageListener { @Override public void onMessage(Message message, Channel channel) throws Exception { try{ System.out.println("consumer--:"+message.getMessageProperties()+":"+new String(message.getBody())); channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); }catch(Exception e){ e.printStackTrace();//TODO 业务处理 channel.basicNack(message.getMessageProperties().getDeliveryTag(), false,false); } } }
4、确认后回调方:
import org.springframework.amqp.rabbit.core.RabbitTemplate.ConfirmCallback;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.stereotype.Service;

/**
 * Publisher-confirm callback registered as the {@code confirmCallBackListener} bean.
 *
 * <p>It only prints the outcome of each confirm to standard output.
 */
@Service("confirmCallBackListener")
public class ConfirmCallBackListener implements ConfirmCallback {

    /**
     * Prints the confirm result.
     *
     * @param correlationData correlation data passed with the confirm; printed as-is
     * @param ack             the ack flag passed with the confirm
     * @param cause           the cause text passed with the confirm; printed as-is
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        StringBuilder line = new StringBuilder();
        line.append("confirm--:correlationData:").append(correlationData);
        line.append(",ack:").append(ack);
        line.append(",cause:").append(cause);
        System.out.println(line.toString());
    }
}
5、失败后return回调:
import java.nio.charset.StandardCharsets;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate.ReturnCallback;
import org.springframework.stereotype.Service;

/**
 * Return callback registered as the {@code returnCallBackListener} bean.
 *
 * <p>It only prints the returned message and the accompanying reply details
 * to standard output.
 */
@Service("returnCallBackListener")
public class ReturnCallBackListener implements ReturnCallback {

    /**
     * Prints a returned message together with the reason it was returned.
     *
     * @param message    the returned message
     * @param replyCode  reply code passed with the return
     * @param replyText  reply text passed with the return
     * @param exchange   exchange the message was published to
     * @param routingKey routing key the message was published with
     */
    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        // Decode explicitly as UTF-8 rather than with the platform default charset,
        // so the printed body does not depend on the JVM's file.encoding.
        String body = new String(message.getBody(), StandardCharsets.UTF_8);
        System.out.println("return--message:" + body
                + ",replyCode:" + replyCode
                + ",replyText:" + replyText
                + ",exchange:" + exchange
                + ",routingKey:" + routingKey);
    }
}
6、测试类:
import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.test.context.ContextConfiguration; import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; import com.dingcheng.confirms.publish.PublishService; @RunWith(SpringJUnit4ClassRunner.class) @ContextConfiguration(locations = {"classpath:application-context.xml"}) public class TestConfirm { @Autowired private PublishService publishService; private static String exChange = "DIRECT_EX"; @Test public void test1() throws InterruptedException{ String message = "currentTime:"+System.currentTimeMillis(); System.out.println("test1---message:"+message); //exchange,queue 都正确,confirm被回调, ack=true publishService.send(exChange,"CONFIRM_TEST",message); Thread.sleep(1000); } @Test public void test2() throws InterruptedException{ String message = "currentTime:"+System.currentTimeMillis(); System.out.println("test2---message:"+message); //exchange 错误,queue 正确,confirm被回调, ack=false publishService.send(exChange+"NO","CONFIRM_TEST",message); Thread.sleep(1000); } @Test public void test3() throws InterruptedException{ String message = "currentTime:"+System.currentTimeMillis(); System.out.println("test3---message:"+message); //exchange 正确,queue 错误 ,confirm被回调, ack=true; return被回调 replyText:NO_ROUTE publishService.send(exChange,"",message); // Thread.sleep(1000); } @Test public void test4() throws InterruptedException{ String message = "currentTime:"+System.currentTimeMillis(); System.out.println("test4---message:"+message); //exchange 错误,queue 错误,confirm被回调, ack=false publishService.send(exChange+"NO","CONFIRM_TEST",message); Thread.sleep(1000); } }
7、测试结果:
test1---message:currentTime:1483786948506 test2---message:currentTime:1483786948532 consumer--:MessageProperties [headers={spring_return_correlation=445bc7ca-a5bd-47e2-8ba3-f0448420e441}, timestamp=null, messageId=null, userId=null, appId=null, clusterId=null, type=null, correlationId=null, replyTo=null, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, deliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=DIRECT_EX, receivedRoutingKey=CONFIRM_TEST, deliveryTag=1, messageCount=0]:currentTime:1483786948506 test3---message:currentTime:1483786948536 confirm--:correlationData:null,ack:false,cause:channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'DIRECT_EXNO' in vhost '/', class-id=60, method-id=40) confirm--:correlationData:null,ack:false,cause:Channel closed by application [ERROR] 2017-01-07 19:02:28 org.springframework.amqp.rabbit.connection.CachingConnectionFactory.shutdownCompleted(CachingConnectionFactory.java:281):--> Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'DIRECT_EXNO' in vhost '/', class-id=60, method-id=40) return--message:currentTime:1483786948536,replyCode:312,replyText:NO_ROUTE,exchange:DIRECT_EX,routingKey: confirm--:correlationData:null,ack:true,cause:null test4---message:currentTime:1483786948546 confirm--:correlationData:null,ack:false,cause:channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'DIRECT_EXNO' in vhost '/', class-id=60, method-id=40) [ERROR] 2017-01-07 19:02:28 org.springframework.amqp.rabbit.connection.CachingConnectionFactory.shutdownCompleted(CachingConnectionFactory.java:281):--> Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'DIRECT_EXNO' in vhost '/', class-id=60, method-id=40)
8、总结如下:
如果消息没有到exchange,则confirm回调,ack=false
如果消息到达exchange,则confirm回调,ack=true
exchange到queue成功,则不回调return
exchange到queue失败,则回调return(需设置mandatory=true,否则不会回调,消息就丢了)
备注:需要说明,spring-rabbit和原生的rabbit-client表现是不一样的。测试的时候,如果用原生的client发送到错误的exchange,会直接报错,不会回调confirmListener和returnListener。
源码地址:https://github.com/qq315737546/spring-rabbit
全文地址请点击:https://blog.csdn.net/qq315737546/article/details/54176560