并行化:你的高并发大杀器
來(lái)源:咖啡拿鐵
1.前言
想必?zé)釔?ài)游戲的同學(xué)小時(shí)候,都幻想過(guò)要是自己要是能像鳴人那樣會(huì)多重影分身之術(shù),就能一邊打游戲一邊上課了,可惜漫畫(huà)就是漫畫(huà),現(xiàn)實(shí)中并沒(méi)有這個(gè)技術(shù),你要么只有老老實(shí)實(shí)的上課,要么就只有逃課去打游戲了。雖然在現(xiàn)實(shí)中我們無(wú)法實(shí)現(xiàn)多重影分身這樣的技術(shù),但是我們可以在計(jì)算機(jī)世界中實(shí)現(xiàn)我們這樣的愿望。
2.計(jì)算機(jī)中的分身術(shù)
計(jì)算機(jī)中的分身術(shù)不是天生就有了。在1971年,1971年,英特爾推出的全球第一顆通用型微處理器4004,由2300個(gè)晶體管構(gòu)成。當(dāng)時(shí),公司的聯(lián)合創(chuàng)始人之一戈登摩爾就提出大名鼎鼎的“摩爾定律”——每過(guò)18個(gè)月,芯片上可以集成的晶體管數(shù)目將增加一倍。最初的主頻740kHz(每秒運(yùn)行74萬(wàn)次),現(xiàn)在過(guò)了快50年了,大家去買(mǎi)電腦的時(shí)候會(huì)發(fā)現(xiàn)現(xiàn)在的主頻都能達(dá)到4.0GHZ了(每秒40億次)。
但是主頻越高帶來(lái)的收益卻是越來(lái)越小:
據(jù)測(cè)算,主頻每增加1G,功耗將上升25瓦,而在芯片功耗超過(guò)150瓦后,現(xiàn)有的風(fēng)冷散熱系統(tǒng)將無(wú)法滿足散熱的需要。有部分CPU都可以用來(lái)煎雞蛋了。
流水線過(guò)長(zhǎng),使得單位頻率效能低下,越大的主頻其實(shí)整體性能反而不如小的主頻。
戈登摩爾認(rèn)為摩爾定律未來(lái)10-20年會(huì)失效。
在單核主頻遇到瓶頸的情況下,多核CPU應(yīng)運(yùn)而生,不僅提升了性能,并且降低了功耗。所以多核CPU逐漸成為現(xiàn)在市場(chǎng)的主流,這樣讓我們的多線程編程也更加的容易。
說(shuō)到了多核CPU就一定要說(shuō)GPU,大家可能對(duì)這個(gè)比較陌生,但是一說(shuō)到顯卡就肯定不陌生,筆者搞過(guò)一段時(shí)間的CUDA編程,我才意識(shí)到這個(gè)才是真正的并行計(jì)算,大家都知道圖片像素點(diǎn)吧,比如19201080的圖片有210萬(wàn)個(gè)像素點(diǎn),如果想要把一張圖片的每個(gè)像素點(diǎn)都進(jìn)行轉(zhuǎn)換一下,那在我們java里面可能就要循環(huán)遍歷210萬(wàn)次。 就算我們用多線程8核CPU,那也得循環(huán)幾十萬(wàn)次。但是如果使用Cuda,最多可以365535*512=100661760(一億)個(gè)線程并行執(zhí)行,就這種級(jí)別的圖片那也是馬上處理完成。但是Cuda一般適合于圖片這種,有大量的像素點(diǎn)需要同時(shí)處理,但是指令集很少所以邏輯不能太復(fù)雜。
GPU只是用來(lái)擴(kuò)展介紹,感興趣可以和筆者交流。
3.應(yīng)用中的并行
一說(shuō)起讓你的服務(wù)高性能的手段,那么異步化,并行化這些肯定會(huì)第一時(shí)間在你腦海中顯現(xiàn)出來(lái),在之前的文章:《異步化,你的高并發(fā)大殺器》中已經(jīng)介紹過(guò)了異步化的優(yōu)化手段,有興趣的朋友可以看看。并行化可以用來(lái)配合異步化,也可以用來(lái)單獨(dú)做優(yōu)化。
我們可以想想有這么一個(gè)需求,在你下外賣(mài)訂單的時(shí)候,這筆訂單可能還需要查,用戶(hù)信息,折扣信息,商家信息,菜品信息等,用同步的方式調(diào)用,如下圖所示:
設(shè)想一下這5個(gè)查詢(xún)服務(wù),平均每次消耗50ms,那么本次調(diào)用至少是250ms,我們細(xì)想一下,在這個(gè)這五個(gè)服務(wù)其實(shí)并沒(méi)有任何的依賴(lài),誰(shuí)先獲取誰(shuí)后獲取都可以,那么我們可以想想,是否可以用多重影分身之術(shù),同時(shí)獲取這五個(gè)服務(wù)的信息呢?
優(yōu)化如下:
將這五個(gè)查詢(xún)服務(wù)并行查詢(xún),在理想情況下可以?xún)?yōu)化至50ms。當(dāng)然說(shuō)起來(lái)簡(jiǎn)單,我們真正如何落地呢?
3.1 CountDownLatch/Phaser
CountDownLatch和Phaser是JDK提供的同步工具類(lèi)Phaser是1.7版本之后提供的工具類(lèi)而CountDownLatch是1.5版本之后提供的工具類(lèi)。這里簡(jiǎn)單介紹一下CountDownLatch,可以將其看成是一個(gè)計(jì)數(shù)器,await()方法可以阻塞至超時(shí)或者計(jì)數(shù)器減至0,其他線程當(dāng)完成自己目標(biāo)的時(shí)候可以減少1,利用這個(gè)機(jī)制我們可以將其用來(lái)做并發(fā)。?
可以用如下的代碼實(shí)現(xiàn)我們上面的下訂單的需求:
public?class?CountDownTask?{????private?static?final?int?CORE_POOL_SIZE?=?4;
????private?static?final?int?MAX_POOL_SIZE?=?12;
????private?static?final?long?KEEP_ALIVE_TIME?=?5L;
????private?final?static?int?QUEUE_SIZE?=?1600;
????protected?final?static?ExecutorService?THREAD_POOL?=?new?ThreadPoolExecutor(CORE_POOL_SIZE,?MAX_POOL_SIZE,
????????????KEEP_ALIVE_TIME,?TimeUnit.SECONDS,?new?LinkedBlockingQueue<>(QUEUE_SIZE));
????public?static?void?main(String[]?args)?throws?InterruptedException?{
????????//?新建一個(gè)為5的計(jì)數(shù)器
????????CountDownLatch?countDownLatch?=?new?CountDownLatch(5);
????????OrderInfo?orderInfo?=?new?OrderInfo();
????????THREAD_POOL.execute(()?->?{
????????????System.out.println("當(dāng)前任務(wù)Customer,線程名字為:"?+?Thread.currentThread().getName());
????????????orderInfo.setCustomerInfo(new?CustomerInfo());
????????????countDownLatch.countDown();
????????});
????????THREAD_POOL.execute(()?->?{
????????????System.out.println("當(dāng)前任務(wù)Discount,線程名字為:"?+?Thread.currentThread().getName());
????????????orderInfo.setDiscountInfo(new?DiscountInfo());
????????????countDownLatch.countDown();
????????});
????????THREAD_POOL.execute(()?->?{
????????????System.out.println("當(dāng)前任務(wù)Food,線程名字為:"?+?Thread.currentThread().getName());
????????????orderInfo.setFoodListInfo(new?FoodListInfo());
????????????countDownLatch.countDown();
????????});
????????THREAD_POOL.execute(()?->?{
????????????System.out.println("當(dāng)前任務(wù)Tenant,線程名字為:"?+?Thread.currentThread().getName());
????????????orderInfo.setTenantInfo(new?TenantInfo());
????????????countDownLatch.countDown();
????????});
????????THREAD_POOL.execute(()?->?{
????????????System.out.println("當(dāng)前任務(wù)OtherInfo,線程名字為:"?+?Thread.currentThread().getName());
????????????orderInfo.setOtherInfo(new?OtherInfo());
????????????countDownLatch.countDown();
????????});
????????countDownLatch.await(1,?TimeUnit.SECONDS);
????????System.out.println("主線程:"+?Thread.currentThread().getName());
????}
}
建立一個(gè)線程池(具體配置根據(jù)具體業(yè)務(wù),具體機(jī)器配置),進(jìn)行并發(fā)的執(zhí)行我們的任務(wù)(生成用戶(hù)信息,菜品信息等),最后利用await方法阻塞等待結(jié)果成功返回。
3.2CompletableFuture
相信各位同學(xué)已經(jīng)發(fā)現(xiàn),CountDownLatch雖然能實(shí)現(xiàn)我們需要滿足的功能但是其任然有個(gè)問(wèn)題是,在我們的業(yè)務(wù)代碼需要耦合CountDownLatch的代碼,比如在我們獲取用戶(hù)信息之后我們會(huì)執(zhí)行countDownLatch.countDown(),很明顯我們的業(yè)務(wù)代碼顯然不應(yīng)該關(guān)心這一部分邏輯,并且在開(kāi)發(fā)的過(guò)程中萬(wàn)一寫(xiě)漏了,那我們的await方法將只會(huì)被各種異常喚醒。
所以在JDK1.8中提供了一個(gè)類(lèi)CompletableFuture,它是一個(gè)多功能的非阻塞的Future。(什么是Future:用來(lái)代表異步結(jié)果,并且提供了檢查計(jì)算完成,等待完成,檢索結(jié)果完成等方法。)在我之前的這篇文章中詳細(xì)介紹了《異步技巧之CompletableFuture》,有興趣的可以看這篇文章。
我們將每個(gè)任務(wù)的計(jì)算完成的結(jié)果都用CompletableFuture來(lái)表示,利用CompletableFuture.allOf匯聚成一個(gè)大的CompletableFuture,那么利用get()方法就可以阻塞。
public?class?CompletableFutureParallel?{????private?static?final?int?CORE_POOL_SIZE?=?4;
????private?static?final?int?MAX_POOL_SIZE?=?12;
????private?static?final?long?KEEP_ALIVE_TIME?=?5L;
????private?final?static?int?QUEUE_SIZE?=?1600;
????protected?final?static?ExecutorService?THREAD_POOL?=?new?ThreadPoolExecutor(CORE_POOL_SIZE,?MAX_POOL_SIZE,
????????????KEEP_ALIVE_TIME,?TimeUnit.SECONDS,?new?LinkedBlockingQueue<>(QUEUE_SIZE));
????public?static?void?main(String[]?args)?throws?InterruptedException,?ExecutionException,?TimeoutException?{
????????OrderInfo?orderInfo?=?new?OrderInfo();
????????//CompletableFuture?的List
????????List<CompletableFuture>?futures?=?new?ArrayList<>();
????????futures.add(CompletableFuture.runAsync(()?->?{
????????????System.out.println("當(dāng)前任務(wù)Customer,線程名字為:"?+?Thread.currentThread().getName());
????????????orderInfo.setCustomerInfo(new?CustomerInfo());
????????},?THREAD_POOL));
????????futures.add(CompletableFuture.runAsync(()?->?{
????????????System.out.println("當(dāng)前任務(wù)Discount,線程名字為:"?+?Thread.currentThread().getName());
????????????orderInfo.setDiscountInfo(new?DiscountInfo());
????????},?THREAD_POOL));
????????futures.add(?CompletableFuture.runAsync(()?->?{
????????????System.out.println("當(dāng)前任務(wù)Food,線程名字為:"?+?Thread.currentThread().getName());
????????????orderInfo.setFoodListInfo(new?FoodListInfo());
????????},?THREAD_POOL));
????????futures.add(CompletableFuture.runAsync(()?->?{
????????????System.out.println("當(dāng)前任務(wù)Other,線程名字為:"?+?Thread.currentThread().getName());
????????????orderInfo.setOtherInfo(new?OtherInfo());
????????},?THREAD_POOL));
????????CompletableFuture?allDoneFuture?=?CompletableFuture.allOf(futures.toArray(new?CompletableFuture[futures.size()]));
????????allDoneFuture.get(10,?TimeUnit.SECONDS);
????????System.out.println(orderInfo);
????}
}
可以看見(jiàn)我們使用CompletableFuture能很快的完成的需求,當(dāng)然這還不夠。
3.3 Fork/Join
我們上面用CompletableFuture完成了我們對(duì)多組任務(wù)并行執(zhí)行,但是其依然是依賴(lài)我們的線程池,在我們的線程池中使用的是阻塞隊(duì)列,也就是當(dāng)我們某個(gè)線程執(zhí)行完任務(wù)的時(shí)候需要通過(guò)這個(gè)阻塞隊(duì)列進(jìn)行,那么肯定會(huì)發(fā)生競(jìng)爭(zhēng),所以在JDK1.7中提供了ForkJoinTask和ForkJoinPool。
ForkJoinPool中每個(gè)線程都有自己的工作隊(duì)列,并且采用Work-Steal算法防止線程饑餓。 Worker線程用LIFO的方法取出任務(wù),但是會(huì)用FIFO的方法去偷取別人隊(duì)列的任務(wù),這樣就減少了鎖的沖突。
網(wǎng)上這個(gè)框架的例子很多,我們看看如何使用代碼其完成我們上面的下訂單需求:
public?class?OrderTask?extends?RecursiveTask<OrderInfo>?{????@Override
????protected?OrderInfo?compute()?{
????????System.out.println("執(zhí)行"+?this.getClass().getSimpleName()?+?"線程名字為:"?+?Thread.currentThread().getName());
????????//?定義其他五種并行TasK
????????CustomerTask?customerTask?=?new?CustomerTask();
????????TenantTask?tenantTask?=?new?TenantTask();
????????DiscountTask?discountTask?=?new?DiscountTask();
????????FoodTask?foodTask?=?new?FoodTask();
????????OtherTask?otherTask?=?new?OtherTask();
????????invokeAll(customerTask,?tenantTask,?discountTask,?foodTask,?otherTask);
????????OrderInfo?orderInfo?=?new?OrderInfo(customerTask.join(),?tenantTask.join(),?discountTask.join(),?foodTask.join(),?otherTask.join());
????????return?orderInfo;
????}
????public?static?void?main(String[]?args)?{
????????ForkJoinPool?forkJoinPool?=?new?ForkJoinPool(Runtime.getRuntime().availableProcessors()?-1?);
????????System.out.println(forkJoinPool.invoke(new?OrderTask()));
????}
}
class?CustomerTask?extends?RecursiveTask<CustomerInfo>{
????@Override
????protected?CustomerInfo?compute()?{
????????System.out.println("執(zhí)行"+?this.getClass().getSimpleName()?+?"線程名字為:"?+?Thread.currentThread().getName());
????????return?new?CustomerInfo();
????}
}
class?TenantTask?extends?RecursiveTask<TenantInfo>{
????@Override
????protected?TenantInfo?compute()?{
????????System.out.println("執(zhí)行"+?this.getClass().getSimpleName()?+?"線程名字為:"?+?Thread.currentThread().getName());
????????return?new?TenantInfo();
????}
}
class?DiscountTask?extends?RecursiveTask<DiscountInfo>{
????@Override
????protected?DiscountInfo?compute()?{
????????System.out.println("執(zhí)行"+?this.getClass().getSimpleName()?+?"線程名字為:"?+?Thread.currentThread().getName());
????????return?new?DiscountInfo();
????}
}
class?FoodTask?extends?RecursiveTask<FoodListInfo>{
????@Override
????protected?FoodListInfo?compute()?{
????????System.out.println("執(zhí)行"+?this.getClass().getSimpleName()?+?"線程名字為:"?+?Thread.currentThread().getName());
????????return?new?FoodListInfo();
????}
}
class?OtherTask?extends?RecursiveTask<OtherInfo>{
????@Override
????protected?OtherInfo?compute()?{
????????System.out.println("執(zhí)行"+?this.getClass().getSimpleName()?+?"線程名字為:"?+?Thread.currentThread().getName());
????????return?new?OtherInfo();
????}
}
我們定義一個(gè)OrderTask并且定義五個(gè)獲取信息的任務(wù),在compute中分別fork執(zhí)行這五個(gè)任務(wù),最后在將這五個(gè)任務(wù)的結(jié)果通過(guò)Join獲得,最后完成我們的并行化的需求。
3.4 parallelStream
在jdk1.8中提供了并行流的API,當(dāng)我們使用集合的時(shí)候能很好的進(jìn)行并行處理,下面舉了一個(gè)簡(jiǎn)單的例子從1加到100:
public?class?ParallelStream?{????public?static?void?main(String[]?args)?{
????????ArrayList<Integer>?list?=?new?ArrayList<Integer>();
????????for?(int?i?=?1;?i?<=?100;?i++)?{
????????????list.add(i);
????????}
????????LongAdder?sum?=?new?LongAdder();
????????list.parallelStream().forEach(integer?->?{
//????????????System.out.println("當(dāng)前線程"?+?Thread.currentThread().getName());
????????????sum.add(integer);
????????});
????????System.out.println(sum);
????}
}
parallelStream中底層使用的那一套也是Fork/Join的那一套,默認(rèn)的并發(fā)程度是可用CPU數(shù)-1。
3.5 分片
可以想象有這么一個(gè)需求,每天定時(shí)對(duì)id在某個(gè)范圍之間的用戶(hù)發(fā)券,比如這個(gè)范圍之間的用戶(hù)有幾百萬(wàn),如果給一臺(tái)機(jī)器發(fā)的話,可能全部發(fā)完需要很久的時(shí)間,所以分布式調(diào)度框架比如:elastic-job都提供了分片的功能,比如你用50臺(tái)機(jī)器,那么id%50=0的在第0臺(tái)機(jī)器上,=1的在第1臺(tái)機(jī)器上發(fā)券,那么我們的執(zhí)行時(shí)間其實(shí)就分?jǐn)偟搅瞬煌臋C(jī)器上了。
4.并行化注意事項(xiàng)
線程安全:在parallelStream中我們列舉的代碼中使用的是LongAdder,并沒(méi)有直接使用我們的Integer和Long,這個(gè)是因?yàn)樵诙嗑€程環(huán)境下Integer和Long線程不安全。所以線程安全我們需要特別注意。
合理參數(shù)配置:可以看見(jiàn)我們需要配置的參數(shù)比較多,比如我們的線程池的大小,等待隊(duì)列大小,并行度大小以及我們的等待超時(shí)時(shí)間等等,我們都需要根據(jù)自己的業(yè)務(wù)不斷的調(diào)優(yōu)防止出現(xiàn)隊(duì)列不夠用或者超時(shí)時(shí)間不合理等等。
5.最后
本文介紹了什么是并行化,并行化的各種歷史,在Java中如何實(shí)現(xiàn)并行化,以及并行化的注意事項(xiàng)。希望大家對(duì)并行化有個(gè)比較全面的認(rèn)識(shí)。最后給大家提個(gè)兩個(gè)小問(wèn)題:
在我們并行化當(dāng)中有某個(gè)任務(wù)如果某個(gè)任務(wù)出現(xiàn)了異常應(yīng)該怎么辦?
在我們并行化當(dāng)中有某個(gè)任務(wù)的信息并不是強(qiáng)依賴(lài),也就是如果出現(xiàn)了問(wèn)題這部分信息我們也可以不需要,當(dāng)并行化的時(shí)候,這種任務(wù)出現(xiàn)了異常應(yīng)該怎么辦?
-END-
?近期熱文:
重磅:Elasticsearch上市!市值近50億美元
利用SPRING管理熱加載的GROOVY對(duì)象!
Spring Boot中如何擴(kuò)展XML請(qǐng)求和響應(yīng)的支持
Java 11正式發(fā)布,新特性解讀
系統(tǒng)優(yōu)化總結(jié)—系統(tǒng)層面
NIO相關(guān)基礎(chǔ)篇
以Dubbo為例,聊聊如何為開(kāi)源項(xiàng)目做貢獻(xiàn)
25個(gè)面試中最常問(wèn)的問(wèn)題和答案
關(guān)注我
點(diǎn)擊“閱讀原文”,看本號(hào)其他精彩內(nèi)容
總結(jié)
以上是生活随笔為你收集整理的并行化:你的高并发大杀器的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: 脉冲耦合神经网络(PCNN)-pulse
- 下一篇: 测试鼠标宏软件,KINBAS VP900