拥抱 Java 8 并行流吧,速度飞起!
前言
在?Java7?之前,如果想要并行處理一個集合,我們需要以下幾步:
手動分成幾部分?
為每部分創(chuàng)建線程
在適當(dāng)?shù)臅r候合并。并且還需要關(guān)注多個線程之間共享變量的修改問題。
而?Java8?為我們提供了并行流,可以一鍵開啟并行模式。是不是很酷呢?讓我們來看看吧
并行流
認(rèn)識和開啟并行流
什么是并行流:?并行流就是將一個流的內(nèi)容分成多個數(shù)據(jù)塊,并用不同的線程分別處理每個不同數(shù)據(jù)塊的流。例如有這么一個需求:
有一個 List 集合,而 list 中每個 apple 對象只有重量,我們也知道 apple 的單價是 5元/kg,現(xiàn)在需要計算出每個 apple 的單價,傳統(tǒng)的方式是這樣:
List<Apple>?appleList?=?new?ArrayList<>();?//?假裝數(shù)據(jù)是從庫里查出來的for?(Apple?apple?:?appleList)?{apple.setPrice(5.0?*?apple.getWeight()?/?1000); }我們通過迭代器遍歷 list 中的 apple 對象,完成了每個 apple 價格的計算。而這個算法的時間復(fù)雜度是 O(list.size()) 隨著 list 大小的增加,耗時也會跟著線性增加。并行流可以大大縮短這個時間。
并行流處理該集合的方法如下:
appleList.parallelStream().forEach(apple?->?apple.setPrice(5.0?*?apple.getWeight()?/?1000));和普通流的區(qū)別是這里調(diào)用的?parallelStream()?方法。當(dāng)然也可以通過 stream.parallel() 將普通流轉(zhuǎn)換成并行流。推薦看下:Java 8 創(chuàng)建 Stream 的 10 種方式,更多可以關(guān)注Java技術(shù)棧公眾號回復(fù)java獲取系列教程。
并行流也能通過 sequential() 方法轉(zhuǎn)換為順序流,但要注意:流的并行和順序轉(zhuǎn)換不會對流本身做任何實際的變化,僅僅是打了個標(biāo)記而已。并且在一條流水線上對流進(jìn)行多次并行 / 順序的轉(zhuǎn)換,生效的是最后一次的方法調(diào)用
并行流如此方便,它的線程從那里來呢?有多少個?怎么配置呢?
并行流內(nèi)部使用了默認(rèn)的?ForkJoinPool?線程池。默認(rèn)的線程數(shù)量就是處理器的核心數(shù),而配置系統(tǒng)核心屬性:java.util.concurrent.ForkJoinPool.common.parallelism 可以改變線程池大小。不過該值是全局變量。
改變他會影響所有并行流。目前還無法為每個流配置專屬的線程數(shù)。一般來說采用處理器核心數(shù)是不錯的選擇
測試并行流的性能
為了更容易的測試性能,我們在每次計算完蘋果價格后,讓線程睡 1s,表示在這期間執(zhí)行了其他 IO 相關(guān)的操作,并輸出程序執(zhí)行耗時,順序執(zhí)行的耗時:
public?static?void?main(String[]?args)?throws?InterruptedException?{List<Apple>?appleList?=?initAppleList();Date?begin?=?new?Date();for?(Apple?apple?:?appleList)?{apple.setPrice(5.0?*?apple.getWeight()?/?1000);Thread.sleep(1000);}Date?end?=?new?Date();log.info("蘋果數(shù)量:{}個, 耗時:{}s",?appleList.size(),?(end.getTime()?-?begin.getTime())?/1000); }并行版本
List<Apple>?appleList?=?initAppleList();Date?begin?=?new?Date(); appleList.parallelStream() .forEach(apple?->{apple.setPrice(5.0?*?apple.getWeight()?/?1000);try?{Thread.sleep(1000);}?catch?(InterruptedException?e)?{e.printStackTrace();}}); Date?end?=?new?Date(); log.info("蘋果數(shù)量:{}個, 耗時:{}s",?appleList.size(),?(end.getTime()?-?begin.getTime())?/1000);耗時情況
?
跟我們的預(yù)測一致,我的電腦是 四核I5 處理器,開啟并行后四個處理器每人執(zhí)行一個線程,最后 1s 完成了任務(wù)!
并行流可以隨便用嗎?
可拆分性影響流的速度
通過上面的測試,有的人會輕易得到一個結(jié)論:并行流很快,我們可以完全放棄 foreach/fori/iter 外部迭代,使用?Stream?提供的內(nèi)部迭代來實現(xiàn)了。
事實真的是這樣嗎?并行流真的如此完美嗎?答案當(dāng)然是否定的。大家可以復(fù)制下面的代碼,在自己的電腦上測試。測試完后可以發(fā)現(xiàn),并行流并不總是最快的處理方式。
對于 iterate 方法來處理的前 n 個數(shù)字來說,不管并行與否,它總是慢于循環(huán)的,非并行版本可以理解為流化操作沒有循環(huán)更偏向底層導(dǎo)致的慢。可并行版本是為什么慢呢?這里有兩個需要注意的點:
iterate 生成的是裝箱的對象,必須拆箱成數(shù)字才能求和
我們很難把 iterate 分成多個獨立的塊來并行執(zhí)行
這個問題很有意思,我們必須意識到某些流操作比其他操作更容易并行化。對于 iterate 來說,每次應(yīng)用這個函數(shù)都要依賴于前一次應(yīng)用的結(jié)果。因此在這種情況下,我們不僅不能有效的將流劃分成小塊處理。反而還因為并行化再次增加了開支。
而對于 LongStream.rangeClosed() 方法來說,就不存在 iterate 的第兩個痛點了。它生成的是基本類型的值,不用拆裝箱操作,另外它可以直接將要生成的數(shù)字 1 - n 拆分成 1 - n/4, 1n/4 - 2n/4, ... 3n/4 - n 這樣四部分。因此并行狀態(tài)下的 rangeClosed() 是快于 for 循環(huán)外部迭代的
共享變量修改的問題
并行流雖然輕易的實現(xiàn)了多線程,但是仍未解決多線程中共享變量的修改問題。下面代碼中存在共享變量 total,分別使用順序流和并行流計算前n個自然數(shù)的和
public?static?long?sideEffectSum(long?n)?{Accumulator?accumulator?=?new?Accumulator();LongStream.rangeClosed(1,?n).forEach(accumulator::add);return?accumulator.total; }public?static?long?sideEffectParallelSum(long?n)?{Accumulator?accumulator?=?new?Accumulator();LongStream.rangeClosed(1,?n).parallel().forEach(accumulator::add);return?accumulator.total; }public?static?class?Accumulator?{private?long?total?=?0;public?void?add(long?value)?{total?+=?value;} }順序執(zhí)行每次輸出的結(jié)果都是:50000005000000,而并行執(zhí)行的結(jié)果卻五花八門了。這是因為每次訪問 totle 都會存在數(shù)據(jù)競爭,關(guān)于數(shù)據(jù)競爭的原因,大家可以看看關(guān)于?volatile?的博客。因此當(dāng)代碼中存在修改共享變量的操作時,是不建議使用并行流的。
并行流的使用注意
在并行流的使用上有下面幾點需要注意:
-
盡量使用 LongStream / IntStream / DoubleStream 等原始數(shù)據(jù)流代替 Stream 來處理數(shù)字,以避免頻繁拆裝箱帶來的額外開銷
-
要考慮流的操作流水線的總計算成本,假設(shè) N 是要操作的任務(wù)總數(shù),Q 是每次操作的時間。N * Q 就是操作的總時間,Q 值越大就意味著使用并行流帶來收益的可能性越大
例如:前端傳來幾種類型的資源,需要存儲到數(shù)據(jù)庫。每種資源對應(yīng)不同的表。我們可以視作類型數(shù)為 N,存儲數(shù)據(jù)庫的網(wǎng)絡(luò)耗時 + 插入操作耗時為 Q。一般情況下網(wǎng)絡(luò)耗時都是比較大的。因此該操作就比較適合并行處理。當(dāng)然當(dāng)類型數(shù)目大于核心數(shù)時,該操作的性能提升就會打一定的折扣了。更好的優(yōu)化方法在日后的博客會為大家奉上
-
對于較少的數(shù)據(jù)量,不建議使用并行流
-
容易拆分成塊的流數(shù)據(jù),建議使用并行流
以下是一些常見的集合框架對應(yīng)流的可拆分性能表:
| ArrayList | 極佳 |
| LinkedList | 差 |
| IntStream.range | 極佳 |
| Stream.iterate | 差 |
| HashSet | 好 |
| TreeSet | 好 |
碼字不易,如果你覺得讀完以后有收獲,不妨點個推薦讓更多的人看到吧!
版權(quán)聲明:本文首發(fā)于博客園,作者:后青春期的Keats 地址:https://www.cnblogs.com/keatsCoder/
總結(jié)
以上是生活随笔為你收集整理的拥抱 Java 8 并行流吧,速度飞起!的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Docker+Jenkins+Nginx
- 下一篇: 两难!到底用 Spring BeanUt