kube-scheduler cache



// Cache is the scheduler cache interface discussed in this note. It groups
// pod operations (assume, finish binding, forget, add, update, remove, query),
// node operations, snapshot updates and a Dump accessor.
type Cache interface {
	// NodeCount returns an int; presumably the number of nodes held in the
	// cache — TODO confirm against the implementation.
	NodeCount() int

	// PodCount returns an int and an error; presumably the number of pods
	// held in the cache — TODO confirm against the implementation.
	PodCount() (int, error)

	// AssumePod is called before binding the pod to a node has started.
	AssumePod(pod *v1.Pod) error

	// FinishBinding is called after the pod has been bound to a node; it
	// notifies the cache that the assumed pod's expiry timeout may start
	// counting.
	FinishBinding(pod *v1.Pod) error

	// ForgetPod drops a pod that is in the assumed state; the implementation
	// in this file reports an error for a pod that was not assumed.
	ForgetPod(pod *v1.Pod) error

	// AddPod is called once a pod has been scheduled and bound successfully:
	// the informer observes the pod add event and AddPod adds the pod to the
	// cache.
	AddPod(pod *v1.Pod) error

	// UpdatePod takes the old and new versions of a pod; its implementation
	// is not shown in this excerpt.
	UpdatePod(oldPod, newPod *v1.Pod) error

	// RemovePod removes a pod from the cache; the implementation in this file
	// reports an error when the pod is not found in the cache.
	RemovePod(pod *v1.Pod) error

	// GetPod returns a *v1.Pod for the given pod; its implementation is not
	// shown in this excerpt.
	GetPod(pod *v1.Pod) (*v1.Pod, error)

	// IsAssumedPod reports a bool for the given pod; presumably whether the
	// pod is currently in the assumed set — TODO confirm.
	IsAssumedPod(pod *v1.Pod) (bool, error)

	// AddNode adds a node to the cache; the implementation in this file
	// returns a clone of the cached NodeInfo.
	AddNode(node *v1.Node) *framework.NodeInfo

	// UpdateNode takes the old and new versions of a node; its implementation
	// is not shown in this excerpt.
	UpdateNode(oldNode, newNode *v1.Node) *framework.NodeInfo

	// RemoveNode takes a node and returns an error; its implementation is not
	// shown in this excerpt.
	RemoveNode(node *v1.Node) error

	// UpdateSnapshot brings nodeSnapshot up to date with the node information
	// currently held in the cache.
	UpdateSnapshot(nodeSnapshot *Snapshot) error

	// Dump returns a *Dump; its implementation is not shown in this excerpt.
	Dump() *Dump


// schedulerCache is the receiver type of the cache methods shown in this file.
type schedulerCache struct {
	// stop is the stop channel handed to wait.Until in run.
	stop   <-chan struct{}
	// ttl is added to the binding-finished time to compute an assumed pod's
	// deadline (see finishBinding).
	ttl    time.Duration
	// period is the interval handed to wait.Until in run.
	period time.Duration

	// mu is the lock the methods in this file release via deferred
	// Unlock/RUnlock calls.
	mu sync.RWMutex
	// assumedPods is the set of pod keys that are currently assumed.
	assumedPods sets.String
	// podStates maps a pod key to its podState.
	podStates map[string]*podState
	// nodes maps a node name to its list item.
	nodes     map[string]*nodeInfoListItem
	// headNode is the first item of the linked list of nodeInfoListItems;
	// UpdateSnapshot walks the list starting here.
	headNode *nodeInfoListItem
	// nodeTree supplies numNodes and the node ordering used when snapshot
	// lists are rebuilt.
	nodeTree *nodeTree
	// imageStates is keyed by string; presumably the image name — TODO confirm.
	imageStates map[string]*imageState


// podState is the per-pod record stored in schedulerCache.podStates.
type podState struct {
	// pod is the cached pod object.
	pod *v1.Pod
	// deadline is set by finishBinding to now+ttl and reset to nil by AddPod
	// for a pod that was assumed; cleanupAssumedPods compares it with the
	// current time.
	deadline *time.Time
	// bindingFinished is set to true by finishBinding; cleanupAssumedPods
	// checks it before considering the pod for expiry.
	bindingFinished bool

nodeInfoListItem不仅要保存到nodes map中,还要作为节点加入一个双向链表:最近更新的node位于链表头部,做快照时会从链表头部开始遍历此链表。

// nodeInfoListItem wraps a NodeInfo and carries next/prev pointers so that
// items can be linked into a doubly linked list in addition to being stored
// in the schedulerCache.nodes map.
type nodeInfoListItem struct {
	// info is the node-level information held by this item.
	info *framework.NodeInfo
	// next points to the following item in the list (nil at the tail).
	next *nodeInfoListItem
	// prev points to the preceding item in the list.
	prev *nodeInfoListItem

// NodeInfo is node level aggregated information.
type NodeInfo struct {
	// node is the underlying node object; callers in this file read it via
	// Node() and treat nil as "no node object set".
	node *v1.Node

	// Pods is the list of pods recorded for this node.
	Pods []*PodInfo

	// PodsWithAffinity is a subset list of pods; presumably those declaring
	// affinity — TODO confirm against where it is populated.
	PodsWithAffinity []*PodInfo

	// PodsWithRequiredAntiAffinity is a subset list of pods; presumably those
	// declaring required anti-affinity — TODO confirm.
	PodsWithRequiredAntiAffinity []*PodInfo

	// Ports allocated on the node.
	UsedPorts HostPortInfo

	// Requested, NonZeroRequested and Allocatable are resource totals; how
	// each one is computed is not shown in this excerpt.
	Requested *Resource
	NonZeroRequested *Resource
	Allocatable *Resource

	// ImageStates is keyed by string; presumably the image name — TODO confirm.
	ImageStates map[string]*ImageStateSummary

	// PVCRefCounts is keyed by string; presumably a PVC identifier mapped to
	// a reference count — TODO confirm.
	PVCRefCounts map[string]int

	// Generation is compared against the snapshot's generation in
	// UpdateSnapshot to decide whether this node still needs to be copied.
	Generation int64

nodeTree按区域保存node name

// nodeTree stores node names grouped by zone.
type nodeTree struct {
	// tree maps a zone to the list of node names in that zone.
	tree     map[string][]string
	// zones is a list of zone strings; presumably the keys of tree in a
	// stable order — TODO confirm.
	zones    []string
	// numNodes is used in this file as the expected number of nodes in a
	// snapshot.
	numNodes int


// AssumePod records pod in cache.podStates under its pod key.
// It returns the error from framework.GetPodKey, or an error when the key is
// already present in podStates.
// NOTE(review): only the deferred cache.mu.Unlock() is visible; the matching
// lock acquisition is not shown in this excerpt — confirm.
func (cache *schedulerCache) AssumePod(pod *v1.Pod) error {
	// Get the pod key, i.e. the pod UID.
	key, err := framework.GetPodKey(pod)
	if err != nil {
		return err

	defer cache.mu.Unlock()
	// A pod that already has a state entry cannot be assumed again.
	if _, ok := cache.podStates[key]; ok {
		return fmt.Errorf("pod %v is in the cache, so can't be assumed", key)

	ps := &podState{
		pod: pod,
	cache.podStates[key] = ps
	// Insert the pod key.
	// NOTE(review): the statement this comment annotated is not present in
	// this excerpt; presumably it adds key to cache.assumedPods — confirm.
	return nil

// addPod looks up the list item for pod.Spec.NodeName and creates one with an
// empty NodeInfo when the node is not yet in cache.nodes.
// Assumes that lock is already acquired.
func (cache *schedulerCache) addPod(pod *v1.Pod) {
	n, ok := cache.nodes[pod.Spec.NodeName]
	if !ok {
		n = newNodeInfoListItem(framework.NewNodeInfo())
		cache.nodes[pod.Spec.NodeName] = n
	// Save the pod in the node's pod list and add the resources requested by
	// the pod to the node's Requested.
	// NOTE(review): the statement this comment annotated is not present in
	// this excerpt.

当bind node后,用来通知cache,假定的pod可以开始计算超时了

// FinishBinding delegates to finishBinding using the current wall-clock time
// and returns its result.
func (cache *schedulerCache) FinishBinding(pod *v1.Pod) error {
	return cache.finishBinding(pod, time.Now())

// finishBinding exists to make tests deterministic by injecting now as an argument.
// For a pod that has a state entry and is in the assumed set, it marks binding
// as finished and sets the deadline to now+cache.ttl. It returns the error
// from framework.GetPodKey, otherwise nil.
// NOTE(review): the deferred release is a read-unlock (RUnlock), yet the body
// writes to currState; the lock acquisition is not shown in this excerpt —
// confirm that a write lock is not required here.
func (cache *schedulerCache) finishBinding(pod *v1.Pod, now time.Time) error {
	key, err := framework.GetPodKey(pod)
	if err != nil {
		return err

	defer cache.mu.RUnlock()

	klog.V(5).Infof("Finished binding for pod %v. Can be expired.", key)
	currState, ok := cache.podStates[key]
	// Only assumed pods get a deadline; anything else is left untouched.
	if ok && cache.assumedPods.Has(key) {
		dl := now.Add(cache.ttl)
		currState.bindingFinished = true
		currState.deadline = &dl
	return nil


// ForgetPod removes an assumed pod from the cache: it calls removePod and
// deletes the key from both assumedPods and podStates.
// It returns the error from framework.GetPodKey, an error when the cached
// pod's Spec.NodeName differs from pod's, the error from removePod, or an
// error stating that the pod was not assumed.
// NOTE(review): in the node-mismatch message the "assumed on" slot is filled
// from the argument pod and the "assigned to" slot from the cached pod —
// confirm that this ordering is intended.
func (cache *schedulerCache) ForgetPod(pod *v1.Pod) error {
	key, err := framework.GetPodKey(pod)
	if err != nil {
		return err

	defer cache.mu.Unlock()

	currState, ok := cache.podStates[key]
	if ok && currState.pod.Spec.NodeName != pod.Spec.NodeName {
		return fmt.Errorf("pod %v was assumed on %v but assigned to %v", key, pod.Spec.NodeName, currState.pod.Spec.NodeName)

	switch {
	// Only assumed pod can be forgotten.
	case ok && cache.assumedPods.Has(key):
		err := cache.removePod(pod)
		if err != nil {
			return err
		delete(cache.assumedPods, key)
		delete(cache.podStates, key)
		return fmt.Errorf("pod %v wasn't assumed so cannot be forgotten", key)
	return nil

// removePod removes pod from the NodeInfo of the node named by
// pod.Spec.NodeName. When that node is not in cache.nodes it logs an error
// and returns nil; otherwise it returns the error from NodeInfo.RemovePod,
// if any.
func (cache *schedulerCache) removePod(pod *v1.Pod) error {
	n, ok := cache.nodes[pod.Spec.NodeName]
	if !ok {
		klog.Errorf("node %v not found when trying to remove pod %v", pod.Spec.NodeName, pod.Name)
		return nil
	if err := n.info.RemovePod(pod); err != nil {
		return err
	// The branch below distinguishes a list item with no pods and no node
	// object from every other case.
	// NOTE(review): the bodies of both branches are not shown in this excerpt.
	if len(n.info.Pods) == 0 && n.info.Node() == nil {
	} else {
	return nil

pod调度成功,并且bind node也成功后,会通过informer监听到pod添加事件,调用AddPod将pod加到cache。

// AddPod confirms a pod in the cache. For a pod that is currently assumed it
// removes the key from assumedPods, clears the deadline and stores the given
// pod object; for a pod with no state entry it creates a new podState.
// It returns the error from framework.GetPodKey, or an error stating that
// the pod was already in the added state.
// NOTE(review): in the node-mismatch warning the "assumed to be on" slot is
// filled from the argument pod and the "got added to" slot from the cached
// pod — confirm that this ordering is intended.
func (cache *schedulerCache) AddPod(pod *v1.Pod) error {
	key, err := framework.GetPodKey(pod)
	if err != nil {
		return err

	defer cache.mu.Unlock()

	currState, ok := cache.podStates[key]
	switch {
	case ok && cache.assumedPods.Has(key):
		if currState.pod.Spec.NodeName != pod.Spec.NodeName {
			// The pod was added to a different node than it was assumed to.
			klog.Warningf("Pod %v was assumed to be on %v but got added to %v", key, pod.Spec.NodeName, currState.pod.Spec.NodeName)
			// Clean this up.
			if err = cache.removePod(currState.pod); err != nil {
				klog.Errorf("removing pod error: %v", err)
		// The pod has been confirmed, so remove its key from the assumed set.
		delete(cache.assumedPods, key)
		cache.podStates[key].deadline = nil
		cache.podStates[key].pod = pod
	case !ok:
		ps := &podState{
			pod: pod,
		cache.podStates[key] = ps
		return fmt.Errorf("pod %v was already in added state", key)
	return nil


// RemovePod removes a pod that has a state entry and is not in the assumed
// set: it calls removePod with the cached pod object and deletes the key from
// podStates. A node mismatch between the cached pod and the argument is
// logged and then treated as fatal via klog.Fatalf.
// It returns the error from framework.GetPodKey, the error from removePod,
// or an error stating that the pod was not found in the scheduler cache.
func (cache *schedulerCache) RemovePod(pod *v1.Pod) error {
	key, err := framework.GetPodKey(pod)
	if err != nil {
		return err

	defer cache.mu.Unlock()

	currState, ok := cache.podStates[key]
	switch {
	// An assumed pod won't have Delete/Remove event. It needs to have Add event
	// before Remove event, in which case the state would change from Assumed to Added.
	case ok && !cache.assumedPods.Has(key):
		if currState.pod.Spec.NodeName != pod.Spec.NodeName {
			klog.Errorf("Pod %v was assumed to be on %v but got added to %v", key, pod.Spec.NodeName, currState.pod.Spec.NodeName)
			klog.Fatalf("Schedulercache is corrupted and can badly affect scheduling decisions")
		err := cache.removePod(currState.pod)
		if err != nil {
			return err
		delete(cache.podStates, key)
		return fmt.Errorf("pod %v is not found in scheduler cache, so cannot be removed from it", key)
	return nil


// AddNode looks up the list item for node.Name, creating one with an empty
// NodeInfo when it is missing, records the node's image states through
// addNodeImageStates, and returns a clone of the cached NodeInfo.
func (cache *schedulerCache) AddNode(node *v1.Node) *framework.NodeInfo {
	defer cache.mu.Unlock()

	n, ok := cache.nodes[node.Name]
	if !ok {
		n = newNodeInfoListItem(framework.NewNodeInfo())
		cache.nodes[node.Name] = n
	// NOTE(review): the body of the else branch (node already present) is not
	// shown in this excerpt.
	} else {

	cache.addNodeImageStates(node, n.info)
	return n.info.Clone()


// snapshot refreshes g.nodeInfoSnapshot from the scheduler cache by calling
// UpdateSnapshot and returns its error.
func (g *genericScheduler) snapshot() error {
	// Used for all fit and priority funcs.
	return g.cache.UpdateSnapshot(g.nodeInfoSnapshot)


// Snapshot is the copy of cache node information that UpdateSnapshot fills in.
type Snapshot struct {
	// nodeInfoMap maps a node name to its node info.
	nodeInfoMap map[string]*framework.NodeInfo
	// nodeInfoList is the list of node infos.
	nodeInfoList []*framework.NodeInfo
	// havePodsWithAffinityNodeInfoList is the list of node infos for nodes
	// that have at least one pod declaring affinity.
	havePodsWithAffinityNodeInfoList []*framework.NodeInfo
	// havePodsWithRequiredAntiAffinityNodeInfoList is the list of node infos
	// for nodes that have at least one pod declaring anti-affinity.
	havePodsWithRequiredAntiAffinityNodeInfoList []*framework.NodeInfo
	// generation is compared with each node's Generation by UpdateSnapshot
	// and is set from the head node's Generation afterwards.
	generation                                   int64
// UpdateSnapshot copies node information from the cache into nodeSnapshot.
// It walks the linked list from cache.headNode, clones each visited NodeInfo
// into nodeSnapshot.nodeInfoMap, records the head node's Generation as the
// snapshot generation, and rebuilds the snapshot lists when nodes were added
// or removed or when a node's affinity / required anti-affinity status
// flipped.
// It returns an error when, after the update, the length of nodeInfoList
// differs from cache.nodeTree.numNodes; in that case the lists are rebuilt
// once more before returning.
func (cache *schedulerCache) UpdateSnapshot(nodeSnapshot *Snapshot) error {
	defer cache.mu.Unlock()

	// Remember the generation the snapshot had before this update.
	snapshotGeneration := nodeSnapshot.generation

	// NodeInfoList and HavePodsWithAffinityNodeInfoList must be re-created if a node was added
	// or removed from the cache.
	updateAllLists := false
	// HavePodsWithAffinityNodeInfoList must be re-created if a node changed its
	// status from having pods with affinity to NOT having pods with affinity or the other
	// way around.
	updateNodesHavePodsWithAffinity := false
	// HavePodsWithRequiredAntiAffinityNodeInfoList must be re-created if a node changed its
	// status from having pods with required anti-affinity to NOT having pods with required
	// anti-affinity or the other way around.
	updateNodesHavePodsWithRequiredAntiAffinity := false

	for node := cache.headNode; node != nil; node = node.next {
		if node.info.Generation <= snapshotGeneration {
			// all the nodes are updated before the existing snapshot. We are done.
			// NOTE(review): no statement follows this comment in the excerpt;
			// presumably the loop stops here — confirm.
		if np := node.info.Node(); np != nil {
			existing, ok := nodeSnapshot.nodeInfoMap[np.Name]
			if !ok {
				// A node missing from the snapshot map forces a full list rebuild.
				updateAllLists = true
				existing = &framework.NodeInfo{}
				nodeSnapshot.nodeInfoMap[np.Name] = existing
			clone := node.info.Clone()
			// Compare "has any such pod" before and after to detect a status flip.
			if (len(existing.PodsWithAffinity) > 0) != (len(clone.PodsWithAffinity) > 0) {
				updateNodesHavePodsWithAffinity = true
			if (len(existing.PodsWithRequiredAntiAffinity) > 0) != (len(clone.PodsWithRequiredAntiAffinity) > 0) {
				updateNodesHavePodsWithRequiredAntiAffinity = true
			// Overwrite the pointed-to value so the pointer stored in the
			// snapshot map stays the same.
			*existing = *clone
	if cache.headNode != nil {
		nodeSnapshot.generation = cache.headNode.info.Generation

	// The snapshot map holds more entries than the node tree has nodes.
	// NOTE(review): the statements that prune the map, if any, are not shown
	// in this excerpt.
	if len(nodeSnapshot.nodeInfoMap) > cache.nodeTree.numNodes {
		updateAllLists = true

	if updateAllLists || updateNodesHavePodsWithAffinity || updateNodesHavePodsWithRequiredAntiAffinity {
		cache.updateNodeInfoSnapshotList(nodeSnapshot, updateAllLists)

	if len(nodeSnapshot.nodeInfoList) != cache.nodeTree.numNodes {
		errMsg := fmt.Sprintf("snapshot state is not consistent, length of NodeInfoList=%v not equal to length of nodes in tree=%v "+
			", length of NodeInfoMap=%v, length of nodes in cache=%v"+
			", trying to recover",
			len(nodeSnapshot.nodeInfoList), cache.nodeTree.numNodes,
			len(nodeSnapshot.nodeInfoMap), len(cache.nodes))
		// We will try to recover by re-creating the lists for the next scheduling cycle, but still return an
		// error to surface the problem, the error will likely cause a failure to the current scheduling cycle.
		cache.updateNodeInfoSnapshotList(nodeSnapshot, true)
		return fmt.Errorf(errMsg)

	return nil

// updateNodeInfoSnapshotList rebuilds the snapshot's list views.
// Both affinity lists are always re-created. When updateAll is true,
// nodeInfoList is also re-created in the order returned by
// cache.nodeTree.list(), taking entries from snapshot.nodeInfoMap; otherwise
// the affinity lists are derived from the existing nodeInfoList.
func (cache *schedulerCache) updateNodeInfoSnapshotList(snapshot *Snapshot, updateAll bool) {
	snapshot.havePodsWithAffinityNodeInfoList = make([]*framework.NodeInfo, 0, cache.nodeTree.numNodes)
	snapshot.havePodsWithRequiredAntiAffinityNodeInfoList = make([]*framework.NodeInfo, 0, cache.nodeTree.numNodes)
	if updateAll {
		// Take a snapshot of the nodes order in the tree
		snapshot.nodeInfoList = make([]*framework.NodeInfo, 0, cache.nodeTree.numNodes)
		nodesList, err := cache.nodeTree.list()
		// NOTE(review): the handling of err is not shown in this excerpt —
		// confirm the error is not silently dropped.
		if err != nil {
		for _, nodeName := range nodesList {
			if nodeInfo := snapshot.nodeInfoMap[nodeName]; nodeInfo != nil {
				snapshot.nodeInfoList = append(snapshot.nodeInfoList, nodeInfo)
				if len(nodeInfo.PodsWithAffinity) > 0 {
					snapshot.havePodsWithAffinityNodeInfoList = append(snapshot.havePodsWithAffinityNodeInfoList, nodeInfo)
				if len(nodeInfo.PodsWithRequiredAntiAffinity) > 0 {
					snapshot.havePodsWithRequiredAntiAffinityNodeInfoList = append(snapshot.havePodsWithRequiredAntiAffinityNodeInfoList, nodeInfo)
			} else {
				klog.Errorf("node %q exist in nodeTree but not in NodeInfoMap, this should not happen.", nodeName)
	} else {
		// Node membership is unchanged: only the affinity lists are refilled.
		for _, nodeInfo := range snapshot.nodeInfoList {
			if len(nodeInfo.PodsWithAffinity) > 0 {
				snapshot.havePodsWithAffinityNodeInfoList = append(snapshot.havePodsWithAffinityNodeInfoList, nodeInfo)
			if len(nodeInfo.PodsWithRequiredAntiAffinity) > 0 {
				snapshot.havePodsWithRequiredAntiAffinityNodeInfoList = append(snapshot.havePodsWithRequiredAntiAffinityNodeInfoList, nodeInfo)


// run starts a goroutine in which wait.Until is given
// cleanupExpiredAssumedPods, cache.period and the cache.stop channel.
func (cache *schedulerCache) run() {
	// period is cleanAssumedPeriod = 1 * time.Second.
	go wait.Until(cache.cleanupExpiredAssumedPods, cache.period, cache.stop)

// cleanupExpiredAssumedPods is the function run passes to wait.Until.
// NOTE(review): its body is not shown in this excerpt; presumably it calls
// cleanupAssumedPods with the current time — confirm.
func (cache *schedulerCache) cleanupExpiredAssumedPods() {

// cleanupAssumedPods exists for making test deterministic by taking time as input argument.
// It also reports metrics on the cache size for nodes, pods, and assumed pods.
// It iterates over the assumed pod keys and calls expirePod for each pod
// whose deadline lies before now.
// NOTE(review): this excerpt may end before the function does; only the
// visible lines are described.
func (cache *schedulerCache) cleanupAssumedPods(now time.Time) {
	// Deferred calls run in reverse order: metrics are updated first, then
	// the lock is released.
	defer cache.mu.Unlock()
	defer cache.updateMetrics()

	// The size of assumedPods should be small
	for key := range cache.assumedPods {
		ps, ok := cache.podStates[key]
		if !ok {
			klog.Fatal("Key found in assumed set but not in podStates. Potentially a logical error.")
		if !ps.bindingFinished {
			klog.V(5).Infof("Couldn't expire cache for pod %v/%v. Binding is still in progress.",
				ps.pod.Namespace, ps.pod.Name)
			// NOTE(review): ps.deadline is only set by finishBinding, so it
			// may be nil for a pod whose binding has not finished; no
			// statement that skips to the next key is shown in this excerpt
			// — confirm the dereference below is not reached in that case.
		if now.After(*ps.deadline) {
			klog.Warningf("Pod %s/%s expired", ps.pod.Namespace, ps.pod.Name)
			if err := cache.expirePod(key, ps); err != nil {
				klog.Errorf("ExpirePod failed for %s: %v", key, err)