经过几个开源项目比较,最终发现合适的项目FFMQ:http://sourceforge.net/projects/ffmq/,项目大小才600KB,支持JMS1.1规范。
以下代码仅供测试参考,不具备生产环境所需的严谨性,具体FFMQ配置请参阅说明文档(下载包中附带)。
启动:
import java.io.FileInputStream;
import java.util.Properties;
import net.timewalker.ffmq3.listeners.ClientListener;
import net.timewalker.ffmq3.listeners.tcp.io.TcpListener;
import net.timewalker.ffmq3.local.FFMQEngine;
import net.timewalker.ffmq3.management.destination.definition.QueueDefinition;
import net.timewalker.ffmq3.management.destination.definition.TopicDefinition;
import net.timewalker.ffmq3.utils.Settings;
/**
 * Embedded FFMQ sample.
 *
 * <p>Starts an in-JVM FFMQ engine, exposes it over TCP on port 10002, makes sure
 * the queues {@code foo1} and {@code foo2} and the topic {@code foox} exist,
 * keeps running for one minute and then shuts everything down again.
 *
 * <p>This is test/demo code: paths are hard-coded and errors are only printed.
 */
public class EmbeddedFFMQSample implements Runnable
{
    private FFMQEngine engine;

    /**
     * Runs the whole sample life cycle: deploy, listen, define destinations,
     * wait, shut down. The listener and the engine are released in a
     * {@code finally} block so that a failure half-way through does not leave
     * the TCP port bound or the persistent stores open.
     */
    public void run()
    {
        ClientListener tcpListener = null;
        boolean deployed = false;
        try
        {
            // Create engine settings
            Settings settings = createEngineSettings();

            // Create the engine itself
            // -> myLocalEngineName will be the engine name.
            //    - It should be unique in a given JVM
            //    - This is the name to be used by local clients to establish
            //      an internal JVM connection (high performance)
            //      Use the following URL for clients : vm://myLocalEngineName
            engine = new FFMQEngine("myLocalEngineName", settings);

            // Deploy the engine
            // - The FFMQ engine is not functional until deployed.
            // - The deploy operation re-activates all persistent queues
            //   and recovers them if the engine was not properly closed.
            //   (May take some time for large queues)
            System.out.println("Deploying engine : "+engine.getName());
            engine.deploy();
            deployed = true;

            // Adding a TCP based client listener
            System.out.println("Starting listener ...");
            tcpListener = new TcpListener(engine,"0.0.0.0",10002,settings,null);
            tcpListener.start();

            // This is how you can programmatically define a new queue
            if (!engine.getDestinationDefinitionProvider().hasQueueDefinition("foo1"))
            {
                QueueDefinition queueDef = new QueueDefinition(settings);
                // The name must match the one tested above; the original sample
                // created "foo2" here, so "foo1" never existed and the
                // Properties-based definition below was never exercised.
                queueDef.setName("foo1");
                queueDef.setMaxNonPersistentMessages(0);
                queueDef.setOverflowToPersistent(false);
                queueDef.setPreAllocateFiles(true);
                queueDef.setTemporary(false);
                queueDef.setUseJournal(true);
                queueDef.setAutoExtendAmount(128);
                queueDef.setInitialBlockCount(32);
                queueDef.setMaxBlockCount(1024);
                queueDef.check();
                engine.createQueue(queueDef);
            }

            // You could also define a queue using some java Properties
            if (!engine.getDestinationDefinitionProvider().hasQueueDefinition("foo2"))
            {
                Properties queueProps = new Properties();
                queueProps.put("name", "foo2");
                queueProps.put("persistentStore.useJournal", "false");
                queueProps.put("memoryStore.maxMessages", "1000");
                QueueDefinition queueDef2 = new QueueDefinition(new Settings(queueProps));
                engine.createQueue(queueDef2);
            }

            // Topics are defined the same way as queues
            if (!engine.getDestinationDefinitionProvider().hasTopicDefinition("foox"))
            {
                TopicDefinition topicDef = new TopicDefinition(settings);
                topicDef.setName("foox");
                topicDef.setMaxNonPersistentMessages(0);
                topicDef.setOverflowToPersistent(false);
                topicDef.setPreAllocateFiles(true);
                topicDef.setTemporary(false);
                topicDef.setUseJournal(true);
                topicDef.check();
                engine.createTopic(topicDef);
            }

            // Run for some time
            System.out.println("Running ...");
            Thread.sleep(60*1000);
        }
        catch (InterruptedException e)
        {
            // Keep the interrupt visible to whoever owns this thread
            Thread.currentThread().interrupt();
            e.printStackTrace();
        }
        catch (Exception e)
        {
            // Oops
            e.printStackTrace();
        }
        finally
        {
            shutdown(tcpListener, deployed);
        }
    }

    /**
     * Stops the listener and undeploys the engine, each step guarded on its own
     * so that a failure in one does not prevent the other.
     *
     * @param tcpListener the listener to stop, or {@code null} if none was created
     * @param deployed    whether {@code engine.deploy()} completed successfully
     */
    private void shutdown(ClientListener tcpListener, boolean deployed)
    {
        if (tcpListener != null)
        {
            // Stopping the listener
            System.out.println("Stopping listener ...");
            try
            {
                tcpListener.stop();
            }
            catch (Exception e)
            {
                e.printStackTrace();
            }
        }
        if (deployed)
        {
            // Undeploy the engine
            // - It is important to properly shutdown the engine
            //   before stopping the JVM to make sure current transactions
            //   are nicely completed and storages properly closed.
            System.out.println("Undeploying engine ...");
            try
            {
                engine.undeploy();
            }
            catch (Exception e)
            {
                e.printStackTrace();
            }
        }
        System.out.println("Done.");
    }

    /**
     * Builds the engine settings from the server properties file of the local
     * FFMQ distribution.
     *
     * @return the settings loaded from the properties file
     * @throws RuntimeException if the properties file cannot be read
     */
    private Settings createEngineSettings()
    {
        // Various ways of creating engine settings
        // 1 - From a properties file
        Properties externalProperties = new Properties();
        // try-with-resources: the stream is closed even when load() fails
        try (FileInputStream in = new FileInputStream("D:\\ffmq3-distribution-3.0.5-dist\\conf\\ffmq-server.properties"))
        {
            externalProperties.load(in);
        }
        catch (Exception e)
        {
            throw new RuntimeException("Cannot load external properties",e);
        }
        Settings settings = new Settings(externalProperties);

        // 2 - Explicit Java code
        // Settings settings = new Settings();
        //
        // settings.setStringProperty(FFMQCoreSettings.DESTINATION_DEFINITIONS_DIR, ".");
        // settings.setStringProperty(FFMQCoreSettings.BRIDGE_DEFINITIONS_DIR, ".");
        // settings.setStringProperty(FFMQCoreSettings.TEMPLATES_DIR, ".");
        // settings.setStringProperty(FFMQCoreSettings.DEFAULT_DATA_DIR, ".");
        // ...
        return settings;
    }

    /**
     * Entry point. Sets the {@code FFMQ_BASE} system property to the local
     * distribution directory and runs the sample on the calling thread.
     *
     * @param args ignored
     */
    public static void main(String[] args)
    {
        System.setProperty("FFMQ_BASE", "D:\\ffmq3-distribution-3.0.5-dist");
        new EmbeddedFFMQSample().run();
    }
}
模拟发送:
import java.util.Hashtable;
import java.util.Random;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;
import net.timewalker.ffmq3.FFMQConstants;
/**
 * Simulated producer: connects to the FFMQ TCP listener on
 * {@code tcp://localhost:10002} and keeps sending text messages to one queue
 * until the shared {@code run} flag is cleared.
 *
 * <p>Every message carries a {@code QTMID} string property so that consumers
 * can filter on it with a message selector.
 */
public class Sender implements Runnable {
    /** Shared stop flag; cleared by {@link #main(String[])} to end all senders. */
    private static volatile boolean run = true;

    private final String queueName;
    private final String qtmId;

    /**
     * Starts two senders (one per queue), lets them work for ten seconds and
     * then asks them to stop.
     *
     * @param args ignored
     * @throws Exception if the main thread is interrupted while waiting
     */
    public static void main(String[] args) throws Exception {
        new Thread(new Sender("queue/foo1", "1")).start();
        new Thread(new Sender("queue/foo2", "2")).start();
        Thread.sleep(10000);
        run = false;
        Thread.sleep(1000);
    }

    /**
     * @param queueName JNDI name of the target queue, e.g. {@code queue/foo1}
     * @param qtmId     value written to the {@code QTMID} property of every message
     */
    private Sender(String queueName, String qtmId) {
        super();
        this.queueName = queueName;
        this.qtmId = qtmId;
    }

    /**
     * Sends numbered text messages with a per-sender pause between them.
     * The connection and the JNDI context are released in {@code finally},
     * so they do not leak when a lookup or a send fails.
     */
    @Override
    public void run() {
        Context context = null;
        Connection conn = null;
        try {
            // Create and initialize a JNDI context
            Hashtable<String,String> env = new Hashtable<>();
            env.put(Context.INITIAL_CONTEXT_FACTORY, FFMQConstants.JNDI_CONTEXT_FACTORY);
            env.put(Context.PROVIDER_URL, "tcp://localhost:10002");
            context = new InitialContext(env);
            // Lookup a connection factory in the context
            ConnectionFactory connFactory = (ConnectionFactory)context.lookup(FFMQConstants.JNDI_CONNECTION_FACTORY_NAME);
            // Obtain a JMS connection from the factory
            conn = connFactory.createConnection("test","test");
            conn.start();
            Destination dest1=(Queue) context.lookup(queueName);
            Session session=conn.createSession(false,Session.AUTO_ACKNOWLEDGE);

            // Default-seeded Random: two senders started in the same millisecond
            // would otherwise share a seed and therefore the same delay.
            Random rnd = new Random();
            // The cast must wrap the whole product. Casting nextFloat() first
            // truncates it to 0, which made the delay a constant 1500 ms.
            long ms = (long) (rnd.nextFloat() * 10 * 1000);
            if(ms > 8000) {
                ms /= 2;
            } else if(ms < 1000) {
                ms = 1500;
            }

            int i = 1;
            MessageProducer p = session.createProducer(dest1);
            while (run) {
                TextMessage msg = session.createTextMessage();
                String t = "[" + qtmId + "] Hello " + queueName + " " + i++;
                System.out.println("sended..." + t);
                msg.setStringProperty("QTMID", qtmId);
                msg.setText(t);
                p.send(msg);
                Thread.sleep(ms);
            }
            p.close();
            session.close();
        } catch (NamingException e) {
            e.printStackTrace();
        } catch (JMSException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            // Keep the interrupt visible to whoever owns this thread
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } finally {
            // Closing the connection also closes its sessions and producers
            if (conn != null) {
                try {
                    conn.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
            if (context != null) {
                try {
                    context.close();
                } catch (NamingException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}
模拟接收:
import java.util.Hashtable;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.naming.Context;
import javax.naming.InitialContext;
import net.timewalker.ffmq3.FFMQConstants;
/**
 * Simulated consumer: connects to the FFMQ TCP listener on
 * {@code tcp://localhost:10002} and polls the queue {@code queue/foo2} until the
 * shared {@code run} flag is cleared.
 */
public class Receiver implements Runnable {
    /** Shared stop flag; cleared by {@link #main(String[])} to end the receiver. */
    private static volatile boolean run = true;

    private Connection conn;
    private Session session;
    private MessageConsumer consumer;

    /**
     * Starts one receiver, lets it work for ten seconds and then asks it to stop.
     *
     * @param args ignored
     * @throws Exception if the main thread is interrupted while waiting
     */
    public static void main(String[] args) throws Exception {
        new Thread(new Receiver()).start();
        Thread.sleep(10000);
        run = false;
        Thread.sleep(2000);
    }

    /**
     * Looks up the connection factory and the queue through JNDI, then opens
     * the connection, session and consumer.
     *
     * @throws Exception if a JNDI lookup or any JMS call fails
     */
    private void init() throws Exception {
        // Create and initialize a JNDI context
        Hashtable<String,String> env = new Hashtable<>();
        env.put(Context.INITIAL_CONTEXT_FACTORY, FFMQConstants.JNDI_CONTEXT_FACTORY);
        env.put(Context.PROVIDER_URL, "tcp://localhost:10002");
        Context context = new InitialContext(env);
        ConnectionFactory connFactory;
        Destination dest1;
        try {
            // Lookup a connection factory in the context
            connFactory = (ConnectionFactory)context.lookup(FFMQConstants.JNDI_CONNECTION_FACTORY_NAME);
            dest1=(Queue) context.lookup("queue/foo2");
        } finally {
            // The context is only needed for the lookups; release it even if one fails
            context.close();
        }
        // Obtain a JMS connection from the factory
        conn = connFactory.createConnection("test", "test");
        conn.start();
        session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
        consumer = session.createConsumer(dest1);
        System.err.println("INIT.........");
    }

    /**
     * Releases the consumer, session and connection. Each resource is handled
     * independently and may be {@code null} when {@link #init()} failed part-way,
     * so teardown never throws and never skips a later resource.
     */
    private void destroy() {
        if (consumer != null) {
            try {
                consumer.close();
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
        if (session != null) {
            try {
                session.close();
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
        if (conn != null) {
            try {
                conn.stop();
            } catch (JMSException e) {
                e.printStackTrace();
            }
            try {
                conn.close();
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
        System.err.println("EXIT........REC");
    }

    /**
     * Polls the consumer with a 500 ms timeout so that the {@code run} flag is
     * re-checked regularly, printing the text of every message received.
     */
    @Override
    public void run() {
        try {
            init();
            while(run) {
                Message m = consumer.receive(500);
                if (m instanceof TextMessage) {
                    System.err.println("receive: " + ((TextMessage) m).getText());
                } else if (m != null) {
                    // Not expected from the sample sender; report it instead of
                    // failing the whole loop with a ClassCastException.
                    System.err.println("receive: " + m);
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            destroy();
        }
    }
}
主题订阅:
package topic;
import java.util.Hashtable;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;
import javax.naming.Context;
import javax.naming.InitialContext;
import net.timewalker.ffmq3.FFMQConstants;
/**
 * Non-durable topic subscriber that only receives messages whose
 * {@code QTMID} string property equals this client's id (via a JMS message
 * selector). Each client listens for ten seconds and then disconnects.
 */
public class SubClient implements Runnable {
    private final String topicName;
    private final String qtmId;
    private TopicConnection conn;
    private TopicSession session;
    private TopicSubscriber subscriber;

    /**
     * Starts four subscriber threads with the ids 1 to 4 on {@code topic/foox}.
     *
     * @param args ignored
     * @throws Exception never thrown by the current body; kept for compatibility
     */
    public static void main(String[] args) throws Exception {
        for(int i = 1; i < 5; i++) {
            new Thread(new SubClient("topic/foox", String.valueOf(i))).start();
        }
        System.out.println(Thread.currentThread() + " EEXIT");
    }

    /**
     * @param topicName JNDI name of the topic to subscribe to
     * @param qtmId     value the {@code QTMID} message property must have
     */
    private SubClient(String topicName, String qtmId) {
        super();
        this.topicName = topicName;
        this.qtmId = qtmId;
    }

    /**
     * Looks up the topic connection factory and the topic through JNDI, then
     * opens the connection, session and a selector-filtered subscriber.
     * The connection is not started here; {@link #run()} starts it once the
     * listener is registered.
     *
     * @throws Exception if a JNDI lookup or any JMS call fails
     */
    private void init() throws Exception {
        // Create and initialize a JNDI context
        Hashtable<String,String> env = new Hashtable<>();
        env.put(Context.INITIAL_CONTEXT_FACTORY, FFMQConstants.JNDI_CONTEXT_FACTORY);
        env.put(Context.PROVIDER_URL, "tcp://localhost:10002");
        Context context = new InitialContext(env);
        TopicConnectionFactory connFactory;
        Topic topic;
        try {
            // Lookup a connection factory in the context
            connFactory = (TopicConnectionFactory)context.lookup(FFMQConstants.JNDI_TOPIC_CONNECTION_FACTORY_NAME);
            topic = (Topic) context.lookup(topicName);
        } finally {
            // The context is only needed for the lookups; release it even if one fails
            context.close();
        }
        // Obtain a JMS connection from the factory
        conn = connFactory.createTopicConnection("test","test");
        session = conn.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
        String selector = "(QTMID = '" + qtmId + "')";
        System.out.println("Selector: " + selector);
        subscriber = session.createSubscriber(topic, selector, false);
        System.err.println("INIT.........");
    }

    /**
     * Releases the subscriber, session and connection. Each resource is handled
     * independently and may be {@code null} when {@link #init()} failed part-way,
     * so teardown never throws and never skips a later resource.
     */
    private void destroy() {
        if (subscriber != null) {
            try {
                subscriber.close();
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
        if (session != null) {
            try {
                session.close();
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
        if (conn != null) {
            try {
                conn.stop();
            } catch (JMSException e) {
                e.printStackTrace();
            }
            try {
                conn.close();
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
        System.err.println(Thread.currentThread() + " Client EXIT........REC");
    }

    /**
     * Registers an asynchronous listener, starts delivery and keeps the
     * subscription open for ten seconds before tearing it down.
     */
    @Override
    public void run() {
        try {
            init();
            subscriber.setMessageListener(new MessageListener() {
                @Override
                public void onMessage(Message m) {
                    try {
                        System.err.println(Thread.currentThread() + " Client " + qtmId + " Subscriber receive: " + ((TextMessage) m).getText());
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
            });
            // Start delivery only after the listener is in place
            conn.start();
            Thread.sleep(10000);
        } catch (InterruptedException e) {
            // Keep the interrupt visible to whoever owns this thread
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            destroy();
        }
    }
}
持久订阅:
package topic;
import java.util.Hashtable;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;
import javax.naming.Context;
import javax.naming.InitialContext;
import net.timewalker.ffmq3.FFMQConstants;
/**
 * Durable topic subscriber. Connects with the client id {@code SERVER} and
 * the subscription name {@code DB}, listens for ten seconds and then
 * disconnects. The subscription is closed but not unsubscribed.
 */
public class SubServer implements Runnable {
    private final String topicName;
    private TopicConnection conn;
    private TopicSession session;
    private TopicSubscriber subscriber;

    /**
     * Starts a single durable subscriber thread on {@code topic/foox}.
     *
     * @param args ignored
     * @throws Exception never thrown by the current body; kept for compatibility
     */
    public static void main(String[] args) throws Exception {
        new Thread(new SubServer("topic/foox")).start();
        System.out.println(Thread.currentThread() + " main exit");
    }

    /**
     * @param topicName JNDI name of the topic to subscribe to
     */
    private SubServer(String topicName) {
        super();
        this.topicName = topicName;
    }

    /**
     * Looks up the topic connection factory and the topic through JNDI, then
     * opens the connection, sets its client id and creates the durable
     * subscriber. The connection is not started here; {@link #run()} starts
     * it once the listener is registered.
     *
     * @throws Exception if a JNDI lookup or any JMS call fails
     */
    private void init() throws Exception {
        // Create and initialize a JNDI context
        Hashtable<String,String> env = new Hashtable<>();
        env.put(Context.INITIAL_CONTEXT_FACTORY, FFMQConstants.JNDI_CONTEXT_FACTORY);
        env.put(Context.PROVIDER_URL, "tcp://localhost:10002");
        Context context = new InitialContext(env);
        TopicConnectionFactory connFactory;
        Topic topic;
        try {
            // Lookup a connection factory in the context
            connFactory = (TopicConnectionFactory)context.lookup(FFMQConstants.JNDI_TOPIC_CONNECTION_FACTORY_NAME);
            topic = (Topic) context.lookup(topicName);
        } finally {
            // The context is only needed for the lookups; release it even if one fails
            context.close();
        }
        // Obtain a JMS connection from the factory
        conn = connFactory.createTopicConnection("test","test");
        // The client id is set before the durable subscriber is created
        conn.setClientID("SERVER");
        session = conn.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
        subscriber = session.createDurableSubscriber(topic, "DB");
        System.out.println("INIT........." + subscriber);
    }

    /**
     * Releases the subscriber, session and connection. Each resource is handled
     * independently and may be {@code null} when {@link #init()} failed part-way,
     * so teardown never throws and never skips a later resource.
     */
    private void destroy() {
        if (subscriber != null) {
            try {
                subscriber.close();
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
        if (session != null) {
            try {
                session.close();
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
        if (conn != null) {
            try {
                conn.stop();
            } catch (JMSException e) {
                e.printStackTrace();
            }
            try {
                conn.close();
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
        System.err.println(Thread.currentThread() + " EXIT........REC");
    }

    /**
     * Registers an asynchronous listener, starts delivery and keeps the
     * subscription open for ten seconds before tearing it down.
     */
    @Override
    public void run() {
        try {
            init();
            subscriber.setMessageListener(new MessageListener() {
                @Override
                public void onMessage(Message m) {
                    try {
                        System.err.println(Thread.currentThread() + " DurableSubscriber receive: " + ((TextMessage) m).getText());
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
            });
            // Start delivery only after the listener is in place
            conn.start();
            Thread.sleep(10000);
        } catch (InterruptedException e) {
            // Keep the interrupt visible to whoever owns this thread
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            destroy();
        }
    }
}