kube-scheduler基于Kubernetes管理的资源(比如CPU、内存、PV等)进行调度,但是,当需要对在Kubernetes外部管理的资源进行调度时,在Extender之前没有任何机制可以做到这一点。有一种方法可以让kubernetes具有扩展性,就是增加HTTP调度扩展程序。
有三种方法可以向kube-scheduler添加新的调度规则:
第一种和第二种方法对初学者有很高的要求,调度插件必须用go编写,并与kube-scheduler一起编译。开发一个自定义的kube-scheduler如果需要支持所有功能(例如扩展,亲和力,污点),则需要大量的工作量。第三种方法虽然也有缺点,例如性能低下和缓存不一致,但它也有几个优点:
本文介绍第三种方法,如果不关心延迟和缓存一致性,并且不想维护自定义构建的kube-scheduler,那么这种方法可能是更好的选择。如果关心性能并希望最大程度地自定义kube-scheduler,那么开发新调度插件可能是更好的选择。
本文参考了调度扩展程序官方设计文档,感兴趣的读者可以阅读原文。本文引用源码为kubernetes的release-1.21分支。
对于kube-scheduler,需要定义调度扩展程序的接口,就是Extender接口,源码链接:https://github.com/kubernetes/kubernetes/blob/release-1.21/pkg/scheduler/framework/extender.go#L27
type Extender interface {
// 调度扩展程序的唯一名字,因为可能会有多个调度扩展程序。
Name() string
// Filter()和FilterPlugin.Filter()类似,不同的是传入了全部的Node,而插件传入的是一个Node,这个是出于调用效率考虑的,毕竟是远程调用。
// 因为参数与过滤插件不同,所以返回值也略有不同,返回了已过滤的Node(通过过滤)和过滤失败(未通过过滤)的Node。
Filter(pod *v1.Pod, nodes []*v1.Node) (filteredNodes []*v1.Node, failedNodesMap extenderv1.FailedNodesMap, err error)
// Prioritize()这接口名字是有历史原因的,因为以前的调度器分为‘predicate’和‘prioritize’两个阶段,对应调度插件的Filter和Score。
// Prioritize()接口要求输入Pod以及Filter()返回的Node集合,输出所有Node的评分(hostPriorities)以及调度扩展程序的权重。
// 这样kube-scheduler就可以将所有扩展程序返回的分数乘以权重再累加起来,这一点和调度插件原理是一样的。
Prioritize(pod *v1.Pod, nodes []*v1.Node) (hostPriorities *extenderv1.HostPriorityList, weight int64, err error)
// Bind()与BindPlugin.Bind()功能一样,只是参数的差异,了解DefaultBinder.Bind()读者应该知道,该函数最终将接口参数转换成了v1.Binding类型在执行绑定的。
Bind(binding *v1.Binding) error
// 告诉kube-scheduler调度扩展程序是否有绑定能力,如果有绑定能力kube-scheduler会优先用调度扩展程序绑定。
// 需要注意: kube-scheduler会优先用调度扩展程序绑定还有一个条件,那就是Pod有些资源是由Extender管理。
IsBinder() bool
// 判断Pod是否有任何资源是被Extender管理的,因为有资源被Extender管理交给它绑定才有意义,否则不如直接用默认的绑定插件。
IsInterested(pod *v1.Pod) bool
// Extender的抢占调度接口,传入待调度Pod,�Node和被强占的Pod候选‘nodeNameToVictims’,key是node名字,value是node上被强占的Pod。
// 有同学肯定会问,不是让Extender执行抢占调度么?哪来的Node和被强占的Pod候选?这些不应该是ProcessPreemption()返回的么?
// 这是因为DefaultPreemption(唯一的抢占调度插件)在调用Extender.ProcessPreemption()之前已经执行了一部分抢占调度的来降低
// Extender.ProcessPreemption()候选的数量,毕竟想要实现抢占调度既要满足调度插件的需求也要满足Extender的要求。
// 所以先用调度插件选出一部分候选,可以减少不必要的数据传输,因为这是http调用。关于抢占调度的实现笔者会单独写一个文件解析。
// ProcessPreemption()返回的结果可能:1)nodeNameToVictims的子集;2)候选Node上不同的被强占Pod集合
ProcessPreemption(
pod *v1.Pod,
nodeNameToVictims map[string]*extenderv1.Victims,
nodeInfos NodeInfoLister,
) (map[string]*extenderv1.Victims, error)
// 告知kube-scheduler是否具有抢占调度的能力。
SupportsPreemption() bool
// 告知kube-scheduler如果Extender不可用是否忽略,如果忽略,kube-scheduler不会返回错误。
// 因为Extender的实现是HTTP服务,所以不可用是一种正常现象。
IsIgnorable() bool
}
调度Pod时,扩展程序允许外部进程对节点进行过滤和评分(对应于调度插件的Filter和Score)。向调度扩展程序程序发出了两个独立的http/https调用,一个用于“过滤”操作,一个用于“评分”操作。如果无法调度Pod,则kube-scheduler将尝试抢占节点中优先级较低的Pod,并将其发送给调度扩展程序“preempt”动词(如果已配置)。调度扩展程序可以将Node和新的被强占Pod子集返回给调度程序。此外,调度扩展程序可以选择通过实现“绑定”操作将Pod绑定到apiserver。
Extender是kube-scheduler抽象的调度扩展程序的接口,而调度扩展程序的实现是一个HTTP服务,也就是Extender的一些接口都对应的是远程调用。所以把Extender看做一个RPC也是可以的,既然是RPC,总要有客户端(Client)的实现,熟悉GRPC的读者应该都懂得哈~
此时就要引入HTTPExtender这个类型了,它实现了Extender,将Extender的一些接口转换为HTTP请求,所以是调度扩展程序的客户端。源码链接:https://github.com/kubernetes/kubernetes/blob/release-1.21/pkg/scheduler/core/extender.go#L42
// HTTPExtender实现了Extender接口
type HTTPExtender struct {
// 调度扩展程序的URL,比如https://127.0.0.1:8080。
extenderURL string
// xxxVerb是HTTPExtender.Xxx()接口的HTTP请求的URL,比如https://127.0.0.1:8080/'preemptVerb' 用于ProcessPreemption()接口。
preemptVerb string
filterVerb string
prioritizeVerb string
bindVerb string
// 调度扩展程序的权重,用来与ScorePlugin计算出最终的分数
weight int64
// HTTP客户端
client *http.Client
// 调度扩展程序是否缓存了Node信息,如果调度扩展程序已经缓存了集群中所有节点的全部详细信息,那么只需要发送非常少量的Node信息即可,比如Node名字。
// 毕竟是HTTP调用,想法设法提升效率。但是为什么有podCacheCapable?这就要分析一下HTTPExtender发送的数据包括哪些了?
// 1. 待调度的Pod
// 2. Node(候选)
// 3. 候选Node上的候选Pod(仅抢占调度)
// 试想一下每次HTTP请求中Pod(包括候选Pod)可能不是不同的,而Node呢?有的请求可能会有不同,但于Filter请求因为需要的是Node全量,所以基本是相同。
// 会造成较大的无效数据传输,所以当调度扩展程序能够缓存Node信息时,客户端只需要传输很少的信息就可以了。
nodeCacheCapable bool
// 调度扩展程序管理的资源名称
managedResources sets.String
// 如果调度扩展程序不可用是否忽略
ignorable bool
}
Extender的配置与基本上与HTTPExtender一一对应,所以可以推测HTTPExtender的构造函数主要是通过配置赋值的过程,源码链接:https://github.com/kubernetes/kubernetes/blob/release-1.21/pkg/scheduler/core/extender.go#L86
func NewHTTPExtender(config *schedulerapi.Extender) (framework.Extender, error) {
// 没有配置超时,就用默认超时,5秒钟
if config.HTTPTimeout.Duration.Nanoseconds() == 0 {
config.HTTPTimeout.Duration = time.Duration(DefaultExtenderTimeout)
}
// 创建http.Client,makeTransport读者有兴趣自己看一下,跟大部分人用法一样
transport, err := makeTransport(config)
if err != nil {
return nil, err
}
client := &http.Client{
Transport: transport,
Timeout: config.HTTPTimeout.Duration,
}
// 管理的资源从slice转为map[string]struct{}
managedResources := sets.NewString()
for _, r := range config.ManagedResources {
managedResources.Insert(string(r.Name))
}
// 各种通过配置赋值
return &HTTPExtender{
extenderURL: config.URLPrefix,
preemptVerb: config.PreemptVerb,
filterVerb: config.FilterVerb,
prioritizeVerb: config.PrioritizeVerb,
bindVerb: config.BindVerb,
weight: config.Weight,
client: client,
nodeCacheCapable: config.NodeCacheCapable,
managedResources: managedResources,
ignorable: config.Ignorable,
}, nil
}
要使用调度扩展程序,必须创建一个kube-scheduler配置文件,将上面的配置加进去。是不是感觉有点土?每次加一个调度扩展程序都要重启一下kube-scheduler,调度扩展程序将配置写入一个configmap,kube-scheudler监视(watch)配置,然后动态的添加、删除、更新它不香么?这个套路是笔者在实际项目中最常用的方法,保不齐哪天笔者会向社区提交一个可以动态配置调度扩展程序的PR!
Kubernetes定义了各种请求(xxxVerb)的参数以及结果的类型,在https://github.com/kubernetes/kube-scheduler/blob/release-1.21/extender/v1/types.go中,本文就不一一列举了。所有的请求参数和结果都是序列化为JSON格式放在HTTP的Body中,这一点可以从send()函数看出来,源码链接:https://github.com/kubernetes/kubernetes/blob/release-1.21/pkg/scheduler/core/extender.go#L413
func (h *HTTPExtender) send(action string, args interface{}, result interface{}) error {
// 将请求参数(比如filter和prioritize请求是ExtenderArgs,preempt请求是ExtenderPreemptionArgs)序列化为JSON格式。
out, err := json.Marshal(args)
if err != nil {
return err
}
// 格式化请求的最终URL
url := strings.TrimRight(h.extenderURL, "/") + "/" + action
// 创建HTTP请求(看来POST还是比较香的),并将JSON格式的参数放到Body中
req, err := http.NewRequest("POST", url, bytes.NewReader(out))
if err != nil {
return err
}
// 内容格式是JSON
req.Header.Set("Content-Type", "application/json")
// 发送HTTP请求
resp, err := h.client.Do(req)
if err != nil {
return err
}
defer resp.Body.Close()
// 检查HTTP的状态码,如果不是200就返回错误
if resp.StatusCode != http.StatusOK {
return fmt.Errorf("failed %v with extender at URL %v, code %v", action, url, resp.StatusCode)
}
// 解析Body中的结果(比如filter请求是ExtenderFilterResult,prioritize请求是HostPriorityList,preempt请求是ExtenderPreemptionResult)
return json.NewDecoder(resp.Body).Decode(result)
}
看到send函数的实现,基本可以推测Filter()、Prioritize()以及ProcessPreemption()的实现,但是笔者还是象征性的对代码做一下简单的注释。本文不对ProcessPreemption()做注释,因为笔者一直都在说有一个单独的文章分析kube-scheduler的抢占调度的实现,ProcessPreemption()的实现会放到那个文章中分析,感兴趣的读者自己可以看看。
不废话了,直接上代码,源码链接:https://github.com/kubernetes/kubernetes/blob/release-1.21/pkg/scheduler/core/extender.go#L275
func (h *HTTPExtender) Filter(
pod *v1.Pod,
nodes []*v1.Node,
) ([]*v1.Node, extenderv1.FailedNodesMap, error) {
var (
result extenderv1.ExtenderFilterResult
nodeList *v1.NodeList
nodeNames *[]string
nodeResult []*v1.Node
args *extenderv1.ExtenderArgs
)
// 将[]*v1.Node转为map[string]*v1.Node,当调度扩展程序缓存了Node信息,返回的结果只有Node名字。
// fromNodeName用于根据Node名字快速查找对应的Node
fromNodeName := make(map[string]*v1.Node)
for _, n := range nodes {
fromNodeName[n.Name] = n
}
// 如果没有配置filterVerb,说明调度扩展程序不支持Filter,所以直接返回
if h.filterVerb == "" {
return nodes, extenderv1.FailedNodesMap{}, nil
}
if h.nodeCacheCapable {
// 如果调度扩展程序缓存了Node信息,则参数中只需要设置Node的名字
nodeNameSlice := make([]string, 0, len(nodes))
for _, node := range nodes {
nodeNameSlice = append(nodeNameSlice, node.Name)
}
nodeNames = &nodeNameSlice
} else {
// 如果调度扩展程序没有缓存Node信息,就只能把全量的Node放在参数中
nodeList = &v1.NodeList{}
for _, node := range nodes {
nodeList.Items = append(nodeList.Items, *node)
}
}
// 构造HTTP请求参数
args = &extenderv1.ExtenderArgs{
Pod: pod,
Nodes: nodeList,
NodeNames: nodeNames,
}
// 发送HTTP请求
if err := h.send(h.filterVerb, args, &result); err != nil {
return nil, nil, err
}
if result.Error != "" {
return nil, nil, fmt.Errorf(result.Error)
}
// 如果调度扩展程序缓存Node信息并且结果中设置了Node名字,那么前面[]*v1.Node转为map[string]*v1.Node就用上了
if h.nodeCacheCapable && result.NodeNames != nil {
nodeResult = make([]*v1.Node, len(*result.NodeNames))
// 根据返回结果的Node名字找到Node并输出
for i, nodeName := range *result.NodeNames {
if n, ok := fromNodeName[nodeName]; ok {
nodeResult[i] = n
} else {
return nil, nil, fmt.Errorf(
"extender %q claims a filtered node %q which is not found in the input node list",
h.extenderURL, nodeName)
}
}
} else if result.Nodes != nil {
// 直接从结果中获取Node
nodeResult = make([]*v1.Node, len(result.Nodes.Items))
for i := range result.Nodes.Items {
nodeResult[i] = &result.Nodes.Items[i]
}
}
return nodeResult, result.FailedNodes, nil
}
不废话了,直接上代码,源码链接:https://github.com/kubernetes/kubernetes/blob/release-1.21/pkg/scheduler/core/extender.go#L345
func (h *HTTPExtender) Prioritize(pod *v1.Pod, nodes []*v1.Node) (*extenderv1.HostPriorityList, int64, error) {
var (
result extenderv1.HostPriorityList
nodeList *v1.NodeList
nodeNames *[]string
args *extenderv1.ExtenderArgs
)
// 如果没有配置prioritizeVerb,说明调度扩展程序不支持Prioritize,所以直接返回
if h.prioritizeVerb == "" {
result := extenderv1.HostPriorityList{}
for _, node := range nodes {
result = append(result, extenderv1.HostPriority{Host: node.Name, Score: 0})
}
return &result, 0, nil
}
// 这部分和Filter()的一样,不重复注释了
if h.nodeCacheCapable {
nodeNameSlice := make([]string, 0, len(nodes))
for _, node := range nodes {
nodeNameSlice = append(nodeNameSlice, node.Name)
}
nodeNames = &nodeNameSlice
} else {
nodeList = &v1.NodeList{}
for _, node := range nodes {
nodeList.Items = append(nodeList.Items, *node)
}
}
// 构造HTTP请求参数
args = &extenderv1.ExtenderArgs{
Pod: pod,
Nodes: nodeList,
NodeNames: nodeNames,
}
// 发送HTTP请求
if err := h.send(h.prioritizeVerb, args, &result); err != nil {
return nil, 0, err
}
return &result, h.weight, nil
}