针对zookeeper,比较常用的Java客户端有zkclient、curator。由于Curator对于zookeeper的抽象层次比较高,简化了zookeeper客户端的开发量,使得Curator逐步被广泛应用。
学习本文前最好先学习我的上一遍博客:Zookeeper学习笔记(一)——Zookeeper的基本功能
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.6.0</version>
</dependency>
通过ZooKeeper的构造方法来建立连接。zookeeper一共有四种构造方法,根据参数来选择。参数列表如下:
connectSstring:服务器地址(ip:port)列表,多个地址以“,”隔开;
sessionTimeout:心跳检测时间周期(毫秒);‘
wather:watch监听器;
canBeReadOnly:标识当前会话是否支持只读;
sessionld和sessionPasswd:提供连接zookeeper的sessionld和密码, 通过这两个确定唯一一台客户端,目的是可以提供重复会话;
public static ZooKeeper getZKclient(String connStr, int sessionTimeout){
try {
CountDownLatch countDownLatch = new CountDownLatch(1);
ZooKeeper zooKeeper = new ZooKeeper(connStr, sessionTimeout, new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
if(Event.KeeperState.SyncConnected == watchedEvent.getState()){
if(Event.EventType.None == watchedEvent.getType()){
//如果收到了服务端的响应事件,连接成功
System.out.println("zookeeper服务器连接成功!");
countDownLatch.countDown();
}
}
}
});
countDownLatch.await();
System.out.println("状态:"+zooKeeper.getState());
return zooKeeper;
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
return null;
}
(1)创建节点
通过create方法创建节点。create主要分为同步和异步两种形式。
所有参数列表:
path:节点路径;
data:节点的value值;
acl:ACl权限列表;
createMode:节点的类型,新版本的zookeeper有7种:
Create2Callback:异步回调方法
StringCallback:异步回调方法
ctx:上下文参数;
ttl:超时时间(ms),在规定时间节点一直没被修改,就删除节点。
stat:保存节点创建后的属性信息。
//创建节点
public static void createNode(ZooKeeper zkClient){
try {
//需要手动创建父节点
zkClient.create("/demo/node1",
"test".getBytes(),
//zookeeper提供了几个默认的权限列表,在ZooDefs.Ids中
ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
System.out.println("节点创建成功!");
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//创建节点
public static void createNode2(ZooKeeper zkClient){
try {
//Perms的值包含:int READ = 1;int WRITE = 2;int CREATE = 4;int DELETE = 8;
//int ADMIN = 16;int ALL = 31;表示权限类型,ALL代表所有权限
//Id(String scheme, String id)。scheme表示权限模式
//有world、auth、digest、ip四种模式
//DigestAuthenticationProvider.generateDigest("user:123456")是将
//'user:123456'加密(先用SHA1再用base64)后和'user:'拼接
List<ACL> acls = new ArrayList<>();
//digest模式
//ACL acl = new ACL(ZooDefs.Perms.ALL,new Id("digest",DigestAuthenticationProvider.generateDigest("user:123456")));
//world模式,可以使用“|”赋予多个权限
//ACL acl = new ACL(ZooDefs.Perms.READ |ZooDefs.Perms.WRITE | ZooDefs.Perms.ADMIN,new Id("world","anyone"));
//auth模式,需要先添加验证口令,再创建
zkClient.addAuthInfo("digest","user:123456".getBytes());
ACL acl = new ACL(ZooDefs.Perms.ALL,new Id("auth","user:123456"));
//ip模式
//ACL acl = new ACL(ZooDefs.Perms.ALL,new Id("ip","127.0.0.1"));
acls.add(acl);
zkClient.create("/demo/node2", "node2".getBytes(), acls, CreateMode.PERSISTENT);
System.out.println("节点创建成功!");
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (Exception e) {
e.printStackTrace();
}
}
//异步创建节点
public static void createNode3(ZooKeeper zkClient){
try {
List<ACL> acls = new ArrayList<>();
ACL acl = new ACL(ZooDefs.Perms.ALL,new Id("world","anyone"));
acls.add(acl);
String ctxParam = "test";
Create2Callback
zkClient.create("/demo/node8", "node8".getBytes(), acls, CreateMode.PERSISTENT, new AsyncCallback.Create2Callback() {
@Override
public void processResult(int i, String s, Object o, String s1, Stat stat) {
//o就是ctxParam
System.out.println("i:"+i+",s:"+s+",o:"+o+",s1:"+s1+",stat:"+stat);
}
},ctxParam);
//StringCallback
/*zkClient.create("/demo/node8", "node8".getBytes(), acls, CreateMode.PERSISTENT, new AsyncCallback.StringCallback() {
@Override
public void processResult(int i, String s, Object o, String s1, Stat stat) {
System.out.println("i:"+i+",s:"+s+",o:"+o+",s1:"+s1);
}
},ctxParam);*/
System.out.println("节点创建成功!");
Thread.sleep(3000);
}catch (Exception e) {
e.printStackTrace();
}
}
//创建节点
public static void createNode4(ZooKeeper zkClient){
try {
List<ACL> acls = new ArrayList<>();
ACL acl = new ACL(ZooDefs.Perms.ALL,new Id("world","anyone"));
acls.add(acl);
Stat stat = new Stat();
String str = zkClient.create("/demo/node7", "node7".getBytes(), acls, CreateMode.PERSISTENT,stat);
System.out.println("节点创建成功!");
System.out.println("stat:"+stat.toString());
}catch (Exception e) {
e.printStackTrace();
}
}
(2)查看节点
使用getData方法查看节点。getData方法声明如下:
public void getData(String path, boolean watch, DataCallback cb, Object ctx)
public byte[] getData(String path, boolean watch, Stat stat)
public void getData(String path, Watcher watcher, DataCallback cb, Object ctx)
public byte[] getData(String path, Watcher watcher, Stat stat)
参数列表:
path:节点路径
boolean watch:是否使用连接时的watch监听
watcher:watch监听器
cb:回调方法
ctx:上下文参数
stat:保存节点的属性
//查看节点
public static void getData(ZooKeeper zkClient){
try {
Stat stat = new Stat();
byte[] resByte = zkClient.getData("/demo/node7",true,stat);
System.out.println("nodeVal:"+new String(resByte));
System.out.println("stat:"+stat);
byte[] resByte1 = zkClient.getData("/demo/node8", new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
System.out.println("receive node:"+watchedEvent.getPath());
System.out.println("watchedEvent"+watchedEvent.toString());
}
},stat);
System.out.println("nodeVal:"+new String(resByte1));
System.out.println("stat:"+stat);
String ctxParam = "ctxVal";
zkClient.getData("/demo/node8", true, new AsyncCallback.DataCallback() {
@Override
public void processResult(int i, String s, Object o, byte[] bytes, Stat stat) {
System.out.println("i:"+i+",s:"+s+",o:"+o+",bytes:"+new String(bytes)+",stat:"+stat);
}
},ctxParam);
zkClient.getData("/demo/node8", new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
System.out.println("receive node:"+watchedEvent.getPath());
System.out.println("watchedEvent"+watchedEvent.toString());
}
}, new AsyncCallback.DataCallback() {
@Override
public void processResult(int i, String s, Object o, byte[] bytes, Stat stat) {
System.out.println("i:" + i + ",s:" + s + ",o:" + o + ",bytes:" + new String(bytes) + ",stat:" + stat);
}
}, ctxParam);
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
验证watch监听需要客户端一直保持连接。可以在main方法中使用System.in.read()。让连接一直保持。
(3)更新节点
使用setData方法,更新节点的value值。setData方法声明如下:
public Stat setData(String path, byte[] data, int version)
public void setData(String path, byte[] data, int version, StatCallback cb, Object ctx)
参数列表:
path:节点路径
data:更新后的值
version:版本号,如果节点的当前value版本号与version不一致,则不更新。
StatCallback :异步回调方法
ctx:上下文参数
//更新节点
public static void setData(ZooKeeper zkClient){
try {
Stat stat = new Stat();
zkClient.getData("/demo/node7",false, stat);
//先查询版本号,再根据版本号更新
Stat res = zkClient.setData("/demo/node7","updateVal1".getBytes(),stat.getVersion());
System.out.println("res:"+res.toString());
Stat stat1 = new Stat();
zkClient.getData("/demo/node7",false, stat);
String ctxParam = "ctxVal";
zkClient.setData("/demo/node7","updateVal2".getBytes(),stat1.getVersion(), new AsyncCallback.StatCallback() {
@Override
public void processResult(int i, String s, Object o, Stat stat) {
System.out.println("i:"+i+",s:"+s+",o:"+o+",stat:"+stat);
}
},ctxParam);
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
(4)删除节点
删除节点使用delete方法。方法声明如下:
public void delete(String path, int version)
public void delete(String path, int version, VoidCallback cb, Object ctx)
参数列表:
path:节点路径
version:版本号。节点当前版本号与version一致时,删除
cb:异步回调方法
ctx:上下文参数
//删除节点
public static void deleteNode(ZooKeeper zkClient){
try {
Stat stat = new Stat();
zkClient.getData("/demo/node8",false, stat);
zkClient.delete("/demo/node8",stat.getVersion());
String ctxParam = "ctxVal";
zkClient.delete("/demo/node7", -1, new AsyncCallback.VoidCallback() {
@Override
public void processResult(int i, String s, Object o) {
System.out.println("i:"+i+",s:"+s+",o:"+o);
}
},ctxParam);
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
(5)查看子节点
使用getChildren方法查看子节点,方法声明如下:
public List<String> getChildren(String path, boolean watch)
public void getChildren(String path, boolean watch, Children2Callback cb, Object ctx)
public void getChildren(String path, boolean watch, ChildrenCallback cb, Object ctx)
public List<String> getChildren(String path, boolean watch, Stat stat)
public List<String> getChildren(String path, Watcher watcher)
public void getChildren(String path, Watcher watcher, Children2Callback cb, Object ctx)
public void getChildren(String path, Watcher watcher, ChildrenCallback cb, Object ctx)
public List<String> getChildren(String path, Watcher watcher, Stat stat)
参数列表:
path:节点路路径
watch:是否使用建立连接时watch监听器
watcher:watch监听器
cb:异步回调方法,有Children2Callback 和ChildrenCallback 两种
ctx:上下文参数
stat:保存查询时的
//查看子节点
public static void getChildrenNodes(ZooKeeper zkClient){
List<String> childrens = new ArrayList<>();
try {
//childrens = zkClient.getChildren("/demo",false);
//Stat stat = new Stat();
//childrens = zkClient.getChildren("/demo",false,stat);
/*childrens = zkClient.getChildren("/demo", new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
System.out.println("receive nodeChange:"+watchedEvent.getPath());
System.out.println("watchedEvent:"+watchedEvent);
}
},stat);*/
/*System.out.println("stat:"+stat);
for (String children:childrens){
System.out.println(children);
}*/
String ctxParam = "ctxVal";
zkClient.getChildren("/demo", new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
System.out.println("receive nodeChange:"+watchedEvent.getPath());
System.out.println("watchedEvent:"+watchedEvent);
}
}, new AsyncCallback.Children2Callback() {
@Override
public void processResult(int i, String s, Object o, List<String> list, Stat stat) {
System.out.println("i:" + i + ",s:" + s + ",o:" + o + ",stat:" + stat);
for (String str:list){
System.out.println(str);
}
}
},ctxParam);
} catch (Exception e) {
e.printStackTrace();
}
}
Watch监听只执行一次,再次监听需要重新诸注册
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>4.0.0</version>
</dependency>
<!--curator封装好的一些基于zookeeper的功能:如分布式锁、分布式队列等等-->
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>4.0.0</version>
</dependency>
public static CuratorFramework getClient(String connStr, int sessionTimeout){
CuratorFramework client = CuratorFrameworkFactory.builder()
.connectString("127.0.0.1:2181") //zk服务器地址
.sessionTimeoutMs(sessionTimeout) //心跳检测时间周期(毫秒)
.connectionTimeoutMs(5000) //连接超时时间
//重试策略
//ExponentialBackoffRetry:重试指定的次数, 且每一次重试之间停顿的时间逐渐增加.
//RetryNTimes:指定最大重试次数的重试策略
//RetryOneTime:仅重试一次
//RetryUntilElapsed:一直重试直到达到规定的时间
.retryPolicy(new ExponentialBackoffRetry(1000,3))
//带上验证口令(访问权限节点时需要)
.authorization("digest","user:123456".getBytes())
//表示该客户只再/aclnodes下操作
.namespace("aclnodes")
.build();
//启动
client.start();
}
(1)创建节点
//创建节点
public static void createData(CuratorFramework client) {
try {
client.create()
//父节点不存在时会自动创建父节点
.creatingParentsIfNeeded()
//节点类型
//PERSISTENT 、PERSISTENT_SEQUENTIAL 持久节点/持久有序节点
//EPHEMERAL、EPHEMERAL_SEQUENTIAL 临时节点/临时有序节点
//CONTAINER 容器节点
//PERSISTENT_WITH_TTL、PERSISTENT_SEQUENTIAL_WITH_TTL 持久ttl节点,持久有序ttl节点
.withMode(CreateMode.PERSISTENT)
//节点路径
.forPath("/curator/data","test".getBytes());
} catch (Exception e) {
e.printStackTrace();
}
}
(2)修改节点
//修改节点
public static void setData(CuratorFramework client) {
try {
client.setData().forPath("/curator/data","settest".getBytes());
} catch (Exception e) {
e.printStackTrace();
}
}
(3)查询节点
//查询节点
public static String getData(CuratorFramework client) {
String str = null;
try {
Stat stat = new Stat();
str = new String(client.getData().
//查询时带上节点属性
storingStatIn(stat).
forPath("/curator/data"));
} catch (Exception e) {
e.printStackTrace();
}
System.out.println(str);
return str;
}
(4)删除节点
//删除节点
public static void delete(CuratorFramework client) {
try {
Stat stat = new Stat();
client.getData().storingStatIn(stat).forPath("/curator/data");
client.delete()
//根据版本号删除(乐观锁)
.withVersion(stat.getVersion()).forPath("/curator/data");
} catch (Exception e) {
e.printStackTrace();
}
}
Zookeeper提供了一套ACL权限控制机制来保证数据的安全。有两种方式给节点加上权限:通过setACl命令;在创建节点时加上权限。
//通过setACL设置权限
public static void setACL(CuratorFramework client){
try {
List<ACL> acls = new ArrayList<>();
//Perms的值包含:int READ = 1;int WRITE = 2;int CREATE = 4;int DELETE = 8;
//int ADMIN = 16;int ALL = 31;表示权限类型,ALL代表所有权限
//Id(String scheme, String id)。scheme表示权限模式
//有world、auth、digest、ip四种模式
//DigestAuthenticationProvider.generateDigest("user:123456")是将
//'user:123456'加密(先用SHA1再用base64)后和'user:'拼接
ACL acl = new ACL(ZooDefs.Perms.ALL,new Id("digest",
DigestAuthenticationProvider.generateDigest("user:123456")));
acls.add(acl);
client.setACL().withACL(acls).forPath("/aclnodes/node1");
} catch (NoSuchAlgorithmException e) {
e.printStackTrace();
} catch (Exception e) {
e.printStackTrace();
}
}
//创建节点时赋予权限
public static void createWithACL(CuratorFramework client){
try {
List<ACL> acls = new ArrayList<>();
//设置多个权限
//使用auth模式时,客户端需要带上验证口令(authorization方法),否则会报错
ACL acl = new ACL(ZooDefs.Perms.READ | ZooDefs.Perms.ADMIN | ZooDefs.Perms.WRITE,
new Id("auth","user:123456"));
//world模式
//ACL acl = new ACL(ZooDefs.Perms.ALL,new Id("world","anyone"));
//ip模式
//ACL acl = new ACL(ZooDefs.Perms.ALL,new Id("ip","127.0.0.1"));
acls.add(acl);
client.create()
.creatingParentsIfNeeded()
.withMode(CreateMode.PERSISTENT)
.withACL(acls)
.forPath("/aclnodes/node3","test".getBytes());
} catch (Exception e) {
e.printStackTrace();
}
}
watcher监听机制是zookeeper中非常重要的特性,我们基于zookeeper上创建的节点,可以对这些节点绑定监听事件,比如可以监听节点数据变更、节点删除、子节点状态变更等事件,通过这个事件机制,可以基于zookeeper实现分布式锁、集群管理等功能。zookeeper的watcher只生效一次。但是curator对此功能做了封装,不需要我们关心了。curator提供了curator-recipes包,封装了一些功能。比如分布式锁、分布式队列、共享id等。我们可以使用curator-recipes来实现watcher监听。
Curator 提供了三种 Watcher 来监听节点的变化
• PathChildCache:监视一个路径下孩子结点的创建、删除、更新。
• NodeCache:监视当前结点的创建、更新、删除,并将结点的数据缓存在本地。
• TreeCache:PathChildCache 和 NodeCache 的“合体”,监视路径下的创建、更新、删除事件,并缓存路径下所有孩子结点的数据。
//监视当前结点的创建、更新、删除
public static void addListener(CuratorFramework client){
try {
NodeCache nodeCache = new NodeCache(client,"/watch",false);
NodeCacheListener nodeCacheListener = new NodeCacheListener() {
@Override
public void nodeChanged() throws Exception {
System.out.println("receive Node Changed");
System.out.println(nodeCache.getCurrentData().getPath()+"---"+new String(nodeCache.getCurrentData().getData()));
}
};
nodeCache.getListenable().addListener(nodeCacheListener);
nodeCache.start();
System.out.println("注册监听成功!");
} catch (Exception e) {
e.printStackTrace();
}
}
//监视一个路径下孩子结点的创建、删除、更新
private static void addListenerChild(CuratorFramework client) {
try {
PathChildrenCache nodeCache=new PathChildrenCache(client,"/watch",true);
PathChildrenCacheListener nodeCacheListener= new PathChildrenCacheListener() {
@Override
public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent pathChildrenCacheEvent) throws Exception {
System.out.println(pathChildrenCacheEvent.getType()+"->"
+new String(pathChildrenCacheEvent.getData().getData()));
}
};
nodeCache.getListenable().addListener(nodeCacheListener);
nodeCache.start(PathChildrenCache.StartMode.NORMAL);
} catch (Exception e) {
e.printStackTrace();
}
}