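Is Your Response Blocking? A Source Code Analysis of Spring WebFlux
1. What is Spring WebFlux?
Spring WebFlux is the reactive web framework introduced in Spring Framework 5.0. Unlike Spring MVC, it does not require the Servlet API, it is fully asynchronous and non-blocking, and it implements the Reactive Streams specification through the Reactor project. It can run on servers such as Netty, Undertow, and Servlet 3.1+ containers.
Reactor is also the foundation of reactive programming in Spring 5; it is a new reactive programming library.
2. What is Reactor?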
Reactor offers non-blocking and backpressure-ready network runtimes including local TCP/HTTP/UDP client & servers based on the robust Netty framework.
That is, Reactor provides a non-blocking network runtime API with backpressure support, built on the robust Netty framework, including local TCP/HTTP/UDP clients and servers.
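Two important concepts
Flux and Mono are the two basic concepts in Reactor. A Flux represents an asynchronous sequence of 0 to N elements. The sequence can carry three kinds of notifications: a normal notification that contains an element, a sequence-completion notification, and a sequence-error notification. When a notification is produced, the corresponding subscriber method, onNext(), onComplete(), or onError(), is called. A Mono represents an asynchronous sequence of 0 or 1 elements and can carry the same three kinds of notifications as a Flux. Flux and Mono can be converted into each other: counting the elements of a Flux yields a Mono<Long>, and combining two Mono sequences yields a Flux.
In short, a Mono emits at most one element, and a Flux emits multiple elements.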
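The three notification kinds can be seen without Reactor on the classpath. The sketch below uses the JDK's java.util.concurrent.Flow interfaces, which mirror the Reactive Streams contract that Flux and Mono follow. SignalSketch, fromIterable, and signalsOf are hypothetical names made up for illustration; they are not Reactor APIs, and the publisher here is synchronous and skips the specification's validation rules.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Flow;

// Hypothetical helper for illustration; not part of Reactor or Spring.
class SignalSketch {

    /** Publishes the given items synchronously on demand, then signals completion. */
    static <T> Flow.Publisher<T> fromIterable(List<T> items) {
        return subscriber -> subscriber.onSubscribe(new Flow.Subscription() {
            private final Iterator<T> it = items.iterator();
            private boolean done;

            @Override
            public void request(long n) {
                // Emit at most n items: the subscriber controls the pace (backpressure).
                for (long i = 0; i < n && !done && it.hasNext(); i++) {
                    subscriber.onNext(it.next());
                }
                if (!done && !it.hasNext()) {
                    done = true;
                    subscriber.onComplete();
                }
            }

            @Override
            public void cancel() {
                done = true;
            }
        });
    }

    /** Subscribes, requests everything, and records each signal as text. */
    static <T> List<String> signalsOf(Flow.Publisher<T> publisher) {
        List<String> log = new ArrayList<>();
        publisher.subscribe(new Flow.Subscriber<T>() {
            @Override
            public void onSubscribe(Flow.Subscription subscription) {
                subscription.request(Long.MAX_VALUE);
            }

            @Override
            public void onNext(T item) {
                log.add("onNext(" + item + ")");
            }

            @Override
            public void onError(Throwable error) {
                log.add("onError(" + error.getMessage() + ")");
            }

            @Override
            public void onComplete() {
                log.add("onComplete");
            }
        });
        return log;
    }
}
```

A sequence of two elements produces two onNext signals followed by one onComplete; an empty sequence produces only onComplete, which is the "0 elements" case of a Mono.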
3. How Spring WebFlux processes a request
The core controller is DispatcherHandler, the counterpart of DispatcherServlet in the blocking model.

    /**
     * Central dispatcher for HTTP request handlers/controllers. Dispatches to
     * registered handlers for processing a request, providing convenient mapping
     * facilities.
     *
     * <p>{@code DispatcherHandler} discovers the delegate components it needs from
     * Spring configuration. It detects the following in the application context:
     * <ul>
     * <li>{@link HandlerMapping} -- map requests to handler objects
     * <li>{@link HandlerAdapter} -- for using any handler interface
     * <li>{@link HandlerResultHandler} -- process handler return values
     * </ul>
     *
     * <p>{@code DispatcherHandler} is also designed to be a Spring bean itself and
     * implements {@link ApplicationContextAware} for access to the context it runs
     * in. If {@code DispatcherHandler} is declared with the bean name "webHandler"
     * it is discovered by {@link WebHttpHandlerBuilder#applicationContext} which
     * creates a processing chain together with {@code WebFilter},
     * {@code WebExceptionHandler} and others.
     *
     * <p>A {@code DispatcherHandler} bean declaration is included in
     * {@link org.springframework.web.reactive.config.EnableWebFlux @EnableWebFlux}
     * configuration.
     *
     * @author Rossen Stoyanchev
     * @author Sebastien Deleuze
     * @author Juergen Hoeller
     * @since 5.0
     * @see WebHttpHandlerBuilder#applicationContext(ApplicationContext)
     */

3.1 Initialization
Initialization collects all instances of HandlerMapping, HandlerAdapter, and HandlerResultHandler from the application context:

    protected void initStrategies(ApplicationContext context) {
        Map<String, HandlerMapping> mappingBeans = BeanFactoryUtils.beansOfTypeIncludingAncestors(
                context, HandlerMapping.class, true, false); // 1

        ArrayList<HandlerMapping> mappings = new ArrayList<>(mappingBeans.values());
        AnnotationAwareOrderComparator.sort(mappings);
        this.handlerMappings = Collections.unmodifiableList(mappings);

        Map<String, HandlerAdapter> adapterBeans = BeanFactoryUtils.beansOfTypeIncludingAncestors(
                context, HandlerAdapter.class, true, false); // 2

        this.handlerAdapters = new ArrayList<>(adapterBeans.values());
        AnnotationAwareOrderComparator.sort(this.handlerAdapters);

        Map<String, HandlerResultHandler> beans = BeanFactoryUtils.beansOfTypeIncludingAncestors(
                context, HandlerResultHandler.class, true, false); // 3

        this.resultHandlers = new ArrayList<>(beans.values());
        AnnotationAwareOrderComparator.sort(this.resultHandlers);
    }

where:
1. collects all HandlerMapping instances
2. collects all HandlerAdapter instances
3. collects all HandlerResultHandler instances
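Each of the three lookups follows the same pattern: take the bean map's values, sort them by order, and keep the resulting list. A minimal JDK-only sketch of that pattern follows. OrderedStrategy and InitStrategiesSketch are hypothetical stand-ins, and a plain integer comparison replaces AnnotationAwareOrderComparator, on the assumption that lower order values sort first as they do with Spring's ordering.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a strategy bean that declares an order value.
class OrderedStrategy {
    final String name;
    final int order;

    OrderedStrategy(String name, int order) {
        this.name = name;
        this.order = order;
    }
}

class InitStrategiesSketch {

    /** Copies the bean map's values, sorts them by order, and freezes the result. */
    static List<OrderedStrategy> collect(Map<String, OrderedStrategy> beansByName) {
        List<OrderedStrategy> strategies = new ArrayList<>(beansByName.values());
        // Lower order values come first, so they are consulted first at dispatch time.
        strategies.sort(Comparator.comparingInt(s -> s.order));
        return Collections.unmodifiableList(strategies);
    }
}
```

The sort matters because the dispatcher later walks these lists in order and stops at the first strategy that matches.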
3.2 Processing the request as a stream

    public Mono<Void> handle(ServerWebExchange exchange) {
        if (this.handlerMappings == null) {
            return createNotFoundError();
        }
        return Flux.fromIterable(this.handlerMappings)
                .concatMap(mapping -> mapping.getHandler(exchange)) // 1
                .next()
                .switchIfEmpty(createNotFoundError()) // 2
                .flatMap(handler -> invokeHandler(exchange, handler)) // 3
                .flatMap(result -> handleResult(exchange, result)); // 4
    }

Step 1 (markers 1 and 2): query each HandlerMapping in the handlerMappings list in order and take the first handler found; if no mapping returns a handler, a not-found error is signalled.
Step 2 (marker 3): invoke the handle method of the HandlerAdapter that supports the handler.

    private Mono<HandlerResult> invokeHandler(ServerWebExchange exchange, Object handler) {
        if (this.handlerAdapters != null) {
            for (HandlerAdapter handlerAdapter : this.handlerAdapters) {
                if (handlerAdapter.supports(handler)) {
                    return handlerAdapter.handle(exchange, handler);
                }
            }
        }
        return Mono.error(new IllegalStateException("No HandlerAdapter: " + handler));
    }

Step 3 (marker 4): invoke the handleResult method of the HandlerResultHandler that supports the result.

    private Mono<Void> handleResult(ServerWebExchange exchange, HandlerResult result) {
        return getResultHandler(result).handleResult(exchange, result)
                .onErrorResume(ex -> result.applyExceptionHandler(ex).flatMap(exceptionResult ->
                        getResultHandler(exceptionResult).handleResult(exchange, exceptionResult)));
    }

    private HandlerResultHandler getResultHandler(HandlerResult handlerResult) {
        if (this.resultHandlers != null) {
            for (HandlerResultHandler resultHandler : this.resultHandlers) {
                if (resultHandler.supports(handlerResult)) {
                    return resultHandler;
                }
            }
        }
        throw new IllegalStateException("No HandlerResultHandler for " + handlerResult.getReturnValue());
    }

4. HandlerMapping implementations
5. HandlerAdapter implementations
6. HandlerResultHandler implementations
7. Implementations for different containers
7.1 Reactor implementation: ReactorHttpHandlerAdapter
Its apply method is executed for each request:

    @Override
    public Mono<Void> apply(HttpServerRequest reactorRequest, HttpServerResponse reactorResponse) {
        NettyDataBufferFactory bufferFactory = new NettyDataBufferFactory(reactorResponse.alloc());
        try {
            ReactorServerHttpRequest request = new ReactorServerHttpRequest(reactorRequest, bufferFactory);
            ServerHttpResponse response = new ReactorServerHttpResponse(reactorResponse, bufferFactory);

            if (request.getMethod() == HttpMethod.HEAD) {
                response = new HttpHeadResponseDecorator(response);
            }

            return this.httpHandler.handle(request, response)
                    .doOnError(ex -> logger.trace(request.getLogPrefix() + "Failed to complete: " + ex.getMessage()))
                    .doOnSuccess(aVoid -> logger.trace(request.getLogPrefix() + "Handling completed"));
        }
        catch (URISyntaxException ex) {
            if (logger.isDebugEnabled()) {
                logger.debug("Failed to get request URI: " + ex.getMessage());
            }
            reactorResponse.status(HttpResponseStatus.BAD_REQUEST);
            return Mono.empty();
        }
    }

Here, HttpHandler is defined as follows:

    /**
     * Lowest level contract for reactive HTTP request handling that serves as a
     * common denominator across different runtimes.
     *
     * <p>Higher-level, but still generic, building blocks for applications such as
     * {@code WebFilter}, {@code WebSession}, {@code ServerWebExchange}, and others
     * are available in the {@code org.springframework.web.server} package.
     *
     * <p>Application level programming models such as annotated controllers and
     * functional handlers are available in the {@code spring-webflux} module.
     *
     * <p>Typically an {@link HttpHandler} represents an entire application with
     * higher-level programming models bridged via
     * {@link org.springframework.web.server.adapter.WebHttpHandlerBuilder}.
     * Multiple applications at unique context paths can be plugged in with the
     * help of the {@link ContextPathCompositeHandler}.
     *
     * @author Arjen Poutsma
     * @author Rossen Stoyanchev
     * @since 5.0
     * @see ContextPathCompositeHandler
     */

One concrete implementation is ContextPathCompositeHandler:

    /**
     * {@code HttpHandler} delegating requests to one of several {@code HttpHandler}'s
     * based on simple, prefix-based mappings.
     *
     * <p>This is intended as a coarse-grained mechanism for delegating requests to
     * one of several applications -- each represented by an {@code HttpHandler}, with
     * the application "context path" (the prefix-based mapping) exposed via
     * {@link ServerHttpRequest#getPath()}.
     *
     * @author Rossen Stoyanchev
     * @since 5.0
     */

    @Override
    public Mono<Void> handle(ServerHttpRequest request, ServerHttpResponse response) {
        // Remove underlying context path first (e.g. Servlet container)
        String path = request.getPath().pathWithinApplication().value();
        return this.handlerMap.entrySet().stream()
                .filter(entry -> path.startsWith(entry.getKey()))
                .findFirst()
                .map(entry -> {
                    String contextPath = request.getPath().contextPath().value() + entry.getKey();
                    ServerHttpRequest newRequest = request.mutate().contextPath(contextPath).build();
                    return entry.getValue().handle(newRequest, response);
                })
                .orElseGet(() -> {
                    response.setStatusCode(HttpStatus.NOT_FOUND);
                    return response.setComplete();
                });
    }

It delegates to handlers through prefix-based mappings.
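The prefix lookup itself is independent of Spring and can be tried in isolation. The following is a simplified sketch, assuming a "handler" is just a function from the remaining path to a response string. PrefixDispatchSketch and its methods are hypothetical, and unlike the real class it trims the prefix from the path instead of building a new request with a mutated context path.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

// Simplified model: a "handler" maps the path within its application to a response body.
class PrefixDispatchSketch {
    // Insertion order is kept so that "first match" is predictable.
    private final Map<String, Function<String, String>> handlerMap = new LinkedHashMap<>();

    void register(String prefix, Function<String, String> handler) {
        handlerMap.put(prefix, handler);
    }

    /** Delegates to the first registered prefix that matches, or answers 404. */
    String handle(String path) {
        return handlerMap.entrySet().stream()
                .filter(entry -> path.startsWith(entry.getKey()))
                .findFirst()
                .map(entry -> entry.getValue().apply(path.substring(entry.getKey().length())))
                .orElse("404 Not Found");
    }
}
```

With handlers registered under "/shop" and "/blog", a request for "/blog/1" reaches the blog handler with the remaining path "/1", and a request for an unregistered prefix falls through to the 404 branch.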
7.2 Jetty implementation: JettyHttpHandlerAdapter
It extends ServletHttpHandlerAdapter, which implements Servlet, so the service method is executed for each request:

    @Override
    public void service(ServletRequest request, ServletResponse response) throws ServletException, IOException {
        // Check for existing error attribute first
        if (DispatcherType.ASYNC.equals(request.getDispatcherType())) {
            Throwable ex = (Throwable) request.getAttribute(WRITE_ERROR_ATTRIBUTE_NAME);
            throw new ServletException("Failed to create response content", ex);
        }

        // Start async before Read/WriteListener registration
        AsyncContext asyncContext = request.startAsync();
        asyncContext.setTimeout(-1);

        ServletServerHttpRequest httpRequest;
        try {
            httpRequest = createRequest(((HttpServletRequest) request), asyncContext); // 1
        }
        catch (URISyntaxException ex) {
            if (logger.isDebugEnabled()) {
                logger.debug("Failed to get request URL: " + ex.getMessage());
            }
            ((HttpServletResponse) response).setStatus(400);
            asyncContext.complete();
            return;
        }

        ServerHttpResponse httpResponse = createResponse(((HttpServletResponse) response), asyncContext, httpRequest); // 2
        if (httpRequest.getMethod() == HttpMethod.HEAD) {
            httpResponse = new HttpHeadResponseDecorator(httpResponse);
        }

        AtomicBoolean isCompleted = new AtomicBoolean();
        HandlerResultAsyncListener listener = new HandlerResultAsyncListener(isCompleted, httpRequest);
        asyncContext.addListener(listener);

        HandlerResultSubscriber subscriber = new HandlerResultSubscriber(asyncContext, isCompleted, httpRequest);
        this.httpHandler.handle(httpRequest, httpResponse).subscribe(subscriber); // 3
    }

where:
1. creates the request
2. creates the response
3. subscribes to the result of the handler

JettyHttpHandlerAdapter is an extension of ServletHttpHandlerAdapter that overrides the methods for creating the request and the response.
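The service method shares a single AtomicBoolean between the async listener and the subscriber. A plausible reading, stated here as an assumption and not confirmed by the code shown, is that the flag ensures the request is completed only once even if both callbacks fire. The sketch below shows that compare-and-set idiom on its own; CompleteOnceSketch is a hypothetical class, and a counter stands in for completing the async context.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical model of two callbacks racing to finish the same request.
class CompleteOnceSketch {
    private final AtomicBoolean isCompleted = new AtomicBoolean();
    private int completions;

    /** Runs the completion logic only for the first caller; later callers are ignored. */
    boolean tryComplete() {
        if (isCompleted.compareAndSet(false, true)) {
            completions++; // stands in for completing the async context
            return true;
        }
        return false;
    }

    int completions() {
        return completions;
    }
}
```

Whichever callback arrives first wins the compare-and-set; the other sees the flag already set and does nothing.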
7.3 Tomcat implementation: TomcatHttpHandlerAdapter
TomcatHttpHandlerAdapter is an extension of ServletHttpHandlerAdapter that overrides the methods for creating the request and the response.
7.4 The AbstractReactiveWebInitializer abstract class
A class that extends AbstractReactiveWebInitializer can install a Spring reactive web application in a servlet container.

    @Override
    public void onStartup(ServletContext servletContext) throws ServletException {
        String servletName = getServletName();
        Assert.hasLength(servletName, "getServletName() must not return null or empty");

        ApplicationContext applicationContext = createApplicationContext();
        Assert.notNull(applicationContext, "createApplicationContext() must not return null");

        refreshApplicationContext(applicationContext);
        registerCloseListener(servletContext, applicationContext);

        HttpHandler httpHandler = WebHttpHandlerBuilder.applicationContext(applicationContext).build();
        ServletHttpHandlerAdapter servlet = new ServletHttpHandlerAdapter(httpHandler);

        ServletRegistration.Dynamic registration = servletContext.addServlet(servletName, servlet);
        if (registration == null) {
            throw new IllegalStateException("Failed to register servlet with name '" + servletName + "'. " +
                    "Check if there is another servlet registered under the same name.");
        }

        registration.setLoadOnStartup(1);
        registration.addMapping(getServletMapping());
        registration.setAsyncSupported(true);
    }

It does this by registering a ServletHttpHandlerAdapter instance as a servlet in the servlet container.
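8. Summary
The DispatcherHandler flow is:
1. Use a HandlerMapping (not the same interface as the HandlerMapping used by DispatcherServlet) to look up the handler for the request, then select a HandlerAdapter that supports that handler.
2. Invoke the HandlerAdapter's handle method to obtain a HandlerResult.
3. Pass the HandlerResult to handleResult, which selects a HandlerResultHandler according to the return value, for example ViewResolutionResultHandler (view rendering), ServerResponseResultHandler, ResponseBodyResultHandler, or ResponseEntityResultHandler.
Different containers have their own adapter implementations, such as Reactor, Jetty, and Tomcat.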
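The three DispatcherHandler steps in this summary can be modelled without Spring. The sketch below is a deliberately synchronous approximation: a lazy Stream plus Optional plays the role of concatMap, next, and switchIfEmpty, and plain return values replace Mono. DispatchSketch and its nested interfaces are hypothetical names, and the "404 Not Found" string is only a placeholder for the not-found error.

```java
import java.util.List;
import java.util.Optional;

class DispatchSketch {

    // Hypothetical, synchronous stand-ins for the three strategy interfaces.
    interface Mapping {
        Optional<Object> getHandler(String request);
    }

    interface Adapter {
        boolean supports(Object handler);
        Object handle(String request, Object handler);
    }

    interface ResultHandler {
        boolean supports(Object result);
        String handleResult(Object result);
    }

    private final List<Mapping> mappings;
    private final List<Adapter> adapters;
    private final List<ResultHandler> resultHandlers;

    DispatchSketch(List<Mapping> mappings, List<Adapter> adapters, List<ResultHandler> resultHandlers) {
        this.mappings = mappings;
        this.adapters = adapters;
        this.resultHandlers = resultHandlers;
    }

    String handle(String request) {
        // Step 1: ask each mapping in order and stop at the first handler found.
        Optional<Object> handler = mappings.stream()
                .map(mapping -> mapping.getHandler(request))
                .flatMap(Optional::stream)
                .findFirst();
        if (!handler.isPresent()) {
            return "404 Not Found";
        }

        // Step 2: the first adapter that supports the handler invokes it.
        Object result = adapters.stream()
                .filter(adapter -> adapter.supports(handler.get()))
                .findFirst()
                .orElseThrow(() -> new IllegalStateException("No HandlerAdapter: " + handler.get()))
                .handle(request, handler.get());

        // Step 3: the first result handler that supports the result writes the response.
        return resultHandlers.stream()
                .filter(resultHandler -> resultHandler.supports(result))
                .findFirst()
                .orElseThrow(() -> new IllegalStateException("No HandlerResultHandler for " + result))
                .handleResult(result);
    }
}
```

The real implementation keeps the same shape but returns a Mono at every step, so nothing runs until the server adapter subscribes and no thread waits for a result.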
Reposted from: https://www.cnblogs.com/davidwang456/p/10396168.html