《Java8实战》笔记(07):并行数据处理与性能
并行數據處理與性能
在Java 7之前,并行處理數據集合非常麻煩。
- 第一,你得明確地把包含數據的數據結構分成若干子部分。
- 第二,你要給每個子部分分配一個獨立的線程。
- 第三,你需要在恰當的時候對它們進行同步來避免不希望出現的競爭條件,等待所有線程完成,最后把這些部分結果合并起來。
Java 7引入了一個叫作分支/合并的框架,讓這些操作更穩定、更不易出錯。
并行流
ParallelStreams
可以通過對收集源調用Stream接口parallelStream方法來把集合轉換為并行流。并行流就是一個把內容分成多個數據塊,并用不同的線程分別處理每個數據塊的流。
假設你需要寫一個方法,接受數字n作為參數,并返回從1到給定參數的所有數字的和。
public static long sequentialSum(long n) {return Stream.iterate(1L, i -> i + 1).limit(n).reduce(0L, Long::sum); }用更為傳統的Java術語來說,這段代碼與下面的迭代等價:
public static long iterativeSum(long n) {long result = 0;for (long i = 1L; i <= n; i++) {result += i;}return result; }- 這似乎是利用并行處理的好機會,特別是n很大的時候。那怎么入手呢?
- 你要對結果變量進行同步嗎?
- 用多少個線程呢?
- 誰負責生成數呢?
- 誰來做加法呢?
用并行流的話,這問題就簡單多了!
將順序流轉換為并行流
public static long parallelSum(long n) {return Stream.iterate(1L, i -> i + 1).limit(n).parallel().reduce(0L, Long::sum); }Stream在內部分成了幾塊。因此可以對不同的塊獨立并行進行歸納操作。最后,同一個歸納操作會將各個子流的部分歸納結果合并起來,得到整個原始流的歸納結果。
類似地,你只需要對并行流調用sequential方法就可以把它變成順序流。請注意,你可能以為把這兩個方法結合起來,就可以更細化地控制在遍歷流時哪些操作要并行執行,哪些要順序執行。
例如
stream.parallel().filter(...).sequential().map(...).parallel().reduce();但最后一次parallel或sequential調用會影響整個流水線。在本例中,流水線會并行執行,因為最后調用的是它。
并行流內部使用了默認的ForkJoinPool,它默認的
線程數量就是你的處理器數量, 這個值是由Runtime.getRuntime().availableProcessors()得到的。
但是你可以通過系統屬性java.util.concurrent.ForkJoinPool.common.parallelism來改變線程池大小,如下所示:
System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism","12");一般而言,讓ForkJoinPool的大小等于處理器數量是個不錯的默認值,除非你有很好的理由,否則我們強烈建議不要修改它。
測量流性能
ParallelStreamsHarness
測量對前n個自然數求和的函數的性能
public long measureSumPerf(Function<Long, Long> adder, long n) {long fastest = Long.MAX_VALUE;for (int i = 0; i < 10; i++) {long start = System.nanoTime();long sum = adder.apply(n);long duration = (System.nanoTime() - start) / 1_000_000;System.out.println("Result: " + sum);if (duration < fastest) fastest = duration;}return fastest; }測試代碼:
System.out.println("Sequential sum done in:" + measureSumPerf(ParallelStreams::sequentialSum, 10_000_000) + " msecs");System.out.println("Iterative Sum done in: " + measurePerf(ParallelStreams::iterativeSum, 10_000_000L) + " msecs\\n");System.out.println("Parallel Sum done in: " + measurePerf(ParallelStreams::parallelSum, 10_000_000L) + " msecs\\n" );運行結果:
Sequential Sum done in: 311 msecs\nIterative Sum done in: 17 msecs\nParallel Sum done in: 11641 msecs\nParallel Sum的運行結果相當令人失望,求和方法的并行版本比順序版本要慢很多。
你如何解釋這個意外的結果呢?這里實際上有兩個問題:
- iterate生成的是裝箱的對象,必須拆箱成數字才能求和;
- 我們很難把iterate分成多個獨立塊來并行執行。
具體來說,iterate很難分割成能夠獨立執行的小塊,因為每次應用這個函數都要依賴前一次應用的結果
使用更有針對性的方法
一個叫LongStream.rangeClosed的方法。這個方法與iterate相比有兩個優點。
- LongStream.rangeClosed直接產生原始類型的long數字,沒有裝箱拆箱的開銷。
- LongStream.rangeClosed會生成數字范圍,很容易拆分為獨立的小塊。例如,范圍120可分為15、610、1115和16~20。
測試代碼:
System.out.println("Range Sum done in: " + measurePerf(ParallelStreams::rangedSum, 10_000_000L) + " msecs\\n"); System.out.println("Parallel range Sum done in: " + measurePerf(ParallelStreams::parallelRangedSum, 10_000_000L) + " msecs\\n" );運行結果:
Range Sum done in: 38 msecs\n Parallel range Sum done in: 20 msecs\n使用正確的數據結構然后使其并行工作能夠保證最佳的性能。
請記住,并行化并不是沒有代價的。并行化過程本身需要對流做遞歸劃分,把每個子流的歸納操作分配到不同的線程,然后把這些操作的結果合并成一個值。
但在多個內核之間移動數據的代價也可能比你想的要大,所以很重要的一點是要保證在內核中并行執行工作的時間比在內核之間傳輸數據的時間長。
總而言之,很多情況下不可能或不方便并行化。然而,在使用并行Stream加速代碼之前,你必須確保用得對;如果結果錯了,算得快就毫無意義了。
正確使用并行流
錯用并行流而產生錯誤的首要原因,就是使用的算法改變了某些共享狀態。
下面是另一種實現對前n個自然數求和的方法,但這會改變一個共享累加器:
public static long sideEffectSum(long n) {Accumulator accumulator = new Accumulator();LongStream.rangeClosed(1, n).forEach(accumulator::add);return accumulator.total; }public static class Accumulator {private long total = 0;public void add(long value) {total += value;} }測試代碼:
System.out.println("SideEffect Sequential sum done in: " + measurePerf(ParallelStreams::sideEffectSum, 10_000_000L) + " msecs\\n" );運行結果:
SideEffect Sequential sum done in: 15 msecs\n改用同步:
public static long sideEffectParallelSum(long n) {Accumulator accumulator = new Accumulator();LongStream.rangeClosed(1, n).parallel().forEach(accumulator::add);return accumulator.total; }測試代碼:
System.out.println("SideEffect prallel sum done in: " + measurePerf(ParallelStreams::sideEffectParallelSum, 10_000_000L) + " msecs\\n" );運行結果:
Result: 40982006038773 Result: 31276474800658 Result: 24968119902981 Result: 33235695815058 Result: 38464302195225 Result: 34974467550362 Result: 35961255749290 Result: 50000005000000 Result: 50000005000000 Result: 50000005000000 SideEffect prallel sum done in: 8 msecs\n部分結果離正確值50000005000000差很遠。
這是由于多個線程在同時訪問累加器,執行total += value,而這
一句雖然看似簡單,卻不是一個原子操作。問題的根源在于,forEach中調用的方法有副作用,它會改變多個線程共享的對象的可變狀態。
共享可變狀態會影響并行流以及并行計算
現在,記住要避免共享可變狀態,確保并行Stream得到正確的結果
高效使用并行流建議
一般而言,想給出任何關于什么時候該用并行流的定量建議都是不可能也毫無意義的,因為任何類似于“僅當至少有一千個(或一百萬個或隨便什么數字)元素的時候才用并行流)”的建議對于某臺特定機器上的某個特定操作可能是對的,但在略有差異的另一種情況下可能就是大錯特錯。盡管如此,我們至少可以提出一些定性意見,幫你決定某個特定情況下是否有必要使用并行流。
-
如果有疑問,測量。把順序流轉成并行流輕而易舉,但卻不一定是好事。并行流并不總是比順序流快。此外,并行流有時候會和你的直覺不一致,所以在考慮選擇順序流還是并行流時,第一個也是最重要的建議就是用適當的基準來檢查其性能。
-
留意裝箱。自動裝箱和拆箱操作會大大降低性能。Java 8中有原始類型流(IntStream、LongStream、DoubleStream)來避免這種操作,但凡有可能都應該用這些流。
-
有些操作本身在并行流上的性能就比順序流差。特別是limit和findFirst等依賴于元素順序的操作,它們在并行流上執行的代價非常大。例如findAny會比findFirst性能好,因為它不一定要按順序來執行。你總是可以調用unordered方法來把有序流變成無序流。那么,如果你需要流中的n個元素而不是專門要前n個的話,對無序并行流調用limit可能會比單個有序流(比如數據源是一個List)更高效。
-
還要考慮流的操作流水線的總計算成本。設N是要處理的元素的總數,Q是一個元素通過流水線的大致處理成本,則N*Q就是這個對成本的一個粗略的定性估計。Q值較高就意味著使用并行流時性能好的可能性比較大。
-
對于較小的數據量,選擇并行流幾乎從來都不是一個好的決定。并行處理少數幾個元素的好處還抵不上并行化造成的額外開銷。
-
要考慮流背后的數據結構是否易于分解。例如,ArrayList的拆分效率比LinkedList高得多,因為前者用不著遍歷就可以平均拆分,而后者則必須遍歷。另外,用range工廠方法創建的原始類型流也可以快速分解。
流的數據源和可分解性
| ArrayList | 極佳 |
| LinkedList | 差 |
| IntStream.range | 極佳 |
| Stream.iterate | 差 |
| HashSet | 好 |
| TreeSet | 好 |
-
流自身的特點,以及流水線中的中間操作修改流的方式,都可能會改變分解過程的性能。例如,一個SIZED流可以分成大小相等的兩部分,這樣每個部分都可以比較高效地并行處理,但篩選操作可能丟棄的元素個數卻無法預測,導致流本身的大小未知。
-
還要考慮終端操作中合并步驟的代價是大是小(例如Collector中的combiner方法)。如果這一步代價很大,那么組合每個子流產生的部分結果所付出的代價就可能會超出通過并行流得到的性能提升。
并行流背后使用的基礎架構是Java 7中引入的分支/合并框架。
分支/合并框架
分支/合并框架的目的是以遞歸方式將可以并行的任務拆分成更小的任務,然后將每個子任務的結果合并起來生成整體結果。它是ExecutorService接口的一個實現,它把子任務分配給線程池(稱為ForkJoinPool)中的工作線程。首先來看看如何定義任務和子任務。
使用RecursiveTask
要把任務提交到這個池,必須創建RecursiveTask的一個子類,其中R是并行化任務(以及所有子任務)產生的結果類型,或者如果任務不返回結果,則是RecursiveAction類型(當然它可能會更新其他非局部機構)。
要定義RecursiveTask,只需實現它唯一的抽象方法compute:
protected abstract R compute();這個方法同時定義了將任務拆分成子任務的邏輯,以及無法再拆分或不方便再拆分時,生成單個子任務結果的邏輯。正由于此,這個方法的實現類似于下面的偽代碼:
if (任務足夠小或不可分) {順序計算該任務 } else {將任務分成兩個子任務遞歸調用本方法,拆分每個子任務,等待所有子任務完成合并每個子任務的結果 }ForkJoinSumCalculator
請注意在實際應用時,使用多個ForkJoinPool是沒有什么意義的。正是出于這個原因,一般來說把它實例化一次,然后把實例保存在靜態字段中,使之成為單例,這樣就可以在軟件中任何部分方便地重用了。這里創建時用了其默認的無參數構造函數,這意味著想讓線程池使用JVM能夠使用的所有處理器。
更確切地說,該構造函數將使用Runtime.availableProcessors的返回值來決定線程池使用的線程數。請注意availableProcessors方法雖然看起來是處理器,但它實際上返回的是可用內核的數量,包括超線程生成的虛擬內核。
運行ForkJoinSumCalculator
當把ForkJoinSumCalculator任務傳給ForkJoinPool時,這個任務就由池中的一個線程執行,這個線程會調用任務的compute方法。該方法會檢查任務是否小到足以順序執行,如果不夠小則會把要求和的數組分成兩半,分給兩個新的ForkJoinSumCalculator,而它們也由ForkJoinPool安排執行。
因此,這一過程可以遞歸重復,把原任務分為更小的任務,直到滿足不方便或不可能再進一步拆分的條件(本例中是求和的項目數小于等于10 000)。這時會順序計算每個任務的結果,然后由分支過程創建的(隱含的)任務二叉樹遍歷回到它的根。接下來會合并每個子任務的部分結果,從而得到總任務的結果。
調用ForkJoin
public static long forkJoinSum(long n) {long[] numbers = LongStream.rangeClosed(1, n).toArray();ForkJoinTask<Long> task = new ForkJoinSumCalculator(numbers);return new ForkJoinPool().invoke(task); }測試代碼:
System.out.println("ForkJoin sum done in: " + measurePerf(ForkJoinSumCalculator::forkJoinSum, 10_000_000L) + " msecs\\n" );運行結果:
ForkJoin sum done in: 142 msecs\n這個性能看起來比用并行流的版本要差,但這只是因為必須先要把整個數字流都放進一個long[],之后才能在ForkJoinSumCalculator任務中使用它。
使用分支/合并框架的最佳做法
雖然分支/合并框架還算簡單易用,不幸的是它也很容易被誤用。以下是幾個有效使用它的最佳做法。
-
對一個任務調用join方法會阻塞調用方,直到該任務做出結果。因此,有必要在兩個子任務的計算都開始之后再調用它。否則,你得到的版本會比原始的順序算法更慢更復雜,因為每個子任務都必須等待另一個子任務完成才能啟動。
-
不應該在RecursiveTask內部使用ForkJoinPool的invoke方法。相反,你應該始終直接調用compute或fork方法,只有順序代碼才應該用invoke來啟動并行計算。
-
對子任務調用fork方法可以把它排進ForkJoinPool。同時對左邊和右邊的子任務調用它似乎很自然,但這樣做的效率要比直接對其中一個調用compute低。這樣做你可以為其中一個子任務重用同一線程,從而避免在線程池中多分配一個任務造成的開銷。
-
調試使用分支/合并框架的并行計算可能有點棘手。特別是你平常都在你喜歡的IDE里面看棧跟蹤(stack trace)來找問題,但放在分支?合并計算上就不行了,因為調用compute的線程并不是概念上的調用方,后者是調用fork的那個。
-
和并行流一樣,你不應理所當然地認為在多核處理器上使用分支/合并框架就比順序計算快。我們已經說過,一個任務可以分解成多個獨立的子任務,才能讓性能在并行化時有所提升。所有這些子任務的運行時間都應該比分出新任務所花的時間長;一個慣用方法是把輸入/輸出放在一個子任務里,計算放在另一個里,這樣計算就可以和輸入/輸出同時進行。此外,在比較同一算法的順序和并行版本的性能時還有別的因素要考慮。就像任何其他Java代碼一樣,分支/合并框架需要“預熱”或者說要執行幾遍才會被JIT編譯器優化。這就是為什么在測量性能之前跑幾遍程序很重要,我們的測試框架就是這么做的。同時還要知道,編譯器內置的優化可能會為順序版本帶來一些優勢(例如執行死碼分析——刪去從未被使用的計算)。
對于分支/合并拆分策略還有最后一點補充:你必須選擇一個標準,來決定任務是要進一步拆分還是已小到可以順序求值。
工作竊取
在ForkJoinSumCalculator的例子中,我們決定在要求和的數組中最多包含10 000個項目時就不再創建子任務了。這個選擇是很隨意的,但大多數情況下也很難找到一個好的啟發式方法來確定它,只能試幾個不同的值來嘗試優化它。
在我們的測試案例中,我們先用了一個有1000萬項目的數組,意味著ForkJoinSumCalculator至少會分出1000個子任務來。這似乎有點浪費資源,因為我們用來運行它的機器上只有四個內核。在這個特定例子中可能確實是這樣,因為所有的任務都受CPU約束,預計所花的時間也差不多。
但分出大量的小任務一般來說都是一個好的選擇。這是因為,理想情況下,劃分并行任務時,應該讓每個任務都用完全相同的時間完成,讓所有的CPU內核都同樣繁忙。不幸的是,實際中,每個子任務所花的時間可能天差地別,要么是因為劃分策略效率低,要么是有不可預知的原因,比如磁盤訪問慢,或是需要和外部服務協調執行。
分支/合并框架工程用一種稱為工作竊取(work stealing)的技術來解決這個問題。在實際應用中,這意味著這些任務差不多被平均分配到ForkJoinPool中的所有線程上。每個線程都為分配給它的任務保存一個雙向鏈式隊列,每完成一個任務,就會從隊列頭上取出下一個任務開始執
行。
基于前面所述的原因,某個線程可能早早完成了分配給它的所有任務,也就是它的隊列已經空了,而其他的線程還很忙。這時,這個線程并沒有閑下來,而是隨機選了一個別的線程,從隊列的尾巴上“偷走”一個任務。這個過程一直繼續下去,直到所有的任務都執行完畢,所有的隊列都清空。這就是為什么要劃成許多小任務而不是少數幾個大任務,這有助于更好地在工作線程
之間平衡負載。
一般來說,這種工作竊取算法用于在池中的工作線程之間重新分配和平衡任務。下圖展示了這個過程。當工作線程隊列中有一個任務被分成兩個子任務時,一個子任務就被閑置的工作線程“偷走”了。如前所述,這個過程可以不斷遞歸,直到規定子任務應順序執行的條件為真。
Spliterator
Spliterator是Java 8中加入的另一個新接口;這個名字代表“可分迭代器”(splitable iterator)。和Iterator一樣,Spliterator也用于遍歷數據源中的元素,但它是為了并行執行而設計的。雖然在實踐中可能用不著自己開發Spliterator,但了解一下它的實現方式會讓你對并行流的工作原理有更深入的了解。Java 8已經為集合框架中包含的所有數據結構提供了一個默認的Spliterator實現
public interface Spliterator<T> {boolean tryAdvance(Consumer<? super T> action);Spliterator<T> trySplit();long estimateSize();int characteristics(); }-
T是Spliterator遍歷的元素的類型。tryAdvance方法的行為類似于普通的Iterator,因為它會按順序一個一個使用Spliterator中的元素,并且如果還有其他元素要遍
歷就返回true。 -
trySplit是專為Spliterator接口設計的,因為它可以把一些元素劃出去分給第二個Spliterator(由該方法返回),讓它們兩個并行處理。
-
Spliterator還可通過estimateSize方法估計還剩下多少元素要遍歷,因為即使不那么確切,能快速算出來是一個值
也有助于讓拆分均勻一點。
拆分過程
將Stream拆分成多個部分的算法是一個遞歸過程,如下圖:
Spliterator的特性
Spliterator接口聲明的最后一個抽象方法是characteristics,它將返回一個int,代表Spliterator本身特性集的編碼。使用Spliterator的客戶可以用這些特性來更好地控制和優化它的使用。(不幸的是,雖然它們在概念上與收集器的特性有重疊,
編碼卻不一樣。)
| ORDERED | 元素有既定的順序(例如List),因此Spliterator在遍歷和劃分時也會遵循這一順序 |
| DISTINCT | 對于任意一對遍歷過的元素x和y,x.equals(y)返回false |
| SORTED | 遍歷的元素按照一個預定義的順序排序 |
| SIZED | 該Spliterator由一個已知大小的源建立(例如Set),因此estimatedSize()返回的是準確值 |
| NONNULL | 保證遍歷的元素不會為null |
| IMMUTABLE | Spliterator的數據源不能修改。這意味著在遍歷時不能添加、刪除或修改任何元素 |
| CONCURRENT | 該Spliterator的數據源可以被其他線程同時修改而無需同步 |
| SUBSIZED | 該Spliterator和所有從它拆分出來的Spliterator都是SIZED |
實現你自己的Spliterator
WordCount
開發一個簡單的方法來數數一個String中的單詞數。
public int countWordsIteratively(String s) {int counter = 0;boolean lastSpace = true;for (char c : s.toCharArray()) {if (Character.isWhitespace(c)) {lastSpace = true;} else {if (lastSpace) counter++;lastSpace = false;}}return counter; }final String SENTENCE = " Nel mezzo del cammin di nostra vita " + "mi ritrovai in una selva oscura" + " ché la dritta via era smarrita ";System.out.println("Found " + countWordsIteratively(SENTENCE) + " words");運行結果
Found 19 words以函數式風格重寫單詞計數器
首先你需要把String轉換成一個流。不幸的是,原始類型的流僅限于int、long和double,所以你只能用Stream:
Stream<Character> stream = IntStream.range(0, SENTENCE.length()).mapToObj(SENTENCE::charAt); ```java你可以對這個流做歸約來計算字數。在歸約流時,你得保留由兩個變量組成的狀態:一個int用來計算到目前為止數過的字數,還有一個boolean用來記得上一個遇到的Character是不是空格。所以你必須創建一個新類WordCounter來把這個狀態封裝起來[WordCount#WordCounter](WordCount.java)```java private int countWords(Stream<Character> stream) {WordCounter wordCounter = stream.reduce(new WordCounter(0, true),WordCounter::accumulate,WordCounter::combine);return wordCounter.getCounter(); }Stream<Character> stream = IntStream.range(0, SENTENCE.length()).mapToObj(SENTENCE::charAt);System.out.println("Found " + countWords(stream) + " words");運行結果:
Found 19 words讓WordCounter并行工作
嘗試用并行流來加快字數統計,如下所示:
System.out.println("Found " + countWords(stream.parallel()) + " words");不幸的是,這次的輸出是:
Found 25 words問題的根源在于原始的String在任意位置拆分,所以有時一個詞會被分為兩個詞,然后數了兩次。
這就說明,拆分流會影響結果,而把順序流換成并行流就可能使結果出錯。
如何解決這個問題呢?解決方案就是要確保String不是在隨機位置拆開的,而只能在詞尾拆開。要做到這一點,你必須為Character實現一個Spliterator,它只能在兩個詞之間拆開String,然后由此創建并行流。
WordCount#WordCounterSpliterator
快速回顧一下實現了Spliterator 接口的WordCounterSpliterator中的各個函數。
-
tryAdvance方法把String中當前位置的Character傳給了Consumer,并讓位置加一。作為參數傳遞的Consumer是一個Java內部類,在遍歷流時將要處理的Character傳給了一系列要對其執行的函數。這里只有一個歸約函數,即WordCounter類的accumulate方法。如果新的指針位置小于String的總長,且還有要遍歷的Character,則tryAdvance返回true。
-
trySplit方法是Spliterator中最重要的一個方法,因為它定義了拆分要遍歷的數據結構的邏輯。就像在ForkJoinSumCalculator中實現的RecursiveTask的compute方法一樣(分支/合并框架的使用方式),首先要設定不再進一步拆分的下限。這里用了一個非常低的下限——10個Character,僅僅是為了保證程序會對那個比較短的String做幾次拆分。在實際應用中,就像分支/合并的例子那樣,你肯定要用更高的下限來避免生成太多的任務。如果剩余的Character數量低于下限,你就返回null表示無需進一步拆分。相反,如果你需要執行拆分,就把試探的拆分位置設在要解析的String塊的中間。但我們沒有直接使用這個拆分位置,因為要避免把詞在中間斷開,于是就往前找,直到找到一個空格。一旦找到了適當的拆分位置,就可以創建一個新的Spliterator來遍歷從當前位置到拆分位置的子串;把當前位置this設為拆分位置,因為之前的部分將由新Spliterator來處理,最后返回。
-
還需要遍歷的元素的estimatedSize就是這個Spliterator解析的String的總長度和當前遍歷的位置的差。
-
最后,characteristic方法告訴框架這個Spliterator是ORDERED(順序就是String中各個Character的次序)、SIZED(estimatedSize方法的返回值是精確的)、SUBSIZED(trySplit方法創建的其他Spliterator也有確切大小)、NONNULL(String中不能有為null的Character)和IMMUTABLE(在解析String時不能再添加Character,因為String本身是一個不可變類)的。
調用方法
Spliterator<Character> spliterator = new WordCounterSpliterator(SENTENCE); Stream<Character> stream = StreamSupport.stream(spliterator, true);System.out.println("Found " + countWords(stream) + " words");運行結果
Found 19 wordsSpliterator還有最后一個值得注意的功能,就是可以在第一次遍歷、第一次拆分或第一次查詢估計大小時綁定元素的數據源,而不是在創建時就綁定。這種情況下,它稱為延遲綁定(late-binding)的Spliterator。
小結
- 內部迭代讓你可以并行處理一個流,而無需在代碼中顯式使用和協調不同的線程。
- 雖然并行處理一個流很容易,卻不能保證程序在所有情況下都運行得更快。并行軟件的行為和性能有時是違反直覺的,因此一定要測量,確保你并沒有把程序拖得更慢。
- 像并行流那樣對一個數據集并行執行操作可以提升性能,特別是要處理的元素數量龐大,或處理單個元素特別耗時的時候。
- 從性能角度來看,使用正確的數據結構,如盡可能利用原始流而不是一般化的流,幾乎總是比嘗試并行化某些操作更為重要。
- 分支/合并框架讓你得以用遞歸方式將可以并行的任務拆分成更小的任務,在不同的線程上執行,然后將各個子任務的結果合并起來生成整體結果。
- Spliterator定義了并行流如何拆分它要遍歷的數據。
總結
以上是生活随笔為你收集整理的《Java8实战》笔记(07):并行数据处理与性能的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 学点数学(4)-协方差矩阵
- 下一篇: 大数据学习(10)--流计算