在kubernetes中,组件的配置文件格式都推荐使用yaml格式,scheduler也不例外。下面是scheduler启动时所需要的全局配置类。
// scheduler的全局配置
type Options struct {
// The default values. These are overridden if ConfigFile is set or by values in InsecureServing.
ComponentConfig kubeschedulerconfig.KubeSchedulerConfiguration
SecureServing *apiserveroptions.SecureServingOptionsWithLoopback
CombinedInsecureServing *CombinedInsecureServingOptions
Authentication *apiserveroptions.DelegatingAuthenticationOptions
Authorization *apiserveroptions.DelegatingAuthorizationOptions
Metrics *metrics.Options
Logs *logs.Options
Deprecated *DeprecatedOptions
// 配置文件地址
ConfigFile string
// WriteConfigTo is the path where the default configuration will be written.
WriteConfigTo string
Master string
}
//启动配置参数
type KubeSchedulerConfiguration struct {
metav1.TypeMeta
// Pod 的算法的并发 默认16
Parallelism int32
// 领导选举客户端的配置。
LeaderElection componentbaseconfig.LeaderElectionConfiguration
// kubeconfig文件和客户端连接
ClientConnection componentbaseconfig.ClientConnectionConfiguration
// 健康检查服务器的 IP 地址和端口
// defaulting to 0.0.0.0:10251
HealthzBindAddress string
// 状态的IP地址和端口
// serve on, defaulting to 0.0.0.0:10251.
MetricsBindAddress string
// 保存 Debugging 相关功能的配置保存 Debugging 相关功能的配置
componentbaseconfig.DebuggingConfiguration
// 值会转化成百分比,可行的所有节点的百分比,比如500个节点,设置30就是150个节点,然后在从这150个节点中使用调度算法判断。
PercentageOfNodesToScore int32
// 这两个是调度队列的初始/最大退避时间
PodInitialBackoffSeconds int64
PodMaxBackoffSeconds int64
// kube-scheduler 支持的调度配置文件 这个也是重点
Profiles []KubeSchedulerProfile
//调度器扩展器的列表
Extenders []Extender
}
KubeSchedulerConfiguration是整个scheduler配置信息,在scheduler中可以存在多个调度器,那么每个调度器的信息都存在于Profiles属性中
type KubeSchedulerProfile struct {
// 调度器的名称
SchedulerName *string `json:"schedulerName,omitempty"`
// 该调度器中所有插件
Plugins *Plugins `json:"plugins,omitempty"`
// 插件配置
PluginConfig []PluginConfig `json:"pluginConfig,omitempty"`
}
源码会删除大量代码,只会保留关键语句
如果想要深入了解请访问https://gitee.com/meng_mengs_boys/kubernetes-1.21.5#you-have-a-working-docker-environment,这是本人写的注释
请牢记上面的配置信息,这样便于在阅读代码时知其所以然。
在启动时,第一步必然是对配置信息进行解析,校验,填充等工作。
// 创建cmd,使用go的基本库
func NewSchedulerCommand(registryOptions ...Option) *cobra.Command {
// 创建Scheduler所需要的配置信息并初始化配置
opts, err := options.NewOptions()
cmd := &cobra.Command{
Run: func(cmd *cobra.Command, args []string) {
// 运行Scheduler
if err := runCommand(cmd, opts, registryOptions...); err != nil {
fmt.Fprintf(os.Stderr, "%v\n", err)
os.Exit(1)
}
},
}
}
func NewOptions() (*Options, error) {
// 一定要进入该方法分析一遍,该方法初始化默认调度器信息
cfg, err := newDefaultComponentConfig()
// 解析健康检查的IP与端口号
hhost, hport, err := splitHostIntPort(cfg.HealthzBindAddress)
// 创建配置类
o := &Options{
ComponentConfig: *cfg,
// 安全认证信息类
SecureServing: apiserveroptions.NewSecureServingOptions().WithLoopback(),
// 为 healthz 和 metrics 设置了两个不安全的监听器
CombinedInsecureServing: &CombinedInsecureServingOptions{
Healthz: (&apiserveroptions.DeprecatedInsecureServingOptions{
BindNetwork: "tcp",
}).WithLoopback(),
Metrics: (&apiserveroptions.DeprecatedInsecureServingOptions{
BindNetwork: "tcp",
}).WithLoopback(),
BindPort: hport,
BindAddress: hhost,
},
// 权限配置
Authentication: apiserveroptions.NewDelegatingAuthenticationOptions(),
Authorization: apiserveroptions.NewDelegatingAuthorizationOptions(),
Metrics: metrics.NewOptions(),
// 日志信息
Logs: logs.NewOptions(),
}
return o, nil
}
我们重点关注newDefaultComponentConfig(),该方法会创建KubeSchedulerConfiguration类以及对其进行初始化操作,该操作会创建
v1beta1.KubeSchedulerProfile{
SchedulerName: "default-scheduler"
}
只是指定了调度器的名称,但是通过该名称就可以检索出需要的调度器,在pod创建时如果没有指定调度器,则会使用它。所以default-scheduler相当于整个调度器的核心,这也是本文为什么强调一定要分析的原因。
现在要分析runCommand()里的方法,在RunCommand方法里总共有两个函数Setup()、Run()。
接下来让我们一一分析
Setup()的主要功能是根据KubeSchedulerConfiguration创建调度类,对Profiles中的每个调度类进行初始化,创建插件,绑定插件配置。
func Setup(ctx context.Context, opts *options.Options, outOfTreeRegistryOptions ...Option) (*schedulerserverconfig.CompletedConfig, *scheduler.Scheduler, error) {
//配置校验
if errs := opts.Validate(); len(errs) > 0 {
return nil, nil, utilerrors.NewAggregate(errs)
}
// 创建kube客户端、创建EventBroadcaster、创建InformerFactory监听所有POD信息
c, err := opts.Config()
// 认证创建
cc := c.Complete()
recorderFactory := getRecorderFactory(&cc)
completedProfiles := make([]kubeschedulerconfig.KubeSchedulerProfile, 0)
// 创建调度器,这里我们可以看到调用了很多方法,不要紧张,他们都是配置信息,其意思就是上面配置类中属性的意思,我们可以直接忽略
sched, err := scheduler.New(cc.Client,
cc.InformerFactory,
recorderFactory,
ctx.Done(),
...),
)
}
那么接下来让我们进入Scheduler创建
// New returns a Scheduler
func New(client clientset.Interface,
informerFactory informers.SharedInformerFactory,
recorderFactory profile.RecorderFactory,
stopCh <-chan struct{},
opts ...Option) (*Scheduler, error) {
// 对调度器配置进行初始化操作
options := defaultSchedulerOptions
// 通过传入的配置信息更新调度器配置
for _, opt := range opts {
opt(&options)
}
//创建调度器缓存
schedulerCache := internalcache.New(30*time.Second, stopEverything)
//创建插件注册表,就是一个map key为插件的名称,value为插件创建方法
registry := frameworkplugins.NewInTreeRegistry()
//对于自定义插件进行合并
if err := registry.Merge(options.frameworkOutOfTreeRegistry); err != nil {
return nil, err
}
//创建快照 用来保存集群中节点快照信息
snapshot := internalcache.NewEmptySnapshot()
configurator := &Configurator{
client: client,
recorderFactory: recorderFactory,
informerFactory: informerFactory,
schedulerCache: schedulerCache,
StopEverything: stopEverything,
percentageOfNodesToScore: options.percentageOfNodesToScore,
podInitialBackoffSeconds: options.podInitialBackoffSeconds,
podMaxBackoffSeconds: options.podMaxBackoffSeconds,
profiles: append([]schedulerapi.KubeSchedulerProfile(nil), options.profiles...),
registry: registry,
nodeInfoSnapshot: snapshot,
extenders: options.extenders,
frameworkCapturer: options.frameworkCapturer,
parallellism: options.parallelism,
}
metrics.Register()
var sched *Scheduler
//默认这里为 SchedulerAlgorithmSource{
// Provider: “DefaultProvider”,
// },
source := options.schedulerAlgorithmSource
switch {
case source.Provider != nil:
// 所以会进入这里
// 这个Provider有什么用那?我们进入该方法去看看
sc, err := configurator.createFromProvider(*source.Provider)
if err != nil {
return nil, fmt.Errorf("couldn't create scheduler using provider %q: %v", *source.Provider, err)
}
sched = sc
return sched, nil
}
// 返回调度器
func (c *Configurator) createFromProvider(providerName string) (*Scheduler, error) {
klog.V(2).InfoS("Creating scheduler from algorithm provider", "algorithmProvider", providerName)
// 创建插件注册表
r := algorithmprovider.NewRegistry()
// 根据providerName获取当前使用的插件集合,比如&schedulerapi.Plugins{
// QueueSort: schedulerapi.PluginSet{
// Enabled: []schedulerapi.Plugin{
// {Name: queuesort.Name},
// },
// },
defaultPlugins, exist := r[providerName]
// 将插件集合存入KubeSchedulerProfile.Plugins中
for i := range c.profiles {
prof := &c.profiles[i]
plugins := &schedulerapi.Plugins{}
plugins.Append(defaultPlugins)
plugins.Apply(prof.Plugins)
prof.Plugins = plugins
}
// 创建调度器
return c.create()
}
上面的插件可能有些迷,这里需要讲解一下,首先在前面New()方法里,有一步frameworkplugins.NewInTreeRegistry(),该方法是获取所有的插件信息(比如排序所需要的插件、过滤节点时所需要的插件)。
algorithmprovider.NewRegistry()这里是获取要使用的插件集合,所以frameworkplugins.NewInTreeRegistry()包含algorithmprovider.NewRegistry()
// 创建调度器
func (c *Configurator) create() (*Scheduler, error) {
// 返回一个map,key为SchedulerName,string,这里为DefaultScheduler,value为framework.Framework类型
// 该类型就是插件式架构接口
profiles, err := profile.NewMap(c.profiles, c.registry, c.recorderFactory,
...
)
}
// 获取插件集合中,关于pod排序的插件方法
lessFn := profiles[c.profiles[0].SchedulerName].QueueSortFunc()
// pod队列,用于缓存pod、对pod进行排序。Scheduler会从该队列中获取pod信息然后分配节点
podQueue := internalqueue.NewSchedulingQueue(
lessFn,
c.informerFactory,
...
)
//记录了在过滤过程中node遍历的index,存储了node快照信息。
algo := core.NewGenericScheduler(
c.schedulerCache,
c.nodeInfoSnapshot,
extenders,
c.percentageOfNodesToScore,
)
return &Scheduler{
SchedulerCache: c.schedulerCache,
Algorithm: algo,
Profiles: profiles,
NextPod: internalqueue.MakeNextPodFunc(podQueue),
Error: MakeDefaultErrorFunc(c.client, c.informerFactory.Core().V1().Pods().Lister(), podQueue, c.schedulerCache),
StopEverything: c.StopEverything,
SchedulingQueue: podQueue,
}, nil
}
我们主要关注profiles,该元素类型为map,key为SchedulerName,string,这里为DefaultScheduler,value为framework.Framework类型。
Framework?是什么那,负责初始化和运行调度器的组件,啥意思?就是接受到pod后对节点的过滤、打分、部署等一些列操作都由它来完成。
既然它这么厉害,就让我们看看它的真面目
type Framework interface {
Handle
//pod队列排序函数
QueueSortFunc() LessFunc
// 运行过滤节点前的函数
RunPreFilterPlugins(ctx context.Context, state *CycleState, pod *v1.Pod) *Status
// 运行过滤节点函数
RunFilterPlugins(ctx context.Context, state *CycleState, pod *v1.Pod, nodeInfo *NodeInfo) PluginToStatus
// 运行过滤节点后的函数
RunPostFilterPlugins(ctx context.Context, state *CycleState, pod *v1.Pod, filteredNodeStatusMap NodeToStatusMap) (*PostFilterResult, *Status)
// 运行打分前的函数
RunPreScorePlugins(ctx context.Context, state *CycleState, pod *v1.Pod, nodes []*v1.Node) *Status
// 运行打分的函数
RunScorePlugins(ctx context.Context, state *CycleState, pod *v1.Pod, nodes []*v1.Node) (PluginToNodeScores, *Status)
// 运行bind前 就是为pod绑定节点的函数
RunPreBindPlugins(ctx context.Context, state *CycleState, pod *v1.Pod, nodeName string) *Status
// 运行bind后 就是为pod绑定节点的函数
RunPostBindPlugins(ctx context.Context, state *CycleState, pod *v1.Pod, nodeName string)
// 运行bind
RunBindPlugins(ctx context.Context, state *CycleState, pod *v1.Pod, nodeName string) *Status
// 所有插件
ListPlugins() map[string][]config.Plugin
// 调度器的名称 比如DefaultScheduler
ProfileName() string
}
通过上面的函数意思,我们可以猜到,pod调度周期为,QueueSortFunc->RunPreFilterPlugins->RunFilterPlugins->RunPostFilterPlugins->RunPreScorePlugins->RunScorePlugins->RunPreBindPlugins->RunBindPlugins->RunPostBindPlugins
让我们查看一下它的实现类
type frameworkImpl struct {
// 插件注册表
registry Registry
// 快照监听
snapshotSharedLister framework.SharedLister
// pod缓存同步等待
waitingPods *waitingPodsMap
// score需要的插件优先级
pluginNameToWeightMap map[string]int
// pod队列排序插件
queueSortPlugins []framework.QueueSortPlugin
preFilterPlugins []framework.PreFilterPlugin
filterPlugins []framework.FilterPlugin
postFilterPlugins []framework.PostFilterPlugin
preScorePlugins []framework.PreScorePlugin
scorePlugins []framework.ScorePlugin
reservePlugins []framework.ReservePlugin
preBindPlugins []framework.PreBindPlugin
bindPlugins []framework.BindPlugin
postBindPlugins []framework.PostBindPlugin
permitPlugins []framework.PermitPlugin
clientSet clientset.Interface
eventRecorder events.EventRecorder
informerFactory informers.SharedInformerFactory
metricsRecorder *metricsRecorder
profileName string
extenders []framework.Extender
framework.PodNominator
parallelizer parallelize.Parallelizer
// Indicates that RunFilterPlugins should accumulate all failed statuses and not return
// after the first failure.
runAllFilters bool
}
在上面运行create()时,它会将原有的KubeSchedulerProfile.Plugins数组分别添加到Framework中对应的插件数组中,这样在使用时直接使用Framework即可。KubeSchedulerProfile.SchedulerName = Framework.profileName
接下来我们分析一个打分实现函数RunScorePlugins()
func (f *frameworkImpl) RunScorePlugins(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodes []*v1.Node) (ps framework.PluginToNodeScores, status *framework.Status) { metrics.FrameworkExtensionPointDuration.WithLabelValues(score, status.Code().String(), f.profileName).Observe(metrics.SinceInSeconds(startTime))
}()
// 存储 打分插件数组中,每个打分插件对于所有的节点的打分结果。
pluginToNodeScores := make(framework.PluginToNodeScores, len(f.scorePlugins))
// 为pluginToNodeScores实例化
for _, pl := range f.scorePlugins {
pluginToNodeScores[pl.Name()] = make(framework.NodeScoreList, len(nodes))
}
// Run Score method for each node in parallel.
f.Parallelizer().Until(ctx, len(nodes), func(index int) {
//调用打分插件数组
for _, pl := range f.scorePlugins {
nodeName := nodes[index].Name
//运行当前插件的打分函数,返回分数
s, status := f.runScorePlugin(ctx, pl, state, pod, nodeName)
if !status.IsSuccess() {
err := fmt.Errorf("plugin %q failed with: %w", pl.Name(), status.AsError())
errCh.SendErrorWithCancel(err, cancel)
return
}
pluginToNodeScores[pl.Name()][index] = framework.NodeScore{
Name: nodeName,
Score: s,
}
}
})
// 根据插件的优先级对分数进行调整。
f.Parallelizer().Until(ctx, len(f.scorePlugins), func(index int) {
pl := f.scorePlugins[index]
// 获取当前插件的优先级以及节点列表
weight := f.pluginNameToWeightMap[pl.Name()]
nodeScoreList := pluginToNodeScores[pl.Name()]
for i, nodeScore := range nodeScoreList {
// 使用优先级与分数相乘
if nodeScore.Score > framework.MaxNodeScore || nodeScore.Score < framework.MinNodeScore {
nodeScoreList[i].Score = nodeScore.Score * int64(weight)
}
})
return pluginToNodeScores, nil
}
通过上面的学习我们能够了解到,scheduler功能也是由插件实现的,那么接下来让我们对每个插件进行分析。
评分插件主要对每个节点进行评分,node节点范围主要为过滤后的节点
// 主要传入pod信息,要评分的节点名称
Score(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string)
使属于同一个服务(service或者Replication Controllers)的pod尽量均匀的分布在各个节点上或者zone上
什么意思那?就是,一个节点上已经有了pod副本,它的分数就比另一个没有pod副本的分数要低,一个节点上zone的数量也决定分数(数量越多,分数越低)。
让我们先看一下PreScore前置处理
func (pl *SelectorSpread) PreScore(ctx context.Context, cycleState *framework.CycleState, pod *v1.Pod, nodes []*v1.Node) *framework.Status {
// 创建选择器
var selector labels.Selector
// 它的作用是检索集群中与pod关联的services、replicationControllers、replicaSets、statefulSets与其对应的Selector,并将其放入Set中。目的是在根据这些Selector检索出当前节点中,该pod副本的数量
selector = helper.DefaultSelector(
pod,
pl.services,
pl.replicationControllers,
pl.replicaSets,
pl.statefulSets,
)
state := &preScoreState{
selector: selector,
}
cycleState.Write(preScoreStateKey, state)
return nil
}
准备工作已经完成,那么接下来就是开始计算节点pod副本数了
func (pl *SelectorSpread) Score(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) (int64, *framework.Status) {
// 获取我们刚刚准备的选择器集合
c, err := state.Read(preScoreStateKey)
s, ok := c.(*preScoreState)
// 获取当前节点信息
nodeInfo, err := pl.sharedLister.NodeInfos().Get(nodeName)
// 计算副本数
count := countMatchingPods(pod.Namespace, s.selector, nodeInfo)
return int64(count), nil
}
func countMatchingPods(namespace string, selector labels.Selector, nodeInfo *framework.NodeInfo) int {
if len(nodeInfo.Pods) == 0 || selector.Empty() {
return 0
}
count := 0
// 遍历当前节点中所有pod
for _, p := range nodeInfo.Pods {
// 首先判断是否在一个命名空间里
if namespace == p.Pod.Namespace && p.Pod.DeletionTimestamp == nil {
// 使用我们刚才的选择器来进行判断是否有关联
if selector.Matches(labels.Set(p.Pod.Labels)) {
count++
}
}
}
return count
}
这边我们获取到了节点对应的pod副本数的分数,接下来需要进行评分了
func (pl *SelectorSpread) NormalizeScore(ctx context.Context, state *framework.CycleState, pod *v1.Pod, scores framework.NodeScoreList) *framework.Status {
if skipSelectorSpread(pod) {
return nil
}
countsByZone := make(map[string]int64, 10)
maxCountByZone := int64(0)
maxCountByNodeName := int64(0)
// 循环遍历寻找pod副本数最大的值,以及计算节点zone分数
// zone为节点区域,是1.17以后加入的功能。
for i := range scores {
if scores[i].Score > maxCountByNodeName {
maxCountByNodeName = scores[i].Score
}
nodeInfo, err := pl.sharedLister.NodeInfos().Get(scores[i].Name)
if err != nil {
return framework.AsStatus(fmt.Errorf("getting node %q from Snapshot: %w", scores[i].Name, err))
}
zoneID := utilnode.GetZoneKey(nodeInfo.Node())
if zoneID == "" {
continue
}
countsByZone[zoneID] += scores[i].Score
}
// 计算zone最大分数
for zoneID := range countsByZone {
if countsByZone[zoneID] > maxCountByZone {
maxCountByZone = countsByZone[zoneID]
}
}
haveZones := len(countsByZone) != 0
maxCountByNodeNameFloat64 := float64(maxCountByNodeName)
maxCountByZoneFloat64 := float64(maxCountByZone)
MaxNodeScoreFloat64 := float64(framework.MaxNodeScore)
// 开始计算每个节点的分数
for i := range scores {
// 初始化分数为100
fScore := MaxNodeScoreFloat64
if maxCountByNodeName > 0 {
// 开始计算节点副本所产生的分数,比如 节点中pod最大副本数为7,当前节点副本数为3 那么结果为 100*(5-3)/5=40
fScore = MaxNodeScoreFloat64 * (float64(maxCountByNodeName-scores[i].Score) / maxCountByNodeNameFloat64)
}
// 判断是否弃用了zone功能
if haveZones {
nodeInfo, err := pl.sharedLister.NodeInfos().Get(scores[i].Name)
if err != nil {
return framework.AsStatus(fmt.Errorf("getting node %q from Snapshot: %w", scores[i].Name, err))
}
zoneID := utilnode.GetZoneKey(nodeInfo.Node())
if zoneID != "" {
// 启用了就进行计算,算法与副本计算一致,100*(最大zone数量- 当前节点zone数量)/最大zone数量
zoneScore := MaxNodeScoreFloat64
if maxCountByZone > 0 {
zoneScore = MaxNodeScoreFloat64 * (float64(maxCountByZone-countsByZone[zoneID]) / maxCountByZoneFloat64)
}
// 如果启用了zone,则根据zone权重重新计算分数,否则使用副本分数即可。
fScore = (fScore * (1.0 - zoneWeighting)) + (zoneWeighting * zoneScore)
}
}
scores[i].Score = int64(fScore)
}
return nil
}
到此SelectorSpread分析完毕。
它计算节点上调度的 Pod 请求的内存和 CPU 的百分比,以及
根据请求容量的分数的平均值的最小值进行优先级排序。(cpu((capacity-sum(requested))*MaxNodeScore/capacity) + memory((capacity-sum(requested))*MaxNodeScore/capacity))/weightSum
func (r *resourceAllocationScorer) score(
pod *v1.Pod,
nodeInfo *framework.NodeInfo) (int64, *framework.Status) {
// 获取当前节点信息
node := nodeInfo.Node()
// 用来存储请求资源
requested := make(resourceToValueMap, len(r.resourceToWeightMap))
// 存储可分配资源
allocatable := make(resourceToValueMap, len(r.resourceToWeightMap))
// 根据资源名,获取当前节点的请求、可分配资源,比如cpu,memory、ephemeral-storage
for resource := range r.resourceToWeightMap {
allocatable[resource], requested[resource] = calculateResourceAllocatableRequest(nodeInfo, pod, resource)
}
var score int64
// 判断当前pod是否有Volumes
if len(pod.Spec.Volumes) > 0 && utilfeature.DefaultFeatureGate.Enabled(features.BalanceAttachedNodeVolumes) && nodeInfo.TransientInfo != nil {
score = r.scorer(requested, allocatable, true, nodeInfo.TransientInfo.TransNodeInfo.RequestedVolumes, nodeInfo.TransientInfo.TransNodeInfo.AllocatableVolumesCount)
} else {
score = r.scorer(requested, allocatable, false, 0, 0)
}
return score, nil
}
r.scorer(requested, allocable resourceToValueMap, includeVolumes bool, requestedVolumes int, allocatableVolumes int)
{
var nodeScore, weightSum int64
// 计算节点分数
for resource, weight := range resToWeightMap {
resourceScore := ((allocable[resource] - requested[resource]) * int64(100)) / allocable[resource]
nodeScore += resourceScore * weight
weightSum += weight
}
return nodeScore / weightSum
}
它计算cpu和内存容量分数之间的差异,
并根据两个指标彼此的接近程度对主机进行优先级排序。
// requested为当前节点资源请求集合 allocable为当前节点可用资源集合
cpuFraction = float64(requested[v1.ResourceCPU]) / float64(allocable[v1.ResourceCPU])
memoryFraction = float64(requested[v1.ResourceMemory]) / float64(allocable[v1.ResourceMemory])
// 判断是否有特定节点存储请求量
volumeFraction := float64(nodeInfo.TransientInfo.TransNodeInfo.RequestedVolumes) / float64(nodeInfo.TransientInfo.TransNodeInfo.AllocatableVolumesCount)
mean := (cpuFraction + memoryFraction + volumeFraction) / float64(3)
variance := float64((((cpuFraction - mean) * (cpuFraction - mean)) + ((memoryFraction - mean) * (memoryFraction - mean)) + ((volumeFraction - mean) * (volumeFraction - mean))) / float64(3))
int64((1 - variance) * float64(framework.MaxNodeScore))
// 具体算法请百度《An Energy Efficient Virtual Machine Placement Algorithm with Balanced Resource Utilization》
将pod调度到image已经存在的节点上(具体的实现就是打分score)
// Score invoked at the score extension point.
func (pl *ImageLocality) Score(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) (int64, *framework.Status) {
nodeInfo, err := pl.handle.SnapshotSharedLister().NodeInfos().Get(nodeName)
nodeInfos, err := pl.handle.SnapshotSharedLister().NodeInfos().List()
//获取集群所有节点
totalNumNodes := len(nodeInfos)
//sumImageScores第一步根据pod容器镜像获取当前节点的镜像信息
//计算镜像被引用节点的个数/节点总个数 并返回
score := calculatePriority(sumImageScores(nodeInfo, pod.Spec.Containers, totalNumNodes), len(pod.Spec.Containers))
return score, nil
}
func calculatePriority(sumScores int64, numContainers int) int64 {
maxThreshold := 1000 * mb * int64(numContainers)
if sumScores < 23 * mb {
sumScores = 23 * mb
} else if sumScores > maxThreshold {
sumScores = maxThreshold
}
return int64(100) * (sumScores - 23 * mb) / (maxThreshold - 23 * mb)
}
pod亲和性计算
获取当前节点的所有pod,然后根据选择器进行匹配结果进行评分。
// at是要调度的pod的AffinityTerm属性,而参数pod为当前节点中存在的pod
// 下面使用label选择器来判断是否匹配。
func (at *AffinityTerm) Matches(pod *v1.Pod, nsLabels labels.Set, nsSelectorEnabled bool) bool {
if at.Namespaces.Has(pod.Namespace) || (nsSelectorEnabled && at.NamespaceSelector.Matches(nsLabels)) {
return at.Selector.Matches(labels.Set(pod.Labels))
}
return false
}
// 这里它还做了一步就是,在pod亲和力匹配完成后,再进行TopologyKey判断,这样能保证部署在同一个区域。
if term.Matches(pod, nsLabels, enableNamespaceSelector) {
if tpValue, tpValueExist := node.Labels[term.TopologyKey]; tpValueExist {
if m[term.TopologyKey] == nil {
m[term.TopologyKey] = make(map[string]int64)
}
m[term.TopologyKey][tpValue] += int64(weight * multiplier)
}
}
节点亲和力计算
原理与Pod亲和力计算类似,这里不过多讲解
此优选函数权限默认为10000,其将根据节点是够设置了注解信息"sheduler.alpha.kubernetes.io/preferAvoidPods"来计算其优先级
污点与容忍
在filter阶段,会判断pod与当前node的effect是否一致并且是NoSchedule或者NoExecute,然后运行以下程序判断容忍条件
func (t *Toleration) ToleratesTaint(taint *Taint) bool {
// pod与node的effect判断
if len(t.Effect) > 0 && t.Effect != taint.Effect {
return false
}
// 容忍条件判断
switch t.Operator {
// empty operator means Equal
case "", TolerationOpEqual:
return t.Value == taint.Value
case TolerationOpExists:
return true
default:
return false
}
}