上篇openJms介绍 (一) 提到了openJms的构建及消息的发送和接收,这篇主要了解消息的发布和订阅。JMS 的发布/订阅模型定义了如何向一个内容节点发布和订阅消息,内容节点也叫主题(topic),主题是为发布者(publisher)和订阅者(subscribe) 提供传输的中介。发布/订阅模型使发布者和订阅者之间不需要直接通讯(如RMI)就可保证消息的传送,有效解决系统间耦合问题(当然有这个需要才行),还有就是提供了一对一、一对多的通讯方式,比较灵活。
先介绍JMS里2个概念,持久订阅模式和非持久订阅模式,其实也是发布/订阅模型在可靠性上提供的2种方式:
非持久订阅模式:只有当客户端处于激活状态,也就是和JMS 服务器保持连接的状态下,才能接收到发送到某个Topic的消息,而当客户端处于离线状态时,则这个时间段发到Topic的消息将会永远接收不到。
持久订阅模式:客户端向JMS 注册一个识别自己身份的ID,当这个客户端处于离线时,JMS 服务器会为这个ID 保存所有发送到主题的消息,当客户再次连接到JMS 服务器时,会根据自己的ID 得到所有当自己处于离线时发送到主题的消息,即消息永远能接收到。
下面我们就接着来看openJms在发布/订阅模式上的表现,由于篇幅关系,在这里只讲述非持久订阅模式,持久订阅模式可以根据JMS的标准来试。
消息发布的代码如下:
package javayou.demo.openjms; import java.util.*; import javax.jms.*; import javax.naming.*; /** * @author Liang.xf 2004-12-27 * openJms 发布消息演示 * www.javayou.com */ public class TopicPublish { public static void main(String[] args) { try { //取得JNDI上下文和连接 Hashtable properties = new Hashtable(); properties.put( Context.INITIAL_CONTEXT_FACTORY, "org.exolab.jms.jndi.InitialContextFactory"); //openJms默认的端口是1099 properties.put(Context.PROVIDER_URL, "rmi://localhost:1099/"); Context context = new InitialContext(properties); //获得JMS Topic连接队列工厂 TopicConnectionFactory factory = (TopicConnectionFactory) context.lookup( "JmsTopicConnectionFactory"); //创建一个Topic连接,并启动 TopicConnection topicConnection = factory.createTopicConnection(); topicConnection.start(); //创建一个Topic会话,并设置自动应答 TopicSession topicSession = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); //lookup 得到 topic1 Topic topic = (Topic) context.lookup("topic1"); //用Topic会话生成Topic发布器 TopicPublisher topicPublisher = topicSession.createPublisher(topic); //发布消息到Topic System.out.println("消息发布到Topic"); TextMessage message = topicSession.createTextMessage ("你好,欢迎定购Topic类消息"); topicPublisher.publish(message); //资源清除,代码略 ... ... } catch (NamingException e) { e.printStackTrace(); } catch (JMSException e) { e.printStackTrace(); } } } |
而订阅消息的接收有同步的和异步2种,他们分别使用receive()和onMessage(Message message)方法来接收消息,具体代码:
同步接收:
package javayou.demo.openjms; import java.util.*; import javax.jms.*; import javax.naming.*; /** * @author Liang.xf 2004-12-27 * openJms 非持久订阅同步接收演示 * www.javayou.com */ public class TopicSubscribeSynchronous { public static void main(String[] args) { try { System.out.println("定购消息接收启动:"); //取得JNDI上下文和连接 Hashtable properties = new Hashtable(); properties.put(Context.INITIAL_CONTEXT_FACTORY, "org.exolab.jms.jndi.InitialContextFactory"); properties.put(Context.PROVIDER_URL, "rmi://localhost:1099/"); Context context = new InitialContext(properties); //获得Topic工厂和Connection TopicConnectionFactory factory = (TopicConnectionFactory) context.lookup( "JmsTopicConnectionFactory"); TopicConnection topicConnection = factory.createTopicConnection(); topicConnection.start(); //创建Topic的会话,用于接收信息 TopicSession topicSession = topicConnection.createTopicSession( false, Session.AUTO_ACKNOWLEDGE); //lookup topic1 Topic topic = (Topic) context.lookup("topic1"); //创建Topic subscriber TopicSubscriber topicSubscriber = topicSession.createSubscriber(topic); //收满10条订阅消息则退出 for (int i=0; i<10; i++) { //同步消息接收,使用receive方法,堵塞等待,直到接收消息 TextMessage message = (TextMessage) topicSubscriber.receive(); System.out.println("接收订阅消息["+i+"]: " + message.getText()); } //资源清除,代码略 ... ... System.out.println("订阅接收结束."); } catch (NamingException e) { e.printStackTrace(); } catch (JMSException e) { e.printStackTrace(); } } } |
非同步接收:
package javayou.demo.openjms; import java.util.*; import javax.jms.*; import javax.naming.*; /** * @author Liang.xf 2004-12-27 * openJms 非持久订阅异步接收演示 * www.javayou.com */ public class TopicSubscribeAsynchronous implements MessageListener { private TopicConnection topicConnection; private TopicSession topicSession; private Topic topic; private TopicSubscriber topicSubscriber; TopicSubscribeAsynchronous() { try { //取得JNDI上下文和连接 Hashtable properties = new Hashtable(); properties.put( Context.INITIAL_CONTEXT_FACTORY, "org.exolab.jms.jndi.InitialContextFactory"); properties.put(Context.PROVIDER_URL, "rmi://localhost:1099/"); Context context = new InitialContext(properties); //取得Topic的连接工厂和连接 TopicConnectionFactory topicConnectionFactory = (TopicConnectionFactory) context.lookup( "JmsTopicConnectionFactory"); topicConnection = topicConnectionFactory.createTopicConnection(); //创建Topic的会话,用于接收信息 topicSession = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); topic = (Topic) context.lookup("topic1"); //创建Topic subscriber topicSubscriber = topicSession.createSubscriber(topic); //设置订阅监听 topicSubscriber.setMessageListener(this); //启动信息接收 topicConnection.start(); } catch (NamingException e) { e.printStackTrace(); } catch (JMSException e) { e.printStackTrace(); } } public static void main(String[] args) { System.out.println("非同步定购消息的接收:"); try { TopicSubscribeAsynchronous listener = new TopicSubscribeAsynchronous(); Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } //收到订阅信息后自动调用此方法 public void onMessage(Message message) { try { String messageText = null; if (message instanceof TextMessage) messageText = ((TextMessage) message).getText(); System.out.println(messageText); } catch (JMSException e) { e.printStackTrace(); } } } |
编译好后,启动openJms服务,打开admin管理台,为了运行方便,这里先列出三个类的运行命令:
java -cp ./; -Djava.ext.dirs=./lib; javayou.demo.openjms.TopicPublish
java -cp ./; -Djava.ext.dirs=./lib; javayou.demo.openjms.TopicSubscribeSynchronous
java -cp ./; -Djava.ext.dirs=./lib; javayou.demo.openjms.TopicSubscribeAsynchronous
先运行2个接收命令,再运行发布命令,可以看到控制台的Topic有消息接收,并且接收1和2都有消息接收的提示,到此完成演示,由于是非持久订阅,所以可以看到控制台上的Topic消息条数不会减少。
最后,说说openJms的缺点,它不支持XA transactions、集群和热备等高级功能,如果你需要这些特性,最好还是使用商业的JMS服务器,但不论怎样,openJms为我们提供了一个学习JMS的最好路径,有兴趣了解JMS的还是来尝试尝试吧。