Mono 的执行流程
目錄
- 前言
- 一、示例
- 二、流程
- 1、構建數據發布者
- 2、構建數據訂閱者
- 3、建立訂閱關系
- 4、請求數據
- 5、發布數據
- 6、發布完成
前言
本文主要同時簡單的示例來分析一下Mono在發布訂閱過程中的執行流程。
一、示例
@Testpublic void executeProcessTest() {Mono.just("hello mono").filter(v -> v != null).map(v -> v + " map").defaultIfEmpty("default value").subscribe(System.out::println);}二、流程
1、構建數據發布者
(1)Mono.just(“hello mono”)
返回 MonoJust,包裝值
public static <T> Mono<T> just(T data) {return onAssembly(new MonoJust<>(data));}MonoJust(T value) {this.value = Objects.requireNonNull(value, "value");}(2)filter
返回 MonoFilterFuseable ,包裝 MonoJust 和 predicate
public final Mono<T> filter(final Predicate<? super T> tester) {if (this instanceof Fuseable) {return onAssembly(new MonoFilterFuseable<>(this, tester));}return onAssembly(new MonoFilter<>(this, tester));}MonoFilterFuseable(Mono<? extends T> source, Predicate<? super T> predicate) {super(source);this.predicate = Objects.requireNonNull(predicate, "predicate");}(3)map
返回 MonoMapFuseable 包裝 MonoFilterFuseable 和 mapper
public final <R> Mono<R> map(Function<? super T, ? extends R> mapper) {if (this instanceof Fuseable) {return onAssembly(new MonoMapFuseable<>(this, mapper));}return onAssembly(new MonoMap<>(this, mapper));}MonoMapFuseable(Mono<? extends T> source, Function<? super T, ? extends R> mapper) {super(source);this.mapper = Objects.requireNonNull(mapper, "mapper");}(4)defaultIfEmpty
返回MonoDefaultIfEmpty,包裝 MonoMapFuseable 和 defaultValue
public final Mono<T> defaultIfEmpty(T defaultV) {if (this instanceof Fuseable.ScalarCallable) {try {T v = block();if (v == null) {return Mono.just(defaultV);}}catch (Throwable e) {//leave MonoError returns as this}return this;}return onAssembly(new MonoDefaultIfEmpty<>(this, defaultV));}MonoDefaultIfEmpty(Mono<? extends T> source, T defaultValue) {super(source);this.defaultValue = Objects.requireNonNull(defaultValue, "defaultValue");}數據發布者的發布流程:
數據 -> MonoJust -> MonoFilterFuseable -> MonoMapFuseable -> MonoDefaultIfEmpty
2、構建數據訂閱者
從示例中的 subscribe() 開始
(1) subscribe()
傳入 consumer 消費者
public final Disposable subscribe(Consumer<? super T> consumer) {Objects.requireNonNull(consumer, "consumer");return subscribe(consumer, null, null);}public final Disposable subscribe(@Nullable Consumer<? super T> consumer,@Nullable Consumer<? super Throwable> errorConsumer,@Nullable Runnable completeConsumer) {return subscribe(consumer, errorConsumer, completeConsumer, (Context) null);}public final Disposable subscribe(@Nullable Consumer<? super T> consumer,@Nullable Consumer<? super Throwable> errorConsumer,@Nullable Runnable completeConsumer,@Nullable Context initialContext) {return subscribeWith(new LambdaMonoSubscriber<>(consumer, errorConsumer,completeConsumer, null, initialContext));}創建 LambdaMonoSubscriber 對象,包裝最終的消費者consumer
(2)subscribeWith()
public final <E extends Subscriber<? super T>> E subscribeWith(E subscriber) {subscribe(subscriber);return subscriber;} public final void subscribe(Subscriber<? super T> actual) {//最后一層發布者,這里是 MonoDefaultIfEmptyCorePublisher publisher = Operators.onLastAssembly(this);//最后一層訂閱者,這里是 LambdaMonoSubscriber CoreSubscriber subscriber = Operators.toCoreSubscriber(actual);//發布者與訂閱者建立聯系try {if (publisher instanceof OptimizableOperator) {OptimizableOperator operator = (OptimizableOperator) publisher;while (true) {subscriber = operator.subscribeOrReturn(subscriber);if (subscriber == null) {// null means "I will subscribe myself", returning...return;}OptimizableOperator newSource = operator.nextOptimizableSource();if (newSource == null) {publisher = operator.source();break;}operator = newSource;}}//發布者發布數據給訂閱者publisher.subscribe(subscriber);}catch (Throwable e) {Operators.reportThrowInSubscribe(subscriber, e);return;}}(3)發布者與訂閱者建立聯系的過程
核心方法:
subscriber = operator.subscribeOrReturn(subscriber);a). operator 是 MonoDefaultIfEmpty,subscriber 是 LambdaMonoSubscriber
返回 DefaultIfEmptySubscriber,作為 LambdaMonoSubscriber 的發布者
@Overridepublic CoreSubscriber<? super T> subscribeOrReturn(CoreSubscriber<? super T> actual) {return new FluxDefaultIfEmpty.DefaultIfEmptySubscriber<>(actual, defaultValue);}b). operator 是 MonoMapFuseable ,subscriber 是 DefaultIfEmptySubscriber
返回 MapFuseableSubscriber,作為 DefaultIfEmptySubscriber 的發布者
public CoreSubscriber<? super T> subscribeOrReturn(CoreSubscriber<? super R> actual) {if (actual instanceof ConditionalSubscriber) {ConditionalSubscriber<? super R> cs = (ConditionalSubscriber<? super R>) actual;return new FluxMapFuseable.MapFuseableConditionalSubscriber<>(cs, mapper);}return new FluxMapFuseable.MapFuseableSubscriber<>(actual, mapper);}c).operator 是 MonoFilterFuseable ,subscriber 是 MapFuseableSubscriber
返回 FilterFuseableSubscriber,作為 MapFuseableSubscriber 的發布者
public CoreSubscriber<? super T> subscribeOrReturn(CoreSubscriber<? super T> actual) {if (actual instanceof ConditionalSubscriber) {return new FluxFilterFuseable.FilterFuseableConditionalSubscriber<>((ConditionalSubscriber<? super T>) actual, predicate);}return new FluxFilterFuseable.FilterFuseableSubscriber<>(actual, predicate);}此時發布者與訂閱者關系:
FilterFuseableSubscriber -> MapFuseableSubscriber -> DefaultIfEmptySubscriber -> LambdaMonoSubscriber -> consumer
3、建立訂閱關系
publisher.subscribe(subscriber);此時publisher 是 MonoJust, subscriber 是 FilterFuseableSubscriber
創建 scalarSubscription ,包裝 FilterFuseableSubscriber
根據發布訂閱關系依次調用訂閱者的 onSubscribe() 建立訂閱關系
FilterFuseableSubscriber -> MapFuseableSubscriber -> DefaultIfEmptySubscriber -> LambdaMonoSubscriber
進入 LambdaMonoSubscriber 的 onSubscribe()
@Overridepublic final void onSubscribe(Subscription s) {if (Operators.validate(subscription, s)) {this.subscription = s;if (subscriptionConsumer != null) {try {subscriptionConsumer.accept(s);}catch (Throwable t) {Exceptions.throwIfFatal(t);s.cancel();onError(t);}}else {//請求數據s.request(Long.MAX_VALUE);}}}4、請求數據
通過訂閱關系調用 request() 請求數據,
s.request(Long.MAX_VALUE);即根據下面的關系鏈反向請求數據
FilterFuseableSubscriber -> MapFuseableSubscriber -> DefaultIfEmptySubscriber -> LambdaMonoSubscriber
最終到了 Operators 類中
@Overridepublic void request(long n) {if (validate(n)) {if (ONCE.compareAndSet(this, 0, 1)) {Subscriber<? super T> a = actual;//發布數據a.onNext(value);if(once != 2) {//發布完成a.onComplete();}}}}5、發布數據
從 FilterFuseableSubscriber 開始調用 onNext() 發布數據,根據依次發布給各自的訂閱者,最終數據到了最后一個訂閱者 LambdaMonoSubscriber
LambdaMonoSubscriber.java@Overridepublic final void onNext(T x) {Subscription s = S.getAndSet(this, Operators.cancelledSubscription());if (s == Operators.cancelledSubscription()) {Operators.onNextDropped(x, this.initialContext);return;}if (consumer != null) {try {//最終調用 consumer 消費數據consumer.accept(x);}catch (Throwable t) {Exceptions.throwIfFatal(t);s.cancel();doError(t);}}if (completeConsumer != null) {try {completeConsumer.run();}catch (Throwable t) {Operators.onErrorDropped(t, this.initialContext);}}}6、發布完成
在數據發布依次到消費者消費后,進入第4步中的 a.onComplete();
依次調用各自的訂閱者調用 onComplete()。
總結
以上是生活随笔為你收集整理的Mono 的执行流程的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 三种嵌入式操作系统(Palm OS 、W
- 下一篇: mysql initialize 什么意