java nio wakeup_Java NIO wakeup实现原理
最近在閱讀netty源碼時,很好奇Java NIO中Selector的wakeup()方法是如何喚醒selector的,于是決定深扒一下wakeup機制的實現原理,相信對學習NIO是大有裨益的。
眾所周知,selector.select()是阻塞的,通常情況下,只有注冊在selector上的channel有事件就緒時,select()才會從阻塞中被喚醒,處理就緒事件。那么,當selector上的channel無就緒事件時,如果想要喚醒阻塞在select()操作上的線程去處理一些別的工作,該如何實現呢?事實上Selector提供了這樣的API:
public abstract Selector wakeup();
wakeup()實現的功能:
如果一個線程在調用select()或select(long)方法時被阻塞,調用wakeup()會使線程立即從阻塞中喚醒;如果調用wakeup()期間沒有select操作,下次調用select相關操作會立即返回,不會執行poll(),包括調用selectNow()。
在Select期間,多次調用wakeup()與調用一次效果是一樣的。
注意:如果調用wakeup()期間沒有select操作,后續若先調用一次selectNow(),再次調用select()則會導致阻塞。
以上描述了wakeup()的功能,那么JAVA NIO中是如何實現這個機制的呢?下面以windows環境為例,結合源碼來探究這個問題。
通常我們會使用Selector.open()方法創建一個選擇器對象,SelectorProvider負責根據不同操作系統來返回不同的實現類,windows平臺就返回WindowsSelectorProvider,然后再調用其openSelector()。
public static Selector open() throws IOException {
return SelectorProvider.provider().openSelector();
}
從WindowsSelectorProvider的openSelector()可知,其作用是創建一個WindowsSelectorImpl對象:
public AbstractSelector openSelector() throws IOException {
return new WindowsSelectorImpl(this);
}
WindowsSelectorImpl就是Selector接口的最終實現類,我們來看看其構造方法都做了什么:
WindowsSelectorImpl(SelectorProvider sp) throws IOException {
super(sp);
pollWrapper = new PollArrayWrapper(INIT_CAP);
wakeupPipe = Pipe.open();
wakeupSourceFd = ((SelChImpl)wakeupPipe.source()).getFDVal();
SinkChannelImpl sink = (SinkChannelImpl)wakeupPipe.sink();
(sink.sc).socket().setTcpNoDelay(true);//禁用Nagle算法
wakeupSinkFd = ((SelChImpl)sink).getFDVal();
pollWrapper.addWakeupSocket(wakeupSourceFd, 0);
}
從源碼可知,實例化WindowsSelectorImpl時,會調用Pipe.open()創建一個管道實例wakeupPipe,并從wakeupPipe中獲取wakeupSourceFd和wakeupSinkFd兩個文件描述符,wakeupSourceFd為read端FD,wakeupSinkFd為write端FD,然后將wakeupSourceFd加入pollWrapper中。
我們知道,pollWrapper的作用是保存當前selector對象上注冊的FD,當調用Selector的select()方法時,會將pollWrapper的內存地址傳遞給內核,由內核負責輪訓pollWrapper中的FD,一旦有事件就緒,將事件就緒的FD傳遞回用戶空間,阻塞在select()的線程就會被喚醒。將wakeupSourceFd加入pollWrapper中,表示selector也需要關注wakeupSourceFd上發生的事件,而誰會處理該事件呢?我們先了解下Pipe吧。
從廣義上說,管道就是一個用來在兩個實體之間單向傳輸數據的導管。在Unix系統中,管道被用來連接一個進程的輸出和另一個進程的輸入。Java使用Pipe類實現了一個管道范例,只不過它創建的管道是進程內(JVM進程內部)而非進程間使用的。
Pipe實現的管道由一個可寫的SinkChannel和一個可讀的SourceChannel組成,這兩個Channel的遠端是連接起來的,使得一旦將一些字節寫入到SinkChannel,就可以在SourceChannel按寫入順序讀取這些字節。下面我們看看SinkChannel和SourceChannel類繼承結構圖:
從圖中我們知道幾點:
SinkChannel和SourceChannel都擴展了AbstractSelectableChannel,因此都支持被注冊到一個Selector上;
SourceChannel只實現了ReadableByteChannel,因此只支持讀操作;同時實現了ScatteringByteChannel,具有將通道中的數據分散到多個緩沖區的能力(矢量I/O);
SinkChannel只實現了WritableByteChannel,因此只支持寫操作;同時實現了GatheringByteChannel,具有將多個緩沖區的數據聚集到該通道的能力(矢量I/O);
SinkChannel和SourceChannel的實現類都實現了SelChImpl,因此都能獲取通道相關聯的文件描述符FD;
SourceChannel和SinkChannel內部通過聚合SocketChannel來完成讀和寫相關的操作。
下面我們繼續分析Pipe.open()的實現;
Pipe.open()最終會創建一個PipeImpl實例:
PipeImpl(final SelectorProvider sp) throws IOException {
try {
AccessController.doPrivileged(new Initializer(sp));
} catch (PrivilegedActionException x) {
throw (IOException)x.getCause();
}
}
PipeImpl構造方法中會創建一個內部類Initializer實例,并調用它的run方法:
public Void run() throws IOException {
LoopbackConnector connector = new LoopbackConnector();
connector.run();
......
}
Initializer的run方法則會創建內部LoopbackConnector的實例,并調用它的run方法,其主要作用是建立一條本地環回連接,看實現:
public void run() {
ServerSocketChannel ssc = null;
SocketChannel sc1 = null;
SocketChannel sc2 = null;
try {
// 環回地址
InetAddress lb = InetAddress.getByName("127.0.0.1");
assert(lb.isLoopbackAddress());
InetSocketAddress sa = null;
for(;;) {
// 綁定ServerSocketChannel到環回地址上的一個端口
if (ssc == null || !ssc.isOpen()) {
ssc = ServerSocketChannel.open();
ssc.socket().bind(new InetSocketAddress(lb, 0));
sa = new InetSocketAddress(lb, ssc.socket().getLocalPort());
}
//建立連接
sc1 = SocketChannel.open(sa);
ByteBuffer bb = ByteBuffer.allocate(8);
long secret = rnd.nextLong();
bb.putLong(secret).flip();
sc1.write(bb);
// 獲取連接并校驗合法性
sc2 = ssc.accept();
bb.clear();
sc2.read(bb);
bb.rewind();
if (bb.getLong() == secret)
break;
sc2.close();
sc1.close();
}
// 創建source通道和sink通道
source = new SourceChannelImpl(sp, sc1);
sink = new SinkChannelImpl(sp, sc2);
} catch (IOException e) {
try {
if (sc1 != null)
sc1.close();
if (sc2 != null)
sc2.close();
} catch (IOException e2) {}
ioe = e;
} finally {
try {
if (ssc != null)
ssc.close();
} catch (IOException e2) {}
}
}
run方法完成的功能:
使用本地環回地址“127.0.0.1”創建一個InetAddress實例lb。“127.0.0.1”是一個保留地址,主要用于環回測試,也就是說,目的地址為環回地址的IP數據包永遠都不會出現在任何網絡中;
創建一個ServerSocketChannelImpl實例ssc,為該通道綁定一個唯一的文件描述符FD;
使用lb和0號端口創建一個InetSocketAddress實例,并將該實例綁定到服務端socket通道上。這里使用了系統預留的0號端口,主要是為了避免寫死端口號,操作系統會從動態端口號范圍內搜索接下來可以使用的端口號作為服務端socket通道的監聽端口;
使用lb和ssc上綁定的端口號創建一個InetSocketAddress實例sa,再用sa創建一個SocketChannel實例,并為該通道綁定一個唯一的文件描述符FD;
客戶端socket通道創建成功后會調用connect嘗試建立socket連接,由于當前處于阻塞模式,因此connect會阻塞直到成功建立連接或發生IO錯誤;
創建一個8字節的ByteBuffer對象,填充一個隨機long值,然后將緩沖區的數據寫入通道sc1;
調用ssc的accept()方法,accept方法會創建一個新的SocketChannel實例,并綁定一個唯一的文件描述符FD,然后使用這個SocketChannel實例讀取數據;
比較發送的數據和接收的數據是否相等,若相等,使用sc1創建SourceChannelImpl實例作為管道的source端,使用sc2創建SinkChannelImpl實例作為管道的sink端;
最后調用close()關閉ServerSocketChannel,這樣ServerSocketChannel就不會接受新的連接,同時釋放綁定在該通道上的FD。
到此,一個管道被成功建立,這個管道的兩端為兩個通道,SourceChannel作為read端,而SinkChannel為write端,兩個通道之間通過TCP進行連接,這樣使得在SinkChannel端寫入的數據SourceChannel端可以立馬讀取。
Java中Pipe實現的管道僅用于在同一個Java虛擬機內部傳輸數據。實際應用中,使用管道在線程間傳輸數據也是一種不錯的方案,它為我們提供了良好的封裝性。
現在,回到WindowsSelectorImpl的構造方法中,我們知道,創建一個Selector實例時,還會創建一個管道Pipe實例,并將管道source端wakeupSourceFd加入pollWrapper中,作為第一個注冊到Selector的FD,并設置感興趣的事件為Net.POLLIN,表示對可讀事件感興趣。當Selector在輪訓pollWrapper中的FD時,如果wakeupSourceFd發生read事件,那么Selector就會被喚醒,這就是wakeup()的實現原理。看wakeup()實現:
public Selector wakeup() {
synchronized (interruptLock) {
if (!interruptTriggered) {
setWakeupSocket();
interruptTriggered = true;
}
}
return this;
}
首先判斷interruptTriggered,如果為True,立即返回;如果為False,調用setWakeupSocket(),并將interruptTriggered設置為true。下面看setWakeupSocket()的實現:
private void setWakeupSocket() {
this.setWakeupSocket0(this.wakeupSinkFd);
}
private native void setWakeupSocket0(int wakeupSinkFd);
傳入管道sink端的wakeupSinkFd,然后調用底層的setWakeupSocket0方法,下面從openjdk8源文件WindowsSelectorImpl.c找到setWakeupSocket0的實現:
Java_sun_nio_ch_WindowsSelectorImpl_setWakeupSocket0(JNIEnv *env, jclass this,
jint scoutFd)
{
/* Write one byte into the pipe */
const char byte = 1;
send(scoutFd, &byte, 1, 0);
}
該函數的主要作用是向pipe的sink端寫入了一個字節,這樣pipe的source端文件描述符立即就會處于就緒狀態,select()方法將立即從阻塞中返回,這樣就完成了喚醒selector的功能。
wakeup()中使用interruptTriggered來判斷是否執行喚醒操作。因此,在select期間,多次調用wakeup()產生的效果與調用一次是一樣的,因為后面的調用將不會滿足喚醒條件。如果調用wakeup()期間沒有select操作,當調用wakeup()之后,interruptTriggered被設置為true,pipe的source端wakeupSourceFd 就會處于就緒狀態。如果此時調用select相關操作時,會調用resetWakeupSocket?方法,resetWakeupSocket?首先會調用本地方法resetWakeupSocket0讀取wakeup()中發送的數據,再將interruptTriggered設置為false,最后doSelect將會立即返回0,而不會調用poll操作。
為什么說將一些字節寫入到SinkChannel后,SourceChannel就可以立即按寫入順序讀取這些字節?
這是因為我們在WindowsSelectorImpl構造方法中將TCP參數TCP_NODELAY設置為了true。該參數的主要作用是禁用Nagle算法,當sink端寫入1字節數據時,將立即發送,而不必等到將較小的包組合成較大的包再發送,這樣source端就可以立馬讀取數據。
下面附上windows環境下Selector的實現原理圖幫助理解阻塞與喚醒的原理(圖片來源網絡):
本文主要介紹了windows環境下wakeup()的實現原理,它通過一個可寫的SinkChannel和一個可讀的SourceChannel組成的pipe來實現喚醒的功能,而Linux環境則使用其本身的Pipe來實現喚醒功能。無論windows還是linux,wakeup的思想是完全一致的,只不過windows沒有Pipe這種信號通知的機制,所以通過TCP來實現了Pipe,建立了一對自己和自己的loopback的TCP連接來發送信號。請注意,每創建一個Selector對象,都會創建一個Pipe實例,這會導致消耗兩個文件描述符FD和兩個端口號,實際開發中需注意端口號和文件描述符的限制。
總結
以上是生活随笔為你收集整理的java nio wakeup_Java NIO wakeup实现原理的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: iOS开发:The app icon s
- 下一篇: androidx和android的区别,