Zookeeper 客户端源码吐血总结
目錄
一、幾個重要的類
二、JAVA的基礎知識
三、大致了解
四 從入門到放棄的講解
Code1:ZK
Code2:創建 Zookeeper實例,實例化ClientCnxn,實例化ClientCnxnSocketNIO
Code3:實例化ClientCnxnSocketNIO (which extends ClientCnxnSocket)
Code4:ClientCnxn的具體實例化
Code5:SendThread的具體實例化
Code6:EventThread的具體實例化
Code 7:SendThread核心run流程
Code 8:startConnect()
Code 9: clientCnxnSocket.connect
Code10:registerAndConnect()
Code11:primeConnection()
Code 12:doTransport()
Code13:findSendablePacket()
Code14:IO write
Code15:createBB()
Code 16:IO read
Code 17: readResponse
Code 18:EventThread run:
源碼: Zookeeper 3.4.6.jar(吐血總結)
一、幾個重要的類
1) ZookeeperMain: main函數為入口,由zkCli.sh腳本調用啟動2) ZooKeeper:客戶端入口3) ZooKeeper.SendThread: IO線程4) ZooKeeper.EventThread: 事件處理線程,處理各類消息callback5) ClientCnxn: 客戶端與服務器端交互的主要類6) ClientCnxnSocketNIO:繼承自ClientCnxnSocket,專門處理IO, 利用JAVANIO7) Watcher: 用于監控Znode節點9) WatcherManager:用來管理Watcher,管理了ZK Client綁定的所有Watcher。二、JAVA的基礎知識
1)JAVA多線程 2)JAVANIO: 可參考:http://blog.csdn.net/cnh294141800/article/details/52996819 3)Socket編程(稍微了解即可) 4)JLine: 是一個用來處理控制臺輸入的Java類庫三、大致了解
上圖就是對Zookeeper源碼一個最好的解釋,
(1) Client端發送Request(封裝成Packet)請求到Zookeeper
(2) Zookeeper處理Request并將該請求放入Outgoing Queue(顧名思義,外出隊列,就是讓Zookeeper服務器處理的隊列),
(3) Zookeeper端處理Outgoing Queue,并將該事件移到Pending Queue中
(4) Zookeeper端消費Pending Queue,并調用finishPacket(),生成Event
(5) EventThread線程消費Event事件,并且處理Watcher.
四 從入門到放棄的講解
(1)應用 提供watch實例(new MyWatcher(null))
(2)實例化zookeeper
? 實例化socket,默認使用ClientCnxnSocketNIO
? 實例化ClientCnxn
? 實例化SendThread
? 實例化EventThread
Code1:ZK
Code2:
public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher,boolean canBeReadOnly)throws IOException{…watchManager.defaultWatcher = watcher; // 設置defaultWatcher 為 MyWatcherConnectStringParser connectStringParser = new ConnectStringParser(connectString); // 解析-server 獲取 IP以及PORTHostProvider hostProvider = new StaticHostProvider(connectStringParser.getServerAddresses());cnxn = new ClientCnxn(connectStringParser.getChrootPath(),hostProvider, sessionTimeout, this, watchManager,getClientCnxnSocket(), canBeReadOnly); // 創建 ClientCnxn實例cnxn.start(); // 啟動cnxn中的SendThread and EventThread進程}Code3:實例化ClientCnxnSocketNIO (which extends ClientCnxnSocket)
private static ClientCnxnSocket getClientCnxnSocket() throws IOException {String clientCnxnSocketName = System.getProperty(ZOOKEEPER_CLIENT_CNXN_SOCKET);if (clientCnxnSocketName == null) {clientCnxnSocketName = ClientCnxnSocketNIO.class.getName();}try {return (ClientCnxnSocket) Class.forName(clientCnxnSocketName).newInstance();} catch (Exception e) {IOException ioe = new IOException("Couldn't instantiate "+ clientCnxnSocketName);ioe.initCause(e);throw ioe;}}Code4:ClientCnxn的具體實例化
/* 另一個ClientCnxn構造函數, 可見時sessionId=0
Code5:SendThread的具體實例化
SendThread(ClientCnxnSocket clientCnxnSocket) {super(makeThreadName("-SendThread()"));state = States.CONNECTING; // 將狀態設置為連接狀態(此時還未連接)this.clientCnxnSocket = clientCnxnSocket;setUncaughtExceptionHandler(uncaughtExceptionHandler);setDaemon(true); //設為守護線程}Code6:EventThread的具體實例化
EventThread() {super(makeThreadName("-EventThread"));setUncaughtExceptionHandler(uncaughtExceptionHandler);setDaemon(true);}至此所有對象實例化完成,然后啟動SendThread、EventThread進程
(3)啟動zookeeper
? 啟動SendThread
? 連接服務器
? 產生真正的socket,見ClientCnxnSocketNIO.createSock
? 向select注冊一個OP_CONNECT事件并連接服務器,由于是非阻塞連接,此時有可能并不會立即連上,如果連上就會調用SendThread.primeConnection初始化連接來注冊讀寫事件,否則會在接下來的輪詢select獲取連接事件中處理
? 復位socket的incomingBuffer
? 連接成功后會產生一個connect型的請求發給服務,用于獲取本次連接的sessionid
? 進入循環等待來自應用的請求,如果沒有就根據時間來ping 服務器
? 啟動EventThread
開始進入無限循環,從隊列waitingEvents中獲取事件,如果沒有就阻塞等待
Code 7:SendThread核心run流程
可以對run進行抽象看待,流程如下
先判斷是否連接,沒有連接則調用connect方法進行連接,有連接則直接使用;然后調用doTransport方法進行通信,若連接過程中出現異常,則調用cleanup()方法;最后關閉連接。
public void run() {while (state.isAlive()) { // this != CLOSED && this != AUTH_FAILED; 剛才設置了首次狀態為連接狀態try {//如果還沒連上,則啟動連接程序if (!clientCnxnSocket.isConnected()) { //所有的clientCnxnSocket都是clientCnxnSocketDIO實例//不是首次連接則休息1Sif(!isFirstConnect){ try {Thread.sleep(r.nextInt(1000));} catch (InterruptedException e) {LOG.warn("Unexpected exception", e);}}// don't re-establish connection if we are closingif (closing || !state.isAlive()) {break;}startConnect();// 啟動連接clientCnxnSocket.updateLastSendAndHeard(); //更新Socket最后一次發送以及聽到消息的時間}if (state.isConnected()) {// determine whether we need to send an AuthFailed event.if (zooKeeperSaslClient != null) {...... }// 下一次超時時間to = readTimeout - clientCnxnSocket.getIdleRecv();} else {// 如果還沒連接上 重置當前剩余可連接時間to = connectTimeout - clientCnxnSocket.getIdleRecv();}// 連接超時if (to <= 0) {}// 判斷是否 需要發送Ping心跳包if (state.isConnected()) {sendPing();}// If we are in read-only mode, seek for read/write serverif (state == States.CONNECTEDREADONLY) {}// The most important step. Do real IOclientCnxnSocket.doTransport(to, pendingQueue, outgoingQueue, ClientCnxn.this);} catch (Throwable e) {}}cleanup();...} }Code 8:startConnect()
// 具體實際連接部分
Code 9: clientCnxnSocket.connect
void connect(InetSocketAddress addr) throws IOException {SocketChannel sock = createSock(); // 創建一個非阻塞空SocketChanneltry {registerAndConnect(sock, addr); //注冊并且連接sock到辣個addr } catch (IOException e) {….}initialized = false;/* Reset incomingBuffer*/lenBuffer.clear();incomingBuffer = lenBuffer;} }Code10:registerAndConnect()
void registerAndConnect(SocketChannel sock, InetSocketAddress addr) throws IOException {sockKey = sock.register(selector, SelectionKey.OP_CONNECT); //將socket注冊到selector中boolean immediateConnect = sock.connect(addr); //socket連接服務器if (immediateConnect) {sendThread.primeConnection(); //初始化連接事件} }Code11:primeConnection()
void primeConnection() IOException {LOG.info("Socket connection established to "+ clientCnxnSocket.getRemoteSocketAddress()+ ", initiating session");isFirstConnect = false; // 設置為非首次連接long sessId = (seenRwServerBefore) ? sessionId : 0; // 客戶端默認sessionid為0// 創建連接request lastZxid 代表最新一次的節點ZXIDConnectRequest conReq = new ConnectRequest(0, lastZxid,sessionTimeout, sessId, sessionPasswd); // 線程安全占用outgoing synchronized (outgoingQueue) {…//組合成通訊層的Packet對象,添加到發送隊列,對于ConnectRequest其requestHeader為null outgoingQueue.addFirst(new Packet(null, null, conReq,null, null, readOnly));}//確保讀寫事件都監聽,也就是設置成可讀可寫clientCnxnSocket.enableReadWriteOnly();if (LOG.isDebugEnabled()) {LOG.debug("Session establishment request sent on "+ clientCnxnSocket.getRemoteSocketAddress());}}至此Channelsocket已經成功連接,并且已將連接請求做為隊列放到Outgoing中。此時,需要再回頭看Code7, 也就是一直在循環的SendThread部分。可以看到連接部分成功完成,接下來需要做doTransport()。// CnxnClientSocketNio
Code 12:doTransport()
void doTransport(int waitTimeOut, List<Packet> pendingQueue, LinkedList<Packet> outgoingQueue,ClientCnxn cnxn)throws IOException, InterruptedException {//selectselector.select(waitTimeOut);Set<SelectionKey> selected;synchronized (this) {selected = selector.selectedKeys();}// Everything below and until we get back to the select is// non blocking, so time is effectively a constant. That is// Why we just have to do this once, hereupdateNow();for (SelectionKey k : selected) {SocketChannel sc = ((SocketChannel) k.channel());//如果之前連接沒有立馬連上,則在這里處理OP_CONNECT事件if ((k.readyOps() & SelectionKey.OP_CONNECT) != 0) {if (sc.finishConnect()) {updateLastSendAndHeard();sendThread.primeConnection();}} //如果讀寫就位,則處理之else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) {doIO(pendingQueue, outgoingQueue, cnxn);}}if (sendThread.getZkState().isConnected()) {synchronized(outgoingQueue) {//找到連接Packet并且將他放到隊列頭if (findSendablePacket(outgoingQueue,cnxn.sendThread.clientTunneledAuthenticationInProgress()) != null) {// 將要Channecl設置為可讀enableWrite();}}}selected.clear(); }Code13:findSendablePacket()
private Packet findSendablePacket(LinkedList<Packet> outgoingQueue,boolean clientTunneledAuthenticationInProgress) {synchronized (outgoingQueue) {..// Since client's authentication with server is in progress,// send only the null-header packet queued by primeConnection().// This packet must be sent so that the SASL authentication process// can proceed, but all other packets should wait until// SASL authentication completes.//因為Conn Packet需要發送到SASL authentication進行處理,其他Packet都需要等待直到該處理完成,//Conn Packet必須第一個處理,所以找出它并且把它放到OutgoingQueue頭,也就是requestheader=null的辣個ListIterator<Packet> iter = outgoingQueue.listIterator();while (iter.hasNext()) {Packet p = iter.next();if (p.requestHeader == null) {// We've found the priming-packet. Move it to the beginning of the queue.iter.remove(); outgoingQueue.add(0, p); // 將連接放到outgogingQueue第一個return p;} else {// Non-priming packet: defer it until later, leaving it in the queue// until authentication completes.if (LOG.isDebugEnabled()) {LOG.debug("deferring non-priming packet: " + p +"until SASL authentication completes.");}}}// no sendable packet found.return null;} }然后就是最重要的IO部分:
? 需要處理兩類網絡事件(讀、寫)
Code14:IO write
if (sockKey.isWritable()) {synchronized(outgoingQueue) {// 獲得packetPacket p = findSendablePacket(outgoingQueue,cnxn.sendThread.clientTunneledAuthenticationInProgress());if (p != null) {updateLastSend();// If we already started writing p, p.bb will already existif (p.bb == null) {if ((p.requestHeader != null) &&(p.requestHeader.getType() != OpCode.ping) &&(p.requestHeader.getType() != OpCode.auth)) {//如果不是 連接事件,不是ping 事件,不是 認證時間 p.requestHeader.setXid(cnxn.getXid());}// 序列化p.createBB();}//將數據寫入Channelsock.write(p.bb);// p.bb中如果沒有內容 則表示發送成功if (!p.bb.hasRemaining()) {//發送數+1sentCount++;//將該P從隊列中移除outgoingQueue.removeFirstOccurrence(p);//如果該事件不是連接事件,不是ping事件,不是認證事件, 則將他加入pending隊列中if (p.requestHeader != null&& p.requestHeader.getType() != OpCode.ping&& p.requestHeader.getType() != OpCode.auth) {synchronized (pendingQueue) {pendingQueue.add(p);}}}}if (outgoingQueue.isEmpty()) {// No more packets to send: turn off write interest flag.// Will be turned on later by a later call to enableWrite(),// from within ZooKeeperSaslClient (if client is configured// to attempt SASL authentication), or in either doIO() or// in doTransport() if not.disableWrite();} else if (!initialized && p != null && !p.bb.hasRemaining()) {// On initial connection, write the complete connect request// packet, but then disable further writes until after// receiving a successful connection response. If the// session is expired, then the server sends the expiration// response and immediately closes its end of the socket. If// the client is simultaneously writing on its end, then the// TCP stack may choose to abort with RST, in which case the// client would never receive the session expired event. See// http://docs.oracle.com/javase/6/docs/technotes/guides/net/articles/connection_release.htmldisableWrite();} else {// Just in caseenableWrite();}}}Code15:createBB()
public void createBB() {try {ByteArrayOutputStream baos = new ByteArrayOutputStream();BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);boa.writeInt(-1, "len"); // We'll fill this in later// 如果不是連接事件則設置協議頭if (requestHeader != null) {requestHeader.serialize(boa, "header");}//設置協議體if (request instanceof ConnectRequest) {request.serialize(boa, "connect");// append "am-I-allowed-to-be-readonly" flagboa.writeBool(readOnly, "readOnly");} else if (request != null) {request.serialize(boa, "request");}baos.close();//生成ByteBufferthis.bb = ByteBuffer.wrap(baos.toByteArray());//將bytebuffer的前4個字節修改成真正的長度,總長度減去一個int的長度頭 this.bb.putInt(this.bb.capacity() - 4);//準備給后續讀 讓buffer position = 0this.bb.rewind();} catch (IOException e) {LOG.warn("Ignoring unexpected exception", e);}}Code 16:IO read
if (sockKey.isReadable()) {//先從Channel讀4個字節,代表頭 int rc = sock.read(incomingBuffer);if (rc < 0) {throw new EndOfStreamException("Unable to read additional data from server sessionid 0x"+ Long.toHexString(sessionId)+ ", likely server has closed socket");}if (!incomingBuffer.hasRemaining()) {incomingBuffer.flip();if (incomingBuffer == lenBuffer) {recvCount++;readLength();} //初始化else if (!initialized) {readConnectResult(); // 讀取連接結果enableRead(); // Channel 可讀if (findSendablePacket(outgoingQueue,cnxn.sendThread.clientTunneledAuthenticationInProgress()) != null) {// Since SASL authentication has completed (if client is configured to do so),// outgoing packets waiting in the outgoingQueue can now be sent.enableWrite();}lenBuffer.clear();incomingBuffer = lenBuffer;updateLastHeard();initialized = true;} else { // 處理其他請求sendThread.readResponse(incomingBuffer);lenBuffer.clear();incomingBuffer = lenBuffer;updateLastHeard();}}}還有一個比較關鍵的函數就是readResponse函數,用來消費PendingQueue,處理的消息分為三類
? ping 消息 XID=-2
? auth認證消息 XID=-4
? 訂閱的消息,即各種變化的通知,比如子節點變化、節點內容變化,由服務器推過來的消息 ,獲取到這類消息或通過eventThread.queueEvent將消息推入事件隊列
XID=-1
Code 17: readResponse
void readResponse(ByteBuffer incomingBuffer) throws IOException {ByteBufferInputStream bbis = new ByteBufferInputStream(incomingBuffer);BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);ReplyHeader replyHdr = new ReplyHeader();replyHdr.deserialize(bbia, "header");if (replyHdr.getXid() == -2) {// -2 is the xid for pingsif (LOG.isDebugEnabled()) {LOG.debug("Got ping response for sessionid: 0x"+ Long.toHexString(sessionId)+ " after "+ ((System.nanoTime() - lastPingSentNs) / 1000000)+ "ms");}return;}if (replyHdr.getXid() == -4) {// -4 is the xid for AuthPacket if(replyHdr.getErr() == KeeperException.Code.AUTHFAILED.intValue()) {state = States.AUTH_FAILED; eventThread.queueEvent( new WatchedEvent(Watcher.Event.EventType.None, Watcher.Event.KeeperState.AuthFailed, null) ); }if (LOG.isDebugEnabled()) {LOG.debug("Got auth sessionid:0x"+ Long.toHexString(sessionId));}return;}if (replyHdr.getXid() == -1) {// -1 means notificationif (LOG.isDebugEnabled()) {LOG.debug("Got notification sessionid:0x"+ Long.toHexString(sessionId));}WatcherEvent event = new WatcherEvent();event.deserialize(bbia, "response");// convert from a server path to a client pathif (chrootPath != null) {String serverPath = event.getPath();if(serverPath.compareTo(chrootPath)==0)event.setPath("/");else if (serverPath.length() > chrootPath.length())event.setPath(serverPath.substring(chrootPath.length()));else {LOG.warn("Got server path " + event.getPath()+ " which is too short for chroot path "+ chrootPath);}}WatchedEvent we = new WatchedEvent(event);if (LOG.isDebugEnabled()) {LOG.debug("Got " + we + " for sessionid 0x"+ Long.toHexString(sessionId));}//將事件加入到 event隊列中eventThread.queueEvent( we );return;}結束了IO之后就是對于事件的消費,也就是一開始圖示的右半部分也是接近最后部分啦
Code 18:EventThread run:
public void run() {try {isRunning = true;while (true) {// 獲取事件Object event = waitingEvents.take();if (event == eventOfDeath) {wasKilled = true;} else {//處理事件processEvent(event);}if (wasKilled)synchronized (waitingEvents) {if (waitingEvents.isEmpty()) {isRunning = false;break;}}}} catch (InterruptedException e) {LOG.error("Event thread exiting due to interruption", e);}LOG.info("EventThread shut down");}} }最后就是processEvent了,這個就不貼代碼了(代碼備注的累死了),寫思路。
ProcessEvent:
processEvent 是 EventThread 處理事件核心函數,核心邏輯如下:
1、如果 event instanceof WatcherSetEventPair ,取出 pair 中的 Watchers ,逐個調用 watcher.process(pair.event)
2、否則 event 為 AsyncCallback ,根據 p.response 判斷為哪種響應類型,執行響應的回調 processResult 。
Watcher 和 AsyncCallback 的區別
Watcher: Watcher 是用于監聽節點,session 狀態的,比如 getData 對數據節點 a 設置了 watcher ,那么當 a 的數據內容發生改變時,客戶端會收到 NodeDataChanged 通知,然后進行 watcher 的回調。
AsyncCallback : AsyncCallback 是在以異步方式使用 ZooKeeper API 時,用于處理返回結果的。例如:getData 同步調用的版本是: byte[] getData(String path, boolean watch,Stat stat) ,異步調用的版本是: void getData(String path,Watcher watcher,AsyncCallback.DataCallback cb,Object ctx) ,可以看到,前者是直接返回獲取的結果,后者是通過 AsyncCallback 回調處理結果的。
**接下來就是客戶端發送指令與負責端進行交互比如:
Ls、getChildren、getData等**
參考文獻:
[1] http://www.cnblogs.com/davidwang456/p/5000927.html
[2] http://www.verydemo.com/demo_c89_i33659.html
[3] http://blog.csdn.net/pwlazy/article/details/8000566
[4] http://www.cnblogs.com/ggjucheng/p/3376548.html
[5] http://zookeeper.apache.org/doc/r3.3.6/api/index.html
[6] http://www.tuicool.com/articles/i6vMVze
[7]http://www.ibm.com/developerworks/cn/opensource/os-cn-apache-zookeeper-watcher/
總結
以上是生活随笔為你收集整理的Zookeeper 客户端源码吐血总结的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Zookeeper选举算法( FastL
- 下一篇: 进程与线程的超级简单形象解释