Netty学习笔记(三)EventLoopGroup开篇
?
使用Netty都需要定義EventLoopGroup,也就是線程池
前面講過在客戶端只需要一個EventLoopGroup就夠了,而在服務端就需要兩個Group--bossGroup和workerGroup,這與Netty的線程模型有關(guān),使用的是主從Reactor多線程模型?,兩個線程池,一個用于監(jiān)聽端口,創(chuàng)建新連接(boosGroup),一個用于處理每一條連接的數(shù)據(jù)讀寫和業(yè)務邏輯(workerGroup)
以下的代碼里都去掉了一些try...catch和非核心代碼,只保留了主要的代碼流程
EventLoopGroup初始化
其類圖如下所示:
可以發(fā)現(xiàn)EventLoopGroup都實現(xiàn)了ScheduledExecutorService,本質(zhì)是一個帶有schedule的線程池
NioEventLoopGroup有很多重載的構(gòu)造方法,最后都調(diào)用了如下方法:
調(diào)用其父類MultithreadEventLoopGroup的構(gòu)造方法:?
private static final int DEFAULT_EVENT_LOOP_THREADS;static {DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt("io.netty.eventLoopThreads", Runtime.getRuntime().availableProcessors() * 2));} protected MultithreadEventLoopGroup(int nThreads, ThreadFactory threadFactory, Object... args) {super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, threadFactory, args);}這里會判斷當前nThreads是否為0,如果為0的話則使用默認的Threads數(shù),其實就是處理器核心數(shù)*2 ,我的demo里都沒有指定線程數(shù),那么最終生成的EventLoopGroup的線程數(shù)就處理器核心數(shù)*2
再跟蹤下去,最后會調(diào)用MultithreadEventExecutorGroup的如下構(gòu)造方法
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,EventExecutorChooserFactory chooserFactory, Object... args) {if (executor == null) {executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());}children = new EventExecutor[nThreads];for (int i = 0; i < nThreads; i ++) {boolean success = false;try {children[i] = newChild(executor, args);success = true;}chooser = chooserFactory.newChooser(children);}上面的代碼會先創(chuàng)建一個executor,然后再初始化一個EventExecutor數(shù)組(長度就是nThreads),然后調(diào)用newChild對每個元素進行初始化,然后調(diào)用newChooser方法創(chuàng)建一個chooser
先看下這里的executor的創(chuàng)建,其實就是創(chuàng)建一個Executor的實例對象,對于execute傳入的command,都會創(chuàng)建一個線程并啟動來執(zhí)行,線程id為poolName + '-' + poolId.incrementAndGet() + '-'+ nextId.incrementAndGet()
public final class ThreadPerTaskExecutor implements Executor {private final ThreadFactory threadFactory;public ThreadPerTaskExecutor(ThreadFactory threadFactory) {this.threadFactory = threadFactory;}@Overridepublic void execute(Runnable command) {threadFactory.newThread(command).start();} }這里的newChild方法,就是實例化一個?NioEventLoop 對象, 并返回,所以EventLoopGroup里的每一個元素都是NioEventLoop,源碼如下:
@Overrideprotected EventLoop newChild(Executor executor, Object... args) throws Exception {return new NioEventLoop(this, executor, (SelectorProvider) args[0],((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);}看下這里NioEventLoop的類圖:注意下這里的NioEventLoop是實現(xiàn)了SingleThreadEventExecutor,參數(shù)Executor最后也會保存在該類的executor屬性字段里
接下來看下newChooser方法的實現(xiàn) : 如果executor,length是2的冪次其實就是nThreads是2的冪次,那么就會使用PowerOfTowEventExecutorChooser來進行選擇,否則就使用普通的選擇器
public EventExecutorChooser newChooser(EventExecutor[] executors) {if (isPowerOfTwo(executors.length)) {return new PowerOfTowEventExecutorChooser(executors);} else {return new GenericEventExecutorChooser(executors);}}private static boolean isPowerOfTwo(int val) {return (val & -val) == val;}兩個選擇器實現(xiàn)的區(qū)別在于獲取下一個EventExecutor的方法next(),普通選擇器是對idx遞增后對nThreads取模
PowerOfTow實現(xiàn)的也是這個邏輯,只不過使用了位運算符,運算速度更快
總結(jié)下EventLoopGroup的初始化:
- EventLoopGroup的父類MultithreadEventExecutorGroup內(nèi)部維護一個類型為 EventExecutor的 線程數(shù)組, 其大小是 nThreads
- 如果實例化NioEventLoopGroup 時,沒有指定默認值nThreads就等于處理器*2
- MultithreadEventExecutorGroup 中通過newChild()抽象方法來初始化 children 數(shù)組,每個元素都是NioEventLoop
- 根據(jù)nThreads數(shù)選擇不同的chooser
EventLoopGroup執(zhí)行
在ServerBootstrap 初始化時,調(diào)用了serverBootstrap.group(bossGroup,workerGroup)設置了兩個EventLoopGroup,我們跟
蹤進去以后會看到:
這個方法初始化了兩個字段,一個是在 super.group(parentGroup)中完成初始化,另一個是通過this.childGroup = childGroup,分別將bossGroup和workerGroup保存在AbstractBootstrap的group屬性和ServerBootstrap的childGroup屬性
接著從應用程序的啟動代碼 serverBootstrap.bind()來監(jiān)聽一個本地端口
通過bind方法會調(diào)用eventLoop()的execute()方法,最后會進入SingleThreadEventExecutor的execute()方法
SingleThreadEventExecutor對于添加進來的task,會判斷當前執(zhí)行的currentThread是否等于SingleThreadEventExecutor的thread,如果第一次添加或者當前調(diào)用的線程不是SingleThreadEventExecutor的thread,inEventLoop()就會返回false,就會先執(zhí)行啟動當前SingleThreadEventExecutor的startThread()方法再添加task到任務隊列(LinkedBlockingQueue);否則就直接添加任務到任務隊列
private final Queue<Runnable> taskQueue;public void execute(Runnable task) {if (task == null) {throw new NullPointerException("task");}boolean inEventLoop = inEventLoop();if (inEventLoop) {addTask(task);} else {startThread();addTask(task);if (isShutdown() && removeTask(task)) {reject();}}//對于有新任務添加,就會執(zhí)行wakeupif (!addTaskWakesUp && wakesUpForTask(task)) {wakeup(inEventLoop);}}簡單來說,這里的inEventLoop()就是判斷當前線程是否是reactor線程,這樣的作用是:
1.讓task只在reactor線程進行,保證單線程
2.第一次判斷會幫我們啟動reactor線程
這里的startThread()就是通過一個標志判斷reactor線程是否已啟動,如果沒有啟動就執(zhí)行doStartThread來啟動,
SingleThreadEventExecutor 在執(zhí)行doStartThread()方法的時候,會調(diào)用executor的execute方法,會將調(diào)用NioEventLoop(SingleThreadEventExecutor 的子類)的run方法封裝成一個Runnable讓線程池executor去執(zhí)行(還會將當前線程保存在SingleThreadEventExecutor的thread屬性字段里)。這里的executor就是前面講到的ThreadPerTaskExecutor ,它的execute會對每個傳入的Runnable創(chuàng)建一個FastThreadLocalThread線程對象并調(diào)用它的start方法去執(zhí)行
?通過前面的分析我們可以看出,最終執(zhí)行的主體方法是:NioEventLoop的run方法,那么我們看下這里的run方法到底執(zhí)行了什么
@Overrideprotected void run() {for (;;) {try { switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {case SelectStrategy.CONTINUE:continue;case SelectStrategy.SELECT://select輪詢, 設置wakenUp為false并返回之前的wakenUp值select(wakenUp.getAndSet(false));if (wakenUp.get()) {selector.wakeup();}default:// fallthrough}//去除了無關(guān)緊要的代碼processSelectedKeys();runAllTasks(); } catch (Throwable t) {handleLoopException(t);}// Always handle shutdown even if the loop processing threw an exception....}}先看下這里的策略選擇
@Overridepublic int calculateStrategy(IntSupplier selectSupplier, boolean hasTasks) throws Exception {return hasTasks ? selectSupplier.get() : SelectStrategy.SELECT;}如果任務隊列里沒有task,就返回策略SELECT,否則就執(zhí)行selectSupplier.get(),實際就是執(zhí)行了一次selectNow(非阻塞)方法并返回
可以看到,上面的代碼是一個死循環(huán),做的事情主要是以下三個:
- 輪詢注冊到reactor線程上的對應的selector的所有channel的IO事件
- 根據(jù)不同的SelectKeys進行處理??processSelectedKeys();
- 處理任務隊列 runAllTasks(); ??
輪詢Select
private void select(boolean oldWakenUp) throws IOException {Selector selector = this.selector;int selectCnt = 0;long currentTimeNanos = System.nanoTime();long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);for (;;) {long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;//第一個退出條件if (timeoutMillis <= 0) {if (selectCnt == 0) {selector.selectNow();selectCnt = 1;}break;}// If a task was submitted when wakenUp value was true, the task didn't get a chance to call// Selector#wakeup. So we need to check task queue again before executing select operation.// If we don't, the task might be pended until select operation was timed out.// It might be pended until idle timeout if IdleStateHandler existed in pipeline.//第二個退出條件 if (hasTasks() && wakenUp.compareAndSet(false, true)) {selector.selectNow();selectCnt = 1;break;}int selectedKeys = selector.select(timeoutMillis);selectCnt ++;//第三個退出條件if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {// - Selected something,// - waken up by user, or// - the task queue has a pending task.// - a scheduled task is ready for processingbreak;}...}不難看出這里的select是一個死循環(huán),它的退出條件有三種:
- 距離當前截止時間快到了(<=0.5ms)就跳出循環(huán),如果此時還沒有執(zhí)行select,就執(zhí)行一次selectNow
- 如果任務隊列里有任務需要執(zhí)行就退出(避免由于select阻塞導致任務不能及時執(zhí)行),退出前也執(zhí)行一下selectNow
- selector.select(XX)的阻塞被喚醒后,如果滿足上面的條件就會退出(selectedKeys不為0,任務隊列里有任務等)
前面提到過,如果SingleThreadEventExecutor執(zhí)行execute(Runnable task)添加任務會執(zhí)行wakeup方法,然后會執(zhí)行NioEventLoop重寫的wakeup方法
@Override public void execute(Runnable task) {//addTaskWakesUp 默認是false 如果是外部線程添加的,inEventLoop就會是falseif (!addTaskWakesUp && wakesUpForTask(task)) {wakeup(inEventLoop);} }當inEventLoop為false,并且wakenUp變量CAS操作成功(由false變?yōu)閠rue,保證線程安全),則調(diào)用selector.wakeup()喚醒阻塞的select方法
@Overrideprotected void wakeup(boolean inEventLoop) {if (!inEventLoop && wakenUp.compareAndSet(false, true)) {selector.wakeup();}}Netty解決JDK空輪訓Bug? ? ??
出現(xiàn)此 Bug 是因為當 Selector 的輪詢結(jié)果為空,也沒有wakeup 或新消息處理,則發(fā)生空
輪詢,CPU 使用率達到100%,導致Nio Server不可用,Netty通過一種巧妙的方式來避開了這個空輪詢問題
從上面的代碼中可以看出,Selector每一次輪詢都會進行計數(shù),selectCnt++,開始輪詢和輪詢完成都會把當前時間戳賦值給currentTimeNanos和time,兩個時間的時間差就是本次輪詢消耗的時間
如果持續(xù)的時間大于等于timeoutMillis(輪詢的時間),說明就是一次有效的輪詢,重置selectCnt標志,否則,表明該阻塞方法并沒有阻塞這么長時間,可能觸發(fā)了jdk的空輪詢bug,當空輪詢的次數(shù)超過一個閥值的時候,默認是512,就開始重建selector
public void rebuildSelector() {final Selector oldSelector = selector;final Selector newSelector;newSelector = openSelector();int nChannels = 0;for (;;) {try {for (SelectionKey key: oldSelector.keys()) {Object a = key.attachment();if (!key.isValid() || key.channel().keyFor(newSelector) != null) {continue;}int interestOps = key.interestOps();key.cancel();SelectionKey newKey = key.channel().register(newSelector, interestOps, a);if (a instanceof AbstractNioChannel) {// Update SelectionKey((AbstractNioChannel) a).selectionKey = newKey;}nChannels ++;}} catch (ConcurrentModificationException e) {// Probably due to concurrent modification of the key set.continue;}break;}selector = newSelector;oldSelector.close();}rebuildSelector主要做了三件事:
- 創(chuàng)建一個新的 Selector。
- 將原來Selector 中注冊的事件全部取消。
- 將可用事件重新注冊到新的 Selector 中,并激活。
參考:?
netty源碼分析之揭開reactor線程的面紗
Netty 源碼分析-EventLoop
總結(jié)
以上是生活随笔為你收集整理的Netty学习笔记(三)EventLoopGroup开篇的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Netty学习笔记(一)Netty客户端
- 下一篇: 看完这篇文章,我奶奶都懂了https的原