当前位置: 首页 > 工具软件 > Pigeon > 使用案例 >

【Pigeon源码阅读】高可用之熔断降级实现原理(十四)

翟冷勋
2023-12-01

pigeon熔断降级

当服务调用在短时间内出现大量的失败且失败率超过一定阀值时,可以通过配置手动或自动触发降级,调用端直接返回默认对象或抛出异常,不会将调用请求发到服务提供方,如果服务提供方恢复可用,客户端可以自动或手工解除降级。

pigeon降级开关

pigeon提供三种降级开关,来分别支持不同的降级策略:

  1. 强制降级开关:在远程服务大量超时或其他不可用情况时,紧急时候进行设置,开启后,调用端会根据上述降级策略直接返回默认值或抛出降级异常,当远程服务恢复后,建议关闭此开关。对应配置pigeon.invoker.degrade.force=true,默认为false
  2. 失败降级开关:失败降级开关便于客户端在服务端出现非业务异常(比如网络失败,超时,无可用节点等)时进行降级容错,而在出现业务异常(比如登录用户名密码错误)时不需要降级。对应配置pigeon.invoker.degrade.failure=true,默认为false
  3. 自动降级开关:自动降级开关是在调用端设置,开启自动降级后,调用端如果调用某个服务出现连续的超时或不可用,当一段时间内(10秒内)失败率超过一定阀值(默认1%)会触发自动降级,调用端会根据上述降级策略直接返回默认值或抛出降级异常;当服务端恢复后,调用端会自动解除降级模式,再次发起请求到远程服务。对应配置pigeon.invoker.degrade.auto=true,默认为false

若同时开启了多个开关,会根据下面优先级使用相应降级策略:强制降级 > 自动降级 > 失败降级,其中自动降级包含失败降级策略。

pigeon降级处理策略配置

通过配置pigeon.invoker.degrade.methods为不同的服务方法指定不同的降级策略,如:

http://service.dianping.com/com.dianping.pigeon.demo.EchoService#echo=a,http://service.dianping.com/com.dianping.pigeon.demo.EchoService#getUserDetail=b,http://service.dianping.com/com.dianping.pigeon.demo.EchoService#getUserDetailArray=c
上述配置内容包含多个方法的降级策略a、b、c。如果某此调用需要降级,而降级策略没有配置则不降级,进行正常调用流程。

配置解析定义在DegradationFilter#parseDegradeMethodsConfig方法中,

对于a、b、c这些降级策略,可以通过诸如pigeon.invoker.degrade.method.return.a等配置来定义具体的降级处理策略。
在触发降级后,pigeon支持4种降级处理策略:

  1. 指定默认返回值,可以为一个复杂对象
  2. 抛出指定异常
  3. 执行groovy脚本
  4. Mock方式

下面对这几种降级处理策略举例分析:

指定默认返回值

如策略a有pigeon.invoker.degrade.method.return.a配置值为:

{
	"returnClass": "java.lang.String",
	"content": "echo,input"
}

这里意思是降级返回一个字符串"echo,input"。
而对于复杂对象,可参照json格式配置,如:

{
	"returnClass": "com.dianping.pigeon.demo.User",
	"content": "{\"username\":\"user-1\"}"
}

如果反序列化是Map类型,还可以配置keyClass和valueClass属性来指定键值类型,如果是Collection类型,可以配置getComponentClass来指定元素类型。

抛出指定异常

如果想在降级后抛出指定异常,可以配置如pigeon.invoker.degrade.method.return.b为:

{
	"throwException": "true",
	"returnClass": "Exception"
}

执行groovy脚本

如配置pigeon.invoker.degrade.method.return.c值为:

{
	"useGroovyScript": "true",
	"content": "throw new RuntimeException('test groovy degrade');"
}

pigeon可以根据配置的content,动态执行groovy脚本,这里需注意脚本的最后一条语句必须返回方法的返回值类型或抛出异常。

mock方式

除了上述几种使用lion配置降级策略的方式,pigeon还提供了一种使用mock类的降级配置方式。

例如我们想修改pigeon-test.pigeon.invoker.degrade.method.return.a的降级策略方式为mock方式,只需修改配置为:{"useMockClass":"true"}
打开mock开关,然后在spring的xml配置中添加mock类的引用对象:

<bean id="echoService" class="com.dianping.pigeon.remoting.invoker.config.spring.ReferenceBean" init-method="init">
    <property name="url" value="com.dianping.pigeon.benchmark.service.EchoService" />
    <property name="interfaceName" value="com.dianping.pigeon.benchmark.service.EchoService" />
    <property name="mock" ref="echoServiceMock" /><!-- 添加mock类的引用 -->
