-Dserver.port=8081
通过启动参数分别指定server.port为8080和8081,启动了两个spring-cloud-order-service服务实例。
#负载均衡策略
spring-cloud-order-service.ribbon.NFLoadBalancerRuleClassName=com.netflix.loadbalancer.WeightedResponseTimeRule
# 使用WeightedResponseTimeRule时,重新计算权重的时间间隔[默认30s]
ribbon.ServerWeightTaskTimerInterval: 5000
#处理请求的超时时间,默认为5秒
ribbon.ReadTimeout: 5000
#连接建立的超时时长,默认5秒
ribbon.ConnectTimeout: 1000
#同一台实例的最大重试次数,但是不包括首次调用,默认为1次
ribbon.MaxAutoRetries: 1
#重试负载均衡其他实例的最大重试次数,不包括首次调用,默认为0次
ribbon.MaxAutoRetriesNextServer: 0
#是否对所有操作都重试,默认false
ribbon.OkToRetryOnAllOperations: false
RandomRule【随机选择服务实例,并非轮询】
WeightedResponseTimeRule【每30s进行一次权重调整】:
2021-01-15 20:58:19.482 INFO 10348 — [d-order-service] c.n.l.WeightedResponseTimeRule : Weight adjusting job started
RoundRobinRule【线性轮询】
RetryRule【重试机制轮询】
/**
 * Load-balancing rule that walks the balancer's full server list in a cycle,
 * handing out the next index on every call.
 */
public class RoundRobinRule extends AbstractLoadBalancerRule {
    private static final boolean AVAILABLE_ONLY_SERVERS = true;
    private static final boolean ALL_SERVERS = false;

    // Shared cursor into the server list; advanced with compare-and-set.
    private AtomicInteger nextServerCyclicCounter;

    public RoundRobinRule() {
        this.nextServerCyclicCounter = new AtomicInteger(0);
    }

    /**
     * Creates the rule and attaches it to {@code lb}.
     * Note that this invokes {@code setLoadBalancer}, which subclasses may override.
     */
    public RoundRobinRule(ILoadBalancer lb) {
        this();
        this.setLoadBalancer(lb);
    }

    /**
     * Picks the next server in cyclic order that is alive and ready to serve.
     * Gives up after ten attempts.
     *
     * @param lb  balancer supplying the server lists; {@code null} yields {@code null}
     * @param key selection key (not used by this rule)
     * @return the chosen server, or {@code null} when none could be selected
     */
    public Server choose(ILoadBalancer lb, Object key) {
        if (lb == null) {
            log.warn("no load balancer");
            return null;
        }

        Server picked = null;
        int attempts = 0;
        while (picked == null && attempts++ < 10) {
            List<Server> reachable = lb.getReachableServers();
            List<Server> everyServer = lb.getAllServers();
            int upCount = reachable.size();
            int totalCount = everyServer.size();

            // Nothing reachable or nothing registered: no point in retrying.
            if (upCount == 0 || totalCount == 0) {
                log.warn("No up servers available from load balancer: " + lb);
                return null;
            }

            picked = everyServer.get(this.incrementAndGetModulo(totalCount));
            if (picked == null) {
                // Transient gap in the list: give up the CPU and try again.
                Thread.yield();
                continue;
            }
            if (picked.isAlive() && picked.isReadyToServe()) {
                return picked;
            }
            // Candidate is not usable; move on to the next index.
            picked = null;
        }

        if (attempts >= 10) {
            log.warn("No available alive servers after 10 tries from load balancer: " + lb);
        }
        return picked;
    }

    /**
     * Atomically advances the cursor to {@code (current + 1) % modulo} and returns the new value.
     */
    private int incrementAndGetModulo(int modulo) {
        for (;;) {
            int seen = this.nextServerCyclicCounter.get();
            int candidate = (seen + 1) % modulo;
            if (this.nextServerCyclicCounter.compareAndSet(seen, candidate)) {
                return candidate;
            }
        }
    }

    /** Chooses a server using the load balancer currently attached to this rule. */
    public Server choose(Object key) {
        return this.choose(this.getLoadBalancer(), key);
    }

    /** No client configuration is consumed by this rule. */
    public void initWithNiwsConfig(IClientConfig clientConfig) {
    }
}
public class WeightedResponseTimeRule extends RoundRobinRule {
public static final int DEFAULT_TIMER_INTERVAL = 30000;
private int serverWeightTaskTimerInterval = 30000;
private volatile List<Double> accumulatedWeights = new ArrayList();
private final Random random = new Random();
protected Timer serverWeightTimer = null;
protected AtomicBoolean serverWeightAssignmentInProgress = new AtomicBoolean(false);
String name = "unknown";
public WeightedResponseTimeRule() {
}
public WeightedResponseTimeRule(ILoadBalancer lb) {
super(lb);
}
public void setLoadBalancer(ILoadBalancer lb) {
super.setLoadBalancer(lb);
if (lb instanceof BaseLoadBalancer) {
this.name = ((BaseLoadBalancer)lb).getName();
}
this.initialize(lb);
}
// 规则的初始化
void initialize(ILoadBalancer lb) {
// 关闭前置的服务器权重timetask计划
if (this.serverWeightTimer != null) {
this.serverWeightTimer.cancel();
}
// 制定新的服务器权重timetask计划, 30s定时任务
this.serverWeightTimer = new Timer("NFLoadBalancer-serverWeightTimer-" + this.name, true);
this.serverWeightTimer.schedule(new WeightedResponseTimeRule.DynamicServerWeightTask(), 0L, (long)this.serverWeightTaskTimerInterval);
// 计算首次服务器权重列表
WeightedResponseTimeRule.ServerWeight sw = new WeightedResponseTimeRule.ServerWeight();
sw.maintainWeights();
// 应用关闭动作里添加新的动作:服务器权重timetask计划关闭
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
public void run() {
WeightedResponseTimeRule.logger.info("Stopping NFLoadBalancer-serverWeightTimer-" + WeightedResponseTimeRule.this.name);
WeightedResponseTimeRule.this.serverWeightTimer.cancel();
}
}));
}
public void shutdown() {
if (this.serverWeightTimer != null) {
logger.info("Stopping NFLoadBalancer-serverWeightTimer-" + this.name);
this.serverWeightTimer.cancel();
}
}
List<Double> getAccumulatedWeights() {
return Collections.unmodifiableList(this.accumulatedWeights);
}
// 选择服务
@SuppressWarnings({"RCN_REDUNDANT_NULLCHECK_OF_NULL_VALUE"})
public Server choose(ILoadBalancer lb, Object key) {
if (lb == null) {
return null;
} else {
Server server = null;
while(server == null) {
// 当前服务器权重列表
List<Double> currentWeights = this.accumulatedWeights;
if (Thread.interrupted()) {
return null;
}
List<Server> allList = lb.getAllServers();
int serverCount = allList.size();
if (serverCount == 0) {
return null;
}
int serverIndex = 0;
// 最后一个就是最大权重?可能已实现排序
double maxTotalWeight = currentWeights.size() == 0 ? 0.0D : (Double)currentWeights.get(currentWeights.size() - 1);
if (maxTotalWeight >= 0.001D && serverCount == currentWeights.size()) {
// 随机权重 = 选择(0.0~1.0之间)的随机数 * 最大权重
double randomWeight = this.random.nextDouble() * maxTotalWeight;
int n = 0;
// 遍历列表,取出第一个满足[>=随机权重]的服务器下标
for(Iterator var13 = currentWeights.iterator(); var13.hasNext(); ++n) {
Double d = (Double)var13.next();
if (d >= randomWeight) {
serverIndex = n;
break;
}
}
// 获取服务
server = (Server)allList.get(serverIndex);
} else {
// 如果服务器中的最大权重过小 或 服务器列表与权重列表不匹配,则使用线性轮询
server = super.choose(this.getLoadBalancer(), key);
if (server == null) {
return server;
}
}
// 再次验证服务可用性
if (server == null) {
Thread.yield();
} else {
if (server.isAlive()) {
return server;
}
server = null;
}
}
return server;
}
}
void setWeights(List<Double> weights) {
this.accumulatedWeights = weights;
}
public void initWithNiwsConfig(IClientConfig clientConfig) {
super.initWithNiwsConfig(clientConfig);
this.serverWeightTaskTimerInterval = (Integer)clientConfig.get(WEIGHT_TASK_TIMER_INTERVAL_CONFIG_KEY, 30000);
}
// 内部类(服务器权重)
class ServerWeight {
ServerWeight() {
}
// 维护权重
public void maintainWeights() {
ILoadBalancer lb = WeightedResponseTimeRule.this.getLoadBalancer();
if (lb != null) {
// 服务器权重分配已经在进行?没有就改为正在进行然后执行,finally会进行重置
if (WeightedResponseTimeRule.this.serverWeightAssignmentInProgress.compareAndSet(false, true)) {
try {
WeightedResponseTimeRule.logger.info("Weight adjusting job started");
AbstractLoadBalancer nlb = (AbstractLoadBalancer)lb;
LoadBalancerStats stats = nlb.getLoadBalancerStats();
if (stats != null) {
double totalResponseTime = 0.0D;
ServerStats ss;
// 统计所有服务器的(平均响应时间)总和
for(Iterator var6 = nlb.getAllServers().iterator(); var6.hasNext(); totalResponseTime += ss.getResponseTimeAvg()) {
Server server = (Server)var6.next();
ss = stats.getSingleServerStat(server);
}
Double weightSoFar = 0.0D;
// 保存更新的服务器权重列表
List<Double> finalWeights = new ArrayList();
Iterator var20 = nlb.getAllServers().iterator();
// 遍历服务器列表,当前权重 = (平均响应时间)总和 - 当前(平均响应时间)
while(var20.hasNext()) {
Server serverx = (Server)var20.next();
ServerStats ssx = stats.getSingleServerStat(serverx);
double weight = totalResponseTime - ssx.getResponseTimeAvg();
weightSoFar = weightSoFar + weight;
finalWeights.add(weightSoFar);
}
WeightedResponseTimeRule.this.setWeights(finalWeights);
return;
}
} catch (Exception var16) {
WeightedResponseTimeRule.logger.error("Error calculating server weights", var16);
return;
} finally {
WeightedResponseTimeRule.this.serverWeightAssignmentInProgress.set(false);
}
}
}
}
}
// 内部类(服务器权重维护timetask计划)
class DynamicServerWeightTask extends TimerTask {
DynamicServerWeightTask() {
}
public void run() {
WeightedResponseTimeRule.ServerWeight serverWeight = WeightedResponseTimeRule.this.new ServerWeight();
try {
serverWeight.maintainWeights();
} catch (Exception var3) {
WeightedResponseTimeRule.logger.error("Error running DynamicServerWeightTask for {}", WeightedResponseTimeRule.this.name, var3);
}
}
}
}
/**
 * Rule that delegates to a sub-rule and keeps asking it for a server until an
 * alive one is returned or a time budget runs out.
 */
public class RetryRule extends AbstractLoadBalancerRule {
    // Delegate used for each individual pick; round-robin unless replaced.
    IRule subRule = new RoundRobinRule();
    // Retry budget in milliseconds.
    long maxRetryMillis = 500L;

    public RetryRule() {
    }

    /** Uses {@code subRule} as the delegate, or a new round-robin rule when it is {@code null}. */
    public RetryRule(IRule subRule) {
        if (subRule != null) {
            this.subRule = subRule;
        } else {
            this.subRule = new RoundRobinRule();
        }
    }

    /**
     * Uses {@code subRule} as the delegate (round-robin when {@code null}) and
     * {@code maxRetryMillis} as the budget (500 ms when not positive).
     */
    public RetryRule(IRule subRule, long maxRetryMillis) {
        if (subRule != null) {
            this.subRule = subRule;
        } else {
            this.subRule = new RoundRobinRule();
        }
        if (maxRetryMillis > 0L) {
            this.maxRetryMillis = maxRetryMillis;
        } else {
            this.maxRetryMillis = 500L;
        }
    }

    /** Replaces the delegate; {@code null} resets it to a new round-robin rule. */
    public void setRule(IRule subRule) {
        if (subRule != null) {
            this.subRule = subRule;
        } else {
            this.subRule = new RoundRobinRule();
        }
    }

    public IRule getRule() {
        return this.subRule;
    }

    /** Sets the retry budget; non-positive values reset it to 500 ms. */
    public void setMaxRetryMillis(long maxRetryMillis) {
        this.maxRetryMillis = maxRetryMillis > 0L ? maxRetryMillis : 500L;
    }

    public long getMaxRetryMillis() {
        return this.maxRetryMillis;
    }

    /** Attaches the load balancer to this rule and to the delegate. */
    public void setLoadBalancer(ILoadBalancer lb) {
        super.setLoadBalancer(lb);
        this.subRule.setLoadBalancer(lb);
    }

    /**
     * Asks the delegate for a server, retrying within the time budget while the
     * result is missing or not alive.
     *
     * @param lb  not used; the delegate works with the balancer it was given
     * @param key selection key forwarded to the delegate
     * @return an alive server, or {@code null} if none was obtained in time
     */
    public Server choose(ILoadBalancer lb, Object key) {
        long requestTime = System.currentTimeMillis();
        long deadline = requestTime + this.maxRetryMillis;

        Server answer = this.subRule.choose(key);

        if (!isUsable(answer) && System.currentTimeMillis() < deadline) {
            // Arms a task for the remaining budget (per the original note, it
            // interrupts the current thread once that delay has elapsed).
            InterruptTask task = new InterruptTask(deadline - System.currentTimeMillis());

            // Keep retrying until interrupted, successful, or out of time.
            while (!Thread.interrupted()) {
                answer = this.subRule.choose(key);
                boolean finished = isUsable(answer) || System.currentTimeMillis() >= deadline;
                if (finished) {
                    break;
                }
                // Nothing usable yet: let other threads run before the next attempt.
                Thread.yield();
            }
            task.cancel();
        }

        // Final availability check before returning.
        return isUsable(answer) ? answer : null;
    }

    // True when the candidate exists and reports itself alive.
    private static boolean isUsable(Server candidate) {
        return candidate != null && candidate.isAlive();
    }

    /** Chooses a server using the load balancer currently attached to this rule. */
    public Server choose(Object key) {
        return this.choose(this.getLoadBalancer(), key);
    }

    /** No client configuration is consumed by this rule. */
    public void initWithNiwsConfig(IClientConfig clientConfig) {
    }
}
WeightedResponseTimeRule中有如下代码:
// Configuration key naming the weight-recalculation interval; its value type is Integer.
public static final IClientConfigKey<Integer> WEIGHT_TASK_TIMER_INTERVAL_CONFIG_KEY = new IClientConfigKey<Integer>() {
    // Type of the configured value.
    public Class<Integer> type() {
        return Integer.class;
    }

    // Property name looked up in the client configuration.
    public String key() {
        return "ServerWeightTaskTimerInterval";
    }

    // The key's text form is simply its property name.
    public String toString() {
        return key();
    }
};
// Reads the weight-recalculation interval from the client configuration,
// using 30000 when the key is not configured.
public void initWithNiwsConfig(IClientConfig clientConfig) {
    super.initWithNiwsConfig(clientConfig);
    Integer configuredInterval = (Integer) clientConfig.get(WEIGHT_TASK_TIMER_INTERVAL_CONFIG_KEY, 30000);
    this.serverWeightTaskTimerInterval = configuredInterval;
}
我应该可以通过修改serverWeightTaskTimerInterval来自定义权重重新计算的时间间隔(注意:这不是重试间隔)。
serverWeightTaskTimerInterval,WEIGHT_TASK_TIMER_INTERVAL_CONFIG_KEY与IClientConfig是什么关系?
查看DefaultClientConfigImpl的clientConfig.get()
// Name space constant declared alongside the lookup method below.
public static final String DEFAULT_PROPERTY_NAME_SPACE = "ribbon";

// Looks up the value for the key and substitutes defaultValue when the lookup yields null.
public <T> T get(IClientConfigKey<T> key, T defaultValue) {
    T found = this.get(key);
    return found != null ? found : defaultValue;
}
就是纯粹从IClientConfig里获取key为ServerWeightTaskTimerInterval的配置值
# 使用WeightedResponseTimeRule时,重新计算权重的时间间隔[默认30s]
ribbon.ServerWeightTaskTimerInterval: 5000
示例:
/**
 * Retry rule whose default delegate is a {@code WeightedResponseTimeRule}: it keeps
 * asking the delegate for a server until an alive one is returned or the time
 * budget runs out.
 */
public class RetryWeightedRule extends AbstractLoadBalancerRule {
    // Delegate used for each individual pick; response-time weighted unless replaced.
    IRule subRule = new WeightedResponseTimeRule();
    // Retry budget in milliseconds.
    long maxRetryMillis = 500L;

    public RetryWeightedRule() {
    }

    /** Uses {@code subRule} as the delegate, or a new weighted rule when it is {@code null}. */
    public RetryWeightedRule(IRule subRule) {
        this.subRule = (IRule) (subRule != null ? subRule : new WeightedResponseTimeRule());
    }

    /**
     * Uses {@code subRule} as the delegate (weighted rule when {@code null}) and
     * {@code maxRetryMillis} as the budget (500 ms when not positive).
     */
    public RetryWeightedRule(IRule subRule, long maxRetryMillis) {
        this.subRule = (IRule) (subRule != null ? subRule : new WeightedResponseTimeRule());
        this.maxRetryMillis = maxRetryMillis > 0L ? maxRetryMillis : 500L;
    }

    /** Replaces the delegate; {@code null} resets it to a new weighted rule. */
    public void setRule(IRule subRule) {
        this.subRule = (IRule) (subRule != null ? subRule : new WeightedResponseTimeRule());
    }

    public IRule getRule() {
        return this.subRule;
    }

    /** Sets the retry budget; non-positive values reset it to 500 ms. */
    public void setMaxRetryMillis(long maxRetryMillis) {
        if (maxRetryMillis > 0L) {
            this.maxRetryMillis = maxRetryMillis;
        } else {
            this.maxRetryMillis = 500L;
        }
    }

    public long getMaxRetryMillis() {
        return this.maxRetryMillis;
    }

    /** Attaches the load balancer to this rule and to the delegate. */
    public void setLoadBalancer(ILoadBalancer lb) {
        super.setLoadBalancer(lb);
        this.subRule.setLoadBalancer(lb);
    }

    /**
     * Asks the delegate for a server, retrying within the time budget while the
     * result is missing or not alive.
     *
     * @param lb  not used; the delegate works with the balancer it was given
     * @param key selection key forwarded to the delegate
     * @return an alive server, or {@code null} if none was obtained in time
     */
    public Server choose(ILoadBalancer lb, Object key) {
        long requestTime = System.currentTimeMillis(); // start of the attempt
        long deadline = requestTime + this.maxRetryMillis; // end of the retry budget
        Server answer = null;
        answer = this.subRule.choose(key);
        // First pick unusable and there is still time left: start retrying.
        if ((answer == null || !answer.isAlive()) && System.currentTimeMillis() < deadline) {
            // Arms a task for the remaining budget (per the original note, it
            // interrupts the current thread once that delay has elapsed).
            InterruptTask task = new InterruptTask(deadline - System.currentTimeMillis());
            // Retry for as long as the thread has not been interrupted.
            while (!Thread.interrupted()) {
                answer = this.subRule.choose(key);
                // Stop once a usable server is found or the deadline has passed.
                if (answer != null && answer.isAlive() || System.currentTimeMillis() >= deadline) {
                    break;
                }
                // Nothing usable yet: let other threads run before the next attempt.
                Thread.yield();
            }
            task.cancel();
        }
        // Final availability check before returning.
        return answer != null && answer.isAlive() ? answer : null;
    }

    /** Chooses a server using the load balancer currently attached to this rule. */
    public Server choose(Object key) {
        return this.choose(this.getLoadBalancer(), key);
    }

    /**
     * Forwards the client configuration to the delegate when it is a
     * {@code WeightedResponseTimeRule}. The delegate can be any {@code IRule}
     * (see the constructors and {@code setRule}), so an unconditional cast could
     * throw {@code ClassCastException}; other delegates are left untouched.
     */
    public void initWithNiwsConfig(IClientConfig clientConfig) {
        if (this.subRule instanceof WeightedResponseTimeRule) {
            ((WeightedResponseTimeRule) this.subRule).initWithNiwsConfig(clientConfig);
        }
    }
}