聊聊resilience4j的CircuitBreakerStateMachine
序
本文主要研究一下resilience4j的CircuitBreakerStateMachine
CircuitBreakerStateMachine
resilience4j-circuitbreaker-0.13.0-sources.jar!/io/github/resilience4j/circuitbreaker/internal/CircuitBreakerStateMachine.java
/*** A CircuitBreaker finite state machine.*/ public final class CircuitBreakerStateMachine implements CircuitBreaker {private static final Logger LOG = LoggerFactory.getLogger(CircuitBreakerStateMachine.class);private final String name;private final AtomicReference<CircuitBreakerState> stateReference;private final CircuitBreakerConfig circuitBreakerConfig;private final CircuitBreakerEventProcessor eventProcessor;/*** Creates a circuitBreaker.** @param name the name of the CircuitBreaker* @param circuitBreakerConfig The CircuitBreaker configuration.*/public CircuitBreakerStateMachine(String name, CircuitBreakerConfig circuitBreakerConfig) {this.name = name;this.circuitBreakerConfig = circuitBreakerConfig;this.stateReference = new AtomicReference<>(new ClosedState(this));this.eventProcessor = new CircuitBreakerEventProcessor();}/*** Creates a circuitBreaker with default config.** @param name the name of the CircuitBreaker*/public CircuitBreakerStateMachine(String name) {this(name, CircuitBreakerConfig.ofDefaults());}/*** Creates a circuitBreaker.** @param name the name of the CircuitBreaker* @param circuitBreakerConfig The CircuitBreaker configuration supplier.*/public CircuitBreakerStateMachine(String name, Supplier<CircuitBreakerConfig> circuitBreakerConfig) {this(name, circuitBreakerConfig.get());}/*** Requests permission to call this backend.** @return true, if the call is allowed.*/@Overridepublic boolean isCallPermitted() {boolean callPermitted = stateReference.get().isCallPermitted();if (!callPermitted) {publishCallNotPermittedEvent();}return callPermitted;}@Overridepublic void onError(long durationInNanos, Throwable throwable) {if (circuitBreakerConfig.getRecordFailurePredicate().test(throwable)) {if (LOG.isDebugEnabled()) {LOG.debug(String.format("CircuitBreaker '%s' recorded a failure:", name), throwable);}publishCircuitErrorEvent(name, durationInNanos, throwable);stateReference.get().onError(throwable);} else {publishCircuitIgnoredErrorEvent(name, durationInNanos, throwable);}}@Overridepublic void onSuccess(long durationInNanos) {publishSuccessEvent(durationInNanos);stateReference.get().onSuccess();}/*** Get the state of this CircuitBreaker.** @return the the state of this CircuitBreaker*/@Overridepublic State getState() {return this.stateReference.get().getState();}/*** Get the name of this CircuitBreaker.** @return the the name of this CircuitBreaker*/@Overridepublic String getName() {return this.name;}/*** Get the config of this CircuitBreaker.** @return the config of this CircuitBreaker*/@Overridepublic CircuitBreakerConfig getCircuitBreakerConfig() {return circuitBreakerConfig;}@Overridepublic Metrics getMetrics() {return this.stateReference.get().getMetrics();}/*** {@inheritDoc}*/@Overridepublic String toString() {return String.format("CircuitBreaker '%s'", this.name);}@Overridepublic void reset() {CircuitBreakerState previousState = stateReference.getAndUpdate(currentState -> new ClosedState(this));if (previousState.getState() != CLOSED) {publishStateTransitionEvent(StateTransition.transitionBetween(previousState.getState(), CLOSED));}publishResetEvent();}private void stateTransition(State newState, Function<CircuitBreakerState, CircuitBreakerState> newStateGenerator) {CircuitBreakerState previousState = stateReference.getAndUpdate(currentState -> {if (currentState.getState() == newState) {return currentState;}return newStateGenerator.apply(currentState);});if (previousState.getState() != newState) {publishStateTransitionEvent(StateTransition.transitionBetween(previousState.getState(), newState));}}@Overridepublic void transitionToDisabledState() {stateTransition(DISABLED, currentState -> new DisabledState(this));}@Overridepublic void transitionToForcedOpenState() {stateTransition(FORCED_OPEN, currentState -> new ForcedOpenState(this));}@Overridepublic void transitionToClosedState() {stateTransition(CLOSED, currentState -> new ClosedState(this, currentState.getMetrics()));}@Overridepublic void transitionToOpenState() {stateTransition(OPEN, currentState -> new OpenState(this, currentState.getMetrics()));}@Overridepublic void transitionToHalfOpenState() {stateTransition(HALF_OPEN, currentState -> new HalfOpenState(this));}//...... } 復制代碼- CircuitBreakerStateMachine實現了CircuitBreakerStateMachine接口
- AtomicReference用來記錄當前斷路器的狀態
- 狀態轉換接口內部都調用了stateTransition方法,里頭主要是更新AtomicReference以及發布事件
- stateTransition以及reset方法里頭都調用了StateTransition.transitionBetween來發布事件
- CircuitBreakerEventProcessor用來處理事件
CircuitBreakerState
resilience4j-circuitbreaker-0.13.0-sources.jar!/io/github/resilience4j/circuitbreaker/internal/CircuitBreakerState.java
/*** Abstract state of the CircuitBreaker state machine.*/ abstract class CircuitBreakerState{CircuitBreakerStateMachine stateMachine;CircuitBreakerState(CircuitBreakerStateMachine stateMachine) {this.stateMachine = stateMachine;}abstract boolean isCallPermitted();abstract void onError(Throwable throwable);abstract void onSuccess();abstract CircuitBreaker.State getState();abstract CircuitBreakerMetrics getMetrics();/*** Should the CircuitBreaker in this state publish events* @return a boolean signaling if the events should be published*/boolean shouldPublishEvents(CircuitBreakerEvent event){return event.getEventType().forcePublish || getState().allowPublish;} } 復制代碼- CircuitBreakerState與CircuitBreakerStateMachine二者相互保存各自的引用
- CircuitBreakerState的子類有OpenState、HalfOpenState、ForcedOpenState、ClosedState、DisabledState
- 每個子類的onError、onSuccess方法自己判斷是否進行狀態轉移,如果要轉移會調用CircuitBreakerStateMachine的transitionTo開頭的方法
StateTransition.transitionBetween
resilience4j-circuitbreaker-0.13.0-sources.jar!/io/github/resilience4j/circuitbreaker/CircuitBreaker.java
/*** State transitions of the CircuitBreaker state machine.*/enum StateTransition {CLOSED_TO_OPEN(State.CLOSED, State.OPEN),CLOSED_TO_DISABLED(State.CLOSED, State.DISABLED),CLOSED_TO_FORCED_OPEN(State.CLOSED, State.FORCED_OPEN),HALF_OPEN_TO_CLOSED(State.HALF_OPEN, State.CLOSED),HALF_OPEN_TO_OPEN(State.HALF_OPEN, State.OPEN),HALF_OPEN_TO_DISABLED(State.HALF_OPEN, State.DISABLED),HALF_OPEN_TO_FORCED_OPEN(State.HALF_OPEN, State.FORCED_OPEN),OPEN_TO_CLOSED(State.OPEN, State.CLOSED),OPEN_TO_HALF_OPEN(State.OPEN, State.HALF_OPEN),OPEN_TO_DISABLED(State.OPEN, State.DISABLED),OPEN_TO_FORCED_OPEN(State.OPEN, State.FORCED_OPEN),FORCED_OPEN_TO_CLOSED(State.FORCED_OPEN, State.CLOSED),FORCED_OPEN_TO_OPEN(State.FORCED_OPEN, State.OPEN),FORCED_OPEN_TO_DISABLED(State.FORCED_OPEN, State.DISABLED),FORCED_OPEN_TO_HALF_OPEN(State.FORCED_OPEN, State.HALF_OPEN),DISABLED_TO_CLOSED(State.DISABLED, State.CLOSED),DISABLED_TO_OPEN(State.DISABLED, State.OPEN),DISABLED_TO_FORCED_OPEN(State.DISABLED, State.FORCED_OPEN),DISABLED_TO_HALF_OPEN(State.DISABLED, State.HALF_OPEN);private final State fromState;private final State toState;private static final Map<Tuple2<State, State>, StateTransition> STATE_TRANSITION_MAP =Arrays.stream(StateTransition.values()).collect(Collectors.toMap(v -> Tuple.of(v.fromState, v.toState), Function.identity()));private boolean matches(State fromState, State toState) {return this.fromState == fromState && this.toState == toState;}public static StateTransition transitionBetween(State fromState, State toState){final StateTransition stateTransition = STATE_TRANSITION_MAP.get(Tuple.of(fromState, toState));if(stateTransition == null) {throw new IllegalStateException(String.format("Illegal state transition from %s to %s", fromState.toString(), toState.toString()));}return stateTransition;}StateTransition(State fromState, State toState) {this.fromState = fromState;this.toState = toState;}public State getFromState() {return fromState;}public State getToState() {return toState;}@Overridepublic String toString(){return String.format("State transition from %s to %s", fromState, toState);}} 復制代碼- StateTransition定義了一系列狀態轉換的路徑
- StateTransition.transitionBetween方法會對這些狀態轉換進行校驗,不合法的拋出IllegalStateException
CircuitBreakerEventProcessor
resilience4j-circuitbreaker-0.13.0-sources.jar!/io/github/resilience4j/circuitbreaker/internal/CircuitBreakerStateMachine.java
private class CircuitBreakerEventProcessor extends EventProcessor<CircuitBreakerEvent> implements EventConsumer<CircuitBreakerEvent>, EventPublisher {@Overridepublic EventPublisher onSuccess(EventConsumer<CircuitBreakerOnSuccessEvent> onSuccessEventConsumer) {registerConsumer(CircuitBreakerOnSuccessEvent.class, onSuccessEventConsumer);return this;}@Overridepublic EventPublisher onError(EventConsumer<CircuitBreakerOnErrorEvent> onErrorEventConsumer) {registerConsumer(CircuitBreakerOnErrorEvent.class, onErrorEventConsumer);return this;}@Overridepublic EventPublisher onStateTransition(EventConsumer<CircuitBreakerOnStateTransitionEvent> onStateTransitionEventConsumer) {registerConsumer(CircuitBreakerOnStateTransitionEvent.class, onStateTransitionEventConsumer);return this;}@Overridepublic EventPublisher onReset(EventConsumer<CircuitBreakerOnResetEvent> onResetEventConsumer) {registerConsumer(CircuitBreakerOnResetEvent.class, onResetEventConsumer);return this;}@Overridepublic EventPublisher onIgnoredError(EventConsumer<CircuitBreakerOnIgnoredErrorEvent> onIgnoredErrorEventConsumer) {registerConsumer(CircuitBreakerOnIgnoredErrorEvent.class, onIgnoredErrorEventConsumer);return this;}@Overridepublic EventPublisher onCallNotPermitted(EventConsumer<CircuitBreakerOnCallNotPermittedEvent> onCallNotPermittedEventConsumer) {registerConsumer(CircuitBreakerOnCallNotPermittedEvent.class, onCallNotPermittedEventConsumer);return this;}@Overridepublic void consumeEvent(CircuitBreakerEvent event) {super.processEvent(event);}} 復制代碼- CircuitBreakerEventProcessor繼承了EventProcessor,處理CircuitBreakerEvent事件
- CircuitBreakerEventProcessor也實現了EventConsumer以及EventPublisher接口
- onSuccess、onError、onStateTransition、onReset、onIgnoredError、onCallNotPermitted里頭調用了registerConsumer方法
- consumeEvent里頭則是調用了processEvent方法
EventProcessor
resilience4j-core-0.13.0-sources.jar!/io/github/resilience4j/core/EventProcessor.java
public class EventProcessor<T> implements EventPublisher<T> {protected volatile boolean consumerRegistered;private volatile EventConsumer<T> onEventConsumer;private ConcurrentMap<Class<? extends T>, EventConsumer<Object>> eventConsumers = new ConcurrentHashMap<>();public boolean hasConsumers(){return consumerRegistered;}@SuppressWarnings("unchecked")public <E extends T> void registerConsumer(Class<? extends E> eventType, EventConsumer<E> eventConsumer){consumerRegistered = true;eventConsumers.put(eventType, (EventConsumer<Object>) eventConsumer);}@SuppressWarnings("unchecked")public <E extends T> boolean processEvent(E event) {boolean consumed = false;if(onEventConsumer != null){onEventConsumer.consumeEvent(event);consumed = true;}if(!eventConsumers.isEmpty()){EventConsumer<T> eventConsumer = (EventConsumer<T>) eventConsumers.get(event.getClass());if(eventConsumer != null){eventConsumer.consumeEvent(event);consumed = true;}}return consumed;}@Overridepublic void onEvent(EventConsumer<T> onEventConsumer) {consumerRegistered = true;this.onEventConsumer = onEventConsumer;} } 復制代碼- registerConsumer方法主要是往eventConsumers設置事件類型的消費者
- processEvent方法主要是查找相應的事件消費者去處理事件
EventPublisher
resilience4j-circuitbreaker-0.13.0-sources.jar!/io/github/resilience4j/circuitbreaker/CircuitBreaker.java
/*** An EventPublisher can be used to register event consumers.*/interface EventPublisher extends io.github.resilience4j.core.EventPublisher<CircuitBreakerEvent> {EventPublisher onSuccess(EventConsumer<CircuitBreakerOnSuccessEvent> eventConsumer);EventPublisher onError(EventConsumer<CircuitBreakerOnErrorEvent> eventConsumer);EventPublisher onStateTransition(EventConsumer<CircuitBreakerOnStateTransitionEvent> eventConsumer);EventPublisher onReset(EventConsumer<CircuitBreakerOnResetEvent> eventConsumer);EventPublisher onIgnoredError(EventConsumer<CircuitBreakerOnIgnoredErrorEvent> eventConsumer);EventPublisher onCallNotPermitted(EventConsumer<CircuitBreakerOnCallNotPermittedEvent> eventConsumer);} 復制代碼- 這個EventPublisher接口繼承了io.github.resilience4j.core.EventPublisher,邏輯上有點混亂
- 這些on方法分別處理了CircuitBreakerOnSuccessEvent、CircuitBreakerOnErrorEvent、CircuitBreakerOnStateTransitionEvent、CircuitBreakerOnResetEvent、CircuitBreakerOnIgnoredErrorEvent、CircuitBreakerOnCallNotPermittedEvent事件
CircuitBreakerEvent
resilience4j-circuitbreaker-0.13.0-sources.jar!/io/github/resilience4j/circuitbreaker/event/CircuitBreakerEvent.java
/*** An event which is created by a CircuitBreaker.*/ public interface CircuitBreakerEvent {/*** Returns the name of the CircuitBreaker which has created the event.** @return the name of the CircuitBreaker which has created the event*/String getCircuitBreakerName();/*** Returns the type of the CircuitBreaker event.** @return the type of the CircuitBreaker event*/Type getEventType();/*** Returns the creation time of CircuitBreaker event.** @return the creation time of CircuitBreaker event*/ZonedDateTime getCreationTime();/*** Event types which are created by a CircuitBreaker.*/enum Type {/** A CircuitBreakerEvent which informs that an error has been recorded */ERROR(false),/** A CircuitBreakerEvent which informs that an error has been ignored */IGNORED_ERROR(false),/** A CircuitBreakerEvent which informs that a success has been recorded */SUCCESS(false),/** A CircuitBreakerEvent which informs that a call was not permitted because the CircuitBreaker state is OPEN */NOT_PERMITTED(false),/** A CircuitBreakerEvent which informs the state of the CircuitBreaker has been changed */STATE_TRANSITION(true),/** A CircuitBreakerEvent which informs the CircuitBreaker has been reset */RESET(true),/** A CircuitBreakerEvent which informs the CircuitBreaker has been forced open */FORCED_OPEN(false),/** A CircuitBreakerEvent which informs the CircuitBreaker has been disabled */DISABLED(false);public final boolean forcePublish;Type(boolean forcePublish) {this.forcePublish = forcePublish;}} } 復制代碼- CircuitBreakerEvent接口定義了事件的Type枚舉
- 規范了getCircuitBreakerName、getEventType、getCreationTime方法
AbstractCircuitBreakerEvent
resilience4j-circuitbreaker-0.13.0-sources.jar!/io/github/resilience4j/circuitbreaker/event/AbstractCircuitBreakerEvent.java
abstract class AbstractCircuitBreakerEvent implements CircuitBreakerEvent {private final String circuitBreakerName;private final ZonedDateTime creationTime;AbstractCircuitBreakerEvent(String circuitBreakerName) {this.circuitBreakerName = circuitBreakerName;this.creationTime = ZonedDateTime.now();}@Overridepublic String getCircuitBreakerName() {return circuitBreakerName;}@Overridepublic ZonedDateTime getCreationTime() {return creationTime;} } 復制代碼- 定義了circuitBreakerName、creationTime屬性
- 重寫了getCircuitBreakerName、getCreationTime方法
- ircuitBreakerOnSuccessEvent、CircuitBreakerOnErrorEvent、CircuitBreakerOnStateTransitionEvent、CircuitBreakerOnResetEvent、CircuitBreakerOnIgnoredErrorEvent、CircuitBreakerOnCallNotPermittedEvent都是從AbstractCircuitBreakerEvent繼承而來,個別的自定義了自己的屬性,主要是重寫getEventType以及toString方法
小結
- CircuitBreakerStateMachine里頭維護了一個AtomicReference引用,對應的onError及onSuccess方法都委托給改引用對應的狀態的onError以及onSuccess方法
- 子類的onError以及onSuccess方法方法則自行判斷是否需要進行狀態切換以及切換到什么狀態,自己調用CircuitBreakerStateMachine的transitionTo開頭的方法,來改變AtomicReference的值(借助子類的各自實現來化解狀態轉換的復雜邏輯),同時發布一些事件。
doc
- Resilience4j is a fault tolerance library designed for Java8 and functional programming
總結
以上是生活随笔為你收集整理的聊聊resilience4j的CircuitBreakerStateMachine的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: OSChina 周四乱弹 —— 画种稻画
- 下一篇: SpringBoot—— @Compon