10. Zookeeper JavaAPI-Watcher监听节点变化

宇文修文
2023-12-01

Zookeeper 可以监听节点数据变化或子节点数量变化. 当监听的事件发生后, 会通知监听器对事件做相应的处理.

1. Zookeeper监听

1.1 监听器类型

从监听器范围来区分, Zookeeper 监听器分为两种, 一种是全局监听器,一种是局部监听器.

  • 全局监听器: 创建Zookeeper 连接时指定, 全局唯一. 一旦注册不能删除, 默认无限监听事件.
  • 局部监听器: 对节点进行操作时, 指定具体场景的监听器. 默认一次注册监听, 只能处理一次事件触发.

1.2 事件类型

Zookeeper 监听的事件类型有四种:

  • EventType.NodeCreated: 节点创建事件
  • EventType.NodeDeleted: 节点删除事件
  • EventType.NodeDataChanged: 节点数据内容变更
  • EventType.NodeChildrenChanged: 子节点数量发生变化, 新增或删除子节点. 子节点内容发生变化, 不会触发.

2. 使用全局监听器监听

  • 一条Zookeeper 连接只有一个全局监听器, 在创建Zookeeper对象时指定
  • 全局监听器中监听的事件, 只能注册, 不能删除. 除非断开连接, 重新连接. 因此要慎用
  • 全局监听器中监听的事件, 会循环监听. 并非触发一次就终止了.
  • 全局监听器中需要根据节点路径和事件类型综合判断来做相应的业务逻辑.

2.1 全局监听器API

  • 使用全局监听器的API 都会传一个boolean 类型为true的参数, 声明此操作需要监听, 会在全局监听器中注册一条监听信息, 但不能删除.
  • 当监听的事件发生后, 会触发全局监听器进行处理. 因此需要在全局监听器中根据节点路径和事件类型做判断.
public byte[] getData(final String path, Boolean watcher, Stat stat);
public List<String> getChildren(final String path, Boolean watcher);
public Stat exists(final String path, Boolean watcher);
....

2.2 测试用例

public class TestWatch {

    // zookeeper 地址, 多个地址以逗号割开
    private static String zkServer = "127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183";

    // 会话超时时间
    private static int sessionTimeout = 3000;

    // zookeeper 客户端连接
    private static ZooKeeper zooKeeper;

    @BeforeClass
    public static void init() throws Exception{
        // 创建zookeeper 连接时, 设置全局监听器
        zooKeeper = new ZooKeeper(zkServer, sessionTimeout, watchedEvent ->{
            try {
                // 获取发生事件的节点路径
                String path = watchedEvent.getPath();

                // 根据节点路径和事件类型做不同的业务的处理
                if ("/".equals(path) && Watcher.Event.EventType.NodeChildrenChanged.equals(watchedEvent.getType())) {

                    // 获取数据, 并继续监控
                    List<String> children = zooKeeper.getChildren("/", true);
                    System.out.println("系统主监控-根节点/下的子节点: " + children);
                }


            } catch (KeeperException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
    }

    // 获取节点时, 不指定监控器Watcher ,使用默认的Watcher
    // 获取节点时, 声明需要监控节点数量, 但并为指定监控器. 因此, 子节点数量发生变化时, 会交由创建Zookeeper 连接时, 设置的Watcher 处理
    // getChildren 只注册监听NodeChildrenChanged 事件, 即当自节点数量发生变化时, 会通知.
    @Test
    public void test_getChildren() throws KeeperException, InterruptedException {

        // 一旦注册便会一直监听,只要有变化, 就会通知全局监听器
        zooKeeper.getChildren("/", true);

        // 线程休眠, 否则不能监控到数据
        Thread.sleep(Long.MAX_VALUE);
    }
}

2.3 使用Shell 客户端添加节点

[zk: 127.0.0.1:2181(CONNECTED) 84] create /h ""
Created /h
[zk: 127.0.0.1:2181(CONNECTED) 85] create /hh ""
Created /hh

2.4 控制台输出

新增一个节点, 打印一条日志.

系统主监控-根节点/下的子节点: [zookeeper, h]
系统主监控-根节点/下的子节点: [zookeeper, hh]

3. 局部监听器

3.1 局部监听API

  • 局部监听器都需要传入一个Watcher 类型的监听器.
  • 当监听的事件发生后, 会同自定义的Watcher处理.
  • 默认情况下, 一次监听注册只能处理一次事件变更. 如果需要持续监听, 则需要在处理逻辑中再次监听.
public byte[] getData(final String path, Watcher watcher, Stat stat);
public List<String> getChildren(final String path, Watcher watcher);
public Stat exists(final String path, Watcher watcher);
....

3.2 测试用例

import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import org.junit.BeforeClass;
import org.junit.Test;

import java.util.List;

/**
 * @Description: 测试监听
 * @author: zongf
 * @date: 2019-02-17 10:20
 */
public class TestWatch {

    // zookeeper 地址, 多个地址以逗号割开
    private static String zkServer = "127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183";

    // 会话超时时间
    private static int sessionTimeout = 3000;

    // zookeeper 客户端连接
    private static ZooKeeper zooKeeper;

    @BeforeClass
    public static void init() throws Exception{
        // 创建zookeeper 连接时, 设置全局监听器
        zooKeeper = new ZooKeeper(zkServer, sessionTimeout, watchedEvent ->{});
    }

   
    // 获取节点时, 指定监控器Watcher. 当有事件发生时, 会由自定义的Watcher 处理而非系统监控器处理
    // getChildren 指定的监控器Watcher 只能注册监控odeChildrenChanged事件.
    @Test
    public void test_getChildren_Watch() throws KeeperException, InterruptedException {

        final String nodePath = "/";

        List<String> children = zooKeeper.getChildren(nodePath, new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                // 获取数据, 并继续监控. this 表示将监控器本身作为监控器继续监控
                try {
                    List<String> childs = zooKeeper.getChildren(nodePath, this);
                    System.out.println("自定义节点监控-根节点/下的子节点: " + childs + ", event-type:" + event.getType());
                } catch (KeeperException e) {
                    e.printStackTrace();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

            }
        });

        // 线程休眠, 否则不能监控到数据
        Thread.sleep(Long.MAX_VALUE);
    }


    // 获取数据时, 添加监控, 只能监控NodeDataChanged 事件. 只能监控一次
    @Test
    public void test_getData_watch() throws KeeperException, InterruptedException {
        final String nodePath = "/node-anyone";

        zooKeeper.getData(nodePath, new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                try {
                    // this表示获取节点数据时再次使用此watcher监控, 这样之后节点数据再发生变化,依然可以监控到
                    byte[] data = zooKeeper.getData(nodePath, this, new Stat());
                    System.out.println("event:" + event + ", data:" + new String(data));
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }, new Stat());

        // 线程休眠, 否则不能监控到数据
        Thread.sleep(Long.MAX_VALUE);
    }

    // 监控节点是否存在, 只监控一次.
    @Test
    public void test_exsists_watch() throws KeeperException, InterruptedException {
        zooKeeper.exists("/host1", new Watcher() {
            @Override
            public void process(WatchedEvent event) {

                System.out.println("节点:/host1 存在, event-type:" + event.getType());
            }
        });

        // 线程休眠, 否则不能监控到数据
        Thread.sleep(Long.MAX_VALUE);
    }
}

3.2 测试

测试方式也是一样, 借助Shell 命令操作Zookeeper节点, 然后观察控制台输出接口.

 类似资料: