当前位置: 首页 > 工具软件 > Open-MQ > 使用案例 >

安装部署(三) openMQ安装和使用

冉子石
2023-12-01
openMQ安装和使用


windows 10 x64
jdk 1.7


1 下载
https://mq.java.net/downloads/index.html
openmq5_1-binary-windows.zip


2 解压
到D:\XSetups\openmq5_1-binary-windows
{mq_home}=D:\XSetups\openmq5_1-binary-windows\MessageQueue5.1


3 配置
{mq_home}\etc\mq\imqenv.conf
REM Default location of J2SE
set IMQ_DEFAULT_JAVAHOME=E:\wjb-workspace\Java\jdk1.7.0_17
REM
REM Default location of MQ's "var" filesystem
REM set IMQ_DEFAULT_VARHOME=D:\XSetups\openmq5_1-binary-windows\var


4 启动broker
cmd
{mq_home}\mq\bin\imqbrokerd.exe
若提示日志禁用,不管它,先测试


5 检查broker是否启动
再开一个cmd
{mq_home}\mq\bin\imqcmd.exe query bkr -u admin
D:\XSetups\openmq5_1-binary-windows\MessageQueue5.1\mq\bin>imqcmd query bkr -u admin
口令:admin
查询指定的中介:


-----------------
主机           主端口
-----------------
localhost    7676


版本                  5.1
实例名称                imqbroker
中介 ID
主端口                 7676
已嵌入中介               false
实例配置/数据根目录          D:\XSetups\openmq5_1-binary-windows\MessageQueue5.1\var\mq


系统中当前的消息数           0
系统中当前的消息大小 (字节)     0


停用消息队列中的当前消息数量      0
停用消息队列中的当前消息字节总数    0


记录停用消息              false
截断停用消息队列中的消息主体      false


系统中的最大消息数           无限制 (-1)
系统中的最大消息大小          无限制 (-1)
最大消息大小              70m


自动创建队列              true
自动创建主题              true
自动创建队列的最大活动使用者数     无限制 (-1)
自动创建队列的最大备份使用者数     0
自动创建的目标 useDMQ      true


集群 ID
集群具有高可用性            false
集群中介列表 (处于活动状态)     mq://10.1.1.19:7676/
集群中介列表 (处于已配置状态)
集群主中介
集群 URL


日志级别                INFO
日志更新间隔 (秒)
日志更新大小 (字节)         268435456


成功查询中介。


6 编写Producer
import javax.jms.*;
public class Sender {
    public static void main(String[] args) {
    try {
        ConnectionFactory cf = new com.sun.messaging.ConnectionFactory();
        ((com.sun.messaging.ConnectionFactory)cf).setProperty(com.sun.messaging.ConnectionConfiguration.imqAddressList,"localhost:7676");
        Connection con = cf.createConnection();
        Session sn = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Destination dest = sn.createQueue(args[0]);
        MessageProducer mp = sn.createProducer(dest);
        TextMessage tm = sn.createTextMessage();
        tm.setText("A test message");
        mp.send(tm);
        System.out.println("Message sent:");
        sn.close();
        con.close();
        } catch (Exception e) { e.printStackTrace(); }
    }
}


6.1 执行
注意:jms.jar和imq.jar在如下目录,拷过来
D:\XSetups\openmq5_1-binary-windows\MessageQueue5.1\mq\lib
javac -cp jms.jar;imq.jar;. Sender.java
java -cp jms.jar;imq.jar;. Sender AQueue


F:\XWorks\codes\test\javatest\openMQ>java -cp jms.jar;imq.jar;. Sender AQueue
Message sent:


6.2 验证message
imqcmd query dst -n AQueue -t q -u admin
D:\XSetups\openmq5_1-binary-windows\MessageQueue5.1\mq\bin>imqcmd query dst -n AQueue -t q -u admin
口令:
查询目标, 其中:


--------------
目标名称      目标类型
--------------
AQueue    队列


在指定的中介上:


-----------------
主机           主端口
-----------------
localhost    7676


目标名称               AQueue
目标类型               队列
目标状态               RUNNING
是以管理方式创建的          false


当前的消息数
    实际             1
    远程             0
    在延迟传送中         0
    在事务处理中保留       0
当前消息字节
    实际             146
    远程             0
    在延迟传送中         0
    在事务处理中保留       0
当前的生成器数            0
当前的活动使用者数          0
当前的备份使用者数          0


最大消息数              100000
消息的最大总计大小 (字节)     10737418240
每条消息的最大大小 (字节)     10485760
最大生成器数             100
最大活动使用者数           无限制 (-1)
最大备份使用者数           0


限制性能               REJECT_NEWEST
使用者流限制             1000
是本地目标              false
首选本地传送             false
使用停用消息队列           true
已启用 XML 方案验证       false
XML 方案 URI 列表      -
在失败时重新加载 XML 方案    false


成功查询目标。


【当前消息数1】


7 编写Consumer
import javax.jms.*;
public class SynReceiver {
    public static void main(String[] args) {
        try {
            ConnectionFactory cf = new com.sun.messaging.ConnectionFactory();
            ((com.sun.messaging.ConnectionFactory)cf).setProperty(com.sun.messaging.ConnectionConfiguration.imqAddressList,"localhost:7676");
            Connection con = cf.createConnection();
            Session sn = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Destination dest = sn.createQueue(args[0]);
            MessageConsumer mc = sn.createConsumer(dest);
            con.start();
            TextMessage msg = (TextMessage) mc.receive();
            System.out.println("Received message: "+msg.getText());
            sn.close();
            con.close();
        } catch (Exception e) { e.printStackTrace(); }
    }
}


7.1 执行
javac -cp jms.jar;imq.jar;. SynReceiver.java
java -cp jms.jar;imq.jar;. SynReceiver AQueue
F:\XWorks\codes\test\javatest\openMQ>java -cp jms.jar;imq.jar;. SynReceiver AQueue
Received message: A test message


7.2 验证message
imqcmd query dst -n AQueue -t q -u admin


D:\XSetups\openmq5_1-binary-windows\MessageQueue5.1\mq\bin>imqcmd query dst -n AQueue -t q -u admin
口令:
查询目标, 其中:


--------------
目标名称      目标类型
--------------
AQueue    队列


在指定的中介上:


-----------------
主机           主端口
-----------------
localhost    7676


目标名称               AQueue
目标类型               队列
目标状态               RUNNING
是以管理方式创建的          false


当前的消息数
    实际             0
    远程             0
    在延迟传送中         0
    在事务处理中保留       0
当前消息字节
    实际             0
    远程             0
    在延迟传送中         0
    在事务处理中保留       0
当前的生成器数            0
当前的活动使用者数          0
当前的备份使用者数          0


最大消息数              100000
消息的最大总计大小 (字节)     10737418240
每条消息的最大大小 (字节)     10485760
最大生成器数             100
最大活动使用者数           无限制 (-1)
最大备份使用者数           0


限制性能               REJECT_NEWEST
使用者流限制             1000
是本地目标              false
首选本地传送             false
使用停用消息队列           true
已启用 XML 方案验证       false
XML 方案 URI 列表      -
在失败时重新加载 XML 方案    false


成功查询目标。


【当前消息数变成了0】




8 编写Consumer 异步
import javax.jms.*;
public class AsynReceiver {
    public static void main(String[] args) {
        try {
            ConnectionFactory conFactory = new com.sun.messaging.ConnectionFactory();
            ((com.sun.messaging.ConnectionFactory)conFactory).setProperty(com.sun.messaging.ConnectionConfiguration.imqAddressList,"localhost:7676");
            Connection con = conFactory.createConnection();
            Session sn = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Destination dest = sn.createQueue(args[0]);
            MessageConsumer mc = sn.createConsumer(dest);
            con.start();
            mc.setMessageListener(new AListener());
            System.out.println("Continuing its own work");
        } catch (Exception e) { e.printStackTrace(); }
    }
}
class AListener implements MessageListener {
    public void onMessage(Message msg) {
        TextMessage tm = (TextMessage) msg;
        try {
            System.out.println("Received: " + tm.getText());
        } catch (Exception e) { e.printStackTrace(); }
    }
}


8.1 执行
javac -cp jms.jar;imq.jar;. AsynReceiver.java
java -cp jms.jar;imq.jar;. AsynReceiver AQueue


F:\XWorks\codes\test\javatest\openMQ>java -cp jms.jar;imq.jar;. Sender AQueue
F:\XWorks\codes\test\javatest\openMQ>java -cp jms.jar;imq.jar;. SynReceiver AQueue
Received message: A test message


8.2 验证message
imqcmd query dst -n AQueue -t q -u admin


9 PUB/SUB模式
参考《Advanced Java Programming》25章12节
 类似资料: