Storm系列(四)Topology提交校验过程
功能:提交一個新的Topology,并為Topology創建storm-id(topology-id),校驗其結構,設置必要的元數據,最后為Topology分配任務.
實現源碼:
| 1? | (^void?submitTopology |
| 2? | ????????[this?^String?storm-name?^String?uploadedJarLocation?^String?serializedConf?^StormTopology?topology] |
| 3? | ????????(.submitTopologyWithOpts?this?storm-name?uploadedJarLocation?serializedConf?topology |
| 4? | ?????????????????????????????????(SubmitOptions.?TopologyInitialStatus/ACTIVE))) |
從以上源碼中看出submitTopology內部是對submitTopologyWithOpts方法的調用。
submitTopologyWithOpts函數原型如下:
| 1? | ^void?submitTopologyWithOpts |
| 2? | ????????[this?^String?storm-name?^String?uploadedJarLocation?^String?serializedConf?^StormTopology?topology |
| 3? | ?????????^SubmitOptions?submitOptions] |
在submitTopologyWithOpts中主要做了以下幾件事情:
normalize-topology
實現源碼:
| 1? | (defn?normalize-topology?[storm-conf?^StormTopology?topology] |
| 2? | ??(let?[ret?(.deepCopy?topology)] |
| 3? | ????(doseq?[[_?component]?(all-components?ret)] |
| 4? | ??????(.set_json_conf |
| 5? | ????????(.get_common?component) |
| 6? | ????????(->>?{TOPOLOGY-TASKS?(component-parallelism?storm-conf?component)} |
| 7? | ?????????????(merge?(component-conf?component)) |
| 8? | ?????????????to-json?))) |
| 9? | ret?)) |
實現說明:
- 調用deepCopy對topology進行深度拷貝,賦值給ret.
- 遍歷topology(ret)所有組件,調用component-parallelism更新組件配置中的TOPOLOGY_TASKS信息。
component-parallelism實現源碼(計算組件并行度):
| 1? | (defn-?component-parallelism?[storm-conf?component] |
| 2? | ??(let?[storm-conf?(merge?storm-conf?(component-conf?component)) |
| 3? | ????????num-tasks?(or?(storm-conf?TOPOLOGY-TASKS)?(num-start-executors?component)) |
| 4? | ????????max-parallelism?(storm-conf?TOPOLOGY-MAX-TASK-PARALLELISM) |
| 5? | ????????] |
| 6? | ????(if?max-parallelism |
| 7? | ??????(min?max-parallelism?num-tasks) |
| 8? | ??????num-tasks))) |
- 將Topology配置信息與組件(component)配置信息進行合并,兩者存在重復的配置項時以組件的配置項為準。
- 計算組件并行度(num-tasks),若果配置storm-conf中配置了TOPOLOGY-TASKS信息,就以該配置值做為組件的并行度,否則通過調用num-start-executors獲取用戶對組件設置的并行度做為num-tasks.
- 獲取storm-conf配置中TOPOLOGY-MAX-TASK-PARALLELISM配置項的值。
- 返回TOPOLOGY-MAX-TASK-PARALLELISM與num-tasks較小的值做為組件的并行度。
| 1? | TopologyBuilder?builder?=?new?TopologyBuilder(); |
| 2? | //?4對應對用用戶設置的組件并行度,10對應TOPOLOGY-TASK配置項的值 |
| 3? | builder.setBolt("transfer",?new?TransferBolt(),?4).shuffleGrouping("random").setNumTasks(6);?Config?conf?=?new?Config(); |
| 4? | //?8對應?TOPOLOGY-MAX-TASK-PARALLELISM配置項的值 |
| 5? | Conf.setMaxTaskParallelism(8); |
?
system-topology!
功能:
驗證用戶提交的Topology,同時為提交的topology添加一些系統組件和流。
實現源碼:
| 1? | (defn?system-topology!?[storm-conf?^StormTopology?topology] |
| 2? | ??(validate-basic!?topology) |
| 3? | ??(let?[ret?(.deepCopy?topology)] |
| 4? | ????(add-acker!?storm-conf?ret) |
| 5? | ????(add-metric-components!?storm-conf?ret)???? |
| 6? | ????(add-system-components!?storm-conf?ret) |
| 7? | ????(add-metric-streams!?ret) |
| 8? | ????(add-system-streams!?ret) |
| 9? | ????(validate-structure!?ret) |
| 10? | ????ret |
| 11? | )) |
實現說明:
- 使用validate-basic!校驗所提交的Topology.
主要用于確保topology中的組件id不重復而且不是系統id,以及確保每個組件的TOPOLOGY-TASKS配置項大于0時,組件的并行度設置也一定大于0. - 調用deepCopy對topology進行深度拷貝,賦值給ret.
- 為Topology添加acker-bolt.
用于追蹤發送出去的消息是否被成功處理。 - 使用add-metric-components為Topology添加metric-bolt.
- 為Topology添加system-bolt.
System-bolt沒有輸入流只有輸出流分別為:SYSTEM-TICK-STREAM-ID,聲明字段是[“rate_secs”],非直接模式;另一個為METRICS-TICK-STREAM-ID,聲明字段為[“interval”]非直接模式,并行度為0. - 為Topology中的所有組件添加統計流。
Stream-id為METRICS-STREAM-ID,聲明字段為[“task-info”,”data-points”],非直接流模式. - 為Topology中的所有組件添加系統流。
stream-id為SYSTEM-STREAM-ID,聲明字段為[“event”],非直接流模式. - 使用validate-structure!檢驗以上步驟所組合后的Topology.
驗證過程:
獲取Topology中所有組件和組件的輸入(包括component-id、stream-id、Grouping),對輸入組件依次判斷輸入組件ID(component-id)是否在該Topology中,不存在則拋出異常,存在則再判斷該組件的流類型是否為所對應的stream-id,若不存在則拋出異常,存在則繼續檢查該流的分組方式(Grouping)是否與能對應,所有組件檢查完畢后沒有異常拋出表示該Topology有效.
轉載于:https://www.cnblogs.com/jianyuan/p/4792443.html
創作挑戰賽新人創作獎勵來咯,堅持創作打卡瓜分現金大獎總結
以上是生活随笔為你收集整理的Storm系列(四)Topology提交校验过程的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 关于mysql ERROR 1045 (
- 下一篇: hp