目录
Zookeeper 是树状结构的注册中心,每个节点的类型分为持久节点、持久顺序节点、临时节点、临时顺序节点。
持久节点:服务注册后保证节点不会丢失,注册中心重启也会存在。
持久顺序节点:在持久节点特性的基础上增加了节点先后顺序的能力。
临时节点:服务注册后连接丢失或 session 超时,注册的节点会自动被移除。
临时顺序节点:在临时节点特性的基础上增加了节点先后顺序的能力。
<dependency>
<groupId>com.101tec</groupId>
<artifactId>zkclient</artifactId>
<version>0.10</version>
</dependency>
package com.alibaba.dubbo.samples.zkclient;
import org.I0Itec.zkclient.ZkClient;
import java.util.List;
public class ZkClientUtil {
    /** ZooKeeper connect string used when the shared client is created. */
    private static String zkServer = "127.0.0.1:2181";

    /** Shared client instance; {@code null} while no connection is cached. */
    private static ZkClient zkClient;

    /**
     * Returns the cached client, creating it from {@code zkServer} on first use.
     *
     * @return the shared {@link ZkClient}
     */
    public static ZkClient connectZkClient() {
        if (zkClient == null) {
            zkClient = new ZkClient(zkServer);
        }
        return zkClient;
    }

    /**
     * Closes the cached client, if any, and clears the cache so that the next
     * {@link #connectZkClient()} call builds a fresh one.
     */
    public static void closeZkClient() {
        if (zkClient == null) {
            return;
        }
        zkClient.close();
        zkClient = null;
    }

    /**
     * Prints every node below {@code root} using the shared client.
     *
     * @param root path whose descendants are listed
     */
    public static void showZkPath(String root) {
        ZkClient client = connectZkClient();
        showZkPath(client, root);
    }

