Java Master-Worker模式实现
Master-Worker模式簡介
Master-Worker模式是非常經典的常用的一個并行計算模式,它的核心思想是2類進程協作工作:Master進程和Worker進程。Master負責接收客戶端請求,分配任務;Worker負責具體處理任務。當各個Worker處理完任務后,統一將結果返回給Master,由Master進行整理和總結。其好處是能夠將一個大JOB分解成若干小JOB,并行執行,從而提高系統的吞吐量。比如流行的Web Server,如Nginx,Apache HTTP都存在這種Master-Worker工作模式;離線分布式計算框架Hadoop的JobTracker和TaskTracker,實時流計算框架Strom的Nimbus和Supervisor都涉及到這種思想。那么下面我們來具體分析下Java Master-Worker模式的實現。
Master-Worker模式分析
我們重點分析下Master,Worker這2個角色。
Master
Master需要接受Client端提交過來的任務Task,而且還得將Task分配給Worker進行處理,因此Master需要一個存儲來存放Task。那么采用哪種存儲集合呢?首先來說,需要支持并發的集合類,因為多個Worker間可能存在任務競爭,因此我們需要考慮java.util.concurrent包下的集合。這里可以考慮采用非阻塞的ConcurrentLinkedQueue。
Master需要清楚的知道各個Woker的基本信息,如是否各個Worker都運行完畢,因此Master端需要保存Worker的信息,可以采用Map存儲。
由于最后各個Worker都會上報運行結果,Master端需要有一個存儲結果的Map,可以采用支持并發的ConcurrentHashMap。
Worker
Worker需要持有Master端的任務Task集合的引用,因為Worker需要從里面拿取Task。
同上,Worker需要持有Master端的存儲結果的引用。
綜上,我們可以得到如下:
我們可以進一步細化,Master/Worker應該提供什么操作?
Master:
通過構造方法以初始化workers
應該提供submit(Task)方法接受Client端提交過來的任務
start()讓workers開始處理任務
提供isComplete()判斷各個worker的狀態,是否都處理完畢
提供getResult()給客戶端返回結果
Worker:
Worker本質上就是Runnable,提供run()
負責處理業務邏輯的handle()
Java Master-Worker代碼實現
Task
public?class?Task?{private?long?id;private?String?name;public?Task(long?id,?String?name)?{this.id?=?id;this.name?=?name;}public?long?getId()?{return?id;}public?void?setId(long?id)?{this.id?=?id;}public?String?getName()?{return?name;}public?void?setName(String?name)?{this.name?=?name;}}Worker
public?class?Worker?implements?Runnable?{private?long?id;private?String?name;private?ConcurrentLinkedQueue<Task>?workQueue;private?ConcurrentHashMap<Long,Object>?results;public?void?setWorkQueue(ConcurrentLinkedQueue<Task>?workQueue)?{this.workQueue?=?workQueue;}public?void?setResults(ConcurrentHashMap<Long,?Object>?results)?{this.results?=?results;}public?Worker(long?id,?String?name)?{this.id?=?id;this.name?=?name;}@Overridepublic?void?run()?{while(true){Task?task?=?workQueue.poll();if(task?==?null){break;}long?start?=?System.currentTimeMillis();long?result?=?handle(task);this.results.put(task.getId(),result);System.out.println(this.name?+?"?handle?"?+?task.getName()?+?"?success?.?result?is?"?+?result?+?"?cost?time?:?"?+?(System.currentTimeMillis()?-?start));}}/***?負責處理具體業務邏輯*?@param?task*?@return*/private?long?handle(Task?task)?{//這里只是模擬下,在真實環境也許是查詢數據庫,也許是查緩存等try?{Thread.sleep(500);}?catch?(InterruptedException?e)?{e.printStackTrace();}return?new?Random().nextLong();} }Master
public?class?Master?{private?ConcurrentLinkedQueue<Task>?workQueue?=?new?ConcurrentLinkedQueue<Task>();private?Map<Long,Thread>?workers?=?new?HashMap<Long,?Thread>();private?ConcurrentHashMap<Long,Object>?results?=?new?ConcurrentHashMap<Long,?Object>();public?Master(int?num){for(int?i?=?0?;?i?<?num?;?i++){Worker?worker?=?new?Worker(i,"worker-"?+?i);worker.setResults(results);worker.setWorkQueue(workQueue);workers.put(Long.valueOf(i),new?Thread(worker));}}public?void?submit(Task?task){workQueue.add(task);}public?void?start(){for?(Map.Entry<Long,Thread>?entry?:?workers.entrySet()){entry.getValue().start();}}public?boolean?isComlepte(){for(Map.Entry<Long,Thread>?entry?:?workers.entrySet()){if(entry.getValue().getState()?!=?Thread.State.TERMINATED){return?false;}}return?true;}public?long?getSumResult(){long?value?=?0;for(Map.Entry<Long,Object>?entry?:?results.entrySet()){value?=?value?+?(Long)entry.getValue();}return?value;} }Main
public?class?Main?{public?static?void?main(String[]?args)?{Master?master?=?new?Master(10);for(int?i?=?0?;?i?<?10?;?i++){Task?task?=?new?Task(i,"task-"?+?i);master.submit(task);}long?start?=?System.currentTimeMillis();master.start();while(true){if(master.isComlepte()){System.out.println("sum?result?is?"?+?master.getSumResult()?+?"?.?cost?time?:?"?+?(System.currentTimeMillis()?-?start));break;}}}}運行結果
總結
在單線程的時候,處理一個Task需要500ms,那么處理10個Task需要5S,如果采用Master-Worker這種并行模型,可以大大縮短計算處理時間。
轉載于:https://blog.51cto.com/zhangfengzhe/1879323
總結
以上是生活随笔為你收集整理的Java Master-Worker模式实现的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 一次竞赛案例的分享——基于正则表达式的深
- 下一篇: 使用ajax预加载图片