/** * AbstractLoadBalancer contains features required for most loadbalancing * implementations. * * An anatomy of a typical LoadBalancer consists of 1. A List of Servers (nodes) * that are potentially bucketed based on a specific criteria. 2. A Class that * defines and implements a LoadBalacing Strategy via <code>IRule</code> 3. A * Class that defines and implements a mechanism to determine the * suitability/availability of the nodes/servers in the List. * * * @author stonse * */ publicabstractclassAbstractLoadBalancerimplementsILoadBalancer { publicenumServerGroup{ ALL, STATUS_UP, STATUS_NOT_UP } /** * delegate to {@link #chooseServer(Object)} with parameter null. */ public Server chooseServer() { return chooseServer(null); }
/** * List of servers that this Loadbalancer knows about * * @param serverGroup Servers grouped by status, e.g., {@link ServerGroup#STATUS_UP} */ publicabstract List<Server> getServerList(ServerGroup serverGroup); /** * Obtain LoadBalancer related Statistics */ publicabstract LoadBalancerStats getLoadBalancerStats(); }
BaseLoadBalancer
BaseLoadBalancer 类是 AbstractLoadBalancer 的实现类,它用一个 List 集合(allServerList)保存所有服务实例,再用另一个 List(upServerList)保存当前有效的服务实例:
/** * A utility Ping Implementation that returns whatever its been set to return * (alive or dead) * @author stonse * */ publicclassPingConstantimplementsIPing { booleanconstant=true;
最后,choose(Object key) 会通过索引从服务列表中获取服务;如果连续 10 次都没有获取到服务,则会输出:“No available alive servers after 10 tries from load balancer”。如果可达服务器(reachableServers)或服务器总数(allServers)为 0,则会输出:“No up servers available from load balancer” 错误。
public Server choose(ILoadBalancer lb, Object key) { if (lb == null) { returnnull; } Serverserver=null;
while (server == null) { // get hold of the current reference in case it is changed from the other thread List<Double> currentWeights = accumulatedWeights; if (Thread.interrupted()) { returnnull; } List<Server> allList = lb.getAllServers();
intserverCount= allList.size();
if (serverCount == 0) { returnnull; }
intserverIndex=0;
// last one in the list is the sum of all weights doublemaxTotalWeight= currentWeights.size() == 0 ? 0 : currentWeights.get(currentWeights.size() - 1); // No server has been hit yet and total weight is not initialized // fallback to use round robin if (maxTotalWeight < 0.001d) { server = super.choose(getLoadBalancer(), key); if(server == null) { return server; } } else { // generate a random weight between 0 (inclusive) to maxTotalWeight (exclusive) doublerandomWeight= random.nextDouble() * maxTotalWeight; // pick the server index based on the randomIndex intn=0; for (Double d : currentWeights) { if (d >= randomWeight) { serverIndex = n; break; } else { n++; } }
try { logger.info("Weight adjusting job started"); AbstractLoadBalancernlb= (AbstractLoadBalancer) lb; LoadBalancerStatsstats= nlb.getLoadBalancerStats(); if (stats == null) { // no statistics, nothing to do return; } doubletotalResponseTime=0; // find maximal 95% response time for (Server server : nlb.getAllServers()) { // this will automatically load the stats if not in cache ServerStatsss= stats.getSingleServerStat(server); totalResponseTime += ss.getResponseTimeAvg(); } // weight for each server is (sum of responseTime of all servers - responseTime) // so that the longer the response time, the less the weight and the less likely to be chosen DoubleweightSoFar=0.0; // create new list and hot swap the reference List<Double> finalWeights = newArrayList<Double>(); for (Server server : nlb.getAllServers()) { ServerStatsss= stats.getSingleServerStat(server); doubleweight= totalResponseTime - ss.getResponseTimeAvg(); weightSoFar += weight; finalWeights.add(weightSoFar); } setWeights(finalWeights); } catch (Exception e) { logger.error("Error calculating server weights", e); } finally { serverWeightAssignmentInProgress.set(false); }
public Server choose(ILoadBalancer lb, Object key) { if (lb == null) { returnnull; } Serverserver=null;
while (server == null) { if (Thread.interrupted()) { returnnull; } List<Server> upList = lb.getReachableServers(); List<Server> allList = lb.getAllServers();
intserverCount= allList.size(); if (serverCount == 0) { /* * No servers. End regardless of pass, because subsequent passes * only get more restrictive. */ returnnull; }
intindex= rand.nextInt(serverCount); server = upList.get(index);
if (server == null) { /* * The only time this should happen is if the server list were * somehow trimmed. This is a transient condition. Retry after * yielding. */ Thread.yield(); continue; }
if (server.isAlive()) { return (server); }
// Shouldn't actually happen.. but must be transient or a bug. server = null; Thread.yield(); }
return server;
}
通过 choose 方法获取服务器实例;如果负载均衡器为 null,或者服务器总数为 0,则返回 null。
首先它会通过 getReachableServers() 和 getAllServers() 分别获取存活服务器列表(upList)和所有服务器列表(allList),之后通过 Random 生成一个随机索引,再按该索引从列表中选取服务器。
客户端配置启用线性轮询策略,从名字上可以看出它就是为了启用 RoundRobinRule 策略的,因此它的整个流程都是委托给 RoundRobinRule 来使用负载均衡器。choose 方法同样是通过 RoundRobinRule 来实现的;如果 roundRobinRule 为 null,则会抛出 IllegalArgumentException,报错信息为:“This class has not been initialized with the RoundRobinRule class”。
@Override publicvoidsetLoadBalancer(ILoadBalancer lb) { super.setLoadBalancer(lb); roundRobinRule.setLoadBalancer(lb); } @Override public Server choose(Object key) { if (roundRobinRule != null) { return roundRobinRule.choose(key); } else { thrownewIllegalArgumentException( "This class has not been initialized with the RoundRobinRule class"); } }
/**
 * Chooses a random server from those returned by {@code getEligibleServers(servers)}.
 *
 * @param servers the candidate servers to filter
 * @return a randomly selected eligible server, or an absent value when the
 *         filtered list is empty
 */
public Optional<Server> chooseRandomlyAfterFiltering(List<Server> servers) {
    final List<Server> candidates = getEligibleServers(servers);
    if (candidates.isEmpty()) {
        return Optional.absent();
    }
    final int pick = random.nextInt(candidates.size());
    final Server chosen = candidates.get(pick);
    return Optional.of(chosen);
}
/**
 * Choose a server in a round robin fashion after the predicate filters a list of servers.
 * Load balancer key is presumed to be null.
 *
 * @param servers the candidate servers to filter
 * @return the next eligible server in rotation, or an absent value when the
 *         filtered list is empty
 */
public Optional<Server> chooseRoundRobinAfterFiltering(List<Server> servers) {
    List<Server> eligible = getEligibleServers(servers);
    if (eligible.isEmpty()) {
        return Optional.absent();
    }
    // Clear the sign bit before taking the remainder: once the int counter wraps
    // past Integer.MAX_VALUE it turns negative, and a negative remainder would be
    // an invalid list index.
    int index = (nextIndex.getAndIncrement() & Integer.MAX_VALUE) % eligible.size();
    return Optional.of(eligible.get(index));
}
/**
 * Chooses a random server from those returned by
 * {@code getEligibleServers(servers, loadBalancerKey)}.
 *
 * @param servers         the candidate servers to filter
 * @param loadBalancerKey the load balancer key passed to the filtering step
 * @return a randomly selected eligible server, or an absent value when the
 *         filtered list is empty
 */
public Optional<Server> chooseRandomlyAfterFiltering(List<Server> servers, Object loadBalancerKey) {
    final List<Server> candidates = getEligibleServers(servers, loadBalancerKey);
    if (candidates.isEmpty()) {
        return Optional.absent();
    }
    final int pick = random.nextInt(candidates.size());
    final Server chosen = candidates.get(pick);
    return Optional.of(chosen);
}
/** * Choose a server in a round robin fashion after the predicate filters a given list of servers and load balancer key. */ public Optional<Server> chooseRoundRobinAfterFiltering(List<Server> servers, Object loadBalancerKey) { List<Server> eligible = getEligibleServers(servers, loadBalancerKey); if (eligible.size() == 0) { return Optional.absent(); } return Optional.of(eligible.get(nextIndex.getAndIncrement() % eligible.size())); } publicstatic AbstractServerPredicate ofKeyPredicate(final Predicate<PredicateKey> p) { returnnewAbstractServerPredicate() { @Override @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "NP") publicbooleanapply(PredicateKey input) { return p.apply(input); } }; }
/** * Create an instance from a predicate. */ publicstatic AbstractServerPredicate ofServerPredicate(final Predicate<Server> p) { returnnewAbstractServerPredicate() { @Override @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "NP") publicbooleanapply(PredicateKey input) { return p.apply(input.getServer()); } }; }