调度框架用来保存配置的插件,并在调度流程中遍历执行这些插件,其实现了如下接口
type Framework interface {
// Handle gives plugins access to the framework's data and to the
// preemption API; see the Handle interface below.
Handle
// QueueSortFunc returns the less function used by the scheduling queue
// to order pending pods.
QueueSortFunc() LessFunc
// RunPreFilterPlugins runs the configured PreFilter plugins.
RunPreFilterPlugins(ctx context.Context, state *CycleState, pod *v1.Pod) *Status
// RunPostFilterPlugins runs the configured PostFilter plugins.
RunPostFilterPlugins(ctx context.Context, state *CycleState, pod *v1.Pod, filteredNodeStatusMap NodeToStatusMap) (*PostFilterResult, *Status)
// RunPreBindPlugins runs the configured PreBind plugins.
RunPreBindPlugins(ctx context.Context, state *CycleState, pod *v1.Pod, nodeName string) *Status
// RunPostBindPlugins runs the configured PostBind plugins.
RunPostBindPlugins(ctx context.Context, state *CycleState, pod *v1.Pod, nodeName string)
// RunReservePluginsReserve calls the Reserve method of the configured
// Reserve plugins.
RunReservePluginsReserve(ctx context.Context, state *CycleState, pod *v1.Pod, nodeName string) *Status
// RunReservePluginsUnreserve calls the Unreserve method of the
// configured Reserve plugins.
RunReservePluginsUnreserve(ctx context.Context, state *CycleState, pod *v1.Pod, nodeName string)
// RunPermitPlugins runs the configured Permit plugins.
RunPermitPlugins(ctx context.Context, state *CycleState, pod *v1.Pod, nodeName string) *Status
// WaitOnPermit blocks if pod is a waiting pod, until the waiting pod is
// rejected or allowed.
WaitOnPermit(ctx context.Context, pod *v1.Pod) *Status
// RunBindPlugins runs the configured Bind plugins.
RunBindPlugins(ctx context.Context, state *CycleState, pod *v1.Pod, nodeName string) *Status
// HasFilterPlugins reports whether any Filter plugin is configured.
HasFilterPlugins() bool
// HasPostFilterPlugins reports whether any PostFilter plugin is configured.
HasPostFilterPlugins() bool
// HasScorePlugins reports whether any Score plugin is configured.
HasScorePlugins() bool
// ListPlugins returns a map keyed by extension-point name whose values
// are the plugins configured at that extension point.
ListPlugins() *config.Plugins
// ProfileName returns the name of this framework; as this shows, one
// framework corresponds to one profile.
ProfileName() string
}
type Handle interface {
// PodNominator abstracts the handling of nominated pods; covered in the
// preemption flow.
PodNominator
// PluginsRunner abstracts running sets of plugins.
PluginsRunner
SnapshotSharedLister() SharedLister
IterateOverWaitingPods(callback func(WaitingPod))
// GetWaitingPod returns the WaitingPod for the given pod UID; currently
// only called from the preemption plugin.
GetWaitingPod(uid types.UID) WaitingPod
// RejectWaitingPod rejects a WaitingPod, i.e. makes it unschedulable;
// currently only called from the preemption plugin.
RejectWaitingPod(uid types.UID) bool
// ClientSet returns the interface for talking to the apiserver, e.g.
// the bind plugin uses it to bind a pod.
ClientSet() clientset.Interface
KubeConfig() *restclient.Config
EventRecorder() events.EventRecorder
// SharedInformerFactory lets plugins that need it read data from
// informers.
SharedInformerFactory() informers.SharedInformerFactory
// RunFilterPluginsWithNominatedPods runs the Filter plugins for a pod
// while accounting for nominated pods; called from both the normal
// scheduling flow and the preemption flow (detailed later).
RunFilterPluginsWithNominatedPods(ctx context.Context, state *CycleState, pod *v1.Pod, info *NodeInfo) *Status
// Extenders returns the configured scheduler extenders (skipped for now).
Extenders() []Extender
// Parallelizer returns the degree-of-parallelism helper.
Parallelizer() parallelize.Parallelizer
}
// PluginsRunner is, per the official docs, the interface the preemption
// plugin (PostFilter) uses to evaluate the feasibility of scheduling a pod
// once some running pods are preempted.
// In practice, RunPreScorePlugins and RunScorePlugins are only used in the
// normal scheduling flow, RunFilterPlugins in both normal scheduling and
// preemption, and the remaining two methods only in the preemption flow.
type PluginsRunner interface {
// RunPreScorePlugins runs the PreScore plugins.
RunPreScorePlugins(context.Context, *CycleState, *v1.Pod, []*v1.Node) *Status
// RunScorePlugins runs the Score plugins.
RunScorePlugins(context.Context, *CycleState, *v1.Pod, []*v1.Node) (PluginToNodeScores, *Status)
// RunFilterPlugins runs the Filter plugins.
RunFilterPlugins(context.Context, *CycleState, *v1.Pod, *NodeInfo) PluginToStatus
// RunPreFilterExtensionAddPod calls the plugins' AddPod method; only
// used in the preemption flow.
RunPreFilterExtensionAddPod(ctx context.Context, state *CycleState, podToSchedule *v1.Pod, podInfoToAdd *PodInfo, nodeInfo *NodeInfo) *Status
// RunPreFilterExtensionRemovePod calls the plugins' RemovePod method;
// only used in the preemption flow.
RunPreFilterExtensionRemovePod(ctx context.Context, state *CycleState, podToSchedule *v1.Pod, podInfoToRemove *PodInfo, nodeInfo *NodeInfo) *Status
}
frameworkImpl为调度框架的具体实现,其实现了上面的接口
// frameworkImpl is the component responsible for initializing and running scheduler
// plugins.
type frameworkImpl struct {
// All supported plugins; appears unused after being stored here —
// TODO confirm.
registry Registry
snapshotSharedLister framework.SharedLister
// Holds the waiting pods (Permit extension point).
waitingPods *waitingPodsMap
// Weight of each Score plugin, keyed by plugin name.
scorePluginWeight map[string]int
// The plugins enabled at each extension point.
queueSortPlugins []framework.QueueSortPlugin
preFilterPlugins []framework.PreFilterPlugin
filterPlugins []framework.FilterPlugin
postFilterPlugins []framework.PostFilterPlugin
preScorePlugins []framework.PreScorePlugin
scorePlugins []framework.ScorePlugin
reservePlugins []framework.ReservePlugin
preBindPlugins []framework.PreBindPlugin
bindPlugins []framework.BindPlugin
postBindPlugins []framework.PostBindPlugin
permitPlugins []framework.PermitPlugin
// Interfaces for talking to the apiserver.
clientSet clientset.Interface
kubeConfig *restclient.Config
eventRecorder events.EventRecorder
// Interface for reading data from informers.
informerFactory informers.SharedInformerFactory
metricsRecorder *metricsRecorder
// One profile corresponds to one frameworkImpl (one scheduling
// framework implementation).
profileName string
extenders []framework.Extender
framework.PodNominator
parallelizer parallelize.Parallelizer
// Indicates that RunFilterPlugins should accumulate all failed statuses and not return
// after the first failure.
runAllFilters bool
}
调度框架初始化
// NewFramework builds a frameworkImpl from the plugin registry and one
// scheduler profile.
// r holds all known plugins (name -> factory).
// profile is the configuration of one scheduler.
func NewFramework(r Registry, profile *config.KubeSchedulerProfile, opts ...Option) (framework.Framework, error) {
options := defaultFrameworkOptions()
for _, opt := range opts {
opt(&options)
}
f := &frameworkImpl{
registry: r,
snapshotSharedLister: options.snapshotSharedLister,
scorePluginWeight: make(map[string]int),
waitingPods: newWaitingPodsMap(),
clientSet: options.clientSet,
kubeConfig: options.kubeConfig,
eventRecorder: options.eventRecorder,
informerFactory: options.informerFactory,
metricsRecorder: options.metricsRecorder,
runAllFilters: options.runAllFilters,
extenders: options.extenders,
PodNominator: options.podNominator,
parallelizer: options.parallelizer,
}
if profile == nil {
return f, nil
}
f.profileName = profile.SchedulerName
if profile.Plugins == nil {
return f, nil
}
...
// Collect the plugins enabled at every extension point.
// A plugin may appear at several extension points, so a map (key: plugin
// name, value: plugin) is used to deduplicate.
// Why deduplicate? Because the factory below instantiates each plugin,
// and one instance per plugin is enough.
pg := f.pluginsNeeded(profile.Plugins)
// Save plugin arguments into pluginConfig, keyed by plugin name.
pluginConfig := make(map[string]runtime.Object, len(profile.PluginConfig))
for i := range profile.PluginConfig {
name := profile.PluginConfig[i].Name
if _, ok := pluginConfig[name]; ok {
return nil, fmt.Errorf("repeated config for plugin %s", name)
}
pluginConfig[name] = profile.PluginConfig[i].Args
}
...
// pluginsMap holds the instantiated plugin objects.
pluginsMap := make(map[string]framework.Plugin)
// Iterate over every known plugin.
for name, factory := range r {
// Skip plugins that are not enabled.
if _, ok := pg[name]; !ok {
continue
}
args := pluginConfig[name]
...
// Call the plugin's factory, passing f so the plugin can read the
// framework's data, e.g.
// pkg/scheduler/framework/plugins/serviceaffinity/service_affinity.go:New()
p, err := factory(args, f)
if err != nil {
return nil, fmt.Errorf("initializing plugin %q: %w", name, err)
}
pluginsMap[name] = p
// Update ClusterEventMap in place.
fillEventToPluginMap(p, options.clusterEventMap)
}
// A plugin may sit at several extension points; store the instantiated
// plugin objects onto each of their extension points.
for _, e := range f.getExtensionPoints(profile.Plugins) {
if err := updatePluginList(e.slicePtr, *e.plugins, pluginsMap); err != nil {
return nil, err
}
}
...
// There must be at least one queue sort plugin...
if len(f.queueSortPlugins) == 0 {
return nil, fmt.Errorf("no queue sort plugin is enabled")
}
// ...and at most one, so a scheduler enables exactly one sort plugin.
if len(f.queueSortPlugins) > 1 {
return nil, fmt.Errorf("only one queue sort plugin can be enabled")
}
// At least one bind plugin is required.
if len(f.bindPlugins) == 0 {
return nil, fmt.Errorf("at least one bind plugin is needed")
}
...
return f, nil
}
RunPreFilterPlugins
执行配置在PreFilter扩展点上的插件
// RunPreFilterPlugins runs the plugins configured at the PreFilter extension
// point, stopping at the first non-success status. An Unschedulable status is
// returned as-is; any other failure is wrapped as an internal error. A nil
// status means every plugin succeeded. The deferred observer records the
// extension-point latency together with the final status code.
func (f *frameworkImpl) RunPreFilterPlugins(ctx context.Context, state *framework.CycleState, pod *v1.Pod) (status *framework.Status) {
	begin := time.Now()
	defer func() {
		metrics.FrameworkExtensionPointDuration.WithLabelValues(preFilter, status.Code().String(), f.profileName).Observe(metrics.SinceInSeconds(begin))
	}()
	// Walk the plugins configured at this extension point.
	for _, plugin := range f.preFilterPlugins {
		status = f.runPreFilterPlugin(ctx, plugin, state, pod)
		if status.IsSuccess() {
			continue
		}
		status.SetFailedPlugin(plugin.Name())
		if status.IsUnschedulable() {
			return status
		}
		// Anything other than Success/Unschedulable is an internal error.
		err := fmt.Errorf("running PreFilter plugin %q: %w", plugin.Name(), status.AsError())
		return framework.AsStatus(err).WithFailedPlugin(plugin.Name())
	}
	return nil
}
RunFilterPlugins
执行配置在Filter扩展点上的插件
// RunFilterPlugins runs the plugins configured at the Filter extension point
// for pod against the node described by nodeInfo. Unschedulable statuses are
// collected per plugin name; any other failure aborts immediately and is
// reported as a single internal-error entry. Unless f.runAllFilters is set,
// the first Unschedulable plugin also ends the iteration early.
func (f *frameworkImpl) RunFilterPlugins(
	ctx context.Context,
	state *framework.CycleState,
	pod *v1.Pod,
	nodeInfo *framework.NodeInfo,
) framework.PluginToStatus {
	result := make(framework.PluginToStatus)
	// Walk the plugins configured at this extension point.
	for _, plugin := range f.filterPlugins {
		st := f.runFilterPlugin(ctx, plugin, state, pod, nodeInfo)
		if st.IsSuccess() {
			continue
		}
		if !st.IsUnschedulable() {
			// Filter plugins are not supposed to return any status other than
			// Success or Unschedulable.
			errStatus := framework.AsStatus(fmt.Errorf("running %q filter plugin: %w", plugin.Name(), st.AsError())).WithFailedPlugin(plugin.Name())
			return map[string]*framework.Status{plugin.Name(): errStatus}
		}
		st.SetFailedPlugin(plugin.Name())
		result[plugin.Name()] = st
		if !f.runAllFilters {
			// Exit early if we don't need to run all filters.
			return result
		}
	}
	return result
}
RunFilterPluginsWithNominatedPods
这里先说几个概念
successpod: 调度成功,并且bind到node成功的pod
assumepod:调度成功,假定pod运行在node上,bind异步执行中
failedpod:调度失败的pod
NominatedPods:当一个pod调度失败后,在PostFilter中执行抢占插件流程,看是否能抢占某个node上的其他低优先级的pod,如果能抢占成功,则此pod在后面调度周期很可能会被调度到此node上,这种pod被称为NominatedPods,NominatedPods虽然还没真正运行在抢占的node上,但后续的调度周期必须考虑这些pod。NominatedPods肯定是failedpod
victimpod:牺牲的pod,这些pod是被高优先级pod抢占的pod,在运行的node上被驱逐,再重新调度
此函数在正常调度流程和抢占流程被调用,目的是确认目标pod(参数pod)是否能运行在指定node(参数info)上。
正常调度被调用时:在nodeinfo(successpod+assumepod)中加上比目标pod优先级高的NominatedPods后,对目标pod执行Filter扩展点上的插件,如果返回失败,则目标pod肯定不能运行在指定node上。
抢占流程被调用时:在nodeinfo(successpod+assumepod-victimpod)中加上比目标pod优先级高的NominatedPods后,对目标pod执行Filter扩展点上的插件,如果返回失败,则目标pod肯定不能运行在指定node上。
// RunFilterPluginsWithNominatedPods runs the set of configured filter plugins
// for the given pod on the given node.
// It is called from two places: Schedule and Preempt. From Schedule, it tests
// whether the pod fits the node with all existing pods plus the higher- and
// equal-priority pods nominated to run there. From Preempt, the victims have
// already been removed from the PreFilter state and NodeInfo (by
// SelectVictimsOnNode) before this is called, and the nominated pods are
// added here.
func (f *frameworkImpl) RunFilterPluginsWithNominatedPods(ctx context.Context, state *framework.CycleState, pod *v1.Pod, info *framework.NodeInfo) *framework.Status {
	var result *framework.Status

	// The filters may have to run twice. Pass one folds the resources of every
	// nominated pod with priority >= pod into a copy of the node's state and
	// filters against that; if it passes AND any such pods were added, pass two
	// filters against the node as-is, because some filters (e.g. inter-pod
	// affinity) may only fail when the nominated pods are absent, while others
	// (e.g. inter-pod anti-affinity) may only fail when they are present. We
	// cannot assume the nominated pods actually land here — they are not bound
	// yet and may end up on another node — so both configurations must pass.
	// If no such nominated pod exists, or pass one fails, pass two is skipped.
	nominatedAdded := false
	for pass := 0; pass < 2; pass++ {
		stateForPass := state
		nodeForPass := info
		if pass == 0 {
			// First pass: account for nominated pods with priority >= pod;
			// nominatedAdded reports whether any were found.
			var err error
			nominatedAdded, stateForPass, nodeForPass, err = addNominatedPods(ctx, f, pod, state, info)
			if err != nil {
				return framework.AsStatus(err)
			}
		} else if !nominatedAdded || !result.IsSuccess() {
			// Nothing was added, or the first pass already failed; the second
			// pass cannot change the outcome.
			break
		}
		result = f.RunFilterPlugins(ctx, stateForPass, pod, nodeForPass).Merge()
		if !result.IsSuccess() && !result.IsUnschedulable() {
			return result
		}
	}
	return result
}