根据前两章节的分析,我们知道eureka客户端通过创建DiscoveryClient对象调用构造方法来实现了register,renew, heartbeat, registries fetch的功能。
本章节主要分析registries fetch在源码中的实现。
DiscoveryClient在构造方法中创建了一个cacheRefreshExecutor的守护线程池,核心线程数为1,默认cacheRefreshExecutorThreadPoolSize为2。
顺着源码往下看,该线程池启动操作在initScheduledTasks()方法中
private void initScheduledTasks() {
if (clientConfig.shouldFetchRegistry()) {
// registry cache refresh timer
int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
cacheRefreshTask = new TimedSupervisorTask(
"cacheRefresh",
scheduler,
cacheRefreshExecutor,
registryFetchIntervalSeconds,//30秒
TimeUnit.SECONDS,
expBackOffBound,
new CacheRefreshThread()//
);
scheduler.schedule(
cacheRefreshTask,
registryFetchIntervalSeconds, TimeUnit.SECONDS);
}
……
class CacheRefreshThread implements Runnable {
public void run() {
refreshRegistry();
}
}
跟踪refreshRegistry()代码最终定位到fetchRegistry(boolean forceFullRegistryFetch)方法,根据clientConfig的配置进行getAndStoreFullRegistry()或者getAndUpdateDelta(applications)增量更新。这里我选择增量更新getAndUpdateDelta方法,通过指定的remoteRegions发送http请求到服务端面。代码如下
……
//remoteRegionsRef指向 fetchRemoteRegionsRegistry 配置,表示获取注册服务的远程地区
EurekaHttpResponse<Applications> httpResponse = eurekaTransport.queryClient.getDelta(remoteRegionsRef.get());
if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {
delta = httpResponse.getEntity();
}
……
if (fetchRegistryUpdateLock.tryLock()) {
try {
updateDelta(delta);
……
private void updateDelta(Applications delta) {
int deltaCount = 0;
for (Application app : delta.getRegisteredApplications()) {
for (InstanceInfo instance : app.getInstances()) {
Applications applications = getApplications();
String instanceRegion = instanceRegionChecker.getInstanceRegion(instance);
……
//根据instance中的ActionType来更新本地服务。ADDED,MODIFIED添加到Applications中,DELETED则从Applications中删除
if (ActionType.ADDED.equals(instance.getActionType())) {
Application existingApp = applications.getRegisteredApplications(instance.getAppName());
if (existingApp == null) {
applications.addApplication(app);
}
logger.debug("Added instance {} to the existing apps in region {}", instance.getId(), instanceRegion);
applications.getRegisteredApplications(instance.getAppName()).addInstance(instance);
} else if (ActionType.MODIFIED.equals(instance.getActionType())) {
Application existingApp = applications.getRegisteredApplications(instance.getAppName());
if (existingApp == null) {
applications.addApplication(app);
}
logger.debug("Modified instance {} to the existing apps ", instance.getId());
applications.getRegisteredApplications(instance.getAppName()).addInstance(instance);
} else if (ActionType.DELETED.equals(instance.getActionType())) {
Application existingApp = applications.getRegisteredApplications(instance.getAppName());
if (existingApp != null) {
logger.debug("Deleted instance {} to the existing apps ", instance.getId());
existingApp.removeInstance(instance);
if (existingApp.getInstancesAsIsFromEureka().isEmpty()) {
applications.removeApplication(existingApp);
}
}
}
}
}
updateDelta(delta)更新获取到的服务注册信息。Applications为服务列表的封装类,包含Map<String,Application> appNameApplicationMap,Map<String,VipIndexSupport> virtualHostNameAppMap,Map<String,VipIndexSupport> secureVirtualHostNameAppMap
其中Applicaton类对InstanceInfo进行了封装。包含属性Set<InstanceInfo>instances,Map<String,InstanceInfo>instancesMap
总结一下:
1) 定义fetchRegistry方法发送http请求获取Applications服务列表。根据instance中的ActionType来更新本地服务。ADDED,MODIFIED添加到Applications中,
DELETED则从Applications中删除。
2) 通过EurekaClientConfig的shouldFetchRegistry配置确认是否进行服务获取。forceFullRegistryFetch以属性用于判断全量抓取服务列表还是增量抓取,fetchRemoteRegionsRegistry配置指定要抓取的服务。
下一篇:Eureka服务fetch registries(二) server端分析