Reactor:深入理解reactor core
文章目錄
- 簡介
- 自定義Subscriber
- Backpressure處理
- 創(chuàng)建Flux
- 使用generate
- 使用create
- 使用push
- 使用Handle
簡介
上篇文章我們簡單的介紹了Reactor的發(fā)展史和基本的Flux和Mono的使用,本文將會進(jìn)一步挖掘Reactor的高級用法,一起來看看吧。
自定義Subscriber
之前的文章我們提到了4個Flux的subscribe的方法:
Disposable subscribe(); Disposable subscribe(Consumer<? super T> consumer); Disposable subscribe(Consumer<? super T> consumer,Consumer<? super Throwable> errorConsumer); Disposable subscribe(Consumer<? super T> consumer,Consumer<? super Throwable> errorConsumer,Runnable completeConsumer); Disposable subscribe(Consumer<? super T> consumer,Consumer<? super Throwable> errorConsumer,Runnable completeConsumer,Consumer<? super Subscription> subscriptionConsumer);這四個方法,需要我們使用lambda表達(dá)式來自定義consumer,errorConsumer,completeSonsumer和subscriptionConsumer這四個Consumer。
寫起來比較復(fù)雜,看起來也不太方便,我們考慮一下,這四個Consumer是不是和Subscriber接口中定義的4個方法是一一對應(yīng)的呢?
public static interface Subscriber<T> {public void onSubscribe(Subscription subscription);public void onNext(T item);public void onError(Throwable throwable);public void onComplete();}對的,所以我們有一個更加簡單點(diǎn)的subscribe方法:
public final void subscribe(Subscriber<? super T> actual)這個subscribe方法直接接收一個Subscriber類。從而實(shí)現(xiàn)了所有的功能。
自己寫Subscriber太麻煩了,Reactor為我們提供了一個BaseSubscriber的類,它實(shí)現(xiàn)了Subscriber中的所有功能,還附帶了一些其他的方法。
我們看下BaseSubscriber的定義:
public abstract class BaseSubscriber<T> implements CoreSubscriber<T>, Subscription,Disposable注意,BaseSubscriber是單次使用的,這就意味著,如果它首先subscription到Publisher1,然后subscription到Publisher2,那么將會取消對第一個Publisher的訂閱。
因?yàn)锽aseSubscriber是一個抽象類,所以我們需要繼承它,并且重寫我們需要自己實(shí)現(xiàn)的方法。
下面看一個自定義的Subscriber:
public class CustSubscriber<T> extends BaseSubscriber<T> {public void hookOnSubscribe(Subscription subscription) {System.out.println("Subscribed");request(1);}public void hookOnNext(T value) {System.out.println(value);request(1);} }BaseSubscriber中有很多以hook開頭的方法,這些方法都是我們可以重寫的,而Subscriber原生定義的on開頭的方法,在BaseSubscriber中都是final的,都是不能重寫的。
我們看一個定義:
@Overridepublic final void onSubscribe(Subscription s) {if (Operators.setOnce(S, this, s)) {try {hookOnSubscribe(s);}catch (Throwable throwable) {onError(Operators.onOperatorError(s, throwable, currentContext()));}}}可以看到,它內(nèi)部實(shí)際上調(diào)用了hook的方法。
上面的CustSubscriber中,我們重寫了兩個方法,一個是hookOnSubscribe,在建立訂閱的時候調(diào)用,一個是hookOnNext,在收到onNext信號的時候調(diào)用。
在這些方法中,給了我們足夠的自定義空間,上面的例子中我們調(diào)用了request(1),表示再請求一個元素。
其他的hook方法還有: hookOnComplete, hookOnError, hookOnCancel 和 hookFinally。
Backpressure處理
我們之前講過了,reactive stream的最大特征就是可以處理Backpressure。
什么是Backpressure呢?就是當(dāng)consumer處理過不來的時候,可以通知producer來減少生產(chǎn)速度。
我們看下BaseSubscriber中默認(rèn)的hookOnSubscribe實(shí)現(xiàn):
protected void hookOnSubscribe(Subscription subscription){subscription.request(Long.MAX_VALUE);}可以看到默認(rèn)是request無限數(shù)目的值。 也就是說默認(rèn)情況下沒有Backpressure。
通過重寫hookOnSubscribe方法,我們可以自定義處理速度。
除了request之外,我們還可以在publisher中限制subscriber的速度。
public final Flux<T> limitRate(int prefetchRate) {return onAssembly(this.publishOn(Schedulers.immediate(), prefetchRate));}在Flux中,我們有一個limitRate方法,可以設(shè)定publisher的速度。
比如subscriber request(100),然后我們設(shè)置limitRate(10),那么最多producer一次只會產(chǎn)生10個元素。
創(chuàng)建Flux
接下來,我們要講解一下怎么創(chuàng)建Flux,通常來講有4種方法來創(chuàng)建Flux。
使用generate
第一種方法就是最簡單的同步創(chuàng)建的generate.
先看一個例子:
public void useGenerate(){Flux<String> flux = Flux.generate(() -> 0,(state, sink) -> {sink.next("3 x " + state + " = " + 3*state);if (state == 10) sink.complete();return state + 1;});flux.subscribe(System.out::println);}輸出結(jié)果:
3 x 0 = 0 3 x 1 = 3 3 x 2 = 6 3 x 3 = 9 3 x 4 = 12 3 x 5 = 15 3 x 6 = 18 3 x 7 = 21 3 x 8 = 24 3 x 9 = 27 3 x 10 = 30上面的例子中,我們使用generate方法來同步的生成元素。
generate接收兩個參數(shù):
public static <T, S> Flux<T> generate(Callable<S> stateSupplier, BiFunction<S, SynchronousSink<T>, S> generator)第一個參數(shù)是stateSupplier,用來指定初始化的狀態(tài)。
第二個參數(shù)是一個generator,用來消費(fèi)SynchronousSink,并生成新的狀態(tài)。
上面的例子中,我們每次將state+1,一直加到10。
然后使用subscribe來將所有的生成元素輸出。
使用create
Flux也提供了一個create方法來創(chuàng)建Flux,create可以是同步也可以是異步的,并且支持多線程操作。
因?yàn)閏reate沒有初始的state狀態(tài),所以可以用在多線程中。
create的一個非常有用的地方就是可以將第三方的異步API和Flux關(guān)聯(lián)起來,舉個例子,我們有一個自定義的EventProcessor,當(dāng)處理相應(yīng)的事件的時候,會去調(diào)用注冊到Processor中的listener的一些方法。
interface MyEventListener<T> {void onDataChunk(List<T> chunk);void processComplete();}我們怎么把這個Listener的響應(yīng)行為和Flux關(guān)聯(lián)起來呢?
public void useCreate(){EventProcessor myEventProcessor = new EventProcessor();Flux<String> bridge = Flux.create(sink -> {myEventProcessor.register(new MyEventListener<String>() {public void onDataChunk(List<String> chunk) {for(String s : chunk) {sink.next(s);}}public void processComplete() {sink.complete();}});});}使用create就夠了,create接收一個consumer參數(shù):
public static <T> Flux<T> create(Consumer<? super FluxSink<T>> emitter)這個consumer的本質(zhì)是去消費(fèi)FluxSink對象。
上面的例子在MyEventListener的事件中對FluxSink對象進(jìn)行消費(fèi)。
使用push
push和create一樣,也支持異步操作,但是同時只能有一個線程來調(diào)用next, complete 或者 error方法,所以它是單線程的。
使用Handle
Handle和上面的三個方法不同,它是一個實(shí)例方法。
它和generate很類似,也是消費(fèi)SynchronousSink對象。
Flux<R> handle(BiConsumer<T, SynchronousSink<R>>);不同的是它的參數(shù)是一個BiConsumer,是沒有返回值的。
看一個使用的例子:
public void useHandle(){Flux<String> alphabet = Flux.just(-1, 30, 13, 9, 20).handle((i, sink) -> {String letter = alphabet(i);if (letter != null)sink.next(letter);});alphabet.subscribe(System.out::println);}public String alphabet(int letterNumber) {if (letterNumber < 1 || letterNumber > 26) {return null;}int letterIndexAscii = 'A' + letterNumber - 1;return "" + (char) letterIndexAscii;}本文的例子learn-reactive
本文作者:flydean程序那些事
本文鏈接:http://www.flydean.com/reactor-core-in-depth/
本文來源:flydean的博客
歡迎關(guān)注我的公眾號:「程序那些事」最通俗的解讀,最深刻的干貨,最簡潔的教程,眾多你不知道的小技巧等你來發(fā)現(xiàn)!
總結(jié)
以上是生活随笔為你收集整理的Reactor:深入理解reactor core的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 响应式编程简介之:Reactor
- 下一篇: 快乐学算法之:字典树Trie