    /**
     * Recursively prints the full path of every descendant of {@code root} to
     * {@code System.err}, visiting each child before its siblings' subtrees.
     *
     * @param zkClient client used to list children
     * @param root     path whose descendants are listed
     */
    public static void showZkPath(ZkClient zkClient, String root) {
        // Direct children only; deeper levels are reached through the recursion below.
        List<String> names = zkClient.getChildren(root);
        // Avoid a double slash when root already ends with one (e.g. "/").
        String prefix = root.endsWith("/") ? root : root + "/";
        for (String name : names) {
            String path = prefix + name;
            System.err.println(path);
            showZkPath(zkClient, path);
        }
    }
}
package com.alibaba.dubbo.samples.zkclient;
import org.I0Itec.zkclient.ZkClient;
public class ZookeeperOperation {
    /**
     * Demo: creates the four node kinds under {@code /test}, reconnects with a
     * new client to show which nodes are still listed, deletes one node, and
     * finally removes {@code /test} recursively.
     *
     * <p>The connection is always released through
     * {@link ZkClientUtil#closeZkClient()} so the utility never keeps a cached
     * reference to an already-closed client.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        ZkClient zkClient = ZkClientUtil.connectZkClient();
        try {
            if (!zkClient.exists("/test")) {
                zkClient.createPersistent("/test");
            }
            System.err.println("***************在/test下分别创建4种节点***************");
            if (!zkClient.exists("/test/永久节点1")) {
                zkClient.createPersistent("/test/永久节点1");
            }
            // The sequential variants return the actual path, including the generated suffix.
            String s1 = zkClient.createPersistentSequential("/test/永久顺序节点", null);
            zkClient.createEphemeral("/test/临时节点1");
            zkClient.createEphemeralSequential("/test/临时顺序节点1", null);
            ZkClientUtil.showZkPath(zkClient, "/test");

            System.err.println("***************关闭客户端再创建新的客户端***************");
            // Close the current client and open a new one, then list /test again.
            ZkClientUtil.closeZkClient();
            zkClient = ZkClientUtil.connectZkClient();
            ZkClientUtil.showZkPath(zkClient, "/test");

            System.err.println("***************删除节点s1***************");
            // Per the original note: delete() reports an error if the node still has children.
            zkClient.delete(s1);
            ZkClientUtil.showZkPath(zkClient, "/test");

            // Removes the node together with all of its descendants.
            zkClient.deleteRecursive("/test");
        } finally {
            // Close via the utility (not zkClient.close()) so its cached reference is cleared too.
            ZkClientUtil.closeZkClient();
        }
    }
}
***************在/test下分别创建4种节点***************
/test/永久顺序节点0000000004
/test/临时顺序节点10000000006
/test/永久节点1
/test/临时节点1
/test/永久顺序节点0000000001
***************关闭客户端再创建新的客户端***************
[01/04/20 02:16:24:024 CST] ZkClient-EventThread-11-127.0.0.1:2181 INFO zkclient.ZkEventThread: Terminate ZkClient event thread.
[01/04/20 02:16:24:024 CST] main INFO zookeeper.ZooKeeper: Session: 0x10001204ed20003 closed
[01/04/20 02:16:24:024 CST] main INFO zookeeper.ZooKeeper: Initiating client connection, connectString=127.0.0.1:2181 sessionTimeout=30000 watcher=org.I0Itec.zkclient.ZkClient@12f40c25
[01/04/20 02:16:24:024 CST] main-EventThread INFO zookeeper.ClientCnxn: EventThread shut down for session: 0x10001204ed20003
[01/04/20 02:16:24:024 CST] main INFO zkclient.ZkClient: Waiting for keeper state SyncConnected
[01/04/20 02:16:24:024 CST] ZkClient-EventThread-14-127.0.0.1:2181 INFO zkclient.ZkEventThread: Starting ZkClient event thread.
[01/04/20 02:16:24:024 CST] main-SendThread(127.0.0.1:2181) INFO zookeeper.ClientCnxn: Opening socket connection to server 127.0.0.1/127.0.0.1:2181. Will not attempt to authenticate using SASL (unknown error)
[01/04/20 02:16:24:024 CST] main-SendThread(127.0.0.1:2181) INFO zookeeper.ClientCnxn: Socket connection established to 127.0.0.1/127.0.0.1:2181, initiating session
[01/04/20 02:16:24:024 CST] main-SendThread(127.0.0.1:2181) INFO zookeeper.ClientCnxn: Session establishment complete on server 127.0.0.1/127.0.0.1:2181, sessionid = 0x10001204ed20004, negotiated timeout = 30000
[01/04/20 02:16:24:024 CST] main-EventThread INFO zkclient.ZkClient: zookeeper state changed (SyncConnected)
/test/永久顺序节点0000000004
/test/永久节点1
/test/永久顺序节点0000000001
***************删除节点s1***************
/test/永久节点1
/test/永久顺序节点0000000001
首先创建四种节点,当断开与zookeeper连接后,临时节点自动删除,再次连接时永久节点依然存在。
要在zookeeper节点上写入数据以及读取数据,需要对数据进行序列化及反序列化。
package com.alibaba.dubbo.samples.zkclient;
import org.I0Itec.zkclient.exception.ZkMarshallingError;
import org.I0Itec.zkclient.serialize.ZkSerializer;
import java.io.*;
/**
 * {@link ZkSerializer} backed by standard Java object serialization.
 *
 * <p>Failures are reported as {@link ZkMarshallingError}, as declared by the
 * interface, instead of being printed and turned into a {@code null} result.
 *
 * <p>NOTE(review): Java-native deserialization must not be applied to data
 * written by untrusted clients; consider a data-only format (e.g. JSON) or an
 * {@code ObjectInputFilter} if the ensemble is shared.
 */
public class ZkSerialize implements ZkSerializer {

    /**
     * Serializes {@code data} with {@link ObjectOutputStream}.
     *
     * @param data object to serialize; must be serializable
     * @return the serialized bytes
     * @throws ZkMarshallingError if the object cannot be written
     */
    @Override
    public byte[] serialize(Object data) throws ZkMarshallingError {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(baos)) {
            oos.writeObject(data);
            // Push any buffered bytes into baos before taking the snapshot.
            oos.flush();
            return baos.toByteArray();
        } catch (IOException e) {
            throw new ZkMarshallingError(e);
        }
    }

    /**
     * Reads one object back from {@code bytes} with {@link ObjectInputStream}.
     *
     * @param bytes serialized form; {@code null} is treated as "no data"
     * @return the deserialized object, or {@code null} when {@code bytes} is {@code null}
     * @throws ZkMarshallingError if the bytes are not a valid serialized object
     *         or the object's class cannot be found
     */
    @Override
    public Object deserialize(byte[] bytes) throws ZkMarshallingError {
        if (bytes == null) {
            return null;
        }
        // try-with-resources closes the stream even when its construction partly
        // fails, which the previous unguarded close() calls in finally did not handle.
        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return ois.readObject();
        } catch (IOException | ClassNotFoundException e) {
            throw new ZkMarshallingError(e);
        }
    }
}
package com.alibaba.dubbo.samples.zkclient;
import java.io.Serializable;
/**
 * Minimal serializable value holder used as sample node data.
 */
public class User implements Serializable {
    /** Display name; {@code null} until {@link #setName(String)} is called. */
    private String name;

