webflux系列--reactor源码(二)
生活随笔
收集整理的這篇文章主要介紹了
webflux系列--reactor源码(二)
小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
操作符(Operator)
合并多個Flux
combineLatest
concat
concatMap
merge
repeat
cache
行為(behavior)
聚合操作
collect
reduce
distinct
group by
scan
其他
錯誤處理
Backpressure(背壓)
event
retry
using
dematerialize,materialize
發布和訂閱(hot流和cold流)
subscribe
//subscribe(consumer,...) 最終調用public final Disposable subscribe(@Nullable Consumer<? super T> consumer,@Nullable Consumer<? super Throwable> errorConsumer,@Nullable Runnable completeConsumer,@Nullable Context initialContext) {return subscribeWith(new LambdaSubscriber<>(consumer, errorConsumer,completeConsumer,null,initialContext));}public final <E extends Subscriber<? super T>> E subscribeWith(E subscriber) {subscribe(subscriber);return subscriber;}public final void subscribe(Subscriber<? super T> actual) {CorePublisher publisher = Operators.onLastAssembly(this);CoreSubscriber subscriber = Operators.toCoreSubscriber(actual);try {//如果Publisher是OptimizableOperator,則轉換遞歸調用為loop調用。if (publisher instanceof OptimizableOperator) {OptimizableOperator operator = (OptimizableOperator) publisher;while (true) {subscriber = operator.subscribeOrReturn(subscriber);if (subscriber == null) {// null means "I will subscribe myself", returning...return;}OptimizableOperator newSource = operator.nextOptimizableSource();if (newSource == null) {publisher = operator.source();break;}operator = newSource;}}publisher.subscribe(subscriber);}catch (Throwable e) {Operators.reportThrowInSubscribe(subscriber, e);return;}} public final Flux<T> subscriberContext(Context mergeContext) {return subscriberContext(c -> c.putAll(mergeContext));}public final Flux<T> subscriberContext(Function<Context, Context> doOnContext) {return new FluxContextStart<>(this, doOnContext);}將已有Cold流轉變為Hot流
使用Processor構造Hot流
replay
基于時間的操作
timeout
若在設置的時間內沒有 emit 任何元素,則以 TimeoutException 錯誤信號終止流。
delay
defer
元素相關Operator
filter
// Assembles a filter operator: the fuseable variant is chosen when this Flux is Fuseable,
// otherwise the plain FluxFilter is used.
public final Flux<T> filter(Predicate<? super T> p) {
    if (this instanceof Fuseable) {
        return onAssembly(new FluxFilterFuseable<>(this, p));
    }
    return onAssembly(new FluxFilter<>(this, p));
}

// Excerpt of the filter operator; elided parts are marked with "... ... ...".
final class FluxFilter<T> extends InternalFluxOperator<T, T> {
    final Predicate<? super T> predicate;

    static final class FilterSubscriber<T>
            implements InnerOperator<T, T>,Fuseable.ConditionalSubscriber<T> {
        final CoreSubscriber<? super T> actual;
        final Context ctx;
        final Predicate<? super T> predicate;
        Subscription s;
        boolean done;
        ... ... ...

        @Override
        public void onSubscribe(Subscription s) {
            // Store the subscription once validated, then pass this subscriber
            // downstream as the Subscription.
            if (Operators.validate(this.s, s)) {
                this.s = s;
                actual.onSubscribe(this);
            }
        }

        @Override
        public void onNext(T t) {
            if (done) {
                // Already terminated: report the element as dropped.
                Operators.onNextDropped(t, this.ctx);
                return;
            }
            boolean b;
            try {
                b = predicate.test(t);
            }
            catch (Throwable e) {
                // Operators.onNextError decides the outcome: a non-null result is
                // signalled through onError, a null result requests one more element.
                // In both cases the element is passed to onDiscard.
                Throwable e_ = Operators.onNextError(t, e, this.ctx, s);
                if (e_ != null) {
                    onError(e_);
                }
                else {
                    s.request(1);
                }
                Operators.onDiscard(t, this.ctx);
                return;
            }
            if (b) {
                // Predicate matched: emit the element downstream.
                actual.onNext(t);
            }
            else {
                // Predicate rejected the element: trigger onDiscard.
                Operators.onDiscard(t, this.ctx);
                // Request the next element to replace the filtered-out one.
                s.request(1);
            }
        }
    }
}
filterWhen與filter過程類似,不過將發射這一步修改為放入buffer中,直到流結束將整個buffer返回。
// Maps each upstream value, through a Publisher<Boolean>, to true or false. Only the first
// value emitted by that Publisher is considered; per the original article, if the Publisher
// is empty the input value is not mapped to anything.
// Excerpt only: elided parts are marked with "... ... ..." and the outer class is not closed.
class FluxFilterWhen<T> extends InternalFluxOperator<T, T> {
    final Function<? super T, ? extends Publisher<Boolean>> asyncPredicate;
    final int bufferSize;

    @Override
    public CoreSubscriber<? super T> subscribeOrReturn(CoreSubscriber<? super T> actual) {
        return new FluxFilterWhenSubscriber<>(actual, asyncPredicate, bufferSize);
    }

    static final class FluxFilterWhenSubscriber<T> implements InnerOperator<T, T> {
        final Function<? super T, ? extends Publisher<Boolean>> asyncPredicate;
        // Buffer size.
        final int bufferSize;
        // Buffer array. Per the original article it is sized to the smallest power of two
        // above bufferSize (the constructor is not shown here - TODO confirm). Slots are
        // addressed by masking an index with (length - 1), see onNext and drain.
        final AtomicReferenceArray<T> toFilter;
        final CoreSubscriber<? super T> actual;
        final Context ctx;
        int consumed;
        // Consumer index (next slot to read in drain).
        long consumerIndex;
        long emitted;
        Boolean innerResult;
        // Producer index (next slot to write in onNext).
        long producerIndex;
        Subscription upstream;
        volatile boolean cancelled;
        volatile FilterWhenInner current;
        volatile boolean done;
        volatile Throwable error;
        volatile long requested;
        volatile int state;
        volatile int wip;
        // Sentinel constant representing cancellation.
        static final FilterWhenInner INNER_CANCELLED = new FilterWhenInner(null, false);

        @Override
        public void request(long n) {
            // Add valid demand to REQUESTED, then try to drain.
            if (Operators.validate(n)) {
                Operators.addCap(REQUESTED, this, n);
                drain();
            }
        }

        @Override
        public void onNext(T t) {
            long pi = producerIndex;
            int m = toFilter.length() - 1;
            // Compute the slot index by masking the producer index.
            int offset = (int)pi & m;
            // Store the element in the buffer.
            toFilter.lazySet(offset, t);
            producerIndex = pi + 1;
            drain();
        }

        @Override
        public void onSubscribe(Subscription s) {
            // After validation: keep the upstream, signal downstream, and prefetch
            // bufferSize elements.
            if (Operators.validate(upstream, s)) {
                upstream = s;
                actual.onSubscribe(this);
                s.request(bufferSize);
            }
        }

        void drain() {
            // Only the caller that moves WIP from 0 runs the loop; others just bump the
            // counter and return.
            if (WIP.getAndIncrement(this) != 0) {
                return;
            }
            int missed = 1;
            int limit = Operators.unboundedOrLimit(bufferSize);
            long e = emitted;
            long ci = consumerIndex;
            int f = consumed;
            int m = toFilter.length() - 1;
            Subscriber<? super T> a = actual;
            for (;;) {
                long r = requested;
                // While the emitted count differs from the requested count, keep processing.
                while (e != r) {
                    if (cancelled) {
                        clear();
                        return;
                    }
                    boolean d = done;
                    // Slot index for the consumer side.
                    int offset = (int)ci & m;
                    T t = toFilter.get(offset);
                    boolean empty = t == null;
                    // done flag set and no element in the slot: terminate downstream with
                    // onComplete, or onError if Exceptions.terminate yields an error.
                    if (d && empty) {
                        Throwable ex = Exceptions.terminate(ERROR, this);
                        if (ex == null) {
                            a.onComplete();
                        } else {
                            a.onError(ex);
                        }
                        return;
                    }
                    // Not done, but the slot is empty: leave the inner loop.
                    if (empty) {
                        break;
                    }
                    int s = state;
                    if (s == STATE_FRESH) {
                        Publisher<Boolean> p;
                        try {
                            // Obtain the Publisher the element is mapped to.
                            p = Objects.requireNonNull(asyncPredicate.apply(t), "The asyncPredicate returned a null value");
                        } catch (Throwable ex) {
                            Exceptions.throwIfFatal(ex);
                            Exceptions.addThrowable(ERROR, this, ex);
                            p = null; //discarded as "old" below
                        }
                        // A Publisher was obtained.
                        if (p != null) {
                            // Callable publisher: the result is read directly via call()
                            // instead of subscribing.
                            if (p instanceof Callable) {
                                Boolean u;
                                try {
                                    u = ((Callable<Boolean>)p).call();
                                } catch (Throwable ex) {
                                    Exceptions.throwIfFatal(ex);
                                    Exceptions.addThrowable(ERROR, this, ex);
                                    u = null; //triggers discard below
                                }
                                // Result is true: emit element t.
                                if (u != null && u) {
                                    a.onNext(t);
                                    e++;
                                }
                                // Result is false (or null): discard element t.
                                else {
                                    Operators.onDiscard(t, ctx);
                                }
                            } else {
                                // Publisher is not a Callable: subscribe an inner consumer
                                // and leave the loop once the state is STATE_RUNNING.
                                FilterWhenInner inner = new FilterWhenInner(this, !(p instanceof Mono));
                                if (CURRENT.compareAndSet(this,null, inner)) {
                                    state = STATE_RUNNING;
                                    p.subscribe(inner);
                                    break;
                                }
                            }
                        }
                        // Clear the slot, advance the consumer index, and replenish upstream
                        // once `limit` elements have been consumed.
                        T old = toFilter.getAndSet(offset, null);
                        Operators.onDiscard(old, ctx);
                        ci++;
                        if (++f == limit) {
                            f = 0;
                            upstream.request(limit);
                        }
                    } else
                    if (s == STATE_RESULT) {
                        // An inner result is available: consume it, then emit or discard t.
                        Boolean u = innerResult;
                        innerResult = null;
                        if (u != null && u) {
                            a.onNext(t);
                            e++;
                        }
                        else {
                            Operators.onDiscard(t, ctx);
                        }
                        toFilter.lazySet(offset, null);
                        ci++;
                        if (++f == limit) {
                            f = 0;
                            upstream.request(limit);
                        }
                        state = STATE_FRESH;
                    } else {
                        break;
                    }
                }
                // Emitted count equals requested count: only cancellation / termination
                // is checked here.
                if (e == r) {
                    if (cancelled) {
                        clear();
                        return;
                    }
                    boolean d = done;
                    int offset = (int)ci & m;
                    T t = toFilter.get(offset);
                    boolean empty = t == null;
                    if (d && empty) {
                        Throwable ex = Exceptions.terminate(ERROR, this);
                        if (ex == null) {
                            a.onComplete();
                        } else {
                            a.onError(ex);
                        }
                        return;
                    }
                }
                // No drain() call arrived meanwhile: write the local counters back and
                // release WIP; otherwise loop again with the updated missed count.
                int w = wip;
                if (missed == w) {
                    consumed = f;
                    consumerIndex = ci;
                    emitted = e;
                    missed = WIP.addAndGet(this, -missed);
                    if (missed == 0) {
                        break;
                    }
                } else {
                    missed = w;
                }
            }
        }
    }

    // Inner consumer subscribed to the Publisher<Boolean> produced for one element.
    static final class FilterWhenInner implements InnerConsumer<Boolean> {
        final FluxFilterWhenSubscriber<?> parent;
        final boolean cancelOnNext;
        // Whether this inner consumer has already finished.
        boolean done;
        volatile Subscription sub;
        static final AtomicReferenceFieldUpdater<FilterWhenInner, Subscription> SUB =AtomicReferenceFieldUpdater.newUpdater(FilterWhenInner.class, Subscription.class, "sub");
        ... ... ...

        @Override
        public void onSubscribe(Subscription s) {
            // Set sub once, then request all elements.
            if (Operators.setOnce(SUB, this, s)) {
                s.request(Long.MAX_VALUE);
            }
        }

        @Override
        public void onNext(Boolean t) {
            // Only the first value counts: optionally cancel the inner subscription,
            // mark done, and hand the result to the parent.
            if (!done) {
                if (cancelOnNext) {
                    sub.cancel();
                }
                done = true;
                parent.innerResult(t);
            }
        }

        @Override
        public void onError(Throwable t) {
            if (!done) {
                done = true;
                parent.innerError(t);
            } else {
                // Already finished: report the error as dropped.
                Operators.onErrorDropped(t, parent.currentContext());
            }
        }

        @Override
        public void onComplete() {
            if (!done) {
                done = true;
                parent.innerComplete();
            }
        }

