Mina2.0框架源码剖析(八)
生活随笔
收集整理的這篇文章主要介紹了
Mina2.0框架源码剖析(八)
小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
這篇來看看AbstractPollingIoConnector抽象類,它用于用于實現客戶端連接的輪詢策略。處理邏輯基本上和上一篇文章說的AbstractPollingIoAcceptor類似,它繼承自AbstractIoConnector,兩個泛型參數分別是所處理的會話和客戶端socket連接。底層的sockets會被不斷檢測,并當有任何一個socket需要被處理時就會被喚醒去處理。這個類封裝了客戶端socket的bind,connect和dispose等動作,其成員變量Executor用于發起連接請求,另一個AbstractPollingIoProcessor用于處理已經連接客戶端的I/O操作請求,如讀寫和關閉連接。
其最重要的幾個成員變量是:
private?final?Queue<ConnectionRequest>?connectQueue?=?new?ConcurrentLinkedQueue<ConnectionRequest>();//連接隊列private?final?Queue<ConnectionRequest>?cancelQueue?=?new?ConcurrentLinkedQueue<ConnectionRequest>();//?取消連接隊列
先來看看當服務端調用connect后的處理過程:
????protected?final?ConnectFuture?connect0(????????????SocketAddress?remoteAddress,?SocketAddress?localAddress,
????????????IoSessionInitializer<??extends?ConnectFuture>?sessionInitializer)?{
????????H?handle?=?null;
????????boolean?success?=?false;
????????try?{
????????????handle?=?newHandle(localAddress);
????????????if?(connect(handle,?remoteAddress))?{//若已經連接服務器成功
????????????????ConnectFuture?future?=?new?DefaultConnectFuture();
????????????????T?session?=?newSession(processor,?handle);//創建新會話
????????????????finishSessionInitialization(session,?future,?sessionInitializer);//結束會話初始化
????????????????session.getProcessor().add(session);//將剩下的處理交給IoProcessor
????????????????success?=?true;
????????????????return?future;
????????????}
????????????success?=?true;
????????}?catch?(Exception?e)?{
????????????return?DefaultConnectFuture.newFailedFuture(e);
????????}?finally?{
????????????if?(!success?&&?handle?!=?null)?{
????????????????try?{
????????????????????close(handle);
????????????????}?catch?(Exception?e)?{
????????????????????ExceptionMonitor.getInstance().exceptionCaught(e);
????????????????}
????????????}
????????}
????????ConnectionRequest?request?=?new?ConnectionRequest(handle,?sessionInitializer);
????????connectQueue.add(request);//連接請求加入連接隊列中
????????startupWorker();//開啟工作線程處理連接請求
????????wakeup();//中斷select操作
????????return?request;
????}
?????真正的負責處理客戶端請求的工作都是Worker線程完成的,
private?class?Worker?implements?Runnable?{????????public?void?run()?{
????????????int?nHandles?=?0;
????????????while?(selectable)?{
????????????????try?{
??????????????????????int?timeout?=?(int)Math.min(getConnectTimeoutMillis(),?1000L);//等待超時時間
????????????????????boolean?selected?=?select(timeout);//在超時時限內查看是否有可以被處理的選擇鍵(狀態
????????????????????nHandles?+=?registerNew();//取出連接隊列隊頭的連接請求,將其注冊一個用于連接的新的客戶端socket,?并把它加入連接輪詢池中
????????????????????if?(selected)?{
????????????????????????nHandles-=?processSessions(selectedHandles());//處理連接請求
????????????????????}
????????????????????processTimedOutSessions(allHandles());//處理超時連接請求
????????????????????nHandles?-=?cancelKeys();
????????????????????if?(nHandles?==?0)?{
????????????????????????synchronized?(lock)?{
????????????????????????????if?(connectQueue.isEmpty())?{
????????????????????????????????worker?=?null;
????????????????????????????????break;
????????????????????????????}
????????????????????????}
????????????????????}
????????????????}?catch?(Throwable?e)?{
????????????????????ExceptionMonitor.getInstance().exceptionCaught(e);
????????????????????try?{
????????????????????????Thread.sleep(1000);
????????????????????}?catch?(InterruptedException?e1)?{
????????????????????????ExceptionMonitor.getInstance().exceptionCaught(e1);
????????????????????}
????????????????}
????????????}
????????????if?(selectable?&&?isDisposing())?{
????????????????selectable?=?false;
????????????????try?{
????????????????????if?(createdProcessor)?{
????????????????????????processor.dispose();
????????????????????}
????????????????}?finally?{
????????????????????try?{
????????????????????????synchronized?(disposalLock)?{
????????????????????????????if?(isDisposing())?{
????????????????????????????????destroy();
????????????????????????????}
????????????????????????}
????????????????????}?catch?(Exception?e)?{
????????????????????????ExceptionMonitor.getInstance().exceptionCaught(e);
????????????????????}?finally?{
????????????????????????disposalFuture.setDone();
????????????????????}
????????????????}
????????????}
????????}
????}
private?int?registerNew()?{
????????int?nHandles?=?0;
????????for?(;?;)?{
????????????ConnectionRequest?req?=?connectQueue.poll();//取連接隊列隊頭請求
????????????if?(req?==?null)?{
????????????????break;
????????????}
????????????H?handle?=?req.handle;
????????????try?{
????????????????register(handle,?req);//注冊一個用于連接的新的客戶端socket,?并把它加入連接輪詢池中
????????????????nHandles?++;
????????????}?catch?(Exception?e)?{
????????????????req.setException(e);
????????????????try?{
????????????????????close(handle);
????????????????}?catch?(Exception?e2)?{
????????????????????ExceptionMonitor.getInstance().exceptionCaught(e2);
????????????????}
????????????}
????????}
????????return?nHandles;
????}
private?int?processSessions(Iterator<H>?handlers)?{//處理連接請求
????????int?nHandles?=?0;
????????while?(handlers.hasNext())?{
????????????H?handle?=?handlers.next();
????????????handlers.remove();
????????????ConnectionRequest?entry?=?connectionRequest(handle);
????????????boolean?success?=?false;
????????????try?{
????????????????if?(finishConnect(handle))?{//連接請求成功完成,創建一個新會話
????????????????????T?session?=?newSession(processor,?handle);
????????????????????finishSessionInitialization(session,?entry,?entry.getSessionInitializer());//結束會話初始化
????????????????????session.getProcessor().add(session);//將剩下的工作交給IoProcessor去處理
????????????????????nHandles?++;
????????????????}
????????????????success?=?true;
????????????}?catch?(Throwable?e)?{
????????????????entry.setException(e);
????????????}?finally?{
????????????????if?(!success)?{//若連接失敗,則將此連接請求放到取消連接隊列中
????????????????????cancelQueue.offer(entry);
????????????????}
????????????}
????????}
????????return?nHandles;
????}
private?void?processTimedOutSessions(Iterator<H>?handles)?{//處理超時的連接請求
????????long?currentTime?=?System.currentTimeMillis();//當前時間
????????while?(handles.hasNext())?{
????????????H?handle?=?handles.next();
????????????ConnectionRequest?entry?=?connectionRequest(handle);
????????????if?(currentTime?>=?entry.deadline)?{//當前時間已經超出了連接請求的底限
????????????????entry.setException(
????????????????????????new?ConnectException("Connection?timed?out."));
????????????????cancelQueue.offer(entry);//將此連接請求放入取消連接隊列中
????????????}
????????}
????}
private?int?cancelKeys()?{//把取消隊列中的連接請求給cancel掉
????????int?nHandles?=?0;
????????for?(;?;)?{
????????????ConnectionRequest?req?=?cancelQueue.poll();
????????????if?(req?==?null)?{
????????????????break;
????????????}
????????????H?handle?=?req.handle;
????????????try?{
????????????????close(handle);//關閉對應的客戶端socket
????????????}?catch?(Exception?e)?{
????????????????ExceptionMonitor.getInstance().exceptionCaught(e);
????????????}?finally?{
????????????????nHandles?++;
????????????}
????????}
????????return?nHandles;
????}
?
作者:phinecos(洞庭散人)
出處:http://phinecos.cnblogs.com/
本文版權歸作者和博客園共有,歡迎轉載,但請保留此段聲明,并在文章頁面明顯位置給出原文連接。
總結
以上是生活随笔為你收集整理的Mina2.0框架源码剖析(八)的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Lotus Notes应用经验谈
- 下一篇: 累计值