Scheduler Structure
Scheduler is the central structure of the kube-scheduler; it provides the components that kube-scheduler needs in order to run.
```go
type Scheduler struct {
	// Cache is an abstraction that caches Pod information for the scheduler
	// to look up; its operations are keyed by Pod.
	Cache internalcache.Cache

	// Extenders are scheduling plugins provided through the scheduling framework
	// that can influence Kubernetes scheduling decisions.
	Extenders []framework.Extender

	// NextPod is a function that blocks until the next schedulable Pod is available.
	NextPod func() *framework.QueuedPodInfo

	// Error is called if there is an error. It is passed the pod in
	// question, and the error
	Error func(*framework.QueuedPodInfo, error)

	// SchedulePod tries to schedule the given pod onto a Node.
	SchedulePod func(ctx context.Context, fwk framework.Framework, state *framework.CycleState, pod *v1.Pod) (ScheduleResult, error)

	// StopEverything is the signal used to shut the scheduler down.
	StopEverything <-chan struct{}

	// SchedulingQueue holds the Pods waiting to be scheduled.
	SchedulingQueue internalqueue.SchedulingQueue

	// Profiles holds the scheduling frameworks, one per profile.
	Profiles profile.Map

	client clientset.Interface

	nodeInfoSnapshot *internalcache.Snapshot

	percentageOfNodesToScore int32

	nextStartNodeIndex int
}
```
The two pieces that do the actual work are the SchedulingQueue and scheduleOne; both are analyzed below.
SchedulingQueue
Now that the kube-scheduler initialization process is understood, the next step is to analyze its overall structure and workflow.
In Run, a SchedulingQueue and a scheduleOne loop are started; structurally, both belong to Scheduler.
```go
func (sched *Scheduler) Run(ctx context.Context) {
	sched.SchedulingQueue.Run()

	// We need to start scheduleOne loop in a dedicated goroutine,
	// because scheduleOne function hangs on getting the next item
	// from the SchedulingQueue.
	// If there are no new pods to schedule, it will be hanging there
	// and if done in this goroutine it will be blocking closing
	// SchedulingQueue, in effect causing a deadlock on shutdown.
	go wait.UntilWithContext(ctx, sched.scheduleOne, 0)

	<-ctx.Done()
	sched.SchedulingQueue.Close()
}
```
SchedulingQueue is a queue abstraction that stores Pods waiting to be scheduled. The interface follows a pattern similar to cache.FIFO and cache.Heap.
```go
type SchedulingQueue interface {
	framework.PodNominator
	Add(pod *v1.Pod) error
	// Activate moves the given pods to activeQ iff they're in unschedulablePods or backoffQ.
	// The passed-in pods are originally compiled from plugins that want to activate Pods,
	// by injecting the pods through a reserved CycleState struct (PodsToActivate).
	Activate(pods map[string]*v1.Pod)
	// AddUnschedulableIfNotPresent puts an unschedulable Pod back into the queue.
	AddUnschedulableIfNotPresent(pod *framework.QueuedPodInfo, podSchedulingCycle int64) error
	// SchedulingCycle returns the current number of scheduling cycle which is
	// cached by scheduling queue. Normally, incrementing this number whenever
	// a pod is popped (e.g. called Pop()) is enough.
	SchedulingCycle() int64
	// Pop removes the head of the priority queue and returns it.
	Pop() (*framework.QueuedPodInfo, error)
	Update(oldPod, newPod *v1.Pod) error
	Delete(pod *v1.Pod) error
	MoveAllToActiveOrBackoffQueue(event framework.ClusterEvent, preCheck PreEnqueueCheck)
	AssignedPodAdded(pod *v1.Pod)
	AssignedPodUpdated(pod *v1.Pod)
	PendingPods() []*v1.Pod
	// Close closes the SchedulingQueue so that the goroutine which is
	// waiting to pop items can exit gracefully.
	Close()
	// Run starts the goroutines managing the queue.
	Run()
}
```
PriorityQueue is the implementation of SchedulingQueue. Its core consists of two sub-queues and one data structure: activeQ, backoffQ, and unschedulablePods (a trimmed-down sketch of the corresponding struct fields follows this list).

- activeQ: a heap-based priority queue from which the scheduler pops the highest-priority Pod to schedule.
- backoffQ: also a heap-based priority queue; it holds Pods that could not be scheduled and are backing off.
- unschedulablePods: holds Pods that have been determined to be unschedulable.
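To make the three components concrete, here is a trimmed-down sketch of the corresponding PriorityQueue fields. The field names match the constructor shown further below; the types are simplified and most other fields (locks, backoff durations, listers, ...) are omitted, so read it as an illustration rather than the full definition.

```go
type PriorityQueue struct {
	framework.PodNominator

	stop chan struct{}

	// activeQ: heap ordered by the queue-sort plugin; Pop() consumes from its head.
	activeQ *heap.Heap
	// podBackoffQ: heap ordered by backoff expiry; pods whose backoff has completed
	// are periodically flushed back into activeQ.
	podBackoffQ *heap.Heap
	// unschedulablePods: map of pods that were tried and determined unschedulable.
	unschedulablePods *UnschedulablePods

	// schedulingCycle is incremented every time a pod is popped.
	schedulingCycle int64
	// moveRequestCycle caches the scheduling cycle at the time of the last move request.
	moveRequestCycle int64

	// ... other fields omitted
}
```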
When a new Scheduler is created, you can see this queue being initialized:
```go
podQueue := internalqueue.NewSchedulingQueue(
	// The less function used to compare two pods
	profiles[options.profiles[0].SchedulerName].QueueSortFunc(),
	informerFactory,
	internalqueue.WithPodInitialBackoffDuration(time.Duration(options.podInitialBackoffSeconds)*time.Second),
	internalqueue.WithPodMaxBackoffDuration(time.Duration(options.podMaxBackoffSeconds)*time.Second),
	internalqueue.WithPodNominator(nominator),
	internalqueue.WithClusterEventMap(clusterEventMap),
	internalqueue.WithPodMaxInUnschedulablePodsDuration(options.podMaxInUnschedulablePodsDuration),
)
```
NewSchedulingQueue in turn initializes the PriorityQueue:
```go
// NewSchedulingQueue initializes a priority queue as a new scheduling queue.
func NewSchedulingQueue(
	lessFn framework.LessFunc,
	informerFactory informers.SharedInformerFactory,
	opts ...Option) SchedulingQueue {
	return NewPriorityQueue(lessFn, informerFactory, opts...)
}

// NewPriorityQueue creates a PriorityQueue object.
func NewPriorityQueue(
	lessFn framework.LessFunc,
	informerFactory informers.SharedInformerFactory,
	opts ...Option,
) *PriorityQueue {
	options := defaultPriorityQueueOptions
	for _, opt := range opts {
		opt(&options)
	}

	// The less function passed in above; it determines the ordering of the queue.
	comp := func(podInfo1, podInfo2 interface{}) bool {
		pInfo1 := podInfo1.(*framework.QueuedPodInfo)
		pInfo2 := podInfo2.(*framework.QueuedPodInfo)
		return lessFn(pInfo1, pInfo2)
	}

	if options.podNominator == nil {
		options.podNominator = NewPodNominator(informerFactory.Core().V1().Pods().Lister())
	}

	pq := &PriorityQueue{
		PodNominator:                      options.podNominator,
		clock:                             options.clock,
		stop:                              make(chan struct{}),
		podInitialBackoffDuration:         options.podInitialBackoffDuration,
		podMaxBackoffDuration:             options.podMaxBackoffDuration,
		podMaxInUnschedulablePodsDuration: options.podMaxInUnschedulablePodsDuration,
		activeQ:                           heap.NewWithRecorder(podInfoKeyFunc, comp, metrics.NewActivePodsRecorder()),
		unschedulablePods:                 newUnschedulablePods(metrics.NewUnschedulablePodsRecorder()),
		moveRequestCycle:                  -1,
		clusterEventMap:                   options.clusterEventMap,
	}
	pq.cond.L = &pq.lock
	pq.podBackoffQ = heap.NewWithRecorder(podInfoKeyFunc, pq.podsCompareBackoffCompleted, metrics.NewBackoffPodsRecorder())
	pq.nsLister = informerFactory.Core().V1().Namespaces().Lister()

	return pq
}
```
With the queue structure understood, the next question is where Pods are enqueued and dequeued. During initialization, an addEventHandlerFuncs is registered: it injects three action functions, the same concept as in a controller, and in AddFunc you can see the enqueue happening. The handlers are registered on the Pod informer, and the registered function addPodToSchedulingQueue is the enqueue operation.
```go
Handler: cache.ResourceEventHandlerFuncs{
	AddFunc:    sched.addPodToSchedulingQueue,
	UpdateFunc: sched.updatePodInSchedulingQueue,
	DeleteFunc: sched.deletePodFromSchedulingQueue,
},

func (sched *Scheduler) addPodToSchedulingQueue(obj interface{}) {
	pod := obj.(*v1.Pod)
	klog.V(3).InfoS("Add event for unscheduled pod", "pod", klog.KObj(pod))
	if err := sched.SchedulingQueue.Add(pod); err != nil {
		utilruntime.HandleError(fmt.Errorf("unable to queue %T: %v", obj, err))
	}
}
```
The SchedulingQueue implementation here is PriorityQueue, and its Add operates on activeQ:
```go
func (p *PriorityQueue) Add(pod *v1.Pod) error {
	p.lock.Lock()
	defer p.lock.Unlock()

	// Build the item to enqueue: a QueuedPodInfo that wraps the v1.Pod together with
	// its initialization time, creation time, and the names of the plugins that
	// marked it unschedulable.
	pInfo := p.newQueuedPodInfo(pod)

	// Enqueue into activeQ
	if err := p.activeQ.Add(pInfo); err != nil {
		klog.ErrorS(err, "Error adding pod to the active queue", "pod", klog.KObj(pod))
		return err
	}
	if p.unschedulablePods.get(pod) != nil {
		klog.ErrorS(nil, "Error: pod is already in the unschedulable queue", "pod", klog.KObj(pod))
		p.unschedulablePods.delete(pod)
	}
	// Delete pod from backoffQ if it is backing off
	if err := p.podBackoffQ.Delete(pInfo); err == nil {
		klog.ErrorS(nil, "Error: pod is already in the podBackoff queue", "pod", klog.KObj(pod))
	}
	metrics.SchedulerQueueIncomingPods.WithLabelValues("active", PodAdd).Inc()
	p.PodNominator.AddNominatedPod(pInfo.PodInfo, nil)
	p.cond.Broadcast()

	return nil
}
```
Looking back at the Scheduler struct above, there is a NextPod field. NextPod pops a Pod from the queue; when the Scheduler is constructed, MakeNextPodFunc is passed in as this NextPod.
```go
func MakeNextPodFunc(queue SchedulingQueue) func() *framework.QueuedPodInfo {
	return func() *framework.QueuedPodInfo {
		podInfo, err := queue.Pop()
		if err == nil {
			klog.V(4).InfoS("About to try and schedule pod", "pod", klog.KObj(podInfo.Pod))
			for plugin := range podInfo.UnschedulablePlugins {
				metrics.UnschedulableReason(plugin, podInfo.Pod.Spec.SchedulerName).Dec()
			}
			return podInfo
		}
		klog.ErrorS(err, "Error while retrieving next pod from scheduling queue")
		return nil
	}
}
```
queue.Pop() here corresponds to PriorityQueue's Pop(), which acts as the consumer side of activeQ:
```go
func (p *PriorityQueue) Pop() (*framework.QueuedPodInfo, error) {
	p.lock.Lock()
	defer p.lock.Unlock()
	for p.activeQ.Len() == 0 {
		// When the queue is empty, invocation of Pop() is blocked until new item is enqueued.
		// When Close() is called, the p.closed is set and the condition is broadcast,
		// which causes this loop to continue and return from the Pop().
		if p.closed {
			return nil, fmt.Errorf(queueClosed)
		}
		p.cond.Wait()
	}
	obj, err := p.activeQ.Pop()
	if err != nil {
		return nil, err
	}
	pInfo := obj.(*framework.QueuedPodInfo)
	pInfo.Attempts++
	p.schedulingCycle++
	return pInfo, nil
}
```
As seen at the entry point above, Run starts scheduleOne alongside the queue. scheduleOne consumes one Pod at a time: it calls NextPod, which is the MakeNextPodFunc passed in at initialization, which in turn comes back to the Pop above to do the consuming. scheduleOne is the workflow that schedules a single Pod.
```go
func (sched *Scheduler) scheduleOne(ctx context.Context) {
	podInfo := sched.NextPod()
	// pod could be nil when schedulerQueue is closed
	if podInfo == nil || podInfo.Pod == nil {
		return
	}
	pod := podInfo.Pod
	fwk, err := sched.frameworkForPod(pod)
	if err != nil {
		// This shouldn't happen, because we only accept for scheduling the pods
		// which specify a scheduler name that matches one of the profiles.
		klog.ErrorS(err, "Error occurred")
		return
	}
	if sched.skipPodSchedule(fwk, pod) {
		return
	}
	...
```
Scheduling Context
With the Scheduler structure covered, the next step is to analyze the scheduling-context flow and see how the extension points work. This again calls for the scheduling-context diagram from the official documentation.
Scheduling Framework [2]
The scheduling framework (SF) is a pluggable architecture that Kubernetes designed for the scheduler. The SF turns the scheduler into a plugin-style API, and the scheduling policies mentioned in the previous chapter are implemented as plugins against that API.
The SF defines a number of extension points (EPs). A scheduling plugin registers at one or more EPs; in other words, a plugin registered at multiple EPs is invoked at each of those EPs as they are executed.
Each scheduling attempt is split into two phases: the scheduling cycle (SC) and the binding cycle (BC).
- The SC selects a node for the Pod; scheduling cycles run serially.
- The BC applies the SC's decision to the cluster; binding cycles may run concurrently.
Together, the scheduling cycle and the binding cycle are called the scheduling context; the figure below shows the scheduling-context workflow.
Note: if the decision is that no node is available for the Pod, or an internal error occurs, the SC or BC is aborted and the Pod is put back into the queue to be retried.
Extension Points [3]
Extension points are the extensible APIs within the scheduling context, shown in the diagram as [Figure 1]. Filter corresponds to the old Predicate phase, and Scoring corresponds to Priority.
The scheduling phase passes through the following extension points (a minimal plugin sketch follows the list):
- Sort: provides sorting for the pending Pods in the scheduling queue. Only one queue-sort plugin can be enabled at a time.
- preFilter: pre-processes or checks information about the Pod or the cluster before filtering. An error here aborts the scheduling cycle.
- filter: equivalent to the Predicates of the old scheduler; it excludes nodes that cannot run the Pod. Filter plugins are called in the configured order, and once one of them marks a node as infeasible the remaining filters are skipped for that node; a Pod with no feasible node is marked unschedulable (i.e. it does not proceed further).
- postFilter: called, in the configured order, only when no FN (feasible node) was found for the Pod. If any postFilter plugin marks the Pod as schedulable, the remaining plugins are not called. In other words, this step is skipped when filter succeeds.
- preScore: can be used for preparatory scoring work (an informational extension point).
- score: scores every node that passed the filter phase; the scheduler then selects the node with the highest weighted score sum.
- reserve: because binding happens asynchronously, this plugin prevents new Pods from being scheduled onto the node before the current Pod is actually bound, which would push the node's usage beyond its available resources. If a later stage errors or fails, UnReserve is triggered to roll back (an informational extension point): from here on the Pod either makes it through to postBind, or a failure triggers UnReserve.
- permit: can block or delay the binding of a Pod. This step generally does one of three things:
  - approve: the scheduler continues the binding process.
  - deny: if any Permit plugin denies the binding of the Pod to the node, UnReserve is triggered and the Pod re-enters the queue.
  - wait: if a Permit plugin returns Wait, the Pod is kept in an internal list of waiting Pods until it is approved. On timeout, wait turns into deny: the Pod is put back into the scheduling queue and the Unreserve rollback is triggered.
- preBind: performs any work required before the Pod is bound; for example, preBind might provision a network volume and mount it on the target node. If any plugin at this step returns an error, the Pod is denied and placed back into the scheduling queue.
- bind: once all preBind plugins have completed, this plugin binds the Pod to the node. Bind plugins are called in order, and as soon as one of them handles the binding the remaining ones are skipped.
- postBind: called after the Pod has been bound; it can be used to clean up related resources (an informational extension point).
- multiPoint: a configuration-only field that allows plugins to be enabled or disabled for all applicable extension points at once.
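To make the plugin/extension-point relationship concrete, below is a minimal, hypothetical out-of-tree plugin that registers at the filter extension point by implementing framework.FilterPlugin. The plugin name and label key are made up for illustration; only the interface shape comes from the framework package, and it should be checked against the Kubernetes version you build against.

```go
package exampleplugin

import (
	"context"

	v1 "k8s.io/api/core/v1"
	"k8s.io/kubernetes/pkg/scheduler/framework"
)

// ExampleFilter is a hypothetical plugin used only to illustrate the filter extension point.
type ExampleFilter struct{}

var _ framework.FilterPlugin = &ExampleFilter{}

// Name is required by the framework.Plugin interface.
func (pl *ExampleFilter) Name() string { return "ExampleFilter" }

// Filter rejects nodes carrying a (made-up) "example.io/disabled" label; returning an
// Unschedulable status marks this node as infeasible for the Pod being scheduled.
func (pl *ExampleFilter) Filter(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeInfo *framework.NodeInfo) *framework.Status {
	node := nodeInfo.Node()
	if node != nil && node.Labels["example.io/disabled"] == "true" {
		return framework.NewStatus(framework.Unschedulable, "node is disabled for example workloads")
	}
	return nil
}
```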
In the code, the scheduler's implementation of the scheduling context is scheduleOne; the rest of this section walks through that scheduling context.
Sort
The Sort plugin provides sorting for the pending Pods in the scheduling queue. Only one queue-sort plugin can be enabled at a time.
When scheduleOne starts, NextPod takes a Pod from the activeQ and frameworkForPod looks up the framework (profile) for it; the ordering of that queue is where the first extension point of the scheduling context, sort, comes into play.
```go
func (sched *Scheduler) scheduleOne(ctx context.Context) {
	podInfo := sched.NextPod()
	// pod could be nil when schedulerQueue is closed
	if podInfo == nil || podInfo.Pod == nil {
		return
	}
	pod := podInfo.Pod
	fwk, err := sched.frameworkForPod(pod)
	...

func (sched *Scheduler) frameworkForPod(pod *v1.Pod) (framework.Framework, error) {
	// Look up the profile specified by the pod
	fwk, ok := sched.Profiles[pod.Spec.SchedulerName]
	if !ok {
		return nil, fmt.Errorf("profile not found for scheduler name %q", pod.Spec.SchedulerName)
	}
	return fwk, nil
}
```
As a reminder, this sort function is wired in when the Scheduler is created (the default queue-sort comparison is sketched after the snippet):
```go
podQueue := internalqueue.NewSchedulingQueue(
	profiles[options.profiles[0].SchedulerName].QueueSortFunc(),
	informerFactory,
	internalqueue.WithPodInitialBackoffDuration(time.Duration(options.podInitialBackoffSeconds)*time.Second),
	internalqueue.WithPodMaxBackoffDuration(time.Duration(options.podMaxBackoffSeconds)*time.Second),
	internalqueue.WithPodNominator(nominator),
	internalqueue.WithClusterEventMap(clusterEventMap),
	internalqueue.WithPodMaxInUnschedulablePodsDuration(options.podMaxInUnschedulablePodsDuration),
)
```
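For reference, the default queue-sort plugin (PrioritySort) compares two queued Pods roughly as follows: higher .spec.priority wins, and ties are broken by whichever Pod entered the queue earlier. This is an abridged sketch rather than the exact source; the priority helper lives in k8s.io/component-helpers/scheduling/corev1.

```go
package queuesortsketch

import (
	corev1helpers "k8s.io/component-helpers/scheduling/corev1"
	"k8s.io/kubernetes/pkg/scheduler/framework"
)

// less mirrors the default queue-sort comparison: higher-priority Pods first,
// then older Pods first when priorities are equal.
func less(pInfo1, pInfo2 *framework.QueuedPodInfo) bool {
	p1 := corev1helpers.PodPriority(pInfo1.Pod)
	p2 := corev1helpers.PodPriority(pInfo2.Pod)
	return (p1 > p2) || (p1 == p2 && pInfo1.Timestamp.Before(pInfo2.Timestamp))
}
```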
preFilter
preFilter, the first extension point inside the scheduling cycle itself, pre-processes or checks information about the Pod or the cluster before filtering. A failure here aborts scheduling.
```go
func (sched *Scheduler) scheduleOne(ctx context.Context) {
	podInfo := sched.NextPod()
	// pod could be nil when schedulerQueue is closed
	if podInfo == nil || podInfo.Pod == nil {
		return
	}
	pod := podInfo.Pod
	fwk, err := sched.frameworkForPod(pod)
	if err != nil {
		// This shouldn't happen, because we only accept for scheduling the pods
		// which specify a scheduler name that matches one of the profiles.
		klog.ErrorS(err, "Error occurred")
		return
	}
	if sched.skipPodSchedule(fwk, pod) {
		return
	}

	klog.V(3).InfoS("Attempting to schedule pod", "pod", klog.KObj(pod))

	// Synchronously attempt to find a fit for the pod.
	start := time.Now()
	state := framework.NewCycleState()
	state.SetRecordPluginMetrics(rand.Intn(100) < pluginMetricsSamplePercent)

	// Initialize an empty podsToActivate struct, which will be filled up by plugins or stay empty.
	podsToActivate := framework.NewPodsToActivate()
	state.Write(framework.PodsToActivateKey, podsToActivate)

	schedulingCycleCtx, cancel := context.WithCancel(ctx)
	defer cancel()

	// This is where prefilter is entered
	scheduleResult, err := sched.SchedulePod(schedulingCycleCtx, fwk, state, pod)
```
schedulePod tries to schedule the given Pod onto one of the nodes in the node list; on success it returns the node's name.
```go
func (sched *Scheduler) schedulePod(ctx context.Context, fwk framework.Framework, state *framework.CycleState, pod *v1.Pod) (result ScheduleResult, err error) {
	trace := utiltrace.New("Scheduling", utiltrace.Field{Key: "namespace", Value: pod.Namespace}, utiltrace.Field{Key: "name", Value: pod.Name})
	defer trace.LogIfLong(100 * time.Millisecond)

	// Update the snapshot to the current contents of the cache
	if err := sched.Cache.UpdateSnapshot(sched.nodeInfoSnapshot); err != nil {
		return result, err
	}
	trace.Step("Snapshotting scheduler cache and node infos done")

	if sched.nodeInfoSnapshot.NumNodes() == 0 {
		return result, ErrNoNodesAvailable
	}

	// Find the nodes that fit the pod; this is where the filter-related extension points run
	feasibleNodes, diagnosis, err := sched.findNodesThatFitPod(ctx, fwk, state, pod)
	...
```
findNodesThatFitPod runs the corresponding filter plugins to find the most suitable nodes; both the comments and the method names make it clear that plugins are run here. The algorithms themselves are analyzed later; for now the focus is only on the workflow.
```go
func (sched *Scheduler) findNodesThatFitPod(ctx context.Context, fwk framework.Framework, state *framework.CycleState, pod *v1.Pod) ([]*v1.Node, framework.Diagnosis, error) {
	diagnosis := framework.Diagnosis{
		NodeToStatusMap:      make(framework.NodeToStatusMap),
		UnschedulablePlugins: sets.NewString(),
	}

	// Run "prefilter" plugins.
	preRes, s := fwk.RunPreFilterPlugins(ctx, state, pod)
	allNodes, err := sched.nodeInfoSnapshot.NodeInfos().List()
	if err != nil {
		return nil, diagnosis, err
	}
	if !s.IsSuccess() {
		if !s.IsUnschedulable() {
			return nil, diagnosis, s.AsError()
		}
		// All nodes will have the same status. Some non trivial refactoring is
		// needed to avoid this copy.
		for _, n := range allNodes {
			diagnosis.NodeToStatusMap[n.Node().Name] = s
		}
		// Status satisfying IsUnschedulable() gets injected into diagnosis.UnschedulablePlugins.
		if s.FailedPlugin() != "" {
			diagnosis.UnschedulablePlugins.Insert(s.FailedPlugin())
		}
		return nil, diagnosis, nil
	}

	// "NominatedNodeName" can potentially be set in a previous scheduling cycle as a result of preemption.
	// This node is likely the only candidate that will fit the pod, and hence we try it first before iterating over all nodes.
	if len(pod.Status.NominatedNodeName) > 0 {
		feasibleNodes, err := sched.evaluateNominatedNode(ctx, pod, fwk, state, diagnosis)
		if err != nil {
			klog.ErrorS(err, "Evaluation failed on nominated node", "pod", klog.KObj(pod), "node", pod.Status.NominatedNodeName)
		}
		// Nominated node passes all the filters, scheduler is good to assign this node to the pod.
		if len(feasibleNodes) != 0 {
			return feasibleNodes, diagnosis, nil
		}
	}

	nodes := allNodes
	if !preRes.AllNodes() {
		nodes = make([]*framework.NodeInfo, 0, len(preRes.NodeNames))
		for n := range preRes.NodeNames {
			nInfo, err := sched.nodeInfoSnapshot.NodeInfos().Get(n)
			if err != nil {
				return nil, diagnosis, err
			}
			nodes = append(nodes, nInfo)
		}
	}
	feasibleNodes, err := sched.findNodesThatPassFilters(ctx, fwk, state, pod, diagnosis, nodes)
	if err != nil {
		return nil, diagnosis, err
	}

	feasibleNodes, err = findNodesThatPassExtenders(sched.Extenders, pod, feasibleNodes, diagnosis.NodeToStatusMap)
	if err != nil {
		return nil, diagnosis, err
	}
	return feasibleNodes, diagnosis, nil
}
```
filter
The filter plugins correspond to the Predicates of the old scheduler: they exclude nodes that cannot run the Pod. Filters are called in the configured order, and once a filter marks a node as infeasible the remaining filters are not run for that node; a Pod with no feasible node ends up marked unschedulable (i.e. it does not proceed further).
In the code, filtering still happens inside findNodesThatFitPod: findNodesThatPassFilters is what produces the FNs (feasible nodes), and that step is the filter extension point.
```go
func (sched *Scheduler) findNodesThatFitPod(ctx context.Context, fwk framework.Framework, state *framework.CycleState, pod *v1.Pod) ([]*v1.Node, framework.Diagnosis, error) {
	...
	feasibleNodes, err := sched.findNodesThatPassFilters(ctx, fwk, state, pod, diagnosis, nodes)
	if err != nil {
		return nil, diagnosis, err
	}

	feasibleNodes, err = findNodesThatPassExtenders(sched.Extenders, pod, feasibleNodes, diagnosis.NodeToStatusMap)
	if err != nil {
		return nil, diagnosis, err
	}
	return feasibleNodes, diagnosis, nil
}
```
postFilter
postFilter plugins are called, in their configured order, only when no FN was found for the Pod. If any postFilter plugin marks the Pod as schedulable, the remaining plugins are not called. In other words, this step is skipped when filter succeeds; let's verify that below.
Still in scheduleOne: once SchedulePod has finished (successfully or not) it returns an err, and whether postFilter runs is decided based on that err, which matches the official description.
```go
scheduleResult, err := sched.SchedulePod(schedulingCycleCtx, fwk, state, pod)
if err != nil {
	// SchedulePod() may have failed because the pod would not fit on any host, so we try to
	// preempt, with the expectation that the next time the pod is tried for scheduling it
	// will fit due to the preemption. It is also possible that a different pod will schedule
	// into the resources that were preempted, but this is harmless.
	var nominatingInfo *framework.NominatingInfo
	if fitError, ok := err.(*framework.FitError); ok {
		if !fwk.HasPostFilterPlugins() {
			klog.V(3).InfoS("No PostFilter plugins are registered, so no preemption will be performed")
		} else {
			// Run PostFilter plugins to try to make the pod schedulable in a future scheduling cycle.
			result, status := fwk.RunPostFilterPlugins(ctx, state, pod, fitError.Diagnosis.NodeToStatusMap)
			if status.Code() == framework.Error {
				klog.ErrorS(nil, "Status after running PostFilter plugins for pod", "pod", klog.KObj(pod), "status", status)
			} else {
				fitError.Diagnosis.PostFilterMsg = status.Message()
				klog.V(5).InfoS("Status after running PostFilter plugins for pod", "pod", klog.KObj(pod), "status", status)
			}
			if result != nil {
				nominatingInfo = result.NominatingInfo
			}
		}
		// Pod did not fit anywhere, so it is counted as a failure. If preemption
		// succeeds, the pod should get counted as a success the next time we try to
		// schedule it. (hopefully)
		metrics.PodUnschedulable(fwk.ProfileName(), metrics.SinceInSeconds(start))
	} else if err == ErrNoNodesAvailable {
		nominatingInfo = clearNominatedNode
		// No nodes available is counted as unschedulable rather than an error.
		metrics.PodUnschedulable(fwk.ProfileName(), metrics.SinceInSeconds(start))
	} else {
		nominatingInfo = clearNominatedNode
		klog.ErrorS(err, "Error selecting node for pod", "pod", klog.KObj(pod))
		metrics.PodScheduleError(fwk.ProfileName(), metrics.SinceInSeconds(start))
	}
	sched.handleSchedulingFailure(ctx, fwk, podInfo, err, v1.PodReasonUnschedulable, nominatingInfo)
	return
}
```
preScore, score
preScore can be used for preparatory scoring work and is an informational extension point. After filtering completes, the preScore plugins run immediately as a continuation rather than returning, and if any of the configured plugins returns a failure, the Pod is rejected.
```go
func (sched *Scheduler) schedulePod(ctx context.Context, fwk framework.Framework, state *framework.CycleState, pod *v1.Pod) (result ScheduleResult, err error) {
	trace := utiltrace.New("Scheduling", utiltrace.Field{Key: "namespace", Value: pod.Namespace}, utiltrace.Field{Key: "name", Value: pod.Name})
	defer trace.LogIfLong(100 * time.Millisecond)

	if err := sched.Cache.UpdateSnapshot(sched.nodeInfoSnapshot); err != nil {
		return result, err
	}
	trace.Step("Snapshotting scheduler cache and node infos done")

	if sched.nodeInfoSnapshot.NumNodes() == 0 {
		return result, ErrNoNodesAvailable
	}

	feasibleNodes, diagnosis, err := sched.findNodesThatFitPod(ctx, fwk, state, pod)
	if err != nil {
		return result, err
	}
	trace.Step("Computing predicates done")

	if len(feasibleNodes) == 0 {
		return result, &framework.FitError{
			Pod:         pod,
			NumAllNodes: sched.nodeInfoSnapshot.NumNodes(),
			Diagnosis:   diagnosis,
		}
	}

	// When only one node after predicate, just use it.
	if len(feasibleNodes) == 1 {
		return ScheduleResult{
			SuggestedHost:  feasibleNodes[0].Name,
			EvaluatedNodes: 1 + len(diagnosis.NodeToStatusMap),
			FeasibleNodes:  1,
		}, nil
	}

	// prescore and score both run inside prioritizeNodes
	priorityList, err := prioritizeNodes(ctx, sched.Extenders, fwk, state, pod, feasibleNodes)
	if err != nil {
		return result, err
	}

	host, err := selectHost(priorityList)
	trace.Step("Prioritizing done")

	return ScheduleResult{
		SuggestedHost:  host,
		EvaluatedNodes: len(feasibleNodes) + len(diagnosis.NodeToStatusMap),
		FeasibleNodes:  len(feasibleNodes),
	}, err
}
```
prioritizeNodes scores the nodes with the configured plugins and returns a score for each node: the per-plugin scores are summed to produce the node's score, giving each node a final weighted total. A small worked example of this aggregation follows the function.
```go
func prioritizeNodes(
	ctx context.Context,
	extenders []framework.Extender,
	fwk framework.Framework,
	state *framework.CycleState,
	pod *v1.Pod,
	nodes []*v1.Node,
) (framework.NodeScoreList, error) {
	// If no priority configs are provided, then all nodes will have a score of one.
	// This is required to generate the priority list in the required format
	if len(extenders) == 0 && !fwk.HasScorePlugins() {
		result := make(framework.NodeScoreList, 0, len(nodes))
		for i := range nodes {
			result = append(result, framework.NodeScore{
				Name:  nodes[i].Name,
				Score: 1,
			})
		}
		return result, nil
	}

	// Run PreScore plugins.
	preScoreStatus := fwk.RunPreScorePlugins(ctx, state, pod, nodes)
	if !preScoreStatus.IsSuccess() {
		return nil, preScoreStatus.AsError()
	}

	// Run the Score plugins.
	scoresMap, scoreStatus := fwk.RunScorePlugins(ctx, state, pod, nodes)
	if !scoreStatus.IsSuccess() {
		return nil, scoreStatus.AsError()
	}

	// Additional details logged at level 10 if enabled.
	klogV := klog.V(10)
	if klogV.Enabled() {
		for plugin, nodeScoreList := range scoresMap {
			for _, nodeScore := range nodeScoreList {
				klogV.InfoS("Plugin scored node for pod", "pod", klog.KObj(pod), "plugin", plugin, "node", nodeScore.Name, "score", nodeScore.Score)
			}
		}
	}

	// Summarize all scores.
	result := make(framework.NodeScoreList, 0, len(nodes))

	for i := range nodes {
		result = append(result, framework.NodeScore{Name: nodes[i].Name, Score: 0})
		for j := range scoresMap {
			result[i].Score += scoresMap[j][i].Score
		}
	}

	if len(extenders) != 0 && nodes != nil {
		var mu sync.Mutex
		var wg sync.WaitGroup
		combinedScores := make(map[string]int64, len(nodes))
		for i := range extenders {
			if !extenders[i].IsInterested(pod) {
				continue
			}
			wg.Add(1)
			go func(extIndex int) {
				metrics.SchedulerGoroutines.WithLabelValues(metrics.PrioritizingExtender).Inc()
				defer func() {
					metrics.SchedulerGoroutines.WithLabelValues(metrics.PrioritizingExtender).Dec()
					wg.Done()
				}()
				prioritizedList, weight, err := extenders[extIndex].Prioritize(pod, nodes)
				if err != nil {
					// Prioritization errors from extender can be ignored, let k8s/other extenders determine the priorities
					klog.V(5).InfoS("Failed to run extender's priority function. No score given by this extender.", "error", err, "pod", klog.KObj(pod), "extender", extenders[extIndex].Name())
					return
				}
				mu.Lock()
				for i := range *prioritizedList {
					host, score := (*prioritizedList)[i].Host, (*prioritizedList)[i].Score
					if klogV.Enabled() {
						klogV.InfoS("Extender scored node for pod", "pod", klog.KObj(pod), "extender", extenders[extIndex].Name(), "node", host, "score", score)
					}
					combinedScores[host] += score * weight
				}
				mu.Unlock()
			}(i)
		}
		// wait for all go routines to finish
		wg.Wait()
		for i := range result {
			// MaxExtenderPriority may diverge from the max priority used in the scheduler and defined by MaxNodeScore,
			// therefore we need to scale the score returned by extenders to the score range used by the scheduler.
			result[i].Score += combinedScores[result[i].Name] * (framework.MaxNodeScore / extenderv1.MaxExtenderPriority)
		}
	}

	if klogV.Enabled() {
		for i := range result {
			klogV.InfoS("Calculated node's final score for pod", "pod", klog.KObj(pod), "node", result[i].Name, "score", result[i].Score)
		}
	}
	return result, nil
}
```
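As a toy illustration of the aggregation above (all plugin names and numbers are made up): the weighted plugin scores returned by RunScorePlugins are summed per node, and an extender's 0-10 score is multiplied by its weight and scaled into the 0-100 node score range before being added.

```go
package main

import "fmt"

func main() {
	// Hypothetical weighted plugin scores for a single node, as RunScorePlugins would return them.
	pluginScores := map[string]int64{
		"NodeResourcesFit": 70,
		"InterPodAffinity": 20,
	}
	var total int64
	for _, s := range pluginScores {
		total += s
	}

	// Hypothetical extender result: score 5 (on a 0-10 scale) with weight 2,
	// scaled by MaxNodeScore/MaxExtenderPriority = 100/10 = 10.
	extenderScore, extenderWeight := int64(5), int64(2)
	total += extenderScore * extenderWeight * (100 / 10)

	fmt.Println("final node score:", total) // 70 + 20 + 100 = 190
}
```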
Reserve
Because binding happens asynchronously, the Reserve plugin exists to prevent the node from being over-committed: without it, new Pods could be scheduled onto the node before the current Pod is actually bound, pushing usage beyond the available resources. If a later stage errors or fails, UnReserve is triggered to roll back (an informational extension point). From this point on, the Pod either makes it successfully all the way to postBind, or a failure triggers UnReserve.
```go
// Run the Reserve method of reserve plugins.
if sts := fwk.RunReservePluginsReserve(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost); !sts.IsSuccess() {
	// Reserve did not succeed
	metrics.PodScheduleError(fwk.ProfileName(), metrics.SinceInSeconds(start))
	// Trigger un-reserve to clean up state associated with the reserved Pod
	fwk.RunReservePluginsUnreserve(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
	if forgetErr := sched.Cache.ForgetPod(assumedPod); forgetErr != nil {
		klog.ErrorS(forgetErr, "Scheduler cache ForgetPod failed")
	}
	sched.handleSchedulingFailure(ctx, fwk, assumedPodInfo, sts.AsError(), SchedulerError, clearNominatedNode)
	return
}
```
permit
Permit plugins can block or delay the binding of a Pod (a minimal Wait-style sketch follows the snippet).
```go
// Run "permit" plugins.
runPermitStatus := fwk.RunPermitPlugins(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
if !runPermitStatus.IsWait() && !runPermitStatus.IsSuccess() {
	var reason string
	if runPermitStatus.IsUnschedulable() {
		metrics.PodUnschedulable(fwk.ProfileName(), metrics.SinceInSeconds(start))
		reason = v1.PodReasonUnschedulable
	} else {
		metrics.PodScheduleError(fwk.ProfileName(), metrics.SinceInSeconds(start))
		reason = SchedulerError
	}
	// As soon as any plugin returns a status that is neither success nor wait,
	// trigger un-reserve to clean up
	fwk.RunReservePluginsUnreserve(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
	// Forget the pod from the cache
	if forgetErr := sched.Cache.ForgetPod(assumedPod); forgetErr != nil {
		klog.ErrorS(forgetErr, "Scheduler cache ForgetPod failed")
	}
	sched.handleSchedulingFailure(ctx, fwk, assumedPodInfo, runPermitStatus.AsError(), reason, clearNominatedNode)
	return
}
```
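For context, a plugin registers at this extension point by implementing framework.PermitPlugin. The hypothetical sketch below simply asks the framework to wait: a Wait status parks the Pod in the framework's internal waiting list until something allows or rejects it, or until the returned timeout expires (which is then treated as a deny). The plugin name and timeout are made up; only the interface shape comes from the framework package and should be verified against your Kubernetes version.

```go
package exampleplugin

import (
	"context"
	"time"

	v1 "k8s.io/api/core/v1"
	"k8s.io/kubernetes/pkg/scheduler/framework"
)

// ExamplePermit is a hypothetical plugin used only to illustrate the permit extension point.
type ExamplePermit struct{}

var _ framework.PermitPlugin = &ExamplePermit{}

func (pl *ExamplePermit) Name() string { return "ExamplePermit" }

// Permit delays binding: the Pod waits up to 30 seconds for an external approval.
func (pl *ExamplePermit) Permit(ctx context.Context, state *framework.CycleState, p *v1.Pod, nodeName string) (*framework.Status, time.Duration) {
	return framework.NewStatus(framework.Wait), 30 * time.Second
}
```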
Binding Cycle
After the FN is chosen, the Pod is "assumed" (an optimistic binding recorded in the cache); what follows is the real bind operation, i.e. the binding cycle.
```go
func (sched *Scheduler) scheduleOne(ctx context.Context) {
	...
	// The binding cycle is asynchronous, which shows up here as a goroutine
	go func() {
		bindingCycleCtx, cancel := context.WithCancel(ctx)
		defer cancel()
		metrics.SchedulerGoroutines.WithLabelValues(metrics.Binding).Inc()
		defer metrics.SchedulerGoroutines.WithLabelValues(metrics.Binding).Dec()

		// Run WaitOnPermit; on failure, un-reserve rolls back
		waitOnPermitStatus := fwk.WaitOnPermit(bindingCycleCtx, assumedPod)
		if !waitOnPermitStatus.IsSuccess() {
			var reason string
			if waitOnPermitStatus.IsUnschedulable() {
				metrics.PodUnschedulable(fwk.ProfileName(), metrics.SinceInSeconds(start))
				reason = v1.PodReasonUnschedulable
			} else {
				metrics.PodScheduleError(fwk.ProfileName(), metrics.SinceInSeconds(start))
				reason = SchedulerError
			}
			// trigger un-reserve plugins to clean up state associated with the reserved Pod
			fwk.RunReservePluginsUnreserve(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
			if forgetErr := sched.Cache.ForgetPod(assumedPod); forgetErr != nil {
				klog.ErrorS(forgetErr, "scheduler cache ForgetPod failed")
			} else {
				// "Forget"ing an assumed Pod in binding cycle should be treated as a PodDelete event,
				// as the assumed Pod had occupied a certain amount of resources in scheduler cache.
				// TODO(#103853): de-duplicate the logic.
				// Avoid moving the assumed Pod itself as it's always Unschedulable.
				// It's intentional to "defer" this operation; otherwise MoveAllToActiveOrBackoffQueue() would
				// update `q.moveRequest` and thus move the assumed pod to backoffQ anyways.
				defer sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(internalqueue.AssignedPodDelete, func(pod *v1.Pod) bool {
					return assumedPod.UID != pod.UID
				})
			}
			sched.handleSchedulingFailure(ctx, fwk, assumedPodInfo, waitOnPermitStatus.AsError(), reason, clearNominatedNode)
			return
		}

		// Run the PreBind plugins
		preBindStatus := fwk.RunPreBindPlugins(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
		if !preBindStatus.IsSuccess() {
			metrics.PodScheduleError(fwk.ProfileName(), metrics.SinceInSeconds(start))
			// trigger un-reserve plugins to clean up state associated with the reserved Pod
			fwk.RunReservePluginsUnreserve(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
			if forgetErr := sched.Cache.ForgetPod(assumedPod); forgetErr != nil {
				klog.ErrorS(forgetErr, "scheduler cache ForgetPod failed")
			} else {
				// "Forget"ing an assumed Pod in binding cycle should be treated as a PodDelete event,
				// as the assumed Pod had occupied a certain amount of resources in scheduler cache.
				// TODO(#103853): de-duplicate the logic.
				sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(internalqueue.AssignedPodDelete, nil)
			}
			sched.handleSchedulingFailure(ctx, fwk, assumedPodInfo, preBindStatus.AsError(), SchedulerError, clearNominatedNode)
			return
		}

		// bind is the real binding operation
		err := sched.bind(bindingCycleCtx, fwk, assumedPod, scheduleResult.SuggestedHost, state)
		if err != nil {
			metrics.PodScheduleError(fwk.ProfileName(), metrics.SinceInSeconds(start))
			// On failure, trigger the un-reserve plugins
			fwk.RunReservePluginsUnreserve(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
			if err := sched.Cache.ForgetPod(assumedPod); err != nil {
				klog.ErrorS(err, "scheduler cache ForgetPod failed")
			} else {
				// "Forget"ing an assumed Pod in binding cycle should be treated as a PodDelete event,
				// as the assumed Pod had occupied a certain amount of resources in scheduler cache.
				// TODO(#103853): de-duplicate the logic.
				sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(internalqueue.AssignedPodDelete, nil)
			}
			sched.handleSchedulingFailure(ctx, fwk, assumedPodInfo, fmt.Errorf("binding rejected: %w", err), SchedulerError, clearNominatedNode)
			return
		}
		// Calculating nodeResourceString can be heavy. Avoid it if klog verbosity is below 2.
		klog.V(2).InfoS("Successfully bound pod to node", "pod", klog.KObj(pod), "node", scheduleResult.SuggestedHost, "evaluatedNodes", scheduleResult.EvaluatedNodes, "feasibleNodes", scheduleResult.FeasibleNodes)
		metrics.PodScheduled(fwk.ProfileName(), metrics.SinceInSeconds(start))
		metrics.PodSchedulingAttempts.Observe(float64(podInfo.Attempts))
		metrics.PodSchedulingDuration.WithLabelValues(getAttemptsLabel(podInfo)).Observe(metrics.SinceInSeconds(podInfo.InitialAttemptTimestamp))

		// Run the "postbind" plugins: an informational extension point, called after
		// the Pod is bound, that can be used to clean up associated resources.
		fwk.RunPostBindPlugins(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)

		// At the end of a successful binding cycle, move up Pods if needed.
		if len(podsToActivate.Map) != 0 {
			sched.SchedulingQueue.Activate(podsToActivate.Map)
			// Unlike the logic in scheduling cycle, we don't bother deleting the entries
			// as `podsToActivate.Map` is no longer consumed.
		}
	}()
}
```
Failure Flow in the Scheduling Context
Everything above covers the happy path; this section analyzes how failed scheduling attempts are retried. The failure-handling side of the scheduler involves the backoffQ and unschedulablePods introduced with the PriorityQueue structure above:

- backoffQ: also a heap-based priority queue, holding Pods that could not be scheduled and are backing off.
- unschedulablePods: a map holding Pods determined to be unschedulable.
backoffQ and unschedulablePods are set up when the scheduler is initialized:
```go
func NewPriorityQueue(
	lessFn framework.LessFunc,
	informerFactory informers.SharedInformerFactory,
	opts ...Option,
) *PriorityQueue {
	options := defaultPriorityQueueOptions
	for _, opt := range opts {
		opt(&options)
	}

	comp := func(podInfo1, podInfo2 interface{}) bool {
		pInfo1 := podInfo1.(*framework.QueuedPodInfo)
		pInfo2 := podInfo2.(*framework.QueuedPodInfo)
		return lessFn(pInfo1, pInfo2)
	}

	if options.podNominator == nil {
		options.podNominator = NewPodNominator(informerFactory.Core().V1().Pods().Lister())
	}

	pq := &PriorityQueue{
		PodNominator:                      options.podNominator,
		clock:                             options.clock,
		stop:                              make(chan struct{}),
		podInitialBackoffDuration:         options.podInitialBackoffDuration,
		podMaxBackoffDuration:             options.podMaxBackoffDuration,
		podMaxInUnschedulablePodsDuration: options.podMaxInUnschedulablePodsDuration,
		activeQ:                           heap.NewWithRecorder(podInfoKeyFunc, comp, metrics.NewActivePodsRecorder()),
		unschedulablePods:                 newUnschedulablePods(metrics.NewUnschedulablePodsRecorder()),
		moveRequestCycle:                  -1,
		clusterEventMap:                   options.clusterEventMap,
	}
	pq.cond.L = &pq.lock
	// Initialize the backoffQ.
	// heap.NewWithRecorder builds a Heap with an optional metricRecorder.
	// podInfoKeyFunc is a function returning a key string and an error.
	// pq.podsCompareBackoffCompleted compares the backoff expiry of two pods:
	// true if the first expires before the second, false otherwise.
	pq.podBackoffQ = heap.NewWithRecorder(podInfoKeyFunc, pq.podsCompareBackoffCompleted, metrics.NewBackoffPodsRecorder())
	pq.nsLister = informerFactory.Core().V1().Namespaces().Lister()

	return pq
}
```
Two functions matter for the backoffQ: getBackoffTime and calculateBackoffDuration (a worked example of the resulting backoff progression follows the snippet).
```go
// getBackoffTime returns the time that podInfo completes backoff
func (p *PriorityQueue) getBackoffTime(podInfo *framework.QueuedPodInfo) time.Time {
	duration := p.calculateBackoffDuration(podInfo)
	backoffTime := podInfo.Timestamp.Add(duration)
	return backoffTime
}

// calculateBackoffDuration is a helper function for calculating the backoffDuration
// based on the number of attempts the pod has made.
func (p *PriorityQueue) calculateBackoffDuration(podInfo *framework.QueuedPodInfo) time.Duration {
	duration := p.podInitialBackoffDuration
	for i := 1; i < podInfo.Attempts; i++ {
		// Use subtraction instead of addition or multiplication to avoid overflow.
		if duration > p.podMaxBackoffDuration-duration {
			return p.podMaxBackoffDuration
		}
		duration += duration
	}
	return duration
}
```
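In other words, the backoff doubles with each attempt and is capped at the maximum. The self-contained sketch below reproduces that arithmetic; the 1s initial / 10s maximum values are only assumed here as the commonly documented defaults.

```go
package main

import (
	"fmt"
	"time"
)

// backoffFor mirrors calculateBackoffDuration above: start at the initial backoff
// and double it once per previous attempt, capping at the maximum.
func backoffFor(attempts int, initial, max time.Duration) time.Duration {
	d := initial
	for i := 1; i < attempts; i++ {
		if d > max-d {
			return max
		}
		d += d
	}
	return d
}

func main() {
	// Assuming a 1s initial / 10s max backoff.
	for a := 1; a <= 6; a++ {
		fmt.Printf("attempt %d -> %v\n", a, backoffFor(a, time.Second, 10*time.Second))
	}
	// Prints: 1s, 2s, 4s, 8s, 10s, 10s
}
```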
The failure flow works as follows. When the scheduler is initialized, an Error function is registered for handling unschedulable Pods; the function actually registered is MakeDefaultErrorFunc, and it is invoked as the Error handler.
```go
sched := newScheduler(
	schedulerCache,
	extenders,
	internalqueue.MakeNextPodFunc(podQueue),
	MakeDefaultErrorFunc(client, podLister, podQueue, schedulerCache),
	stopEverything,
	podQueue,
	profiles,
	client,
	snapshot,
	options.percentageOfNodesToScore,
)
```
Within the scheduling cycle, i.e. in scheduleOne, every failed extension-point operation ends with a call to handleSchedulingFailure, which in turn uses the registered Error function to handle the Pod.
```go
func (sched *Scheduler) scheduleOne(ctx context.Context) {
	...
	defer cancel()
	scheduleResult, err := sched.SchedulePod(schedulingCycleCtx, fwk, state, pod)
	if err != nil {
		var nominatingInfo *framework.NominatingInfo
		if fitError, ok := err.(*framework.FitError); ok {
			if !fwk.HasPostFilterPlugins() {
				klog.V(3).InfoS("No PostFilter plugins are registered, so no preemption will be performed")
			} else {
				result, status := fwk.RunPostFilterPlugins(ctx, state, pod, fitError.Diagnosis.NodeToStatusMap)
				if status.Code() == framework.Error {
					klog.ErrorS(nil, "Status after running PostFilter plugins for pod", "pod", klog.KObj(pod), "status", status)
				} else {
					fitError.Diagnosis.PostFilterMsg = status.Message()
					klog.V(5).InfoS("Status after running PostFilter plugins for pod", "pod", klog.KObj(pod), "status", status)
				}
				if result != nil {
					nominatingInfo = result.NominatingInfo
				}
			}
			metrics.PodUnschedulable(fwk.ProfileName(), metrics.SinceInSeconds(start))
		} else if err == ErrNoNodesAvailable {
			nominatingInfo = clearNominatedNode
			// No nodes available is counted as unschedulable rather than an error.
			metrics.PodUnschedulable(fwk.ProfileName(), metrics.SinceInSeconds(start))
		} else {
			nominatingInfo = clearNominatedNode
			klog.ErrorS(err, "Error selecting node for pod", "pod", klog.KObj(pod))
			metrics.PodScheduleError(fwk.ProfileName(), metrics.SinceInSeconds(start))
		}
		// Handle the unschedulable Pod
		sched.handleSchedulingFailure(ctx, fwk, podInfo, err, v1.PodReasonUnschedulable, nominatingInfo)
		return
	}
```
This brings us to the registered Error function, MakeDefaultErrorFunc:
```go
func MakeDefaultErrorFunc(client clientset.Interface, podLister corelisters.PodLister, podQueue internalqueue.SchedulingQueue, schedulerCache internalcache.Cache) func(*framework.QueuedPodInfo, error) {
	return func(podInfo *framework.QueuedPodInfo, err error) {
		pod := podInfo.Pod
		if err == ErrNoNodesAvailable {
			klog.V(2).InfoS("Unable to schedule pod; no nodes are registered to the cluster; waiting", "pod", klog.KObj(pod))
		} else if fitError, ok := err.(*framework.FitError); ok {
			// Inject UnschedulablePlugins to PodInfo, which will be used later for moving Pods between queues efficiently.
			podInfo.UnschedulablePlugins = fitError.Diagnosis.UnschedulablePlugins
			klog.V(2).InfoS("Unable to schedule pod; no fit; waiting", "pod", klog.KObj(pod), "err", err)
		} else if apierrors.IsNotFound(err) {
			klog.V(2).InfoS("Unable to schedule pod, possibly due to node not found; waiting", "pod", klog.KObj(pod), "err", err)
			if errStatus, ok := err.(apierrors.APIStatus); ok && errStatus.Status().Details.Kind == "node" {
				nodeName := errStatus.Status().Details.Name
				// when node is not found, We do not remove the node right away. Trying again to get
				// the node and if the node is still not found, then remove it from the scheduler cache.
				_, err := client.CoreV1().Nodes().Get(context.TODO(), nodeName, metav1.GetOptions{})
				if err != nil && apierrors.IsNotFound(err) {
					node := v1.Node{ObjectMeta: metav1.ObjectMeta{Name: nodeName}}
					if err := schedulerCache.RemoveNode(&node); err != nil {
						klog.V(4).InfoS("Node is not found; failed to remove it from the cache", "node", node.Name)
					}
				}
			}
		} else {
			klog.ErrorS(err, "Error scheduling pod; retrying", "pod", klog.KObj(pod))
		}

		// Check if the Pod exists in informer cache.
		cachedPod, err := podLister.Pods(pod.Namespace).Get(pod.Name)
		if err != nil {
			klog.InfoS("Pod doesn't exist in informer cache", "pod", klog.KObj(pod), "err", err)
			return
		}

		// In the case of extender, the pod may have been bound successfully, but timed out returning its response to the scheduler.
		// It could result in the live version to carry .spec.nodeName, and that's inconsistent with the internal-queued version.
		if len(cachedPod.Spec.NodeName) != 0 {
			klog.InfoS("Pod has been assigned to node. Abort adding it back to queue.", "pod", klog.KObj(pod), "node", cachedPod.Spec.NodeName)
			return
		}

		// As <cachedPod> is from SharedInformer, we need to do a DeepCopy() here.
		podInfo.PodInfo = framework.NewPodInfo(cachedPod.DeepCopy())
		// Add the pod to the unschedulable queues
		if err := podQueue.AddUnschedulableIfNotPresent(podInfo, podQueue.SchedulingCycle()); err != nil {
			klog.ErrorS(err, "Error occurred")
		}
	}
}
```
This leads to AddUnschedulableIfNotPresent, which is where backoffQ and unschedulablePods are actually manipulated.

AddUnschedulableIfNotPresent inserts a Pod that could not be scheduled into the queue, unless it is already present. Normally the PriorityQueue puts unschedulable Pods into unschedulablePods, but if there has been a recent move request, the Pod goes into podBackoffQ instead.
```go
func (p *PriorityQueue) AddUnschedulableIfNotPresent(pInfo *framework.QueuedPodInfo, podSchedulingCycle int64) error {
	p.lock.Lock()
	defer p.lock.Unlock()
	pod := pInfo.Pod
	// Do nothing if the pod is already present
	if p.unschedulablePods.get(pod) != nil {
		return fmt.Errorf("Pod %v is already present in unschedulable queue", klog.KObj(pod))
	}

	// Check whether it is already in activeQ
	if _, exists, _ := p.activeQ.Get(pInfo); exists {
		return fmt.Errorf("Pod %v is already present in the active queue", klog.KObj(pod))
	}
	// Check whether it is already in podBackoffQ
	if _, exists, _ := p.podBackoffQ.Get(pInfo); exists {
		return fmt.Errorf("Pod %v is already present in the backoff queue", klog.KObj(pod))
	}

	// On re-adding, refresh the Pod's timestamp to the time of this operation
	pInfo.Timestamp = p.clock.Now()

	for plugin := range pInfo.UnschedulablePlugins {
		metrics.UnschedulableReason(plugin, pInfo.Pod.Spec.SchedulerName).Inc()
	}
	// If a move request has been received, put the pod into the backoffQ
	if p.moveRequestCycle >= podSchedulingCycle {
		if err := p.podBackoffQ.Add(pInfo); err != nil {
			return fmt.Errorf("error adding pod %v to the backoff queue: %v", pod.Name, err)
		}
		metrics.SchedulerQueueIncomingPods.WithLabelValues("backoff", ScheduleAttemptFailure).Inc()
	} else {
		// Otherwise put it into unschedulablePods
		p.unschedulablePods.addOrUpdate(pInfo)
		metrics.SchedulerQueueIncomingPods.WithLabelValues("unschedulable", ScheduleAttemptFailure).Inc()
	}

	p.PodNominator.AddNominatedPod(pInfo.PodInfo, nil)
	return nil
}
```
When the scheduler starts, two loops are started asynchronously to work on these two queues; this shows up in Run():
```go
func (p *PriorityQueue) Run() {
	go wait.Until(p.flushBackoffQCompleted, 1.0*time.Second, p.stop)
	go wait.Until(p.flushUnschedulablePodsLeftover, 30*time.Second, p.stop)
}
```
As the names suggest, flushBackoffQCompleted handles the backoffQ, while flushUnschedulablePodsLeftover handles unschedulablePods.
flushBackoffQCompleted moves every Pod whose backoff has completed from the backoffQ into the activeQ:
```go
func (p *PriorityQueue) flushBackoffQCompleted() {
	p.lock.Lock()
	defer p.lock.Unlock()
	broadcast := false
	for {
		// Peek at the head of the heap without popping it
		rawPodInfo := p.podBackoffQ.Peek()
		if rawPodInfo == nil {
			break
		}
		pod := rawPodInfo.(*framework.QueuedPodInfo).Pod
		boTime := p.getBackoffTime(rawPodInfo.(*framework.QueuedPodInfo))
		if boTime.After(p.clock.Now()) {
			break
		}
		_, err := p.podBackoffQ.Pop() // pop the item
		if err != nil {
			klog.ErrorS(err, "Unable to pop pod from backoff queue despite backoff completion", "pod", klog.KObj(pod))
			break
		}
		p.activeQ.Add(rawPodInfo) // move it into the active queue
		metrics.SchedulerQueueIncomingPods.WithLabelValues("active", BackoffComplete).Inc()
		broadcast = true
	}

	if broadcast {
		p.cond.Broadcast()
	}
}
```
flushUnschedulablePodsLeftover moves Pods that have stayed in unschedulablePods longer than podMaxInUnschedulablePodsDuration into the backoffQ or the activeQ.

podMaxInUnschedulablePodsDuration comes from configuration; when it is not set, the Deprecated options apply and the value defaults to 5 minutes.
```go
func NewOptions() *Options {
	o := &Options{
		SecureServing:  apiserveroptions.NewSecureServingOptions().WithLoopback(),
		Authentication: apiserveroptions.NewDelegatingAuthenticationOptions(),
		Authorization:  apiserveroptions.NewDelegatingAuthorizationOptions(),
		Deprecated: &DeprecatedOptions{
			PodMaxInUnschedulablePodsDuration: 5 * time.Minute,
		},
```
flushUnschedulablePodsLeftover itself just compares timestamps and then adds the Pods to the appropriate queue:
```go
func (p *PriorityQueue) flushUnschedulablePodsLeftover() {
	p.lock.Lock()
	defer p.lock.Unlock()

	var podsToMove []*framework.QueuedPodInfo
	currentTime := p.clock.Now()
	for _, pInfo := range p.unschedulablePods.podInfoMap {
		lastScheduleTime := pInfo.Timestamp
		if currentTime.Sub(lastScheduleTime) > p.podMaxInUnschedulablePodsDuration {
			podsToMove = append(podsToMove, pInfo)
		}
	}

	if len(podsToMove) > 0 {
		p.movePodsToActiveOrBackoffQueue(podsToMove, UnschedulableTimeout)
	}
}
```
Summary of the Scheduling-Context Flow
- Building a Scheduler goes through the following steps:
  - Prepare the cache, informers, the queue, the error-handling function, and so on.
  - Register the event handler functions. They watch resources (such as Pods), and when something changes the corresponding handler fires; this is how Pods enter the activeQ.
- Once built, the Scheduler is run. Run starts the SchedulingQueue, whose unschedulable side consists of the backoffQ and unschedulablePods.
  - These unschedulable queues are flushed periodically, at the intervals registered at startup, moving their Pods back into the activeQ.
- Run also starts the scheduleOne loop, which executes all the extension points of the scheduling context and is the consumer side of the activeQ:
  - scheduleOne fetches a Pod.
  - It executes each extension point; on error, the Error function MakeDefaultErrorFunc adds the Pod to the unschedulable queues.
  - That loops back to the periodic consumption of the unschedulable queues described above.
References
[1] kubernetes scheduler extender
[2] scheduling framework
[3] Extension points