A First Look at the NIO Source Code
Any walk through the source has to start with the open method of java.nio.channels.Selector:

    public static Selector open() throws IOException {
        return SelectorProvider.provider().openSelector();
    }

Here is what SelectorProvider.provider() does:

    public static SelectorProvider provider() {
        synchronized (lock) {
            if (provider != null)
                return provider;
            return AccessController.doPrivileged(
                new PrivilegedAction<SelectorProvider>() {
                    public SelectorProvider run() {
                        if (loadProviderFromProperty())
                            return provider;
                        if (loadProviderAsService())
                            return provider;
                        provider = sun.nio.ch.DefaultSelectorProvider.create();
                        return provider;
                    }
                });
        }
    }

The call provider = sun.nio.ch.DefaultSelectorProvider.create(); returns a different implementation class depending on the operating system; on Windows it returns a WindowsSelectorProvider. The check if (provider != null) return provider;, made while holding lock, guarantees that the whole server process has only one WindowsSelectorProvider object.
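The singleton behaviour described above can be observed from ordinary application code. The following is a minimal sketch, not part of the JDK source; the class name ProviderSingletonDemo and its method names are made up for illustration. It only uses the public SelectorProvider.provider(), Selector.open() and Selector.provider() calls.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.Selector;
import java.nio.channels.spi.SelectorProvider;

// Hypothetical demo class: checks that the system-wide provider is shared.
final class ProviderSingletonDemo {

    /** Returns true if two provider() calls hand back the same object. */
    static boolean providerIsShared() {
        return SelectorProvider.provider() == SelectorProvider.provider();
    }

    /** Returns true if a selector from Selector.open() reports that same provider. */
    static boolean selectorUsesSharedProvider() {
        try (Selector selector = Selector.open()) {
            return selector.provider() == SelectorProvider.provider();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        // The concrete class name depends on the operating system and JDK build.
        System.out.println("provider class: "
                + SelectorProvider.provider().getClass().getName());
        System.out.println("same provider instance: " + providerIsShared());
        System.out.println("selector uses it: " + selectorUsesSharedProvider());
    }
}
```

Printing the provider class name is a quick way to see which platform-specific implementation DefaultSelectorProvider.create() picked on a given machine.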
Next, WindowsSelectorProvider.openSelector():

    public AbstractSelector openSelector() throws IOException {
        return new WindowsSelectorImpl(this);
    }

The code behind new WindowsSelectorImpl(SelectorProvider):

    WindowsSelectorImpl(SelectorProvider sp) throws IOException {
        super(sp);
        pollWrapper = new PollArrayWrapper(INIT_CAP);
        wakeupPipe = Pipe.open();
        wakeupSourceFd = ((SelChImpl)wakeupPipe.source()).getFDVal();

        // Disable the Nagle algorithm so that the wakeup is more immediate
        SinkChannelImpl sink = (SinkChannelImpl)wakeupPipe.sink();
        (sink.sc).socket().setTcpNoDelay(true);
        wakeupSinkFd = ((SelChImpl)sink).getFDVal();

        pollWrapper.addWakeupSocket(wakeupSourceFd, 0);
    }

Pipe.open() is the key call here. Its call chain is:

    public static Pipe open() throws IOException {
        return SelectorProvider.provider().openPipe();
    }

In the SelectorProvider implementation:

    public Pipe openPipe() throws IOException {
        return new PipeImpl(this);
    }

And this is how the PipeImpl is constructed:

    PipeImpl(SelectorProvider sp) {
        long pipeFds = IOUtil.makePipe(true);
        int readFd = (int) (pipeFds >>> 32);
        int writeFd = (int) pipeFds;
        FileDescriptor sourcefd = new FileDescriptor();
        IOUtil.setfdVal(sourcefd, readFd);
        source = new SourceChannelImpl(sp, sourcefd);
        FileDescriptor sinkfd = new FileDescriptor();
        IOUtil.setfdVal(sinkfd, writeFd);
        sink = new SinkChannelImpl(sp, sinkfd);
    }

IOUtil.makePipe(true) is a native method:

    /**
     * Returns two file descriptors for a pipe encoded in a long.
     * The read end of the pipe is returned in the high 32 bits,
     * while the write end is returned in the low 32 bits.
     */
    static native long makePipe(boolean blocking);

Its implementation:

    JNIEXPORT jlong JNICALL
    Java_sun_nio_ch_IOUtil_makePipe(JNIEnv *env, jobject this, jboolean blocking)
    {
        int fd[2];

        if (pipe(fd) < 0) {
            JNU_ThrowIOExceptionWithLastError(env, "Pipe failed");
            return 0;
        }
        if (blocking == JNI_FALSE) {
            if ((configureBlocking(fd[0], JNI_FALSE) < 0)
                || (configureBlocking(fd[1], JNI_FALSE) < 0)) {
                JNU_ThrowIOExceptionWithLastError(env, "Configure blocking failed");
                close(fd[0]);
                close(fd[1]);
                return 0;
            }
        }
        return ((jlong) fd[0] << 32) | (jlong) fd[1];
    }

    static int
    configureBlocking(int fd, jboolean blocking)
    {
        int flags = fcntl(fd, F_GETFL);
        int newflags = blocking ? (flags & ~O_NONBLOCK) : (flags | O_NONBLOCK);

        return (flags == newflags) ? 0 : fcntl(fd, F_SETFL, newflags);
    }

Note that this listing, like the PipeImpl constructor that calls it, is the Unix version: it relies on pipe() and fcntl(). On Windows the JDK builds the pipe from a pair of connected loopback sockets, which is why WindowsSelectorImpl can call setTcpNoDelay on the sink. The wakeup idea is the same on both platforms.

As the Javadoc comment on makePipe says, the high 32 bits hold the file descriptor (FD) of the pipe's read end, and the low 32 bits hold the FD of the write end. That is why the value returned by makePipe() has to be shifted and truncated to recover the two descriptors.
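The packing scheme can be tried in isolation. Below is a small sketch of the same encoding in plain Java; PipeFdCodec and its method names are hypothetical and exist only for this example. The unpacking mirrors the two lines in the PipeImpl constructor. The mask in pack() is a defensive choice of this sketch (it stops a negative low value from overwriting the high half); the native code does not need it because valid descriptors are non-negative.

```java
// Hypothetical helper: packs two 32-bit descriptors into one long and back.
final class PipeFdCodec {

    /** Read end goes into the high 32 bits, write end into the low 32 bits. */
    static long pack(int readFd, int writeFd) {
        // Mask the low half so sign extension cannot spill into the high half.
        return ((long) readFd << 32) | (writeFd & 0xFFFFFFFFL);
    }

    /** Same expression as in PipeImpl: unsigned shift, then narrow to int. */
    static int readFd(long packed) {
        return (int) (packed >>> 32);
    }

    /** Same expression as in PipeImpl: narrowing keeps only the low 32 bits. */
    static int writeFd(long packed) {
        return (int) packed;
    }

    public static void main(String[] args) {
        long packed = pack(3, 4);
        System.out.println("packed  = 0x" + Long.toHexString(packed)); // 0x300000004
        System.out.println("readFd  = " + readFd(packed));             // 3
        System.out.println("writeFd = " + writeFd(packed));            // 4
    }
}
```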
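Back in the WindowsSelectorImpl constructor, the line pollWrapper.addWakeupSocket(wakeupSourceFd, 0); registers the FD of the pipe's read (source) end with pollWrapper. As shown later, this is what makes the selector's wakeup() work: a byte written to the write end makes the read end readable, and the poll returns.

The implementation of ServerSocketChannel.open():

    public static ServerSocketChannel open() throws IOException {
        return SelectorProvider.provider().openServerSocketChannel();
    }

In the SelectorProvider implementation:

    public ServerSocketChannel openServerSocketChannel() throws IOException {
        return new ServerSocketChannelImpl(this);
    }

So the ServerSocketChannelImpl that gets created holds a reference to the same WindowsSelectorProvider that created the selector.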

    public ServerSocketChannelImpl(SelectorProvider sp) throws IOException {
        super(sp);
        this.fd = Net.serverSocket(true);
        this.fdVal = IOUtil.fdVal(fd);
        this.state = ST_INUSE;
    }

Then serverChannel1.register(selector, SelectionKey.OP_ACCEPT); binds the selector and the channel together. In other words, the FD created when the ServerSocketChannel was constructed is registered with the selector.
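From the application's point of view, the registration step looks roughly like the sketch below. It is an illustration under assumptions, not the article's server: the class RegisterDemo, the loopback address and the use of port 0 (any free port) are choices made for this example.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;

// Hypothetical demo class: registers a server channel and inspects the key.
final class RegisterDemo {

    /** Registers a listening channel for OP_ACCEPT and describes the result. */
    static String describeRegistration() {
        try (Selector selector = Selector.open();
             ServerSocketChannel server = ServerSocketChannel.open()) {
            server.bind(new InetSocketAddress("127.0.0.1", 0)); // port 0: any free port
            server.configureBlocking(false); // register() rejects blocking channels
            SelectionKey key = server.register(selector, SelectionKey.OP_ACCEPT);

            // Channel and selector were both created by the one shared provider.
            boolean sameProvider = server.provider() == selector.provider();
            return "valid=" + key.isValid()
                    + " acceptOnly=" + (key.interestOps() == SelectionKey.OP_ACCEPT)
                    + " keys=" + selector.keys().size()
                    + " sameProvider=" + sameProvider;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        // Prints: valid=true acceptOnly=true keys=1 sameProvider=true
        System.out.println(describeRegistration());
    }
}
```

The call to configureBlocking(false) matters: registering a channel that is still in blocking mode throws IllegalBlockingModeException.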
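At this point the server side has finished starting up. The main objects created are:
- WindowsSelectorProvider: a singleton
- WindowsSelectorImpl, which contains:
  - pollWrapper: holds the FDs registered with the selector, including the read-end FD of the wakeup pipe and the FD used by the ServerSocketChannel
  - wakeupPipe: the pipe (really just two FDs, one for reading and one for writing)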
Now on to run() in the Server. The call selector.select(); mainly ends up in this method of WindowsSelectorImpl:

    protected int doSelect(long timeout) throws IOException {
        if (channelArray == null)
            throw new ClosedSelectorException();
        this.timeout = timeout; // set selector timeout
        processDeregisterQueue();
        if (interruptTriggered) {
            resetWakeupSocket();
            return 0;
        }
        // Calculate number of helper threads needed for poll. If necessary
        // threads are created here and start waiting on startLock
        adjustThreadsCount();
        finishLock.reset(); // reset finishLock
        // Wakeup helper threads, waiting on startLock, so they start polling.
        // Redundant threads will exit here after wakeup.
        startLock.startThreads();
        // do polling in the main thread. Main thread is responsible for
        // first MAX_SELECTABLE_FDS entries in pollArray.
        try {
            begin();
            try {
                subSelector.poll();
            } catch (IOException e) {
                finishLock.setException(e); // Save this exception
            }
            // Main thread is out of poll(). Wakeup others and wait for them
            if (threads.size() > 0)
                finishLock.waitForHelperThreads();
        } finally {
            end();
        }
        // Done with poll(). Set wakeupSocket to nonsignaled for the next run.
        finishLock.checkForException();
        processDeregisterQueue();
        int updated = updateSelectedKeys();
        // Done with poll(). Set wakeupSocket to nonsignaled for the next run.
        resetWakeupSocket();
        return updated;
    }

subSelector.poll() is the core of this method: it polls the FDs stored in pollWrapper. It is implemented by calling the native method poll0:

    private int poll() throws IOException { // poll for the main thread
        return poll0(pollWrapper.pollArrayAddress,
                     Math.min(totalChannels, MAX_SELECTABLE_FDS),
                     readFds, writeFds, exceptFds, timeout);
    }

    private native int poll0(long pollAddress, int numfds,
            int[] readFds, int[] writeFds, int[] exceptFds, long timeout);

    // These arrays will hold result of native select().
    // The first element of each array is the number of selected sockets.
    // Other elements are file descriptors of selected sockets.
    private final int[] readFds = new int [MAX_SELECTABLE_FDS + 1];   // FDs with a read event
    private final int[] writeFds = new int [MAX_SELECTABLE_FDS + 1];  // FDs with a write event
    private final int[] exceptFds = new int [MAX_SELECTABLE_FDS + 1]; // FDs with an exceptional condition

poll0() watches the FDs in pollWrapper for I/O readiness and blocks until an event occurs on one of them. For example, because pollWrapper holds the ServerSocketChannel's FD, poll0() returns as soon as a client socket connects to the server socket. Because pollWrapper also holds the FD of the pipe's read end, writing a single byte to the pipe's write end makes poll0() return as well. If neither happens, poll0() keeps blocking, and so does selector.select(). If either happens, selector.select() returns. That is why run() in OperationServer wraps the call in while (true) {: once the selector has received an event and it has been handled, the loop goes back to waiting in poll().
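The blocking-then-returning behaviour of select() can be reproduced with a very small accept loop. The sketch below is not the article's OperationServer; SelectLoopDemo, the in-process client connections and the 5-second timeout are assumptions made so that the example terminates by itself instead of looping forever.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical demo class: a bounded version of the usual while (true) select loop.
final class SelectLoopDemo {

    /** Accepts connections until `expected` were seen; returns how many were accepted. */
    static int acceptConnections(int expected) {
        try (Selector selector = Selector.open();
             ServerSocketChannel server = ServerSocketChannel.open()) {
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
            server.configureBlocking(false);
            server.register(selector, SelectionKey.OP_ACCEPT);

            // Demo-only clients: blocking connects that land in the accept backlog.
            List<SocketChannel> clients = new ArrayList<>();
            for (int i = 0; i < expected; i++) {
                clients.add(SocketChannel.open(server.getLocalAddress()));
            }

            int accepted = 0;
            while (accepted < expected) {
                // Blocks until a registered FD is ready, wakeup() is called,
                // or the timeout (a safety net for this demo) expires.
                if (selector.select(5000) == 0) {
                    break;
                }
                Iterator<SelectionKey> it = selector.selectedKeys().iterator();
                while (it.hasNext()) {
                    SelectionKey key = it.next();
                    it.remove(); // selected keys are not cleared automatically
                    if (key.isAcceptable()) {
                        SocketChannel peer;
                        // Non-blocking accept() returns null once the backlog is empty.
                        while ((peer = server.accept()) != null) {
                            peer.close();
                            accepted++;
                        }
                    }
                }
            }
            for (SocketChannel client : clients) {
                client.close();
            }
            return accepted;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("accepted: " + acceptConnections(2)); // accepted: 2
    }
}
```

A real server would replace the bounded loop with while (true) and hand accepted channels to further processing instead of closing them.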
With that in mind, look at WindowsSelectorImpl.wakeup():

    public Selector wakeup() {
        synchronized (interruptLock) {
            if (!interruptTriggered) {
                setWakeupSocket();
                interruptTriggered = true;
            }
        }
        return this;
    }

    // Sets Windows wakeup socket to a signaled state.
    private void setWakeupSocket() {
        setWakeupSocket0(wakeupSinkFd);
    }

    private native void setWakeupSocket0(int wakeupSinkFd);

The native implementation:

    JNIEXPORT void JNICALL
    Java_sun_nio_ch_WindowsSelectorImpl_setWakeupSocket0(JNIEnv *env, jclass this,
                                                         jint scoutFd)
    {
        /* Write one byte into the pipe */
        const char byte = 1;
        send(scoutFd, &byte, 1, 0);
    }

So wakeup() works by sending a single byte with value 1 through the pipe's write end, send(scoutFd, &byte, 1, 0), which makes the read end readable and wakes up poll(). Whenever needed, the application can call selector.wakeup() to wake the selector. The interruptTriggered flag ensures that repeated wakeup() calls before the next select write only one byte.
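The effect of wakeup() is easy to observe through the public API. The sketch below is illustrative only; WakeupDemo and its method names are hypothetical, and the 200 ms sleep merely makes it likely that the waiting thread is already blocked. It is not required for correctness, since a wakeup() issued before select() makes the next select() return immediately.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.Selector;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class: unblocks select() with wakeup().
final class WakeupDemo {

    /** Blocks a thread in select() with nothing registered, then wakes it up. */
    static int selectThenWakeup() {
        try (Selector selector = Selector.open()) {
            AtomicInteger result = new AtomicInteger(-1);
            Thread waiter = new Thread(() -> {
                try {
                    result.set(selector.select()); // blocks: no channel is registered
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            });
            waiter.start();
            Thread.sleep(200);   // let the waiter reach select() (best effort)
            selector.wakeup();   // makes the blocked select() return
            waiter.join(5000);
            return result.get(); // 0: no keys were selected
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    /** A wakeup() issued first makes the next select() return immediately. */
    static int wakeupThenSelect() {
        try (Selector selector = Selector.open()) {
            selector.wakeup();
            return selector.select();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("select after wakeup returned: " + selectThenWakeup()); // 0
        System.out.println("wakeup before select returned: " + wakeupThenSelect()); // 0
    }
}
```

A common use is to call wakeup() from another thread after queuing work for the selector thread, for example a new channel to register, so that the selector thread leaves select() and picks it up.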
總結(jié)