回调地狱和反应模式
我可以更好地了解a的用途的一種方式
基于反應(yīng)流的方法是它簡(jiǎn)化了無(wú)阻塞IO調(diào)用的方式。
這篇文章將快速講解進(jìn)行同步遠(yuǎn)程調(diào)用所涉及的那種代碼,然后說(shuō)明如何在非阻塞IO中分層,盡管在資源(尤其是線(xiàn)程)的使用方面非常高效,但會(huì)帶來(lái)稱(chēng)為回調(diào)地獄的復(fù)雜性,并且基于反應(yīng)流的方法如何簡(jiǎn)化編程模型。
目標(biāo)服務(wù)
由于我將編寫(xiě)一個(gè)客戶(hù)呼叫,因此代表城市詳細(xì)信息的目標(biāo)服務(wù)有兩個(gè)端點(diǎn)。 當(dāng)使用uri類(lèi)型的“ / cityids”調(diào)用時(shí)返回一個(gè)城市ID列表,并且示例結(jié)果如下所示:
[ 1 , 2 , 3 , 4 , 5 , 6 , 7 ]端點(diǎn)返回給定城市ID的城市的詳細(xì)信息,例如,當(dāng)使用ID 1 –“ / cities / 1”進(jìn)行調(diào)用時(shí):
{ "country" : "USA" , "id" : 1 , "name" : "Portland" , "pop" : 1600000 }客戶(hù)的責(zé)任是獲取城市ID的列表,然后為每個(gè)城市ID獲取城市的詳細(xì)信息并將其放到城市列表中。
同步通話(huà)
我正在使用Spring Framework的RestTemplate進(jìn)行遠(yuǎn)程調(diào)用。 Kotlin函數(shù)獲取城市ID列表如下所示:
private fun getCityIds(): List<String> { val cityIdsEntity: ResponseEntity<List<String>> = restTemplate .exchange( " http://localhost: $localServerPort/cityids" , HttpMethod.GET, null , object : ParameterizedTypeReference<List<String>>() {}) return cityIdsEntity.body!! }并獲取城市的詳細(xì)信息:
private fun getCityForId(id: String): City { return restTemplate.getForObject( " http://localhost: $localServerPort/cities/$id" , City:: class .java)!! }給定這兩個(gè)功能,可以很容易地將它們組合起來(lái),以便返回城市列表:
val cityIds: List<String> = getCityIds() val cities: List<City> = cityIds .stream() .map<City> { cityId -> getCityForId(cityId) } .collect(Collectors.toList()) cities.forEach { city -> LOGGER.info(city.toString()) } 該代碼非常易于理解,但是涉及8個(gè)阻塞調(diào)用–
1.獲取7個(gè)城市ID的列表,然后獲取每個(gè)城市的詳細(xì)信息 2.獲取七個(gè)城市中每個(gè)城市的詳細(xì)信息
這些調(diào)用中的每一個(gè)都會(huì)位于不同的線(xiàn)程上。
結(jié)合使用非阻塞IO和回調(diào)
我將使用一個(gè)名為AsyncHttpClient的庫(kù)來(lái)進(jìn)行非阻塞IO調(diào)用。
進(jìn)行遠(yuǎn)程調(diào)用時(shí),AyncHttpClient返回一個(gè)ListenableFuture類(lèi)型。
val responseListenableFuture: ListenableFuture<Response> = asyncHttpClient .prepareGet( " http://localhost: $localServerPort/cityids" ) .execute()可以將回調(diào)附加到可監(jiān)聽(tīng)的將來(lái),以便在可用時(shí)對(duì)響應(yīng)進(jìn)行操作。
responseListenableFuture.addListener(Runnable { val response: Response = responseListenableFuture.get() val responseBody: String = response.responseBody val cityIds: List<Long> = objectMapper.readValue<List<Long>>(responseBody, object : TypeReference<List<Long>>() {}) .... }給定cityids列表,我想獲取城市的詳細(xì)信息,因此從響應(yīng)中,我需要進(jìn)行更多的遠(yuǎn)程調(diào)用,并為每個(gè)調(diào)用附加一個(gè)回調(diào),以按照以下方式獲取城市的詳細(xì)信息:
val responseListenableFuture: ListenableFuture<Response> = asyncHttpClient .prepareGet( " http://localhost: $localServerPort/cityids" ) .execute() responseListenableFuture.addListener(Runnable { val response: Response = responseListenableFuture.get() val responseBody: String = response.responseBody val cityIds: List<Long> = objectMapper.readValue<List<Long>>(responseBody, object : TypeReference<List<Long>>() {}) cityIds.stream().map { cityId -> val cityListenableFuture = asyncHttpClient .prepareGet( " http://localhost: $localServerPort/cities/$cityId" ) .execute() cityListenableFuture.addListener(Runnable { val cityDescResp = cityListenableFuture.get() val cityDesc = cityDescResp.responseBody val city = objectMapper.readValue(cityDesc, City:: class .java) LOGGER.info( "Got city: $city" ) }, executor) }.collect(Collectors.toList()) }, executor)這是一段粗糙的代碼,在一個(gè)回調(diào)中有一組回調(diào),很難對(duì)此進(jìn)行推理和理解,因此被稱(chēng)為回調(diào)地獄。
在Java CompletableFuture中使用非阻塞IO
通過(guò)返回Java的CompletableFuture作為返回類(lèi)型而不是ListenableFuture,可以對(duì)代碼進(jìn)行一些改進(jìn)。 CompletableFuture提供允許修改返回的返回類(lèi)型的運(yùn)算符。
例如,考慮獲取城市ID列表的函數(shù):
private fun getCityIds(): CompletableFuture<List<Long>> { return asyncHttpClient .prepareGet( " http://localhost: $localServerPort/cityids" ) .execute() .toCompletableFuture() .thenApply { response -> val s = response.responseBody val l: List<Long> = objectMapper.readValue(s, object : TypeReference<List<Long>>() {}) l } }在這里,我使用“ thenApply”運(yùn)算符將“ CompletableFuture <Response>”轉(zhuǎn)換為“ CompletableFuture <List <Long >>”
并類(lèi)似地獲得城市的詳細(xì)信息:
private fun getCityDetail(cityId: Long): CompletableFuture<City> { return asyncHttpClient.prepareGet( " http://localhost: $localServerPort/cities/$cityId" ) .execute() .toCompletableFuture() .thenApply { response -> val s = response.responseBody LOGGER.info( "Got {}" , s) val city = objectMapper.readValue(s, City:: class .java) city } }這是基于回調(diào)方法的改進(jìn),但是,CompletableFuture缺少足夠的運(yùn)算符,例如,在這種特定情況下,需要將所有城市詳細(xì)信息放在一起:
val cityIdsFuture: CompletableFuture<List<Long>> = getCityIds() val citiesCompletableFuture: CompletableFuture<List<City>> = cityIdsFuture .thenCompose { l -> val citiesCompletable: List<CompletableFuture<City>> = l.stream() .map { cityId -> getCityDetail(cityId) }.collect(toList()) val citiesCompletableFutureOfList: CompletableFuture<List<City>> = CompletableFuture.allOf(*citiesCompletable.toTypedArray()) .thenApply { _: Void? -> citiesCompletable .stream() .map { it.join() } .collect(toList()) } citiesCompletableFutureOfList }我使用了一個(gè)名為CompletableFuture.allOf的運(yùn)算符,該運(yùn)算符返回“ Void”類(lèi)型,并且必須強(qiáng)制返回所需的“” CompletableFuture <List <City >>類(lèi)型。
使用Project Reactor
Project Reactor是Reactive Streams規(guī)范的實(shí)現(xiàn)。 它有兩種特殊類(lèi)型,可返回0/1項(xiàng)目流和0 / n項(xiàng)目流–前者是Mono,后者是Flux。
Project Reactor提供了一組非常豐富的運(yùn)算符,這些運(yùn)算符允許以多種方式轉(zhuǎn)換數(shù)據(jù)流。 首先考慮該函數(shù)以返回城市ID列表:
private fun getCityIds(): Flux<Long> { return webClient.get() .uri( "/cityids" ) .exchange() .flatMapMany { response -> LOGGER.info( "Received cities.." ) response.bodyToFlux<Long>() } }我正在使用Spring出色的WebClient庫(kù)進(jìn)行遠(yuǎn)程調(diào)用,并獲得Project反應(yīng)器“ Mono <ClientResponse>”類(lèi)型的響應(yīng),可以使用“ flatMapMany”運(yùn)算符將其修改為“ Flux <Long>”類(lèi)型。
在給定城市ID的情況下,沿著相同的路線(xiàn)獲取城市的詳細(xì)信息:
private fun getCityDetail(cityId: Long?): Mono<City> { return webClient.get() .uri( "/cities/{id}" , cityId!!) .exchange() .flatMap { response -> val city: Mono<City> = response.bodyToMono() LOGGER.info( "Received city.." ) city } }在這里,使用“ flatMap”運(yùn)算符將項(xiàng)目反應(yīng)堆“ Mono <ClientResponse>”類(lèi)型轉(zhuǎn)換為“ Mono <City>”類(lèi)型。
以及從中獲取城市ID和城市的代碼:
val cityIdsFlux: Flux<Long> = getCityIds() val citiesFlux: Flux<City> = cityIdsFlux .flatMap { this .getCityDetail(it) } return citiesFlux這非常具有表現(xiàn)力-對(duì)比了基于回調(diào)的方法的混亂和基于響應(yīng)流的方法的簡(jiǎn)單性。
結(jié)論
在我看來(lái),這是使用基于響應(yīng)流的方法的最大原因之一,尤其是在涉及跨越異步邊界的場(chǎng)景(例如在這種情況下進(jìn)行遠(yuǎn)程調(diào)用)的情況下,尤其是Project Reactor。 它清除了各種回調(diào)和回調(diào)地獄,并提供了使用一組豐富的運(yùn)算符修改/轉(zhuǎn)換類(lèi)型的自然方法。
我在這里使用的所有示例的工作版本的存儲(chǔ)庫(kù)位于https://github.com/bijukunjummen/reactive-cities-demo/tree/master/src/test/kotlin/samples/geo/kotlin
翻譯自: https://www.javacodegeeks.com/2019/06/callback-hell-reactive-patterns.html
總結(jié)
- 上一篇: Fedora Linux 39 Beta
- 下一篇: 小米14 Pro影像规格升级:搭载500