网络与IO知识扫盲(七):仿照Netty工作架构图,手写多路复用模型
生活随笔
收集整理的這篇文章主要介紹了
网络与IO知识扫盲(七):仿照Netty工作架构图,手写多路复用模型
小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
Netty工作架構圖
從圖上看來:
- 一個線程在 Boss Group 中負責接收
- 另外兩個線程在 Worker Group 中由接收之后的連接分配過去,負責讀寫
根據上圖模型,仿照Netty手寫一個多路復用模型
MainThread.java
這里不做關于 IO 和 業務的事情
package com.bjmashibing.system.io.testreactor;public class MainThread {public static void main(String[] args) {//1,創建 IO Thread (一個或者多個)SelectorThreadGroup boss = new SelectorThreadGroup(3); //混雜模式//boss有自己的線程組SelectorThreadGroup worker = new SelectorThreadGroup(3); //混雜模式//worker有自己的線程組//混雜模式,只有一個線程負責accept,每個都會被分配client,進行R/W // SelectorThreadGroup stg = new SelectorThreadGroup(3);//2,我應該把 監聽(9999)的 server 注冊到某一個 selector上boss.setWorker(worker);//但是,boss得多持有worker的引用:/*** boss里選一個線程注冊listen , 觸發bind,從而,這個不選中的線程得持有 workerGroup的引用* 因為未來 listen 一旦accept得到client后得去worker中 next出一個線程分配*/boss.bind(9999);boss.bind(8888);boss.bind(6666);boss.bind(7777);} }SelectorThreadGroup.java
package com.bjmashibing.system.io.testreactor;import java.io.IOException; import java.net.InetSocketAddress; import java.nio.channels.Channel; import java.nio.channels.ServerSocketChannel; import java.util.concurrent.atomic.AtomicInteger;/*** @author: 馬士兵教育* @create: 2020-06-21 20:37*/ public class SelectorThreadGroup { //天生都是bossSelectorThread[] selectorThreadArray;ServerSocketChannel serverSocketChannel = null;AtomicInteger xid = new AtomicInteger(0);SelectorThreadGroup selectorThreadGroup = this;public void setWorker(SelectorThreadGroup selectorThreadGroup) {this.selectorThreadGroup = selectorThreadGroup;}SelectorThreadGroup(int num) {//num 線程數selectorThreadArray = new SelectorThread[num];for (int i = 0; i < num; i++) {selectorThreadArray[i] = new SelectorThread(this);new Thread(selectorThreadArray[i]).start();}}public void bind(int port) {try {serverSocketChannel = ServerSocketChannel.open();// 打開服務器-套接字通道serverSocketChannel.configureBlocking(false);//設置非阻塞serverSocketChannel.bind(new InetSocketAddress(port));//綁定端口//注冊到那個selector上呢? // nextSelectorV2(server);nextSelectorV3(serverSocketChannel);} catch (IOException e) {e.printStackTrace();}}/*** 負載均衡的方式選擇一個selector* @param channel*/public void nextSelectorV3(Channel channel) {try {if (channel instanceof ServerSocketChannel) {SelectorThread selectorThread = next(); //listen 選擇了 boss組selectorThreadArray中的一個線程后,要更新這個線程的work組selectorThread.linkedBlockingQueue.put(channel);selectorThread.setWorker(selectorThreadGroup);selectorThread.selector.wakeup();} else {SelectorThread selectorThread = nextV3(); //在main線程(當前SelectorThreadGroup)中,取到堆里的selectorThread對象//1,通過隊列傳遞數據 消息selectorThread.linkedBlockingQueue.add(channel);//2,通過打斷阻塞,讓對應的線程去自己在打斷后完成注冊selectorselectorThread.selector.wakeup();}} catch (InterruptedException e) {e.printStackTrace();}}public void nextSelectorV2(Channel c) {try {if (c instanceof ServerSocketChannel) {selectorThreadArray[0].linkedBlockingQueue.put(c);selectorThreadArray[0].selector.wakeup();} else {SelectorThread st = nextV2(); //在 main線程種,取到堆里的selectorThread對象//1,通過隊列傳遞數據 消息st.linkedBlockingQueue.add(c);//2,通過打斷阻塞,讓對應的線程去自己在打斷后完成注冊selectorst.selector.wakeup();}} catch (InterruptedException e) {e.printStackTrace();}}public void nextSelector(Channel c) {SelectorThread st = next(); //在 main線程種,取到堆里的selectorThread對象//1,通過隊列傳遞數據 消息st.linkedBlockingQueue.add(c);//2,通過打斷阻塞,讓對應的線程去自己在打斷后完成注冊selectorst.selector.wakeup();// public void nextSelector(Channel c) { // SelectorThread st = next(); //在 main線程種,取到堆里的selectorThread對象 // // //1,通過隊列傳遞數據 消息 // st.lbq.add(c); // //2,通過打斷阻塞,讓對應的線程去自己在打斷后完成注冊selector // st.selector.wakeup();//重點: c有可能是 server 有可能是client // ServerSocketChannel s = (ServerSocketChannel) c;//呼應上, int nums = selector.select(); //阻塞 wakeup() // try { // s.register(st.selector, SelectionKey.OP_ACCEPT); //會被阻塞的!!!!! // st.selector.wakeup(); //功能是讓 selector的select()方法,立刻返回,不阻塞! // System.out.println("aaaaa"); // } catch (ClosedChannelException e) { // e.printStackTrace(); // }}//無論 serversocket socket 都復用這個方法private SelectorThread next() {int index = xid.incrementAndGet() % selectorThreadArray.length; //輪詢就會很尷尬,傾斜return selectorThreadArray[index];}private SelectorThread nextV2() {int index = xid.incrementAndGet() % (selectorThreadArray.length - 1); //輪詢就會很尷尬,傾斜return selectorThreadArray[index + 1];}private SelectorThread nextV3() {int index = xid.incrementAndGet() % selectorThreadGroup.selectorThreadArray.length; //動用worker的線程分配return selectorThreadGroup.selectorThreadArray[index];} }SelectorThread.java
package com.bjmashibing.system.io.testreactor;import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.*; import java.util.Iterator; import java.util.Set; import java.util.concurrent.LinkedBlockingQueue;/*** @author: 馬士兵教育* @create: 2020-06-21 20:14*/ public class SelectorThread extends ThreadLocal<LinkedBlockingQueue<Channel>> implements Runnable {// 每線程對應一個selector,// 多線程情況下,該主機,該程序的并發客戶端被分配到多個selector上//注意,每個客戶端,只綁定到其中一個selector//其實不會有交互問題Selector selector = null;// LinkedBlockingQueue<Channel> lbq = new LinkedBlockingQueue<>();LinkedBlockingQueue<Channel> linkedBlockingQueue = get(); //lbq 在接口或者類中是固定使用方式邏輯寫死了。你需要是lbq每個線程持有自己的獨立對象SelectorThreadGroup selectorThreadGroup;@Overrideprotected LinkedBlockingQueue<Channel> initialValue() {return new LinkedBlockingQueue<>();//你要豐富的是這里! pool。。。}SelectorThread(SelectorThreadGroup selectorThreadGroup) {try {this.selectorThreadGroup = selectorThreadGroup;selector = Selector.open();} catch (IOException e) {e.printStackTrace();}}@Overridepublic void run() {//Loopwhile (true) {try {//1,select()int nums = selector.select(); //此方法執行阻塞選擇操作。只有在選擇了至少一個通道、調用了此選擇器的wakeup方法或中斷了當前線程(以最先出現的方式)之后,它才會返回。//2,處理selectedKeysSystem.out.println("In run(), selector.select() 獲取到的keys數量為: " + nums);if (nums > 0) {Set<SelectionKey> keys = selector.selectedKeys();System.out.println("In run(), keys is: " + keys);Iterator<SelectionKey> iter = keys.iterator();int loop = 0;while (iter.hasNext()) { //線程處理的過程System.out.println("In run(), loop is: " + loop++);SelectionKey key = iter.next();iter.remove();if (key.isAcceptable()) { //復雜,接受客戶端的過程(接收之后,要注冊,多線程下,新的客戶端,注冊到那里呢?)acceptHandler(key);} else if (key.isReadable()) {readHander(key);} else if (key.isWritable()) {}}}//3,處理一些task : listen clientSystem.out.println("處理一些task");if (!linkedBlockingQueue.isEmpty()) { //隊列是個啥東西啊? 堆里的對象,線程的棧是獨立,堆是共享的System.out.println("linkedBlockingQueue 不是空的,包含 " + linkedBlockingQueue);//只有方法的邏輯,本地變量是線程隔離的Channel c = linkedBlockingQueue.take();if (c instanceof ServerSocketChannel) {ServerSocketChannel server = (ServerSocketChannel) c;server.register(selector, SelectionKey.OP_ACCEPT);System.out.println(Thread.currentThread().getName() + " register server listen");} else if (c instanceof SocketChannel) {SocketChannel client = (SocketChannel) c;ByteBuffer buffer = ByteBuffer.allocateDirect(4096);client.register(selector, SelectionKey.OP_READ, buffer);System.out.println(Thread.currentThread().getName() + " register client: " + client.getRemoteAddress());} else {System.out.println("c 既不是server,也不是client,c=" + c);}} else {System.out.println("linkedBlockingQueue 是空的");}} catch (IOException e) {e.printStackTrace();} catch (InterruptedException e) {e.printStackTrace();} // catch (InterruptedException e) { // e.printStackTrace(); // }}}private void readHander(SelectionKey key) {System.out.println(Thread.currentThread().getName() + " readHander......");ByteBuffer buffer = (ByteBuffer) key.attachment();SocketChannel client = (SocketChannel) key.channel();buffer.clear();while (true) {try {int num = client.read(buffer);if (num > 0) {buffer.flip(); //將讀到的內容翻轉,然后直接寫出while (buffer.hasRemaining()) {client.write(buffer);}buffer.clear();} else if (num == 0) {break;} else if (num < 0) {//客戶端斷開了System.out.println("client: " + client.getRemoteAddress() + " closed......");key.cancel();break;}} catch (IOException e) {e.printStackTrace();}}}private void acceptHandler(SelectionKey key) {System.out.println(Thread.currentThread().getName() + "::: acceptHandler Begin");ServerSocketChannel server = (ServerSocketChannel) key.channel();try {SocketChannel clientChannel = server.accept();clientChannel.configureBlocking(false);//選擇一個selector,并調用wakeup()完成注冊selectorThreadGroup.nextSelectorV3(clientChannel);System.out.println(Thread.currentThread().getName() + "::: acceptHandler Finish");} catch (IOException e) {e.printStackTrace();}}public void setWorker(SelectorThreadGroup stgWorker) {this.selectorThreadGroup = stgWorker;} }用nc命令連接
截取了一些輸出
"C:\Program Files\Java\jdk-11.0.3\bin\java.exe" "-javaagent:C:\Program Files\JetBrains\IntelliJ IDEA 2019.3.2\lib\idea_rt.jar=7876:C:\Program Files\JetBrains\IntelliJ IDEA 2019.3.2\bin" -Dfile.encoding=UTF-8 -classpath C:\Users\Bug\mashibing\bjmashbing-sysio\target\classes;C:\Users\Bug\.m2\repository\junit\junit\4.12\junit-4.12.jar;C:\Users\Bug\.m2\repository\org\hamcrest\hamcrest-core\1.3\hamcrest-core-1.3.jar;C:\Users\Bug\.m2\repository\io\netty\netty-all\4.1.49.Final\netty-all-4.1.49.Final.jar;C:\Users\Bug\.m2\repository\org\scala-sbt\test-interface\1.0\test-interface-1.0.jar com.bjmashibing.system.io.testreactor.MainThread In run(), selector.select() 獲取到的keys數量為: 0 處理一些task linkedBlockingQueue 不是空的,包含 [sun.nio.ch.ServerSocketChannelImpl[/0:0:0:0:0:0:0:0:9999]] In run(), selector.select() 獲取到的keys數量為: 0 處理一些task linkedBlockingQueue 不是空的,包含 [sun.nio.ch.ServerSocketChannelImpl[/0:0:0:0:0:0:0:0:8888]] Thread-1 register server listen In run(), selector.select() 獲取到的keys數量為: 0 處理一些task Thread-2 register server listen linkedBlockingQueue 不是空的,包含 [sun.nio.ch.ServerSocketChannelImpl[/0:0:0:0:0:0:0:0:6666]] Thread-0 register server listen In run(), selector.select() 獲取到的keys數量為: 0 處理一些task linkedBlockingQueue 不是空的,包含 [sun.nio.ch.ServerSocketChannelImpl[/0:0:0:0:0:0:0:0:7777]] Thread-1 register server listen In run(), selector.select() 獲取到的keys數量為: 1 In run(), keys is: [channel=sun.nio.ch.ServerSocketChannelImpl[/0:0:0:0:0:0:0:0:8888], selector=sun.nio.ch.WindowsSelectorImpl@473e262f, interestOps=16, readyOps=16] In run(), loop is: 0 Thread-2::: acceptHandler Begin Thread-2::: acceptHandler Finish 處理一些task In run(), selector.select() 獲取到的keys數量為: 0 處理一些task linkedBlockingQueue 是空的 linkedBlockingQueue 不是空的,包含 [java.nio.channels.SocketChannel[connected local=/192.168.111.1:8888 remote=/192.168.111.1:7940]] Thread-4 register client: /192.168.111.1:7940 In run(), selector.select() 獲取到的keys數量為: 1 In run(), keys is: [channel=sun.nio.ch.ServerSocketChannelImpl[/0:0:0:0:0:0:0:0:8888], selector=sun.nio.ch.WindowsSelectorImpl@473e262f, interestOps=16, readyOps=16] In run(), loop is: 0 Thread-2::: acceptHandler Begin Thread-2::: acceptHandler Finish 處理一些task linkedBlockingQueue 是空的 In run(), selector.select() 獲取到的keys數量為: 0 處理一些task linkedBlockingQueue 不是空的,包含 [java.nio.channels.SocketChannel[connected local=/192.168.111.1:8888 remote=/192.168.111.1:7955]] Thread-5 register client: /192.168.111.1:7955 In run(), selector.select() 獲取到的keys數量為: 1 In run(), keys is: [channel=java.nio.channels.SocketChannel[connected local=/192.168.111.1:8888 remote=/192.168.111.1:7955], selector=sun.nio.ch.WindowsSelectorImpl@7912fa80, interestOps=1, readyOps=1] In run(), loop is: 0 Thread-5 readHander...... 處理一些task linkedBlockingQueue 是空的 In run(), selector.select() 獲取到的keys數量為: 1 In run(), keys is: [channel=java.nio.channels.SocketChannel[connected local=/192.168.111.1:8888 remote=/192.168.111.1:7955], selector=sun.nio.ch.WindowsSelectorImpl@7912fa80, interestOps=1, readyOps=1] In run(), loop is: 0 Thread-5 readHander...... 處理一些task linkedBlockingQueue 是空的 In run(), selector.select() 獲取到的keys數量為: 1 In run(), keys is: [channel=java.nio.channels.SocketChannel[connected local=/192.168.111.1:8888 remote=/192.168.111.1:7940], selector=sun.nio.ch.WindowsSelectorImpl@24d0d04, interestOps=1, readyOps=1] In run(), loop is: 0 Thread-4 readHander...... 處理一些task linkedBlockingQueue 是空的 In run(), selector.select() 獲取到的keys數量為: 1 In run(), keys is: [channel=sun.nio.ch.ServerSocketChannelImpl[/0:0:0:0:0:0:0:0:8888], selector=sun.nio.ch.WindowsSelectorImpl@473e262f, interestOps=16, readyOps=16] In run(), loop is: 0 Thread-2::: acceptHandler Begin Thread-2::: acceptHandler Finish 處理一些task linkedBlockingQueue 是空的 In run(), selector.select() 獲取到的keys數量為: 0 處理一些task linkedBlockingQueue 不是空的,包含 [java.nio.channels.SocketChannel[connected local=/192.168.111.1:8888 remote=/192.168.111.1:7961]] Thread-3 register client: /192.168.111.1:7961 In run(), selector.select() 獲取到的keys數量為: 1 In run(), keys is: [channel=java.nio.channels.SocketChannel[connected local=/192.168.111.1:8888 remote=/192.168.111.1:7961], selector=sun.nio.ch.WindowsSelectorImpl@67ea4302, interestOps=1, readyOps=1] In run(), loop is: 0 Thread-3 readHander...... 處理一些task linkedBlockingQueue 是空的總結
以上是生活随笔為你收集整理的网络与IO知识扫盲(七):仿照Netty工作架构图,手写多路复用模型的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 多线程与高并发(一):单机高并发应该掌握
- 下一篇: 多线程与高并发(二):解析自旋锁CAS操