分支合并 Fork-Join 框架
一、什么是 Fork-Join
Fork/Join框架是Java7提供了的一個(gè)用于并行執(zhí)行任務(wù)的框架,是一個(gè)把大任務(wù)分割成若干個(gè)小任務(wù),最終匯總每個(gè)小任務(wù)結(jié)果后得到大任務(wù)結(jié)果的框架,這種開發(fā)方法也叫分治編程。分治編程可以極大地利用CPU資源,提高任務(wù)執(zhí)行的效率,也是目前與多線程有關(guān)的前沿技術(shù)。
框架圖:
- fork():利用另一個(gè) ForkJoinPool 線程異步執(zhí)行新創(chuàng)建的子任務(wù)
- join():讀取第一個(gè)子任務(wù)的結(jié)果,尚未完成就等待
二、傳統(tǒng)的分治編程會(huì)遇到什么問題
分治的原理上面已經(jīng)介紹了,就是切割大任務(wù)成小任務(wù)來完成??雌饋砗孟褚膊浑y實(shí)現(xiàn)啊!為什么專門弄一個(gè)新的框架呢?
我們先看一下,在不使用 Fork-Join 框架時(shí),使用普通的線程池是怎么實(shí)現(xiàn)的。
看起來一切都很美好。真的嗎?別忘了, 每一個(gè)切割任務(wù)的線程(如線程A)都被阻塞了,直到其子任務(wù)完成,才能繼續(xù)往下運(yùn)行 。如果任務(wù)太大了,需要切割多次,那么就會(huì)有多個(gè)線程被阻塞,性能將會(huì)急速下降。更糟糕的是,如果你的線程池的線程數(shù)量是有上限的,極可能會(huì)造成池中所有線程被阻塞,線程池?zé)o法執(zhí)行任務(wù)。
三、普通線程池實(shí)現(xiàn)分治時(shí)阻塞的問題
public class NormalThreadPoolDivideAndConquer {//固定大小的線程池,池中線程數(shù)量為3static ExecutorService fixPoolExecutors = Executors.newFixedThreadPool(3);public static void main(String[] args) throws InterruptedException, ExecutionException {//計(jì)算 1+2+...+10 的結(jié)果CountTaskCallable task = new CountTaskCallable(1,10);//提交主人翁Future<Integer> future = fixPoolExecutors.submit(task);System.out.println("計(jì)算的結(jié)果:"+future.get());} } class CountTaskCallable implements Callable<Integer> {//設(shè)置閥值為2private static final int THRESHOLD = 2;private int start;private int end;public CountTaskCallable(int start, int end) {super();this.start = start;this.end = end;}@Overridepublic Integer call() throws Exception {int sum = 0;//判斷任務(wù)的大小是否超過閥值,也即是兩個(gè)相加的數(shù)的差值不能大于2,在這里意味著需要分為大于4個(gè)子任務(wù)進(jìn)行計(jì)算,而線程池只有3個(gè),機(jī)會(huì)造成阻塞boolean canCompute = (end - start) <= THRESHOLD;if (canCompute) {for (int i = start; i <= end; i++) {sum += i;}} else {System.out.println("切割的任務(wù):"+start+"加到"+end+" 執(zhí)行此任務(wù)的線程是 "+Thread.currentThread().getName());int middle = (start + end) / 2;CountTaskCallable leftTaskCallable = new CountTaskCallable(start, middle);CountTaskCallable rightTaskCallable = new CountTaskCallable(middle + 1, end);// 將子任務(wù)提交到線程池中Future<Integer> leftFuture = NormalThreadPoolDivideAndConquer.fixPoolExecutors.submit(leftTaskCallable);Future<Integer> rightFuture = NormalThreadPoolDivideAndConquer.fixPoolExecutors.submit(rightTaskCallable);//阻塞等待子任務(wù)的執(zhí)行結(jié)果int leftResult = leftFuture.get();int rightResult = rightFuture.get();// 合并子任務(wù)的執(zhí)行結(jié)果sum = leftResult + rightResult;}return sum;} }- 運(yùn)行結(jié)果:
池的線程只有三個(gè),當(dāng)任務(wù)分割了三次后,池中的線程也就都被阻塞了,無法再執(zhí)行任何任務(wù),一直卡著動(dòng)不了。為了解決這個(gè)問題,工作竊取算法呼之欲出
四、工作竊取算法
針對(duì)上面的問題,Fork-Join 框架使用了“工作竊取(work-stealing)”算法。工作竊取(work-stealing)算法是指某個(gè)線程從其他隊(duì)列里竊取任務(wù)來執(zhí)行。在《Java 并發(fā)編程的藝術(shù)》對(duì)工作竊取算法的解釋:
使用工作竊取算法有什么優(yōu)勢(shì)呢?
- 假如我們需要做一個(gè)比較大的任務(wù),我們可以把這個(gè)任務(wù)分割為若干互不依賴的子任務(wù),為了減少線程間的競(jìng)爭(zhēng),于是把這些子任務(wù)分別放到不同的隊(duì)列里,并為每個(gè)隊(duì)列創(chuàng)建一個(gè)單獨(dú)的線程來執(zhí)行隊(duì)列里的任務(wù),線程和隊(duì)列一一對(duì)應(yīng),比如A線程負(fù)責(zé)處理A隊(duì)列里的任務(wù)。但是有的線程會(huì)先把自己隊(duì)列里的任務(wù)干完,而其他線程對(duì)應(yīng)的隊(duì)列里還有任務(wù)等待處理。干完活的線程與其等著,不如去幫其他線程干活,于是它就去其他線程的隊(duì)列里竊取一個(gè)任務(wù)來執(zhí)行。而在這時(shí)它們會(huì)訪問同一個(gè)隊(duì)列,所以為了減少竊取任務(wù)線程和被竊取任務(wù)線程之間的競(jìng)爭(zhēng),通常會(huì)使用雙端隊(duì)列,被竊取任務(wù)線程永遠(yuǎn)從雙端隊(duì)列的頭部拿任務(wù)執(zhí)行,而竊取任務(wù)的線程永遠(yuǎn)從雙端隊(duì)列的尾部拿任務(wù)執(zhí)行。
Fork-Join 框架中的工作竊取算法的優(yōu)點(diǎn)可以總結(jié)為以下兩點(diǎn):
五、Fork-Join 框架的使用介紹
- ForkJoinPool: 執(zhí)行任務(wù)的線程池,繼承了 AbstractExecutorService 類。
- ForkJoinWorkerThread:執(zhí)行任務(wù)的工作線程(即ForkJoinPool線程池里的線程)。每個(gè)線程都維護(hù)著一個(gè)內(nèi)部隊(duì)列,用于存放“內(nèi)部任務(wù)”。繼承了 Thread類。
- ForkJoinTask: 一個(gè)用于ForkJoinPool的任務(wù)抽象類。實(shí)現(xiàn)了 Future 接口
因?yàn)镕orkJoinTask比較復(fù)雜,抽象方法比較多,日常使用時(shí)一般不會(huì)繼承ForkJoinTask來實(shí)現(xiàn)自定義的任務(wù),而是繼承ForkJoinTask的兩個(gè)子類,實(shí)現(xiàn) compute() 方法:
- RecursiveTask: 子任務(wù)帶返回結(jié)果時(shí)使用
- RecursiveAction: 子任務(wù)不帶返回結(jié)果時(shí)使用
compute 方法的實(shí)現(xiàn)模式一般是:
if 任務(wù)足夠小直接返回結(jié)果 else分割成N個(gè)子任務(wù)依次調(diào)用每個(gè)子任務(wù)的fork方法執(zhí)行子任務(wù)依次調(diào)用每個(gè)子任務(wù)的join方法合并執(zhí)行結(jié)果六、Fork-Join 例子演示
- 計(jì)算 1+2+…+12 的結(jié)果。
使用Fork/Join框架首先要考慮到的是如何分割任務(wù),如果我們希望每個(gè)子任務(wù)最多執(zhí)行兩個(gè)數(shù)的相加,那么我們?cè)O(shè)置分割的閾值是2,由于是12個(gè)數(shù)字相加。同時(shí),觀察執(zhí)行任務(wù)的線程名稱,理解工作竊取算法的實(shí)現(xiàn)。
public class CountTest {public static void main(String[] args) throws InterruptedException, ExecutionException {ForkJoinPool forkJoinPool = new ForkJoinPool();//創(chuàng)建一個(gè)計(jì)算任務(wù),計(jì)算 由1加到12CountTask countTask = new CountTask(1, 12);Future<Integer> future = forkJoinPool.submit(countTask);System.out.println("最終的計(jì)算結(jié)果:" + future.get());} }class CountTask extends RecursiveTask<Integer> {private static final int THRESHOLD = 2;private int start;private int end;public CountTask(int start, int end) {this.start = start;this.end = end;}@Overrideprotected Integer compute() {int sum = 0;boolean canCompute = (end - start) <= THRESHOLD;//任務(wù)已經(jīng)足夠小,可以直接計(jì)算,并返回結(jié)果if (canCompute) {for (int i = start; i <= end; i++) {sum += i;}System.out.println("執(zhí)行計(jì)算任務(wù),計(jì)算 " + start + "到 " + end + "的和 ,結(jié)果是:" + sum + " 執(zhí)行此任務(wù)的線程:" + Thread.currentThread().getName());} else { //任務(wù)過大,需要切割System.out.println("任務(wù)過大,切割的任務(wù): " + start + "加到 " + end + "的和 執(zhí)行此任務(wù)的線程:" + Thread.currentThread().getName());int middle = (start + end) / 2;//切割成兩個(gè)子任務(wù)CountTask leftTask = new CountTask(start, middle);CountTask rightTask = new CountTask(middle + 1, end);//執(zhí)行子任務(wù)leftTask.fork();rightTask.fork();//等待子任務(wù)的完成,并獲取執(zhí)行結(jié)果int leftResult = leftTask.join();int rightResult = rightTask.join();//合并子任務(wù)sum = leftResult + rightResult;}return sum;} }- 運(yùn)行結(jié)果:
從結(jié)果可以看出:
提交的計(jì)算任務(wù)是由線程1執(zhí)行,線程1進(jìn)行了第一次切割,切割成兩個(gè)子任務(wù) “7加到12“ 和
”1加到6“,并提交這兩個(gè)子任務(wù)。然后這兩個(gè)任務(wù)便被 線程2、線程3 給竊取了。線程1 的內(nèi)部隊(duì)列中已經(jīng)沒有任務(wù)了,這時(shí)候,線程2、線程3
也分別進(jìn)行了一次任務(wù)切割并各自提交了兩個(gè)子任務(wù),于是線程1也去竊取任務(wù)(這里竊取的都是線程2的子任務(wù))。
- RecursiveAction 演示
遍歷指定目錄(含子目錄)找尋指定類型文件
參考文章
參考文章
總結(jié)
以上是生活随笔為你收集整理的分支合并 Fork-Join 框架的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 产品经理,如何降噪学习?
- 下一篇: 5G零售行业应用白皮书