当前位置: 首页 > 工具软件 > kube-score > 使用案例 >

Kube-Scheduler配置与源码分析

齐望
2023-12-01

scheduler配置与源码分析

配置参数

启动参数

在kubernetes中,组件的配置文件格式都推荐使用yaml格式,scheduler也不例外。下面是scheduler启动时所需要的全局配置类。

// scheduler的全局配置
type Options struct {
	// The default values. These are overridden if ConfigFile is set or by values in InsecureServing.
	ComponentConfig kubeschedulerconfig.KubeSchedulerConfiguration
	SecureServing           *apiserveroptions.SecureServingOptionsWithLoopback
	CombinedInsecureServing *CombinedInsecureServingOptions
	Authentication          *apiserveroptions.DelegatingAuthenticationOptions
	Authorization           *apiserveroptions.DelegatingAuthorizationOptions
	Metrics                 *metrics.Options
	Logs                    *logs.Options
	Deprecated              *DeprecatedOptions
	// 配置文件地址
	ConfigFile string
	// WriteConfigTo is the path where the default configuration will be written.
	WriteConfigTo string
	Master string
}

//启动配置参数
type KubeSchedulerConfiguration struct {
   metav1.TypeMeta
   // Pod 的算法的并发 默认16
   Parallelism int32
   // 领导选举客户端的配置。
   LeaderElection componentbaseconfig.LeaderElectionConfiguration
   // kubeconfig文件和客户端连接
   ClientConnection componentbaseconfig.ClientConnectionConfiguration
   // 健康检查服务器的 IP 地址和端口
   // defaulting to 0.0.0.0:10251
   HealthzBindAddress string
   // 状态的IP地址和端口
   // serve on, defaulting to 0.0.0.0:10251.
   MetricsBindAddress string

   // 保存 Debugging 相关功能的配置保存 Debugging 相关功能的配置
   componentbaseconfig.DebuggingConfiguration

   // 值会转化成百分比,可行的所有节点的百分比,比如500个节点,设置30就是150个节点,然后在从这150个节点中使用调度算法判断。
   PercentageOfNodesToScore int32
   // 这两个是调度队列的初始/最大退避时间
   PodInitialBackoffSeconds int64
   PodMaxBackoffSeconds int64
   // kube-scheduler 支持的调度配置文件 这个也是重点
   Profiles []KubeSchedulerProfile
   //调度器扩展器的列表
   Extenders []Extender
}

KubeSchedulerConfiguration是整个scheduler配置信息,在scheduler中可以存在多个调度器,那么每个调度器的信息都存在于Profiles属性中

type KubeSchedulerProfile struct {
  // 调度器的名称
   SchedulerName *string `json:"schedulerName,omitempty"`
  // 该调度器中所有插件
   Plugins *Plugins `json:"plugins,omitempty"`
  // 插件配置
   PluginConfig []PluginConfig `json:"pluginConfig,omitempty"`
}

源码分析

源码会删除大量代码,只会保留关键语句

如果想要深入了解请访问https://gitee.com/meng_mengs_boys/kubernetes-1.21.5#you-have-a-working-docker-environment,这是本人写的注释

请牢记上面的配置信息,这样便于在阅读代码时知其所以然。

配置初始化

在启动时,第一步必然是对配置信息进行解析,校验,填充等工作。

// 创建cmd,使用go的基本库
func NewSchedulerCommand(registryOptions ...Option) *cobra.Command {
  // 创建Scheduler所需要的配置信息并初始化配置
   opts, err := options.NewOptions()
   cmd := &cobra.Command{
      Run: func(cmd *cobra.Command, args []string) {
         // 运行Scheduler
         if err := runCommand(cmd, opts, registryOptions...); err != nil {
            fmt.Fprintf(os.Stderr, "%v\n", err)
            os.Exit(1)
         }
      },
   }
}

