Istio Pilot 源码分析(一)
張海東,??多點生活(成都)云原生開發工程師。
Istio?作為目前 Servic Mesh 方案中的翹楚,吸引著越來越多的企業及開發者。越來越多的團隊想將其應用于微服務的治理,但在實際落地時卻因為不了解?Istio?黑盒中的運行機制而左右為難,本文將基于 1.7 的源碼講解?Istio?的核心組件?Pilot?的結構及運行流程,希望對讀者應用?Istio?有所助益。
注:本文基于?istio release-1.7?分支分析,其他版本的代碼結構會有所不同。
背景
隨著?Istio?1.7 的發布,內部組件精簡后的?istiod?日趨穩定,越來越多的公司將其應用到自身微服務的流量治理、安全通信及監測中。多點也不例外,應用?Istio?來落地業務系統所有?Dubbo?服務的網格化,下沉?SDK?邏輯,解決基礎中間件與業務系統過于耦合等痛點。目前,我們是通過自己開發的?Controller?組件對接?Zookeeper?等注冊中心,將注冊到?Zookeeper?的節點實時轉化為?ServiceEntry?及?WorkloadEntry?等?Istio?配置類型寫入?kube-apiserver,再由?Pilot?轉化為?xDS?協議下發至數據面,同時對集群、虛擬機中的服務進行治理。隨著公司服務網格化的逐步落地,對?Istio?及數據面組件源碼級掌握的訴求越來越高,沒有足夠的深度及廣度很難解決開發過程中遇到的難題,讓我們一起揭開?Istio?神秘的面紗,看看黑箱內部是如何運作的。
本文作為?Istio?控制面組件?Pilot?的源碼分析系列,主要面向剛接觸?Istio?或僅停留在使用?Istio?基本配置類型(如?VirtualService、DestinationRule?等)的同學,需要熟悉?Istio?的一些?基礎概念及名詞[1]?。文章會涉及較多的代碼細節,我們會以不同的篇幅分別介紹以下內容:
1.pilot-discovery?宏觀架構及啟動流程梳理2.pilot-discovery?接口設計及關鍵接口分析3.pilot-discovery xDS?生成及下發流程梳理4.pilot-agent?流程梳理5.pilot?中的身份認證及安全通信解析
相信通過源碼一步一步分析,能消除讀者對?Pilot?的陌生感,在基于?Pilot?做適配開發時會更加清楚的了解其底層運行邏輯,碰到問題時也能更好的定位。
Pilot?的代碼主要分為兩部分:
?pilot-discovery?pilot-agent
其中?pilot-agent?負責數據面?Sidecar?實例的生命周期管理,而?pilot-discovery?負責控制面流量管理配置及路由規則的生成和下發。
宏觀架構
pilot-discovery?的核心組件如圖:
pilot-discovery-struct其中?Server?為?pilot-discovery?的主服務,包含了三個比較重要的組件:
?Config Controller:從不同來源接收流量控制和路由規則等?Istio?的配置,并響應各類事件。?Service Controller:從不同注冊中心同步服務及實例,并響應各類事件。?EnvoyXdsServer:核心的?xDS?協議推送服務,根據上面組件的數據生成?xDS?協議并下發。
Config Controller?比較核心的就是對接?Kubernetes,從?kube-apiserver?中?Watch?集群中的?VirtualService、ServiceEntry、DestinationRules?等配置信息,有變化則生成?PushRequest?推送至?EnvoyXdsServer?中的推送隊列。除此之外,還支持對接?MCP(Mesh Configuration Protocol)?協議的?gRPC Server,如?Nacos?的?MCP?服務等,只需要在?meshconfig?中配置?configSources?即可。最后一種是基于內存的?Config Controller?實現,通過?Watch?一個文件目錄,加載目錄中的?yaml?文件生成配置數據,主要用來測試。
Service Controller?目前原生支持?Kubernetes?和?Consul,注冊在這些注冊中心中的服務可以無痛接入?Mesh,另外一種比較特殊,就是?ServiceEntryStore,它本質是儲存在?Config Controller?中的?Istio?配置數據,但它描述的卻是集群外部的服務信息,詳情可閱讀文檔?ServiceEntry[2],Istio?通過它將集群外部,如部署在虛擬機中的服務、非?Kubernetes?的原生服務同步到?Istio?中,納入網格統一進行流量控制和路由,所以?ServiceEntryStore?也可以視為一種注冊中心。還有一種就是?Mock Service Registry,主要用來測試。
ServiceEntryStore?從?Config Controller?到?Service Controller?的轉化流程大致如圖(后續會做詳細的代碼分析,這里簡單了解一下即可):
pilot-discovery-serviceentrystoreConfigStores?是一個列表,里面存儲了各類?Istio?配置文件,包括?ServiceEntry?、WorkloadEntry?等服務數據,也包括?VirtualService、DestinationRules、Sidecar?等流量控制、路由規則的配置數據,pilot-discovery?將這些?ConfigStores?聚合成一個?configController?統一進行管理,之后再從其中衍生出?IstioConfigStore,將其作為?serviceEntryStore?的配置源。serviceEntryStore?其實就是?ServiceEntry Controller,響應?ServiceEntry?和?WorkloadEntry?這類服務信息的變化。
EnvoyXdsServer?比較核心,一切與?xDS?協議相關的接收、轉換、下發操作都由它完成。EnvoyXdsServer?對接所有集群中的邊車代理,如?Envoy、MOSN?等,當配置或服務發生變化時主動推送,也會響應代理發送的請求,依據請求的信息下發相應的?xDS?配置。
理解了這三個核心組件的定義,就能比較好的理解下面分析的各類流程了。
pilot-discovery?的整個業務流程梳理如下,可以先大概瀏覽一遍,之后我們逐一進行分析:
pilot-discovery-sequence-all啟動流程梳理
首先詳細看一下?pilot-discovery?的啟動流程。pilot-discovery?組件的入口代碼在?istio/pilot/cmd/pilot-discovery?中。該目錄中包含兩個文件:?main.go?和?request.go。main.go?中定義了?pilot-discovery?根命令及?discovery?命令,是啟動服務發現及配置下發的主流程; 另一個文件?request.go?中定義了?request?命令,用來請求?Pilot?中的?metrics/debug?接口,多用來調試。
main.go?中?discoveryCmd的?RunE?函數定義了啟動過程,代碼如下:
// 創建一個接收空結構的 stop channel 用來停止所有 servers stop := make(chan struct{}) // 創建服務發現的 Server discoveryServer, err := bootstrap.NewServer(serverArgs) if err != nil {return fmt.Errorf("failed to create discovery service: %v", err) } // 運行 Server 中注冊的所有服務 if err := discoveryServer.Start(stop); err != nil {return fmt.Errorf("failed to start discovery service: %v", err) } // 等待 SIGINT 和 SIGTERM 信號并關閉 stop channel cmd.WaitSignal(stop)啟動流程如圖所示:
pilot-discovery-init初始化流程
接下來介紹?discoveryServer?,即?pilot-discovery?組件的核心。在這之前先看下?Server?的結構,代碼位于?istio/pilot/pkg/bootstrap/server.go?文件中。
Server?的關鍵字段如下:
type Server struct {XDSServer *xds.DiscoveryServer // Xds 服務environment *model.Environment // Pilot 環境所需的 API 集合kubeRegistry *kubecontroller.Controller // 處理 Kubernetes 主集群的注冊中心multicluster *kubecontroller.Multicluster // 處理 Kubernetes 多個集群的注冊中心configController model.ConfigStoreCache // 統一處理配置數據(如 VirtualService 等) 的 ControllerConfigStores []model.ConfigStoreCache // 不同配置信息的緩存器,提供 Get、List、Create 等方法serviceEntryStore *serviceentry.ServiceEntryStore // 單獨處理 ServiceEntry 的 ControllerfileWatcher filewatcher.FileWatcher // 文件監聽器,主要 watch meshconfig 和 networks 配置文件等startFuncs []startFunc // 保存了上述所有服務的啟動函數,便于在 Start() 方法中批量啟動及管理 }再看?NewServer()?方法中的內容,有以下幾個關鍵步驟:
image.png我們對每個步驟逐一進行分析:
初始化?Environment
什么是?Environment?呢?根據定義?Environment?為?Pilot?提供了一個匯總的、運行中所需的 API 集合。?Environment?中字段(接口)如下:
type Environment struct {ServiceDiscovery // 服務發現的接口模型,主要列出 services 和 instancesIstioConfigStore // Istio 配置文件的存儲器,主要列出 ServiceEntry 等配置mesh.Watcher // mesh config 文件的監聽器mesh.NetworksWatcher // mesh network config 文件的監聽器PushContext *PushContext // 在推送(下發 xDS)生成期間保存信息的上下文DomainSuffix string // istio server 默認的后綴域名 }其中?PushContext?是?Pilot?在推送?xDS?前,生成配置期間保存相關信息的上下文的地方,在全量推送配置和配置發生改變時重置。它會保存所有的錯誤和統計信息,并緩存一些配置的計算信息。?ServiceDiscovery?提供了枚舉?Istio?中服務和實例的方法。?mesh.Watcher?和?mesh.NetworksWatcher?負責監聽?istiod?啟動時掛載的兩個配置文件,這兩個配置文件是通過?configmap?映射到?Pod?的文件系統中的,監聽器將在監聽到配置文件變化時運行預先注冊的?Handler?。文件掛載參考?istiod?的配置文件:
apiVersion: v1 kind: Pod metadata:name: istiod-56c488887d-z9k5cnamespace: istio-system spec:containers:volumeMounts:- mountPath: /etc/istio/configname: config-volumevolumes:- configMap:defaultMode: 420name: istioname: config-volume相應的配置存儲在?istio-system/istio?這個?configmap?中,里面保存了?mesh?和?meshNetworks?兩種配置,樣例如下:
apiVersion: v1 kind: ConfigMap metadata:name: istionamespace: istio-system data:mesh: |-accessLogEncoding: TEXTaccessLogFile: ""accessLogFormat: ""defaultConfig:binaryPath: /usr/local/bin/mosnconcurrency: 2configPath: ./etc/istio/proxy...meshNetworks: 'networks: {}'再回頭看?Environment?的初始化:
e := &model.Environment{PushContext: model.NewPushContext(),DomainSuffix: args.RegistryOptions.KubeOptions.DomainSuffix, } ac := aggregate.NewController(aggregate.Options{MeshHolder: e, }) e.ServiceDiscovery = ac首先是初始化了一份?PushContext?,創建?PushContext?所需的各種列表和?Map?。其次是初始化了一個聚合所有注冊中心的?Controller?作為?Environment?中的?ServiceDiscovery?。該?Controller?提供從所有注冊中心(如?Kubernetes, Consul, MCP?等)獲取服務和實例列表的方法。這里傳入了一個參數?MeshHolder?是想利用?Environment?中的?mesh.Watcher?將?mesh?這個配置同步過去。
初始化?Server
Server?的結構之前分析過,這里將之前初始化的?Environment?傳入后,開始初始化?XDSServer?。
s := &Server{clusterID: getClusterID(args),environment: e,XDSServer: xds.NewDiscoveryServer(e, args.Plugins), // 初始化 XDSServerfileWatcher: filewatcher.NewWatcher(),httpMux: http.NewServeMux(),monitoringMux: http.NewServeMux(),readinessProbes: make(map[string]readinessProbe), }XDSServer?相關的代碼在?istio/pilot/pkg/xds/discovery.go?中,對應為?DiscoveryServer?,該服務為?Envoy xDS APIs的?gRPC?實現。?DiscoveryServer?關鍵定義如下:
type DiscoveryServer struct {Env *model.Environment // 即上述 pilot server 中的 EnvironmentConfigGenerator core.ConfigGenerator // 控制面 Istio 配置的生成器,如 VirtualService 等Generators map[string]model.XdsResourceGenerator // 針對不同配置類型的定制化生成器concurrentPushLimit chan struct{}// 不同服務所有實例的集合,增量更新,key 為 service 和 namespace// EndpointShards 中是以不同的注冊中心名為 key 分組保存實例EndpointShardsByService map[string]map[string]*EndpointShards pushChannel chan *model.PushRequest // 接收 push 請求的 channelpushQueue *PushQueue // 防抖之后,真正 Push xDS 之前所用的緩沖隊列adsClients map[string]*Connection // ADS 和 EDS 的 gRPC 連接StatusReporter DistributionStatusCache // 監聽 xDS ACK 和連接斷開// xDS 狀態更新的生成器(更新 connect, disconnect, nacks, acks)// 狀態更新后向所有 connection 推送 DiscoveryResponseInternalGen *InternalGen serverReady bool // 表示緩存已同步,server 可以接受請求debounceOptions debounceOptions // 防抖設置cache Cache // xDS 資源的緩存,目前僅適用于 EDS,線程安全 }初始化?MeshConfig?、?KubeClient?、?MeshNetworks?和?MeshHandlers
s.initMeshConfiguration(args, s.fileWatcher) if err := s.initKubeClient(args); err != nil {return nil, fmt.Errorf("error initializing kube client: %v", err) } s.initMeshNetworks(args, s.fileWatcher) s.initMeshHandlers()這幾個初始化函數比較好理解,?initMeshConfiguration?和?initMeshNetworks?都是通過?fileWatcher?對?istiod?從?configmap?中掛載的兩個配置文件?mesh?和?meshNetworks?進行監聽。當配置文件發生變化時重載配置并觸發相應的?Handlers?。
filewatcher?的代碼在另一個管理通用工具包的項目里:?github.com/istio/pkg/filewatcher?,感興趣的同學可以再詳細研究下,底層使用到了?fsnotify[3]?這個庫來推送文件變化事件。
initMeshHandlers?為上述兩個配置文件注冊了兩個?Handler?,當配置文件發生變化時觸發全量?xDS?下發。
初始化?Controllers
這部分比較核心,初始化了三種控制器分別處理證書、配置信息和注冊信息,證書及安全相關的內容本篇先暫不討論。主要來看?initConfigController?和?initServiceControllers?。
func (s *Server) initControllers(args *PilotArgs) error {log.Info("initializing controllers")if err := s.initCertController(args); err != nil {return fmt.Errorf("error initializing certificate controller: %v", err)}if err := s.initConfigController(args); err != nil {return fmt.Errorf("error initializing config controller: %v", err)}if err := s.initServiceControllers(args); err != nil {return fmt.Errorf("error initializing service controllers: %v", err)}return nil }配置信息大都是?Istio?定義的一系列?CRD(如?VirtualService?、?DestinationRules?等),一個控制面可以通過?MCP?同時接入多個?Kubernetes?之外的配置數據源,也可通過文件目錄(主要用來調試)掛載,默認是讀取 Kubernetes 中的配置數據:
func (s *Server) initK8SConfigStore(args *PilotArgs) error {configController, err := s.makeKubeConfigController(args)...s.initStatusController(args, features.EnableStatus) // 初始化上面提到的 StatusReporterreturn nil }配置數據包括以下類型,具體每個類型的含義?Istio?官網都有介紹及用例,這里不再贅述:
// PilotServiceApi contains only collections used by Pilot, including experimental Service Api. PilotServiceApi = collection.NewSchemasBuilder().MustAdd(IstioNetworkingV1Alpha3Destinationrules).MustAdd(IstioNetworkingV1Alpha3Envoyfilters).MustAdd(IstioNetworkingV1Alpha3Gateways).MustAdd(IstioNetworkingV1Alpha3Serviceentries).MustAdd(IstioNetworkingV1Alpha3Sidecars).MustAdd(IstioNetworkingV1Alpha3Virtualservices).MustAdd(IstioNetworkingV1Alpha3Workloadentries).MustAdd(IstioNetworkingV1Alpha3Workloadgroups).MustAdd(IstioSecurityV1Beta1Authorizationpolicies).MustAdd(IstioSecurityV1Beta1Peerauthentications).MustAdd(IstioSecurityV1Beta1Requestauthentications).MustAdd(K8SServiceApisV1Alpha1Gatewayclasses).MustAdd(K8SServiceApisV1Alpha1Gateways).MustAdd(K8SServiceApisV1Alpha1Httproutes).MustAdd(K8SServiceApisV1Alpha1Tcproutes).Build()詳細看下?initK8SConfigStore?中的?makeKubeConfigController?方法,這里初始化了一個處理?Istio CRDs?的?Client?,實現?ConfigStoreCache?這個接口中增刪改查等方法。
func (s *Server) makeKubeConfigController(args *PilotArgs) (model.ConfigStoreCache, error) {c, err := crdclient.New(s.kubeClient, buildLedger(args.RegistryOptions), args.Revision, args.RegistryOptions.KubeOptions)if err != nil {return nil, err}return c, nil }Client?定義如下:
type Client struct {schemas collection.Schemas // Istio CRDs shemasdomainSuffix stringconfigLedger ledger.Ledgerrevision stringkinds map[resource.GroupVersionKind]*cacheHandler // 跟蹤已知類型的所有緩存 handlerqueue queue.InstanceistioClient istioclient.InterfaceserviceApisClient serviceapisclient.Interface }再依次對這些類型創建?Informer?開啟監聽。回到?initConfigController?,創建好?ConfigStore?之后,再對其進一步包裝:
// 將所有 ConfigStore 聚合并緩存 aggregateConfigController, err := configaggregate.MakeCache(s.ConfigStores) // 通過 s.configController 統一操作上面聚合的 ConfigStores s.configController = aggregateConfigController // 將其包裝為 IstioConfigStore 傳入 environment,便于操作 ServiceEntry/Gateway 等資源 // IstioConfigStore 會在之后的 ServiceEntryStore 中用到 s.environment.IstioConfigStore = model.MakeIstioStore(s.configController)最后將該?Controller?的啟動函數注冊到?startFuncs?中:
s.addStartFunc(func(stop <-chan struct{}) error {go s.configController.Run(stop)return nil })再來看?initServiceControllers?處理服務發現的?Controller?初始化:
func (s *Server) initServiceControllers(args *PilotArgs) error {serviceControllers := s.ServiceController()for _, r := range args.RegistryOptions.Registries {// ...switch serviceRegistry {case serviceregistry.Kubernetes:if err := s.initKubeRegistry(serviceControllers, args); err != nil {return err}// ...}// ... }從之前初始化的?environment.ServiceDiscovery?中獲取已注冊的服務中心,如果是?Kubernetes?則執行?initKubeRegistry:
// initKubeRegistry creates all the k8s service controllers under this pilot func (s *Server) initKubeRegistry(serviceControllers *aggregate.Controller, args *PilotArgs) (err error) {// ...log.Infof("Initializing Kubernetes service registry %q", args.RegistryOptions.KubeOptions.ClusterID)kubeRegistry := kubecontroller.NewController(s.kubeClient, args.RegistryOptions.KubeOptions)s.kubeRegistry = kubeRegistryserviceControllers.AddRegistry(kubeRegistry)return }進一步初始化?Kubernetes?注冊中心,方法為?NewController?,先看一下這個?Controller?的結構:
type Controller struct {client kubernetes.Interfacequeue queue.InstanceserviceInformer cache.SharedIndexInformerserviceLister listerv1.ServiceListerendpoints kubeEndpointsControllernodeInformer cache.SharedIndexInformernodeLister listerv1.NodeListerpods *PodCachemetrics model.MetricsnetworksWatcher mesh.NetworksWatcherxdsUpdater model.XDSUpdaterdomainSuffix stringclusterID stringserviceHandlers []func(*model.Service, model.Event)instanceHandlers []func(*model.ServiceInstance, model.Event)workloadHandlers []func(*model.WorkloadInstance, model.Event)sync.RWMutexservicesMap map[host.Name]*model.ServicenodeSelectorsForServices map[host.Name]labels.InstancenodeInfoMap map[string]kubernetesNodeexternalNameSvcInstanceMap map[host.Name][]*model.ServiceInstanceworkloadInstancesByIP map[string]*model.WorkloadInstanceranger cidranger.RangernetworkForRegistry stringonce sync.Once }可以看到?Controller?對?Services?、?Nodes?、?Pods?等資源各自初始化了?Informer?、 Lister 以及對應的 Map,各類 Handlers 在 Informer 監聽到增刪改查時推送相應的事件到 queue ,再由?onServiceEvent?、?onNodeEvent?、?c.pods.onEvent?中更新對應的 Map 。
回到?initServiceControllers?,初始化完 Kubernetes 注冊中心之后,還需要關注 Kubernetes 集群之外的服務,這些服務基本都是通過?ServiceEntry?注冊到控制面的,所有?ServiceEntry?配置數據目前還都在之前初始化的?configController?配置中心控制器中,這里將?ServiceEntry?數據單獨拎出來初始化一個?ServicEntry?注冊中心,加入到?serviceControllers?中:
s.serviceEntryStore = serviceentry.NewServiceDiscovery(s.configController, s.environment.IstioConfigStore, s.XDSServer) serviceControllers.AddRegistry(s.serviceEntryStore)serviceEntryStore?相關的邏輯會在后續 xDS 下發流程的分析中再闡述。
最后將?serviceControllers?中所有的服務注冊中心的?Controller?的啟動函數都注冊到?startFuncs?中:
s.addStartFunc(func(stop <-chan struct{}) error {go serviceControllers.Run(stop)return nil }) // Run starts all the controllers func (c *Controller) Run(stop <-chan struct{}) {for _, r := range c.GetRegistries() {go r.Run(stop)}<-stoplog.Info("Registry Aggregator terminated") }初始化?RegistryEventHandlers
initRegistryEventHandlers?設置了三個事件處理器?serviceHandler?、?instanceHandler?和?configHandler?分別響應服務、實例和配置數據的更新事件。
serviceHandler?如下:
serviceHandler := func(svc *model.Service, _ model.Event) {pushReq := &model.PushRequest{Full: true,ConfigsUpdated: map[model.ConfigKey]struct{}{{Kind: gvk.ServiceEntry,Name: string(svc.Hostname),Namespace: svc.Attributes.Namespace,}: {}},Reason: []model.TriggerReason{model.ServiceUpdate},}s.XDSServer.ConfigUpdate(pushReq) } if err := s.ServiceController().AppendServiceHandler(serviceHandler); err != nil {return fmt.Errorf("append service handler failed: %v", err) }可以看到當服務本身發生變化時,會觸發?xDS?的全量下發,所有與該服務相關的代理都會收到推送。
實例的變動也會觸發?xDS?的全量下發,不過僅在連接?Consul?時生效。Kubernetes?和?MCP?這兩種服務發現的場景下,更新事件的?Handler?是在別的地方注冊的。
instanceHandler := func(si *model.ServiceInstance, _ model.Event) {// TODO: This is an incomplete code. This code path is called for consul, etc.// In all cases, this is simply an instance update and not a config update. So, we need to update// EDS in all proxies, and do a full config push for the instance that just changed (add/update only).s.EnvoyXdsServer.ConfigUpdate(&model.PushRequest{Full: true,ConfigsUpdated: map[model.ConfigKey]struct{}{{Kind: gvk.ServiceEntry,Name: string(si.Service.Hostname),Namespace: si.Service.Attributes.Namespace,}: {}},Reason: []model.TriggerReason{model.ServiceUpdate},}) } // 跳過 Kubernetes 和 MCP for _, registry := range s.ServiceController().GetRegistries() {// Skip kubernetes and external registries as they are handled separatelyif registry.Provider() == serviceregistry.Kubernetes ||registry.Provider() == serviceregistry.External {continue}if err := registry.AppendInstanceHandler(instanceHandler); err != nil {return fmt.Errorf("append instance handler to registry %s failed: %v", registry.Provider(), err)} }上一步初始化了?configController?,它操作的對象主要是像?VirtualService?、?DestinationRules?這些?Istio?定義的配置,這些配置的變化也會觸發?xDS?的全量下發,所有與該配置相關的代理都會收到推送。不過?ServiceEntry?和?WorkloadEntry?除外,這兩個資源的配置下發是由?ServiceEntryStore?管理的,之前在初始化?ServiceController?時定義的?s.serviceEntryStore?會處理,之后的篇幅再做詳細介紹。
configHandler := func(_, curr model.Config, event model.Event) {pushReq := &model.PushRequest{Full: true,ConfigsUpdated: map[model.ConfigKey]struct{}{{Kind: curr.GroupVersionKind,Name: curr.Name,Namespace: curr.Namespace,}: {}},Reason: []model.TriggerReason{model.ConfigUpdate},}s.EnvoyXdsServer.ConfigUpdate(pushReq) }下面是跳過?ServiceEntry?和?WorkloadEntry?的代碼:
for _, schema := range schemas {// This resource type was handled in external/servicediscovery.go, no need to rehandle here.if schema.Resource().GroupVersionKind() == collections.IstioNetworkingV1Alpha3Serviceentries.Resource().GroupVersionKind() {continue}if schema.Resource().GroupVersionKind() == collections.IstioNetworkingV1Alpha3Workloadentries.Resource().GroupVersionKind() {continue}s.configController.RegisterEventHandler(schema.Resource().GroupVersionKind(), configHandler) }初始化?DiscoveryService
func (s *Server) initDiscoveryService(args *PilotArgs) error {log.Infof("starting discovery service")// Implement EnvoyXdsServer grace shutdowns.addStartFunc(func(stop <-chan struct{}) error {s.EnvoyXdsServer.Start(stop)return nil})s.initGrpcServer(args.KeepaliveOptions)grpcListener, err := net.Listen("tcp", args.ServerOptions.GRPCAddr)if err != nil {return err}s.GRPCListener = grpcListenerreturn nil }這里將?EnvoyXdsServer?的啟動添加至?startFuncs?中,便于后續統一啟動。并初始化?gRPC?服務器,監聽對應的端口。
初始化?gRPC?服務器,并注冊?xDS V2?和?xDS V3?的?ADS?服務到?gRPC?服務器上:
func (s *Server) initGrpcServer(options *istiokeepalive.Options) {grpcOptions := s.grpcServerOptions(options)s.grpcServer = grpc.NewServer(grpcOptions...)s.EnvoyXdsServer.Register(s.grpcServer)reflection.Register(s.grpcServer) } func (s *DiscoveryServer) Register(rpcs *grpc.Server) {// Register v2 and v3 serversdiscovery.RegisterAggregatedDiscoveryServiceServer(rpcs, s)discoveryv2.RegisterAggregatedDiscoveryServiceServer(rpcs, s.createV2Adapter()) }可以看到?ADS?的?gRPC?服務包含兩個流式方法,一個是全量推送,一個是增量推送。
var _AggregatedDiscoveryService_serviceDesc = grpc.ServiceDesc{ServiceName: "envoy.service.discovery.v3.AggregatedDiscoveryService",HandlerType: (*AggregatedDiscoveryServiceServer)(nil),Methods: []grpc.MethodDesc{},Streams: []grpc.StreamDesc{{StreamName: "StreamAggregatedResources",Handler: _AggregatedDiscoveryService_StreamAggregatedResources_Handler,ServerStreams: true,ClientStreams: true,},{StreamName: "DeltaAggregatedResources",Handler: _AggregatedDiscoveryService_DeltaAggregatedResources_Handler,ServerStreams: true,ClientStreams: true,},},Metadata: "envoy/service/discovery/v3/ads.proto", }注冊?kubeClient.RunAndWait
將?kubeClient.RunAndWait?方法注冊至?startFuncs?中,?RunAndWait?啟動后所有?Informer?將開始緩存,并等待它們同步完成。之所以在最后運行,可以保證所有的?Informer?都已經注冊。
if s.kubeClient != nil {s.addStartFunc(func(stop <-chan struct{}) error {s.kubeClient.RunAndWait(stop)return nil}) }啟動過程
啟動流程比較簡單,核心是依次啟動初始化過程中注冊到?startFuncs?中的啟動函數:
for _, fn := range s.startFuncs {if err := fn(stop); err != nil {return err} }然后調用?waitForCache?等待需要監聽資源的?Informer?緩存完畢,完成后開啟?HTTP?服務響應?readiness?事件。
至此?pilot-discovery?的啟動流程就結束了,有了大概了解后,可以大致歸納出整個?Pilot?的接口架構。
接口設計
在接口設計方面,Pilot?主要有兩類接口:一種是?Store?類接口,定義對資源的增刪改查等方法;另一種是?Controller?類接口,定義了?RegisterEventHandler?和?Run?方法。
Store?類接口主要指?ConfigStore?接口,以及它衍生出的?IstioConfigStore,后者操作的對象為?Istio?定義的配置類型,如?VirtualService、ServiceEntry?等。
而?Controller?類接口指基于?ConfigStore?定義的?ConfigStoreCache?接口,這個接口在哪里用到了呢?之前討論初始化流程的時候,分析過?Pilot?的?Server?的結構,其中用到該接口的有如下幾個字段:
type Server struct {configController model.ConfigStoreCacheConfigStores []model.ConfigStoreCacheserviceEntryStore *serviceentry.ServiceEntryStore } type ServiceEntryStore struct {store model.IstioConfigStore }可以看到?ConfigStores?是存儲所有配置類數據的?Controller?的地方,ConfigStores?都是在哪里添加的呢?之前分析?initConfigController?方法中提到過,可以再對照代碼看一下調用的地方:
image.png都添加完畢后,會把這些?ConfigStoreCache?都聚合到?Server.configController?中統一處理。
// Wrap the config controller with a cache.aggregateConfigController, err := configaggregate.MakeCache(s.ConfigStores)if err != nil {return err}s.configController = aggregateConfigController而?ServiceEntryStore?中用到的?IstioConfigStore?也是在這里得到的:
s.environment.IstioConfigStore = model.MakeIstioStore(s.configController)以上,當服務啟動后,會逐個調用這些?ConfigStoreCache?中的?Run?方法處理資源的增刪改事件。
總結
pilot-discovery?的啟動流程初看是比較復雜,但理清楚中間核心的步驟后結構也比較清晰。有了本篇的介紹,之后再走讀幾遍代碼,相信就能很好的掌握?pilot-discovery?初始化的流程。
Pilot?源碼分析的第一部分就到這里,后續會針對重要的組件和接口做更細致的分析,如?EnvoyXdsServer?、ServiceEntryStore?等,以及梳理?xDS?協議的生成和下發流程,會比?pilot-discovery?的啟動流程復雜的多,敬請期待。
參考
?Istio Pilot 代碼深度解析 - 趙化冰[4]
引用鏈接
[1]?基礎概念及名詞:?https://istio.io/latest/zh/docs/concepts/traffic-management/
[2]?ServiceEntry:?https://istio.io/latest/docs/reference/config/networking/service-entry/
[3]?fsnotify:?https://github.com/fsnotify/fsnotify
[4]?Istio Pilot 代碼深度解析 - 趙化冰:?https://zhaohuabing.com/post/2019-10-21-pilot-discovery-code-analysis/
總結
以上是生活随笔為你收集整理的Istio Pilot 源码分析(一)的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: ASP.NET Core 3.x控制IH
- 下一篇: 解决 WPF 绑定集合后数据变动界面却不