当前位置: 首页 > 工具软件 > JGroups-ME > 使用案例 >

JGroups学习(一)

耿学义
2023-12-01

近期要在项目中做集群的节点间内存数据同步,选择用JGroups来实现该功能。

JGroups的官网地址:http://www.jgroups.org

JGroups的源码地址:https://github.com/belaban/JGroups


一.  JGroups简介


JGroups是一个基于Java语言的提供可靠多播(组播)的开发工具包。在IP Multicast基础上提供可靠服务,也可以构建在TCP或者WAN上。

主要提供的功能如下:

  • 集群的创建和删除,集群节点可分布在局域网或广域网上
  • 节点管理
  • 点对点消息可靠有序传输
  • 消息可靠,有序组播

二. JGroups的简单使用


JGroups 使用 JChannel 作为连接到组、发送和接收消息的主 API,并可通过 JChannel 注册用来处理这些事件(成员加入、退出和发送消息)的侦听器ReceiverAdapter。
Messages 是发送的消息,它包含一个字节缓冲区、发送和接受者地址。
Addresses 是 org.jgroups.Address 的子类,通常包含一个 IP 地址和端口。
组中的节点列表被成为 View,每个实例包含相同的 View,可通过 View.getMembers() 来获取所有节点地址列表。节点Node只能在加入组后才能发送和接收消息。当一个节点要离开组时,JChannel.disconnect() 或者 JChannel.close() 方法会被调用,后者实际上会判断当连接还保持时调用了 disconnect() 方法来关闭通道。

代码如下:
<public class Receiver extends ReceiverAdapter {

	private List<String> msgStore;

	/**
	 * viewAccepted() 回调函数会在新成员加入组中,或者已有成员崩溃了或离开组时被调用
	 * @param state
	 */
	public Receiver(List<String> msgStore) {
		this.msgStore = msgStore;
	}

	public void receive(Message msg) {
		String msgStr = msg.getSrc() + ": " + msg.getObject();
		System.out.println("接收到消息 " + msgStr);
		synchronized (msgStore) {
			msgStore.add(msgStr);
		}
	}

	public void viewAccepted(View new_view) {
		System.out.println("**view " + new_view);
	}
}

public class Producer1 {

	JChannel channel;
	
    String user_name=System.getProperty("user.name", "n/a");
    
    final List<String> state=new LinkedList<String>();

    public void getState(OutputStream output) throws Exception {
        synchronized(state) {
            Util.objectToStream(state, new DataOutputStream(output));
        }
    }

    @SuppressWarnings("unchecked")
    public void setState(InputStream input) throws Exception {
        List<String> list=(List<String>)Util.objectFromStream(new DataInputStream(input));
        synchronized(state) {
            state.clear();
            state.addAll(list);
        }
        System.out.println("received state (" + list.size() + " messages in chat history):");
        for(String str: list) {
            System.out.println(str);
        }
    }

    private void start() throws Exception {
    	System.setProperty("java.net.preferIPv4Stack", "true");//强制使用IPv4
    	//参数里指定Channel使用的协议栈,如果是空的,则使用默认的协议栈,位于JGroups包里的udp.xml。参数可以是一个以冒号分隔的字符串,或是一个XML文件,在XML文件里定义协议栈。 
        channel=new JChannel();
        channel.setReceiver(new Receiver(state));
        //创建完之后,Channel现在处于未连接状态,需要通过connect方法将之连接到组,使其处于连接状态,它的参数就是要加入组的组名字,如果加入的组之前没有任何成员,则会自动创建一个组
        channel.connect("multicast");
        channel.getState(null, 10000);
        eventLoop();
        //channel.close();
    }

    private void eventLoop() throws Exception   {
    	Message[] msg = new Message[TimeTool.count];
    	for(int i=0; i<TimeTool.count; i++){
    		msg[i]=new Message(null, null, (i+1));//将消息发送到所有成员处
    	}
    	
    	TimeTool.start_time = System.nanoTime();
    	for(int i=0; i<TimeTool.count; i++){
    		channel.send(msg[i]);
    	}
    	TimeTool.end_time = System.nanoTime();
    	TimeTool.calcul();
    }
    
    public static void main(String[] args) throws Exception {
        new Producer1().start();
        System.in.read();
    }
}

public class TimeTool {

	public static int count = 100;
	
	public static long start_time = 0;
	
	public static long end_time = 0;
	
	public static void calcul(){
		System.out.println("发送"+count+"条消息耗费时间:"+(end_time - start_time)/1000000+"ms");
	}

}

 类似资料: