In a distributed system, a complex job often needs a single Master node to coordinate it. With ZooKeeper (zk) this is easy to implement; both Kafka and Spark's standalone mode use zk for Master election. This article first explains why zk can be used for Master election, and then walks through a hands-on election using Curator's API.
ZooKeeper has two kinds of nodes: persistent and ephemeral. Ephemeral nodes have a crucial property: if the machine that registered the node loses its connection, ZooKeeper deletes the node. Leader election exploits exactly this. When a server starts, it tries to register an ephemeral node under an agreed-upon path; this node represents the master, so whoever registers it is the master. If the node already exists, some other server has already won the race, so the current server gives up and runs as a follower. The losing server then subscribes to the deletion event of that ephemeral node, so that when the node is deleted (the machine that registered it crashed, lost its network connection, and so on) it can contend for the master role again. Master election, then, is simply a race to register the agreed-upon ephemeral node in ZooKeeper: whoever registers it is the master.
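To make this concrete, here is a minimal sketch of that flow written directly against the Curator client (the class name NaiveElection and the path /naive-master are made up for illustration and are not part of the article's code). It deliberately ignores races such as the master node being deleted between the failed create and the watch registration; handling those corner cases robustly is exactly what Curator's election recipes below do for us.
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.api.CuratorWatcher;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.Watcher;

public class NaiveElection {
    /** the agreed-upon ephemeral node; whoever creates it is the master (illustrative path) */
    private static final String MASTER_PATH = "/naive-master";

    public static void electOnce(CuratorFramework client, String myId) throws Exception {
        try {
            // Try to register the ephemeral node. If the session that created it goes away,
            // ZooKeeper deletes the node automatically, which is what triggers re-election.
            client.create()
                  .withMode(CreateMode.EPHEMERAL)
                  .forPath(MASTER_PATH, myId.getBytes());
            System.out.println(myId + " won the election and is now the master");
        } catch (KeeperException.NodeExistsException e) {
            // Someone else is already master: give up, subscribe to the node's deletion,
            // and contend again once the current master's ephemeral node disappears.
            CuratorWatcher onDelete = event -> {
                if (event.getType() == Watcher.Event.EventType.NodeDeleted) {
                    electOnce(client, myId);
                }
            };
            client.getData().usingWatcher(onDelete).forPath(MASTER_PATH);
            System.out.println(myId + " lost the election and waits as a follower");
        }
    }
}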
Curator wraps the native API: node creation, event listening and the automatic election flow are all encapsulated, so we only need to call the recipe API to get Master election. This article uses both the LeaderSelector and the LeaderLatch approaches, with a simple server class to simulate a realistic runtime scenario.
LeaderSelector contends for leadership via Curator's InterProcessMutex distributed lock: whoever acquires the lock is the Leader.
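In other words, it is ordinary distributed-lock usage underneath; something roughly like the following sketch (the lock path /zk-master-lock is just a placeholder, and this is not how LeaderSelector is implemented line for line):
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;

public class MutexElectionSketch {
    public static void contend(CuratorFramework client, String serverName) throws Exception {
        // every server uses the same lock path; whoever holds the mutex is the leader
        InterProcessMutex mutex = new InterProcessMutex(client, "/zk-master-lock");
        mutex.acquire();                 // blocks until this server acquires the lock, i.e. becomes leader
        try {
            System.out.println(serverName + " holds the lock and acts as leader");
            // ... leader-only work goes here ...
        } finally {
            mutex.release();             // releasing the lock lets another server take over
        }
    }
}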
LeaderSelector
org.apache.curator.framework.recipes.leader.LeaderSelector
// start contending for leadership
void start()
// after gaining and then giving up leadership, automatically re-enter the election queue and contend again
void autoRequeue()
LeaderSelectorListener is the listener that LeaderSelector calls back once this client becomes Leader; the business logic to run while holding leadership goes in its takeLeadership() callback.
`org.apache.curator.framework.recipes.leader.LeaderSelectorListener`
// callback invoked after leadership has been acquired
void takeLeadership()
LeaderSelectorListenerAdapter is an abstract class that implements the LeaderSelectorListener interface. It encapsulates the handling of a suspended or lost connection between the client and the zk server (it throws CancelLeadershipException so the instance gives up leadership), and extending it is the recommended way to write a listener.
Our simulated Server extends LeaderSelectorListenerAdapter. When an instance gains the Master role, the listener's takeLeadership() method is called back; here we simply print the server's name. Once one instance becomes Master, the others wait, and a new election happens only after the current Master crashes or exits.
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.leader.LeaderSelector;
import org.apache.curator.framework.recipes.leader.LeaderSelectorListenerAdapter;

import java.io.Closeable;

public class Server extends LeaderSelectorListenerAdapter implements Closeable {

    /** server name */
    private final String serverName;

    /** leader selector (this server also acts as its listener) */
    private final LeaderSelector leaderSelector;

    /** how long takeLeadership() blocks the thread, in milliseconds */
    private final int SLEEP_MILLISECOND = 100000;

    public Server(CuratorFramework client, String path, String serverName) {
        this.serverName = serverName;
        /** client, zk path, listener */
        leaderSelector = new LeaderSelector(client, path, this);
        /** allow this server to re-enter the election after giving up leadership */
        leaderSelector.autoRequeue();
    }

    public void start() {
        leaderSelector.start();
        System.out.println(getServerName() + " started!");
    }

    @Override
    public void close() {
        leaderSelector.close();
        System.out.println(getServerName() + " released its resources!");
    }

    @Override
    public void takeLeadership(CuratorFramework client) {
        try {
            System.out.println(getServerName() + " is the master, takeLeadership() was invoked!");
            // sleep so the test program has time to close this server and interrupt it
            Thread.sleep(SLEEP_MILLISECOND);
        } catch (InterruptedException e) {
            System.err.println(getServerName() + " was interrupted!");
            Thread.currentThread().interrupt();
        }
    }

    public String getServerName() {
        return serverName;
    }

    public LeaderSelector getLeaderSelector() {
        return leaderSelector;
    }
}
We use four server instances to simulate a realistic scenario: the first server to start wins the election and becomes Master, while the others wait their turn.
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.RetryNTimes;
import org.apache.curator.utils.CloseableUtils;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

public class MasterElectorMain {

    /** retry policy: retry at most 3 times, waiting 1000 ms between attempts */
    private static RetryPolicy retryPolicy = new RetryNTimes(3, 1000);

    public static void main(String[] args) throws InterruptedException {
        List<CuratorFramework> clients = new ArrayList<CuratorFramework>(16);
        List<Server> workServers = new ArrayList<Server>(16);
        String name = "server-";
        try {
            // simulate four servers contending for the master role
            for (int i = 0; i < 4; i++) {
                CuratorFramework client = CuratorFrameworkFactory.builder().
                        connectString("localhost:2181").
                        sessionTimeoutMs(5000).
                        connectionTimeoutMs(5000).
                        retryPolicy(retryPolicy).
                        build();
                clients.add(client);
                Server server = new Server(client, "/zk-master", name + i);
                workServers.add(server);
                client.start();
                server.start();
            }
        } finally {
            System.out.println("Starting shutdown!");
            Server server;
            // closing a server triggers a master switch
            for (int i = 0; i < workServers.size(); i++) {
                server = workServers.get(i);
                CloseableUtils.closeQuietly(server);
                // pause a few seconds so the handover is easier to observe
                TimeUnit.SECONDS.sleep(3);
            }
            for (CuratorFramework client : clients) {
                CloseableUtils.closeQuietly(client);
            }
        }
    }
}
Program output:
server-0 started!
server-0 is the master, takeLeadership() was invoked!
server-1 started!
server-2 started!
server-3 started!
Starting shutdown!
server-0 was interrupted!
server-0 released its resources!
server-1 is the master, takeLeadership() was invoked!
server-1 was interrupted!
server-1 released its resources!
server-2 is the master, takeLeadership() was invoked!
server-2 was interrupted!
server-2 released its resources!
server-3 is the master, takeLeadership() was invoked!
server-3 released its resources!
server-3 was interrupted!
While the program is running, we can use zkCli to inspect the nodes under the path we specified.
[zk: localhost:2181(CONNECTED) 7] ls /zk-master
[_c_a38fbc7c-11e5-4a6d-8658-a74d6ea8fa97-lock-0000000021, _c_2813ae94-6827-4a76-841e-71c77e523637-lock-0000000022, _c_404b9519-a71c-4799-bfe0-abae86c9acfc-lock-0000000024, _c_913f84e8-852e-4612-8dfe-e2236091dde3-lock-0000000023] // all four servers are up
[zk: localhost:2181(CONNECTED) 8] ls /zk-master
[_c_2813ae94-6827-4a76-841e-71c77e523637-lock-0000000022, _c_404b9519-a71c-4799-bfe0-abae86c9acfc-lock-0000000024, _c_913f84e8-852e-4612-8dfe-e2236091dde3-lock-0000000023] // the first server has exited
[zk: localhost:2181(CONNECTED) 9] ls /zk-master
[_c_404b9519-a71c-4799-bfe0-abae86c9acfc-lock-0000000024, _c_913f84e8-852e-4612-8dfe-e2236091dde3-lock-0000000023] // the second server has exited
[zk: localhost:2181(CONNECTED) 12] ls /zk-master
[_c_404b9519-a71c-4799-bfe0-abae86c9acfc-lock-0000000024] // the third server has exited
[zk: localhost:2181(CONNECTED) 15] ls /zk-master
[] // all servers have exited
This matches our expectation: when the Master node exits unexpectedly, a new election is held and a new Master is chosen.
LeaderLatch implements Master election differently: every participant creates an ephemeral sequential node under the same path, the client whose node has the smallest sequence number becomes the Leader, and when that node disappears the next smallest takes over. Its key classes and methods are as follows:
1. LeaderLatch key methods (a short usage sketch follows this list)
org.apache.curator.framework.recipes.leader.LeaderLatch
// call start() to begin contending for leadership
void start()
// call close() to give up leadership
void close()
// block the calling thread until leadership is acquired or the timeout elapses; returns false if leadership was not obtained in time
boolean await(long, java.util.concurrent.TimeUnit)
// whether this instance currently holds leadership
boolean hasLeadership()
2. LeaderLatchListener is the callback interface invoked when a LeaderLatch client gains or loses leadership; it has two methods, isLeader() and notLeader()
org.apache.curator.framework.recipes.leader.LeaderLatchListener
// invoked when leadership is gained
void isLeader()
// invoked when leadership is lost
void notLeader()
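The LeaderLatchServer below only uses the listener callbacks. As a quick, hypothetical sketch of await() and hasLeadership(), a caller can instead block with a timeout (the class name LatchAwaitSketch and the 10-second timeout are placeholders, not part of the article's code):
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.leader.LeaderLatch;

import java.util.concurrent.TimeUnit;

public class LatchAwaitSketch {
    public static void run(CuratorFramework client) throws Exception {
        LeaderLatch latch = new LeaderLatch(client, "/zk-master");
        latch.start();                                   // join the election
        try {
            // block for up to 10 seconds waiting for leadership; false means the timeout elapsed
            boolean gotLeadership = latch.await(10, TimeUnit.SECONDS);
            if (gotLeadership && latch.hasLeadership()) {
                System.out.println("became leader, doing leader-only work...");
            } else {
                System.out.println("did not become leader within the timeout");
            }
        } finally {
            latch.close();                               // gives up leadership if it was held
        }
    }
}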
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.leader.LeaderLatch;
import org.apache.curator.framework.recipes.leader.LeaderLatchListener;

import java.io.Closeable;
import java.io.IOException;

public class LeaderLatchServer implements LeaderLatchListener, Closeable {

    /** server name */
    private final String serverName;

    private final LeaderLatch leaderLatch;

    LeaderLatchServer(CuratorFramework client, String path, String serverName) {
        this.serverName = serverName;
        /** create the latch with the client and election path, then register this server as its listener */
        leaderLatch = new LeaderLatch(client, path);
        leaderLatch.addListener(this);
    }

    public void start() throws Exception {
        leaderLatch.start();
        System.out.println(serverName + " started done");
    }

    public void close() throws IOException {
        leaderLatch.close();
    }

    @Override
    public void isLeader() {
        synchronized (this) {
            // could have lost leadership by now
            if (!leaderLatch.hasLeadership()) {
                return;
            }
            System.out.println(serverName + " has gained leadership");
        }
    }

    @Override
    public void notLeader() {
        synchronized (this) {
            // could have gained leadership by now
            if (leaderLatch.hasLeadership()) {
                return;
            }
            System.out.println(serverName + " has lost leadership");
        }
    }
}
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.RetryNTimes;
import org.apache.curator.utils.CloseableUtils;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

public class MasterTestMain {

    /** retry policy: retry at most 3 times, waiting 1000 ms between attempts */
    private static RetryPolicy retryPolicy = new RetryNTimes(3, 1000);

    public static void main(String[] args) throws Exception {
        List<CuratorFramework> clients = new ArrayList<CuratorFramework>(16);
        List<LeaderLatchServer> workServers = new ArrayList<LeaderLatchServer>(16);
        String name = "server-";
        try {
            // simulate four servers contending for the master role
            for (int i = 0; i < 4; i++) {
                CuratorFramework client = CuratorFrameworkFactory.builder().
                        connectString("localhost:2181").
                        sessionTimeoutMs(5000).
                        connectionTimeoutMs(5000).
                        retryPolicy(retryPolicy).
                        build();
                clients.add(client);
                LeaderLatchServer server = new LeaderLatchServer(client, "/zk-master", name + i);
                workServers.add(server);
                client.start();
                server.start();
            }
        } finally {
            System.out.println("Starting shutdown!");
            LeaderLatchServer server;
            // closing a server triggers a master switch
            for (int i = 0; i < workServers.size(); i++) {
                server = workServers.get(i);
                CloseableUtils.closeQuietly(server);
                // pause a few seconds so the handover is easier to observe
                TimeUnit.SECONDS.sleep(3);
            }
            for (CuratorFramework client : clients) {
                CloseableUtils.closeQuietly(client);
            }
        }
    }
}
Program output:
server-0 started done
server-1 started done
server-2 started done
server-3 started done
Starting shutdown!
server-1 has gained leadership
server-2 has gained leadership
server-3 has gained leadership
Observing the nodes with the ZooKeeper CLI:
[zk: localhost:2181(CONNECTED) 30] ls /zk-master
[]
[zk: localhost:2181(CONNECTED) 30] ls /zk-master // three servers are up; the one with the smallest sequence number becomes master
[_c_cbaa899b-3e04-47fc-a5da-3fa0147d00bd-latch-0000000029, _c_f40439aa-beee-4ea8-b28f-f10ccd20f621-latch-0000000030, _c_679f028d-1733-4cf8-a400-50c86130a71b-latch-0000000028]
[zk: localhost:2181(CONNECTED) 23] ls /zk-master // the server with the smallest sequence number exited; the next smallest becomes master
[_c_cbaa899b-3e04-47fc-a5da-3fa0147d00bd-latch-0000000029, _c_f40439aa-beee-4ea8-b28f-f10ccd20f621-latch-0000000030]
[zk: localhost:2181(CONNECTED) 26] ls /zk-master // node 29 exited, node 30 takes over
[_c_f40439aa-beee-4ea8-b28f-f10ccd20f621-latch-0000000030]
[zk: localhost:2181(CONNECTED) 30] ls /zk-master
[]