Flink 1.13 Source Code Analysis: JobManager Startup Flow, Dispatcher Startup

宰父桐
2023-12-01

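Related chapter: Flink 1.13 Source Code Analysis: JobManager Startup Flow, Overview

Related chapter: Flink 1.13 Source Code Analysis: JobManager Startup Flow, WebMonitorEndpoint Startup

Related chapter: Flink 1.13 Source Code Analysis: JobManager Startup Flow, ResourceManager Startup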

Contents

1. Preface

2. DispatcherRunner Startup Flow

2.1 DispatcherRunner Leader Election

2.2 Preparing to Build the Dispatcher

2.2.1 Starting the JobGraphStore

2.2.2 Finding Interrupted Jobs

2.2.3 Building and Starting the Dispatcher

3. Summary


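1. Preface

In the previous chapters we analyzed how Flink's master node (the logical JobManager) starts up: the creation of the eight basic services, the creation of the core component factory classes, and how those factories build and start the WebMonitorEndpoint and the ResourceManager. This chapter covers the last part, the startup of the Dispatcher. Before that, let's review a few important JobManager concepts and what the Dispatcher component does.

Flink's master node, the JobManager, is only a logical master: its implementing class differs depending on the deployment mode.

The (logical) JobManager has three core components: the ResourceManager, the Dispatcher, and the WebMonitorEndpoint: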

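ResourceManager:

The resource manager of the Flink cluster. There is only one, and it is responsible for managing and allocating slots.

Dispatcher:

1. Receives the JobGraph submitted by the user and then starts a JobMaster, which is similar to the AppMaster in YARN or the Driver in Spark.

2. Holds a persistent service, the JobGraphStore, which stores JobGraphs. If the master node crashes while the execution graph or physical execution graph is being built and then recovers, the job's JobGraph can be fetched again from here.

WebMonitorEndpoint:

The REST service. It runs a Netty server internally, and all client requests are received and handled by this component.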

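An example illustrates what the three components do:

When a client submits a job to the cluster (the client first builds the job into a JobGraph), the master node receives the REST request for the submitted job. The WebMonitorEndpoint resolves it through its Router to find the matching Handler, and once the handler has processed the request it hands the job to the Dispatcher. The Dispatcher launches a JobMaster, which is responsible for deploying and executing the tasks of that job; the JobMaster requests the resources needed to run the tasks from the ResourceManager.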

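2. DispatcherRunner Startup Flow

The Dispatcher is initialized a little differently from the WebMonitorEndpoint and the ResourceManager. After the core factory classes have been built, the Dispatcher instance is not built directly the way those two are. Instead a DispatcherRunner is built, and the Dispatcher instance is built and started inside it. Let's see how this is implemented, starting once more from the dispatcherResourceManagerComponentFactory.create() method: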

/*
TODO The Dispatcher component is created inside this call and started via start()
 */
dispatcherRunner =
        dispatcherRunnerFactory.createDispatcherRunner(
                highAvailabilityServices.getDispatcherLeaderElectionService(),
                fatalErrorHandler,
                new HaServicesJobGraphStoreFactory(highAvailabilityServices),
                ioExecutor,
                rpcService,
                partialDispatcherServices);

As you can see, the Dispatcher is neither built nor started here. Let's step into the createDispatcherRunner method:

    @Override
    public DispatcherRunner createDispatcherRunner(
            LeaderElectionService leaderElectionService,
            FatalErrorHandler fatalErrorHandler,
            JobGraphStoreFactory jobGraphStoreFactory,
            Executor ioExecutor,
            RpcService rpcService,
            PartialDispatcherServices partialDispatcherServices)
            throws Exception {

        final DispatcherLeaderProcessFactory dispatcherLeaderProcessFactory =
                dispatcherLeaderProcessFactoryFactory.createFactory(
                        jobGraphStoreFactory,
                        ioExecutor,
                        rpcService,
                        partialDispatcherServices,
                        fatalErrorHandler);

        // TODO
        return DefaultDispatcherRunner.create(
                leaderElectionService, fatalErrorHandler, dispatcherLeaderProcessFactory);
    }

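Judging by the variable name, a factory for the Dispatcher leader process (DispatcherLeaderProcessFactory) is built here and passed as an argument into the method that builds the DispatcherRunner. Let's step into DefaultDispatcherRunner.create: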

    public static DispatcherRunner create(
            LeaderElectionService leaderElectionService,
            FatalErrorHandler fatalErrorHandler,
            DispatcherLeaderProcessFactory dispatcherLeaderProcessFactory)
            throws Exception {
        final DefaultDispatcherRunner dispatcherRunner =
                new DefaultDispatcherRunner(
                        leaderElectionService, fatalErrorHandler, dispatcherLeaderProcessFactory);
        // TODO step into this method
        return DispatcherRunnerLeaderElectionLifecycleManager.createFor(
                dispatcherRunner, leaderElectionService);
    }

After the DispatcherRunner has been built, the instance is handed to the lifecycle manager for the DispatcherRunner's leader election. Let's continue with DispatcherRunnerLeaderElectionLifecycleManager.createFor:

    public static <T extends DispatcherRunner & LeaderContender> DispatcherRunner createFor(
            T dispatcherRunner, LeaderElectionService leaderElectionService) throws Exception {
        // TODO look at the constructor
        return new DispatcherRunnerLeaderElectionLifecycleManager<>(
                dispatcherRunner, leaderElectionService);
    }

Next, the constructor of DispatcherRunnerLeaderElectionLifecycleManager:

    private DispatcherRunnerLeaderElectionLifecycleManager(
            T dispatcherRunner, LeaderElectionService leaderElectionService) throws Exception {
        this.dispatcherRunner = dispatcherRunner;
        this.leaderElectionService = leaderElectionService;

        // TODO start the election; the contender is dispatcherRunner
        leaderElectionService.start(dispatcherRunner);
    }
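
The idea in this constructor, a wrapper that ties a contender's lifetime to its leader election, can be looked at in isolation. The following is a minimal, self-contained sketch and not Flink's code: `ToyContender`, `ToyElectionService`, `RecordingContender`, and `LifecycleManagerSketch` are hypothetical names, and the toy service assumes a single candidate that wins immediately.

```java
import java.util.UUID;

public class LifecycleManagerSketch implements AutoCloseable {

    /** Hypothetical stand-in for the role Flink's LeaderContender plays. */
    interface ToyContender {
        void grantLeadership(UUID sessionId);

        void revokeLeadership();
    }

    /** Hypothetical election service: one candidate, which always wins at once. */
    static class ToyElectionService {
        private ToyContender contender;

        void start(ToyContender contender) {
            this.contender = contender;
            contender.grantLeadership(UUID.randomUUID());
        }

        void stop() {
            if (contender != null) {
                contender.revokeLeadership();
                contender = null;
            }
        }
    }

    /** Contender that only records whether it currently holds leadership. */
    static class RecordingContender implements ToyContender {
        boolean leader;

        @Override
        public void grantLeadership(UUID sessionId) {
            leader = true;
        }

        @Override
        public void revokeLeadership() {
            leader = false;
        }
    }

    private final ToyElectionService electionService;

    // Constructing the manager is what enters the contender into the election.
    LifecycleManagerSketch(ToyContender contender, ToyElectionService electionService) {
        this.electionService = electionService;
        electionService.start(contender);
    }

    // Closing the manager leaves the election, which ends the leadership.
    @Override
    public void close() {
        electionService.stop();
    }

    public static void main(String[] args) {
        RecordingContender contender = new RecordingContender();
        try (LifecycleManagerSketch manager =
                new LifecycleManagerSketch(contender, new ToyElectionService())) {
            System.out.println("leader while open: " + contender.leader);
        }
        System.out.println("leader after close: " + contender.leader);
    }
}
```

The point of the design is that callers never start or stop the election themselves; holding the manager is equivalent to taking part in the election.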

Here is a familiar method again: the leader election starts!

2.1 DispatcherRunner Leader Election

Let's step into the start method and pick the DefaultLeaderElectionService implementation:

    @Override
    public final void start(LeaderContender contender) throws Exception {
        checkNotNull(contender, "Contender must not be null.");
        Preconditions.checkState(leaderContender == null, "Contender was already set.");

        synchronized (lock) {
            /*
             TODO When called for the WebMonitorEndpoint, the contender is the DispatcherRestEndpoint
              When called for the ResourceManager, the contender is the ResourceManager
              When called for the DispatcherRunner, the contender is the DispatcherRunner
             */
            leaderContender = contender;

            // TODO the election driver (leaderElectionDriver) is created here
            leaderElectionDriver =
                    leaderElectionDriverFactory.createLeaderElectionDriver(
                            this,
                            new LeaderElectionFatalErrorHandler(),
                            leaderContender.getDescription());
            LOG.info("Starting DefaultLeaderElectionService with {}.", leaderElectionDriver);

            running = true;
        }
    }

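Another familiar method. In the previous two chapters, the leader elections of the ResourceManager and the WebMonitorEndpoint both went through this method. Here it is the DispatcherRunner's election, so the contender is the DispatcherRunner. Let's follow the election by stepping into leaderElectionDriverFactory.createLeaderElectionDriver. Because this analysis is based on standalone mode, where leader election relies on ZooKeeper, we pick the ZooKeeperLeaderElectionDriverFactory implementation: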

    @Override
    public ZooKeeperLeaderElectionDriver createLeaderElectionDriver(
            LeaderElectionEventHandler leaderEventHandler,
            FatalErrorHandler fatalErrorHandler,
            String leaderContenderDescription)
            throws Exception {
        return new ZooKeeperLeaderElectionDriver(
                client,
                latchPath,
                leaderPath,
                leaderEventHandler,
                fatalErrorHandler,
                leaderContenderDescription);
    }

Then into the constructor of ZooKeeperLeaderElectionDriver:

    public ZooKeeperLeaderElectionDriver(
            CuratorFramework client,
            String latchPath,
            String leaderPath,
            LeaderElectionEventHandler leaderElectionEventHandler,
            FatalErrorHandler fatalErrorHandler,
            String leaderContenderDescription)
            throws Exception {
        this.client = checkNotNull(client);
        this.leaderPath = checkNotNull(leaderPath);
        this.leaderElectionEventHandler = checkNotNull(leaderElectionEventHandler);
        this.fatalErrorHandler = checkNotNull(fatalErrorHandler);
        this.leaderContenderDescription = checkNotNull(leaderContenderDescription);

        leaderLatch = new LeaderLatch(client, checkNotNull(latchPath));
        cache = new NodeCache(client, leaderPath);

        client.getUnhandledErrorListenable().addListener(this);

        running = true;

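        // TODO start the election
        leaderLatch.addListener(this);
        leaderLatch.start();

        /*
        TODO Shortly after the election starts, a callback arrives:
         1. If the election is won, this class's isLeader method is called
         2. If the election is lost, this class's notLeader method is called
         Each contender has its own election driver
         */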

        cache.getListenable().addListener(this);
        cache.start();

        client.getConnectionStateListenable().addListener(listener);
    }

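Familiar ground again. As analyzed in the previous two chapters, once the election completes, either isLeader or notLeader is called back depending on the result. We go straight to isLeader: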

    /*
    Election won
     */
    @Override
    public void isLeader() {
        leaderElectionEventHandler.onGrantLeadership();
    }

Stepping in further:

    @Override
    @GuardedBy("lock")
    public void onGrantLeadership() {
        synchronized (lock) {
            if (running) {
                issuedLeaderSessionID = UUID.randomUUID();
                clearConfirmedLeaderInformation();

                if (LOG.isDebugEnabled()) {
                    LOG.debug(
                            "Grant leadership to contender {} with session ID {}.",
                            leaderContender.getDescription(),
                            issuedLeaderSessionID);
                }

                /*
                TODO There are 4 kinds of contender, i.e. 4 cases for LeaderContender
                 1.Dispatcher = DefaultDispatcherRunner
                 2.JobMaster = JobManagerRunnerImpl
                 3.ResourceManager = ResourceManager
                 4.WebMonitorEndpoint = WebMonitorEndpoint
                 */
                leaderContender.grantLeadership(issuedLeaderSessionID);
            } else {
                if (LOG.isDebugEnabled()) {
                    LOG.debug(
                            "Ignoring the grant leadership notification since the {} has "
                                    + "already been closed.",
                            leaderElectionDriver);
                }
            }
        }
    }
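
The callback chain just shown (the driver wins, the election service issues a fresh session ID, the contender is granted leadership) can be sketched without ZooKeeper. This is only an illustration under stated assumptions: `Contender` and `ElectionService` are hypothetical stand-ins for the roles of LeaderContender and DefaultLeaderElectionService, and the "driver" callback is triggered by hand.

```java
import java.util.UUID;

public class ElectionCallbackSketch {

    /** Hypothetical contender with the single callback this chapter follows. */
    interface Contender {
        void grantLeadership(UUID leaderSessionId);
    }

    /** Hypothetical election service; the real one also confirms and publishes the leader. */
    static class ElectionService {
        private final Object lock = new Object();
        private Contender contender;
        private boolean running;
        UUID issuedLeaderSessionId;

        void start(Contender contender) {
            synchronized (lock) {
                this.contender = contender;
                running = true;
            }
        }

        void stop() {
            synchronized (lock) {
                running = false;
            }
        }

        /** Invoked by the driver when the election is won. */
        void onGrantLeadership() {
            synchronized (lock) {
                if (running) {
                    // A fresh ID per leadership term lets stale leaders be told apart.
                    issuedLeaderSessionId = UUID.randomUUID();
                    contender.grantLeadership(issuedLeaderSessionId);
                }
                // A grant that arrives after stop() is ignored.
            }
        }
    }

    public static void main(String[] args) {
        ElectionService service = new ElectionService();
        UUID[] seen = new UUID[1];
        service.start(id -> seen[0] = id);
        service.onGrantLeadership();
        System.out.println(
                "contender received the issued ID: "
                        + seen[0].equals(service.issuedLeaderSessionId));
    }
}
```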

Next we step into leaderContender.grantLeadership. Since this is the DispatcherRunner's election, we pick the DefaultDispatcherRunner implementation:

    // ---------------------------------------------------------------
    // Leader election
    // ---------------------------------------------------------------

    @Override
    public void grantLeadership(UUID leaderSessionID) {
        runActionIfRunning(
                () -> {
                    LOG.info(
                            "{} was granted leadership with leader id {}. Creating new {}.",
                            getClass().getSimpleName(),
                            leaderSessionID,
                            DispatcherLeaderProcess.class.getSimpleName());
                    // TODO
                    startNewDispatcherLeaderProcess(leaderSessionID);
                });
    }

The method name makes it easy to guess that the next step is to start a new DispatcherLeaderProcess. Let's step into startNewDispatcherLeaderProcess:

    private void startNewDispatcherLeaderProcess(UUID leaderSessionID) {
        // TODO stop the current DispatcherLeaderProcess first, if there is one
        stopDispatcherLeaderProcess();

        // TODO then create a new one
        dispatcherLeaderProcess = createNewDispatcherLeaderProcess(leaderSessionID);

        final DispatcherLeaderProcess newDispatcherLeaderProcess = dispatcherLeaderProcess;
        FutureUtils.assertNoException(
                previousDispatcherLeaderProcessTerminationFuture.thenRun(
                        // TODO start it
                        newDispatcherLeaderProcess::start));
    }

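This method does three things:

1. It stops any DispatcherLeaderProcess that is currently running, so that only one, the newest, exists in the environment.

2. It creates a new DispatcherLeaderProcess.

3. It starts the new DispatcherLeaderProcess once the previous one has terminated (the start is chained onto previousDispatcherLeaderProcessTerminationFuture).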
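
The ordering guarantee in these three steps, where the new process is created immediately but started only after the old one has finished shutting down, can be sketched with plain CompletableFuture chaining. This is a simplified illustration, not Flink's implementation: `ToyProcess`, `startNewProcess`, and the event strings are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class LeaderProcessSwapSketch {

    /** Hypothetical process whose shutdown completes asynchronously. */
    static class ToyProcess {
        final String name;
        final CompletableFuture<Void> terminationFuture = new CompletableFuture<>();

        ToyProcess(String name) {
            this.name = name;
        }
    }

    final List<String> events = new ArrayList<>();
    private ToyProcess current;
    private CompletableFuture<Void> previousTermination =
            CompletableFuture.completedFuture(null);

    void startNewProcess(String name) {
        // 1. Ask the old process (if any) to stop and remember when it will be done.
        if (current != null) {
            ToyProcess old = current;
            events.add("stop requested: " + old.name);
            previousTermination =
                    previousTermination.thenCompose(ignored -> old.terminationFuture);
        }
        // 2. Create the new process right away.
        ToyProcess created = new ToyProcess(name);
        current = created;
        // 3. Start it only after everything before it has terminated.
        previousTermination.thenRun(() -> events.add("started: " + created.name));
    }

    ToyProcess currentProcess() {
        return current;
    }

    public static void main(String[] args) {
        LeaderProcessSwapSketch runner = new LeaderProcessSwapSketch();
        runner.startNewProcess("A"); // nothing to wait for, so A starts at once
        ToyProcess a = runner.currentProcess();
        runner.startNewProcess("B"); // B exists now but has to wait for A
        System.out.println(runner.events);
        a.terminationFuture.complete(null); // A is done, which releases B
        System.out.println(runner.events);
    }
}
```

Chaining on the termination future rather than blocking keeps the caller's thread free while still preventing two leader processes from running side by side.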

2.2 Preparing to Build the Dispatcher

Let's look at the start method of newDispatcherLeaderProcess:

    @Override
    public final void start() {
        // TODO
        runIfStateIs(State.CREATED, this::startInternal);
    }

    private void startInternal() {
        log.info("Start {}.", getClass().getSimpleName());
        state = State.RUNNING;
        // TODO
        onStart();
    }
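
The runIfStateIs guard used in start() is a small state-machine idiom: an action runs only if the object is currently in the expected state. A minimal sketch of the idiom follows; it assumes a lock-protected enum field, and `StateGuardSketch` and `startCount` are hypothetical names introduced for the illustration.

```java
public class StateGuardSketch {

    enum State {
        CREATED,
        RUNNING,
        STOPPED
    }

    private final Object lock = new Object();
    private State state = State.CREATED;
    int startCount;

    /** Runs the action only if the current state equals the expected one. */
    private void runIfStateIs(State expected, Runnable action) {
        synchronized (lock) {
            if (state == expected) {
                action.run();
            }
        }
    }

    void start() {
        runIfStateIs(
                State.CREATED,
                () -> {
                    state = State.RUNNING; // leaving CREATED turns a second start() into a no-op
                    startCount++;
                });
    }

    void close() {
        synchronized (lock) {
            state = State.STOPPED;
        }
    }

    public static void main(String[] args) {
        StateGuardSketch process = new StateGuardSketch();
        process.start();
        process.start(); // ignored, the state is already RUNNING
        System.out.println("start actions executed: " + process.startCount);
    }
}
```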

Next, the onStart method called from startInternal. We pick the SessionDispatcherLeaderProcess implementation:

    @Override
    protected void onStart() {
        // TODO start the services the Dispatcher needs, i.e. the JobGraphStore
        startServices();

        // TODO asynchronous: if the JobGraphStore holds unfinished jobs after it starts,
        // TODO recoverJobsAsync first recovers their JobGraphs,
        // TODO then createDispatcherIfRunning starts the Dispatcher
        onGoingRecoveryOperation =
                recoverJobsAsync()
                        // TODO build and start the Dispatcher
                        .thenAccept(this::createDispatcherIfRunning)
                        .handle(this::onErrorIfRunning);
    }

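This method does three things:

1. It starts the basic services the Dispatcher needs, namely the JobGraphStore.

2. It recovers jobs that did not finish earlier because of an abnormal interruption.

3. It builds and starts the Dispatcher.

Let's go through each part in detail.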
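
Before going into the parts, the shape of the asynchronous pipeline in onStart (recover, then create, then handle errors) can be sketched on its own. This is an illustration under assumptions: job graphs are plain strings, `thenApply` stands in for `thenAccept` so the outcome can be returned, and `run` and `rootMessage` are hypothetical helpers.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;

public class RecoverThenCreateSketch {

    /** Unwraps the CompletionException that CompletableFuture puts around failures. */
    private static String rootMessage(Throwable error) {
        Throwable cause = error.getCause() != null ? error.getCause() : error;
        return cause.getMessage();
    }

    /** Runs recover -> create -> error handling and returns a description of the outcome. */
    static String run(Supplier<Collection<String>> recoverJobs) {
        ExecutorService ioExecutor = Executors.newSingleThreadExecutor();
        try {
            return CompletableFuture.supplyAsync(recoverJobs, ioExecutor) // recovery runs off the caller's thread
                    .thenApply(
                            jobs -> "dispatcher created with " + jobs.size() + " recovered job(s)")
                    .handle(
                            (result, error) ->
                                    error == null
                                            ? result
                                            : "fatal error: " + rootMessage(error))
                    .join();
        } finally {
            ioExecutor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(run(() -> Arrays.asList("job-1", "job-2")));
        System.out.println(
                run(
                        () -> {
                            throw new IllegalStateException("store unavailable");
                        }));
    }
}
```

If recovery fails, the creation stage is skipped and only the final handler sees the error, which matches the role onErrorIfRunning plays at the end of the chain.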

2.2.1 Starting the JobGraphStore

First, the startup of the JobGraphStore. Step into startServices:

    private void startServices() {
        try {
            // TODO start the JobGraphStore
            jobGraphStore.start(this);
        } catch (Exception e) {
            throw new FlinkRuntimeException(
                    String.format(
                            "Could not start %s when trying to start the %s.",
                            jobGraphStore.getClass().getSimpleName(), getClass().getSimpleName()),
                    e);
        }
    }

Step into start and pick the DefaultJobGraphStore implementation:

    @Override
    public void start(JobGraphListener jobGraphListener) throws Exception {
        synchronized (lock) {
            if (!running) {
                // TODO start listening
                // TODO with this listener, onAddedJobGraph is called back when a JobGraph is added
                // TODO and onRemovedJobGraph is called back when a JobGraph is removed
                this.jobGraphListener = checkNotNull(jobGraphListener);
                jobGraphStoreWatcher.start(this);
                running = true;
            }
        }
    }

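As you can see, a JobGraph watcher is started here. When a JobGraph is added, onAddedJobGraph is triggered; when a JobGraph is removed, onRemovedJobGraph is called back. The details are covered later in the analysis of job submission. Now let's return to the onStart method from before.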
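
The listener registration described here is a plain observer pattern. The sketch below shows only that pattern: `Listener`, `ToyStore`, `RecordingListener`, and the `externalAdd`/`externalRemove` triggers are hypothetical, job IDs are plain strings, and changes are triggered by hand instead of by a watcher.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

public class JobGraphListenerSketch {

    /** Hypothetical listener with the two callbacks named in the text. */
    interface Listener {
        void onAddedJobGraph(String jobId);

        void onRemovedJobGraph(String jobId);
    }

    /** Hypothetical store that forwards changes only after start(listener). */
    static class ToyStore {
        private final Object lock = new Object();
        private Listener listener;
        private boolean running;

        void start(Listener listener) {
            synchronized (lock) {
                if (!running) {
                    this.listener = Objects.requireNonNull(listener);
                    running = true;
                }
            }
        }

        // Stand-ins for notifications that would come from a watcher.
        void externalAdd(String jobId) {
            synchronized (lock) {
                if (running) {
                    listener.onAddedJobGraph(jobId);
                }
            }
        }

        void externalRemove(String jobId) {
            synchronized (lock) {
                if (running) {
                    listener.onRemovedJobGraph(jobId);
                }
            }
        }
    }

    /** Listener that records every callback it receives. */
    static class RecordingListener implements Listener {
        final List<String> events = new ArrayList<>();

        @Override
        public void onAddedJobGraph(String jobId) {
            events.add("added " + jobId);
        }

        @Override
        public void onRemovedJobGraph(String jobId) {
            events.add("removed " + jobId);
        }
    }

    public static void main(String[] args) {
        ToyStore store = new ToyStore();
        RecordingListener listener = new RecordingListener();
        store.externalAdd("job-0"); // dropped: the store has not been started yet
        store.start(listener);
        store.externalAdd("job-1");
        store.externalRemove("job-1");
        System.out.println(listener.events);
    }
}
```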

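2.2.2 Finding Interrupted Jobs

If the JobGraphStore holds unfinished jobs after it starts, the recovery triggered by recoverJobsAsync() iterates over them in recoverJobs and collects their JobGraphs: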

    private Collection<JobGraph> recoverJobs() {
        log.info("Recover all persisted job graphs.");
        final Collection<JobID> jobIds = getJobIds();
        final Collection<JobGraph> recoveredJobGraphs = new ArrayList<>();

        for (JobID jobId : jobIds) {
            recoveredJobGraphs.add(recoverJob(jobId));
        }

        log.info("Successfully recovered {} persisted job graphs.", recoveredJobGraphs.size());

        return recoveredJobGraphs;
    }

2.2.3 Building and Starting the Dispatcher

Once the interrupted jobs have been recovered, the Dispatcher instance is actually built and started. Let's look at createDispatcherIfRunning:

    private void createDispatcherIfRunning(Collection<JobGraph> jobGraphs) {
        runIfStateIs(State.RUNNING, () -> createDispatcher(jobGraphs));
    }

Then into createDispatcher:

    private void createDispatcher(Collection<JobGraph> jobGraphs) {

        final DispatcherGatewayService dispatcherService =
                // TODO build and start the Dispatcher
                dispatcherGatewayServiceFactory.create(
                        DispatcherId.fromUuid(getLeaderSessionId()), jobGraphs, jobGraphStore);

        completeDispatcherSetup(dispatcherService);
    }

The Dispatcher is now being built. Step into create and pick the DefaultDispatcherGatewayServiceFactory implementation:

    @Override
    public AbstractDispatcherLeaderProcess.DispatcherGatewayService create(
            DispatcherId fencingToken,
            Collection<JobGraph> recoveredJobs,
            JobGraphWriter jobGraphWriter) {

        final Dispatcher dispatcher;
        try {
            // TODO build the Dispatcher
            dispatcher =
                    dispatcherFactory.createDispatcher(
                            rpcService,
                            fencingToken,
                            recoveredJobs,
                            (dispatcherGateway, scheduledExecutor, errorHandler) ->
                                    new NoOpDispatcherBootstrap(),
                            PartialDispatcherServicesWithJobGraphStore.from(
                                    partialDispatcherServices, jobGraphWriter));
        } catch (Exception e) {
            throw new FlinkRuntimeException("Could not create the Dispatcher rpc endpoint.", e);
        }

        // TODO start the Dispatcher
        dispatcher.start();

        return DefaultDispatcherGatewayService.from(dispatcher);
    }

This is where the Dispatcher instance is actually built and where start is called to start it. First, createDispatcher, with the SessionDispatcherFactory implementation:

    @Override
    public StandaloneDispatcher createDispatcher(
            RpcService rpcService,
            DispatcherId fencingToken,
            Collection<JobGraph> recoveredJobs,
            DispatcherBootstrapFactory dispatcherBootstrapFactory,
            PartialDispatcherServicesWithJobGraphStore partialDispatcherServicesWithJobGraphStore)
            throws Exception {
        // create the default dispatcher
        // TODO extends RpcEndpoint; onStart is called back once the endpoint has been started
        return new StandaloneDispatcher(
                rpcService,
                fencingToken,
                recoveredJobs,
                dispatcherBootstrapFactory,
                DispatcherServices.from(
                        partialDispatcherServicesWithJobGraphStore,
                        JobMasterServiceLeadershipRunnerFactory.INSTANCE));
    }

Now the constructor of StandaloneDispatcher:

public class StandaloneDispatcher extends Dispatcher {
    public StandaloneDispatcher(
            RpcService rpcService,
            DispatcherId fencingToken,
            Collection<JobGraph> recoveredJobs,
            DispatcherBootstrapFactory dispatcherBootstrapFactory,
            DispatcherServices dispatcherServices)
            throws Exception {
        super(
                rpcService,
                fencingToken,
                recoveredJobs,
                dispatcherBootstrapFactory,
                dispatcherServices);
    }
}

Following super takes us into the Dispatcher class. Dispatcher extends RpcEndpoint, so, as covered in the Flink RPC chapter, its onStart method is invoked once the endpoint has been started. We go straight to onStart:

    // ------------------------------------------------------
    // Lifecycle methods
    // ------------------------------------------------------

    @Override
    public void onStart() throws Exception {
        try {
            // TODO start the Dispatcher's basic services
            startDispatcherServices();
        } catch (Throwable t) {
            final DispatcherException exception =
                    new DispatcherException(
                            String.format("Could not start the Dispatcher %s", getAddress()), t);
            onFatalError(exception);
            throw exception;
        }

        // TODO start the jobs that are awaiting recovery
        startRecoveredJobs();
        this.dispatcherBootstrap =
                this.dispatcherBootstrapFactory.create(
                        getSelfGateway(DispatcherGateway.class),
                        this.getRpcService().getScheduledExecutor(),
                        this::onFatalError);
    }

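Three things happen here:

1. The Dispatcher's basic services are started.

2. The interrupted jobs collected earlier are recovered.

3. The DispatcherBootstrap is created through dispatcherBootstrapFactory.

The basic services amount to starting a single metric service, so there is little to see there. Let's look at the recovery of interrupted jobs: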

    private void startRecoveredJobs() {
        for (JobGraph recoveredJob : recoveredJobs) {
            runRecoveredJob(recoveredJob);
        }
        recoveredJobs.clear();
    }

    private void runRecoveredJob(final JobGraph recoveredJob) {
        checkNotNull(recoveredJob);
        try {
            // TODO run the job in RECOVERY mode
            // TODO the internals are analyzed later, together with the job submission flow
            runJob(recoveredJob, ExecutionType.RECOVERY);
        } catch (Throwable throwable) {
            onFatalError(
                    new DispatcherException(
                            String.format(
                                    "Could not start recovered job %s.", recoveredJob.getJobID()),
                            throwable));
        }
    }

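Here the collection of interrupted jobs is iterated and each job is resumed with ExecutionType.RECOVERY; the implementation is analyzed later. Back in onStart, the remaining step is dispatcherBootstrapFactory.create. That factory is the lambda passed in by DefaultDispatcherGatewayServiceFactory, which returns a NoOpDispatcherBootstrap, so we are back at this method:

    @Override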
    public AbstractDispatcherLeaderProcess.DispatcherGatewayService create(
            DispatcherId fencingToken,
            Collection<JobGraph> recoveredJobs,
            JobGraphWriter jobGraphWriter) {

        final Dispatcher dispatcher;
        try {
            // TODO build the Dispatcher
            dispatcher =
                    dispatcherFactory.createDispatcher(
                            rpcService,
                            fencingToken,
                            recoveredJobs,
                            (dispatcherGateway, scheduledExecutor, errorHandler) ->
                                    new NoOpDispatcherBootstrap(),
                            PartialDispatcherServicesWithJobGraphStore.from(
                                    partialDispatcherServices, jobGraphWriter));
        } catch (Exception e) {
            throw new FlinkRuntimeException("Could not create the Dispatcher rpc endpoint.", e);
        }

        // TODO start the Dispatcher
        dispatcher.start();

        return DefaultDispatcherGatewayService.from(dispatcher);
    }

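At this point the Dispatcher instance has been built, and the next step is to start it. In the start method, the endpoint sends itself a START message, which triggers the startup: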

    @Override
    public void start() {
        // send a START message to itself to trigger the startup
        rpcEndpoint.tell(ControlMessages.START, ActorRef.noSender());
    }
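
Why start() is a message rather than a direct call becomes clearer with a toy mailbox. In the sketch below, a single-thread executor stands in for the actor's mailbox; this is an assumption made for illustration only, since Flink's RPC layer in this version is built on Akka, and `ToyEndpoint`, `shutdown`, and the event strings are hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class MailboxStartSketch {

    /** Hypothetical endpoint whose lifecycle callbacks all run on one "main thread". */
    static class ToyEndpoint {
        final List<String> events = Collections.synchronizedList(new ArrayList<>());
        private final ExecutorService mainThread = Executors.newSingleThreadExecutor();

        ToyEndpoint() {
            events.add("constructed"); // construction alone does not run onStart
        }

        /** Enqueues a start message to itself; the mailbox thread then runs onStart. */
        void start() {
            mainThread.execute(this::onStart);
        }

        protected void onStart() {
            events.add("onStart");
        }

        /** Lets queued messages finish, then stops the mailbox thread. */
        void shutdown() {
            mainThread.shutdown();
            try {
                mainThread.awaitTermination(5, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    public static void main(String[] args) {
        ToyEndpoint endpoint = new ToyEndpoint();
        System.out.println("before start: " + endpoint.events);
        endpoint.start();
        endpoint.shutdown();
        System.out.println("after start: " + endpoint.events);
    }
}
```

Routing the start through the mailbox means onStart runs on the same thread as every later message, so the endpoint's state needs no extra locking.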

The Dispatcher service has now been built and started. Let's summarize.

3. Summary

Building the Dispatcher comes down to two things:

1. Starting the JobGraphStore service.

2. Recovering jobs from the JobGraphStore and starting the Dispatcher.

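Before the Dispatcher is built, however, Flink first builds a DispatcherRunner and runs a leader election; only after the election does the DispatcherRunner that became leader build and start the Dispatcher. Two points deserve attention:

1. The DispatcherRunner's election calls back the isLeader method.

2. The Dispatcher extends RpcEndpoint, so its onStart method is invoked once it has been built and started.

Over the previous three chapters we covered the startup flow of the master node (the logical JobManager) and the construction and startup of the eight basic services, and in the previous two chapters we covered the startup of the WebMonitorEndpoint and ResourceManager components. With the Dispatcher now started as well, the master node has completed all of its startup work. In the next chapter we look at the startup flow of the worker node, the TaskManager!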
