Kubernetes Informer数据存储Index与Pod分配流程解析
作者:LuckyLove
这篇文章主要为大家介绍了Kubernetes Informer数据存储Index与Pod分配流程解析,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
确立目标
- 理解Informer的数据存储方式
- 大致理解Pod的分配流程
理解Informer的数据存储方式 代码在k8s.io/client-go/tools/cache/controller
Process 查看消费的过程
func (c *controller) processLoop() { for { // Pop出Object元素 obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process)) if err != nil { if err == ErrFIFOClosed { return } if c.config.RetryOnError { // 重新进队列 c.config.Queue.AddIfNotPresent(obj) } } } } // 去查看Pop的具体实现 点进Pop 找到fifo.go func (f *FIFO) Pop(process PopProcessFunc) (interface{}, error) { f.lock.Lock() defer f.lock.Unlock() for { // 调用process去处理item,然后返回 item, ok := f.items[id] delete(f.items, id) err := process(item) return item, err } } // 然后去查一下 PopProcessFunc 的定义,在创建controller前 share_informer.go的Run()里面 cfg := &Config{ Process: s.HandleDeltas, } func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error { s.blockDeltas.Lock() defer s.blockDeltas.Unlock() for _, d := range obj.(Deltas) { switch d.Type { // 增、改、替换、同步 case Sync, Replaced, Added, Updated: s.cacheMutationDetector.AddObject(d.Object) // 先去indexer查询 if old, exists, err := s.indexer.Get(d.Object); err == nil && exists { // 如果数据已经存在,就执行Update逻辑 if err := s.indexer.Update(d.Object); err != nil { return err } isSync := false switch { case d.Type == Sync: isSync = true case d.Type == Replaced: if accessor, err := meta.Accessor(d.Object); err == nil { isSync = accessor.GetResourceVersion() == oldAccessor.GetResourceVersion() } } } // 分发Update事件 s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync) } else { // 没查到数据,就执行Add操作 if err := s.indexer.Add(d.Object); err != nil { return err } // 分发 Add 事件 s.processor.distribute(addNotification{newObj: d.Object}, false) } // 删除 case Deleted: // 去indexer删除 if err := s.indexer.Delete(d.Object); err != nil { return err } // 分发 delete 事件 s.processor.distribute(deleteNotification{oldObj: d.Object}, false) } } return nil }
Index 掌握Index数据结构
Index
的定义为资源的本地存储,保持与etcd中的资源信息一致。
// 我们去看看Index是怎么创建的 func NewSharedIndexInformer(lw ListerWatcher, exampleObject runtime.Object, defaultEventHandlerResyncPeriod time.Duration, indexers Indexers) SharedIndexInformer { realClock := &clock.RealClock{} sharedIndexInformer := &sharedIndexInformer{ processor: &sharedProcessor{clock: realClock}, // indexer 的初始化 indexer: NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, indexers), listerWatcher: lw, objectType: exampleObject, resyncCheckPeriod: defaultEventHandlerResyncPeriod, defaultEventHandlerResyncPeriod: defaultEventHandlerResyncPeriod, cacheMutationDetector: NewCacheMutationDetector(fmt.Sprintf("%T", exampleObject)), clock: realClock, } return sharedIndexInformer } // 生成一个map和func组合而成的Indexer func NewIndexer(keyFunc KeyFunc, indexers Indexers) Indexer { return &cache{ cacheStorage: NewThreadSafeStore(indexers, Indices{}), keyFunc: keyFunc, } // ThreadSafeStore的底层是一个并发安全的map,具体实现我们暂不考虑 func NewThreadSafeStore(indexers Indexers, indices Indices) ThreadSafeStore { return &threadSafeMap{ items: map[string]interface{}{}, indexers: indexers, indices: indices, } }
distribute 信息的分发distribute
// 在上面的Process代码中,我们看到了将数据存储到Indexer后,调用了一个分发的函数 s.processor.distribute() // 分发process的创建 func NewSharedIndexInformer() SharedIndexInformer { sharedIndexInformer := &sharedIndexInformer{ processor: &sharedProcessor{clock: realClock}, } return sharedIndexInformer } // sharedProcessor的结构 type sharedProcessor struct { listenersStarted bool // 读写锁 listenersLock sync.RWMutex // 普通监听列表 listeners []*processorListener // 同步监听列表 syncingListeners []*processorListener clock clock.Clock wg wait.Group } // 查看distribute函数 func (p *sharedProcessor) distribute(obj interface{}, sync bool) { p.listenersLock.RLock() defer p.listenersLock.RUnlock() // 将object分发到 同步监听 或者 普通监听 的列表 if sync { for _, listener := range p.syncingListeners { listener.add(obj) } } else { for _, listener := range p.listeners { listener.add(obj) } } } // 这个add的操作是利用了channel func (p *processorListener) add(notification interface{}) { p.addCh <- notification }
理解一个pod的被调度的大致流程
Scheduler
在前面,我们了解了Pod调度算法的注册和Informer机制来监听kube-apiserver上的资源变化,这一次,我们就将两者串联起来,看看在kube-scheduler中,Informer监听到资源变化后,如何用调度算法将pod进行调度。
// 在setup()中找到scheduler // 在运行 kube-scheduler 的初期,我们创建了一个Scheduler的数据结构,回头再看看有什么和pod调度算法相关的 type Scheduler struct { SchedulerCache internalcache.Cache Algorithm core.ScheduleAlgorithm // 获取下一个需要调度的Pod NextPod func() *framework.QueuedPodInfo Error func(*framework.QueuedPodInfo, error) StopEverything <-chan struct{} // 等待调度的Pod队列,我们重点看看这个队列是什么 SchedulingQueue internalqueue.SchedulingQueue Profiles profile.Map scheduledPodsHasSynced func() bool client clientset.Interface } // Scheduler的实例化函数 在最新的版本中少了create这一层 直接是进行里面的逻辑 func New(){ var sched *Scheduler switch { // 从 Provider 创建 case source.Provider != nil: sc, err := configurator.createFromProvider(*source.Provider) sched = sc // 从文件或者ConfigMap中创建 case source.Policy != nil: sc, err := configurator.createFromConfig(*policy) sched = sc default: return nil, fmt.Errorf("unsupported algorithm source: %v", source) } } // 两个创建方式,底层都是调用的 create 函数 func (c *Configurator) createFromProvider(providerName string) (*Scheduler, error) { return c.create() } func (c *Configurator) createFromConfig(policy schedulerapi.Policy) (*Scheduler, error){ return c.create() } func (c *Configurator) create() (*Scheduler, error) { // 实例化 podQueue podQueue := internalqueue.NewSchedulingQueue( lessFn, internalqueue.WithPodInitialBackoffDuration(time.Duration(c.podInitialBackoffSeconds)*time.Second), internalqueue.WithPodMaxBackoffDuration(time.Duration(c.podMaxBackoffSeconds)*time.Second), internalqueue.WithPodNominator(nominator), ) return &Scheduler{ SchedulerCache: c.schedulerCache, Algorithm: algo, Profiles: profiles, // NextPod 函数依赖于 podQueue NextPod: internalqueue.MakeNextPodFunc(podQueue), Error: MakeDefaultErrorFunc(c.client, c.informerFactory.Core().V1().Pods().Lister(), podQueue, c.schedulerCache), StopEverything: c.StopEverything, // 调度队列被赋值为podQueue SchedulingQueue: podQueue, }, nil } // 再看看这个调度队列的初始化函数,点进去podQueue,从命名可以看到是一个优先队列,它的实现细节暂不细看 // 结合实际情况思考下,pod会有重要程度的区分,所以调度的顺序需要考虑优先级的 func NewSchedulingQueue(lessFn framework.LessFunc, opts ...Option) SchedulingQueue { return NewPriorityQueue(lessFn, opts...) }
SchedulingQueue
// 在上面实例化Scheduler后,有个注册事件 Handler 的函数:addAllEventHandlers(sched, informerFactory, podInformer) informer接到消息之后触发对应的Handler func addAllEventHandlers( sched *Scheduler, informerFactory informers.SharedInformerFactory, podInformer coreinformers.PodInformer, ) { /* 函数前后有很多注册的Handler,但是和未调度pod添加到队列相关的,只有这个 */ podInformer.Informer().AddEventHandler( cache.FilteringResourceEventHandler{ // 定义过滤函数:必须为未调度的pod FilterFunc: func(obj interface{}) bool { switch t := obj.(type) { case *v1.Pod: return !assignedPod(t) && responsibleForPod(t, sched.Profiles) case cache.DeletedFinalStateUnknown: if pod, ok := t.Obj.(*v1.Pod); ok { return !assignedPod(pod) && responsibleForPod(pod, sched.Profiles) } utilruntime.HandleError(fmt.Errorf("unable to convert object %T to *v1.Pod in %T", obj, sched)) return false default: utilruntime.HandleError(fmt.Errorf("unable to handle object in %T: %T", sched, obj)) return false } }, // 增改删三个操作对应的Handler,操作到对应的Queue Handler: cache.ResourceEventHandlerFuncs{ AddFunc: sched.addPodToSchedulingQueue, UpdateFunc: sched.updatePodInSchedulingQueue, DeleteFunc: sched.deletePodFromSchedulingQueue, }, }, ) } // 牢记我们第一阶段要分析的对象:create nginx pod,所以进入这个add的操作,对应加入到队列 func (sched *Scheduler) addPodToSchedulingQueue(obj interface{}) { pod := obj.(*v1.Pod) klog.V(3).Infof("add event for unscheduled pod %s/%s", pod.Namespace, pod.Name) // 加入到队列 if err := sched.SchedulingQueue.Add(pod); err != nil { utilruntime.HandleError(fmt.Errorf("unable to queue %T: %v", obj, err)) } } // 在实例化Scheduler的地方 // 入队操作我们清楚了,那出队呢?我们回过头去看看上面定义的NextPod的方法实现 func MakeNextPodFunc(queue SchedulingQueue) func() *framework.QueuedPodInfo { return func() *framework.QueuedPodInfo { // 从队列中弹出 podInfo, err := queue.Pop() if err == nil { klog.V(4).Infof("About to try and schedule pod %v/%v", podInfo.Pod.Namespace, podInfo.Pod.Name) return podInfo } klog.Errorf("Error while retrieving next pod from scheduling queue: %v", err) return nil } }
scheduleOne
// 了解入队和出队操作后,我们看一下Scheduler运行的过程 func (sched *Scheduler) Run(ctx context.Context) { if !cache.WaitForCacheSync(ctx.Done(), sched.scheduledPodsHasSynced) { return } sched.SchedulingQueue.Run() // 调度一个pod对象 wait.UntilWithContext(ctx, sched.scheduleOne, 0) sched.SchedulingQueue.Close() } // 接下来scheduleOne方法代码很长,我们一步一步来看 func (sched *Scheduler) scheduleOne(ctx context.Context) { // podInfo 就是从队列中获取到的pod对象 podInfo := sched.NextPod() // 检查pod的有效性 if podInfo == nil || podInfo.Pod == nil { return } pod := podInfo.Pod // 根据定义的 pod.Spec.SchedulerName 查到对应的profile prof, err := sched.profileForPod(pod) if err != nil { klog.Error(err) return } // 可以跳过调度的情况,一般pod进不来 if sched.skipPodSchedule(prof, pod) { return } // 调用调度算法,获取结果 scheduleResult, err := sched.Algorithm.Schedule(schedulingCycleCtx, prof, state, pod) if err != nil { /* 出现调度失败的情况: 这个时候可能会触发抢占preempt,抢占是一套复杂的逻辑,后面我们专门会讲 目前假设各类资源充足,能正常调度 */ } metrics.SchedulingAlgorithmLatency.Observe(metrics.SinceInSeconds(start)) // assumePod 是假设这个Pod按照前面的调度算法分配后,进行验证 assumedPodInfo := podInfo.DeepCopy() assumedPod := assumedPodInfo.Pod // SuggestedHost 为建议的分配的Host err = sched.assume(assumedPod, scheduleResult.SuggestedHost) if err != nil { // 失败就重新分配,不考虑这种情况 } // 运行相关插件的代码先跳过 比如一些抢占插件 // 异步绑定pod go func() { // 有一系列的检查工作 // 真正做绑定的动作 err := sched.bind(bindingCycleCtx, prof, assumedPod, scheduleResult.SuggestedHost, state) if err != nil { // 错误处理,清除状态并重试 } else { // 打印结果,调试时将log level调整到2以上 if klog.V(2).Enabled() { klog.InfoS("Successfully bound pod to node", "pod", klog.KObj(pod), "node", scheduleResult.SuggestedHost, "evaluatedNodes", scheduleResult.EvaluatedNodes, "feasibleNodes", scheduleResult.FeasibleNodes) } // metrics中记录相关的监控指标 metrics.PodScheduled(prof.Name, metrics.SinceInSeconds(start)) metrics.PodSchedulingAttempts.Observe(float64(podInfo.Attempts)) metrics.PodSchedulingDuration.WithLabelValues(getAttemptsLabel(podInfo)).Observe(metrics.SinceInSeconds(podInfo.InitialAttemptTimestamp)) // 运行绑定后的插件 prof.RunPostBindPlugins(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost) } }() }
ScheduleResult 调度计算结果
// 调用算法下的Schedule func New(){ scheduleResult, err := sched.Algorithm.Schedule(schedulingCycleCtx, prof, state, pod) } func (c *Configurator) create() (*Scheduler, error) { algo := core.NewGenericScheduler( c.schedulerCache, c.nodeInfoSnapshot, extenders, c.informerFactory.Core().V1().PersistentVolumeClaims().Lister(), c.disablePreemption, c.percentageOfNodesToScore, ) return &Scheduler{ Algorithm: algo, }, nil } // genericScheduler 的 Schedule 的实现 func (g *genericScheduler) Schedule(ctx context.Context, prof *profile.Profile, state *framework.CycleState, pod *v1.Pod) (result ScheduleResult, err error) { // 对 pod 进行 pvc 的信息检查 if err := podPassesBasicChecks(pod, g.pvcLister); err != nil { return result, err } // 对当前的信息做一个快照 if err := g.snapshot(); err != nil { return result, err } // Node 节点数量为0,表示无可用节点 if g.nodeInfoSnapshot.NumNodes() == 0 { return result, ErrNoNodesAvailable } // Predict阶段:找到所有满足调度条件的节点feasibleNodes,不满足的就直接过滤 feasibleNodes, filteredNodesStatuses, err := g.findNodesThatFitPod(ctx, prof, state, pod) // 没有可用节点直接报错 if len(feasibleNodes) == 0 { return result, &FitError{ Pod: pod, NumAllNodes: g.nodeInfoSnapshot.NumNodes(), FilteredNodesStatuses: filteredNodesStatuses, } } // 只有一个节点就直接选用 if len(feasibleNodes) == 1 { return ScheduleResult{ SuggestedHost: feasibleNodes[0].Name, EvaluatedNodes: 1 + len(filteredNodesStatuses), FeasibleNodes: 1, }, nil } // Priority阶段:通过打分,找到一个分数最高、也就是最优的节点 priorityList, err := g.prioritizeNodes(ctx, prof, state, pod, feasibleNodes) host, err := g.selectHost(priorityList) return ScheduleResult{ SuggestedHost: host, EvaluatedNodes: len(feasibleNodes) + len(filteredNodesStatuses), FeasibleNodes: len(feasibleNodes), }, err } /* Predict 和 Priority 是选择调度节点的两个关键性步骤, 它的底层调用了各种algorithm算法。我们暂时不细看。 以我们前面讲到过的 NodeName 算法为例,节点必须与 NodeName 匹配,它是属于Predict阶段的。 在新版本中 这部分算法的实现放到了extenders,逻辑是一样的 */
Assume 初步推算
func (sched *Scheduler) assume(assumed *v1.Pod, host string) error { // 将 host 填入到 pod spec字段的nodename,假定分配到对应的节点上 assumed.Spec.NodeName = host // 调用 SchedulerCache 下的 AssumePod if err := sched.SchedulerCache.AssumePod(assumed); err != nil { klog.Errorf("scheduler cache AssumePod failed: %v", err) return err } if sched.SchedulingQueue != nil { sched.SchedulingQueue.DeleteNominatedPodIfExists(assumed) } return nil } // 回头去找 SchedulerCache 初始化的地方 func (c *Configurator) create() (*Scheduler, error) { return &Scheduler{ SchedulerCache: c.schedulerCache, }, nil } func New() (*Scheduler, error) { // 这里就是初始化的实例 schedulerCache schedulerCache := internalcache.New(30*time.Second, stopEverything) configurator := &Configurator{ schedulerCache: schedulerCache, } } // 看看AssumePod做了什么 func (cache *schedulerCache) AssumePod(pod *v1.Pod) error { // 获取 pod 的 uid key, err := framework.GetPodKey(pod) if err != nil { return err } // 加锁操作,保证并发情况下的一致性 cache.mu.Lock() defer cache.mu.Unlock() // 根据 uid 找不到 pod 当前的状态 看看被调度了没有 if _, ok := cache.podStates[key]; ok { return fmt.Errorf("pod %v is in the cache, so can't be assumed", key) } // 把 Assume Pod 的信息放到对应 Node 节点中 cache.addPod(pod) // 把 pod 状态设置为 Assume 成功 ps := &podState{ pod: pod, } cache.podStates[key] = ps cache.assumedPods[key] = true return nil }
Bind 实际绑定
func (sched *Scheduler) bind(ctx context.Context, prof *profile.Profile, assumed *v1.Pod, targetNode string, state *framework.CycleState) (err error) { start := time.Now() // 把 assumed 的 pod 信息保存下来 defer func() { sched.finishBinding(prof, assumed, targetNode, start, err) }() // 阶段1: 运行扩展绑定进行验证,如果已经绑定报错 bound, err := sched.extendersBinding(assumed, targetNode) if bound { return err } // 阶段2:运行绑定插件验证状态 bindStatus := prof.RunBindPlugins(ctx, state, assumed, targetNode) if bindStatus.IsSuccess() { return nil } if bindStatus.Code() == framework.Error { return bindStatus.AsError() } return fmt.Errorf("bind status: %s, %v", bindStatus.Code().String(), bindStatus.Message()) }
Update To Etcd
// 这块的代码我不做细致的逐层分析了,大家根据兴趣自行探索 func (b DefaultBinder) Bind(ctx context.Context, state *framework.CycleState, p *v1.Pod, nodeName string) *framework.Status { klog.V(3).Infof("Attempting to bind %v/%v to %v", p.Namespace, p.Name, nodeName) binding := &v1.Binding{ ObjectMeta: metav1.ObjectMeta{Namespace: p.Namespace, Name: p.Name, UID: p.UID}, Target: v1.ObjectReference{Kind: "Node", Name: nodeName}, } // ClientSet就是访问kube-apiserver的客户端,将数据更新上去 err := b.handle.ClientSet().CoreV1().Pods(binding.Namespace).Bind(ctx, binding, metav1.CreateOptions{}) if err != nil { return framework.NewStatus(framework.Error, err.Error()) } return nil }
站在前人的肩膀上,向前辈致敬,Respect!
Summary
Informer
依赖于Reflector
模块,它有个组件为 xxxInformer,如podInformer
- 具体资源的
Informer
包含了一个连接到kube-apiserver
的client
,通过List
和Watch
接口查询资源变更情况
检测到资源发生变化后,通过Controller
将数据放入队列DeltaFIFOQueue
里,生产阶段完成
在DeltaFIFOQueue
的另一端,有消费者在不停地处理资源变化的事件,处理逻辑主要分2步
- 将数据保存到本地存储Indexer,它的底层实现是一个并发安全的threadSafeMap
- 有些组件需要实时关注资源变化,会实时监听listen,就将事件分发到对应注册上来的listener上,自行处理
distribute
将object分发到同步监听或者普通监听的列表,然后被对应的handler处理
- Pod的调度是通过一个队列
SchedulingQueue
异步工作的 - 监听到对应pod事件后,放入队列
- 有个消费者从队列中获取pod,进行调度
单个pod的调度主要分为3个步骤:
- 根据Predict和Priority两个阶段,调用各自的算法插件,选择最优的Node
Assume
这个Pod被调度到对应的Node,保存到cache,加锁保证一致性。- 用extender和plugins进行验证,如果通过则绑定
Bind
绑定成功后,将数据通过client向kube-apiserver发送,更新etcd
以上就是Kubernetes Informer数据存储Index与Pod分配流程解析的详细内容,更多关于Kubernetes Informer数据存储的资料请关注脚本之家其它相关文章!