parallelstream启动的线程数_谈谈并行流parallelStream
一、parallelStream是什么
Java8中提供了能夠更方便處理集合數(shù)據(jù)的Stream類,其中parallelStream()方法能夠充分利用多核CPU的優(yōu)勢,使用多線程加快對集合數(shù)據(jù)的處理速度。不熟悉Stream類的同學(xué),可以先參考我的另外一篇文章Java8中Stream的常用方法
parallelStream()方法的源碼如下:
/** * @return a possibly parallel {@code Stream} over the elements in this * collection * @since 1.8 */ default Stream parallelStream() { return StreamSupport.stream(spliterator(), true); }從注釋的@return a possibly parallel可以看得出來,parallelStream()并不是一定返回一個并行流,有可能parallelStream()全是由主線程順序執(zhí)行的。
二、parallelStream內(nèi)部使用了哪些線程
以一個簡單的例子,來看看parallelStream內(nèi)部到底使用了哪些線程
Integer[] array = new Integer[]{1, 2, 3, 4, 5}; Arrays.asList(array).parallelStream().forEach(i -> { System.out.println(Thread.currentThread().getName() + " num:" + i); });輸出結(jié)果如下:
可以看得出來,結(jié)果是亂序輸出的,且參與并行處理的線程有主線程以及ForkJoinPool中的worker線程
三、Fork/Join框架
注:本文不會深入研究Fork/Join框架的源碼以及與線程池的異同點,只是僅僅解開parallelStream的面紗,后續(xù)會有更深入的文章去講解Fork/Join框架的原理。
parallelStream的底層是基于ForkJoinPool的,ForkJoinPool實現(xiàn)了ExecutorService接口,因此和線程池的關(guān)系微妙。(對線程池不熟悉的同學(xué),可以參考我的另外一篇文章說說線程池)
ForkJoinPool和ExecutorService的繼承關(guān)系如圖所示:
Fork/Join框架主要采用分而治之的理念來處理問題,對于一個比較大的任務(wù),首先將它拆分(fork)為兩個小任務(wù)task1與task2。
使用新的線程thread1去處理task1,thread2去處理task2。
如果thread1認為task1還是太大,則繼續(xù)往下拆分成新的子任務(wù)task3與task4。
thread2認為task2任務(wù)量不大,則立即進行處理,形成結(jié)果result2。
之后將task3和task4的處理結(jié)果合并(join)成result1,最后將result1與result2合并成最后的結(jié)果。
用圖來描述可能更加直觀:
下面用一個示例代碼,計算出1到10000的和(實際上應(yīng)該取到一個很大的數(shù)字,這里為了演示方便就到10000就結(jié)束)來演示ForkJoinPool的簡單使用。
package com.qcy.testStream;import java.util.concurrent.ExecutionException;import java.util.concurrent.ForkJoinPool;import java.util.concurrent.ForkJoinTask;import java.util.concurrent.RecursiveTask;import java.util.stream.IntStream;/** * @author qcy * @create 2020/08/13 21:31:45 */public class Task extends RecursiveTask { //起始數(shù) private Integer startNum; //結(jié)束數(shù) private Integer endNum; //最大間隔數(shù) private Integer maxInterval; public Task(Integer startNum, Integer endNum, Integer maxInterval) { this.startNum = startNum; this.endNum = endNum; this.maxInterval = maxInterval; } @Override protected Integer compute() { if (endNum - startNum < maxInterval) { //任務(wù)足夠小,不需要拆分 return IntStream.rangeClosed(startNum, endNum).sum(); } //需要拆分任務(wù) int middleNum = (startNum + endNum) % 2 == 0 ? (startNum + endNum) / 2 : (startNum + endNum - 1) / 2; Task t1 = new Task(startNum, middleNum, maxInterval); Task t2 = new Task(middleNum + 1, endNum, maxInterval); //使用invokeAll,能讓這兩個任務(wù)被并行處理 invokeAll(t1, t2); //使用t1.fork()、t2.fork()則讓這兩個任務(wù)串行處理 return t1.join() + t2.join(); } public static void main(String[] args) throws ExecutionException, InterruptedException { ForkJoinPool pool = new ForkJoinPool(); Task task = new Task(1, 10000, 100); ForkJoinTask future = pool.submit(task); System.out.println(future.get()); }}當我們使用默認的不帶參數(shù)的方法構(gòu)造ForkJoinPool時,默認最大的線程并行數(shù)量是當前CPU的核數(shù)。在一定程度上,這樣做能夠減少線程上下文切換的次數(shù)。
public ForkJoinPool() { this(Math.min(MAX_CAP, Runtime.getRuntime().availableProcessors()), defaultForkJoinWorkerThreadFactory, null, false); }當然,我們可以使用-Djava.util.concurrent.ForkJoinPool.common.parallelism=x,其中x為ForkJoinPool中的線程數(shù)量,當設(shè)定為2時,則上述計算1到10000之和的任務(wù),總共只有兩個線程來處理任務(wù)。
注意此參數(shù)是全局的,會影響其他parallelStream中的線程總數(shù)。
但是對于第一個例子,一共會有3個線程來進行處理,多出來那個線程是主線程。如圖所示:
四、使用parallelStream的一些注意點
(1)parallelStream并行流一定要使用線程安全的對象,比如有這樣的一個場景
List list = new ArrayList<>(); IntStream.rangeClosed(1, 10000).parallel().forEach(i -> list.add(i));執(zhí)行就立即報錯了:
ArrayList本身就是一個線程不安全的容器,在多線程的操作下,擴容操作可能會導(dǎo)致產(chǎn)生數(shù)組越界的異常。
此時,要么使用線程安全的容器,比如Vector,要么使用collect完成串行收集。
List collect = IntStream.rangeClosed(1, 10000) .parallel() .boxed() .collect(Collectors.toList());(2)線程關(guān)聯(lián)的ThreadLocal將會失效
(不熟悉ThreadLocal的同學(xué),可以參考我的另外一篇文章淺談ThreadLocal)
這一點從第二小節(jié)就可以看出,主線程參與到parallelStream中的任務(wù)處理的過程中。如果我們處理的任務(wù)方法中包含對ThreadLocal的處理,可能除主線程之外的所有線程都獲取不到自己的線程局部變量,加之ForkJoinPool中的線程是反復(fù)使用的,線程關(guān)聯(lián)的ThreadLocal會發(fā)生共用的情況。
所以我的建議是,parallelStream中就不要使用ThreadLocal了,要么在任務(wù)處理方法中,第一行先進行ThreadLocal.set(),之后再由ThreadLocal.get()獲取到自己的線程局部變量。
非要用ThreadLocal的話,為了規(guī)避使用不當而帶來內(nèi)存泄漏的風(fēng)險,可以參考我的這篇文章ThreadLocal使用不好,小心造成內(nèi)存泄露!
(3)使用parallelStream也不一定會提升性能
在CPU資源緊張的時候,使用并行流可能會帶來頻繁的線程上下文切換,導(dǎo)致并行流執(zhí)行的效率還沒有串行執(zhí)行的效率高。
總結(jié)
以上是生活随笔為你收集整理的parallelstream启动的线程数_谈谈并行流parallelStream的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: python怎么连接mysql数据库_p
- 下一篇: ai一个线段多个箭头_初学设计却分不清P