Zookeeper 可以监听节点数据变化或子节点数量变化. 当监听的事件发生后, 会通知监听器对事件做相应的处理.
从监听器范围来区分, Zookeeper 监听器分为两种, 一种是全局监听器,一种是局部监听器.
Zookeeper 监听的事件类型有四种:
public byte[] getData(final String path, Boolean watcher, Stat stat);
public List<String> getChildren(final String path, Boolean watcher);
public Stat exists(final String path, Boolean watcher);
....
public class TestWatch {
// zookeeper 地址, 多个地址以逗号割开
private static String zkServer = "127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183";
// 会话超时时间
private static int sessionTimeout = 3000;
// zookeeper 客户端连接
private static ZooKeeper zooKeeper;
@BeforeClass
public static void init() throws Exception{
// 创建zookeeper 连接时, 设置全局监听器
zooKeeper = new ZooKeeper(zkServer, sessionTimeout, watchedEvent ->{
try {
// 获取发生事件的节点路径
String path = watchedEvent.getPath();
// 根据节点路径和事件类型做不同的业务的处理
if ("/".equals(path) && Watcher.Event.EventType.NodeChildrenChanged.equals(watchedEvent.getType())) {
// 获取数据, 并继续监控
List<String> children = zooKeeper.getChildren("/", true);
System.out.println("系统主监控-根节点/下的子节点: " + children);
}
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
// 获取节点时, 不指定监控器Watcher ,使用默认的Watcher
// 获取节点时, 声明需要监控节点数量, 但并为指定监控器. 因此, 子节点数量发生变化时, 会交由创建Zookeeper 连接时, 设置的Watcher 处理
// getChildren 只注册监听NodeChildrenChanged 事件, 即当自节点数量发生变化时, 会通知.
@Test
public void test_getChildren() throws KeeperException, InterruptedException {
// 一旦注册便会一直监听,只要有变化, 就会通知全局监听器
zooKeeper.getChildren("/", true);
// 线程休眠, 否则不能监控到数据
Thread.sleep(Long.MAX_VALUE);
}
}
[zk: 127.0.0.1:2181(CONNECTED) 84] create /h ""
Created /h
[zk: 127.0.0.1:2181(CONNECTED) 85] create /hh ""
Created /hh
新增一个节点, 打印一条日志.
系统主监控-根节点/下的子节点: [zookeeper, h]
系统主监控-根节点/下的子节点: [zookeeper, hh]
public byte[] getData(final String path, Watcher watcher, Stat stat);
public List<String> getChildren(final String path, Watcher watcher);
public Stat exists(final String path, Watcher watcher);
....
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import org.junit.BeforeClass;
import org.junit.Test;
import java.util.List;
/**
* @Description: 测试监听
* @author: zongf
* @date: 2019-02-17 10:20
*/
public class TestWatch {
// zookeeper 地址, 多个地址以逗号割开
private static String zkServer = "127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183";
// 会话超时时间
private static int sessionTimeout = 3000;
// zookeeper 客户端连接
private static ZooKeeper zooKeeper;
@BeforeClass
public static void init() throws Exception{
// 创建zookeeper 连接时, 设置全局监听器
zooKeeper = new ZooKeeper(zkServer, sessionTimeout, watchedEvent ->{});
}
// 获取节点时, 指定监控器Watcher. 当有事件发生时, 会由自定义的Watcher 处理而非系统监控器处理
// getChildren 指定的监控器Watcher 只能注册监控odeChildrenChanged事件.
@Test
public void test_getChildren_Watch() throws KeeperException, InterruptedException {
final String nodePath = "/";
List<String> children = zooKeeper.getChildren(nodePath, new Watcher() {
@Override
public void process(WatchedEvent event) {
// 获取数据, 并继续监控. this 表示将监控器本身作为监控器继续监控
try {
List<String> childs = zooKeeper.getChildren(nodePath, this);
System.out.println("自定义节点监控-根节点/下的子节点: " + childs + ", event-type:" + event.getType());
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
// 线程休眠, 否则不能监控到数据
Thread.sleep(Long.MAX_VALUE);
}
// 获取数据时, 添加监控, 只能监控NodeDataChanged 事件. 只能监控一次
@Test
public void test_getData_watch() throws KeeperException, InterruptedException {
final String nodePath = "/node-anyone";
zooKeeper.getData(nodePath, new Watcher() {
@Override
public void process(WatchedEvent event) {
try {
// this表示获取节点数据时再次使用此watcher监控, 这样之后节点数据再发生变化,依然可以监控到
byte[] data = zooKeeper.getData(nodePath, this, new Stat());
System.out.println("event:" + event + ", data:" + new String(data));
} catch (Exception e) {
e.printStackTrace();
}
}
}, new Stat());
// 线程休眠, 否则不能监控到数据
Thread.sleep(Long.MAX_VALUE);
}
// 监控节点是否存在, 只监控一次.
@Test
public void test_exsists_watch() throws KeeperException, InterruptedException {
zooKeeper.exists("/host1", new Watcher() {
@Override
public void process(WatchedEvent event) {
System.out.println("节点:/host1 存在, event-type:" + event.getType());
}
});
// 线程休眠, 否则不能监控到数据
Thread.sleep(Long.MAX_VALUE);
}
}
测试方式也是一样, 借助Shell 命令操作Zookeeper节点, 然后观察控制台输出接口.