网上关于IBM这个消息队列中间件的资料相对比较少,C#相关的资料就更少了,最近因为要对接这个队列中间件,花了不少功夫去搜索、整理各种资料,遇到很多问题,因此记录下来。
1、基于 amqmdnet.dll 进行开发,这个是官方提供的DLL,安装了IBM WebSphere MQ后在安装目录可以找到(C:\Program Files\IBM\WebSphere MQ\bin)
2、基于 MmqiNetLite.dll 开发,这是一个开源组件,地址:https://github.com/fglaeser/mmqinet,这个项目代码有部分未完善,原作者也是好几年没更新,但是基础功能可以使用,本文代码主要基于此编写
源码如下:
public class IBMWMQConfig { /// <summary> /// MQ主机地址 /// </summary> private const string CONNECTION_HOST = ""; /// <summary> /// 通讯端口 /// </summary> private const int CONNECTION_PORT = 4421; /// <summary> /// CLIENT_ID /// </summary> private const string CLIENT_ID = ""; /// <summary> /// 通道名称 /// </summary> public const string CHANNEL = "SYSTEM.ADMIN.SVRCONN"; /// <summary> /// 队列管理器名称 /// </summary> public const string QUEUE_MGR_NAME = "PHIBHUBGW1"; /// <summary> /// 订阅主题持久化标识,{0}标识具体业务 /// </summary> public static readonly string SUBSCRIPTION_TEMPLATE = "JMS:" + QUEUE_MGR_NAME + ":" + CLIENT_ID + "_{0}.REQ:{0}.REQ"; /// <summary> /// 主题名称模板,{0}标识具体业务 /// </summary> public static readonly string TOPIC_TEMPLATE = "{0}.REQ"; /// <summary> /// IBM.WMQ连接字符串 /// </summary> public static readonly string CONNECTION_INFO = string.Format("{0}({1})", CONNECTION_HOST, CONNECTION_PORT); }
订阅消息:
/// <summary> /// 订阅主题 /// </summary> /// <param name="business"></param> /// <returns></returns> private string SubTopic(string business) { string message = string.Empty; try { using (var mqmgr = MQQueueManager.Connect(IBMWMQConfig.QUEUE_MGR_NAME, MQC.MQCO_NONE, IBMWMQConfig.CHANNEL, IBMWMQConfig.CONNECTION_INFO)) { MQSubscription subs = new MQSubscription(mqmgr); if (mqmgr.IsConnected) { int option = MQC.MQSO_CREATE + MQC.MQSO_RESUME + MQC.MQSO_NON_DURABLE + MQC.MQSO_MANAGED + MQC.MQSO_FAIL_IF_QUIESCING; string subName = string.Format(IBMWMQConfig.SUBSCRIPTION_TEMPLATE, business); string topicName = string.Format(IBMWMQConfig.TOPIC_TEMPLATE, business); subs.Subscribe(subName, option, topicName); MQMessage incoming = new MQMessage(); MQGetMessageOptions gmo = new MQGetMessageOptions(); gmo.WaitInterval = 10 * 1000;//MQC.MQWI_UNLIMITED; gmo.Options |= MQC.MQGMO_WAIT; gmo.Options |= MQC.MQGMO_SYNCPOINT; subs.Get(incoming, gmo); message = incoming.ReadAll(); //subs.Close(MQC.MQCO_REMOVE_SUB, closeSubQueue: true, closeSubQueueOptions: MQC.MQCO_NONE); } } } catch (MQException e) { message = e.Reason.ToString(); } return message; }
向队列推送一条消息:
/// <summary> /// 向消息队列推送一条消息 /// </summary> /// <param name="msg">消息内容</param> /// <param name="queueName">队列名称</param> public void PushMsgToQueue(string msg, string queueName) { using (var mqmgr = MQQueueManager.Connect(IBMWMQConfig.QUEUE_MGR_NAME, MQC.MQCO_NONE, IBMWMQConfig.CHANNEL, IBMWMQConfig.CONNECTION_INFO)) using (var q = new MQQueue(mqmgr, queueName, MQC.MQOO_INPUT_AS_Q_DEF + MQC.MQOO_OUTPUT + MQC.MQOO_FAIL_IF_QUIESCING)) { var outgoing = new MQMessage() { CharacterSet = MQC.CODESET_UTF, Encoding = MQC.MQENC_NORMAL }; outgoing.WriteString(msg); q.Put(outgoing, new MQPutMessageOptions()); } } /// <summary> /// 向消息队列推送一条消息 /// </summary> /// <param name="msg">消息内容</param> /// <param name="queueName">队列名称</param> public void PushMsgToQueue1(string msg, string queueName) { using (var mqmgr = MQQueueManager.Connect(IBMWMQConfig.QUEUE_MGR_NAME, MQC.MQCO_NONE, IBMWMQConfig.CHANNEL, IBMWMQConfig.CONNECTION_INFO)) { if (mqmgr.IsConnected) { var outgoing = new MQMessage() { CharacterSet = MQC.CODESET_UTF, Encoding = MQC.MQENC_NORMAL }; outgoing.WriteString(msg); var od = new MQObjectDescriptor { ObjectType = MQC.MQOT_Q, ObjectName = queueName }; mqmgr.Put1(od, outgoing, new MQPutMessageOptions()); } } }