1 SessionConnectionStateListener
/**
* 监听Session连接状态
*/
public class SessionConnectionStateListener implements ConnectionStateListener
{
private static final Logger log = LoggerFactory.getLogger(SessionConnectionStateListener.class);
public void stateChanged(CuratorFramework curatorFramework, ConnectionState connectionState)
{
if (connectionState == ConnectionState.LOST)
{
log.info("ConnectionState: lost");
try
{
for (;;)
{
if (curatorFramework.getZookeeperClient().blockUntilConnectedOrTimedOut())
{
log.info("reconnection: success");
break;
}
}
return;
}
catch (InterruptedException e)
{
log.info("reconnection: failure");
}
catch (Exception e)
{
log.info("reconnection: failure");
}
}
}
}
2 AbstractNodeCacheListener 和 MyNodeCacheListener
/**
 * Base class for {@link NodeCacheListener} implementations that need access to the
 * {@link NodeCache} they are attached to.
 *
 * <p>The cache reference is {@code null} after the no-arg constructor until
 * {@link #setNodeCache(NodeCache)} is called.
 */
public abstract class AbstractNodeCacheListener implements NodeCacheListener {
    // Logger category is this class; the original used the subclass by mistake.
    private static final Logger log = LoggerFactory.getLogger(AbstractNodeCacheListener.class);

    private NodeCache nodeCache;

    /** Creates a listener with no cache attached yet. */
    public AbstractNodeCacheListener() {
    }

    /**
     * Creates a listener attached to the given cache.
     *
     * @param nodeCache the cache this listener reads from
     */
    public AbstractNodeCacheListener(NodeCache nodeCache) {
        this.nodeCache = nodeCache;
    }

    /**
     * @return the attached cache, or {@code null} if none has been set
     */
    public NodeCache getNodeCache() {
        return nodeCache;
    }

    /**
     * @param nodeCache the cache this listener should read from
     */
    public void setNodeCache(NodeCache nodeCache) {
        this.nodeCache = nodeCache;
    }
}
MyNodeCacheListener
/**
 * Listens for changes on a single node and logs the node's path, stat and data.
 */
public class MyNodeCacheListener extends AbstractNodeCacheListener {
    private static final Logger log = LoggerFactory.getLogger(MyNodeCacheListener.class);

    /** Creates a listener with no cache attached yet. */
    public MyNodeCacheListener() {
        super();
    }

    /**
     * Creates a listener attached to the given cache.
     *
     * @param nodeCache the cache whose current data is logged on each change
     */
    public MyNodeCacheListener(NodeCache nodeCache) {
        super(nodeCache);
    }

    /**
     * Logs the current path, stat and UTF-8 decoded data of the watched node.
     * Nothing is logged when the cache holds no current data.
     *
     * @throws Exception declared by the listener interface
     */
    @Override
    public void nodeChanged() throws Exception {
        NodeCache cache = this.getNodeCache();
        if (cache == null) {
            // The no-arg constructor leaves the cache unset; avoid an NPE in that case.
            log.warn("nodeChanged invoked before a NodeCache was set; event ignored");
            return;
        }

        ChildData childData = cache.getCurrentData();
        // Per the original author's note: childData is non-null on create/update
        // and null on delete.
        if (childData == null) {
            return;
        }

        log.info("Path: {}", childData.getPath());
        log.info("Stat:{}", childData.getStat());

        byte[] payload = childData.getData();
        String text = (payload == null)
                ? null
                : new String(payload, java.nio.charset.StandardCharsets.UTF_8);
        log.info("Data: {}", text);
    }
}
3 MyPathChildrenCacheListener
/**
 * Listens for events on the children of a node and logs each one.
 */
public class MyPathChildrenCacheListener implements PathChildrenCacheListener {
    private static final Logger log = LoggerFactory.getLogger(MyPathChildrenCacheListener.class);

    /**
     * Logs a message describing the received child event.
     *
     * @param client the client that delivered the event
     * @param event  the child event
     * @throws Exception declared by the listener interface
     */
    @Override
    public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
        switch (event.getType()) {
            case INITIALIZED:
                // Per the original author's note: requires the asynchronous start mode
                // PathChildrenCache.StartMode.POST_INITIALIZED_EVENT.
                log.info("子节点初始化成功");
                break;
            case CHILD_ADDED:
                log.info("添加子节点路径:{}", event.getData().getPath());
                log.info("添加子节点数据:{}", decode(event.getData().getData()));
                break;
            case CHILD_UPDATED:
                log.info("修改子节点路径:{}", event.getData().getPath());
                log.info("修改子节点数据:{}", decode(event.getData().getData()));
                break;
            case CHILD_REMOVED:
                log.info("删除子节点:{}", event.getData().getPath());
                break;
            case CONNECTION_LOST:
                log.info("连接丢失");
                break;
            case CONNECTION_SUSPENDED:
                log.info("连接被挂起");
                break;
            case CONNECTION_RECONNECTED:
                log.info("恢复连接");
                break;
            default:
                break;
        }
    }

    /** Decodes a node payload as UTF-8; returns {@code null} for a null payload. */
    private static String decode(byte[] payload) {
        return payload == null
                ? null
                : new String(payload, java.nio.charset.StandardCharsets.UTF_8);
    }
}
4 MyTreeCacheListener
/**
 * Listens for events on a node and its subtree and logs each one.
 */
public class MyTreeCacheListener implements TreeCacheListener {
    private static final Logger log = LoggerFactory.getLogger(MyTreeCacheListener.class);

    /**
     * Logs a message describing the received tree event. Connection events go through the
     * logger as well, so all output of this listener ends up in one place.
     *
     * @param client the client that delivered the event
     * @param event  the tree event
     * @throws Exception declared by the listener interface
     */
    @Override
    public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {
        switch (event.getType()) {
            case INITIALIZED:
                log.info("节点初始化成功");
                break;
            case NODE_ADDED:
                log.info("添加节点路径:{}", event.getData().getPath());
                log.info("节点数据:{}", decode(event.getData().getData()));
                break;
            case NODE_UPDATED:
                log.info("修改节点路径:{}", event.getData().getPath());
                log.info("修改节点数据:{}", decode(event.getData().getData()));
                break;
            case NODE_REMOVED:
                log.info("删除节点:{}", event.getData().getPath());
                break;
            case CONNECTION_LOST:
                log.info("连接丢失");
                break;
            case CONNECTION_SUSPENDED:
                log.info("连接被挂起");
                break;
            case CONNECTION_RECONNECTED:
                log.info("恢复连接");
                break;
            default:
                break;
        }
    }

    /** Decodes a node payload as UTF-8; returns {@code null} for a null payload. */
    private static String decode(byte[] payload) {
        return payload == null
                ? null
                : new String(payload, java.nio.charset.StandardCharsets.UTF_8);
    }
}
5 测试代码
/**
 * Demo: registers a {@code MyNodeCacheListener} on {@code /zkconfig/chynode}, then adds
 * that node through {@code ZkUtil} if the existence check reports it missing.
 */
public static void testNodeCacheListener() {
    final String parentPath = "/zkconfig";
    final String nodeName = "chynode";
    final String fullPath = parentPath + "/" + nodeName;

    ZkUtil.initialize();

    // Register the listener for the node itself.
    ZkUtil.registerNodeCacheListener(fullPath, new MyNodeCacheListener());

    // Add the node when it does not exist yet.
    try {
        boolean alreadyThere = ZkUtil.checkExist(fullPath);
        if (!alreadyThere) {
            ZkUtil.addNode(parentPath, nodeName, "chy");
        }
    } catch (Exception ex) {
        ex.printStackTrace();
    }
}
/**
 * Demo: registers a {@code MyPathChildrenCacheListener} on {@code /zkconfig}, then adds the
 * child {@code chynode} through {@code ZkUtil} if the existence check reports it missing.
 */
public static void testPathChildrenCacheListener() {
    final String parentPath = "/zkconfig";
    final String nodeName = "chynode";
    final String fullPath = parentPath + "/" + nodeName;

    ZkUtil.initialize();

    // Register the child-node listener on the parent.
    ZkUtil.registerPathChildListener(parentPath, new MyPathChildrenCacheListener());

    // Add the child node when it does not exist yet.
    try {
        boolean alreadyThere = ZkUtil.checkExist(fullPath);
        if (!alreadyThere) {
            ZkUtil.addNode(parentPath, nodeName, "chy");
        }
    } catch (Exception ex) {
        ex.printStackTrace();
    }
}
/**
 * Demo: registers a {@code MyTreeCacheListener} on {@code /zkconfig} with depth 1, adds the
 * child {@code chynode} if the existence check reports it missing, then updates the data of
 * {@code /zkconfig} if that node exists.
 */
public static void testTreeCacheListener() {
    final String parentPath = "/zkconfig";
    final String nodeName = "chynode";
    final String fullPath = parentPath + "/" + nodeName;

    ZkUtil.initialize();

    // Per the original author's notes: depth 0 would watch only /zkconfig,
    // depth 1 watches /zkconfig and its next-level nodes.
    final int depth = 1;
    ZkUtil.registerTreeCacheListener(parentPath, depth, new MyTreeCacheListener());

    try {
        // Add the child node when it does not exist yet.
        boolean childThere = ZkUtil.checkExist(fullPath);
        if (!childThere) {
            ZkUtil.addNode(parentPath, nodeName, "chy");
        }

        boolean parentThere = ZkUtil.checkExist(parentPath);
        if (parentThere) {
            ZkUtil.updateNodeData(parentPath, "root");
        }
    } catch (Exception ex) {
        ex.printStackTrace();
    }
}