反应式框架Reactor中的Mono和Flux
Reactive基于事件驅動(事件模式或者說訂閱者模式),類似于Netty異步事件編程模型,對不同的事件做不同的處理。所有信息都通過一個編程模型處理,就像水在管道里面運動一樣(這里把事件比作水流)
- 響應流必須是無阻塞的。
- 響應流必須是一個數據流。
- 它必須可以異步執行。
- 并且它也應該能夠處理背壓。
背壓是反應流中的一個重要概念,可以理解為,生產者可以感受到消費者反饋的消費壓力,并根據壓力進行動態調整生產速率。
反應式編程框架主要采用了觀察者模式,而Spring Reactor的核心則是對觀察者模式的一種衍伸。關于觀察者模式的架構中被觀察者(Observable)和觀察者(Subscriber)處在不同的線程環境中時,由于者各自的工作量不一樣,導致它們產生事件和處理事件的速度不一樣,這時就出現了兩種情況:
- 被觀察者產生事件慢一些,觀察者處理事件很快。那么觀察者就會等著被觀察者發送事件好比觀察者在等米下鍋,程序等待)。
- 被觀察者產生事件的速度很快,而觀察者處理很慢。那就出問題了,如果不作處理的話,事件會堆積起來,最終擠爆你的內存,導致程序崩潰。(好比被觀察者生產的大米沒人吃,堆積最后就會爛掉)。為了方便下面理解Mono和Flux,也可以理解為Publisher(發布者也可以理解為被觀察者)主動推送數據給Subscriber(訂閱者也可以叫觀察者),如果Publisher發布消息太快,超過了Subscriber的處理速度,如何處理。這時就出現了Backpressure(背壓—–指在異步場景中,被觀察者發送事件速度遠快于觀察者的處理速度的情況下,一種告訴上游的被觀察者降低發送速度的策略)
在傳統的編程范式中,我們一般通過迭代器(Iterator)模式來遍歷一個序列。這種遍歷方式是由調用者來控制節奏的,采用的是拉的方式。每次由調用者通過next()方法來獲取序列中的下一個值。
使用反應式流時采用的則是推的方式,即常見的發布者-訂閱者模式。當發布者有新的數據產生時,這些數據會被推送到訂閱者來進行處理。在反應式流上可以添加各種不同的操作來對數據進行處理,形成數據處理鏈。這個以聲明式的方式添加的處理鏈只在訂閱者進行訂閱操作時才會真正執行。
反應式編程來源于數據流和變化的傳播,意味著由底層的執行模型負責通過數據流來自動傳播變化。比如求值一個簡單的表達式 c=a+b,當 a 或者 b 的值發生變化時,傳統的編程范式需要對 a+b 進行重新計算來得到 c 的值。如果使用反應式編程,當 a 或者 b 的值發生變化時,c 的值會自動更新。
- Reactive Streams 是規范,
- Reactor 實現了 Reactive Streams。
- Web Flux 以 Reactor 為基礎,實現 Web 領域的反應式編程框架
關于Mono和Flux
在Reactor中,經常使用的類并不多,主要有以下兩個:
Mono 實現了 org.reactivestreams.Publisher 接口,代表0到1個元素的發布者(Publisher)。
Flux 同樣實現了 org.reactivestreams.Publisher 接口,代表0到N個元素的發布者(Subscriber)。
Flux 和 Mono 是 Reactor 中的兩個基本概念。Mono和Flux都是Publisher(發布者)。
Flux 表示的是包含 0 到 N 個元素的異步序列。在該序列中可以包含三種不同類型的消息通知:正常的包含元素的消息、序列結束的消息和序列出錯的消息。當消息通知產生時,訂閱者中對應的方法 onNext(), onComplete()和 onError()會被調用。
Mono 表示的是包含 0 或者 1 個元素的異步序列。該序列中同樣可以包含與 Flux 相同的三種類型的消息通知。Flux 和 Mono 之間可以進行轉換。對一個 Flux 序列進行計數操作,得到的結果是一個 Mono<Long>對象。把兩個 Mono 序列合并在一起,得到的是一個 Flux 對象。
Publisher
由于響應流的特點,我們不能再返回一個簡單的POJO對象來表示結果了。必須返回一個類似Java中的Future的概念,在有結果可用時通知消費者進行消費響應。
Reactive Stream規范中這種被定義為Publisher<T>?,Publisher<T>是一個可以提供0-N個序列元素的提供者,并根據其訂閱者Subscriber<? super T>的需求推送元素。一個Publisher<T>可以支持多個訂閱者,并可以根據訂閱者的邏輯進行推送序列元素。
響應式的一個重要特點:當沒有訂閱時發布者什么也不做。
區分響應式API的類型:從返回的類型我們就可以知道一個方法會“發射并忘記”或“請求并等待”(Mono),還是在處理一個包含多個數據項的流(Flux)。Flux和Mono的一些操作利用了這個特點在這兩種類型間互相轉換。例如,調用Flux<T>的single()方法將返回一個Mono<T>,而使用concatWith()方法把兩個Mono串在一起就可以得到一個Flux。
Flux
flux可以觸發零到多個事件,并根據實際情況結束處理或觸發錯誤。
Flux?是一個發出(emit)0-N個元素組成的異步序列的Publisher<T>,可以被onComplete信號或者onError信號所終止。在響應流規范中存在三種給下游消費者調用的方法?onNext,?onComplete, 和onError。
創建
第一種方式是通過 Flux 類中的靜態方法。
just():可以指定序列中包含的全部元素。創建出來的 Flux 序列在發布這些元素之后會自動結束。
fromArray(),fromIterable()和 fromStream():可以從一個數組、Iterable 對象或 Stream 對象中創建 Flux 對象。
empty():創建一個不包含任何元素,只發布結束消息的序列。
never():創建一個不包含任何消息通知的序列。
range(int start, int count):創建包含從 start 起始的 count 個數量的 Integer 對象的序列。
interval(Duration period)和 interval(Duration delay, Duration period):創建一個包含了從 0 開始遞增的 Long 對象的序列。其中包含的元素按照指定的間隔來發布。除了間隔時間之外,還可以指定起始元素發布之前的延遲時間。
intervalMillis(long period)和 intervalMillis(long delay, long period):與 interval()方法的作用相同,只不過該方法通過毫秒數來指定時間間隔和延遲時間。
例1. 通過 Flux 類的靜態方法創建 Flux 序列
Flux.just("Hello", "World").subscribe(System.out::println);Flux.fromArray(new Integer[] {1, 2, 3}).subscribe(System.out::println);Flux.empty().subscribe(System.out::println);Flux.range(1, 10).subscribe(System.out::println);Flux.interval(Duration.of(10, ChronoUnit.SECONDS)).subscribe(System.out::println);Flux.intervalMillis(1000).subscribe(System.out::println);上面的這些靜態方法適合于簡單的序列生成,當序列的生成需要復雜的邏輯時,則應該使用 generate() 或 create() 方法。
Flux.create((t) -> {t.next("create");t.next("create1");t.complete();}).subscribe(System.out::println);Flux.generate(sink -> {sink.next("Hello");sink.complete(); }).subscribe(System.out::println);final Random random = new Random(); Flux.generate(ArrayList::new, (list, sink) -> {int value = random.nextInt(100);list.add(value);sink.next(value);if (list.size() == 10) {sink.complete();}return list; }).subscribe(System.out::println);create()方法與 generate()方法的不同之處在于所使用的是 FluxSink 對象。FluxSink 支持同步和異步的消息產生,并且可以在一次調用中產生多個元素。
Flux.create(sink -> {for (int i = 0; i < 10; i++) {sink.next(i);}sink.complete(); }).subscribe(System.out::println);Mono
?多只觸發一個事件,可以把Mono<Void>用于在異步任務完成時發出通知?
Mono?是一個發出(emit)0-1個元素的Publisher<T>,可以被onComplete信號或者onError信號所終止。
Mono.fromSupplier(() -> "Hello").subscribe(System.out::println); Mono.justOrEmpty(Optional.of("Hello")).subscribe(System.out::println); Mono.create(sink -> sink.success("Hello")).subscribe(System.out::println);//empty():創建一個不包含任何元素,只發布結束消息的序列 Mono.empty().subscribe(System.out::println); #允許消費者在有結果可用時進行消費//just():可以指定序列中包含的全部元素。創建出來的 Mono序列在發布這些元素之后會自動結束。 Mono.just("www.xttblog.com").subscribe(System.out::println);//ustOrEmpty():從一個 Optional 對象或可能為 null 的對象中創建 Mono。 //只有 Optional 對象中包含值或對象不為 null 時,Mono 序列才產生對應的元素。 Mono.justOrEmpty(null).subscribe(System.out::println); Mono.justOrEmpty("業余草").subscribe(System.out::println); Mono.justOrEmpty(Optional.of("業余草")).subscribe(System.out::println);//error(Throwable error):創建一個只包含錯誤消息的序列。 Mono.error(new RuntimeException("error")).subscribe(System.out::println, System.err::println);//never():創建一個不包含任何消息通知的序列。 Mono.never().subscribe(System.out::println);//通過 create()方法來使用 MonoSink 來創建 Mono。 Mono.create(sink -> sink.success("業余草")).subscribe(System.out::println); //通過fromRunnable創建,并實現異常處理 Mono.fromRunnable(() -> {System.out.println("thread run"); throw new RuntimeException("thread run error"); }).subscribe(System.out::println, System.err::println);//通過fromCallable創建 Mono.fromCallable(() -> "callable run ").subscribe(System.out::println);//通過fromSupplier創建 Mono.fromSupplier(() -> "create from supplier").subscribe(System.out::println);//delay(Duration duration)和 delayMillis(long duration):創建一個 Mono 序列,在指定的延遲時間之后,產生數字 0 作為唯一值。 long start = System.currentTimeMillis(); Disposable disposable = Mono.delay(Duration.ofSeconds(2)).subscribe(n -> {System.out.println("生產數據源:"+ n);System.out.println("當前線程ID:"+ Thread.currentThread().getId() + ",生產到消費耗時:"+ (System.currentTimeMillis() - start)); }); System.out.println("主線程"+ Thread.currentThread().getId() + "耗時:"+ (System.currentTimeMillis() - start)); while(!disposable.isDisposed()) { }?
總結
以上是生活随笔為你收集整理的反应式框架Reactor中的Mono和Flux的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: js实现批量下载文件
- 下一篇: 卸载精灵 v4.2 是什么