点击查看相关章节:Flink 1.13 源码解析——JobManager启动流程概览
点击查看相关章节:Flink 1.13 源码解析——JobManager启动流程 WebMonitorEndpoint启动
点击查看相关章节:Flink 1.13 源码解析——JobManager启动流程之ResourceManager启动
目录
在之前的章节里,我们分析了Flink主节点(逻辑JobManager)的启动过程,包括了8个基础环境的创建,核心实例工厂类的创建,以及通过工厂类构建并启动WebMonitorEndpoint、ResourceManager的过程,在这一节中我们来看最后的一部分Dispatcher的启动流程,当然在此之前还是先来复习一下JobManager的一些重要概念以及Dispatcher组件的功能是什么。
关于Flink的主节点JobManager,他只是一个逻辑上的主节点,针对不同的部署模式,主节点的实现类也不同。
JobManager(逻辑)有三大核心内容,分别为ResourceManager、Dispatcher和WebmonitorEndpoin:
ResourceManager:
Flink集群的资源管理器,只有一个,关于Slot的管理和申请等工作,都有它负责
Dispatcher:
1、负责接收用户提交的JobGraph,然后启动一个JobMaster,类似于Yarn中的AppMaster和Spark中的Driver。
2、内有一个持久服务:JobGraphStore,负责存储JobGraph。当构建执行图或物理执行图时主节点宕机并恢复,则可以从这里重新拉取作业JobGraph
WebMonitorEndpoint:
Rest服务,内部有一个Netty服务,客户端的所有请求都由该组件接收处理
用一个例子来描述这三个组件的功能:
当Client提交一个Job到集群时(Client会把Job构建成一个JobGraph),主节点接收到提交的job的Rest请求后,WebMonitorEndpoint 会通过Router进行解析找到对应的Handler来执行处理,处理完毕后交由Dispatcher,Dispatcher负责大气JobMaster来负责这个Job内部的Task的部署执行,执行Task所需的资源,JobMaster向ResourceManager申请。
Dispatcher的初始化构成与之前的WebMonitorEndpoint和ResourceManager稍有不同,在构建核心工厂类后,Dispatcher并没有像WebMonitorEndpoint和ResourceManager一样直接构建实例,而是构建了一个DispatcherRunner,并在内部构建了Dispatcher实例并启动。我们来看它是如何实现的,首先还是来到dispatcherResourceManagerComponentFactory.create()方法:
/*
TODO 在该代码的内部会创建Dispatcher组件,并调用start() 方法启动
*/
dispatcherRunner =
dispatcherRunnerFactory.createDispatcherRunner(
highAvailabilityServices.getDispatcherLeaderElectionService(),
fatalErrorHandler,
new HaServicesJobGraphStoreFactory(highAvailabilityServices),
ioExecutor,
rpcService,
partialDispatcherServices);
可以看到,这里并没有构建Dispatcher,也没有启动Dispatcher,我们进入createDispatcherRunner方法
@Override
public DispatcherRunner createDispatcherRunner(
LeaderElectionService leaderElectionService,
FatalErrorHandler fatalErrorHandler,
JobGraphStoreFactory jobGraphStoreFactory,
Executor ioExecutor,
RpcService rpcService,
PartialDispatcherServices partialDispatcherServices)
throws Exception {
final DispatcherLeaderProcessFactory dispatcherLeaderProcessFactory =
dispatcherLeaderProcessFactoryFactory.createFactory(
jobGraphStoreFactory,
ioExecutor,
rpcService,
partialDispatcherServices,
fatalErrorHandler);
// TODO
return DefaultDispatcherRunner.create(
leaderElectionService, fatalErrorHandler, dispatcherLeaderProcessFactory);
}
根据变量名,我们可以看出在这里构建了一个Dispatcher的Leader竞选线程工厂,并将该对象作为参数传入了DispatcherRunner的构建方法里,我们进入DefaultDispatcherRunner.create方法:
public static DispatcherRunner create(
LeaderElectionService leaderElectionService,
FatalErrorHandler fatalErrorHandler,
DispatcherLeaderProcessFactory dispatcherLeaderProcessFactory)
throws Exception {
final DefaultDispatcherRunner dispatcherRunner =
new DefaultDispatcherRunner(
leaderElectionService, fatalErrorHandler, dispatcherLeaderProcessFactory);
// TODO 进入此方法
return DispatcherRunnerLeaderElectionLifecycleManager.createFor(
dispatcherRunner, leaderElectionService);
}
在构建了DispatcherRunner之后,将该实例传入了DispatcherRunner竞选Leader的生命周期管理方法,我们进入DispatcherRunnerLeaderElectionLifecycleManager.createFor方法继续分析
public static <T extends DispatcherRunner & LeaderContender> DispatcherRunner createFor(
T dispatcherRunner, LeaderElectionService leaderElectionService) throws Exception {
// TODO 来看构造方法
return new DispatcherRunnerLeaderElectionLifecycleManager<>(
dispatcherRunner, leaderElectionService);
}
继续进入DispatcherRunnerLeaderElectionLifecycleManager的构造方法:
private DispatcherRunnerLeaderElectionLifecycleManager(
T dispatcherRunner, LeaderElectionService leaderElectionService) throws Exception {
this.dispatcherRunner = dispatcherRunner;
this.leaderElectionService = leaderElectionService;
// TODO 开始竞选,竞选者为 dispatcherRunner
leaderElectionService.start(dispatcherRunner);
}
又看到了我们熟悉的方法,开始Leader竞选!
我们进入start方法,选择DefaultLeaderElectionService实现:
@Override
public final void start(LeaderContender contender) throws Exception {
checkNotNull(contender, "Contender must not be null.");
Preconditions.checkState(leaderContender == null, "Contender was already set.");
synchronized (lock) {
/*
TODO 在WebMonitorEndpoint中调用时,此contender为DispatcherRestEndPoint
在ResourceManager中调用时,contender为ResourceManager
在DispatcherRunner中调用时,contender为DispatcherRunner
*/
leaderContender = contender;
// TODO 此处创建选举对象 leaderElectionDriver
leaderElectionDriver =
leaderElectionDriverFactory.createLeaderElectionDriver(
this,
new LeaderElectionFatalErrorHandler(),
leaderContender.getDescription());
LOG.info("Starting DefaultLeaderElectionService with {}.", leaderElectionDriver);
running = true;
}
}
又是熟悉的方法,在前两章中,ResourceManager、WebMonitorEndpoint组件的Leader竞选都使用的该方法,此处是DispatcherRunner的竞选,所以此处的contender为DispatcherRunner,我们继续看竞选流程,进入leaderElectionDriverFactory.createLeaderElectionDriver方法,由于是基于standalone模式分析源码,Leader的竞选依赖于zookeeper,我们进入ZooKeeperLeaderElectionDriverFactory实现:
@Override
public ZooKeeperLeaderElectionDriver createLeaderElectionDriver(
LeaderElectionEventHandler leaderEventHandler,
FatalErrorHandler fatalErrorHandler,
String leaderContenderDescription)
throws Exception {
return new ZooKeeperLeaderElectionDriver(
client,
latchPath,
leaderPath,
leaderEventHandler,
fatalErrorHandler,
leaderContenderDescription);
}
再进入ZooKeeperLeaderElectionDriver的构造方法:
public ZooKeeperLeaderElectionDriver(
CuratorFramework client,
String latchPath,
String leaderPath,
LeaderElectionEventHandler leaderElectionEventHandler,
FatalErrorHandler fatalErrorHandler,
String leaderContenderDescription)
throws Exception {
this.client = checkNotNull(client);
this.leaderPath = checkNotNull(leaderPath);
this.leaderElectionEventHandler = checkNotNull(leaderElectionEventHandler);
this.fatalErrorHandler = checkNotNull(fatalErrorHandler);
this.leaderContenderDescription = checkNotNull(leaderContenderDescription);
leaderLatch = new LeaderLatch(client, checkNotNull(latchPath));
cache = new NodeCache(client, leaderPath);
client.getUnhandledErrorListenable().addListener(this);
running = true;
// TODO 开始选举
leaderLatch.addListener(this);
leaderLatch.start();
/*
TODO 选举开始后,不就会接收到响应:
1.如果竞选成功,则回调该类的isLeader方法
2.如果竞选失败,则回调该类的notLeader方法
每一个竞选者对应一个竞选Driver
*/
cache.getListenable().addListener(this);
cache.start();
client.getConnectionStateListenable().addListener(listener);
}
又是熟悉的地方,根据前两章的分析,Leader竞选完成后会根据竞选结果回调isLeader方法或notLeader方法,此处我们直接去看isLeader方法:
/*
选举成功
*/
@Override
public void isLeader() {
leaderElectionEventHandler.onGrantLeadership();
}
在点进来:
@Override
@GuardedBy("lock")
public void onGrantLeadership() {
synchronized (lock) {
if (running) {
issuedLeaderSessionID = UUID.randomUUID();
clearConfirmedLeaderInformation();
if (LOG.isDebugEnabled()) {
LOG.debug(
"Grant leadership to contender {} with session ID {}.",
leaderContender.getDescription(),
issuedLeaderSessionID);
}
/*
TODO 有4中竞选者类型,LeaderContender有4中情况
1.Dispatcher = DefaultDispatcherRunner
2.JobMaster = JobManagerRunnerImpl
3.ResourceManager = ResourceManager
4.WebMonitorEndpoint = WebMonitorEndpoint
*/
leaderContender.grantLeadership(issuedLeaderSessionID);
} else {
if (LOG.isDebugEnabled()) {
LOG.debug(
"Ignoring the grant leadership notification since the {} has "
+ "already been closed.",
leaderElectionDriver);
}
}
}
}
再进入leaderContender.grantLeadership方法,由于当前是DispatcherRunner的选举,我们选择DefaultDispatcherRunner实现:
// ---------------------------------------------------------------
// Leader election
// ---------------------------------------------------------------
@Override
public void grantLeadership(UUID leaderSessionID) {
runActionIfRunning(
() -> {
LOG.info(
"{} was granted leadership with leader id {}. Creating new {}.",
getClass().getSimpleName(),
leaderSessionID,
DispatcherLeaderProcess.class.getSimpleName());
// TODO
startNewDispatcherLeaderProcess(leaderSessionID);
});
}
根据方法名不难猜出,接下来是启动一个新的DispatcherLeader,我们进入startNewDispatcherLeaderProcess方法:
private void startNewDispatcherLeaderProcess(UUID leaderSessionID) {
// TODO 如果当前有DispatcherLeader则先关闭
stopDispatcherLeaderProcess();
// TODO 然后再创建
dispatcherLeaderProcess = createNewDispatcherLeaderProcess(leaderSessionID);
final DispatcherLeaderProcess newDispatcherLeaderProcess = dispatcherLeaderProcess;
FutureUtils.assertNoException(
previousDispatcherLeaderProcessTerminationFuture.thenRun(
// TODO 启动
newDispatcherLeaderProcess::start));
}
在该方法里一共做了三件事:
1、先判断当前是否有正在运行的DispatcherLeader,如果有则先关闭,保证当前环境中只有一个且是最新的DispatcherLeader。
2、然后再创建DispatcherLeader
3、启动DispatcherLeader
我们来看newDispatcherLeaderProcess的start方法:
@Override
public final void start() {
// TODO
runIfStateIs(State.CREATED, this::startInternal);
}
private void startInternal() {
log.info("Start {}.", getClass().getSimpleName());
state = State.RUNNING;
// TODO
onStart();
}
再来看startInternal的onStart方法,选择SessionDispatcherLeaderProcess实现:
@Override
protected void onStart() {
// TODO 启动Dispatcher服务,启动JobGraphStore
startServices();
// TODO 异步编程, 若JobGraphStore启动后发现内部有未执行完毕的Job,则先通过recoverJobsAsync恢复JobGraph
// TODO 再用过createDispatcherIfRunning启动Dispatcher
onGoingRecoveryOperation =
recoverJobsAsync()
// TODO 构建Dispatcher并启动
.thenAccept(this::createDispatcherIfRunning)
.handle(this::onErrorIfRunning);
}
在这个方法里一共做了三件事:
1、启动Dispatcher所需的基础服务,启动JobGraphStore
2、恢复之前因为非正常原因没有执行完的Job
3、构建并启动Dispatcher
下面我们来详细聊聊这几个部分
我们先来看JobGraphStore的启动,进入startServices方法:
private void startServices() {
try {
// TODO 启动JobGraphStore
jobGraphStore.start(this);
} catch (Exception e) {
throw new FlinkRuntimeException(
String.format(
"Could not start %s when trying to start the %s.",
jobGraphStore.getClass().getSimpleName(), getClass().getSimpleName()),
e);
}
}
进入start方法,选择DefaultJobGraphStore实现:
@Override
public void start(JobGraphListener jobGraphListener) throws Exception {
synchronized (lock) {
if (!running) {
// TODO 启动监听
// TODO 此处的监听,若有JobGraph添加则会回调 onAddedJobGraph方法
// TODO 若有JobGraph删除则会回调 onRemovedJobGraph 方法
this.jobGraphListener = checkNotNull(jobGraphListener);
jobGraphStoreWatcher.start(this);
running = true;
}
}
}
可以看到此处启动了一个JobGraph的监听服务,当有JobGraph提交进来时会触发onAddedJobGraph方法,当有JobGraph移除时会回调onRemovedJobGraph方法,详细内容我们会在后续的Job提交源码分析力介绍。现在我们回到之前的onStart方法
若JobGraphStore启动后发现内部有未执行完毕的Job,在recoverJobsAsync()方法里会遍历这些Job并加入集合中:
private Collection<JobGraph> recoverJobs() {
log.info("Recover all persisted job graphs.");
final Collection<JobID> jobIds = getJobIds();
final Collection<JobGraph> recoveredJobGraphs = new ArrayList<>();
for (JobID jobId : jobIds) {
recoveredJobGraphs.add(recoverJob(jobId));
}
log.info("Successfully recovered {} persisted job graphs.", recoveredJobGraphs.size());
return recoveredJobGraphs;
}
在完成中断Job的恢复工作后,开始真正的构建Dispatcher实例,并启动,我们来看createDispatcherIfRunning方法:
private void createDispatcherIfRunning(Collection<JobGraph> jobGraphs) {
runIfStateIs(State.RUNNING, () -> createDispatcher(jobGraphs));
}
再进入createDispatcher方法:
private void createDispatcher(Collection<JobGraph> jobGraphs) {
final DispatcherGatewayService dispatcherService =
// TODO 构建Dispatcher并启动
dispatcherGatewayServiceFactory.create(
DispatcherId.fromUuid(getLeaderSessionId()), jobGraphs, jobGraphStore);
completeDispatcherSetup(dispatcherService);
}
可以看到此处已经开始构建Dispatcher了,我们再点入create方法,选择DefaultDispatcherGatewayServiceFactory实现:
@Override
public AbstractDispatcherLeaderProcess.DispatcherGatewayService create(
DispatcherId fencingToken,
Collection<JobGraph> recoveredJobs,
JobGraphWriter jobGraphWriter) {
final Dispatcher dispatcher;
try {
// TODO 构建Dispatcher
dispatcher =
dispatcherFactory.createDispatcher(
rpcService,
fencingToken,
recoveredJobs,
(dispatcherGateway, scheduledExecutor, errorHandler) ->
new NoOpDispatcherBootstrap(),
PartialDispatcherServicesWithJobGraphStore.from(
partialDispatcherServices, jobGraphWriter));
} catch (Exception e) {
throw new FlinkRuntimeException("Could not create the Dispatcher rpc endpoint.", e);
}
// TODO 启动DIspatcher
dispatcher.start();
return DefaultDispatcherGatewayService.from(dispatcher);
}
可以看到在这里真正构建了Dispatcher实例,并调用了start方法启动Dispatcher,我们先来看createDispatcher方法,选择SessionDispatcherFactory实现:
@Override
public StandaloneDispatcher createDispatcher(
RpcService rpcService,
DispatcherId fencingToken,
Collection<JobGraph> recoveredJobs,
DispatcherBootstrapFactory dispatcherBootstrapFactory,
PartialDispatcherServicesWithJobGraphStore partialDispatcherServicesWithJobGraphStore)
throws Exception {
// create the default dispatcher
// TODO 继承了RpcEndpoint,创建完成后会回调onStart方法
return new StandaloneDispatcher(
rpcService,
fencingToken,
recoveredJobs,
dispatcherBootstrapFactory,
DispatcherServices.from(
partialDispatcherServicesWithJobGraphStore,
JobMasterServiceLeadershipRunnerFactory.INSTANCE));
}
我们再来看StandaloneDispatcher的构造方法:
public class StandaloneDispatcher extends Dispatcher {
public StandaloneDispatcher(
RpcService rpcService,
DispatcherId fencingToken,
Collection<JobGraph> recoveredJobs,
DispatcherBootstrapFactory dispatcherBootstrapFactory,
DispatcherServices dispatcherServices)
throws Exception {
super(
rpcService,
fencingToken,
recoveredJobs,
dispatcherBootstrapFactory,
dispatcherServices);
}
}
再进入super,我们来到了Dispatcher类内部,因为Dispatcher继承了RpcEndpoint,根据我们在FlinkRPC章节讲到的内容,此刻我们知道在Dispatcher初始化之后会调用onStart方法,我们直接去看onStart方法:
// ------------------------------------------------------
// Lifecycle methods
// ------------------------------------------------------
@Override
public void onStart() throws Exception {
try {
// TODO 启动Dispatcher基础服务
startDispatcherServices();
} catch (Throwable t) {
final DispatcherException exception =
new DispatcherException(
String.format("Could not start the Dispatcher %s", getAddress()), t);
onFatalError(exception);
throw exception;
}
// TODO 启动待恢复的Job
startRecoveredJobs();
this.dispatcherBootstrap =
this.dispatcherBootstrapFactory.create(
getSelfGateway(DispatcherGateway.class),
this.getRpcService().getScheduledExecutor(),
this::onFatalError);
}
这里做了三件事:
1、启动Dispatcher的基础服务
2、开始恢复之前添加到集合中的中断的Job
3、构建DIspatcher实例
在Dispatcher的基础服务中只启动了一个Metric服务,没什么好看的,我们来看中断Job的恢复:
private void startRecoveredJobs() {
for (JobGraph recoveredJob : recoveredJobs) {
runRecoveredJob(recoveredJob);
}
recoveredJobs.clear();
}
private void runRecoveredJob(final JobGraph recoveredJob) {
checkNotNull(recoveredJob);
try {
// TODO 以Recover模式运行Job
// TODO 内部具体实现等后面分析作业提交流程时再来分析
runJob(recoveredJob, ExecutionType.RECOVERY);
} catch (Throwable throwable) {
onFatalError(
new DispatcherException(
String.format(
"Could not start recovered job %s.", recoveredJob.getJobID()),
throwable));
}
}
我们可以看到,此处会遍历之前的中断Job集合,并对每一个中断Job以RECOVER模式恢复运行,具体的实现我们后面再来分析。我们继续来看Dispatcher的构建,回到之前的方法,我们俩看dispatcherBootstrapFactory.create,选择DefaultDispatcherGatewayServiceFactory,我们又回到了这里:
@Override
public AbstractDispatcherLeaderProcess.DispatcherGatewayService create(
DispatcherId fencingToken,
Collection<JobGraph> recoveredJobs,
JobGraphWriter jobGraphWriter) {
final Dispatcher dispatcher;
try {
// TODO 构建Dispatcher
dispatcher =
dispatcherFactory.createDispatcher(
rpcService,
fencingToken,
recoveredJobs,
(dispatcherGateway, scheduledExecutor, errorHandler) ->
new NoOpDispatcherBootstrap(),
PartialDispatcherServicesWithJobGraphStore.from(
partialDispatcherServices, jobGraphWriter));
} catch (Exception e) {
throw new FlinkRuntimeException("Could not create the Dispatcher rpc endpoint.", e);
}
// TODO 启动DIspatcher
dispatcher.start();
return DefaultDispatcherGatewayService.from(dispatcher);
}
至此,Dispatcher实例已经构建完毕,接下来就是启动Dispatcher,在start方法里,Dispatcher向自己发送了一条消息,告知已启动完毕:
@Override
public void start() {
// 向自己发送消息,告知已启动
rpcEndpoint.tell(ControlMessages.START, ActorRef.noSender());
}
到此为止,Dispatcher服务已构建完毕也已启动完毕,我们总结一下。
Dispatcher的构建其实一共就做了两件事:
1、启动 JobGraphStore 服务
2、从 JobGraphStrore 恢复执行 Job, 要启动 Dispatcher
只不过Dispatcher的构建之前,Flink先构建了一个DispatcherRunner,并进行了Leader选举,选举完成之后才由LeaderDispatcherRunner构建Dispatcher并启动。在这里需要注意两点:
1、DispatcherRunner的选举环节会回调isLeader方法。
2、Dispatcher对象继承了RpcEndpoint,所以在构建完成后会调用onStart方法。
在前三章中,我们介绍了主节点(逻辑JobManager)的启动流程,以及8大基础服务的构建和启动,并且在前两章中我们介绍了WebMonitorEndpoint组件和ResourceManager组价你的启动,到此为止Dispatcher也已启动完毕,主节点也在这里完成了它所有的启动工作。在下一章中,我们来看看从节点TaskManager的启动流程!