序
本文主要研究一下flink JobManager的High Availability
配置
flink-conf.yaml
high-availability: zookeeper
high-availability.zookeeper.quorum: zookeeper:2181
high-availability.zookeeper.path.root: /flink
high-availability.cluster-id: /cluster_one # important: customize per cluster
high-availability.storageDir: file:///share
- high-availability的可选值为NONE或者zookeeper;high-availability.zookeeper.quorum用于指定zookeeper的peers;high-availability.zookeeper.path.root用于指定在zookeeper的root node路径;high-availability.cluster-id用于指定当前cluster的node名称,该cluster node位于root node下面;high-availability.storageDir用于指定JobManager metadata的存储路径
masters文件
localhost:8081
localhost:8082
- masters文件用于指定jobmanager的地址
HighAvailabilityMode
flink-runtime_2.11-1.7.1-sources.jar!/org/apache/flink/runtime/jobmanager/HighAvailabilityMode.java
public enum HighAvailabilityMode {
NONE(false),
ZOOKEEPER(true),
FACTORY_CLASS(true);
private final boolean haActive;
HighAvailabilityMode(boolean haActive) {
this.haActive = haActive;
}
/**
* Return the configured {@link HighAvailabilityMode}.
*
* @param config The config to parse
* @return Configured recovery mode or {@link HighAvailabilityMode#NONE} if not
* configured.
*/
public static HighAvailabilityMode fromConfig(Configuration config) {
String haMode = config.getValue(HighAvailabilityOptions.HA_MODE);
if (haMode == null) {
return HighAvailabilityMode.NONE;
} else if (haMode.equalsIgnoreCase(ConfigConstants.DEFAULT_RECOVERY_MODE)) {
// Map old default to new default
return HighAvailabilityMode.NONE;
} else {
try {
return HighAvailabilityMode.valueOf(haMode.toUpperCase());
} catch (IllegalArgumentException e) {
return FACTORY_CLASS;
}
}
}
/**
* Returns true if the defined recovery mode supports high availability.
*
* @param configuration Configuration which contains the recovery mode
* @return true if high availability is supported by the recovery mode, otherwise false
*/
public static boolean isHighAvailabilityModeActivated(Configuration configuration) {
HighAvailabilityMode mode = fromConfig(configuration);
return mode.haActive;
}
}
- HighAvailabilityMode有三个枚举,分别是NONE、ZOOKEEPER、FACTORY_CLASS;这些枚举有一个属性haActive,用于表示是否支持HighAvailability
HighAvailabilityOptions
flink-core-1.7.1-sources.jar!/org/apache/flink/configuration/HighAvailabilityOptions.java
@PublicEvolving
@ConfigGroups(groups = {
@ConfigGroup(name = "HighAvailabilityZookeeper", keyPrefix = "high-availability.zookeeper")
})
public class HighAvailabilityOptions {
// ------------------------------------------------------------------------
// Required High Availability Options
// ------------------------------------------------------------------------
/**
* Defines high-availability mode used for the cluster execution.
* A value of "NONE" signals no highly available setup.
* To enable high-availability, set this mode to "ZOOKEEPER".
* Can also be set to FQN of HighAvailability factory class.
*/
@Documentation.CommonOption(position = Documentation.CommonOption.POSITION_HIGH_AVAILABILITY)
public static final ConfigOption<String> HA_MODE =
key("high-availability")
.defaultValue("NONE")
.withDeprecatedKeys("recovery.mode")
.withDescription("Defines high-availability mode used for the cluster execution." +
" To enable high-availability, set this mode to \"ZOOKEEPER\" or specify FQN of factory class.");
/**
* The ID of the Flink cluster, used to separate multiple Flink clusters
* Needs to be set for standalone clusters, is automatically inferred in YARN and Mesos.
*/
public static final ConfigOption<String> HA_CLUSTER_ID =
key("high-availability.cluster-id")
.defaultValue("/default")
.withDeprecatedKeys("high-availability.zookeeper.path.namespace", "recovery.zookeeper.path.namespace")
.withDescription("The ID of the Flink cluster, used to separate multiple Flink clusters from each other." +
" Needs to be set for standalone clusters but is automatically inferred in YARN and Mesos.");
/**
* File system path (URI) where Flink persists metadata in high-availability setups.
*/
@Documentation.CommonOption(position = Documentation.CommonOption.POSITION_HIGH_AVAILABILITY)
public static final ConfigOption<String> HA_STORAGE_PATH =
key("high-availability.storageDir")
.noDefaultValue()
.withDeprecatedKeys("high-availability.zookeeper.storageDir", "recovery.zookeeper.storageDir")
.withDescription("File system path (URI) where Flink persists metadata in high-availability setups.");
// ------------------------------------------------------------------------
// Recovery Options
// ------------------------------------------------------------------------
/**
* Optional port (range) used by the job manager in high-availability mode.
*/
public static final ConfigOption<String> HA_JOB_MANAGER_PORT_RANGE =
key("high-availability.jobmanager.port")
.defaultValue("0")
.withDeprecatedKeys("recovery.jobmanager.port")
.withDescription("Optional port (range) used by the job manager in high-availability mode.");
/**
* The time before a JobManager after a fail over recovers the current jobs.
*/
public static final ConfigOption<String> HA_JOB_DELAY =
key("high-availability.job.delay")
.noDefaultValue()
.withDeprecatedKeys("recovery.job.delay")
.withDescription("The time before a JobManager after a fail over recovers the current jobs.");
// ------------------------------------------------------------------------
// ZooKeeper Options
// ------------------------------------------------------------------------
/**
* The ZooKeeper quorum to use, when running Flink in a high-availability mode with ZooKeeper.
*/
public static final ConfigOption<String> HA_ZOOKEEPER_QUORUM =
key("high-availability.zookeeper.quorum")
.noDefaultValue()
.withDeprecatedKeys("recovery.zookeeper.quorum")
.withDescription("The ZooKeeper quorum to use, when running Flink in a high-availability mode with ZooKeeper.");
/**
* The root path under which Flink stores its entries in ZooKeeper.
*/
public static final ConfigOption<String> HA_ZOOKEEPER_ROOT =
key("high-availability.zookeeper.path.root")
.defaultValue("/flink")
.withDeprecatedKeys("recovery.zookeeper.path.root")
.withDescription("The root path under which Flink stores its entries in ZooKeeper.");
public static final ConfigOption<String> HA_ZOOKEEPER_LATCH_PATH =
key("high-availability.zookeeper.path.latch")
.defaultValue("/leaderlatch")
.withDeprecatedKeys("recovery.zookeeper.path.latch")
.withDescription("Defines the znode of the leader latch which is used to elect the leader.");
/** ZooKeeper root path (ZNode) for job graphs. */
public static final ConfigOption<String> HA_ZOOKEEPER_JOBGRAPHS_PATH =
key("high-availability.zookeeper.path.jobgraphs")
.defaultValue("/jobgraphs")
.withDeprecatedKeys("recovery.zookeeper.path.jobgraphs")
.withDescription("ZooKeeper root path (ZNode) for job graphs");
public static final ConfigOption<String> HA_ZOOKEEPER_LEADER_PATH =
key("high-availability.zookeeper.path.leader")
.defaultValue("/leader")
.withDeprecatedKeys("recovery.zookeeper.path.leader")
.withDescription("Defines the znode of the leader which contains the URL to the leader and the current" +
" leader session ID.");
/** ZooKeeper root path (ZNode) for completed checkpoints. */
public static final ConfigOption<String> HA_ZOOKEEPER_CHECKPOINTS_PATH =
key("high-availability.zookeeper.path.checkpoints")
.defaultValue("/checkpoints")
.withDeprecatedKeys("recovery.zookeeper.path.checkpoints")
.withDescription("ZooKeeper root path (ZNode) for completed checkpoints.");
/** ZooKeeper root path (ZNode) for checkpoint counters. */
public static final ConfigOption<String> HA_ZOOKEEPER_CHECKPOINT_COUNTER_PATH =
key("high-availability.zookeeper.path.checkpoint-counter")
.defaultValue("/checkpoint-counter")
.withDeprecatedKeys("recovery.zookeeper.path.checkpoint-counter")
.withDescription("ZooKeeper root path (ZNode) for checkpoint counters.");
/** ZooKeeper root path (ZNode) for Mesos workers. */
@PublicEvolving
public static final ConfigOption<String> HA_ZOOKEEPER_MESOS_WORKERS_PATH =
key("high-availability.zookeeper.path.mesos-workers")
.defaultValue("/mesos-workers")
.withDeprecatedKeys("recovery.zookeeper.path.mesos-workers")
.withDescription(Description.builder()
.text("The ZooKeeper root path for persisting the Mesos worker information.")
.build());
// ------------------------------------------------------------------------
// ZooKeeper Client Settings
// ------------------------------------------------------------------------
public static final ConfigOption<Integer> ZOOKEEPER_SESSION_TIMEOUT =
key("high-availability.zookeeper.client.session-timeout")
.defaultValue(60000)
.withDeprecatedKeys("recovery.zookeeper.client.session-timeout")
.withDescription("Defines the session timeout for the ZooKeeper session in ms.");
public static final ConfigOption<Integer> ZOOKEEPER_CONNECTION_TIMEOUT =
key("high-availability.zookeeper.client.connection-timeout")
.defaultValue(15000)
.withDeprecatedKeys("recovery.zookeeper.client.connection-timeout")
.withDescription("Defines the connection timeout for ZooKeeper in ms.");
public static final ConfigOption<Integer> ZOOKEEPER_RETRY_WAIT =
key("high-availability.zookeeper.client.retry-wait")
.defaultValue(5000)
.withDeprecatedKeys("recovery.zookeeper.client.retry-wait")
.withDescription("Defines the pause between consecutive retries in ms.");
public static final ConfigOption<Integer> ZOOKEEPER_MAX_RETRY_ATTEMPTS =
key("high-availability.zookeeper.client.max-retry-attempts")
.defaultValue(3)
.withDeprecatedKeys("recovery.zookeeper.client.max-retry-attempts")
.withDescription("Defines the number of connection retries before the client gives up.");
public static final ConfigOption<String> ZOOKEEPER_RUNNING_JOB_REGISTRY_PATH =
key("high-availability.zookeeper.path.running-registry")
.defaultValue("/running_job_registry/");
public static final ConfigOption<String> ZOOKEEPER_CLIENT_ACL =
key("high-availability.zookeeper.client.acl")
.defaultValue("open")
.withDescription("Defines the ACL (open|creator) to be configured on ZK node. The configuration value can be" +
" set to “creator” if the ZooKeeper server configuration has the “authProvider” property mapped to use" +
" SASLAuthenticationProvider and the cluster is configured to run in secure mode (Kerberos).");
// ------------------------------------------------------------------------
/** Not intended to be instantiated. */
private HighAvailabilityOptions() {}
}
- HighAvailabilityOptions定义了前缀为high-availability.zookeeper的配置项
HighAvailabilityServicesUtils
flink-runtime_2.11-1.7.1-sources.jar!/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java
public class HighAvailabilityServicesUtils {
public static HighAvailabilityServices createAvailableOrEmbeddedServices(
Configuration config,
Executor executor) throws Exception {
HighAvailabilityMode highAvailabilityMode = LeaderRetrievalUtils.getRecoveryMode(config);
switch (highAvailabilityMode) {
case NONE:
return new EmbeddedHaServices(executor);
case ZOOKEEPER:
BlobStoreService blobStoreService = BlobUtils.createBlobStoreFromConfig(config);
return new ZooKeeperHaServices(
ZooKeeperUtils.startCuratorFramework(config),
executor,
config,
blobStoreService);
case FACTORY_CLASS:
return createCustomHAServices(config, executor);
default:
throw new Exception("High availability mode " + highAvailabilityMode + " is not supported.");
}
}
public static HighAvailabilityServices createHighAvailabilityServices(
Configuration configuration,
Executor executor,
AddressResolution addressResolution) throws Exception {
HighAvailabilityMode highAvailabilityMode = LeaderRetrievalUtils.getRecoveryMode(configuration);
switch (highAvailabilityMode) {
case NONE:
final Tuple2<String, Integer> hostnamePort = getJobManagerAddress(configuration);
final String jobManagerRpcUrl = AkkaRpcServiceUtils.getRpcUrl(
hostnamePort.f0,
hostnamePort.f1,
JobMaster.JOB_MANAGER_NAME,
addressResolution,
configuration);
final String resourceManagerRpcUrl = AkkaRpcServiceUtils.getRpcUrl(
hostnamePort.f0,
hostnamePort.f1,
ResourceManager.RESOURCE_MANAGER_NAME,
addressResolution,
configuration);
final String dispatcherRpcUrl = AkkaRpcServiceUtils.getRpcUrl(
hostnamePort.f0,
hostnamePort.f1,
Dispatcher.DISPATCHER_NAME,
addressResolution,
configuration);
final String address = checkNotNull(configuration.getString(RestOptions.ADDRESS),
"%s must be set",
RestOptions.ADDRESS.key());
final int port = configuration.getInteger(RestOptions.PORT);
final boolean enableSSL = SSLUtils.isRestSSLEnabled(configuration);
final String protocol = enableSSL ? "https://" : "http://";
return new StandaloneHaServices(
resourceManagerRpcUrl,
dispatcherRpcUrl,
jobManagerRpcUrl,
String.format("%s%s:%s", protocol, address, port));
case ZOOKEEPER:
BlobStoreService blobStoreService = BlobUtils.createBlobStoreFromConfig(configuration);
return new ZooKeeperHaServices(
ZooKeeperUtils.startCuratorFramework(configuration),
executor,
configuration,
blobStoreService);
case FACTORY_CLASS:
return createCustomHAServices(configuration, executor);
default:
throw new Exception("Recovery mode " + highAvailabilityMode + " is not supported.");
}
}
/**
* Returns the JobManager's hostname and port extracted from the given
* {@link Configuration}.
*
* @param configuration Configuration to extract the JobManager's address from
* @return The JobManager's hostname and port
* @throws ConfigurationException if the JobManager's address cannot be extracted from the configuration
*/
public static Tuple2<String, Integer> getJobManagerAddress(Configuration configuration) throws ConfigurationException {
final String hostname = configuration.getString(JobManagerOptions.ADDRESS);
final int port = configuration.getInteger(JobManagerOptions.PORT);
if (hostname == null) {
throw new ConfigurationException("Config parameter '" + JobManagerOptions.ADDRESS +
"' is missing (hostname/address of JobManager to connect to).");
}
if (port <= 0 || port >= 65536) {
throw new ConfigurationException("Invalid value for '" + JobManagerOptions.PORT +
"' (port of the JobManager actor system) : " + port +
". it must be greater than 0 and less than 65536.");
}
return Tuple2.of(hostname, port);
}
private static HighAvailabilityServices createCustomHAServices(Configuration config, Executor executor) throws FlinkException {
final ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
final String haServicesClassName = config.getString(HighAvailabilityOptions.HA_MODE);
final HighAvailabilityServicesFactory highAvailabilityServicesFactory;
try {
highAvailabilityServicesFactory = InstantiationUtil.instantiate(
haServicesClassName,
HighAvailabilityServicesFactory.class,
classLoader);
} catch (Exception e) {
throw new FlinkException(
String.format(
"Could not instantiate the HighAvailabilityServicesFactory '%s'. Please make sure that this class is on your class path.",
haServicesClassName),
e);
}
try {
return highAvailabilityServicesFactory.createHAServices(config, executor);
} catch (Exception e) {
throw new FlinkException(
String.format(
"Could not create the ha services from the instantiated HighAvailabilityServicesFactory %s.",
haServicesClassName),
e);
}
}
/**
* Enum specifying whether address resolution should be tried or not when creating the
* {@link HighAvailabilityServices}.
*/
public enum AddressResolution {
TRY_ADDRESS_RESOLUTION,
NO_ADDRESS_RESOLUTION
}
}
- HighAvailabilityServicesUtils提供了创建HighAvailabilityServices的静态方法,这些方法有createAvailableOrEmbeddedServices、createHighAvailabilityServices、createCustomHAServices
- 其中createAvailableOrEmbeddedServices方法主要是给FlinkMiniCluster使用;createHighAvailabilityServices方法主要是给ClusterEntrypoint使用,它在highAvailabilityMode为NONE的时候创建的是StandaloneHaServices,在highAvailabilityMode为ZOOKEEPER创建的是ZooKeeperHaServices,在highAvailabilityMode为FACTORY_CLASS的时候使用createCustomHAServices方法来创建
- HighAvailabilityServicesUtils还提供了getJobManagerAddress静态方法,用于获取JobManager的hostname及port
HighAvailabilityServices
flink-runtime_2.11-1.7.1-sources.jar!/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java
/**
* The HighAvailabilityServices give access to all services needed for a highly-available
* setup. In particular, the services provide access to highly available storage and
* registries, as well as distributed counters and leader election.
*
* <ul>
* <li>ResourceManager leader election and leader retrieval</li>
* <li>JobManager leader election and leader retrieval</li>
* <li>Persistence for checkpoint metadata</li>
* <li>Registering the latest completed checkpoint(s)</li>
* <li>Persistence for the BLOB store</li>
* <li>Registry that marks a job's status</li>
* <li>Naming of RPC endpoints</li>
* </ul>
*/
public interface HighAvailabilityServices extends AutoCloseable {
// ------------------------------------------------------------------------
// Constants
// ------------------------------------------------------------------------
/**
* This UUID should be used when no proper leader election happens, but a simple
* pre-configured leader is used. That is for example the case in non-highly-available
* standalone setups.
*/
UUID DEFAULT_LEADER_ID = new UUID(0, 0);
/**
* This JobID should be used to identify the old JobManager when using the
* {@link HighAvailabilityServices}. With the new mode every JobMaster will have a
* distinct JobID assigned.
*/
JobID DEFAULT_JOB_ID = new JobID(0L, 0L);
// ------------------------------------------------------------------------
// Services
// ------------------------------------------------------------------------
/**
* Gets the leader retriever for the cluster's resource manager.
*/
LeaderRetrievalService getResourceManagerLeaderRetriever();
/**
* Gets the leader retriever for the dispatcher. This leader retrieval service
* is not always accessible.
*/
LeaderRetrievalService getDispatcherLeaderRetriever();
/**
* Gets the leader retriever for the job JobMaster which is responsible for the given job
*
* @param jobID The identifier of the job.
* @return Leader retrieval service to retrieve the job manager for the given job
* @deprecated This method should only be used by the legacy code where the JobManager acts as the master.
*/
@Deprecated
LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID);
/**
* Gets the leader retriever for the job JobMaster which is responsible for the given job
*
* @param jobID The identifier of the job.
* @param defaultJobManagerAddress JobManager address which will be returned by
* a static leader retrieval service.
* @return Leader retrieval service to retrieve the job manager for the given job
*/
LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID, String defaultJobManagerAddress);
LeaderRetrievalService getWebMonitorLeaderRetriever();
/**
* Gets the leader election service for the cluster's resource manager.
*
* @return Leader election service for the resource manager leader election
*/
LeaderElectionService getResourceManagerLeaderElectionService();
/**
* Gets the leader election service for the cluster's dispatcher.
*
* @return Leader election service for the dispatcher leader election
*/
LeaderElectionService getDispatcherLeaderElectionService();
/**
* Gets the leader election service for the given job.
*
* @param jobID The identifier of the job running the election.
* @return Leader election service for the job manager leader election
*/
LeaderElectionService getJobManagerLeaderElectionService(JobID jobID);
LeaderElectionService getWebMonitorLeaderElectionService();
/**
* Gets the checkpoint recovery factory for the job manager
*
* @return Checkpoint recovery factory
*/
CheckpointRecoveryFactory getCheckpointRecoveryFactory();
/**
* Gets the submitted job graph store for the job manager
*
* @return Submitted job graph store
* @throws Exception if the submitted job graph store could not be created
*/
SubmittedJobGraphStore getSubmittedJobGraphStore() throws Exception;
/**
* Gets the registry that holds information about whether jobs are currently running.
*
* @return Running job registry to retrieve running jobs
*/
RunningJobsRegistry getRunningJobsRegistry() throws Exception;
/**
* Creates the BLOB store in which BLOBs are stored in a highly-available fashion.
*
* @return Blob store
* @throws IOException if the blob store could not be created
*/
BlobStore createBlobStore() throws IOException;
// ------------------------------------------------------------------------
// Shutdown and Cleanup
// ------------------------------------------------------------------------
/**
* Closes the high availability services, releasing all resources.
*
* <p>This method <b>does not delete or clean up</b> any data stored in external stores
* (file systems, ZooKeeper, etc). Another instance of the high availability
* services will be able to recover the job.
*
* <p>If an exception occurs during closing services, this method will attempt to
* continue closing other services and report exceptions only after all services
* have been attempted to be closed.
*
* @throws Exception Thrown, if an exception occurred while closing these services.
*/
@Override
void close() throws Exception;
/**
* Closes the high availability services (releasing all resources) and deletes
* all data stored by these services in external stores.
*
* <p>After this method was called, the any job or session that was managed by
* these high availability services will be unrecoverable.
*
* <p>If an exception occurs during cleanup, this method will attempt to
* continue the cleanup and report exceptions only after all cleanup steps have
* been attempted.
*
* @throws Exception Thrown, if an exception occurred while closing these services
* or cleaning up data stored by them.
*/
void closeAndCleanupAllData() throws Exception;
}
- HighAvailabilityServices定义了highly-available所需的各种services的get方法
ZooKeeperHaServices
flink-runtime_2.11-1.7.1-sources.jar!/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperHaServices.java
/**
* An implementation of the {@link HighAvailabilityServices} using Apache ZooKeeper.
* The services store data in ZooKeeper's nodes as illustrated by teh following tree structure:
*
* <pre>
* /flink
* +/cluster_id_1/resource_manager_lock
* | |
* | +/job-id-1/job_manager_lock
* | | /checkpoints/latest
* | | /latest-1
* | | /latest-2
* | |
* | +/job-id-2/job_manager_lock
* |
* +/cluster_id_2/resource_manager_lock
* |
* +/job-id-1/job_manager_lock
* |/checkpoints/latest
* | /latest-1
* |/persisted_job_graph
* </pre>
*
* <p>The root path "/flink" is configurable via the option {@link HighAvailabilityOptions#HA_ZOOKEEPER_ROOT}.
* This makes sure Flink stores its data under specific subtrees in ZooKeeper, for example to
* accommodate specific permission.
*
* <p>The "cluster_id" part identifies the data stored for a specific Flink "cluster".
* This "cluster" can be either a standalone or containerized Flink cluster, or it can be job
* on a framework like YARN or Mesos (in a "per-job-cluster" mode).
*
* <p>In case of a "per-job-cluster" on YARN or Mesos, the cluster-id is generated and configured
* automatically by the client or dispatcher that submits the Job to YARN or Mesos.
*
* <p>In the case of a standalone cluster, that cluster-id needs to be configured via
* {@link HighAvailabilityOptions#HA_CLUSTER_ID}. All nodes with the same cluster id will join the same
* cluster and participate in the execution of the same set of jobs.
*/
public class ZooKeeperHaServices implements HighAvailabilityServices {
private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperHaServices.class);
private static final String RESOURCE_MANAGER_LEADER_PATH = "/resource_manager_lock";
private static final String DISPATCHER_LEADER_PATH = "/dispatcher_lock";
private static final String JOB_MANAGER_LEADER_PATH = "/job_manager_lock";
private static final String REST_SERVER_LEADER_PATH = "/rest_server_lock";
// ------------------------------------------------------------------------
/** The ZooKeeper client to use */
private final CuratorFramework client;
/** The executor to run ZooKeeper callbacks on */
private final Executor executor;
/** The runtime configuration */
private final Configuration configuration;
/** The zookeeper based running jobs registry */
private final RunningJobsRegistry runningJobsRegistry;
/** Store for arbitrary blobs */
private final BlobStoreService blobStoreService;
public ZooKeeperHaServices(
CuratorFramework client,
Executor executor,
Configuration configuration,
BlobStoreService blobStoreService) {
this.client = checkNotNull(client);
this.executor = checkNotNull(executor);
this.configuration = checkNotNull(configuration);
this.runningJobsRegistry = new ZooKeeperRunningJobsRegistry(client, configuration);
this.blobStoreService = checkNotNull(blobStoreService);
}
// ------------------------------------------------------------------------
// Services
// ------------------------------------------------------------------------
@Override
public LeaderRetrievalService getResourceManagerLeaderRetriever() {
return ZooKeeperUtils.createLeaderRetrievalService(client, configuration, RESOURCE_MANAGER_LEADER_PATH);
}
@Override
public LeaderRetrievalService getDispatcherLeaderRetriever() {
return ZooKeeperUtils.createLeaderRetrievalService(client, configuration, DISPATCHER_LEADER_PATH);
}
@Override
public LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID) {
return ZooKeeperUtils.createLeaderRetrievalService(client, configuration, getPathForJobManager(jobID));
}
@Override
public LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID, String defaultJobManagerAddress) {
return getJobManagerLeaderRetriever(jobID);
}
@Override
public LeaderRetrievalService getWebMonitorLeaderRetriever() {
return ZooKeeperUtils.createLeaderRetrievalService(client, configuration, REST_SERVER_LEADER_PATH);
}
@Override
public LeaderElectionService getResourceManagerLeaderElectionService() {
return ZooKeeperUtils.createLeaderElectionService(client, configuration, RESOURCE_MANAGER_LEADER_PATH);
}
@Override
public LeaderElectionService getDispatcherLeaderElectionService() {
return ZooKeeperUtils.createLeaderElectionService(client, configuration, DISPATCHER_LEADER_PATH);
}
@Override
public LeaderElectionService getJobManagerLeaderElectionService(JobID jobID) {
return ZooKeeperUtils.createLeaderElectionService(client, configuration, getPathForJobManager(jobID));
}
@Override
public LeaderElectionService getWebMonitorLeaderElectionService() {
return ZooKeeperUtils.createLeaderElectionService(client, configuration, REST_SERVER_LEADER_PATH);
}
@Override
public CheckpointRecoveryFactory getCheckpointRecoveryFactory() {
return new ZooKeeperCheckpointRecoveryFactory(client, configuration, executor);
}
@Override
public SubmittedJobGraphStore getSubmittedJobGraphStore() throws Exception {
return ZooKeeperUtils.createSubmittedJobGraphs(client, configuration);
}
@Override
public RunningJobsRegistry getRunningJobsRegistry() {
return runningJobsRegistry;
}
@Override
public BlobStore createBlobStore() throws IOException {
return blobStoreService;
}
// ------------------------------------------------------------------------
// Shutdown
// ------------------------------------------------------------------------
@Override
public void close() throws Exception {
Throwable exception = null;
try {
blobStoreService.close();
} catch (Throwable t) {
exception = t;
}
internalClose();
if (exception != null) {
ExceptionUtils.rethrowException(exception, "Could not properly close the ZooKeeperHaServices.");
}
}
@Override
public void closeAndCleanupAllData() throws Exception {
LOG.info("Close and clean up all data for ZooKeeperHaServices.");
Throwable exception = null;
try {
blobStoreService.closeAndCleanupAllData();
} catch (Throwable t) {
exception = t;
}
internalClose();
if (exception != null) {
ExceptionUtils.rethrowException(exception, "Could not properly close and clean up all data of ZooKeeperHaServices.");
}
}
/**
* Closes components which don't distinguish between close and closeAndCleanupAllData
*/
private void internalClose() {
client.close();
}
// ------------------------------------------------------------------------
// Utilities
// ------------------------------------------------------------------------
private static String getPathForJobManager(final JobID jobID) {
return "/" + jobID + JOB_MANAGER_LEADER_PATH;
}
}
- ZooKeeperHaServices实现了HighAvailabilityServices接口,它通过ZooKeeperUtils的各种create方法来创建所需的service,比如ZooKeeperUtils.createLeaderRetrievalService、ZooKeeperUtils.createLeaderElectionService、ZooKeeperUtils.createSubmittedJobGraphs
JobClient.submitJob
flink-runtime_2.11-1.7.1-sources.jar!/org/apache/flink/runtime/client/JobClient.java
public class JobClient {
private static final Logger LOG = LoggerFactory.getLogger(JobClient.class);
//......
/**
* Submits a job to a Flink cluster (non-blocking) and returns a JobListeningContext which can be
* passed to {@code awaitJobResult} to get the result of the submission.
* @return JobListeningContext which may be used to retrieve the JobExecutionResult via
* {@code awaitJobResult(JobListeningContext context)}.
*/
public static JobListeningContext submitJob(
ActorSystem actorSystem,
Configuration config,
HighAvailabilityServices highAvailabilityServices,
JobGraph jobGraph,
FiniteDuration timeout,
boolean sysoutLogUpdates,
ClassLoader classLoader) {
checkNotNull(actorSystem, "The actorSystem must not be null.");
checkNotNull(highAvailabilityServices, "The high availability services must not be null.");
checkNotNull(jobGraph, "The jobGraph must not be null.");
checkNotNull(timeout, "The timeout must not be null.");
// for this job, we create a proxy JobClientActor that deals with all communication with
// the JobManager. It forwards the job submission, checks the success/failure responses, logs
// update messages, watches for disconnect between client and JobManager, ...
Props jobClientActorProps = JobSubmissionClientActor.createActorProps(
highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID),
timeout,
sysoutLogUpdates,
config);
ActorRef jobClientActor = actorSystem.actorOf(jobClientActorProps);
Future<Object> submissionFuture = Patterns.ask(
jobClientActor,
new JobClientMessages.SubmitJobAndWait(jobGraph),
new Timeout(AkkaUtils.INF_TIMEOUT()));
return new JobListeningContext(
jobGraph.getJobID(),
submissionFuture,
jobClientActor,
timeout,
classLoader,
highAvailabilityServices);
}
//......
}
- 像JobClient.submitJob方法就使用到了highAvailabilityServices.getJobManagerLeaderRetriever方法来获取JobManagerLeader的地址,用于提交job
小结
- HighAvailabilityMode有三个枚举,分别是NONE、ZOOKEEPER、FACTORY_CLASS;这些枚举有一个属性haActive,用于表示是否支持HighAvailability;HighAvailabilityOptions定义了前缀为high-availability.zookeeper的配置项
- HighAvailabilityServicesUtils提供了创建HighAvailabilityServices的静态方法,这些方法有createAvailableOrEmbeddedServices、createHighAvailabilityServices、createCustomHAServices;其中createAvailableOrEmbeddedServices方法主要是给FlinkMiniCluster使用;createHighAvailabilityServices方法主要是给ClusterEntrypoint使用,它在highAvailabilityMode为NONE的时候创建的是StandaloneHaServices,在highAvailabilityMode为ZOOKEEPER创建的是ZooKeeperHaServices,在highAvailabilityMode为FACTORY_CLASS的时候使用createCustomHAServices方法来创建
- HighAvailabilityServices定义了highly-available所需的各种services的get方法;ZooKeeperHaServices实现了HighAvailabilityServices接口,它通过ZooKeeperUtils的各种create方法来创建所需的service,比如ZooKeeperUtils.createLeaderRetrievalService、ZooKeeperUtils.createLeaderElectionService、ZooKeeperUtils.createSubmittedJobGraphs;像JobClient.submitJob方法就使用到了highAvailabilityServices.getJobManagerLeaderRetriever方法来获取JobManagerLeader的地址,用于提交job