目前项目中在克服JGroups初期使用的困难之后,已经使用比较稳定了。感觉比较烦琐和容易出错的还是JGroups配置。
JGroups 适合使用场合
服务器集群cluster、多服务器通讯、服务器replication(复制)等,分布式cache缓存
JGroups 简介
JGroups是一个基于Java语言的提供可靠多播(组播)的开发工具包。在IP Multicast基础上提供可靠服务,也可以构建在TCP或者WAN上。主要是由Bela Ban开发,属于JBoss.org,在JBoss的网站也有一些相关文档。目前在 SourceForge上还是比较活跃,经常保持更新。
JGroups 配置
PING: 发现初始成员
MERGE2: 将网络层切分的包重新合并。
FD_SOCK: Failure Dectection 错误检测,基于TCP
FD:FailureDectection 错误检测,基于心跳
VERIFY_SUSPECT: 检查貌似失败的节点
pbcast.NAKACK: 应答,提供可靠传输
UNICAST: 可靠的UNICAST
pbcast.STABLE: 计算广播信息是否稳定
VIEW_SYNC: 定期广播view(成员名单)
pbcast.GMS: Group membership, 处理joins/leaves/crashes等
FC: 流量控制
FRAG2:Fragmentation layer,分包,将大的数据包分拆成适合网络层传输
以上一些是比较重要的配置,基本上不能少。如果要深入研究可以在 org.jgroups.protocols 里面查看源代码
JGroups使用例子, JGroups demo, Tim的hello world例子
Timreceiver.java
import org.jgroups.tests.perf.Receiver;
import org.jgroups.tests.perf.Transport;
import org.jgroups.util.Util;
public class TimReceiver implements Receiver {
privateTransport transport = null;
public staticvoid main(String[] args) {
TimReceiver t = new TimReceiver();
try {
int sendMsgCount = 5000;
int msgSize = 1000;
t.start();
t.sendMessages(sendMsgCount, msgSize);
System.out.println("########## Begin to recv...");
Thread.currentThread().join();
} catch(Exception e) {
e.printStackTrace();
} finally {
if (t != null) {
t.stop();
}
}
}
public voidstart()
throws Exception {
transport = (Transport) new TimTransport();
transport.create(null);
transport.setReceiver(this);
transport.start();
}
public voidstop() {
if(transport != null) {
transport.stop();
transport.destroy();
}
}
private intcount = 0;
public voidreceive(Object sender, byte[] data) {
System.out.print(".");
if(++count == 5000) {
System.out.println("\r\nRECV DONE.");
System.exit(0);
}
}
private voidsendMessages(int count, int msgSize)
throws Exception {
byte[]buf = new byte[msgSize];
for(int k = 0; k < msgSize; k++)
buf[k] = 'T';
System.out.println("-- sending " + count + " " +Util.printBytes(msgSize) + " messages");
for(int i = 0; i < count; i++) {
transport.send(null, buf);
}
System.out.println("#########send complete");
}
}
TimTransport.java
import java.util.Map;
import java.util.Properties;
import org.jgroups.Address;
import org.jgroups.JChannel;
import org.jgroups.Message;
import org.jgroups.ReceiverAdapter;
import org.jgroups.tests.perf.Receiver;
import org.jgroups.tests.perf.Transport;
public class TimTransport extends ReceiverAdapterimplements Transport{
privateJChannel channel = null;
privateString groupName = "TimDemo";
privateReceiver receiver = null;
StringPROTOCOL_STACK_UDP1 = "UDP(bind_addr=192.168.100.59";
StringPROTOCOL_STACK_UDP2 = ";mcast_port=8888";
StringPROTOCOL_STACK_UDP3 = ";mcast_addr=225.1.1.1";
StringPROTOCOL_STACK_UDP4 = ";tos=8;loopback=false;max_bundle_size=64000;"+
"use_incoming_packet_handler=true;use_outgoing_packet_handler=false;ip_ttl=2;enable_bundling=true):"
+"PING:MERGE2:FD_SOCK:FD:VERIFY_SUSPECT:"
+"pbcast.NAKACK(gc_lag=50;max_xmit_size=50000;use_mcast_xmit=false;"+
"retransmit_timeout=300,600,1200,2400,4800;discard_delivered_msgs=true):"
+"UNICAST:pbcast.STABLE:VIEW_SYNC:"
+"pbcast.GMS(print_local_addr=false;join_timeout=3000;" +
"join_retry_timeout=2000;" +
"shun=true;view_bundling=true):"
+"FC(max_credits=2000000;min_threshold=0.10):FRAG2(frag_size=50000)";
public ObjectgetLocalAddress() {
return channel != null ?channel.getLocalAddress() : null;
}
public voidstart() throws Exception {
channel.connect(groupName);
}
public voidstop() {
if(channel != null) {
channel.shutdown();
}
}
public voiddestroy() {
if(channel != null) {
channel.close();
channel = null;
}
}
public voidsetReceiver(Receiver r) {
this.receiver = r;
}
public MapdumpStats() {
returnchannel != null ? channel.dumpStats() : null;
}
public voidsend(Object destination, byte[] payload) throws Exception {
byte[]tmp = new byte[payload.length];
System.arraycopy(payload, 0, tmp, 0, payload.length);
Messagemsg = null;
msg =new Message((Address) destination, null, tmp);
if(channel != null) {
channel.send(msg);
}
}
public voidreceive(Message msg) {
Addresssender = msg.getSrc();
byte[]payload = msg.getBuffer();
if(receiver != null) {
try {
receiver.receive(sender, payload);
}catch (Throwable tt) {
tt.printStackTrace();
}
}
}
public voidcreate(Properties config) throws Exception {
StringPROTOCOL_STACK = PROTOCOL_STACK_UDP1 + PROTOCOL_STACK_UDP2 +PROTOCOL_STACK_UDP3 + PROTOCOL_STACK_UDP4;
channel= new JChannel(PROTOCOL_STACK);
channel.setReceiver(this);
}
public voidsend(Object destination, byte[] payload, boolean oob) throws Exception {
send(destination, payload);
}
}
JGroups 下载 download
http://www.jgroups.org/
转载于:https://blog.51cto.com/gomic/1373981