转载自: http://ifeve.com/zookeeper-curato-framework/
http://blog.csdn.net/dc_726/article/details/46475633
http://macrochen.iteye.com/blog/1366136
Curator是Netflix开源的一套ZooKeeper客户端框架,与Zookeeper提供的原生客户端相比,Curator的抽象层次更高,简化了Zookeeper客户端的开发量。
Curator主要解决了三类问题:
• 封装ZooKeeper client与ZooKeeper server之间的连接处理;
• 提供了一套Fluent风格的操作API;
•提供ZooKeeper各种应用场景(recipe, 比如共享锁服务, 集群领导选举机制)的抽象封装.
Curator主要从以下几个方面降低了zk使用的复杂性:
重试机制:提供可插拔的重试机制, 它将给捕获所有可恢复的异常配置一个重试策略, 并且内部也提供了几种标准的重试策略(比如指数补偿).
连接状态监控: Curator初始化之后会一直的对zk连接进行监听, 一旦发现连接状态发生变化, 将作出相应的处理.
zk客户端实例管理:Curator对zk客户端到server集群连接进行管理. 并在需要的情况, 重建zk实例, 保证与zk集群的可靠连接.
各种使用场景支持:Curator实现zk支持的大部分使用场景支持(甚至包括zk自身不支持的场景), 这些实现都遵循了zk的最佳实践, 并考虑了各种极端情况.
Curator有四个包,curator-client,curator-test,curator-framework,curator-recipes,其中curator-client和curator-test是最基础的,curator-framework依赖这两个包,curator-recipes又依赖curator-framework。Curator组成部分有:
• Client: 是ZooKeeper客户端的一个替代品, 提供了一些底层处理和相关的工具方法.
• Framework: 用来简化ZooKeeper高级功能的使用, 并增加了一些新的功能, 比如管理到ZooKeeper集群的连接, 重试处理
• Recipes: 实现了通用ZooKeeper的recipe, 该组件基于Framework
• Utilities:各种ZooKeeper的工具类
• Errors: 异常处理, 连接, 恢复等.
• Extensions: 对curator-recipes的扩展实现,拆分为 curator-:stuck_out_tongue_closed_eyes:iscovery和 curator-:stuck_out_tongue_closed_eyes:iscovery-server提供基于RESTful的Recipes WEB服务.
名称空间(Namespace)
因为一个zk集群会被多个应用共享, 为了避免各个应用的zk patch冲突, Curator Framework内部会给每一个Curator Framework实例分配一个namespace(可选). 这样在create ZNode的时候都会自动加上这个namespace作为这个node path的root. 使用代码如下:
CuratorFramework client = CuratorFrameworkFactory.builder().namespace("MyApp") ... build();
…
client.create().forPath("/test", data);// node was actually written to: "/MyApp/test",namespace也可以是多级” MyApp/app1”
方法说明:
• create(): 发起一个create操作. 可以组合其他方法 (比如mode 或background) 最后以forPath()方法结尾
• delete(): 发起一个删除操作. 可以组合其他方法(version 或background) 最后以forPath()方法结尾
• checkExists(): 发起一个检查ZNode 是否存在的操作. 可以组合其他方法(watch 或background) 最后以forPath()方法结尾
• getData(): 发起一个获取ZNode数据的操作. 可以组合其他方法(watch, background 或get stat) 最后以forPath()方法结尾
• setData(): 发起一个设置ZNode数据的操作. 可以组合其他方法(version 或background) 最后以forPath()方法结尾
• getChildren(): 发起一个获取ZNode子节点的操作. 可以组合其他方法(watch, background 或get stat) 最后以forPath()方法结尾
• inTransaction(): 发起一个ZooKeeper事务. 可以组合create, setData, check, 和/或delete 为一个操作, 然后commit() 提交
监听器
Curator提供了三种Watcher(Cache)来监听结点的变化:
• Path Cache:监视一个路径下直接子结点的创建、删除、以及结点数据的更新。产生的事件会传递给注册的PathChildrenCacheListener。
• Node Cache:仅监视指定结点的创建、更新、删除。产生的事件会传递给注册的NodeCacheListener。
• Tree Cache:监视路径下的子结点(所有子结节,不管有多少层子结点)的创建、更新、删除事件。产生的事件会传递给注册的TreeCacheListener。
简单示例:
创建client
CuratorFramework client = CuratorFrameworkFactory.newClient(ZK_SERVER, new ExponentialBackoffRetry(1000, 3));
client.start();
CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder();
//fluent style
String namespace = "cluster-worker";
CuratorFramework client = builder.connectString("127.0.0.1:2181")
.sessionTimeoutMs(30000)
.connectionTimeoutMs(30000)
.canBeReadOnly(false)
.retryPolicy(new ExponentialBackoffRetry(1000, Integer.MAX_VALUE))
.namespace(namespace)
.defaultData(null)
.build();
client.start();
节点操作:
//节点创建
if (client.checkExists().forPath("/curator") == null)
client.create().withMode(CreateMode.PERSISTENT).forPath("/curator");
//获取子节点
System.out.println(client.getChildren().forPath("/curator"));
//设置并获取数据
client.setData().forPath("/curator", "zero".getBytes());
System.out.println(client.getData().forPath("/curator"));
//删除节点
client.delete().forPath("/curator");
监视器:
PathChildrenCache watcher = new PathChildrenCache(client, "/curator", true);
watcher.getListenable().addListener(new PathChildrenCacheListener() {
@Override
public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
ChildData data = event.getData();
if (data == null) {
System.out.println("No data in event[" + event + "]");
} else {
System.out.println("Receive event: "
+ "type=[" + event.getType() + "]"
+ ", path=[" + data.getPath() + "]"
+ ", data=[" + new String(data.getData()) + "]"
+ ", stat=[" + data.getStat() + "]");
}
}
});
watcher.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE);
final NodeCache nodeCache = new NodeCache(client, C_PATH);
nodeCache.getListenable().addListener(new NodeCacheListener() {
@Override
public void nodeChanged() throws Exception {
System.out.println("catch node data change");
ChildData childData = nodeCache.getCurrentData();
if(childData == null){
System.out.println("===delete, path=" + C_PATH + ", childData=" + childData);
}else{
System.out.println("===update or add, path=" + C_PATH + ", childData=" + new String(childData.getData(), CHARSET));
}
}
});
nodeCache.start();
final TreeCache treeCache = new TreeCache(client, C_PATH);
treeCache.getListenable().addListener(new TreeCacheListener() {
@Override
public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {
System.out.println("catch tree change");
if(event.getData() == null){
System.out.println("===init," + event.getType());
return;
}
if(event.getData().getData() == null){
System.out.println("===delete," + event.getType() + "," + event.getData().getPath());
}else{
System.out.println("===update or add," + event.getType() + "," + event.getData().getPath() + "," + new String(event.getData().getData(), TreeListener.CHARSET));
}
}
});
treeCache.start();