安装部署(三) openMQ安装和使用
冉子石
2023-12-01
openMQ安装和使用
windows 10 x64
jdk 1.7
1 下载
https://mq.java.net/downloads/index.html
openmq5_1-binary-windows.zip
2 解压
到D:\XSetups\openmq5_1-binary-windows
{mq_home}=D:\XSetups\openmq5_1-binary-windows\MessageQueue5.1
3 配置
{mq_home}\etc\mq\imqenv.conf
REM Default location of J2SE
set IMQ_DEFAULT_JAVAHOME=E:\wjb-workspace\Java\jdk1.7.0_17
REM
REM Default location of MQ's "var" filesystem
REM set IMQ_DEFAULT_VARHOME=D:\XSetups\openmq5_1-binary-windows\var
4 启动broker
cmd
{mq_home}\mq\bin\imqbrokerd.exe
若提示日志禁用,不管它,先测试
5 检查broker是否启动
再开一个cmd
{mq_home}\mq\bin\imqcmd.exe query bkr -u admin
D:\XSetups\openmq5_1-binary-windows\MessageQueue5.1\mq\bin>imqcmd query bkr -u admin
口令:admin
查询指定的中介:
-----------------
主机 主端口
-----------------
localhost 7676
版本 5.1
实例名称 imqbroker
中介 ID
主端口 7676
已嵌入中介 false
实例配置/数据根目录 D:\XSetups\openmq5_1-binary-windows\MessageQueue5.1\var\mq
系统中当前的消息数 0
系统中当前的消息大小 (字节) 0
停用消息队列中的当前消息数量 0
停用消息队列中的当前消息字节总数 0
记录停用消息 false
截断停用消息队列中的消息主体 false
系统中的最大消息数 无限制 (-1)
系统中的最大消息大小 无限制 (-1)
最大消息大小 70m
自动创建队列 true
自动创建主题 true
自动创建队列的最大活动使用者数 无限制 (-1)
自动创建队列的最大备份使用者数 0
自动创建的目标 useDMQ true
集群 ID
集群具有高可用性 false
集群中介列表 (处于活动状态) mq://10.1.1.19:7676/
集群中介列表 (处于已配置状态)
集群主中介
集群 URL
日志级别 INFO
日志更新间隔 (秒)
日志更新大小 (字节) 268435456
成功查询中介。
6 编写Producer
import javax.jms.*;
public class Sender {
public static void main(String[] args) {
try {
ConnectionFactory cf = new com.sun.messaging.ConnectionFactory();
((com.sun.messaging.ConnectionFactory)cf).setProperty(com.sun.messaging.ConnectionConfiguration.imqAddressList,"localhost:7676");
Connection con = cf.createConnection();
Session sn = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination dest = sn.createQueue(args[0]);
MessageProducer mp = sn.createProducer(dest);
TextMessage tm = sn.createTextMessage();
tm.setText("A test message");
mp.send(tm);
System.out.println("Message sent:");
sn.close();
con.close();
} catch (Exception e) { e.printStackTrace(); }
}
}
6.1 执行
注意:jms.jar和imq.jar在如下目录,拷过来
D:\XSetups\openmq5_1-binary-windows\MessageQueue5.1\mq\lib
javac -cp jms.jar;imq.jar;. Sender.java
java -cp jms.jar;imq.jar;. Sender AQueue
F:\XWorks\codes\test\javatest\openMQ>java -cp jms.jar;imq.jar;. Sender AQueue
Message sent:
6.2 验证message
imqcmd query dst -n AQueue -t q -u admin
D:\XSetups\openmq5_1-binary-windows\MessageQueue5.1\mq\bin>imqcmd query dst -n AQueue -t q -u admin
口令:
查询目标, 其中:
--------------
目标名称 目标类型
--------------
AQueue 队列
在指定的中介上:
-----------------
主机 主端口
-----------------
localhost 7676
目标名称 AQueue
目标类型 队列
目标状态 RUNNING
是以管理方式创建的 false
当前的消息数
实际 1
远程 0
在延迟传送中 0
在事务处理中保留 0
当前消息字节
实际 146
远程 0
在延迟传送中 0
在事务处理中保留 0
当前的生成器数 0
当前的活动使用者数 0
当前的备份使用者数 0
最大消息数 100000
消息的最大总计大小 (字节) 10737418240
每条消息的最大大小 (字节) 10485760
最大生成器数 100
最大活动使用者数 无限制 (-1)
最大备份使用者数 0
限制性能 REJECT_NEWEST
使用者流限制 1000
是本地目标 false
首选本地传送 false
使用停用消息队列 true
已启用 XML 方案验证 false
XML 方案 URI 列表 -
在失败时重新加载 XML 方案 false
成功查询目标。
【当前消息数1】
7 编写Consumer
import javax.jms.*;
public class SynReceiver {
public static void main(String[] args) {
try {
ConnectionFactory cf = new com.sun.messaging.ConnectionFactory();
((com.sun.messaging.ConnectionFactory)cf).setProperty(com.sun.messaging.ConnectionConfiguration.imqAddressList,"localhost:7676");
Connection con = cf.createConnection();
Session sn = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination dest = sn.createQueue(args[0]);
MessageConsumer mc = sn.createConsumer(dest);
con.start();
TextMessage msg = (TextMessage) mc.receive();
System.out.println("Received message: "+msg.getText());
sn.close();
con.close();
} catch (Exception e) { e.printStackTrace(); }
}
}
7.1 执行
javac -cp jms.jar;imq.jar;. SynReceiver.java
java -cp jms.jar;imq.jar;. SynReceiver AQueue
F:\XWorks\codes\test\javatest\openMQ>java -cp jms.jar;imq.jar;. SynReceiver AQueue
Received message: A test message
7.2 验证message
imqcmd query dst -n AQueue -t q -u admin
D:\XSetups\openmq5_1-binary-windows\MessageQueue5.1\mq\bin>imqcmd query dst -n AQueue -t q -u admin
口令:
查询目标, 其中:
--------------
目标名称 目标类型
--------------
AQueue 队列
在指定的中介上:
-----------------
主机 主端口
-----------------
localhost 7676
目标名称 AQueue
目标类型 队列
目标状态 RUNNING
是以管理方式创建的 false
当前的消息数
实际 0
远程 0
在延迟传送中 0
在事务处理中保留 0
当前消息字节
实际 0
远程 0
在延迟传送中 0
在事务处理中保留 0
当前的生成器数 0
当前的活动使用者数 0
当前的备份使用者数 0
最大消息数 100000
消息的最大总计大小 (字节) 10737418240
每条消息的最大大小 (字节) 10485760
最大生成器数 100
最大活动使用者数 无限制 (-1)
最大备份使用者数 0
限制性能 REJECT_NEWEST
使用者流限制 1000
是本地目标 false
首选本地传送 false
使用停用消息队列 true
已启用 XML 方案验证 false
XML 方案 URI 列表 -
在失败时重新加载 XML 方案 false
成功查询目标。
【当前消息数变成了0】
8 编写Consumer 异步
import javax.jms.*;
public class AsynReceiver {
public static void main(String[] args) {
try {
ConnectionFactory conFactory = new com.sun.messaging.ConnectionFactory();
((com.sun.messaging.ConnectionFactory)conFactory).setProperty(com.sun.messaging.ConnectionConfiguration.imqAddressList,"localhost:7676");
Connection con = conFactory.createConnection();
Session sn = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination dest = sn.createQueue(args[0]);
MessageConsumer mc = sn.createConsumer(dest);
con.start();
mc.setMessageListener(new AListener());
System.out.println("Continuing its own work");
} catch (Exception e) { e.printStackTrace(); }
}
}
class AListener implements MessageListener {
public void onMessage(Message msg) {
TextMessage tm = (TextMessage) msg;
try {
System.out.println("Received: " + tm.getText());
} catch (Exception e) { e.printStackTrace(); }
}
}
8.1 执行
javac -cp jms.jar;imq.jar;. AsynReceiver.java
java -cp jms.jar;imq.jar;. AsynReceiver AQueue
F:\XWorks\codes\test\javatest\openMQ>java -cp jms.jar;imq.jar;. Sender AQueue
F:\XWorks\codes\test\javatest\openMQ>java -cp jms.jar;imq.jar;. SynReceiver AQueue
Received message: A test message
8.2 验证message
imqcmd query dst -n AQueue -t q -u admin
9 PUB/SUB模式
参考《Advanced Java Programming》25章12节