Storm学习二
為什么80%的碼農(nóng)都做不了架構(gòu)師?>>>
?
Nimbus
功能:對(duì)Topology任務(wù)進(jìn)行分配調(diào)度,接收用戶的命令做相應(yīng)的處理,submit,kill,activate,deactivate,rebalance
nimbus數(shù)據(jù)結(jié)構(gòu)
java數(shù)據(jù)結(jié)構(gòu)和Clojure數(shù)據(jù)結(jié)構(gòu)
Nimbus中除了主服務(wù)線程之外,還有個(gè)計(jì)時(shí)器線程
作用如下:
1調(diào)用mk-assignment啟動(dòng)新一輪的任務(wù)分配,調(diào)用do-cleanup方法清理storm元數(shù)據(jù),操作每隔(NIMBUS-MONITOR-FREQ-SECS)10秒執(zhí)行一次。
2調(diào)用clean-inbox方法清理nimbus本地目錄中topology的jar包(Cleanup-inbox-freq-secs 600s)執(zhí)行一次
3執(zhí)行topology的狀態(tài)轉(zhuǎn)移事件kill,rebalance等
?
mk-assignments方法會(huì)將所有的分配信息保存或更新到zookeeper中,supervisor會(huì)周期性地檢查和分配這些信息,并根據(jù)這些信息做相應(yīng)的調(diào)度處理
executor->node+port信息
do-cleanup
clean-inbox 清除本地目錄
Topology狀態(tài)轉(zhuǎn)移
transition!-name->transition!
delay-event方法表示延遲一段時(shí)間后再處理轉(zhuǎn)移事件,參數(shù)包括nimbus-data,storm-id,延遲執(zhí)行的時(shí)間及轉(zhuǎn)移事件。schedule方法
kill-transition方法定義了一個(gè)方法參數(shù)為kill-time;
rebalance-transition也返回了一個(gè)方法
?
啟動(dòng)Nimbus服務(wù)
涉及兩個(gè)方法
launch-server!定義了核心的處理邏輯,啟動(dòng)nimbus服務(wù)
service-handler 方法是nimbus真正處理請(qǐng)求的地方,定義了一些數(shù)據(jù)結(jié)構(gòu),以及用于啟動(dòng)任務(wù)調(diào)度和數(shù)據(jù)清理的線程,它還會(huì)返回一個(gè)實(shí)現(xiàn)了Nimbus$Iface接口,Shutdownable接口以及DaemonCommon接口的對(duì)象,nimbus-data方法構(gòu)建nimbus數(shù)據(jù)結(jié)構(gòu),調(diào)用cleanup-corrupt-topologies!方法清除哪些在ZooKeeper上還有元數(shù)據(jù)但在nimbus本地目錄中沒有對(duì)應(yīng)文件夾的Topology,將它們遺留在ZooKepper中的記錄徹底刪除。
將當(dāng)前所有處于活躍狀態(tài)的Topology調(diào)用transition!方法,設(shè)置Topology的狀態(tài):start-up
?
關(guān)閉Nimbus服務(wù)
包括殺掉計(jì)時(shí)器線程,釋放zookeeper連接,以及清除nimbus-data中上傳下載的緩存。
?
?
Nimbus主要服務(wù)方法:
Topology的提交
submitTopology:提交一個(gè)新的Topology,并為topology創(chuàng)建topology-id設(shè)置一些必要的元數(shù)據(jù),最后用mk-assignments方法為Topology分配任務(wù)
?
jar文件的上傳與下載
nimbus作為服務(wù)器,一方面接收用戶提交的Topology jar 包,另一方面還要向supervisor下達(dá)任務(wù)分配的jar包.
?
文件上傳beginFileUpload,uploadChunk finishFileUpload
文件下載beginFileDownload和downloadChunk
?
UI信息
Nimbus服務(wù)器本身記錄了當(dāng)前集群的任務(wù)和調(diào)度信息
getClusterInfo當(dāng)前集群的統(tǒng)計(jì)信息 :系統(tǒng)的資源占用情況,Nimbus服務(wù)運(yùn)行了多少時(shí)間,以及當(dāng)前系統(tǒng)中所有Topology的運(yùn)行統(tǒng)計(jì);
<supervisor-id,SupervisorInfo>信息構(gòu)造supervisorSummary對(duì)象,參數(shù)分別為主機(jī)名,啟動(dòng)時(shí)間,所有可用的端口數(shù)目,使用的端口號(hào)的數(shù)目以及supervisor-id,最后返回一個(gè)SupervisorSummary集合
<topology-id,stormBase>集合
根據(jù)topology-id獲取其任務(wù)分配信息,構(gòu)建TopologySummary對(duì)象,其參數(shù)依次為topology-id,storm-name,所有的Task數(shù)目,所有的Executor數(shù)目,所有被占用的slot數(shù)目;
根據(jù)supervisorSummary集合,nimbus的啟動(dòng)時(shí)間以及TopologySummary集合,創(chuàng)建ClusterSummary對(duì)象并返回。
?
獲得storm配置項(xiàng)和topology對(duì)象獲取等基本工作
getNimbusConf直接返回JSON序列化后的nimbus-data中保存的nimbus使用的storm配置項(xiàng)
getTopology方法獲得系統(tǒng)中所有的topology信息
?
?
?
?
輔助方法
system-topology!
驗(yàn)證提交的topology,同時(shí)添加系統(tǒng)組件和流
?
normalize-topology
計(jì)算提交的Topology中每個(gè)組件的并行度并更新該組件的Topology-tasks配置項(xiàng)
?
component-parallelism方法,用來(lái)計(jì)算組件并行度
?
compute-new-topology->executor->node+por
根據(jù)系統(tǒng)當(dāng)前已經(jīng)存在的分配情況,結(jié)合當(dāng)前系統(tǒng)的運(yùn)行情況找出需要進(jìn)行任務(wù)分配的Topology集合,并為他們分配任務(wù)。
即<topology-id,<executor,[node,port]>>每個(gè)topology對(duì)應(yīng)的任務(wù)分配情況,計(jì)算出新的集合結(jié)果
?
compute-executors根據(jù)當(dāng)前topology設(shè)置的組件的并行度創(chuàng)建對(duì)應(yīng)的executor.
nimbus:nimbus-data對(duì)象
storm-id:topology-id
?
?
?
?
Scheduler
是storm調(diào)度器,為topology分配當(dāng)前集群中可用的資源
IScheduler接口
prepare方法
scheduler方法
?
Storm提供了3種scheduler-EvenScheduler,DefaultScheduler和IsolationScheduler;
?
?
EvenScheduler:將可用資源均勻地分配給當(dāng)前小任務(wù)分配的多個(gè)Topology;
?
DefaultScheduler:是Storm默認(rèn)的任務(wù)調(diào)度器首先釋放掉其他topology不再需要的資源,然后調(diào)用evenScheduler方法為topology均勻分配資源;
?
IsolationScheduler:提供一種機(jī)制來(lái)確保集群中的某些Topology有足夠的運(yùn)行資源,可以單獨(dú)為某個(gè)Topology指定需要的資源;
sort-slots資源列表排序
?
Supervisor
可以理解為單擊任務(wù)調(diào)度器,負(fù)責(zé)箭筒nimbus的任務(wù)調(diào)度器,啟動(dòng)相應(yīng)的worker對(duì)nimbus分配的任務(wù)進(jìn)行處理,同時(shí)也會(huì)監(jiān)聽由他啟動(dòng)的worker的工作狀態(tài)
?
與supervisor相關(guān)的數(shù)據(jù)結(jié)構(gòu)
standalone-supervisor方法:返回一個(gè)實(shí)現(xiàn)了 ISupervisor接口的對(duì)象,獲取和創(chuàng)建supervisor的id
supervisor-data方法:定義了整個(gè)supervisor代碼共享數(shù)據(jù)結(jié)構(gòu),很多常用的成員變量
?
本地?cái)?shù)據(jù)存儲(chǔ),使用LocalState在本地保存相關(guān)的信息,LocalState保存重要的數(shù)據(jù),保證supervisor失敗重啟后能夠正常運(yùn)行
?
?
1 supervisor id
2 localAssigment
3 Approved Workers有效的<work-id,port>映射集合
?
?
Supervisor中的線程
計(jì)時(shí)器線程和兩個(gè)時(shí)間線程。
計(jì)數(shù)器線程負(fù)責(zé)維持心跳,得到各個(gè)Supervisor的最新狀態(tài),同時(shí)也負(fù)責(zé)每隔一段時(shí)間將事件線程要執(zhí)行的時(shí)間添加到對(duì)應(yīng)的隊(duì)列中。
同步nimbus任務(wù)的線程通過(guò)不斷執(zhí)行mk-synchronize-supervisor函數(shù)來(lái)保證supervisor與nimbus的任務(wù)同步,獲取新的任務(wù),移除舊任務(wù)
管理Worker進(jìn)程的線程
?
?
啟動(dòng)Supervisor
通過(guò)mk-supervisor方法來(lái)啟動(dòng)服務(wù)Supervisor
關(guān)閉Supervisor
將運(yùn)行狀態(tài)設(shè)置為false,關(guān)閉計(jì)時(shí)器線程,關(guān)閉Supervisor與Nimbus同步任務(wù)的線程,關(guān)閉管理Worker的線程,釋放掉與ZooJeeper的連接
?
?
幾個(gè)Supervisor中重要的輔助方法
launch-worker啟動(dòng)worker進(jìn)程,分分布式和本地模式
read-allocated-workers 用于獲得worker及其對(duì)應(yīng)的心跳信息,并根據(jù)心跳信息判斷worker狀態(tài)
wait-for-worker-launch啟動(dòng)worker時(shí)被調(diào)用,保證直到Worker成功啟動(dòng)起來(lái)后才返回
shutdown-worker該方法用于關(guān)閉Worker進(jìn)程并清理Worker的本地文件夾
download-storm-code這個(gè)方法用于從Nimbus下載與分配給當(dāng)前Supervisor的任務(wù)相對(duì)應(yīng)的Topology信息。跟launch-worker方法類似,該方法也有兩種模式— Local模式和分布式模式
?
?
?
?
轉(zhuǎn)載于:https://my.oschina.net/iioschina/blog/812358
總結(jié)
- 上一篇: Linux操作系统安装---centos
- 下一篇: ASP.NET Core中显示自定义错误