func NewOptions() (*Options, error) {
	// 一定要进入该方法分析一遍,该方法初始化默认调度器信息
	cfg, err := newDefaultComponentConfig()
	// 解析健康检查的IP与端口号
	hhost, hport, err := splitHostIntPort(cfg.HealthzBindAddress)
	// 创建配置类
	o := &Options{
		ComponentConfig: *cfg,
		// 安全认证信息类
		SecureServing: apiserveroptions.NewSecureServingOptions().WithLoopback(),
		// 为 healthz 和 metrics 设置了两个不安全的监听器
		CombinedInsecureServing: &CombinedInsecureServingOptions{
			Healthz: (&apiserveroptions.DeprecatedInsecureServingOptions{
				BindNetwork: "tcp",
			}).WithLoopback(),
			Metrics: (&apiserveroptions.DeprecatedInsecureServingOptions{
				BindNetwork: "tcp",
			}).WithLoopback(),
			BindPort:    hport,
			BindAddress: hhost,
		},
		// 权限配置
		Authentication: apiserveroptions.NewDelegatingAuthenticationOptions(),
		Authorization:  apiserveroptions.NewDelegatingAuthorizationOptions(),
		Metrics: metrics.NewOptions(),
		// 日志信息
		Logs: logs.NewOptions(),
	}

	return o, nil
}

我们重点关注newDefaultComponentConfig(),该方法会创建KubeSchedulerConfiguration类以及对其进行初始化操作,该操作会创建

v1beta1.KubeSchedulerProfile{
  SchedulerName: "default-scheduler"
}

只是指定了调度器的名称,但是通过该名称就可以检索出需要的调度器,在pod创建时如果没有指定调度器,则会使用它。所以default-scheduler相当于整个调度器的核心,这也是本文为什么强调一定要分析的原因。

Setup()与Run()

现在要分析runCommand()里的方法,在RunCommand方法里总共有两个函数Setup()、Run()。

接下来让我们一一分析

Setup()的主要功能是根据KubeSchedulerConfiguration创建调度类,对Profiles中的每个调度类进行初始化,创建插件,绑定插件配置。

func Setup(ctx context.Context, opts *options.Options, outOfTreeRegistryOptions ...Option) (*schedulerserverconfig.CompletedConfig, *scheduler.Scheduler, error) {
   //配置校验
   if errs := opts.Validate(); len(errs) > 0 {
      return nil, nil, utilerrors.NewAggregate(errs)
   }
   // 创建kube客户端、创建EventBroadcaster、创建InformerFactory监听所有POD信息
   c, err := opts.Config()
   // 认证创建
   cc := c.Complete()
   recorderFactory := getRecorderFactory(&cc)
   completedProfiles := make([]kubeschedulerconfig.KubeSchedulerProfile, 0)
   // 创建调度器,这里我们可以看到调用了很多方法,不要紧张,他们都是配置信息,其意思就是上面配置类中属性的意思,我们可以直接忽略
   sched, err := scheduler.New(cc.Client,
      cc.InformerFactory,
      recorderFactory,
      ctx.Done(),
     ...),
   )
}

那么接下来让我们进入Scheduler创建

// New returns a Scheduler
func New(client clientset.Interface,
   informerFactory informers.SharedInformerFactory,
   recorderFactory profile.RecorderFactory,
   stopCh <-chan struct{},
   opts ...Option) (*Scheduler, error) {
   // 对调度器配置进行初始化操作
   options := defaultSchedulerOptions
   // 通过传入的配置信息更新调度器配置
   for _, opt := range opts {
      opt(&options)
   }
   //创建调度器缓存
   schedulerCache := internalcache.New(30*time.Second, stopEverything)
   //创建插件注册表,就是一个map key为插件的名称,value为插件创建方法
   registry := frameworkplugins.NewInTreeRegistry()
   //对于自定义插件进行合并
   if err := registry.Merge(options.frameworkOutOfTreeRegistry); err != nil {
      return nil, err
   }
   //创建快照 用来保存集群中节点快照信息
   snapshot := internalcache.NewEmptySnapshot()
   configurator := &Configurator{
      client:                   client,
      recorderFactory:          recorderFactory,
      informerFactory:          informerFactory,
      schedulerCache:           schedulerCache,
      StopEverything:           stopEverything,
      percentageOfNodesToScore: options.percentageOfNodesToScore,
      podInitialBackoffSeconds: options.podInitialBackoffSeconds,
      podMaxBackoffSeconds:     options.podMaxBackoffSeconds,
      profiles:                 append([]schedulerapi.KubeSchedulerProfile(nil), options.profiles...),
      registry:                 registry,
      nodeInfoSnapshot:         snapshot,
      extenders:                options.extenders,
      frameworkCapturer:        options.frameworkCapturer,
      parallellism:             options.parallelism,
   }
   metrics.Register()
   var sched *Scheduler
   //默认这里为 SchedulerAlgorithmSource{
   //    Provider: “DefaultProvider”,
   // },
   source := options.schedulerAlgorithmSource
   switch {
   case source.Provider != nil:
      // 所以会进入这里
      // 这个Provider有什么用那?我们进入该方法去看看
      sc, err := configurator.createFromProvider(*source.Provider)
      if err != nil {
         return nil, fmt.Errorf("couldn't create scheduler using provider %q: %v", *source.Provider, err)
      }
      sched = sc
   return sched, nil
}
  
  
 // 返回调度器
func (c *Configurator) createFromProvider(providerName string) (*Scheduler, error) {
	klog.V(2).InfoS("Creating scheduler from algorithm provider", "algorithmProvider", providerName)
	// 创建插件注册表
	r := algorithmprovider.NewRegistry()
	// 根据providerName获取当前使用的插件集合,比如&schedulerapi.Plugins{
	//	QueueSort: schedulerapi.PluginSet{
	//		Enabled: []schedulerapi.Plugin{
	//			{Name: queuesort.Name},
	//		},
	//	},
	defaultPlugins, exist := r[providerName]
	// 将插件集合存入KubeSchedulerProfile.Plugins中
	for i := range c.profiles {
		prof := &c.profiles[i]
		plugins := &schedulerapi.Plugins{}
		plugins.Append(defaultPlugins)
		plugins.Apply(prof.Plugins)
		prof.Plugins = plugins
	}
	// 创建调度器
	return c.create()
}

上面的插件可能有些迷,这里需要讲解一下,首先在前面New()方法里,有一步frameworkplugins.NewInTreeRegistry(),该方法是获取所有的插件信息(比如排序所需要的插件、过滤节点时所需要的插件)。

algorithmprovider.NewRegistry()这里是获取要使用的插件集合,所以frameworkplugins.NewInTreeRegistry()包含algorithmprovider.NewRegistry()

// 创建调度器
func (c *Configurator) create() (*Scheduler, error) {
   // 返回一个map,key为SchedulerName,string,这里为DefaultScheduler,value为framework.Framework类型
   // 该类型就是插件式架构接口
   profiles, err := profile.NewMap(c.profiles, c.registry, c.recorderFactory,
     ...
   )
   }
   // 获取插件集合中,关于pod排序的插件方法
   lessFn := profiles[c.profiles[0].SchedulerName].QueueSortFunc()

   // pod队列,用于缓存pod、对pod进行排序。Scheduler会从该队列中获取pod信息然后分配节点
   podQueue := internalqueue.NewSchedulingQueue(
      lessFn,
      c.informerFactory,
      ...
   )
   //记录了在过滤过程中node遍历的index,存储了node快照信息。
   algo := core.NewGenericScheduler(
      c.schedulerCache,
      c.nodeInfoSnapshot,
      extenders,
      c.percentageOfNodesToScore,
   )
   return &Scheduler{
      SchedulerCache:  c.schedulerCache,
      Algorithm:       algo,
      Profiles:        profiles,
      NextPod:         internalqueue.MakeNextPodFunc(podQueue),
      Error:           MakeDefaultErrorFunc(c.client, c.informerFactory.Core().V1().Pods().Lister(), podQueue, c.schedulerCache),
      StopEverything:  c.StopEverything,
      SchedulingQueue: podQueue,
   }, nil
}

我们主要关注profiles,该元素类型为map,key为SchedulerName,string,这里为DefaultScheduler,value为framework.Framework类型。

Framework

Framework?是什么那,负责初始化和运行调度器的组件,啥意思?就是接受到pod后对节点的过滤、打分、部署等一些列操作都由它来完成。

既然它这么厉害,就让我们看看它的真面目

type Framework interface {
   Handle
   //pod队列排序函数
   QueueSortFunc() LessFunc
   // 运行过滤节点前的函数
   RunPreFilterPlugins(ctx context.Context, state *CycleState, pod *v1.Pod) *Status
   // 运行过滤节点函数
   RunFilterPlugins(ctx context.Context, state *CycleState, pod *v1.Pod, nodeInfo *NodeInfo) PluginToStatus
   // 运行过滤节点后的函数
   RunPostFilterPlugins(ctx context.Context, state *CycleState, pod *v1.Pod, filteredNodeStatusMap NodeToStatusMap) (*PostFilterResult, *Status)
   // 运行打分前的函数
   RunPreScorePlugins(ctx context.Context, state *CycleState, pod *v1.Pod, nodes []*v1.Node) *Status
   // 运行打分的函数
   RunScorePlugins(ctx context.Context, state *CycleState, pod *v1.Pod, nodes []*v1.Node) (PluginToNodeScores, *Status)
   // 运行bind前 就是为pod绑定节点的函数
   RunPreBindPlugins(ctx context.Context, state *CycleState, pod *v1.Pod, nodeName string) *Status
   //   运行bind后 就是为pod绑定节点的函数
   RunPostBindPlugins(ctx context.Context, state *CycleState, pod *v1.Pod, nodeName string)
   // 运行bind
   RunBindPlugins(ctx context.Context, state *CycleState, pod *v1.Pod, nodeName string) *Status
   // 所有插件
   ListPlugins() map[string][]config.Plugin
   // 调度器的名称 比如DefaultScheduler
   ProfileName() string
}

通过上面的函数意思,我们可以猜到,pod调度周期为,QueueSortFunc->RunPreFilterPlugins->RunFilterPlugins->RunPostFilterPlugins->RunPreScorePlugins->RunScorePlugins->RunPreBindPlugins->RunBindPlugins->RunPostBindPlugins

让我们查看一下它的实现类

type frameworkImpl struct {
  // 插件注册表
   registry              Registry
  // 快照监听
   snapshotSharedLister  framework.SharedLister
  // pod缓存同步等待
   waitingPods           *waitingPodsMap
  // score需要的插件优先级
   pluginNameToWeightMap map[string]int
  // pod队列排序插件
   queueSortPlugins      []framework.QueueSortPlugin
   preFilterPlugins      []framework.PreFilterPlugin
   filterPlugins         []framework.FilterPlugin
   postFilterPlugins     []framework.PostFilterPlugin
   preScorePlugins       []framework.PreScorePlugin
   scorePlugins          []framework.ScorePlugin
   reservePlugins        []framework.ReservePlugin
   preBindPlugins        []framework.PreBindPlugin
   bindPlugins           []framework.BindPlugin
   postBindPlugins       []framework.PostBindPlugin
   permitPlugins         []framework.PermitPlugin
   clientSet       clientset.Interface
   eventRecorder   events.EventRecorder
   informerFactory informers.SharedInformerFactory
   metricsRecorder *metricsRecorder
   profileName     string
   extenders []framework.Extender
   framework.PodNominator
   parallelizer parallelize.Parallelizer
   // Indicates that RunFilterPlugins should accumulate all failed statuses and not return
   // after the first failure.
   runAllFilters bool
}

在上面运行create()时,它会将原有的KubeSchedulerProfile.Plugins数组分别添加到Framework中对应的插件数组中,这样在使用时直接使用Framework即可。KubeSchedulerProfile.SchedulerName = Framework.profileName

接下来我们分析一个打分实现函数RunScorePlugins()

func (f *frameworkImpl) RunScorePlugins(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodes []*v1.Node) (ps framework.PluginToNodeScores, status *framework.Status) {     metrics.FrameworkExtensionPointDuration.WithLabelValues(score, status.Code().String(), f.profileName).Observe(metrics.SinceInSeconds(startTime))
   }()
   // 存储 打分插件数组中,每个打分插件对于所有的节点的打分结果。
   pluginToNodeScores := make(framework.PluginToNodeScores, len(f.scorePlugins))
   // 为pluginToNodeScores实例化
   for _, pl := range f.scorePlugins {
      pluginToNodeScores[pl.Name()] = make(framework.NodeScoreList, len(nodes))
   }
   // Run Score method for each node in parallel.
   f.Parallelizer().Until(ctx, len(nodes), func(index int) {
      //调用打分插件数组
      for _, pl := range f.scorePlugins {
         nodeName := nodes[index].Name
         //运行当前插件的打分函数,返回分数
         s, status := f.runScorePlugin(ctx, pl, state, pod, nodeName)
         if !status.IsSuccess() {
            err := fmt.Errorf("plugin %q failed with: %w", pl.Name(), status.AsError())
            errCh.SendErrorWithCancel(err, cancel)
            return
         }
         pluginToNodeScores[pl.Name()][index] = framework.NodeScore{
            Name:  nodeName,
            Score: s,
         }
      }
   })
   // 根据插件的优先级对分数进行调整。
   f.Parallelizer().Until(ctx, len(f.scorePlugins), func(index int) {
      pl := f.scorePlugins[index]
      // 获取当前插件的优先级以及节点列表
      weight := f.pluginNameToWeightMap[pl.Name()]
      nodeScoreList := pluginToNodeScores[pl.Name()]
      for i, nodeScore := range nodeScoreList {
         // 使用优先级与分数相乘
         if nodeScore.Score > framework.MaxNodeScore || nodeScore.Score < framework.MinNodeScore {
         nodeScoreList[i].Score = nodeScore.Score * int64(weight)
      }
   })

   return pluginToNodeScores, nil
}

插件讲解

通过上面的学习我们能够了解到,scheduler功能也是由插件实现的,那么接下来让我们对每个插件进行分析。

Score

评分插件主要对每个节点进行评分,node节点范围主要为过滤后的节点

// 主要传入pod信息,要评分的节点名称
Score(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string)
SelectorSpread

使属于同一个服务(service或者Replication Controllers)的pod尽量均匀的分布在各个节点上或者zone上

什么意思那?就是,一个节点上已经有了pod副本,它的分数就比另一个没有pod副本的分数要低,一个节点上zone的数量也决定分数(数量越多,分数越低)。

让我们先看一下PreScore前置处理

func (pl *SelectorSpread) PreScore(ctx context.Context, cycleState *framework.CycleState, pod *v1.Pod, nodes []*v1.Node) *framework.Status {
  // 创建选择器
   var selector labels.Selector
  // 它的作用是检索集群中与pod关联的services、replicationControllers、replicaSets、statefulSets与其对应的Selector,并将其放入Set中。目的是在根据这些Selector检索出当前节点中,该pod副本的数量
   selector = helper.DefaultSelector(
      pod,
      pl.services,
      pl.replicationControllers,
      pl.replicaSets,
      pl.statefulSets,
   )
   state := &preScoreState{
      selector: selector,
   }
   cycleState.Write(preScoreStateKey, state)
   return nil
}

准备工作已经完成,那么接下来就是开始计算节点pod副本数了

func (pl *SelectorSpread) Score(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) (int64, *framework.Status) {
  // 获取我们刚刚准备的选择器集合
   c, err := state.Read(preScoreStateKey)
   s, ok := c.(*preScoreState)
  // 获取当前节点信息
   nodeInfo, err := pl.sharedLister.NodeInfos().Get(nodeName)
  // 计算副本数
   count := countMatchingPods(pod.Namespace, s.selector, nodeInfo)
   return int64(count), nil
}

func countMatchingPods(namespace string, selector labels.Selector, nodeInfo *framework.NodeInfo) int {
	if len(nodeInfo.Pods) == 0 || selector.Empty() {
		return 0
	}
	count := 0
   // 遍历当前节点中所有pod
	for _, p := range nodeInfo.Pods {
		// 首先判断是否在一个命名空间里
		if namespace == p.Pod.Namespace && p.Pod.DeletionTimestamp == nil {
          // 使用我们刚才的选择器来进行判断是否有关联
			if selector.Matches(labels.Set(p.Pod.Labels)) {
				count++
			}
		}
	}
	return count
}

这边我们获取到了节点对应的pod副本数的分数,接下来需要进行评分了

func (pl *SelectorSpread) NormalizeScore(ctx context.Context, state *framework.CycleState, pod *v1.Pod, scores framework.NodeScoreList) *framework.Status {
   if skipSelectorSpread(pod) {
      return nil
   }

   countsByZone := make(map[string]int64, 10)
   maxCountByZone := int64(0)
   maxCountByNodeName := int64(0)

  // 循环遍历寻找pod副本数最大的值,以及计算节点zone分数
  // zone为节点区域,是1.17以后加入的功能。
   for i := range scores {
      if scores[i].Score > maxCountByNodeName {
         maxCountByNodeName = scores[i].Score
      }
      nodeInfo, err := pl.sharedLister.NodeInfos().Get(scores[i].Name)
      if err != nil {
         return framework.AsStatus(fmt.Errorf("getting node %q from Snapshot: %w", scores[i].Name, err))
      }
      zoneID := utilnode.GetZoneKey(nodeInfo.Node())
      if zoneID == "" {
         continue
      }
      countsByZone[zoneID] += scores[i].Score
   }

  // 计算zone最大分数
   for zoneID := range countsByZone {
      if countsByZone[zoneID] > maxCountByZone {
         maxCountByZone = countsByZone[zoneID]
      }
   }

   haveZones := len(countsByZone) != 0
   maxCountByNodeNameFloat64 := float64(maxCountByNodeName)
   maxCountByZoneFloat64 := float64(maxCountByZone)
   MaxNodeScoreFloat64 := float64(framework.MaxNodeScore)

  // 开始计算每个节点的分数
   for i := range scores {
      // 初始化分数为100
      fScore := MaxNodeScoreFloat64
      if maxCountByNodeName > 0 {
        // 开始计算节点副本所产生的分数,比如 节点中pod最大副本数为7,当前节点副本数为3 那么结果为	100*(5-3)/5=40
         fScore = MaxNodeScoreFloat64 * (float64(maxCountByNodeName-scores[i].Score) / maxCountByNodeNameFloat64)
      }
      // 判断是否弃用了zone功能
      if haveZones {
         nodeInfo, err := pl.sharedLister.NodeInfos().Get(scores[i].Name)
         if err != nil {
            return framework.AsStatus(fmt.Errorf("getting node %q from Snapshot: %w", scores[i].Name, err))
         }

         zoneID := utilnode.GetZoneKey(nodeInfo.Node())
         if zoneID != "" {
           // 启用了就进行计算,算法与副本计算一致,100*(最大zone数量- 当前节点zone数量)/最大zone数量
            zoneScore := MaxNodeScoreFloat64
            if maxCountByZone > 0 {
               zoneScore = MaxNodeScoreFloat64 * (float64(maxCountByZone-countsByZone[zoneID]) / maxCountByZoneFloat64)
            }
           // 如果启用了zone,则根据zone权重重新计算分数,否则使用副本分数即可。
            fScore = (fScore * (1.0 - zoneWeighting)) + (zoneWeighting * zoneScore)
         }
      }
      scores[i].Score = int64(fScore)
   }
   return nil
}

到此SelectorSpread分析完毕。

LeastAllocated

它计算节点上调度的 Pod 请求的内存和 CPU 的百分比,以及
根据请求容量的分数的平均值的最小值进行优先级排序。

(cpu((capacity-sum(requested))*MaxNodeScore/capacity) + memory((capacity-sum(requested))*MaxNodeScore/capacity))/weightSum

func (r *resourceAllocationScorer) score(
   pod *v1.Pod,
   nodeInfo *framework.NodeInfo) (int64, *framework.Status) {
  // 获取当前节点信息
   node := nodeInfo.Node()
  // 用来存储请求资源
   requested := make(resourceToValueMap, len(r.resourceToWeightMap))
  // 存储可分配资源
   allocatable := make(resourceToValueMap, len(r.resourceToWeightMap))
  // 根据资源名,获取当前节点的请求、可分配资源,比如cpu,memory、ephemeral-storage
   for resource := range r.resourceToWeightMap {
      allocatable[resource], requested[resource] = calculateResourceAllocatableRequest(nodeInfo, pod, resource)
   }
   var score int64
  
   // 判断当前pod是否有Volumes
   if len(pod.Spec.Volumes) > 0 && utilfeature.DefaultFeatureGate.Enabled(features.BalanceAttachedNodeVolumes) && nodeInfo.TransientInfo != nil {
      score = r.scorer(requested, allocatable, true, nodeInfo.TransientInfo.TransNodeInfo.RequestedVolumes, nodeInfo.TransientInfo.TransNodeInfo.AllocatableVolumesCount)
   } else {
      score = r.scorer(requested, allocatable, false, 0, 0)
   }
   return score, nil
}

r.scorer(requested, allocable resourceToValueMap, includeVolumes bool, requestedVolumes int, allocatableVolumes int)
{
  var nodeScore, weightSum int64
  // 计算节点分数
		for resource, weight := range resToWeightMap {
			resourceScore := ((allocable[resource] - requested[resource]) * int64(100)) / allocable[resource]
			nodeScore += resourceScore * weight
			weightSum += weight
		}
	return nodeScore / weightSum
}
NodeResourcesBalancedAllocation

它计算cpu和内存容量分数之间的差异,
并根据两个指标彼此的接近程度对主机进行优先级排序。


// requested为当前节点资源请求集合  allocable为当前节点可用资源集合
cpuFraction = float64(requested[v1.ResourceCPU]) / float64(allocable[v1.ResourceCPU])
memoryFraction =  float64(requested[v1.ResourceMemory]) / float64(allocable[v1.ResourceMemory])

// 判断是否有特定节点存储请求量
volumeFraction := float64(nodeInfo.TransientInfo.TransNodeInfo.RequestedVolumes) / float64(nodeInfo.TransientInfo.TransNodeInfo.AllocatableVolumesCount)

mean := (cpuFraction + memoryFraction + volumeFraction) / float64(3)

variance := float64((((cpuFraction - mean) * (cpuFraction - mean)) + ((memoryFraction - mean) * (memoryFraction - mean)) + ((volumeFraction - mean) * (volumeFraction - mean))) / float64(3))

int64((1 - variance) * float64(framework.MaxNodeScore))


// 具体算法请百度《An Energy Efficient Virtual Machine Placement Algorithm with Balanced Resource Utilization》

ImageLocality

将pod调度到image已经存在的节点上(具体的实现就是打分score)

// Score invoked at the score extension point.
func (pl *ImageLocality) Score(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) (int64, *framework.Status) {
	nodeInfo, err := pl.handle.SnapshotSharedLister().NodeInfos().Get(nodeName)
	nodeInfos, err := pl.handle.SnapshotSharedLister().NodeInfos().List()
	//获取集群所有节点
	totalNumNodes := len(nodeInfos)
	//sumImageScores第一步根据pod容器镜像获取当前节点的镜像信息
    //计算镜像被引用节点的个数/节点总个数 并返回
	score := calculatePriority(sumImageScores(nodeInfo, pod.Spec.Containers, totalNumNodes), len(pod.Spec.Containers))
	return score, nil
}
	
func calculatePriority(sumScores int64, numContainers int) int64 {
	maxThreshold := 1000 * mb * int64(numContainers)
	if sumScores < 23 * mb {
		sumScores = 23 * mb
	} else if sumScores > maxThreshold {
		sumScores = maxThreshold
	}

	return int64(100) * (sumScores - 23 * mb) / (maxThreshold - 23 * mb)
}
InterPodAffinity

pod亲和性计算

获取当前节点的所有pod,然后根据选择器进行匹配结果进行评分。

// at是要调度的pod的AffinityTerm属性,而参数pod为当前节点中存在的pod
// 下面使用label选择器来判断是否匹配。
func (at *AffinityTerm) Matches(pod *v1.Pod, nsLabels labels.Set, nsSelectorEnabled bool) bool {
   if at.Namespaces.Has(pod.Namespace) || (nsSelectorEnabled && at.NamespaceSelector.Matches(nsLabels)) {
      return at.Selector.Matches(labels.Set(pod.Labels))
   }
   return false
}

// 这里它还做了一步就是,在pod亲和力匹配完成后,再进行TopologyKey判断,这样能保证部署在同一个区域。
if term.Matches(pod, nsLabels, enableNamespaceSelector) {
		if tpValue, tpValueExist := node.Labels[term.TopologyKey]; tpValueExist {
			if m[term.TopologyKey] == nil {
				m[term.TopologyKey] = make(map[string]int64)
			}
			m[term.TopologyKey][tpValue] += int64(weight * multiplier)
		}
	}
NodeAffinity

节点亲和力计算

原理与Pod亲和力计算类似,这里不过多讲解

NodePreferAvoidPods

此优选函数权限默认为10000,其将根据节点是够设置了注解信息"sheduler.alpha.kubernetes.io/preferAvoidPods"来计算其优先级

TaintToleration

污点与容忍

在filter阶段,会判断pod与当前node的effect是否一致并且是NoSchedule或者NoExecute,然后运行以下程序判断容忍条件

func (t *Toleration) ToleratesTaint(taint *Taint) bool {
  // pod与node的effect判断
	if len(t.Effect) > 0 && t.Effect != taint.Effect {
		return false
	}
	// 容忍条件判断
	switch t.Operator {
	// empty operator means Equal
	case "", TolerationOpEqual:
		return t.Value == taint.Value
	case TolerationOpExists:
		return true
	default:
		return false
	}
}
 类似资料: