同步代码和异步代码_告别异步代码
同步代碼和異步代碼
Quasar是一個將真正的輕量級線程(纖維)添加到JVM的庫。 它們非常便宜且非常快-實際上,光纖的行為就像Erlang進程或Go goroutines-并允許您編寫簡單的阻塞代碼,同時享受與復雜異步代碼相同的性能優(yōu)勢。
在本文中,我們將學習如何將任何基于回調(diào)的異步API轉(zhuǎn)換為漂亮的(光纖)阻塞API。 它適用于希望將自己的或第三方庫與Quasar光纖集成的用戶。 如果您僅將Quasar光纖與通道或演員一起使用,或者利用Comsat項目中已經(jīng)提供的許多集成功能,則不需要了解這些知識(下面提供的代碼是應用程序開發(fā)人員從未看到的代碼)。 但是,即使您不這樣做,您也可能會發(fā)現(xiàn)這篇文章對理解Quasar如何發(fā)揮其魔力很有幫助。
為什么異步?
首先,許多庫提供異步API的原因是OS可以處理的正在運行的1個線程的數(shù)量遠遠少于OS可以維護的開放TCP連接的數(shù)量。 也就是說,您的機器可以支持比線程所提供的更高的并發(fā)性,因此庫以及使用它們的開發(fā)人員會放棄線程作為用于軟件并發(fā)2單元的抽象。 異步API不會阻塞線程,并且會導致顯著的性能提升(通常在吞吐量和服務器容量方面,而在延遲方面卻沒有那么多)。
但是,使用異步API也會創(chuàng)建正確獲得“回調(diào)地獄”名稱的代碼。 在缺乏多核處理的環(huán)境(例如Javascript)中,回調(diào)地獄已經(jīng)很糟糕了。 在諸如JVM之類的地方,您需要關心內(nèi)存可見性和同步性會變得更糟。
編寫在光纖上運行的阻塞代碼具有與異步代碼相同的優(yōu)點,但沒有缺點:您使用了不錯的阻塞API(甚至可以繼續(xù)使用現(xiàn)有的API),但是卻獲得了非阻塞代碼的所有性能優(yōu)勢。
可以肯定的是,異步API還有一個優(yōu)勢:它們使您可以同時分派多個IO操作(例如HTTP請求)。 因為這些操作通常需要很長時間才能完成,而且通常是獨立的,所以我們可以同時等待其中的幾個完成。 但是,Java期貨也可以使用此有用的功能,而無需回調(diào)。 稍后,我們將看到如何制作博克期貨。
光纖異步
許多現(xiàn)代的Java IO /數(shù)據(jù)庫庫/驅(qū)動程序都提供兩種API:一種是同步(線程)阻塞的API,另一種是基于回調(diào)的異步API(對于NIO,JAX-RS客戶端,Apache HTTP客戶端以及更多的API來說都是如此。 )。 同步API更好。
Quasar有一個編程工具,可以將任何基于回調(diào)的異步API轉(zhuǎn)換為一個很好的阻止光纖的API: FiberAsync 。 本質(zhì)上, FiberASync作用是阻止當前光纖,安裝異步回調(diào),并在觸發(fā)該回調(diào)時,它將再次喚醒光纖,并返回操作結果(如果失敗,則引發(fā)異常)。
為了了解如何使用FiberAsync ,我們將看一個API示例: FooClient 。 FooClient是一種現(xiàn)代的IO API,因此有兩種形式,一種是同步的,線程阻塞的一種,另一種是異步的。 他們來了:
interface FooClient {String op(String arg) throws FooException, InterruptedException; }interface AsyncFooClient {Future<String> asyncOp(String arg, FooCompletion<String> callback); }interface FooCompletion<T> {void success(T result);void failure(FooException exception); }請注意異步操作(如許多現(xiàn)代庫中的情況)如何都需要回調(diào)并返回前途。 現(xiàn)在,讓我們忽略未來。 我們稍后再講。
FooClient比AsyncFooClient更好,更簡單,但是它阻塞了線程并大大降低了吞吐量。 我們想要創(chuàng)建一個FooClient接口的實現(xiàn),該接口可以在光纖中運行并阻塞光纖,因此我們獲得了簡單的代碼和出色的吞吐量。 為此,我們將在AsyncFooClient使用AsyncFooClient ,但將其轉(zhuǎn)換為阻止光纖的FooClient 。 這是我們需要的所有代碼(我們將進一步對其進行簡化):
public class FiberFooClient implements FooClient {private final AsyncFooClient asyncClient;public FiberFooClient(AsyncFooClient asyncClient) {this.asyncClient = asyncClient;}@Override@SuspendableString op(final String arg) throws FooException, InterruptedException {try {return new FiberAsync<String, FooException>() {@Overrideprotected void requestAsync() {asyncClient.asyncOp(arg, new FooCompletion<String>() {public void success(String result) {FiberAsync.this.asyncCompleted(result);}public void failure(FooException exception) {FiberAsync.this.asyncFailed(exception);}});}}.run();} catch(SuspendExecution e) {throw new AssertionError(e);}} }現(xiàn)在,這是怎么回事? 我們正在實施的FooClient接口,但我們正在做op纖維粘連,而不是線程阻塞。 我們需要告訴Quasar我們的方法是光纖阻塞(或“可掛起”),因此我們使用@Suspendable對其進行@Suspendable 。
然后,我們將FiberAsync子類FiberAsync并實現(xiàn)requestAsync方法( FiberAsync接受的兩個通用類型參數(shù)是返回類型和操作可能拋出的已檢查異常的類型(如果有的話);對于未檢查的異常,第二個通用參數(shù)應為RuntimeException )。 requestAsync負責啟動異步操作并注冊回調(diào)。 然后,回調(diào)需要調(diào)用asyncCompleted (如果操作成功)并將其傳遞給我們希望返回的結果,或者asyncFailed (如果操作失敗)并將失敗原因的異常傳遞給它。
最后,我們調(diào)用FiberAsync.run() 。 這將阻止當前光纖,并調(diào)用requestAsync以安裝回調(diào)。 纖維將保持阻塞,直到回調(diào)被觸發(fā),它會釋放出FiberAsync通過調(diào)用或者asyncCompleted或asyncFailed 。 run方法還具有一個帶超時參數(shù)的版本,如果我們想對阻塞操作進行時間限制(通常是個好主意),該方法很有用。
需要解釋的另一件事是try/catch塊。 有兩種方法可聲明為可@Suspendable的方法:用@Suspendable對其進行注釋,或聲明其引發(fā)已檢查的異常SuspendExecution 。 FiberAsync的run方法使用了后者,因此為了編譯代碼,我們需要捕獲SuspendExecution ,但是由于它不是真正的異常,因此我們永遠無法真正捕獲它(嗯,至少在Quasar運行正常的情況下,至少不是這樣) –因此為AssertionError 。
完成后,您可以在任何光纖中使用op ,如下所示:
new Fiber<Void>(() ->{// ...String res = client.op();// ... }).start();順便說一句,所有的要短很多與脈沖星 (類星體的Clojure的API),其中異步操作:
(async-op arg #(println "result:" %))使用Pulsar的await宏將其轉(zhuǎn)換為以下同步的光纖阻塞代碼:
(println "result:" (await (async-op arg)))簡化和批量生產(chǎn)
通常,像FooClient這樣的接口將具有許多方法,并且通常, AsyncFooClient大多數(shù)方法將采用相同類型的回調(diào)( FooCompletion )。 如果是這種情況,我們可以將我們已經(jīng)看到的許多代碼封裝到FiberAsync的命名子類中:
abstract class FooAsync<T> extends FiberAsync<T, FooException> implements FooCompletion<T> {@Overridepublic void success(T result) {asyncCompleted(result);}@Overridepublic void failure(FooException exception) {asyncFailed(exception);}@Override@Suspendablepublic T run() throws FooException, InterruptedException {try {return super.run();} catch (SuspendExecution e) {throw new AssertionError();}}@Override@Suspendablepublic T run(long timeout, TimeUnit unit) throws FooException, InterruptedException, TimeoutException {try {return super.run(timeout, unit);} catch (SuspendExecution e) {throw new AssertionError();}} }請注意,我們?nèi)绾问笷iberAsync直接實現(xiàn)FooCompletion回調(diào)–不是必需的,但這是一個有用的模式。 現(xiàn)在,我們的光纖阻塞op方法要簡單得多,并且該接口中的其他操作也可以輕松實現(xiàn):
@Override @Suspendable public String op(final String arg) throws FooException, InterruptedException {return new FooAsync<String>() {protected void requestAsync() {asyncClient.asyncOp(arg, this);}}.run(); }有時,我們可能希望在常規(guī)線程而不是光纖上調(diào)用op方法。 默認情況下,如果在線程上調(diào)用FiberAsync.run() , FiberAsync.run()引發(fā)異常。 為了解決這個問題,我們要做的就是實現(xiàn)另一個FiberAsync方法requestSync ,如果在光纖上調(diào)用run ,它將調(diào)用原始的同步API。 我們的最終代碼如下(我們假設FiberFooClass具有類型為FooClient的syncClient字段):
@Override @Suspendable public String op(final String arg) throws FooException, InterruptedException {return new FooAsync<String>() {protected void requestAsync() {asyncClient.asyncOp(arg, this);}public String requestSync() {return syncClient.op(arg);}}.run(); }就是這樣!
期貨
期貨是一種方便的方式,可以在我們等待所有獨立的IO操作完成時同時開始。 我們希望我們的纖維能夠阻擋期貨。 許多Java庫通過其異步操作返回期貨,因此用戶可以在完全異步,基于回調(diào)的用法和采用期貨的“半同步”用法之間進行選擇。 我們的AsyncFooClient接口就是這樣。
這是我們實現(xiàn)AsyncFooClient版本的AsyncFooClient ,該版本返回阻塞光纖的期貨:
import co.paralleluniverse.strands.SettableFuture;public class FiberFooAsyncClient implements FooClient {private final AsyncFooClient asyncClient;public FiberFooClient(AsyncFooClient asyncClient) {this.asyncClient = asyncClient;}@Overridepublic Future<String> asyncOp(String arg, FooCompletion<String> callback) {final SettableFuture<T> future = new SettableFuture<>();asyncClient.asyncOp(arg, callbackFuture(future, callback))return future;}private static <T> FooCompletion<T> callbackFuture(final SettableFuture<T> future, final FooCompletion<T> callback) {return new FooCompletion<T>() {@Overridepublic void success(T result) {future.set(result);callback.completed(result);}@Overridepublic void failure(Exception ex) {future.setException(ex);callback.failed(ex);}@Overridepublic void cancelled() {future.cancel(true);callback.cancelled();}};} }如果返回, co.paralleluniverse.strands.SettableFuture返回co.paralleluniverse.strands.SettableFuture ,如果我們在光纖或普通線程(即任何類型的絞線上 )上對其進行阻塞,則效果同樣良好。
JDK 8的CompletableFuture和Guava的ListenableFuture
可以使用預先構建的FiberAsync使返回CompletionStage (或?qū)崿F(xiàn)它的CompletableFuture )的API(在JDK 8中添加到Java中)變得更容易實現(xiàn)光纖阻塞。 例如,
CompletableFuture<String> asyncOp(String arg);通過以下方式變成光纖阻塞呼叫:
String res = AsyncCompletionStage.get(asyncOp(arg));返回Google Guava的方法類似地轉(zhuǎn)換為光纖阻塞同步,因此:
ListenableFuture<String> asyncOp(String arg);通過以下方式變成光纖阻塞:
String res = AsyncListenableFuture.get(asyncOp(arg));期貨的替代品
盡管期貨是有用且熟悉的,但我們實際上并不需要使用纖維時返回它們的特殊API。 產(chǎn)生的纖維是如此便宜( Fiber類實現(xiàn)了Future ,因此纖維本身可以代替“手工”的期貨。 這是一個例子:
void work() {Fiber<String> f1 = new Fiber<>(() -> fiberFooClient.op("first operation"));Fiber<String> f2 = new Fiber<>(() -> fiberFooClient.op("second operation"));String res1 = f1.get();String res2 = f2.get(); }因此,即使我們使用的API不提供,光纖也可以為我們提供期貨。
如果沒有異步API怎么辦?
有時我們很不幸地遇到一個僅提供同步的線程阻塞API的庫。 JDBC是此類API的主要示例。 盡管Quasar不能提高使用此類庫的吞吐量,但仍然值得使API光纖兼容(實際上非??常容易)。 為什么? 因為調(diào)用同步服務的光纖也可能做其他事情。 實際上,它們可能很少調(diào)用該服務(僅當發(fā)生高速緩存未命中時,才考慮從RDBMS讀取數(shù)據(jù)的光纖)。
實現(xiàn)此目的的方法是通過在專用線程池中執(zhí)行實際的調(diào)用,然后通過FiberAsync封裝該假的異步API,將阻塞API轉(zhuǎn)變?yōu)楫惒紸PI。 這個過程是如此機械, FiberAsync有一些靜態(tài)方法可以為我們處理所有事情。 因此,假設我們的服務僅公開了阻塞的FooClient API。 要使其成為光纖阻塞,我們要做的是:
public class SadFiberFooClient implements FooClient {private final FooClient client;private static final ExecutorService FOO_EXECUTOR = Executors.newCachedThreadPool();public FiberFooClient(FooClient client) {this.client = client;}@Override@SuspendableString op(final String arg) throws FooException, InterruptedException {try {return FiberAsync.runBlocking(FOO_EXECUTOR, () -> client.op());} catch(SuspendExecution e) {throw new AssertionError(e);}} }FooClient此實現(xiàn)可以安全地用于線程和光纖。 實際上,當在普通線程上調(diào)用該方法時,該方法將不會麻煩將操作分配給提供的線程池,而是在當前線程上執(zhí)行該操作-就像我們使用原始FooClient實現(xiàn)時那樣。
結論
此處顯示的技術FiberAsync和cpstrands.SettableFuture正是構成Comsat項目的集成模塊的工作方式。 Comsat包括Servlet,JAX-RS(服務器和客戶端),JDBC,JDBI,jOOQ,MongoDB,Retrofit和Dropwizard的集成。
重要的是要查看如何-創(chuàng)建簡單且高性能的光纖阻塞API-我們確實重新實現(xiàn)了API 接口 ,但沒有實現(xiàn)其內(nèi)部工作:仍然僅通過其異步API來使用原始庫代碼,其丑陋之處在于現(xiàn)在對圖書館用戶隱藏了。
額外信用:單子怎么樣?
除了纖程外,還有其他方法可以處理回調(diào)地獄。 JVM世界中最著名的機制是Scala的可組合期貨,RxJava的可觀察對象以及JDK 8的CompletionStage / CompletableFuture 。 這些都是單子和單子組成的例子。 Monad可以工作,有些人喜歡使用它們,但是我認為對于大多數(shù)編程語言而言,它們是錯誤的方法。
您會看到,單子是從基于lambda演算的編程語言中借用的。 Lambda演算是一種理論計算模型,與Turing機器完全不同,但完全類似。 但是與圖靈機模型不同,lambda演算計算沒有步驟,動作或狀態(tài)的概念。 這些計算沒有做任何事情; 他們只是。 那么,Monads是Haskell等基于LC的語言將動作,狀態(tài),時間等描述為純計算的一種方式。 它們是LC語言告訴計算機“先執(zhí)行然后再執(zhí)行”的一種方法。
問題是,命令式語言已經(jīng)有了“先做然后再做”的抽象,而這種抽象就是線程。 不僅如此,而且是必須的語言通常有一個非常簡單的符號“這樣做,然后做”:聲明此后跟該語句。 命令性語言甚至考慮采用這種外來概念的唯一原因是因為(通過OS內(nèi)核)線程的實現(xiàn)不令人滿意。 但是,與其采用一個陌生,陌生的概念(并且該概念需要完全不同的API類型),不如采用一個相似但細微不同的抽象,最好是修復(線程)的實現(xiàn)。 光纖保留抽象并修復實現(xiàn)。
Java和Scala等語言中的monad的另一個問題是,這些語言不僅勢在必行,而且還允許不受限制的共享狀態(tài)突變和副作用-Haskell卻沒有。 無限制的共享狀態(tài)突變和“線程”單核的結合可能是災難性的。 在純FP語言中-由于副作用是受控的-計算單位(即函數(shù))也是并發(fā)單位:您可以安全地同時執(zhí)行任何一對函數(shù)。 當您不受限制的副作用時,情況并非如此。 函數(shù)執(zhí)行的順序,兩個函數(shù)是否可以同時執(zhí)行以及一個函數(shù)是否以及何時可以觀察到另一個函數(shù)執(zhí)行的共享狀態(tài)突變都是非常重要的問題。 結果,作為“線程” monad的一部分運行的函數(shù)要么必須是純函數(shù)(沒有任何副作用),要么必須非常小心如何執(zhí)行這些副作用。 這正是我們要避免的事情。 因此,盡管單子組合確實比回調(diào)地獄生成了更好的代碼,但它們不能解決異步代碼引入的任何并發(fā)問題。
聚苯乙烯
上一節(jié)不應理解為像Haskell這樣的純“ FP”語言的認可,因為我實際上認為它們帶來了太多其他問題。 我相信(不久的將來)命令性語言3將允許共享狀態(tài)變異,但具有一些事務語義。 我相信那些未來的語言將主要從Clojure和Erlang等語言中獲得靈感。
翻譯自: https://www.javacodegeeks.com/2015/04/farewell-to-asynchronous-code.html
同步代碼和異步代碼
總結
以上是生活随笔為你收集整理的同步代码和异步代码_告别异步代码的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 电脑鼠标经常卡住不动怎么办 更新鼠标驱动
- 下一篇: 橙光游戏电脑版(橙光游戏电脑版网页)