java 异步_聊聊java高并发系统之异步非阻塞
作者:孫偉,目前負(fù)責(zé)京東商品詳情頁統(tǒng)一服務(wù)系統(tǒng),寫過java,寫過ngx_lua,還寫過storm等,喜歡學(xué)習(xí)研究新事物。
在做電商系統(tǒng)時,流量入口如首頁、活動頁、商品詳情頁等系統(tǒng)承載了網(wǎng)站的大部分流量,而這些系統(tǒng)的主要職責(zé)包括聚合數(shù)據(jù)拼裝模板、熱點(diǎn)統(tǒng)計、緩存、下游功能降級開關(guān)、托底數(shù)據(jù)等等。其中聚合數(shù)據(jù)需要調(diào)用其它多個系統(tǒng)服務(wù)獲取數(shù)據(jù)、拼裝數(shù)據(jù)/模板然后返回給前端,聚合數(shù)據(jù)來源主要有依賴系統(tǒng)/服務(wù)、緩存、數(shù)據(jù)庫等;而系統(tǒng)之間的調(diào)用可以通過如http接口調(diào)用(如HttpClient)、SOA服務(wù)調(diào)用(如dubbo、thrift)等等。
在Java中,如使用Tomcat,一個請求會分配一個線程進(jìn)行請求處理,該線程負(fù)責(zé)獲取數(shù)據(jù)、拼裝數(shù)據(jù)或模板然后返回給前端;在同步調(diào)用獲取數(shù)據(jù)接口的情況下(等待依賴系統(tǒng)返回數(shù)據(jù)),整個線程是一直被占用并阻塞的。如果有大量的這種請求,每個請求占用一個線程,但線程一直處于阻塞,降低了系統(tǒng)的吞吐量,這將導(dǎo)致應(yīng)用的吞吐量下降;我們希望在調(diào)用依賴的服務(wù)響應(yīng)比較慢,此時應(yīng)該讓出線程和CPU來處理下一個請求,當(dāng)依賴的服務(wù)返回了再分配相應(yīng)的線程來繼續(xù)處理。而這應(yīng)該有更好的解決方案:異步/協(xié)程。而Java是不支持協(xié)程的(雖然有些Java框架說支持,但還是高層API的封裝),因此在Java中我們還可以使用異步來提升吞吐量。目前java一些開源框架(HttpClientHttpAsyncClient、dubbo、thrift等等)大部分都支持。
幾種調(diào)用方式
同步阻塞調(diào)用
即串行調(diào)用,響應(yīng)時間為所有服務(wù)的響應(yīng)時間總和;
半異步(異步Future)
線程池,異步Future,使用場景:并發(fā)請求多服務(wù),總耗時為最長響應(yīng)時間;提升總響應(yīng)時間,但是阻塞主請求線程,高并發(fā)時依然會造成線程數(shù)過多,CPU上下文切換;
全異步(Callback)
Callback方式調(diào)用,使用場景:不考慮回調(diào)時間且只能對結(jié)果做簡單處理,如果依賴服務(wù)是兩個或兩個以上服務(wù),則不能合并兩個服務(wù)的處理結(jié)果;不阻塞主請求線程,但使用場景有限。
異步回調(diào)鏈?zhǔn)骄幣?/strong>
異步回調(diào)鏈?zhǔn)骄幣?JDK8 CompletableFuture),使用場景:其實(shí)不是異步調(diào)用方式,只是對依賴多服務(wù)的Callback調(diào)用結(jié)果處理做結(jié)果編排,來彌補(bǔ)Callback的不足,從而實(shí)現(xiàn)全異步鏈?zhǔn)秸{(diào)用。
接下來看看如何設(shè)計利用全異步Callback調(diào)用和異步回調(diào)鏈?zhǔn)骄幣盘幚斫Y(jié)果來實(shí)現(xiàn)全異步系統(tǒng)設(shè)計。
同步阻塞調(diào)用
public class Test {
public static void main(String[] args) throws Exception {
RpcService rpcService = new RpcService();
HttpService httpService = new HttpService();
//耗時10ms
Map result1 = rpcService.getRpcResult();
//耗時20ms
Integer result2 = httpService.getHttpResult();
//總耗時30ms
}
static class RpcService {
Map getRpcResult() throws Exception {
//調(diào)用遠(yuǎn)程方法(遠(yuǎn)程方法耗時約10ms,可以使用Thread.sleep模擬)
}
}
static class HttpService {
Integer getHttpResult() throws Exception {
//調(diào)用遠(yuǎn)程方法(遠(yuǎn)程方法耗時約20ms,可以使用Thread.sleep模擬)
Thread.sleep(20);
return 0;
}
}
}
半異步(異步Future)
public class Test {
final static ExecutorService executor = Executors.newFixedThreadPool(2);
public static void main(String[] args) {
RpcService rpcService = new RpcService();
HttpService httpService = new HttpService();
Future> future1 = null;
Future future2 = null;
try {
future1 = executor.submit(() -> rpcService.getRpcResult());
future2 = executor.submit(() -> httpService.getHttpResult());
//耗時10ms
Map result1 = future1.get(300, TimeUnit.MILLISECONDS);
//耗時20ms
Integer result2 = future2.get(300, TimeUnit.MILLISECONDS);
//總耗時20ms
} catch (Exception e) {
if (future1 != null) {
future1.cancel(true);
}
if (future2 != null) {
future2.cancel(true);
}
throw new RuntimeException(e);
}
}
static class RpcService {
Map getRpcResult() throws Exception {
//調(diào)用遠(yuǎn)程方法(遠(yuǎn)程方法耗時約10ms,可以使用Thread.sleep模擬)
}
}
static class HttpService {
Integer getHttpResult() throws Exception {
//調(diào)用遠(yuǎn)程方法(遠(yuǎn)程方法耗時約20ms,可以使用Thread.sleep模擬)
}
}
}
全異步(Callback)
public class AsyncTest {
public staticHttpAsyncClient httpAsyncClient;
public static CompletableFuture getHttpData(String url) {
CompletableFuture asyncFuture = new CompletableFuture();
HttpPost post = new HttpPost(url);
HttpAsyncRequestProducer producer = HttpAsyncMethods.create(post);
AsyncCharConsumer consumer = newAsyncCharConsumer() {
HttpResponse response;
protected HttpResponse buildResult(final HttpContext context) {
return response;
}
…...
};
FutureCallback callback = new FutureCallback() {
public void completed(HttpResponse response) {
asyncFuture.complete(EntityUtils.toString(response.getEntity()));
}
…...
};
httpAsyncClient.execute(producer, consumer, callback);
return asyncFuture;
}
public static void main(String[] args) throws Exception {
AsyncTest.getHttpData("http://www.jd.com");
Thread.sleep(1000000);
}
}
本示例使用HttpAsyncClient演示。
異步回調(diào)鏈?zhǔn)骄幣?/strong>
CompletableFuture提供了50多個API,可以滿足所需的各種場景的異步處理的編排,在此列舉三個場景:
場景1:三個服務(wù)并發(fā)異步調(diào)用,返回CompletableFuture,不阻塞主線程;
方法test1:
public static void test1() throws Exception {
HelloClientDemoTest service = new HelloClientDemoTest();
/**
* 場景1 兩個以上服務(wù)并發(fā)異步調(diào)用,返回CompletableFuture,不阻塞主線程
* 并且兩個服務(wù)也是異步非阻塞調(diào)用
*/
CompletableFuture future1 = service.getHttpData("http://www.jd.com");
CompletableFuture future2 = service.getHttpData("http://www.jd.com");
CompletableFuture future3 =service.getHttpData("http://www.jd.com");
List futureList = Lists.newArrayList(future1,future2, future3);
CompletableFuture allDoneFuture =CompletableFuture.allOf(futureList.toArray(newCompletableFuture[futureList.size()]));
CompletableFuture future4 =allDoneFuture.thenApply(v -> {
List result =futureList.stream().map(CompletableFuture::join)
.collect(Collectors.toList());
//注意順序
String result1 = (String)result.get(0);
String result2 = (String)result.get(1);
String result3 = (String)result.get(2);
//處理業(yè)務(wù)....
return result1 + result2 + result3;
}).exceptionally(e -> {
//e.printStackTrace();
return "";
});
//返回
}
場景2、兩個服務(wù)并發(fā)異步調(diào)用,返回CompletableFuture,不阻塞主線程;
方法test2:
public void test2() throws Exception {
HelloClientDemoTest service = new HelloClientDemoTest();
/**
* 場景2 兩個接口并發(fā)異步調(diào)用,返回CompletableFuture,不阻塞主線程
* 并且兩個服務(wù)也是異步非阻塞調(diào)用
*/
CompletableFuture future1 = service.getHttpData("http://www.jd.com");
CompletableFuture future2 =service.getHttpData("http://www.jd.com");
CompletableFuture future3 =future1.thenCombine(future2, (f1, f2) -> {
//處理業(yè)務(wù)....
return f1 +
總結(jié)
以上是生活随笔為你收集整理的java 异步_聊聊java高并发系统之异步非阻塞的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。