netflix ribbon 之 LoadBalancerRule

宣俊豪
2023-12-01

netflix ribbon 之 LoadBalancerRule

1.使用 -Dserver.port=8081 额外启动一个实例,得到端口(server.port)分别为8080和8081的两个spring-cloud-order-service服务;
2.客户端spring-cloud-user-service配置文件:
#负载均衡策略
spring-cloud-order-service.ribbon.NFLoadBalancerRuleClassName=com.netflix.loadbalancer.WeightedResponseTimeRule
# 使用WeightedResponseTimeRule时,重新计算权重的时间间隔[默认30s]
ribbon.ServerWeightTaskTimerInterval: 5000
#处理请求的超时时间,默认为5秒
ribbon.ReadTimeout: 5000
#连接建立的超时时长,默认5秒
ribbon.ConnectTimeout: 1000
#同一台实例的最大重试次数,但是不包括首次调用,默认为0次
ribbon.MaxAutoRetries: 1
#重试负载均衡其他实例的最大重试次数,不包括首次调用,默认为1次
ribbon.MaxAutoRetriesNextServer: 0
#是否对所有操作都重试,默认false
ribbon.OkToRetryOnAllOperations: false
3.LoadBalancerRule负载选取策略:

RandomRule【随机选择】

WeightedResponseTimeRule【每30s进行一次权重调整】:
2021-01-15 20:58:19.482 INFO 10348 — [d-order-service] c.n.l.WeightedResponseTimeRule : Weight adjusting job started

RoundRobinRule【线性轮询】

RetryRule【重试机制轮询】


4.LoadBalancerRule源码解读
RoundRobinRule(线性轮询的规则)
/**
 * Load-balancing rule that walks the load balancer's full server list in
 * round-robin order and returns the first server that reports itself as
 * alive and ready to serve.
 */
public class RoundRobinRule extends AbstractLoadBalancerRule {
    private static final boolean AVAILABLE_ONLY_SERVERS = true;
    private static final boolean ALL_SERVERS = false;

    /** Index of the most recently handed-out slot; advanced atomically on every pick. */
    private AtomicInteger nextServerCyclicCounter;

    public RoundRobinRule() {
        nextServerCyclicCounter = new AtomicInteger(0);
    }

    public RoundRobinRule(ILoadBalancer lb) {
        this();
        setLoadBalancer(lb);
    }

    /**
     * Picks the next server in round-robin order.
     *
     * <p>At most ten attempts are made; each attempt advances the cyclic counter
     * and inspects the server found at that index.
     *
     * @param lb  load balancer to take the server lists from
     * @param key selection key; not used by this rule
     * @return an alive, ready-to-serve server, or {@code null} when {@code lb} is
     *         {@code null}, when either server list is empty, or when ten attempts
     *         found nothing usable
     */
    public Server choose(ILoadBalancer lb, Object key) {
        if (lb == null) {
            log.warn("no load balancer");
            return null;
        }

        Server candidate = null;
        int attempts = 0;
        while (candidate == null && attempts++ < 10) {
            List<Server> reachable = lb.getReachableServers();
            List<Server> everyServer = lb.getAllServers();
            int reachableCount = reachable.size();
            int totalCount = everyServer.size();

            if (reachableCount == 0 || totalCount == 0) {
                log.warn("No up servers available from load balancer: " + lb);
                return null;
            }

            int slot = incrementAndGetModulo(totalCount);
            candidate = (Server) everyServer.get(slot);

            if (candidate == null) {
                // Transient state: give up the CPU and try again.
                Thread.yield();
                continue;
            }

            if (candidate.isAlive() && candidate.isReadyToServe()) {
                return candidate;
            }

            // Not usable; clear it so the loop moves on to the next slot.
            candidate = null;
        }

        if (attempts >= 10) {
            log.warn("No available alive servers after 10 tries from load balancer: " + lb);
        }

        return candidate;
    }

    /**
     * Atomically advances the cyclic counter to {@code (current + 1) % modulo}
     * using a compare-and-set retry loop.
     *
     * @param modulo size of the cycle
     * @return the new counter value
     */
    private int incrementAndGetModulo(int modulo) {
        for (;;) {
            int observed = nextServerCyclicCounter.get();
            int advanced = (observed + 1) % modulo;
            if (nextServerCyclicCounter.compareAndSet(observed, advanced)) {
                return advanced;
            }
        }
    }

    /** Chooses a server using the load balancer currently attached to this rule. */
    public Server choose(Object key) {
        return choose(getLoadBalancer(), key);
    }

    /** No configuration is read by this rule. */
    public void initWithNiwsConfig(IClientConfig clientConfig) {
    }
}

WeightedResponseTimeRule(由响应时间确定权重的规则)
public class WeightedResponseTimeRule extends RoundRobinRule {
    public static final int DEFAULT_TIMER_INTERVAL = 30000;
    private int serverWeightTaskTimerInterval = 30000;
    private volatile List<Double> accumulatedWeights = new ArrayList();
    private final Random random = new Random();
    protected Timer serverWeightTimer = null;
    protected AtomicBoolean serverWeightAssignmentInProgress = new AtomicBoolean(false);
    String name = "unknown";

    public WeightedResponseTimeRule() {
    }

    public WeightedResponseTimeRule(ILoadBalancer lb) {
        super(lb);
    }

    public void setLoadBalancer(ILoadBalancer lb) {
        super.setLoadBalancer(lb);
        if (lb instanceof BaseLoadBalancer) {
            this.name = ((BaseLoadBalancer)lb).getName();
        }
        this.initialize(lb);
    }
    // 规则的初始化
    void initialize(ILoadBalancer lb) {
        // 关闭前置的服务器权重timetask计划
        if (this.serverWeightTimer != null) {
            this.serverWeightTimer.cancel();
        }
        // 制定新的服务器权重timetask计划, 30s定时任务
        this.serverWeightTimer = new Timer("NFLoadBalancer-serverWeightTimer-" + this.name, true);
        this.serverWeightTimer.schedule(new WeightedResponseTimeRule.DynamicServerWeightTask(), 0L, (long)this.serverWeightTaskTimerInterval);
        // 计算首次服务器权重列表
        WeightedResponseTimeRule.ServerWeight sw = new WeightedResponseTimeRule.ServerWeight();
        sw.maintainWeights();
        // 应用关闭动作里添加新的动作:服务器权重timetask计划关闭
        Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
            public void run() {
                WeightedResponseTimeRule.logger.info("Stopping NFLoadBalancer-serverWeightTimer-" + WeightedResponseTimeRule.this.name);
                WeightedResponseTimeRule.this.serverWeightTimer.cancel();
            }
        }));
    }

    public void shutdown() {
        if (this.serverWeightTimer != null) {
            logger.info("Stopping NFLoadBalancer-serverWeightTimer-" + this.name);
            this.serverWeightTimer.cancel();
        }

    }

    List<Double> getAccumulatedWeights() {
        return Collections.unmodifiableList(this.accumulatedWeights);
    }

    // 选择服务
    @SuppressWarnings({"RCN_REDUNDANT_NULLCHECK_OF_NULL_VALUE"})
    public Server choose(ILoadBalancer lb, Object key) {
        if (lb == null) {
            return null;
        } else {
            Server server = null;

            while(server == null) {
                // 当前服务器权重列表
                List<Double> currentWeights = this.accumulatedWeights;
                if (Thread.interrupted()) {
                    return null;
                }

                List<Server> allList = lb.getAllServers();
                int serverCount = allList.size();
                if (serverCount == 0) {
                    return null;
                }

                int serverIndex = 0;
                // 最后一个就是最大权重?可能已实现排序
                double maxTotalWeight = currentWeights.size() == 0 ? 0.0D : (Double)currentWeights.get(currentWeights.size() - 1);
                if (maxTotalWeight >= 0.001D && serverCount == currentWeights.size()) {
                    // 随机权重 = 选择(0.0~1.0之间)的随机数 * 最大权重
                    double randomWeight = this.random.nextDouble() * maxTotalWeight;
                    int n = 0;
                    // 遍历列表,取出第一个满足[>=随机权重]的服务器下标
                    for(Iterator var13 = currentWeights.iterator(); var13.hasNext(); ++n) {
                        Double d = (Double)var13.next();
                        if (d >= randomWeight) {
                            serverIndex = n;
                            break;
                        }
                    }
                    // 获取服务
                    server = (Server)allList.get(serverIndex);
                } else {
                    // 如果服务器中的最大权重过小 或 服务器列表与权重列表不匹配,则使用线性轮询
                    server = super.choose(this.getLoadBalancer(), key);
                    if (server == null) {
                        return server;
                    }
                }
                // 再次验证服务可用性
                if (server == null) {
                    Thread.yield();
                } else {
                    if (server.isAlive()) {
                        return server;
                    }

                    server = null;
                }
            }

            return server;
        }
    }

    void setWeights(List<Double> weights) {
        this.accumulatedWeights = weights;
    }

    public void initWithNiwsConfig(IClientConfig clientConfig) {
        super.initWithNiwsConfig(clientConfig);
        this.serverWeightTaskTimerInterval = (Integer)clientConfig.get(WEIGHT_TASK_TIMER_INTERVAL_CONFIG_KEY, 30000);
    }
    // 内部类(服务器权重)
    class ServerWeight {
        ServerWeight() {
        }
        // 维护权重
        public void maintainWeights() {
            ILoadBalancer lb = WeightedResponseTimeRule.this.getLoadBalancer();
            if (lb != null) {
                // 服务器权重分配已经在进行?没有就改为正在进行然后执行,finally会进行重置
                if (WeightedResponseTimeRule.this.serverWeightAssignmentInProgress.compareAndSet(false, true)) {
                    try {
                        WeightedResponseTimeRule.logger.info("Weight adjusting job started");
                        AbstractLoadBalancer nlb = (AbstractLoadBalancer)lb;
                        LoadBalancerStats stats = nlb.getLoadBalancerStats();
                        if (stats != null) {
                            double totalResponseTime = 0.0D;
 
                            ServerStats ss;
                            // 统计所有服务器的(平均响应时间)总和
                            for(Iterator var6 = nlb.getAllServers().iterator(); var6.hasNext(); totalResponseTime += ss.getResponseTimeAvg()) {
                                Server server = (Server)var6.next();
                                ss = stats.getSingleServerStat(server);
                            }

                            Double weightSoFar = 0.0D;
                            // 保存更新的服务器权重列表
                            List<Double> finalWeights = new ArrayList();
                            Iterator var20 = nlb.getAllServers().iterator();
                            // 遍历服务器列表,当前权重 = (平均响应时间)总和 - 当前(平均响应时间)
                            while(var20.hasNext()) {
                                Server serverx = (Server)var20.next();
                                ServerStats ssx = stats.getSingleServerStat(serverx);
                                double weight = totalResponseTime - ssx.getResponseTimeAvg();
                                weightSoFar = weightSoFar + weight;
                                finalWeights.add(weightSoFar);
                            }

                            WeightedResponseTimeRule.this.setWeights(finalWeights);
                            return;
                        }
                    } catch (Exception var16) {
                        WeightedResponseTimeRule.logger.error("Error calculating server weights", var16);
                        return;
                    } finally {
                        WeightedResponseTimeRule.this.serverWeightAssignmentInProgress.set(false);
                    }

                }
            }
        }
    }
    // 内部类(服务器权重维护timetask计划)
    class DynamicServerWeightTask extends TimerTask {
        DynamicServerWeightTask() {
        }

        public void run() {
            WeightedResponseTimeRule.ServerWeight serverWeight = WeightedResponseTimeRule.this.new ServerWeight();

            try {
                serverWeight.maintainWeights();
            } catch (Exception var3) {
                WeightedResponseTimeRule.logger.error("Error running DynamicServerWeightTask for {}", WeightedResponseTimeRule.this.name, var3);
            }

        }
    }
}

RetryRule(具备重试机制的[默认:线性轮询]规则)
/**
 * Rule that wraps another rule and keeps asking it for a server until an
 * alive one is returned or a time budget runs out.
 */
public class RetryRule extends AbstractLoadBalancerRule {
    /** Delegate rule; round robin unless another rule is supplied. */
    IRule subRule = new RoundRobinRule();
    /** Retry budget in milliseconds; 500 unless configured otherwise. */
    long maxRetryMillis = 500L;

    public RetryRule() {
    }

    public RetryRule(IRule subRule) {
        if (subRule != null) {
            this.subRule = subRule;
        } else {
            this.subRule = new RoundRobinRule();
        }
    }

    public RetryRule(IRule subRule, long maxRetryMillis) {
        if (subRule != null) {
            this.subRule = subRule;
        } else {
            this.subRule = new RoundRobinRule();
        }
        if (maxRetryMillis > 0L) {
            this.maxRetryMillis = maxRetryMillis;
        } else {
            this.maxRetryMillis = 500L;
        }
    }

    /** Sets the delegate rule; {@code null} selects a new round-robin rule. */
    public void setRule(IRule subRule) {
        if (subRule != null) {
            this.subRule = subRule;
        } else {
            this.subRule = new RoundRobinRule();
        }
    }

