背压加载文件– RxJava常见问题解答
事實證明,將文件作為流進行處理非常有效且方便。 許多人似乎忘記了,自Java 8(3年多!)以來,我們可以很容易地將任何文件變成一行代碼:
String filePath = "foobar.txt"; try (BufferedReader reader = new BufferedReader(new FileReader(filePath))) {reader.lines().filter(line -> !line.startsWith("#")).map(String::toLowerCase).flatMap(line -> Stream.of(line.split(" "))).forEach(System.out::println); }reader.lines()返回Stream<String> ,您可以對其進行進一步轉(zhuǎn)換。 在此示例中,我們丟棄以"#"開頭的行,并通過將其拆分為單詞來爆炸每行。 通過這種方式,我們可以實現(xiàn)單詞流而不是行流。 使用文本文件幾乎與使用普通Java集合一樣簡單。 在RxJava中, 我們已經(jīng)學(xué)習(xí)了generate()運算符。 它也可以在這里用于從文件創(chuàng)建健壯的行流:
Flowable<String> file = Flowable.generate(() -> new BufferedReader(new FileReader(filePath)),(reader, emitter) -> {final String line = reader.readLine();if (line != null) {emitter.onNext(line);} else {emitter.onComplete();}},reader -> reader.close() );在上述示例中, generate()運算符稍微復(fù)雜一些。 第一個參數(shù)是狀態(tài)工廠。 每次有人訂閱此流時,都會調(diào)用工廠并創(chuàng)建有狀態(tài)的BufferedReader 。 然后,當下游運營商或訂戶希望接收某些數(shù)據(jù)時,將調(diào)用第二個lambda(帶有兩個參數(shù))。 此lambda表達式嘗試從文件中精確提取一行,然后將其發(fā)送到下游( onNext() )或在遇到文件結(jié)尾時完成。 這很簡單。 generate()的第三個可選參數(shù)是一個lambda表達式,可以對state進行一些清理。 在我們的情況下這非常方便,因為我們不僅必須在到達文件末尾時關(guān)閉文件,而且還必須在使用者過早取消訂閱時關(guān)閉文件。
認識Flowable.using()運算符
這似乎需要做很多工作,尤其是當我們已經(jīng)有了來自JDK 8的一行代碼時。事實證明,有一個類似的工廠運算符using()很方便。 的翻譯的所有最簡單的方法首先Stream從Java到Flowable是通過轉(zhuǎn)換Stream成Iterator (checked異常處理忽略):
Flowable.fromIterable(new Iterable<String>() {@Overridepublic Iterator<String> iterator() {final BufferedReader reader = new BufferedReader(new FileReader(filePath));final Stream<String> lines = reader.lines();return lines.iterator();} });可以簡化為:
Flowable.<String>fromIterable(() -> {final BufferedReader reader = new BufferedReader(new FileReader(filePath));final Stream<String> lines = reader.lines();return lines.iterator(); });但是我們忘了關(guān)閉BufferedReader從而關(guān)閉FileReader從而關(guān)閉文件句柄。 因此,我們引入了資源泄漏。 在這種情況下, using()運算符的作用就像是一種魅力。 在某種程度上,它類似于try-with-resources語句。 您可以基于某些外部資源創(chuàng)建流。 當有人訂閱或取消訂閱時,將為您管理此資源的生命周期(創(chuàng)建和處置):
Flowable.using(() -> new BufferedReader(new FileReader(filePath)),reader -> Flowable.fromIterable(() -> reader.lines().iterator()),reader -> reader.close() );它與上一個generate()示例非常相似,但是中間最重要的lambda表達式卻大不相同。 我們獲得一個資源( reader )作為參數(shù),并假設(shè)返回一個Flowable (而不是單個元素)。 該lambda僅被調(diào)用一次,而不是在每次下游請求新項時調(diào)用。 using()運算符給我們的是管理BufferedReaders的生命周期。 當我們有一個狀態(tài)(可以一次生成整個Flowable ,而不是一次generate()一個using()時, using()很有用。
流XML文件
…或JSON。 假設(shè)您有一個非常大的XML文件,其中包含以下條目,其中包括數(shù)十萬個條目:
<trkpt lat="52.23453" lon="21.01685"><ele>116</ele> </trkpt> <trkpt lat="52.23405" lon="21.01711"><ele>116</ele> </trkpt> <trkpt lat="52.23397" lon="21.0166"><ele>116</ele> </trkpt>這是標準GPS交換格式的片段,可以描述任意長度的地理路線。 每個<trkpt>是具有緯度,經(jīng)度和海拔的單個點。 我們希望有一個跟蹤點流(為簡單起見忽略高程),以便可以部分使用文件,而不是一次加載所有文件。 我們有三個選擇:
- DOM / JAXB –必須將所有內(nèi)容加載到內(nèi)存中并映射到Java對象。 不適用于無限長的文件(甚至非常大的文件)
- SAX –基于推送的庫,一旦發(fā)現(xiàn)XML標簽打開或關(guān)閉,就會調(diào)用回調(diào)。 似乎好一點,但可能無法支持背壓–由庫決定何時調(diào)用回調(diào),并且無法減慢其速度
- StAX –與SAX相似,但是我們必須積極地從XML文件中提取數(shù)據(jù)。 這對于支持背壓至關(guān)重要-我們決定何時讀取下一個數(shù)據(jù)塊
讓我們嘗試使用StAX和RxJava實現(xiàn)可能很大的XML文件的解析和流傳輸。 首先,我們必須首先學(xué)習(xí)如何使用StAX 。 該解析器稱為XMLStreamReader ,它是按照以下咒語和詛咒序列創(chuàng)建的:
XMLStreamReader staxReader(String name) throws XMLStreamException {final InputStream inputStream = new BufferedInputStream(new FileInputStream(name));return XMLInputFactory.newInstance().createXMLStreamReader(inputStream); }只需閉上眼睛,并確保您始終有一個地方可以復(fù)制粘貼上面的代碼片段。 情況變得更糟。 為了讀取第一個<trkpt>標記及其屬性,我們必須編寫一些復(fù)雜的代碼:
import lombok.Value;@Value class Trackpoint {private final BigDecimal lat;private final BigDecimal lon; }Trackpoint nextTrackpoint(XMLStreamReader r) {while (r.hasNext()) {int event = r.next();switch (event) {case XMLStreamConstants.START_ELEMENT:if (r.getLocalName().equals("trkpt")) {return parseTrackpoint(r);}break;case XMLStreamConstants.END_ELEMENT:if (r.getLocalName().equals("gpx")) {return null;}break;}}return null; }Trackpoint parseTrackpoint(XMLStreamReader r) {return new Trackpoint(new BigDecimal(r.getAttributeValue("", "lat")),new BigDecimal(r.getAttributeValue("", "lon"))); }API是低級報價,并且?guī)缀跏枪哦?一切都發(fā)生在一個巨大的循環(huán)中,該循環(huán)讀取... int類型的東西 。 此int可以是START_ELEMENT , END_ELEMENT或我們不感興趣的其他一些東西。請記住,我們正在讀取XML文件,但不是逐行或逐字符,而是通過邏輯XML標記(標記)。 因此,如果發(fā)現(xiàn)打開<trkpt>元素,則將其解析,否則繼續(xù)。 第二個重要條件是當我們發(fā)現(xiàn)關(guān)閉</gpx> ,這應(yīng)該是GPX文件中的最后一件事。 在這種情況下,我們返回null ,表示XML文件結(jié)束。
感覺復(fù)雜嗎? 實際上,這是讀取具有恒定內(nèi)存使用量的大型XML(與文件大小無關(guān))的最簡單方法。 所有這些與RxJava有何關(guān)系? 在這一點上,我們可以很容易地構(gòu)建Flowable<Trackpoint> 。 是的, Flowable ,沒有Observable (見: Obsevable與Observable )。 這樣的流將完全支持背壓,這意味著它將以適當?shù)乃俣茸x取文件:
Flowable<Trackpoint> trackpoints = generate(() -> staxReader("track.gpx"),this::pushNextTrackpoint,XMLStreamReader::close);void pushNextTrackpoint(XMLStreamReader reader, Emitter<Trackpoint> emitter) {final Trackpoint trkpt = nextTrackpoint(reader);if (trkpt != null) {emitter.onNext(trkpt);} else {emitter.onComplete();} }哇,如此簡單,如此反壓! [1]我們首先創(chuàng)建一個XMLStreamReader ,并確保在文件結(jié)束或有人取消訂閱時將其關(guān)閉。 請記住,每個訂戶將一次又一次打開并開始解析相同的文件。 中間的lambda表達式僅使用狀態(tài)變量( XMLStreamReader )并發(fā)出另一個跟蹤點。 所有這些似乎都很晦澀,事實是! 但是,現(xiàn)在我們有了一個使用很少的資源就可以從一個可能很大的文件中提取回溯感知流。 我們可以同時處理跟蹤點,也可以將它們與其他數(shù)據(jù)源組合在一起。 在下一篇文章中,我們將學(xué)習(xí)如何以非常相似的方式加載JSON。
翻譯自: https://www.javacodegeeks.com/2017/09/loading-files-backpressure-rxjava-faq.html
總結(jié)
以上是生活随笔為你收集整理的背压加载文件– RxJava常见问题解答的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: linuxu盘制作工具(linux u盘
- 下一篇: 保安资质备案在哪里备案(保安资质备案)