近期要在项目中做集群的节点间内存数据同步,选择用JGroups来实现该功能。
JGroups的官网地址:http://www.jgroups.org
JGroups的源码地址:https://github.com/belaban/JGroups
JGroups是一个基于Java语言的提供可靠多播(组播)的开发工具包。在IP Multicast基础上提供可靠服务,也可以构建在TCP或者WAN上。
主要提供的功能如下:
<public class Receiver extends ReceiverAdapter {
private List<String> msgStore;
/**
* viewAccepted() 回调函数会在新成员加入组中,或者已有成员崩溃了或离开组时被调用
* @param state
*/
public Receiver(List<String> msgStore) {
this.msgStore = msgStore;
}
public void receive(Message msg) {
String msgStr = msg.getSrc() + ": " + msg.getObject();
System.out.println("接收到消息 " + msgStr);
synchronized (msgStore) {
msgStore.add(msgStr);
}
}
public void viewAccepted(View new_view) {
System.out.println("**view " + new_view);
}
}
public class Producer1 {
JChannel channel;
String user_name=System.getProperty("user.name", "n/a");
final List<String> state=new LinkedList<String>();
public void getState(OutputStream output) throws Exception {
synchronized(state) {
Util.objectToStream(state, new DataOutputStream(output));
}
}
@SuppressWarnings("unchecked")
public void setState(InputStream input) throws Exception {
List<String> list=(List<String>)Util.objectFromStream(new DataInputStream(input));
synchronized(state) {
state.clear();
state.addAll(list);
}
System.out.println("received state (" + list.size() + " messages in chat history):");
for(String str: list) {
System.out.println(str);
}
}
private void start() throws Exception {
System.setProperty("java.net.preferIPv4Stack", "true");//强制使用IPv4
//参数里指定Channel使用的协议栈,如果是空的,则使用默认的协议栈,位于JGroups包里的udp.xml。参数可以是一个以冒号分隔的字符串,或是一个XML文件,在XML文件里定义协议栈。
channel=new JChannel();
channel.setReceiver(new Receiver(state));
//创建完之后,Channel现在处于未连接状态,需要通过connect方法将之连接到组,使其处于连接状态,它的参数就是要加入组的组名字,如果加入的组之前没有任何成员,则会自动创建一个组
channel.connect("multicast");
channel.getState(null, 10000);
eventLoop();
//channel.close();
}
private void eventLoop() throws Exception {
Message[] msg = new Message[TimeTool.count];
for(int i=0; i<TimeTool.count; i++){
msg[i]=new Message(null, null, (i+1));//将消息发送到所有成员处
}
TimeTool.start_time = System.nanoTime();
for(int i=0; i<TimeTool.count; i++){
channel.send(msg[i]);
}
TimeTool.end_time = System.nanoTime();
TimeTool.calcul();
}
public static void main(String[] args) throws Exception {
new Producer1().start();
System.in.read();
}
}
public class TimeTool {
public static int count = 100;
public static long start_time = 0;
public static long end_time = 0;
public static void calcul(){
System.out.println("发送"+count+"条消息耗费时间:"+(end_time - start_time)/1000000+"ms");
}
}