如需参考源码解析,请访问:https://gitee.com/lidishan/apollo-code-analysis
阅读前声明:本文不做相关用法说明,只解析Apollo-client源码
阅读本文前请参考《Apollo-client 初始化入口流程解析》
上文解析到了initialize()初始化,其是通过ConfigService.getConfig()来加载数据,那这个调用方式是怎么加载的?见下面
// 获取namespace=application 默认命名空间键为key的字符串内容
String config = ConfigService.getConfig("application").getProperty("key", "默认值");
/**
* 实时获取apollo配置信息工具类的manager
* @author Jason Song(song_s@ctrip.com)
*/
public class DefaultConfigManager implements ConfigManager {
private ConfigFactoryManager m_factoryManager;
private Map<String, Config> m_configs = Maps.newConcurrentMap();
private Map<String, ConfigFile> m_configFiles = Maps.newConcurrentMap();
public DefaultConfigManager() {
m_factoryManager = ApolloInjector.getInstance(ConfigFactoryManager.class);
}
/**
* 这里是调用ConfigService.getConfig() 对应的方法
* @param namespace the namespace
*/
@Override
public Config getConfig(String namespace) {
// 维护类一个concurrentHashMap的缓存
// -- 获取对应命名空间的缓存config
Config config = m_configs.get(namespace);
if (config == null) {
synchronized (this) {
config = m_configs.get(namespace);
if (config == null) {
// 不存在,则用factory进行创建
// -- 默认实现:DefaultConfigFactoryManager
ConfigFactory factory = m_factoryManager.getFactory(namespace);
// -- 接下来看一下factory.create的实现(DefaultConfigFactory)
config = factory.create(namespace);
m_configs.put(namespace, config);
}
}
}
return config;
}
// ......省略其他代码
}
public class DefaultConfigFactory implements ConfigFactory {
private static final Logger logger = LoggerFactory.getLogger(DefaultConfigFactory.class);
private ConfigUtil m_configUtil;
public DefaultConfigFactory() {
m_configUtil = ApolloInjector.getInstance(ConfigUtil.class);
}
@Override
public Config create(String namespace) {
// 决定文件解析类型:
// -- 默认支持解析的顺序(比如命名空间=namespace.json,那就会优先匹配这种格式的配置文件):
// ------ properties > xml > json > yml > yaml > txt
ConfigFileFormat format = determineFileFormat(namespace);// 返回支持的文件格式
if (ConfigFileFormat.isPropertiesCompatible(format)) {
// yml、yaml这两种格式走这里(因为要换行,解析方式有点区别)
return new DefaultConfig(namespace, createPropertiesCompatibleFileConfigRepository(namespace, format));
}
// 1. 先创建本地配置仓库。createLocalConfigRepository
// eg:C:\opt\data\222537\config-cache\222537+default+application.properties
// C:\opt\data\222537\config-cache\222537+default+xxxxx.zy.properties
// C:\opt\data\222537\config-cache\222537+default+xxxxx.candy_config.properties
return new DefaultConfig(namespace, createLocalConfigRepository(namespace));
}
/**
// 先创建本地配置仓库。createLocalConfigRepository
// eg:C:\opt\data\222537\config-cache\222537+default+application.properties
// C:\opt\data\222537\config-cache\222537+default+xxxxx.zy.properties
// C:\opt\data\222537\config-cache\222537+default+xxxxx.candy_config.properties
**/
LocalFileConfigRepository createLocalConfigRepository(String namespace) {
// 可配置为local,只访问本地的缓存配置,但这样就没意义了。
// -- 配置方式:VM options:-Denv = local
if (m_configUtil.isInLocalMode()) {
logger.warn(
"==== Apollo is in local mode! Won't pull configs from remote server for namespace {} ! ====",
namespace);
// 本地缓存模式,区别是upstream为空
return new LocalFileConfigRepository(namespace);
}
// 创建远程配置仓库类
return new LocalFileConfigRepository(namespace, createRemoteConfigRepository(namespace));
}
RemoteConfigRepository createRemoteConfigRepository(String namespace) {
return new RemoteConfigRepository(namespace);
}
// ..... 省略其他逻辑
}
创建本地配置仓库,最终会调用到trySync()
// 位置:com.ctrip.framework.apollo.internals.LocalFileConfigRepository
public LocalFileConfigRepository(String namespace, ConfigRepository upstream) {
m_namespace = namespace;
m_configUtil = ApolloInjector.getInstance(ConfigUtil.class);
// 设置本地缓存路径
this.setLocalCacheDir(findLocalCacheDir(), false);
// 本地缓存模式是null,这个方法就不会调用。远程模式下调用这个就不为空
this.setUpstreamRepository(upstream);
this.trySync();
}
protected boolean trySync() {
try {
// 主要分为两种模式:local本地=LocalFileConfigRepository#sync()、remote远程=RemoteConfigRepository#sync()
// -- local(-Denv=local):加载本地文件配置进properties
// -- remote(配置类=RemoteConfigRepository):
sync();
return true;
} catch (Throwable ex) {
Tracer.logEvent("ApolloConfigException", ExceptionUtil.getDetailMessage(ex));
logger
.warn("Sync config failed, will retry. Repository {}, reason: {}", this.getClass(), ExceptionUtil
.getDetailMessage(ex));
}
return false;
}
local本地模式
C:\opt\data\222537\config-cache\222537+default+application.properties
C:\opt\data\222537\config-cache\222537+default+xxxxx.zy.properties
C:\opt\data\222537\config-cache\222537+default+xxxxx.candy_config.propertie
remote远程模式
/**
* Constructor.
* 远程配置仓库初始化构造函数时,步骤如下:
* - 1 初始化各种参数和工具类
* - 2 加载远程、本地的类
* - 3 启动定时拉取apollo远程服务数据的线程
* - 4 发起一个http到apollo服务器,用长轮询等待服务器的配置变更
* @param namespace the namespace
*/
public RemoteConfigRepository(String namespace) {
// 1 初始化各种参数和工具类
m_namespace = namespace;
m_configCache = new AtomicReference<>();
m_configUtil = ApolloInjector.getInstance(ConfigUtil.class);
m_httpUtil = ApolloInjector.getInstance(HttpUtil.class);
m_serviceLocator = ApolloInjector.getInstance(ConfigServiceLocator.class);
remoteConfigLongPollService = ApolloInjector.getInstance(RemoteConfigLongPollService.class);
m_longPollServiceDto = new AtomicReference<>();
m_remoteMessages = new AtomicReference<>();
m_loadConfigRateLimiter = RateLimiter.create(m_configUtil.getLoadConfigQPS());
m_configNeedForceRefresh = new AtomicBoolean(true);
m_loadConfigFailSchedulePolicy = new ExponentialSchedulePolicy(m_configUtil.getOnErrorRetryInterval(),
m_configUtil.getOnErrorRetryInterval() * 8);
// 2 加载远程、本地的类
this.trySync();
// 3 启动定时拉取apollo远程服务数据的线程
// -- 默认线程启动5秒后,每隔5分钟进行刷新重新拉取
this.schedulePeriodicRefresh();
// 4 发起一个http到apollo服务器,用长轮询等待服务器的配置变更
// -- scheduleLongPollingRefresh->submit()->startLongPolling()->doLongPollingRefresh()
this.scheduleLongPollingRefresh();
}
// 2 加载远程、本地的类
@Override
protected synchronized void sync() {
Transaction transaction = Tracer.newTransaction("Apollo.ConfigService", "syncRemoteConfig");
try {
// 获取缓存
ApolloConfig previous = m_configCache.get();
ApolloConfig current = loadApolloConfig();
// 引用相等意味着响应304 reference equals means HTTP 304
if (previous != current) {
// 不相等说明缓存变更,这时候重新设置缓存
logger.debug("Remote Config refreshed!");
m_configCache.set(current);
// 触发监听器变更执行,this.getConfig()是将缓存变为properties
this.fireRepositoryChange(m_namespace, this.getConfig());
}
// 如果新加载的数据不为空,就打日志记录
if (current != null) {
Tracer.logEvent(String.format("Apollo.Client.Configs.%s", current.getNamespaceName()),
current.getReleaseKey());
}
transaction.setStatus(Transaction.SUCCESS);
} catch (Throwable ex) {
transaction.setStatus(ex);
throw ex;
} finally {
transaction.complete();
}
}
// 3 启动定时拉取apollo远程服务数据的线程
// -- 默认线程启动5秒后,每隔5分钟进行刷新重新拉取
/**
* 定时拉取数据 m_executorService.scheduleAtFixedRate
* - 默认线程启动5秒后,每隔5分钟进行刷新重新拉取
*/
private void schedulePeriodicRefresh() {
logger.debug("Schedule periodic refresh with interval: {} {}",
m_configUtil.getRefreshInterval(), m_configUtil.getRefreshIntervalTimeUnit());
m_executorService.scheduleAtFixedRate(
new Runnable() {
@Override
public void run() {
Tracer.logEvent("Apollo.ConfigService", String.format("periodicRefresh: %s", m_namespace));
logger.debug("refresh config for namespace: {}", m_namespace);
trySync();
Tracer.logEvent("Apollo.Client.Version", Apollo.VERSION);
}
}, m_configUtil.getRefreshInterval(), m_configUtil.getRefreshInterval(),
m_configUtil.getRefreshIntervalTimeUnit());
}
// 4 发起一个http到apollo服务器,用长轮询等待服务器的配置变更
// - 长轮询这步只是调用一个notiry接口来检测是否有变化,客户端会与服务端建立一个HTTP长连接,然后如果有变化,服务端就会推数据过来,如果超过90s没变动就会断开重新建立连接
// -- 如果返回状态码=200,就会调用notiry()方法,最后会调用到trySync()走上面(2 加载远程、本地的类)拉取远程数据的流程
private void doLongPollingRefresh(String appId, String cluster, String dataCenter, String secret) {
final Random random = new Random();
ServiceDTO lastServiceDto = null;
// 这里根据 长轮询是否停止 && 线程是否终止 来进行终止长轮询
while (!m_longPollingStopped.get() && !Thread.currentThread().isInterrupted()) {
// 这段代码感觉是多余的。因为上面的m_longPollStarted这个atomic已经确保了只有一个线程能进来了
if (!m_longPollRateLimiter.tryAcquire(5, TimeUnit.SECONDS)) {
//wait at most 5 seconds
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
}
}
Transaction transaction = Tracer.newTransaction("Apollo.ConfigService", "pollNotification");
String url = null;
try {
// lastServiceDto为空,可能是如下几种情况:
// - 1 如果之前没有请求过导致没有把服务信息设置进lastServiceDto。比如第一次进行长轮询
// - 2 如果是304,会进行随机true或false的判断,如果true,也置为空
// - 3 长轮询过程中出错,catch会重新把这个置为null
if (lastServiceDto == null) {
// 获取服务端的配置信息
List<ServiceDTO> configServices = getConfigServices();
// 随机取一个
lastServiceDto = configServices.get(random.nextInt(configServices.size()));
}
// 备注:拼接获取配置请求路径,其实这次长轮询请求的路径并不是直接获取数据,而是请求判断是否变更过的接口notifications/v2,如果变更过就调用trySync()进行请求触发
// 备注:拼接获取配置请求路径,其实这次长轮询请求的路径并不是直接获取数据,而是请求判断是否变更过的接口notifications/v2,如果变更过就调用trySync()进行请求触发
url = assembleLongPollRefreshUrl(lastServiceDto.getHomepageUrl(), appId, cluster, dataCenter,
m_notifications);
logger.debug("Long polling from {}", url);
HttpRequest request = new HttpRequest(url);
request.setReadTimeout(LONG_POLLING_READ_TIMEOUT);
if (!StringUtils.isBlank(secret)) {
Map<String, String> headers = Signature.buildHttpHeaders(url, appId, secret);
request.setHeaders(headers);
}
transaction.addData("Url", url);
final HttpResponse<List<ApolloConfigNotification>> response =
m_httpUtil.doGet(request, m_responseType);
// 注意了!!! 请求正常,并返回200,调用notify进行通知所有lisetener进行填充新的配置
logger.debug("Long polling response: {}, url: {}", response.getStatusCode(), url);
if (response.getStatusCode() == 200 && response.getBody() != null) {
updateNotifications(response.getBody());
updateRemoteNotifications(response.getBody());
transaction.addData("Result", response.getBody().toString());
notify(lastServiceDto, response.getBody());
}
//try to load balance
if (response.getStatusCode() == 304 && random.nextBoolean()) {
lastServiceDto = null;
}
m_longPollFailSchedulePolicyInSecond.success();
transaction.addData("StatusCode", response.getStatusCode());
transaction.setStatus(Transaction.SUCCESS);
} catch (Throwable ex) {
lastServiceDto = null;
Tracer.logEvent("ApolloConfigException", ExceptionUtil.getDetailMessage(ex));
transaction.setStatus(ex);
long sleepTimeInSecond = m_longPollFailSchedulePolicyInSecond.fail();
logger.warn(
"Long polling failed, will retry in {} seconds. appId: {}, cluster: {}, namespaces: {}, long polling url: {}, reason: {}",
sleepTimeInSecond, appId, cluster, assembleNamespaces(), url, ExceptionUtil.getDetailMessage(ex));
try {
TimeUnit.SECONDS.sleep(sleepTimeInSecond);
} catch (InterruptedException ie) {
//ignore
}
} finally {
transaction.complete();
}
}
}