</bean>

<!-- 必须实现EchoService接口 -->
<bean id="echoServiceMock" class="com.dianping.pigeon.benchmark.service.EchoServiceMock"/

对于上面几种策略,可以通过配置enable=true|false来确定是否启动策略,不填写默认为true,如{“useMockClass”:“true”,“enable”:“false”}。
如果对于同一个服务方法启动了多种降级策略,会根据以下优先级执行策略:
mock方式>groovy脚本>抛出异常>返回默认对象。

分析完以上策略配置,来看看pigeon解析配置的代码实现,定义在DegradationFilter#parseDegradeMethodsConfig方法中:

private static void parseDegradeMethodsConfig(String degradeMethodsConfig) throws Throwable {
    if (StringUtils.isNotBlank(degradeMethodsConfig)) {
        ConcurrentHashMap<String, DegradeAction> map = new ConcurrentHashMap<String, DegradeAction>();
        // 格式如"key1=value1,key2=value2",其中key为url + "#" + methodName
        // 可以从配置"pigeon.invoker.degrade.method.return." + value中获取具体方法的DegradeActionConfig JSON字符串
        String[] pairArray = degradeMethodsConfig.split(",");
        for (String str : pairArray) {
            if (StringUtils.isNotBlank(str)) {
                String[] pair = str.split("=");
                if (pair != null && pair.length == 2) {
                    String key = pair[1].trim();
                    DegradeAction degradeAction = new DegradeAction();
                    if (StringUtils.isNotBlank(key)) {
                        // 获取指定的degradeActionConfig,并装配DegradeAction对象
                        String config = configManager.getStringValue(KEY_DEGRADE_METHOD + key);

                        if (StringUtils.isNotBlank(config)) {
                            // 反序列化DegradeActionConfig
                            config = config.trim();
                            config = "{\"@class\":\"" + DegradeActionConfig.class.getName() + "\","
                                    + config.substring(1);
                            DegradeActionConfig degradeActionConfig = (DegradeActionConfig) jacksonSerializer
                                    .toObject(DegradeActionConfig.class, config);

                            // 解析配置,初始化degradeAction
                            degradeAction.setUseMockClass(degradeActionConfig.getUseMockClass());
                            degradeAction.setUseGroovyScript(degradeActionConfig.getUseGroovyScript());
                            degradeAction.setThrowException(degradeActionConfig.getThrowException());
                            degradeAction.setEnable(degradeActionConfig.getEnable());
                            String content = degradeActionConfig.getContent();
                            Object returnObj = null;

                            // 解析具体的降级方案
                            if (degradeAction.isUseMockClass()) {
                                // use mock class
                            } else if (degradeAction.isUseGroovyScript()) {
                                degradeAction.setGroovyScript(GroovyUtils.getScript(content));
                            } else if (degradeAction.isThrowException()) {
                                if (StringUtils.isNotBlank(degradeActionConfig.getReturnClass())) {
                                    // 反序列化成指定异常
                                    returnObj = jacksonSerializer
                                            .toObject(Class.forName(degradeActionConfig.getReturnClass()), content);
                                    if (!(returnObj instanceof Exception)) {
                                        throw new IllegalArgumentException(
                                                "Invalid exception class:" + degradeActionConfig.getReturnClass());
                                    }
                                    degradeAction.setReturnObj(returnObj);
                                }
                            } else {
                                if (StringUtils.isNotBlank(degradeActionConfig.getKeyClass())
                                        && StringUtils.isNotBlank(degradeActionConfig.getValueClass())) {
                                    // 反序列化map对象
                                    returnObj = jacksonSerializer.deserializeMap(content,
                                            Class.forName(degradeActionConfig.getReturnClass()),
                                            Class.forName(degradeActionConfig.getKeyClass()),
                                            Class.forName(degradeActionConfig.getValueClass()));
                                } else if (StringUtils.isNotBlank(degradeActionConfig.getComponentClass())) {
                                    // 反序列化collection对象
                                    returnObj = jacksonSerializer.deserializeCollection(content,
                                            Class.forName(degradeActionConfig.getReturnClass()),
                                            Class.forName(degradeActionConfig.getComponentClass()));
                                } else if (StringUtils.isNotBlank(degradeActionConfig.getReturnClass())) {
                                    // 反序列化普通java对象
                                    returnObj = jacksonSerializer
                                            .toObject(Class.forName(degradeActionConfig.getReturnClass()), content);
                                }
                                degradeAction.setReturnObj(returnObj);
                            }
                        }
                    }
                    map.put(pair[0].trim(), degradeAction);
                }
            }
        }
        // 重置缓存
        degradeMethodActions.clear();
        degradeMethodActions = map;
    } else {
        // 重置缓存
        degradeMethodActions.clear();
    }

    groovyMocks.clear();
}

