一、使用netflix的ZooKeeperConfigurationSource
(1)参考文档
https://github.com/Netflix/archaius/wiki/ZooKeeper-Dynamic-Configuration
(2)添加依赖
<dependency> <groupId>com.netflix.archaius</groupId> <artifactId>archaius-core</artifactId> <version>0.6.5</version> <exclusions> <exclusion> <groupId>org.apache.curator</groupId> <artifactId>curator-client</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>com.netflix.archaius</groupId> <artifactId>archaius-zookeeper</artifactId> <version>0.6.5</version> <exclusions> <exclusion> <groupId>org.apache.curator</groupId> <artifactId>curator-client</artifactId> </exclusion> </exclusions> </dependency>
如果之前pom文件已经引入了curator-client,则需要在引入archaius时exclude下
(3)代码
@Before public void before() throws Exception { String zkConfigRootPath = "/cfg"; CuratorFramework client = ZkClientUtil.getClient(); ZooKeeperConfigurationSource zkConfigSource = new ZooKeeperConfigurationSource(client, zkConfigRootPath); zkConfigSource.start(); DynamicWatchedConfiguration zkDynamicConfig = new DynamicWatchedConfiguration(zkConfigSource); ConfigurationManager.install(zkDynamicConfig); } /** * 仅仅是在本地内存中更新 * 即ConcurrentMapConfiguration中的map更新 * 不会更新到zk */ @Test public void locatSet(){ String key = "com.fluxcapacitor.my.property"; System.out.println("before set zk config"); ConfigurationManager.getConfigInstance().setProperty(key,"123"); System.out.println("after set zk config"); String myProperty = DynamicPropertyFactory.getInstance() .getStringProperty(key, "<none>") .get(); Assert.assertEquals(myProperty,"123"); }
二、自定义的zkconfig
(1)ZooKeeperConfigurationSource的局限
它是在start的时候,去注册节点的监听事件,默认采用的是pathChildrenCache只能监听子节点的变化事件,如果需要监听整个树的话,则需要TreeCache。
它是在new DynamicWatchedConfiguration的时候去读取zk的所有数据的。
public DynamicWatchedConfiguration(WatchedConfigurationSource source, boolean ignoreDeletesFromSource,
DynamicPropertyUpdater updater) {
this.source = source;
this.ignoreDeletesFromSource = ignoreDeletesFromSource;
this.updater = updater;
// get a current snapshot of the config source data
try {
Map<String, Object> currentData = source.getCurrentData();
WatchedUpdateResult result = WatchedUpdateResult.createFull(currentData);
updateConfiguration(result);
} catch (final Exception exc) {
logger.error("could not getCurrentData() from the WatchedConfigurationSource", exc);
}
// add a listener for subsequent config updates
this.source.addUpdateListener(this);
}
内存数据存储在ConcurrentMapConfiguration中的map中,对于set方法,只是作用于此map,并不会对远程zk进行set操作
(2)zk默认的最终一致性
zk默认采用的是最终一致性的,即对于一个更新操作,leader在获得超半数follower的支持后,返回更新。这里有两个问题容易导致一致性问题:
A、如果应用程序是通过本地内存读取的数据,则由于zk的事件通知延迟,可能读到脏数据
B、如果应用程序是通过远程读取zk的数据,则由于zk leader同步数据到follower的延迟,可能连接到的follower尚未更新,读到脏数据
(3)zk读取最新数据
如果要读取最新数据,则需要采取上边提到的B方法,但是在该方法之前先,调用下zk的client方法,跟leader同步数据,只要是在sync之前更新的数据,sync之后,读到的是最新的。但是会有性能消耗问题:
/** * 强一致性的读取方法 * 不从本地内存缓存读取 * 也不从zk的follower读取 * 通过sync从zk的leader读取 * @param key * @param defaultValue * @return */ private Object getRawPropertySync(String key,Object defaultValue){ String fullPath = ZkClientUtil.getFullPath(rootPath,key); ZkClientUtil.sync(fullPath); Object value = ZkClientUtil.getValue(fullPath); if(value != null){ return value; } //if read from zk is null,return defaultValue return defaultValue; }
(4)根据大部分场景的折中方案
大部分场景是读多写少,对于新增配置的操作,从本地内存读取,读取不到,则从zk去读取,这样能最大限度地保证读到最新数据;对于更新操作,始终都是更新到zk,然后本地内存等待zk事件通知,才更新本地数据。
由于配置通常不区分add操作还是update操作,因此对zk的操作,有几种策略:
/** * 几种实现策略选择: * 1、先更新、出错在新增 * 2、先新增、出错在更新 * 3、先删除再新增 * @param fullPath * @param value */ public static void setValue(String fullPath,String value) { if(value == null){ value = ""; } // try { // client.setData().forPath(fullPath, value.getBytes()); // } catch (KeeperException.NoNodeException exc) { // try { // client.create().creatingParentsIfNeeded().forPath(fullPath, value.getBytes()); // } catch (Exception e) { // e.printStackTrace(); // } // } catch (Exception e) { // e.printStackTrace(); // } try { client.inTransaction().check().forPath(fullPath).and() .setData().forPath(fullPath,value.getBytes()).and() .commit(); } catch (Exception e) { try { client.create().creatingParentsIfNeeded().forPath(fullPath, value.getBytes()); } catch (Exception e1) { e1.printStackTrace(); } e.printStackTrace(); } }