    /**
     * Replaces the stored name.
     *
     * @param name new name, may be {@code null}
     */
    public void setName(String name) {
        this.name = name;
    }

    /**
     * @return the stored name, or {@code null} if none was set
     */
    public String getName() {
        return this.name;
    }

    /**
     * @return text of the form {@code [User:name=<name>]}
     */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder("[User:name=");
        text.append(this.name);
        text.append("]");
        return text.toString();
    }
}
/**
 * Prints every node below {@code root} together with its data, using the
 * client returned by {@code connectZkClient()}.
 *
 * @param root       path whose descendants are listed
 * @param serializer serializer installed on the client before data is read
 */
public static void showZkPathData(String root, ZkSerialize serializer) {
    ZkClient client = connectZkClient();
    showZkPathData(client, root, serializer);
}
/**
 * Recursively prints, to {@code System.err}, each descendant of {@code root}
 * as {@code path(data)}, preceded by the data's class when data is present.
 *
 * @param zkClient   client used to list children and read data
 * @param root       path whose descendants are listed
 * @param serializer serializer installed on {@code zkClient} before reading
 */
public static void showZkPathData(ZkClient zkClient, String root, ZkSerialize serializer) {
    zkClient.setZkSerializer(serializer);
    List<String> names = zkClient.getChildren(root);
    // Avoid a double slash when root already ends with one.
    String prefix = root.endsWith("/") ? root : root + "/";
    for (String name : names) {
        String path = prefix + name;
        // Second argument true: a missing node yields null instead of an exception.
        Object payload = zkClient.readData(path, true);
        if (payload != null) {
            System.err.println(payload.getClass());
        }
        System.err.println(path + "(" + payload + ")");
        showZkPathData(zkClient, path, serializer);
    }
}
package com.alibaba.dubbo.samples.zkclient;
import org.I0Itec.zkclient.ZkClient;
public class ZookeeperDataOperation {
    /**
     * Demo: stores two {@link User} objects as node data, one at creation time
     * and one through {@code writeData}, then prints the subtree with its data.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        ZkClient client = ZkClientUtil.connectZkClient();
        boolean hasTestRoot = client.exists("/test");
        if (!hasTestRoot) {
            client.createPersistent("/test");
        }

        User ruanhao = newUser("阮浩");
        User zhangsan = newUser("张三");

        // The serializer must be installed before any data is written.
        client.setZkSerializer(new ZkSerialize());

        // Variant 1: supply the data while creating the node.
        client.createEphemeral("/test/ruanhao", ruanhao);

        // Variant 2: create the node empty, then write the data afterwards.
        client.createEphemeral("/test/zhangsan");
        client.writeData("/test/zhangsan", zhangsan);

        ZkClientUtil.showZkPathData(client, "/test", new ZkSerialize());
        ZkClientUtil.closeZkClient();
    }

    /** Builds a {@link User} carrying the given name. */
    private static User newUser(String name) {
        User created = new User();
        created.setName(name);
        return created;
    }
}
class com.alibaba.dubbo.samples.zkclient.User
/test/zhangsan([User:name=张三])
class com.alibaba.dubbo.samples.zkclient.User
/test/ruanhao([User:name=阮浩])
可以在创建路径时存入数据,也可以调用 writeData 存入数据。
通过注册监听器,当zookeeper节点发生变化时,zookeeper会主动通知客户端,从而实现一些功能。
监听节点的数据变化事件包括:1、节点被创建; 2、节点上写入数据; 3、节点数据变化; 4、节点数据被删除; 5、节点本身被删除。
package com.alibaba.dubbo.samples.zkclient;
import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;
/**
 * Demo of a data listener: subscribes to {@code /test/Node}, then creates the
 * node, writes to it several times and deletes it, so the listener callbacks
 * are printed.
 */
public class ZookeeperSubscribeListener {
    public static void main(String[] args) {
        new ZookeeperSubscribeListener().subscribeDataChanges();
    }

    /**
     * Registers an {@link IZkDataListener} on {@code /test/Node} and drives the
     * create / write / delete sequence that triggers it.
     *
     * <p>The connection is released through {@link ZkClientUtil#closeZkClient()}
     * in a {@code finally} block so it is closed on failure too and the utility
     * does not keep a reference to a closed client.
     */
    private void subscribeDataChanges() {
        ZkClient zkClient = ZkClientUtil.connectZkClient();
        try {
            if (!zkClient.exists("/test")) {
                zkClient.createPersistent("/test");
            }
            // The listener is registered before the node exists.
            zkClient.subscribeDataChanges("/test/Node", new IZkDataListener() {
                @Override
                public void handleDataDeleted(String dataPath) throws Exception {
                    System.out.println("DataDeleted:" + dataPath);
                }

                @Override
                public void handleDataChange(String dataPath, Object data) throws Exception {
                    System.out.println("DataChange:" + dataPath + ",data:" + data);
                }
            });
            System.out.println("****************************************");
            zkClient.createPersistent("/test/Node");
            for (int i = 0; i < 5; i++) {
                sleep(100);
                zkClient.writeData("/test/Node", i);
            }
            zkClient.delete("/test/Node");
            // Wait before unsubscribing so pending callbacks can still be printed.
            sleep(2000);
            zkClient.unsubscribeAll();
        } finally {
            ZkClientUtil.closeZkClient();
        }
    }

    /**
     * Sleeps for {@code ms} milliseconds; if interrupted, returns early with the
     * thread's interrupt flag restored so callers can still observe it.
     *
     * @param ms pause length in milliseconds
     */
    private static void sleep(int ms) {
        try {
            Thread.sleep(ms);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
****************************************
DataChange:/test/Node,data:null
DataChange:/test/Node,data:0
DataChange:/test/Node,data:1
DataChange:/test/Node,data:2
DataChange:/test/Node,data:3
DataDeleted:/test/Node
数据改变,触发监听器事件。
package com.alibaba.dubbo.samples.zkclient;
import org.I0Itec.zkclient.IZkChildListener;
import org.I0Itec.zkclient.ZkClient;
import java.util.List;
/**
 * Demo of a child listener: subscribes to child changes of {@code /test/Node},
 * then creates and deletes children so the listener callback is printed.
 */
public class ZookeeperSubscribeNodeListener {
    public static void main(String[] args) {
        new ZookeeperSubscribeNodeListener().subscribeChildChanges();
    }

    /**
     * Registers an {@link IZkChildListener} on {@code /test/Node} and drives the
     * sequence of child creations and deletions that triggers it.
     *
     * <p>The connection is released through {@link ZkClientUtil#closeZkClient()}
     * in a {@code finally} block so it is closed on failure too and the utility
     * does not keep a reference to a closed client.
     */
    private void subscribeChildChanges() {
        ZkClient zkClient = ZkClientUtil.connectZkClient();
        try {
            if (!zkClient.exists("/test")) {
                zkClient.createPersistent("/test");
            }
            zkClient.subscribeChildChanges("/test/Node", new IZkChildListener() {
                @Override
                public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception {
                    // Output format is "[a,b,]" (trailing comma kept) or empty when there are no children.
                    StringBuilder childs = new StringBuilder();
                    if (currentChilds != null && !currentChilds.isEmpty()) {
                        childs.append("[");
                        for (String s : currentChilds) {
                            childs.append(s).append(",");
                        }
                        childs.append("]");
                    }
                    System.out.println("ChildChange:" + parentPath + ",childs:" + childs);
                }
            });
            zkClient.createPersistent("/test/Node");
            sleep(100);
            zkClient.createPersistent("/test/Node/n1");
            sleep(100);
            zkClient.createPersistent("/test/Node/n2");
            sleep(100);
            zkClient.createPersistent("/test/Node/n3");
            sleep(100);
            zkClient.delete("/test/Node/n1");
            sleep(100);
            zkClient.delete("/test/Node/n2");
            sleep(100);
            zkClient.delete("/test/Node/n3");
            // Pause before the separator is printed so earlier callbacks can be printed first.
            sleep(3000);
            System.out.println("****");
            zkClient.deleteRecursive("/test/Node");
            sleep(3000);
        } finally {
            ZkClientUtil.closeZkClient();
        }
    }

    /**
     * Sleeps for {@code ms} milliseconds; if interrupted, returns early with the
     * thread's interrupt flag restored so callers can still observe it.
     *
     * @param ms pause length in milliseconds
     */
    private static void sleep(int ms) {
        try {
            Thread.sleep(ms);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
ChildChange:/test/Node,childs:
ChildChange:/test/Node,childs:[n1,]
ChildChange:/test/Node,childs:[n1,n2,]
ChildChange:/test/Node,childs:[n1,n2,n3,]
ChildChange:/test/Node,childs:[n2,n3,]
ChildChange:/test/Node,childs:[n3,]
ChildChange:/test/Node,childs:
****
ChildChange:/test/Node,childs:
被监听的节点本身被创建或删除,以及其下的子节点被创建或删除时,注册在该节点上的子节点监听器都会被触发,并打印当前的子节点列表。