Volcano 原理、源码分析(一)
- 0. 總結前置
- 1. 概述
-
2. Volcano 核心概念
- 2.1 認識 Queue、PodGroup 和 VolcanoJob
- 2.2. Queue、PodGroup 和 VolcanoJob 的關系
- 3. Volcano 調度框架概覽
-
4. 源碼分析
- 4.1 Action 實現在哪里?
-
4.2 從 main 函數入手看調度器啟動過程
- 4.2.1 入口邏輯
- 4.2.2 NewScheduler() 方法
- 4.2.3 Run() 方法
-
4.3 尋找 actions 和 plugins 的調用邏輯
- 4.3.1 理解 Session 以及 plugins 被調用的本質
- 4.3.2 理解 actions 的執行邏輯
-
4.4 Action 分析:enqueue
- 4.4.1 queues、queueSet 和 jobsMap
- 4.4.2 for 循環遍歷 jobs
- 4.4.3 無限循環 for
-
4.5 Action 分析:allocate
- 4.5.1 allocate.Execute() 整體邏輯
- 4.5.2 第一個 for 循環的邏輯
- 4.5.3 預選函數 predicateFn
- 4.5.4 第二個 for 循環的邏輯
- 4.6 Action 分析:backfill
- 5. 總結
- 6. 結尾
0. 總結前置
這段總結在文末還有,不過我還是決定在開頭放一份,方便第二次翻閱的讀者快速找到結論。你可以選擇跳到1. 概述開始順序閱讀本文。
看到這里,我開始疑惑為什么調度里關注的是 Job,Task 這些,不應該是關注 PodGroup 嗎?然后我找 Volcano 社區的幾個朋友聊了下,回過頭來再理代碼,發現 Scheduler 里的 Job、Task 和 Controller 里的 Job、Task 并不是一回事。
對于熟悉 K8s 源碼的讀者而言,很容易帶著 Job 就是 CR 的 Job 這種先入為主的觀點開始看代碼,并且覺得 Task 就是 CR Job 內的 Task。看到最后才反應過來,其實上面調度器里多次出現的 jobs 里放的那個 job 是 JobInfo 類型,JobInfo 類型對象里面的 Tasks 本質是 TaskInfo 類型對象的 map,而這個 TaskInfo 類型的 Task 和 Pod 是一一對應的,也就是 Pod 的一層 wrapper。
回過來看 Volcano 引入的 CR 中的 VolcanoJob 也不是 Scheduler 里出現的這個 Job。VolcanoJob 里也有一個 Tasks 屬性,對應的類型是 TaskSpec 類型,這個 TaskSpec 類似于 K8s 的 RS 級別資源,里面包含 Pod 模板和副本數等。
因此調度器里的 Task 其實對應 Pod,當做 Pod wrapper 理解;而 Task 的集合也就是 Pod 的集合,名字叫做 job,但是對應 PodGroup;而控制器里的 Job,也就是 VolcanoJob,它的屬性里并沒有 PodGroup;相反調度器那個 JobInfo 類型的 job 其實屬性里包含了一個 PodGroup,其實也可以認為是一個 PodGroup 的 wrapper。
所以看代碼的過程中會一直覺得 Scheduler 在面向 Job 和 Task 調度,和 PodGroup 沒有太大關系。其實這里的 Job 就是 PodGroup wrapper,Task 就是 Pod wrapper。
1. 概述
Volcano 是一個開源的 Kubernetes 批處理系統,專為高性能計算任務設計。它提供了一種高效的方式來管理和調度資源密集型作業,比如大數據處理和機器學習任務。
在批處理領域,任務通常需要大量計算資源,但這些資源在 Kubernetes 集群中可能是有限的或者分布不均。Volcano 嘗試通過一些高級調度功能來解決這些問題,盡可能確保資源被高效利用,同時最小化作業的等待時間。這對于需要快速處理大量數據的場景尤其重要,如科學研究、金融建模或任何需要并行處理大量任務的應用。
Volcano 的關鍵特性之一是它的 gang 調度機制。這個機制允許同時調度一組相關任務,確保它們要么全部啟動,要么都不啟動。這種方法對于那些需要多個任務協同工作的復雜作業來說至關重要,因為它避免了部分任務因資源不足而無法執行的情況。
舉個例子:Kubernetes 原生的調度器只能實現一個 Pod 一個 Pod 順序調度,對于小規模在線服務而言,也基本夠用。不過當一個服務需要大量 Pod 一起啟動才能正常運行時(比如一次模型訓練任務需要用到100個 pods 時,如何保證這100個 pods 要么都成功調度,要么都不被調度呢?這時候就需要 Volcano 提供的 gang 調度能力了。
今天咱就來具體分析下 Volcano 的工作原理。
2. Volcano 核心概念
先認識下 Volcano 的幾個核心概念。
2.1 認識 Queue、PodGroup 和 VolcanoJob
Volcano 引入了幾個新概念:
QueuePodGroupVolcanoJob
這些都是 K8s 里的自定義資源,也就是我們能夠通過 kubectl 命令查到相應的資源對象,好比 Deployment、Service、Pod 這些。
在 Volcano 中,Queue 用于管理和優先級排序任務。它允許用戶根據業務需求或優先級,將作業分組到不同的隊列中。這有助于更好地控制資源分配和調度優先級,確保高優先級的任務可以優先獲取資源。
PodGroup 一組相關的 Pod 集合。這主要解決了 Kubernetes 原生調度器中單個 Pod 調度的限制。通過將相關的 Pod 組織成 PodGroup,Volcano 能夠更有效地處理那些需要多個 Pod 協同工作的復雜任務。
VolcanoJob 是 Volcano 中的一個核心概念,它擴展了 Kubernetes 的 Job 資源。VolcanoJob 不僅包括了 Kubernetes Job 的所有特性,還加入了對批處理作業的額外支持,使得 Volcano 能夠更好地適應高性能和大規模計算任務的需求。
2.2. Queue、PodGroup 和 VolcanoJob 的關系
大致知道了 Volcano 中有 Queue、PodGroup 和 VolcanoJob 三種自定義資源后,我們接著具體看下這三種資源的作用、關系等。
首先,Queue 是一個 PodGroup 隊列,PodGroup 是一組強關聯的 Pod 集合。而 VolcanoJob 則是一個 K8s Job 升級版,對應的下一級資源是 PodGroup。換言之,就好比 ReplicaSet 的下一級資源是 Pod 一樣。
所以 VolcanoJob 背后對應一個 K8s 里的自定義控制器(Operator 模式),這個控制器會根據 VolcanoJob 的具體配置去創建相應的 PodGroup 出來。而 PodGroup 最終會被當做一個整體被 Volcano Scheduler 調度。在調度的過程中,Volcano 還用到了 Queue 來實現 PodGroup 的排隊、優先級控制等邏輯。
3. Volcano 調度框架概覽
繼續看 Volcano 調度邏輯的實現框架。
官方文檔里有一張圖,長這樣:
第一眼看這張圖會有點蒙,主要是如何理解 Action 和 Plugin 兩個概念,以及具體的 actions 和 plugins 作用是啥。
簡單來說,Volcano 調度過程中會執行一系列的動作,這些動作也就是 Action,主要是 enqueue、allocate、backfill 這些。具體有哪些 actions,默認執行哪些 actions,后面我們到源碼里去尋找。然后每個具體的 Action 中執行什么算法邏輯,就取決于注冊進去的 plugins。換言之,actions 是基本固定的,合計6個(剛翻源碼看到的,文檔落后了),可選執行其中某幾個;而 plugins 就有點多了(十幾個),具體哪些 plugins 在哪個 Action 中被調用呢?咱接下來翻源碼扒一扒。
4. 源碼分析
接下來開始帶著問題讀源碼。
4.1 Action 實現在哪里?
Action 相關源碼入口還是很好找,Volcano 在 pkg/scheduler 中放了調度器相關的代碼,里面有一個 actions 目錄。在 actions 目錄里的 factory.go 源文件中包含了一個 init 函數:
pkg/scheduler/actions/factory.go:29
func init() {
framework.RegisterAction(reclaim.New())
framework.RegisterAction(allocate.New())
framework.RegisterAction(backfill.New())
framework.RegisterAction(preempt.New())
framework.RegisterAction(enqueue.New())
framework.RegisterAction(shuffle.New())
}
可以看到這里注冊了6個 actions。RegisterAction 方法的實現也很簡單:
pkg/scheduler/framework/plugins.go:102
var actionMap = map[string]Action{}
// RegisterAction register action
func RegisterAction(act Action) {
pluginMutex.Lock()
defer pluginMutex.Unlock()
actionMap[act.Name()] = act
}
有一個 actionMap 來保存所有的 actions。這里的 Action 是一個 interface,定義如下:
pkg/scheduler/framework/interface.go:20
// Action is the interface of scheduler action.
type Action interface {
// The unique name of Action.
Name() string
// Initialize initializes the allocator plugins.
Initialize()
// Execute allocates the cluster's resources into each queue.
Execute(ssn *Session)
// UnIntialize un-initializes the allocator plugins.
UnInitialize()
}
4.2 從 main 函數入手看調度器啟動過程
接著我們從 main 函數入手看調度器啟動過程,看能不能找到 Action 是從哪里被調用的,actions 的調用順序等相關邏輯,進而后面我們可以按照 actions 執行順序來逐個分析具體的 Action 行為。
4.2.1 入口邏輯
調度器源碼入口很直觀:
main 函數中主要邏輯是調用這個 Run() 方法:
cmd/scheduler/main.go:71
if err := app.Run(s); err != nil {
fmt.Fprintf(os.Stderr, "%v\n", err)
os.Exit(1)
}
Run() 方法負責啟動一個 Volcano 調度器,里面核心代碼只有下列2行,先構造 Scheduler 對象,然后調用其 Run() 方法:
sched, err := scheduler.NewScheduler(config, opt)
// ……
sched.Run(ctx.Done())
4.2.2 NewScheduler() 方法
接著看 NewScheduler 和 Run() 兩個方法:
pkg/scheduler/scheduler.go:59
// NewScheduler returns a scheduler
func NewScheduler(config *rest.Config, opt *options.ServerOption) (*Scheduler, error) {
// ……
cache := schedcache.New(config, opt.SchedulerNames, opt.DefaultQueue, opt.NodeSelector, opt.NodeWorkerThreads)
scheduler := &Scheduler{
schedulerConf: opt.SchedulerConf,
fileWatcher: watcher,
cache: cache,
schedulePeriod: opt.SchedulePeriod,
dumper: schedcache.Dumper{Cache: cache},
}
return scheduler, nil
}
這里主要涉及到一個 Scheduler 對象,看起來是調度過程的核心實現對象:
pkg/scheduler/scheduler.go:44
// Scheduler watches for new unscheduled pods for volcano. It attempts to find
// nodes that they fit on and writes bindings back to the api server.
type Scheduler struct {
cache schedcache.Cache
schedulerConf string
fileWatcher filewatcher.FileWatcher
schedulePeriod time.Duration
once sync.Once
mutex sync.Mutex
actions []framework.Action
plugins []conf.Tier
configurations []conf.Configuration
metricsConf map[string]string
dumper schedcache.Dumper
}
4.2.3 Run() 方法
暫時不忙細看每個屬性,繼續來看 Run 方法:
// Run runs the Scheduler
func (pc *Scheduler) Run(stopCh <-chan struct{}) {
pc.loadSchedulerConf()
go pc.watchSchedulerConf(stopCh)
// Start cache for policy.
pc.cache.SetMetricsConf(pc.metricsConf)
pc.cache.Run(stopCh)
pc.cache.WaitForCacheSync(stopCh)
klog.V(2).Infof("scheduler completes Initialization and start to run")
go wait.Until(pc.runOnce, pc.schedulePeriod, stopCh)
if options.ServerOpts.EnableCacheDumper {
pc.dumper.ListenForSignal(stopCh)
}
go runSchedulerSocket()
}
這個就是 Scheduler 的啟動邏輯了,我們先來看這里被周期性調用的 runOnce 方法,這個方法每隔1秒被執行一次:
pkg/scheduler/scheduler.go:99
func (pc *Scheduler) runOnce() {
// ……
actions := pc.actions
plugins := pc.plugins
configurations := pc.configurations
pc.mutex.Unlock()
//Load configmap to check which action is enabled.
conf.EnabledActionMap = make(map[string]bool)
for _, action := range actions {
conf.EnabledActionMap[action.Name()] = true
}
ssn := framework.OpenSession(pc.cache, plugins, configurations)
defer func() {
framework.CloseSession(ssn)
metrics.UpdateE2eDuration(metrics.Duration(scheduleStartTime))
}()
for _, action := range actions {
actionStartTime := time.Now()
action.Execute(ssn)
metrics.UpdateActionDuration(action.Name(), metrics.Duration(actionStartTime))
}
}
可以看到在 runOnce 中的2個關鍵步驟:
ssn := framework.OpenSession(pc.cache, plugins, configurations)- 遍歷 actions,調用
action.Execute(ssn)
這里的 actions 集合是什么呢?OpenSession 拿到的 plugins 又是啥呢?
進一步跟代碼可以找到如下默認配置:
pkg/scheduler/util.go:31
var defaultSchedulerConf = `
actions: "enqueue, allocate, backfill"
tiers:
- plugins:
- name: priority
- name: gang
- name: conformance
- plugins:
- name: overcommit
- name: drf
- name: predicates
- name: proportion
- name: nodeorder
`
所以默認配置下,執行的 actions 是 enqueue, allocate, backfill 三個。再看默認方式部署后容器內的配置文件:
# cat /volcano.scheduler/volcano-scheduler.conf
actions: "enqueue, allocate, backfill"
tiers:
- plugins:
- name: priority
- name: gang
enablePreemptable: false
- name: conformance
- plugins:
- name: overcommit
- name: drf
enablePreemptable: false
- name: predicates
- name: proportion
- name: nodeorder
- name: binpack
plugins 稍有不同,一個是 glang 和 drf 多了 enablePreemptable,一個是多了 binpack。接下來我們先看 actions 和 plugins 的調用邏輯,再看具體的 actions 和 plugins 分別是什么含義。
4.3 尋找 actions 和 plugins 的調用邏輯
前面我們看到 runOnce() 方法里的2個關鍵步驟:
ssn := framework.OpenSession(pc.cache, plugins, configurations)- 遍歷 actions,調用
action.Execute(ssn)
接下來咱順著這兩步來尋找 actions 和 plugins 的調用邏輯。
4.3.1 理解 Session 以及 plugins 被調用的本質
framework.OpenSession() 函數打開了一個 Session。不過什么是 Session 呢?來具體看下 OpenSession() 函數的實現:
pkg/scheduler/framework/framework.go:30
func OpenSession(cache cache.Cache, tiers []conf.Tier, configurations []conf.Configuration) *Session {
ssn := openSession(cache)
ssn.Tiers = tiers
ssn.Configurations = configurations
ssn.NodeMap = GenerateNodeMapAndSlice(ssn.Nodes)
ssn.PodLister = NewPodLister(ssn)
for _, tier := range tiers {
for _, plugin := range tier.Plugins {
if pb, found := GetPluginBuilder(plugin.Name); !found {
klog.Errorf("Failed to get plugin %s.", plugin.Name)
} else {
plugin := pb(plugin.Arguments)
ssn.plugins[plugin.Name()] = plugin
onSessionOpenStart := time.Now()
plugin.OnSessionOpen(ssn)
metrics.UpdatePluginDuration(plugin.Name(), metrics.OnSessionOpen, metrics.Duration(onSessionOpenStart))
}
}
}
return ssn
}
這里的 Session 對象屬性很多,不過還是值得瀏覽一遍,大概心里有個印象,知道哪些功能被封裝進去了:
pkg/scheduler/framework/session.go:45
type Session struct {
UID types.UID
kubeClient kubernetes.Interface
recorder record.EventRecorder
cache cache.Cache
restConfig *rest.Config
informerFactory informers.SharedInformerFactory
TotalResource *api.Resource
// podGroupStatus cache podgroup status during schedule
// This should not be mutated after initiated
podGroupStatus map[api.JobID]scheduling.PodGroupStatus
Jobs map[api.JobID]*api.JobInfo
Nodes map[string]*api.NodeInfo
CSINodesStatus map[string]*api.CSINodeStatusInfo
RevocableNodes map[string]*api.NodeInfo
Queues map[api.QueueID]*api.QueueInfo
NamespaceInfo map[api.NamespaceName]*api.NamespaceInfo
// NodeMap is like Nodes except that it uses k8s NodeInfo api and should only
// be used in k8s compatable api scenarios such as in predicates and nodeorder plugins.
NodeMap map[string]*k8sframework.NodeInfo
PodLister *PodLister
Tiers []conf.Tier
Configurations []conf.Configuration
NodeList []*api.NodeInfo
plugins map[string]Plugin
eventHandlers []*EventHandler
jobOrderFns map[string]api.CompareFn
queueOrderFns map[string]api.CompareFn
taskOrderFns map[string]api.CompareFn
clusterOrderFns map[string]api.CompareFn
predicateFns map[string]api.PredicateFn
prePredicateFns map[string]api.PrePredicateFn
bestNodeFns map[string]api.BestNodeFn
nodeOrderFns map[string]api.NodeOrderFn
batchNodeOrderFns map[string]api.BatchNodeOrderFn
nodeMapFns map[string]api.NodeMapFn
nodeReduceFns map[string]api.NodeReduceFn
preemptableFns map[string]api.EvictableFn
reclaimableFns map[string]api.EvictableFn
overusedFns map[string]api.ValidateFn
allocatableFns map[string]api.AllocatableFn
jobReadyFns map[string]api.ValidateFn
jobPipelinedFns map[string]api.VoteFn
jobValidFns map[string]api.ValidateExFn
jobEnqueueableFns map[string]api.VoteFn
jobEnqueuedFns map[string]api.JobEnqueuedFn
targetJobFns map[string]api.TargetJobFn
reservedNodesFns map[string]api.ReservedNodesFn
victimTasksFns map[string][]api.VictimTasksFn
jobStarvingFns map[string]api.ValidateFn
}
在 OpenSession() 函數中,plugins 被遍歷,然后依次調用 plugin.OnSessionOpen(ssn) 方法。這個 OnSessionOpen(ssn) 方法的調用并不會執行具體的動作,只是注冊了一堆的方法到 Session 里,比如上面這個 Session 對象的 preemptableFns 屬性就會在 gangPlugin 的 OnSessionOpen() 方法被調用時初始化,執行一行類似 ssn.preemptableFns[gp.Name()] = preemptableFn 的邏輯。所以一堆的 plugins 的調用邏輯就是將算法注冊到 Session 里。
接著看一眼 Plugin 對象的定義,其實很簡潔:
pkg/scheduler/framework/interface.go:35
type Plugin interface {
Name() string
OnSessionOpen(ssn *Session)
OnSessionClose(ssn *Session)
}
4.3.2 理解 actions 的執行邏輯
我們已經看到了 plugins 最終就是被綁到 Session 上的一堆算法,那么這些算法是怎樣被調用的呢?在 runOnce() 方法中的第二個主要邏輯是:
for _, action := range actions {
actionStartTime := time.Now()
action.Execute(ssn)
metrics.UpdateActionDuration(action.Name(), metrics.Duration(actionStartTime))
}
也就是 actions 被遍歷,然后依次執行 Execute() 方法,這里傳遞了一個 ssn(*Session 類型)對象進去。所以下一步的重點就是看 Execute() 方法的執行邏輯。
前面提到默認被執行的 actions 只有三個:enqueue, allocate 和 backfill。到這里可以看到接著的邏輯就是逐個調用這些 actions 的 Execute() 方法,那么 Execute() 里放的應該就是 Action 的具體邏輯了。
到這里在回過頭來看官網的圖,主流程就很好理解了:
一個個 plugins 注冊具體的算法函數到 Session 里,然后 actions 順序執行的過程中,到 Session 里去取相應的算法函數來執行。
4.4 Action 分析:enqueue
enqueue Action 的 Execute() 方法骨架如下:
pkg/scheduler/actions/enqueue/enqueue.go:44
func (enqueue *Action) Execute(ssn *framework.Session) {
// ......
queues := util.NewPriorityQueue(ssn.QueueOrderFn)
queueSet := sets.NewString()
jobsMap := map[api.QueueID]*util.PriorityQueue{}
for _, job := range ssn.Jobs {
// ......
}
klog.V(3).Infof("Try to enqueue PodGroup to %d Queues", len(jobsMap))
for {
// ......
}
}
開頭引入了3個局部變量 queues、queueSet 和 jobsMap,接著執行了2個 for 循環,接著我們逐個來分析。
4.4.1 queues、queueSet 和 jobsMap
1. queues
這里的 queues 是一個 Priority Queue,定義如下:
pkg/scheduler/util/priority_queue.go:26
type PriorityQueue struct {
queue priorityQueue
}
type priorityQueue struct {
items []interface{}
lessFn api.LessFn
}
這個隊列的實現用了 heap 包,實現了一個“最大堆”,也就是每次 Pop() 會拿到一個優先級最高的 item。另外需要注意的是這里的 queues 用了復數形式,其實是因為下文這個隊列的用法中,item 是一個隊列,也就是當前隊列中存放的還是隊列。后面我們具體來看。
2. queueSet
這個沒啥好說的,一個 name set。
3. jobsMap
這是一個從 QueueID 到 PriorityQueue 的 map
4.4.2 for 循環遍歷 jobs
這一段 for 循環的代碼如下:
// 這個 Job 是 Volcano 自定義資源 Job,不是 K8s 里的 Job;這里開始遍歷所有 jobs
for _, job := range ssn.Jobs {
if job.ScheduleStartTimestamp.IsZero() {
ssn.Jobs[job.UID].ScheduleStartTimestamp = metav1.Time{
Time: time.Now(),
}
}
// 如果 job 中定義的 Queue 在 Session 中存在,那就執行
// queueSet.Insert(string(queue.UID)) 和
// queues.Push(queue);注意這里 Push 進去的是 queue
if queue, found := ssn.Queues[job.Queue]; !found {
klog.Errorf("Failed to find Queue <%s> for Job <%s/%s>",
job.Queue, job.Namespace, job.Name)
continue
} else if !queueSet.Has(string(queue.UID)) {
klog.V(5).Infof("Added Queue <%s> for Job <%s/%s>",
queue.Name, job.Namespace, job.Name)
// 這里構建了一個 queue UID 的 set 和一個 queue 隊列(優先級隊列,heap 實現)
queueSet.Insert(string(queue.UID))
queues.Push(queue)
}
if job.IsPending() {
// 如果 job 指定的 queue 還沒存到 jobsMap 里,則創建一個對應的 PriorityQueue
if _, found := jobsMap[job.Queue]; !found {
jobsMap[job.Queue] = util.NewPriorityQueue(ssn.JobOrderFn)
}
klog.V(5).Infof("Added Job <%s/%s> into Queue <%s>", job.Namespace, job.Name, job.Queue)
// 將 job 加到指定 queue 中
jobsMap[job.Queue].Push(job)
}
}
這個 for 循環主要做2件事情,一個是遍歷 jobs 的過程中判斷用到了哪些 Queue(K8s 自定義資源對象),將這些 Queue 保存到 queueSet 和 queues 中;另外一個就是將處于 Pending 狀態的 jobs 加入到 jobsMap 中。這里涉及到自定義資源 Queue 和局部變量 queue、queues 這些,看起來有點繞。
4.4.3 無限循環 for
for {
// 沒有隊列,退出循環
if queues.Empty() {
break
}
// 從優先級隊列 queues 中 Pop 一個高優的隊列出來
queue := queues.Pop().(*api.QueueInfo)
// 如果這個高優隊列在 jobsMap 里沒有保存相應的 jobs,也就是為空,那就繼續下一輪循環
jobs, found := jobsMap[queue.UID]
if !found || jobs.Empty() {
continue
}
// jobs 也是一個優先級隊列,Pop 一個高優 job 出來
job := jobs.Pop().(*api.JobInfo)
if job.PodGroup.Spec.MinResources == nil || ssn.JobEnqueueable(job) {
ssn.JobEnqueued(job)
// Phase 更新為 "Inqueue"
job.PodGroup.Status.Phase = scheduling.PodGroupInqueue
// 將當前 job 加入到 ssn.Jobs map
ssn.Jobs[job.UID] = job
}
// 將前面 Pop 出來的 queue 加回到 queues 中,直到 queue 中沒有 job,這樣逐步 queues 為空空,上面的 Empty() 方法就會返回 true,然后循環退出。
queues.Push(queue)
}
這個循環的邏輯是消化隊列里的 jobs。首先將全局隊列按照優先級 Pop 一個高優隊列出來,然后根據這個隊列的 UID 找到本地 jobsMap 里對應的 jobs 隊列,這又是一個優先級隊列。最后從這個優先級隊列中 Pop 一個高優 Job 出來,將其狀態設置成 Inqueue。
總的來說,enqueue 過程就是按照隊列的優先級順序,將隊列中的 jobs 再按照優先級依次標記為 "Inqueue" 狀態(job.PodGroup.Status.Phase = "Inqueue")。
4.5 Action 分析:allocate
接著來看 allocate 過程。
4.5.1 allocate.Execute() 整體邏輯
allocate.Execute() 方法的實現如下:
pkg/scheduler/actions/allocate/allocate.go:44
func (alloc *Action) Execute(ssn *framework.Session) {
klog.V(5).Infof("Enter Allocate ...")
defer klog.V(5).Infof("Leaving Allocate ...")
// the allocation for pod may have many stages
// 1. pick a queue named Q (using ssn.QueueOrderFn)
// 2. pick a job named J from Q (using ssn.JobOrderFn)
// 3. pick a task T from J (using ssn.TaskOrderFn)
// 4. use predicateFn to filter out node that T can not be allocated on.
// 5. use ssn.NodeOrderFn to judge the best node and assign it to T
// queues sort queues by QueueOrderFn.
queues := util.NewPriorityQueue(ssn.QueueOrderFn)
// jobsMap is used to find job with the highest priority in given queue.
jobsMap := map[api.QueueID]*util.PriorityQueue{}
for _, job := range ssn.Jobs {
// ......
}
klog.V(3).Infof("Try to allocate resource to %d Queues", len(jobsMap))
pendingTasks := map[api.JobID]*util.PriorityQueue{}
allNodes := ssn.NodeList
predicateFn := func(task *api.TaskInfo, node *api.NodeInfo) ([]*api.Status, error){
// ......
}
for {
// ......
}
我把三個相對獨立的邏輯模塊替換成了省略號,剩下的內容就不到十行了,相對好理解很多。我們先看這不到十行的方法主體,再看省略的三部分邏輯。
首先這里還是引入了一個優先級隊列 queues 和一個從 queue id 到一個優先級隊列的 map jobsMap。
- queues:一個元素為優先級隊列的優先級隊列,也就是一個保存 queue 的“最大堆”,從而方便獲取一個優先級最高的 queue;
- jobsMap:一個 map,key 是 queue 的 id,value 是一個優先級隊列,也就是一個特定的 queue,queue 中存著 jobs;通過這個 map 可以方便獲取指定 queue 中的一個優先 job;
4.5.2 第一個 for 循環的邏輯
for _, job := range ssn.Jobs {
// ......
jobsMap[job.Queue].Push(job)
}
這個 for 看著長,不過除了一些健壯性邏輯之外,核心邏輯只有這樣一行,也就是遍歷 jobs,將其按照 queue 不同存到 jobsMap 中。
4.5.3 預選函數 predicateFn
接著來看預選函數 predicateFn 的實現邏輯。
predicateFn := func(task *api.TaskInfo, node *api.NodeInfo) ([]*api.Status, error) {
// Check for Resource Predicate
if ok, resources := task.InitResreq.LessEqualWithResourcesName(node.FutureIdle(), api.Zero); !ok {
return nil, api.NewFitError(task, node, api.WrapInsufficientResourceReason(resources))
}
var statusSets util.StatusSets
statusSets, err := ssn.PredicateFn(task, node)
if err != nil {
return nil, api.NewFitError(task, node, err.Error())
}
if statusSets.ContainsUnschedulable() || statusSets.ContainsUnschedulableAndUnresolvable() ||
statusSets.ContainsErrorSkipOrWait() {
return nil, api.NewFitError(task, node, statusSets.Message())
}
return nil, nil
}
這里的邏輯是接收一個 task 和 node 作為參數,然后判斷這個 node 上能否跑起來這個 task。返回值 Status 類型是一個結構體,定義如下:
type Status struct {
Code int
Reason string
}
Code 的可選值有5個:Success、Error、Unschedulable、UnschedulableAndUnresolvable、Wait 和 Skip。這里主要需要理解三個狀態:
- Success:可調度
- Unschedulable:不可調度,但是驅逐后可能可調度
- UnschedulableAndUnresolvable:不可調度且驅逐也不可調度
接著我們去看這個 predicateFn 是如何被調用的。
4.5.4 第二個 for 循環的邏輯
這個 for 循環行數超過 160,真是,,,不優雅。
pkg/scheduler/actions/allocate/allocate.go:120
for {
if queues.Empty() {
break
}
// Pop 一個最高優的 queue 出來
queue := queues.Pop().(*api.QueueInfo)
// ......
// jobs 也就是這個高優 queue 中的所有 jobs
jobs, found := jobsMap[queue.UID]
if !found || jobs.Empty() {
klog.V(4).Infof("Can not find jobs for queue %s.", queue.Name)
continue
}
// job 就是 jobs 這個優先級隊列中的最高優條目
job := jobs.Pop().(*api.JobInfo)
if _, found = pendingTasks[job.UID]; !found {
// tasks 也是一個優先級隊列,里面保存一個 job 下的所有 tasks
tasks := util.NewPriorityQueue(ssn.TaskOrderFn)
for _, task := range job.TaskStatusIndex[api.Pending] {
// Skip BestEffort task in 'allocate' action.
if task.Resreq.IsEmpty() {
klog.V(4).Infof("Task <%v/%v> is BestEffort task, skip it.",
task.Namespace, task.Name)
continue
}
// 將 task Push 到 tasks 隊列中
tasks.Push(task)
}
// 這個 map 的 key 是 job 的 id,value 是 tasks 隊列
pendingTasks[job.UID] = tasks
}
tasks := pendingTasks[job.UID]
// Added Queue back until no job in Namespace.
queues.Push(queue)
if tasks.Empty() {
continue
}
klog.V(3).Infof("Try to allocate resource to %d tasks of Job <%v/%v>",
tasks.Len(), job.Namespace, job.Name)
stmt := framework.NewStatement(ssn)
ph := util.NewPredicateHelper()
// tasks 不為空時,開一個循環來消化這些 tasks;這里的 tasks 屬于同一個 job
for !tasks.Empty(){
// ......
}
if ssn.JobReady(job) {
stmt.Commit()
} else {
if !ssn.JobPipelined(job) {
stmt.Discard()
}
}
}
繼續來看內部循環,也就是 tasks 不 Empty 的時候相應的處理邏輯:
pkg/scheduler/actions/allocate/allocate.go:169
for !tasks.Empty() {
// 取出最高優的 task
task := tasks.Pop().(*api.TaskInfo)
// ......
// 跑一次預選算法,具體算法內容后面再分析
if err := ssn.PrePredicateFn(task); err != nil {
klog.V(3).Infof("PrePredicate for task %s/%s failed for: %v", task.Namespace, task.Name, err)
fitErrors := api.NewFitErrors()
for _, ni := range allNodes {
fitErrors.SetNodeError(ni.Name, err)
}
job.NodesFitErrors[task.UID] = fitErrors
break
}
// 拿到預選通過的節點列表
predicateNodes, fitErrors := ph.PredicateNodes(task, allNodes, predicateFn, true)
if len(predicateNodes) == 0 {
job.NodesFitErrors[task.UID] = fitErrors
break
}
// 候選節點列表,注意這里是二維切片,后面會依次直接保存 idleCandidateNodes 和 futureIdleCandidateNodes 兩個切片本身進去
var candidateNodes [][]*api.NodeInfo
// 空閑候選節點列表
var idleCandidateNodes []*api.NodeInfo
// 未來空閑候選節點列表(預期即將有資源會被釋放出來的節點)
var futureIdleCandidateNodes []*api.NodeInfo
for _, n := range predicateNodes {
if task.InitResreq.LessEqual(n.Idle, api.Zero) {
idleCandidateNodes = append(idleCandidateNodes, n)
} else if task.InitResreq.LessEqual(n.FutureIdle(), api.Zero) {
futureIdleCandidateNodes = append(futureIdleCandidateNodes, n)
} else {
klog.V(5).Infof("Predicate filtered node %v, idle: %v and future idle: %v do not meet the requirements of task: %v",
n.Name, n.Idle, n.FutureIdle(), task.Name)
}
}
// 填充候選節點列表
candidateNodes = append(candidateNodes, idleCandidateNodes)
candidateNodes = append(candidateNodes, futureIdleCandidateNodes)
// 準備尋找最優節點
var bestNode *api.NodeInfo
// for 循環變量里用的是 nodes,也就是先拿到 idleCandidateNodes,再拿 futureIdleCandidateNodes
for index, nodes := range candidateNodes {
// ......
switch {
case len(nodes) == 0:
klog.V(5).Infof("Task: %v, no matching node is found in the candidateNodes(index: %d) list.", task.Name, index)
case len(nodes) == 1: // If only one node after predicate, just use it.
bestNode = nodes[0]
case len(nodes) > 1: // If more than one node after predicate, using "the best" one
// 優選算法來打分
nodeScores := util.PrioritizeNodes(task, nodes, ssn.BatchNodeOrderFn, ssn.NodeOrderMapFn, ssn.NodeOrderReduceFn)
bestNode = ssn.BestNodeFn(task, nodeScores)
if bestNode == nil {
bestNode = util.SelectBestNode(nodeScores)
}
}
// 如果在 idleCandidateNodes 中找到合適的節點,那就不看 futureIdleCandidateNodes 了
if bestNode != nil {
break
}
}
// 將前面找到的最佳節點相應資源分配給當前 task
if task.InitResreq.LessEqual(bestNode.Idle, api.Zero) {
// ......
if err := stmt.Allocate(task, bestNode); err != nil {
// ......
}
// ......
} else {
// 將 node 上預期要釋放的資源分配給當前 task
if task.InitResreq.LessEqual(bestNode.FutureIdle(), api.Zero) {
// ......
if err := stmt.Pipeline(task, bestNode.Name); err != nil {
klog.Errorf("Failed to pipeline Task %v on %v in Session %v for %v.",
task.UID, bestNode.Name, ssn.UID, err)
}
// ......
}
}
if ssn.JobReady(job) && !tasks.Empty() {
jobs.Push(job)
break
}
}
這個 for 循環的邏輯主要是按照優先級依次給 tasks 尋找最合適的 node,找到后“預占”資源,于是按順序逐步給所有的 tasks 都找到了最佳節點。
到這里我們沒有具體去深究最后 pods 是如何被綁定到節點上的,也沒有去看 Pipeline、Summit 這些邏輯;先放放,往后看完最后一個 Action backfill 之后,對整體框架熟悉了,再進一步分析細節。
4.6 Action 分析:backfill
backfill 的邏輯是遍歷待調度 jobs(Inqueue 狀態),然后將沒有沒有指明資源申請大小的 task 調度掉。不過這里沒有處理一個 job 中部分 task 指明了資源大小,部分沒有指定的場景。總之看起來不是核心邏輯,考慮到本文篇幅已經過長,這塊暫時不贅述。
5. 總結
看到這里,我開始疑惑為什么調度里關注的是 Job,Task 這些,不應該是關注 PodGroup 嗎?然后我找 Volcano 社區的幾個朋友聊了下,回過頭來再理代碼,發現 Scheduler 里的 Job、Task 和 Controller 里的 Job、Task 并不是一回事。
對于熟悉 K8s 源碼的讀者而言,很容易帶著 Job 就是 CR 的 Job 這種先入為主的觀點開始看代碼,并且覺得 Task 就是 CR Job 內的 Task。看到最后才反應過來,其實上面調度器里多次出現的 jobs 里放的那個 job 是 JobInfo 類型,JobInfo 類型對象里面的 Tasks 本質是 TaskInfo 類型對象的 map,而這個 TaskInfo 類型的 Task 和 Pod 是一一對應的,也就是 Pod 的一層 wrapper。
回過來看 Volcano 引入的 CR 中的 VolcanoJob 也不是 Scheduler 里出現的這個 Job。VolcanoJob 里也有一個 Tasks 屬性,對應的類型是 TaskSpec 類型,這個 TaskSpec 類似于 K8s 的 RS 級別資源,里面包含 Pod 模板和副本數等。
因此調度器里的 Task 其實對應 Pod,當做 Pod wrapper 理解;而 Task 的集合也就是 Pod 的集合,名字叫做 job,但是對應 PodGroup;而控制器里的 Job,也就是 VolcanoJob,它的屬性里并沒有 PodGroup;相反調度器那個 JobInfo 類型的 job 其實屬性里包含了一個 PodGroup,其實也可以認為是一個 PodGroup 的 wrapper。
所以看代碼的過程中會一直覺得 Scheduler 在面向 Job 和 Task 調度,和 PodGroup 沒有太大關系。其實這里的 Job 就是 PodGroup wrapper,Task 就是 Pod wrapper。
6. 結尾
在大致知道 Scheduler 的工作過程后,還有很多的細節等著我們進一步分析。比如:
- 從 PodGroup 的創建入手,Scheduler 如何接手 PodGroup 完成調度過程的呢?(這條路一定走得通,不然其他框架,比如 Kubeflow 等就無法和 Volcano 整合了。)
- PodGroup 里不包含 pods 信息,那 Scheduler 如何找到對應的 Pod 完成節點綁定呢?(粗看應該是通過 Pod 的 annotation 來過濾特定 PodGroup 名下的 pods,然后完成的調度。
- Job(vcjob)和 PodGroup 控制器的主要工作邏輯是什么?
- ……
2023年最后一個工作日了,肝不動了,節后繼續刷。(預知下文,記得關注微信公眾號:胡說云原生,寶子們年后見!)
總結
以上是生活随笔為你收集整理的Volcano 原理、源码分析(一)的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 2023总结与展望--Empirefre
- 下一篇: “报错”是编程世界中,最简单的事情!