Bio->Nio->Selector->Epoll->Netty
The C10K problem
Raised around the year 2000: under the BIO model, a server handling 10K concurrent sockets becomes slow at moving data between clients and the server.
Simulating 10k clients from a single thread
When a server and a client communicate, the kernel holds two sockets: one is the server's listen socket that accepts clients, the other is the connection socket created once a client gets in, used for the actual data exchange between the two sides (run netstat -natp to find the Java pid, then lsof -p pid to see both). A minimal client sketch for driving the 10k connections is given below.
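The original names the single-threaded 10k-client simulator but does not include its code; here is a minimal sketch of what it could look like. The server address 192.168.172.3:9090 is an assumption (borrowed from the Netty example later in this post), and the number of connections you actually reach may be capped by the local port range and the open-file ulimit.

import java.net.InetSocketAddress;
import java.nio.channels.SocketChannel;
import java.util.LinkedList;
import java.util.List;

public class C10KClient {
    public static void main(String[] args) throws Exception {
        List<SocketChannel> clients = new LinkedList<>();
        InetSocketAddress server = new InetSocketAddress("192.168.172.3", 9090); // assumed server address
        for (int i = 0; i < 10000; i++) {
            SocketChannel client = SocketChannel.open();
            client.configureBlocking(false); // non-blocking connect so one thread can open many connections
            client.connect(server);          // the handshake completes in the background
            clients.add(client);
            if ((i + 1) % 1000 == 0) {
                System.out.println("connections attempted: " + (i + 1));
            }
        }
        Thread.sleep(Long.MAX_VALUE);        // keep the sockets open so they show up in netstat / lsof
    }
}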
The BIO model
package io.bio;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.ServerSocket;
import java.net.Socket;

/**
 * Why BIO is slow:
 * 1. accept blocks, and the new Thread that follows triggers a clone system call — a user/kernel mode switch per connection
 * 2. the IO stream of a connected client is blocking as well.
 * Author: ljf
 * CreatedAt: 2021/3/31 1:38 PM
 */
public class SocketBIO {
    public static void main(String[] args) {
        try {
            ServerSocket server = new ServerSocket(9090, 5);
            System.out.println("step1: new ServerSocket(9090,5)");
            while (true) {
                Socket client = server.accept();           // blocks until a client connects
                System.out.println("step2:client \t" + client.getPort());
                new Thread(new Runnable() {
                    @Override
                    public void run() {
                        InputStream inputStream = null;
                        try {
                            inputStream = client.getInputStream();
                            BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream));
                            while (true) {
                                String s = reader.readLine(); // blocks until the client sends a line
                                if (s != null) {
                                    System.out.println(s);
                                } else {
                                    inputStream.close();
                                    break;
                                }
                            }
                        } catch (IOException e) {
                            e.printStackTrace();
                        }
                    }
                }).start();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

Blocking happens in two places — when the server accepts a client, and when the server waits for client data (the kernel's RECV(6)). That is why it is slow.
The NIO model
package io.nio;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.LinkedList;
import java.util.List;

/**
 * The N in NIO carries two meanings:
 * 1. NONBLOCKING: both accept(5 and RECV(6 are non-blocking
 * 2. Java's "new IO"
 *
 * Author: ljf
 * CreatedAt: 2021/3/31 3:16 PM
 */
public class SocketNIO {
    public static void main(String[] args) {
        List<SocketChannel> clients = new LinkedList<>();
        try {
            ServerSocketChannel ss = ServerSocketChannel.open(); // server starts listening for clients
            ss.bind(new InetSocketAddress(9090));                // bind local port 9090
            ss.configureBlocking(false);                         // key point: accept clients in non-blocking mode
            while (true) {
                // accept a client connection
                // Thread.sleep(1000);
                // accept calls the kernel's accept; with no client pending, BIO would sit here forever,
                // NIO does not wait: the kernel returns -1 and Java returns null.
                // When a client does connect, accept returns that client's FD (e.g. FD5) as a client object.
                // NONBLOCKING means the code keeps moving; what happens next depends on whether a client connected.
                SocketChannel client = ss.accept();
                if (client == null) {
                    System.out.println("null ...");
                } else {
                    client.configureBlocking(false); // key point: the listen socket handles the three-way handshake,
                                                     // accept hands us the connection socket used for all later reads/writes
                    int port = client.socket().getPort();
                    System.out.println("client port : " + port);
                    clients.add(client);
                }
                ByteBuffer byteBuffer = ByteBuffer.allocateDirect(1024);
                // allocateDirect allocates native (off-heap) memory; allocate(int capacity) allocates inside the JVM heap.
                // The first is slower to allocate; the second pays a copy between system memory and heap memory,
                // so it is not necessarily faster — it depends on the workload.

                // walk the connected clients and see whether any has data to read
                for (SocketChannel s : clients) {
                    int num = s.read(byteBuffer); // returns >0, 0 or -1, never blocks
                    if (num > 0) {
                        byteBuffer.flip();
                        byte[] bytes = new byte[byteBuffer.limit()];
                        byteBuffer.get(bytes);
                        String b = new String(bytes);
                        System.out.println(s.socket().getPort() + " : " + b);
                        byteBuffer.clear();
                    }
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

The reasoning is in the code comments. This version has one flaw: the clients list keeps growing while the infinite loop runs, so the per-iteration traversal gets slower and slower, until the program eventually fails with a "too many open file descriptors" error. A sketch of one way to prune dead connections follows.
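One way to soften that flaw, sketched here as an assumption rather than code from the original: walk the list with an Iterator and drop any channel whose read returns -1, so disconnected clients stop being scanned and their FDs are released.

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.List;

class ClientSweep {
    // Drop-in replacement for the for-each loop above: clients that hung up are closed and removed,
    // so the list stops growing without bound.
    static void poll(List<SocketChannel> clients, ByteBuffer byteBuffer) throws IOException {
        Iterator<SocketChannel> it = clients.iterator();
        while (it.hasNext()) {
            SocketChannel s = it.next();
            int num = s.read(byteBuffer); // still non-blocking: >0, 0 or -1
            if (num > 0) {
                byteBuffer.flip();
                byte[] bytes = new byte[byteBuffer.limit()];
                byteBuffer.get(bytes);
                System.out.println(s.socket().getPort() + " : " + new String(bytes));
                byteBuffer.clear();
            } else if (num < 0) {
                s.close();   // the client disconnected: release its FD
                it.remove(); // and stop traversing it on later passes
            }
        }
    }
}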
Synchronous vs. asynchronous, blocking vs. non-blocking
Synchronous/asynchronous describes the IO operation itself; blocking/non-blocking describes whether the calling thread waits.
Linux has no true asynchronous model for socket IO — even Netty is just an optimized synchronous model; Windows has IOCP, which is asynchronous.
How the kernel handles IO interrupts
To talk about the IO models — from BIO to NIO, then select/poll, and finally epoll — we need a rough idea of how the kernel handles IO interrupts.
Whether it is soft-interrupt IO or hard-interrupt IO (I don't actually know the difference, but it does not affect what follows), when an interrupt fires there is always the combination "int <interrupt number> + callback" behind it (for interrupt 80 that would be int 80 plus a callback). Let's assume the IO interrupt number is 80 (I don't know the real number either; it does not matter here).
The BIO model is: int 80 then RECV(5; the callback is a blocking RECV, so BIO blocks waiting on the IO stream.
The NIO model is: int 80, then a loop over every FD that switches between kernel mode and user mode on each check; when an FD has data it is handed to the application, and the loop continues over all connected FDs.
The select/poll model is: int 80, then the kernel itself walks the FDs passed in by the loop; FDs that have data are handed to the application, and the loop continues. The difference between select and poll is that select caps the number of file descriptors (1024 by default for an ordinary user, possibly 2048 for root), while poll has no such cap.
So select/poll is faster than NIO because it saves the user/kernel mode switch for every FD traversed.
The epoll model is: int 80, then the kernel watches all the FDs and puts the ones that have data into a ready set; the application takes its data directly from that set.
Epoll in Java
Whether Java NIO uses select/poll or epoll can be chosen with a JVM startup parameter:
epoll: -Djava.nio.channels.spi.SelectorProvider=sun.nio.ch.EPollSelectorProvider
select/poll: -Djava.nio.channels.spi.SelectorProvider=sun.nio.ch.PollSelectorProvider
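A quick way to see which provider the JVM actually picked (a small sketch; the class name printed depends on the platform and JDK build):

import java.nio.channels.spi.SelectorProvider;

public class ShowSelectorProvider {
    public static void main(String[] args) {
        // On Linux this typically prints sun.nio.ch.EPollSelectorProvider,
        // unless overridden with -Djava.nio.channels.spi.SelectorProvider=...
        System.out.println(SelectorProvider.provider().getClass().getName());
    }
}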
Single-threaded read: as soon as a channel is readable, the data is written straight back to the client
package io.selector;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;

/**
 * Author: ljf
 * CreatedAt: 2021/4/1 3:14 PM
 */
public class SocketMultiplexingSingleThread {

    public static void main(String[] args) {
        SocketMultiplexingSingleThread service = new SocketMultiplexingSingleThread();
        service.start();
    }

    private ServerSocketChannel server = null;
    private Selector selector = null;
    int port = 9090;

    public void start() {
        initServer();
        System.out.println("Server started ...");
        Set<SelectionKey> keys = selector.keys();
        System.out.println("keys size : " + keys.size());
        while (true) {
            try {
                /*
                 * selector.select(timeout):
                 * 1. select/poll: under the hood the kernel call is select(fd4) / poll(fd4)
                 * 2. epoll: under the hood the kernel call is epoll_wait()
                 * timeout of 0 (or no argument) blocks; a positive value sets a maximum blocking time.
                 *
                 * selector.wakeup() could also be called; at that moment there may be no FD with data yet,
                 * so select easily returns 0.
                 *
                 * Lazy binding: the epoll_ctl system call is only triggered the first time selector.select() is touched.
                 */
                while (selector.select(500) > 0) {
                    Set<SelectionKey> selectionKeys = selector.selectedKeys();
                    Iterator<SelectionKey> iterator = selectionKeys.iterator();
                    while (iterator.hasNext()) {
                        /*
                         * Whatever the multiplexer is, a key only reports a state; the application still has to
                         * handle each R/W itself, one by one — that is what "synchronous" means here.
                         */
                        SelectionKey key = iterator.next();
                        iterator.remove(); // the selected-keys set is not cleared automatically
                        /*
                         * When a client connects, accept() semantically receives the connection and returns the new
                         * connection's FD. Where does that FD go?
                         * 1. select/poll: the kernel keeps no space for it, so the JVM stores it alongside the
                         *    listening fd4
                         * 2. epoll: we want epoll_ctl to register the new client FD into the kernel space
                         */
                        if (key.isAcceptable()) {
                            acceptHandler(key);
                        /*
                         * R/W of a connection is handled on the current thread; this method may block, which is why
                         * IO threads were introduced — e.g. redis, and the async processing in tomcat 8/9
                         */
                        } else if (key.isReadable()) {
                            readHandler(key);
                        }
                    }
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    private void readHandler(SelectionKey key) {
        SocketChannel client = (SocketChannel) key.channel();
        ByteBuffer buffer = (ByteBuffer) key.attachment();
        buffer.clear();
        int read = 0;
        while (true) {
            try {
                read = client.read(buffer);
                if (read > 0) {
                    buffer.flip();
                    while (buffer.hasRemaining()) {
                        client.write(buffer); // echo straight back to the client
                    }
                    buffer.clear();
                } else if (read == 0) {
                    break;
                } else {
                    client.close();
                    break;
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    private void acceptHandler(SelectionKey key) {
        ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
        try {
            SocketChannel client = ssc.accept();
            client.configureBlocking(false);
            ByteBuffer buf = ByteBuffer.allocateDirect(8192);
            client.register(selector, SelectionKey.OP_READ, buf);
            System.out.println("--------------------------------------");
            System.out.println("new client: " + client.getRemoteAddress());
            System.out.println("--------------------------------------");
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    private void initServer() {
        try {
            server = ServerSocketChannel.open();
            server.configureBlocking(false);
            server.bind(new InetSocketAddress(port));
            // Under the epoll model, Selector.open() maps to epoll_create() -> fd3.
            // epoll is preferred by default; select/poll can be forced with the
            // -Djava.nio.channels.spi.SelectorProvider=sun.nio.ch.PollSelectorProvider parameter
            selector = Selector.open();
            // register: the listening socket is roughly fd4 in LISTEN state
            // - select/poll: the JVM keeps an array and puts fd4 into it
            // - epoll: epoll_ctl(fd3, ADD, fd4, EPOLLIN) is called
            server.register(selector, SelectionKey.OP_ACCEPT);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

Single-threaded read and write
a. On a readable event, after reading the client's data, the handler registers a write (OP_WRITE) for that channel with the multiplexer.
b. isWritable reflects whether the kernel send queue has room; when to actually write is decided by the application. A small sketch of switching interest ops follows.
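A minimal sketch of that idea (an assumption, not code from the original): after reading, switch the key's interest to OP_WRITE; once the buffer is drained, switch back to OP_READ, because leaving OP_WRITE registered makes the selector keep firing writable events as long as the send queue has room.

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;

class InterestOpsToggle {
    // After a successful read, ask the selector to report when this channel is writable.
    static void wantWrite(SelectionKey key) {
        key.interestOps(SelectionKey.OP_WRITE);
    }

    // On a writable event: drain the attached buffer, then go back to waiting for reads.
    static void onWritable(SelectionKey key) throws IOException {
        SocketChannel client = (SocketChannel) key.channel();
        ByteBuffer buffer = (ByteBuffer) key.attachment();
        buffer.flip();
        while (buffer.hasRemaining()) {
            client.write(buffer);
        }
        buffer.clear();
        key.interestOps(SelectionKey.OP_READ);
    }
}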
Multi-threaded read and write
The single-threaded version has an obvious bottleneck: if one client needs ten years to finish its read or write, every other client can wave goodbye. So the main thread only watches for connections, and reads/writes are handed off to new threads.
package io.selector;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;

/**
 * Author: ljf
 * CreatedAt: 2021/4/3 9:24 AM
 */
public class SocketMultiplexingSingleThread2 {

    private ServerSocketChannel server = null;
    private Selector selector = null;
    int port = 9090;

    public static void main(String[] args) {
        SocketMultiplexingSingleThread2 socketMultiplexingSingleThread2 = new SocketMultiplexingSingleThread2();
        socketMultiplexingSingleThread2.start();
    }

    private void start() {
        initServer();
        System.out.println("Server started ...");
        try {
            while (true) {
                while (selector.select(50) > 0) {
                    Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
                    while (iterator.hasNext()) {
                        SelectionKey key = iterator.next();
                        iterator.remove();
                        if (key.isAcceptable()) {
                            acceptHandler(key);
                        } else if (key.isReadable()) {
                            readHandler(key);
                        } else if (key.isWritable()) {
                            writeHandler(key);
                        }
                    }
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    private void writeHandler(SelectionKey key) {
        new Thread(() -> {
            System.out.println("write handler ...");
            SocketChannel client = (SocketChannel) key.channel();
            ByteBuffer buffer = (ByteBuffer) key.attachment();
            buffer.flip();
            while (buffer.hasRemaining()) {
                try {
                    client.write(buffer);
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();
    }

    private void readHandler(SelectionKey key) {
        new Thread(() -> {
            System.out.println("read handler ...");
            SocketChannel client = (SocketChannel) key.channel();
            ByteBuffer buffer = (ByteBuffer) key.attachment();
            buffer.clear();
            int read = 0;
            while (true) {
                try {
                    read = client.read(buffer);
                    if (read > 0) {
                        // data is in the buffer: ask the selector for a writable event on this channel
                        client.register(key.selector(), SelectionKey.OP_WRITE, buffer);
                        System.out.println(Thread.currentThread().getName() + " " + read);
                    } else if (read == 0) {
                        break;
                    } else if (read < 0) {
                        client.close();
                        break;
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }).start();
    }

    private void acceptHandler(SelectionKey key) {
        ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
        try {
            SocketChannel client = ssc.accept();
            client.configureBlocking(false);
            ByteBuffer buffer = ByteBuffer.allocateDirect(8192);
            client.register(selector, SelectionKey.OP_READ, buffer);
            System.out.println("---------------------------------");
            System.out.println("new client: " + client.getRemoteAddress());
            System.out.println("---------------------------------");
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    private void initServer() {
        try {
            server = ServerSocketChannel.open();
            server.configureBlocking(false);
            server.bind(new InetSocketAddress(port));
            selector = Selector.open();
            server.register(selector, SelectionKey.OP_ACCEPT);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

Ideally the read/write threads should be pooled rather than created per event (a sketch follows), but that is beside the point — in practice nobody uses Java's epoll API this directly.
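A hedged sketch of the pooling idea (not from the original): replace the per-event new Thread(...) with a shared fixed-size ExecutorService, keeping the handler bodies unchanged.

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class HandlerPool {
    // One shared worker pool instead of a new thread per readable/writable event.
    static final ExecutorService WORKERS = Executors.newFixedThreadPool(4);
}

// Inside readHandler / writeHandler, instead of:
//     new Thread(() -> { ... }).start();
// submit the same lambda body to the pool:
//     HandlerPool.WORKERS.submit(() -> { ... });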
Clearly, the JDK's epoll only gives us readiness notification for accept and read/write; the application still has to spin in its own loop to find out which channel is ready. What we actually want is to hand that bookkeeping over: let the event loop tell us when a channel is readable or writable and drive the read/write system calls, while we only plug our own logic into those read/write callbacks. That is the reactive, event-driven style — that is Netty.
Netty
Add the Netty dependency

<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.1.9.Final</version>
</dependency>

Simulating a Netty client by hand
@Test
public void clientMode() throws Exception {
    // the equivalent of Java's Selector
    NioEventLoopGroup thread = new NioEventLoopGroup(1);
    // the equivalent of Java's SocketChannel
    NioSocketChannel client = new NioSocketChannel();
    ChannelFuture register = thread.register(client);
    // register an IO handler
    ChannelPipeline p = client.pipeline();
    // register an event; Netty watches it and reacts as soon as it fires
    p.addLast(new MyInHandler());
    // connect to the server
    ChannelFuture connect = client.connect(new InetSocketAddress("192.168.172.3", 9090));
    // connect is asynchronous, so wait for it; only then can the steps after the connect proceed
    ChannelFuture sync = connect.sync();
    ByteBuf buf = Unpooled.copiedBuffer("hello server".getBytes());
    ChannelFuture send = client.writeAndFlush(buf);
    // writeAndFlush is asynchronous too, so wait until the data has actually gone out
    send.sync();
    // block here so the connection is never closed from our side
    sync.channel().closeFuture().sync();
}

/**
 * ChannelInboundHandlerAdapter is just a convenience implementation of ChannelHandler, not an Adapter-pattern class.
 * Without it, MyInHandler would have to implement every method of the interface, most of which we do not need.
 */
class MyInHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        System.out.println("client registered ....");
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf buf = (ByteBuf) msg;
        // ByteBuf has get/set and read/write pairs of byte operations: get/set do not move the index, read/write do
        // CharSequence str = buf.readCharSequence(buf.readableBytes(), CharsetUtil.UTF_8);
        CharSequence charSequence = buf.getCharSequence(0, buf.readableBytes(), CharsetUtil.UTF_8);
        System.out.println(charSequence);
        // write the bytes we just read straight back
        ctx.writeAndFlush(buf);
    }
}

Simulating a Netty server by hand
@Test
public void serverMode() throws Exception {
    // the equivalent of Java's Selector
    NioEventLoopGroup thread = new NioEventLoopGroup(1);
    // the equivalent of Java's ServerSocketChannel
    NioServerSocketChannel server = new NioServerSocketChannel();
    // accept client connections
    ChannelPipeline p = server.pipeline();
    // p.addLast(new MyAcceptHandler(thread));                       // pass the selector in; accepted clients register their reads/writes
    // p.addLast(new MyAcceptHandler2(thread, new MyInHandler()));   // reuse one read/write handler for all clients
    p.addLast(new MyAcceptHandler2(thread, new AcceptHandlerInitiator()));
    thread.register(server);
    ChannelFuture bind = server.bind(new InetSocketAddress("192.168.8.103", 9090));
    // bind is asynchronous, so block on the close future
    bind.channel().closeFuture().sync();
    System.out.println("server closed ....");
}

/**
 * The @ChannelHandler.Sharable annotation would solve the "not shareable" problem,
 * but the recommended approach is to add a wrapper layer — see AcceptHandlerInitiator; this is also what Netty itself does.
 */
//@ChannelHandler.Sharable
class MyInHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        System.out.println("client registered ....");
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf buf = (ByteBuf) msg;
        // ByteBuf has get/set and read/write pairs of byte operations: get/set do not move the index, read/write do
        // CharSequence str = buf.readCharSequence(buf.readableBytes(), CharsetUtil.UTF_8);
        CharSequence charSequence = buf.getCharSequence(0, buf.readableBytes(), CharsetUtil.UTF_8);
        System.out.println(charSequence);
        // write the bytes we just read straight back
        ctx.writeAndFlush(buf);
    }
}

class MyAcceptHandler extends ChannelInboundHandlerAdapter {
    private final NioEventLoopGroup selector;

    public MyAcceptHandler(NioEventLoopGroup selector) {
        this.selector = selector;
    }

    @Override
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        System.out.println("server registered ....");
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        SocketChannel client = (SocketChannel) msg;
        ChannelPipeline p = client.pipeline();
        // we have the client now and must register its reads/writes.
        // new MyInHandler() here creates a new object per client — functionally fine, but with ten thousand clients
        // that is ten thousand objects mixing connection handling with business logic. Connection handling should be
        // shared in one instance and the business part implemented as needed. See MyAcceptHandler2 for the fix.
        p.addLast(new MyInHandler());
        selector.register(client);
    }
}

class MyAcceptHandler2 extends ChannelInboundHandlerAdapter {
    private final NioEventLoopGroup selector;
    private final ChannelHandler handler;

    public MyAcceptHandler2(NioEventLoopGroup thread, ChannelHandler myInHandler) {
        this.selector = thread;
        this.handler = myInHandler;
    }

    @Override
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        System.out.println("server registered ....");
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        SocketChannel client = (SocketChannel) msg;
        ChannelPipeline p = client.pipeline();
        // we have the client; register its reads/writes.
        // MyInHandler is reused, but by default Netty does not allow different clients to share one handler instance;
        // the @Sharable annotation would allow it, and Netty's own approach is to add a wrapper instead.
        p.addLast(handler);
        selector.register(client);
    }
}

@ChannelHandler.Sharable
class AcceptHandlerInitiator extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        Channel client = ctx.channel();
        ChannelPipeline p = client.pipeline();
        p.addLast(new MyInHandler());
        // When serverMode constructs AcceptHandlerInitiator, the initiator itself sits in the pipeline;
        // once the client is obtained it adds MyInHandler. Since the business only needs MyInHandler,
        // the initiator removes itself.
        ctx.pipeline().remove(this);
    }
}

Combining the simulations above, a minimal version written against Netty's real bootstrap API looks like this:
@Test
public void nettyClient() throws Exception {
    NioEventLoopGroup group = new NioEventLoopGroup(1);
    Bootstrap bs = new Bootstrap();
    ChannelFuture connect = bs.group(group)
            .channel(NioSocketChannel.class)
            .handler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel socketChannel) throws Exception {
                    ChannelPipeline p = socketChannel.pipeline();
                    p.addLast(new MyInHandler());
                }
            })
            .connect(new InetSocketAddress("192.168.172.3", 9090));
    Channel client = connect.sync().channel();
    ByteBuf byteBuf = Unpooled.copiedBuffer("hello server".getBytes());
    ChannelFuture send = client.writeAndFlush(byteBuf);
    send.sync();
    client.closeFuture().sync();
}

@Test
public void nettyServer() throws Exception {
    NioEventLoopGroup group = new NioEventLoopGroup(1);
    ServerBootstrap bs = new ServerBootstrap();
    ChannelFuture bind = bs.group(group, group)
            .channel(NioServerSocketChannel.class)
            .childHandler(new ChannelInitializer<NioSocketChannel>() {
                @Override
                protected void initChannel(NioSocketChannel channel) throws Exception {
                    ChannelPipeline p = channel.pipeline();
                    p.addLast(new MyInHandler());
                }
            })
            .bind(new InetSocketAddress("192.168.8.103", 9090));
    bind.sync().channel().closeFuture().sync();
}

ChannelInitializer plays the same role as AcceptHandlerInitiator, the wrapper around MyInHandler we wrote in the simulation above.
Summary
BIO blocks a thread per connection; NIO polls every FD from user space; select/poll push that scan into the kernel; epoll keeps a kernel-side ready set; and Netty wraps the epoll-backed Selector in an event-driven pipeline so the application only supplies handlers.