        void cancel() {
            Operators.terminate(SUB, this);
        }
    }
take
// Excerpt of the take operator: only onNext is quoted; the fields it uses
// (done, remaining, s, actual) are declared in parts of the class not shown here.
final class FluxTake<T> extends InternalFluxOperator<T, T> {
    @Override
    public void onNext(T t) {
        if (done) {
            // Already terminated: report the element as dropped.
            Operators.onNextDropped(t, actual.currentContext());
            return;
        }
        long r = remaining;
        if (r == 0) {
            // Nothing left to take: cancel upstream and complete without emitting t.
            s.cancel();
            onComplete();
            return;
        }
        remaining = --r;
        // Decide before emitting whether this element is the last one allowed.
        boolean stop = r == 0L;
        actual.onNext(t);
        if (stop) {
            // Quota reached: cancel upstream and complete downstream.
            s.cancel();
            onComplete();
        }
    }
}
skip
// Excerpt stub of the skip operator: only the declarations of FluxSkip and its nested
// SkipSubscriber are quoted. The bodies are omitted and the braces are unbalanced as written.
final class FluxSkip<T> extends InternalFluxOperator<T, T> { static final class SkipSubscriber<T> } }
sample
single
element
插入元素
switch
忽略元素值
then
sort
map
// Assembles a map operator: the fuseable variant is chosen when this Flux is Fuseable,
// otherwise the plain FluxMap is used.
public final <V> Flux<V> map(Function<? super T, ? extends V> mapper) {
    if (this instanceof Fuseable) {
        return onAssembly(new FluxMapFuseable<>(this, mapper));
    }
    return onAssembly(new FluxMap<>(this, mapper));
}

// Excerpt of the map operator; elided parts are marked with "... ...".
final class FluxMap<T, R> extends InternalFluxOperator<T, R> {
    final Function<? super T, ? extends R> mapper;

    public CoreSubscriber<? super T> subscribeOrReturn(CoreSubscriber<? super R> actual) {
        // A conditional downstream subscriber gets the conditional wrapper.
        if (actual instanceof Fuseable.ConditionalSubscriber) {
            Fuseable.ConditionalSubscriber<? super R> cs =(Fuseable.ConditionalSubscriber<? super R>) actual;
            return new MapConditionalSubscriber<>(cs, mapper);
        }
        // Wrap the downstream Subscriber in a MapSubscriber.
        return new MapSubscriber<>(actual, mapper);
    }

    static final class MapSubscriber<T, R>
            implements InnerOperator<T, R> {
        final CoreSubscriber<? super R> actual;
        final Function<? super T, ? extends R> mapper;
        boolean done;
        // The original (upstream) Subscription.
        Subscription s;
        ... ...

        @Override
        public void onSubscribe(Subscription s) {
            // Store the subscription once validated, then pass this subscriber
            // downstream as the Subscription.
            if (Operators.validate(this.s, s)) {
                this.s = s;
                actual.onSubscribe(this);
            }
        }

        @Override
        public void onNext(T t) {
            if (done) {
                // Already terminated: report the element as dropped.
                Operators.onNextDropped(t, actual.currentContext());
                return;
            }
            R v;
            try {
                // Apply the mapper; a null result is rejected by requireNonNull.
                v = Objects.requireNonNull(mapper.apply(t),"The mapper returned a null value.");
            }
            catch (Throwable e) {
                // Handle the failure: a non-null result from Operators.onNextError is
                // signalled through onError, a null result requests one more element.
                Throwable e_ = Operators.onNextError(t, e, actual.currentContext(), s);
                if (e_ != null) {
                    onError(e_);
                }
                else {
                    s.request(1);
                }
                return;
            }
            // Emit the mapped value downstream.
            actual.onNext(v);
        }

        @Override
        public void onError(Throwable t) {
            if (done) {
                // Already terminated: report the error as dropped.
                Operators.onErrorDropped(t, actual.currentContext());
                return;
            }
            done = true;
            actual.onError(t);
        }

        @Override
        public void onComplete() {
            if (done) {
                return;
            }
            done = true;
            actual.onComplete();
        }
        ... ...

        @Override
        public void request(long n) {
            // Forward the demand unchanged to the stored Subscription.
            s.request(n);
        }
    }
}
expand
雜項
拆分
window
buffer
回到同步操作
to
調試類Operator
log
elapsed
timestamp
checkpoint
總結
以上是生活随笔為你收集整理的webflux系列--reactor源码(二)的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: webflux系列--reactor源码
- 下一篇: InfluxDB基本使用说明