当前位置: 首页 > 工具软件 > zkclient > 使用案例 >

zkclient操作zookeeper

包丁雨
2023-12-01

目录

1、概述

2、创建zookeeper节点

引入zkclient 包

自定义ZkClient 工具类

zookeeper节点操作类

运行结果

分析

3、写入zookeeper节点数据

序列化类

User类

ZkClient 工具类增加方法

测试类

运行结果

分析

4、zookeeper 监听

监听数据变化

示例代码

运行结果

分析

监听节点变化

示例代码

运行结果

分析


1、概述

Zookeeper 是树状结构的注册中心,每个节点的类型分为持久节点、持久顺序节点、临时节点、临时顺序节点。

  • 持久节点:服务注册后保证节点不会丢失,注册中心重启也会存在。

  • 在持久节点特性的基础上增加了节点先后顺序的能力。

  • 临时节点:服务注册后连接丢失或 session 超时,注册的节点会自动被移除。

  • 临时顺序节点:在临时节点特性的基础上增加了节点先后顺序的能力。

2、创建zookeeper节点

引入zkclient 包

<dependency>
	<groupId>com.101tec</groupId>
	<artifactId>zkclient</artifactId>
	<version>0.10</version>
</dependency>

自定义ZkClient 工具类

package com.alibaba.dubbo.samples.zkclient;

import org.I0Itec.zkclient.ZkClient;

import java.util.List;

public class ZkClientUtil {
    private static String zkServer = "127.0.0.1:2181";//zookeeper地址
    private static ZkClient zkClient;

    public static ZkClient connectZkClient() {
        if (zkClient != null) return zkClient;
        zkClient = new ZkClient(zkServer);//创建zookeeper的java客户端连接
        return zkClient;
    }
    public static void closeZkClient() {
        if (zkClient != null) {
            zkClient.close();
            zkClient = null;
        }
    }

    /**
     *  遍历展示目录下的所有节点
     * @author LAN
     * @date 2018年12月3日
     * @param root
     */
    public static void showZkPath(String root) {
        showZkPath(connectZkClient(), root);
    }
    public static void showZkPath(ZkClient zkClient, String root) {
        List<String> children = zkClient.getChildren(root);//获取节点下的所有直接子节点
        if (children.isEmpty()) {
            return;
        }
        for (String s : children) {
            String childPath = root.endsWith("/") ? (root + s) : (root + "/" + s);
            System.err.println(childPath);
            showZkPath(zkClient, childPath);//递归获取所有子节点
        }
    }

}

zookeeper节点操作类

package com.alibaba.dubbo.samples.zkclient;

import org.I0Itec.zkclient.ZkClient;

public class ZookeeperOperation {

    public static void main(String[] args) {
        ZkClient zkClient = ZkClientUtil.connectZkClient();//创建zookeeper的java客户端连接

        if(!zkClient.exists("/test")) {
            zkClient.createPersistent("/test");
        }
        System.err.println("***************在/test下分别创建4种节点***************");
        if(!zkClient.exists("/test/永久节点1")) {
            zkClient.createPersistent("/test/永久节点1");
        }
        String s1 = zkClient.createPersistentSequential("/test/永久顺序节点", null);
        zkClient.createEphemeral("/test/临时节点1");
        zkClient.createEphemeralSequential("/test/临时顺序节点1", null);
        ZkClientUtil.showZkPath(zkClient, "/test");//展示test目录下的所有子目录

        System.err.println("***************关闭客户端再创建新的客户端***************");
        ZkClientUtil.closeZkClient();
        zkClient = ZkClientUtil.connectZkClient();//创建zookeeper的java客户端连接

        ZkClientUtil.showZkPath(zkClient, "/test");//展示test目录下的所有子目录

        System.err.println("***************删除节点s1***************");
        zkClient.delete(s1);//删除某个节点,如果该节点下有子节点,则会报错
        ZkClientUtil.showZkPath(zkClient, "/test");//展示test目录下的所有子目录

        zkClient.deleteRecursive("/test");//强制删除某个节点,并且删除节点下的所有子节点
        zkClient.close();
    }

}

运行结果

***************在/test下分别创建4种节点***************
/test/永久顺序节点0000000004
/test/临时顺序节点10000000006
/test/永久节点1
/test/临时节点1
/test/永久顺序节点0000000001
***************关闭客户端再创建新的客户端***************
[01/04/20 02:16:24:024 CST] ZkClient-EventThread-11-127.0.0.1:2181  INFO zkclient.ZkEventThread: Terminate ZkClient event thread.
[01/04/20 02:16:24:024 CST] main  INFO zookeeper.ZooKeeper: Session: 0x10001204ed20003 closed
[01/04/20 02:16:24:024 CST] main  INFO zookeeper.ZooKeeper: Initiating client connection, connectString=127.0.0.1:2181 sessionTimeout=30000 watcher=org.I0Itec.zkclient.ZkClient@12f40c25
[01/04/20 02:16:24:024 CST] main-EventThread  INFO zookeeper.ClientCnxn: EventThread shut down for session: 0x10001204ed20003
[01/04/20 02:16:24:024 CST] main  INFO zkclient.ZkClient: Waiting for keeper state SyncConnected
[01/04/20 02:16:24:024 CST] ZkClient-EventThread-14-127.0.0.1:2181  INFO zkclient.ZkEventThread: Starting ZkClient event thread.
[01/04/20 02:16:24:024 CST] main-SendThread(127.0.0.1:2181)  INFO zookeeper.ClientCnxn: Opening socket connection to server 127.0.0.1/127.0.0.1:2181. Will not attempt to authenticate using SASL (unknown error)
[01/04/20 02:16:24:024 CST] main-SendThread(127.0.0.1:2181)  INFO zookeeper.ClientCnxn: Socket connection established to 127.0.0.1/127.0.0.1:2181, initiating session
[01/04/20 02:16:24:024 CST] main-SendThread(127.0.0.1:2181)  INFO zookeeper.ClientCnxn: Session establishment complete on server 127.0.0.1/127.0.0.1:2181, sessionid = 0x10001204ed20004, negotiated timeout = 30000
[01/04/20 02:16:24:024 CST] main-EventThread  INFO zkclient.ZkClient: zookeeper state changed (SyncConnected)
/test/永久顺序节点0000000004
/test/永久节点1
/test/永久顺序节点0000000001
***************删除节点s1***************
/test/永久节点1
/test/永久顺序节点0000000001

分析

首先创建四种节点,当断开与zookeeper连接后,临时节点自动删除,再次连接时永久节点依然存在。

3、写入zookeeper节点数据

序列化类

要在zookeeper节点上写入数据以及读取数据,需要对数据进行序列化及反序列化。

package com.alibaba.dubbo.samples.zkclient;

import org.I0Itec.zkclient.exception.ZkMarshallingError;
import org.I0Itec.zkclient.serialize.ZkSerializer;

import java.io.*;

public class ZkSerialize implements ZkSerializer {
    /**
     * 序列化
     */
    @Override
    public byte[] serialize(Object data) throws ZkMarshallingError {
        ObjectOutputStream oos = null;
        ByteArrayOutputStream baos = null;
        try {
            // 序列化
            baos = new ByteArrayOutputStream();
            oos = new ObjectOutputStream(baos);
            oos.writeObject(data);
            byte[] bytes = baos.toByteArray();
            return bytes;
        } catch (Exception e) {
            e.printStackTrace();
        }finally {
            try {
                if(baos!=null) baos.close();
                if(oos!=null) oos.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        return null;
    }

    /**
     * 反序列化
     */
    @Override
    public Object deserialize(byte[] bytes) throws ZkMarshallingError {
        ByteArrayInputStream bais = null;
        ObjectInputStream ois = null;
        try {
            // 反序列化
            bais = new ByteArrayInputStream(bytes);
            ois = new ObjectInputStream(bais);
            return ois.readObject();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                bais.close();
                ois.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        return null;
    }

}

User类

package com.alibaba.dubbo.samples.zkclient;

import java.io.Serializable;

public class User implements Serializable {
    private String name;

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    @Override
    public String toString() {
        return "[User:name=" + name + "]";
    }
}

ZkClient 工具类增加方法


    /**
     *  遍历展示目录下的所有节点
     * @author test
     * @date 2018年12月3日
     * @param root
     */
    public static void showZkPathData(String root, ZkSerialize serializer) {
        showZkPathData(connectZkClient(), root, serializer);
    }
    public static void showZkPathData(ZkClient zkClient, String root, ZkSerialize serializer) {
        zkClient.setZkSerializer(serializer);
        List<String> children = zkClient.getChildren(root);
        if(children.isEmpty()){
            return;
        }
        for(String s:children){
            String childPath = root.endsWith("/")?(root+s):(root+"/"+s);
            Object data = zkClient.readData(childPath, true);
            if(data!=null) {
                System.err.println(data.getClass());
            }
            System.err.println(childPath+"("+data+")");
            showZkPathData(zkClient, childPath, serializer);
        }
    }

测试类

package com.alibaba.dubbo.samples.zkclient;

import org.I0Itec.zkclient.ZkClient;

public class ZookeeperDataOperation {

    public static void main(String[] args) {
        ZkClient zkClient = ZkClientUtil.connectZkClient();//创建zookeeper的java客户端连接

        if(!zkClient.exists("/test")) {
            zkClient.createPersistent("/test");
        }
        User user = new User();
        User user2 = new User();
        user.setName("阮浩");
        user2.setName("张三");
        zkClient.setZkSerializer(new ZkSerialize());//这里先设置好序列化工具再写入数据
        zkClient.createEphemeral("/test/ruanhao", user);
        zkClient.createEphemeral("/test/zhangsan");
        zkClient.writeData("/test/zhangsan", user2);
        ZkClientUtil.showZkPathData(zkClient, "/test", new ZkSerialize());//展示test目录下的所有子目录

        ZkClientUtil.closeZkClient();
    }

}

运行结果

class com.alibaba.dubbo.samples.zkclient.User
/test/zhangsan([User:name=张三])
class com.alibaba.dubbo.samples.zkclient.User
/test/ruanhao([User:name=阮浩])

分析

可以在创建路径时存入数据,也可以调用 writeData 存入数据。

4、zookeeper 监听

通过注册监听器,当zookeeper节点发生变化时,zookeeper会主动通知客户端,从而实现一些功能。

监听节点的数据变化事件包括:1、节点被创建; 2、节点上写入数据; 3、节点数据变化; 4、节点数据被删除; 5、节点本身被删除。

监听数据变化

示例代码

package com.alibaba.dubbo.samples.zkclient;

import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;

/**
 * 监听测试类
 */
public class ZookeeperSubscribeListener {

    public static void main(String[] args) {
        new ZookeeperSubscribeListener().subscribeDataChanges();
    }

    private void subscribeDataChanges() {
        ZkClient zkClient = ZkClientUtil.connectZkClient();//创建zookeeper的java客户端连接
        if(!zkClient.exists("/test")) {
            zkClient.createPersistent("/test");
        }
        //注册监听事件
        zkClient.subscribeDataChanges("/test/Node", new IZkDataListener() {
            @Override
            public void handleDataDeleted(String dataPath) throws Exception {
                System.out.println("DataDeleted:"+dataPath);
            }
            @Override
            public void handleDataChange(String dataPath, Object data) throws Exception {
                System.out.println("DataChange:"+dataPath+",data:"+data);
            }
        });
        System.out.println("****************************************");
        zkClient.createPersistent("/test/Node");
        for (int i = 0; i< 5; i++) {
            sleep(100);
            zkClient.writeData("/test/Node", i);
        }
        zkClient.delete("/test/Node");
        sleep(2000);
        zkClient.unsubscribeAll();
        zkClient.close();
    }

    private static void sleep(int ms) {
        try {
            Thread.sleep(ms);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

}

运行结果

****************************************
DataChange:/test/Node,data:null
DataChange:/test/Node,data:0
DataChange:/test/Node,data:1
DataChange:/test/Node,data:2
DataChange:/test/Node,data:3
DataDeleted:/test/Node

分析

数据改变,触发监听器事件。

监听节点变化

示例代码

package com.alibaba.dubbo.samples.zkclient;

import org.I0Itec.zkclient.IZkChildListener;
import org.I0Itec.zkclient.ZkClient;

import java.util.List;

/**
 * 监听节点变化测试类
 */
public class ZookeeperSubscribeNodeListener {

    public static void main(String[] args) {
        new ZookeeperSubscribeNodeListener().subscribeDataChanges();
    }

    private void subscribeDataChanges() {
        ZkClient zkClient = ZkClientUtil.connectZkClient();//创建zookeeper的java客户端连接
        if (!zkClient.exists("/test")) {
            zkClient.createPersistent("/test");
        }
     
        zkClient.subscribeChildChanges("/test/Node", new IZkChildListener() {
            @Override
            public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception {
                String childs = "";
                if (currentChilds != null && currentChilds.size() > 0) {
                    childs += "[";
                    for (String s : currentChilds) {
                        childs += s + ",";
                    }
                    childs += "]";
                }
                System.out.println("ChildChange:" + parentPath + ",childs:" + childs);
            }
        });
        zkClient.createPersistent("/test/Node");
        sleep(100);
        zkClient.createPersistent("/test/Node/n1");
        sleep(100);
        zkClient.createPersistent("/test/Node/n2");
        sleep(100);
        zkClient.createPersistent("/test/Node/n3");
        sleep(100);
        zkClient.delete("/test/Node/n1");
        sleep(100);
        zkClient.delete("/test/Node/n2");
        sleep(100);
        zkClient.delete("/test/Node/n3");
        sleep(3000);
        System.out.println("****");
        zkClient.deleteRecursive("/test/Node");
        sleep(3000);
        zkClient.close();
    }
    
    private static void sleep(int ms) {
        try {
            Thread.sleep(ms);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

}

运行结果

ChildChange:/test/Node,childs:
ChildChange:/test/Node,childs:[n1,]
ChildChange:/test/Node,childs:[n1,n2,]
ChildChange:/test/Node,childs:[n1,n2,n3,]
ChildChange:/test/Node,childs:[n2,n3,]
ChildChange:/test/Node,childs:[n3,]
ChildChange:/test/Node,childs:
****
ChildChange:/test/Node,childs:

分析

创建节点,监听该节点的监听器就会打印。

 类似资料: