[Zookeeper-3.6.2 Source Code Analysis Series] - 14 - An Analysis of the Reactor Network Model Used by ZooKeeper
Contents
- 14 Starting the server network listener: NIOServerCnxnFactory
- 14.1 Overview
- 14.2 The main-sub Reactor network I/O model
- 14.3 NIOServerCnxnFactory's configure method
- 14.4 NIOServerCnxnFactory's start method
- 14.5 AcceptThread
- 14.5.1 AcceptThread source
- 14.5.2 pauseAccept: pausing accepts
- 14.6 SelectorThread
- 14.6.1 SelectorThread source
- 14.6.2 Handling accepted connections: processAcceptedConnections
- 14.6.2.1 The NIOServerCnxn constructor
- 14.6.3 Updating interest ops for connections in updateQueue: processInterestOpsUpdateRequests
- 14.6.4 Handling I/O events
- 14.6.4.1 IOWorkRequest
- 14.6.4.2 NIOServerCnxn.doIO
- 14.6.4.3 Handling connect requests: processConnectRequest
- 14.6.4.4 Tracking session expiration
- 14.6.4.5 Submitting the session
- 14.6.4.6 Submitting the request for processing
- 14.7 ConnectionExpirerThread
- 14.8 NettyServerCnxnFactory
14 Starting the server network listener: NIOServerCnxnFactory
14.1 Overview
Back in QuorumPeer's start() method: after data recovery completes, network interaction begins with the call to startServerCnxnFactory().
Let's keep going:
```java
private void startServerCnxnFactory() {
    if (cnxnFactory != null) {
        cnxnFactory.start();
    }
    if (secureCnxnFactory != null) {
        secureCnxnFactory.start();
    }
}
```
In QuorumPeerMain's runFromConfig method, ServerCnxnFactory.createFactory() is called to create the connection factory. createFactory() checks the JVM system property zookeeper.serverCnxnFactory for a factory class name; if the property is absent, it defaults to NIOServerCnxnFactory.
The factory-creation code, called from QuorumPeerMain.runFromConfig, is as follows; createFactory instantiates whichever class the property names:
```java
public static ServerCnxnFactory createFactory() throws IOException {
    String serverCnxnFactoryName = System.getProperty(ZOOKEEPER_SERVER_CNXN_FACTORY);
    if (serverCnxnFactoryName == null) {
        serverCnxnFactoryName = NIOServerCnxnFactory.class.getName();
    }
    try {
        ServerCnxnFactory serverCnxnFactory =
            (ServerCnxnFactory) Class.forName(serverCnxnFactoryName)
                .getDeclaredConstructor()
                .newInstance();
        LOG.info("Using {} as server connection factory", serverCnxnFactoryName);
        return serverCnxnFactory;
    } catch (Exception e) {
        throw new IOException("Couldn't instantiate " + serverCnxnFactoryName, e);
    }
}
```
By default the factory is NIOServerCnxnFactory, built on the JDK's own NIO. If the zookeeper.serverCnxnFactory property names another implementation, such as NettyServerCnxnFactory (built on Netty), that class is loaded instead during initialization. The factory then binds to the configured clientPort to serve client requests.
Before digging into the implementation, here is the usual high-level description.
As a server, ZooKeeper naturally has to communicate with clients over the network, and it has to do so efficiently. ZooKeeper manages client connections through ServerCnxnFactory, which has two implementations:
- NIOServerCnxnFactory, built on native Java NIO;
- NettyServerCnxnFactory, built on Netty.
A ServerCnxn represents a single client-server connection.
ServerCnxnFactory
Note: below, "connection" means a TCP connection initiated by a client, i.e. a SocketChannel.
The ServerCnxnFactory implementation can be chosen via the system property zookeeper.serverCnxnFactory; the default is NIOServerCnxnFactory.
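To illustrate, here is a minimal sketch of switching the implementation at runtime; ServerCnxnFactory.createFactory() and both factory class names are real, while the wrapping demo class is hypothetical:
```java
import java.io.IOException;
import org.apache.zookeeper.server.ServerCnxnFactory;

public class FactorySelectionDemo {
    public static void main(String[] args) throws IOException {
        // Equivalent to passing -Dzookeeper.serverCnxnFactory=... on the JVM command line.
        System.setProperty("zookeeper.serverCnxnFactory",
                "org.apache.zookeeper.server.NettyServerCnxnFactory");
        ServerCnxnFactory factory = ServerCnxnFactory.createFactory();
        System.out.println(factory.getClass().getName()); // NettyServerCnxnFactory
    }
}
```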
NIOServerCnxnFactory
14.2 The main-sub Reactor network I/O model
The usual way to use Java NIO is one thread group listening for OP_ACCEPT events to handle client connections, and another thread group listening for OP_READ and OP_WRITE events on those connections to handle the I/O (this is how Netty does it).
ZooKeeper divides the work differently: NIOServerCnxnFactory starts four kinds of threads:
- accept thread: accepts incoming client connections and hands each one off to a selector thread (a single thread is started)
- selector thread: runs select(); because select() becomes a bottleneck with many connections, multiple selector threads are started. Their count is set by the system property zookeeper.nio.numSelectorThreads and defaults to sqrt(numCores / 2), with a minimum of 1 (see the sketch after this list)
- worker thread: performs the actual socket reads and writes. Their count is set by the system property zookeeper.nio.numWorkerThreads and defaults to 2 * numCores. If this count is 0, a separate thread is spawned per I/O task instead; see the worker thread discussion below
- connection expiration thread: closes a connection once its session has expired
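The defaults come straight from configure() (shown in full in section 14.3); a minimal sketch of the arithmetic:
```java
// Default thread counts, mirroring NIOServerCnxnFactory.configure().
int numCores = Runtime.getRuntime().availableProcessors();

// Selector threads: sqrt(cores / 2), at least 1. E.g. 32 cores -> 4 threads.
int numSelectorThreads = Math.max((int) Math.sqrt((float) numCores / 2), 1);

// Worker threads: twice the core count. E.g. 32 cores -> 64 threads.
int numWorkerThreads = 2 * numCores;
```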
ZooKeeper, in other words, splits the work more finely. Its authors judged that with a large number of client connections, selector.select() itself becomes the bottleneck, so they pulled the select() call out into dedicated selector threads.
Communication between threads
The threads above communicate through synchronized queues. In this subsection we look at which queues are used and what each is for.
- SelectorThread.acceptedQueue
acceptedQueue is a LinkedBlockingQueue held by each selector thread. It contains the client connections received by the accept thread; the selector thread is responsible for registering them with its selector to listen for OP_READ and OP_WRITE.
- SelectorThread.updateQueue
updateQueue, like acceptedQueue, is a LinkedBlockingQueue held by each selector thread. Explaining its purpose requires a close look at how Java NIO works.
Java NIO is built on the epoll system call in level-triggered mode: if selector.select() finds an event on a socketChannel, say readable data, then as long as that data has not been fully drained, the next selector.select() will report the event again, until the data has been read completely.
ZooKeeper treats selector.select() as the performance bottleneck. To speed it up and avoid the drawback of level-triggered mode just described, ZooKeeper stops the socketChannel from listening for OP_READ and OP_WRITE while its I/O is being processed, which lightens the load on selector.select().
This raises a question: once the I/O is done, how does the socketChannel start listening for OP_READ and OP_WRITE again?
You might think this is easy: when the worker thread finishes the I/O, it just calls key.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE), right? It is not that simple, because selector.select() runs in the selector thread: if a worker thread calls key.interestOps(...) while select() is in progress, it may block on select(). Chasing every last bit of performance, ZooKeeper instead has the selector thread itself call key.interestOps(...), so after finishing the I/O the worker thread must tell the selector thread that this socketChannel is ready to listen for OP_READ and OP_WRITE again.
updateQueue holds exactly those socketChannels waiting to have OP_READ and OP_WRITE re-enabled.
- NIOServerCnxn.outgoingBuffers
outgoingBuffers holds response data waiting to be sent to the client.
Note: my own inference: since key.interestOps(...) can block selector.select(), accepted.register(selector, SelectionKey.OP_READ) can block it too, so registering newly accepted connections with the selector must also happen on the selector thread; that is the reason acceptedQueue exists.
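To make the pattern concrete, here is a minimal, generic sketch of the queue-plus-wakeup hand-off, assuming plain Java NIO; the class and method names are illustrative, not ZooKeeper's:
```java
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;

// Generic sketch of the updateQueue hand-off; only the selector thread ever
// touches interestOps, workers just enqueue and wake it up.
class InterestOpsHandoff {
    private final Selector selector;
    private final Queue<SelectionKey> updateQueue = new LinkedBlockingQueue<>();

    InterestOpsHandoff(Selector selector) {
        this.selector = selector;
    }

    // Called by a worker thread when it finishes I/O on a connection.
    void requestReenable(SelectionKey key) {
        updateQueue.add(key);
        selector.wakeup(); // nudge select() so the queue is drained promptly
    }

    // Called only by the selector thread, between select() calls.
    void drainUpdateQueue() {
        SelectionKey key;
        while ((key = updateQueue.poll()) != null) {
            if (key.isValid()) {
                key.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE);
            }
        }
    }
}
```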
With the thread I/O model in mind, let's look at the startup source.
NIOServerCnxnFactory's configure method is called while ZooKeeper loads its configuration at startup:
14.3 NIOServerCnxnFactory's configure method
```java
@Override
public void configure(InetSocketAddress addr, int maxcc, int backlog, boolean secure) throws IOException {
    if (secure) {
        throw new UnsupportedOperationException("SSL isn't supported in NIOServerCnxn");
    }
    configureSaslLogin();

    maxClientCnxns = maxcc;
    initMaxCnxns();
    sessionlessCnxnTimeout = Integer.getInteger(ZOOKEEPER_NIO_SESSIONLESS_CNXN_TIMEOUT, 10000);
    // We also use the sessionlessCnxnTimeout as expiring interval for
    // cnxnExpiryQueue. These don't need to be the same, but the expiring
    // interval passed into the ExpiryQueue() constructor below should be
    // less than or equal to the timeout.
    cnxnExpiryQueue = new ExpiryQueue<NIOServerCnxn>(sessionlessCnxnTimeout);
    expirerThread = new ConnectionExpirerThread();

    int numCores = Runtime.getRuntime().availableProcessors();
    // 32 cores sweet spot seems to be 4 selector threads
    numSelectorThreads = Integer.getInteger(
        ZOOKEEPER_NIO_NUM_SELECTOR_THREADS,
        Math.max((int) Math.sqrt((float) numCores / 2), 1));
    if (numSelectorThreads < 1) {
        throw new IOException("numSelectorThreads must be at least 1");
    }

    numWorkerThreads = Integer.getInteger(ZOOKEEPER_NIO_NUM_WORKER_THREADS, 2 * numCores);
    workerShutdownTimeoutMS = Long.getLong(ZOOKEEPER_NIO_SHUTDOWN_TIMEOUT, 5000);

    String logMsg = "Configuring NIO connection handler with "
        + (sessionlessCnxnTimeout / 1000) + "s sessionless connection timeout, "
        + numSelectorThreads + " selector thread(s), "
        + (numWorkerThreads > 0 ? numWorkerThreads : "no") + " worker threads, and "
        + (directBufferBytes == 0 ? "gathered writes." : ("" + (directBufferBytes / 1024) + " kB direct buffers."));
    LOG.info(logMsg);

    for (int i = 0; i < numSelectorThreads; ++i) {
        selectorThreads.add(new SelectorThread(i));
    }

    listenBacklog = backlog;
    // Create the server socket and bind it to the client port
    this.ss = ServerSocketChannel.open();
    ss.socket().setReuseAddress(true);
    LOG.info("binding to port {}", addr);
    if (listenBacklog == -1) {
        ss.socket().bind(addr);
    } else {
        ss.socket().bind(addr, listenBacklog);
    }
    ss.configureBlocking(false);
    acceptThread = new AcceptThread(ss, addr, selectorThreads);
}
```
14.4 NIOServerCnxnFactory's start method
```java
@Override
public void start() {
    stopped = false;
    // Start the worker thread pool
    if (workerPool == null) {
        workerPool = new WorkerService("NIOWorker", numWorkerThreads, false);
    }
    // Start the selector threads
    for (SelectorThread thread : selectorThreads) {
        if (thread.getState() == Thread.State.NEW) {
            thread.start();
        }
    }
    // Start the accept thread
    // ensure thread is started once and only once
    if (acceptThread.getState() == Thread.State.NEW) {
        acceptThread.start();
    }
    // Start the connection-expiration thread
    if (expirerThread.getState() == Thread.State.NEW) {
        expirerThread.start();
    }
}
```
14.5 AcceptThread
14.5.1 AcceptThread source
The accept thread source, taken as a whole:
```java
private class AcceptThread extends AbstractSelectThread {

    private final ServerSocketChannel acceptSocket;
    private final SelectionKey acceptKey;
    private final RateLogger acceptErrorLogger = new RateLogger(LOG);
    private final Collection<SelectorThread> selectorThreads;
    private Iterator<SelectorThread> selectorIterator;
    private volatile boolean reconfiguring = false;

    public AcceptThread(ServerSocketChannel ss, InetSocketAddress addr, Set<SelectorThread> selectorThreads) throws IOException {
        super("NIOServerCxnFactory.AcceptThread:" + addr);
        this.acceptSocket = ss;
        // Register for OP_ACCEPT on the server channel
        this.acceptKey = acceptSocket.register(selector, SelectionKey.OP_ACCEPT);
        this.selectorThreads = Collections.unmodifiableList(new ArrayList<SelectorThread>(selectorThreads));
        selectorIterator = this.selectorThreads.iterator();
    }

    public void run() {
        try {
            while (!stopped && !acceptSocket.socket().isClosed()) {
                try {
                    // Loop on select() until stopped or the socket is closed
                    select();
                } catch (RuntimeException e) {
                    LOG.warn("Ignoring unexpected runtime exception", e);
                } catch (Exception e) {
                    LOG.warn("Ignoring unexpected exception", e);
                }
            }
        } finally {
            closeSelector();
            // This will wake up the selector threads, and tell the
            // worker thread pool to begin shutdown.
            if (!reconfiguring) {
                NIOServerCnxnFactory.this.stop();
            }
            LOG.info("accept thread exitted run method");
        }
    }

    private void select() {
        try {
            // Block until at least one registered channel is ready for an
            // event we registered for
            selector.select();
            // Once select() returns a nonzero count, the ready keys can be
            // accessed through the selector's selectedKeys() set
            Iterator<SelectionKey> selectedKeys = selector.selectedKeys().iterator();
            while (!stopped && selectedKeys.hasNext()) {
                SelectionKey key = selectedKeys.next();
                selectedKeys.remove();
                if (!key.isValid()) {
                    continue;
                }
                // Is this key's channel ready to accept a new socket connection?
                if (key.isAcceptable()) {
                    if (!doAccept()) {
                        // If unable to pull a new connection off the accept
                        // queue, pause accepting to give us time to free
                        // up file descriptors and so the accept thread
                        // doesn't spin in a tight loop.
                        pauseAccept(10);
                    }
                } else {
                    LOG.warn("Unexpected ops in accept select {}", key.readyOps());
                }
            }
        } catch (IOException e) {
            LOG.warn("Ignoring IOException while selecting", e);
        }
    }

    private boolean doAccept() {
        boolean accepted = false;
        SocketChannel sc = null;
        try {
            // accept() blocks until a new connection arrives, then returns a
            // SocketChannel for it
            sc = acceptSocket.accept();
            accepted = true;
            // Reject the connection if the configured total connection limit
            // is exceeded
            if (limitTotalNumberOfCnxns()) {
                throw new IOException("Too many connections max allowed is " + maxCnxns);
            }
            // Get the address of this connection and enforce the per-client
            // connection limit
            InetAddress ia = sc.socket().getInetAddress();
            int cnxncount = getClientCnxnCount(ia);
            if (maxClientCnxns > 0 && cnxncount >= maxClientCnxns) {
                throw new IOException("Too many connections from " + ia + " - max is " + maxClientCnxns);
            }
            LOG.debug("Accepted socket connection from {}", sc.socket().getRemoteSocketAddress());
            // Switch the SocketChannel to non-blocking mode so connect(),
            // read() and write() can be called asynchronously
            sc.configureBlocking(false);
            // Round-robin assign this connection to a selector thread
            if (!selectorIterator.hasNext()) {
                selectorIterator = selectorThreads.iterator();
            }
            SelectorThread selectorThread = selectorIterator.next();
            // Hand the newly accepted connection to the selector thread's
            // pending-connection queue
            if (!selectorThread.addAcceptedConnection(sc)) {
                throw new IOException("Unable to add connection to selector queue"
                    + (stopped ? " (shutdown in progress)" : ""));
            }
            acceptErrorLogger.flush();
        } catch (IOException e) {
            // accept, maxClientCnxns, configureBlocking
            ServerMetrics.getMetrics().CONNECTION_REJECTED.add(1);
            acceptErrorLogger.rateLimitLog("Error accepting new connection: " + e.getMessage());
            fastCloseSock(sc);
        }
        return accepted;
    }
}
```
14.5.2 pauseAccept: pausing accepts
```java
// If unable to pull a new connection off the accept queue, pause accepting
// to give us time to free up file descriptors, so the accept thread doesn't
// spin in a tight loop.
private void pauseAccept(long millisecs) {
    acceptKey.interestOps(0);
    try {
        selector.select(millisecs);
    } catch (IOException e) {
        // ignore
    } finally {
        acceptKey.interestOps(SelectionKey.OP_ACCEPT);
    }
}
```
14.6 SelectorThread
14.6.1 SelectorThread source
SelectorThread
A SelectorThread receives newly accepted connections from the AcceptThread and is responsible for selecting over them for I/O readiness. It is the only thread that makes thread-unsafe or potentially blocking calls against its selector (registering new connections and changing read/write interest ops). The assignment of a connection to a SelectorThread is permanent, and only that one SelectorThread ever interacts with the connection. There are 1 to N SelectorThreads, with connections spread evenly across them.
If there is a worker thread pool, then when a connection has I/O to perform, the SelectorThread removes it from selection by clearing its interest ops and schedules the I/O on a worker thread. When the work completes, the connection is placed on the update queue to have its interest ops restored so selection can resume. If there is no worker thread pool, the SelectorThread performs the I/O directly.
We will look at three operations of SelectorThread. They run inside an infinite while loop that terminates when the stopped flag is set to true.
In each pass of the loop, the thread select()s over its connections and dispatches any ready I/O as work requests, then registers any newly accepted connections and applies any pending interest-ops updates from the queue:
- select(): selects over the connections and dispatches ready I/O work requests
- processAcceptedConnections(): handles connections newly assigned by the accept thread: (1) registers the new connection with the selector; (2) wraps it in a NIOServerCnxn and registers that with the NIOServerCnxnFactory
- processInterestOpsUpdateRequests(): updates the interest ops of the connections queued in updateQueue
14.6.2 Handling accepted connections: processAcceptedConnections
Let's look in detail at how processAcceptedConnections handles acceptable connections:
```java
private void processAcceptedConnections() {
    SocketChannel accepted;
    while (!stopped && (accepted = acceptedQueue.poll()) != null) {
        SelectionKey key = null;
        try {
            // Register OP_READ on the SocketChannel so read requests can be received
            key = accepted.register(selector, SelectionKey.OP_READ);
            NIOServerCnxn cnxn = createConnection(accepted, key, this);
            // Attach the cnxn to the key; it is used later when handling the
            // connection and reading I/O data
            key.attach(cnxn);
            // Cache connections from the same client IP in NIOServerCnxnFactory's
            // ipMap so the per-client connection limit can be enforced; if one
            // client opens too many connections, a "Too many connections" error
            // is raised and the accept is refused
            addCnxn(cnxn);
        } catch (IOException e) {
            // register, createConnection
            cleanupSelectionKey(key);
            fastCloseSock(accepted);
        }
    }
}
```
After OP_READ is registered on the SocketChannel, the connection object is created as follows.
14.6.2.1 The NIOServerCnxn constructor
```java
public NIOServerCnxn(ZooKeeperServer zk, SocketChannel sock, SelectionKey sk,
                     NIOServerCnxnFactory factory, SelectorThread selectorThread) throws IOException {
    super(zk);
    this.sock = sock;
    this.sk = sk;
    this.factory = factory;
    this.selectorThread = selectorThread;
    if (this.factory.login != null) {
        this.zooKeeperSaslServer = new ZooKeeperSaslServer(factory.login);
    }
    // Disable Nagle's algorithm: no buffering to delay sends
    sock.socket().setTcpNoDelay(true);
    // SO_LINGER can also be used to reduce the number of TIME_WAIT sockets:
    // enabling it with a timeout of 0 makes an active close skip the FIN,
    // put the connection straight into the CLOSED state, clear the socket's
    // send and receive buffers, and send an RST to the peer. The first
    // argument is whether to enable SO_LINGER; the second is the linger time.
    sock.socket().setSoLinger(false, -1);
    InetAddress addr = ((InetSocketAddress) sock.socket().getRemoteSocketAddress()).getAddress();
    // Record the remote IP address as auth info
    addAuthInfo(new Id("ip", addr.getHostAddress()));
    this.sessionTimeout = factory.sessionlessCnxnTimeout;
}
```
14.6.3 Updating interest ops for connections in updateQueue: processInterestOpsUpdateRequests
The processInterestOpsUpdateRequests() method:
As noted earlier, a connection's event subscription is suspended while its I/O is being handled; once the I/O completes, the selector thread takes the connections queued in updateQueue and re-subscribes their interestOps. A sketch of what this method does is shown below.
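The section doesn't quote the method body, so here is a short sketch of it, paraphrased from the ZooKeeper 3.6.x source rather than copied verbatim (updateQueue holds the SelectionKeys whose interest ops need restoring):
```java
// Paraphrased from NIOServerCnxnFactory.SelectorThread (ZooKeeper 3.6.x).
private void processInterestOpsUpdateRequests() {
    SelectionKey key;
    while (!stopped && (key = updateQueue.poll()) != null) {
        if (!key.isValid()) {
            cleanupSelectionKey(key);
            continue;
        }
        NIOServerCnxn cnxn = (NIOServerCnxn) key.attachment();
        if (cnxn.isSelectable()) {
            // Restore the ops the connection wants to listen for
            key.interestOps(cnxn.getInterestOps());
        }
    }
}
```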
14.6.4 Handling I/O events
14.6.4.1 IOWorkRequest
An IOWorkRequest handles an I/O event: when a SocketChannel has data to read, the worker thread calls NIOServerCnxn.doIO() to perform the read. A sketch of its doWork() follows.
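Here is a condensed paraphrase of IOWorkRequest.doWork() from the 3.6.x source; error handling and shutdown checks are trimmed:
```java
// Condensed paraphrase of IOWorkRequest.doWork() (ZooKeeper 3.6.x).
public void doWork() throws InterruptedException {
    if (!key.isValid()) {
        selectorThread.cleanupSelectionKey(key);
        return;
    }
    if (key.isReadable() || key.isWritable()) {
        // Perform the actual socket read/write
        cnxn.doIO(key);
        if (!key.isValid()) {
            selectorThread.cleanupSelectionKey(key);
            return;
        }
        // Refresh the connection's expiration time
        touchCnxn(cnxn);
    }
    // Mark the connection as ready for selection again and ask the selector
    // thread to restore its interest ops via the update queue
    cnxn.enableSelectable();
    if (!selectorThread.addInterestOpsUpdateRequest(key)) {
        // queue rejected the key (e.g. shutdown in progress):
        // the connection gets closed (details trimmed in this sketch)
    }
}
```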
Message framing (sticky and split packets)
The awkward part of handling reads is that messages sent over TCP can arrive stuck together or split apart. To solve this, ZooKeeper's wire protocol divides each message into three parts:
- the combined length of the request header and request body (4 bytes)
- the request header
- the request body
Note: (1) the request header and body subdivide further, but we won't dig into that here; all that matters is that the first 4 bytes of a request carry the length of the header plus body. (2) We refer to the header plus body as the payload.
The 4-byte length field prepended to each message gives the length of everything after the length field itself, so the server can split or reassemble sticky and fragmented packets into complete messages. NIOServerCnxn reads data as follows (see the sketch after this list):
1. NIOServerCnxn has two buffer fields: lenBuffer, a 4-byte buffer for reading the length field, and incomingBuffer, which initially aliases lenBuffer but, once the length has been read, is reallocated with the right capacity to hold the payload.
2. incomingBuffer is sized according to the length read from the message.
3. Bytes are read into incomingBuffer until it is full (since step 2 sized it to exactly the message length, a full incomingBuffer holds exactly one message).
4. The message is processed.
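A minimal, generic sketch of this length-prefixed framing under plain NIO; the field names mirror NIOServerCnxn's, but the logic is simplified and is not the literal doIO code:
```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;

// Sketch of the lenBuffer/incomingBuffer framing pattern.
class FramedReader {
    private final ByteBuffer lenBuffer = ByteBuffer.allocate(4);
    private ByteBuffer incomingBuffer = lenBuffer; // initially aliases lenBuffer

    /** Returns one complete payload, or null if more bytes are still needed. */
    ByteBuffer readFrame(SocketChannel sock) throws IOException {
        if (sock.read(incomingBuffer) < 0) {
            throw new IOException("peer closed the connection");
        }
        if (incomingBuffer.remaining() > 0) {
            return null; // partial read ("split packet"); wait for more data
        }
        incomingBuffer.flip();
        if (incomingBuffer == lenBuffer) {
            // Just finished the 4-byte length field: allocate a buffer of
            // exactly the payload length and keep reading into it
            int len = incomingBuffer.getInt();
            lenBuffer.clear();
            incomingBuffer = ByteBuffer.allocate(len);
            return null;
        }
        // incomingBuffer is full: it holds exactly one complete payload
        ByteBuffer payload = incomingBuffer;
        incomingBuffer = lenBuffer; // reset to read the next length field
        return payload;
    }
}
```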
14.6.4.2 NIOServerCnxn.doIO
For the details, see this post:
https://blog.csdn.net/jpf254/article/details/80792086
ZooKeeperServer handles connect requests in processConnectRequest.
Further reading:
- https://www.cnblogs.com/Benjious/p/11462064.html
On sessions:
- https://my.oschina.net/anxiaole/blog/3217373
- https://segmentfault.com/a/1190000022193168
14.6.4.3 Handling connect requests: processConnectRequest
It is invoked like this:
```java
zkServer.processConnectRequest(this, incomingBuffer);
```
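The section doesn't reproduce the method body, so here is a heavily abridged outline of what processConnectRequest does, paraphrased from the 3.6.x source rather than quoted; validation, read-only handling, and logging are omitted:
```java
// Heavily abridged paraphrase of ZooKeeperServer.processConnectRequest
// (ZooKeeper 3.6.x).
public void processConnectRequest(ServerCnxn cnxn, ByteBuffer incomingBuffer) throws IOException {
    BinaryInputArchive bia = BinaryInputArchive.getArchive(new ByteBufferInputStream(incomingBuffer));
    ConnectRequest connReq = new ConnectRequest();
    connReq.deserialize(bia, "connect");

    // Negotiate the session timeout within the server's min/max bounds
    int sessionTimeout = connReq.getTimeOut();
    sessionTimeout = Math.max(sessionTimeout, getMinSessionTimeout());
    sessionTimeout = Math.min(sessionTimeout, getMaxSessionTimeout());
    cnxn.setSessionTimeout(sessionTimeout);

    long sessionId = connReq.getSessionId();
    byte[] passwd = connReq.getPasswd();
    if (sessionId == 0) {
        // Fresh connection: create a brand-new session
        createSession(cnxn, passwd, sessionTimeout);
    } else {
        // Client reconnecting with an existing session: validate and reopen it
        reopenSession(cnxn, sessionId, passwd, sessionTimeout);
    }
}
```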
14.6.4.4 Tracking session expiration
```java
// Track the session's expiration time
@Override
public synchronized boolean trackSession(long id, int sessionTimeout) {
    boolean added = false;
    SessionImpl session = sessionsById.get(id);
    if (session == null) {
        session = new SessionImpl(id, sessionTimeout);
    }
    // findbugs2.0.3 complains about get after put.
    // long term strategy would be use computeIfAbsent after JDK 1.8
    // Store the sessionId-to-session mapping in the local sessionsById map
    SessionImpl existedSession = sessionsById.putIfAbsent(id, session);
    if (existedSession != null) {
        session = existedSession;
    } else {
        added = true;
        LOG.debug("Adding session 0x{}", Long.toHexString(id));
    }
    if (LOG.isTraceEnabled()) {
        String actionStr = added ? "Adding" : "Existing";
        ZooTrace.logTraceMessage(LOG, ZooTrace.SESSION_TRACE_MASK,
            "SessionTrackerImpl --- " + actionStr + " session 0x" + Long.toHexString(id) + " " + sessionTimeout);
    }
    updateSessionExpiry(session, sessionTimeout);
    return added;
}

// Store the session's expiration time in sessionExpiryQueue
private void updateSessionExpiry(SessionImpl s, int timeout) {
    logTraceTouchSession(s.sessionId, timeout, "");
    sessionExpiryQueue.update(s, timeout);
}
```
sessionExpiryQueue's update method buckets expiration times in units of tickTime:
```java
public Long update(E elem, int timeout) {
    Long prevExpiryTime = elemMap.get(elem);
    long now = Time.currentElapsedTime();
    Long newExpiryTime = roundToNextInterval(now + timeout);
    if (newExpiryTime.equals(prevExpiryTime)) {
        // No change, so nothing to update
        return null;
    }
    // First add the elem to the new expiry time bucket in expiryMap.
    // expiryMap maps a rounded expiry time to the set of elements expiring then
    Set<E> set = expiryMap.get(newExpiryTime);
    if (set == null) {
        // Construct a ConcurrentHashSet using a ConcurrentHashMap
        set = Collections.newSetFromMap(new ConcurrentHashMap<E, Boolean>());
        // Put the new set in the map, but only if another thread
        // hasn't beaten us to it
        Set<E> existingSet = expiryMap.putIfAbsent(newExpiryTime, set);
        if (existingSet != null) {
            set = existingSet;
        }
    }
    // Add the element to its new expiry bucket
    set.add(elem);
    // Map the elem to the new expiry time. If a different previous
    // mapping was present, clean up the previous expiry bucket so stale
    // entries don't linger
    prevExpiryTime = elemMap.put(elem, newExpiryTime);
    if (prevExpiryTime != null && !newExpiryTime.equals(prevExpiryTime)) {
        Set<E> prevSet = expiryMap.get(prevExpiryTime);
        if (prevSet != null) {
            prevSet.remove(elem);
        }
    }
    return newExpiryTime;
}
```
14.6.4.5 Submitting the session
ZooKeeperServer submits the session request via submitRequest:
```java
public void submitRequest(Request si) {
    enqueueRequest(si);
}

public void enqueueRequest(Request si) {
    if (requestThrottler == null) {
        synchronized (this) {
            try {
                // Since all requests are passed to the request
                // processor it should wait for setting up the request
                // processor chain. The state will be updated to RUNNING
                // after the setup.
                while (state == State.INITIAL) {
                    wait(1000);
                }
            } catch (InterruptedException e) {
                LOG.warn("Unexpected interruption", e);
            }
            if (requestThrottler == null) {
                throw new RuntimeException("Not started");
            }
        }
    }
    requestThrottler.submitRequest(si);
}

public void submitRequest(Request request) {
    if (stopping) {
        LOG.debug("Shutdown in progress. Request cannot be processed");
        dropRequest(request);
    } else {
        submittedRequests.add(request);
    }
}
```
submittedRequests is a LinkedBlockingQueue of pending requests.
So how are the requests placed on this queue processed?
ZooKeeper uses a queue-plus-asynchronous model. The request chain is as follows (see the sketch after this list):
- Submit the request: RequestThrottler.run() -> ZooKeeperServer.submitRequestNow(Request si)
- Pre-process the request: PrepRequestProcessor.processRequest(Request request)
- Persist the request: SyncRequestProcessor.run()
- Execute the request's business logic: FinalRequestProcessor.processTxn()
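For reference, this is how a standalone ZooKeeperServer wires that chain together, paraphrased from setupRequestProcessors() in the 3.6.x source:
```java
// Paraphrased from ZooKeeperServer.setupRequestProcessors() (standalone mode,
// ZooKeeper 3.6.x): Prep -> Sync -> Final, each stage handing off to the next.
protected void setupRequestProcessors() {
    RequestProcessor finalProcessor = new FinalRequestProcessor(this);
    RequestProcessor syncProcessor = new SyncRequestProcessor(this, finalProcessor);
    ((SyncRequestProcessor) syncProcessor).start();
    firstProcessor = new PrepRequestProcessor(this, syncProcessor);
    ((PrepRequestProcessor) firstProcessor).start();
}
```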
14.6.4.6 Submitting the request for processing
We will come back to the details of individual requests later; at this point we have finished the readConnectRequest flow.
Next, let's look at how ZooKeeper handles read requests.
Read-request handling starts in NIOServerCnxn's readRequest(),
which calls ZooKeeperServer's processPacket method.
How is a packet processed? See the following code:
```java
public void processPacket(ServerCnxn cnxn, ByteBuffer incomingBuffer) throws IOException {
    // We have the request, now process and setup for next
    // Deserialize the header with jute
    InputStream bais = new ByteBufferInputStream(incomingBuffer);
    BinaryInputArchive bia = BinaryInputArchive.getArchive(bais);
    RequestHeader h = new RequestHeader();
    h.deserialize(bia, "header");

    // Need to increase the outstanding request count first, otherwise
    // there might be a race condition that it enabled recv after
    // processing request and then disabled when check throttling.
    //
    // Be aware that we're actually checking the global outstanding
    // request before this request.
    //
    // It's fine if the IOException thrown before we decrease the count
    // in cnxn, since it will close the cnxn anyway.
    cnxn.incrOutstandingAndCheckThrottle(h);

    // Through the magic of byte buffers, txn will not be pointing
    // to the start of the txn. Create a new read-only buffer starting
    // at the current read position.
    incomingBuffer = incomingBuffer.slice();
    if (h.getType() == OpCode.auth) {
        // This is an authentication request
        LOG.info("got auth packet {}", cnxn.getRemoteSocketAddress());
        AuthPacket authPacket = new AuthPacket();
        ByteBufferInputStream.byteBuffer2Record(incomingBuffer, authPacket);
        String scheme = authPacket.getScheme();
        ServerAuthenticationProvider ap = ProviderRegistry.getServerProvider(scheme);
        Code authReturn = KeeperException.Code.AUTHFAILED;
        if (ap != null) {
            try {
                // handleAuthentication may close the connection, to allow the client to choose
                // a different server to connect to. Authenticate the client.
                authReturn = ap.handleAuthentication(
                    new ServerAuthenticationProvider.ServerObjs(this, cnxn), authPacket.getAuth());
            } catch (RuntimeException e) {
                LOG.warn("Caught runtime exception from AuthenticationProvider: {}", scheme, e);
                authReturn = KeeperException.Code.AUTHFAILED;
            }
        }
        if (authReturn == KeeperException.Code.OK) {
            // Authentication succeeded: reply with a success message
            LOG.debug("Authentication succeeded for scheme: {}", scheme);
            LOG.info("auth success {}", cnxn.getRemoteSocketAddress());
            ReplyHeader rh = new ReplyHeader(h.getXid(), 0, KeeperException.Code.OK.intValue());
            cnxn.sendResponse(rh, null, null);
        } else {
            // Authentication failed: reply with a failure message and close the connection
            if (ap == null) {
                LOG.warn("No authentication provider for scheme: {} has {}", scheme, ProviderRegistry.listProviders());
            } else {
                LOG.warn("Authentication failed for scheme: {}", scheme);
            }
            // send a response...
            ReplyHeader rh = new ReplyHeader(h.getXid(), 0, KeeperException.Code.AUTHFAILED.intValue());
            cnxn.sendResponse(rh, null, null);
            // ... and close connection
            cnxn.sendBuffer(ServerCnxnFactory.closeConn);
            cnxn.disableRecv();
        }
        return;
    } else if (h.getType() == OpCode.sasl) {
        // Handle SASL authentication
        processSasl(incomingBuffer, cnxn, h);
    } else {
        if (shouldRequireClientSaslAuth() && !hasCnxSASLAuthenticated(cnxn)) {
            // SASL auth is required but this connection hasn't authenticated:
            // close it directly
            ReplyHeader replyHeader = new ReplyHeader(h.getXid(), 0, Code.SESSIONCLOSEDREQUIRESASLAUTH.intValue());
            cnxn.sendResponse(replyHeader, null, "response");
            cnxn.sendCloseSession();
            cnxn.disableRecv();
        } else {
            // An ordinary request
            Request si = new Request(cnxn, cnxn.getSessionId(), h.getXid(), h.getType(), incomingBuffer, cnxn.getAuthInfo());
            int length = incomingBuffer.limit();
            // Check whether this is a "large" request; the threshold is set by
            // the zookeeper.largeRequestThreshold property
            if (isLargeRequest(length)) {
                // checkRequestSize will throw IOException if request is rejected.
                // For large transfers, the maximum request size (default 100 KB,
                // configured via zookeeper.largeRequestMaxBytes) is enforced to
                // protect the JVM heap from overflowing
                checkRequestSizeWhenMessageReceived(length);
                si.setLargeRequestSize(length);
            }
            si.setOwner(ServerCnxn.me);
            // Submit the packet request; from here on, processing is the same
            // as for a connect request. See submitRequest for the details
            submitRequest(si);
        }
    }
}
```
In the accept thread's run(), selector.select() is executed and doAccept() is called to accept client connections and add them to SelectorThread.acceptedQueue.
selector thread
```java
@Override
public void run() {
    try {
        while (!stopped) {
            try {
                // 1. select() for ready I/O events and hand them to worker threads
                select();
                // 2. Handle connections newly assigned by the accept thread:
                //    (1) register the new connection with the selector;
                //    (2) wrap it in a NIOServerCnxn and register that with NIOServerCnxnFactory
                processAcceptedConnections();
                // 3. Update the interest ops of connections queued in updateQueue
                processInterestOpsUpdateRequests();
            } catch (RuntimeException e) {
                LOG.warn("Ignoring unexpected runtime exception", e);
            } catch (Exception e) {
                LOG.warn("Ignoring unexpected exception", e);
            }
        }
        // Clean up: close every connection still waiting on the selector...
    } finally {
        // ...cleanup work
    }
}
```
The selector thread's run() does three main things:
1. Calls select() to pick up ready I/O events and hand them to worker threads (before handing off, key.interestOps(0) is called).
2. Handles connections newly assigned by the accept thread:
(1) registers the new connection with the selector;
(2) wraps it in a NIOServerCnxn and registers that with the NIOServerCnxnFactory.
3. Applies the pending interest-ops updates queued in updateQueue.
worker thread
ZooKeeper manages the group of worker threads through WorkerService, which has two modes:
| Mode | Description | Typical use | Implementation |
| --- | --- | --- | --- |
| Assignable-thread mode | A task is pinned to a specific thread; use this when a series of tasks must complete in order, by pinning them all to the same thread | A series of requests within one session | Creates N ExecutorServices, each containing exactly one thread |
| Non-assignable mode | WorkerService picks an arbitrary thread for each submitted task; use this when tasks have no ordering requirement | Performing network I/O | Creates 1 ExecutorService containing N threads |
Since the network I/O tasks of different connections have no ordering requirement, the WorkerService used by NIOServerCnxnFactory runs in non-assignable mode.
```java
/**
 * Schedule work to be done by the thread assigned to this id. Thread
 * assignment is a single mod operation on the number of threads. If a
 * worker thread pool is not being used, work is done directly by
 * this thread.
 *
 * The workRequest is assigned to a thread by taking the id modulo the
 * thread count. If there are no worker threads (numWorkerThreads=0), a
 * ScheduledWorkRequest thread is started for the task and the current
 * thread blocks until it completes.
 *
 * @param workRequest the I/O request to process
 * @param id selects which thread processes the workRequest
 */
public void schedule(WorkRequest workRequest, long id) {
    if (stopped) {
        workRequest.cleanup();
        return;
    }
    ScheduledWorkRequest scheduledWorkRequest = new ScheduledWorkRequest(workRequest);
    // If we have a worker thread pool, use that;
    // otherwise, do the work directly.
    int size = workers.size();
    if (size > 0) {
        try {
            // make sure to map negative ids as well to [0, size-1]
            int workerNum = ((int) (id % size) + size) % size;
            ExecutorService worker = workers.get(workerNum);
            worker.execute(scheduledWorkRequest);
        } catch (RejectedExecutionException e) {
            LOG.warn("ExecutorService rejected execution", e);
            workRequest.cleanup();
        }
    } else {
        // When there is no worker thread pool, do the work directly
        // and wait for its completion
        scheduledWorkRequest.start();
        try {
            scheduledWorkRequest.join();
        } catch (InterruptedException e) {
            LOG.warn("Unexpected exception", e);
            Thread.currentThread().interrupt();
        }
    }
}
```
When introducing worker threads above we said that "if the worker thread count is 0, the selector thread performs the I/O directly", but the source shows otherwise: if there are no worker threads, a new thread is started for every network I/O task and the calling thread blocks until it finishes. That is a waste of resources: if you are going to block until the I/O completes anyway, why start a separate thread at all? It is probably legacy code, or groundwork for a future extension; in any case, never set the worker thread count to 0.
Let's continue with how ScheduledWorkRequest processes the network I/O.
Setting aside the robustness code, it does three main things (see the sketch after this list):
1. Calls NIOServerCnxn.doIO(), telling the NIOServerCnxn connection object to perform and process the I/O read/write.
2. Updates the connection's expiration time.
3. Once the I/O is done, sets the selectable flag and adds the socketChannel to the selector thread's updateQueue, whose purpose was explained earlier.
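For completeness, here is a paraphrase of ScheduledWorkRequest.run() from WorkerService in the 3.6.x source; it simply guards and delegates to the WorkRequest (the three steps above live in IOWorkRequest.doWork()):
```java
// Paraphrased from WorkerService.ScheduledWorkRequest (ZooKeeper 3.6.x).
private class ScheduledWorkRequest extends ZooKeeperThread {

    private final WorkRequest workRequest;

    ScheduledWorkRequest(WorkRequest workRequest) {
        super("ScheduledWorkRequest");
        this.workRequest = workRequest;
    }

    @Override
    public void run() {
        try {
            // Check if stopped while the request was sitting on the queue
            if (stopped) {
                workRequest.cleanup();
                return;
            }
            workRequest.doWork();
        } catch (Exception e) {
            LOG.warn("Unexpected exception", e);
            workRequest.cleanup();
        }
    }
}
```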
When the selector thread handles a connection received from the accept thread, besides registering the new connection with the selector, it also wraps the connection in a NIOServerCnxn and registers that with the NIOServerCnxnFactory. NIOServerCnxn encapsulates a client connection, and worker threads call NIOServerCnxn.doIO() to process its network I/O. For details see "ZooKeeper - the client connection ServerCnxn: NIOServerCnxn".
14.7 ConnectionExpirerThread
This thread cleans up expired connections. Its main method:
```java
@Override
public void run() {
    try {
        while (!stopped) {
            long waitTime = cnxnExpiryQueue.getWaitTime();
            if (waitTime > 0) {
                Thread.sleep(waitTime);
                continue;
            }
            // Close every connection whose expiry bucket has come due
            for (NIOServerCnxn conn : cnxnExpiryQueue.poll()) {
                conn.close();
            }
        }
    } catch (InterruptedException e) {
        LOG.info("ConnnectionExpirerThread interrupted");
    }
}
```
For how this thread works, see "ZooKeeper - the expiry-cleanup strategy for connections and sessions (ExpiryQueue)".
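The bucketing that makes this cheap is ExpiryQueue rounding every expiry up to the next interval boundary, so all connections expiring in the same interval are polled together; a paraphrase of the relevant helper from the 3.6.x source:
```java
// Paraphrased from ExpiryQueue (ZooKeeper 3.6.x): expiry times are rounded up
// to the next multiple of expirationInterval, so elements expiring in the
// same interval share one bucket in expiryMap.
private long roundToNextInterval(long time) {
    return (time / expirationInterval + 1) * expirationInterval;
}
```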
14.8 NettyServerCnxnFactory
Having covered the NIO connector in detail, we can now compare the two implementations.
NettyServerCnxnFactory does its network I/O with Netty, but it uses Netty 3.x. Its approach matches Netty 4.x, but the APIs differ a great deal, so rather than digging deeply into NettyServerCnxnFactory we will just summarize how it differs from NIOServerCnxnFactory:
| Aspect | NIOServerCnxnFactory | NettyServerCnxnFactory |
| --- | --- | --- |
| accept events | starts 1 accept thread | the boss group handles accept events; by default 1 thread |
| select() | starts selector threads | if handlers are added via addLast(EventExecutorGroup, ChannelHandler...), the handler processes I/O events on that EventExecutorGroup |
| network I/O | starts worker threads | starts a worker group to handle network I/O; by default 2 * numCores threads |
| read events | handled by calling NIOServerCnxn.doIO() on a worker thread | handled in a handler |
| sticky/split packets | solved with lenBuffer and incomingBuffer; the code is quite involved | just insert a framing handler |
| write events | the thread running FinalRequestProcessor.processRequest() communicates with the worker thread via NIOServerCnxn.outgoingBuffers, and the worker thread batch-writes | Netty supports asynchronous writes natively: on an EventLoop thread, pending data goes into the ChannelOutboundBuffer; off the EventLoop thread, a write task is queued onto the EventLoop |
| direct memory | uses ThreadLocal direct buffers | I don't remember exactly how Netty manages direct memory, but it supports it and it is convenient to use |
| closing connections | starts a connection expiration thread to manage connections | connections are handled in a handler |
Note: the comparison above is between Netty 4.x and NIOServerCnxnFactory; since ZooKeeper uses Netty 3.x, its NettyServerCnxnFactory contains some dead code, such as the code handling sticky/split packets.
The comparison makes clear how much more convenient it is to handle network I/O with Netty than to hand-roll it on Java NIO. Netty for the win!
Summary
To recap, three queues are used for thread communication:
- SelectorThread.acceptedQueue: accept thread <-> selector thread
- SelectorThread.updateQueue: worker thread <-> selector thread
- NIOServerCnxn.outgoingBuffers: worker thread <-> request-processing threads