    public IRule getRule() {
        return subRule;
    }

    /** Sets the retry budget; non-positive values select 500 ms. */
    public void setMaxRetryMillis(long maxRetryMillis) {
        this.maxRetryMillis = maxRetryMillis > 0L ? maxRetryMillis : 500L;
    }

    public long getMaxRetryMillis() {
        return maxRetryMillis;
    }

    /** Attaches the load balancer to both this rule and the delegate. */
    public void setLoadBalancer(ILoadBalancer lb) {
        super.setLoadBalancer(lb);
        subRule.setLoadBalancer(lb);
    }

    /**
     * Asks the delegate rule for a server, retrying until the deadline.
     *
     * @param lb  not used directly; the delegate uses its own load balancer
     * @param key selection key forwarded to the delegate
     * @return an alive server, or {@code null} if none was obtained in time
     */
    public Server choose(ILoadBalancer lb, Object key) {
        final long startedAt = System.currentTimeMillis();
        final long deadline = startedAt + maxRetryMillis;

        Server candidate = subRule.choose(key);

        boolean firstPickUnusable = candidate == null || !candidate.isAlive();
        if (firstPickUnusable && System.currentTimeMillis() < deadline) {
            // Task created with the time remaining until the deadline.
            InterruptTask interrupter = new InterruptTask(deadline - System.currentTimeMillis());

            // Keep retrying for as long as the thread has not been interrupted.
            while (!Thread.interrupted()) {
                candidate = subRule.choose(key);

                boolean usable = candidate != null && candidate.isAlive();
                if (usable || System.currentTimeMillis() >= deadline) {
                    break;
                }

                // Nothing usable yet: give up the CPU before the next attempt.
                Thread.yield();
            }

            interrupter.cancel();
        }

        // Final availability check before returning.
        if (candidate == null || !candidate.isAlive()) {
            return null;
        }
        return candidate;
    }

    /** Chooses a server using the load balancer currently attached to this rule. */
    public Server choose(Object key) {
        return choose(getLoadBalancer(), key);
    }

    /** No configuration is read by this rule. */
    public void initWithNiwsConfig(IClientConfig clientConfig) {
    }
}

继承AbstractLoadBalancerRule实现定制化

WeightedResponseTimeRule中有如下代码:

    /**
     * Client-config key named {@code "ServerWeightTaskTimerInterval"} with an
     * {@code Integer} value type; its string form is the key name itself.
     */
    public static final IClientConfigKey<Integer> WEIGHT_TASK_TIMER_INTERVAL_CONFIG_KEY = new IClientConfigKey<Integer>() {
        public Class<Integer> type() {
            return Integer.class;
        }

        public String key() {
            return "ServerWeightTaskTimerInterval";
        }

        public String toString() {
            return key();
        }
    };
    /**
     * Lets the superclass read its configuration, then loads the weight-task
     * timer interval from {@code clientConfig}, using 30000 when the key is absent.
     */
    public void initWithNiwsConfig(IClientConfig clientConfig) {
        super.initWithNiwsConfig(clientConfig);
        Integer configuredInterval = (Integer) clientConfig.get(WEIGHT_TASK_TIMER_INTERVAL_CONFIG_KEY, 30000);
        this.serverWeightTaskTimerInterval = configuredInterval;
    }

我应该可以通过修改serverWeightTaskTimerInterval来自定义权重重新计算的时间间隔。

serverWeightTaskTimerInterval,WEIGHT_TASK_TIMER_INTERVAL_CONFIG_KEY与IClientConfig是什么关系?

查看DefaultClientConfigImpl的clientConfig.get()

    // Default property namespace, "ribbon". Presumably the prefix used when looking up
    // keys such as ribbon.ServerWeightTaskTimerInterval -- confirm against the lookup code.
    public static final String DEFAULT_PROPERTY_NAME_SPACE = "ribbon";

    /**
     * Looks up {@code key} and substitutes {@code defaultValue} when the lookup
     * yields {@code null}.
     *
     * @param key          configuration key to look up
     * @param defaultValue value returned when nothing is configured for the key
     * @return the configured value, or {@code defaultValue}
     */
    public <T> T get(IClientConfigKey<T> key, T defaultValue) {
        T configured = this.get(key);
        return configured != null ? configured : defaultValue;
    }

