当前位置: 首页 > 工具软件 > F-Curator > 使用案例 >

Curator Framework操作zookeeper(2)-节点监听

彭硕
2023-12-01

Curator Framework操作zookeeper(1)-基本操作

1 SessionConnectionStateListener

/**
 * 监听Session连接状态
 */
public class SessionConnectionStateListener  implements ConnectionStateListener
{
    private static final Logger log = LoggerFactory.getLogger(SessionConnectionStateListener.class);

    public void stateChanged(CuratorFramework curatorFramework, ConnectionState connectionState)
    {
        if (connectionState == ConnectionState.LOST)
        {
            log.info("ConnectionState: lost");
            try
            {
                for (;;)
                {
                    if (curatorFramework.getZookeeperClient().blockUntilConnectedOrTimedOut())
                    {
                        log.info("reconnection: success");
                        break;
                    }
                }
                return;
            }
            catch (InterruptedException e)
            {
                log.info("reconnection: failure");
            }
            catch (Exception e)
            {
                log.info("reconnection: failure");
            }
        }
    }
}

2 AbstractNodeCacheListener 和 MyNodeCacheListener

public abstract class AbstractNodeCacheListener  implements NodeCacheListener {

    private static final Logger log = LoggerFactory.getLogger(MyNodeCacheListener.class);

    private NodeCache nodeCache;

    public AbstractNodeCacheListener(){

    }

    public AbstractNodeCacheListener(NodeCache nodeCache) {
        this.nodeCache = nodeCache;
    }

    public NodeCache getNodeCache() {
        return nodeCache;
    }

    public void setNodeCache(NodeCache nodeCache) {
        this.nodeCache = nodeCache;
    }
}

MyNodeCacheListener

/**
 * 监听本节点
 */
public class MyNodeCacheListener extends AbstractNodeCacheListener {
    private static final Logger log = LoggerFactory.getLogger(MyNodeCacheListener.class);

    public MyNodeCacheListener() {
         super();
    }

    public MyNodeCacheListener(NodeCache nodeCache) {
        super(nodeCache);
    }

    @Override
    public void nodeChanged() throws Exception {
        ChildData childData = this.getNodeCache().getCurrentData();
        /**
         * 创建和更新 childData 不为 null
         * 删除  childData 为 null
         */
        if(childData != null){
            log.info("Path: " + childData.getPath());
            log.info("Stat:" + childData.getStat());
            log.info("Data: "+ new String(childData.getData(),"utf-8"));
        }
    }
}

3 MyPathChildrenCacheListener

/**
 * 监听节点的所有子目录
 */
public class MyPathChildrenCacheListener implements PathChildrenCacheListener {

    private static final Logger log = LoggerFactory.getLogger(MyPathChildrenCacheListener.class);

    @Override
    public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
        switch (event.getType()) {
            case INITIALIZED:
                //必须异步模式 PathChildrenCache.StartMode.POST_INITIALIZED_EVENT
                log.info("子节点初始化成功");
                break;
            case CHILD_ADDED:
                log.info("添加子节点路径:" + event.getData().getPath());
                log.info("添加子节点数据:" + new String(event.getData().getData()));
                break;
            case CHILD_UPDATED:
                log.info("修改子节点路径:" + event.getData().getPath());
                log.info("修改子节点数据:" + new String(event.getData().getData()));
                break;
            case CHILD_REMOVED:
                log.info("删除子节点:" + event.getData().getPath());
                break;
            case CONNECTION_LOST:
                log.info("连接丢失");
                break;
            case CONNECTION_SUSPENDED:
                log.info("连接被挂起");
                break;
            case CONNECTION_RECONNECTED:
                log.info("恢复连接");
                break;
        }
    }
}

4 MyTreeCacheListener

/**
 * 监听本节点和子目录
 */
public class MyTreeCacheListener implements TreeCacheListener {

    private static final Logger log = LoggerFactory.getLogger(MyTreeCacheListener.class);

    @Override
    public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {
        ChildData childData = event.getData();
        if (childData != null) {
            //System.out.println("Path: " + childData.getPath());
            //System.out.println("Stat:" + childData.getStat());
            //System.out.println("Data: " + new String(childData.getData()));
        }
        switch (event.getType()) {
            case INITIALIZED:
                log.info("节点初始化成功");
                break;
            case NODE_ADDED:
                log.info("添加节点路径:" + event.getData().getPath());
                log.info("节点数据:" + new String(event.getData().getData()));
                break;
            case NODE_UPDATED:
                log.info("修改节点路径:" + event.getData().getPath());
                log.info("修改节点数据:" + new String(event.getData().getData()));
                break;
            case NODE_REMOVED:
                log.info("删除节点:" + event.getData().getPath());
                break;
            case CONNECTION_LOST:
                System.out.println("连接丢失");
                break;
            case CONNECTION_SUSPENDED:
                System.out.println("连接被挂起");
                break;
            case CONNECTION_RECONNECTED:
                System.out.println("恢复连接");
                break;
        }
    }
}

5 测试代码

 public static void testNodeCacheListener(){
        ZkUtil.initialize();
        String parent = "/zkconfig";
        String child = "chynode";
        String path = parent + "/" + child;

        MyNodeCacheListener myNodeCacheListener=new MyNodeCacheListener();

        //注册当前节点监听器
        ZkUtil.registerNodeCacheListener(path,myNodeCacheListener);

        //如果不存在节点增加节点
        try {
            if (!ZkUtil.checkExist(path)) {
                ZkUtil.addNode(parent, child, "chy");
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public static void testPathChildrenCacheListener(){
        ZkUtil.initialize();
        String parent = "/zkconfig";
        String child = "chynode";
        String path = parent + "/" + child;

        MyPathChildrenCacheListener myPathChildrenCacheListener=new MyPathChildrenCacheListener();

        //子节点监听器
        ZkUtil.registerPathChildListener(parent,myPathChildrenCacheListener);

        //如果不存在节点增加节点
        try {
            if (!ZkUtil.checkExist(path)) {
                ZkUtil.addNode(parent, child, "chy");
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public static void testTreeCacheListener(){
        ZkUtil.initialize();
        String parent = "/zkconfig";
        String child = "chynode";
        String path = parent + "/" + child;

        MyTreeCacheListener treeCacheListener = new MyTreeCacheListener();

        // 0 只监听 /zkconfig
        //ZkUtil.registerTreeCacheListener(parent, 0, treeCacheListener);

        // 1 监听 /zkconfig 和 下一级节点
        ZkUtil.registerTreeCacheListener(parent, 1, treeCacheListener);

        //如果不存在节点增加节点
        try {
            if (!ZkUtil.checkExist(path)) {
                ZkUtil.addNode(parent, child, "chy");
            }

            if (ZkUtil.checkExist(parent)) {
                ZkUtil.updateNodeData(parent, "root");
            }

        } catch (Exception e) {
            e.printStackTrace();
        }
    }

 

 类似资料: