k8s replicaset controller source code analysis (1): initialization and startup
ReplicaSet controller analysis
Overview of the replicaset controller
The replicaset controller is one of the many controllers in the kube-controller-manager component; it is the controller for the replicaset resource object. By watching the replicaset and pod resources, it reconciles the corresponding replicaset object whenever either of these two resources changes, driving the actual state toward the replicaset's desired replica count: it creates pods when the actual number of pods falls short of the desired number, and deletes pods when the actual number exceeds it.
In short, the replicaset controller compares the number of pods desired by a replicaset object with the number of pods that currently exist, then creates or deletes pods based on the result until the two numbers are equal.
replicaset controller architecture
The rough composition and processing flow of the replicaset controller are shown in the diagram below. The replicaset controller registers event handlers for pod and replicaset objects; when an event is watched, the corresponding replicaset object is put into a queue. The syncReplicaSet method, which contains the controller's core reconciliation logic, then takes replicaset objects out of the queue and reconciles them.
The replicaset controller analysis is split into three parts:
(1) initialization and startup of the replicaset controller;
(2) the core processing logic of the replicaset controller;
(3) the expectations mechanism of the replicaset controller.
This post covers the first part: initialization and startup.
Initialization and startup of ReplicaSetController
Based on tag v1.17.4
https://github.com/kubernetes/kubernetes/releases/tag/v1.17.4
We take the startReplicaSetController function as the entry point for the initialization and startup analysis of the replicaset controller.
startReplicaSetController calls replicaset.NewReplicaSetController to initialize the ReplicaSetController, then calls Run to start it.
Note the parameter ctx.ComponentConfig.ReplicaSetController.ConcurrentRSSyncs passed into the Run method; it is analyzed in detail later.
```go
// cmd/kube-controller-manager/app/apps.go
func startReplicaSetController(ctx ControllerContext) (http.Handler, bool, error) {
	if !ctx.AvailableResources[schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "replicasets"}] {
		return nil, false, nil
	}
	go replicaset.NewReplicaSetController(
		ctx.InformerFactory.Apps().V1().ReplicaSets(),
		ctx.InformerFactory.Core().V1().Pods(),
		ctx.ClientBuilder.ClientOrDie("replicaset-controller"),
		replicaset.BurstReplicas,
	).Run(int(ctx.ComponentConfig.ReplicaSetController.ConcurrentRSSyncs), ctx.Stop)
	return nil, true, nil
}
```
Initialization analysis
Entry point: NewReplicaSetController
NewReplicaSetController initializes the ReplicaSetController: it defines the informers for replicaset and pod objects and registers the event handlers (AddFunc, UpdateFunc and DeleteFunc) used to watch for changes to replicaset and pod objects.
```go
// pkg/controller/replicaset/replica_set.go

// NewReplicaSetController configures a replica set controller with the specified event recorder
func NewReplicaSetController(rsInformer appsinformers.ReplicaSetInformer, podInformer coreinformers.PodInformer, kubeClient clientset.Interface, burstReplicas int) *ReplicaSetController {
	eventBroadcaster := record.NewBroadcaster()
	eventBroadcaster.StartLogging(klog.Infof)
	eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")})
	return NewBaseController(rsInformer, podInformer, kubeClient, burstReplicas,
		apps.SchemeGroupVersion.WithKind("ReplicaSet"),
		"replicaset_controller",
		"replicaset",
		controller.RealPodControl{
			KubeClient: kubeClient,
			Recorder:   eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "replicaset-controller"}),
		},
	)
}

// NewBaseController is the implementation of NewReplicaSetController with additional injected
// parameters so that it can also serve as the implementation of NewReplicationController.
func NewBaseController(rsInformer appsinformers.ReplicaSetInformer, podInformer coreinformers.PodInformer, kubeClient clientset.Interface, burstReplicas int,
	gvk schema.GroupVersionKind, metricOwnerName, queueName string, podControl controller.PodControlInterface) *ReplicaSetController {
	if kubeClient != nil && kubeClient.CoreV1().RESTClient().GetRateLimiter() != nil {
		ratelimiter.RegisterMetricAndTrackRateLimiterUsage(metricOwnerName, kubeClient.CoreV1().RESTClient().GetRateLimiter())
	}

	rsc := &ReplicaSetController{
		GroupVersionKind: gvk,
		kubeClient:       kubeClient,
		podControl:       podControl,
		burstReplicas:    burstReplicas,
		expectations:     controller.NewUIDTrackingControllerExpectations(controller.NewControllerExpectations()),
		queue:            workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), queueName),
	}

	rsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc:    rsc.addRS,
		UpdateFunc: rsc.updateRS,
		DeleteFunc: rsc.deleteRS,
	})
	rsc.rsLister = rsInformer.Lister()
	rsc.rsListerSynced = rsInformer.Informer().HasSynced

	podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: rsc.addPod,
		// This invokes the ReplicaSet for every pod change, eg: host assignment. Though this might seem like
		// overkill the most frequent pod update is status, and the associated ReplicaSet will only list from
		// local storage, so it should be ok.
		UpdateFunc: rsc.updatePod,
		DeleteFunc: rsc.deletePod,
	})
	rsc.podLister = podInformer.Lister()
	rsc.podListerSynced = podInformer.Informer().HasSynced

	rsc.syncHandler = rsc.syncReplicaSet

	return rsc
}
```
queue
The queue is the key to the replicaset controller's sync operation. When a replicaset or pod object changes, the corresponding event handler adds the object's key to the queue, while rsc.worker (invoked from the controller's Run method, analyzed later) takes keys off the queue and performs the corresponding reconciliation.
Format of the objects stored in the queue: namespace/name
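As a rough illustration of this key format (the real keys are produced by controller.KeyFunc), here is a minimal sketch; metaNamespaceKey is a hypothetical helper written for this post, not a function in the Kubernetes codebase:

```go
package main

import "fmt"

// metaNamespaceKey mimics the namespace/name key format used by the queue.
// Cluster-scoped objects have no namespace and are keyed by name alone.
func metaNamespaceKey(namespace, name string) string {
	if namespace == "" {
		return name
	}
	return namespace + "/" + name
}

func main() {
	fmt.Println(metaNamespaceKey("default", "nginx-rs")) // default/nginx-rs
}
```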
```go
type ReplicaSetController struct {
	...
	// Controllers that need to be synced
	queue workqueue.RateLimitingInterface
}
```
The queue is fed by the event handlers registered for replicaset and pod objects; let's go through them one by one.
1 rsc.addRS
This method is called when a new replicaset object is observed.
Main logic: call rsc.enqueueRS to add the object to the queue.
```go
// pkg/controller/replicaset/replica_set.go
func (rsc *ReplicaSetController) addRS(obj interface{}) {
	rs := obj.(*apps.ReplicaSet)
	klog.V(4).Infof("Adding %s %s/%s", rsc.Kind, rs.Namespace, rs.Name)
	rsc.enqueueRS(rs)
}
```
rsc.enqueueRS
Assemble the key and add it to the queue.
```go
func (rsc *ReplicaSetController) enqueueRS(rs *apps.ReplicaSet) {
	key, err := controller.KeyFunc(rs)
	if err != nil {
		utilruntime.HandleError(fmt.Errorf("couldn't get key for object %#v: %v", rs, err))
		return
	}
	rsc.queue.Add(key)
}
```
2 rsc.updateRS
This method is called when a replicaset object is updated.
Main logic:
(1) if the old and new replicaset objects have different uids, call rsc.deleteRS (analyzed below);
(2) call rsc.enqueueRS to assemble the key and add it to the queue.
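The two steps above can be sketched as follows. This is a simplified model with stand-in types written for this post, not the actual controller code (the real handler works with *apps.ReplicaSet and a rate-limited workqueue):

```go
package main

import "fmt"

// replicaSet and rsController are illustrative stand-ins only.
type replicaSet struct {
	UID       string
	Namespace string
	Name      string
}

type rsController struct {
	queue []string // stands in for the rate-limited workqueue
}

func (c *rsController) enqueueRS(rs *replicaSet) {
	c.queue = append(c.queue, rs.Namespace+"/"+rs.Name)
}

// deleteRS stands in for the real delete handler, which also clears the
// rs's expectations before enqueueing the key.
func (c *rsController) deleteRS(rs *replicaSet) {
	c.enqueueRS(rs)
}

// updateRS: a changed UID means the object was deleted and recreated under
// the same key, so the old object is handled as a delete first.
func (c *rsController) updateRS(old, cur *replicaSet) {
	if cur.UID != old.UID {
		c.deleteRS(old)
	}
	c.enqueueRS(cur)
}

func main() {
	c := &rsController{}
	c.updateRS(
		&replicaSet{UID: "uid-1", Namespace: "default", Name: "web"},
		&replicaSet{UID: "uid-2", Namespace: "default", Name: "web"},
	)
	fmt.Println(c.queue) // enqueued twice: once for the delete, once for the update
}
```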
3 rsc.deleteRS
This method is called when a replicaset object is deleted.
Main logic:
(1) call rsc.expectations.DeleteExpectations to delete the rs's expectations (the expectations mechanism is analyzed separately later; a rough impression is enough for now);
(2) assemble the key and put it into the queue.
4 rsc.addPod
This method is called when a new pod object is observed.
Main logic:
(1) if the pod's DeletionTimestamp is not nil, call rsc.deletePod (analyzed later) and return;
(2) call metav1.GetControllerOf to get the pod's OwnerReference and check whether the pod has an owning controller; if it does, call rsc.resolveControllerRef to look up the replicaset the pod belongs to, and return if it does not exist;
(3) call rsc.expectations.CreationObserved to decrement the rs's expected pod creation count by 1 (the expectations mechanism is analyzed separately later; a rough impression is enough for now);
(4) assemble the key and put it into the queue.
5 rsc.updatePod
This method is called when a pod object is updated.
Main logic:
(1) compare the ResourceVersion of the old and new pods; if they are identical, nothing changed, so return;
(2) if the pod's DeletionTimestamp is not nil, call rsc.deletePod (analyzed later) and return;
(3) …
6 rsc.deletePod
This method is called when a pod object is deleted.
Main logic:
(1) call metav1.GetControllerOf to get the pod's OwnerReference and check whether it is an owning controller; if so, call rsc.resolveControllerRef to look up the replicaset the pod belongs to, and return if it does not exist;
(2) call rsc.expectations.DeletionObserved to decrement the rs's expected pod deletion count by 1 (the expectations mechanism is analyzed separately later);
(3) assemble the key and put it into the queue.
Startup analysis
Entry point: Run
Run starts a number of goroutines equal to the workers value, each looping over rsc.worker, which takes a key off the queue and reconciles the corresponding replicaset object.
```go
// pkg/controller/replicaset/replica_set.go

// Run begins watching and syncing.
func (rsc *ReplicaSetController) Run(workers int, stopCh <-chan struct{}) {
	defer utilruntime.HandleCrash()
	defer rsc.queue.ShutDown()

	controllerName := strings.ToLower(rsc.Kind)
	klog.Infof("Starting %v controller", controllerName)
	defer klog.Infof("Shutting down %v controller", controllerName)

	if !controller.WaitForCacheSync(rsc.Kind, stopCh, rsc.podListerSynced, rsc.rsListerSynced) {
		return
	}

	for i := 0; i < workers; i++ {
		go wait.Until(rsc.worker, time.Second, stopCh)
	}

	<-stopCh
}
```
The workers parameter is passed in from startReplicaSetController as ctx.ComponentConfig.ReplicaSetController.ConcurrentRSSyncs. Its value is determined by the concurrent-replicaset-syncs startup flag of kube-controller-manager; when not configured it defaults to 5, meaning five goroutines run in parallel to process and reconcile the replicaset objects in the queue.
Let's now look at the replicaset-controller-related concurrent-replicaset-syncs startup flag of the kube-controller-manager component.
ReplicaSetControllerOptions
```go
// cmd/kube-controller-manager/app/options/replicasetcontroller.go

// ReplicaSetControllerOptions holds the ReplicaSetController options.
type ReplicaSetControllerOptions struct {
	*replicasetconfig.ReplicaSetControllerConfiguration
}

// AddFlags adds flags related to ReplicaSetController for controller manager to the specified FlagSet.
func (o *ReplicaSetControllerOptions) AddFlags(fs *pflag.FlagSet) {
	if o == nil {
		return
	}

	fs.Int32Var(&o.ConcurrentRSSyncs, "concurrent-replicaset-syncs", o.ConcurrentRSSyncs, "The number of replica sets that are allowed to sync concurrently. Larger number = more responsive replica management, but more CPU (and network) load")
}

// ApplyTo fills up ReplicaSetController config with options.
func (o *ReplicaSetControllerOptions) ApplyTo(cfg *replicasetconfig.ReplicaSetControllerConfiguration) error {
	if o == nil {
		return nil
	}

	cfg.ConcurrentRSSyncs = o.ConcurrentRSSyncs

	return nil
}
```
Default value
The concurrent-replicaset-syncs flag defaults to 5.
```go
// pkg/controller/apis/config/v1alpha1/register.go
func init() {
	// We only register manually written functions here. The registration of the
	// generated functions takes place in the generated files. The separation
	// makes the code compile even when the generated files are missing.
	localSchemeBuilder.Register(addDefaultingFuncs)
}

// pkg/controller/apis/config/v1alpha1/defaults.go
func addDefaultingFuncs(scheme *kruntime.Scheme) error {
	return RegisterDefaults(scheme)
}

// pkg/controller/apis/config/v1alpha1/zz_generated.defaults.go
func RegisterDefaults(scheme *runtime.Scheme) error {
	scheme.AddTypeDefaultingFunc(&v1alpha1.KubeControllerManagerConfiguration{}, func(obj interface{}) {
		SetObjectDefaults_KubeControllerManagerConfiguration(obj.(*v1alpha1.KubeControllerManagerConfiguration))
	})
	return nil
}

func SetObjectDefaults_KubeControllerManagerConfiguration(in *v1alpha1.KubeControllerManagerConfiguration) {
	SetDefaults_KubeControllerManagerConfiguration(in)
	SetDefaults_KubeCloudSharedConfiguration(&in.KubeCloudShared)
}

// pkg/controller/apis/config/v1alpha1/defaults.go
func SetDefaults_KubeControllerManagerConfiguration(obj *kubectrlmgrconfigv1alpha1.KubeControllerManagerConfiguration) {
	...
	// Use the default RecommendedDefaultReplicaSetControllerConfiguration options
	replicasetconfigv1alpha1.RecommendedDefaultReplicaSetControllerConfiguration(&obj.ReplicaSetController)
	...
}

// pkg/controller/replicaset/config/v1alpha1/defaults.go
func RecommendedDefaultReplicaSetControllerConfiguration(obj *kubectrlmgrconfigv1alpha1.ReplicaSetControllerConfiguration) {
	if obj.ConcurrentRSSyncs == 0 {
		obj.ConcurrentRSSyncs = 5
	}
}
```
Having looked at the startup flag, let's turn to the core processing method invoked after startup.
1 rsc.worker
As mentioned earlier, the Run method of the replicaset controller starts workers goroutines, each looping over rsc.worker, which takes a key off the queue and reconciles the corresponding replicaset object.
Main logic of rsc.worker:
(1) get a key from the queue;
(2) call rsc.syncHandler to process the key;
(3) mark the key done, removing it from the queue.
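The three steps map onto the worker's processing loop. Below is a self-contained sketch with a toy queue written for this post; the real controller uses the rate-limited workqueue, whose Done call marks in-flight work finished rather than literally removing the key:

```go
package main

import "fmt"

// toyQueue models just the Get/Done cycle of the real workqueue.
type toyQueue struct {
	items []string
}

// Get returns the next key; the second result mirrors the real queue's
// shutdown signal (here simply "empty").
func (q *toyQueue) Get() (string, bool) {
	if len(q.items) == 0 {
		return "", true
	}
	key := q.items[0]
	q.items = q.items[1:]
	return key, false
}

// Done marks the key's processing finished; a no-op in this toy model.
func (q *toyQueue) Done(key string) {}

// processNextWorkItem mirrors the three steps: get a key, hand it to the
// sync handler, mark it done.
func processNextWorkItem(q *toyQueue, syncHandler func(key string) error) bool {
	key, quit := q.Get()
	if quit {
		return false
	}
	defer q.Done(key)
	if err := syncHandler(key); err != nil {
		// the real worker re-queues the key with rate limiting on error
		fmt.Println("sync failed:", err)
	}
	return true
}

func main() {
	q := &toyQueue{items: []string{"default/web"}}
	for processNextWorkItem(q, func(key string) error {
		fmt.Println("syncing", key)
		return nil
	}) {
	}
}
```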
1.1 rsc.syncHandler
Calling rsc.syncHandler actually calls rsc.syncReplicaSet: rsc.syncHandler is assigned rsc.syncReplicaSet in NewBaseController. It will be covered in the later analysis of the core processing logic, so we don't go deeper here.
```go
// NewBaseController is the implementation of NewReplicaSetController with additional injected
// parameters so that it can also serve as the implementation of NewReplicationController.
func NewBaseController(rsInformer appsinformers.ReplicaSetInformer, podInformer coreinformers.PodInformer, kubeClient clientset.Interface, burstReplicas int,
	gvk schema.GroupVersionKind, metricOwnerName, queueName string, podControl controller.PodControlInterface) *ReplicaSetController {
	...
	rsc.syncHandler = rsc.syncReplicaSet
	return rsc
}
```
Summary
The replicaset controller is one of the many controllers in kube-controller-manager and is the controller for the replicaset resource object. By watching replicaset and pod resources, it reconciles the corresponding replicaset object whenever either changes, creating pods when the actual pod count falls short of the desired replica count and deleting pods when it exceeds it.
This post analyzed the initialization and startup of the replicaset controller, including the event handlers it registers for pod and replicaset objects, how the controller is started, and which method is registered as its core processing logic.
replicaset controller architecture
As a recap, the rough composition and processing flow of the replicaset controller are shown in the diagram below: the controller registers event handlers for pod and replicaset objects; when an event is watched, the corresponding replicaset object is put into the queue, and the syncReplicaSet method, the controller's core reconciliation logic, takes replicaset objects out of the queue and reconciles them.
The next two posts will analyze the replicaset controller's core processing logic and its expectations mechanism in turn; stay tuned.