生活随笔
收集整理的這篇文章主要介紹了
将一个简单远程调用的方式例子改为异步调用 -- 2
小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
將一個簡單遠程調用的方式例子改為異步調用
第一版:https://www.cnblogs.com/nxzblogs/p/12766025.html
第二版:使用RxJava :(RxJava:https://github.com/ReactiveX/RxJava)
package com.xsxy.asynctest.test04;import io.reactivex.Flowable;
import io.reactivex.schedulers.Schedulers;import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;/*** 廣播調用RPC*/
public class Test02RxJavaAsyncRpcCallTest {public static final int AVAILABLE_PROCESSORS = Runtime.getRuntime().availableProcessors();public static ThreadPoolExecutor BIZ_EXECUTOR = new ThreadPoolExecutor(AVAILABLE_PROCESSORS, AVAILABLE_PROCESSORS, 10, TimeUnit.SECONDS,new LinkedBlockingQueue<>(10), new ThreadPoolExecutor.CallerRunsPolicy());public static void main(String[] args) throws InterruptedException {// rxJavaRpcCall();// aSyncRxJavaRpcCall();// aSyncRpcCall2();aSync();// aSyncUserBizExecutor();}/*** rxjava 同步執行* 消耗時間大概為20s,因為rpcCall方法是同步調用的,調用線程就是main線程*/public static void rxJavaRpcCall() {long start = System.currentTimeMillis();Flowable.fromArray(genIpList().toArray(new String[0])).map(ip -> rpcCall(ip, ip)).subscribe(System.out::println);// 大概10sSystem.out.println("sync execute rxjavaRpcCall consume:" + (System.currentTimeMillis() - start));}/*** 異步調用* <p>* 在RxJava中,操作運算符不能直接使用Threads或ExecutorServices進行異步處理,而需要使用Schedulers來抽象統一API背后* 的并發調度線程池。RxJava提供了幾個可通過Schedulers訪問的標準調度執行器。* ● Schedulers.computation():在后臺運行固定數量的專用線程來計算密集型工作。大多數異步操作符使用它作為其默認調度線程池。* ● Schedulers.io():在動態變化的線程集合上運行類I/ O或阻塞操作。* ● Schedulers.single():以順序和FIFO方式在單個線程上運行。* ● Schedulers.trampoline():在其中一個參與線程中以順序和FIFO方式運行,通常用于測試目的。* <p>* RxJava還可以讓我們通過Schedulers.from(Executor)將現有的Executor(及其子類型,如ExecutorService)包裝到Scheduler中。* 例如,可以將其用于具有更大但仍然固定的線程池(與calculate()和io()不同)*/public static void aSyncRxJavaRpcCall() throws InterruptedException {long start = System.currentTimeMillis();// 使用 observeOn 讓 rpcCall 的執行由main函數所在線程切換到IO線程// 順序調用Flowable.fromArray(genIpList().toArray(new String[0]))// 切換到io線程執行.observeOn(Schedulers.io())// 映射結果.map(ip -> rpcCall(ip, ip))// 訂閱消費者.subscribe(System.out::println);// main函數不會等rpcCall調用完畢System.out.println("sync execute rxjavaRpcCall consume:" + (System.currentTimeMillis() - start));// 上邊代碼在沒有執行完10調用,main函數就結束了,因為IO線程時Deamon線程,而JVM退出的時機時沒有用戶線程// 所以需要將main函數掛起Thread.currentThread().join();// ##########################################################################################/*上代碼我們掛起了main函數所在線程,上面的代碼運行時main函數所在線程會馬上返回,然后執行sout輸出打印,并掛起自己;具體的10次rpc調用是在IO線程內執行的,到這里我們釋放了main函數所在線程來執行rpc調用,但是IO線程內的10個rpc調用還是順序執行的*/}/*** 讓10個rpc調用順序執行轉換為異步并發執行前,我們先看看另外一個操作符subscribeOn是如何在發射元素的線程執行比較耗時* 的操作時切換為異步執行的*/public static void aSyncRpcCall2() throws InterruptedException {long start = System.currentTimeMillis();Flowable.fromCallable(() -> {try {TimeUnit.SECONDS.sleep(2);} catch (Exception e) {e.printStackTrace();}return "Done";})// 發射元素異步執行// .subscribeOn(Schedulers.io())// 切換到io線程執行.observeOn(Schedulers.io())// 訂閱消費者.subscribe(System.out::println);// 該輸出語句按理說時會很快輸出的,但是事實并不是,還是會消耗2s左右,這個是因為,雖然消費時異步// 使用observeOn方法讓接收元素和處理元素的邏輯從main函數所在線程切換為其他線程,但是發射元素還是同步執行的// 所以我們還需要讓發射元素的邏輯異步化,而subscribeOn就是做這個事情的// (放開上邊的subscribeOn方法,這樣使用subscribeOn元素發射與observeOn接收操作全部都異步化)System.out.println("consume:" + (System.currentTimeMillis() - start));Thread.currentThread().join();// ##########################################################################################/*默認情況下被觀察對象與其上施加的操作符鏈的運行以及把運行結果通知給觀察者對象使用的是調用subscribe方法所在的線程,SubscribeOn操作符可以通過設置Scheduler來改變這個行為,讓上面的操作切換到其他線程來執行。ObserveOn操作符可以指定一個不同的Scheduler讓被觀察者對象使用其他線程來把結果通知給觀察者對象,并執行觀察者的回調函數。所以如果流發射元素時有耗時的計算或者阻塞IO,則可以通過使用subscribeOn操作來把阻塞的操作異步化(切換到其他線程來執行)。另外如果一旦數據就緒(數據發射出來),則可以通過使用observeOn來切換使用其他線程(比如前臺或者GUI線程)來對數據進行處理。需要注意SubscribeOn這個操作符指定的是被觀察者對象(發布者)本身在哪個調度器上執行,而且和在流上的操作鏈中SubscribeOn的位置無關,并且整個調用鏈上調用多次時,只有第一次才有效。而ObservableOn則是指定觀察者對象(訂閱者)在哪個調度器上接收被觀察者發來的通知,在操作符鏈上每當調用了ObservableOn這個操作符時都會進行線程的切換*/}/*** 回到10次rpc調用,如何使用flatmap和subscribeOn將同步轉為異步*/public static void aSync() {long start = System.currentTimeMillis();Flowable.fromArray(genIpList().toArray(new String[0]))// flatMap 將所有的ip轉換為 flowAble對象.flatMap(ip ->// 將每個ip作為數據源獲取一個流對象Flowable.just(ip)// 講發射邏輯改為異步.subscribeOn(Schedulers.io())// 使用map將ip對象轉為rpc調用結果 以上ipList所有的數據都是并發調用的.map(v -> rpcCall(v, v)))// 阻塞所有的rpc并發調用結束 阻塞的是main線程.blockingSubscribe(System.out::println);// 因為rpc調用是并發進行的,所以耗時大概為2.5秒System.out.println("async consume: " + (System.currentTimeMillis() - start));}/*** 回到10次rpc調用,如何使用flatmap和subscribeOn將同步轉為異步* <p>* 使用自定義線程池*/public static void aSyncUserBizExecutor() {long start = System.currentTimeMillis();Flowable.fromArray(genIpList().toArray(new String[0]))// flatMap 將所有的ip轉換為 flowAble對象.flatMap(ip ->// 將每個ip作為數據源獲取一個流對象Flowable.just(ip)// 講發射邏輯改為異步.subscribeOn(Schedulers.from(BIZ_EXECUTOR))// 使用map將ip對象轉為rpc調用結果 以上ipList所有的數據都是并發調用的.map(v -> rpcCall(v, v)))// 阻塞所有的rpc并發調用結束 阻塞的是main線程.blockingSubscribe(System.out::println);// 因為rpc調用是并發進行的,所以耗時大概為6.6秒System.out.println("async consume: " + (System.currentTimeMillis() - start));}/*** 簡單的rpcCall** @param ip* @param params*/public static String rpcCall(String ip, String params) {try {TimeUnit.SECONDS.sleep(2);} catch (Exception e) {e.printStackTrace();}return params;}/*** 生成ipList** @return*/public static List<String> genIpList() {List<String> list = new ArrayList<>();for (int i = 0; i < 10; i++) {list.add("192.168.0." + i);}return list;}
}
總結
以上是生活随笔為你收集整理的将一个简单远程调用的方式例子改为异步调用 -- 2的全部內容,希望文章能夠幫你解決所遇到的問題。
如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。