controller-runtime 控制器实现
原文連接:https://jishuin.proginn.com/p/763bfbd2f5b9
controller-runtime(https://github.com/kubernetes-sigs/controller-runtime) 框架實際上是社區(qū)幫我們封裝的一個控制器處理的框架,底層核心實現(xiàn)原理和我們前面去自定義一個 controller 控制器邏輯是一樣的,只是在這個基礎(chǔ)上新增了一些概念,開發(fā)者直接使用這個框架去開發(fā)控制器會更加簡單方便。包括 kubebuilder、operator-sdk 、apiserver-builder這些框架其實都是在 controller-runtime 基礎(chǔ)上做了一層封裝,方便開發(fā)者快速生成項目的腳手架而已。下面我們就來分析下 controller-runtime 是如何實現(xiàn)的控制器處理。
[外鏈圖片轉(zhuǎn)存失敗,源站可能有防盜鏈機制,建議將圖片保存下來直接上傳(img-vBTj1APB-1635493458314)(/home/westone/桌面/3745c125356e65dc9fc670ebc4c9bec0.png)]
Controller 的實現(xiàn)
首先我們還是去查看下控制器的定義以及控制器是如何啟動的。控制器的定義結(jié)構(gòu)體如下所示:
sigs.k8s.io/controller-runtime/pkg/internal/controller/controller.go
type Controller struct {// Name is used to uniquely identify a Controller in tracing, logging and monitoring. Name is required.// Name 用于跟蹤、記錄和監(jiān)控中控制器的唯一標識,必填字段Name string// MaxConcurrentReconciles is the maximum number of concurrent Reconciles which can be run. Defaults to 1.// 可以運行的最大并發(fā) Reconciles 數(shù)量,默認值為1MaxConcurrentReconciles int// Reconciler is a function that can be called at any time with the Name / Namespace of an object and// ensures that the state of the system matches the state specified in the object.// Defaults to the DefaultReconcileFunc.// Reconciler 是一個可以隨時調(diào)用對象的 Name/Namespace 的函數(shù)// 確保系統(tǒng)的狀態(tài)與對象中指定的狀態(tài)一致,默認為 DefaultReconcileFunc 函數(shù)Do reconcile.Reconciler// Client is a lazily initialized Client. The controllerManager will initialize this when Start is called.// Client 是一個延遲初始化的 Client。 在調(diào)用 Start 時,controllerManager 將初始化它。Client client.Client// Scheme is injected by the controllerManager when controllerManager.Start is called// 當controllerManager.Start被調(diào)用時,由controllerManager注入SchemeScheme *runtime.Scheme// informers are injected by the controllerManager when controllerManager.Start is called// 當 controllerManager.Start 被調(diào)用時,informers 被 controllerManager 注入Cache cache.Cache// Config is the rest.Config used to talk to the apiserver. Defaults to one of in-cluster, environment variable// specified, or the ~/.kube/Config.// Config 是用于與 apiserver 通信的 rest.Config。 默認為集群內(nèi)、指定的環(huán)境變量或 ~/.kube/Config 之一。Config *rest.Config// MakeQueue constructs the queue for this controller once the controller is ready to start.// This exists because the standard Kubernetes workqueues start themselves immediately, which// leads to goroutine leaks if something calls controller.New repeatedly.// 一旦控制器準備好啟動,MakeQueue 就會為這個控制器構(gòu)造工作隊列// 這是因為標準的 Kubernetes 工作隊列會立即啟動,如果某些東西重復調(diào)用 controller.New 會導致 goroutine 泄漏。MakeQueue func() workqueue.RateLimitingInterface// Queue is an listeningQueue that listens for events from Informers and adds object keys to// the Queue for processing// 隊列通過監(jiān)聽來自 Infomer 的事件,添加對象鍵到隊列中進行處理// MakeQueue 屬性就是來構(gòu)造這個工作隊列的// 也就是前面我們講解的工作隊列,我們將通過這個工作隊列來進行消費Queue workqueue.RateLimitingInterface// SetFields is used to inject dependencies into other objects such as Sources, EventHandlers and Predicates// SetFields 用來將依賴關(guān)系注入到其他對象,比如 Sources、EventHandlers 以及 PredicatesSetFields func(i interface{}) error// mu is used to synchronize Controller setup// 控制器同步信號量mu sync.Mutex// JitterPeriod allows tests to reduce the JitterPeriod so they complete faster// 允許測試減少 JitterPeriod,使其更快完成JitterPeriod time.Duration// WaitForCacheSync allows tests to mock out the WaitForCacheSync function to return an error// defaults to Cache.WaitForCacheSync// WaitForCacheSync 允許測試模擬 WaitForCacheSync 函數(shù)以返回錯誤// 默認為 Cache.WaitForCacheSyncWaitForCacheSync func(stopCh <-chan struct{}) bool// Started is true if the Controller has been Started// 控制器是否已經(jīng)啟動Started bool// Recorder is an event recorder for recording Event resources to the// Kubernetes API.// Recorder 是一個事件記錄器,用于將 Event 資源記錄到// Kubernetes API。Recorder record.EventRecorder// TODO(community): Consider initializing a logger with the Controller Name as the tag// watches maintains a list of sources, handlers, and predicates to start when the controller is started.// watches 維護了一個 sources、handlers 以及 predicates 列表以方便在控制器啟動的時候啟動watches []watchDescription }上面的結(jié)構(gòu)體就是 controller-runtime 中定義的控制器結(jié)構(gòu)體,我們可以看到結(jié)構(gòu)體中仍然有一個限速的工作隊列,但是看上去沒有資源對象的 Informer 或者 Indexer 的數(shù)據(jù),實際上這里是通過下面的 startWatches 屬性做了一層封裝,該屬性是一個 watchDescription 隊列,一個 watchDescription 包含了所有需要 watch 的信息:
// watchDescription contains all the information necessary to start a watch. type watchDescription struct {src source.Sourcehandler handler.EventHandlerpredicates []predicate.Predicate }整個控制器中最重要的兩個函數(shù)是 Watch 與 Start,下面我們就來分析下他們是如何實現(xiàn)的。
Watch 函數(shù)實現(xiàn)
sigs.k8s.io/controller-runtime/pkg/internal/controller/controller.go
// Watch implements controller.Controller func (c *Controller) Watch(src source.Source, evthdler handler.EventHandler, prct ...predicate.Predicate) error {c.mu.Lock()defer c.mu.Unlock()// Inject Cache into arguments// 注入 Cache 到參數(shù)中if err := c.SetFields(src); err != nil {return err}if err := c.SetFields(evthdler); err != nil {return err}for _, pr := range prct {if err := c.SetFields(pr); err != nil {return err}}c.watches = append(c.watches, watchDescription{src: src, handler: evthdler, predicates: prct})if c.Started {log.Info("Starting EventSource", "controller", c.Name, "source", src)return src.Start(evthdler, c.Queue, prct...)}return nil }上面的 Watch 函數(shù)可以看到最終是去調(diào)用的 Source 這個參數(shù)的 Start 函數(shù),Source 是事件的源,如對資源對象的 Create、Update、Delete 操作,需要由 event.EventHandlers 將 reconcile.Requests 入隊列進行處理。
使用 Kind 來處理來自集群的事件(如 Pod 創(chuàng)建、Pod 更新、Deployment 更新)。
使用 Channel 來處理來自集群外部的事件(如 GitHub Webhook 回調(diào)、輪詢外部 URL)。
sigs.k8s.io/controller-runtime/pkg/source/source.go
type Source interface {// Start is internal and should be called only by the Controller to register an EventHandler with the Informer// to enqueue reconcile.Requests.// Start 是一個內(nèi)部函數(shù),只應(yīng)該由 Controller 調(diào)用,向 Informer 注冊一個 EventHandler// 將 reconcile.Request 放入隊列Start(handler.EventHandler, workqueue.RateLimitingInterface, ...predicate.Predicate) error }我們可以看到 source.Source 是一個接口,它是 Controller.Watch 的一個參數(shù),所以要看具體的如何實現(xiàn)的 Source.Start 函數(shù),我們需要去看傳入 Controller.Watch 的參數(shù),在 controller-runtime 中調(diào)用控制器的 Watch 函數(shù)的入口實際上位于 sigs.k8s.io/controller-runtime/pkg/builder/controller.go 文件中的 doWatch() 函數(shù):
func (blder *Builder) doWatch() error {// Reconcile typesrc := &source.Kind{Type: blder.forInput.object}hdler := &handler.EnqueueRequestForObject{}allPredicates := append(blder.globalPredicates, blder.forInput.predicates...)err := blder.ctrl.Watch(src, hdler, allPredicates...)if err != nil {return err}// Watches the managed typesfor _, own := range blder.ownsInput {src := &source.Kind{Type: own.object}hdler := &handler.EnqueueRequestForOwner{OwnerType: blder.forInput.object,IsController: true,}allPredicates := append([]predicate.Predicate(nil), blder.globalPredicates...)allPredicates = append(allPredicates, own.predicates...)if err := blder.ctrl.Watch(src, hdler, allPredicates...); err != nil {return err}}// Do the watch requestsfor _, w := range blder.watchesInput {allPredicates := append([]predicate.Predicate(nil), blder.globalPredicates...)allPredicates = append(allPredicates, w.predicates...)if err := blder.ctrl.Watch(w.src, w.eventhandler, allPredicates...); err != nil {return err}}return nil }可以看到 Watch 的第一個參數(shù)是一個 source.Kind 的類型,該結(jié)構(gòu)體就實現(xiàn)了上面的 source.Source 接口:
sigs.k8s.io/controller-runtime/pkg/source/source.go
// Kind is used to provide a source of events originating inside the cluster from Watches (e.g. Pod Create) type Kind struct {// Type is the type of object to watch. e.g. &v1.Pod{}// Type 是 watch 對象的類型,比如 &v1.Pod{}Type runtime.Object// cache used to watch APIs// cache 用于 watch 的 APIs 接口cache cache.Cache }var _ Source = &Kind{}// Start is internal and should be called only by the Controller to register an EventHandler with the Informer // to enqueue reconcile.Requests. func (ks *Kind) Start(handler handler.EventHandler, queue workqueue.RateLimitingInterface,prct ...predicate.Predicate) error {// Type should have been specified by the user.// Type 在使用之前必須提前指定if ks.Type == nil {return fmt.Errorf("must specify Kind.Type")}// cache should have been injected before Start was called// cache 也應(yīng)該在調(diào)用 Start 之前被注入了if ks.cache == nil {return fmt.Errorf("must call CacheInto on Kind before calling Start")}// Lookup the Informer from the Cache and add an EventHandler which populates the Queue// 從 Cache 中獲取 Informer 并添加一個事件處理程序來添加隊列i, err := ks.cache.GetInformer(context.TODO(), ks.Type)if err != nil {if kindMatchErr, ok := err.(*meta.NoKindMatchError); ok {log.Error(err, "if kind is a CRD, it should be installed before calling Start","kind", kindMatchErr.GroupKind)}return err}i.AddEventHandler(internal.EventHandler{Queue: queue, EventHandler: handler, Predicates: prct})return nil }從上面的具體實現(xiàn)我們就可以看出來 Controller.Watch 函數(shù)就是實現(xiàn)的獲取資源對象的 Informer 以及注冊事件監(jiān)聽函數(shù)。Informer 是通過 cache 獲取的,cache 是在調(diào)用 Start 函數(shù)之前注入進來的,這里其實我們不用太關(guān)心;下面的 AddEventHandler 函數(shù)中是一個 internal.EventHandler 結(jié)構(gòu)體,那這個結(jié)構(gòu)體比如會實現(xiàn) client-go 中提供的 ResourceEventHandler 接口,也就是我們熟悉的 OnAdd、OnUpdate、OnDelete 幾個函數(shù):
sigs.k8s.io/controller-runtime/pkg/source/internal/eventsource.go
// EventHandler adapts a handler.EventHandler interface to a cache.ResourceEventHandler interface // EventHandler 實現(xiàn)了 cache.ResourceEventHandler 接口 type EventHandler struct {EventHandler handler.EventHandlerQueue workqueue.RateLimitingInterfacePredicates []predicate.Predicate }// OnAdd creates CreateEvent and calls Create on EventHandler func (e EventHandler) OnAdd(obj interface{}) {// kubernetes 對象被創(chuàng)建的事件c := event.CreateEvent{}// Pull metav1.Object out of the object// 獲取對象 metav1.Objectif o, err := meta.Accessor(obj); err == nil {c.Meta = o} else {log.Error(err, "OnAdd missing Meta","object", obj, "type", fmt.Sprintf("%T", obj))return}// Pull the runtime.Object out of the object// 斷言 runtime.Objectif o, ok := obj.(runtime.Object); ok {c.Object = o} else {log.Error(nil, "OnAdd missing runtime.Object","object", obj, "type", fmt.Sprintf("%T", obj))return}// Predicates 用于事件過濾,循環(huán)調(diào)用 Predicates 的 Create 函數(shù)for _, p := range e.Predicates {if !p.Create(c) {return}}// Invoke create handler// 調(diào)用創(chuàng)建處理程序e.EventHandler.Create(c, e.Queue) }// OnUpdate creates UpdateEvent and calls Update on EventHandler func (e EventHandler) OnUpdate(oldObj, newObj interface{}) {u := event.UpdateEvent{}// Pull metav1.Object out of the objectif o, err := meta.Accessor(oldObj); err == nil {u.MetaOld = o} else {log.Error(err, "OnUpdate missing MetaOld","object", oldObj, "type", fmt.Sprintf("%T", oldObj))return}// Pull the runtime.Object out of the objectif o, ok := oldObj.(runtime.Object); ok {u.ObjectOld = o} else {log.Error(nil, "OnUpdate missing ObjectOld","object", oldObj, "type", fmt.Sprintf("%T", oldObj))return}// Pull metav1.Object out of the objectif o, err := meta.Accessor(newObj); err == nil {u.MetaNew = o} else {log.Error(err, "OnUpdate missing MetaNew","object", newObj, "type", fmt.Sprintf("%T", newObj))return}// Pull the runtime.Object out of the objectif o, ok := newObj.(runtime.Object); ok {u.ObjectNew = o} else {log.Error(nil, "OnUpdate missing ObjectNew","object", oldObj, "type", fmt.Sprintf("%T", oldObj))return}for _, p := range e.Predicates {if !p.Update(u) {return}}// Invoke update handlere.EventHandler.Update(u, e.Queue) }// OnDelete creates DeleteEvent and calls Delete on EventHandler func (e EventHandler) OnDelete(obj interface{}) {d := event.DeleteEvent{}// Deal with tombstone events by pulling the object out. Tombstone events wrap the object in a// DeleteFinalStateUnknown struct, so the object needs to be pulled out.// Copied from sample-controller// This should never happen if we aren't missing events, which we have concluded that we are not// and made decisions off of this belief. Maybe this shouldn't be here?var ok boolif _, ok = obj.(metav1.Object); !ok {// If the object doesn't have Metadata, assume it is a tombstone object of type DeletedFinalStateUnknowntombstone, ok := obj.(cache.DeletedFinalStateUnknown)if !ok {log.Error(nil, "Error decoding objects. Expected cache.DeletedFinalStateUnknown","type", fmt.Sprintf("%T", obj),"object", obj)return}// Set obj to the tombstone objobj = tombstone.Obj}// Pull metav1.Object out of the objectif o, err := meta.Accessor(obj); err == nil {d.Meta = o} else {log.Error(err, "OnDelete missing Meta","object", obj, "type", fmt.Sprintf("%T", obj))return}// Pull the runtime.Object out of the objectif o, ok := obj.(runtime.Object); ok {d.Object = o} else {log.Error(nil, "OnDelete missing runtime.Object","object", obj, "type", fmt.Sprintf("%T", obj))return}for _, p := range e.Predicates {if !p.Delete(d) {return}}// Invoke delete handlere.EventHandler.Delete(d, e.Queue) }上面的 EventHandler 結(jié)構(gòu)體實現(xiàn)了 client-go 中的 ResourceEventHandler 接口,實現(xiàn)過程中我們可以看到調(diào)用了 Predicates 進行事件過濾,過濾后才是真正的事件處理,不過其實真正的事件處理也不是在這里去實現(xiàn)的,而是通過 Controller.Watch 函數(shù)傳遞進來的 handler.EventHandler 處理的,這個函數(shù)通過前面的 doWatch() 函數(shù)可以看出來它是一個 &handler.EnqueueRequestForObject{} 對象,所以真正的事件處理邏輯是這個函數(shù)去實現(xiàn)的:
sigs.k8s.io/controller-runtime/pkg/handler/enqueue.go
// EnqueueRequestForObject enqueues a Request containing the Name and Namespace of the object that is the source of the Event. // (e.g. the created / deleted / updated objects Name and Namespace). handler.EnqueueRequestForObject is used by almost all // Controllers that have associated Resources (e.g. CRDs) to reconcile the associated Resource. type EnqueueRequestForObject struct{}// Create implements EventHandler func (e *EnqueueRequestForObject) Create(evt event.CreateEvent, q workqueue.RateLimitingInterface) {if evt.Meta == nil {enqueueLog.Error(nil, "CreateEvent received with no metadata", "event", evt)return}q.Add(reconcile.Request{NamespacedName: types.NamespacedName{Name: evt.Meta.GetName(),Namespace: evt.Meta.GetNamespace(),}}) }// Update implements EventHandler func (e *EnqueueRequestForObject) Update(evt event.UpdateEvent, q workqueue.RateLimitingInterface) {if evt.MetaOld != nil {q.Add(reconcile.Request{NamespacedName: types.NamespacedName{Name: evt.MetaOld.GetName(),Namespace: evt.MetaOld.GetNamespace(),}})} else {enqueueLog.Error(nil, "UpdateEvent received with no old metadata", "event", evt)}if evt.MetaNew != nil {q.Add(reconcile.Request{NamespacedName: types.NamespacedName{Name: evt.MetaNew.GetName(),Namespace: evt.MetaNew.GetNamespace(),}})} else {enqueueLog.Error(nil, "UpdateEvent received with no new metadata", "event", evt)} }// Delete implements EventHandler func (e *EnqueueRequestForObject) Delete(evt event.DeleteEvent, q workqueue.RateLimitingInterface) {if evt.Meta == nil {enqueueLog.Error(nil, "DeleteEvent received with no metadata", "event", evt)return}q.Add(reconcile.Request{NamespacedName: types.NamespacedName{Name: evt.Meta.GetName(),Namespace: evt.Meta.GetNamespace(),}}) }通過 EnqueueRequestForObject 的 Create/Update/Delete 實現(xiàn)可以看出我們放入到工作隊列中的元素不是以前默認的元素唯一的 KEY,而是經(jīng)過封裝的 reconcile.Request 對象,當然通過這個對象也可以很方便獲取對象的唯一標識 KEY。
總結(jié)起來就是 Controller.Watch 函數(shù)就是來實現(xiàn)之前自定義控制器中的 Informer 初始化以及事件監(jiān)聽函數(shù)的注冊。
[外鏈圖片轉(zhuǎn)存失敗,源站可能有防盜鏈機制,建議將圖片保存下來直接上傳(img-ZVIODAGg-1635493458316)(/home/westone/桌面/5d9da423665e13d130679f0a0f631003.png)]
Start 函數(shù)實現(xiàn)
上面我們分析了控制器的 Watch 函數(shù)的實現(xiàn),下面我們來分析另外一個重要的函數(shù) Controller.Start 函數(shù)的實現(xiàn)。
sigs.k8s.io/controller-runtime/pkg/internal/controller/controller.go
// Start implements controller.Controller func (c *Controller) Start(stop <-chan struct{}) error {// use an IIFE to get proper lock handling// but lock outside to get proper handling of the queue shutdownc.mu.Lock()// 調(diào)用 MakeQueue() 函數(shù)生成工作隊列c.Queue = c.MakeQueue()defer c.Queue.ShutDown() // needs to be outside the iife so that we shutdown after the stop channel is closederr := func() error {defer c.mu.Unlock()// TODO(pwittrock): Reconsider HandleCrashdefer utilruntime.HandleCrash()// NB(directxman12): launch the sources *before* trying to wait for the// caches to sync so that they have a chance to register their intendeded// caches.// NB(directxman12): 在試圖等待緩存同步之前啟動 sources// 這樣他們有機會注冊他們的目標緩存for _, watch := range c.watches {log.Info("Starting EventSource", "controller", c.Name, "source", watch.src)if err := watch.src.Start(watch.handler, c.Queue, watch.predicates...); err != nil {return err}}// Start the SharedIndexInformer factories to begin populating the SharedIndexInformer caches// 啟動 SharedIndexInformer 工廠,開始填充 SharedIndexInformer 緩存log.Info("Starting Controller", "controller", c.Name)// Wait for the caches to be synced before starting workersif c.WaitForCacheSync == nil {c.WaitForCacheSync = c.Cache.WaitForCacheSync}if ok := c.WaitForCacheSync(stop); !ok {// This code is unreachable right now since WaitForCacheSync will never return an error// Leaving it here because that could happen in the futureerr := fmt.Errorf("failed to wait for %s caches to sync", c.Name)log.Error(err, "Could not wait for Cache to sync", "controller", c.Name)return err}if c.JitterPeriod == 0 {c.JitterPeriod = 1 * time.Second}// Launch workers to process resources// 啟動 workers 來處理資源log.Info("Starting workers", "controller", c.Name, "worker count", c.MaxConcurrentReconciles)for i := 0; i < c.MaxConcurrentReconciles; i++ {// Process work itemsgo wait.Until(c.worker, c.JitterPeriod, stop)}c.Started = truereturn nil}()if err != nil {return err}<-stoplog.Info("Stopping workers", "controller", c.Name)return nil }上面的 Start 函數(shù)很簡單,和我們之前自定義控制器中啟動控制循環(huán)比較類似,都是先等待資源對象的 Informer 同步完成,然后啟動 workers 來處理資源對象,而且 worker 函數(shù)都是一樣的實現(xiàn)方式:
sigs.k8s.io/controller-runtime/pkg/internal/controller/controller.go
// worker runs a worker thread that just dequeues items, processes them, and marks them done. // It enforces that the reconcileHandler is never invoked concurrently with the same object. // worker 運行一個工作線程,從隊列中彈出元素處理,并標記為完成 // 強制要求永遠不和同一個對象同時調(diào)用 reconcileHandler func (c *Controller) worker() {for c.processNextWorkItem() {} }// processNextWorkItem will read a single work item off the workqueue and // attempt to process it, by calling the reconcileHandler. // processNextWorkItem 將從工作隊列中彈出一個元素,并嘗試通過調(diào)用 reconcileHandler 來處理它 func (c *Controller) processNextWorkItem() bool {// 從隊列中彈出元素obj, shutdown := c.Queue.Get()if shutdown {// Stop workingreturn false}// We call Done here so the workqueue knows we have finished// processing this item. We also must remember to call Forget if we// do not want this work item being re-queued. For example, we do// not call Forget if a transient error occurs, instead the item is// put back on the workqueue and attempted again after a back-off// period.// 標記為處理完成defer c.Queue.Done(obj)// 調(diào)用 reconcileHandler 進行元素處理return c.reconcileHandler(obj) }func (c *Controller) reconcileHandler(obj interface{}) bool {// Update metrics after processing each item// 處理完每個元素后更新指標reconcileStartTS := time.Now()defer func() {c.updateMetrics(time.Since(reconcileStartTS))}()var req reconcile.Requestvar ok bool// 確保對象是一個有效的 request 對象if req, ok = obj.(reconcile.Request); !ok {// As the item in the workqueue is actually invalid, we call// Forget here else we'd go into a loop of attempting to// process a work item that is invalid.// 工作隊列中的元素無效,所以調(diào)用 Forget 函數(shù)// 否則會進入一個循環(huán)嘗試處理一個無效的元素c.Queue.Forget(obj)log.Error(nil, "Queue item was not a Request","controller", c.Name, "type", fmt.Sprintf("%T", obj), "value", obj)// Return true, don't take a breakreturn true}// RunInformersAndControllers the syncHandler, passing it the namespace/Name string of the// resource to be synced.// RunInformersAndControllers 的 syncHandler,傳遞給它要同步的資源的 namespace/Name 字符串// 調(diào)用 Reconciler 函數(shù)來處理這個元素,也就是我們真正去編寫業(yè)務(wù)邏輯的地方if result, err := c.Do.Reconcile(req); err != nil {// 如果業(yè)務(wù)邏輯處理出錯,重新添加到限速隊列中去c.Queue.AddRateLimited(req)log.Error(err, "Reconciler error", "controller", c.Name, "request", req)ctrlmetrics.ReconcileErrors.WithLabelValues(c.Name).Inc()ctrlmetrics.ReconcileTotal.WithLabelValues(c.Name, "error").Inc()return false} else if result.RequeueAfter > 0 {// The result.RequeueAfter request will be lost, if it is returned// along with a non-nil error. But this is intended as// We need to drive to stable reconcile loops before queuing due// to result.RequestAfter// 如果調(diào)諧函數(shù) Reconcile 處理結(jié)果中包含大于0的 RequeueAfter// 需要注意如果 result.RequeuAfter 與一個非 nil 的錯誤一起返回,則 result.RequeuAfter 會丟失。// 忘記元素c.Queue.Forget(obj)// 加入隊列c.Queue.AddAfter(req, result.RequeueAfter)ctrlmetrics.ReconcileTotal.WithLabelValues(c.Name, "requeue_after").Inc()return true} else if result.Requeue {c.Queue.AddRateLimited(req)ctrlmetrics.ReconcileTotal.WithLabelValues(c.Name, "requeue").Inc()return true}// Finally, if no error occurs we Forget this item so it does not// get queued again until another change happens.// 最后如果沒有發(fā)生錯誤,我們就會 Forget 這個元素// 這樣直到發(fā)送另一個變化它就不會再被排隊了c.Queue.Forget(obj)// TODO(directxman12): What does 1 mean? Do we want level constants? Do we want levels at all?log.V(1).Info("Successfully Reconciled", "controller", c.Name, "request", req)ctrlmetrics.ReconcileTotal.WithLabelValues(c.Name, "success").Inc()// Return true, don't take a breakreturn true }上面的 reconcileHandler 函數(shù)就是我們真正執(zhí)行元素業(yè)務(wù)處理的地方,函數(shù)中包含了事件處理以及錯誤處理,真正的事件處理是通過 c.Do.Reconcile(req) 暴露給開發(fā)者的,所以對于開發(fā)者來說,只需要在 Reconcile 函數(shù)中去處理業(yè)務(wù)邏輯就可以了。
根據(jù) c.Do.Reconcile(req) 函數(shù)的返回值來判斷是否需要將元素重新加入隊列進行處理:
- 如果返回 error 錯誤,則將元素重新添加到限速隊列中
- 如果返回的 result.RequeueAfter > 0,則先將元素忘記,然后在 result.RequeueAfter 時間后加入到隊列中
- 如果返回 result.Requeue,則直接將元素重新加入到限速隊列中
- 如果正常返回,則直接忘記這個元素
到這里其實基本上就實現(xiàn)了和我們自定義控制器一樣的邏輯,只是將業(yè)務(wù)處理的邏輯暴露給了開發(fā)者去自己實現(xiàn)。接下來我們就需要了解下 controller-runtime 是如何去控制器控制器的初始化以及啟動的。
總結(jié)
以上是生活随笔為你收集整理的controller-runtime 控制器实现的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 深入解析 Kubebuilder:让编写
- 下一篇: Kubernetes CRD开发汇总