Storm概念学习系列之Topology拓扑
?
?
不多說,直接上干貨!
?
??Hadoop 上運行的是 MapReduce 作業,而在?Storm 上運行的是拓撲 Topology,這兩者之間是非常不同的。一個關鍵的區別是:一個MapReduce 作業最終會結束,而一個 Topology 拓撲會永遠運行(除非手動殺掉)。
?
?
?
Topology拓撲
從字面上解釋Topology,就是網絡拓撲,是指用傳輸介質互連各種設備的物理布局,是構成網絡的成員間特定的物理的(即真實的),或者邏輯的,即虛擬的排列方式。拓撲是一種不考慮物體的大小、形狀等物理屬性,而只使用點或者線描述多個物體實際位置與關系的抽象表示方法。拓撲不關心事物的細節,也不在乎相互的比例關系,只是以圖的形式表示一定范圍內多個物體之間的相互關系。從Storm角度考慮,它不是網絡拓撲,但是又類似于網絡拓撲的結構,所以取名Topology。
那么Storm的Topology指的是類似于網絡拓撲圖的一種虛擬結構。Storm的拓撲Topology類似于MapReduce任務,一個關鍵的區別是MapReduce任務運行一段時間后最終會完成,而Storm拓撲一直運行(直到殺掉它)。一個拓撲是由Spout和Bolt組成的圖,Spout和Bolt之間通過流分組連接起來。圖1形象地描述了Topology中的Spout和Bolt之間的關系。
?
圖1 ? ?Spout和Bolt的關系圖
?
?
?
通過對圖1的理解可以看出,Topology是由Spout、Bolt、數據載體Tuple等構成的一定規則的網絡拓撲圖。Storm提供了TopologyBuilder類來創建Topology。打個比方,TopologyBuilder是Topology的骨架,Spout、Bolt是Topology的肉和血液。TopologyBuilder類的主要方法如圖2所示。
圖 2 ? ?TopologyBuilder類的主要方法
?
?TopologyBuilder實際上是封裝了Topology的Thrift接口,也就是說Topology實際上是通過Thrift定義的一個結構,TopologyBuilder將這個對象建立起來,然后Nimbus實際上運行一個Thrift服務器,用于接收用戶提交的結構。由于采用Thrift實現,所以用戶可以用其他語言建立Topology,這樣就提供了比較方便的多語言操作支持。
?
?
?
?
?
?Topology實例
下面從一個簡單的例子開始介紹Topology的構建和定義,通過此案例能夠基本理解Storm,并且能夠構建一個簡單的Topology。本實例使用Topology來統計一個句子中單詞出現的頻率。下面詳細介紹如何設計和運行Topology,以及一些注意事項。
1. 設計Topology結構
在編寫代碼之前,首先要設計Topology。在理清數據處理邏輯之后,創建Topology就非常簡單了。統計單詞詞頻的Topology的大致結構如圖3所示??梢詫?strong>Topology分成3個部分:一是數據源KafkaSpout,負責發送語句;二是數據處理者SplitSentenceBolt,負責切分語句;三是數據再處理者WordCountBolt,負責累加單詞的頻率。
? ? ? ? ? ? ?
? 圖 3 ? ? ?Topology的結構
?
?
? ? ? ? ? ? ? ? ?
? ? ? ? ?2. 設計數據流
設計的Topology是從KafkaSpout中讀取句子,并把句子劃分成單詞,然后匯總每個單詞出現的次數,一個Bolt負責獲取句子后劃分成單詞,一個Bolt分別對應計算每一個單詞出現的次數,然后Tuple在Spout和Bolt之間傳遞,如圖3-15所示。
圖4 ? ?Topology內部數據流圖
? 3. 代碼實現
(1)構建Maven環境
為了開發Topology,需要把Storm相關的JAR包添加到CLASSPATH中,要么手動添加所有相關的JAR包,要么使用Maven來管理所有的依賴。Storm的JAR包發布在Clojars(一個Maven庫),如果使用Maven,需要把下面的配置代碼添加在項目的pom.xml中。
?
?
(2)定義Topology
定義Topology的內部邏輯,代碼如下:
SpoutConf?ig kafkaConf?ig = new SpoutConf?ig(brokerHosts, "storm-sentence", "", "storm"); kafkaConf?ig.scheme = new SchemeAsMultiScheme(new StringScheme()); TopologyBuilder builder = new TopologyBuilder(); builder.setSpout(1,new KafkaSpout(kafkaConf?ig), 10);// id, spout, parallelism_hint builder.setBolt(2, new SplitSentence(), 10) .shuffleGrouping(1); builder.setBolt(3, new WordCount(), 20) .f?ieldsGrouping(2, new Fields("word"));
?
?
聲明的Topology的Spout是從Kafka中讀取句子,Spout用setSpout方法插入一個獨特的ID到Topology中。Topology中的每個節點必須給予一個ID,ID是由其他Bolt用于訂閱該節點的輸出流,KafkaSpout在Topology中的ID為1。
setBolt用于在Topology中插入Bolt。在Topology中定義的第一個Bolt是切割句子的Bolt,該Bolt(即SplitSentence)將句子流轉成單詞流;setBolt的最后一個參數是Bolt的并行量,因為SplitSentence是10個并發,所以在Storm集群中有10個線程并行執行。當Topology遇到性能瓶頸時,可以通過增加Bolt并行數量來解決。setBolt方法返回一個對象,用來定義Bolt的輸入。例如,SplitSentence約定使用組件ID為1的輸出流,1是指已經定義的KafkaSpout。SplitSentence會消耗KafkaSpout發出的每一個元組。
SplitSentence的關鍵方法是execute,它將句子拆分成單詞,并發出每個單詞作為新的元組。另一個重要的方法是declareOutputFields,其中聲明了Bolt輸出元組的架構,這個方法聲明它發出一個域為“word”的元組。
SplitSentence對句子中的每個單詞發射一個新的Tuple,WordCount在內存中維護每個單詞出現次數的映射,WordCount每收到一個單詞,都會更新內存中的統計狀態。
?
?
SplitSentence的實現代碼如下:
public class SplitSentence implements IBasicBolt{public void prepare(Map conf, TopologyContext context) {}public void execute(Tuple tuple, BasicOutputCollector collector) {String sentence = tuple.getString(0);for(String word: sentence.split(" ")) {collector.emit(new Values(word));}}public void cleanup() {}public void declareOutputFields(OutputFieldsDeclarer declarer) {declarer.declare(new Fields("word"));}}?
?
?
WordCount的實現代碼如下:
?
public class WordCount implements IBasicBolt {private Map<String, Integer> _counts = new HashMap<String, Integer>();public void prepare(Map conf, TopologyContext context) {}public void execute(Tuple tuple, BasicOutputCollector collector) {String word = tuple.getString(0);int count;if(_counts.containsKey(word)) {count = _counts.get(word);} else {count = 0;}count++;_counts.put(word, count);collector.emit(new Values(word, count));}public void cleanup() {}public void declareOutputFields(OutputFieldsDeclarer declarer) {declarer.declare(new Fields("word", "count"));} }?
?
?
4. Topology運行
Topology運行有兩種模式:本地模式和分布式模式。這兩種模式的接口區別很大,使用場景也不相同。另外,下面還將介紹Topology的運行流程、方法調用過程以及并行度等。
1. Topology運行模式
Topology的運行模式可以分為本地模式和分布式模式,模式可以在配置文件中和代碼中設置。
(1)本地模式
Storm用一個進程中的線程來模擬所有的Spout和Bolt。本地模式對開發和測試來說比較有用。storm-starter中的Topology是以本地模式運行的,可以看到Topology中的每一個組件發射的消息。示例代碼如下:
?
Config conf = new Conf?ig(); conf.setDebug(true); conf.setNumWorkers(2); LocalCluster cluster = new LocalCluster(); cluster.submitTopology("test", conf, builder.createTopology()); Utils.sleep(10000); cluster.killTopology("test"); cluster.shutdown(); 首先,這段代碼通過定義一個LocalCluster對象來定義一個進程內的集群。提交Topology給這個虛擬的集群和提交Topology給分布式集群相同。通過調用submitTopology方法來提交Topology,共有3個參數:要運行的Topology的名稱、一個配置對象,以及要運行的Topology本身。
Topology是以名稱來唯一區別的,可以用這個名稱來殺掉該Topology,而且必須顯式地殺掉,否則它會一直運行。
?
conf對象可以配置內容很多,下面兩個是最常見的:
TOPOLOGY_WORKERS (setNumWorkers):定義希望集群分配多少個工作進程來執行這個Topology。Topology中的每個組件都需要線程來執行。每個組件到底用多少個線程是通過setBolt和setSpout來指定的。這些線程都運行在工作進程中。每一個工作進程包含一些節點的一些工作線程。例如,指定300個線程,60個進程,那么每個工作進程中要執行6個線程,而這6個線程可能屬于不同的組件(Spout或Bolt)??梢哉{整每個組件的并行度以及這些線程所在的進程數量來調整Topology的性能。
TOPOLOGY_DEBUG (setDebug):當它設置為true時,Storm會記錄下每個組件發射的每條消息。這在本地環境調試Topology時很有用,但是在生產環境中如果這么做,則會影響性能。
?
?
?(2)分布式模式
Storm由若干節點組成。提交Topology給Nimbus時,也會提交Topology代碼。Nimbus負責分發代碼和給Topolgoy分配工作進程。如果一個工作進程掛掉了,Nimbus節點會將其重新分配到其他節點。分布式模式提交拓撲的代碼如下:
?
? 在Storm代碼編寫完成之后,需要打包成JAR包放到Nimbus中運行。在打包時,不需要把依賴的JAR都打進去,否則運行時會出現重復的配置文件錯誤導致Topology無法運行,因為在Topology運行之前,會加載本地的storm.yaml配置文件。
在Nimbus運行的命令如下。
?
?
?
2. Topology運行流程
在Topology的運行流程中,有幾點需要特別說明。
1)提交Topology后,Storm會把代碼先存放到Nimbus節點的inbox目錄下;之后,把當前Storm運行的配置生成一個stormconf.ser文件放到Nimbus節點的stormdist目錄中,此目錄中同時還有序列化之后的Topology代碼文件。
2)在設定Topology關聯的Spout和Bolt時,可以同時設置當前Spout和Bolt的Executor和Task數量。在默認情況下,一個Topology的Task總和與Executor的總和一致。之后,系統根據Worker的數量,盡量將這些Task平均分配到不同的Worker上執行。Worker在哪個Supervisor節點上運行是由Storm本身決定的。
3)在任務分配好之后,Nimbus節點將任務的信息提交到ZooKeeper集群,同時在ZooKeeper集群中有Workerbeats,這里存儲了當前Topology所有Worker進程的心跳信息。
4)Supervisor節點不斷輪詢ZooKeeper集群,在ZooKeeper的assignments中保存了所有Topology的任務分配信息、代碼存儲目錄、任務之間的關聯關系等,Supervisor通過輪詢此節點的內容來領取自己的任務,啟動Worker進程運行。
5)一個Topology運行之后,不斷通過Spout來發送流,通過Bolt來不斷處理接收到的流,流是無界的。最后一步會不間斷地執行,除非手動結束該Topology。
?
3. Topology的方法調用流程
Topology中的流處理時,調用方法的過程如圖3-16所示。
Topology方法調用的過程有如下一些要點:
1)每個組件(Spout或者Bolt)的構造方法和declareOutputFields方法都只被調用一次。
2)open方法和prepare方法被調用多次。在入口函數中設定的setSpout或者setBolt中的并行度參數是指Executor的數量,是負責運行組件中的Task的線程數量,此數量是多少,上述兩個方法就會被調用多少次,在每個Executor運行時調用一次。
3)nextTuple方法和execute方法是一直運行的,nextTuple方法不斷發射Tuple,Bolt的execute不斷接收Tuple進行處理。只有這樣不斷地運行,才會產生無界的Tuple流,體現實時性。這類似于Java線程的run方法。
4)提交一個Topology之后,Storm創建Spout/Bolt實例并進行序列化。之后,將序列化的組件發送給所有任務所在的節點(即Supervisor節點),在每一個任務上反序列化組件。
5)Spout和Bolt之間、Bolt和Bolt之間的通信,通過ZeroMQ的消息隊列實現。
6)圖3-16沒有列出ack和fail方法,在一個Tuple成功處理之后,需要調用ack方法來標記成功,否則調用fail方法標記失敗,重新處理該Tuple。
? ?
? 圖5 ? ?Topology流處理過程圖
?
?
4. Topology并行度
在Topology的執行單元中,有幾個和并行度相關的概念。
(1)Worker
每個Worker都屬于一個特定的Topology,每個Supervisor節點的Worker可以有多個,每個Worker使用一個單獨的端口,Worker對Topology中的每個組件運行一個或者多個Executor線程來提供Task的執行服務。
(2)Executor
Executor是產生于Worker進程內部的線程,會執行同一個組件的一個或者多個Task。
(3)Task
實際的數據處理由Task完成。在Topology的生命周期中,每個組件的Task數量不會變化,而Executor的數量卻不一定。Executor數量小于等于Task的數量,在默認情況下,二者是相等的。
在運行一個Topology時,可以根據具體的情況來設置不同數量的Worker、Task、Executor,設置的位置也可以在多個地方。
1)Worker設置:可以設置yaml中的topology.workers屬性。在代碼中通過Conf?ig的setNumWorkers方法設定。
2)Executor設置:通過Topology的入口類中的setBolt、setSpout方法的最后一個參數指定,如果不指定,則使用默認值1。
3)Task設置:在默認情況下,和executor數量一致。在代碼中通過TopologyBuilder的setNumTasks方法設定具體某個組件的Task數量。
?
5. 終止Topology
在Nimbus啟動的節點上,使用下面的命令來終止一個Topology的運行。
storm kill topologyName
執行kill之后,通過UI界面查看Topology狀態,其先變成KILLED狀態,清理完本地目錄和ZooKeeper集群中與當前Topology相關的信息之后,此Topology將徹底消失。
?
6.Topology跟蹤
提交Topology后,可以在Storm UI界面查看整個Topology運行的過程。
?
?
?
?
?
?
如下
?
總結
以上是生活随笔為你收集整理的Storm概念学习系列之Topology拓扑的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: cocos渲染流程
- 下一篇: Swift3.0 键盘高度监听获取