Reactive 响应式编程简单使用
Reactive Stream 模型
了解reactive stream(Flow類)
在介紹java版本的reactive stream之前,我們先回顧一下reactive stream需要做哪些事情:
1.能夠處理無效數量的消息
2.消息處理是有順序的
3.可以異步的在組件之間傳遞消息
4.一定是非阻塞和backpressure的
為了實現這4個功能,reactive stream定義了4個接口,Publisher,Subscriber,Subscription,Processor。這四個接口實際上是一個觀察者模式的實現。接下來我們詳細來分析一下各個接口的作用和約定。
Publisher
先看下Publisher的定義:
public interface Publisher<T> {public void subscribe(Subscriber<? super T> s); }Publisher就是用來生成消息的。它定義了一個subscribe方法,傳入一個Subscriber。這個方法用來將Publisher和Subscriber進行連接。
一個Publisher可以連接多個Subscriber。
每次調用subscribe建立連接,都會創建一個新的Subscription,Subscription和subscriber是一一對應的。
一個Subscriber只能夠subscribe一次Publisher。
如果subscribe失敗或者被拒絕,則會出發Subscriber.onError(Throwable)方法。
Subscriber
先看下Subscriber的定義:
public interface Subscriber<T> {// 只會調用一次,用于建立發布者和訂閱者之間的關系public void onSubscribe(Subscription s);public void onNext(T t);// 發生錯誤時就會調用,極大概率會終止程序public void onError(Throwable t);// 只會調用一次,當發布者調用 close()public void onComplete(); }Subscriber就是消息的接收者。
在Publisher和Subscriber建立連接的時候會觸發onSubscribe(Subscription s)方法。
當調用Subscription.request(long)方法時,onNext(T t)會被觸發,根據request請求參數的大小,onNext會被觸發一次或者多次。
在發生異常或者結束時會觸發onError(Throwable t)或者onComplete()方法。
Subscription
先看下Subscription的定義:
public interface Subscription {// 第一次 onNext() 就是由這個方法觸發的,array[] 初始值是 32、最大值默認是 256public void request(long n);public void cancel(); }Subscription代表著一對一的Subscriber和Publisher之間的Subscribe關系。
request(long n)意思是向publisher請求多少個events,這會觸發Subscriber.onNext方法。
cancel()則是請求Publisher停止發送信息,并清除資源。
Processor
先看下Processor的定義:
public interface Processor<T, R> extends Subscriber<T>, Publisher<R> { }Processor即是Subscriber又是Publisher,它代表著一種處理狀態。
JDK中reactive stream的實現
在JDK中java.util.concurrent.Flow就是reactive stream語義的一種實現。
Flow從JDK9就開始有了。我們看下它的結構:
從上圖我們可以看到在JDK中Flow是一個final class,而Subscriber,Publisher,Subscription,Processor都是它的內部類。
背壓(Back Pressure)概念
異步系統背壓
在同步系統重發布者和訂閱者的工作效率相當,發布者發一個消息之后阻塞,等待訂閱者消費。訂閱者消費完之后,訂閱者阻塞,等待發布者發布。這種同步方式效率很低。
由于同步方式效率低,一般使用異步處理機制,發布者和消費者速度都是不一樣,那么如何協調工作呢?
有兩種情況:
- 情況一:當訂閱者消費速度比發布者快時,會出現無消息可消費情況
- 情況二:當發布者速度比消費者快時,容易造成消息堆積,有兩大類解決方案:
- 改變訂閱者,適當消息拒絕
- 改變發布者,由訂閱者去控制發布者的速度。這種解決方案稱之為背壓(back pressure)。使用背壓策略可確保較快的發布者不會壓制較慢的訂閱者。
反應式流模型
反應式從2013年開始,作為非阻塞的異步流處理標準。宗旨在解決處理元素流(即消息流、數據流)問題——如何將元素從發布者傳遞到訂閱者,而不需要發布者阻塞,不需要訂閱者有無邊界緩沖區,不需要訂閱者丟棄無法處理的元素。
反應式流模型可以解決這個問題,該模型非常簡單:訂閱者向發布者發送異步請求,訂閱 n 個元素,然后發布者向訂閱者異步發送 n 個或少于 n 個元素。反應式流會在 pull 和 push 模型流處理之間動態切換。當發布者快、訂閱者慢時,它使用 pull 模型,當發布者慢,訂閱者快時,它使用 push 模型。
2015 年發布了用于處理反應式流的規范和 Java API
JDK9 中的 SubmissionPublisher
先看下SubmissionPublisher的定義:
public class SubmissionPublisher<T> implements Publisher<T>,AutoCloseable {Jdk9 中天然實現了 Publisher 接口,異步發送消息
兩種代碼風格編寫
Processor 對應模型代碼一如下:
package main.mytest.reactor.processor;import java.util.concurrent.Flow; import java.util.concurrent.SubmissionPublisher;@SuppressWarnings("all") public class ReactiveProcessor extends SubmissionPublisher<String> implements Flow.Processor<String,String> {private Flow.Subscription subscription;@Overridepublic void onSubscribe(Flow.Subscription subscription) {System.out.println(">>>>>Processor 開始建立關系,請求只會調用一次 onSubscribe 方法");this.subscription = subscription;this.subscription.request(1);}@Overridepublic void onNext(String item) {System.out.println(">>>>Processor 開始接處理數據...item="+item);/** 中間業務處理,然后再通過 submit() 轉發給訂閱者 */this.submit(item.toUpperCase());this.subscription.request(1);// this.subscription.cancel();}/*** 注意這個方法要是重寫了就不執行接收方的代碼邏輯了,超級坑的哦* @param subscriber the subscriber*//*@Overridepublic void subscribe(Flow.Subscriber<? super String> subscriber) {System.out.println(">>>>Processor 開始訂閱 subscribe");this.subscription.request(1);}*/@Overridepublic void onError(Throwable throwable) {System.out.println(">>>>>>Processor 程序出錯,請及時修復...e="+throwable);this.subscription.cancel();}@Overridepublic void onComplete() {System.out.println(">>>>Processor 程序完成,服務關閉");} }然后開始測試:
package main.mytest.reactor.processor;import java.util.concurrent.Flow; import java.util.concurrent.SubmissionPublisher;public class ReactiveProcessorDemo {public static void main(String[] args) throws InterruptedException {/** 創建發布者 */SubmissionPublisher<String> publisher = new SubmissionPublisher();/** 創建 Processor 處理器 */ReactiveProcessor processor = new ReactiveProcessor();/** 建立發布者和 Processor 之間的聯系 */publisher.subscribe(processor);/** 建立真正的訂閱者 */Flow.Subscriber<String> subscriber = new Flow.Subscriber<>() {private Flow.Subscription subscription;@Overridepublic void onSubscribe(Flow.Subscription subscription) {System.out.println("=======>開始建立訂閱關系");this.subscription = subscription;this.subscription.request(1);}@Overridepublic void onNext(String item) {System.out.println("======>開始接受數據 item="+item);this.subscription.request(10);}@Overridepublic void onError(Throwable throwable) {System.out.println("======>出錯了");this.subscription.cancel();}@Overridepublic void onComplete() {System.out.println("======>業務完成");}};/** 建立 Processor 和訂閱者之間的聯系 */processor.subscribe(subscriber);/** 開始提交業務任務 */for (int i = 0; i < 100; i++) {publisher.submit("hello reactive language...");}publisher.close();Thread.currentThread().join();} }第二種寫法如下:
package main.mytest.reactor.reactivestream;import java.util.concurrent.Flow; import java.util.concurrent.SubmissionPublisher; import java.util.concurrent.TimeUnit;public class ReactiveStreamDemo {public static void main(String[] args) throws InterruptedException {/** 發布者會初始化好 Executor 線程池(ForkJoinPool) */SubmissionPublisher publisher = new SubmissionPublisher<>();Flow.Subscriber<String> subscriber = new Flow.Subscriber() {Flow.Subscription subscription = null;@Overridepublic void onSubscribe(Flow.Subscription subscription) {System.out.println(">>>>>onSubscribe 開始訂閱消息,相當于 init 方法只會執行一次");this.subscription = subscription;/** 第一次你需要告訴發布者你需要接收多少數據 */subscription.request(10);}@Overridepublic void onNext(Object item) {/** 開始業務處理 */System.out.println(">>>>>onNext 接收數據:"+item);try {TimeUnit.MICROSECONDS.sleep(100);} catch (InterruptedException e) {throw new RuntimeException(e);}/** 業務處理完成之后還需要告訴發布者你需要接受多少數據 */this.subscription.request(10); // 背壓System.out.println("...");// int a = 10/0;}@Overridepublic void onError(Throwable throwable) {System.out.println(">>>>訂閱出錯,e="+throwable);}@Overridepublic void onComplete() {System.out.println(">>>>>onComplete 完成");}};/** 其實是這里去調用 Subscriber 的方法 */publisher.subscribe(subscriber);for (int i = 0; i < 10; i++) {publisher.submit("同學們,開始準備上課啦..");}/** 完成業務處理 */publisher.close();Thread.currentThread().join();} }總結
reactive stream的出現有效的解決了異步系統中的背壓問題。只不過reactive stream只是一個接口標準或者說是一種協議,具體的實現還需要自己去實現。
以上就是詳解Java中的reactive stream協議的詳細內容,更多關于Java中的reactive stream協議的資料請關注云海天教程其它相關文章!
Reactor 響應式編程
官方文檔:Web on Reactive Stack
可以發現 WebFlux 開發的是放在 Netty Servlet 中的,和傳統的 SpringMVC 開發的不一樣,傳統的直接放在 Servlet 容器中
Reactor 簡介
采用異步非阻塞式方式處理請求,并且是在 reactive-stream 基礎上進一步增強,說白了就是對其進行了一層封裝,衍生出來兩種典型的類:Flux、Mono
- Flux 是表示0-多的情況
- Mono 表示0-1的情況
Mono、Flux 的簡單創建
package com.gwm.webflux.test;import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.scheduler.Schedulers;import java.util.Arrays; import java.util.HashMap; import java.util.Map; import java.util.stream.Stream;public class FluxDemo {static Map<Integer, String> dataBase = new HashMap<>();public static void main(String[] args) {/** Mono 的創建 */Mono.empty().subscribe(System.out::println);Mono.just("Mono 只能發送一個或者是0個").subscribe(FluxDemo::print);/** Flux 的創建如下 */Flux.just("1", "b", "c")/** 如果配置異步的話可能會導致結果輸出不來,需要讓線程活著,比如 join 操作等...*/.publishOn(Schedulers.newSingle("自定義線程1")).map(v -> "【" + v + "】").subscribe(FluxDemo::print // 等價于 Subscriber onNext(T t) 方法, r -> System.out.println(r) // 等于 Subscriber onError(E e), () -> { // 等于 Subscriber onComplete()System.out.println(Thread.currentThread().getName() + ",完成操作");}, sub -> { // 等于 Subscriber onSubscribe(Subscription sub)/** 2 表示請求多少個元素,也就是背壓 */sub.request(2);});/** 使用匿名內部類創建 Flux */Flux.just("hello","a","b").map(v -> "(" + v + ")").publishOn(Schedulers.newSingle("自定義線程2")).subscribe(new Subscriber<String>() {Subscription subscription = null;@Overridepublic void onSubscribe(Subscription subscription) {System.out.println(Thread.currentThread().getName()+" 執行 onSubscribe 方法");this.subscription = subscription;this.subscription.request(1);}@Overridepublic void onNext(String s) {System.out.println(Thread.currentThread().getName()+" 執行 onNext 方法,接受到的數據 s="+s);this.subscription.request(1);}@Overridepublic void onError(Throwable throwable) {System.out.println(Thread.currentThread().getName()+" 執行 onError 方法");}@Overridepublic void onComplete() {System.out.println(Thread.currentThread().getName()+" 執行 onComplete 方法");}});Flux.fromIterable(Arrays.asList("1", "b", "c")).subscribe(FluxDemo::print);Flux.fromArray(new String[]{"a", "Bc", "ddd"}).subscribe(FluxDemo::print);Flux.fromStream(Stream.of(1, 2, 3, 4, 4, ",a", "d")).subscribe(FluxDemo::print);Flux.range(1, 3).subscribe(FluxDemo::print);/*** 動態生成 Flux 對象*/Flux.generate(()->1,(val,sink)->{// 將數據放入到管道內sink.next("message #"+val);// 假設到了 10 就開始關閉這個管道if (val == 10)sink.complete();return val + 1;}).subscribe(System.out::println);/*** map 和 flatMap 演示* map 是返回具體實體對象* flatMap 返回的是一個 Mono,相當于把數據又放到了另一個通道里面*/Flux.just("a","b","c").map(r->r.toUpperCase()).subscribe(System.out::println);Flux.just("a","b","c").flatMap(r->Mono.just(r.toUpperCase())).subscribe(System.out::println);/*** 調用 API 控制代碼執行順序,不要直接分成兩個 Mono 或者 Flux 執行* 因為 Flux 和 Mono 極大可能不在同一個線程中執行*/Mono.just(add(100,"諸葛亮")) // 往數據庫里面插入一條數據.then(queryById(100)) // 然后從這個方法中獲取這條數據是否插入成功.subscribe(System.out::println); // 輸出該數據/*** Flux 和 Mono 之間轉換* Flux 轉成 Mono 有很多,一般可以封裝成集合(Map、List等等)*/Mono.just("1000").flux().subscribe(System.out::println);Flux.just("flux1","flux2","flux3").collectList().subscribe(System.out::println);/*** zip 合并多個 Mono 或者 Flux,這個是非常有用的,類比 CompletableFuture.allOf() 方法*/String userName = "司馬懿";Flux<String> just = Flux.just(userName + "的最新訂單");Flux<String> just1 = Flux.just(userName + "的最新評論");Flux<String> just2 = Flux.just(userName + "的訂單明細");Flux.zip(just, just1, just2).subscribe(r-> System.out.println("合并結果輸出:r="+r));}public static Mono<Void> add(Integer id,String value) {dataBase.computeIfAbsent(id,e->value);return Mono.empty();}public static Mono<String> queryById(Integer id) {return Mono.just(dataBase.get(id));}public static void print(Object result) {System.out.printf("執行線程是:[%s],result=%s\n", Thread.currentThread().getName(), result);} }WebFlux
引入依賴:
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-webflux</artifactId></dependency>注意不要和 spring-boot-starter-web 模塊一起引入,否則會優先啟動 Servlet 容器,反之會驅動 Netty 容器。
Controller 層代碼演示:
@RestController public class HelloController {@RequestMapping("/hello")public Mono<String> hello(String str) {System.out.println(">>>>>hello str="+str);return Mono.just(str.toUpperCase());}@RequestMapping("/hello2")public String hello2(String str) {System.out.println(">>>>>hello2222 str="+str);return str.toUpperCase()+"/hello2";} }發現還是基于 SpringMVC 注解式的開發,下面介紹基于 RouterFunction 函數式編程,是一種全新的編程方式。每一種操作都是基于函數式 API 開發,包括代碼執行順序,不是說誰寫在前面就誰先執行,都是通過調用 API 進行控制。可以類比 CompletableFuture。
RouterFunction 類
主要是利用 RouterFunction 接口的鏈式編程,ServerReponse 不是 ServeltResponse 對象,是 Netty 單獨封裝了一個 ServerResponse 對象,是放在 Netty Container 容器中
@Configuration public class FluxConfiguration {private static final String PATH_PREFIX = "/webflux/";@Beanpublic RouterFunction<ServerResponse> routers(StudentHandler studentHandler) {return RouterFunctions.route().POST(PATH_PREFIX + "create", studentHandler::create).GET(PATH_PREFIX + "deleteById/{id}", studentHandler::deleteById).build();}@Beanpublic RouterFunction<ServerResponse> customerRouter(StudentHandler studentHandler) {System.out.println(">>>>>>>customerRouter");return RouterFunctions.route(RequestPredicates.path("/func/hello"),studentHandler::findStudent);}@Beanpublic RouterFunction<ServerResponse> routerFunction() {System.out.println(">>>>>>>customerRouter");return RouterFunctions.route().GET("/func/hello2", request ->ServerResponse.ok().bodyValue(">>>>>hello routerFunction....")).build();}}對應的處理 handler:
@Component public class StudentHandler {public Mono<ServerResponse> findStudent(ServerRequest request) {System.out.println(">>>>>>request=" + request);return ServerResponse.ok().contentType(MediaType.APPLICATION_JSON).bodyValue("hello,findStudent");}public Mono<ServerResponse> create(ServerRequest request) {System.out.println(">>>>>>create=" + request);ServerHttpRequest request1 = request.exchange().getRequest();MultiValueMap<String, String> map = request1.getQueryParams();BookFactory.createBook(Book.builder().price(Double.valueOf(map.getFirst("price"))).cateGory(map.getFirst("cateGory")).title(map.getFirst("title")).build());return ServerResponse.ok().bodyValue("創建成功");}public Mono<ServerResponse> deleteById(ServerRequest request) {String id = request.pathVariable("id");System.out.println(">>>>>>deleteById= id=" + id);return BookFactory.queryBookMonoById(id).flatMap(book -> {BookFactory.remove(book);return ServerResponse.ok().build();}).switchIfEmpty(ServerResponse.notFound().build());} }public class BookFactory {public static List<Book> list;public static List<Book> dataList = new ArrayList<>();static {list = List.of(Book.builder().title("《遮天》").price(100.23).cateGory("仙俠類").build(),Book.builder().title("《完美世界》").price(222.22).cateGory("仙俠類").build(),Book.builder().title("《斗破蒼穹》").price(123.22).cateGory("動作類").build(),Book.builder().title("《斗羅大陸》").price(23.22).cateGory("動作類").build(),Book.builder().title("《問天》").price(199.99).cateGory("言情類").build());list.forEach(dataList::add);}public static List<Book> queryAllBooks() {return dataList;}public static boolean createBook(Book book) {System.out.println(">>>>createBook book="+book);return dataList.add(book);}public static boolean remove(Book book) {System.out.println(">>>>remove book="+book);return dataList.remove(book);}public static Optional<Book> queryBookById(String id) {return Optional.ofNullable(dataList.get(dataList.size()-1));}public static Mono<Book> queryBookMonoById(String id) {return Mono.justOrEmpty(queryBookById(id));} }RouteLocator 網關實現類
引入相關依賴:
<dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-gateway</artifactId><version>3.1.1</version></dependency>代碼演示:
@Configuration public class GatewayAutoConfiguration {/*** 原來的訪問方式:http://localhost:8080/hello* 加網關訪問方式:http://localhost:8080/gw/simple/hello* 這兩個效果是一樣的*/@Beanpublic RouteLocator routeLocator(RouteLocatorBuilder builder) {return builder.routes().route("gw-simple",r -> r.path("/gw/simple/***").filters(f -> f.rewritePath("/gw/simple/(?<path>.*)", "/$\\{path}")).uri("http://localhost:8080/")) // 最簡單的一個路由網關實現方式.route("gw-lb",r -> r.path("/gw/lb/***").filters(f -> f.rewritePath("/gw/lb/(?<path>.*)", "/$\\{path}")).uri("lb://web-flux-demo8080")) // 可以實現負載均衡功能.build();} }瀏覽器中輸入:http://localhost:8080/hello 或者 http://localhost:8080/gw/simple/hello 都可以被準發到后臺。
?
r2dbc 異步數據庫操作
引入依賴:
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-r2dbc</artifactId></dependency>application.properties 配置:
spring.application.name=web-flux-demo8080 spring.r2dbc.name=r2dbc spring.r2dbc.url=r2dbcs:mysql://127.0.0.1:3306/gwmdb?serverTimezone=GMT%2B8 spring.r2dbc.username=root spring.r2dbc.password=itsme999配置類:
@Configuration public class R2dbcAutoConfiguration {/*** 因為 r2dbc 沒有 orm 框架的功能,所以需要配置下面這段代碼去自動生成 table 表*///@Beanpublic ConnectionFactoryInitializer connectionFactoryInitializer(ConnectionFactory connectionFactory){ConnectionFactoryInitializer initializer = new ConnectionFactoryInitializer();initializer.setConnectionFactory(connectionFactory);CompositeDatabasePopulator populator = new CompositeDatabasePopulator();populator.addPopulators(new ResourceDatabasePopulator(new ClassPathResource("schema.sql")));initializer.setDatabasePopulator(populator);return initializer;} }直接使用 DatabaseClient 工具類操作數據庫即可:
@Repository public class StudentDao {@Autowiredprivate DatabaseClient databaseClient;public Mono<Book> queryById() {// 調用 DatabaseClient 提供的 API 即可,類比 Conditionreturn null;} }WebClient 遠程調用工具
基于 Netty 編寫的遠程訪問工具,在高并發場景性能會比 RestTemplate 好。
package com.gwm.webflux.client;import com.gwm.webflux.data.Book; import io.netty.channel.ChannelOption; import io.netty.handler.timeout.ReadTimeoutHandler; import org.springframework.http.HttpStatus; import org.springframework.http.client.reactive.ReactorClientHttpConnector; import org.springframework.web.reactive.function.client.WebClient; import reactor.core.publisher.Mono; import reactor.netty.http.client.HttpClient;import java.util.concurrent.TimeUnit;public class WebClientDemo {public static void main(String[] args) {WebClient webClient = WebClient.create("http://xxxx");webClient.post().uri("/book").body(Mono.just(Book.builder().build()),Book.class).exchange().flatMap(clientResponse ->{if (clientResponse.statusCode() != HttpStatus.CREATED) {return clientResponse.createException().flatMap(Mono::error);}System.out.println(">>>>>>>調用出錯...error code="+clientResponse.statusCode());return Mono.just(clientResponse);}) // 添加異常處理.retry(3) // 重試3次// retryBackoff(3,Duration.ofSeconds(1)) // 間隔時間成指數級別增長:1、2、4、8.doOnNext(System.out::println).subscribe();webClient.get().uri("/book2").retrieve().bodyToFlux(Book.class).doOnNext(aBook -> System.out.println(">>>>>> GET BOOKS:"+aBook)).blockLast();/** webClient 超時的設置*/HttpClient httpClient = HttpClient.create().tcpConfiguration(tcpClient -> {tcpClient.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 500).doOnConnected(connection -> connection.addHandlerLast(new ReadTimeoutHandler(5, TimeUnit.SECONDS)));return tcpClient;});ReactorClientHttpConnector reactorClientHttpConnector = new ReactorClientHttpConnector(httpClient);/** 最終獲取到了可以自定義超時的客戶端工具 */WebClient webClientTimeout = WebClient.builder().clientConnector(reactorClientHttpConnector).build();} }總結
以上是生活随笔為你收集整理的Reactive 响应式编程简单使用的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: CSDN 2018博客之星评选,感谢大家
- 下一篇: 设计模式实用讲解