hbase RPCServer源码分析
前置知識: java,nio,多線程
看了幾天的源碼,寫一些自己心得,若有錯誤請指出。
RPCServer的作用:負責創建listener,reader,responser,handler來處理client端的請求。
RPCServer中重要的子類有:Listener,Reader,Call,Connection,Responser
? ? 其中Reader是Listener的子類
listener負責監聽client端的請求,主要做nio操作中的accept操作。
while (iter.hasNext()) {key = iter.next();iter.remove();try {if (key.isValid()) {if (key.isAcceptable())doAccept(key);}} catch (IOException ignored) {}key = null; }
?
與client創建連接,生成新的channel,并將新的channel注冊在reader上。
void doAccept(SelectionKey key) throws IOException, OutOfMemoryError {……SocketChannel channel;……Reader reader = getReader();try {reader.startAdd();SelectionKey readKey = reader.registerChannel(channel); //listener接受的連接注冊在reader上c = getConnection(channel, System.currentTimeMillis());readKey.attach(c);…… }
?
reader負責處理listener傳過來的channel,依次讀取數據,
void doRead(SelectionKey key) throws InterruptedException {int count = 0;Connection c = (Connection)key.attachment();……try {count = c.readAndProcess();} catch (InterruptedException ieo) {…… }
?
這里調用Connection里面的readAndProcess()方法,這個方法的做用是讀取客戶端的數據,存入一個buffer字節數組中,給processRequest()方法進行處理,
processRequest方法:
protected void processRequest(byte[] buf) throws IOException, InterruptedException {……//這里的call構造方法中的參數都是由buf中的數據解析出來的,前面省略的代碼做了這部分工作Call call = new Call(id, this.service, md, header, param, cellScanner, this, responder,totalRequestSize,traceInfo);//這里的scheduler是一個調度器,可以簡單理解為一個線程池的控制器,它初始化時會生成默認大小的線程池,參數可由REGION_SERVER_HANDLER_COUNT來指定//也就是jstack文件中的handler線程,默認是30//dispatch方法會獲取線程池中的一個線程,執行callRunner中的run()方法。run()方法的功能有:查詢結果,并調用sendResponseIfReady()方法來返回數據。scheduler.dispatch(new CallRunner(RpcServer.this, call, userProvider)); }
?
call的run()方法:
public void run() {……//查詢數據,存在resultPair中resultPair = this.rpcServer.call(call.service, call.md, call.param, call.cellScanner,……if (!call.isDelayed() || !call.isReturnValueDelayed()) {Message param = resultPair != null ? resultPair.getFirst() : null;CellScanner cells = resultPair != null ? resultPair.getSecond() : null;call.setResponse(param, cells, errorThrowable, error);}//調用Responser call.sendResponseIfReady();…… }
?
其中rpcServer的call方法為:
public Pair<Message, CellScanner> call(BlockingService service, MethodDescriptor md,Message param, CellScanner cellScanner, long receiveTime, MonitoredRPCHandler status) throws IOException {……//此句進行查詢Message result = service.callBlockingMethod(md, controller, param);……//返回給Call對象return new Pair<Message, CellScanner>(result,controller != null? controller.cellScanner(): null); …… }
?
再詳細點的還沒看。看了這些主要解決了以下幾個疑惑:
?
?
reader的線程數在哪指定生成,handler的線程池在哪維護,監聽連接請求的線程有幾個?responser的線程又有幾個?
listener只有一個,
listener中有一個Reader數組,默認是10,也就是說讀取請求數據的連接池大小為10。
private class Listener extends Thread { ……private Reader[] readers = null;
?
handler的線程池由RPCServer中的scheduler維護,默認是30,
listener監聽到一個請求后,生成對應的channel發送給Reader,然后Reader會為每一個channel創建一個connection,
connection中保存了連接的信息。然后調用connection的方法來讀取請求參數,并生成call對象,這時將調用scheduler,
使用handler線程池(默認30)來查詢數據,(這里就開始并行了),結果存在call對象用,call對象最后再調用responser類的方法,將結果返回給client。
responser只有一個線程,它維護了一個call鏈表,采用非阻塞的方式(這里要說也是并行),依次將call對象送出。
大致過程就是這樣
?
轉載于:https://www.cnblogs.com/quan-qunar/p/4942972.html
總結
以上是生活随笔為你收集整理的hbase RPCServer源码分析的全部內容,希望文章能夠幫你解決所遇到的問題。