使用Java 8 CompletableFuture和Rx-Java Observable
我想使用Java 8 CompletableFuture和Rx-Java Observable探索一個簡單的分散聚集場景。
場景很簡單–產生大約10個任務,每個任務返回一個字符串,最終將結果收集到一個列表中。
順序的
其順序版本如下:
public void testSequentialScatterGather() throws Exception {List<String> list =IntStream.range(0, 10).boxed().map(this::generateTask).collect(Collectors.toList());logger.info(list.toString()); }private String generateTask(int i) {Util.delay(2000);return i + "-" + "test"; }隨著CompletableFuture
可以使用稱為supplyAsync的實用程序方法來使方法返回CompletableFuture,我正在使用此方法的一種變體,它接受要使用的顯式Executor ,而且我故意為其中一個輸入拋出異常:
private CompletableFuture<String> generateTask(int i,ExecutorService executorService) {return CompletableFuture.supplyAsync(() -> {Util.delay(2000);if (i == 5) {throw new RuntimeException("Run, it is a 5!");}return i + "-" + "test";}, executorService); }現在分散任務:
List<CompletableFuture<String>> futures =IntStream.range(0, 10).boxed().map(i -> this.generateTask(i, executors).exceptionally(t -> t.getMessage())).collect(Collectors.toList());在分散任務結束時,結果是CompletableFuture列表。 現在,要從中獲取String列表有些棘手,這里我使用Stackoverflow中建議的一種解決方案:
CompletableFuture<List<String>> result = CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()])).thenApply(v -> futures.stream().map(CompletableFuture::join).collect(Collectors.toList()));這里僅使用CompletableFuture.allOf方法來構成下一步操作,一旦所有分散的任務都完成了,則一旦完成任務,期貨就會再次流式傳輸并收集到一個字符串列表中。
然后可以異步顯示最終結果:
result.thenAccept(l -> {logger.info(l.toString()); });使用Rx-java Observable
使用Rx-java進行分散收集相對比CompletableFuture版本更干凈,因為Rx-java提供了更好的方法將結果組合在一起,這也是執行分散任務的方法:
private Observable<String> generateTask(int i, ExecutorService executorService) {return Observable.<String>create(s -> {Util.delay(2000);if ( i == 5) {throw new RuntimeException("Run, it is a 5!");}s.onNext( i + "-test");s.onCompleted();}).onErrorReturn(e -> e.getMessage()).subscribeOn(Schedulers.from(executorService)); }并分散任務:
List<Observable<String>> obs =IntStream.range(0, 10).boxed().map(i -> generateTask(i, executors)).collect(Collectors.toList());我又有了一個Observable的列表,而我需要的是一個結果列表,Observable提供了一個合并方法來做到這一點:
Observable<List<String>> merged = Observable.merge(obs).toList();可以訂閱并在可用時打印結果:
merged.subscribe(l -> logger.info(l.toString()));翻譯自: https://www.javacodegeeks.com/2015/08/using-java-8-completablefuture-and-rx-java-observable.html
總結
以上是生活随笔為你收集整理的使用Java 8 CompletableFuture和Rx-Java Observable的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 两台无线路由器怎样实现无缝漫游两台无线路
- 下一篇: 使用Unsafe真的是关于速度或功能吗?