降级配置和请求数据统计

在方法实现中,如果开启了强制降级开关,会直接走降级策略,否则如果开启了自动降级开关,会有一个较为复杂的计算逻辑根据自动降级配置及当前的调用情况来判断是否需要降级以及是否从降级中恢复为正常访问。
下面主要看看和自动降级策略相关的配置和数据统计。

在DegradationFilter获取到后面拦截器链调用的相应结果后,会根据结果统计调用数据,来判断后续是否需要走降级策略。
结果数据统计主要通过以下代码实现:

 // 统计失败调用情况,包括返回异常、捕获异常,降级调用异常等情况会走到
DegradationManager.INSTANCE.addFailedRequest(context, failedException);

// 统计降级调用情况,在降级调用处理里,如果不是一个失败降级,则会走到当前统计
DegradationManager.INSTANCE.addDegradedRequest(context, null);

// 统计成功调用情况
DegradationManager.INSTANCE.addNormalRequest(context);

对应的方法实现:

public void addFailedRequest(InvokerContext context, Throwable t) {
    if (t instanceof ServiceUnavailableException || t instanceof RequestTimeoutException
            || t instanceof RemoteInvocationException || t instanceof RejectedException
            || t instanceof ServiceFailureDegreadedException || isCustomizedDegradeException(t)) {
        // 非业务异常或自定义降级异常
        addRequest(context, t, false);
    }
}

public void addDegradedRequest(InvokerContext context, Throwable t) {
    addRequest(context, null, true);
    if (isLogDegrade && !(t instanceof ServiceDegradedException)) {
        // 启动记录降级日志开关,且不是降级异常
        ServiceDegradedException ex = new ServiceDegradedException(getRequestUrl(context), t);
        ex.setStackTrace(new StackTraceElement[]{});
        monitor.logError(ex);
    }
}

public void addNormalRequest(InvokerContext context) {
    addRequest(context, null, false);
}

上面三个方法都统一调用了addRequest方法:

private void addRequest(InvokerContext context, Throwable t, boolean degraded) {
    // 自动降级或强制降级才进行统计
    if (isAutoDegrade || isForceDegrade) {
        int currentSecond = Calendar.getInstance().get(Calendar.SECOND);
        String requestUrl = getRequestUrl(context);
        // 获取指定url的每秒请求数统计
        ConcurrentHashMap<Integer, Count> secondCount = requestSecondCountMap.get(requestUrl);
        if (secondCount == null) {
            // 没有则初始化一个
            secondCount = new ConcurrentHashMap<Integer, Count>();
            ConcurrentHashMap<Integer, Count> last = requestSecondCountMap.putIfAbsent(requestUrl, secondCount);
            if (last != null) {
                // 这里考虑并发处理,可能会有其他线程初始化了,则直接覆盖使用
                secondCount = last;
            }
        }
        // 获取当前秒的计数
        Count count = secondCount.get(currentSecond);
        if (count == null) {
            // 为空则初始化一个
            count = new Count(0, 0, 0);
            // 考虑并发处理
            Count last = secondCount.putIfAbsent(currentSecond, count);
            if (last != null) {
                count = last;
            }
        }
        // 总计数+1
        count.total.incrementAndGet();
        if (t != null) {
            // 存在异常,则失败数+1,仅addFailedRequest会触发这个分支
            count.failed.incrementAndGet();
        }
        if (degraded) {
            // 存在降级,则降级数+1,仅addDegradedRequest会触发这个分支
            count.degraded.incrementAndGet();
        }
    }
}

从上面我们看到,最终针对每个服务方法,会有三个纬度的秒级统计,为总请求数、降级数、失败数。并最终存储到DegradationManager的requestSecondCountMap成员变量中。但实际上,在进行降级策略触发判断时,用到的是requestCountMap。中间的转化由DegradationManager的静态内部类Checker完成,Checker实现了Runnable接口。DegradationManager会在静态初始化时启动另开一个子线程来运行Checker定时任务,每秒根据requestSecondCountMap重新初始化requestCountMap。
下面看看Checker关于数据汇总的实现,同时历史计数的清理也是在这个定时器中完成:

static class Checker implements Runnable {

        @Override
        public void run() {
            while (true) {
                try {
                    // 每隔一秒进行
                    Thread.sleep(1000 * degradeCheckInterval);
                    checkRequestSecondCount();
                } catch (Exception e) {
                    logger.error("", e);
                }
            }
        }

