HealthMonitor是一个周期性工作的后台线程,它在一个循环中周期性的同HA服务进行心跳,负责跟踪NameNode服务的健康状况,并在健康状况变化时调用failover控制器的回调方法。
HealthMonitor内部有如下主要成员变量:
1)后台工作线程:真正卖力干活的
private Daemon daemon;// 后台工作线程
2)一些时间间隔、rpc超时参数等配置信息,见注释
// 重连、周期性检查、失连后的睡眠时间等时间间隔
private long connectRetryInterval;
private long checkIntervalMillis;
private long sleepAfterDisconnectMillis;
// rpc超时时间
private int rpcTimeout;
// 配置信息
private final Configuration conf;
3)连接代理相关
/** The connected proxy */
// 连接代理
private HAServiceProtocol proxy;
/** The HA service to monitor */
// HA服务到监视器的对象,通过它来获取连接代理proxy,代表一个客户端HA管理命令的目标。
private final HAServiceTarget targetToMonitor;
4)监视器s,状态发生变化时需要周知监视器s,故它们都实现了相同的接口
/**
* Listeners for state changes
* 状态变更监视器列表,里面的元素是实现Callback的对象
*/
private List<Callback> callbacks = Collections.synchronizedList(
new LinkedList<Callback>());
/**
* 服务状态监视器列表,里面的元素是实现ServiceStateCallback的对象
*/
private List<ServiceStateCallback> serviceStateCallbacks = Collections
.synchronizedList(new LinkedList<ServiceStateCallback>());
5)状态
// 服务状态,默认为正在初始化
private State state = State.INITIALIZING;
/**
* 最后一次服务状态,默认为正在初始化
*/
private HAServiceStatus lastServiceState = new HAServiceStatus(
HAServiceState.INITIALIZING);
2.1、状态
HealthMonitor定义了几个状态,如下:
@InterfaceAudience.Private
public enum State {
/**
* The health monitor is still starting up.
* 健康监视器正在启动
*/
INITIALIZING,
/**
* The service is not responding to health check RPCs.
* 健康监测RPCs服务没有响应
*/
SERVICE_NOT_RESPONDING,
/**
* The service is connected and healthy.
* 服务已连接且健康
*/
SERVICE_HEALTHY,
/**
* The service is running but unhealthy.
* 服务正在运行但是不健康
*/
SERVICE_UNHEALTHY,
/**
* The health monitor itself failed unrecoverably and can
* no longer provide accurate information.
* 健康监视器自己发生不可恢复故障且不能再提供准确信息
*/
HEALTH_MONITOR_FAILED;
}
分别对应了HealthMonitor监视服务健康状况过程中的一些状态,如下:
1)INITIALIZING:健康监视器正在启动;
2)SERVICE_NOT_RESPONDING:健康监测RPCs服务没有响应,有可能是连接、rpc通讯问题等等;
3)SERVICE_HEALTHY:服务已连接且健康,这是一种比较理想的结果;
4)SERVICE_UNHEALTHY:服务正在运行但是不健康,这个结果也不赖,至少知道了服务不健康,也属于正常的检测结果;
5)HEALTH_MONITOR_FAILED:健康监视器自己发生不可恢复故障且不能再提供准确信息,糟糕透顶,监视线程本身出故障了。
2.2、监视器回调接口
HealthMonitor定义了两个接口,Callback和ServiceStateCallback,分别是在状态发生变更和服务状态发生变更时的,需要周知所有监听器的回调方法,如下:
/**
* Callback interface for state change events.
* 状态变更事件的回调接口,需要实现enteredState()方法,即进入一种新的状态
*
* This interface is called from a single thread which also performs
* the health monitoring. If the callback processing takes a long time,
* no further health checks will be made during this period, nor will
* other registered callbacks be called.
* 这个接口被一个执行健康监测的特定线程调用。如果回调处理需要很长时间,在此期间没有进一步的健康检查进行,其他注册回调函数也不会被调用。
*
* If the callback itself throws an unchecked exception, no other
* callbacks following it will be called, and the health monitor
* will terminate, entering HEALTH_MONITOR_FAILED state.
*
*
*/
static interface Callback {
void enteredState(State newState);
}
/**
* Callback interface for service states.
* 服务状态的回调接口
*/
static interface ServiceStateCallback {
void reportServiceStatus(HAServiceStatus status);
}
至于上述状态和接口如何调用,在下面会详解。
HealthMonitor的工作流程,主要看内部工作线程的执行,如下:
/**
* 监视器后台线程
*
*/
private class MonitorDaemon extends Daemon {
/**
* 私有内部类的私有构造函数
*/
private MonitorDaemon() {
super();
// 设置线程名称
setName("Health Monitor for " + targetToMonitor);
// 设置异常处理:调用enterState()方法,并传入健康监控器自己失败的状态
setUncaughtExceptionHandler(new UncaughtExceptionHandler() {
@Override
public void uncaughtException(Thread t, Throwable e) {
LOG.fatal("Health monitor failed", e);
enterState(HealthMonitor.State.HEALTH_MONITOR_FAILED);
}
});
}
/**
* 线程核心run()方法
*/
@Override
public void run() {
while (shouldRun) {// 在一个while循环内,循环的条件是shouldRun标志位
try {
loopUntilConnected();// 循环直到连接上
doHealthChecks();// 执行健康检查
} catch (InterruptedException ie) {
Preconditions.checkState(!shouldRun,
"Interrupted but still supposed to run");
}
}
}
}
首先,监视后台线程的核心run()方法,在一个while循环内,循环的条件是shouldRun标志位:
1)循环直到连接上:不停的连接,直到连接上;
2)执行健康检查。
如此简单的两步,下面我们需要看下以下问题:
1)如何连接?
2)如何执行健康检查,得到检查结果后如何处理?
1、如何连接?
分析loopUntilConnected()方法如下:
/**
* 循环直到连接上
*/
private void loopUntilConnected() throws InterruptedException {
// 尝试连接
tryConnect();
// tryConnect()成功的话,proxy应该不为null
// proxy为null,说明还需要重试,在一个while循环内进行,循环的条件就是proxy为null
while (proxy == null) {
// 线程休眠一段时间
Thread.sleep(connectRetryInterval);
// 再次尝试连接
tryConnect();
}
// 最后的assert需要确保proxy不为null
assert proxy != null;
}
private void tryConnect() {
// 仅当代理proxy为空时才尝试连接
Preconditions.checkState(proxy == null);
try {
synchronized (this) {// 同步代码块内创建代理proxy
proxy = createProxy();
}
} catch (IOException e) {
// 代理proxy创建发生异常时,proxy设置为null,方便下次重试,并且调用enterState()方法,确定状态为健康监测RPCs服务没有响应
LOG.warn("Could not connect to local service at " + targetToMonitor +
": " + e.getMessage());
proxy = null;
enterState(State.SERVICE_NOT_RESPONDING);
}
}
/**
* Connect to the service to be monitored. Stubbed out for easier testing.
*/
protected HAServiceProtocol createProxy() throws IOException {
// 通过目标到监视器的对象targetToMonitor获取代理
return targetToMonitor.getProxy(conf, rpcTimeout);
}
很简单,通过targetToMonitor获得代理,获取成功的话,直接返回,失败的话线程休眠一段时间,继续重试。
2、如何执行健康检查,得到检查结果后如何处理?
分析doHealthChecks()方法如下:
在一个while循环内,循环的依据同样是shouldRun标志位为true,且线程会周期性休眠:
1)HA服务状态status设置为null;
2)标志位healthy默认为false,即不健康;
3)获取服务代理的服务状态,并由代理执行健康检查;
4)1、如果健康检查能够正确返回,标志位healthy设置为true,表明服务健康;
2、抛出了异常:
2.1、如果是健康检查失败异常,调用enterState()方法,确定状态为服务正在运行但是不健康;
2.1、否则调用enterState()方法,确定状态为健康监测RPCs服务没有响应,停止并清空代理,线程休眠一段时间,避免异常情况下没有必要的重复尝试;
5)设置上次服务状态;
6)如果检测结果为健康,则调用enterState()方法,确定状态为服务已连接且健康;
7)工作线程周期性休眠。
可以看到,健康状况的检查是通过代理完成的,且,如果有检查结果(无论是正常还是不正常,这里的正常是指返回了明确的结果,不正常只通讯、连接或者线程本身出现问题),会通过enterState()方法通知监听器s,如下:
private synchronized void enterState(State newState) {
// 如果状态变更,调用所有监听器的enteredState()方法
if (newState != state) {
LOG.info("Entering state " + newState);
state = newState;
synchronized (callbacks) {
for (Callback cb : callbacks) {
cb.enteredState(newState);
}
}
}
}
而整体HA服务状态也会通过setLastServiceStatus()方法,设置lastServiceState成员变量,并通知服务状态监听器,如下:
// 设置上次服务状态,同步所有ServiceStateCallback服务状态监听器的服务状态
private synchronized void setLastServiceStatus(HAServiceStatus status) {
this.lastServiceState = status;
for (ServiceStateCallback cb : serviceStateCallbacks) {
cb.reportServiceStatus(lastServiceState);
}
}
这也就是上述状态和接口存在的意义。而监听器的注册和注销则是通过如下实现的:
/**
* 以下四个为增减状态、服务状态监视器方法
*/
public void addCallback(Callback cb) {
this.callbacks.add(cb);
}
public void removeCallback(Callback cb) {
callbacks.remove(cb);
}
public synchronized void addServiceStateCallback(ServiceStateCallback cb) {
this.serviceStateCallbacks.add(cb);
}
public synchronized void removeServiceStateCallback(ServiceStateCallback cb) {
serviceStateCallbacks.remove(cb);
}
HealthMonitor的构造及启动是在ZKFailoverController的initHM()方法内完成的,它是整个Hadoop HDFS HA中的一个控制组件,初始化及启动如下:
// ------------------------------------------
// Begin actual guts of failover controller
// ------------------------------------------
private void initHM() {
healthMonitor = new HealthMonitor(conf, localTarget);
healthMonitor.addCallback(new HealthCallbacks());
healthMonitor.addServiceStateCallback(new ServiceStateCallBacks());
healthMonitor.start();
}
构造一个HealthMonitor实例,传入的代理为localTarget,并注册了一个state change的监听器HealthCallbacks和service state change的监听器ServiceStateCallBacks,然后调用start()方法启动。
至于HealthMonitor如何与其它组件一起工作的,如何获得的代理,代理怎么执行监控检查,监听器如何处理健康检查的结果,请关注后续文章。