Eureka服务fetch registries(一) client端分析

谯皓君
2023-12-01

根据前两章节的分析,我们知道eureka客户端通过创建DiscoveryClient对象调用构造方法来实现了register,renew, heartbeat, registries fetch的功能。

本章节主要分析registries fetch在源码中的实现。

Clinet端的处理逻辑

DiscoveryClient在构造方法中创建了一个cacheRefreshExecutor的守护线程池,核心线程数为1,默认cacheRefreshExecutorThreadPoolSize为2。

顺着源码往下看,该线程池启动操作在initScheduledTasks()方法中

private void initScheduledTasks() {
        if (clientConfig.shouldFetchRegistry()) {
            // registry cache refresh timer
            int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
            int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
            cacheRefreshTask = new TimedSupervisorTask(
                    "cacheRefresh",
                    scheduler,
                    cacheRefreshExecutor,
                    registryFetchIntervalSeconds,//30秒
                    TimeUnit.SECONDS,
                    expBackOffBound,
                    new CacheRefreshThread()//
            );
            scheduler.schedule(
                    cacheRefreshTask,
                    registryFetchIntervalSeconds, TimeUnit.SECONDS);
        }
	……
class CacheRefreshThread implements Runnable {
    public void run() {
        refreshRegistry();
    }
}

跟踪refreshRegistry()代码最终定位到fetchRegistry(boolean forceFullRegistryFetch)方法,根据clientConfig的配置进行getAndStoreFullRegistry()或者getAndUpdateDelta(applications)增量更新。这里我选择增量更新getAndUpdateDelta方法,通过指定的remoteRegions发送http请求到服务端面。代码如下

……
//remoteRegionsRef指向 fetchRemoteRegionsRegistry 配置,表示获取注册服务的远程地区
EurekaHttpResponse<Applications> httpResponse = eurekaTransport.queryClient.getDelta(remoteRegionsRef.get());
        if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {
            delta = httpResponse.getEntity();
        }
……
if (fetchRegistryUpdateLock.tryLock()) {
                try {
                    updateDelta(delta);
……

private void updateDelta(Applications delta) {
        int deltaCount = 0;
        for (Application app : delta.getRegisteredApplications()) {
            for (InstanceInfo instance : app.getInstances()) {
                Applications applications = getApplications();
                String instanceRegion = instanceRegionChecker.getInstanceRegion(instance);
                ……
	            //根据instance中的ActionType来更新本地服务。ADDED,MODIFIED添加到Applications中,DELETED则从Applications中删除
                if (ActionType.ADDED.equals(instance.getActionType())) {
                    Application existingApp = applications.getRegisteredApplications(instance.getAppName());
                    if (existingApp == null) {
                        applications.addApplication(app);
                    }
                    logger.debug("Added instance {} to the existing apps in region {}", instance.getId(), instanceRegion);
                    applications.getRegisteredApplications(instance.getAppName()).addInstance(instance);
                } else if (ActionType.MODIFIED.equals(instance.getActionType())) {
                    Application existingApp = applications.getRegisteredApplications(instance.getAppName());
                    if (existingApp == null) {
                        applications.addApplication(app);
                    }
                    logger.debug("Modified instance {} to the existing apps ", instance.getId());
                    applications.getRegisteredApplications(instance.getAppName()).addInstance(instance);

                } else if (ActionType.DELETED.equals(instance.getActionType())) {
                    Application existingApp = applications.getRegisteredApplications(instance.getAppName());
                    if (existingApp != null) {
                        logger.debug("Deleted instance {} to the existing apps ", instance.getId());
                        existingApp.removeInstance(instance);
                        if (existingApp.getInstancesAsIsFromEureka().isEmpty()) {
                            applications.removeApplication(existingApp);
                        }
                    }
                }
            }
        }

updateDelta(delta)更新获取到的服务注册信息。Applications为服务列表的封装类,包含Map<String,Application> appNameApplicationMap,Map<String,VipIndexSupport> virtualHostNameAppMap,Map<String,VipIndexSupport> secureVirtualHostNameAppMap

其中Applicaton类对InstanceInfo进行了封装。包含属性Set<InstanceInfo>instances,Map<String,InstanceInfo>instancesMap

总结一下:
1) 定义fetchRegistry方法发送http请求获取Applications服务列表。根据instance中的ActionType来更新本地服务。ADDED,MODIFIED添加到Applications中,
DELETED则从Applications中删除。
2) 通过EurekaClientConfig的shouldFetchRegistry配置确认是否进行服务获取。forceFullRegistryFetch以属性用于判断全量抓取服务列表还是增量抓取,fetchRemoteRegionsRegistry配置指定要抓取的服务。

下一篇:Eureka服务fetch registries(二) server端分析

 类似资料: