当前位置: 首页 > 工具软件 > JGroups-ME > 使用案例 >

JGroups简介和例子

柳宪
2023-12-01
JGroups是一个组播通信工具,它可以:
[list]
[*]创建和删除一个组
[*]加入和离开某个组
[*]管理组成员关系,当有新的成员进入或存在的成员离开的时候会通知组内其它成员
[*]侦测和移除出现故障的组成员
[*]发送单播消息(unicast,point-to-point)
[*]发送广播消息(multicast,point-to-multipoint)
[/list]
JGroups的强大之处在于它有一个很灵活的协议栈,可以根据你的需要,随意的添加或删除某些功能。比如说,刚开始你使用IP广播发送你的消息,过了一会程序开始要求无损的消息传输,你可以添加NAKACK协议,它能确保接收方一定能收到你发送的消息。但是此时接收方收到的消息的顺序是不固定的,为了让接收顺序和发送顺序保持一致,你可以选择添加FIFO协议来确保一对收发者之间发送和接收的顺序。如果要确保组里所有成员的收发顺序,你可以添加TOTAL协议。再接下来,你可以添加GMS和FLUSH协议来维护组成员间的关系;FD协议可以进行故障检测;STATE_TRANSFER协议可以让新加入的组成员从已存在的成员中获取一致的状态;最后你还可以使用CRYPT协议来加密你发送的消息。

下面开始演示一个聊天组的程序。我们建立一个聊天组,分别发送单播消息和广播消息,当组成员发生变化的时候,所有组成员自动获得新的组成员视图,每当聊天组中加入一个新的成员的时候,新成员先和已存在的组成员进行状态同步(获取聊天记录)。

public class GroupChat extends ExtendedReceiverAdapter{

private JChannel channel;
private List<String> msgList = new ArrayList<String>(); //模拟状态对象,保存的是本节点的收到的消息

/**
* 在有节点向本节点请求状态的时候被调用
*/
@Override
public byte[] getState() {
byte[] state = null;
synchronized(msgList){
try {
state = Util.objectToByteBuffer(msgList);
} catch (Exception e) {
e.printStackTrace();
}
return state;
}
}

/**
* 在其他节点返回状态给本节点的时候被调用
*/
@Override
public void setState(byte[] state) {
synchronized(msgList){
try {
List<String> tmpList = (List<String>)Util.objectFromByteBuffer(state);
msgList.clear();
msgList.addAll(tmpList);
System.err.println("===receive state:[");
for(String msg : msgList){
System.out.println(msg);
}
System.out.println("]");
} catch (Exception e) {
e.printStackTrace();
}
}
}

/**
* 当有消息进来的时候被调用
*/
@Override
public void receive(Message msg) {
System.out.println(">>>new message receive from "+msg.getSrc()+":"+msg.getObject());
msgList.add(msg.getSrc()+" send : "+ msg.getObject());
}

/**
* 当组成员发生变化的时候被调用
*/
@Override
public void viewAccepted(View new_view) {
System.out.println("***new view receiver:"+new_view);
}

public void start() throws ChannelException{
//打开channel并指定配置文件
channel = new JChannel("udp.xml");
//指定组名,就可以创建或连接到广播组
channel.connect("ChatGroup");
//注册回调接口,使用"推"模式来接受广播信息
channel.setReceiver(this);
//查看当前组成员
System.out.println("---current view:"+channel.getView());
//状态同步,第一个参数为null则向协调者节点获取信息
channel.getState(null, 1000);
}

public void close(){
channel.close();
}

public void loopSendMessage(){
BufferedReader br = new BufferedReader(new InputStreamReader(System.in));
try {
while(true){
System.out.println("please input a string");
String line = br.readLine();
System.out.println("you put:"+line);
if(line.equals("exit")){
break;
}else{
String[] array = line.split(",");

Address des = null; //接收方地址,为null代表发送广播消息
Address src = null; //发送方地址,为null代表自己的地址
String msg = line; //发送内容

if(array.length == 3){
des = new IpAddress(Integer.parseInt(array[0]));
src = new IpAddress(Integer.parseInt(array[1]));
msg = array[2];
}else if(array.length == 2){
des = new IpAddress(Integer.parseInt(array[0]));
msg = array[1];
}

Message message = new Message(des, src, msg);
//发送消息
channel.send(message);
}
}
} catch (IOException e) {
e.printStackTrace();
} catch (ChannelNotConnectedException e) {
e.printStackTrace();
} catch (ChannelClosedException e) {
e.printStackTrace();
}
}


public static void main(String[] args) {
GroupChat chat = new GroupChat();
try {
chat.start();
chat.loopSendMessage();
chat.close();
} catch (ChannelException e) {
e.printStackTrace();
}

}

}

可以看到JGroups提供的API屏蔽了底层的通信机制,对于开发人员来说是完全透明的,要关注的只是消息的接受处理就可以了。
对应的协议栈配置:

<config>
<UDP
mcast_group_addr="${jgroups.udp.mcast_addr:228.10.10.10}"
mcast_port="${jgroups.udp.mcast_port:45588}"
tos="8"
ucast_recv_buf_size="20000000"
ucast_send_buf_size="640000"
mcast_recv_buf_size="25000000"
mcast_send_buf_size="640000"
loopback="false"
discard_incompatible_packets="true"
max_bundle_size="64000"
max_bundle_timeout="30"
use_incoming_packet_handler="true"
ip_ttl="${jgroups.udp.ip_ttl:2}"
enable_bundling="true"
enable_diagnostics="true"
thread_naming_pattern="cl"

use_concurrent_stack="true"

thread_pool.enabled="true"
thread_pool.min_threads="1"
thread_pool.max_threads="25"
thread_pool.keep_alive_time="5000"
thread_pool.queue_enabled="false"
thread_pool.queue_max_size="100"
thread_pool.rejection_policy="Run"

oob_thread_pool.enabled="true"
oob_thread_pool.min_threads="1"
oob_thread_pool.max_threads="8"
oob_thread_pool.keep_alive_time="5000"
oob_thread_pool.queue_enabled="false"
oob_thread_pool.queue_max_size="100"
oob_thread_pool.rejection_policy="Run"/>

<PING timeout="2000"
num_initial_members="3"/>
<MERGE2 max_interval="30000"
min_interval="10000"/>
<FD_SOCK/>
<FD timeout="10000" max_tries="5" shun="true"/>
<VERIFY_SUSPECT timeout="1500" />
<BARRIER />
<pbcast.NAKACK max_xmit_size="60000"
use_mcast_xmit="false" gc_lag="0"
retransmit_timeout="300,600,1200,2400,4800"
discard_delivered_msgs="true"/>
<UNICAST timeout="300,600,1200,2400,3600"/>
<pbcast.STABLE stability_delay="1000" desired_avg_gossip="50000"
max_bytes="400000"/>
<VIEW_SYNC avg_send_interval="60000" />
<pbcast.GMS print_local_addr="true" join_timeout="3000"
join_retry_timeout="2000" shun="false"
view_bundling="true"/>
<FC max_credits="20000000"
min_threshold="0.10"/>
<FRAG2 frag_size="60000" />
<!--pbcast.STREAMING_STATE_TRANSFER /-->
<pbcast.STATE_TRANSFER />
<!-- pbcast.FLUSH /-->
</config>
 类似资料: