67°

聊聊ribbon的ServerListSubsetFilter

本文主要研究一下ribbon的ServerListSubsetFilter

ServerListSubsetFilter

ribbon-loadbalancer-2.3.0-sources.jar!/com/netflix/loadbalancer/ServerListSubsetFilter.java

public class ServerListSubsetFilter<T extends Server> extends ZoneAffinityServerListFilter<T> implements IClientConfigAware, Comparator<T>{
private Random random = new Random();
private volatile Set&lt;T&gt; currentSubset = Sets.newHashSet(); 
private DynamicIntProperty sizeProp = new DynamicIntProperty(DefaultClientConfigImpl.DEFAULT_PROPERTY_NAME_SPACE + ".ServerListSubsetFilter.size", 20);
private DynamicFloatProperty eliminationPercent = 
        new DynamicFloatProperty(DefaultClientConfigImpl.DEFAULT_PROPERTY_NAME_SPACE + ".ServerListSubsetFilter.forceEliminatePercent", 0.1f);
private DynamicIntProperty eliminationFailureCountThreshold = 
        new DynamicIntProperty(DefaultClientConfigImpl.DEFAULT_PROPERTY_NAME_SPACE + ".ServerListSubsetFilter.eliminationFailureThresold", 0);
private DynamicIntProperty eliminationConnectionCountThreshold = 
        new DynamicIntProperty(DefaultClientConfigImpl.DEFAULT_PROPERTY_NAME_SPACE + ".ServerListSubsetFilter.eliminationConnectionThresold", 0);

@Override
public void initWithNiwsConfig(IClientConfig clientConfig) {
    super.initWithNiwsConfig(clientConfig);
    sizeProp = new DynamicIntProperty(clientConfig.getClientName() + "." + clientConfig.getNameSpace() + ".ServerListSubsetFilter.size", 20);
    eliminationPercent = 
            new DynamicFloatProperty(clientConfig.getClientName() + "." + clientConfig.getNameSpace() + ".ServerListSubsetFilter.forceEliminatePercent", 0.1f);
    eliminationFailureCountThreshold = new DynamicIntProperty( clientConfig.getClientName() + "." + clientConfig.getNameSpace()
            + ".ServerListSubsetFilter.eliminationFailureThresold", 0);
    eliminationConnectionCountThreshold = new DynamicIntProperty(clientConfig.getClientName() + "." + clientConfig.getNameSpace()
            + ".ServerListSubsetFilter.eliminationConnectionThresold", 0);
}
    
/**
 * Given all the servers, keep only a stable subset of servers to use. This method
 * keeps the current list of subset in use and keep returning the same list, with exceptions
 * to relatively unhealthy servers, which are defined as the following:
 * &lt;p&gt;
 * &lt;ul&gt;
 * &lt;li&gt;Servers with their concurrent connection count exceeding the client configuration for 
 * {@code &lt;clientName&gt;.&lt;nameSpace&gt;.ServerListSubsetFilter.eliminationConnectionThresold} (default is 0)
 * &lt;li&gt;Servers with their failure count exceeding the client configuration for 
 * {@code &lt;clientName&gt;.&lt;nameSpace&gt;.ServerListSubsetFilter.eliminationFailureThresold} (default is 0)
 * &lt;li&gt;If the servers evicted above is less than the forced eviction percentage as defined by client configuration
 * {@code &lt;clientName&gt;.&lt;nameSpace&gt;.ServerListSubsetFilter.forceEliminatePercent} (default is 10%, or 0.1), the
 * remaining servers will be sorted by their health status and servers will worst health status will be
 * forced evicted.
 * &lt;/ul&gt;
 * &lt;p&gt;
 * After the elimination, new servers will be randomly chosen from all servers pool to keep the
 * number of the subset unchanged. 
 * 
 */
@Override
public List&lt;T&gt; getFilteredListOfServers(List&lt;T&gt; servers) {
    List&lt;T&gt; zoneAffinityFiltered = super.getFilteredListOfServers(servers);
    Set&lt;T&gt; candidates = Sets.newHashSet(zoneAffinityFiltered);
    Set&lt;T&gt; newSubSet = Sets.newHashSet(currentSubset);
    LoadBalancerStats lbStats = getLoadBalancerStats();
    for (T server: currentSubset) {
        // this server is either down or out of service
        if (!candidates.contains(server)) {
            newSubSet.remove(server);
        } else {
            ServerStats stats = lbStats.getSingleServerStat(server);
            // remove the servers that do not meet health criteria
            if (stats.getActiveRequestsCount() &gt; eliminationConnectionCountThreshold.get()
                    || stats.getFailureCount() &gt; eliminationFailureCountThreshold.get()) {
                newSubSet.remove(server);
                // also remove from the general pool to avoid selecting them again
                candidates.remove(server);
            }
        }
    }
    int targetedListSize = sizeProp.get();
    int numEliminated = currentSubset.size() - newSubSet.size();
    int minElimination = (int) (targetedListSize * eliminationPercent.get());
    int numToForceEliminate = 0;
    if (targetedListSize &lt; newSubSet.size()) {
        // size is shrinking
        numToForceEliminate = newSubSet.size() - targetedListSize;
    } else if (minElimination &gt; numEliminated) {
        numToForceEliminate = minElimination - numEliminated; 
    }
    
    if (numToForceEliminate &gt; newSubSet.size()) {
        numToForceEliminate = newSubSet.size();
    }

    if (numToForceEliminate &gt; 0) {
        List&lt;T&gt; sortedSubSet = Lists.newArrayList(newSubSet);           
        Collections.sort(sortedSubSet, this);
        List&lt;T&gt; forceEliminated = sortedSubSet.subList(0, numToForceEliminate);
        newSubSet.removeAll(forceEliminated);
        candidates.removeAll(forceEliminated);
    }
    
    // after forced elimination or elimination of unhealthy instances,
    // the size of the set may be less than the targeted size,
    // then we just randomly add servers from the big pool
    if (newSubSet.size() &lt; targetedListSize) {
        int numToChoose = targetedListSize - newSubSet.size();
        candidates.removeAll(newSubSet);
        if (numToChoose &gt; candidates.size()) {
            // Not enough healthy instances to choose, fallback to use the
            // total server pool
            candidates = Sets.newHashSet(zoneAffinityFiltered);
            candidates.removeAll(newSubSet);
        }
        List&lt;T&gt; chosen = randomChoose(Lists.newArrayList(candidates), numToChoose);
        for (T server: chosen) {
            newSubSet.add(server);
        }
    }
    currentSubset = newSubSet;       
    return Lists.newArrayList(newSubSet);            
}

/**
 * Randomly shuffle the beginning portion of server list (according to the number passed into the method) 
 * and return them.
 *  
 * @param servers
 * @param toChoose
 * @return
 */
private List&lt;T&gt; randomChoose(List&lt;T&gt; servers, int toChoose) {
    int size = servers.size();
    if (toChoose &gt;= size || toChoose &lt; 0) {
        return servers;
    } 
    for (int i = 0; i &lt; toChoose; i++) {
        int index = random.nextInt(size);
        T tmp = servers.get(index);
        servers.set(index, servers.get(i));
        servers.set(i, tmp);
    }
    return servers.subList(0, toChoose);        
}

/**
 * Function to sort the list by server health condition, with
 * unhealthy servers before healthy servers. The servers are first sorted by
 * failures count, and then concurrent connection count.
 */
@Override
public int compare(T server1, T server2) {
    LoadBalancerStats lbStats = getLoadBalancerStats();
    ServerStats stats1 = lbStats.getSingleServerStat(server1);
    ServerStats stats2 = lbStats.getSingleServerStat(server2);
    int failuresDiff = (int) (stats2.getFailureCount() - stats1.getFailureCount());
    if (failuresDiff != 0) {
        return failuresDiff;
    } else {
        return (stats2.getActiveRequestsCount() - stats1.getActiveRequestsCount());
    }
}

}

  • ServerListSubsetFilter继承了ZoneAffinityServerListFilter,实现了IClientConfigAware、Comparator接口
  • initWithNiwsConfig方法从IClientConfig读取了ServerListSubsetFilter.size、ServerListSubsetFilter.forceEliminatePercent、ServerListSubsetFilter.eliminationFailureThresold、ServerListSubsetFilter.eliminationConnectionThresold配置
  • getFilteredListOfServers方法会先调用父类ZoneAffinityServerListFilter的getFilteredListOfServers先过滤出zoneAffinityFiltered作为candidates,然后遍历currentSubset根据ServerStats剔除activeRequestsCount及failureCount超出阈值的server;然后根据numToForceEliminate,以及failureCount排序,使用subList方法得出forceEliminated,然后移除掉,最后randomChoose出来新的newSubSet,然后重置currentSubset

小结

在server list非常多的场景下,没有必要在连接池的保持这么多的连接,ServerListSubsetFilter可以在这种场景下对server list进行精简,通过剔除相对不健康(failureCount、activeRequestCount)的server来达到此目标

doc

本文由【go4it】发布于开源中国,原文链接:https://my.oschina.net/go4it/blog/3073583

全部评论: 0

    我有话说: