Volcano 原理、源码分析(二)
- 0. 總結前置
- 1. 概述
-
2. 尋找調度器中的 PodGroup
- 2.1 從 PodGroup 到 JobInfo 的封裝
- 2.2 從 Pod 到 TaskInfo 的封裝
-
3. 控制器中 PodGroup 和 Pod 的創建邏輯
- 3.1 從 main 開始尋找 SyncJob 的蹤跡
-
3.2 SyncJob 過程如何創建 PodGroup 和 Pod
- 3.2.1 創建 PodGroup
- 3.2.2 創建 Pods
- 4. 總結
- 5. 最后
0. 總結前置
你也可以選擇直接跳到1. 概述開始閱讀。
今天我們先順著 Volcano Scheduler 部分的代碼找到了
PodGroup的處理邏輯,看到了 Scheduler 拿到 PodGroup 后會組裝 JobInfo 對象;拿到 Pod 后會組裝 TaskInfo 對象(這里根據 Pod 的注解中指定的 PodGroup 名字來將 TaskInfo 和 JobInfo 關聯,也就是 Pod 和 PodGroup 的關聯。接著我們又從 Volcano Controller(Job Controller)中找到了
PodGroup和Pod的創建邏輯。在 Job 對象創建后,控制器會根據 Job 的信息創建一個唯一對應的 PodGroup,然后根據 Job 中的 Tasks 信息創建一系列的 pods,這些 pods 會帶上 PodGroup 名字(在注解里)至此,我們知道了 Volcano 中調度器和控制器的職責分層,進一步也就能夠理解 Volcano 如何和 kubeflow 等其他框架結合完成復雜任務的批調度過程了。(上層框架創建 PodGroup 和 Pods,Volcano 根據 PodGroup 信息和 Pods 注解信息完成批調度過程。
1. 概述
話接上回,在《Volcano 原理、源碼分析(一)》中我們聊到了 Volcano Scheduler 部分的主要工作邏輯,發現 Volcano Scheduler 是圍繞了 Job 和 Job 里面的 Tasks 在調度。但是理論上 Volcano Scheduler 應該以 PodGroup 為基礎單元執行調度邏輯,這里的 gap 出現在哪里呢?
后來在文末我提到了 Scheduler 部分的 Job 就是 PodGroup wrapper,Task 就是 Pod wrapper,這樣邏輯上才說得通。今天我準備接著這條路,分析 PodGroup 的“調諧”邏輯。
2. 尋找調度器中的 PodGroup
根據經驗(其實我也沒有啥 Volcano 的經驗,不過早幾年看過不少 K8s 里的控制器和調度器相關代碼,Volcano 既然在 K8s 體系內抽象控制器和調度器,那實現邏輯就應該類似),Volcano 的控制器部分應該負責根據 Job 資源配置來創建相應的 PodGroup 資源對象實例,然后調度器部分應該通過相應的 Informer 能力拿到 PodGroup 資源對象實例創建事件,接著進行相應的調度邏輯。(盲猜的,接著從代碼里順著這個思路看能不能找到相應邏輯。)
2.1 從 PodGroup 到 JobInfo 的封裝
在 pkg/scheduler 目錄內搜 PodGroup 相關代碼,一個 PodGroup 相關的 EventHandler 邏輯出現在我眼前。K8s 自定義控制器開發的主要工作之一就是定義 “Resource Event Handlers”。
pkg/scheduler/cache/cache.go:669
sc.podGroupInformerV1beta1.Informer().AddEventHandler(
cache.FilteringResourceEventHandler{
FilterFunc: func(obj interface{}) bool {
var pg *vcv1beta1.PodGroup
switch v := obj.(type) {
case *vcv1beta1.PodGroup:
pg = v
// ......
return responsibleForPodGroup(pg, mySchedulerPodName, c)
},
Handler: cache.ResourceEventHandlerFuncs{
AddFunc: sc.AddPodGroupV1beta1,
UpdateFunc: sc.UpdatePodGroupV1beta1,
DeleteFunc: sc.DeletePodGroupV1beta1,
},
})
順著這里的代碼接著看 AddPodGroupV1beta1 方法的實現:
pkg/scheduler/cache/event_handlers.go:707
func (sc *SchedulerCache) AddPodGroupV1beta1(obj interface{}) {
ss, ok := obj.(*schedulingv1beta1.PodGroup)
// ......
podgroup := scheduling.PodGroup{}
if err := scheme.Scheme.Convert(ss, &podgroup, nil); err != nil {
klog.Errorf("Failed to convert podgroup from %T to %T", ss, podgroup)
return
}
pg := &schedulingapi.PodGroup{PodGroup: podgroup, Version: schedulingapi.PodGroupVersionV1Beta1}
// ......
if err := sc.setPodGroup(pg); err != nil {
klog.Errorf("Failed to add PodGroup %s into cache: %v", ss.Name, err)
return
}
}
這里定義了一個 PodGroup 類型(不是 CRD 里的 PodGroup):
type PodGroup struct {
scheduling.PodGroup
Version string
}
然后將這個包含 CR PodGroup + Version 的 pg 傳給了 sc.setPodGroup(pg) 方法。sc 的類型是 *SchedulerCache。
接著來看 setPodGroup 方法的實現:
pkg/scheduler/cache/event_handlers.go:668
func (sc *SchedulerCache) setPodGroup(ss *schedulingapi.PodGroup) error {
// 這里的 job 是一個字符串,內容是 PodGroup 的 namespace/name
job := getJobID(ss)
if _, found := sc.Jobs[job]; !found {
// Jobs 這個 map 的 key 就是 pg 的 namespace/name,value 是一個新的類型 *JobInfo
sc.Jobs[job] = schedulingapi.NewJobInfo(job)
}
// 這里存的 *JobInfo 類型的 Job 中很多屬性都來自于 ss 這個 pg
sc.Jobs[job].SetPodGroup(ss)
// ......
return nil
}
到這里,PodGroup 的信息就被轉存到了 JobInfo 中,JobInfo 也就對應一個 PodGroup 在 Scheduler 內的 wrapper。
2.2 從 Pod 到 TaskInfo 的封裝
PodGroup 這個 CR 中其實不包含 Pod 的具體信息。在 PodGroup 的 Spec 定義中,我們可以看到如下字段:
volcano.sh/apis@v1.8.0/pkg/apis/scheduling/types.go:166
type PodGroupSpec struct {
MinMember int32
MinTaskMember map[string]int32
Queue string
PriorityClassName string
MinResources *v1.ResourceList
}
換言之,通過 PodGroup 資源對象實例其實找不到相應的 pods 信息,也就是說 spec 里沒有類似 Pods 這樣的字段。那么 Pod 和 PodGroup 如何關聯呢?既然沒有直接綁定,那么 Pod 中就一定會保存 PodGroup 的信息,比如通過在 Pod 的 annotation 中保存所屬 PodGroup 的 id 之類的方式,然后在處理 Pod 變更事件對應的控制邏輯中完成 Pod 和 PodGroup 的關聯過程。
好,下一步理所當然看一下當 Pod 相關的 events 產生的時候,Scheduler 里相應的 handlers 是什么。
pkg/scheduler/cache/cache.go:616
sc.podInformer.Informer().AddEventHandler(
cache.FilteringResourceEventHandler{
FilterFunc: func(obj interface{}) bool {
switch v := obj.(type) {
case *v1.Pod:
if !responsibleForPod(v, schedulerNames, mySchedulerPodName, c) {
if len(v.Spec.NodeName) == 0 {
return false
}
if !responsibleForNode(v.Spec.NodeName, mySchedulerPodName, c) {
return false
}
}
return true
// ......
},
Handler: cache.ResourceEventHandlerFuncs{
AddFunc: sc.AddPod,
UpdateFunc: sc.UpdatePod,
DeleteFunc: sc.DeletePod,
},
})
在 Filter 過程中,主要是根據 pod.Spec.SchedulerName 來判斷這個 Pod 是不是應該被當前調度器調度。順著繼續看 AddPod 方法的實現:
pkg/scheduler/cache/event_handlers.go:362
func (sc *SchedulerCache) AddPod(obj interface{}) {
pod, ok := obj.(*v1.Pod)
// ......
err := sc.addPod(pod)
// ......
}
再看 addPod 方法:
pkg/scheduler/cache/event_handlers.go:237
func (sc *SchedulerCache) addPod(pod *v1.Pod) error {
pi, err := sc.NewTaskInfo(pod)
// ......
return sc.addTask(pi)
}
這里干了2件事:
- 先拿著 pod 信息創建一個
*TaskInfo類型的 pi,這里的 TaskInfo 也就是一個 Pod 信息的 wrapper,和前面的 JobInfo 封裝 PodGroup 邏輯非常接近。 - SchedulerCache 的 addTask 方法將 TaskInfo 加到 JobInfo.Tasks 屬性中。這里的 Tasks 類型是
map[TaskID]*TaskInfo。
另外還需要關注 NewTaskInfo 方法里的一個細節:
pkg/scheduler/cache/event_handlers.go:226
func (sc *SchedulerCache) NewTaskInfo(pod *v1.Pod) (*schedulingapi.TaskInfo, error) {
taskInfo := schedulingapi.NewTaskInfo(pod)
// ......
return taskInfo, nil
}
這里調用了 schedulingapi.NewTaskInfo(pod),繼續往里:
pkg/scheduler/api/job_info.go:162
func NewTaskInfo(pod *v1.Pod) *TaskInfo {
initResReq := GetPodResourceRequest(pod)
resReq := initResReq
bestEffort := initResReq.IsEmpty()
preemptable := GetPodPreemptable(pod)
revocableZone := GetPodRevocableZone(pod)
topologyInfo := GetPodTopologyInfo(pod)
jobID := getJobID(pod)
ti := &TaskInfo{
UID: TaskID(pod.UID),
Job: jobID,
Name: pod.Name,
Namespace: pod.Namespace,
Priority: 1,
Pod: pod,
Resreq: resReq,
InitResreq: initResReq,
// ......
}
// ......
return ti
}
注意到這里設置了一個 jobID 到 TaskInfo 里,而這個 getJobID() 方法的實現就很有意思了:
pkg/scheduler/api/job_info.go:141
func getJobID(pod *v1.Pod) JobID {
if gn, found := pod.Annotations[v1beta1.KubeGroupNameAnnotationKey]; found && len(gn) != 0 {
// Make sure Pod and PodGroup belong to the same namespace.
jobID := fmt.Sprintf("%s/%s", pod.Namespace, gn)
return JobID(jobID)
}
return ""
}
這里嘗試從 pod 中尋找 key 為 "scheduling.k8s.io/group-name" 的 annotation,假如這個 value 是 pg1,那么 JobID 就是 "pod-namespace/pg1",其實也就是 PodGroup 的標識。于是到這里,表示 Pod 的 TaskInfo 也就關聯上了表示 PodGroup 的 TaskInfo。
3. 控制器中 PodGroup 和 Pod 的創建邏輯
到這里,我們知道了 Scheduler 中是如何處理 PodGroup 和 Pod,將其轉換成 jobs 和 job.tasks 然后進一步執行調度邏輯的。那么控制器層面是如何創建 PodGroup 和 Pod 的呢?
在 Volcano 中有一個自定義資源 Job,按理說這個 Job 類型的資源對象被創建后,相應的 Controller 應該負責完成 Job 對應的 PodGroup 和 pods 的創建,并且打上合適的 annotation。同理其他框架,比如 kubeflow 里的 operator 也應該是類似的邏輯,負責創建 PodGroup 以及 pods(也可能只創建 pods),然后和 Volcano Scheduler 協作完成批調度流程。
總之,接著先看下 Volcano 中的控制器部分是如何倒騰 PodGroup 和 Pod 的。
3.1 從 main 開始尋找 SyncJob 的蹤跡
接著我們從主函數入手,尋找當 Job 被創建后,Controller 對應的 worker 邏輯。
cmd/controller-manager/main.go:45
func main() {
// ......
if err := app.Run(s); err != nil {
fmt.Fprintf(os.Stderr, "%v\n", err)
os.Exit(1)
}
}
這里的主要邏輯就是調用 Run() 方法,在 Run() 方法內有一個 startControllers(config, opt) 調用,startControllers(config, opt) 方法內又有一個 c.Run(ctx.Done()) 調用,這幾層函數基本都是框架性質的邏輯,這里不贅述。
c.Run(ctx.Done()) 方法對應的 JobController 的啟動方法是:
pkg/controllers/job/job_controller.go:238
func (cc *jobcontroller) Run(stopCh <-chan struct{}) {
cc.informerFactory.Start(stopCh)
cc.vcInformerFactory.Start(stopCh)
// ......
go wait.Until(cc.handleCommands, 0, stopCh)
var i uint32
for i = 0; i < cc.workers; i++ {
go func(num uint32) {
wait.Until(
func() {
cc.worker(num)
},
time.Second,
stopCh)
}(i)
}
go cc.cache.Run(stopCh)
// Re-sync error tasks.
go wait.Until(cc.processResyncTask, 0, stopCh)
klog.Infof("JobController is running ...... ")
}
從這里就能看到 worker() 方法的調用入口,繼續往后跟 worker() 方法肯定能找到一個 processNextReq() 方法:
func (cc *jobcontroller) worker(i uint32) {
klog.Infof("worker %d start ...... ", i)
for cc.processNextReq(i) {
}
}
從 processNextReq() 方法中就開始有“干貨邏輯”了:
pkg/controllers/job/job_controller.go:310
func (cc *jobcontroller) processNextReq(count uint32) bool {
queue := cc.queueList[count]
obj, shutdown := queue.Get()
// ......
jobInfo, err := cc.cache.Get(key)
// ......
st := state.NewState(jobInfo)
// ......
if err := st.Execute(action); err != nil {
// ......
}
queue.Forget(req)
return true
}
這里的代碼主要邏輯就上面這幾行,我們來關注 st.Execute(action) 的邏輯。
首先 state.NewState(jobInfo) 調用返回了一個 st,這個 st 是什么呢?
pkg/controllers/job/state/factory.go:62
func NewState(jobInfo *apis.JobInfo) State {
job := jobInfo.Job
switch job.Status.State.Phase {
case vcbatch.Pending:
return &pendingState{job: jobInfo}
case vcbatch.Running:
return &runningState{job: jobInfo}
case vcbatch.Restarting:
return &restartingState{job: jobInfo}
case vcbatch.Terminated, vcbatch.Completed, vcbatch.Failed:
return &finishedState{job: jobInfo}
case vcbatch.Terminating:
return &terminatingState{job: jobInfo}
case vcbatch.Aborting:
return &abortingState{job: jobInfo}
case vcbatch.Aborted:
return &abortedState{job: jobInfo}
case vcbatch.Completing:
return &completingState{job: jobInfo}
}
// It's pending by default.
return &pendingState{job: jobInfo}
}
盲猜 State 這時候對應一個 *pendingState 類型。所以我們接著找 *pendingState 對象的 Execute() 方法實現:
pkg/controllers/job/state/pending.go:29
func (ps *pendingState) Execute(action v1alpha1.Action) error {
switch action {
// ......
default:
return SyncJob(ps.job, func(status *vcbatch.JobStatus) bool {
if ps.job.Job.Spec.MinAvailable <= status.Running+status.Succeeded+status.Failed {
status.State.Phase = vcbatch.Running
return true
}
return false
})
}
}
SyncJob() 函數在這里出現了。
3.2 SyncJob 過程如何創建 PodGroup 和 Pod
繼續來看 SyncJob 的具體實現。上一節找到的 SyncJob() 函數中調用到了 *jobcontroller.syncJob() 方法,sync job 的具體邏輯就在這個 syncJob() 方法中實現。
這里主要有2個過程:
-
initiateJob方法創建 PodGroup; - 創建 pods;
pkg/controllers/job/job_controller_actions.go:224
func (cc *jobcontroller) syncJob(jobInfo *apis.JobInfo, updateStatus state.UpdateStatusFn) error {
// ......
}
這個方法有點長,哎,一言難盡。我想提個 pr 給它拆分一下…… Anyway,這個方法里主要關注2個過程,我們直接來看吧。
3.2.1 創建 PodGroup
創建 PodGroup 的邏輯在 initiateJob() 方法中:
pkg/controllers/job/job_controller_actions.go:166
func (cc *jobcontroller) initiateJob(job *batch.Job) (*batch.Job, error) {
// ......
if err := cc.createOrUpdatePodGroup(newJob); err != nil {
cc.recorder.Event(job, v1.EventTypeWarning, string(batch.PodGroupError),
fmt.Sprintf("Failed to create PodGroup, err: %v", err))
return nil, err
}
return newJob, nil
}
這里拿著 job 信息調用了一個 createOrUpdatePodGroup() 方法來完成和 Job 對應的 PodGroup 的創建。這個方法里 pg 實例化的主要邏輯是:
pg := &scheduling.PodGroup{
ObjectMeta: metav1.ObjectMeta{
Namespace: job.Namespace,
// 這個 pgName 內容是 job.Name + "-" + string(job.UID)
Name: pgName,
Annotations: job.Annotations,
Labels: job.Labels,
OwnerReferences: []metav1.OwnerReference{
*metav1.NewControllerRef(job, helpers.JobKind),
},
},
Spec: scheduling.PodGroupSpec{
MinMember: job.Spec.MinAvailable,
MinTaskMember: minTaskMember,
Queue: job.Spec.Queue,
MinResources: cc.calcPGMinResources(job),
PriorityClassName: job.Spec.PriorityClassName,
},
}
換言之 PodGroup 和 Job 是一一對應的關系。
3.2.2 創建 Pods
繼續來看創建 pods 的過程:
pkg/controllers/job/job_controller_actions.go:335
for _, ts := range job.Spec.Tasks {
ts.Template.Name = ts.Name
tc := ts.Template.DeepCopy()
name := ts.Template.Name
pods, found := jobInfo.Pods[name]
if !found {
pods = map[string]*v1.Pod{}
}
var podToCreateEachTask []*v1.Pod
// 每個 Task 對應一組 pods,所以這里有一個循環
for i := 0; i < int(ts.Replicas); i++ {
podName := fmt.Sprintf(jobhelpers.PodNameFmt, job.Name, name, i)
if pod, found := pods[podName]; !found {
// 這個 createJobPod 只是組裝 Pod 資源對象,類型是 *v1.Pod
newPod := createJobPod(job, tc, ts.TopologyPolicy, i, jobForwarding)
if err := cc.pluginOnPodCreate(job, newPod); err != nil {
return err
}
// 加到隊列中
podToCreateEachTask = append(podToCreateEachTask, newPod)
waitCreationGroup.Add(1)
} else {
// ......
}
}
podToCreate[ts.Name] = podToCreateEachTask
// ......
}
這一輪循環負責解析 Job 中的所有 Tasks,然后給每個 Task 創建對應的 pods,加入到 podToCreateEachTask 這個切片中,進而得到 podToCreate (類型是 map[string][]*v1.Pod)這個 map,map 的 key 是 Task 的 Name,value 是每個 Task 對應的需要創建的 pods 列表。
在 createJobPod() 方法中有這樣幾行和 annotation 相關的代碼:
pkg/controllers/job/job_controller_util.go:100
index := strconv.Itoa(ix)
pod.Annotations[batch.TaskIndex] = index
pod.Annotations[batch.TaskSpecKey] = tsKey
pgName := job.Name + "-" + string(job.UID)
pod.Annotations[schedulingv2.KubeGroupNameAnnotationKey] = pgName
pod.Annotations[batch.JobNameKey] = job.Name
pod.Annotations[batch.QueueNameKey] = job.Spec.Queue
pod.Annotations[batch.JobVersion] = fmt.Sprintf("%d", job.Status.Version)
pod.Annotations[batch.PodTemplateKey] = fmt.Sprintf("%s-%s", job.Name, template.Name)
可以看到 Pod 的 annotation 里有一個 KubeGroupNameAnnotationKey = pgName,也就是 scheduling.k8s.io/group-name=pg-name,和前面我們在調度器里找到 annotation 匹配邏輯就對應上了。
然后來到第二個循環:
pkg/controllers/job/job_controller_actions.go:373
// 遍歷剛才組裝的 podToCreate map
for taskName, podToCreateEachTask := range podToCreate {
if len(podToCreateEachTask) == 0 {
continue
}
go func(taskName string, podToCreateEachTask []*v1.Pod) {
// ......
for _, pod := range podToCreateEachTask {
go func(pod *v1.Pod) {
defer waitCreationGroup.Done()
// 創建 Pods
newPod, err := cc.kubeClient.CoreV1().Pods(pod.Namespace).Create(context.TODO(), pod, metav1.CreateOptions{})
// ......
}(pod)
}
}(taskName, podToCreateEachTask)
}
4. 總結
今天我們先順著 Volcano Scheduler 部分的代碼找到了 PodGroup 的處理邏輯,看到了 Scheduler 拿到 PodGroup 后會組裝 JobInfo 對象;拿到 Pod 后會組裝 TaskInfo 對象(這里根據 Pod 的注解中指定的 PodGroup 名字來將 TaskInfo 和 JobInfo 關聯,也就是 Pod 和 PodGroup 的關聯。
接著我們又從 Volcano Controller(Job Controller)中找到了 PodGroup 和 Pod 的創建邏輯。在 Job 對象創建后,控制器會根據 Job 的信息創建一個唯一對應的 PodGroup,然后根據 Job 中的 Tasks 信息創建一系列的 pods,這些 pods 會帶上 PodGroup 名字(在注解里)
至此,我們知道了 Volcano 中調度器和控制器的職責分層,進一步也就能夠理解 Volcano 如何和 kubeflow 等其他框架結合完成復雜任務的批調度過程了。(上層框架創建 PodGroup 和 Pods,Volcano 根據 PodGroup 信息和 Pods 注解信息完成批調度過程。
5. 最后
下一步?我也沒想好。
看了幾天 Volcano 的源碼,整體感覺還是比較酣暢淋漓。一開始被調度器里的 Job 和 Task 概念帶坑里,感覺代碼很混亂;但是理解了 wrapper 的用意,知道了“調度”領域的 job 和 task 有不一樣的含義后,今天再刷就很輕松了。
Volcano 源碼整體還是 K8s 的“控制器+調度器”的邏輯,然后加上“任務調度領域”內的各種算法組成,代碼質量總的來說還可以,就是部分函數過長,循環嵌套過多,加上注釋和文檔的缺失,對于新人并不友好。
下一步我嘗試參與下 Volcano 社區,看能不能在“代碼可讀性”方向出一份力吧。
總結
以上是生活随笔為你收集整理的Volcano 原理、源码分析(二)的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 剪映怎么去除水印
- 下一篇: 项目国际化的难点痛点是什么