JeroMQ的相关教程可以参考http://blog.csdn.net/kobejayandy/article/category/1628047/2以上的教程中几种模式都有讲解,这里不再复述。目前工作中有这样一个需求,Pub端发送消息,服务器接收到消息并Pub给Sub端,然后再回复Pub端。这里就不能用JeroMQ中自带的proxy,因为需要一个回复的过程,不过可以参考proxy的实现思想,Pub端与服务器间通过Request/Response模式进行连接,服务器与Sub端间通过Publish/Subscribe模式进行连接,服务器在将消息pub给Sub端后再给Pub端回复。
首先是最简单的Sub端,只要接收就好了,和简单的Pub/Sub模式没有区别。
import org.zeromq.ZMQ;
public class Subscriber {
public static void main(String args[]) {
ZMQ.Context context = ZMQ.context(1);
ZMQ.Socket subscriber = context.socket(ZMQ.SUB);
subscriber.connect("tcp://127.0.0.1:5001");
subscriber.subscribe("Topic".getBytes());
while (!Thread.currentThread().isInterrupted()) {
byte[] message = subscriber.recv();
System.out.println("receive : " + new String(message));
}
subscriber.close();
context.term();
}
}
接下来是Pub端,虽然叫做Pub端,但实际用的是REQ类型的Socket
import org.zeromq.ZMQ;
public class Pub {
public static void main(String[] args) {
int count = 0;
ZMQ.Context context = ZMQ.context(1);
ZMQ.Socket socket = context.socket(ZMQ.REQ);
socket.connect("tcp://127.0.0.1:5000");
String request = "Topic"+"hello";
while(!Thread.currentThread ().isInterrupted ()){
socket.send(request.getBytes());
byte[] response = socket.recv();
System.out.println("接收到第:"+(++count)+"条消息,内容为:" + new String(response));
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
最后是服务器
package com.aurora.zhj.req;
import org.zeromq.ZMQ;
public class Server {
public static void main(String[] args) {
ZMQ.Context context = ZMQ.context(1);
ZMQ.Socket socket = context.socket(ZMQ.REP);
socket.bind ("tcp://127.0.0.1:5000");
ZMQ.Socket publisher = context.socket(ZMQ.PUB);
publisher.bind("tcp://127.0.0.1:5001");
while(!Thread.currentThread ().isInterrupted ()){
byte[] request = socket.recv();
String message = new String(request);
publisher.send(message.getBytes());
String response = "服务器接收成功";
socket.send(response.getBytes());
}
//publisher.close();
//context.term();
}
}
实际上,想要实现Pub/Sub模式,Pub端和Sub端直接连就好了,这么做只是为了在这之间增加一层服务器,后续要实现服务器的高可用,保证消息不漏发、不重发。