Apache Curator简单使用(一)

范承教
2023-12-01
转载自: http://ifeve.com/zookeeper-curato-framework/
               http://blog.csdn.net/dc_726/article/details/46475633
               http://macrochen.iteye.com/blog/1366136

       Curator是Netflix开源的一套ZooKeeper客户端框架,与Zookeeper提供的原生客户端相比,Curator的抽象层次更高,简化了Zookeeper客户端的开发量。
       Curator主要解决了三类问题: 
        • 封装ZooKeeper client与ZooKeeper server之间的连接处理; 
        • 提供了一套Fluent风格的操作API; 
        •提供ZooKeeper各种应用场景(recipe, 比如共享锁服务, 集群领导选举机制)的抽象封装. 

       Curator主要从以下几个方面降低了zk使用的复杂性: 
       重试机制:提供可插拔的重试机制, 它将给捕获所有可恢复的异常配置一个重试策略, 并且内部也提供了几种标准的重试策略(比如指数补偿). 
       连接状态监控: Curator初始化之后会一直的对zk连接进行监听, 一旦发现连接状态发生变化, 将作出相应的处理.
       zk客户端实例管理:Curator对zk客户端到server集群连接进行管理. 并在需要的情况, 重建zk实例, 保证与zk集群的可靠连接.
       各种使用场景支持:Curator实现zk支持的大部分使用场景支持(甚至包括zk自身不支持的场景), 这些实现都遵循了zk的最佳实践, 并考虑了各种极端情况.

       Curator有四个包,curator-client,curator-test,curator-framework,curator-recipes,其中curator-client和curator-test是最基础的,curator-framework依赖这两个包,curator-recipes又依赖curator-framework。Curator组成部分有:
         • Client: 是ZooKeeper客户端的一个替代品, 提供了一些底层处理和相关的工具方法. 
         • Framework: 用来简化ZooKeeper高级功能的使用, 并增加了一些新的功能, 比如管理到ZooKeeper集群的连接, 重试处理 
         • Recipes: 实现了通用ZooKeeper的recipe, 该组件基于Framework
         • Utilities:各种ZooKeeper的工具类 
         • Errors: 异常处理, 连接, 恢复等. 
         • Extensions: 对curator-recipes的扩展实现,拆分为 curator-:stuck_out_tongue_closed_eyes:iscovery和 curator-:stuck_out_tongue_closed_eyes:iscovery-server提供基于RESTful的Recipes WEB服务.

名称空间(Namespace)
       因为一个zk集群会被多个应用共享, 为了避免各个应用的zk patch冲突, Curator Framework内部会给每一个Curator Framework实例分配一个namespace(可选). 这样在create ZNode的时候都会自动加上这个namespace作为这个node path的root. 使用代码如下: 
CuratorFramework client = CuratorFrameworkFactory.builder().namespace("MyApp") ... build();  
…  
client.create().forPath("/test", data);// node was actually written to: "/MyApp/test",namespace也可以是多级” MyApp/app1”


方法说明: 
        • create(): 发起一个create操作. 可以组合其他方法 (比如mode 或background) 最后以forPath()方法结尾 
        • delete(): 发起一个删除操作. 可以组合其他方法(version 或background) 最后以forPath()方法结尾 
        • checkExists(): 发起一个检查ZNode 是否存在的操作. 可以组合其他方法(watch 或background) 最后以forPath()方法结尾 
        • getData(): 发起一个获取ZNode数据的操作. 可以组合其他方法(watch, background 或get stat) 最后以forPath()方法结尾 
        • setData(): 发起一个设置ZNode数据的操作. 可以组合其他方法(version 或background) 最后以forPath()方法结尾 
        • getChildren(): 发起一个获取ZNode子节点的操作. 可以组合其他方法(watch, background 或get stat) 最后以forPath()方法结尾 
        • inTransaction(): 发起一个ZooKeeper事务. 可以组合create, setData, check, 和/或delete 为一个操作, 然后commit() 提交 

监听器
       Curator提供了三种Watcher(Cache)来监听结点的变化:
        • Path Cache:监视一个路径下直接子结点的创建、删除、以及结点数据的更新。产生的事件会传递给注册的PathChildrenCacheListener。
        • Node Cache:仅监视指定结点的创建、更新、删除。产生的事件会传递给注册的NodeCacheListener。
        • Tree Cache:监视路径下的子结点(所有子结节,不管有多少层子结点)的创建、更新、删除事件。产生的事件会传递给注册的TreeCacheListener。

简单示例:
创建client
CuratorFramework client = CuratorFrameworkFactory.newClient(ZK_SERVER, new ExponentialBackoffRetry(1000, 3));
client.start();
CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder();
//fluent style
String namespace = "cluster-worker";
CuratorFramework client = builder.connectString("127.0.0.1:2181")
		.sessionTimeoutMs(30000)
		.connectionTimeoutMs(30000)
		.canBeReadOnly(false)
		.retryPolicy(new ExponentialBackoffRetry(1000, Integer.MAX_VALUE))
		.namespace(namespace)
		.defaultData(null)
		.build();
client.start();
节点操作:
//节点创建
if (client.checkExists().forPath("/curator") == null)
    client.create().withMode(CreateMode.PERSISTENT).forPath("/curator");


//获取子节点
System.out.println(client.getChildren().forPath("/curator"));


//设置并获取数据
client.setData().forPath("/curator", "zero".getBytes());
System.out.println(client.getData().forPath("/curator"));


//删除节点
client.delete().forPath("/curator");
监视器:
        PathChildrenCache watcher = new PathChildrenCache(client, "/curator", true);
        watcher.getListenable().addListener(new PathChildrenCacheListener() {
            @Override
            public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
                ChildData data = event.getData();
                if (data == null) {
                    System.out.println("No data in event[" + event + "]");
                } else {
                    System.out.println("Receive event: "
                            + "type=[" + event.getType() + "]"
                            + ", path=[" + data.getPath() + "]"
                            + ", data=[" + new String(data.getData()) + "]"
                            + ", stat=[" + data.getStat() + "]");
                }
            }
        });
        watcher.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE);
        final NodeCache nodeCache = new NodeCache(client, C_PATH);
        nodeCache.getListenable().addListener(new NodeCacheListener() {
            @Override
            public void nodeChanged() throws Exception {
                System.out.println("catch node data change");
                ChildData childData = nodeCache.getCurrentData();
                if(childData == null){
                    System.out.println("===delete, path=" + C_PATH + ", childData=" + childData);
                }else{
                    System.out.println("===update or add, path=" + C_PATH + ", childData=" + new String(childData.getData(), CHARSET));
                }
            }
        });
        nodeCache.start();
        final TreeCache treeCache = new TreeCache(client, C_PATH);
        treeCache.getListenable().addListener(new TreeCacheListener() {
            @Override
            public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {
                System.out.println("catch tree change");
                if(event.getData() == null){
                    System.out.println("===init," + event.getType());
                    return;
                }


                if(event.getData().getData() == null){
                    System.out.println("===delete," + event.getType() + "," + event.getData().getPath());
                }else{
                    System.out.println("===update or add," + event.getType() + "," + event.getData().getPath() + "," + new String(event.getData().getData(), TreeListener.CHARSET));
                }
            }
        });
        treeCache.start();
 类似资料: