Zookeeper-watcher机制源码分析(一)
Watcher的基本流程
ZooKeeper 的 Watcher 機制,總的來說可以分為三個過程:客戶端注冊 Watcher、服務器處理 Watcher 和客戶端回調 Watcher
客戶端注冊watcher有3種方式,getData、exists、getChildren;以如下代碼為例來分析整個觸發機制的原理
| ZooKeeper zookeeper=new ZooKeeper(“192.168.11.152:2181”,4000,new Watcher(){ public void processor(WatchedEvent event){ ?System.out.println(“event.type”); } }); ? zookeeper.create(“/mic”,”0”.getByte(),ZooDefs.Ids.?OPEN_ACL_UNSAFE,CreateModel.?PERSISTENT); //創建節點 ? zookeeper.exists(“/mic”,true); //注冊監聽 ? zookeeper.setData(“/mic”, “1”.getByte(),-1) ;?//修改節點的值觸發監聽 ? |
?
ZooKeeper?API的初始化過程
| ZooKeeper zookeeper=new ZooKeeper(“192.168.11.152:2181”,4000,new Watcher(){ public void processor(WatchedEvent event){ ?System.out.println(“event.type”); } }); |
在創建一個?ZooKeeper 客戶端對象實例時,我們通過new?Watcher()向構造方法中傳入一個默認的 Watcher,?這個?Watcher 將作為整個 ZooKeeper會話期間的默認 Watcher,會一直被保存在客戶端 ZKWatchManager 的 defaultWatcher 中;代碼如下
| ????public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, ????????????boolean canBeReadOnly, HostProvider aHostProvider, ????????????ZKClientConfig clientConfig) throws IOException { ????????LOG.info("Initiating client connection, connectString=" + connectString ????????????????+ " sessionTimeout=" + sessionTimeout + " watcher=" + watcher); ? ????????if (clientConfig == null) { ????????????clientConfig = new ZKClientConfig(); ????????} ????????this.clientConfig = clientConfig; ????????watchManager = defaultWatchManager(); ????????watchManager.defaultWatcher = watcher; ?--在這里將watcher設置到ZKWatchManager ????????ConnectStringParser connectStringParser = new ConnectStringParser( ????????????????connectString); ????????hostProvider = aHostProvider; ????????--初始化了ClientCnxn,并且調用cnxn.start()方法 ????????cnxn = new ClientCnxn(connectStringParser.getChrootPath(), ????????????????hostProvider, sessionTimeout, this, watchManager, ????????????????getClientCnxnSocket(), canBeReadOnly); ????????cnxn.start(); ????} |
ClientCnxn:是Zookeeper客戶端和Zookeeper服務器端進行通信和事件通知處理的主要類,它內部包含兩個類,
1. SendThread ?:負責客戶端和服務器端的數據通信,?也包括事件信息的傳輸
2. EventThread?: ?主要在客戶端回調注冊的Watchers進行通知處理
ClientCnxn初始化
| ??public ClientCnxn(String chrootPath, HostProvider hostProvider, int sessionTimeout, ZooKeeper zooKeeper, ????????????ClientWatchManager watcher, ClientCnxnSocket clientCnxnSocket, ????????????long sessionId, byte[] sessionPasswd, boolean canBeReadOnly) { ????????this.zooKeeper = zooKeeper; ????????this.watcher = watcher; ????????this.sessionId = sessionId; ????????this.sessionPasswd = sessionPasswd; ????????this.sessionTimeout = sessionTimeout; ????????this.hostProvider = hostProvider; ????????this.chrootPath = chrootPath; ? ????????connectTimeout = sessionTimeout / hostProvider.size(); ????????readTimeout = sessionTimeout * 2 / 3; ????????readOnly = canBeReadOnly; ? ????????sendThread = new SendThread(clientCnxnSocket); ?--初始化sendThread ????????eventThread = new EventThread(); ???????????????--初始化eventThread ????????this.clientConfig=zooKeeper.getClientConfig(); ????} ? ????public void start() { --啟動兩個線程 ????????sendThread.start(); ????????eventThread.start(); ????} |
?
客戶端通過exists注冊監聽
| zookeeper.exists(“/mic”,true); //注冊監聽 |
通過exists方法來注冊監聽,代碼如下
| ????public Stat exists(final String path, Watcher watcher) ????????throws KeeperException, InterruptedException ????{ ????????final String clientPath = path; ????????PathUtils.validatePath(clientPath); ? ????????// the watch contains the un-chroot path ????????WatchRegistration wcb = null; ????????if (watcher != null) { ????????????wcb = new ExistsWatchRegistration(watcher, clientPath);?//構建ExistWatchRegistration ????????} ? ????????final String serverPath = prependChroot(clientPath); ? ????????RequestHeader h = new RequestHeader(); ????????h.setType(ZooDefs.OpCode.exists); ?//設置操作類型為exists ????????ExistsRequest request = new ExistsRequest();??//?構造ExistsRequest ????????request.setPath(serverPath); ????????request.setWatch(watcher != null);??//是否注冊監聽 ????????SetDataResponse response = new SetDataResponse(); ?//設置服務端響應的接收類 //將封裝的RequestHeader、ExistsRequest、SetDataResponse、WatchRegistration添加到發送隊列 ????????ReplyHeader r = cnxn.submitRequest(h, request, response, wcb);? ????????if (r.getErr() != 0) { ????????????if (r.getErr() == KeeperException.Code.NONODE.intValue()) { ????????????????return null; ????????????} ????????????throw KeeperException.create(KeeperException.Code.get(r.getErr()), ????????????????????clientPath); ????????} ????????//返回exists得到的結果(Stat信息) ????????return response.getStat().getCzxid() == -1 ? null : response.getStat(); ????} |
cnxn.submitRequest
| public ReplyHeader submitRequest(RequestHeader h, Record request, ????????????Record response, WatchRegistration watchRegistration, ????????????WatchDeregistration watchDeregistration) ????????????throws InterruptedException { ????????ReplyHeader r = new ReplyHeader(); ????????//將消息添加到隊列,并構造一個Packet傳輸對象 ????????Packet packet = queuePacket(h, r, request, response, null, null, null, null, watchRegistration, watchDeregistration); ????????synchronized (packet) { ????????????while (!packet.finished) {?//在數據包沒有處理完成之前,一直阻塞 ????????????????packet.wait(); ????????????} ????????} ????????return r; ????} |
?
| ????public Packet queuePacket(RequestHeader h, ReplyHeader r, Record request, ????????????Record response, AsyncCallback cb, String clientPath, ????????????String serverPath, Object ctx, WatchRegistration watchRegistration, ????????????WatchDeregistration watchDeregistration) { ????????//將相關傳輸對象轉化成Packet ????????Packet packet = null; ????????packet = new Packet(h, r, request, response, watchRegistration); ????????packet.cb = cb; ????????packet.ctx = ctx; ????????packet.clientPath = clientPath; ????????packet.serverPath = serverPath; ????????packet.watchDeregistration = watchDeregistration; ???????? ????????synchronized (state) { ????????????if (!state.isAlive() || closing) { ????????????????conLossPacket(packet); ????????????} else { ????????????????if (h.getType() == OpCode.closeSession) { ????????????????????closing = true; ????????????????} ????????????????outgoingQueue.add(packet);?//添加到outgoingQueue ????????????} ????????} ????????sendThread.getClientCnxnSocket().packetAdded();//此處是多路復用機制,喚醒Selector,告訴他有數據包添加過來了 ????????return packet; ????} |
在?ZooKeeper 中,Packet 是一個最小的通信協議單元,即數據包。Pakcet 用于進行客戶端與服務端之間的網絡傳輸,任何需要傳輸的對象都需要包裝成一個 Packet 對象。在 ClientCnxn 中?WatchRegistration?也會被封裝到 Pakcet 中,然后由 SendThread 線程調用queuePacket方法把 Packet 放入發送隊列中等待客戶端發送,這又是一個異步過程,分布式系統采用異步通信是一個非常常見的手段
SendThread的發送過程
在初始化連接的時候,zookeeper初始化了兩個線程并且啟動了。接下來我們來分析SendThread的發送過程,因為是一個線程,所以啟動的時候會調用SendThread.run方法
| public void run() { ????????????clientCnxnSocket.introduce(this, sessionId, outgoingQueue); ????????????clientCnxnSocket.updateNow(); ????????????clientCnxnSocket.updateLastSendAndHeard(); ????????????int to; ????????????long lastPingRwServer = Time.currentElapsedTime(); ????????????final int MAX_SEND_PING_INTERVAL = 10000; //10 seconds ????????????while (state.isAlive()) { ????????????????try { ????????????????????if (!clientCnxnSocket.isConnected()) {// 如果沒有連接:發起連接 ????????????????????????// don't re-establish connection if we are closing ????????????????????????if (closing) { ????????????????????????????break; ????????????????????????} ????????????????????????startConnect();?//發起連接 ????????????????????????clientCnxnSocket.updateLastSendAndHeard(); ????????????????????} ? ????????????????????if (state.isConnected()) {?//如果是連接狀態,則處理sasl的認證授權 ????????????????????????// determine whether we need to send an AuthFailed event. ????????????????????????if (zooKeeperSaslClient != null) { ????????????????????????????boolean sendAuthEvent = false; ????????????????????????????if (zooKeeperSaslClient.getSaslState() == ZooKeeperSaslClient.SaslState.INITIAL) { ????????????????????????????????try { ????????????????????????????????????zooKeeperSaslClient.initialize(ClientCnxn.this); ????????????????????????????????} catch (SaslException e) { ???????????????????????????????????LOG.error("SASL authentication with Zookeeper Quorum member failed: " + e); ????????????????????????????????????state = States.AUTH_FAILED; ????????????????????????????????????sendAuthEvent = true; ????????????????????????????????} ????????????????????????????} ????????????????????????????KeeperState authState = zooKeeperSaslClient.getKeeperState(); ????????????????????????????if (authState != null) { ????????????????????????????????if (authState == KeeperState.AuthFailed) { ????????????????????????????????????// An authentication error occurred during authentication with the Zookeeper Server. ????????????????????????????????????state = States.AUTH_FAILED; ????????????????????????????????????sendAuthEvent = true; ????????????????????????????????} else { ????????????????????????????????????if (authState == KeeperState.SaslAuthenticated) { ????????????????????????????????????????sendAuthEvent = true; ????????????????????????????????????} ????????????????????????????????} ????????????????????????????} ? ????????????????????????????if (sendAuthEvent == true) { ????????????????????????????????eventThread.queueEvent(new WatchedEvent( ??????????????????????????????????????Watcher.Event.EventType.None, ??????????????????????????????????????authState,null)); ????????????????????????????} ????????????????????????} ????????????????????????to = readTimeout - clientCnxnSocket.getIdleRecv(); ????????????????????} else { ????????????????????????to = connectTimeout - clientCnxnSocket.getIdleRecv(); ????????????????????} ????????????????????//to,表示客戶端距離timeout還剩多少時間,準備發起ping連接 ????????????????????if (to <= 0) {//表示已經超時了。 ????????????????????????String warnInfo; ????????????????????????warnInfo = "Client session timed out, have not heard from server in " ????????????????????????????+ clientCnxnSocket.getIdleRecv() ????????????????????????????+ "ms" ????????????????????????????+ " for sessionid 0x" ????????????????????????????+ Long.toHexString(sessionId); ????????????????????????LOG.warn(warnInfo); ????????????????????????throw new SessionTimeoutException(warnInfo); ????????????????????} ????????????????????if (state.isConnected()) { ????????????????????????//計算下一次ping請求的時間 ????????????????????????int timeToNextPing = readTimeout / 2 - clientCnxnSocket.getIdleSend() - ???????????????????????? ((clientCnxnSocket.getIdleSend() > 1000) ? 1000 : 0); ????????????????????????//send a ping request either time is due or no packet sent out within MAX_SEND_PING_INTERVAL ????????????????????????if (timeToNextPing <= 0 || clientCnxnSocket.getIdleSend() > MAX_SEND_PING_INTERVAL) { ????????????????????????????sendPing();?//發送ping請求 ????????????????????????????clientCnxnSocket.updateLastSend(); ????????????????????????} else { ????????????????????????????if (timeToNextPing < to) { ????????????????????????????????to = timeToNextPing; ????????????????????????????} ????????????????????????} ????????????????????} ????????????????????// If we are in read-only mode, seek for read/write server ????????????????????if (state == States.CONNECTEDREADONLY) { ????????????????????????long now = Time.currentElapsedTime(); ????????????????????????int idlePingRwServer = (int) (now - lastPingRwServer); ????????????????????????if (idlePingRwServer >= pingRwTimeout) { ????????????????????????????lastPingRwServer = now; ????????????????????????????idlePingRwServer = 0; ????????????????????????????pingRwTimeout = ????????????????????????????????Math.min(2*pingRwTimeout, maxPingRwTimeout); ????????????????????????????pingRwServer(); ????????????????????????} ????????????????????????to = Math.min(to, pingRwTimeout - idlePingRwServer); ????????????????????} ????????????????????調用clientCnxnSocket,發起傳輸 ????????????????????其中 pendingQueue是一個用來存放已經發送、等待回應的Packet隊列, clientCnxnSocket默認使用ClientCnxnSocketNIO(ps:還記得在哪里初始化嗎?在實例化zookeeper的時候) ????????????????????clientCnxnSocket.doTransport(to, pendingQueue, ClientCnxn.this); ????????????????} catch (Throwable e) { ????????????????????if (closing) { ????????????????????????if (LOG.isDebugEnabled()) { ????????????????????????????// closing so this is expected ????????????????????????????LOG.debug("An exception was thrown while closing send thread for session 0x" ????????????????????????????????????+ Long.toHexString(getSessionId()) ????????????????????????????????????+ " : " + e.getMessage()); ????????????????????????} ????????????????????????break; ????????????????????} else { ????????????????????????// this is ugly, you have a better way speak up ????????????????????????if (e instanceof SessionExpiredException) { ????????????????????????????LOG.info(e.getMessage() + ", closing socket connection"); ????????????????????????} else if (e instanceof SessionTimeoutException) { ????????????????????????????LOG.info(e.getMessage() + RETRY_CONN_MSG); ????????????????????????} else if (e instanceof EndOfStreamException) { ????????????????????????????LOG.info(e.getMessage() + RETRY_CONN_MSG); ????????????????????????} else if (e instanceof RWServerFoundException) { ????????????????????????????LOG.info(e.getMessage()); ????????????????????????} else { ????????????????????????????LOG.warn( ????????????????????????????????????"Session 0x" ????????????????????????????????????????????+ Long.toHexString(getSessionId()) ????????????????????????????????????????????+ " for server " ????????????????????????????????????????????+ clientCnxnSocket.getRemoteSocketAddress() ????????????????????????????????????????????+ ", unexpected error" ????????????????????????????????????????????+ RETRY_CONN_MSG, e); ????????????????????????} ????????????????????????// At this point, there might still be new packets appended to outgoingQueue. ????????????????????????// they will be handled in next connection or cleared up if closed. ????????????????????????cleanup(); ????????????????????????if (state.isAlive()) { ????????????????????????????eventThread.queueEvent(new WatchedEvent( ????????????????????????????????????Event.EventType.None, ????????????????????????????????????Event.KeeperState.Disconnected, ????????????????????????????????????null)); ????????????????????????} ????????????????????????clientCnxnSocket.updateNow(); ????????????????????????clientCnxnSocket.updateLastSendAndHeard(); ????????????????????} ????????????????} ????????????} ????????????synchronized (state) { ????????????????// When it comes to this point, it guarantees that later queued ????????????????// packet to outgoingQueue will be notified of death. ????????????????cleanup(); ????????????} ????????????clientCnxnSocket.close(); ????????????if (state.isAlive()) { ????????????????eventThread.queueEvent(new WatchedEvent(Event.EventType.None, ????????????????????????Event.KeeperState.Disconnected, null)); ????????????} ????????????ZooTrace.logTraceMessage(LOG, ZooTrace.getTextTraceLevel(), ????????????????????"SendThread exited loop for session: 0x" ???????????????????????????+ Long.toHexString(getSessionId())); ????????} |
client 和 server的網絡交互
| @Override ????void doTransport(int waitTimeOut, List<Packet> pendingQueue, ClientCnxn cnxn) throws IOException, InterruptedException { ????????try { ????????????if (!firstConnect.await(waitTimeOut, TimeUnit.MILLISECONDS)) { ????????????????return; ????????????} ????????????Packet head = null; ????????????if (needSasl.get()) { ????????????????if (!waitSasl.tryAcquire(waitTimeOut, TimeUnit.MILLISECONDS)) { ????????????????????return; ????????????????} ????????????} else { ? ????????????????//判斷outgoingQueue是否存在待發送的數據包,不存在則直接返回 ????????????????if ((head = outgoingQueue.poll(waitTimeOut, TimeUnit.MILLISECONDS)) == null) { ????????????????????return; ????????????????} ????????????} ????????????// check if being waken up on closing. ????????????if (!sendThread.getZkState().isAlive()) { ????????????????// adding back the patck to notify of failure in conLossPacket(). ????????????????addBack(head); ????????????????return; ????????????} ????????????// channel disconnection happened ????????????if (disconnected.get()) {?//異常流程,channel關閉了,講當前的packet添加到addBack中 ????????????????addBack(head); ????????????????throw new EndOfStreamException("channel for sessionid 0x" ????????????????????????+ Long.toHexString(sessionId) ????????????????????????+ " is lost"); ????????????} ????????????if (head != null) {?//如果當前存在需要發送的數據包,則調用doWrite方法,pendingQueue表示處于已經發送過等待響應的packet隊列 ????????????????doWrite(pendingQueue, head, cnxn); ????????????} ????????} finally { ????????????updateNow(); ????????} ????} |
DoWrite方法
| ????private void doWrite(List<Packet> pendingQueue, Packet p, ClientCnxn cnxn) { ????????updateNow(); ????????while (true) { ????????????if (p != WakeupPacket.getInstance()) { ????????????????if ((p.requestHeader != null) &&?//判斷請求頭以及判斷當前請求類型不是ping或者auth操作 ????????????????????????(p.requestHeader.getType() != ZooDefs.OpCode.ping) && ????????????????????????(p.requestHeader.getType() != ZooDefs.OpCode.auth)) { ????????????????????p.requestHeader.setXid(cnxn.getXid()); ?//設置xid,這個xid用來區分請求類型 ????????????????????synchronized (pendingQueue) { ????????????????????????pendingQueue.add(p);?//將當前的packet添加到pendingQueue隊列中 ????????????????????} ????????????????} ????????????????sendPkt(p);?//將數據包發送出去 ????????????} ????????????if (outgoingQueue.isEmpty()) { ????????????????break; ????????????} ????????????p = outgoingQueue.remove(); ????????} ????} |
sendPkt
| ???private void sendPkt(Packet p) { ????????// Assuming the packet will be sent out successfully. Because if it fails, ????????// the channel will close and clean up queues. ????????p.createBB();?//序列化請求數據 ????????updateLastSend();?//更新最后一次發送updateLastSend ????????sentCount++;?//更新發送次數 ????????channel.write(ChannelBuffers.wrappedBuffer(p.bb));?//通過nio?channel發送字節緩存到服務端 ????} |
?
createBB
| public void createBB() { ????????????try { ????????????????ByteArrayOutputStream baos = new ByteArrayOutputStream(); ????????????????BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos); ????????????????boa.writeInt(-1, "len"); // We'll fill this in later ????????????????if (requestHeader != null) { ????????????????????requestHeader.serialize(boa, "header"); //序列化header頭(requestHeader) ????????????????} ????????????????if (request instanceof ConnectRequest) { ????????????????????request.serialize(boa, "connect"); ????????????????????// append "am-I-allowed-to-be-readonly" flag ????????????????????boa.writeBool(readOnly, "readOnly"); ????????????????} else if (request != null) { ????????????????????request.serialize(boa, "request"); //序列化request(request) ????????????????} ????????????????baos.close(); ????????????????this.bb = ByteBuffer.wrap(baos.toByteArray()); ????????????????this.bb.putInt(this.bb.capacity() - 4); ????????????????this.bb.rewind(); ????????????} catch (IOException e) { ????????????????LOG.warn("Ignoring unexpected exception", e); ????????????} ????????} |
從createBB方法中,我們看到在底層實際的網絡傳輸序列化中,zookeeper只會講requestHeader和request兩個屬性進行序列化,即只有這兩個會被序列化到底層字節數組中去進行網絡傳輸,不會將watchRegistration相關的信息進行網絡傳輸。
總結
用戶調用exists注冊監聽以后,會做幾個事情
?
| cnxn = new ClientCnxn(connectStringParser.getChrootPath(), ????????????????hostProvider, sessionTimeout, this, watchManager, ????????????????getClientCnxnSocket(), canBeReadOnly); | private ClientCnxnSocket getClientCnxnSocket() throws IOException { ????????String clientCnxnSocketName = getClientConfig().getProperty( ????????????????ZKClientConfig.ZOOKEEPER_CLIENT_CNXN_SOCKET); ????????if (clientCnxnSocketName == null) { ????????????clientCnxnSocketName = ClientCnxnSocketNIO.class.getName(); ????????} ????????try { ????????????Constructor<?> clientCxnConstructor = Class.forName(clientCnxnSocketName).getDeclaredConstructor(ZKClientConfig.class); ????????????ClientCnxnSocket clientCxnSocket = (ClientCnxnSocket) clientCxnConstructor.newInstance(getClientConfig()); ????????????return clientCxnSocket; ????????} catch (Exception e) { ????????????IOException ioe = new IOException("Couldn't instantiate " ????????????????????+ clientCnxnSocketName); ????????????ioe.initCause(e); ????????????throw ioe; ????????} ????} |
4.基于第3步,最終會在ClientCnxnSocketNetty方法中執行sendPkt將請求的數據包發送到服務端
對Java技術,架構技術感興趣的同學,歡迎加QQ群619881427,一起學習,相互討論。
群內已經有小伙伴將知識體系整理好(源碼,筆記,PPT,學習視頻),歡迎加群免費領取。
分享給喜歡Java,喜歡編程,有夢想成為架構師的程序員們,希望能夠幫助到你們。
不是Java程序員也沒關系,幫忙轉發給更多朋友!謝謝。
分享一個小技巧點擊閱讀原文也可以輕松獲取到學習資料哦!!
轉載于:https://www.cnblogs.com/xueSpring/p/9379228.html
總結
以上是生活随笔為你收集整理的Zookeeper-watcher机制源码分析(一)的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: linux github中文官网,Git
- 下一篇: Akka-CQRS(16)- gRPC用