
Zookeeper in Practice: Implementing Leader Election with Curator

子车凌龙
2023-12-01

In a distributed system, a complex task is often coordinated by electing a Master node, and ZooKeeper makes this easy to implement: both Kafka and Spark's standalone mode use ZooKeeper for Master election. This article first explains why ZooKeeper can implement Master election, and then walks through a hands-on election using the Curator API.

How ZooKeeper-based Master election works

ZooKeeper has two node types: persistent nodes and ephemeral nodes. Ephemeral nodes have one crucial property: if the machine that registered the node loses its connection, ZooKeeper deletes the node. Leader election exploits exactly this. At startup, each server tries to register an ephemeral node under an agreed-upon path (this node represents the master; whoever registers it is the master). If the node already exists, another server registered it first, i.e. that server has already won the election, so the current server gives up and runs as a follower. The losing server must also subscribe to the node's delete event, so that when the node disappears (because the registering server crashed, lost its network connection, and so on) it can contend for leadership again. Master election, then, is simply a race to register the agreed ephemeral node in ZooKeeper: whoever registers it is the master.
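The race described above can be sketched with a toy in-memory analogue in plain Java, with no ZooKeeper required (the ToyElection class and all of its names are illustrative, not part of any real API): the "ephemeral node" is an AtomicReference, creating it is a compareAndSet, and deleting it fires the registered watchers so the losers can contend again.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicReference;

/** Toy in-memory analogue of the ephemeral-node race; NOT real ZooKeeper. */
public class ToyElection {
    /** The "master znode": null means it does not exist. */
    private final AtomicReference<String> masterNode = new AtomicReference<>(null);
    /** Callbacks registered by losers, fired when the "znode" is deleted. */
    private final List<Runnable> deleteWatchers = new CopyOnWriteArrayList<>();

    /** Try to "create the ephemeral node"; true means this server is master. */
    public boolean tryTakeMaster(String serverName) {
        return masterNode.compareAndSet(null, serverName);
    }

    /** Losing servers subscribe to the node's delete event. */
    public void watchDelete(Runnable onDelete) {
        deleteWatchers.add(onDelete);
    }

    /** Simulates the master's session expiring: node removed, watchers fire. */
    public void masterDown() {
        masterNode.set(null);
        for (Runnable w : deleteWatchers) {
            w.run();
        }
        deleteWatchers.clear();
    }

    public String currentMaster() {
        return masterNode.get();
    }

    public static void main(String[] args) {
        ToyElection zk = new ToyElection();
        System.out.println("s1 wins: " + zk.tryTakeMaster("s1")); // true
        System.out.println("s2 wins: " + zk.tryTakeMaster("s2")); // false: node already exists
        zk.watchDelete(() -> System.out.println("s2 re-elects: " + zk.tryTakeMaster("s2")));
        zk.masterDown(); // s2's watcher fires and s2 becomes the new master
        System.out.println("master: " + zk.currentMaster());
    }
}
```

What real ZooKeeper adds on top of this sketch is the failure handling: the node is deleted automatically when its owner's session expires, so a crashed master cannot block a re-election.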

Master election in practice

Curator wraps the native API, encapsulating node creation, event listening, and the automatic election process, so we only need to call its API to implement Master election. Below we simulate a realistic server scenario with each of the two approaches, LeaderSelector and LeaderLatch.

LeaderSelector

LeaderSelector acquires leadership through Curator's InterProcessMutex distributed lock: whichever client grabs the lock becomes the Leader.
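Conceptually this is a mutex race. The following in-process analogue uses java.util.concurrent's ReentrantLock instead of Curator's InterProcessMutex so it runs without a ZooKeeper (the class and method names are illustrative): only the lock holder is leader, and releasing the lock hands leadership to the next contender.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.locks.ReentrantLock;

public class MutexElectionDemo {

    /** Two contenders race for one lock. Returns {bLeaderWhileAHolds, bLeaderAfterAExits}. */
    public static boolean[] race() throws InterruptedException {
        ReentrantLock leaderLock = new ReentrantLock();
        CountDownLatch held = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);

        Thread a = new Thread(() -> {
            leaderLock.lock();          // contender A takes the lock: A is leader
            held.countDown();
            try {
                release.await();        // hold leadership until told to quit
            } catch (InterruptedException ignored) {
                Thread.currentThread().interrupt();
            } finally {
                leaderLock.unlock();    // leaving "takeLeadership" releases the lock
            }
        });
        a.start();
        held.await();                   // wait until A definitely holds the lock

        // While A holds the lock, B cannot become leader.
        boolean bWhileAHolds = leaderLock.tryLock();

        release.countDown();            // A "exits"; the lock is freed
        a.join();

        // Now B's attempt succeeds: leadership moves to the next contender.
        boolean bAfterAExits = leaderLock.tryLock();
        return new boolean[]{bWhileAHolds, bAfterAExits};
    }

    public static void main(String[] args) throws InterruptedException {
        boolean[] r = race();
        System.out.println("B leader while A holds the lock? " + r[0]); // false
        System.out.println("B leader after A exits? " + r[1]);          // true
    }
}
```

The distributed version behaves the same way, except the "lock" lives in ZooKeeper, so it also gets released when the holder's session dies.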

  1. LeaderSelector

    org.apache.curator.framework.recipes.leader.LeaderSelector
    //start contending for leadership
    void start()
    //after leadership has been acquired and released, automatically re-enter the queue and contend again
    void autoRequeue()
    
  2. LeaderSelectorListener is the listener called back once a LeaderSelector client becomes Leader; the business logic to run while holding leadership goes in its takeLeadership() callback.

    org.apache.curator.framework.recipes.leader.LeaderSelectorListener
    //callback invoked when leadership is acquired
    void takeLeadership(CuratorFramework client)
    
  3. LeaderSelectorListenerAdapter is an abstract class that implements the LeaderSelectorListener interface and encapsulates the handling of a suspended or lost connection between the client and the ZooKeeper server (it throws CancelLeadershipException to abandon leadership); listeners are generally recommended to extend this class.

Server

Our simulated server extends LeaderSelectorListenerAdapter. When the server obtains Master privileges, Curator calls back into this listener's takeLeadership() method; here we simply print the server's name. Once one instance becomes Master, the others wait until the current Master crashes or exits, and only then does a new election take place.

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.leader.LeaderSelector;
import org.apache.curator.framework.recipes.leader.LeaderSelectorListenerAdapter;

import java.io.Closeable;

public class Server extends LeaderSelectorListenerAdapter implements Closeable {
    /** server name **/
    private final String serverName;

    /** listener */
    private final LeaderSelector leaderSelector;

    /** How long takeLeadership() blocks the thread, in milliseconds */
    private final int SLEEP_MILLISECOND = 100000;

    public Server(CuratorFramework client, String path, String serverName) {
        this.serverName = serverName;
        /** client, zk-path, listener */
        leaderSelector = new LeaderSelector(client, path, this);
        /** allow re-entering the election repeatedly **/
        leaderSelector.autoRequeue();
    }

    public void start() {
        leaderSelector.start();
        System.out.println(getServerName() + " is running!");
    }

    @Override
    public void close() {
        leaderSelector.close();
        System.out.println(getServerName() + " released resources!");
    }

    @Override
    public void takeLeadership(CuratorFramework client) {
        try {
            System.out.println(getServerName() + " is Master, takeLeadership() has been invoked!");
            Thread.sleep(SLEEP_MILLISECOND); // sleep so the test program can close this server and interrupt the thread
        } catch (InterruptedException e) {
            System.err.println(getServerName() + " was interrupted!");
            Thread.currentThread().interrupt();
        }
    }

    public String getServerName() {
        return serverName;
    }

    public LeaderSelector getLeaderSelector() {
        return leaderSelector;
    }
}

Test

We use four threads to simulate the real scenario; contention for leadership begins as soon as the first server starts.

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.RetryNTimes;
import org.apache.curator.utils.CloseableUtils;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

public class MasterElectorMain {

    /** Retry policy: retry up to 3 times, 1000 ms apart */
    private static RetryPolicy retryPolicy = new RetryNTimes(3, 1000);

    public static void main(String[] args) throws InterruptedException {
        List<CuratorFramework> clients = new ArrayList<CuratorFramework>(16);

        List<Server> workServers = new ArrayList<Server>(16);

        String name = "server-";

        try {
            // simulate four servers contending for master
            for (int i = 0; i < 4; i++) {
                CuratorFramework client = CuratorFrameworkFactory.builder().
                        connectString("localhost:2181").
                        sessionTimeoutMs(5000).
                        connectionTimeoutMs(5000).
                        retryPolicy(retryPolicy).
                        build();
                clients.add(client);
                Server server = new Server(client, "/zk-master", name + i);
                workServers.add(server);
                client.start();
                server.start();
            }
        } finally {
            System.out.println("Shutting down!");
            Server server;
            // closing a server triggers a master switch
            for (int i = 0; i < workServers.size(); i++) {
                server = workServers.get(i);
                CloseableUtils.closeQuietly(server);
                // block a few seconds so the switch is easy to observe
                TimeUnit.SECONDS.sleep(3);
            }
            }
            for (CuratorFramework client : clients) {
                CloseableUtils.closeQuietly(client);
            }
        }
    }
}

Run

Program output:

server-0 is running!
server-0 is Master, takeLeadership() has been invoked!
server-1 is running!
server-2 is running!
server-3 is running!
Shutting down!
server-0 was interrupted!
server-0 released resources!
server-1 is Master, takeLeadership() has been invoked!
server-1 was interrupted!
server-1 released resources!
server-2 is Master, takeLeadership() has been invoked!
server-2 was interrupted!
server-2 released resources!
server-3 is Master, takeLeadership() has been invoked!
server-3 released resources!
server-3 was interrupted!

While the program runs, we use zkCli to inspect the nodes under the path we specified.

[zk: localhost:2181(CONNECTED) 7] ls /zk-master
[_c_a38fbc7c-11e5-4a6d-8658-a74d6ea8fa97-lock-0000000021, _c_2813ae94-6827-4a76-841e-71c77e523637-lock-0000000022, _c_404b9519-a71c-4799-bfe0-abae86c9acfc-lock-0000000024, _c_913f84e8-852e-4612-8dfe-e2236091dde3-lock-0000000023] // all four servers are up

[zk: localhost:2181(CONNECTED) 8] ls /zk-master
[_c_2813ae94-6827-4a76-841e-71c77e523637-lock-0000000022, _c_404b9519-a71c-4799-bfe0-abae86c9acfc-lock-0000000024, _c_913f84e8-852e-4612-8dfe-e2236091dde3-lock-0000000023] // the first server has exited

[zk: localhost:2181(CONNECTED) 9] ls /zk-master
[_c_404b9519-a71c-4799-bfe0-abae86c9acfc-lock-0000000024, _c_913f84e8-852e-4612-8dfe-e2236091dde3-lock-0000000023]  // the second server has exited

[zk: localhost:2181(CONNECTED) 12] ls /zk-master
[_c_404b9519-a71c-4799-bfe0-abae86c9acfc-lock-0000000024]  // the third server has exited

[zk: localhost:2181(CONNECTED) 15] ls /zk-master
[] // all servers have exited

As expected, when the master node exits unexpectedly a new election takes place and a new Master is chosen.

LeaderLatch

LeaderLatch implements Master election with the following principle and steps:

  1. Node creation: every participating host creates an ephemeral sequential znode under the same path; ZooKeeper appends a 10-digit sequence number to the znode name;
  2. Leader selection: the host whose znode has the smallest sequence number becomes the Leader; all the others are followers, and each follower watches the znode with the next-smallest sequence number below its own;
  3. Failure handling: if the Leader crashes, its znode is deleted, and the next follower in sequence learns of the Leader's removal through the watch it registered (the watch is set up front, and its callback receives the notification that the Leader is gone);
  4. Re-election: that follower checks whether any remaining znode has a smaller sequence number than its own; if not, it becomes the new Leader. Otherwise, the host that created the smallest znode becomes the new Leader, and all other followers likewise recognize the host with the smallest-sequence znode as Leader.
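The smallest-sequence rule in the steps above can be sketched in plain Java, with no ZooKeeper involved (the LatchOrdering class and its method names are illustrative): each znode name ends in the 10-digit sequence number that ZooKeeper appends, the smallest sequence is the Leader, and each follower watches the znode whose sequence is the largest one still below its own.

```java
import java.util.Comparator;
import java.util.List;
import java.util.Optional;

/** Plain-Java sketch of LeaderLatch's smallest-sequence-wins ordering. */
public class LatchOrdering {

    /** Extracts the 10-digit suffix ZooKeeper appends, e.g. "...-latch-0000000029" -> 29. */
    static long sequenceOf(String znode) {
        return Long.parseLong(znode.substring(znode.length() - 10));
    }

    /** The Leader is the znode with the smallest sequence number. */
    static Optional<String> leaderOf(List<String> znodes) {
        return znodes.stream().min(Comparator.comparingLong(LatchOrdering::sequenceOf));
    }

    /** A follower watches its predecessor: the largest sequence still smaller than its own. */
    static Optional<String> watchTarget(String self, List<String> znodes) {
        long mySeq = sequenceOf(self);
        return znodes.stream()
                .filter(z -> sequenceOf(z) < mySeq)
                .max(Comparator.comparingLong(LatchOrdering::sequenceOf));
    }

    public static void main(String[] args) {
        List<String> znodes = List.of("latch-0000000029", "latch-0000000030", "latch-0000000028");
        System.out.println(leaderOf(znodes).get());                        // latch-0000000028
        System.out.println(watchTarget("latch-0000000030", znodes).get()); // latch-0000000029
    }
}
```

Watching only the direct predecessor, rather than the Leader itself, is deliberate: when the Leader dies, exactly one follower is notified instead of the whole herd waking up at once.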

LeaderLatch overview

  1. Key LeaderLatch methods

    org.apache.curator.framework.recipes.leader.LeaderLatch
    //call start to begin contending for leadership
    void start()
    
    //call close to release leadership
    void close()
    
    //blocks the thread while trying to acquire leadership; not guaranteed to succeed, returns false on timeout
    boolean await(long, java.util.concurrent.TimeUnit)
    
    //checks whether this client currently holds leadership
    boolean hasLeadership() 
    
  2. LeaderLatchListener receives callbacks when a LeaderLatch client's leadership status changes; it has two methods, isLeader() and notLeader().

    org.apache.curator.framework.recipes.leader.LeaderLatchListener
    //fired when leadership is gained
    void isLeader()
    
    //fired when leadership is lost
    void notLeader()     
    

Server

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.leader.LeaderLatch;
import org.apache.curator.framework.recipes.leader.LeaderLatchListener;

import java.io.Closeable;
import java.io.IOException;

public class LeaderLatchServer implements LeaderLatchListener, Closeable {
    /** server name **/
    private final String serverName;

    private LeaderLatch leaderLatch;

    LeaderLatchServer(CuratorFramework client, String path, String serverName) {
        this.serverName = serverName;
        /** client and election path; the listener is registered via addListener */
        leaderLatch = new LeaderLatch(client, path);
        leaderLatch.addListener(this);
    }

    public void start() throws Exception {
        leaderLatch.start();
        System.out.println(serverName + " started done");
    }

    public void close() throws IOException {
        leaderLatch.close();
    }

    @Override
    public void isLeader() {
        synchronized(this) {
            // could have lost leadership by now.
            if (!leaderLatch.hasLeadership()) {
                return ;
            }
            System.out.println(serverName + " has gained leadership");
        }
    }

    @Override
    public void notLeader() {
        synchronized(this) {
            // could have gained leadership by now.
            if (leaderLatch.hasLeadership()) {
                return;
            }
            System.out.println(serverName + " has lost leadership");
        }
    }
}

Test

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.RetryNTimes;
import org.apache.curator.utils.CloseableUtils;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

public class MasterTestMain {

    /** Retry policy: retry up to 3 times, 1000 ms apart */
    private static RetryPolicy retryPolicy = new RetryNTimes(3, 1000);

    public static void main(String[] args) throws Exception {
        List<CuratorFramework> clients = new ArrayList<CuratorFramework>(16);

        List<LeaderLatchServer> workServers = new ArrayList<LeaderLatchServer>(16);

        String name = "server-";

        try {
            // simulate four servers contending for master
            for (int i = 0; i < 4; i++) {
                CuratorFramework client = CuratorFrameworkFactory.builder().
                        connectString("localhost:2181").
                        sessionTimeoutMs(5000).
                        connectionTimeoutMs(5000).
                        retryPolicy(retryPolicy).
                        build();
                clients.add(client);
                LeaderLatchServer server = new LeaderLatchServer(client, "/zk-master", name + i);
                workServers.add(server);
                client.start();
                server.start();
            }
        } finally {
            System.out.println("Shutting down!");
            LeaderLatchServer server;
            // closing a server triggers a master switch
            for (int i = 0; i < workServers.size(); i++) {
                server = workServers.get(i);
                CloseableUtils.closeQuietly(server);
                // block a few seconds so the switch is easy to observe
                TimeUnit.SECONDS.sleep(3);
            }
            }
            for (CuratorFramework client : clients) {
                CloseableUtils.closeQuietly(client);
            }
        }
    }
}

Run

Program output:

server-0 started done
server-1 started done
server-2 started done
server-3 started done
Shutting down!
server-1 has gained leadership
server-2 has gained leadership
server-3 has gained leadership

Observing the znodes with the ZooKeeper client:

[zk: localhost:2181(CONNECTED) 30] ls /zk-master
[]
[zk: localhost:2181(CONNECTED) 30] ls /zk-master  // three servers up; the smallest sequence number wins leadership
[_c_cbaa899b-3e04-47fc-a5da-3fa0147d00bd-latch-0000000029, _c_f40439aa-beee-4ea8-b28f-f10ccd20f621-latch-0000000030, _c_679f028d-1733-4cf8-a400-50c86130a71b-latch-0000000028]
[zk: localhost:2181(CONNECTED) 23] ls /zk-master  // the smallest-sequence server exited; the next smallest becomes leader
[_c_cbaa899b-3e04-47fc-a5da-3fa0147d00bd-latch-0000000029, _c_f40439aa-beee-4ea8-b28f-f10ccd20f621-latch-0000000030]
[zk: localhost:2181(CONNECTED) 26] ls /zk-master  // 29 exited; 30 takes over
[_c_f40439aa-beee-4ea8-b28f-f10ccd20f621-latch-0000000030]
[zk: localhost:2181(CONNECTED) 30] ls /zk-master
[]
