Zookeeper学习笔记(二)——Zookeeper的java客户端搭建

赫连骏
2023-12-01

一、客户端种类

针对zookeeper,比较常用的Java客户端有zkclient、curator。由于Curator对于zookeeper的抽象层次比较高,简化了zookeeper客户端的开发量,使得Curator逐步被广泛应用。
学习本文前最好先学习我的上一遍博客:Zookeeper学习笔记(一)——Zookeeper的基本功能

二、使用zkclient搭建zookeeper的java客户端

1、依赖

	<dependency>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
            <version>3.6.0</version>
        </dependency>

2、建立连接

通过ZooKeeper的构造方法来建立连接。zookeeper一共有四种构造方法,根据参数来选择。参数列表如下:
connectSstring:服务器地址(ip:port)列表,多个地址以“,”隔开;
sessionTimeout:心跳检测时间周期(毫秒);‘
wather:watch监听器;
canBeReadOnly:标识当前会话是否支持只读;
sessionld和sessionPasswd:提供连接zookeeper的sessionld和密码, 通过这两个确定唯一一台客户端,目的是可以提供重复会话;

	public static ZooKeeper getZKclient(String connStr, int sessionTimeout){
        try {
            CountDownLatch countDownLatch = new CountDownLatch(1);
            ZooKeeper zooKeeper = new ZooKeeper(connStr, sessionTimeout, new Watcher() {
                @Override
                public void process(WatchedEvent watchedEvent) {
                    if(Event.KeeperState.SyncConnected == watchedEvent.getState()){
                        if(Event.EventType.None == watchedEvent.getType()){
                            //如果收到了服务端的响应事件,连接成功
                            System.out.println("zookeeper服务器连接成功!");
                            countDownLatch.countDown();
                        }

                    }
                }
            });
            countDownLatch.await();
            System.out.println("状态:"+zooKeeper.getState());
            return zooKeeper;
        } catch (IOException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return null;
    }

3、节点的增删改查

(1)创建节点
通过create方法创建节点。create主要分为同步和异步两种形式。
所有参数列表:
path:节点路径;
data:节点的value值;
acl:ACl权限列表;
createMode:节点的类型,新版本的zookeeper有7种:

  1. PERSISTENT、PERSISTENT_SEQUENTIAL 持久节点/持久有序节点
  2. EPHEMERAL、EPHEMERAL_SEQUENTIAL 临时节点/临时有序节点
  3. CONTAINER 容器节点
  4. PERSISTENT_WITH_TTL、PERSISTENT_SEQUENTIAL_WITH_TTL 持久ttl节点,持久有序ttl节点

Create2Callback:异步回调方法
StringCallback:异步回调方法
ctx:上下文参数;
ttl:超时时间(ms),在规定时间节点一直没被修改,就删除节点。
stat:保存节点创建后的属性信息。

 //创建节点
    public static void createNode(ZooKeeper zkClient){

        try {
        	//需要手动创建父节点
            zkClient.create("/demo/node1",
                    "test".getBytes(),
                    //zookeeper提供了几个默认的权限列表,在ZooDefs.Ids中
                    ZooDefs.Ids.OPEN_ACL_UNSAFE,
                    CreateMode.PERSISTENT);
            System.out.println("节点创建成功!");
        } catch (KeeperException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

	//创建节点
    public static void createNode2(ZooKeeper zkClient){

        try {
        	//Perms的值包含:int READ = 1;int WRITE = 2;int CREATE = 4;int DELETE = 8;
            //int ADMIN = 16;int ALL = 31;表示权限类型,ALL代表所有权限
            //Id(String scheme, String id)。scheme表示权限模式
            //有world、auth、digest、ip四种模式
            //DigestAuthenticationProvider.generateDigest("user:123456")是将
            //'user:123456'加密(先用SHA1再用base64)后和'user:'拼接
    
            List<ACL> acls = new ArrayList<>();
            //digest模式
            //ACL acl = new ACL(ZooDefs.Perms.ALL,new Id("digest",DigestAuthenticationProvider.generateDigest("user:123456")));
            
            //world模式,可以使用“|”赋予多个权限
            //ACL acl = new ACL(ZooDefs.Perms.READ |ZooDefs.Perms.WRITE | ZooDefs.Perms.ADMIN,new Id("world","anyone"));
            
            //auth模式,需要先添加验证口令,再创建
            zkClient.addAuthInfo("digest","user:123456".getBytes());
            ACL acl = new ACL(ZooDefs.Perms.ALL,new Id("auth","user:123456"));
            //ip模式
			//ACL acl = new ACL(ZooDefs.Perms.ALL,new Id("ip","127.0.0.1"));

            acls.add(acl);

            zkClient.create("/demo/node2", "node2".getBytes(), acls, CreateMode.PERSISTENT);
            System.out.println("节点创建成功!");
        } catch (KeeperException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

	//异步创建节点
    public static void createNode3(ZooKeeper zkClient){

        try {
            List<ACL> acls = new ArrayList<>();
            ACL acl = new ACL(ZooDefs.Perms.ALL,new Id("world","anyone"));
            acls.add(acl);

            String ctxParam = "test";
            Create2Callback
            zkClient.create("/demo/node8", "node8".getBytes(), acls, CreateMode.PERSISTENT, new AsyncCallback.Create2Callback() {
                @Override
                public void processResult(int i, String s, Object o, String s1, Stat stat) {
                   //o就是ctxParam 
                   System.out.println("i:"+i+",s:"+s+",o:"+o+",s1:"+s1+",stat:"+stat);
                }
            },ctxParam);
			
			//StringCallback
			/*zkClient.create("/demo/node8", "node8".getBytes(), acls, CreateMode.PERSISTENT, new AsyncCallback.StringCallback() {
                @Override
                public void processResult(int i, String s, Object o, String s1, Stat stat) {
                    System.out.println("i:"+i+",s:"+s+",o:"+o+",s1:"+s1);
                }
            },ctxParam);*/
            System.out.println("节点创建成功!");
            Thread.sleep(3000);
        }catch (Exception e) {
            e.printStackTrace();
        }
    }

	//创建节点
    public static void createNode4(ZooKeeper zkClient){

        try {
            List<ACL> acls = new ArrayList<>();
            ACL acl = new ACL(ZooDefs.Perms.ALL,new Id("world","anyone"));
            acls.add(acl);

            Stat stat = new Stat();
            String str = zkClient.create("/demo/node7", "node7".getBytes(), acls, CreateMode.PERSISTENT,stat);
            System.out.println("节点创建成功!");
            System.out.println("stat:"+stat.toString());
        }catch (Exception e) {
            e.printStackTrace();
        }
    }

(2)查看节点
使用getData方法查看节点。getData方法声明如下:

public void getData(String path, boolean watch, DataCallback cb, Object ctx) 
public byte[] getData(String path, boolean watch, Stat stat)
public void getData(String path, Watcher watcher, DataCallback cb, Object ctx)
public byte[] getData(String path, Watcher watcher, Stat stat)

参数列表:
path:节点路径
boolean watch:是否使用连接时的watch监听
watcher:watch监听器
cb:回调方法
ctx:上下文参数
stat:保存节点的属性

	//查看节点
    public static void getData(ZooKeeper zkClient){
        try {
            Stat stat = new Stat();
            byte[] resByte = zkClient.getData("/demo/node7",true,stat);
            System.out.println("nodeVal:"+new String(resByte));
            System.out.println("stat:"+stat);

            byte[] resByte1 = zkClient.getData("/demo/node8", new Watcher() {
                @Override
                public void process(WatchedEvent watchedEvent) {
                    System.out.println("receive node:"+watchedEvent.getPath());
                    System.out.println("watchedEvent"+watchedEvent.toString());
                }
            },stat);
            System.out.println("nodeVal:"+new String(resByte1));
            System.out.println("stat:"+stat);

            String ctxParam = "ctxVal";
            zkClient.getData("/demo/node8", true, new AsyncCallback.DataCallback() {
                @Override
                public void processResult(int i, String s, Object o, byte[] bytes, Stat stat) {
                    System.out.println("i:"+i+",s:"+s+",o:"+o+",bytes:"+new String(bytes)+",stat:"+stat);
                }
            },ctxParam);

            zkClient.getData("/demo/node8", new Watcher() {
                @Override
                public void process(WatchedEvent watchedEvent) {
                    System.out.println("receive node:"+watchedEvent.getPath());
                    System.out.println("watchedEvent"+watchedEvent.toString());
                }
            }, new AsyncCallback.DataCallback() {
                @Override
                public void processResult(int i, String s, Object o, byte[] bytes, Stat stat) {
                    System.out.println("i:" + i + ",s:" + s + ",o:" + o + ",bytes:" + new String(bytes) + ",stat:" + stat);
                }
            }, ctxParam);
        } catch (KeeperException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

验证watch监听需要客户端一直保持连接。可以在main方法中使用System.in.read()。让连接一直保持。
(3)更新节点
使用setData方法,更新节点的value值。setData方法声明如下:

public Stat setData(String path, byte[] data, int version)
public void setData(String path, byte[] data, int version, StatCallback cb, Object ctx)

参数列表:
path:节点路径
data:更新后的值
version:版本号,如果节点的当前value版本号与version不一致,则不更新。
StatCallback :异步回调方法
ctx:上下文参数

//更新节点
    public static void setData(ZooKeeper zkClient){
        try {
            Stat stat = new Stat();
            zkClient.getData("/demo/node7",false, stat);
            //先查询版本号,再根据版本号更新
            Stat res = zkClient.setData("/demo/node7","updateVal1".getBytes(),stat.getVersion());
            System.out.println("res:"+res.toString());


            Stat stat1 = new Stat();
            zkClient.getData("/demo/node7",false, stat);
            String ctxParam = "ctxVal";
            zkClient.setData("/demo/node7","updateVal2".getBytes(),stat1.getVersion(), new AsyncCallback.StatCallback() {
                @Override
                public void processResult(int i, String s, Object o, Stat stat) {
                    System.out.println("i:"+i+",s:"+s+",o:"+o+",stat:"+stat);
                }
            },ctxParam);
        } catch (KeeperException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

(4)删除节点
删除节点使用delete方法。方法声明如下:

public void delete(String path, int version)
public void delete(String path, int version, VoidCallback cb, Object ctx)

参数列表:
path:节点路径
version:版本号。节点当前版本号与version一致时,删除
cb:异步回调方法
ctx:上下文参数

//删除节点
    public static void deleteNode(ZooKeeper zkClient){
        try {
            Stat stat = new Stat();
            zkClient.getData("/demo/node8",false, stat);
            zkClient.delete("/demo/node8",stat.getVersion());


            String ctxParam = "ctxVal";
            zkClient.delete("/demo/node7", -1, new AsyncCallback.VoidCallback() {
                @Override
                public void processResult(int i, String s, Object o) {
                    System.out.println("i:"+i+",s:"+s+",o:"+o);
                }
            },ctxParam);

        } catch (KeeperException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

(5)查看子节点
使用getChildren方法查看子节点,方法声明如下:

public List<String> getChildren(String path, boolean watch)
public void getChildren(String path, boolean watch, Children2Callback cb, Object ctx)
public void getChildren(String path, boolean watch, ChildrenCallback cb, Object ctx)
public List<String> getChildren(String path, boolean watch, Stat stat)
public List<String> getChildren(String path, Watcher watcher)
public void getChildren(String path, Watcher watcher, Children2Callback cb, Object ctx)
public void getChildren(String path, Watcher watcher, ChildrenCallback cb, Object ctx)
public List<String> getChildren(String path, Watcher watcher, Stat stat)

参数列表:
path:节点路路径
watch:是否使用建立连接时watch监听器
watcher:watch监听器
cb:异步回调方法,有Children2Callback 和ChildrenCallback 两种
ctx:上下文参数
stat:保存查询时的

//查看子节点
    public static void getChildrenNodes(ZooKeeper zkClient){
        List<String> childrens = new ArrayList<>();
        try {
            //childrens = zkClient.getChildren("/demo",false);
            //Stat stat = new Stat();
            //childrens = zkClient.getChildren("/demo",false,stat);
            /*childrens = zkClient.getChildren("/demo", new Watcher() {
                @Override
                public void process(WatchedEvent watchedEvent) {
                    System.out.println("receive nodeChange:"+watchedEvent.getPath());
                    System.out.println("watchedEvent:"+watchedEvent);
                }
            },stat);*/
            /*System.out.println("stat:"+stat);
            for (String children:childrens){
                System.out.println(children);
            }*/
            String ctxParam = "ctxVal";
            zkClient.getChildren("/demo", new Watcher() {
                @Override
                public void process(WatchedEvent watchedEvent) {
                    System.out.println("receive nodeChange:"+watchedEvent.getPath());
                    System.out.println("watchedEvent:"+watchedEvent);
                }
            }, new AsyncCallback.Children2Callback() {
                @Override
                public void processResult(int i, String s, Object o, List<String> list, Stat stat) {
                    System.out.println("i:" + i + ",s:" + s + ",o:" + o + ",stat:" + stat);
                    for (String str:list){
                        System.out.println(str);
                    }
                }
            },ctxParam);
           

           
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

Watch监听只执行一次,再次监听需要重新诸注册

三、使用curator创建zookeeper的java客户端

1、依赖

		<dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-framework</artifactId>
            <version>4.0.0</version>
        </dependency>
        <!--curator封装好的一些基于zookeeper的功能:如分布式锁、分布式队列等等-->
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-recipes</artifactId>
            <version>4.0.0</version>
        </dependency>

2、建立连接

	public static CuratorFramework getClient(String connStr, int sessionTimeout){
		CuratorFramework client = CuratorFrameworkFactory.builder()
                .connectString("127.0.0.1:2181")	//zk服务器地址
                .sessionTimeoutMs(sessionTimeout)	//心跳检测时间周期(毫秒)
                .connectionTimeoutMs(5000)	//连接超时时间
                //重试策略
                //ExponentialBackoffRetry:重试指定的次数, 且每一次重试之间停顿的时间逐渐增加.
				//RetryNTimes:指定最大重试次数的重试策略
	  			//RetryOneTime:仅重试一次
				//RetryUntilElapsed:一直重试直到达到规定的时间
                .retryPolicy(new ExponentialBackoffRetry(1000,3))	
                //带上验证口令(访问权限节点时需要)
				.authorization("digest","user:123456".getBytes())
				//表示该客户只再/aclnodes下操作
                .namespace("aclnodes")
                .build();
        //启动
        client.start();
	}

3、节点的增删查改

(1)创建节点

//创建节点
    public static void createData(CuratorFramework client)  {
        try {
            client.create()
                    //父节点不存在时会自动创建父节点
                    .creatingParentsIfNeeded()
                    //节点类型
                    //PERSISTENT 、PERSISTENT_SEQUENTIAL 持久节点/持久有序节点
                    //EPHEMERAL、EPHEMERAL_SEQUENTIAL    临时节点/临时有序节点
                    //CONTAINER   容器节点
                    //PERSISTENT_WITH_TTL、PERSISTENT_SEQUENTIAL_WITH_TTL    持久ttl节点,持久有序ttl节点
                    .withMode(CreateMode.PERSISTENT)
                    //节点路径
                    .forPath("/curator/data","test".getBytes());
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

(2)修改节点

//修改节点
    public static void setData(CuratorFramework client)  {
        try {
            client.setData().forPath("/curator/data","settest".getBytes());
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

(3)查询节点

//查询节点
    public static String getData(CuratorFramework client)  {
        String str = null;
        try {
            Stat stat = new Stat();
            str = new String(client.getData().
                    //查询时带上节点属性
                    storingStatIn(stat).
                    forPath("/curator/data"));
        } catch (Exception e) {
            e.printStackTrace();
        }
        System.out.println(str);
        return str;
    }

(4)删除节点

 //删除节点
    public static void delete(CuratorFramework client)  {
        try {
            Stat stat = new Stat();
            client.getData().storingStatIn(stat).forPath("/curator/data");
            client.delete()
                    //根据版本号删除(乐观锁)
                    .withVersion(stat.getVersion()).forPath("/curator/data");

        } catch (Exception e) {
            e.printStackTrace();
        }
    }

4、权限设置

Zookeeper提供了一套ACL权限控制机制来保证数据的安全。有两种方式给节点加上权限:通过setACl命令;在创建节点时加上权限。

    //通过setACL设置权限
    public static void setACL(CuratorFramework client){
        try {
            List<ACL> acls = new ArrayList<>();
            //Perms的值包含:int READ = 1;int WRITE = 2;int CREATE = 4;int DELETE = 8;
            //int ADMIN = 16;int ALL = 31;表示权限类型,ALL代表所有权限
            //Id(String scheme, String id)。scheme表示权限模式
            //有world、auth、digest、ip四种模式
            //DigestAuthenticationProvider.generateDigest("user:123456")是将
            //'user:123456'加密(先用SHA1再用base64)后和'user:'拼接
            ACL acl = new ACL(ZooDefs.Perms.ALL,new Id("digest",
                    DigestAuthenticationProvider.generateDigest("user:123456")));
            acls.add(acl);

            client.setACL().withACL(acls).forPath("/aclnodes/node1");
        } catch (NoSuchAlgorithmException e) {
            e.printStackTrace();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }


	//创建节点时赋予权限
    public static void createWithACL(CuratorFramework client){
        try {
            List<ACL> acls = new ArrayList<>();
            //设置多个权限
            //使用auth模式时,客户端需要带上验证口令(authorization方法),否则会报错
            ACL acl = new ACL(ZooDefs.Perms.READ | ZooDefs.Perms.ADMIN | ZooDefs.Perms.WRITE,
                    new Id("auth","user:123456"));
            //world模式
            //ACL acl = new ACL(ZooDefs.Perms.ALL,new Id("world","anyone"));
            //ip模式
            //ACL acl = new ACL(ZooDefs.Perms.ALL,new Id("ip","127.0.0.1"));
            acls.add(acl);

            client.create()
                    .creatingParentsIfNeeded()
                    .withMode(CreateMode.PERSISTENT)
                    .withACL(acls)
                    .forPath("/aclnodes/node3","test".getBytes());
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

5、watcher监听

watcher监听机制是zookeeper中非常重要的特性,我们基于zookeeper上创建的节点,可以对这些节点绑定监听事件,比如可以监听节点数据变更、节点删除、子节点状态变更等事件,通过这个事件机制,可以基于zookeeper实现分布式锁、集群管理等功能。zookeeper的watcher只生效一次。但是curator对此功能做了封装,不需要我们关心了。curator提供了curator-recipes包,封装了一些功能。比如分布式锁、分布式队列、共享id等。我们可以使用curator-recipes来实现watcher监听。
Curator 提供了三种 Watcher 来监听节点的变化
• PathChildCache:监视一个路径下孩子结点的创建、删除、更新。
• NodeCache:监视当前结点的创建、更新、删除,并将结点的数据缓存在本地。
• TreeCache:PathChildCache 和 NodeCache 的“合体”,监视路径下的创建、更新、删除事件,并缓存路径下所有孩子结点的数据。

	//监视当前结点的创建、更新、删除
    public static void addListener(CuratorFramework client){
        try {
            NodeCache nodeCache = new NodeCache(client,"/watch",false);
            NodeCacheListener nodeCacheListener = new NodeCacheListener() {
                @Override
                public void nodeChanged() throws Exception {
                    System.out.println("receive Node Changed");
                    System.out.println(nodeCache.getCurrentData().getPath()+"---"+new String(nodeCache.getCurrentData().getData()));

                }
            };
            nodeCache.getListenable().addListener(nodeCacheListener);
            nodeCache.start();
            System.out.println("注册监听成功!");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
	//监视一个路径下孩子结点的创建、删除、更新
    private static void addListenerChild(CuratorFramework client) {
        try {
            PathChildrenCache nodeCache=new PathChildrenCache(client,"/watch",true);
            PathChildrenCacheListener nodeCacheListener= new PathChildrenCacheListener() {
                @Override
                public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent pathChildrenCacheEvent) throws Exception {
                    System.out.println(pathChildrenCacheEvent.getType()+"->"
                            +new String(pathChildrenCacheEvent.getData().getData()));
                }
            };
            nodeCache.getListenable().addListener(nodeCacheListener);
            nodeCache.start(PathChildrenCache.StartMode.NORMAL);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
 类似资料: