Golang

关注公众号 jb51net

关闭
首页 > 脚本专栏 > Golang > Go kube-scheduler开发剖析

Go语言kube-scheduler深度剖析与开发之pod调度

作者:俯仰之间

这篇文章主要为大家介绍了Go语言kube-scheduler深度剖析与开发,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪

正文

为了深入学习 kube-scheduler,本系从源码和实战角度深度学 习kube-scheduler,该系列一共分6篇文章,如下:

上一篇文章我们讲了一个 kube-scheduler 是怎么初始化出来的,有了 调度器之后就得开始让他干活了 这一篇文章我们来讲讲一个 Pod 是怎么被调度到某个 Node 的。

我把调度一个 Pod 分为3个阶段

感知 Pod

要能够获取到 Pod 的前提是:kube-scheduler 能感知到有 Pod 需要被调度,得知有 Pod 需要被调度后还需要有地方存放被调度的 Pod 的信息。为了感知有 Pod 需要被调度,kube-scheduler 启动时通过 Informer watch Pod 的变化,它把待调度的 Pod 分了两种情况,代码如下

// pkg/scheduler/eventhandlers.go
func addAllEventHandlers(...) {
  //已经调度过的 Pod 则加到本地缓存,并判断是加入到调度队列还是加入到backoff队列
  informerFactory.Core().V1().Pods().Informer().AddEventHandler(
    cache.FilteringResourceEventHandler{
      FilterFunc: func(obj interface{}) bool {
        switch t := obj.(type) {
        case *v1.Pod:
          return assignedPod(t)
        case cache.DeletedFinalStateUnknown:
          if _, ok := t.Obj.(*v1.Pod); ok {
            // The carried object may be stale, so we don't use it to check if
            // it's assigned or not. Attempting to cleanup anyways.
            return true
          }
          utilruntime.HandleError(fmt.Errorf("unable to convert object %T to *v1.Pod in %T", obj, sched))
          return false
        default:
          utilruntime.HandleError(fmt.Errorf("unable to handle object in %T: %T", sched, obj))
          return false
        }
      },
      Handler: cache.ResourceEventHandlerFuncs{
        AddFunc:    sched.addPodToCache,
        UpdateFunc: sched.updatePodInCache,
        DeleteFunc: sched.deletePodFromCache,
      },
    },
  )
  // 没有调度过的Pod,放到调度队列
  informerFactory.Core().V1().Pods().Informer().AddEventHandler(
    cache.FilteringResourceEventHandler{
      FilterFunc: func(obj interface{}) bool {
        switch t := obj.(type) {
        case *v1.Pod:
          return !assignedPod(t) && responsibleForPod(t, sched.Profiles)
        case cache.DeletedFinalStateUnknown:
          if pod, ok := t.Obj.(*v1.Pod); ok {
            // The carried object may be stale, so we don't use it to check if
            // it's assigned or not.
            return responsibleForPod(pod, sched.Profiles)
          }
          utilruntime.HandleError(fmt.Errorf("unable to convert object %T to *v1.Pod in %T", obj, sched))
          return false
        default:
          utilruntime.HandleError(fmt.Errorf("unable to handle object in %T: %T", sched, obj))
          return false
        }
      },
      Handler: cache.ResourceEventHandlerFuncs{
        AddFunc:    sched.addPodToSchedulingQueue,
        UpdateFunc: sched.updatePodInSchedulingQueue,
        DeleteFunc: sched.deletePodFromSchedulingQueue,
      },
    },
  )
......
}

加入或更新缓存后,还需要做一件事:去 unschedulablePods(调度失败的Pod) 中获取 Pod,这些 Pod 的亲和性和刚刚加入的这个 Pod 匹配,然后根据下面的规则判断是把 Pod 放入 backoffQ 还是放入 activeQ

从上面我们可以看到,一个 Pod 的变化会触发此前调度失败的 Pod 重新判断是否可以被调度

len(pod.Spec.NodeName) = 0,那么这个 Pod 没有被调度过或者是此前调度过但是调度失败的(用户修改了 Pod 的配置导致 Pod 发生变化,又被 kube-scheduler 感知到了),如果是没有调度过的 Pod 那么直接加入到 activeQ,如果是调度失败的 Pod 则根据上述规则判断是加入 backoffQ 还是 activeQ。加入到 activeQ 会马上被取走,然后开始调度。

那么那些因为调度失败而被放入 unscheduleable 的 Pod 还有其他机会(上面说的有新 Pod 创建是一个机会)重新被调度么?答案是有的,否则他们就“被饿死了”,有两种途径:1. 定期强制将 unscheduleable 的 Pod 放入 backoffQ 或 activeQ,定期将 backoffQ 等待超时的 Pod 放入 ac activeQ;2. 集群内其他相关资源变化时,判断 unscheduleable 中的 Pod 是不是要放入 backoffQ 或 activeQ,其实这跟有 Pod 发生变化的情况是一样的。

第一种情况

在 kube-scheduler启动的时候中会起两个协程,他们会定期把 backoffQ 和 unscheduleable 里面的 Pod拿到activeQ里面去

func (p *PriorityQueue) Run() {
   go wait.Until(p.flushBackoffQCompleted, 1.0*time.Second, p.stop)
   go wait.Until(p.flushUnschedulablePodsLeftover, 30*time.Second, p.stop)
}

flushUnschedulablePodsLeftover

func (p *PriorityQueue) flushUnschedulablePodsLeftover() {
   p.lock.Lock()
   defer p.lock.Unlock()
   var podsToMove []*framework.QueuedPodInfo
   currentTime := p.clock.Now()
   for _, pInfo := range p.unschedulablePods.podInfoMap {
      lastScheduleTime := pInfo.Timestamp
      if currentTime.Sub(lastScheduleTime) > p.podMaxInUnschedulablePodsDuration {
         podsToMove = append(podsToMove, pInfo)
      }
   }
   if len(podsToMove) > 0 {
      p.movePodsToActiveOrBackoffQueue(podsToMove, UnschedulableTimeout)
   }
}
    func (p *PriorityQueue) movePodsToActiveOrBackoffQueue(podInfoList []*framework.QueuedPodInfo, event framework.ClusterEvent) {
       activated := false
       for _, pInfo := range podInfoList {
          // If the event doesn't help making the Pod schedulable, continue.
          // Note: we don't run the check if pInfo.UnschedulablePlugins is nil, which denotes
          // either there is some abnormal error, or scheduling the pod failed by plugins other than PreFilter, Filter and Permit.
          // In that case, it's desired to move it anyways.
          if len(pInfo.UnschedulablePlugins) != 0 && !p.podMatchesEvent(pInfo, event) {
             continue
          }
          pod := pInfo.Pod
          if p.isPodBackingoff(pInfo) {
             if err := p.podBackoffQ.Add(pInfo); err != nil {
                klog.ErrorS(err, "Error adding pod to the backoff queue", "pod", klog.KObj(pod))
             } else {
                metrics.SchedulerQueueIncomingPods.WithLabelValues("backoff", event.Label).Inc()
                p.unschedulablePods.delete(pod)
             }
          } else {
             if err := p.activeQ.Add(pInfo); err != nil {
                klog.ErrorS(err, "Error adding pod to the scheduling queue", "pod", klog.KObj(pod))
             } else {
                    metrics.SchedulerQueueIncomingPods.WithLabelValues("active", event.Label).Inc()
                p.unschedulablePods.delete(pod)
             }
          }
       }
       p.moveRequestCycle = p.schedulingCycle
       if activated {
          p.cond.Broadcast()
       }
    }x

将在 unscheduleable 里面停留时长超过 podMaxInUnschedulablePodsDuration(默认是5min)的pod放入到 ActiveQ 或 BackoffQueue,具体是放到哪个队列里面,还是根据我们上文说的那个实际计算规则来。这么做的原因就是给那些“问题少年”一次重新做人的机会,也不能一犯错误(调度失败)就彻底打入死牢了。

flushBackoffQCompleted

去 backoffQ 获取等待结束的 Pod,放入 activeQ

    func (p *PriorityQueue) flushBackoffQCompleted() {
       p.lock.Lock()
       defer p.lock.Unlock()
       activated := false
       for {
          rawPodInfo := p.podBackoffQ.Peek()
          if rawPodInfo == nil {
             break
          }
          pod := rawPodInfo.(*framework.QueuedPodInfo).Pod
          boTime := p.getBackoffTime(rawPodInfo.(*framework.QueuedPodInfo))
          if boTime.After(p.clock.Now()) {
             break
          }
          _, err := p.podBackoffQ.Pop()
          if err != nil {
             klog.ErrorS(err, "Unable to pop pod from backoff queue despite backoff completion", "pod", klog.KObj(pod))
             break
          }
          p.activeQ.Add(rawPodInfo)
          metrics.SchedulerQueueIncomingPods.WithLabelValues("active", BackoffComplete).Inc()
          activated = true
       }
       if activated {
          p.cond.Broadcast()
       }
    }

第二种情况

集群内资源发生变化

informerFactory.Core().V1().Nodes().Informer().AddEventHandler(
   cache.ResourceEventHandlerFuncs{
      AddFunc:    sched.addNodeToCache,
      UpdateFunc: sched.updateNodeInCache,
      DeleteFunc: sched.deleteNodeFromCache,
   },
)

新加入节点

func (sched *Scheduler) addNodeToCache(obj interface{}) {
   node, ok := obj.(*v1.Node)
   if !ok {
      klog.ErrorS(nil, "Cannot convert to *v1.Node", "obj", obj)
      return
   }
   nodeInfo := sched.Cache.AddNode(node)
   klog.V(3).InfoS("Add event for node", "node", klog.KObj(node))
   sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(queue.NodeAdd, preCheckForNode(nodeInfo))
}
func preCheckForNode(nodeInfo *framework.NodeInfo) queue.PreEnqueueCheck {
   // Note: the following checks doesn't take preemption into considerations, in very rare
   // cases (e.g., node resizing), "pod" may still fail a check but preemption helps. We deliberately
   // chose to ignore those cases as unschedulable pods will be re-queued eventually.
   return func(pod *v1.Pod) bool {
      admissionResults := AdmissionCheck(pod, nodeInfo, false)
      if len(admissionResults) != 0 {
         return false
      }
      _, isUntolerated := corev1helpers.FindMatchingUntoleratedTaint(nodeInfo.Node().Spec.Taints, pod.Spec.Tolerations, func(t *v1.Taint) bool {
         return t.Effect == v1.TaintEffectNoSchedule
      })
      return !isUntolerated
   }
}

可以看到,当有节点加入集群的时候,会把 unscheduleable 里面的Pod 依次拿出来做下面的判断:

只有上述4个条件都满足,那么新加入节点这个事件才会触发这个未被调度的Pod加入到 backoffQ 或者 activeQ,至于是加入哪个queue,上面已经分析过了

节点更新

func (sched *Scheduler) updateNodeInCache(oldObj, newObj interface{}) {
   oldNode, ok := oldObj.(*v1.Node)
   if !ok {
      klog.ErrorS(nil, "Cannot convert oldObj to *v1.Node", "oldObj", oldObj)
      return
   }
   newNode, ok := newObj.(*v1.Node)
   if !ok {
      klog.ErrorS(nil, "Cannot convert newObj to *v1.Node", "newObj", newObj)
      return
   }
   nodeInfo := sched.Cache.UpdateNode(oldNode, newNode)
   // Only requeue unschedulable pods if the node became more schedulable.
   if event := nodeSchedulingPropertiesChange(newNode, oldNode); event != nil {
      sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(*event, preCheckForNode(nodeInfo))
   }
}
func nodeSchedulingPropertiesChange(newNode *v1.Node, oldNode *v1.Node) *framework.ClusterEvent {
   if nodeSpecUnschedulableChanged(newNode, oldNode) {
      return &queue.NodeSpecUnschedulableChange
   }
   if nodeAllocatableChanged(newNode, oldNode) {
      return &queue.NodeAllocatableChange
   }
   if nodeLabelsChanged(newNode, oldNode) {
      return &queue.NodeLabelChange
   }
   if nodeTaintsChanged(newNode, oldNode) {
      return &queue.NodeTaintChange
   }
   if nodeConditionsChanged(newNode, oldNode) {
      return &queue.NodeConditionChange
   }
   return nil
}

首先是判断节点是何种配置发生了变化,有如下情况

如果某个 Pod 调度失败的原因可以匹配到上面其中一个原因,那么节点更新这个事件才会触发这个未被调度的Pod加入到 backoffQ 或者 activeQ

informerFactory.Core().V1().Pods().Informer().AddEventHandler(
   cache.FilteringResourceEventHandler{
      FilterFunc: func(obj interface{}) bool {
         switch t := obj.(type) {
         case *v1.Pod:
            return assignedPod(t)
         case cache.DeletedFinalStateUnknown:
            if _, ok := t.Obj.(*v1.Pod); ok {
               // The carried object may be stale, so we don't use it to check if
               // it's assigned or not. Attempting to cleanup anyways.
               return true
            }
            utilruntime.HandleError(fmt.Errorf("unable to convert object %T to *v1.Pod in %T", obj, sched))
            return false
         default:
            utilruntime.HandleError(fmt.Errorf("unable to handle object in %T: %T", sched, obj))
            return false
         }
      },
      Handler: cache.ResourceEventHandlerFuncs{
         AddFunc:    sched.addPodToCache,
         UpdateFunc: sched.updatePodInCache,
         DeleteFunc: sched.deletePodFromCache,
      },
   },
)

已经存在的 Pod 发生变化

func (sched *Scheduler) addPodToCache(obj interface{}) {
   pod, ok := obj.(*v1.Pod)
   if !ok {
      klog.ErrorS(nil, "Cannot convert to *v1.Pod", "obj", obj)
      return
   }
   klog.V(3).InfoS("Add event for scheduled pod", "pod", klog.KObj(pod))
   if err := sched.Cache.AddPod(pod); err != nil {
      klog.ErrorS(err, "Scheduler cache AddPod failed", "pod", klog.KObj(pod))
   }
   sched.SchedulingQueue.AssignedPodAdded(pod)
}
func (p *PriorityQueue) AssignedPodAdded(pod *v1.Pod) {
   p.lock.Lock()
   p.movePodsToActiveOrBackoffQueue(p.getUnschedulablePodsWithMatchingAffinityTerm(pod), AssignedPodAdd)
   p.lock.Unlock()
}
func (p *PriorityQueue) getUnschedulablePodsWithMatchingAffinityTerm(pod *v1.Pod) []*framework.QueuedPodInfo {
   var nsLabels labels.Set
   nsLabels = interpodaffinity.GetNamespaceLabelsSnapshot(pod.Namespace, p.nsLister)
   var podsToMove []*framework.QueuedPodInfo
   for _, pInfo := range p.unschedulablePods.podInfoMap {
      for _, term := range pInfo.RequiredAffinityTerms {
         if term.Matches(pod, nsLabels) {
            podsToMove = append(podsToMove, pInfo)
            break
         }
      }
   }
   return podsToMove
}

可以看到,已经存在的Pod发生变化后,会把这个Pod亲和性配置依次和 unscheduleable 里面的Pod匹配,如果能够匹配上,那么节点更新这个事件才会触发这个未被调度的Pod加入到 backoffQ 或者 activeQ。

集群内有Pod删除

func (sched *Scheduler) deletePodFromCache(obj interface{}) {
  var pod *v1.Pod
   switch t := obj.(type) {
   case *v1.Pod:
      pod = t
   case cache.DeletedFinalStateUnknown:
      var ok bool
      pod, ok = t.Obj.(*v1.Pod)
      if !ok {
         klog.ErrorS(nil, "Cannot convert to *v1.Pod", "obj", t.Obj)
         return
      }
   default:
      klog.ErrorS(nil, "Cannot convert to *v1.Pod", "obj", t)
      return
   }
   klog.V(3).InfoS("Delete event for scheduled pod", "pod", klog.KObj(pod))
   if err := sched.Cache.RemovePod(pod); err != nil {
      klog.ErrorS(err, "Scheduler cache RemovePod failed", "pod", klog.KObj(pod))
   }
   sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(queue.AssignedPodDelete, nil)
}

可以看到,Pod删除时间不像其他时间需要做额外的判断,这个preCheck函数是空的,所以所有 unscheduleable 里面的Pod都会被放到 activeQ 或 backoffQ 中。

从上面的情况,我们可以看到,集群内有事件发生变化,是可以加速调度失败的Pod被重新调度的进程的。常规的是,调度失败的 Pod 需要等5min 然后才会被重新加入 backoffQ 或 activeQ。backoffQ里面的Pod也需要等一段时间才会重新调度。这也就是为什么,当你修改节点配置的时候,能看到Pod马上重新被调度的原因

上面就是一个Pod调度失败后,重新触发调度的情况了。

取出 Pod

Scheduler 中有个成员 NextPod 会从 activeQ 队列中尝试获取一个待调度的 Pod,该函数在 SchedulePod 中被调用,如下:

// 启动 Scheduler
func (sched *Scheduler) Run(ctx context.Context) {
	sched.SchedulingQueue.Run()
	go wait.UntilWithContext(ctx, sched.scheduleOne, 0)
	<-ctx.Done()
	sched.SchedulingQueue.Close()
}
// 尝试调度一个 Pod,所以 Pod 的调度入口
func (sched *Scheduler) scheduleOne(ctx context.Context) {
	// 会一直阻塞,直到获取到一个Pod
	......
	podInfo := sched.NextPod()
    ......
}

NextPod 它被赋予如下函数:

// pkg/scheduler/internal/queue/scheduling_queue.go
func MakeNextPodFunc(queue SchedulingQueue) func() *framework.QueuedPodInfo {
	return func() *framework.QueuedPodInfo {
		podInfo, err := queue.Pop()
		if err == nil {
			klog.V(4).InfoS("About to try and schedule pod", "pod", klog.KObj(podInfo.Pod))
			for plugin := range podInfo.UnschedulablePlugins {
				metrics.UnschedulableReason(plugin, podInfo.Pod.Spec.SchedulerName).Dec()
			}
			return podInfo
		}
		klog.ErrorS(err, "Error while retrieving next pod from scheduling queue")
		return nil
	}
}

Pop 会一直阻塞,直到 activeQ 长度大于0,然后取出一个 Pod 返回

// pkg/scheduler/internal/queue/scheduling_queue.go
func (p *PriorityQueue) Pop() (*framework.QueuedPodInfo, error) {
	p.lock.Lock()
	defer p.lock.Unlock()
	for p.activeQ.Len() == 0 {
		// When the queue is empty, invocation of Pop() is blocked until new item is enqueued.
		// When Close() is called, the p.closed is set and the condition is broadcast,
		// which causes this loop to continue and return from the Pop().
		if p.closed {
			return nil, fmt.Errorf(queueClosed)
		}
		p.cond.Wait()
	}
	obj, err := p.activeQ.Pop()
	if err != nil {
		return nil, err
	}
	pInfo := obj.(*framework.QueuedPodInfo)
	pInfo.Attempts++
	p.schedulingCycle++
	return pInfo, nil
}

调度 Pod

func (sched *Scheduler) scheduleOne(ctx context.Context) {
    // 取出 Pod
    podInfo := sched.NextPod()
    ...
    // 根据 Pod 的调度名字,获取之前初始化好的调度框架(framework)
    fwk, err := sched.frameworkForPod(pod)
    ...
    // 开始执行插件,包括 filter, socre 两个扩展点内的所有插件,获取一个最合适 Pod 的节点
    scheduleResult, err := sched.SchedulePod(schedulingCycleCtx, fwk, state, pod)
    // 如果获取节点失败,则开始运行 postFilter 开始抢占一个 Pod
    if err != nil {
        result, status := fwk.RunPostFilterPlugins(ctx, state, pod, fitError.Diagnosis.NodeToStatusMap)
    }
    ....
    // 将 Pod 放入 assumedPod 存储,即假设 Pod 已经调度成功
    err = sched.assume(assumedPod, scheduleResult.SuggestedHost)
    // 运行 Reserve 插件
    fwk.RunReservePluginsReserve(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
    ...
    // 运行 Permit 插件
    fwk.RunPermitPlugins(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
    ...
    // 启动一个协程,开始绑定,主流程到了这里就结束了,然后开始新的一轮调度;
    go func() {
        // 执行 preBind 插件
        fwk.RunPreBindPlugins(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
        ...
        // 执行绑定插件,会调用 kube-apiserver 写入etcd 调度结果,就是给 Pod 赋予 Nodename
        err := sched.bind(bindingCycleCtx, fwk, assumedPod, scheduleResult.SuggestedHost, state)
        ...
        // 执行 postBind
        fwk.RunPostBindPlugins(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
    }

好了,到了这里一个 Pod 如果能够正常的被调度的话,那么流程就结束了。如果调度失败的话,Pod会被放入 unscheduleable 中,后续还会对 unscheduleable 中的 Pod 重新调度。

您可能感兴趣的文章:
阅读全文