当前位置: 首页 > 工具软件 > FFMQ > 使用案例 >

可嵌入式JMS消息队列FFMQ

璩和璧
2023-12-01


比较了几个开源项目之后,最终选定了合适的项目FFMQ: http://sourceforge.net/projects/ffmq/ 。该项目大小仅约600KB,支持JMS 1.1规范。
以下代码仅供测试参考,不具备生产环境所要求的严谨性;具体的FFMQ配置请参阅说明文档(包含在下载包中)。

启动:

import java.io.FileInputStream;
import java.util.Properties;

import net.timewalker.ffmq3.listeners.ClientListener;
import net.timewalker.ffmq3.listeners.tcp.io.TcpListener;
import net.timewalker.ffmq3.local.FFMQEngine;
import net.timewalker.ffmq3.management.destination.definition.QueueDefinition;
import net.timewalker.ffmq3.management.destination.definition.TopicDefinition;
import net.timewalker.ffmq3.utils.Settings;

/**
 * Embedded FFMQ sample
 */
/**
 * Embedded FFMQ sample.
 * <p>
 * Deploys an in-JVM FFMQ engine, exposes it over TCP on port 10002, makes sure
 * the queues {@code foo1} and {@code foo2} and the topic {@code foox} exist,
 * keeps running for one minute and then shuts everything down again.
 */
public class EmbeddedFFMQSample implements Runnable
{
    private FFMQEngine engine;
    
    /**
     * Runs the whole sample life cycle: deploy, listen, define destinations,
     * wait, shut down. The listener and the engine are released in a
     * {@code finally} block so that a failure half-way through does not leave
     * the TCP port bound or the engine deployed.
     */
    public void run()
    {
        ClientListener tcpListener = null;
        boolean deployed = false;
        try
        {
            // Create engine settings
            Settings settings = createEngineSettings();
            
            // Create the engine itself
            engine = new FFMQEngine("myLocalEngineName", settings);
            // -> myLocalEngineName will be the engine name.
            // - It should be unique in a given JVM
            // - This is the name to be used by local clients to establish
            // an internal JVM connection (high performance)
            // Use the following URL for clients : vm://myLocalEngineName
            
            // Deploy the engine
            System.out.println("Deploying engine : "+engine.getName());
            engine.deploy();
            deployed = true;
            // - The FFMQ engine is not functional until deployed.
            // - The deploy operation re-activates all persistent queues
            // and recovers them if the engine was not properly closed.
            // (May take some time for large queues)

            // Adding a TCP based client listener
            System.out.println("Starting listener ...");
            tcpListener = new TcpListener(engine,"0.0.0.0",10002,settings,null);
            tcpListener.start();
            
            defineDestinations(settings);
            
            // Run for some time
            System.out.println("Running ...");
            Thread.sleep(60*1000);
        }
        catch (InterruptedException e)
        {
            // Keep the interrupt visible to whoever owns this thread
            Thread.currentThread().interrupt();
            e.printStackTrace();
        }
        catch (Exception e)
        {
            // Oops
            e.printStackTrace();
        }
        finally
        {
            shutdown(tcpListener, deployed);
        }
    }
    
    /**
     * Creates the sample destinations unless a definition with the same name
     * is already known to the engine.
     *
     * @param settings engine settings, reused as the base for the definitions
     * @throws Exception if a definition is rejected or cannot be created
     */
    private void defineDestinations(Settings settings) throws Exception
    {
        // This is how you can programmatically define a new queue
        if (!engine.getDestinationDefinitionProvider().hasQueueDefinition("foo1"))
        {
            QueueDefinition queueDef = new QueueDefinition(settings);
            // The name must match the one tested in the guard above
            queueDef.setName("foo1");
            queueDef.setMaxNonPersistentMessages(0);
            queueDef.setOverflowToPersistent(false);
            queueDef.setPreAllocateFiles(true);
            queueDef.setTemporary(false);
            queueDef.setUseJournal(true);
            queueDef.setAutoExtendAmount(128);
            queueDef.setInitialBlockCount(32);
            queueDef.setMaxBlockCount(1024);
            queueDef.check();
            engine.createQueue(queueDef);
        }
        
        // You could also define a queue using some java Properties
        if (!engine.getDestinationDefinitionProvider().hasQueueDefinition("foo2"))
        {
            Properties queueProps = new Properties();
            queueProps.put("name", "foo2");
            queueProps.put("persistentStore.useJournal", "false");
            queueProps.put("memoryStore.maxMessages", "1000");
            QueueDefinition queueDef2 = new QueueDefinition(new Settings(queueProps));
            engine.createQueue(queueDef2);
        }
        
        // Topics are defined the same way as queues
        if (!engine.getDestinationDefinitionProvider().hasTopicDefinition("foox"))
        {
            TopicDefinition topicDef = new TopicDefinition(settings);
            topicDef.setName("foox");
            topicDef.setMaxNonPersistentMessages(0);
            topicDef.setOverflowToPersistent(false);
            topicDef.setPreAllocateFiles(true);
            topicDef.setTemporary(false);
            topicDef.setUseJournal(true);
            topicDef.check();
            engine.createTopic(topicDef);
        }
    }
    
    /**
     * Stops the listener (if one was created) and undeploys the engine (if it
     * was deployed). A failure in one step is reported and does not prevent
     * the next step from running.
     *
     * @param tcpListener the listener to stop, or {@code null} if none was created
     * @param deployed whether {@code engine.deploy()} completed successfully
     */
    private void shutdown(ClientListener tcpListener, boolean deployed)
    {
        if (tcpListener != null)
        {
            // Stopping the listener
            System.out.println("Stopping listener ...");
            try
            {
                tcpListener.stop();
            }
            catch (Exception e)
            {
                e.printStackTrace();
            }
        }
        
        if (deployed)
        {
            // Undeploy the engine
            System.out.println("Undeploying engine ...");
            try
            {
                engine.undeploy();
                // - It is important to properly shutdown the engine 
                // before stopping the JVM to make sure current transactions 
                // are nicely completed and storages properly closed.
            }
            catch (Exception e)
            {
                e.printStackTrace();
            }
        }
        
        System.out.println("Done.");
    }
    
    /**
     * Builds the engine settings from the server properties file of the FFMQ
     * distribution.
     *
     * @return the settings loaded from the properties file
     * @throws RuntimeException if the properties file cannot be read
     */
    private Settings createEngineSettings()
    {
        // Various ways of creating engine settings
        
        // 1 - From a properties file
        Properties externalProperties = new Properties();
        // try-with-resources closes the stream even when load() fails
        try (FileInputStream in = new FileInputStream("D:\\ffmq3-distribution-3.0.5-dist\\conf\\ffmq-server.properties"))
        {
            externalProperties.load(in);
        }
        catch (Exception e)
        {
            throw new RuntimeException("Cannot load external properties",e);
        }
        Settings settings = new Settings(externalProperties);
        
        // 2 - Explicit Java code
// Settings settings = new Settings();
// 
// settings.setStringProperty(FFMQCoreSettings.DESTINATION_DEFINITIONS_DIR, ".");
// settings.setStringProperty(FFMQCoreSettings.BRIDGE_DEFINITIONS_DIR, ".");
// settings.setStringProperty(FFMQCoreSettings.TEMPLATES_DIR, ".");
// settings.setStringProperty(FFMQCoreSettings.DEFAULT_DATA_DIR, ".");
// ...
        
        return settings;
    }
    
    /**
     * Entry point: sets the {@code FFMQ_BASE} system property to the
     * distribution directory and runs the sample on the calling thread.
     *
     * @param args ignored
     */
    public static void main(String[] args)
    {
        System.setProperty("FFMQ_BASE", "D:\\ffmq3-distribution-3.0.5-dist");
        
        new EmbeddedFFMQSample().run();
    }
}


模拟发送:

import java.util.Hashtable;
import java.util.Random;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;

import net.timewalker.ffmq3.FFMQConstants;

/**
 * Test producer: each instance connects to the broker at
 * {@code tcp://localhost:10002} and keeps sending text messages to one queue
 * until the shared {@link #run} flag is cleared.
 */
public class Sender implements Runnable {

    /**
     * Starts one sender for {@code queue/foo1} and one for {@code queue/foo2},
     * lets them work for ten seconds and then asks them to stop.
     *
     * @param args ignored
     * @throws Exception if the main thread is interrupted while waiting
     */
    public static void main(String[] args) throws Exception {
        new Thread(new Sender("queue/foo1", "1")).start();
        new Thread(new Sender("queue/foo2", "2")).start();
        Thread.sleep(10000);
        run = false;
        Thread.sleep(1000);
    }
    
    /** Cleared by {@code main} to make every sender leave its send loop. */
    private static volatile boolean run = true;
    private final String queueName;
    private final String qtmId;

    /**
     * @param queueName JNDI name of the target queue
     * @param qtmId value put into the {@code QTMID} string property of every message
     */
    private Sender(String queueName, String qtmId) {
        super();
        this.queueName = queueName;
        this.qtmId = qtmId;
    }

    /**
     * Picks the pause between two messages: a random value in [0, 10000) ms,
     * halved when above 8000 ms and replaced by 1500 ms when below 1000 ms.
     *
     * @return the pause in milliseconds
     */
    private static long pickSendIntervalMillis() {
        // No explicit seed: two senders started in the same millisecond would
        // otherwise draw the same value.
        Random rnd = new Random();
        // The cast must wrap the whole product; casting nextFloat() alone
        // truncates it to 0 before the multiplication.
        long ms = (long) (rnd.nextFloat() * 10 * 1000);
        if (ms > 8000) {
            ms /= 2;
        } else if (ms < 1000) {
            ms = 1500;
        }
        return ms;
    }

    /**
     * Connects, sends numbered messages at a fixed random interval while
     * {@link #run} is set, then releases the JMS connection and JNDI context.
     */
    @Override
    public void run() {
        Context context = null;
        Connection conn = null;
        try {
            // Create and initialize a JNDI context
            Hashtable<String,String> env = new Hashtable<>();
            env.put(Context.INITIAL_CONTEXT_FACTORY, FFMQConstants.JNDI_CONTEXT_FACTORY);
            env.put(Context.PROVIDER_URL, "tcp://localhost:10002");
            context = new InitialContext(env);

            // Lookup a connection factory in the context
            ConnectionFactory connFactory = (ConnectionFactory)context.lookup(FFMQConstants.JNDI_CONNECTION_FACTORY_NAME);

            // Obtain a JMS connection from the factory
            conn = connFactory.createConnection("test","test");
            conn.start();
            
            Destination dest1 = (Queue) context.lookup(queueName);
            Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
            MessageProducer p = session.createProducer(dest1);

            long ms = pickSendIntervalMillis();
            int i = 1;
            while (run) {
                TextMessage msg = session.createTextMessage();
                String t = "[" + qtmId + "] Hello " + queueName + " " + i++;
                System.out.println("sended..." + t);
                msg.setStringProperty("QTMID", qtmId);
                msg.setText(t);
                p.send(msg);
                Thread.sleep(ms);
            }
            p.close();
            session.close();
        } catch (NamingException | JMSException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            // Keep the interrupt visible to whoever owns this thread
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } finally {
            // Release the connection and the context on every exit path
            if (conn != null) {
                try {
                    conn.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
            if (context != null) {
                try {
                    context.close();
                } catch (NamingException e) {
                    e.printStackTrace();
                }
            }
        }
    }

}

模拟接收:

import java.util.Hashtable;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.naming.Context;
import javax.naming.InitialContext;

import net.timewalker.ffmq3.FFMQConstants;


/**
 * Test consumer: connects to the broker at {@code tcp://localhost:10002} and
 * polls {@code queue/foo2} until the shared {@link #run} flag is cleared.
 */
public class Receiver implements Runnable {

    /** Cleared by {@code main} to make the receiver leave its poll loop. */
    private static volatile boolean run = true;
    
    /**
     * Starts one receiver thread, lets it work for ten seconds and then asks
     * it to stop.
     *
     * @param args ignored
     * @throws Exception if the main thread is interrupted while waiting
     */
    public static void main(String[] args) throws Exception {
        new Thread(new Receiver()).start();
        Thread.sleep(10000);
        run = false;
        Thread.sleep(2000);
    }
    
    private Connection conn;
    private Session session;
    private MessageConsumer consumer;
    
    /**
     * Looks up the connection factory and the queue through JNDI, then opens
     * and starts the connection, session and consumer.
     *
     * @throws Exception if a JNDI lookup or a JMS call fails
     */
    private void init() throws Exception {
        // Create and initialize a JNDI context
        Hashtable<String,String> env = new Hashtable<>();
        env.put(Context.INITIAL_CONTEXT_FACTORY, FFMQConstants.JNDI_CONTEXT_FACTORY);
        env.put(Context.PROVIDER_URL, "tcp://localhost:10002");
        Context context = new InitialContext(env);
        
        ConnectionFactory connFactory;
        Destination dest1;
        try {
            // Lookup a connection factory in the context
            connFactory = (ConnectionFactory)context.lookup(FFMQConstants.JNDI_CONNECTION_FACTORY_NAME);
            dest1 = (Queue) context.lookup("queue/foo2");
        } finally {
            // The context is only needed for the lookups; release it even if one fails
            context.close();
        }
        
        // Obtain a JMS connection from the factory
        conn = connFactory.createConnection("test", "test");
        conn.start();
        
        session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
        consumer = session.createConsumer(dest1);
        
        System.err.println("INIT.........");
    }
    
    /**
     * Releases whatever {@link #init()} managed to open. Safe to call after a
     * failed or partial {@code init()}; the connection is closed even when
     * closing the consumer or the session fails.
     */
    private void destroy() {
        if (conn == null) {
            // init() failed before the connection existed: nothing to release
            return;
        }
        try {
            if (consumer != null) {
                consumer.close();
            }
            if (session != null) {
                session.close();
            }
            conn.stop();
        } catch (JMSException e) {
            e.printStackTrace();
        } finally {
            try {
                conn.close();
                System.err.println("EXIT........REC");
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * Polls the consumer with a 500 ms timeout and prints the text of every
     * message received, until {@link #run} is cleared.
     */
    @Override
    public void run() {
        try {
            init();
            
            // receive(500) returns null on timeout, which lets the loop
            // re-check the run flag about twice per second
            while (run) {
                Message m = consumer.receive(500);
                if (m != null) {
                    System.err.println("receive: " + ((TextMessage) m).getText());
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            destroy();
        }
    }

}

主题订阅:

package topic;

import java.util.Hashtable;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;
import javax.naming.Context;
import javax.naming.InitialContext;

import net.timewalker.ffmq3.FFMQConstants;

/**
 * Test topic subscriber: subscribes to a topic with the message selector
 * {@code QTMID = '<qtmId>'}, prints every message delivered during ten
 * seconds and then disconnects.
 */
public class SubClient implements Runnable {

    private final String topicName;
    private final String qtmId;
    private TopicConnection conn;
    private TopicSession session;    
    private TopicSubscriber subscriber;
    
    /**
     * Starts four subscribers on {@code topic/foox} with ids 1 to 4, each on
     * its own thread.
     *
     * @param args ignored
     * @throws Exception declared for symmetry with the other samples
     */
    public static void main(String[] args) throws Exception {
        for(int i = 1; i < 5; i++) {
            new Thread(new SubClient("topic/foox", String.valueOf(i))).start();
        }
        System.out.println(Thread.currentThread() + " EEXIT");
    }
    
    /**
     * @param topicName JNDI name of the topic to subscribe to
     * @param qtmId value the {@code QTMID} message property must have to be delivered
     */
    private SubClient(String topicName, String qtmId) {
        super();
        this.topicName = topicName;
        this.qtmId = qtmId;
    }

    /**
     * Looks up the topic connection factory and the topic through JNDI, then
     * opens the connection, session and selector-filtered subscriber. The
     * connection is not started here.
     *
     * @throws Exception if a JNDI lookup or a JMS call fails
     */
    private void init() throws Exception {
        // Create and initialize a JNDI context
        Hashtable<String,String> env = new Hashtable<>();
        env.put(Context.INITIAL_CONTEXT_FACTORY, FFMQConstants.JNDI_CONTEXT_FACTORY);
        env.put(Context.PROVIDER_URL, "tcp://localhost:10002");
        Context context = new InitialContext(env);
        
        TopicConnectionFactory connFactory;
        Topic topic;
        try {
            // Lookup a connection factory in the context
            connFactory = (TopicConnectionFactory)context.lookup(FFMQConstants.JNDI_TOPIC_CONNECTION_FACTORY_NAME);
            topic = (Topic) context.lookup(topicName);
        } finally {
            // The context is only needed for the lookups; release it even if one fails
            context.close();
        }
        
        // Obtain a JMS connection from the factory
        conn = connFactory.createTopicConnection("test","test");
        
        session = conn.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
        String selector = "(QTMID = '" + qtmId + "')";
        System.out.println("Selector: " + selector);
        subscriber = session.createSubscriber(topic, selector, false);
        
        System.err.println("INIT.........");
    }
    
    /**
     * Releases whatever {@link #init()} managed to open. Safe to call after a
     * failed or partial {@code init()}; the connection is closed even when
     * closing the subscriber or the session fails.
     */
    private void destroy() {
        if (conn == null) {
            // init() failed before the connection existed: nothing to release
            return;
        }
        try {
            if (subscriber != null) {
                subscriber.close();
            }
            if (session != null) {
                session.close();
            }
            conn.stop();
        } catch (JMSException e) {
            e.printStackTrace();
        } finally {
            try {
                conn.close();
                System.err.println(Thread.currentThread() + " Client EXIT........REC");
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }
    
    /**
     * Registers a listener that prints the text of each delivered message,
     * starts the connection and keeps the subscription open for ten seconds.
     */
    @Override
    public void run() {
        try {
            init();
            
            subscriber.setMessageListener(new MessageListener() {
                @Override
                public void onMessage(Message m) {
                    try {
                        System.err.println(Thread.currentThread() + " Client " + qtmId + " Subscriber receive: " + ((TextMessage) m).getText());
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
            });
            
            // Start only after the listener is registered
            conn.start();
            
            Thread.sleep(10000);
        } catch (InterruptedException e) {
            // Keep the interrupt visible to whoever owns this thread
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            destroy();
        }
    }

}

持久订阅:

package topic;

import java.util.Hashtable;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;
import javax.naming.Context;
import javax.naming.InitialContext;

import net.timewalker.ffmq3.FFMQConstants;

/**
 * Test durable subscriber: connects with client id {@code SERVER}, opens the
 * durable subscription {@code DB} on a topic, prints every message delivered
 * during ten seconds and then disconnects.
 */
public class SubServer implements Runnable {

    private final String topicName;
    
    /**
     * Starts one durable subscriber on {@code topic/foox} on its own thread.
     *
     * @param args ignored
     * @throws Exception declared for symmetry with the other samples
     */
    public static void main(String[] args) throws Exception {
        new Thread(new SubServer("topic/foox")).start();
        System.out.println(Thread.currentThread() + " main exit");
    }

    private TopicConnection conn;
    private TopicSession session;    
    private TopicSubscriber subscriber;
    
    /**
     * @param topicName JNDI name of the topic to subscribe to
     */
    private SubServer(String topicName) {
        super();
        this.topicName = topicName;
    }

    /**
     * Looks up the topic connection factory and the topic through JNDI, then
     * opens the connection (client id {@code SERVER}), the session and the
     * durable subscriber {@code DB}. The connection is not started here.
     *
     * @throws Exception if a JNDI lookup or a JMS call fails
     */
    private void init() throws Exception {
        // Create and initialize a JNDI context
        Hashtable<String,String> env = new Hashtable<>();
        env.put(Context.INITIAL_CONTEXT_FACTORY, FFMQConstants.JNDI_CONTEXT_FACTORY);
        env.put(Context.PROVIDER_URL, "tcp://localhost:10002");
        Context context = new InitialContext(env);
        
        TopicConnectionFactory connFactory;
        Topic topic;
        try {
            // Lookup a connection factory in the context
            connFactory = (TopicConnectionFactory)context.lookup(FFMQConstants.JNDI_TOPIC_CONNECTION_FACTORY_NAME);
            topic = (Topic) context.lookup(topicName);
        } finally {
            // The context is only needed for the lookups; release it even if one fails
            context.close();
        }
        
        // Obtain a JMS connection from the factory
        conn = connFactory.createTopicConnection("test","test");
        // The client id is set right after creation, before the connection is used
        conn.setClientID("SERVER");
        
        session = conn.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
        subscriber = session.createDurableSubscriber(topic, "DB");
        System.out.println("INIT........." + subscriber);
    }
    
    /**
     * Releases whatever {@link #init()} managed to open. Safe to call after a
     * failed or partial {@code init()}; the connection is closed even when
     * closing the subscriber or the session fails.
     */
    private void destroy() {
        if (conn == null) {
            // init() failed before the connection existed: nothing to release
            return;
        }
        try {
            if (subscriber != null) {
                subscriber.close();
            }
            if (session != null) {
                session.close();
            }
            conn.stop();
        } catch (JMSException e) {
            e.printStackTrace();
        } finally {
            try {
                conn.close();
                System.err.println(Thread.currentThread() + " EXIT........REC");
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }
    
    /**
     * Registers a listener that prints the text of each delivered message,
     * starts the connection and keeps the subscription open for ten seconds.
     */
    @Override
    public void run() {
        try {
            init();
            
            subscriber.setMessageListener(new MessageListener() {
                @Override
                public void onMessage(Message m) {
                    try {
                        System.err.println(Thread.currentThread() + " DurableSubscriber receive: " + ((TextMessage) m).getText());
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
            });
    
            // Start only after the listener is registered
            conn.start();
            
            Thread.sleep(10000);
        } catch (InterruptedException e) {
            // Keep the interrupt visible to whoever owns this thread
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            destroy();
        }
    }
}





 类似资料: