Netty学习笔记(六)Pipeline的传播机制
前面簡單提到了下Pipeline的傳播機制,這里再詳細分析下
Pipeline的傳播機制中有兩個非常重要的屬性inbound和outbound(AbstractChannelHandlerContext的屬性),
inbound為true表示其對應的ChannelHandler實現了ChannelInboundHandler接口
outbound為true表示其對應的ChannelHandler實現了ChannelOutboundHandler接口
Pipeline是一個雙向鏈表,其head實現了ChannelOutboundHandler接口,tail實現了ChannelInboundHandler接口
Netty的傳播事件可以分為兩種:inbound和outbound事件(我簡單理解為輸入和輸出)
上面是Netty官網關于兩個事件的說明: inbound 事件和outbound 事件的流向是不一樣的, inbound 事件從socket.read()開始,其流向是自下而上的,而outbound剛好相反,自上而下,以socket.write()為結束。其中inbound 的傳遞方式是通過調用ChannelHandlerContext.fireIN_EVT()方法,而outbound 的傳遞方式是通過調用ChannelHandlerContext.OUT_EVT()方法
我們來看下Netty源碼里定義的ChannelInboundHandler 和ChannelOutboundHandler
從方法命名發現,ChannelInboundHandler定義的都是類似于回調(響應事件通知)的方法,而ChannelOutboundHandler定義的都是操作(主動觸發請求)的方法
Outbound事件傳播方式
Outbound 事件都是請求事件(request event),即請求某件事情的發生,然后通過 Outbound 事件進行通知。按照官網的說明其傳播方法應該是tail–>handler–>head
以Bootstrap的connect事件為例,分析下其傳播流程
其調用鏈如下:
Bootstrap.connect()
–>Bootstrap.doResolveAndConnect()
–>Bootstrap.doResolveAndConnect0()
–>Bootstrap.doConnect()
–>AbstractChannel.connect(remoteAddress, promise)
–>DefaultChannelPipeline.connect(remoteAddress, promise)
可以看到,這里的connect事件確實是以tail為起點開始傳播的
實際會調用AbstractChannelHandlerContext的如下代碼:
主要做兩件事:
找到下一個outbound的context,然后調用其invokeConnect方法
先來看下findContextOutbound()方法
邏輯很簡單,就是從當前的context節點(這里就是tail節點)開始向前遍歷,直到找到Outbound為true的context并返回
找到一個outbound為true的context之后就會調用其invokeConnect方法,然后會獲取其關聯的ChannelOutboundHandler并調用connect方法
默認調用的是ChannelOutboundHandlerAdapter的connect方法(如果我們重寫了該方法就會調用我們自己的實現,此時如果想要讓其繼續向下傳遞,需要手動調用ctx.connect()),然后又調用了context的connect方法,即又回到了AbstractChannelHandlerContext的connect()方法,開始向前去找下一個滿足outbound為true的context
public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress,SocketAddress localAddress, ChannelPromise promise) throws Exception {ctx.connect(remoteAddress, localAddress, promise);}那么這樣的循環什么時候會結束呢,從一開始就說明了Pipeline的head節點是HeadContext,并且其滿足outbound為true這一條件,所以最后一定會走到HeadContext的handler()方法,然后調用對應的connect。
前面說過HeadContext既是ChannelHandlerContext又是ChannelHandler,所以handler()方法返回的就是HeadContext對象,其connect方法如下:
最終調用了unsafe的connect方法,之類的unsafe其實是Pipeline里保存的channel里的unsafe,我們在Channel的初始化的時候看到過
繼續跟下去發現會調用AbstractNioChannel的內部類AbstractNioUnsafe的如下方法
這里的doConnect()方法主要做了兩件事;
1.調用jdk底層的bind方法
2.調用jdk底層的connect方法
然后我們看下這里的fulfillConnectPromise方法
如果在調用doConnect方法之前channel不是active激活狀態,調用后變為激活狀態,那么就會調用pipeline的fireChannelActive方法,將這一事件–激活成功通知下去 (注意下這里的fireXX方法應該是inbound的類型事件)
接下來我們看看這里的inbound事件會怎么傳播
Inbound事件傳播方式
DefaultChannelPipeline.fireChannelActive–>AbstractChannelHandlerContext.invokeChannelActive
public final ChannelPipeline fireChannelActive() {AbstractChannelHandlerContext.invokeChannelActive(head);return this;} static void invokeChannelActive(final AbstractChannelHandlerContext next) {//最開始傳入的next對象是head節點,說明Inbound事件確實是從Head開始傳遞的EventExecutor executor = next.executor();if (executor.inEventLoop()) {//調用HeadContext的invokeChannelActive()方法(實際是執行了AbstractChannelHandlerContext里的方法)next.invokeChannelActive();} else {executor.execute(new Runnable() {@Overridepublic void run() {next.invokeChannelActive();}});}} private void invokeChannelActive() {if (invokeHandler()) {try {((ChannelInboundHandler) handler()).channelActive(this);} catch (Throwable t) {notifyHandlerException(t);}} else {fireChannelActive();}}先執行對應的handler的channelActive方法,這里就是HeadContext的channelActive方法,然后調用context的fireChannelActive繼續向下傳播
@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {ctx.fireChannelActive();readIfIsAutoRead();}這里的fireChannelActive做的事情和前面Outbound事件的類似,向后遍歷找到滿足inbound為true的context,再調用invokeChannelActive(next),又回到了開始,是不是和outbound的傳播很類似
不過需要注意的一點,在傳播的過程中會調用對應的ChannelInboundHandler的channelActive(this)方法,如果想要讓事件繼續往下傳播,那么在我們對應的channelActive都需要調用ctx.fireChannelActive向下傳播(就像HeadContext做的那樣);如果我們沒有重寫channelActive方法,默認會執行ChannelInboundHandlerAdapter的channelActive方法,它會幫我們調用fireChannelActive()
public ChannelHandlerContext fireChannelActive() {final AbstractChannelHandlerContext next = findContextInbound();invokeChannelActive(next);return this;}這樣不斷傳播下去,最后會找到TailContext節點,前面說過tail是Pipeline的尾結點并且其inbound屬性為true,那么就會執行TailContext的channelActive方法,如下:
@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception { }這里就是一個空實現,其實TailContext對于ChannelInboundHandler接口的實現大部分都是空方法,除了下面三個函數
@Overridepublic void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {ReferenceCountUtil.release(evt);}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {onUnhandledInboundException(cause);}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {onUnhandledInboundMessage(msg);}說明對于Inbound事件,如果用戶沒有添加自定義的處理器,那么默認都是不處理的
注意到這里的HeadContext在執行fireChannelActive()向下傳遞之外,還執行了一個方法readIfIsAutoRead(),
@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {ctx.fireChannelActive();readIfIsAutoRead();}private void readIfIsAutoRead() {//channelconfig的默認isAutoRead是1也就是開啟自動讀取if (channel.config().isAutoRead()) {//這里的read()會調用 tail.read();從tail向read開始傳遞channel.read();}}我們看到在如下回調方法里也調用了readIfIsAutoRead()
@Overridepublic void channelReadComplete(ChannelHandlerContext ctx) throws Exception {ctx.fireChannelReadComplete();readIfIsAutoRead();}這里的channelReadComplete顧名思義就是完成了channel的數據讀取的回調(是在NioEventLoop的processSelectedKey(SelectionKey k, AbstractNioChannel ch)里調用了unsafe.read()方法時觸發回調的)。
說明在默認情況下,Channel會開啟自動讀取模式的,只要Channel是active的,讀完一波數據之后就繼續向selector注冊讀事件,這樣就可以連續不斷得讀取數據。
關于processSelectedKey方法的調用流程可以參考我的EventLoopGroup的學習筆記
Netty學習筆記(三)EventLoopGroup開篇
Netty學習筆記(四)EventLoopGroup續篇
前面說到TailContext對于ChannelInboundHandler接口的實現大部分都是空方法,我們來看下其中比較重要的兩個方法體不為空的實現exceptionCaught(),channelRead()
@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {onUnhandledInboundMessage(msg);}/*** Called once a message hit the end of the {@link ChannelPipeline} without been handled by the user* in {@link ChannelInboundHandler#channelRead(ChannelHandlerContext, Object)}. This method is responsible* to call {@link ReferenceCountUtil#release(Object)} on the given msg at some point.*/protected void onUnhandledInboundMessage(Object msg) {try {logger.debug("Discarded inbound message {} that reached at the tail of the pipeline. " +"Please check your pipeline configuration.", msg);} finally {ReferenceCountUtil.release(msg);}}如果Inbound事件沒有被用戶自定義的ContextHandler處理,那么就會一直向下傳播,head->tail,最后tail節點會接收到在Pipeline傳播過程中沒有被處理的消息,tail節點就會給我們發出一個警告,告訴我們,它已經將我們未處理的數據給丟掉了
對于未處理的流轉到tail的異常也是這樣處理的,這里的注釋提示說異常流轉到Tail節點是因為Pipeline的最后一個handler沒有處理異常。換句話來說就是如果我們想處理異常,就需要在Pipeline的最后一個非tail節點進行處理,即該handler需要加在自定義節點的最末尾
那么這樣是如何保證我們的異常最終都會進入到這個handler的呢?后面分析一下
Pipeline中的異常傳播和處理
如果要在業務代碼中加入異常處理器,統一處理pipeline過程中的所有的異常,那么該異常處理器需要加載自定義節點的最末尾,如下圖所示
分別以Outbound和Inbound事件來看看異常是怎么在最后一個Handler里被捕捉到并處理的
outBound異常的處理
以 ctx.channel().read()為例
@Overridepublic Channel read() {pipeline.read();return this;}@Overridepublic final ChannelPipeline read() {tail.read();return this;}@Overridepublic ChannelHandlerContext read() {final AbstractChannelHandlerContext next = findContextOutbound();EventExecutor executor = next.executor();if (executor.inEventLoop()) {next.invokeRead();} else {...}return this;}其調用鏈如下:channel.read()–>pipeline.read()–>tail.read()–>tail.invokeRead()
進入Pipeline之后會從tail傳播到head,最后調用HeadContext的read()方法
可以看到對于捕捉到的異常,最后都會調用fireExceptionCaught進行處理,我們看下它的實現
@Overridepublic final ChannelPipeline fireExceptionCaught(Throwable cause) {AbstractChannelHandlerContext.invokeExceptionCaught(head, cause);return this;}下面的流程就是Pipeline的傳播了
調用鏈如下:
DefaultChannelPipeline.fireExceptionCaught()
–>AbstractChannelHandlerContext.invokeExceptionCaught(head,cause)
–>AbstractChannelHandlerContext.invokeExceptionCaught(cause) (此時節點為HeadContext)
–>HeadContext.exceptionCaught(ctx,cause)
–>AbstractChannelHandlerContext.fireExceptionCaught(cause) (向下傳播)
–>AbstractChannelHandlerContext.invokeExceptionCaught(next,cause) (這里的節點就是當前節點的下一個節點)
會一直傳遞直到tail節點,如果我們在最后一個自定義Handler里處理了異常,那么就不會傳播到TailContext,否則TailContext就會執行到如下方法,提示我們該異常未被處理,將被拋棄
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {onUnhandledInboundException(cause);}inBound異常的處理
以前面提到的DefaultChannelPipeline.fireChannelActive()方法為例
以head為起點,會調用Inbound屬性為true節點的invokeChannelActive()方法,在調用context的handler的channelActive方法時會進行 try { … } catch ( Throwable t ) { }
下面的流程就很簡單了,執行InboundContext handler的exceptionCaught()方法,ChannelInboundHandlerAdapter幫我們實現了該接口方法,如果我們沒有重寫對應的方法,會繼續向下傳播,這里的invokeExceptionCaught和上面的Outbound中的異常在傳播過程中執行到的方法是同一個,最終也會向Tail傳播
public ChannelHandlerContext fireExceptionCaught(final Throwable cause) {invokeExceptionCaught(next, cause);return this;}所以,我們就可以定義這樣一個ExceptionCaughtHandler 來處理Inbound和Outbound的異常
public class ExceptionCaughtHandler extends ChannelInboundHandlerAdapter {@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { //TODO 異常處理System.out.println("打印異常通知");} }我覺得這里ExceptionCaughtHandler可以是InboundHandler或者是OutboundHandler,因為在異常傳播的時候并沒有像其他Inbound和OutBound事件那樣對context的 inbound和outbound屬性有限制
總結
Outbound事件
Inbound事件
總結
以上是生活随笔為你收集整理的Netty学习笔记(六)Pipeline的传播机制的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Netty学习笔记(五)Pipeline
- 下一篇: 单元测试之带你搞懂Mockito使用