Java8函数式编程(2)--流与管道
流的特性
流不同于集合,流不存儲值,流的目的是處理值。將集合作為流的源,創建流不會導致數據流動。當終止操作需要值時,流從集合中獲取值。
流不會持有值。對于源不是集合的流,流不會持有值
IntStream.iterator(1,i->i*2).limit(10).forEachOrderd(System.out::Print);iterator產生無限流,lambda表達式:i->i*2 被調用的次數 由limit()限定,而不是創建一個無限的集合。
流背后的思想是延遲計算:只到需要時才計算值。
原生流
Java5中引用的自動裝箱與拆箱特性使程序員無需考慮原生類型與相應包裝類型之間的差別,但是會帶來性能問題。
Optional<Integer> s = Arrays.asList(1,2,3,4,5).stream().map(i -> i + 1).max(Integer::Compartor) ;調用map()方法時,會先拆箱i為int,i+1之后再裝箱為Integer。
原生流可以減少這些操作,類型包括:IntStream,LongStream,DoubleStream。原生流與引用流Stream具有相同的實現思想。
管道(流水線,Pinpline)
流的威力是通過組合起來的管道來實現的。管道源自流的源,然后通過中間操作進行各種轉換,最后在終止操作中停止。
| Stream操作分類 | ||
| 中間操作(Intermediate operations) | 無狀態(Stateless) | unordered() filter() map() mapToInt() mapToLong() mapToDouble() flatMap() flatMapToInt() flatMapToLong() flatMapToDouble() peek() |
| 有狀態(Stateful) | distinct() sorted()? limit() skip() | |
| 結束操作(Terminal operations) | 非短路操作 | forEach() forEachOrdered() toArray() reduce() collect() max() min() count() |
| 短路操作(short-circuiting) | anyMatch() allMatch() noneMatch() findFirst() findAny() | |
Stream上的所有操作分為兩類:中間操作和結束操作,中間操作只是一種標記,只有結束操作才會觸發實際計算。中間操作又可以分為無狀態的(Stateless)和有狀態的(Stateful),無狀態中間操作是指元素的處理不受前面元素的影響,而有狀態的中間操作必須等到所有元素處理之后才知道最終結果,比如排序是有狀態操作,在讀取所有元素之前并不能確定排序結果;結束操作又可以分為短路操作和非短路操作,短路操作是指不用處理全部元素就可以返回結果,比如找到第一個滿足條件的元素。之所以要進行如此精細的劃分,是因為底層對每一種情況的處理方式不同。
操作記錄
關于操作如何記錄,在JDK源碼注釋中多次用stage來標識用戶的每一次操作,而通常情況下Stream的操作又需要一個回調函數,所以一個完整的操作是由數據來源、操作、回調函數組成的三元組來表示。而在具體實現中,使用實例化的ReferencePipeline來表示,即圖1-2中的Head、StatelessOp、StatefulOp的實例。
圖中通過Collection.stream()方法得到Head也就是stage0,緊接著調用一系列的中間操作,不斷產生新的Stream。這些Stream對象以雙向鏈表的形式組織在一起,構成整個流水線,由于每個Stage都記錄了前一個Stage和本次的操作以及回調函數,依靠這種結構就能建立起對數據源的所有操作。這就是Stream記錄操作的方式。
?
abstract class AbstractPipeline<E_IN, E_OUT, S extends BaseStream<E_OUT, S>>extends PipelineHelper<E_OUT> implements BaseStream<E_OUT, S> {@SuppressWarnings("rawtypes")private final AbstractPipeline sourceStage;/*** The "upstream" pipeline, or null if this is the source stage.*/@SuppressWarnings("rawtypes")private final AbstractPipeline previousStage;/**管道或操作類型*/protected final int sourceOrOpFlags;/*** The next stage in the pipeline, or null if this is the last stage.* Effectively final at the point of linking to the next pipeline.*/@SuppressWarnings("rawtypes")private AbstractPipeline nextStage;private int depth;/*** The combined source and operation flags for the source and all operations* up to and including the operation represented by this pipeline object.* Valid at the point of pipeline preparation for evaluation.*/private int combinedFlags; }操作疊加
Stage只解決了操作記錄問題,Stage僅保存了當前的操作,直到當前Stage怎么處理數據,但是不直到下一個Stage如何處理數據,處理何種數據。JDK定義了Sink接口來協調Stage之間的調用關系。
Sink接口包含的方法如下表所示:
| 方法名 | 作用 |
| void begin(long size) | 開始遍歷元素之前調用該方法,通知Sink做好準備。 |
| void end() | 所有元素遍歷完成之后調用,通知Sink沒有更多的元素了。 |
| boolean cancellationRequested() | 是否可以結束操作,可以讓短路操作盡早結束。 |
| void accept(T t) | 遍歷元素時調用,接受一個待處理元素,并對元素進行處理。Stage把自己包含的操作和回調方法封裝到該方法里,前一個Stage只需要調用當前Stage.accept(T t)方法就行了。 |
每個Stage都會將自己的操作封裝到一個Sink里,前一個Stage只需調用后一個Stage的accept()方法即可,并不需要知道其內部是如何處理的。當然對于有狀態的操作,Sink的begin()和end()方法也是必須實現的。比如Stream.sorted()是一個有狀態的中間操作,其對應的Sink.begin()方法可能創建一個乘放結果的容器,而accept()方法負責將元素添加到該容器,最后end()負責對容器進行排序。對于短路操作,Sink.cancellationRequested()也是必須實現的,比如Stream.findFirst()是短路操作,只要找到一個元素,cancellationRequested()就應該返回true,以便調用者盡快結束查找。Sink的四個接口方法常常相互協作,共同完成計算任務。實際上Stream API內部實現的的本質,就是如何重載Sink的這四個接口方法。
有了Sink對操作的包裝,Stage之間的調用問題就解決了,執行時只需要從流水線的head開始對數據源依次調用每個Stage對應的Sink.{begin(), accept(), cancellationRequested(), end()}方法就可以了。一種可能的Sink.accept()方法流程是這樣的:
void accept(U u){1. 使用當前Sink包裝的回調函數處理u2. 將處理結果傳遞給流水線下游的Sink }Sink接口的其他幾個方法也是按照這種[處理->轉發]的模型實現。下面我們結合具體例子看看Stream的中間操作是如何將自身的操作包裝成Sink以及Sink是如何將處理結果轉發給下一個Sink的。先看Stream.map()方法:
// Stream.map(),調用該方法將產生一個新的Stream public final <R> Stream<R> map(Function<? super P_OUT, ? extends R> mapper) {...return new StatelessOp<P_OUT, R>(this, StreamShape.REFERENCE,StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {@Override /*opWripSink()方法返回由回調函數包裝而成Sink*/Sink<P_OUT> opWrapSink(int flags, Sink<R> downstream) {return new Sink.ChainedReference<P_OUT, R>(downstream) {@Overridepublic void accept(P_OUT u) {R r = mapper.apply(u);// 1. 使用當前Sink包裝的回調函數mapper處理udownstream.accept(r);// 2. 將處理結果傳遞給流水線下游的Sink}};}}; }上述代碼將回調函數mapper包裝到一個Sink當中。由于Stream.map()是一個無狀態的中間操作,所以map()方法返回了一個StatelessOp內部類對象(一個新的Stream),調用這個新Stream的opWripSink()方法將得到一個包裝了當前回調函數的Sink。
再來看一個復雜一點的例子。Stream.sorted()方法將對Stream中的元素進行排序,顯然這是一個有狀態的中間操作,因為讀取所有元素之前是沒法得到最終順序的。拋開模板代碼直接進入問題本質,sorted()方法是如何將操作封裝成Sink的呢?sorted()一種可能封裝的Sink代碼如下:
// Stream.sort()方法用到的Sink實現 class RefSortingSink<T> extends AbstractRefSortingSink<T> {private ArrayList<T> list;// 存放用于排序的元素RefSortingSink(Sink<? super T> downstream, Comparator<? super T> comparator) {super(downstream, comparator);}@Overridepublic void begin(long size) {...// 創建一個存放排序元素的列表list = (size >= 0) ? new ArrayList<T>((int) size) : new ArrayList<T>();}@Overridepublic void end() {list.sort(comparator);// 只有元素全部接收之后才能開始排序downstream.begin(list.size());if (!cancellationWasRequested) {// 下游Sink不包含短路操作list.forEach(downstream::accept);// 2. 將處理結果傳遞給流水線下游的Sink}else {// 下游Sink包含短路操作for (T t : list) {// 每次都調用cancellationRequested()詢問是否可以結束處理。if (downstream.cancellationRequested()) break;downstream.accept(t);// 2. 將處理結果傳遞給流水線下游的Sink}}downstream.end();list = null;}@Overridepublic void accept(T t) {list.add(t);// 1. 使用當前Sink包裝動作處理t,只是簡單的將元素添加到中間列表當中} }上述代碼完美的展現了Sink的四個接口方法是如何協同工作的:
操作執行
操作的執行由結束操作啟動。
結束操作之后不能再有別的操作,所以結束操作不會創建新的Stage。結束操作會創建一個包裝了自己操作的Sink,這也是流水線中最后一個Sink,這個Sink只需要處理數據而不需要將結果傳遞給下游的Sink(因為沒有下游)。對于Sink的[處理->轉發]模型,結束操作的Sink就是調用鏈的出口。
上游的Sink是如何找到下游Sink的。一種可選的方案是在PipelineHelper中設置一個Sink字段,在流水線中找到下游Stage并訪問Sink字段即可。但Stream類庫的設計者沒有這么做,而是設置了一個Sink AbstractPipeline.opWrapSink(int flags, Sink downstream)方法來得到Sink,該方法的作用是返回一個新的包含了當前Stage代表的操作以及能夠將結果傳遞給downstream的Sink對象。為什么要產生一個新對象而不是返回一個Sink字段?這是因為使用opWrapSink()可以將當前操作與下游Sink(上文中的downstream參數)結合成新Sink。試想只要從流水線的最后一個Stage開始,不斷調用上一個Stage的opWrapSink()方法直到最開始(不包括stage0,因為stage0代表數據源,不包含操作),就可以得到一個代表了流水線上所有操作的Sink,用代碼表示就是這樣:
// AbstractPipeline.wrapSink() // 從下游向上游不斷包裝Sink。如果最初傳入的sink代表結束操作, // 函數返回時就可以得到一個代表了流水線上所有操作的Sink。 final <P_IN> Sink<P_IN> wrapSink(Sink<E_OUT> sink) {...for (AbstractPipeline p=AbstractPipeline.this; p.depth > 0; p=p.previousStage) {sink = p.opWrapSink(p.previousStage.combinedFlags, sink);}return (Sink<P_IN>) sink; }現在流水線上從開始到結束的所有的操作都被包裝到了一個Sink里,執行這個Sink就相當于執行整個流水線,執行Sink的代碼如下:
// AbstractPipeline.copyInto(), 對spliterator代表的數據執行wrappedSink代表的操作。 final <P_IN> void copyInto(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator) {...if (!StreamOpFlag.SHORT_CIRCUIT.isKnown(getStreamAndOpFlags())) {wrappedSink.begin(spliterator.getExactSizeIfKnown());// 通知開始遍歷spliterator.forEachRemaining(wrappedSink);// 迭代wrappedSink.end();// 通知遍歷結束}... }上述代碼首先調用wrappedSink.begin()方法告訴Sink數據即將到來,然后調用spliterator.forEachRemaining()方法對數據進行迭代(Spliterator是容器的一種迭代器,參閱),最后調用wrappedSink.end()方法通知Sink數據處理結束。邏輯如此清晰。
操作結果
不是所有的Stream結束操作都需要返回結果,有些操作只是為了使用其副作用(Side-effects),比如使用Stream.forEach()方法將結果打印出來就是常見的使用副作用的場景(事實上,除了打印之外其他場景都應避免使用副作用),對于真正需要返回結果的結束操作結果存在哪里呢?
特別說明:副作用不應該被濫用,也許你會覺得在Stream.forEach()里進行元素收集是個不錯的選擇,就像下面代碼中那樣,但遺憾的是這樣使用的正確性和效率都無法保證,因為Stream可能會并行執行。大多數使用副作用的地方都可以使用歸約操作更安全和有效的完成。
// 錯誤的收集方式 ArrayList<String> results = new ArrayList<>(); stream.filter(s -> pattern.matcher(s).matches()).forEach(s -> results.add(s)); // Unnecessary use of side-effects! // 正確的收集方式 List<String>results =stream.filter(s -> pattern.matcher(s).matches()).collect(Collectors.toList()); // No side-effects!回到流水線執行結果的問題上來,需要返回結果的流水線結果存在哪里呢?這要分不同的情況討論,下表給出了各種有返回結果的Stream結束操作。
| 返回類型 | 對應的結束操作 |
| boolean | anyMatch() allMatch() noneMatch() |
| Optional | findFirst() findAny() |
| 歸約結果 | reduce() collect() |
| 數組 | toArray() |
?
總結
以上是生活随笔為你收集整理的Java8函数式编程(2)--流与管道的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Java8函数式编程(1)--Princ
- 下一篇: Java Stream API性能测试