java并发编程学习5--forkJoin
【概念
分支和并框架的目的是以遞歸的方式將可以并行的任務拆分成更小的任務,然后將每個子任務的結果合并起來生成整體的結果,它是ExecutorService的一個實現(xiàn),它把子任務分配給線程池(ForkJoinPool)中的工作線程。某些應用可能對每個處理器內核飯別試用一個線程,來完成計算密集任務,例如圖像處理。java7引入forkjoin框架,專門用來支持這一類應用。假設有一個處理任務,它可以很自然的分解成子任務。
【使用方式
要把任務提交到線程池,必須創(chuàng)建RecursiveTask<R>的一個子類,其中R是并行化任務產生的結果(如果沒有結果使用RecursiveAction類型)。然后在子類中實現(xiàn)product abstract R compute()方法即可。這個方法同時實現(xiàn)了“拆分子任務”,“任務不可拆時”的處理邏輯。如下所示:
if(任務足夠小){順序計算該任務的值; }else{將任務分成兩個子任務;遞歸調用本方法;合并每個子任務的結果; }【最佳實踐
【工作竊取
我們很難確定要滿足什么條件才可以不再拆分任務。但是分出大量的小任務是一個好的選擇,因為在理想情況下,劃分并行任務時因該讓每個任務都花費相同的時間。讓cpu的所有內核都一樣的繁忙,但是現(xiàn)實中我們的子任務花費的時間大不相同,這是因為有許多我們無法確認的情況出現(xiàn):io,rpc,分配效率等等。分支合并框架使用:工作竊取來解決內核之間任務不匹配的問題。讓所有任務大致相同的被平均分配到forkjoinpool的每個線程上。每個線程都擁有一個雙向鏈式隊列來保存它的任務,每完成一個任務就從隊列頭部取出下一個任務執(zhí)行。當一個線程的任務隊列已空,而其他線程還在繁忙,這個空閑線程就隨機選擇一個繁忙線程并從其隊列尾部拿走一個任務開始執(zhí)行,直到所有的任務執(zhí)行完畢。
【例子
1.輸出數(shù)組中有多少個數(shù)字小于0.5
public class ExerciseFilter {//數(shù)據(jù)源static double numbers[] = new double[100];static {for(int i = 0 ; i < 100 ; i++){numbers[i] = i + 1;}numbers[0] = 0.08;numbers[1] = 0.18;numbers[11] = 0.18;}public static void main(String[] args) {Counter counter = new Counter(numbers,x -> x < 0.5);//使用單例ForkJoinPool pool = ForkJoinPool.commonPool();long st = System.currentTimeMillis();//啟動并行任務pool.invoke(counter);System.out.println((System.currentTimeMillis() - st) + " : " + counter.join());} }class Counter extends RecursiveTask<Integer>{//分界線,當一個數(shù)組的長度 < 1000 就不再繼續(xù)拆分public static final int THRESHOLD = 1000;//數(shù)組private double[] values;//判斷條件private DoublePredicate filter;public Counter(double [] values,DoublePredicate filter){this.values = values;this.filter = filter;}@Overrideprotected Integer compute() {//任務足夠小就不再拆分if(values.length < THRESHOLD ){//返回該數(shù)組中有多少數(shù)字滿足判斷邏輯int count = 0;for(int i = 0; i < values.length ; i++){if(filter.test(values[i])){count++;}}return count;}else {//將打數(shù)組拆分成兩個int mid = values.length / 2;Counter first = new Counter(Arrays.copyOfRange(values,0,mid),filter);//第一個子任務提交到線程池first.fork();Counter second = new Counter(Arrays.copyOfRange(values,mid,values.length),filter);//當前線程執(zhí)行第二個子任務,節(jié)約一個線程的開銷int secondResult = second.compute();//等待第一個子任務執(zhí)行完畢int firstResult = first.join();return firstResult + secondResult;}} }2.列表中求和
public class ExerciseSum {//數(shù)據(jù)源static int sum[] = new int[100];static {for(int i = 0 ; i < 100 ; i++){sum[i] = i + 1;}}public static void main(String[] args) {CounterSum counter = new CounterSum(sum);ForkJoinPool pool = ForkJoinPool.commonPool();long st = System.currentTimeMillis();pool.invoke(counter);System.out.println((System.currentTimeMillis() - st) + " : " + counter.join());} }class CounterSum extends RecursiveTask<Integer> {//最小拆分單位:每個小數(shù)組length = 10public static final int THRESHOLD = 10;private int[] values;public CounterSum(int [] values){this.values = values;}@Overrideprotected Integer compute() {if(values.length < THRESHOLD ){int count = 0;for(int i = 0; i < values.length ; i++){count += values[i];}return count;}else {int mid = values.length / 2;CounterSum first = new CounterSum(Arrays.copyOfRange(values,0,mid));first.fork();CounterSum second = new CounterSum(Arrays.copyOfRange(values,mid,values.length));int secondResult = second.compute();int firstResult = first.join();return firstResult + secondResult;}} }3.排序
public class ExerciseSort {//數(shù)據(jù)源static int num[] = new int[100];static {Random r = new Random();for(int i = 0 ; i < 100 ; i++){num[i] = r.nextInt(100);}}public static void main(String[] args) {CounterSort counter = new CounterSort(num);//使用單例ForkJoinPool pool = ForkJoinPool.commonPool();long st = System.currentTimeMillis();//啟動并行任務pool.invoke(counter);System.out.println((System.currentTimeMillis() - st));Arrays.stream(counter.join()).forEach(System.out::println);} }class CounterSort extends RecursiveTask<int[]> {//最小拆分單位:每個小數(shù)組length = 10public static final int THRESHOLD = 10;//待排序數(shù)組private int[] values;public CounterSort(int [] values){this.values = values;}@Overrideprotected int[] compute() {if(values.length < THRESHOLD ){int[] result = new int[values.length];//1.單數(shù)組排序result = Arrays.stream(values).sorted().toArray();return result;}else {int mid = values.length / 2;CounterSort first = new CounterSort(Arrays.copyOfRange(values,0,mid));first.fork();CounterSort second = new CounterSort(Arrays.copyOfRange(values,mid,values.length));int[] secondResult = second.compute();int[] firstResult = first.join();//兩個數(shù)組混合排序int[] combineRsult = combineIntArray(firstResult,secondResult);return combineRsult;}}private int[] combineIntArray(int[] a1,int[] a2){int result[] = new int[a1.length + a2.length];int a1Len = a1.length;int a2Len = a2.length;int destLen = result.length;for(int index = 0 , a1Index = 0 , a2Index = 0 ; index < destLen ; index++) {int value1 = a1Index >= a1Len?Integer.MAX_VALUE:a1[a1Index];int value2 = a2Index >= a2Len?Integer.MAX_VALUE:a2[a2Index];if(value1 < value2) {a1Index++;result[index] = value1;}else {a2Index++;result[index] = value2;}}return result;} }總結
以上是生活随笔為你收集整理的java并发编程学习5--forkJoin的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: HMAC-SHA1加密
- 下一篇: linux中shell变量$#,$@,$