storm调优
storm調(diào)優(yōu)
@(STORM)[storm]
本文從2個(gè)方面討論storm的調(diào)優(yōu),第一個(gè)是集群的調(diào)優(yōu),第二個(gè)是運(yùn)行在集群中的拓?fù)涞恼{(diào)優(yōu),這部分還包括了使用storm-kafka從kafka中讀取消息的調(diào)優(yōu)。
官方的一些建議請(qǐng)見:http://storm.apache.org/documentation/FAQ.html
中文版:http://ifeve.com/storm-faq/
一、集群調(diào)優(yōu)
1、netty的調(diào)優(yōu)
netty的配置項(xiàng)主要包括以下幾個(gè):
storm.messaging.netty.server_worker_threads: 1 storm.messaging.netty.client_worker_threads: 1 storm.messaging.netty.buffer_size: 5242880 #5MB buffer # Since nimbus.task.launch.secs and supervisor.worker.start.timeout.secs are 120, other workers should also wait at least that long before giving up on connecting to the other worker. The reconnection period need also be bigger than storm.zookeeper.session.timeout(default is 20s), so that we can abort the reconnection when the target worker is dead. storm.messaging.netty.max_retries: 300 storm.messaging.netty.max_wait_ms: 1000 storm.messaging.netty.min_wait_ms: 100# If the Netty messaging layer is busy(netty internal buffer not writable), the Netty client will try to batch message as more as possible up to the size of storm.messaging.netty.transfer.batch.size bytes, otherwise it will try to flush message as soon as possible to reduce latency. storm.messaging.netty.transfer.batch.size: 262144 # Sets the backlog value to specify when the channel binds to a local address storm.messaging.netty.socket.backlog: 500# By default, the Netty SASL authentication is set to false. Users can override and set it true for a specific topology. storm.messaging.netty.authentication: false2、GC記錄打印
- 在配置文件中開啟 GC 日志記錄;如果一切正常,日志中記錄的 major GC 應(yīng)該會(huì)非常少
二、拓?fù)湔{(diào)優(yōu)
1、使用組件的并行度代替線程池
在storm中,我們可以很方便的調(diào)整spout/bolt的并行度,即使啟動(dòng)拓?fù)鋾r(shí)設(shè)置不合理,也可以使用rebanlance命令進(jìn)行動(dòng)態(tài)調(diào)整。
但有些人可能會(huì)在一個(gè)spout/bolt組件的task內(nèi)部啟動(dòng)一個(gè)線程池,這些線程池所在的task會(huì)比其余task消耗更多的資源,因此這些task所在的worker會(huì)消耗較多的資源,有可能影響其它拓?fù)涞恼?zhí)行。
因此,應(yīng)該使用組件自身的并行度來(lái)代替線程池,因?yàn)檫@些并行度會(huì)被合理分配到不同的worker中去。除此之外,還可以使用CGroup等技術(shù)進(jìn)行資源的控制。
2、不要在spout中處理耗時(shí)的操作
在storm中,spout是單線程的。如果nextTuple方法非常耗時(shí),某個(gè)消息被成功執(zhí)行完畢后,acker會(huì)給spout發(fā)送消息,spout若無(wú)法及時(shí)消費(fèi),則有可能導(dǎo)致 ack消息被丟棄,然后spout認(rèn)為執(zhí)行失敗了。
在jstorm中將spout分成了3個(gè)線程,分別執(zhí)行nextTuple, fail, ack方法。
3、fieldsGrouping的數(shù)據(jù)均衡性
fieldsGrouping根據(jù)某個(gè)field的值進(jìn)行分組,以u(píng)serId為例,如果一個(gè)組件以u(píng)serId的值作為分組,則具有相同userId的值會(huì)被發(fā)送到同一個(gè)task。如果某些userId的數(shù)據(jù)量特別大,會(huì)導(dǎo)致這接收這些數(shù)據(jù)的task負(fù)載特別高,從而導(dǎo)致數(shù)據(jù)均衡出現(xiàn)問(wèn)題。
因此必須合理選擇field的值,或者更換分組策略。
4、優(yōu)先使用localOrShuffleGrouping代替shuffleGrouping
localOrShuffleGrouping是指如果task發(fā)送消息給目標(biāo)task時(shí),發(fā)現(xiàn)同一個(gè)worker中有目標(biāo)task,則優(yōu)先發(fā)送到這個(gè)task;如果沒(méi)有,則進(jìn)行shuffle,隨機(jī)選取一個(gè)目標(biāo)task。
localOrShuffleGrouping其實(shí)是對(duì)shuffleGrouping的一個(gè)優(yōu)化,因?yàn)橄司W(wǎng)絡(luò)開銷和序列化操作。
5、設(shè)置合理的MaxSpoutPending
另附官方建議:
- 開始時(shí)設(shè)置一個(gè)很小的 TOPOLOGY_MAX_SPOUT_PENDING(對(duì)于 trident 可以設(shè)置為 1,對(duì)于一般的 topology 可以設(shè)置為 executor 的數(shù)量),然后逐漸增大,直到數(shù)據(jù)流不再發(fā)生變化。這時(shí)你可能會(huì)發(fā)現(xiàn)結(jié)果大約等于 “2 × 吞吐率(每秒收到的消息數(shù)) × 端到端時(shí)延” (最小的額定容量的2倍)。
注意,此參數(shù)慎用,過(guò)大的maxspoutpending會(huì)增加某個(gè)batch fail的風(fēng)險(xiǎn),如果不能合理處理fail(如寫磁盤),則將其設(shè)置為1以盡量降低其fail的風(fēng)險(xiǎn)。如果可以通過(guò)state來(lái)處理fail,則可選擇最優(yōu)參數(shù)。
在啟用了ack的情況下,spout中有個(gè)RotatingMap來(lái)保存spout已經(jīng)發(fā)送出去,但未收到ack結(jié)果的消息。RotatingMap最大的大小為p*num-task,其中num-task就是spout的task數(shù)量,而p為topology.max.spout.pending的值,也可以通過(guò)setMaxSpoutPending來(lái)指定,是指每個(gè)task最多已經(jīng)發(fā)送出去但未被ack的消息數(shù)量。
若設(shè)置過(guò)小,則task的處理能力未充分應(yīng)用,不能達(dá)到最佳的吞吐量。若設(shè)置過(guò)大,則消費(fèi)過(guò)多的內(nèi)存,還有可能spout的消息不能及時(shí)處理,從而導(dǎo)致fail的出現(xiàn)。
1、spout的Execute latency(執(zhí)行nextTuple的時(shí)間)為17ms,因此理論上每秒每個(gè)spout task的最大發(fā)送速度是60個(gè)tuple。
2、一個(gè)tuple的處理時(shí)長(zhǎng)約為200ms(topo的complete latency)
3、200ms內(nèi)有大約有60*0.2=12個(gè)tuple被發(fā)送。
4、因此MaxSpoutPenging被設(shè)置為12較為合理。
小結(jié):1/Execute latency*complete latency,即使用topo的complete latency除以Execute latency即可,但實(shí)際上不應(yīng)該考慮如此極端的情況,以避免過(guò)多的fail出現(xiàn),所以可以設(shè)置為上述值除以1.5左右。
默認(rèn)值為1,需要改為合理的值。對(duì)trident是否適用??
//任何時(shí)刻中,一個(gè)spout task最多可以同時(shí)處理的tuple數(shù)量,即已經(jīng)emite,但未acked的tuple數(shù)量。默認(rèn)為1Number active = (Number) conf.get(Config.TOPOLOGY_MAX_SPOUT_PENDING);if(active==null) {_maxTransactionActive = 1;} else {_maxTransactionActive = active.intValue();}6、避免出現(xiàn)fail
bolt在處理消息時(shí),worker 的日志中出現(xiàn)Failing message
原因:可能是因?yàn)門opology 的消息處理超時(shí)所致。一個(gè)常見的原因是supervisor的負(fù)載太高(如網(wǎng)絡(luò)、磁盤IO等),不能及時(shí)的處理消息,從而導(dǎo)致fail。
解決方法:提交Topology 時(shí)設(shè)置適當(dāng)?shù)南⒊瑫r(shí)時(shí)間,比默認(rèn)消息超時(shí)時(shí)間(30
秒)更長(zhǎng)。比如:
或者:
config.put(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS,60);也可以在storm.yaml中修改這個(gè)參數(shù):
topology.message.timeout.secs: 30因?yàn)镃onfig.java中定義了:
public static final String TOPOLOGY_MESSAGE_TIMEOUT_SECS = "topology.message.timeout.secs";一個(gè)場(chǎng)景:拓?fù)渲羞M(jìn)行大量的磁盤IO輸出,當(dāng)負(fù)載過(guò)高時(shí),機(jī)器不能在超時(shí)時(shí)間內(nèi)處理消息,從而消息fail掉,導(dǎo)致重傳。而由于已經(jīng)寫入磁盤的IO無(wú)法清除,所以重傳時(shí)再寫入磁盤會(huì)導(dǎo)致數(shù)據(jù)重復(fù)。
7、batch interval
- 將 trident 的 batch interval 配置為你的集群的端到端時(shí)延的 50% 左右
8、Coordinator 是什么,為什么會(huì)有很多 Coordinator?
Trident spout 實(shí)際上是通過(guò) Storm 的 bolt 運(yùn)行的。MasterBatchCoordinator(MBC)封裝了 Trident 拓?fù)涞?spout,它負(fù)責(zé)整合 Trident 中的 batch,這一點(diǎn)對(duì)于你所使用的任何類型的 spout 而言都是一樣的。Trident 的 batch 就是在 MBC 向各個(gè) spout-coordinator 分發(fā)種子 tuple 的過(guò)程中生成的。Spout-coordinator bolt 知道你所定義的 spout 是如何互相協(xié)作的 —— 實(shí)際上,在使用 Kafka 的情況下,各個(gè) spout 就是通過(guò) spout-coordinator 來(lái)獲取 pull 消息所需要的 partition 和 offset 信息的。
在 spout 的 metadata 記錄中能夠存儲(chǔ)什么信息?
只能存儲(chǔ)少量靜態(tài)數(shù)據(jù),而且是越少越好(盡管你確實(shí)可以向其中存儲(chǔ)更多的信息,不過(guò)我們不推薦這樣做)。
emitPartitionBatchNew 函數(shù)是多久調(diào)用一次的?
由于在 Trident 中 MBC 才是實(shí)際運(yùn)行的 spout,一個(gè) batch 中的所有 tuple 都是 MBC 生成的 tuple 樹的節(jié)點(diǎn)。也就是說(shuō),Storm 的 “max spout pending” 參數(shù)實(shí)際上定義的是可以并發(fā)運(yùn)行的 batch 數(shù)量。MBC 在滿足以下兩個(gè)條件下會(huì)發(fā)送出一個(gè)新的 batch:首先,掛起的 tuple 數(shù)需要小于 “max pending” 參數(shù);其次,距離上一個(gè) batch 的發(fā)送已經(jīng)過(guò)去了至少一個(gè)trident batch interval 的間隔時(shí)間。
如果沒(méi)有數(shù)據(jù)發(fā)送,Trident 會(huì)降低發(fā)送頻率嗎?
是的,Storm 中有一個(gè)可選的 “spout 等待策略”,默認(rèn)配置是 sleep 一段指定的配置時(shí)間。
Trident batch interval 參數(shù)有什么用?
你知道 486 時(shí)代的計(jì)算機(jī)上面為什么有個(gè) trubo button 嗎?這個(gè)參數(shù)的作用和這個(gè)按鈕有點(diǎn)像。
實(shí)際上,trident batch interval 有兩個(gè)用處。首先,它可以用于減緩 spout 從遠(yuǎn)程數(shù)據(jù)源獲取數(shù)據(jù)的速度,但這不會(huì)影響數(shù)據(jù)處理的效率。例如,對(duì)于一個(gè)從給定的 S3 存儲(chǔ)區(qū)中讀取批量上傳文件并按行發(fā)送數(shù)據(jù)的 spout,我們就不希望它經(jīng)常觸發(fā) S3 的閾值,因?yàn)槲募魩追昼姴艜?huì)上傳一次,而且每個(gè) batch 也需要花費(fèi)一定的時(shí)間來(lái)執(zhí)行。
另一個(gè)用處是限制啟動(dòng)期間或者突發(fā)數(shù)據(jù)負(fù)載情況下內(nèi)部消息隊(duì)列的負(fù)載壓力。如果 spout 突然活躍起來(lái),并向系統(tǒng)中擠入了 10 個(gè) batch 的記錄,那么可能會(huì)有從 batch7 開始的大量不緊急的 tuple 堵塞住傳輸緩沖區(qū),并且阻塞了從 batch3 中的 tuple(甚至可能包含 batch3 中的部分舊 tuple)的 commit 過(guò)程#。對(duì)于這種情況,我們的解決方法就是將 trident batch interval 設(shè)置為正常的端到端處理時(shí)延的一半左右 —— 也就是說(shuō)如果需要花費(fèi) 600 ms 的時(shí)間處理一個(gè) batch,那么就可以每 300 ms 處理一個(gè) batch。
注意,這個(gè) 300 ms 僅僅是一個(gè)上限值,而不是額外增加的延時(shí)時(shí)間,如果你的 batch 需要花費(fèi) 258 ms 來(lái)運(yùn)行,那么 Trident 就只會(huì)延時(shí)等待 42 ms。
9、fetch Size
在讀取streaming數(shù)據(jù)流時(shí),需要將某個(gè)時(shí)間點(diǎn)前的數(shù)據(jù)掛掉,然后從這個(gè)時(shí)間點(diǎn)開始計(jì)算指標(biāo)。
問(wèn)題來(lái)了,如果指定每個(gè)批次的大小很大的話,數(shù)據(jù)丟得很快,很快就可以進(jìn)入處理邏輯,但是這批數(shù)據(jù)由于非常大,后面的bolt處理起來(lái)就會(huì)有問(wèn)題,甚至出現(xiàn)OOM。
如果指定批次很小的話話,則需要很長(zhǎng)時(shí)間才能把數(shù)據(jù)丟完,可能半天以上,基本不能接受。
解決辦法:
默認(rèn)情況下,每個(gè)kafka的數(shù)據(jù)文件大小為1G,因此丟棄數(shù)據(jù)時(shí),每個(gè)分區(qū)均需要丟1G以上的數(shù)據(jù)。
減小這些topic的文件大小,如64M,則每次只需要丟掉100G左右的數(shù)據(jù)。
bin/kafka-topics.sh –create –zookeeper 10.120.69.44:2181/kafka –topic streaming_g18_sdc –partitions 20 –replication-factor 2 –config segment.bytes=67108864
10、設(shè)置拓?fù)涞膉vm
可以在拓?fù)浼?jí)別指定Jvm參數(shù),覆蓋storm.yaml中的配置:
config.put(Config.TOPOLOGY_WORKER_CHILDOPTS,"-Xmx2048m -Xms2048m -Xmn384m -XX:PermSize=128m -XX:+UseConcMarkSweepGC");總結(jié)
- 上一篇: hadoop关键进程
- 下一篇: hprofile教程