epoll监听文件_【原创】万字长文浅析:Epoll与Java Nio的那些事儿
“ Epoll 是Linux內核的高性能、可擴展的I/O事件通知機制。
在linux2.5.44首次引入epoll,它設計的目的旨在取代既有的select、poll系統函數,讓需要大量操作文件描述符的程序得以發揮更優異的性能(wikipedia example: 舊有的系統函數所花費的時間復雜度為O(n), epoll的時間復雜度O(log n))。epoll實現的功能與poll類似,都是監聽多個文件描述符上的事件。
epoll底層是由可配置的操作系統內核對象建構而成,并以文件描述符(file descriptor)的形式呈現于用戶空間(from wikipedia: 在操作系統中,虛擬內存通常會被分成用戶空間,與核心空間這兩個區段。這是存儲器保護機制中的一環。內核**、核心擴展(kernel extensions)、以及驅動程序,運行在核心空間**上。而其他的應用程序,則運行在用戶空間上。所有運行在用戶空間的應用程序,都被統稱為用戶級(userland))。
多說一點關于內核的
它是一個用來管理軟件發出的數據I/O的一個程序,并將數據交由CPU和電腦其他電子組件處理,但是直接對硬件操作是非常復雜的,通常內核提供一種硬件抽象的方法來完成(由內核決定一個程序在什么時候對某部分硬件操作多長時間),通過這些方法來完成進程間通信和系統調用。
宏內核:
宏內核簡單來說,首先定義了一個高階的抽象接口,叫系統調用(System call))來實現操作系統的功能,例如進程管理,文件系統,和存儲管理等等,這些功能由多個運行在內核態的程序來完成。
微內核:
微內核結構由硬件抽象層和系統調用組成;包括了創建一個系統必需的幾個部分;如線程管理,地址空間和進程間通信等。微核的目標是將系統服務的實現和系統的基本操作規則分離開來。
linux就是使用的宏內核。因為它能夠在運行時將模塊調入執行,使擴充內核的功能變得更簡單。
epoll做了什么事?
epoll 通過使用紅黑樹(RB-tree)搜索被監視的文件描述符(file descriptor)。
在 epoll 實例上注冊事件時,epoll 會將該事件添加到 epoll 實例的紅黑樹上并注冊一個回調函數,當事件發生時會將事件添加到就緒鏈表中。
epoll的結構?
int epoll_create(int size); int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event); int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);①epoll_create
向內核申請空間,創建一個epoll的句柄,size用來告訴內核這個監聽的數目一共有多大。這個參數不同于select()中的第一個參數,給出最大監聽的fd+1的值。在最初的實現中,調用者通過 size 參數告知內核需要監聽的文件描述符數量。如果監聽的文件描述符數量超過 size, 則內核會自動擴容。而現在 size 已經沒有這種語義了,但是調用者調用時 size 依然必須大于 0,以保證后向兼容性。需要注意的是,當創建好epoll句柄后,它就是會占用一個fd值,在linux下如果查看/proc/進程id/fd/,是能夠看到這個fd的。
②epoll_ctl
向 epfd 對應的內核epoll 實例添加、修改或刪除對 fd 上事件 event 的監聽。op 可以為 EPOLL_CTL_ADD, EPOLL_CTL_MOD, EPOLL_CTL_DEL 分別對應的是添加新的事件,修改文件描述符上監聽的事件類型,從實例上刪除一個事件。如果 event 的 events 屬性設置了 EPOLLET flag,那么監聽該事件的方式是邊緣觸發。
events可以是以下幾個宏的集合:
- EPOLLIN:觸發該事件,表示對應的文件描述符上有可讀數據。(包括對端SOCKET正常關閉);
- EPOLLOUT:觸發該事件,表示對應的文件描述符上可以寫數據;
- EPOLLPRI:表示對應的文件描述符有緊急的數據可讀(這里應該表示有帶外數據到來);
- EPOLLERR:表示對應的文件描述符發生錯誤;
- EPOLLHUP:表示對應的文件描述符被掛斷;
- EPOLLET:將EPOLL設為邊緣觸發(Edge Triggered)模式,這是相對于水平觸發(Level Triggered)來說的。
- EPOLLONESHOT:只監聽一次事件,當監聽完這次事件之后,如果還需要繼續監聽這個socket的話,需要再次把這個socket加入到EPOLL隊列里。
例如:
struct epoll_event ev; //設置與要處理的事件相關的文件描述符 ev.data.fd=listenfd; //設置要處理的事件類型 ev.events=EPOLLIN|EPOLLET; //注冊epoll事件 epoll_ctl(epfd,EPOLL_CTL_ADD,listenfd,&ev);③epoll_wait
Linux-2.6.19又引入了可以屏蔽指定信號的epoll_wait: epoll_pwait
接收發生在被偵聽的描述符上的,用戶感興趣的IO事件。簡單點說:通過循環,不斷地監聽暴露的端口,看哪一個fd可讀、可寫~
當 timeout 為 0 時,epoll_wait 永遠會立即返回。而 timeout 為 -1 時,epoll_wait 會一直阻塞直到任一已注冊的事件變為就緒。當 timeout 為一正整數時,epoll 會阻塞直到計時結束或已注冊的事件變為就緒。因為內核調度延遲,阻塞的時間可能會略微超過 timeout (毫秒級)。
epoll文件描述符用完后,直接用close關閉,并且會自動從被偵聽的文件描述符集合中刪除
epoll實戰
說了這么多原理,腦殼怕嗡嗡的吧,來看看實戰清醒下~
如上知道:每次添加/修改/刪除被偵聽文件描述符都需要調用epoll_ctl,所以要盡量少地調用epoll_ctl,防止其所引來的開銷抵消其帶來的好處。有的時候,應用中可能存在大量的短連接(比如說Web服務器),epoll_ctl將被頻繁地調用,可能成為這個系統的瓶頸。
傳統的select以及poll的效率會因為在線人數的線形遞增而導致呈二次乃至三次方的下降,這些直接導致了網絡服務器可以支持的人數有了個比較明顯的限制。這是因為他們有限的文件描述符和遍歷所有的fd所帶來的低效。
重點哦~
當你擁有一個很大的socket集合,不過由于網絡延時,任一時間只有部分的socket是“活躍”的,但是select/poll每次調用都會線性掃描全部的集合,導致效率呈現線性下降。epoll不存在這個問題,它只會對“活躍”的socket進行操作---這是因為在內核實現中epoll是根據每個fd上面的callback函數實現的。那么,只有“活躍”的socket才會主動的去調用 callback函數,其他idle(空閑)狀態socket則不會,在這點上,epoll實現了一個“偽”AIO,因為這時候推動力在os內核。在一些 benchmark中,如果所有的socket基本上都是活躍的---比如一個高速LAN環境,epoll并不比select/poll有什么效率,相反,如果過多使用epoll_ctl,效率相比還有稍微的下降。但是一旦使用idle connections模擬WAN環境,epoll的效率就遠在select/poll之上了。
int epfd = epoll_create(POLL_SIZE);struct epoll_event ev;struct epoll_event *events = NULL;nfds = epoll_wait(epfd, events, 20, 500);{for (n = 0; n < nfds; ++n) {if (events[n].data.fd == listener) {//如果是主socket的事件的話,則表示//有新連接進入了,進行新連接的處理。client = accept(listener, (structsockaddr *)&local, &addrlen);if (client < 0) {perror("accept");continue;}setnonblocking(client); //將新連接置于非阻塞模式ev.events = EPOLLIN | EPOLLET; //并且將新連接也加入EPOLL的監聽隊列。//注意,這里的參數EPOLLIN|EPOLLET并沒有設置對寫socket的監聽,//如果有寫操作的話,這個時候epoll是不會返回事件的,如果要對寫操作//也監聽的話,應該是EPOLLIN|EPOLLOUT|EPOLLETev.data.fd = client;if (epoll_ctl(epfd, EPOLL_CTL_ADD, client, &ev) < 0) {//設置好event之后,將這個新的event通過epoll_ctl加入到epoll的監聽隊列里面,//這里用EPOLL_CTL_ADD來加一個新的epoll事件,通過EPOLL_CTL_DEL來減少一個//epoll事件,通過EPOLL_CTL_MOD來改變一個事件的監聽方式。fprintf(stderr, "epollsetinsertionerror:fd=%d", client);return -1;}}else if(event[n].events & EPOLLIN){//如果是已經連接的用戶,并且收到數據,//那么進行讀入int sockfd_r;if ((sockfd_r = event[n].data.fd) < 0)continue;read(sockfd_r, buffer, MAXSIZE);//修改sockfd_r上要處理的事件為EPOLLOUTev.data.fd = sockfd_r;ev.events = EPOLLOUT | EPOLLET;epoll_ctl(epfd, EPOLL_CTL_MOD, sockfd_r, &ev)}else if(event[n].events & EPOLLOUT){//如果有數據發送int sockfd_w = events[n].data.fd;write(sockfd_w, buffer, sizeof(buffer));//修改sockfd_w上要處理的事件為EPOLLINev.data.fd = sockfd_w;ev.events = EPOLLIN | EPOLLET;epoll_ctl(epfd, EPOLL_CTL_MOD, sockfd_w, &ev)}do_use_fd(events[n].data.fd);}}簡單說下流程:
- 監聽到有新連接進入了,進行新連接的處理;
- 如果是已經連接的用戶,并且收到數據,讀完之后修改sockfd_r上要處理的事件為EPOLLOUT(可寫);
- 如果有數據發送,寫完之后,修改sockfd_w上要處理的事件為EPOLLIN(可讀)
epoll在Java中怎么去調用的?
基礎知識:
文件描述符:
- (參考《Unix網絡編程》譯者的注釋)
- 文件描述符是Unix系統標識文件的int,Unix的哲學一切皆文件,所以各自資源(包括常規意義的文件、目錄、管道、POSIX IPC、socket)都可以看成文件。
Java NIO的世界中,Selector是中央控制器,Buffer是承載數據的容器,而Channel可以說是最基礎的門面,它是本地I/O設備、網絡I/O的通信橋梁。
- 網絡I/O設備:
- DatagramChannel:讀寫UDP通信的數據,對應DatagramSocket類
- SocketChannel:讀寫TCP通信的數據,對應Socket類
- ServerSocketChannel:監聽新的TCP連接,并且會創建一個可讀寫的SocketChannel,對應ServerSocket類
- 本地I/O設備:
- FileChannel:讀寫本地文件的數據,不支持Selector控制,對應File類
①先從最簡單的ServerSocketChannel看起
ServerSocketChannel與ServerSocket一樣是socket監聽器,其主要區別前者可以運行在非阻塞模式下運行;
// 創建一個ServerSocketChannel,將會關聯一個未綁定的ServerSocketpublic static ServerSocketChannel open() throws IOException {return SelectorProvider.provider().openServerSocketChannel();}ServerSocketChannel的創建也是依賴底層操作系統實現,其實現類主要是ServerSocketChannelImpl,我們來看看其構造方法
ServerSocketChannelImpl(SelectorProvider var1) throws IOException {super(var1);// 創建一個文件操作符this.fd = Net.serverSocket(true);// 得到文件操作符是索引this.fdVal = IOUtil.fdVal(this.fd);this.state = 0;}新建一個ServerSocketChannelImpl其本質是在底層操作系統創建了一個fd(即文件描述符),相當于建立了一個用于網絡通信的通道,調用socket的bind()方法綁定,通過accept()調用操作系統獲取TCP連接
public SocketChannel accept() throws IOException {// 忽略一些校驗及無關代碼.... ?SocketChannelImpl var2 = null;// var3的作用主要是說明當前的IO狀態,主要有/*** EOF = -1;* UNAVAILABLE = -2;* INTERRUPTED = -3;* UNSUPPORTED = -4;* THROWN = -5;* UNSUPPORTED_CASE = -6;*/int var3 = 0;// 這里本質也是用fd來獲取連接FileDescriptor var4 = new FileDescriptor();// 用來存儲TCP連接的地址信息InetSocketAddress[] var5 = new InetSocketAddress[1]; ?try {// 這里設置了一個中斷器,中斷時會將連接關閉this.begin();// 這里當IO被中斷時,會重新獲取連接do {var3 = this.accept(this.fd, var4, var5);} while(var3 == -3 && this.isOpen());}finally {// 當連接被關閉且accept失敗時或拋出AsynchronousCloseExceptionthis.end(var3 > 0);// 驗證連接是可用的assert IOStatus.check(var3);} ?if (var3 < 1) {return null;} {// 默認連接是阻塞的IOUtil.configureBlocking(var4, true);// 創建一個SocketChannel的引用var2 = new SocketChannelImpl(this.provider(), var4, var5[0]);// 下面是是否連接成功校驗,這里忽略... ?return var2;} } ? // 依賴底層操作系統實現的accept0方法 private int accept(FileDescriptor var1, FileDescriptor var2, InetSocketAddress[] var3) throws IOException {return this.accept0(var1, var2, var3); }②SocketChannel
用于讀寫TCP通信的數據,相當于客戶端
open
public static SocketChannel open() throws IOException { return SelectorProvider.provider().openSocketChannel(); }public SocketChannel openSocketChannel() throws IOException {return new SocketChannelImpl(this);}// State, increases monotonicallyprivate static final int ST_UNINITIALIZED = -1;private static final int ST_UNCONNECTED = 0;private static final int ST_PENDING = 1;private static final int ST_CONNECTED = 2;private static final int ST_KILLPENDING = 3;private static final int ST_KILLED = 4;private int state = ST_UNINITIALIZED; SocketChannelImpl(SelectorProvider sp) throws IOException {super(sp);// 創建一個scoket通道,即fd(fd的作用可參考上面的描述)this.fd = Net.socket(true);// 得到該fd的索引this.fdVal = IOUtil.fdVal(fd);// 設置為未連接this.state = ST_UNCONNECTED;}connect建立連接
// 代碼均來自JDK1.8 部分代碼public boolean connect(SocketAddress var1) throws IOException {boolean var2 = false;// 讀寫都鎖住synchronized(this.readLock) {synchronized(this.writeLock) {/****狀態檢查,channel和address****/// 判斷channel是否openthis.ensureOpenAndUnconnected();InetSocketAddress var5 = Net.checkAddress(var1);SecurityManager var6 = System.getSecurityManager();if (var6 != null) {var6.checkConnect(var5.getAddress().getHostAddress(), var5.getPort());} ?boolean var10000;/****連接建立****/// 阻塞狀態變更的鎖也鎖住synchronized(this.blockingLock()) {int var8 = 0; ?try {try {this.begin(); // 如果當前socket未綁定本地端口,則嘗試著判斷和服務端是否能建立連接synchronized(this.stateLock) {if (!this.isOpen()) {boolean var10 = false;return var10;} ?if (this.localAddress == null) {// 和遠程建立連接后關閉連接NetHooks.beforeTcpConnect(this.fd, var5.getAddress(), var5.getPort());} ?this.readerThread = NativeThread.current();} ?do {InetAddress var9 = var5.getAddress();if (var9.isAnyLocalAddress()) {var9 = InetAddress.getLocalHost();}// 建立連接var8 = Net.connect(this.fd, var9, var5.getPort());} while(var8 == -3 && this.isOpen());synchronized(this.stateLock) {this.remoteAddress = var5;if (var8 <= 0) {if (!this.isBlocking()) {this.state = 1;} else {assert false;}} else {this.state = 2;// 連接成功if (this.isOpen()) {this.localAddress = Net.localAddress(this.fd);} ?var10000 = true;return var10000;}}} ?var10000 = false;return var10000;}}}在建立在綁定地址之前,我們需要調用NetHooks.beforeTcpBind,這個方法是將fd轉換為SDP(Sockets Direct Protocol,Java套接字直接協議) socket。SDP需要網卡支持InfiniBand高速網絡通信技術,windows不支持該協議。
我們來看看在openjdk: srcsolarisclassessunnet下的NetHooks.java
private static final Provider provider = new sun.net.sdp.SdpProvider();public static void beforeTcpBind(FileDescriptor fdObj, InetAddress address, int port) throws IOException{provider.implBeforeTcpBind(fdObj, address, port);}public static void beforeTcpConnect(FileDescriptor fdObj, InetAddress address, int port) throws IOException{provider.implBeforeTcpConnect(fdObj, address, port);}可以看到實際是調用的SdpProvider里的implBeforeTcpBind
@Overridepublic void implBeforeTcpBind(FileDescriptor fdObj,InetAddress address,int port)throws IOException{if (enabled)convertTcpToSdpIfMatch(fdObj, Action.BIND, address, port);}// converts unbound TCP socket to a SDP socket if it matches the rulesprivate void convertTcpToSdpIfMatch(FileDescriptor fdObj,Action action,InetAddress address,int port)throws IOException{boolean matched = false;// 主要是先通過規則校驗器判斷入參是否符合,一般有PortRangeRule校驗器// 然后再執行將fd轉換為socketfor (Rule rule: rules) {if (rule.match(action, address, port)) {SdpSupport.convertSocket(fdObj);matched = true;break;}}}public static void convertSocket(FileDescriptor fd) throws IOException {...//獲取fd索引int fdVal = fdAccess.get(fd);convert0(fdVal);}// convert0JNIEXPORT void JNICALLJava_sun_net_sdp_SdpSupport_convert0(JNIEnv *env, jclass cls, int fd){// create方法實際是通過socket(AF_INET_SDP, SOCK_STREAM, 0);方法得到一個socketint s = create(env);if (s >= 0) {socklen_t len;int arg, res;struct linger linger;/* copy socket options that are relevant to SDP */len = sizeof(arg);// 重用TIME_WAIT的端口if (getsockopt(fd, SOL_SOCKET, SO_REUSEADDR, (char*)&arg, &len) == 0)setsockopt(s, SOL_SOCKET, SO_REUSEADDR, (char*)&arg, len);len = sizeof(arg);// 緊急數據放入普通數據流if (getsockopt(fd, SOL_SOCKET, SO_OOBINLINE, (char*)&arg, &len) == 0)setsockopt(s, SOL_SOCKET, SO_OOBINLINE, (char*)&arg, len);len = sizeof(linger);// 延遲關閉連接if (getsockopt(fd, SOL_SOCKET, SO_LINGER, (void*)&linger, &len) == 0)setsockopt(s, SOL_SOCKET, SO_LINGER, (char*)&linger, len);// 將fd也引用到s所持有的通道RESTARTABLE(dup2(s, fd), res);if (res < 0)JNU_ThrowIOExceptionWithLastError(env, "dup2");// 執行close方法,關閉s這個引用RESTARTABLE(close(s), res);}}read 讀
public int read(ByteBuffer var1) throws IOException {// 省略一些判斷synchronized(this.readLock) {this.begin();synchronized(this.stateLock) {do {// 通過IOUtil的讀取fd的數據至buf// 這里的nd是SocketDispatcher,用于調用底層的read和write操作var3 = IOUtil.read(this.fd, var1, -1L, nd);} while(var3 == -3 && this.isOpen());// 這個方法主要是將UNAVAILABLE(原為-2)這個狀態返回0,否則返回nvar4 = IOStatus.normalize(var3);var20 = false;break label367;} ?this.readerCleanup();assert IOStatus.check(var3);} }}} static int read(FileDescriptor var0, ByteBuffer var1, long var2, NativeDispatcher var4) throws IOException {if (var1.isReadOnly()) {throw new IllegalArgumentException("Read-only buffer");} else if (var1 instanceof DirectBuffer) {return readIntoNativeBuffer(var0, var1, var2, var4);} else {// 臨時緩沖區,大小為buf的remain(limit - position),堆外內存,使用ByteBuffer.allocateDirect(size)分配// Notes:這里分配后后面有個try-finally塊會釋放該部分內存ByteBuffer var5 = Util.getTemporaryDirectBuffer(var1.remaining()); ?int var7;try {// 將網絡中的buf讀進direct bufferint var6 = readIntoNativeBuffer(var0, var5, var2, var4);var5.flip();// 待讀取if (var6 > 0) {var1.put(var5);// 成功時寫入} ?var7 = var6;} finally {Util.offerFirstTemporaryDirectBuffer(var5);} ?return var7;}} private static int readIntoNativeBuffer(FileDescriptor var0, ByteBuffer var1, long var2, NativeDispatcher var4) throws IOException {// 忽略變量initif (var2 != -1L) {// pread方法只有在同步狀態下才能使用var9 = var4.pread(var0, ((DirectBuffer)var1).address() + (long)var5, var7, var2);} else {// 其調用SocketDispatcher.read方法 -> FileDispatcherImpl.read0方法var9 = var4.read(var0, ((DirectBuffer)var1).address() + (long)var5, var7);} ?if (var9 > 0) {var1.position(var5 + var9);} ?return var9;}} // 同樣找到openjdk:srcsolarisnativesunnioch //FileDispatcherImpl.c JNIEXPORT jint JNICALL Java_sun_nio_ch_FileDispatcherImpl_read0(JNIEnv *env, jclass clazz,jobject fdo, jlong address, jint len) {jint fd = fdval(env, fdo);// 獲取fd索引void *buf = (void *)jlong_to_ptr(address);// 調用底層read方法return convertReturnVal(env, read(fd, buf, len), JNI_TRUE); }總結一下讀取的過程
write 寫
看完了前面的read,write整個執行流程基本一樣,具體的細節參考如下
public int write(ByteBuffer var1) throws IOException {if (var1 == null) {throw new NullPointerException();} else {synchronized(this.writeLock) {this.ensureWriteOpen();this.begin();synchronized(this.stateLock) {if (!this.isOpen()) {var5 = 0;var20 = false;break label310;}this.writerThread = NativeThread.current();}do {// 通過IOUtil的讀取fd的數據至buf// 這里的nd是SocketDispatcher,用于調用底層的read和write操作var3 = IOUtil.write(this.fd, var1, -1L, nd);} while(var3 == -3 && this.isOpen()); ?var4 = IOStatus.normalize(var3);var20 = false;this.writerCleanup();assert IOStatus.check(var3);return var4;}}}} static int write(FileDescriptor var0, ByteBuffer var1, long var2, NativeDispatcher var4) throws IOException {if (var1 instanceof DirectBuffer) {return writeFromNativeBuffer(var0, var1, var2, var4);} else { ?ByteBuffer var8 = Util.getTemporaryDirectBuffer(var7); ?int var10;try {// 這里的pos為buf初始的position,意思是將buf重置為最初的狀態;因為目前還沒有真實的寫入到channel中var8.put(var1);var8.flip();var1.position(var5);// 調用int var9 = writeFromNativeBuffer(var0, var8, var2, var4);if (var9 > 0) {var1.position(var5 + var9);} ?var10 = var9;} finally {Util.offerFirstTemporaryDirectBuffer(var8);} ?return var10;}} IOUtil.writeFromNativeBuffer(fd , buf , position , nd) {// ... 忽略一些獲取buf變量的代碼 int written = 0;if (position != -1) {// pread方法只有在同步狀態下才能使用written = nd.pwrite(fd ,((DirectBuffer)bb).address() + pos,rem, position);} else {// 其調用SocketDispatcher.write方法 -> FileDispatcherImpl.write0方法written = nd.write(fd, ((DirectBuffer)bb).address() + pos, rem);}//.... } FileDispatcherImpl.write0 {// 調用底層的write方法寫入return convertReturnVal(env, write(fd, buf, len), JNI_FALSE); } }總結一下write的過程:
耐心一點,馬上就到Epoll了
理解了前面的一些基礎知識,接下來的部分就會涉及到Java是怎么樣來使用epoll的。
Selector簡述
Selector的作用是Java NIO中管理一組多路復用的SelectableChannel對象,并能夠識別通道是否為諸如讀寫事件做好準備的組件 --Java doc
Selector的創建過程如下:
// 1.創建Selector Selector selector = Selector.open();// 2.將Channel注冊到選擇器中 // ....... new channel的過程 ....//Notes:channel要注冊到Selector上就必須是非阻塞的,所以FileChannel是不可以 //使用Selector的,因為FileChannel是阻塞的 channel.configureBlocking(false);// 第二個參數指定了我們對 Channel 的什么類型的事件感興趣 SelectionKey key = channel.register(selector , SelectionKey.OP_READ);// 也可以使用或運算|來組合多個事件,例如 SelectionKey key = channel.register(selector , SelectionKey.OP_READ | SelectionKey.OP_WRITE);// 不過值得注意的是,一個 Channel 僅僅可以被注冊到一個 Selector 一次, // 如果將 Channel 注冊到 Selector 多次, 那么其實就是相當于更新 SelectionKey //的 interest set.①一個Channel在Selector注冊其代表的是一個SelectionKey事件,SelectionKey的類型包括:
- OP_READ:可讀事件;值為:1<<0
- OP_WRITE:可寫事件;值為:1<<2
- OP_CONNECT:客戶端連接服務端的事件(tcp連接),一般為創建SocketChannel客戶端channel;值為:1<<3
- OP_ACCEPT:服務端接收客戶端連接的事件,一般為創建ServerSocketChannel服務端channel;值為:1<<4
②一個Selector內部維護了三組keys:
③Selector類中總共包含以下10個方法:
- open():創建一個Selector對象
- isOpen():是否是open狀態,如果調用了close()方法則會返回false
- provider():獲取當前Selector的Provider
- keys():如上文所述,獲取當前channel注冊在Selector上所有的key
- selectedKeys():獲取當前channel就緒的事件列表
- selectNow():獲取當前是否有事件就緒,該方法立即返回結果,不會阻塞;如果返回值>0,則代表存在一個或多個
- select(long timeout):selectNow的阻塞超時方法,超時時間內,有事件就緒時才會返回;否則超過時間也會返回
- select():selectNow的阻塞方法,直到有事件就緒時才會返回
- wakeup():調用該方法會時,阻塞在select()處的線程會立馬返回;(ps:下面一句劃重點)即使當前不存在線程阻塞在select()處,那么下一個執行select()方法的線程也會立即返回結果,相當于執行了一次selectNow()方法
- close(): 用完Selector后調用其close()方法會關閉該Selector,且使注冊到該Selector上的所有SelectionKey實例無效。channel本身并不會關閉。
關于SelectionKey
談到Selector就不得不提SelectionKey,兩者是緊密關聯,配合使用的;如上文所示,往Channel注冊Selector會返回一個SelectionKey對象, 這個對象包含了如下內容:
- interest set,當前Channel感興趣的事件集,即在調用register方法設置的interes set
- ready set
- channel
- selector
- attached object,可選的附加對象
①interest set 可以通過SelectionKey類中的方法來獲取和設置interes set
// 返回當前感興趣的事件列表 int interestSet = key.interestOps();// 也可通過interestSet判斷其中包含的事件 boolean isInterestedInAccept = interestSet & SelectionKey.OP_ACCEPT; boolean isInterestedInConnect = interestSet & SelectionKey.OP_CONNECT; boolean isInterestedInRead = interestSet & SelectionKey.OP_READ; boolean isInterestedInWrite = interestSet & SelectionKey.OP_WRITE; // 可以通過interestOps(int ops)方法修改事件列表 key.interestOps(interestSet | SelectionKey.OP_WRITE);②ready set 當前Channel就緒的事件列表
int readySet = key.readyOps();// 也可通過四個方法來分別判斷不同事件是否就緒 key.isReadable(); //讀事件是否就緒 key.isWritable(); //寫事件是否就緒 key.isConnectable(); //客戶端連接事件是否就緒 key.isAcceptable(); //服務端連接事件是否就緒③channel和selector 我們可以通過SelectionKey來獲取當前的channel和selector
// 返回當前事件關聯的通道,可轉換的選項包括:`ServerSocketChannel`和`SocketChannel` Channel channel = key.channel(); ? //返回當前事件所關聯的Selector對象 Selector selector = key.selector();attached object 我們可以在selectionKey中附加一個對象,或者在注冊時直接附加:
key.attach(theObject); Object attachedObj = key.attachment(); // 在注冊時直接附加 SelectionKey key = channel.register(selector, SelectionKey.OP_READ, theObject);萬丈高樓平地起,基礎知識差不多了,了解了這些,可以找一些nio demo或者netty demo練練手。接下來講解本節比較重要的~epoll
前面多次提到了openjdk,seletor的具體實現肯定是跟操作系統有關的,我們一起來看看。
可以看到Selector的實現是SelectorImpl, 然后SelectorImpl又將職責委托給了具體的平臺,比如圖中的linux2.6 EpollSelectorImpl,windows是WindowsSelectorImpl,MacOSX是KQueueSelectorImpl
根據前面我們知道,Selector.open()可以得到一個Selector實例,怎么實現的呢?
// Selector.java public static Selector open() throws IOException {// 首先找到provider,然后再打開Selectorreturn SelectorProvider.provider().openSelector(); }// java.nio.channels.spi.SelectorProviderpublic static SelectorProvider provider() {synchronized (lock) {if (provider != null)return provider;return AccessController.doPrivileged(new PrivilegedAction<SelectorProvider>() {public SelectorProvider run() {if (loadProviderFromProperty())return provider;if (loadProviderAsService())return provider;// 這里就是打開Selector的真正方法provider = sun.nio.ch.DefaultSelectorProvider.create();return provider;}});} }在openjdk中,每個操作系統都有一個sun.nio.ch.DefaultSelectorProvider實現,以src**solaris**classessunnioch下的DefaultSelectorProvider為例:
/*** Returns the default SelectorProvider.*/ public static SelectorProvider create() {// 獲取OS名稱String osname = AccessController.doPrivileged(new GetPropertyAction("os.name"));// 根據名稱來創建不同的Selctorif (osname.equals("SunOS"))return createProvider("sun.nio.ch.DevPollSelectorProvider");if (osname.equals("Linux"))return createProvider("sun.nio.ch.EPollSelectorProvider");return new sun.nio.ch.PollSelectorProvider(); }打開src**solaris**classessunnioch下的EPollSelectorProvider.java
public class EPollSelectorProviderextends SelectorProviderImpl {public AbstractSelector openSelector() throws IOException {return new EPollSelectorImpl(this);}public Channel inheritedChannel() throws IOException {return InheritedChannel.getChannel();} }Linux平臺就得到了最終的Selector實現:src**solaris**classessunnioch下的EPollSelectorImpl.java
來看看它實現的構造器:
EPollSelectorImpl(SelectorProvider sp) throws IOException {super(sp);// makePipe返回管道的2個文件描述符,編碼在一個long類型的變量中// 高32位代表讀 低32位代表寫// 使用pipe為了實現Selector的wakeup邏輯long pipeFds = IOUtil.makePipe(false);fd0 = (int) (pipeFds >>> 32);fd1 = (int) pipeFds;// 新建一個EPollArrayWrapperpollWrapper = new EPollArrayWrapper();pollWrapper.initInterrupt(fd0, fd1);fdToKey = new HashMap<>();}srcsolarisnativesunnioch下的EPollArrayWrapper.c
JNIEXPORT jint JNICALL Java_sun_nio_ch_EPollArrayWrapper_epollCreate(JNIEnv *env, jobject this) {/** epoll_create expects a size as a hint to the kernel about how to* dimension internal structures. We can't predict the size in advance.*/int epfd = epoll_create(256);if (epfd < 0) {JNU_ThrowIOExceptionWithLastError(env, "epoll_create failed");}return epfd; }①epoll_create在前面已經講過了,這里就不再贅述了。
②epoll wait 等待內核IO事件
調用Selector.select(返回鍵的數量,可能是零)最后會委托給各個實現的doSelect方法,限于篇幅不貼出太詳細的,這里看下EpollSelectorImpl的doSelect方法
protected int doSelect(long timeout) throws IOException { if (closed) throw new ClosedSelectorException(); processDeregisterQueue(); try { begin(); //EPollArrayWrapper pollWrapper pollWrapper.poll(timeout);//重點在這里 } finally { end(); } processDeregisterQueue(); int numKeysUpdated = updateSelectedKeys();// 后面會講到 if (pollWrapper.interrupted()) { // Clear the wakeup pipe pollWrapper.putEventOps(pollWrapper.interruptedIndex(), 0); synchronized (interruptLock) { pollWrapper.clearInterrupted(); IOUtil.drain(fd0); interruptTriggered = false; } } return numKeysUpdated; } int poll(long timeout) throws IOException {updateRegistrations();// 這個代碼在下面講,涉及到epoo_ctl// 這個epollWait是不是有點熟悉呢?updated = epollWait(pollArrayAddress, NUM_EPOLLEVENTS, timeout, epfd);for (int i=0; i<updated; i++) {if (getDescriptor(i) == incomingInterruptFD) {interruptedIndex = i;interrupted = true;break;}}return updated;看下EPollArrayWrapper.c
JNIEXPORT jint JNICALL Java_sun_nio_ch_EPollArrayWrapper_epollWait(JNIEnv *env, jobject this,jlong address, jint numfds,jlong timeout, jint epfd) {struct epoll_event *events = jlong_to_ptr(address);int res;if (timeout <= 0) { /* Indefinite or no wait *///系統調用等待內核事件RESTARTABLE(epoll_wait(epfd, events, numfds, timeout), res);} else { /* Bounded wait; bounded restarts */res = iepoll(epfd, events, numfds, timeout);}if (res < 0) {JNU_ThrowIOExceptionWithLastError(env, "epoll_wait failed");}return res; }可以看到在linux中Selector.select()其實是調用了epoll_wait
③epoll control以及openjdk對事件管理的封裝
JDK中對于注冊到Selector上的IO事件關系是使用SelectionKey來表示,代表了Channel感興趣的事件,如Read,Write,Connect,Accept.
調用Selector.register()時均會將事件存儲到EpollArrayWrapper.java的成員變量eventsLow和eventsHigh中
// events for file descriptors with registration changes pending, indexed // by file descriptor and stored as bytes for efficiency reasons. For // file descriptors higher than MAX_UPDATE_ARRAY_SIZE (unlimited case at // least) then the update is stored in a map. // 使用數組保存事件變更, 數組的最大長度是MAX_UPDATE_ARRAY_SIZE, 最大64*1024 private final byte[] eventsLow = new byte[MAX_UPDATE_ARRAY_SIZE]; // 超過數組長度的事件會緩存到這個map中,等待下次處理 private Map<Integer,Byte> eventsHigh; ? ? /*** Sets the pending update events for the given file descriptor. This* method has no effect if the update events is already set to KILLED,* unless {@code force} is {@code true}.*/ private void setUpdateEvents(int fd, byte events, boolean force) {// 判斷fd和數組長度if (fd < MAX_UPDATE_ARRAY_SIZE) {if ((eventsLow[fd] != KILLED) || force) {eventsLow[fd] = events;}} else {Integer key = Integer.valueOf(fd);if (!isEventsHighKilled(key) || force) {eventsHigh.put(key, Byte.valueOf(events));}} }/*** Returns the pending update events for the given file descriptor.*/private byte getUpdateEvents(int fd) {if (fd < MAX_UPDATE_ARRAY_SIZE) {return eventsLow[fd];} else {Byte result = eventsHigh.get(Integer.valueOf(fd));// result should never be nullreturn result.byteValue();}在上面poll代碼中涉及到
int poll(long timeout) throws IOException {updateRegistrations();/ ?/*** Update the pending registrations.*/private void updateRegistrations() {synchronized (updateLock) {int j = 0;while (j < updateCount) {int fd = updateDescriptors[j];// 從保存的eventsLow和eventsHigh里取出事件short events = getUpdateEvents(fd);boolean isRegistered = registered.get(fd);int opcode = 0; ?if (events != KILLED) {if (isRegistered) {// 判斷操作類型以傳給epoll_ctl// 沒有指定EPOLLET事件類型opcode = (events != 0) ? EPOLL_CTL_MOD : EPOLL_CTL_DEL;} else {opcode = (events != 0) ? EPOLL_CTL_ADD : 0;}if (opcode != 0) {// 熟悉的epoll_ctlepollCtl(epfd, opcode, fd, events);if (opcode == EPOLL_CTL_ADD) {registered.set(fd);} else if (opcode == EPOLL_CTL_DEL) {registered.clear(fd);}}}j++;}updateCount = 0;}private native void epollCtl(int epfd, int opcode, int fd, int events);可以看到epollCtl調用的native方法,我們進入EpollArrayWrapper.c
JNIEXPORT void JNICALL Java_sun_nio_ch_EPollArrayWrapper_epollCtl(JNIEnv *env, jobject this, jint epfd,jint opcode, jint fd, jint events) {struct epoll_event event;int res;event.events = events;event.data.fd = fd;// epoll_ctl這里就不用多說了吧RESTARTABLE(epoll_ctl(epfd, (int)opcode, (int)fd, &event), res);/** A channel may be registered with several Selectors. When each Selector* is polled a EPOLL_CTL_DEL op will be inserted into its pending update* list to remove the file descriptor from epoll. The "last" Selector will* close the file descriptor which automatically unregisters it from each* epoll descriptor. To avoid costly synchronization between Selectors we* allow pending updates to be processed, ignoring errors. The errors are* harmless as the last update for the file descriptor is guaranteed to* be EPOLL_CTL_DEL.*/if (res < 0 && errno != EBADF && errno != ENOENT && errno != EPERM) {JNU_ThrowIOExceptionWithLastError(env, "epoll_ctl failed");} }在doSelect方法poll執行后,會更新EpollSelectorImpl.java里的 updateSelectedKeys,就是Selector里的三個set集合,具體可看前面。
/** ? *更新已被epoll選擇fd的鍵。 ? *將就緒興趣集添加到就緒隊列。 ? */ private int updateSelectedKeys() {int entries = pollWrapper.updated;int numKeysUpdated = 0;for (int i=0; i<entries; i++) {int nextFD = pollWrapper.getDescriptor(i);SelectionKeyImpl ski = fdToKey.get(Integer.valueOf(nextFD));// ski is null in the case of an interruptif (ski != null) {int rOps = pollWrapper.getEventOps(i);if (selectedKeys.contains(ski)) {if (ski.channel.translateAndSetReadyOps(rOps, ski)) {numKeysUpdated++;}} else {ski.channel.translateAndSetReadyOps(rOps, ski);if ((ski.nioReadyOps() & ski.nioInterestOps()) != 0) {selectedKeys.add(ski);numKeysUpdated++;}}}}return numKeysUpdated;}總結
通過本文,你應該知道Channel、Selector基本原理和在Java中怎么使用Epoll的。 (包括更細節的fd與channel和socket之間的轉換關系)掌握這些基礎知識,再去看NIO、netty網絡框架的源碼可能就沒有那么吃力了。在接下來的文章里我會跟進關于Netty的文章,畢竟這已成為分布式網絡通信框架的主流了!
感謝
https://zh.wikipedia.org/wiki/Epoll 維基百科
https://baike.baidu.com/item/epoll/10738144?fr=aladdin
https://juejin.im/entry/5b51546df265da0f70070b93
https://www.jianshu.com/p/f26f1eaa7c8e
來自:微信公眾號(作者:汀雨筆記),著作權屬于:本文和汀雨
總結
以上是生活随笔為你收集整理的epoll监听文件_【原创】万字长文浅析:Epoll与Java Nio的那些事儿的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: oracle中那个日期怎么相减_二手车鉴
- 下一篇: 带参函数_更好的理解Python第五弹函