        private void checkRequestSecondCount() {
            Map<String, Count> countMap = new ConcurrentHashMap<String, Count>();
            // 最近需要统计的秒数数据,默认为10,可以理解位一个滑动的统计窗口
            final int recentSeconds = degradeCheckSeconds;
            final int currentSecond = Calendar.getInstance().get(Calendar.SECOND);
            // 遍历每隔服务方法
            for (String url : requestSecondCountMap.keySet()) {
                Map<Integer, Count> secondCount = requestSecondCountMap.get(url);
                int total = 0, failed = 0, degraded = 0;
                // 统计过去recentSeconds秒的调用情况,并汇总到countMap中
                for (int i = 1; i <= recentSeconds; i++) {
                    int prevSec = currentSecond - i;
                    // 60秒循环
                    prevSec = prevSec >= 0 ? prevSec : prevSec + 60;
                    Count ct = secondCount.get(prevSec);
                    if (ct != null) {
                        total += ct.getTotalValue();
                        failed += ct.getFailedValue();
                        degraded += ct.getDegradedValue();
                    }
                }
                countMap.put(url, new Count(total, failed, degraded));
                // 如recentSeconds=20,则清空过去20~40秒的计数器计数
                for (int i = recentSeconds + 1; i <= recentSeconds + 20; i++) {
                    int prevSec = currentSecond - i;
                    prevSec = prevSec >= 0 ? prevSec : prevSec + 60;
                    Count ct = secondCount.get(prevSec);
                    if (ct != null) {
                        ct.clear();
                    }
                }
            }
            Map<String, Count> old = requestCountMap;
            requestCountMap = countMap;
            // 清理无用数据,防止不必要的存在引用引起内存泄漏
            if (old != null) {
                old.clear();
                old = null;
            }

            // 复用降级统计和清空的线程,用于服务质量统计和清空(窗口默认为10秒)
            // ……
            // 省略这部分和降级无关的统计代码
        }

    }
}

根据requestCount中的三个汇总统计量,下面看看在自动降级策略下,降级触发和恢复的相关策略。

降级触发和恢复策略

在DegradationFilter#invoker执行拦截器逻辑开始,有两处可能会触发降级策略:

  1. 在开始会先调用DegradationManager.INSTANCE.needDegrade(context)来判断是否需要进行降级。如果满足降级条件直接走降级策略。
  2. 在完成拦截器链的上游调用回来时,会根据返回结果为下面3类异常时,进一步结合自动降级或失败降级配置判断是否走降级策略。
    1. 异常非业务异常
    2. 自定义可降级的业务异常
    3. 在调用过程中抛出而捕获到异常,

降级触发逻辑具体看needDegrade实现:

public boolean needDegrade(InvokerContext context) {
    // 是否有指定方法降级配置,且配置开关开启
    if (degradationIsEnable(context)) {
        // 是否强制降级
        if (isForceDegrade) {
            return true;
        }

        // 是否开启自动降级开关
        if (isAutoDegrade) {
            if (!CollectionUtils.isEmpty(requestCountMap)) {
                String requestUrl = getRequestUrl(context);
                // 请求量统计,基于一个滑动窗口
                Count count = requestCountMap.get(requestUrl);
                if (count != null) {
                    // 一、 请求总量达到指定阈值
                    if (count.getTotalValue() >= degradeTotalThreshold) {
                        // 二、非降级请求量(成功+失败)达到指定阈值,且失败率(非降级请求量/失败量)小于降级恢复比率
                        // 这意味会尝试从降级中恢复过来
                        if ((count.getTotalValue() - count.getDegradedValue()) > degradeInvokeThreshold
                                && count.getFailedPercent() < degradeRecoverPercent) {
                            // (降级比率-恢复比率)发生降级,如果请求正常,则降级比率逐渐降低,会慢慢恢复正常
                            return random(count.getDegradedPercent() - degradeRecoverInterval);
                        } else if (count.getFailedPercent() >= degradeRecoverPercent) {
                            // 三、失败率大于恢复率,最大降级比率(默认99.9%)发生降级
                            return random(degradePercentMax);
                        }
                    }
                }
            }
        }

        // 是否失败降级
        if (isFailureDegrade) {
            // 在调用失败后才触发降级,直接返回false表示不降级
            return false;
        }
    }
    return false;
}

// 失败比率计算方法
public float getFailedPercent() {
    int m = (total.get() - degraded.get());
    if (total.get() > 0 && m > 0) {
        return failed.get() * 100 / m;
    } else {
        return 0;
    }
}

// 降级比率计算方法
public float getDegradedPercent() {
    if (total.get() > 0) {
        return degraded.get() * 100 / total.get();
    } else {
        return 0;
    }
}

// 根据概率返回真假,如percent=80,则表示80%返回true,20%返回false
private boolean random(float percent) {
    return random.nextInt(10000) < percent * 100;
}

自动降级部分逻辑较为复杂,针对注视标注的3个条件判断,这里分情况说明:

  1. 窗口内请求量较少(不满足条件一),不降级,否则继续往下走:
  2. 请求总数增多(满足条件一),但请求基本正常(满足条件二,但负概率降级),即不降级
  3. 请求出现大量失败,失败率升高(满足条件一,三),则大概率降级。
  4. 第3点中,会漏出少部分不降级的请求,如果这部分的请求的基本正常,会触发条件二,即非降级请求达到请求阈值,且失败率低于恢复阈值,会尝试从降级请求中漏出一定比率用于试探正常调用
  5. 在第4点基础上,如果试探继续出现大量失败,则回到第3点,
  6. 在第4点基础上,如果试探请求正常,则调用比率降低,即越来越多的请求会恢复为正常调用,最终知道完全恢复。

在自动降级判断中,有几个关键配置:

  1. degradeTotalThreshold:窗口时间内最少请求总数阈值
  2. degradeInvokeThreshold:降级恢复阈值,当非降级请求量达到指定阈值后,开始尝试恢复。
  3. degradeRecoverPercent:失败率阈值百分比,非降级请求中,成功率超过这个比率阈值会触发恢复。
  4. degradeRecoverInterval: 起始恢复比率,如为10%则表示会从降级请求的比率中恢复10%尝试进行正常请求。
  5. degradePercentMax: 最大降级百分比如99%,表示失败率达到特定阈值后,会有99%的请求直接走降级,其余1%走正常请求用于试探恢复。

降级拦截器总体实现逻辑

基于上面分析,我们对降级策略的解析和处理时机以及具体触发、恢复策略有了较完整的了解,下面来看看DegradationFilter的invke方法完整实现:

public InvocationResponse invoke(ServiceInvocationHandler handler, InvokerContext context) throws Throwable {
    context.getTimeline().add(new TimePoint(TimePhase.D));

    InvocationResponse degradeResponse;
    // 如果需要降级,则进行降级处理
    if (DegradationManager.INSTANCE.needDegrade(context)) {
        degradeResponse = degradeCall(context);

        if (degradeResponse != null) {
            // 返回自动降级熔断的降级结果
            return degradeResponse;
        }
    }

    boolean failed = false;
    Throwable failedException = null;

    try {
        // 继续拦截器链调用
        InvocationResponse response = handler.handle(context);
        Object responseReturn = response.getReturn();
        if (responseReturn != null) {
            int messageType = response.getMessageType();

            if (messageType == Constants.MESSAGE_TYPE_EXCEPTION) {
                // 非业务异常,尝试走降级
                RpcException rpcException = InvokerUtils.toRpcException(response);
                if (rpcException instanceof RemoteInvocationException
                        || rpcException instanceof RejectedException) {
                    // 进一步限制为RemoteInvocationException或者RejectedException等非业务异常
                    failed = true;
                    failedException = rpcException;
                    // 是否启用了调用失败降级
                    if (DegradationManager.INSTANCE.needFailureDegrade(context)) {
                        context.getDegradeInfo().setFailureDegrade(true);
                        context.getDegradeInfo().setCause(rpcException);
                        // 失败调用降级
                        degradeResponse = degradeCall(context);

                        if (degradeResponse != null) {
                            // 返回同步调用模式的失败降级结果
                            return degradeResponse;
                        }
                    }
                }
            } else if (messageType == Constants.MESSAGE_TYPE_SERVICE_EXCEPTION) {
                // 如果捕捉到用户指定的业务异常,包装为降级异常捕捉
                Exception exception = InvokerUtils.toApplicationException(response);
                // 异常为自定义降级异常,并且启用了自动降级或失败降级的开关
                if (DegradationManager.INSTANCE.needFailureDegrade(context)
                        && DegradationManager.INSTANCE.isCustomizedDegradeException(exception)) {
                    failed = true;
                    failedException = exception;
                    if (DegradationManager.INSTANCE.needFailureDegrade(context)) { // 这个判断是多余的?
                        context.getDegradeInfo().setFailureDegrade(true);
                        context.getDegradeInfo().setCause(exception);
                        // 触发降级策略
                        degradeResponse = degradeCall(context);

                        if (degradeResponse != null) {// 返回同步调用模式的失败降级结果
                            return degradeResponse;
                        }
                    }
                }
            }
        }

        // 开始统计
        // 先获取调用方式
        InvokerConfig<?> invokerConfig = context.getInvokerConfig();
        byte callMethodCode = invokerConfig.getCallMethod(context.getMethodName());
        CallMethod callMethod = CallMethod.getCallMethod(callMethodCode);

        // 添加统计打点信息
        if (CallMethod.SYNC == callMethod) {
            // 同步调用统计
            if (failed) {
                // 统计失败调用情况
                DegradationManager.INSTANCE.addFailedRequest(context, failedException);
            } else {
                // 统计成功调用情况
                DegradationManager.INSTANCE.addNormalRequest(context);
            }
        }

        return response;

    } catch (ServiceUnavailableException | RemoteInvocationException | RequestTimeoutException
            | RejectedException e) {
        // 仅捕捉非业务异常
        failed = true;
        // 满足下列异常条件,则直接走降级
        if (DegradationManager.INSTANCE.needFailureDegrade(context)) {
            context.getDegradeInfo().setFailureDegrade(true);
            context.getDegradeInfo().setCause(e);
            degradeResponse = degradeCall(context);

            if (degradeResponse != null) {// 返回同步调用模式的失败降级结果
                return degradeResponse;
            }
        }

        // 添加统计打点信息
        DegradationManager.INSTANCE.addFailedRequest(context, e);
        throw e;
    } finally {
        // 添加统计打点信息
        RequestQualityManager.INSTANCE.addClientRequest(context, failed);
    }
}

降级具体策略实现

在上面判断需要触发降级时,会调用DegradationFilter#degradeCall:

public static InvocationResponse degradeCall(InvokerContext context) {
    InvocationResponse resp = doDegradeCall(context);
    if (resp != null) {
        // 监控数据记录
        InvokerMonitorData monitorData = (InvokerMonitorData) context.getMonitorData();

        if (monitorData != null) {
            monitorData.degrade();
        }

        if (context.getDegradeInfo().isFailureDegrade()) {
            // 如果是失败降级,统计失败调用情况
            DegradationManager.INSTANCE.addFailedRequest(context, new ServiceFailureDegreadedException());
        } else {
            // 统计降级调用情况
            DegradationManager.INSTANCE.addDegradedRequest(context, null);
        }

    }
    return resp;
}

继续追踪降级策略的实现逻辑,在DegradationFilter#doDegradeCall方法:

private static InvocationResponse doDegradeCall(InvokerContext context) {
    // 获取调用方法策略配置
    InvokerConfig<?> invokerConfig = context.getInvokerConfig();
    byte callMethodCode = invokerConfig.getCallMethod(context.getMethodName());
    CallMethod callMethod = CallMethod.getCallMethod(callMethodCode);

    InvocationResponse response = null;
    // 获取调用超时时间配置
    int timeout = invokerConfig.getTimeout(context.getMethodName());
    Integer timeoutThreadLocal = InvokerHelper.getTimeout();
    if (timeoutThreadLocal != null) {
        timeout = timeoutThreadLocal;
    }

    // 监控统计
    InvokerMonitorData monitorData = (InvokerMonitorData) context.getMonitorData();

    if (monitorData != null) {
        monitorData.setCallMethod(invokerConfig.getCallMethod());
        monitorData.setSerialize(invokerConfig.getSerialize());
        monitorData.setTimeout(timeout);
        monitorData.add();
    }

    // 线程内设置的默认结果,类似缓存
    Object defaultResult = InvokerHelper.getDefaultResult();
    // 获取特定服务方法配置的降级策略
    String key = DegradationManager.INSTANCE.getRequestUrl(context);
    DegradeAction action = degradeMethodActions.get(key);

    if (callMethod == CallMethod.FUTURE && context.getDegradeInfo().isFailureDegrade()) {
        callMethod = CallMethod.SYNC;
    }

    switch (callMethod) {
        case SYNC:
            try {
                if (defaultResult != null) {
                    // 存在默认结果配置,返回默认结果
                    response = InvokerUtils.createDefaultResponse(defaultResult);
                } else if (action != null) {
                    if (action.isUseMockClass()) {
                        // 依赖注入的mock对象
                        Object mockObj = context.getInvokerConfig().getMock();
                        if (mockObj != null) {
                            // 反射调用配置的mock对象的指定方法,方法参数和请求方法一致
                            defaultResult = new MockProxyWrapper(mockObj).invoke(context.getMethodName(),
                                    context.getParameterTypes(), context.getArguments());
                            response = InvokerUtils.createDefaultResponse(defaultResult);
                        }
                    } else if (action.isUseGroovyScript()) {
                        // 传入groovy脚本,生成相应的调用对象,再通过代理进行包装,通过反射调用代理方法来实际完成groovy脚本方法的调用
                        defaultResult = new MockProxyWrapper(getGroovyMockProxy(key, context, action))
                                .invoke(context.getMethodName(), context.getParameterTypes(), context.getArguments());
                        response = InvokerUtils.createDefaultResponse(defaultResult);
                    } else if (action.isThrowException()) {
                        Exception exception;
                        // 如果指定了异常,则返回指定异常,否则返回ServiceDegradedException
                        if (action.getReturnObj() == null) {
                            exception = new ServiceDegradedException(key);
                        } else {
                            exception = (Exception) action.getReturnObj();
                        }
                        throw exception;
                    } else {
                        // 降级配置中的默认返回结果
                        defaultResult = action.getReturnObj();
                        response = InvokerUtils.createDefaultResponse(defaultResult);
                    }
                }
            } catch (Throwable t) {
                // 业务异常
                response = InvokerUtils.createDefaultResponse(t);
                response.setMessageType(Constants.MESSAGE_TYPE_SERVICE_EXCEPTION);
            } finally {
                if (response != null) {
                    // 标志为降级调用
                    context.getDegradeInfo().setDegrade(true);
                    addCurrentTimeData(timeout);
                }
            }
            break;
        case CALLBACK:
            // 实现原理类似与SYNC,只是用callBack进行了一层封装
            try {
                if (defaultResult != null) {
                    response = callBackOnSuccess(context, defaultResult);
                } else if (action != null) {
                    if (action.isUseMockClass()) {
                        Object mockObj = context.getInvokerConfig().getMock();
                        if (mockObj != null) {
                            defaultResult = new MockProxyWrapper(mockObj).invoke(context.getMethodName(),
                                    context.getParameterTypes(), context.getArguments());
                            response = callBackOnSuccess(context, defaultResult);
                        }
                    } else if (action.isUseGroovyScript()) {
                        defaultResult = new MockProxyWrapper(getGroovyMockProxy(key, context, action))
                                .invoke(context.getMethodName(), context.getParameterTypes(), context.getArguments());
                        response = callBackOnSuccess(context, defaultResult);
                    } else if (action.isThrowException()) {
                        Exception exception;
                        if (action.getReturnObj() == null) {
                            exception = new ServiceDegradedException(key);
                        } else {
                            exception = (Exception) action.getReturnObj();
                        }

                        throw exception;

                    } else {
                        defaultResult = action.getReturnObj();
                        response = callBackOnSuccess(context, defaultResult);
                    }
                }
            } catch (Throwable t) {
                if (t instanceof Exception) {
                    response = callBackOnfailure(context, (Exception) t);
                } else {
                    response = callBackOnfailure(context, new ApplicationException(t));
                }
            } finally {
                if (response != null) {
                    context.getDegradeInfo().setDegrade(true);
                    addCurrentTimeData(timeout);
                    MonitorTransaction transaction = MonitorLoader.getMonitor().getCurrentCallTransaction();
                    if (transaction != null) {
                        DegradationManager.INSTANCE.monitorDegrade(context, transaction);
                    }
                }
            }
            break;
        case FUTURE:
            // 实现原理类似与SYNC,只是用future进行了一层封装
            if (defaultResult != null) {
                DegradeServiceFuture future = new DegradeServiceFuture(context, timeout);
                FutureFactory.setFuture(future);
                response = InvokerUtils.createFutureResponse(future);
                future.callback(InvokerUtils.createDefaultResponse(defaultResult));
                future.run();
            } else if (action != null) {
                if (action.isUseMockClass()) {
                    Object mockObj = context.getInvokerConfig().getMock();
                    if (mockObj != null) {
                        MockProxyWrapper mockProxyWrapper = new MockProxyWrapper(mockObj);
                        MockCallbackFuture future = new MockCallbackFuture(mockProxyWrapper, context, timeout);
                        FutureFactory.setFuture(future);
                        response = InvokerUtils.createFutureResponse(future);
                        future.callback(response);
                        future.run();
                    }
                } else if (action.isUseGroovyScript()) {
                    MockProxyWrapper mockProxyWrapper = new MockProxyWrapper(getGroovyMockProxy(key, context, action));
                    MockCallbackFuture future = new MockCallbackFuture(mockProxyWrapper, context, timeout);
                    FutureFactory.setFuture(future);
                    response = InvokerUtils.createFutureResponse(future);
                    future.callback(response);
                    future.run();
                } else if (action.isThrowException()) {
                    Exception exception;
                    if (action.getReturnObj() == null) {
                        exception = new ServiceDegradedException(key);
                    } else {
                        exception = (Exception) action.getReturnObj();
                    }
                    DegradeServiceFuture future = new DegradeServiceFuture(context, timeout);
                    FutureFactory.setFuture(future);
                    response = InvokerUtils.createFutureResponse(future);
                    future.callback(InvokerUtils.createDefaultResponse(exception));
                    future.run();
                } else {
                    defaultResult = action.getReturnObj();
                    DegradeServiceFuture future = new DegradeServiceFuture(context, timeout);
                    FutureFactory.setFuture(future);
                    response = InvokerUtils.createFutureResponse(future);
                    future.callback(InvokerUtils.createDefaultResponse(defaultResult));
                    future.run();
                }
            }
            if (response != null) {
                context.getDegradeInfo().setDegrade(true);
                addCurrentTimeData(timeout);
            }
            break;
        case ONEWAY:
            // 不需要调用结果的调用,返回空对象
            context.getDegradeInfo().setDegrade(true);
            addCurrentTimeData(timeout);
            response = NO_RETURN_RESPONSE;
            break;
        default:
            break;
    }

    if (response != null) {
        ((DefaultInvokerContext) context).setResponse(response);
    }

    return response;
}

在降级处理中,看看groovy脚本执行原理:
先生成一个脚本动态代理:

private static Object getGroovyMockProxy(String key, InvokerContext context, DegradeAction action) {
    // 获取脚本代理缓存
    Object interfaceProxy = groovyMocks.get(key);
    // 不存在则初始化
    if (interfaceProxy == null) {
        // 生成动态代理
        interfaceProxy = MockInvocationUtils.getProxy(context.getInvokerConfig(),
                new GroovyScriptInvocationProxy(action.getGroovyScript()));
        // 更新缓存,需要考虑并发
        Object oldInterfaceProxy = groovyMocks.putIfAbsent(key, interfaceProxy);
        if (oldInterfaceProxy != null) {
            interfaceProxy = oldInterfaceProxy;
        }
    }

    return interfaceProxy;
}

动态代理的封装调用非常简单:

public class MockInvocationUtils {

    public static Object getProxy(InvokerConfig invokerConfig, InvocationHandler proxyObject) {
        return Proxy.newProxyInstance(ClassUtils.getCurrentClassLoader(invokerConfig.getClassLoader()),
                new Class[]{invokerConfig.getServiceInterface()}, proxyObject);
    }
}

再看看GroovyScriptInvocationProxy实现:

public class GroovyScriptInvocationProxy implements InvocationHandler {

    private final Script script;

    public GroovyScriptInvocationProxy(Script script) {
        this.script = script;
    }

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        String methodName = method.getName();
        Class<?>[] parameterTypes = method.getParameterTypes();

        // 如果是Object的基本方法或重载方法,特殊处理,
        if (method.getDeclaringClass() == Object.class) {
            return method.invoke(script, args);
        }
        if ("toString".equals(methodName) && parameterTypes.length == 0) {
            return script.toString();
        }
        if ("hashCode".equals(methodName) && parameterTypes.length == 0) {
            return script.hashCode();
        }
        if ("equals".equals(methodName) && parameterTypes.length == 1) {
            return script.equals(args[0]);
        }
        // 运行脚本
        return script.run();
    }
}

最后看看MockProxyWrapper,传入一个代理对象,基于方法名,参数类型和具体参数值,调用具体的代理对象方法,mock对象也基于MockProxyWrapper进行调用:

public class MockProxyWrapper {

    private final Object proxy;

    public MockProxyWrapper(Object proxy) {

        if (proxy == null) {
            throw new IllegalArgumentException("proxy == null");
        }

        this.proxy = proxy;
    }

    public Object invoke(String methodName, Class<?>[] parameterTypes, Object[] arguments)
            throws Throwable {
        // 根据方法名,参数类型列表反射获取方法对象
        Method method = proxy.getClass().getMethod(methodName, parameterTypes);
        try {
            // 反射调用方法
            return method.invoke(proxy, arguments);
        } catch (InvocationTargetException e) {
            // 反射异常处理,返回实际异常
            Throwable t = e.getTargetException();
            if (t instanceof UndeclaredThrowableException && t.getCause() != null) {
                throw t.getCause();
            }
            throw t;
        }
    }
}

 类似资料: