Netty学习笔记(四)EventLoopGroup续篇
前面講到Reactor的核心是執(zhí)行了NioEventLoop的run方法,主要做了上面三件事:
- 輪詢注冊到reactor線程上的對應(yīng)的selector的所有channel的IO事件
- 根據(jù)不同的SelectKeys進(jìn)行處理??processSelectedKeys();
- 處理任務(wù)隊(duì)列 runAllTasks(); ??
接下來再詳細(xì)看下processSelectedKeys()和runAllTasks(); ?方法做了什么
processSelectedKeys
private void processSelectedKeys() {if (selectedKeys != null) {processSelectedKeysOptimized(selectedKeys.flip());} else {processSelectedKeysPlain(selector.selectedKeys());}}這里的processSelectedkeys()方法會根據(jù)selectedKeys是否為空,判斷執(zhí)行優(yōu)化后的processSelectedKeysOptimized()還是普通的processSelectedKeysPlain()方法
這里的selectedKeys Netty在調(diào)用openSelector時對其進(jìn)行了優(yōu)化
private SelectedSelectionKeySet selectedKeys;private Selector openSelector() {final Selector selector;selector = provider.openSelector();final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();final Class<?> selectorImplClass = (Class<?>) maybeSelectorImplClass;Object maybeException = AccessController.doPrivileged(new PrivilegedAction<Object>() {@Overridepublic Object run() { Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");selectedKeysField.setAccessible(true);publicSelectedKeysField.setAccessible(true);selectedKeysField.set(selector, selectedKeySet);publicSelectedKeysField.set(selector, selectedKeySet);return null; }});selectedKeys = selectedKeySet;return selector;}先創(chuàng)建一個空的SelectedSelectionKeySet對象,然后通過反射獲取jdk?底層的Selector 的class 對象的 selectedKeys和publicSelectedKeys字段,并將Netty的SelectedSelectionKeySet通過反射賦值,這樣在底層調(diào)用jdk的api存儲注冊事件時,最后都會把事件保存到Netty的SelectedSelectionKeySet 對象里
可以看下替換前后有什么區(qū)別,jdk底層的SelectImpl對象的selectedKeys和publicSelectedKeys字段都是Set<SelectionKey>類型,而Netty里的SelectedSelectionKeySet對象是這樣的一個結(jié)構(gòu):
final class SelectedSelectionKeySet extends AbstractSet<SelectionKey> {private SelectionKey[] keysA;private int keysASize;private SelectionKey[] keysB;private int keysBSize;private boolean isA = true;@Overridepublic boolean add(SelectionKey o) {if (o == null) {return false;}//添加元素到數(shù)組的最后,如果數(shù)組滿了,就進(jìn)行擴(kuò)容(*2)if (isA) {int size = keysASize;keysA[size ++] = o;keysASize = size;if (size == keysA.length) {doubleCapacityA();}} else {...}return true;}//移除對應(yīng)的SelectionKey數(shù)組的最后一個元素SelectionKey[] flip() {if (isA) {isA = false;keysA[keysASize] = null;keysBSize = 0;return keysA;} else {...}}@Overridepublic boolean remove(Object o) {return false;}@Overridepublic boolean contains(Object o) {return false;}@Overridepublic Iterator<SelectionKey> iterator() {throw new UnsupportedOperationException();} }SelectedSelectionKeySet是AbstractSet的一個子類,底層通過SelectionKey[]數(shù)組方法實(shí)現(xiàn),并且將一些不需要的方法remove,contains方法進(jìn)行重寫,Netty里輪詢事件的時候?qū)Σ僮鬟M(jìn)行了簡化,不需要通過集合的Iterator進(jìn)行移除,而直接通過flip方法去掉集合的最后一個SelectionKey就可以了(這樣的操作的時間復(fù)雜度更低,可以直接定位到具體的下標(biāo)),而我們在使用NIO的API的時候都需要進(jìn)行remove操作
4.1.6.Final中的源碼,這里的SelectionKey是兩個數(shù)組交替遍歷的,在4.1.9.Final 版本中,netty已經(jīng)將SelectedSelectionKeySet底層使用一個數(shù)組了:SelectedSelectionKeySet
接著來看下?processSelectedKeysOptimized(selectedKeys.flip());方法
private void processSelectedKeysOptimized(SelectionKey[] selectedKeys) {for (int i = 0;; i ++) {final SelectionKey k = selectedKeys[i];if (k == null) {break;}selectedKeys[i] = null;//拿到SelectionKey的attachment,并根據(jù)其類型做不同處理final Object a = k.attachment();if (a instanceof AbstractNioChannel) {processSelectedKey(k, (AbstractNioChannel) a);} else {@SuppressWarnings("unchecked")NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;processSelectedKey(k, task);}if (needsToSelectAgain) {//如果需要重新select,就將selectedKeys的元素都置為null恢復(fù)初始的狀態(tài)for (;;) {i++;if (selectedKeys[i] == null) {break;}selectedKeys[i] = null;}selectAgain();// Need to flip the optimized selectedKeys to get the right reference to the array// and reset the index to -1 which will then set to 0 on the for loop// to start over again. selectedKeys = this.selectedKeys.flip();i = -1;}}}上述過程可以分為三步:
- 取出SelectionKey(包含channel,attachment等信息)
-
這里看到SelectionKey的attachment類型可能是AbstractNioChannel,猜測是不是在注冊事件的時間添加的,根據(jù)ServerBootstrap的啟動流程,最后會調(diào)用AbstractNioChannel的如下方法:
selectionKey = javaChannel().register(eventLoop().selector, 0, this);這里的最后一個參數(shù)也就是attachment,當(dāng)前對象不就是AbstractNioChannel的子類
- 處理SelectionKey
- private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();if (!k.isValid()) {final EventLoop eventLoop;eventLoop = ch.eventLoop();if (eventLoop != this || eventLoop == null) {return;}unsafe.close(unsafe.voidPromise());return;}int readyOps = k.readyOps();//連接建立事件if ((readyOps & SelectionKey.OP_CONNECT) != 0) { // remove OP_CONNECT as otherwise Selector.select(..) will always return without blockingint ops = k.interestOps();ops &= ~SelectionKey.OP_CONNECT;//1.將連接事件從interestOps中移除k.interestOps(ops);//2.調(diào)用pipeline().fireChannelActive()將連接建立完成通知給pipeline中的各個handler unsafe.finishConnect();}//可寫事件if ((readyOps & SelectionKey.OP_WRITE) != 0) {ch.unsafe().forceFlush();} //可讀事件 if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {unsafe.read();if (!ch.isOpen()) {return;}}}
可以看出這里就是一系列NIO的操作,分別對OP_READ, 可讀事件,?OP_WRITE, 可寫事件,?OP_CONNECT, 連接事件進(jìn)行處理
以O(shè)P_READ事件為例
public final void read() {final ChannelConfig config = config();final ChannelPipeline pipeline = pipeline();final ByteBufAllocator allocator = config.getAllocator();final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();allocHandle.reset(config);ByteBuf byteBuf = null;boolean close = false;do {//1.分配ByteBufbyteBuf = allocHandle.allocate(allocator);//2.從Channel讀取數(shù)據(jù)allocHandle.lastBytesRead(doReadBytes(byteBuf));if (allocHandle.lastBytesRead() <= 0) {// nothing was read. release the buffer.byteBuf.release();byteBuf = null;close = allocHandle.lastBytesRead() < 0;break;}allocHandle.incMessagesRead(1);readPending = false;//3.通過pipeline.fireChannelRead事件通知給pipeline里的各個handlerpipeline.fireChannelRead(byteBuf);byteBuf = null;} while (allocHandle.continueReading());allocHandle.readComplete();pipeline.fireChannelReadComplete();if (close) {closeOnRead(pipeline);}}}- 判斷是否需要重新Select并重置
這里的cancelledKeys會在調(diào)用cancel(SelectionKey)刪除注冊事件的時候計(jì)數(shù),當(dāng)他大于CLEANUP_INTERVAL(256)的時候,就會將needsToSelectAgain設(shè)置為true,進(jìn)入對應(yīng)的分支判斷,先將原來的selectedKeys都置為Null,然后重新調(diào)用selectNow(),重新填充selectedKeys
總結(jié):
netty的NioEventLoop線程第二步做的事情就是處理SelectionKey,netty使用數(shù)組替換掉jdk原生的HashSet來優(yōu)化查詢和更新SelectionKey的效率,每個SelectionKey上綁定了netty類AbstractNioChanne的具體實(shí)現(xiàn)子類對象作為attachment,在處理每個SelectionKey的時候,就可以找到對應(yīng)的AbstractNioChannel,最后通過pipeline來處理通知給其他Handler
任務(wù)執(zhí)行runAllTasks
任務(wù)添加
添加普通任務(wù)
前面的分析說過NioEventLoop 是Netty的核心線程,其添加任務(wù)是通過執(zhí)行父類SingleThreadEventExecutor的execute方法,
通過addTask方法,將Runnable(即task)添加到對應(yīng)的任務(wù)隊(duì)列?Queue<Runnable> taskQueue;里
Netty的源碼里的bind()流程中有通過如下方法添加對應(yīng)的task到SingleThreadEventExecutor的任務(wù)隊(duì)列里,如下:
private static void doBind0(final ChannelFuture regFuture, final Channel channel,final SocketAddress localAddress, final ChannelPromise promise) {channel.eventLoop().execute(new Runnable() {@Overridepublic void run() {if (regFuture.isSuccess()) {channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);} else {promise.setFailure(regFuture.cause());}}});}用戶也可以通過如下方式自己添加task到TaskQueue
EventLoop eventLoop = channel.eventLoop(); eventLoop.execute(new Runnable() {@Overridepublic void run() {//TODO } });添加定時任務(wù)
除了上述方式,我們還可以通過如下方法添加定時任務(wù)到對應(yīng)的任務(wù)隊(duì)列
EventLoop eventLoop = channel.eventLoop(); eventLoop.schedule(new Runnable() {@Overridepublic void run() {//TODO } }, 30, TimeUnit.SECONDS);具體的實(shí)現(xiàn)是在父類AbstractScheduledEventExecutor里,看下對應(yīng)的源碼
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {...if (delay < 0) {throw new IllegalArgumentException(String.format("delay: %d (expected: >= 0)", delay));}return schedule(new ScheduledFutureTask<Void>(this, command, null, ScheduledFutureTask.deadlineNanos(unit.toNanos(delay))));}<V> ScheduledFuture<V> schedule(final ScheduledFutureTask<V> task) {if (inEventLoop()) {scheduledTaskQueue().add(task);} else {execute(new Runnable() {@Overridepublic void run() {scheduledTaskQueue().add(task);}});}return task;}會將對應(yīng)的Runnable和延遲時間封裝成一個新的ScheduledFutureTask,然后調(diào)用重載的schedule方法,將對應(yīng)的task添加到PriorityQueue<ScheduledFutureTask<?>>的優(yōu)先隊(duì)列里
這里對添加定時任務(wù)的Thread進(jìn)行了判斷,如果調(diào)用的發(fā)起方是reactor線程,那么就直接將Task添加到優(yōu)先隊(duì)列中;如果是外部線程調(diào)用的schedule,會將"添加定時任務(wù)到優(yōu)先隊(duì)列"封裝成一個Runnable也就是新的task,然后調(diào)用上面的execute方法去添加任務(wù),這樣會訪問PriorityQueue的就只有reactor線程了,變成了單線程
接下來我們來詳細(xì)看下這個特殊的優(yōu)先隊(duì)列PriorityQueue<ScheduledFutureTask<?>>,所謂的優(yōu)先隊(duì)列與普通隊(duì)列的區(qū)別在于每個元素都被賦予了優(yōu)先級。當(dāng)訪問元素時,會將具有最高優(yōu)先級的元素最先彈出。即優(yōu)先隊(duì)列具有最高級先出的特征。
看下這個優(yōu)先隊(duì)列里的元素ScheduledFutureTask,它實(shí)現(xiàn)了Comparable接口,定義了自己的compareTo方法,先比較deadlineNanos(也就是截止時間)的大小,如果一樣則比較id,如果也相同就拋出異常
final class ScheduledFutureTask<V> extends PromiseTask<V> implements ScheduledFuture<V> {private static final AtomicLong nextTaskId = new AtomicLong();private final long id = nextTaskId.getAndIncrement();@Overridepublic int compareTo(Delayed o) {if (this == o) {return 0;}ScheduledFutureTask<?> that = (ScheduledFutureTask<?>) o;long d = deadlineNanos() - that.deadlineNanos();if (d < 0) {return -1;} else if (d > 0) {return 1;} else if (id < that.id) {return -1;} else if (id == that.id) {throw new Error();} else {return 1;}} }既然ScheduledFutureTask本質(zhì)也是一個Runnable,那么就看下它的run方法吧
這里對于不同的類型任務(wù)進(jìn)行了不同的處理,periodNanos=0表示是只執(zhí)行一次的任務(wù),>0 表示是按照指定頻率定期執(zhí)行的任務(wù),<0表示是每次執(zhí)行完成后,延遲一段時間再次執(zhí)行的任務(wù)(二者的區(qū)別在于一個是根據(jù)上次任務(wù)開始執(zhí)行的時間計(jì)算間隔,一個是按照上次任務(wù)執(zhí)行結(jié)束的時間計(jì)算間隔)
Task任務(wù)的執(zhí)行
有兩個重載的runAllTasks方法,一個無參,一個帶有l(wèi)ong timeoutNanos參數(shù),先來看下無參的方法
protected boolean runAllTasks() {assert inEventLoop();boolean fetchedAll;boolean ranAtLeastOne = false;do {fetchedAll = fetchFromScheduledTaskQueue();if (runAllTasksFrom(taskQueue)) {ranAtLeastOne = true;}} while (!fetchedAll); // keep on processing until we fetched all scheduled tasks.if (ranAtLeastOne) {lastExecutionTime = ScheduledFutureTask.nanoTime();}afterRunningAllTasks();return ranAtLeastOne;}主要做下面三件事情:
1.將優(yōu)先隊(duì)列里的ScheduledFutureTask取出放到taskQueue里
2.從taskQueue里取出task并執(zhí)行
3.task任務(wù)執(zhí)行完畢后執(zhí)行后置處理邏輯
將任務(wù)從優(yōu)先隊(duì)列移動到taskQueue
private boolean fetchFromScheduledTaskQueue() {long nanoTime = AbstractScheduledEventExecutor.nanoTime();Runnable scheduledTask = pollScheduledTask(nanoTime);while (scheduledTask != null) {if (!taskQueue.offer(scheduledTask)) {// No space left in the task queue add it back to the scheduledTaskQueue so we pick it up again.scheduledTaskQueue().add((ScheduledFutureTask<?>) scheduledTask);return false;}scheduledTask = pollScheduledTask(nanoTime);}return true;}protected final Runnable pollScheduledTask(long nanoTime) {assert inEventLoop();Queue<ScheduledFutureTask<?>> scheduledTaskQueue = this.scheduledTaskQueue;ScheduledFutureTask<?> scheduledTask = scheduledTaskQueue == null ? null : scheduledTaskQueue.peek();if (scheduledTask == null) {return null;}if (scheduledTask.deadlineNanos() <= nanoTime) {scheduledTaskQueue.remove();return scheduledTask;}return null;}先從scheduledTaskQueue優(yōu)先隊(duì)列里拿到對應(yīng)優(yōu)先級最高的task(截止時間最近的Task),判斷當(dāng)前是否已到達(dá)其截止時間,是的話就將其從優(yōu)先隊(duì)列中取出并刪除元素,然后將其加入到taskQueue中,如果加入失敗就重新加入到scheduledTaskQueue中,一直到所有的優(yōu)先隊(duì)列里的task都遷移成功
簡單來說就是把已經(jīng)到期的定時任務(wù)從PriorityQueue轉(zhuǎn)移到taskQueue
執(zhí)行task
protected final boolean runAllTasksFrom(Queue<Runnable> taskQueue) {Runnable task = pollTaskFrom(taskQueue);if (task == null) {return false;}for (;;) {safeExecute(task);task = pollTaskFrom(taskQueue);if (task == null) {return true;}}}protected final Runnable pollTaskFrom(Queue<Runnable> taskQueue) {for (;;) {Runnable task = taskQueue.poll();if (task == WAKEUP_TASK) {continue;}return task;}}protected static void safeExecute(Runnable task) {try {task.run();} catch (Throwable t) {logger.warn("A task raised an exception. Task: {}", task, t);}}從taskQueue中取出非WAKEUP_TASK的任務(wù),然后調(diào)用safeExecute() --內(nèi)部之間調(diào)用task.run()來安全執(zhí)行所有的task,一直到所有的task都執(zhí)行完畢
后置處理
@Overrideprotected void afterRunningAllTasks() {runAllTasksFrom(tailTasks);}當(dāng)所有的task執(zhí)行完畢之后,我們還可以執(zhí)行一些自己的task,通過afterRunningAllTasks方法來執(zhí)行在tailTasks隊(duì)列里的所有任務(wù),我們可以通過SingleThreadEventLoop的executeAfterEventLoopIteration向tailTasks里添加自己想要執(zhí)行的業(yè)務(wù)邏輯
task的執(zhí)行還有一個帶有超時時間的重載方法,如下:
protected boolean runAllTasks(long timeoutNanos) {fetchFromScheduledTaskQueue();//從taskQueue poll獲取任務(wù)Runnable task = pollTask();if (task == null) {afterRunningAllTasks();return false;}//計(jì)算當(dāng)前方法超時的截止時間final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos;long runTasks = 0;long lastExecutionTime;for (;;) {safeExecute(task);runTasks ++;//位運(yùn)算,說明runTasks是64的倍數(shù) 0x3F=0011 1111 (63)if ((runTasks & 0x3F) == 0) {lastExecutionTime = ScheduledFutureTask.nanoTime();if (lastExecutionTime >= deadline) {break;}}task = pollTask();if (task == null) {lastExecutionTime = ScheduledFutureTask.nanoTime();break;}}afterRunningAllTasks();this.lastExecutionTime = lastExecutionTime;return true;}基本思路和不帶參數(shù)的runAllTasks一樣,區(qū)別在于會考慮所有任務(wù)執(zhí)行的超時時間,為了提高執(zhí)行效率,每執(zhí)行64個任務(wù)都會比較下當(dāng)前時間是否大于runAllTasks的截止時間,是的話就退出
從上面可以看出,我們的EventLoopGroup?既需要執(zhí)行 IO 操作, 又需要執(zhí)行 很多的task, 因此在調(diào)用對應(yīng)execute 方法添加任務(wù)的時候, 不要提交耗時任務(wù), 更不能提交一些會造成阻塞的任務(wù), 不然會導(dǎo)致我們的 IO 線程得不到調(diào)度, 影響整個程序的并發(fā)量
總結(jié)一下:
- netty內(nèi)的任務(wù)可分為普通任務(wù)和定時任務(wù),分別保存在LinkedBlockingQueue和PriorityQueue
- netty執(zhí)行任務(wù)之前,會將已經(jīng)到期的定時任務(wù)從PriorityQueue轉(zhuǎn)移到LinkedBlockingQueue
- 如果執(zhí)行任務(wù)有超時時間,那么會每執(zhí)行64個任務(wù)校驗(yàn)下是否達(dá)到截止時間
參考:
netty源碼分析之揭開reactor線程的面紗(二)
netty源碼分析之揭開reactor線程的面紗(三)
Netty 源碼分析-EventLoop???????
總結(jié)
以上是生活随笔為你收集整理的Netty学习笔记(四)EventLoopGroup续篇的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Java 14 发布了,终于可以扔掉Lo
- 下一篇: Netty学习笔记(五)Pipeline