基础
高级消息队列协议(AMQP1)是一个异步消息传递所使用的应用层协议规范,是一个线路层协议,而不是API,不能被开发者直接使用,它的客户端能够无视消息的来源任意发送和接受信息,AMQP的原始用途只是为金融界提供一个可以彼此协作的消息协议,而现在的目标则是为通用消息队列架构提供通用构建工具。
版本号
AMQP版本使用两个或三个数字进行表示 – 主版本号,次版本号以及可选的修订版本号.为了方便,版本号表示为:major-minor[-revision] 或major.minor[.revision]:
官方说明中,major, minor, 和revision均支持0到99的数字.
Major, minor, 和 revision 中100及其以上的数字保留用于内部测试和开发.
版本号表明了语法和语义互操作性。
版本 0-9-1 表示 major = 0, minor = 9, revision = 1.
1.1版本表示为major = 1, minor = 1, revision = 0. AMQP/1.1等价于AMQP/1.1.0或AMQP/1-1-0.
QPID 和 RabbitMQ 是实现同种功能的服务软件,都支持AMQP 协议
两种实现的方式:QPID
//配置
@Configuration
public class MQConfiguration {
//mq的地址
@Value("${mq.connection}")
private String mqConnection;
//监听地址一
@Value("${address1}")
private String address1;
//监听地址二
@Value("${address2}")
private String address2;
//地址一的监听器
@Autowired
private Listener1 listener1;
//地址二的监听器
@Autowired
private Listener2 listener2;
//获取连接工厂,通过连接地址将建立这个工厂,制定生成的工厂的名字
@Bean(name = "connectFactory")
public AMQConnectionFactory getAMQConnectionFactory() throws URLSyntaxException {
return new AMQConnectionFactory(mqConnection);
//或者下边的进行设置
final CachingConnectionFactory result = new CachingConnectionFactory(new AMQConnectionFactory(mqConnection));
result.setSessionCacheSize(10);
}
//Spring提供的java消息服务工具类,建立连接等,具体向哪个地址发送在发送类中根据类型进行判断
@Bean(name = "jmsTemplate")
public JmsTemplate jmsTemplate() throws URLSyntaxException {
JmsTemplate jmsTemplate = new JmsTemplate();
jmsTemplate.setConnectionFactory(this.getAMQConnectionFactory());
jmsTemplate.setExplicitQosEnabled(true);
return jmsTemplate;
}
//订阅或者监听的地址,生成一个AMQAnyDestination类用于监听
@Bean(name = "address1Subscribe")
public AMQAnyDestination address1Subscribe() throws URISyntaxException {
return new AMQAnyDestination(address1Subscribe);
}
/**
*是一个用于异步消息监听的管理类,、
*第一、生成一个管理类的对象
*第二、进行连接
*第三、设置监听器,是一个类的对象,监听这个地址的消息
**/
@Bean(name = "address1ListenerContainer")
public DefaultMessageListenerContainer address1ListenerContainer() throws URISyntaxException {
DefaultMessageListenerContainer container = new DefaultMessageListenerContainer();
container.setConnectionFactory(this.getAMQConnectionFactory());
container.setDestination(this.address1Subscribe());
container.setMessageListener(listener1);
return container;
}
@Bean(name = "address2Subscribe")
public AMQAnyDestination address2nSubscribe() throws URISyntaxException {
return new AMQAnyDestination(address2Subscribe);
}
@Bean(name = "address2ListenerContainer")
public DefaultMessageListenerContainer address2nListenerContainer() throws URISyntaxException {
DefaultMessageListenerContainer container = new DefaultMessageListenerContainer();
container.setConnectionFactory(this.getAMQConnectionFactory());
container.setDestination(this.address2Subscribe());
container.setMessageListener(listener2);
return container;
}
}
//发送消息
@Component
public class MessageSender {
//发送消息的地址一
@Value("${addr.one.publish}")
private String publishOne;
//发送消息的地址一
@Value("${addr.two.publish}")
private String publishTwo;
//将配置类中的Java消息服务类注入
@Autowired
private JmsTemplate jmsTemplate;
//json和bean之间转换互相转换的类
private ObjectMapper objectMapper = new ObjectMapper().configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
//在使用消息的时候,除了要发送message对象之外,还要发送判断发送地址的类型
public void sendMessage(MessageTypeEnum messageType, Object message) throws URISyntaxException, JsonProcessingException {
if (message == null) {
throw new BadRequestException("JMS message can't be null!");
}
//根据类型判断发送的消息地址
Destination destination = null;
if (messageType.equals(MessageTypeEnum.TASK_ISSUE)) {
destination = new AMQAnyDestination(publishOne);
} else {
destination = new AMQAnyDestination(publishTwo);
}
LOGGER.info(new NameValueList<String, String>().add("desc", "MessageSender send oneMessage")
.add("type", messageType.name()).add("message", objectMapper.writeValueAsString(message)));
jmsTemplate.convertAndSend(destination, objectMapper.writeValueAsString(message));
}
}
//接收端
@Component
public class Listener1 implements MessageListener {
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
//由于在配置类中已经将监听地址设置,这里不需要再进行设置,如果没有设置可以采用下边的方式进行
//@JmsListener(destination = "${address1}", concurrency = "5")
@Override
public void onMessage(Message message) {
if (message != null) {
if (message instanceof TextMessage) {
TextMessage textMessage = (TextMessage) message;
String msg = null;
try {
msg = textMessage.getText();
LOGGER.info(new NameValueList().add("desc", "Listener1 receive one message")
.add("body", msg));
handler1Message1(msg);
} catch (Exception e) {
}
}
} else {
LOGGER.info("Listener1 receive one empty message,ignore!");
throw new IllegalArgumentException("AssetAlteration mesage is empty!");
}
}
//进行业务处理
private void handler1Message1(String msg) throws IOException, URISyntaxException, InterruptedException {
while (true) {
if (hasNoLock()) {
break;
} else {//如果有锁的话阻塞2s
Thread.sleep(2000L);
}
}
//反序列化msg
AlterationMessage1 alterationMessage1 = OBJECT_MAPPER.readValue(msg, AlterationMessage1.class);
}
private boolean hasNoLock() throws IOException, URISyntaxException {
List<Info> list = aa.getList();
Boolean noLock = true;
for (Info info : list){
if (something){
noLock = false;
LOGGER.info("There is a lock:"+key);
}
}
return noLock;
}
}
两种实现的方式:RabbitMQ
这个文章讲的好
参考文章
http://www.infoq.com/cn/articles/AMQP-RabbitMQ
AMQP-0-9-1中文规范http://www.blogjava.net/qbna350816/archive/2016/08/12/431554.html