Kubernetes client-go Informer Source Code Analysis
Almost every controller manager and CRD controller uses client-go's informer functions, which let it Watch, Get, or List the objects it cares about. In this post we look at how the client-go informer mechanism works, from the point of view of the source code.
```go
kubeClient, err := kubernetes.NewForConfig(cfg)
if err != nil {
    klog.Fatalf("Error building kubernetes clientset: %s", err.Error())
}

kubeInformerFactory := kubeinformers.NewSharedInformerFactory(kubeClient, time.Second*30)

controller := NewController(kubeClient, exampleClient,
    kubeInformerFactory.Apps().V1().Deployments(),
    exampleInformerFactory.Samplecontroller().V1alpha1().Foos())

// notice that there is no need to run Start methods in a separate goroutine. (i.e. go kubeInformerFactory.Start(stopCh)
// Start method is non-blocking and runs all registered informers in a dedicated goroutine.
kubeInformerFactory.Start(stopCh)
```

This example is excerpted from https://github.com/kubernetes/sample-controller/blob/master/main.go and uses the built-in Deployment informer as the running example. As you can see, using a client-go informer directly is quite simple. Setting aside what NewController does for a moment, let's follow the code and see what kubeInformerFactory.Start actually does.
```go
// Start initializes all requested informers.
func (f *sharedInformerFactory) Start(stopCh <-chan struct{}) {
    f.lock.Lock()
    defer f.lock.Unlock()

    for informerType, informer := range f.informers {
        if !f.startedInformers[informerType] {
            go informer.Run(stopCh)
            f.startedInformers[informerType] = true
        }
    }
}
```

You can see that Start iterates over f.informers. Let's look at the data structure that holds those informers:
```go
type sharedInformerFactory struct {
    client           kubernetes.Interface
    namespace        string
    tweakListOptions internalinterfaces.TweakListOptionsFunc
    lock             sync.Mutex
    defaultResync    time.Duration
    customResync     map[reflect.Type]time.Duration

    informers map[reflect.Type]cache.SharedIndexInformer
    // startedInformers is used for tracking which informers have been started.
    // This allows Start() to be called multiple times safely.
    startedInformers map[reflect.Type]bool
}
```

When our example runs, f.informers contains the following:
```
type *v1.Deployment informer &{0xc000379fa0 <nil> 0xc00038ccb0 {} 0xc000379f80 0xc00033bb00 30000000000 30000000000 0x28e5ec8 false false {0 0} {0 0}}
```

In other words, every Kubernetes type gets an informer of its own. Next, let's see where that informer is registered, taking the Deployment informer as the example.
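For context, here is a small usage sketch (not part of the original post) showing how the factory hands out per-type informers. The fake clientset and the 30-second resync period are assumptions so the snippet compiles and runs without a cluster; k8s.io/client-go/informers is the same package the example above imports as kubeinformers.

```go
package main

import (
    "fmt"
    "time"

    "k8s.io/client-go/informers"
    "k8s.io/client-go/kubernetes/fake"
)

func main() {
    // fake.NewSimpleClientset lets this run without a real cluster.
    client := fake.NewSimpleClientset()
    factory := informers.NewSharedInformerFactory(client, 30*time.Second)

    // Each typed accessor registers one shared informer per Go type in f.informers.
    deployments := factory.Apps().V1().Deployments().Informer() // keyed by *appsv1.Deployment
    pods := factory.Core().V1().Pods().Informer()                // keyed by *corev1.Pod

    // Asking for the same type again returns the informer that is already registered.
    fmt.Println(factory.Apps().V1().Deployments().Informer() == deployments) // true
    _ = pods
}
```

Requesting Deployments() twice yields the same shared informer; that reuse is exactly what the f.informers map and the InformerFor function discussed below implement.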
First, let's go back to the code at the beginning that initialized kubeClient:
```go
controller := NewController(kubeClient, exampleClient,
    kubeInformerFactory.Apps().V1().Deployments(),
    exampleInformerFactory.Samplecontroller().V1alpha1().Foos())

deploymentInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
    AddFunc: controller.handleObject,
    UpdateFunc: func(old, new interface{}) {
        newDepl := new.(*appsv1.Deployment)
        oldDepl := old.(*appsv1.Deployment)
        if newDepl.ResourceVersion == oldDepl.ResourceVersion {
            // Periodic resync will send update events for all known Deployments.
            // Two different versions of the same Deployment will always have different RVs.
            return
        }
        controller.handleObject(new)
    },
    DeleteFunc: controller.handleObject,
})
```

Note the argument kubeInformerFactory.Apps().V1().Deployments(): it asks the factory for an informer that watches only Deployments.
```go
controller := &Controller{
    kubeclientset:     kubeclientset,
    sampleclientset:   sampleclientset,
    deploymentsLister: deploymentInformer.Lister(),
    deploymentsSynced: deploymentInformer.Informer().HasSynced,
    foosLister:        fooInformer.Lister(),
    foosSynced:        fooInformer.Informer().HasSynced,
    workqueue:         workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Foos"),
    recorder:          recorder,
}
```

deploymentInformer.Lister() initializes a Deployment lister. Let's look at what the Lister function does:
```go
// NewFilteredDeploymentInformer constructs a new informer for Deployment type.
// Always prefer using an informer factory to get a shared informer instead of getting an independent
// one. This reduces memory footprint and number of connections to the server.
func NewFilteredDeploymentInformer(client kubernetes.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers, tweakListOptions internalinterfaces.TweakListOptionsFunc) cache.SharedIndexInformer {
    return cache.NewSharedIndexInformer(
        &cache.ListWatch{
            ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
                if tweakListOptions != nil {
                    tweakListOptions(&options)
                }
                return client.AppsV1().Deployments(namespace).List(options)
            },
            WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
                if tweakListOptions != nil {
                    tweakListOptions(&options)
                }
                return client.AppsV1().Deployments(namespace).Watch(options)
            },
        },
        &appsv1.Deployment{},
        resyncPeriod,
        indexers,
    )
}

func (f *deploymentInformer) defaultInformer(client kubernetes.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer {
    return NewFilteredDeploymentInformer(client, f.namespace, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, f.tweakListOptions)
}

func (f *deploymentInformer) Informer() cache.SharedIndexInformer {
    return f.factory.InformerFor(&appsv1.Deployment{}, f.defaultInformer)
}

func (f *deploymentInformer) Lister() v1.DeploymentLister {
    return v1.NewDeploymentLister(f.Informer().GetIndexer())
}
```

Note the Lister function: it calls Informer(), which in turn calls f.factory.InformerFor.
That call ultimately lands in the sharedInformerFactory's InformerFor function.
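The original post quotes that function at this point, but the listing is missing from this copy. The block below is a close paraphrase of sharedInformerFactory.InformerFor from client-go releases of roughly the same vintage as the rest of the code quoted in this article; exact details may differ between versions.

```go
func (f *sharedInformerFactory) InformerFor(obj runtime.Object, newFunc internalinterfaces.NewInformerFunc) cache.SharedIndexInformer {
    f.lock.Lock()
    defer f.lock.Unlock()

    // Informers are shared per Go type: if one already exists, reuse it.
    informerType := reflect.TypeOf(obj)
    informer, exists := f.informers[informerType]
    if exists {
        return informer
    }

    resyncPeriod, exists := f.customResync[informerType]
    if !exists {
        resyncPeriod = f.defaultResync
    }

    // Otherwise build a new informer and register it for later Start() calls.
    informer = newFunc(f.client, resyncPeriod)
    f.informers[informerType] = informer

    return informer
}
```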
Here you can see that the line informer = newFunc(f.client, resyncPeriod) is what finally creates the informer and registers it in the factory's informers map, which answers the question we raised earlier.
Now let's return to the informer Start method.
```go
// Start initializes all requested informers.
func (f *sharedInformerFactory) Start(stopCh <-chan struct{}) {
    f.lock.Lock()
    defer f.lock.Unlock()

    for informerType, informer := range f.informers {
        if !f.startedInformers[informerType] {
            go informer.Run(stopCh)
            f.startedInformers[informerType] = true
        }
    }
}
```

As shown here, Start iterates over all registered informers and launches each informer's Run method in its own goroutine. Let's take a top-level look at Run:
```go
func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
    defer utilruntime.HandleCrash()

    fifo := NewDeltaFIFO(MetaNamespaceKeyFunc, s.indexer)

    cfg := &Config{
        Queue:            fifo,
        ListerWatcher:    s.listerWatcher,
        ObjectType:       s.objectType,
        FullResyncPeriod: s.resyncCheckPeriod,
        RetryOnError:     false,
        ShouldResync:     s.processor.shouldResync,

        Process: s.HandleDeltas,
    }

    func() {
        s.startedLock.Lock()
        defer s.startedLock.Unlock()

        s.controller = New(cfg)
        s.controller.(*controller).clock = s.clock
        s.started = true
    }()

    // Separate stop channel because Processor should be stopped strictly after controller
    processorStopCh := make(chan struct{})
    var wg wait.Group
    defer wg.Wait()              // Wait for Processor to stop
    defer close(processorStopCh) // Tell Processor to stop
    wg.StartWithChannel(processorStopCh, s.cacheMutationDetector.Run)
    wg.StartWithChannel(processorStopCh, s.processor.run)

    defer func() {
        s.startedLock.Lock()
        defer s.startedLock.Unlock()
        s.stopped = true // Don't want any new listeners
    }()
    s.controller.Run(stopCh)
}
```

First, Run uses the key function and the store's indexer to create a DeltaFIFO; this first-in, first-out queue holds the event deltas for each object.
```go
func NewDeltaFIFO(keyFunc KeyFunc, knownObjects KeyListerGetter) *DeltaFIFO {
    f := &DeltaFIFO{
        items:        map[string]Deltas{},
        queue:        []string{},
        keyFunc:      keyFunc,
        knownObjects: knownObjects,
    }
    f.cond.L = &f.lock
    return f
}
```

The queue itself is quite simple: a map stores the per-key deltas, and a string slice keeps the keys in queue order.
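As a quick, hedged illustration of how this queue behaves (using the NewDeltaFIFO constructor shown above, from the client-go vintage this article is based on; newer releases prefer NewDeltaFIFOWithOptions, and the Pod object is just a placeholder):

```go
package main

import (
    "fmt"

    corev1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/client-go/tools/cache"
)

func main() {
    fifo := cache.NewDeltaFIFO(cache.MetaNamespaceKeyFunc, nil)

    pod := &corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "demo", Namespace: "default"}}
    _ = fifo.Add(pod)    // queues an Added delta under the key "default/demo"
    _ = fifo.Update(pod) // appends an Updated delta to the same key

    // Pop hands every accumulated Delta for one key to the processing function,
    // oldest first; this is the same shape sharedIndexInformer.HandleDeltas receives.
    _, _ = fifo.Pop(func(obj interface{}) error {
        for _, d := range obj.(cache.Deltas) {
            fmt.Println(d.Type, d.Object.(*corev1.Pod).Name)
        }
        return nil
    })
}
```

Pop delivers all pending deltas for a single key in one call, which is exactly what HandleDeltas (further down) consumes.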
Next, a Config is assembled from the client's List and Watch functions plus this queue, and a controller is initialized from that Config. This is client-go's cache controller; it manages the cache of objects fetched from the API server and keeps those cached objects up to date.
The call we mainly care about next is:
```go
wg.StartWithChannel(processorStopCh, s.processor.run)
```

This is where the listener processing is actually started.
```go
func (p *sharedProcessor) run(stopCh <-chan struct{}) {
    func() {
        p.listenersLock.RLock()
        defer p.listenersLock.RUnlock()
        for _, listener := range p.listeners {
            p.wg.Start(listener.run)
            p.wg.Start(listener.pop)
        }
        p.listenersStarted = true
    }()
    <-stopCh
    p.listenersLock.RLock()
    defer p.listenersLock.RUnlock()
    for _, listener := range p.listeners {
        close(listener.addCh) // Tell .pop() to stop. .pop() will tell .run() to stop
    }
    p.wg.Wait() // Wait for all .pop() and .run() to stop
}
```

Focus on the run method. Remember that custom handler functions were registered earlier for add, update, and delete events? This is where those handlers end up getting triggered:
```go
func (p *processorListener) run() {
    // this call blocks until the channel is closed. When a panic happens during the notification
    // we will catch it, **the offending item will be skipped!**, and after a short delay (one second)
    // the next notification will be attempted. This is usually better than the alternative of never
    // delivering again.
    stopCh := make(chan struct{})
    wait.Until(func() {
        // this gives us a few quick retries before a long pause and then a few more quick retries
        err := wait.ExponentialBackoff(retry.DefaultRetry, func() (bool, error) {
            for next := range p.nextCh {
                switch notification := next.(type) {
                case updateNotification:
                    p.handler.OnUpdate(notification.oldObj, notification.newObj)
                case addNotification:
                    p.handler.OnAdd(notification.newObj)
                case deleteNotification:
                    p.handler.OnDelete(notification.oldObj)
                default:
                    utilruntime.HandleError(fmt.Errorf("unrecognized notification: %#v", next))
                }
            }
            // the only way to get here is if the p.nextCh is empty and closed
            return true, nil
        })

        // the only way to get here is if the p.nextCh is empty and closed
        if err == nil {
            close(stopCh)
        }
    }, 1*time.Minute, stopCh)
}
```

When a notification arrives on the p.nextCh channel, the matching user-registered handler is invoked according to the notification type. So who feeds this channel?
```go
func (p *processorListener) pop() {
    defer utilruntime.HandleCrash()
    defer close(p.nextCh) // Tell .run() to stop

    var nextCh chan<- interface{}
    var notification interface{}
    for {
        select {
        case nextCh <- notification:
            // Notification dispatched
            var ok bool
            notification, ok = p.pendingNotifications.ReadOne()
            if !ok { // Nothing to pop
                nextCh = nil // Disable this select case
            }
        case notificationToAdd, ok := <-p.addCh:
            if !ok {
                return
            }
            if notification == nil { // No notification to pop (and pendingNotifications is empty)
                // Optimize the case - skip adding to pendingNotifications
                notification = notificationToAdd
                nextCh = p.nextCh
            } else { // There is already a notification waiting to be dispatched
                p.pendingNotifications.WriteOne(notificationToAdd)
            }
        }
    }
}
```

The answer is this pop function: it reads newly added notifications from p.addCh and forwards them to p.nextCh, making sure each notification is delivered exactly once.
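The nil-channel trick above is easy to miss, so here is a standalone, simplified sketch of the same forwarding pattern using plain ints; the slice stands in for pendingNotifications, and none of these names come from client-go.

```go
package main

import "fmt"

// pop forwards values from addCh to nextCh. If the consumer is busy, values are
// buffered; setting the send channel to nil disables that select case until
// there is something to dispatch, just like processorListener.pop does.
func pop(addCh <-chan int, nextCh chan<- int) {
    defer close(nextCh)
    var out chan<- int // nil until we have a value ready to dispatch
    var pending []int  // stand-in for pendingNotifications
    var current int

    for {
        select {
        case out <- current:
            if len(pending) == 0 {
                out = nil // nothing left to dispatch; disable this case
                continue
            }
            current, pending = pending[0], pending[1:]
        case v, ok := <-addCh:
            if !ok {
                return
            }
            if out == nil { // idle: dispatch this value next
                current, out = v, nextCh
            } else { // busy: buffer it
                pending = append(pending, v)
            }
        }
    }
}

func main() {
    addCh := make(chan int)
    nextCh := make(chan int)
    go pop(addCh, nextCh)

    addCh <- 1
    fmt.Println("notified:", <-nextCh)

    addCh <- 2
    addCh <- 3
    fmt.Println("notified:", <-nextCh)
    fmt.Println("notified:", <-nextCh)

    close(addCh)
}
```

Disabling the send case by setting the channel variable to nil lets a single select act as an unbounded buffer between a fast producer (the informer) and a potentially slow consumer (the user's handlers).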
Finally, here is the controller's Run function. Let's see what it actually does:
```go
// Run begins processing items, and will continue until a value is sent down stopCh.
// It's an error to call Run more than once.
// Run blocks; call via go.
func (c *controller) Run(stopCh <-chan struct{}) {
    defer utilruntime.HandleCrash()
    go func() {
        <-stopCh
        c.config.Queue.Close()
    }()
    r := NewReflector(
        c.config.ListerWatcher,
        c.config.ObjectType,
        c.config.Queue,
        c.config.FullResyncPeriod,
    )
    r.ShouldResync = c.config.ShouldResync
    r.clock = c.clock

    c.reflectorMutex.Lock()
    c.reflector = r
    c.reflectorMutex.Unlock()

    var wg wait.Group
    defer wg.Wait()

    wg.StartWithChannel(stopCh, r.Run)

    wait.Until(c.processLoop, time.Second, stopCh)
}
```

The key call here is wg.StartWithChannel(stopCh, r.Run):
```go
// Run starts a watch and handles watch events. Will restart the watch if it is closed.
// Run will exit when stopCh is closed.
func (r *Reflector) Run(stopCh <-chan struct{}) {
    klog.V(3).Infof("Starting reflector %v (%s) from %s", r.expectedType, r.resyncPeriod, r.name)
    wait.Until(func() {
        if err := r.ListAndWatch(stopCh); err != nil {
            utilruntime.HandleError(err)
        }
    }, r.period, stopCh)
}
```

This calls r.ListAndWatch. That method is fairly involved, so let's take it piece by piece.
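Before diving into the details, here is a heavily simplified, self-contained sketch of the overall shape of ListAndWatch: one full List to seed the store, then a Watch from the returned resourceVersion whose events are applied incrementally. The tiny interfaces are hypothetical stand-ins for cache.ListerWatcher, watch.Interface, and the reflector's store; the real method additionally handles resync, resourceVersion bookkeeping, and error classification.

```go
package sketch

import "fmt"

// Event is a stand-in for watch.Event.
type Event struct {
    Type   string // "Added", "Modified", "Deleted"
    Object interface{}
}

// ListerWatcher is a stand-in for cache.ListerWatcher.
type ListerWatcher interface {
    List() (items []interface{}, resourceVersion string, err error)
    Watch(fromResourceVersion string) (<-chan Event, error)
}

// Store is a stand-in for the reflector's store (the DeltaFIFO in our case).
type Store interface {
    Replace(items []interface{})
    Add(obj interface{})
    Update(obj interface{})
    Delete(obj interface{})
}

// listAndWatch mirrors the control flow of Reflector.ListAndWatch: seed the
// store with one full LIST, then apply WATCH events one by one (which is what
// watchHandler does in the real code, shown next).
func listAndWatch(lw ListerWatcher, store Store, stopCh <-chan struct{}) error {
    items, rv, err := lw.List()
    if err != nil {
        return err
    }
    store.Replace(items) // full sync of the local cache

    events, err := lw.Watch(rv) // only changes newer than the list
    if err != nil {
        return err
    }
    for {
        select {
        case <-stopCh:
            return nil
        case ev, ok := <-events:
            if !ok {
                return fmt.Errorf("watch channel closed")
            }
            switch ev.Type {
            case "Added":
                store.Add(ev.Object)
            case "Modified":
                store.Update(ev.Object)
            case "Deleted":
                store.Delete(ev.Object)
            }
        }
    }
}
```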
```go
// watchHandler watches w and keeps *resourceVersion up to date.
func (r *Reflector) watchHandler(w watch.Interface, resourceVersion *string, errc chan error, stopCh <-chan struct{}) error {
    start := r.clock.Now()
    eventCount := 0

    // Stopping the watcher should be idempotent and if we return from this function there's no way
    // we're coming back in with the same watch interface.
    defer w.Stop()
    // update metrics
    defer func() {
        r.metrics.numberOfItemsInWatch.Observe(float64(eventCount))
        r.metrics.watchDuration.Observe(time.Since(start).Seconds())
    }()

loop:
    for {
        select {
        case <-stopCh:
            return errorStopRequested
        case err := <-errc:
            return err
        case event, ok := <-w.ResultChan():
            if !ok {
                break loop
            }
            if event.Type == watch.Error {
                return apierrs.FromObject(event.Object)
            }
            if e, a := r.expectedType, reflect.TypeOf(event.Object); e != nil && e != a {
                utilruntime.HandleError(fmt.Errorf("%s: expected type %v, but watch event object had type %v", r.name, e, a))
                continue
            }
            meta, err := meta.Accessor(event.Object)
            if err != nil {
                utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", r.name, event))
                continue
            }
            newResourceVersion := meta.GetResourceVersion()
            switch event.Type {
            case watch.Added:
                err := r.store.Add(event.Object)
                if err != nil {
                    utilruntime.HandleError(fmt.Errorf("%s: unable to add watch event object (%#v) to store: %v", r.name, event.Object, err))
                }
            case watch.Modified:
                err := r.store.Update(event.Object)
                if err != nil {
                    utilruntime.HandleError(fmt.Errorf("%s: unable to update watch event object (%#v) to store: %v", r.name, event.Object, err))
                }
            case watch.Deleted:
                // TODO: Will any consumers need access to the "last known
                // state", which is passed in event.Object? If so, may need
                // to change this.
                err := r.store.Delete(event.Object)
                if err != nil {
                    utilruntime.HandleError(fmt.Errorf("%s: unable to delete watch event object (%#v) from store: %v", r.name, event.Object, err))
                }
            default:
                utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", r.name, event))
            }
            *resourceVersion = newResourceVersion
            r.setLastSyncResourceVersion(newResourceVersion)
            eventCount++
        }
    }

    watchDuration := r.clock.Now().Sub(start)
    if watchDuration < 1*time.Second && eventCount == 0 {
        r.metrics.numberOfShortWatches.Inc()
        return fmt.Errorf("very short watch: %s: Unexpected watch close - watch lasted less than a second and no items received", r.name)
    }
    klog.V(4).Infof("%s: Watch close - %v total %v items received", r.name, r.expectedType, eventCount)
    return nil
}
```

This is where the watch is actually consumed: based on each returned watch event, the object is added to, updated in, or removed from the store, which here is the FIFO queue created earlier.
On the consuming side, the controller's processLoop pops items off that queue:
```go
// processLoop drains the work queue.
// TODO: Consider doing the processing in parallel. This will require a little thought
// to make sure that we don't end up processing the same object multiple times
// concurrently.
//
// TODO: Plumb through the stopCh here (and down to the queue) so that this can
// actually exit when the controller is stopped. Or just give up on this stuff
// ever being stoppable. Converting this whole package to use Context would
// also be helpful.
func (c *controller) processLoop() {
    for {
        obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))
        if err != nil {
            if err == FIFOClosedError {
                return
            }
            if c.config.RetryOnError {
                // This is the safe way to re-enqueue.
                c.config.Queue.AddIfNotPresent(obj)
            }
        }
    }
}
```

The watch path above produces objects into the queue; this goroutine is the consumer. The concrete processing function is the Process function configured earlier, i.e. s.HandleDeltas:
```go
func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error {
    s.blockDeltas.Lock()
    defer s.blockDeltas.Unlock()

    // from oldest to newest
    for _, d := range obj.(Deltas) {
        switch d.Type {
        case Sync, Added, Updated:
            isSync := d.Type == Sync
            s.cacheMutationDetector.AddObject(d.Object)
            if old, exists, err := s.indexer.Get(d.Object); err == nil && exists {
                if err := s.indexer.Update(d.Object); err != nil {
                    return err
                }
                s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync)
            } else {
                if err := s.indexer.Add(d.Object); err != nil {
                    return err
                }
                s.processor.distribute(addNotification{newObj: d.Object}, isSync)
            }
        case Deleted:
            if err := s.indexer.Delete(d.Object); err != nil {
                return err
            }
            s.processor.distribute(deleteNotification{oldObj: d.Object}, false)
        }
    }
    return nil
}
```

For each incoming object, HandleDeltas first checks its own cache (the indexer). If the object already exists, this is an update: the indexer entry is updated and an updateNotification is distributed, which ends up calling the user-registered update handler. If it does not exist, the object is added to the indexer and an addNotification triggers the user's add handler. Deletes are handled analogously with a deleteNotification.
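The indexer touched above is the same local cache that the Lister reads from. As a hedged aside, here is a small standalone example of that indexer type; the Pod object and names are placeholders, not taken from the article's code.

```go
package main

import (
    "fmt"

    corev1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/client-go/tools/cache"
)

func main() {
    // The same kind of thread-safe, indexed store that backs a SharedIndexInformer.
    indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{
        cache.NamespaceIndex: cache.MetaNamespaceIndexFunc,
    })

    pod := &corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "demo", Namespace: "default"}}
    _ = indexer.Add(pod)    // what HandleDeltas does for an object it has not seen
    _ = indexer.Update(pod) // what it does when the object already exists

    obj, exists, _ := indexer.GetByKey("default/demo")
    fmt.Println(exists, obj.(*corev1.Pod).Name) // true demo

    // Lookups can also go through the namespace index, which is what
    // namespace-scoped Lister calls rely on.
    inDefault, _ := indexer.ByIndex(cache.NamespaceIndex, "default")
    fmt.Println(len(inDefault)) // 1
}
```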
That basically wraps up the source-code walkthrough of client-go's informer flow.
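To tie the pieces together, here is a minimal end-to-end sketch (not taken from the original post) of consuming a Deployment informer the way sample-controller does: register handlers, start the factory, wait for the initial cache sync, then read from the Lister. The fake clientset is an assumption so the sketch runs without a cluster, and error handling is trimmed.

```go
package main

import (
    "fmt"
    "time"

    appsv1 "k8s.io/api/apps/v1"
    "k8s.io/apimachinery/pkg/labels"
    "k8s.io/client-go/informers"
    "k8s.io/client-go/kubernetes/fake"
    "k8s.io/client-go/tools/cache"
)

func main() {
    client := fake.NewSimpleClientset()
    factory := informers.NewSharedInformerFactory(client, 30*time.Second)
    deployments := factory.Apps().V1().Deployments()

    // These handlers end up in a processorListener, fed by HandleDeltas via
    // distribute(), exactly as traced above.
    deployments.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc:    func(obj interface{}) { fmt.Println("add:", obj.(*appsv1.Deployment).Name) },
        UpdateFunc: func(old, new interface{}) { fmt.Println("update:", new.(*appsv1.Deployment).Name) },
        DeleteFunc: func(obj interface{}) { fmt.Println("delete:", obj.(*appsv1.Deployment).Name) },
    })

    stopCh := make(chan struct{})
    defer close(stopCh)

    factory.Start(stopCh) // runs sharedIndexInformer.Run for every registered informer

    // Block until the initial LIST has been delivered to the local cache.
    if !cache.WaitForCacheSync(stopCh, deployments.Informer().HasSynced) {
        panic("cache failed to sync")
    }

    // Reads are served from the indexer-backed cache, not from the API server.
    all, _ := deployments.Lister().Deployments("default").List(labels.Everything())
    fmt.Println("deployments in default:", len(all))
}
```

WaitForCacheSync is the call sample-controller makes before starting its workers; everything in front of it is the machinery this article just walked through.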