Watcher 监听机制是 Zookeeper 中非常重要的特性,我们基于 zookeeper 上创建的节点,可以对这些节点绑定监听事件,比如可以监听节点数据变更、节点删除、子节点状态变更等事件,通过这个事件机制,可以基于 zookeeper实现分布式锁,发布订阅(多个订阅者同时监听某一个主题对象,当这个主题对象自身状态发生变化时,会通知所有订阅者)等功能。
当数据发生变化的时候, zookeeper 会产生一个 watcher 事件,并且会发送到客户端。但是客户端只会收到一次通知。如果后续这个节点再次发生变化,那么之前设置 watcher 的客户端不会再次收到消息(Watcher 是一次性的操作)。 可以通过循环监听去达到永久监听效果。
特性 | 说明 |
---|---|
一次性 | watcher 是一次性的,一旦被触发就会移除,再次使用时需要重新注册 |
客户端顺序回调 | watcher 回调是顺序串行的,只有回调后客户端才能看到最新数据状态。一个 watcher 回调逻辑不应太多,以免影响其他 watcher 的执行 |
轻量级 | watchEvent 是最小的通信单位,结构上只包含通知状态、事件类型和节点路径。,并不会告诉节点变化前后的具体内容 |
时效性 | watcher 只有在当前session 彻底失效时才会无效,若session 有效期内快速重连成功,则 watcher 依然存在,仍可接收通知 |
ZooKeeper 的 Watcher 机制,总的来说可以分为三个过程:客户端注册 Watcher、服务器处理Watcher 和客户端回调 Watcher客户端。
注册 watcher 有 3 种方式,exists、getData、getChildren
事件类型 | 说明 |
---|---|
None | 客户端与服务端成功建立会话 |
NodeCreated | 节点创建事件 |
NodeDeleted | 节点删除事件 |
NodeDataChanged | 节点数据变化事件 |
NodeChildrenChanged | 子节点变化事件(创建、删除) |
POM的JAR版本根据使用Zookeeper服务的版本而定(zookeeper-3.6.2)
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.13</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.6.2</version>
</dependency>
import org.apache.commons.lang.StringUtils;
import org.apache.zookeeper.data.Stat;
import org.jboss.netty.util.internal.StringUtil;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
public class Zk_Watcher {
ZooKeeper zk;
@Before
public void init() throws IOException, KeeperException, InterruptedException {
zk= new ZooKeeper("127.0.0.1:2181", Integer.MAX_VALUE,new Watcher() {
//全局监听
public void process(WatchedEvent watchedEvent) {
//客户端回调Watcher
System.out.println("-----------------------------------------");
System.out.println("连接状态:" + watchedEvent.getState());
System.out.println("事件类型:" + watchedEvent.getType());
System.out.println("节点路径:" + watchedEvent.getPath());
System.out.println("-----------------------------------------");
}
}
);
}
/**
* exists监听事件:
* NodeCreated:节点创建
* NodeDeleted:节点删除
* NodeDataChanged:节点内容变化
* @throws IOException
* @throws KeeperException
* @throws InterruptedException
*/
@Test
public void test1() throws IOException, KeeperException, InterruptedException {
//exists注册监听
zk.exists("/watcher-exists", new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
System.out.println("-----------------------------------------");
System.out.println("连接状态:" + watchedEvent.getState());
System.out.println("事件类型:" + watchedEvent.getType());
System.out.println("节点路径:" + watchedEvent.getPath());
System.out.println("-----------------------------------------");
try {
zk.exists("/watcher-exists",this);
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
//不开启ACL,以持久化自动生成序列方式创建
zk.create("/watcher-exists", "watcher-exists".getBytes()
, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
//通过修改的事务类型操作来触发监听事件
zk.setData("/watcher-exists", "watcher-exists2".getBytes(), -1);
//删除节点看看能否触发监听事件
zk.delete("/watcher-exists", -1);
Thread.sleep(Integer.MAX_VALUE);
}
/**
* getData监听事件:
* NodeDeleted:节点删除
* NodeDataChange:节点内容发生变化
* @throws IOException
* @throws KeeperException
* @throws InterruptedException
*/
@Test
public void test2() throws IOException, KeeperException, InterruptedException {
//不开启ACL,以持久化自动生成序列方式创建
zk.create("/watcher-getData", "watcher-getData".getBytes()
, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
//getData注册监听
zk.getData("/watcher-getData", new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
System.out.println("-----------------------------------------");
System.out.println("连接状态:" + watchedEvent.getState());
System.out.println("事件类型:" + watchedEvent.getType());
System.out.println("节点路径:" + watchedEvent.getPath());
System.out.println("-----------------------------------------");
try {
zk.exists("/watcher-getData",this);
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},null);
//通过修改的事务类型操作来触发监听事件
zk.setData("/watcher-getData", "watcher-getData2".getBytes(), -1);
///删除节点看看能否触发监听事件
zk.delete("/watcher-getData", -1);
//Thread.sleep(Integer.MAX_VALUE);
}
/**
* getChildren监听事件:
* NodeChildrenChanged:子节点发生变化(创建,删除)
* NodeDeleted:节点删除
* @throws IOException
* @throws KeeperException
* @throws InterruptedException
*/
@Test
public void test3() throws IOException, KeeperException, InterruptedException {
zk.create("/watcher-getChildren",null,
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
//getChildren注册监听
zk.getChildren("/watcher-getChildren", new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
System.out.println("-----------------------------------------");
System.out.println("连接状态:" + watchedEvent.getState());
System.out.println("事件类型:" + watchedEvent.getType());
System.out.println("节点路径:" + watchedEvent.getPath());
System.out.println("-----------------------------------------");
try {
zk.getChildren("/watcher-getChildren",this);
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
zk.create("/watcher-getChildren/watcher-getChildren01","watcher-getChildren01".getBytes(),
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
zk.setData("/watcher-getChildren/watcher-getChildren01","watcher-getChildren02".getBytes(), -1);//修改子节点
zk.delete("/watcher-getChildren/watcher-getChildren01", -1);//删除子节点
zk.setData("/watcher-getChildren","ccccc".getBytes(),-1);
zk.delete("/watcher-getChildren", -1);//删除根节点
}
}