Flink Source Code 5: The Relationship Between Dispatcher, JobManager, and JobMaster

夏侯昊明
2023-12-01

Mode

In yarn-session mode, all jobs share a single Dispatcher.
The class org.apache.flink.runtime.dispatcher.Dispatcher is where the JobGraph submitted by the client is received.

Code analysis

	// This is where the Dispatcher receives the JobGraph
	@Override
	public CompletableFuture<Acknowledge> submitJob(JobGraph jobGraph, Time timeout) {
		log.info("Received JobGraph submission {} ({}).", jobGraph.getJobID(), jobGraph.getName());

		try {
			if (isDuplicateJob(jobGraph.getJobID())) {
				return FutureUtils.completedExceptionally(
					new DuplicateJobSubmissionException(jobGraph.getJobID()));
			} else if (isPartialResourceConfigured(jobGraph)) {
				return FutureUtils.completedExceptionally(
					new JobSubmissionException(jobGraph.getJobID(), "Currently jobs is not supported if parts of the vertices have " +
							"resources configured. The limitation will be removed in future versions."));
			} else {
				return internalSubmitJob(jobGraph);
			}
		} catch (FlinkException e) {
			return FutureUtils.completedExceptionally(e);
		}
	}
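
The method reports validation failures through the returned future rather than by throwing to the caller. A minimal sketch of that pattern follows, using only the JDK. The class SubmitSketch, its method, and the string-based job IDs are hypothetical stand-ins for illustration, not Flink classes or Flink's implementation.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical toy model; none of these names are Flink classes.
class SubmitSketch {
    // Job IDs this toy dispatcher has already accepted, mapped to the job name.
    private final Map<String, String> acceptedJobs = new ConcurrentHashMap<>();

    // Mirrors the shape of submitJob: validate first, report failure through the future.
    CompletableFuture<String> submitJob(String jobId, String jobName) {
        if (acceptedJobs.putIfAbsent(jobId, jobName) != null) {
            // Duplicate submission: return an exceptionally completed future.
            CompletableFuture<String> failed = new CompletableFuture<>();
            failed.completeExceptionally(
                new IllegalStateException("Job " + jobId + " was already submitted"));
            return failed;
        }
        // First submission: acknowledge it.
        return CompletableFuture.completedFuture("ACK " + jobId);
    }

    public static void main(String[] args) {
        SubmitSketch dispatcher = new SubmitSketch();
        System.out.println(dispatcher.submitJob("job-1", "wordcount").join());
        System.out.println(
            dispatcher.submitJob("job-1", "wordcount").isCompletedExceptionally());
    }
}
```

The caller always gets a future back, so a duplicate submission and a successful one are handled through the same asynchronous code path.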

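From this method (via internalSubmitJob) the call chain leaves the Dispatcher class and creates the JobManager, which in practice is the class
JobManagerRunnerImpl. The Dispatcher method below delegates the creation to a factory: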

	private CompletableFuture<JobManagerRunner> createJobManagerRunner(JobGraph jobGraph) {
		final RpcService rpcService = getRpcService();

		return CompletableFuture.supplyAsync(
			CheckedSupplier.unchecked(() ->
				jobManagerRunnerFactory.createJobManagerRunner(
					jobGraph,
					configuration,
					rpcService,
					highAvailabilityServices,
					heartbeatServices,
					jobManagerSharedServices,
					new DefaultJobManagerJobMetricGroupFactory(jobManagerMetricGroup),
					fatalErrorHandler)),
			rpcService.getExecutor());
	}
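
The method above runs the creation on an executor and wraps a supplier that can throw checked exceptions. The sketch below shows the general idea with JDK classes only. ThrowingSupplier and unchecked are simplified, hypothetical stand-ins and should not be read as Flink's CheckedSupplier implementation; the runner is represented by a plain string.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;

// Hypothetical sketch; not Flink code.
class AsyncCreateSketch {
    // A supplier that may throw checked exceptions (simplified stand-in).
    interface ThrowingSupplier<T> {
        T get() throws Exception;
    }

    // Adapts a throwing supplier to java.util.function.Supplier by wrapping
    // any exception in an unchecked CompletionException.
    static <T> Supplier<T> unchecked(ThrowingSupplier<T> supplier) {
        return () -> {
            try {
                return supplier.get();
            } catch (Exception e) {
                throw new CompletionException(e);
            }
        };
    }

    // Runs the possibly failing creation step on the given executor.
    static CompletableFuture<String> createRunner(String jobId, ExecutorService executor) {
        Supplier<String> supplier = unchecked(() -> {
            if (jobId.isEmpty()) {
                throw new Exception("empty job id");
            }
            return "runner-for-" + jobId;
        });
        return CompletableFuture.supplyAsync(supplier, executor);
    }

    public static void main(String[] args) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            System.out.println(createRunner("job-1", executor).join());
            System.out.println(createRunner("", executor).isCompletedExceptionally());
        } finally {
            executor.shutdown();
        }
    }
}
```

Running the creation on a separate executor keeps a slow or failing setup step off the calling thread; the failure surfaces only when the returned future is inspected.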

Look at the constructor of org.apache.flink.runtime.jobmaster.JobManagerRunnerImpl,
which creates the JobMaster:

	public JobManagerRunnerImpl(
			final JobGraph jobGraph,
			final JobMasterServiceFactory jobMasterFactory,
			final HighAvailabilityServices haServices,
			final LibraryCacheManager libraryCacheManager,
			final Executor executor,
			final FatalErrorHandler fatalErrorHandler) throws Exception {

		this.resultFuture = new CompletableFuture<>();
		this.terminationFuture = new CompletableFuture<>();
		this.leadershipOperation = CompletableFuture.completedFuture(null);

		// make sure we cleanly shut down out JobManager services if initialization fails
		try {
			this.jobGraph = checkNotNull(jobGraph);
			this.libraryCacheManager = checkNotNull(libraryCacheManager);
			this.executor = checkNotNull(executor);
			this.fatalErrorHandler = checkNotNull(fatalErrorHandler);

			checkArgument(jobGraph.getNumberOfVertices() > 0, "The given job is empty");

			// libraries and class loader first
			try {
				libraryCacheManager.registerJob(
						jobGraph.getJobID(), jobGraph.getUserJarBlobKeys(), jobGraph.getClasspaths());
			} catch (IOException e) {
				throw new Exception("Cannot set up the user code libraries: " + e.getMessage(), e);
			}

			final ClassLoader userCodeLoader = libraryCacheManager.getClassLoader(jobGraph.getJobID());
			if (userCodeLoader == null) {
				throw new Exception("The user code class loader could not be initialized.");
			}

			// high availability services next
			this.runningJobsRegistry = haServices.getRunningJobsRegistry();
			this.leaderElectionService = haServices.getJobManagerLeaderElectionService(jobGraph.getJobID());

			this.leaderGatewayFuture = new CompletableFuture<>();

			// now start the JobManager
			// The JobMaster is created here
			this.jobMasterService = jobMasterFactory.createJobMasterService(jobGraph, this, userCodeLoader);
		}
		catch (Throwable t) {
			terminationFuture.completeExceptionally(t);
			resultFuture.completeExceptionally(t);

			throw new JobExecutionException(jobGraph.getJobID(), "Could not set up JobManager", t);
		}
	}

The code that actually creates the JobMaster is in
org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory

	@Override
	public JobMaster createJobMasterService(
			JobGraph jobGraph,
			OnCompletionActions jobCompletionActions,
			ClassLoader userCodeClassloader) throws Exception {

		return new JobMaster(
			rpcService,
			jobMasterConfiguration,
			ResourceID.generate(),
			jobGraph,
			haServices,
			slotPoolFactory,
			schedulerFactory,
			jobManagerSharedServices,
			heartbeatServices,
			jobManagerJobMetricGroupFactory,
			jobCompletionActions,
			fatalErrorHandler,
			userCodeClassloader,
			schedulerNGFactory,
			shuffleMaster,
			lookup -> new JobMasterPartitionTrackerImpl(
				jobGraph.getJobID(),
				shuffleMaster,
				lookup
			));
	}

Summary

This analysis shows that a yarn-session cluster has only one Dispatcher, while each job gets its own JobManager (JobManagerRunnerImpl) and its own JobMaster.
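
The ownership structure described here can be modeled in a few lines. This is only a toy illustration: ToyDispatcher, ToyJobManagerRunner, and ToyJobMaster are hypothetical names that mirror the roles of the Flink classes discussed above, and they contain none of the real logic (RPC, high availability, leader election).

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical toy model of the relationship; not Flink code.
class RelationshipSketch {
    // Plays the role of JobMaster: one instance per job.
    static class ToyJobMaster {
        final String jobId;

        ToyJobMaster(String jobId) {
            this.jobId = jobId;
        }
    }

    // Plays the role of JobManagerRunnerImpl: its constructor creates
    // exactly one job master and keeps it.
    static class ToyJobManagerRunner {
        final ToyJobMaster jobMaster;

        ToyJobManagerRunner(String jobId) {
            this.jobMaster = new ToyJobMaster(jobId);
        }
    }

    // Plays the role of Dispatcher: a single shared instance that keeps
    // one runner per submitted job.
    static class ToyDispatcher {
        final Map<String, ToyJobManagerRunner> runners = new HashMap<>();

        ToyJobManagerRunner submit(String jobId) {
            if (runners.containsKey(jobId)) {
                throw new IllegalStateException("Duplicate job: " + jobId);
            }
            ToyJobManagerRunner runner = new ToyJobManagerRunner(jobId);
            runners.put(jobId, runner);
            return runner;
        }
    }

    public static void main(String[] args) {
        ToyDispatcher dispatcher = new ToyDispatcher(); // one per session cluster
        dispatcher.submit("job-a");
        dispatcher.submit("job-b");
        System.out.println("runners: " + dispatcher.runners.size());
    }
}
```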
