Java并发编程高级篇(十):分离任务的执行和结果的处理
2019獨角獸企業(yè)重金招聘Python工程師標準>>>
在之前的例子中,我們使用執(zhí)行器框架都是在主類中提交任務(wù),等待任務(wù)執(zhí)行完畢后再去處理任務(wù)執(zhí)行的結(jié)果。接下來我們打算將任務(wù)的提交和結(jié)果的處理都放置到線程中去執(zhí)行。在每個任務(wù)內(nèi)部提交自己到執(zhí)行器,然后通過一個統(tǒng)一的結(jié)果處理線程來處理所有任務(wù)執(zhí)行的結(jié)果。
為了解決這個問題,執(zhí)行器框架為我們提供了一個CompletionService類,任務(wù)執(zhí)行線程和結(jié)果處理線程能夠共享這個類,結(jié)果處理線程便可以在這里渠道已經(jīng)執(zhí)行完畢的任務(wù)的結(jié)果。CompletionService類的內(nèi)部也是通過一個ExecutorService來提交任務(wù)的。
首先,創(chuàng)建任務(wù)線程,實現(xiàn)Callable接口。模擬報表生成過程。
/*** 模擬生成報告** Created by hadoop on 2016/11/3.*/ public class ReportGenerator implements Callable<String> {private String sender;private String title;public ReportGenerator(String sender, String title) {this.sender = sender;this.title = title;}@Overridepublic String call() throws Exception {long duration = (long)(Math.random() * 10);System.out.printf("ReportGenerator: Generator report %s_%s duration %d seconds.\n", sender, title, duration);TimeUnit.SECONDS.sleep(duration);return sender + "_" + title;} }然后我們創(chuàng)建任務(wù)提交線程,這個線程的構(gòu)造方法接受兩個參數(shù),分別是報表名稱和CompletionService對象。將報表生成任務(wù)提交到CompletionService去執(zhí)行。
import java.util.concurrent.CompletionService;/*** 模擬請求獲取報告** Created by hadoop on 2016/11/3.*/ public class ReportRequest implements Runnable {private String name;private CompletionService<String> service;public ReportRequest(String name, CompletionService<String> service) {this.name = name;this.service = service;}@Overridepublic void run() {ReportGenerator generator = new ReportGenerator(name, "Report");service.submit(generator);} }下面我們創(chuàng)建任務(wù)結(jié)果處理類,來打印生成的報表。這個類同樣會拿到CompletionService的引用,然后循環(huán)調(diào)用CompletionService.poll()方法來從任務(wù)結(jié)果隊列中獲取執(zhí)行的結(jié)果,這個方法接受一個時間參數(shù),如果當前結(jié)果隊列為空,那么則等待這個時間,超時返回null。不帶參數(shù)的poll()方法,如果對別為空則直接返回null。
/*** 處理報表結(jié)果** Created by hadoop on 2016/11/3.*/ public class ReportProcessor implements Runnable {private boolean end;private CompletionService<String> service;public ReportProcessor(boolean end, CompletionService<String> service) {this.end = end;this.service = service;}@Overridepublic void run() {while (!end) {try {Future<String> future = service.poll(20, TimeUnit.SECONDS);if (future != null) {System.out.printf("ReportReceiver: received %s\n", future.get());}} catch (InterruptedException e) {e.printStackTrace();} catch (ExecutionException e) {e.printStackTrace();}}}public void setEnd(boolean end) {this.end = end;} }最后我們創(chuàng)建主方法類。在這里我們創(chuàng)建ExecutorServer并把它賦值給ExecutorCompletionService。之后創(chuàng)建兩個報表請求任務(wù)和一個報表處理任務(wù),同時持有ExecutorCompletionService的引用。
import java.util.concurrent.CompletionService; import java.util.concurrent.ExecutorCompletionService; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit;/*** 在執(zhí)行器中分離任務(wù)執(zhí)行和結(jié)果處理** 我們?nèi)绾翁幚碓谝粋€對象里發(fā)送任務(wù)給執(zhí)行器,在另一個對象里處理任務(wù)執(zhí)行結(jié)果。* 對于這種情況Java API提供了CompletionService類** CompletionService使用Executor對象類執(zhí)行任務(wù)。* 優(yōu)勢在于:可以共享CompletionService。* 缺點在于:CompletionService獲取的Future對象只能是已經(jīng)執(zhí)行完畢的任務(wù),他沒有辦法控制任務(wù)狀態(tài),只能處理任務(wù)結(jié)果。** 我們創(chuàng)建了一個ExecutorService,然后使用這個ExecutorService來初始化一個ExecutorCompletionService<String>(executor)。** 首先創(chuàng)建了兩個ReportRequest任務(wù),然后在任務(wù)內(nèi)部使用service.submit(generator)方法調(diào)用報表生成任務(wù)。** 然后再ReportProcessor中不斷調(diào)用service.poll(20, TimeUnit.SECONDS);方法獲取已經(jīng)執(zhí)行完的結(jié)果,如果當前沒有結(jié)果那么等待20秒。** CompletionService還提供了兩個人方法:* poll():如果沒有任何Future直接返回null。* take():如若任務(wù)隊列中沒有Future那么阻塞知道有可用的Future。** Created by hadoop on 2016/11/3.*/ public class Main {public static void main(String[] args) {ExecutorService executor = Executors.newCachedThreadPool();CompletionService<String> service = new ExecutorCompletionService<String>(executor);ReportRequest request1 = new ReportRequest("Face", service);ReportRequest request2 = new ReportRequest("Online", service);ReportProcessor processor = new ReportProcessor(false, service);Thread thread1 = new Thread(request1);Thread thread2 = new Thread(request2);Thread thread3 = new Thread(processor);thread1.start();thread2.start();thread3.start();try {thread1.join();thread2.join();} catch (InterruptedException e) {e.printStackTrace();}executor.shutdown();try {executor.awaitTermination(1, TimeUnit.SECONDS);} catch (InterruptedException e) {e.printStackTrace();}try {TimeUnit.SECONDS.sleep(10);} catch (InterruptedException e) {e.printStackTrace();}processor.setEnd(true);} }控制臺中,我們可以看到兩個任務(wù)的提交信息和結(jié)果處理信息。
ReportGenerator: Generator report Online_Report duration 4 seconds. ReportGenerator: Generator report Face_Report duration 1 seconds. ReportReceiver: received Face_Report ReportReceiver: received Online_Report轉(zhuǎn)載于:https://my.oschina.net/nenusoul/blog/849165
總結(jié)
以上是生活随笔為你收集整理的Java并发编程高级篇(十):分离任务的执行和结果的处理的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 【Hadoop Summit Tokyo
- 下一篇: inode与ln命令