netty之心跳机制
目錄
一、前言
二、netty的心跳工具
三、IdleStatehandler
1、構造方法
2、handlerAdded
3、定時任務
4、讀事件空閑
5、寫事件空閑
一、前言
心跳機制就是定時的給對端發送特殊的數據包 , 對端收到后回復特殊的數據包 , 這一次往返的ping-pong過程 , 就是一次心跳,心跳的目的是為了讓雙方感知 ,對方還活著。
TCP協議層也是有心跳機制的 , 但是他的心跳是2個小時 ,且依賴底層操作系統 ,整體來講不是很靈活 , 所以一般都是在應用層自由實現。
二、netty的心跳工具
netty 提供了IdleStateHandler , ReadTimeoutHandler , WriteTimeoutHandler 三個工具類來監測鏈接的存活性 , 當然 , 我們也可以自由實現。
| 1 | IdleStateHandler | 監測鏈接的讀、寫、讀寫空閑時間是否超過了配置的指定時間 , 如果超過了,則觸發一個IdleStateEvent事件 ,我們可以通過重寫 ChannelInboundHandler.userEventTrigged 方法來做處理。 |
| 2 | ReadTimeouthandler | 指定時間內,沒有發生讀事件 , 則拋出異常,并自動關閉鏈接 , 我們可以在execeptionCaught方法中處理這個異常 |
| 3 | WriteTimeoutHandler | 同上 , 只不過關心的是寫事件 |
總結 : 這兩類工具 , 整體來講 , 各有千秋:
IdleStatehandler 是以事件的方式 ,方式上比較優雅, 讓服務感知到對端的鏈接空閑了, 具體如何操作 , 全看我們自己
Read/WriteTimeoutHandler已異常的方式處理 , 簡單有效 , 且直接關閉了鏈接
三、IdleStatehandler
當連接的空閑時間(讀或者寫)太長時,將會觸發一個 IdleStateEvent 事件。然后,你可以通過你的 ChannelInboundHandler 中重寫 userEventTrigged 方法來處理該事件
1、構造方法
private final boolean observeOutput;// 是否考慮出站時較慢的情況。默認值是false(不考慮)。 private final long readerIdleTimeNanos; // 讀事件空閑時間,0 則禁用事件 private final long writerIdleTimeNanos;// 寫事件空閑時間,0 則禁用事件 private final long allIdleTimeNanos; //讀或寫空閑時間,0 則禁用事件2、handlerAdded
當該handler被添加到pipeline時 , 會調用initialize方法
給定的監測時間大于0 , 就會被創建周期調度任務 。 同時 ,將state狀態設置為1, 防止重復初始化。
private void initialize(ChannelHandlerContext ctx) {//當channel被銷毀時 或者 已經初始化過 , 不在初始化這個方法 , 直接返回switch (state) {case 1:case 2:return;}state = 1;//初始化 寫狀態的部分監測initOutputChanged(ctx); ?lastReadTime = lastWriteTime = ticksInNanos();//添加了三個調度任務 if (readerIdleTimeNanos > 0) {// 這里的 schedule 方法會調用 eventLoop 的 schedule 方法,將定時任務添加進隊列中readerIdleTimeout = schedule(ctx, new ReaderIdleTimeoutTask(ctx),readerIdleTimeNanos, TimeUnit.NANOSECONDS);}if (writerIdleTimeNanos > 0) {writerIdleTimeout = schedule(ctx, new WriterIdleTimeoutTask(ctx),writerIdleTimeNanos, TimeUnit.NANOSECONDS);}if (allIdleTimeNanos > 0) {allIdleTimeout = schedule(ctx, new AllIdleTimeoutTask(ctx),allIdleTimeNanos, TimeUnit.NANOSECONDS);} }調用initOutputChanged方法 , 初始化 監控出棧數據屬性
private void initOutputChanged(ChannelHandlerContext ctx) {if (observeOutput) {Channel channel = ctx.channel();Unsafe unsafe = channel.unsafe();ChannelOutboundBuffer buf = unsafe.outboundBuffer();// 記錄了出站緩沖區相關的數據,buf 對象的 hash 碼,和 buf 的剩余緩沖字節數if (buf != null) {lastMessageHashCode = System.identityHashCode(buf.current());lastPendingWriteBytes = buf.totalPendingWriteBytes();}} }observeOutput 針對出棧較慢的情況監測 。記錄最后一次輸出消息的相關信息 , 并使用一個值firstXXXXIdleEvent標識是否再次活動過,每次讀寫活動都變更其為ture ,那么如果是false ,說明這段時間沒有發生過讀寫 。
同時 , 如果第一次記錄的出棧數據 和 第二次得到的不同 , 說明出棧緩慢 , 則不觸發空閑時間。
3、定時任務
抽象父任務: AbstractIdleTask
讀空閑任務:ReaderIdleTimeoutTask
寫空閑任務:WriterIdleTimeoutTask
讀寫空閑任務:AllIdleTimeoutTask
private abstract static class AbstractIdleTask implements Runnable {public void run() {// 當通道不是打開狀態 , 不再執行if (!ctx.channel().isOpen()) {return;} ?run(ctx);} ? ? }4、讀事件空閑
protected void run(ChannelHandlerContext ctx) {//重新計算下次任務調度的間隔時間long nextDelay = readerIdleTimeNanos;if (!reading) {//配置的時間 - (最后一次讀取 距離目前的時間差)nextDelay -= ticksInNanos() - lastReadTime;}//最新的調度時間 <=0 說明 , 空閑了 , if (nextDelay <= 0) {// 發生空閑了 , 添加下一次的調度任務readerIdleTimeout = schedule(ctx, this, readerIdleTimeNanos, TimeUnit.NANOSECONDS); ?boolean first = firstReaderIdleEvent;firstReaderIdleEvent = false; ?try {// 再次提交任務IdleStateEvent event = newIdleStateEvent(IdleState.READER_IDLE, first);// 觸發用戶 handler usechannelIdle(ctx, event);} catch (Throwable t) {ctx.fireExceptionCaught(t);}} else {// Read occurred before the timeout - set a new timeout with shorter delay.// 使用最新的調度時間 , 重新入調度隊列readerIdleTimeout = schedule(ctx, this, nextDelay, TimeUnit.NANOSECONDS);} }5、寫事件空閑
protected void run(ChannelHandlerContext ctx) {// 最后一次寫的時間long lastWriteTime = IdleStateHandler.this.lastWriteTime;// 計算最新的任務調度時間long nextDelay = writerIdleTimeNanos - (ticksInNanos() - lastWriteTime);// 空閑了if (nextDelay <= 0) {// Writer is idle - set a new timeout and notify the callback.// 重新入調度隊列writerIdleTimeout = schedule(ctx, this, writerIdleTimeNanos, TimeUnit.NANOSECONDS); ?boolean first = firstWriterIdleEvent;firstWriterIdleEvent = false; ?try {// 判斷下是不是出棧慢的情況 ,如果是 不觸發空閑事件if (hasOutputChanged(ctx, first)) {return;}//觸發一個事件IdleStateEvent event = newIdleStateEvent(IdleState.WRITER_IDLE, first);channelIdle(ctx, event);} catch (Throwable t) {ctx.fireExceptionCaught(t);}} else {// Write occurred before the timeout - set a new timeout with shorter delay.// 以最新時間 重新入調度隊列writerIdleTimeout = schedule(ctx, this, nextDelay, TimeUnit.NANOSECONDS);} }總結
以上是生活随笔為你收集整理的netty之心跳机制的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: MiniSnail子模块的规划
- 下一篇: React Native发布——使用Ap