// Framework declares the methods used to run the configured plugins at each
// scheduling extension point (PreFilter, PostFilter, Reserve, Permit, PreBind,
// Bind, PostBind) and to query which plugins a profile has configured.
type Framework interface {
// QueueSortFunc returns the LessFunc for this framework.
// NOTE(review): presumably the comparison used to order pods in the
// scheduling queue; confirm against the queue implementation.
QueueSortFunc() LessFunc
// RunPreFilterPlugins runs the configured PreFilter plugins for pod and
// returns the resulting status.
RunPreFilterPlugins(ctx context.Context, state *CycleState, pod *v1.Pod) *Status
// RunPostFilterPlugins runs the configured PostFilter plugins for pod, given
// the per-node statuses in filteredNodeStatusMap, and returns a
// PostFilterResult together with a status.
RunPostFilterPlugins(ctx context.Context, state *CycleState, pod *v1.Pod, filteredNodeStatusMap NodeToStatusMap) (*PostFilterResult, *Status)
// RunPreBindPlugins runs the configured PreBind plugins for pod on nodeName.
RunPreBindPlugins(ctx context.Context, state *CycleState, pod *v1.Pod, nodeName string) *Status
// RunPostBindPlugins runs the configured PostBind plugins for pod on
// nodeName. It returns no status.
RunPostBindPlugins(ctx context.Context, state *CycleState, pod *v1.Pod, nodeName string)
// RunReservePluginsReserve runs the Reserve step of the configured Reserve
// plugins for pod on nodeName.
RunReservePluginsReserve(ctx context.Context, state *CycleState, pod *v1.Pod, nodeName string) *Status
// RunReservePluginsUnreserve runs the Unreserve step of the configured
// Reserve plugins for pod on nodeName. It returns no status.
RunReservePluginsUnreserve(ctx context.Context, state *CycleState, pod *v1.Pod, nodeName string)
// RunPermitPlugins runs the configured Permit plugins for pod on nodeName.
RunPermitPlugins(ctx context.Context, state *CycleState, pod *v1.Pod, nodeName string) *Status
// WaitOnPermit returns a status for pod.
// NOTE(review): presumably blocks while the pod is held as a waiting pod by
// a Permit plugin; confirm against the implementation.
WaitOnPermit(ctx context.Context, pod *v1.Pod) *Status
// RunBindPlugins runs the configured Bind plugins for pod on nodeName.
RunBindPlugins(ctx context.Context, state *CycleState, pod *v1.Pod, nodeName string) *Status
// HasFilterPlugins reports whether any Filter plugin is configured.
HasFilterPlugins() bool
// HasPostFilterPlugins reports whether any PostFilter plugin is configured.
HasPostFilterPlugins() bool
// HasScorePlugins reports whether any Score plugin is configured.
HasScorePlugins() bool
// ListPlugins returns the plugin configuration as a *config.Plugins.
ListPlugins() *config.Plugins
// ProfileName returns the name of the profile this framework belongs to.
ProfileName() string
// Handle exposes framework-owned data and services: the node snapshot lister,
// the waiting pods, API clients, the event recorder, informers, extenders and
// the parallelizer. NewFramework passes the framework itself to each plugin
// factory.
// NOTE(review): presumably the factory's second parameter is typed as this
// Handle; confirm against the plugin factory signature.
type Handle interface {
// SnapshotSharedLister returns the SharedLister.
// NOTE(review): presumably backed by the scheduling-cycle snapshot; confirm.
SnapshotSharedLister() SharedLister
// IterateOverWaitingPods invokes callback for waiting pods.
IterateOverWaitingPods(callback func(WaitingPod))
// GetWaitingPod returns the WaitingPod for the given pod UID. Currently it is
// only called from the preemption plugin.
GetWaitingPod(uid types.UID) WaitingPod
// RejectWaitingPod takes the UID of a waiting pod and returns a bool.
// NOTE(review): presumably true when a waiting pod with that UID was found
// and rejected; confirm against the implementation.
RejectWaitingPod(uid types.UID) bool
// ClientSet returns the interface used to talk to the API server; for
// example, the bind plugin uses it to bind a pod.
ClientSet() clientset.Interface
// KubeConfig returns the REST client configuration.
KubeConfig() *restclient.Config
// EventRecorder returns the events.EventRecorder held by the framework.
EventRecorder() events.EventRecorder
// SharedInformerFactory returns the shared informer factory held by the
// framework.
SharedInformerFactory() informers.SharedInformerFactory
// RunFilterPluginsWithNominatedPods runs the configured filter plugins for
// pod against the node described by info, taking nominated pods into
// account.
RunFilterPluginsWithNominatedPods(ctx context.Context, state *CycleState, pod *v1.Pod, info *NodeInfo) *Status
// Extenders returns the configured extenders.
Extenders() []Extender
// Parallelizer returns the parallelize.Parallelizer held by the framework.
Parallelizer() parallelize.Parallelizer
// PluginsRunner declares the subset of plugin-running methods covering the
// PreScore, Score and Filter extension points plus the PreFilter AddPod and
// RemovePod extensions.
type PluginsRunner interface {
// RunPreScorePlugins runs the configured PreScore plugins for the pod over
// the given nodes.
RunPreScorePlugins(context.Context, *CycleState, *v1.Pod, []*v1.Node) *Status
// RunScorePlugins runs the configured Score plugins for the pod over the
// given nodes and returns the scores as PluginToNodeScores.
RunScorePlugins(context.Context, *CycleState, *v1.Pod, []*v1.Node) (PluginToNodeScores, *Status)
// RunFilterPlugins runs the configured Filter plugins for the pod on the
// given node and returns the statuses keyed by plugin name.
RunFilterPlugins(context.Context, *CycleState, *v1.Pod, *NodeInfo) PluginToStatus
// RunPreFilterExtensionAddPod calls the PreFilter AddPod extension with
// podInfoToAdd and nodeInfo while scheduling podToSchedule.
// NOTE(review): presumably lets PreFilter plugins update their cycle state
// as if the pod had been added to the node; confirm.
RunPreFilterExtensionAddPod(ctx context.Context, state *CycleState, podToSchedule *v1.Pod, podInfoToAdd *PodInfo, nodeInfo *NodeInfo) *Status
// RunPreFilterExtensionRemovePod calls the PreFilter RemovePod extension
// with podInfoToRemove and nodeInfo while scheduling podToSchedule.
// NOTE(review): presumably the counterpart of AddPod for a pod removed from
// the node; confirm.
RunPreFilterExtensionRemovePod(ctx context.Context, state *CycleState, podToSchedule *v1.Pod, podInfoToRemove *PodInfo, nodeInfo *NodeInfo) *Status
// frameworkImpl is the component responsible for initializing and running scheduler
// plugins.
type frameworkImpl struct {
// registry is the plugin registry passed to NewFramework; it maps plugin
// names to factories.
registry Registry
// snapshotSharedLister is taken from the framework options.
snapshotSharedLister framework.SharedLister
// waitingPods is created by newWaitingPodsMap() in NewFramework.
waitingPods *waitingPodsMap
// scorePluginWeight is keyed by string and initialised as an empty map in
// NewFramework.
// NOTE(review): presumably plugin name -> score weight; confirm where it is
// populated.
scorePluginWeight map[string]int
// One slice of instantiated plugins per extension point.
// NOTE(review): presumably filled by updatePluginList through the pointers
// returned by getExtensionPoints; confirm.
queueSortPlugins []framework.QueueSortPlugin
preFilterPlugins []framework.PreFilterPlugin
filterPlugins []framework.FilterPlugin
postFilterPlugins []framework.PostFilterPlugin
preScorePlugins []framework.PreScorePlugin
scorePlugins []framework.ScorePlugin
reservePlugins []framework.ReservePlugin
preBindPlugins []framework.PreBindPlugin
bindPlugins []framework.BindPlugin
postBindPlugins []framework.PostBindPlugin
permitPlugins []framework.PermitPlugin
// The following fields are copied from the framework options in
// NewFramework.
clientSet clientset.Interface
kubeConfig *restclient.Config
eventRecorder events.EventRecorder
informerFactory informers.SharedInformerFactory
metricsRecorder *metricsRecorder
// profileName is set from profile.SchedulerName and is used as a metric
// label.
profileName string
extenders []framework.Extender
parallelizer parallelize.Parallelizer
// Indicates that RunFilterPlugins should accumulate all failed statuses and not return
// after the first failure.
runAllFilters bool
// NewFramework builds a framework for the given scheduler profile.
//
// r is the registry of plugin factories, profile selects and configures the
// plugins, and opts customise the framework options. When profile or
// profile.Plugins is nil the framework is returned without any plugins.
//
// An error is returned when a plugin has more than one entry in
// profile.PluginConfig, when a plugin factory fails, when updatePluginList
// fails, when the number of enabled queue sort plugins is not exactly one, or
// when no bind plugin is enabled.
func NewFramework(r Registry, profile *config.KubeSchedulerProfile, opts ...Option) (framework.Framework, error) {
// Start from the defaults and apply the caller's options.
options := defaultFrameworkOptions()
for _, opt := range opts {
f := &frameworkImpl{
registry: r,
snapshotSharedLister: options.snapshotSharedLister,
scorePluginWeight: make(map[string]int),
waitingPods: newWaitingPodsMap(),
clientSet: options.clientSet,
kubeConfig: options.kubeConfig,
eventRecorder: options.eventRecorder,
informerFactory: options.informerFactory,
metricsRecorder: options.metricsRecorder,
runAllFilters: options.runAllFilters,
extenders: options.extenders,
PodNominator: options.podNominator,
parallelizer: options.parallelizer,
// Without a profile there is nothing to configure.
if profile == nil {
return f, nil
f.profileName = profile.SchedulerName
// A profile without plugins yields a framework with no plugins.
if profile.Plugins == nil {
return f, nil
// pg is looked up by plugin name below to decide which registered plugins
// this profile needs.
pg := f.pluginsNeeded(profile.Plugins)
// Index the per-plugin arguments by plugin name, rejecting duplicates.
pluginConfig := make(map[string]runtime.Object, len(profile.PluginConfig))
for i := range profile.PluginConfig {
name := profile.PluginConfig[i].Name
if _, ok := pluginConfig[name]; ok {
return nil, fmt.Errorf("repeated config for plugin %s", name)
pluginConfig[name] = profile.PluginConfig[i].Args
// Instantiate plugins from the registry. The framework itself is passed to
// each factory as its second argument.
pluginsMap := make(map[string]framework.Plugin)
for name, factory := range r {
// NOTE(review): presumably plugins that are absent from pg are skipped;
// the body of this branch is not visible here, so confirm.
if _, ok := pg[name]; !ok {
// args is the nil runtime.Object when the plugin has no PluginConfig entry.
args := pluginConfig[name]
p, err := factory(args, f)
if err != nil {
return nil, fmt.Errorf("initializing plugin %q: %w", name, err)
pluginsMap[name] = p
// Update ClusterEventMap in place.
fillEventToPluginMap(p, options.clusterEventMap)
// Populate the per-extension-point plugin lists from the instantiated
// plugins.
for _, e := range f.getExtensionPoints(profile.Plugins) {
if err := updatePluginList(e.slicePtr, *e.plugins, pluginsMap); err != nil {
return nil, err
// Exactly one queue sort plugin and at least one bind plugin are required.
if len(f.queueSortPlugins) == 0 {
return nil, fmt.Errorf("no queue sort plugin is enabled")
if len(f.queueSortPlugins) > 1 {
return nil, fmt.Errorf("only one queue sort plugin can be enabled")
if len(f.bindPlugins) == 0 {
return nil, fmt.Errorf("at least one bind plugin is needed")
return f, nil
// RunPreFilterPlugins runs the configured PreFilter plugins in order for pod.
//
// It stops at the first plugin that does not succeed. An Unschedulable status
// is returned as is. Any other failure is wrapped in an error status that
// names the plugin and has the failed plugin recorded on it. When every plugin
// succeeds the result is nil.
func (f *frameworkImpl) RunPreFilterPlugins(ctx context.Context, state *framework.CycleState, pod *v1.Pod) (status *framework.Status) {
startTime := time.Now()
// The result is named so that the deferred closure observes the final status
// when it records the extension point duration.
// NOTE(review): status is nil on the success path, so this relies on
// Status.Code being callable on a nil receiver; confirm.
defer func() {
metrics.FrameworkExtensionPointDuration.WithLabelValues(preFilter, status.Code().String(), f.profileName).Observe(metrics.SinceInSeconds(startTime))
for _, pl := range f.preFilterPlugins {
status = f.runPreFilterPlugin(ctx, pl, state, pod)
if !status.IsSuccess() {
// Unschedulable is an expected outcome and is passed through unchanged.
if status.IsUnschedulable() {
return status
// Anything else is reported as an error attributed to this plugin.
return framework.AsStatus(fmt.Errorf("running PreFilter plugin %q: %w", pl.Name(), status.AsError())).WithFailedPlugin(pl.Name())
return nil
// RunFilterPlugins runs the configured Filter plugins in order for pod on the
// node described by nodeInfo.
//
// The returned map holds the status of each failed plugin, keyed by plugin
// name; it is empty when every plugin succeeds. A failure that is not
// Unschedulable ends the run at once and yields a map holding only that
// plugin's error status. Unless f.runAllFilters is set, the run also ends
// after the first Unschedulable failure.
func (f *frameworkImpl) RunFilterPlugins(
ctx context.Context,
state *framework.CycleState,
pod *v1.Pod,
nodeInfo *framework.NodeInfo,
) framework.PluginToStatus {
statuses := make(framework.PluginToStatus)
for _, pl := range f.filterPlugins {
pluginStatus := f.runFilterPlugin(ctx, pl, state, pod, nodeInfo)
if !pluginStatus.IsSuccess() {
if !pluginStatus.IsUnschedulable() {
// Filter plugins are not supposed to return any status other than
// Success or Unschedulable.
errStatus := framework.AsStatus(fmt.Errorf("running %q filter plugin: %w", pl.Name(), pluginStatus.AsError())).WithFailedPlugin(pl.Name())
// Statuses collected so far are discarded; only the error is reported.
return map[string]*framework.Status{pl.Name(): errStatus}
statuses[pl.Name()] = pluginStatus
if !f.runAllFilters {
// Exit early if we don't need to run all filters.
return statuses
return statuses
successpod: a pod that was scheduled successfully and then successfully bound to a node
// RunFilterPluginsWithNominatedPods runs the set of configured filter plugins
// for nominated pod on the given node.
// This function is called from two different places: Schedule and Preempt.
// When it is called from Schedule, we want to test whether the pod is
// schedulable on the node with all the existing pods on the node plus higher
// and equal priority pods nominated to run on the node.
// When it is called from Preempt, we should remove the victims of preemption
// and add the nominated pods. Removal of the victims is done by
// SelectVictimsOnNode(). Preempt removes victims from PreFilter state and
// NodeInfo before calling this function.
//
// The returned status is the merge of the per-plugin statuses from the last
// filter pass that ran. A failure from addNominatedPods is returned as an
// error status.
func (f *frameworkImpl) RunFilterPluginsWithNominatedPods(ctx context.Context, state *framework.CycleState, pod *v1.Pod, info *framework.NodeInfo) *framework.Status {
var status *framework.Status
podsAdded := false
// In some scenarios the filters have to be run twice. If the node has
// nominated pods whose priority is higher than or equal to the target pod's,
// the resources of those pods are added to the node and the filters are run.
// Without the nominated pods added, the filters may fail.
// If there are no nominated pods with priority higher than or equal to the
// target pod's, or if the first run failed, the second run is not performed.
// Why are only nominated pods with priority higher than or equal to the
// target pod's considered here? Because these pods do not occupy node
// resources themselves, and the preemption flow removes the nominated node
// of these pods,
// Why run twice to make sure the target pod is schedulable both with and
// without the nominated pods? This is a conservative decision: some filters
// (such as the inter-pod anti-affinity filter) may fail when the nominated
// pods are added, and some filters (such as the inter-pod affinity filter)
// may fail when they are not added. We cannot assume that the nominated pods
// are already running on the node
for i := 0; i < 2; i++ {
// Default to the caller's state and node info; the first pass may replace
// them with the values returned by addNominatedPods.
stateToUse := state
nodeInfoToUse := info
if i == 0 {
var err error
podsAdded, stateToUse, nodeInfoToUse, err = addNominatedPods(ctx, f, pod, state, info)
if err != nil {
return framework.AsStatus(err)
// NOTE(review): the body of this branch is not visible here; per the
// explanation above, presumably the second pass is skipped when no
// nominated pods were added or the first pass failed. Confirm.
} else if !podsAdded || !status.IsSuccess() {
statusMap := f.RunFilterPlugins(ctx, stateToUse, pod, nodeInfoToUse)
status = statusMap.Merge()
// A status that is neither success nor Unschedulable is returned at once.
if !status.IsSuccess() && !status.IsUnschedulable() {
return status
return status