就是纯粹从IClientConfig里获取key为ServerWeightTaskTimerInterval的配置值

# 使用WeightedResponseTimeRule时,重新计算权重的时间间隔[默认30s]
ribbon.ServerWeightTaskTimerInterval: 5000

示例:

/**
 * Retrying rule whose default delegate is a {@link WeightedResponseTimeRule}.
 *
 * <p>The delegate is asked for a server repeatedly until it returns an alive
 * one or the retry budget ({@code maxRetryMillis}) is used up.
 */
public class RetryWeightedRule extends AbstractLoadBalancerRule {
    /** Delegate rule; response-time weighted unless another rule is supplied. */
    IRule subRule = new WeightedResponseTimeRule();
    /** Retry budget in milliseconds; 500 unless configured otherwise. */
    long maxRetryMillis = 500L;

    public RetryWeightedRule() {
    }

    public RetryWeightedRule(IRule subRule) {
        this.subRule = (IRule)(subRule != null ? subRule : new WeightedResponseTimeRule());
    }

    public RetryWeightedRule(IRule subRule, long maxRetryMillis) {
        this.subRule = (IRule)(subRule != null ? subRule : new WeightedResponseTimeRule());
        this.maxRetryMillis = maxRetryMillis > 0L ? maxRetryMillis : 500L;
    }

    /** Sets the delegate rule; {@code null} selects a new {@link WeightedResponseTimeRule}. */
    public void setRule(IRule subRule) {
        this.subRule = (IRule)(subRule != null ? subRule : new WeightedResponseTimeRule());
    }

    public IRule getRule() {
        return this.subRule;
    }

    /** Sets the retry budget; non-positive values select 500 ms. */
    public void setMaxRetryMillis(long maxRetryMillis) {
        if (maxRetryMillis > 0L) {
            this.maxRetryMillis = maxRetryMillis;
        } else {
            this.maxRetryMillis = 500L;
        }

    }

    public long getMaxRetryMillis() {
        return this.maxRetryMillis;
    }

    /** Attaches the load balancer to both this rule and the delegate. */
    public void setLoadBalancer(ILoadBalancer lb) {
        super.setLoadBalancer(lb);
        this.subRule.setLoadBalancer(lb);
    }

    /**
     * Asks the delegate rule for a server, retrying until the deadline.
     *
     * @param lb  not used directly; the delegate uses its own load balancer
     * @param key selection key forwarded to the delegate
     * @return an alive server, or {@code null} if none was obtained in time
     */
    public Server choose(ILoadBalancer lb, Object key) {
        long requestTime = System.currentTimeMillis(); // start time
        long deadline = requestTime + this.maxRetryMillis; // latest time to keep retrying
        Server answer = null;
        answer = this.subRule.choose(key);
        // First pick is unusable and there is still time left: start retrying.
        if ((answer == null || !answer.isAlive()) && System.currentTimeMillis() < deadline) {
            // Task created with the time remaining until the deadline.
            InterruptTask task = new InterruptTask(deadline - System.currentTimeMillis());
            // Keep retrying for as long as the thread has not been interrupted.
            while(!Thread.interrupted()) {
                answer = this.subRule.choose(key);
                // Stop once a usable server is found or the deadline has passed.
                if (answer != null && answer.isAlive() || System.currentTimeMillis() >= deadline) {
                    break;
                }
                // Nothing usable yet: give up the CPU before the next attempt.
                Thread.yield();
            }

            task.cancel();
        }
        // Final availability check before returning.
        return answer != null && answer.isAlive() ? answer : null;
    }

    /** Chooses a server using the load balancer currently attached to this rule. */
    public Server choose(Object key) {
        return this.choose(this.getLoadBalancer(), key);
    }

    /**
     * Forwards the client configuration to the delegate when it is a
     * {@link WeightedResponseTimeRule}.
     *
     * <p>The delegate can be replaced with any {@code IRule} through the
     * constructors or {@link #setRule(IRule)}, so the cast is guarded; an
     * unconditional cast would throw {@code ClassCastException} for other rules.
     */
    public void initWithNiwsConfig(IClientConfig clientConfig) {
        if (this.subRule instanceof WeightedResponseTimeRule) {
            ((WeightedResponseTimeRule) this.subRule).initWithNiwsConfig(clientConfig);
        }
    }

}

 类似资料: