Curator是Netflix公司开源的一套Zookeeper客户端框架。了解过Zookeeper原生API都会清楚其复杂度。Curator帮助我们在其基础上进行封装、实现一些开发细节,包括接连重连、反复注册Watcher和NodeExistsException等。目前已经作为Apache的顶级项目出现,是最流行的Zookeeper客户端之一。从编码风格上来讲,它提供了基于Fluent的编程风格支持。
除此之外,Curator还提供了Zookeeper的各种应用场景:Recipe、共享锁服务、Master选举机制和分布式计数器等。
名称 | 描述 |
---|---|
Recipes | Zookeeper典型应用场景的实现,这些实现是基于Curator Framework。 |
Framework | Zookeeper API的高层封装,大大简化Zookeeper客户端编程,添加了例如Zookeeper连接管理、重试机制等。 |
Utilities | 为Zookeeper提供的各种实用程序。 |
Client | Zookeeper client的封装,用于取代原生的Zookeeper客户端(ZooKeeper类),提供一些非常有用的客户端特性。 |
Errors | Curator如何处理错误,连接问题,可恢复的例外等。 |
Curator的jar包已经发布到Maven中心,由以下几个artifact的组成。根据需要选择引入具体的artifact。但大多数情况下只用引入curator-recipes即可。
GroupID/Org | ArtifactID/Name | 描述 |
---|---|---|
org.apache.curator | curator-recipes | 所有典型应用场景。需要依赖client和framework,需设置自动获取依赖。 |
org.apache.curator | curator-framework | 同组件中framework介绍。 |
org.apache.curator | curator-client | 同组件中client介绍。 |
org.apache.curator | curator-test | 包含TestingServer、TestingCluster和一些测试工具。 |
org.apache.curator | curator-examples | 各种使用Curator特性的案例。 |
org.apache.curator | curator-x-discovery | 在framework上构建的服务发现实现。 |
org.apache.curator | curator-x-discoveryserver | 可以和Curator Discovery一起使用的RESTful服务器。 |
org.apache.curator | curator-x-rpc | Curator framework和recipes非java环境的桥接。 |
根据上面的描述,开发人员大多数情况下使用的都是curator-recipes的依赖,此依赖的maven配置如下:
<dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-recipes</artifactId> <version>2.12.0</version> </dependency>
Curator的创建会话方式与原生的API和ZkClient的创建方式区别很大。Curator创建客户端是通过CuratorFrameworkFactory工厂类来实现的。其中,此工厂类提供了三种创建客户端的方法。 前两种方法是通过newClient来实现,仅参数不同而已。
4.1.1创建客户端方法API
public static CuratorFramework newClient(String connectString, RetryPolicy retryPolicy) public static CuratorFramework newClient(String connectString, int sessionTimeoutMs, int connectionTimeoutMs, RetryPolicy retryPolicy) // 使用上面方法创建出一个CuratorFramework之后,需要再调用其start()方法完成会话创建。 //demo1 RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000,3); CuratorFramework client = CuratorFrameworkFactory.newClient("127.0.0.1:2181",retryPolicy); client.start(); //demo2 RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000,3); CuratorFramework client = CuratorFrameworkFactory.newClient("127.0.0.1:2181", 5000,1000,retryPolicy); client.start(); //其中参数RetryPolicy提供重试策略的接口,可以让用户实现自定义的重试策略。默认提供了以下实现,分别为ExponentialBackoffRetry、BoundedExponentialBackoffRetry、RetryForever、RetryNTimes、RetryOneTime、RetryUntilElapsed。
其中参数RetryPolicy提供重试策略的接口,可以让用户实现自定义的重试策略。默认提供了以下实现,分别为ExponentialBackoffRetry、BoundedExponentialBackoffRetry、RetryForever、RetryNTimes、RetryOneTime、RetryUntilElapsed。
进一步查看源代码可以得知,其实这两种方法内部实现一样,只是对外包装成不同的方法。它们的底层都是通过第三个方法builder来实现的。
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000,3); CuratorFramework Client = CuratorFrameworkFactory.builder() .connectString("hadoop1:2181,hadoop2:2181,hadoop3:2181") .sessionTimeoutMs(3000) .connectionTimeoutMs(5000) .retryPolicy(retryPolicy) .build(); client.start(); client.blockUntilConnected();
参数:
connectString:zk的server地址,多个server之间使用英文逗号分隔开
connectionTimeoutMs:连接超时时间,如上是30s,默认是15s
sessionTimeoutMs:会话超时时间,如上是50s,默认是60s
retryPolicy:失败重试策略
ExponentialBackoffRetry:构造器含有三个参数 ExponentialBackoffRetry(int baseSleepTimeMs, int maxRetries, int maxSleepMs)
baseSleepTimeMs:初始的sleep时间,用于计算之后的每次重试的sleep时间,
计算公式:当前sleep时间=baseSleepTimeMs*Math.max(1, random.nextInt(1<<(retryCount+1)))
maxRetries:最大重试次数
maxSleepMs:最大sleep时间,如果上述的当前sleep计算出来比这个大,那么sleep用这个时间
其他,查看org.apache.curator.RetryPolicy接口的实现类
start() 开始创建会话。
blockUntilConnected() 直到连接成功或超时。
Curator默认创建的是持久节点,内容为空。 可以在创建时选择节点类型:
持久节点(PERSISTENT)
持久顺序节点(PERSISTENT_SEQUENTIAL)
临时节点(EPHEMERAL)
临时顺序节点(EPHEMERAL_SEQUENTIAL)
4.2.1、创建一个初始内容为空的节点
client.create().forPath(path);
4.2.2、创建一个包含内容的节点
client.create().forPath(path,"我是内容".getBytes());
4.2.3、创建临时节点,并递归创建父节点
client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(path); //此处Curator和ZkClient一样封装了递归创建父节点的方法。在递归创建父节点时,父节点为持久节点。
4.3.1、删除一个子节点
client.delete().forPath(path);
4.3.2、删除节点并递归删除其子节点
client.delete().deletingChildrenIfNeeded().forPath(path);
4.3.3、指定版本进行删除
client.delete().withVersion(1).forPath(path); //如果版本不存在,则删除异常,信息如下: org.apache.zookeeper.KeeperException$BadVersionException: KeeperErrorCode = BadVersion for
4.3.4、强制保证删除一个节点
client.delete().guaranteed().forPath(path);
读取节点数据内容API相当简单,Curator提供了传入一个Stat,使用节点当前的Stat替换到传入的Stat的方法,查询方法执行完成之后,Stat引用已经执行当前最新的节点Stat。
// 普通查询 client.getData().forPath(path); // 包含状态查询 Stat stat = new Stat(); client.getData().storingStatIn(stat()).forPath(path);
更新数据,如果未传入version参数,那么更新当前最新版本,如果传入version则更新指定version,如果version已经变更,则抛出异常。
// 普通更新 client.setData().forPath(path,"新内容".getBytes()); // 指定版本更新 client.setData().withVersion(1).forPath(path); 更新出错,版本不一致异常: org.apache.zookeeper.KeeperException$BadVersionException: KeeperErrorCode = BadVersion for
在使用以上针对节点的操作API时,我们会发现每个接口都有一个inBackground()方法可供调用。此接口就是Curator提供的异步调用入口。对应的异步处理接口为BackgroundCallback。此接口指提供了一个processResult的方法,用来处理回调结果。其中processResult的参数event中的getType()包含了各种事件类型,getResultCode()包含了各种响应码。
重点说一下inBackground的以下接口:
public T inBackground(BackgroundCallback callback, Executor executor); //此接口就允许传入一个Executor实例,用一个专门线程池来处理返回结果之后的业务逻辑。
/** * 异步创建节点 * * 注意:如果自己指定了线程池,那么相应的操作就会在线程池中执行,如果没有指定, * 那么就会使用Zookeeper的EventThread线程对事件进行串行处理 * */ client.create().withMode(CreateMode.EPHEMERAL).inBackground(new BackgroundCallback() { public void processResult(CuratorFramework client, CuratorEvent event) throws Exception { System.out.println("当前线程:" + Thread.currentThread().getName() + ",code:" + event.getResultCode() + ",type:" + event.getType()); } }, Executors.newFixedThreadPool(10)).forPath("/async-node01"); client.create().withMode(CreateMode.EPHEMERAL).inBackground(new BackgroundCallback() { public void processResult(CuratorFramework client, CuratorEvent event) throws Exception { System.out.println("当前线程:" + Thread.currentThread().getName() + ",code:" + event.getResultCode() + ",type:" + event.getType()); } }).forPath("/async-node02");
/** 事务管理:碰到异常,事务会回滚 * 使用transaction()来控制事务 * @throws Exception */ public void testTransaction() throws Exception{ //定义几个基本操作 CuratorOp createOp = client.transactionOp().create() .forPath("/curator/one_path","some data".getBytes()); CuratorOp setDataOp = client.transactionOp().setData() .forPath("/curator","other data".getBytes()); CuratorOp deleteOp = client.transactionOp().delete() .forPath("/curator"); //事务执行结果 List<CuratorTransactionResult> results = client.transaction() .forOperations(createOp,setDataOp,deleteOp); //遍历输出结果 for(CuratorTransactionResult result : results){ System.out.println("执行结果是: " + result.getForPath() + "--" + result.getType()); } } //因为节点“/curator”存在子节点,所以在删除的时候将会报错,事务回滚
Curator提供了三种Watcher(Cache)来监听结点的变化:
Path Cache:监视一个路径下1)孩子结点的创建、2)删除,3)以及结点数据的更新。产生的事件会传递给注册的PathChildrenCacheListener。
Node Cache:监视一个结点的创建、更新、删除,并将结点的数据缓存在本地。
Tree Cache:Path Cache和Node Cache的“合体”,监视路径下的创建、更新、删除事件,并缓存路径下所有孩子结点的数据。
/** * 在注册监听器的时候,如果传入此参数,当事件触发时,逻辑由线程池处理 */ ExecutorService pool = Executors.newFixedThreadPool(2); /** * 监听数据节点的变化情况 */ final NodeCache nodeCache = new NodeCache(client, "/zk-huey/cnode", false); nodeCache.start(true); nodeCache.getListenable().addListener( new NodeCacheListener() { @Override public void nodeChanged() throws Exception { System.out.println("Node data is changed, new data: " + new String(nodeCache.getCurrentData().getData())); }}, pool); /** * 监听子节点的变化情况 */ final PathChildrenCache childrenCache = new PathChildrenCache(client, "/zk-huey", true); childrenCache.start(StartMode.POST_INITIALIZED_EVENT); childrenCache.getListenable().addListener( new PathChildrenCacheListener() { @Override public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception { switch (event.getType()) { case CHILD_ADDED: System.out.println("CHILD_ADDED: " + event.getData().getPath()); break; case CHILD_REMOVED: System.out.println("CHILD_REMOVED: " + event.getData().getPath()); break; case CHILD_UPDATED: System.out.println("CHILD_UPDATED: " + event.getData().getPath()); break; default: break; } } }, pool ); client.setData().forPath("/zk-huey/cnode", "world".getBytes()); Thread.sleep(10 * 1000); pool.shutdown(); client.close();
分布式编程时,比如最容易碰到的情况就是应用程序在线上多机部署,于是当多个应用同时访问某一资源时,就需要某种机制去协调它们。例如,现在一台应用正在rebuild缓存内容,要临时锁住某个区域暂时不让访问;又比如调度程序每次只想一个任务被一台应用执行等等。
下面的程序会启动两个线程t1和t2去争夺锁,拿到锁的线程会占用5秒。运行多次可以观察到,有时是t1先拿到锁而t2等待,有时又会反过来。Curator会用我们提供的lock路径的结点作为全局锁,这个结点的数据类似这种格式:[_c_64e0811f-9475-44ca-aa36-c1db65ae5350-lock-0000000005],每次获得锁时会生成这种串,释放锁时清空数据。
import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.recipes.locks.InterProcessMutex; import org.apache.curator.retry.RetryNTimes; import java.util.concurrent.TimeUnit; /** * Curator framework's distributed lock test. */ public class CuratorDistrLockTest { /** Zookeeper info */ private static final String ZK_ADDRESS = "192.168.1.100:2181"; private static final String ZK_LOCK_PATH = "/zktest"; public static void main(String[] args) throws InterruptedException { // 1.Connect to zk CuratorFramework client = CuratorFrameworkFactory.newClient( ZK_ADDRESS, new RetryNTimes(10, 5000) ); client.start(); System.out.println("zk client start successfully!"); Thread t1 = new Thread(() -> { doWithLock(client); }, "t1"); Thread t2 = new Thread(() -> { doWithLock(client); }, "t2"); t1.start(); t2.start(); } private static void doWithLock(CuratorFramework client) { InterProcessMutex lock = new InterProcessMutex(client, ZK_LOCK_PATH); try { if (lock.acquire(10 * 1000, TimeUnit.SECONDS)) { System.out.println(Thread.currentThread().getName() + " hold lock"); Thread.sleep(5000L); System.out.println(Thread.currentThread().getName() + " release lock"); } } catch (Exception e) { e.printStackTrace(); } finally { try { lock.release(); } catch (Exception e) { e.printStackTrace(); } } } }
当集群里的某个服务down机时,我们可能要从slave结点里选出一个作为新的master,这时就需要一套能在分布式环境中自动协调的Leader选举方法。Curator提供了LeaderSelector监听器实现Leader选举功能。同一时刻,只有一个Listener会进入takeLeadership()方法,说明它是当前的Leader。注意:当Listener从takeLeadership()退出时就说明它放弃了“Leader身份”,这时Curator会利用Zookeeper再从剩余的Listener中选出一个新的Leader。autoRequeue()方法使放弃Leadership的Listener有机会重新获得Leadership,如果不设置的话放弃了的Listener是不会再变成Leader的。
import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.recipes.leader.LeaderSelector; import org.apache.curator.framework.recipes.leader.LeaderSelectorListener; import org.apache.curator.framework.state.ConnectionState; import org.apache.curator.retry.RetryNTimes; import org.apache.curator.utils.EnsurePath; /** * Curator framework's leader election test. * Output: * LeaderSelector-2 take leadership! * LeaderSelector-2 relinquish leadership! * LeaderSelector-1 take leadership! * LeaderSelector-1 relinquish leadership! * LeaderSelector-0 take leadership! * LeaderSelector-0 relinquish leadership! * ... */ public class CuratorLeaderTest { /** Zookeeper info */ private static final String ZK_ADDRESS = "192.168.1.100:2181"; private static final String ZK_PATH = "/zktest"; public static void main(String[] args) throws InterruptedException { LeaderSelectorListener listener = new LeaderSelectorListener() { @Override public void takeLeadership(CuratorFramework client) throws Exception { System.out.println(Thread.currentThread().getName() + " take leadership!"); // takeLeadership() method should only return when leadership is being relinquished. Thread.sleep(5000L); System.out.println(Thread.currentThread().getName() + " relinquish leadership!"); } @Override public void stateChanged(CuratorFramework client, ConnectionState state) { } }; new Thread(() -> { registerListener(listener); }).start(); new Thread(() -> { registerListener(listener); }).start(); new Thread(() -> { registerListener(listener); }).start(); Thread.sleep(Integer.MAX_VALUE); } private static void registerListener(LeaderSelectorListener listener) { // 1.Connect to zk CuratorFramework client = CuratorFrameworkFactory.newClient( ZK_ADDRESS, new RetryNTimes(10, 5000) ); client.start(); // 2.Ensure path try { new EnsurePath(ZK_PATH).ensure(client.getZookeeperClient()); } catch (Exception e) { e.printStackTrace(); } // 3.Register listener LeaderSelector selector = new LeaderSelector(client, ZK_PATH, listener); selector.autoRequeue(); selector.start(); } }