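In yarn-session mode, all jobs share a single Dispatcher.
The class org.apache.flink.runtime.dispatcher.Dispatcher is where the JobGraph uploaded by the client is received.
// This is where the dispatcher receives the JobGraph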
@Override
public CompletableFuture<Acknowledge> submitJob(JobGraph jobGraph, Time timeout) {
    log.info("Received JobGraph submission {} ({}).", jobGraph.getJobID(), jobGraph.getName());
    try {
        if (isDuplicateJob(jobGraph.getJobID())) {
            return FutureUtils.completedExceptionally(
                new DuplicateJobSubmissionException(jobGraph.getJobID()));
        } else if (isPartialResourceConfigured(jobGraph)) {
            return FutureUtils.completedExceptionally(
                new JobSubmissionException(jobGraph.getJobID(), "Currently jobs is not supported if parts of the vertices have " +
                    "resources configured. The limitation will be removed in future versions."));
        } else {
            return internalSubmitJob(jobGraph);
        }
    } catch (FlinkException e) {
        return FutureUtils.completedExceptionally(e);
    }
}
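The guard-then-delegate shape of submitJob can be sketched with plain JDK classes. This is only an illustration, not Flink code: MiniDispatcher, its knownJobs set, and the String job IDs and acknowledgements are hypothetical stand-ins, and the completedExceptionally helper here is a local method that assumes the Flink utility of the same name simply returns an already-failed future.

```java
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for the Dispatcher; not Flink code.
class MiniDispatcher {
    // IDs of the jobs that have already been accepted.
    private final Set<String> knownJobs = ConcurrentHashMap.newKeySet();

    // Local helper: build a future that is already completed with a failure.
    static <T> CompletableFuture<T> completedExceptionally(Throwable cause) {
        CompletableFuture<T> future = new CompletableFuture<>();
        future.completeExceptionally(cause);
        return future;
    }

    // Validation failures are reported through the returned future,
    // so the caller never sees a thrown exception.
    CompletableFuture<String> submitJob(String jobId) {
        if (!knownJobs.add(jobId)) {
            return completedExceptionally(
                new IllegalStateException("Job " + jobId + " was already submitted"));
        }
        return CompletableFuture.completedFuture("ACK " + jobId);
    }
}
```

Submitting the same ID twice yields a normally completed future the first time and an exceptionally completed one the second time, which mirrors how the duplicate check above answers the client.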
Following the call chain from this method leads out of this class to where the JobManager is created, which is in fact the class
JobManagerRunnerImpl
private CompletableFuture<JobManagerRunner> createJobManagerRunner(JobGraph jobGraph) {
    final RpcService rpcService = getRpcService();
    return CompletableFuture.supplyAsync(
        CheckedSupplier.unchecked(() ->
            jobManagerRunnerFactory.createJobManagerRunner(
                jobGraph,
                configuration,
                rpcService,
                highAvailabilityServices,
                heartbeatServices,
                jobManagerSharedServices,
                new DefaultJobManagerJobMetricGroupFactory(jobManagerMetricGroup),
                fatalErrorHandler)),
        rpcService.getExecutor());
}
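The method above runs a factory call that may throw a checked exception on an executor and hands back a future. A minimal JDK-only sketch of that pattern follows. All names in it (AsyncCreationSketch, ThrowingSupplier, unchecked, buildRunner, createRunner) are hypothetical, and the local unchecked helper only assumes what the Flink helper of the same name appears to do: adapt a throwing supplier to java.util.function.Supplier.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.Supplier;

// Hypothetical illustration; not Flink code.
class AsyncCreationSketch {
    // A supplier that is allowed to throw checked exceptions.
    @FunctionalInterface
    interface ThrowingSupplier<T> {
        T get() throws Exception;
    }

    // Adapt a throwing supplier so it can be passed to supplyAsync.
    static <T> Supplier<T> unchecked(ThrowingSupplier<T> supplier) {
        return () -> {
            try {
                return supplier.get();
            } catch (RuntimeException e) {
                throw e;
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        };
    }

    // Stand-in for the runner factory call; it fails for an empty job ID.
    static String buildRunner(String jobId) throws Exception {
        if (jobId.isEmpty()) {
            throw new Exception("The given job is empty");
        }
        return "runner-for-" + jobId;
    }

    // Create the runner on the given executor; any failure ends up in the future.
    static CompletableFuture<String> createRunner(String jobId, Executor executor) {
        return CompletableFuture.supplyAsync(unchecked(() -> buildRunner(jobId)), executor);
    }
}
```

Because the creation runs inside supplyAsync, a failure in the factory does not propagate to the calling thread; it surfaces only when the returned future is joined or composed.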
Look at the constructor of org.apache.flink.runtime.jobmaster.JobManagerRunnerImpl,
which creates the JobMaster
public JobManagerRunnerImpl(
        final JobGraph jobGraph,
        final JobMasterServiceFactory jobMasterFactory,
        final HighAvailabilityServices haServices,
        final LibraryCacheManager libraryCacheManager,
        final Executor executor,
        final FatalErrorHandler fatalErrorHandler) throws Exception {
    this.resultFuture = new CompletableFuture<>();
    this.terminationFuture = new CompletableFuture<>();
    this.leadershipOperation = CompletableFuture.completedFuture(null);
    // make sure we cleanly shut down out JobManager services if initialization fails
    try {
        this.jobGraph = checkNotNull(jobGraph);
        this.libraryCacheManager = checkNotNull(libraryCacheManager);
        this.executor = checkNotNull(executor);
        this.fatalErrorHandler = checkNotNull(fatalErrorHandler);
        checkArgument(jobGraph.getNumberOfVertices() > 0, "The given job is empty");
        // libraries and class loader first
        try {
            libraryCacheManager.registerJob(
                jobGraph.getJobID(), jobGraph.getUserJarBlobKeys(), jobGraph.getClasspaths());
        } catch (IOException e) {
            throw new Exception("Cannot set up the user code libraries: " + e.getMessage(), e);
        }
        final ClassLoader userCodeLoader = libraryCacheManager.getClassLoader(jobGraph.getJobID());
        if (userCodeLoader == null) {
            throw new Exception("The user code class loader could not be initialized.");
        }
        // high availability services next
        this.runningJobsRegistry = haServices.getRunningJobsRegistry();
        this.leaderElectionService = haServices.getJobManagerLeaderElectionService(jobGraph.getJobID());
        this.leaderGatewayFuture = new CompletableFuture<>();
        // now start the JobManager
        // The JobMaster is created here
        this.jobMasterService = jobMasterFactory.createJobMasterService(jobGraph, this, userCodeLoader);
    }
    catch (Throwable t) {
        terminationFuture.completeExceptionally(t);
        resultFuture.completeExceptionally(t);
        throw new JobExecutionException(jobGraph.getJobID(), "Could not set up JobManager", t);
    }
}
The code that actually creates the JobMaster is in
org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory
@Override
public JobMaster createJobMasterService(
        JobGraph jobGraph,
        OnCompletionActions jobCompletionActions,
        ClassLoader userCodeClassloader) throws Exception {
    return new JobMaster(
        rpcService,
        jobMasterConfiguration,
        ResourceID.generate(),
        jobGraph,
        haServices,
        slotPoolFactory,
        schedulerFactory,
        jobManagerSharedServices,
        heartbeatServices,
        jobManagerJobMetricGroupFactory,
        jobCompletionActions,
        fatalErrorHandler,
        userCodeClassloader,
        schedulerNGFactory,
        shuffleMaster,
        lookup -> new JobMasterPartitionTrackerImpl(
            jobGraph.getJobID(),
            shuffleMaster,
            lookup
        ));
}
From this analysis: a yarn-session cluster has only one Dispatcher, while each job gets its own JobManager (a JobManagerRunnerImpl) and its own JobMaster.
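The ownership relation in that conclusion can be modelled in a few lines. This is a toy model under stated assumptions, not Flink's implementation: DispatcherStub, RunnerStub, and JobMasterStub are hypothetical classes, and job IDs are plain strings.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Toy model of the session-cluster structure; not Flink code.
class SessionClusterSketch {
    // One JobMaster per job.
    static final class JobMasterStub {
        final String jobId;

        JobMasterStub(String jobId) {
            this.jobId = jobId;
        }
    }

    // One runner per job; its constructor creates that job's JobMaster.
    static final class RunnerStub {
        final JobMasterStub jobMaster;

        RunnerStub(String jobId) {
            this.jobMaster = new JobMasterStub(jobId);
        }
    }

    // The single dispatcher shared by every job in the session.
    static final class DispatcherStub {
        private final Map<String, RunnerStub> runners = new ConcurrentHashMap<>();

        RunnerStub submit(String jobId) {
            RunnerStub runner = new RunnerStub(jobId);
            if (runners.putIfAbsent(jobId, runner) != null) {
                throw new IllegalStateException("Duplicate job: " + jobId);
            }
            return runner;
        }

        int runningJobs() {
            return runners.size();
        }
    }
}
```

Submitting two jobs to one DispatcherStub yields two distinct RunnerStub instances, each holding its own JobMasterStub, which is the one-to-many relation the walkthrough arrives at.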