当前位置: 首页 > 工具软件 > JeroMQ > 使用案例 >

JeroMQ简单应用

陆子石
2023-12-01

JeroMQ的相关教程可以参考http://blog.csdn.net/kobejayandy/article/category/1628047/2以上的教程中几种模式都有讲解,这里不再复述。目前工作中有这样一个需求,Pub端发送消息,服务器接收到消息并Pub给Sub端,然后再回复Pub端。这里就不能用JeroMQ中自带的proxy,因为需要一个回复的过程,不过可以参考proxy的实现思想,Pub端与服务器间通过Request/Response模式进行连接,服务器与Sub端间通过Publish/Subscribe模式进行连接,服务器在将消息pub给Sub端后再给Pub端回复。

首先是最简单的Sub端,只要接收就好了,和简单的Pub/Sub模式没有区别。

import org.zeromq.ZMQ;

public class Subscriber {
	public static void main(String args[]) {
		ZMQ.Context context = ZMQ.context(1);
		ZMQ.Socket subscriber = context.socket(ZMQ.SUB);
		subscriber.connect("tcp://127.0.0.1:5001");
		subscriber.subscribe("Topic".getBytes());
		while (!Thread.currentThread().isInterrupted()) {
			byte[] message = subscriber.recv();
			System.out.println("receive : " + new String(message));
		}
		subscriber.close();
		context.term();
	}
}


接下来是Pub端,虽然叫做Pub端,但实际用的是REQ类型的Socket

import org.zeromq.ZMQ;

public class Pub {
	public static void main(String[] args) {
		int count = 0;
		ZMQ.Context context = ZMQ.context(1);
		ZMQ.Socket socket = context.socket(ZMQ.REQ);
		socket.connect("tcp://127.0.0.1:5000");
		String request = "Topic"+"hello";	
		while(!Thread.currentThread ().isInterrupted ()){
			socket.send(request.getBytes());
			byte[] response = socket.recv();
			System.out.println("接收到第:"+(++count)+"条消息,内容为:" + new String(response));
			try {
				Thread.sleep(1000);
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
	}

}
最后是服务器

package com.aurora.zhj.req;
import org.zeromq.ZMQ;


public class Server {

	public static void main(String[] args) {
		ZMQ.Context context = ZMQ.context(1);
		ZMQ.Socket socket = context.socket(ZMQ.REP);
		socket.bind ("tcp://127.0.0.1:5000");
		ZMQ.Socket publisher = context.socket(ZMQ.PUB);
                publisher.bind("tcp://127.0.0.1:5001");
		while(!Thread.currentThread ().isInterrupted ()){
			byte[] request = socket.recv();
			String message = new String(request);
			publisher.send(message.getBytes());
			String response = "服务器接收成功";  
			socket.send(response.getBytes());
		}
		//publisher.close();
		//context.term(); 
	}

}
实际上,想要实现Pub/Sub模式,Pub端和Sub端直接连就好了,这么做只是为了在这之间增加一层服务器,后续要实现服务器的高可用,保证消息不漏发、不重发。

 类似资料: