The scheduler cache stores the list of nodes and, for each node, all of its pod information: pods already running on the node, assumed pods, and pods whose scheduling failed. It must implement the following interface:
type Cache interface {
// For testing only
NodeCount() int
// For testing only
PodCount() (int, error)
// After a pod is scheduled successfully, AssumePod adds the pod's resource usage to its chosen node.
// At this point the bind has not started yet.
AssumePod(pod *v1.Pod) error
// Called once the bind is issued, to notify the cache that the assumed pod may start its expiration timer.
FinishBinding(pod *v1.Pod) error
// Removes an assumed pod and subtracts its resources from its node.
// If anything fails after AssumePod, ForgetPod must be called to undo it.
ForgetPod(pod *v1.Pod) error
// After a pod is scheduled and bound successfully, the informer observes a pod Add event and AddPod adds the pod to the cache.
// If the assumed pod has already expired, it is added back so the node accounts for its resources again.
AddPod(pod *v1.Pod) error
UpdatePod(oldPod, newPod *v1.Pod) error
// When the informer observes a pod Delete event, RemovePod removes the pod from the cache and subtracts its resources from the node.
RemovePod(pod *v1.Pod) error
GetPod(pod *v1.Pod) (*v1.Pod, error)
// Reports whether the pod is still in the assumed state and has not expired.
IsAssumedPod(pod *v1.Pod) (bool, error)
// When the informer observes a node Add event, AddNode stores the node info in the cache.
AddNode(node *v1.Node) *framework.NodeInfo
// When the informer observes a node Update event, UpdateNode updates the cached info.
UpdateNode(oldNode, newNode *v1.Node) *framework.NodeInfo
// When the informer observes a node Delete event, RemoveNode removes the node info from the cache.
RemoveNode(node *v1.Node) error
// Snapshots the current cache contents. Each node's info aggregates its scheduled pods and assumed pods;
// only nodes that have not been deleted at the time of the call are included.
UpdateSnapshot(nodeSnapshot *Snapshot) error
Dump() *Dump
}
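Read together, these comments describe a lifecycle: AssumePod optimistically reserves resources, FinishBinding arms the expiration timer, and either AddPod (on the informer's Add event) or ForgetPod (on any failure) settles the reservation. The sketch below shows roughly how a scheduler might drive the cache; scheduleOne and bind are illustrative placeholders, not the scheduler's real functions:
// Sketch only: bind stands in for the apiserver bind call.
var bind func(*v1.Pod) error

func scheduleOne(cache Cache, pod *v1.Pod, nodeName string) error {
	pod.Spec.NodeName = nodeName
	// Reserve the pod's resources on the chosen node before binding,
	// so concurrent scheduling cycles see them as occupied.
	if err := cache.AssumePod(pod); err != nil {
		return err
	}
	go func() {
		if err := bind(pod); err != nil {
			// Binding failed: release the assumed resources.
			cache.ForgetPod(pod)
			return
		}
		// Arm the TTL; if the informer's Add event never arrives,
		// the cleanup goroutine will expire the assumed pod.
		cache.FinishBinding(pod)
	}()
	return nil
}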
schedulerCache is the concrete implementation of the Cache interface:
type schedulerCache struct {
stop <-chan struct{}
// TTL of assumed pods, 30s by default
ttl time.Duration
// The cache runs a periodic task in a goroutine; period is the interval, cleanAssumedPeriod (1s) by default
period time.Duration
// The cache may be accessed by multiple goroutines; this lock provides mutual exclusion
mu sync.RWMutex
// Keys of assumed pods; a key can be used to look up an entry in podStates
assumedPods sets.String
// Maps pod key to podState
podStates map[string]*podState
// Maps node name to nodeInfoListItem
nodes map[string]*nodeInfoListItem
// A nodeInfoListItem holds one node's info and carries next/prev pointers forming a doubly linked list;
// headNode is the head of that list and points to the most recently updated node
headNode *nodeInfoListItem
// Node names grouped by zone
nodeTree *nodeTree
// Image info; ignored here
imageStates map[string]*imageState
}
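For reference, the constructor simply wires these fields up with the defaults mentioned above. A sketch of newSchedulerCache (the field initialization follows the struct; treat details as approximate):
func newSchedulerCache(ttl, period time.Duration, stop <-chan struct{}) *schedulerCache {
	return &schedulerCache{
		ttl:    ttl,    // assumed-pod TTL, 30s by default
		period: period, // cleanup interval, cleanAssumedPeriod (1s) by default
		stop:   stop,

		nodes:       make(map[string]*nodeInfoListItem),
		nodeTree:    newNodeTree(nil),
		assumedPods: make(sets.String),
		podStates:   make(map[string]*podState),
		imageStates: make(map[string]*imageState),
	}
}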
podState holds a pod's state within the cache:
type podState struct {
// Basic pod info
pod *v1.Pod
// Expiration deadline of the assumed pod
deadline *time.Time
// Whether binding has finished; the assumed pod's expiration timer starts only after binding completes
bindingFinished bool
}
A nodeInfoListItem is stored in the nodes map and, at the same time, linked into a doubly linked list; the most recently updated nodes sit at the head, and snapshotting walks this list:
type nodeInfoListItem struct {
// Node info, including all pods on the node
info *framework.NodeInfo
// Pointers implementing the doubly linked list
next *nodeInfoListItem
prev *nodeInfoListItem
}
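moveNodeInfoToHead, called whenever a node's info changes in the code below, is a standard move-to-front operation on this list. A sketch, assuming the caller already holds the lock:
// Assumes cache.mu is held.
func (cache *schedulerCache) moveNodeInfoToHead(name string) {
	ni, ok := cache.nodes[name]
	if !ok {
		return // nothing to move
	}
	if ni == cache.headNode {
		return // already at the head
	}
	// Unlink ni from its current position.
	if ni.prev != nil {
		ni.prev.next = ni.next
	}
	if ni.next != nil {
		ni.next.prev = ni.prev
	}
	// Push ni onto the front of the list.
	if cache.headNode != nil {
		cache.headNode.prev = ni
	}
	ni.next = cache.headNode
	ni.prev = nil
	cache.headNode = ni
}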
// NodeInfo is node level aggregated information.
type NodeInfo struct {
// Basic node info
node *v1.Node
// All pods running on this node
Pods []*PodInfo
// Pods on this node that declare affinity
PodsWithAffinity []*PodInfo
// Pods on this node that declare required anti-affinity
PodsWithRequiredAntiAffinity []*PodInfo
// Ports allocated on the node.
UsedPorts HostPortInfo
// Sum of the resources requested by all pods on this node, assumed pods included. "Resources" here means the cpu/memory specified in the pod spec.
Requested *Resource
// Like Requested, but containers that specify no request are charged a default non-zero value,
// to keep many zero-request pods from piling up on the same node.
NonZeroRequested *Resource
// Allocatable resources of the node, as reported by the kubelet
Allocatable *Resource
// Ignored here
ImageStates map[string]*ImageStateSummary
// Ignored here
PVCRefCounts map[string]int
// Generation of the node, bumped on any change to the node. Snapshotting only copies nodes with newer generations.
Generation int64
}
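The defaulting behind NonZeroRequested deserves a concrete example. A sketch of the idea (upstream this lives in a GetNonzeroRequests helper under pkg/scheduler/util; the 100m CPU / 200MiB defaults match the upstream constants, but treat exact values as version-dependent):
// Defaults charged to containers that declare no requests, so that even
// zero-request pods consume some NonZeroRequested capacity.
const (
	defaultMilliCPURequest int64 = 100               // 0.1 core
	defaultMemoryRequest   int64 = 200 * 1024 * 1024 // 200 MiB
)

// nonzeroRequests returns (milliCPU, memory) for one container's requests,
// substituting the defaults when a value is absent or zero.
func nonzeroRequests(requests v1.ResourceList) (int64, int64) {
	milliCPU := requests.Cpu().MilliValue()
	if milliCPU == 0 {
		milliCPU = defaultMilliCPURequest
	}
	memory := requests.Memory().Value()
	if memory == 0 {
		memory = defaultMemoryRequest
	}
	return milliCPU, memory
}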
nodeTree groups node names by zone:
type nodeTree struct {
// Maps zone to the list of node names in that zone
tree map[string][]string
// List of zones
zones []string
numNodes int
}
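nodeTree.list, used later when the snapshot rebuilds its node list, flattens the tree by round-robining across zones so that nodes from different zones interleave. A simplified sketch of the idea, not the exact upstream code:
// list returns node names interleaved across zones: the first node of each
// zone, then the second of each zone, and so on.
func (nt *nodeTree) list() ([]string, error) {
	out := make([]string, 0, nt.numNodes)
	for i := 0; len(out) < nt.numNodes; i++ {
		progressed := false
		for _, zone := range nt.zones {
			if nodes := nt.tree[zone]; i < len(nodes) {
				out = append(out, nodes[i])
				progressed = true
			}
		}
		if !progressed {
			// numNodes disagrees with the tree's contents.
			return out, fmt.Errorf("nodeTree is inconsistent: expected %d nodes, found %d", nt.numNodes, len(out))
		}
	}
	return out, nil
}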
AssumePod
After a pod is scheduled successfully, AssumePod adds the pod's requested resources to its chosen node:
func (cache *schedulerCache) AssumePod(pod *v1.Pod) error {
// Get the pod key, i.e. the pod UID
key, err := framework.GetPodKey(pod)
if err != nil {
return err
}
cache.mu.Lock()
defer cache.mu.Unlock()
// Refuse if the pod is already in the cache
if _, ok := cache.podStates[key]; ok {
return fmt.Errorf("pod %v is in the cache, so can't be assumed", key)
}
// Add the pod to its node
cache.addPod(pod)
ps := &podState{
pod: pod,
}
// Record the pod state
cache.podStates[key] = ps
// Mark the pod as assumed
cache.assumedPods.Insert(key)
return nil
}
// Assumes that lock is already acquired.
func (cache *schedulerCache) addPod(pod *v1.Pod) {
n, ok := cache.nodes[pod.Spec.NodeName]
if !ok {
// Create a new entry if the node is not in the cache yet
n = newNodeInfoListItem(framework.NewNodeInfo())
cache.nodes[pod.Spec.NodeName] = n
}
// Append the pod to the node's pod list and add its requested resources to the node's Requested
n.info.AddPod(pod)
// The node changed, so move it to the head of the list
cache.moveNodeInfoToHead(pod.Spec.NodeName)
}
FinishBinding
Once the bind is issued, FinishBinding notifies the cache that the assumed pod may start its expiration timer:
func (cache *schedulerCache) FinishBinding(pod *v1.Pod) error {
return cache.finishBinding(pod, time.Now())
}
// finishBinding exists to make tests deterministic by injecting now as an argument
func (cache *schedulerCache) finishBinding(pod *v1.Pod, now time.Time) error {
key, err := framework.GetPodKey(pod)
if err != nil {
return err
}
cache.mu.RLock()
defer cache.mu.RUnlock()
klog.V(5).Infof("Finished binding for pod %v. Can be expired.", key)
currState, ok := cache.podStates[key]
if ok && cache.assumedPods.Has(key) {
dl := now.Add(cache.ttl)
// Set the flag so the cleanupExpiredAssumedPods goroutine may expire this assumed pod
currState.bindingFinished = true
// Set the expiration deadline
currState.deadline = &dl
}
return nil
}
ForgetPod
ForgetPod removes an assumed pod and subtracts its resources from its node. If anything fails after AssumePod, ForgetPod must be called to undo it:
func (cache *schedulerCache) ForgetPod(pod *v1.Pod) error {
key, err := framework.GetPodKey(pod)
if err != nil {
return err
}
cache.mu.Lock()
defer cache.mu.Unlock()
currState, ok := cache.podStates[key]
if ok && currState.pod.Spec.NodeName != pod.Spec.NodeName {
return fmt.Errorf("pod %v was assumed on %v but assigned to %v", key, pod.Spec.NodeName, currState.pod.Spec.NodeName)
}
switch {
// Only assumed pod can be forgotten.
case ok && cache.assumedPods.Has(key):
err := cache.removePod(pod)
if err != nil {
return err
}
delete(cache.assumedPods, key)
delete(cache.podStates, key)
default:
return fmt.Errorf("pod %v wasn't assumed so cannot be forgotten", key)
}
return nil
}
func (cache *schedulerCache) removePod(pod *v1.Pod) error {
n, ok := cache.nodes[pod.Spec.NodeName]
if !ok {
klog.Errorf("node %v not found when trying to remove pod %v", pod.Spec.NodeName, pod.Name)
return nil
}
// Remove the pod and its resources from the node
if err := n.info.RemovePod(pod); err != nil {
return err
}
// Drop the node entry entirely once it has no pods left and the node object itself was already deleted
if len(n.info.Pods) == 0 && n.info.Node() == nil {
cache.removeNodeInfoFromList(pod.Spec.NodeName)
} else {
// Otherwise the node changed, so move it to the head of the list
cache.moveNodeInfoToHead(pod.Spec.NodeName)
}
return nil
}
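removeNodeInfoFromList is the counterpart of moveNodeInfoToHead: it unlinks the item from the doubly linked list and drops it from the nodes map. A sketch:
// Assumes cache.mu is held.
func (cache *schedulerCache) removeNodeInfoFromList(name string) {
	ni, ok := cache.nodes[name]
	if !ok {
		return // nothing to remove
	}
	if ni.prev != nil {
		ni.prev.next = ni.next
	}
	if ni.next != nil {
		ni.next.prev = ni.prev
	}
	// If the removed item was the head, advance the head pointer.
	if ni == cache.headNode {
		cache.headNode = ni.next
	}
	delete(cache.nodes, name)
}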
AddPod
After a pod is scheduled and bound successfully, the informer observes a pod Add event and AddPod adds the pod to the cache.
If the assumed pod had already expired, it is added back so the node accounts for its resources again:
func (cache *schedulerCache) AddPod(pod *v1.Pod) error {
key, err := framework.GetPodKey(pod)
if err != nil {
return err
}
cache.mu.Lock()
defer cache.mu.Unlock()
currState, ok := cache.podStates[key]
switch {
// The pod is still assumed (not yet expired): confirm it below
case ok && cache.assumedPods.Has(key):
if currState.pod.Spec.NodeName != pod.Spec.NodeName {
// The pod was added to a different node than it was assumed to.
klog.Warningf("Pod %v was assumed to be on %v but got added to %v", key, pod.Spec.NodeName, currState.pod.Spec.NodeName)
// Clean this up.
if err = cache.removePod(currState.pod); err != nil {
klog.Errorf("removing pod error: %v", err)
}
cache.addPod(pod)
}
// The pod is now confirmed; remove its key from the assumed set
delete(cache.assumedPods, key)
// Clear the deadline to stop the expiration timer
cache.podStates[key].deadline = nil
cache.podStates[key].pod = pod
case !ok:
// The assumed pod has expired. Expiry removed its info from the node, so add it back
cache.addPod(pod)
ps := &podState{
pod: pod,
}
cache.podStates[key] = ps
default:
return fmt.Errorf("pod %v was already in added state", key)
}
return nil
}
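UpdatePod is not shown above; it mirrors RemovePod's sanity checks and then swaps the old pod for the new one. A sketch, assuming the update is implemented as removePod followed by addPod:
func (cache *schedulerCache) UpdatePod(oldPod, newPod *v1.Pod) error {
	key, err := framework.GetPodKey(oldPod)
	if err != nil {
		return err
	}
	cache.mu.Lock()
	defer cache.mu.Unlock()
	currState, ok := cache.podStates[key]
	// Only a confirmed (added) pod can be updated; an assumed pod must see
	// its Add event first.
	if !ok || cache.assumedPods.Has(key) {
		return fmt.Errorf("pod %v is not added to scheduler cache, so cannot be updated", key)
	}
	if currState.pod.Spec.NodeName != newPod.Spec.NodeName {
		klog.Fatalf("Pod %v updated on a different node than previously added to; the cache is corrupted", key)
	}
	// Subtract the old pod from its node, then add the new one back.
	if err := cache.removePod(oldPod); err != nil {
		return err
	}
	cache.addPod(newPod)
	currState.pod = newPod
	return nil
}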
RemovePod
When the informer observes a pod Delete event, RemovePod removes the pod from the cache and subtracts its resources from the node:
func (cache *schedulerCache) RemovePod(pod *v1.Pod) error {
key, err := framework.GetPodKey(pod)
if err != nil {
return err
}
cache.mu.Lock()
defer cache.mu.Unlock()
currState, ok := cache.podStates[key]
switch {
// An assumed pod won't have Delete/Remove event. It needs to have Add event
// before Remove event, in which case the state would change from Assumed to Added.
case ok && !cache.assumedPods.Has(key):
// The pod's node differs from the one recorded in the cache; this is likely a bug, so crash outright
if currState.pod.Spec.NodeName != pod.Spec.NodeName {
klog.Errorf("Pod %v was assumed to be on %v but got added to %v", key, pod.Spec.NodeName, currState.pod.Spec.NodeName)
klog.Fatalf("Schedulercache is corrupted and can badly affect scheduling decisions")
}
// Remove the pod info
err := cache.removePod(currState.pod)
if err != nil {
return err
}
delete(cache.podStates, key)
default:
return fmt.Errorf("pod %v is not found in scheduler cache, so cannot be removed from it", key)
}
return nil
}
AddNode
When the informer observes a node Add event, AddNode stores the node info in the cache:
func (cache *schedulerCache) AddNode(node *v1.Node) *framework.NodeInfo {
cache.mu.Lock()
defer cache.mu.Unlock()
n, ok := cache.nodes[node.Name]
if !ok {
n = newNodeInfoListItem(framework.NewNodeInfo())
cache.nodes[node.Name] = n
} else {
cache.removeNodeImageStates(n.info.Node())
}
// The node changed, so move it to the head of the list
cache.moveNodeInfoToHead(node.Name)
cache.nodeTree.addNode(node)
cache.addNodeImageStates(node, n.info)
n.info.SetNode(node)
return n.info.Clone()
}
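UpdateNode follows the same pattern. RemoveNode is more subtle: it clears the v1.Node from the NodeInfo but keeps the entry alive while pods are still recorded on it, because pod events arrive on a separate watch and their Delete events may be observed after the node's removal. A sketch:
func (cache *schedulerCache) RemoveNode(node *v1.Node) error {
	cache.mu.Lock()
	defer cache.mu.Unlock()
	n, ok := cache.nodes[node.Name]
	if !ok {
		return fmt.Errorf("node %v is not found", node.Name)
	}
	// Clear the node object; n.info.Node() now returns nil.
	n.info.RemoveNode()
	// Keep the NodeInfo until its pods are gone too; this pairs with the
	// len(n.info.Pods) == 0 && n.info.Node() == nil check in removePod.
	if len(n.info.Pods) == 0 {
		cache.removeNodeInfoFromList(node.Name)
	} else {
		cache.moveNodeInfoToHead(node.Name)
	}
	if err := cache.nodeTree.removeNode(node); err != nil {
		return err
	}
	cache.removeNodeImageStates(node)
	return nil
}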
UpdateSnapshot
Before each scheduling cycle, UpdateSnapshot snapshots the cache into g.nodeInfoSnapshot. The rest of the cycle reads node/pod information from the snapshot rather than from the cache, which would require taking the lock on every access and hurt performance:
//pkg/scheduler/generic_scheduler.go
func (g *genericScheduler) snapshot() error {
// Used for all fit and priority funcs.
return g.cache.UpdateSnapshot(g.nodeInfoSnapshot)
}
The snapshot is represented by the following structure:
type Snapshot struct {
// Maps node name to node info
nodeInfoMap map[string]*framework.NodeInfo
// List of node infos
nodeInfoList []*framework.NodeInfo
// Node infos of nodes with at least one pod declaring affinity
havePodsWithAffinityNodeInfoList []*framework.NodeInfo
// Node infos of nodes with at least one pod declaring required anti-affinity
havePodsWithRequiredAntiAffinityNodeInfoList []*framework.NodeInfo
// Each NodeInfo also carries a generation: whenever a node changes, a global generation counter is incremented
// and assigned to that NodeInfo's generation. The snapshot's generation is set to the generation of the most
// recently updated NodeInfo, so any NodeInfo whose generation is not greater than this value has not changed
// since the last snapshot and does not need to be copied again.
generation int64
}
//pkg/scheduler/internal/cache/cache.go
func (cache *schedulerCache) UpdateSnapshot(nodeSnapshot *Snapshot) error {
cache.mu.Lock()
defer cache.mu.Unlock()
// The generation at which the snapshot was last brought up to date
snapshotGeneration := nodeSnapshot.generation
// NodeInfoList and HavePodsWithAffinityNodeInfoList must be re-created if a node was added
// or removed from the cache.
updateAllLists := false
// HavePodsWithAffinityNodeInfoList must be re-created if a node changed its
// status from having pods with affinity to NOT having pods with affinity or the other
// way around.
updateNodesHavePodsWithAffinity := false
// HavePodsWithRequiredAntiAffinityNodeInfoList must be re-created if a node changed its
// status from having pods with required anti-affinity to NOT having pods with required
// anti-affinity or the other way around.
updateNodesHavePodsWithRequiredAntiAffinity := false
// Walk the cache's doubly linked list; items are ordered by descending Generation
for node := cache.headNode; node != nil; node = node.next {
// Incremental update: only copy nodes whose node.info.Generation is greater than snapshotGeneration
if node.info.Generation <= snapshotGeneration {
// all the nodes are updated before the existing snapshot. We are done.
break
}
if np := node.info.Node(); np != nil {
existing, ok := nodeSnapshot.nodeInfoMap[np.Name]
// The snapshot does not have this node yet
if !ok {
// A node was added, so flag a full rebuild of the lists
updateAllLists = true
existing = &framework.NodeInfo{}
nodeSnapshot.nodeInfoMap[np.Name] = existing
}
// Clone the node info
clone := node.info.Clone()
// The snapshot and the latest clone disagree on whether this node has any pods declaring affinity
if (len(existing.PodsWithAffinity) > 0) != (len(clone.PodsWithAffinity) > 0) {
updateNodesHavePodsWithAffinity = true
}
// Likewise for pods declaring required anti-affinity
if (len(existing.PodsWithRequiredAntiAffinity) > 0) != (len(clone.PodsWithRequiredAntiAffinity) > 0) {
updateNodesHavePodsWithRequiredAntiAffinity = true
}
// Copy the latest node info into the snapshot
*existing = *clone
}
}
// Record the generation of the most recently updated NodeInfo in the snapshot
if cache.headNode != nil {
nodeSnapshot.generation = cache.headNode.info.Generation
}
// More nodes in the snapshot than in the cache means some nodes were deleted; that also requires a full rebuild
if len(nodeSnapshot.nodeInfoMap) > cache.nodeTree.numNodes {
cache.removeDeletedNodesFromSnapshot(nodeSnapshot)
updateAllLists = true
}
// If anything changed, call updateNodeInfoSnapshotList to rebuild the lists
if updateAllLists || updateNodesHavePodsWithAffinity || updateNodesHavePodsWithRequiredAntiAffinity {
cache.updateNodeInfoSnapshotList(nodeSnapshot, updateAllLists)
}
if len(nodeSnapshot.nodeInfoList) != cache.nodeTree.numNodes {
errMsg := fmt.Sprintf("snapshot state is not consistent, length of NodeInfoList=%v not equal to length of nodes in tree=%v "+
", length of NodeInfoMap=%v, length of nodes in cache=%v"+
", trying to recover",
len(nodeSnapshot.nodeInfoList), cache.nodeTree.numNodes,
len(nodeSnapshot.nodeInfoMap), len(cache.nodes))
klog.Error(errMsg)
// We will try to recover by re-creating the lists for the next scheduling cycle, but still return an
// error to surface the problem, the error will likely cause a failure to the current scheduling cycle.
cache.updateNodeInfoSnapshotList(nodeSnapshot, true)
return fmt.Errorf(errMsg)
}
return nil
}
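removeDeletedNodesFromSnapshot drops snapshot entries whose node either disappeared from the cache or had its v1.Node cleared by RemoveNode. A sketch:
// Assumes cache.mu is held.
func (cache *schedulerCache) removeDeletedNodesFromSnapshot(snapshot *Snapshot) {
	toDelete := len(snapshot.nodeInfoMap) - cache.nodeTree.numNodes
	for name := range snapshot.nodeInfoMap {
		if toDelete <= 0 {
			break
		}
		// A node that is gone from the cache, or whose node object was
		// cleared by RemoveNode, no longer belongs in the snapshot.
		if n, ok := cache.nodes[name]; !ok || n.info.Node() == nil {
			delete(snapshot.nodeInfoMap, name)
			toDelete--
		}
	}
}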
func (cache *schedulerCache) updateNodeInfoSnapshotList(snapshot *Snapshot, updateAll bool) {
snapshot.havePodsWithAffinityNodeInfoList = make([]*framework.NodeInfo, 0, cache.nodeTree.numNodes)
snapshot.havePodsWithRequiredAntiAffinityNodeInfoList = make([]*framework.NodeInfo, 0, cache.nodeTree.numNodes)
// Full rebuild
if updateAll {
// Take a snapshot of the nodes order in the tree
snapshot.nodeInfoList = make([]*framework.NodeInfo, 0, cache.nodeTree.numNodes)
// Get the ordered list of node names from the node tree
nodesList, err := cache.nodeTree.list()
if err != nil {
klog.Error(err)
}
for _, nodeName := range nodesList {
if nodeInfo := snapshot.nodeInfoMap[nodeName]; nodeInfo != nil {
snapshot.nodeInfoList = append(snapshot.nodeInfoList, nodeInfo)
if len(nodeInfo.PodsWithAffinity) > 0 {
snapshot.havePodsWithAffinityNodeInfoList = append(snapshot.havePodsWithAffinityNodeInfoList, nodeInfo)
}
if len(nodeInfo.PodsWithRequiredAntiAffinity) > 0 {
snapshot.havePodsWithRequiredAntiAffinityNodeInfoList = append(snapshot.havePodsWithRequiredAntiAffinityNodeInfoList, nodeInfo)
}
} else {
klog.Errorf("node %q exist in nodeTree but not in NodeInfoMap, this should not happen.", nodeName)
}
}
} else {
// Otherwise only rebuild the affinity/anti-affinity lists
for _, nodeInfo := range snapshot.nodeInfoList {
if len(nodeInfo.PodsWithAffinity) > 0 {
snapshot.havePodsWithAffinityNodeInfoList = append(snapshot.havePodsWithAffinityNodeInfoList, nodeInfo)
}
if len(nodeInfo.PodsWithRequiredAntiAffinity) > 0 {
snapshot.havePodsWithRequiredAntiAffinityNodeInfoList = append(snapshot.havePodsWithRequiredAntiAffinityNodeInfoList, nodeInfo)
}
}
}
}
run
The scheduler cache starts a goroutine that periodically cleans up expired assumed pods:
func (cache *schedulerCache) run() {
// period is cleanAssumedPeriod = 1 * time.Second by default
go wait.Until(cache.cleanupExpiredAssumedPods, cache.period, cache.stop)
}
func (cache *schedulerCache) cleanupExpiredAssumedPods() {
cache.cleanupAssumedPods(time.Now())
}
// cleanupAssumedPods exists to make tests deterministic by taking the time as an input argument.
// It also reports metrics on the cache size for nodes, pods, and assumed pods.
func (cache *schedulerCache) cleanupAssumedPods(now time.Time) {
cache.mu.Lock()
defer cache.mu.Unlock()
defer cache.updateMetrics()
// The size of assumedPods should be small
for key := range cache.assumedPods {
ps, ok := cache.podStates[key]
if !ok {
klog.Fatal("Key found in assumed set but not in podStates. Potentially a logical error.")
}
// The pod has not finished binding yet, so it must not expire
if !ps.bindingFinished {
klog.V(5).Infof("Couldn't expire cache for pod %v/%v. Binding is still in progress.",
ps.pod.Namespace, ps.pod.Name)
continue
}
// The assumed pod has expired; remove it from the cache
if now.After(*ps.deadline) {
klog.Warningf("Pod %s/%s expired", ps.pod.Namespace, ps.pod.Name)
if err := cache.expirePod(key, ps); err != nil {
klog.Errorf("ExpirePod failed for %s: %v", key, err)
}
}
}
}
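expirePod reuses the same removal path as ForgetPod. A sketch:
// Assumes cache.mu is held.
func (cache *schedulerCache) expirePod(key string, ps *podState) error {
	// Subtract the pod's resources from its node; removePod also drops the
	// node entry if it is empty and already deleted.
	if err := cache.removePod(ps.pod); err != nil {
		return err
	}
	delete(cache.assumedPods, key)
	delete(cache.podStates, key)
	return nil
}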