java高并发(十六)J.U.C之ForkJoin
ForkJoin框架是Java7提供的一個(gè)用于并行執(zhí)行任務(wù)的框架,是一個(gè)把大任務(wù)分割成若干個(gè)小任務(wù),最終匯總每個(gè)小任務(wù)結(jié)果后得到大任務(wù)結(jié)果的框架。
與MapReduce思想非常類似。從字面意思上看,Fork就是把一個(gè)大任務(wù)切割成若干個(gè)子任務(wù)并行執(zhí)行,Join就是合并這些子任務(wù)的執(zhí)行結(jié)果,最后得到大任務(wù)的結(jié)果。主要采用工作竊取算法。
? ? 工作竊取算法是指某個(gè)線程從其他隊(duì)列里竊取任務(wù)來(lái)執(zhí)行。下面是工作竊取的流程圖:
為什么要使用工作竊取算法?
? ? 加入我們需要做一個(gè)比較大的任務(wù),我們可以把這個(gè)任務(wù)分割成若干個(gè)互不依賴的子任務(wù),為了減少線程間的競(jìng)爭(zhēng),于是把這些子任務(wù)分別放到不同的隊(duì)列里,為每個(gè)隊(duì)列創(chuàng)建一個(gè)單獨(dú)的線程來(lái)執(zhí)行隊(duì)列里面的任務(wù),線程和隊(duì)列一一對(duì)應(yīng)。比如A線程負(fù)責(zé)處理A隊(duì)列里面的任務(wù),但是有些線程會(huì)先把自己的隊(duì)列里面的任務(wù)干完,而其他線程還有對(duì)應(yīng)的任務(wù)等待處理,干完活的線程就會(huì)幫其他線程干活,于是就會(huì)去其他線程的隊(duì)列里竊取一個(gè)任務(wù)來(lái)執(zhí)行,這時(shí)他們會(huì)訪問(wèn)同一個(gè)隊(duì)列,所以為了減少竊取任務(wù)線程與被竊取任務(wù)線程之間的競(jìng)爭(zhēng),通常會(huì)使用雙端隊(duì)列,被竊取任務(wù)的線程永遠(yuǎn)從雙端隊(duì)列的頭部拿任務(wù)執(zhí)行,而竊取任務(wù)的線程永遠(yuǎn)從雙端隊(duì)列的尾部拿任務(wù)執(zhí)行。這種優(yōu)點(diǎn)就是充分利用線程進(jìn)行并行計(jì)算,減少了線程間的競(jìng)爭(zhēng),缺點(diǎn)是在某些情況下還是存在競(jìng)爭(zhēng)(比如在雙端隊(duì)列只有一個(gè)任務(wù)時(shí)),同時(shí)也消耗了更多的系統(tǒng)資源(比如創(chuàng)建了多個(gè)線程和多個(gè)雙端隊(duì)列)。
? ? 對(duì)于ForkJoin框架而言,當(dāng)一個(gè)任務(wù)正在等待他使用ForkJoin操作創(chuàng)建的子任務(wù)結(jié)束時(shí),執(zhí)行這個(gè)任務(wù)的工作線程查找其他未被執(zhí)行的任務(wù),并開(kāi)始他的執(zhí)行,通過(guò)這種方式,線程充分利用他的運(yùn)行時(shí)間來(lái)提高應(yīng)用程序的性能,為了實(shí)現(xiàn)這個(gè)目標(biāo)ForkJoin框架執(zhí)行的任務(wù)有一些局限性。
局限性
- 任務(wù)只能使用Fork和Join操作來(lái)作為同步機(jī)制,如果使用了其他同步機(jī)制,那他們?cè)谕讲僮鲿r(shí)工作線程就不能執(zhí)行其他任務(wù)了。比如在forkjoin框架中使任務(wù)進(jìn)入sleep,在睡眠期間內(nèi)正在執(zhí)行這個(gè)任務(wù)的工作線程將不會(huì)執(zhí)行其他任務(wù)了。
- 我們所拆分的任務(wù)不應(yīng)該去執(zhí)行IO操作。例如讀寫(xiě)數(shù)據(jù)文件
- 任務(wù)不能拋出檢查異常。必須通過(guò)必要的代碼來(lái)處理他們
ForkJoin框架的核心是兩個(gè)類:ForkJoinPool和ForkJoinTask。ForkJoinPool負(fù)責(zé)做實(shí)現(xiàn)(包括工作竊取算法)他管理工作線程提供任務(wù)的狀態(tài)以及他們的執(zhí)行信息。而ForkJoinTask則主要提供在任務(wù)中執(zhí)行Fork和Join操作的機(jī)制。
@Slf4j public class ForkJoinTaskExample extends RecursiveTask<Integer> {public static final int threshold = 2;private int start;private int end;public ForkJoinTaskExample(int start, int end) {this.start = start;this.end = end;}@Overrideprotected Integer compute() {int sum = 0;//如果任務(wù)足夠小就計(jì)算任務(wù)boolean canCompute = (end - start) <= threshold;if (canCompute) {for (int i = start; i <= end; i++) {sum += i;}} else {// 如果任務(wù)大于閾值,就分裂成兩個(gè)子任務(wù)計(jì)算int middle = (start + end) / 2;ForkJoinTaskExample leftTask = new ForkJoinTaskExample(start, middle);ForkJoinTaskExample rightTask = new ForkJoinTaskExample(middle + 1, end);// 執(zhí)行子任務(wù)leftTask.fork();rightTask.fork();//等待任務(wù)執(zhí)行結(jié)束合并其結(jié)果int leftResult = leftTask.join();int rightResult = rightTask.join();// 合并子任務(wù)sum = leftResult + rightResult;}return sum;}public static void main(String[] args) {ForkJoinPool forkJoinPool = new ForkJoinPool();//生成一個(gè)計(jì)算任務(wù),計(jì)算1+2+3+4...+100ForkJoinTaskExample taskExample = new ForkJoinTaskExample(1, 100);Future<Integer> result = forkJoinPool.submit(taskExample);try{log.info("result:{}", result.get());} catch (InterruptedException e) {log.error("exception", e);e.printStackTrace();} catch (ExecutionException e) {e.printStackTrace();}} }?
總結(jié)
以上是生活随笔為你收集整理的java高并发(十六)J.U.C之ForkJoin的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: java高并发(十五)J.U.C之Fut
- 下一篇: java高并发(十七)J.U.C之Blo