A Look at Storm's IWaitStrategy
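Preface
This article looks at Storm's IWaitStrategy: the interface itself, its two implementations, and how the spout and bolt executors use them.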
IWaitStrategy
storm-2.0.0/storm-client/src/jvm/org/apache/storm/policy/IWaitStrategy.java

    public interface IWaitStrategy {

        static IWaitStrategy createBackPressureWaitStrategy(Map<String, Object> topologyConf) {
            IWaitStrategy producerWaitStrategy =
                ReflectionUtils.newInstance((String) topologyConf.get(Config.TOPOLOGY_BACKPRESSURE_WAIT_STRATEGY));
            producerWaitStrategy.prepare(topologyConf, WAIT_SITUATION.BACK_PRESSURE_WAIT);
            return producerWaitStrategy;
        }

        void prepare(Map<String, Object> conf, WAIT_SITUATION waitSituation);

        /**
         * Implementations of this method should be thread-safe (preferably no side-effects and lock-free)
         * <p>
         * Supports static or dynamic backoff. Dynamic backoff relies on idleCounter to estimate how long caller has been idling.
         * <p>
         * <pre>
         * <code>
         *  int idleCounter = 0;
         *  int consumeCount = consumeFromQ();
         *  while (consumeCount==0) {
         *     idleCounter = strategy.idle(idleCounter);
         *     consumeCount = consumeFromQ();
         *  }
         * </code>
         * </pre>
         *
         * @param idleCounter managed by the idle method until reset
         * @return new counter value to be used on subsequent idle cycle
         */
        int idle(int idleCounter) throws InterruptedException;

        enum WAIT_SITUATION {SPOUT_WAIT, BOLT_WAIT, BACK_PRESSURE_WAIT}
    }

- The interface provides a static factory method, createBackPressureWaitStrategy, which reads the topology.backpressure.wait.strategy setting, instantiates the producerWaitStrategy, and prepares it with WAIT_SITUATION.BACK_PRESSURE_WAIT
- WAIT_SITUATION has three values: SPOUT_WAIT, BOLT_WAIT, and BACK_PRESSURE_WAIT
- The interface defines int idle(int idleCounter), which supports static or dynamic backoff; the caller owns the counter, passes it in, and keeps the returned value for the next idle cycle
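
The caller-side loop from the Javadoc can be sketched without Storm on the classpath. In the sketch below, SimpleWaitStrategy, FIXED_PARK, and waitForItem are hypothetical names introduced for illustration; only the idle(int) contract (pass the counter in, keep the returned value) comes from the interface above.

```java
import java.util.Queue;
import java.util.concurrent.locks.LockSupport;

// Hypothetical stand-in for IWaitStrategy, reduced to idle() so the sketch compiles without Storm.
interface SimpleWaitStrategy {
    int idle(int idleCounter) throws InterruptedException;
}

class IdleLoopSketch {
    // Static backoff: park for a fixed 1 microsecond and count the idle cycle.
    static final SimpleWaitStrategy FIXED_PARK = idleCounter -> {
        LockSupport.parkNanos(1_000L);
        return idleCounter + 1;
    };

    // Idles while the queue is empty, giving up after maxIdleCycles; returns the idle cycles spent.
    static int waitForItem(Queue<String> queue, SimpleWaitStrategy strategy, int maxIdleCycles) {
        int idleCounter = 0;
        try {
            while (queue.isEmpty() && idleCounter < maxIdleCycles) {
                // The strategy decides how to wait; the caller only stores the returned counter.
                idleCounter = strategy.idle(idleCounter);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
        }
        return idleCounter;
    }
}
```

The maxIdleCycles bound exists only so the sketch terminates on an empty queue; the executors shown later simply call idle once per call() invocation and reset the counter to 0 when work arrives.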
SpoutExecutor
storm-2.0.0/storm-client/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java

    public class SpoutExecutor extends Executor {

        private static final Logger LOG = LoggerFactory.getLogger(SpoutExecutor.class);

        private final IWaitStrategy spoutWaitStrategy;
        private final IWaitStrategy backPressureWaitStrategy;
        private final AtomicBoolean lastActive;
        private final MutableLong emittedCount;
        private final MutableLong emptyEmitStreak;
        private final SpoutThrottlingMetrics spoutThrottlingMetrics;
        private final boolean hasAckers;
        private final SpoutExecutorStats stats;
        private final BuiltinMetrics builtInMetrics;
        SpoutOutputCollectorImpl spoutOutputCollector;
        private Integer maxSpoutPending;
        private List<ISpout> spouts;
        private List<SpoutOutputCollector> outputCollectors;
        private RotatingMap<Long, TupleInfo> pending;
        private long threadId = 0;

        public SpoutExecutor(final WorkerState workerData, final List<Long> executorId, Map<String, String> credentials) {
            super(workerData, executorId, credentials, ClientStatsUtil.SPOUT);
            this.spoutWaitStrategy = ReflectionUtils.newInstance((String) topoConf.get(Config.TOPOLOGY_SPOUT_WAIT_STRATEGY));
            this.spoutWaitStrategy.prepare(topoConf, WAIT_SITUATION.SPOUT_WAIT);
            this.backPressureWaitStrategy = ReflectionUtils.newInstance((String) topoConf.get(Config.TOPOLOGY_BACKPRESSURE_WAIT_STRATEGY));
            this.backPressureWaitStrategy.prepare(topoConf, WAIT_SITUATION.BACK_PRESSURE_WAIT);
            //......
        }

        //......
    }

- The constructor creates two wait strategies: spoutWaitStrategy and backPressureWaitStrategy
- spoutWaitStrategy reads the topology.spout.wait.strategy setting, whose value in defaults.yaml is org.apache.storm.policy.WaitStrategyProgressive
- backPressureWaitStrategy reads the topology.backpressure.wait.strategy setting, whose value in defaults.yaml is also org.apache.storm.policy.WaitStrategyProgressive
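
The pattern used in the constructor (read a class name from the topology config, instantiate it reflectively, then call prepare) can be illustrated with plain JDK reflection. This is a sketch under assumptions, not the body of Storm's ReflectionUtils.newInstance: StrategyLoaderSketch, NoOpStrategy, and exampleConf are hypothetical, and only the config key string comes from the text above.

```java
import java.util.HashMap;
import java.util.Map;

class StrategyLoaderSketch {
    // Hypothetical strategy type standing in for a Storm wait strategy implementation.
    public static class NoOpStrategy {
        public NoOpStrategy() { } // no-arg constructor, needed for reflective instantiation
    }

    // Reads a class name from the conf map and instantiates it through its no-arg constructor.
    static Object newInstance(Map<String, Object> conf, String key) {
        String className = (String) conf.get(key);
        try {
            return Class.forName(className).getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            throw new RuntimeException("Cannot instantiate " + className, e);
        }
    }

    // Example conf that points the spout wait strategy key at the hypothetical class.
    static Map<String, Object> exampleConf() {
        Map<String, Object> conf = new HashMap<>();
        conf.put("topology.spout.wait.strategy", NoOpStrategy.class.getName());
        return conf;
    }
}
```

Reflective creation is also why the implementations need a no-arg constructor and a separate prepare step: the constructor cannot receive the configuration.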
BoltExecutor
storm-2.0.0/storm-client/src/jvm/org/apache/storm/executor/bolt/BoltExecutor.java

    public class BoltExecutor extends Executor {

        private static final Logger LOG = LoggerFactory.getLogger(BoltExecutor.class);

        private final BooleanSupplier executeSampler;
        private final boolean isSystemBoltExecutor;
        private final IWaitStrategy consumeWaitStrategy;       // employed when no incoming data
        private final IWaitStrategy backPressureWaitStrategy;  // employed when outbound path is congested
        private final BoltExecutorStats stats;
        private final BuiltinMetrics builtInMetrics;
        private BoltOutputCollectorImpl outputCollector;

        public BoltExecutor(WorkerState workerData, List<Long> executorId, Map<String, String> credentials) {
            super(workerData, executorId, credentials, ClientStatsUtil.BOLT);
            this.executeSampler = ConfigUtils.mkStatsSampler(topoConf);
            this.isSystemBoltExecutor = (executorId == Constants.SYSTEM_EXECUTOR_ID);
            if (isSystemBoltExecutor) {
                this.consumeWaitStrategy = makeSystemBoltWaitStrategy();
            } else {
                this.consumeWaitStrategy = ReflectionUtils.newInstance((String) topoConf.get(Config.TOPOLOGY_BOLT_WAIT_STRATEGY));
                this.consumeWaitStrategy.prepare(topoConf, WAIT_SITUATION.BOLT_WAIT);
            }
            this.backPressureWaitStrategy = ReflectionUtils.newInstance((String) topoConf.get(Config.TOPOLOGY_BACKPRESSURE_WAIT_STRATEGY));
            this.backPressureWaitStrategy.prepare(topoConf, WAIT_SITUATION.BACK_PRESSURE_WAIT);
            this.stats = new BoltExecutorStats(ConfigUtils.samplingRate(this.getTopoConf()),
                                               ObjectReader.getInt(this.getTopoConf().get(Config.NUM_STAT_BUCKETS)));
            this.builtInMetrics = new BuiltinBoltMetrics(stats);
        }

        private static IWaitStrategy makeSystemBoltWaitStrategy() {
            WaitStrategyPark ws = new WaitStrategyPark();
            Map<String, Object> conf = new HashMap<>();
            conf.put(Config.TOPOLOGY_BOLT_WAIT_PARK_MICROSEC, 5000);
            ws.prepare(conf, WAIT_SITUATION.BOLT_WAIT);
            return ws;
        }

        //......
    }

- The constructor creates two IWaitStrategy instances: consumeWaitStrategy and backPressureWaitStrategy
- For a non-system bolt executor, consumeWaitStrategy reads the topology.bolt.wait.strategy setting, whose value in defaults.yaml is org.apache.storm.policy.WaitStrategyProgressive; for the system bolt executor it is a WaitStrategyPark with the park time hard-coded to 5000 microseconds
- backPressureWaitStrategy reads the topology.backpressure.wait.strategy setting, whose value in defaults.yaml is org.apache.storm.policy.WaitStrategyProgressive
WaitStrategyPark
storm-2.0.0/storm-client/src/jvm/org/apache/storm/policy/WaitStrategyPark.java

    public class WaitStrategyPark implements IWaitStrategy {
        private long parkTimeNanoSec;

        public WaitStrategyPark() { // required for instantiation via reflection. must call prepare() thereafter
        }

        // Convenience alternative to prepare() for use in Tests
        public WaitStrategyPark(long microsec) {
            parkTimeNanoSec = microsec * 1_000;
        }

        @Override
        public void prepare(Map<String, Object> conf, WAIT_SITUATION waitSituation) {
            if (waitSituation == WAIT_SITUATION.SPOUT_WAIT) {
                parkTimeNanoSec = 1_000 * ObjectReader.getLong(conf.get(Config.TOPOLOGY_SPOUT_WAIT_PARK_MICROSEC));
            } else if (waitSituation == WAIT_SITUATION.BOLT_WAIT) {
                parkTimeNanoSec = 1_000 * ObjectReader.getLong(conf.get(Config.TOPOLOGY_BOLT_WAIT_PARK_MICROSEC));
            } else if (waitSituation == WAIT_SITUATION.BACK_PRESSURE_WAIT) {
                parkTimeNanoSec = 1_000 * ObjectReader.getLong(conf.get(Config.TOPOLOGY_BACKPRESSURE_WAIT_PARK_MICROSEC));
            } else {
                throw new IllegalArgumentException("Unknown wait situation : " + waitSituation);
            }
        }

        @Override
        public int idle(int idleCounter) throws InterruptedException {
            if (parkTimeNanoSec == 0) {
                return 1;
            }
            LockSupport.parkNanos(parkTimeNanoSec);
            return idleCounter + 1;
        }
    }

- This strategy is a static backoff: prepare converts the configured park time from microseconds to nanoseconds, and idle waits with LockSupport.parkNanos(parkTimeNanoSec); a park time of 0 makes idle return 1 immediately without parking
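
Two details of WaitStrategyPark are easy to miss: the microsecond-to-nanosecond conversion, and the fact that a zero park time returns the constant 1 rather than idleCounter + 1. The sketch below mirrors that logic in a standalone class; ParkSketch and its method names are hypothetical and exist only so the behavior can be run without Storm.

```java
import java.util.concurrent.locks.LockSupport;

class ParkSketch {
    // Converts a park time configured in microseconds to nanoseconds, as prepare() does.
    static long toNanos(long microsec) {
        return microsec * 1_000;
    }

    // Mirrors the idle() logic of WaitStrategyPark with the park time passed in explicitly.
    static int idle(long parkTimeNanoSec, int idleCounter) {
        if (parkTimeNanoSec == 0) {
            return 1; // no parking at all; the counter is not advanced past 1
        }
        LockSupport.parkNanos(parkTimeNanoSec);
        return idleCounter + 1;
    }
}
```

With the system bolt's hard-coded 5000 microseconds, toNanos(5000) yields 5,000,000 ns, so each idle call parks for up to roughly 5 ms (parkNanos may return earlier).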
WaitStrategyProgressive
storm-2.0.0/storm-client/src/jvm/org/apache/storm/policy/WaitStrategyProgressive.java

    /**
     * A Progressive Wait Strategy
     * <p> Has three levels of idling. Stays in each level for a configured number of iterations before entering the next level.
     * Level 1 - No idling. Returns immediately. Stays in this level for `level1Count` iterations. Level 2 - Calls LockSupport.parkNanos(1).
     * Stays in this level for `level2Count` iterations Level 3 - Calls Thread.sleep(). Stays in this level until wait situation changes.
     *
     * <p>
     * The initial spin can be useful to prevent downstream bolt from repeatedly sleeping/parking when the upstream component is a bit
     * relatively slower. Allows downstream bolt can enter deeper wait states only if the traffic to it appears to have reduced.
     * <p>
     */
    public class WaitStrategyProgressive implements IWaitStrategy {
        private int level1Count;
        private int level2Count;
        private long level3SleepMs;

        @Override
        public void prepare(Map<String, Object> conf, WAIT_SITUATION waitSituation) {
            if (waitSituation == WAIT_SITUATION.SPOUT_WAIT) {
                level1Count = ObjectReader.getInt(conf.get(Config.TOPOLOGY_SPOUT_WAIT_PROGRESSIVE_LEVEL1_COUNT));
                level2Count = ObjectReader.getInt(conf.get(Config.TOPOLOGY_SPOUT_WAIT_PROGRESSIVE_LEVEL2_COUNT));
                level3SleepMs = ObjectReader.getLong(conf.get(Config.TOPOLOGY_SPOUT_WAIT_PROGRESSIVE_LEVEL3_SLEEP_MILLIS));
            } else if (waitSituation == WAIT_SITUATION.BOLT_WAIT) {
                level1Count = ObjectReader.getInt(conf.get(Config.TOPOLOGY_BOLT_WAIT_PROGRESSIVE_LEVEL1_COUNT));
                level2Count = ObjectReader.getInt(conf.get(Config.TOPOLOGY_BOLT_WAIT_PROGRESSIVE_LEVEL2_COUNT));
                level3SleepMs = ObjectReader.getLong(conf.get(Config.TOPOLOGY_BOLT_WAIT_PROGRESSIVE_LEVEL3_SLEEP_MILLIS));
            } else if (waitSituation == WAIT_SITUATION.BACK_PRESSURE_WAIT) {
                level1Count = ObjectReader.getInt(conf.get(Config.TOPOLOGY_BACKPRESSURE_WAIT_PROGRESSIVE_LEVEL1_COUNT));
                level2Count = ObjectReader.getInt(conf.get(Config.TOPOLOGY_BACKPRESSURE_WAIT_PROGRESSIVE_LEVEL2_COUNT));
                level3SleepMs = ObjectReader.getLong(conf.get(Config.TOPOLOGY_BACKPRESSURE_WAIT_PROGRESSIVE_LEVEL3_SLEEP_MILLIS));
            } else {
                throw new IllegalArgumentException("Unknown wait situation : " + waitSituation);
            }
        }

        @Override
        public int idle(int idleCounter) throws InterruptedException {
            if (idleCounter < level1Count) {                     // level 1 - no waiting
                ++idleCounter;
            } else if (idleCounter < level1Count + level2Count) { // level 2 - parkNanos(1L)
                ++idleCounter;
                LockSupport.parkNanos(1L);
            } else {                                             // level 3 - longer idling with Thread.sleep()
                Thread.sleep(level3SleepMs);
            }
            return idleCounter;
        }
    }

- WaitStrategyProgressive is a progressive (dynamic backoff) wait strategy with three levels of idling
- Level 1 does not idle and returns immediately; after level1Count iterations the strategy moves to level 2
- Level 2 calls LockSupport.parkNanos(1); after level2Count iterations the strategy moves to level 3
- Level 3 calls Thread.sleep(level3SleepMs) and no longer increments the counter, so the strategy stays at level 3 until the wait situation changes and the caller resets the counter to 0
- Each WAIT_SITUATION reads its own LEVEL1_COUNT, LEVEL2_COUNT, and LEVEL3_SLEEP_MILLIS settings. The defaults are 0, 0, 1 for the spout; 1, 1000, 1 for the bolt; and 1, 1000, 1 for back pressure
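
To see what the default thresholds mean in practice, the level selection in idle can be isolated as a pure function. The sketch below is a standalone illustration; ProgressiveLevelSketch, levelFor, and callsBeforeSleep are hypothetical names, and the branch conditions are copied from the idle method above.

```java
class ProgressiveLevelSketch {
    // Returns 1, 2 or 3: the level that idle(idleCounter) would execute for the given thresholds.
    static int levelFor(int idleCounter, int level1Count, int level2Count) {
        if (idleCounter < level1Count) {
            return 1; // no waiting
        }
        if (idleCounter < level1Count + level2Count) {
            return 2; // parkNanos(1L)
        }
        return 3;     // Thread.sleep(level3SleepMs)
    }

    // Counts how many idle() calls happen before the first level-3 sleep.
    static int callsBeforeSleep(int level1Count, int level2Count) {
        int idleCounter = 0;
        int calls = 0;
        while (levelFor(idleCounter, level1Count, level2Count) != 3) {
            idleCounter++; // levels 1 and 2 both increment the counter
            calls++;
        }
        return calls;
    }
}
```

With the bolt and back pressure defaults (1, 1000), the first idle call spins, the next 1000 calls park for 1 ns each, and every call after that sleeps. With the spout defaults (0, 0), the very first idle call already goes to Thread.sleep.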
SpoutExecutor.call
storm-2.0.0/storm-client/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java

    @Override
    public Callable<Long> call() throws Exception {
        init(idToTask, idToTaskBase);
        return new Callable<Long>() {
            final int recvqCheckSkipCountMax = getSpoutRecvqCheckSkipCount();
            int recvqCheckSkips = 0;
            int swIdleCount = 0; // counter for spout wait strategy
            int bpIdleCount = 0; // counter for back pressure wait strategy
            int rmspCount = 0;

            @Override
            public Long call() throws Exception {
                int receiveCount = 0;
                if (recvqCheckSkips++ == recvqCheckSkipCountMax) {
                    receiveCount = receiveQueue.consume(SpoutExecutor.this);
                    recvqCheckSkips = 0;
                }
                long currCount = emittedCount.get();
                boolean reachedMaxSpoutPending = (maxSpoutPending != 0) && (pending.size() >= maxSpoutPending);
                boolean isActive = stormActive.get();
                if (!isActive) {
                    inactiveExecute();
                    return 0L;
                }
                if (!lastActive.get()) {
                    lastActive.set(true);
                    activateSpouts();
                }
                boolean pendingEmitsIsEmpty = tryFlushPendingEmits();
                boolean noEmits = true;
                long emptyStretch = 0;
                if (!reachedMaxSpoutPending && pendingEmitsIsEmpty) {
                    for (int j = 0; j < spouts.size(); j++) { // in critical path. don't use iterators.
                        spouts.get(j).nextTuple();
                    }
                    noEmits = (currCount == emittedCount.get());
                    if (noEmits) {
                        emptyEmitStreak.increment();
                    } else {
                        emptyStretch = emptyEmitStreak.get();
                        emptyEmitStreak.set(0);
                    }
                }
                if (reachedMaxSpoutPending) {
                    if (rmspCount == 0) {
                        LOG.debug("Reached max spout pending");
                    }
                    rmspCount++;
                } else {
                    if (rmspCount > 0) {
                        LOG.debug("Ended max spout pending stretch of {} iterations", rmspCount);
                    }
                    rmspCount = 0;
                }

                if (receiveCount > 1) {
                    // continue without idling
                    return 0L;
                }
                if (!pendingEmits.isEmpty()) { // then facing backpressure
                    backPressureWaitStrategy();
                    return 0L;
                }
                bpIdleCount = 0;
                if (noEmits) {
                    spoutWaitStrategy(reachedMaxSpoutPending, emptyStretch);
                    return 0L;
                }
                swIdleCount = 0;
                return 0L;
            }

            private void backPressureWaitStrategy() throws InterruptedException {
                long start = Time.currentTimeMillis();
                if (bpIdleCount == 0) { // check avoids multiple log msgs when in a idle loop
                    LOG.debug("Experiencing Back Pressure from downstream components. Entering BackPressure Wait.");
                }
                bpIdleCount = backPressureWaitStrategy.idle(bpIdleCount);
                spoutThrottlingMetrics.skippedBackPressureMs(Time.currentTimeMillis() - start);
            }

            private void spoutWaitStrategy(boolean reachedMaxSpoutPending, long emptyStretch) throws InterruptedException {
                emptyEmitStreak.increment();
                long start = Time.currentTimeMillis();
                swIdleCount = spoutWaitStrategy.idle(swIdleCount);
                if (reachedMaxSpoutPending) {
                    spoutThrottlingMetrics.skippedMaxSpoutMs(Time.currentTimeMillis() - start);
                } else {
                    if (emptyStretch > 0) {
                        LOG.debug("Ending Spout Wait Stretch of {}", emptyStretch);
                    }
                }
            }

            // returns true if pendingEmits is empty
            private boolean tryFlushPendingEmits() {
                for (AddressedTuple t = pendingEmits.peek(); t != null; t = pendingEmits.peek()) {
                    if (executorTransfer.tryTransfer(t, null)) {
                        pendingEmits.poll();
                    } else { // to avoid reordering of emits, stop at first failure
                        return false;
                    }
                }
                return true;
            }
        };
    }

- The spout maintains a pendingEmits queue, which holds tuples whose emit failed or that are still waiting to be emitted. It also maintains pending, a RotatingMap of the ids and data of tuples that are waiting to be acked
- The spout reads TOPOLOGY_MAX_SPOUT_PENDING from topology.max.spout.pending and computes maxSpoutPending = ObjectReader.getInt(topoConf.get(Config.TOPOLOGY_MAX_SPOUT_PENDING), 0) * idToTask.size(). The setting defaults to null, so maxSpoutPending is 0 and the (maxSpoutPending != 0) check disables the limit
- The spout calls nextTuple only when !reachedMaxSpoutPending && pendingEmitsIsEmpty holds. It enters backPressureWaitStrategy when pendingEmits is not empty, and enters spoutWaitStrategy when noEmits (currCount == emittedCount.get()) is true
- On every call, currCount = emittedCount.get() is recorded before nextTuple is invoked. If nextTuple runs, methods such as emit or emitDirect in SpoutOutputCollectorImpl update emittedCount. Afterwards noEmits = (currCount == emittedCount.get()) tells whether anything was emitted
- The spout keeps bpIdleCount and swIdleCount, which are passed to backPressureWaitStrategy.idle(bpIdleCount) and spoutWaitStrategy.idle(swIdleCount) respectively
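
The tail of the spout's call method is essentially a small decision table, and the maxSpoutPending arithmetic is simple enough to check by hand. The following standalone sketch restates both as pure functions; SpoutIdleDecisionSketch, Action, and the method names are hypothetical, while the order of the checks follows the code above.

```java
class SpoutIdleDecisionSketch {
    enum Action { CONTINUE, BACK_PRESSURE_WAIT, SPOUT_WAIT }

    // maxSpoutPending as described above: per-task limit (0 when unset) times the number of tasks.
    static int maxSpoutPending(Integer configuredPerTask, int taskCount) {
        int perTask = (configuredPerTask == null) ? 0 : configuredPerTask;
        return perTask * taskCount;
    }

    // A limit of 0 means "no limit", so it can never be reached.
    static boolean reachedMaxSpoutPending(int maxSpoutPending, int pendingSize) {
        return maxSpoutPending != 0 && pendingSize >= maxSpoutPending;
    }

    // Same order of checks as the tail of call(): receive activity, then back pressure, then empty emits.
    static Action decide(int receiveCount, boolean pendingEmitsEmpty, boolean noEmits) {
        if (receiveCount > 1) {
            return Action.CONTINUE;           // continue without idling
        }
        if (!pendingEmitsEmpty) {
            return Action.BACK_PRESSURE_WAIT; // downstream did not accept everything
        }
        if (noEmits) {
            return Action.SPOUT_WAIT;         // nothing was emitted in this iteration
        }
        return Action.CONTINUE;
    }
}
```

For example, a per-task limit of 100 with 4 tasks gives maxSpoutPending(100, 4) = 400, while an unset limit gives 0 and reachedMaxSpoutPending is then always false. Note that back pressure takes precedence over the spout wait: if both conditions hold, only backPressureWaitStrategy runs.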
BoltExecutor.call
storm-2.0.0/storm-client/src/jvm/org/apache/storm/executor/bolt/BoltExecutor.java

    @Override
    public Callable<Long> call() throws Exception {
        init(idToTask, idToTaskBase);

        return new Callable<Long>() {
            int bpIdleCount = 0;
            int consumeIdleCounter = 0;
            private final ExitCondition tillNoPendingEmits = () -> pendingEmits.isEmpty();

            @Override
            public Long call() throws Exception {
                boolean pendingEmitsIsEmpty = tryFlushPendingEmits();
                if (pendingEmitsIsEmpty) {
                    if (bpIdleCount != 0) {
                        LOG.debug("Ending Back Pressure Wait stretch : {}", bpIdleCount);
                    }
                    bpIdleCount = 0;
                    int consumeCount = receiveQueue.consume(BoltExecutor.this, tillNoPendingEmits);
                    if (consumeCount == 0) {
                        if (consumeIdleCounter == 0) {
                            LOG.debug("Invoking consume wait strategy");
                        }
                        consumeIdleCounter = consumeWaitStrategy.idle(consumeIdleCounter);
                        if (Thread.interrupted()) {
                            throw new InterruptedException();
                        }
                    } else {
                        if (consumeIdleCounter != 0) {
                            LOG.debug("Ending consume wait stretch : {}", consumeIdleCounter);
                        }
                        consumeIdleCounter = 0;
                    }
                } else {
                    if (bpIdleCount == 0) { // check avoids multiple log msgs when spinning in a idle loop
                        LOG.debug("Experiencing Back Pressure. Entering BackPressure Wait. PendingEmits = {}", pendingEmits.size());
                    }
                    bpIdleCount = backPressureWaitStrategy.idle(bpIdleCount);
                }

                return 0L;
            }

            // returns true if pendingEmits is empty
            private boolean tryFlushPendingEmits() {
                for (AddressedTuple t = pendingEmits.peek(); t != null; t = pendingEmits.peek()) {
                    if (executorTransfer.tryTransfer(t, null)) {
                        pendingEmits.poll();
                    } else { // to avoid reordering of emits, stop at first failure
                        return false;
                    }
                }
                return true;
            }
        };
    }

- The bolt executor also maintains pendingEmits; when it is not empty, the executor calls backPressureWaitStrategy.idle(bpIdleCount)
- When pendingEmits is empty, the executor checks the consumeCount returned by receiveQueue.consume(BoltExecutor.this, tillNoPendingEmits); if it is 0, the executor calls consumeWaitStrategy.idle(consumeIdleCounter)
- The bolt executor keeps bpIdleCount and consumeIdleCounter, which are passed to backPressureWaitStrategy.idle(bpIdleCount) and consumeWaitStrategy.idle(consumeIdleCounter) respectively; each counter is reset to 0 once its wait condition clears
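
Both executors detect back pressure through tryFlushPendingEmits, which drains the queue from the head and stops at the first tuple the downstream refuses. The sketch below reproduces that loop with a plain Queue and a Predicate in place of executorTransfer.tryTransfer; FlushSketch, tryFlush, and remainingAfterFlush are hypothetical names, and the "accept only values below 3" downstream is an assumption made for the demo.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.function.Predicate;

class FlushSketch {
    // Drains the queue head-first; stops at the first failed transfer so emits are never reordered.
    // Returns true only if the queue ended up empty.
    static <T> boolean tryFlush(Queue<T> pendingEmits, Predicate<T> tryTransfer) {
        for (T t = pendingEmits.peek(); t != null; t = pendingEmits.peek()) {
            if (tryTransfer.test(t)) {
                pendingEmits.poll(); // remove only after the transfer succeeded
            } else {
                return false;        // downstream is full: leave the rest queued
            }
        }
        return true;
    }

    // Demo: a hypothetical downstream that only accepts values below 3.
    static int remainingAfterFlush() {
        Queue<Integer> pending = new ArrayDeque<>();
        for (int i = 1; i <= 5; i++) {
            pending.add(i);
        }
        tryFlush(pending, v -> v < 3);
        return pending.size();
    }
}
```

Peeking before polling is what keeps the order intact: a tuple leaves the queue only once the transfer has succeeded, so a refused tuple stays at the head and is retried first on the next call.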
Summary
- Both the spout and bolt executors use backPressureWaitStrategy, configured through topology.backpressure.wait.strategy (for any producer (spout/bolt/transfer thread) when the downstream Q is full). The default implementation is org.apache.storm.policy.WaitStrategyProgressive. It is the back pressure strategy applied when a downstream component's receive queue is full. The check is based on the pendingEmits queue: on every call, the spout or bolt obtains pendingEmitsIsEmpty from tryFlushPendingEmits, which first tries to send the queued tuples. If downstream accepts them all, pendingEmits ends up empty. This is how the executor gauges downstream load dynamically and decides whether to enter back pressure wait
- The spout uses spoutWaitStrategy, configured through topology.spout.wait.strategy (employed when there is no data to produce). The default implementation is org.apache.storm.policy.WaitStrategyProgressive. It applies when there is nothing to emit, which is detected by comparing emittedCount before and after nextTuple
- The bolt uses consumeWaitStrategy. For a non-system bolt executor it is configured through topology.bolt.wait.strategy (employed when there is no data in its receive buffer to process), and the default implementation is org.apache.storm.policy.WaitStrategyProgressive. It applies when the receive buffer has nothing to process, which is detected through the consumeCount returned by receiveQueue.consume(BoltExecutor.this, tillNoPendingEmits)
- One more difference between the two: besides pendingEmitsIsEmpty, the spout also checks reachedMaxSpoutPending to decide whether to keep producing data, whereas the bolt relies on pendingEmitsIsEmpty alone to decide whether it can keep consuming
- Besides WaitStrategyProgressive, IWaitStrategy has a second implementation, WaitStrategyPark, which is used for the system bolt executor
doc
- IWaitStrategy
- WaitStrategyProgressive
- WaitStrategyPark
Reposted from: https://my.oschina.net/go4it/blog/2253850