IO流--转载
第 1 部分?從輸出流中讀取
http://www.ibm.com/developerworks/cn/java/j-io1/
自早期基于瀏覽器的 applet 和簡單應用程序以來,Java 平臺已有了巨大的發(fā)展。現(xiàn)在,我們有多個平臺和概要及許多新的 API,并且還在制作的差不多有數(shù)百種之多。盡管 Java 語言的復雜程度在不斷增加,但它對于日常的編程任務而言仍是一個出色的工具。雖然有時您會陷入那些日復一日的編程問題中,但偶爾您也能夠回過頭去,發(fā)現(xiàn)一個很棒的解決方案來處理您以前曾多次遇到過的問題。
就在前幾天,我想要壓縮一些通過網(wǎng)絡連接讀取的數(shù)據(jù)(我以壓縮格式將 TCP 數(shù)據(jù)中繼到一個 UDP 套接字)。記得 Java 平臺自版本 1.1 開始就支持壓縮,所以我直接求助于?java.util.zip?包,希望能找到一個適合于我的解決方案。然而,我發(fā)現(xiàn)一個問題:構(gòu)造的類都適用于常規(guī)情況,即在讀取時對數(shù)據(jù)解壓縮而在寫入時壓縮它們,沒有其它變通方法。雖然繞過 I/O 類是可能的,但我希望構(gòu)建一個基于流的解決方案,而不想偷懶直接使用壓縮程序。
不久以前,我在另一種情況下也遇到過完全相同的問題。我有一個 base-64 轉(zhuǎn)碼庫,與使用壓縮包一樣,它支持對從流中讀取的數(shù)據(jù)進行譯碼,并對寫入流中的數(shù)據(jù)進行編碼。然而,我需要的是一個在我從流中讀取數(shù)據(jù)的同時可以進行編碼的庫。
在我著手解決該問題時,我認識到我在另一種情況下也遇到過該問題:當序列化 XML 文檔時,通常會循環(huán)遍歷整個文檔,將節(jié)點寫入流中。然而,我遇到的情況是需要讀取序列化格式的文檔,以便將子集重新解析成一個新文檔。
回過頭想一下,我意識到這些孤立事件表示了一個共性的問題:如果有一個遞增地將數(shù)據(jù)寫入輸出流的數(shù)據(jù)源,那么我需要一個輸入流使我能夠讀取這些數(shù)據(jù),每當需要更多數(shù)據(jù)時,都能透明地訪問數(shù)據(jù)源。
在本文中,我們將研究對這一問題的三種可能的解決方案,同時決定一個實現(xiàn)最佳解決方案的新框架。然后,我們將針對上面列出的每個問題,檢驗該框架。我們將扼要地談及性能方面的問題,而把對此的大量討論留到下一篇文章中。
I/O 流基礎知識
首先,讓我們簡單回顧一下 Java 平臺的基本流類,如圖 1 所示。?OutputStream?表示對其寫入數(shù)據(jù)的流。通常,該流將直接連接至諸如文件或網(wǎng)絡連接之類的設備,或連接至另一個輸出流(在這種情況下,它稱為?過濾器(filter))。通常,輸出流過濾器在轉(zhuǎn)換了寫入其中的數(shù)據(jù)之后,才將轉(zhuǎn)換后產(chǎn)生的數(shù)據(jù)寫入相連的流中。?InputStream?表示可以從中讀取數(shù)據(jù)的流。同樣,該流也直接連接至設備或其它流。輸入流過濾器從相連的流中讀取數(shù)據(jù),轉(zhuǎn)換該數(shù)據(jù),然后允許從中讀取轉(zhuǎn)換后的數(shù)據(jù)。
圖 1. I/O 流基礎知識
就我最初的問題看,?GZIPOutputStream?類是一個輸出流過濾器,它壓縮寫入其中的數(shù)據(jù),然后將該壓縮數(shù)據(jù)寫入相連的流。我需要的輸入流過濾器應該能從流中讀取數(shù)據(jù),壓縮數(shù)據(jù),然后讓我讀取結(jié)果。
Java 平臺,版本 1.4 已引入了一個新的 I/O 框架?java.nio?。不過,該框架在很大程度上與提供對操作系統(tǒng) I/O 資源的有效訪問有關(guān);而且,雖然它確實為一些傳統(tǒng)的?java.io?類提供了類似功能,并可以表示同時支持輸入和輸出的雙重用途的資源,但它并不能完全替代標準流類,并且不能直接處理我需要解決的問題。
回頁首
蠻力解決方案
在著手尋找解決我問題的工程方案前,我根據(jù)標準 Java API 類的精致和有效性,研究了基于這些類的解決方案。
該問題的蠻力解決方案就是簡單地從輸入源中讀取所有數(shù)據(jù),然后通過轉(zhuǎn)換程序(即,壓縮流、編碼流或 XML 序列化器)將它們推進內(nèi)存緩沖區(qū)中。然后,我可以從該內(nèi)存緩沖區(qū)中打開要讀取的流,這樣我就解決了問題。
首先,我需要一個通用的 I/O 方法。清單 1 中的方法利用一個小緩沖區(qū)將?InputStream?中的所有數(shù)據(jù)復制到?OutputStream?。當?shù)竭_輸入的結(jié)尾(?read()?函數(shù)的返回值小于零)時,該方法就返回,但不關(guān)閉這兩個流。
清單 1. 通用的 I/O 方法
public static void io (InputStream in, OutputStream out)throws IOException {byte[] buffer = new byte[8192];int amount;while ((amount = in.read (buffer)) >= 0)out.write (buffer, 0, amount); }清單 2 顯示蠻力解決方案如何使我讀取壓縮格式的輸入流。我打開寫入內(nèi)存緩沖區(qū)的?GZIPOutputStream?(使用?ByteArrayOutputStream)。接著,將輸入流復制到壓縮流中,這樣將壓縮數(shù)據(jù)填入內(nèi)存緩沖區(qū)中。然后,我返回?ByteArrayInputStream?,它讓我從輸入流中讀取,如圖 2 所示。
圖 2. 蠻力解決方案
清單 2. 蠻力解決方案
public static InputStream bruteForceCompress (InputStream in)throws IOException {ByteArrayOutputStream sink = new ByteArrayOutputStream ():OutputStream out = new GZIPOutputStream (sink);io (in, out);out.close ();byte[] buffer = sink.toByteArray ();return new ByteArrayInputStream (buffer); }這個解決方案有一個明顯的缺點,它將整個壓縮文檔都存儲在內(nèi)存中。如果文檔很大,那么這種方法將不必要地浪費系統(tǒng)資源。使用流的主要特性之一是它們允許您操作比所用系統(tǒng)內(nèi)存要大的數(shù)據(jù):您可以在讀取數(shù)據(jù)時處理它們,或在寫入數(shù)據(jù)時生成數(shù)據(jù),而無需始終將所有數(shù)據(jù)保存在內(nèi)存中。
從效率上,讓我們對在緩沖區(qū)之間復制數(shù)據(jù)進行更深入研究。
通過?io()?方法,將數(shù)據(jù)從輸入源讀入至一個緩沖區(qū)中。然后,將數(shù)據(jù)從緩沖區(qū)寫入?ByteArrayOutputStream?中的緩沖區(qū)(通過我忽略的壓縮過程)。然而,?ByteArrayOutputStream?類對擴展的內(nèi)部緩沖區(qū)進行操作;每當緩沖區(qū)變滿時,就會分配一個大小是原來兩倍的新緩沖區(qū),接著將現(xiàn)有的數(shù)據(jù)復制到該緩沖區(qū)中。平均下來,這一過程每個字節(jié)復制兩次。(算術(shù)計算很簡單:當進入?ByteArrayOutputStream?時,對數(shù)據(jù)平均復制兩次;所有數(shù)據(jù)至少復制一次;有一半數(shù)據(jù)至少復制兩次;四分之一的數(shù)據(jù)至少復制三次,依次類推。)然后,將數(shù)據(jù)從該緩沖區(qū)復制到?ByteArrayInputStream?的一個新緩沖區(qū)中。現(xiàn)在,應用程序可以讀取數(shù)據(jù)了。總之,這個解決方案將通過四個緩沖區(qū)寫數(shù)據(jù)。這對于估計其它技術(shù)的效率是一個有用的基準。
回頁首
管道式流解決方案
管道式流?PipedOutputStream?和?PipedInputStream?在 Java 虛擬機的線程之間提供了基于流的連接。一個線程將數(shù)據(jù)寫入PipedOutputStream?中的同時,另一個線程可以從相關(guān)聯(lián)的?PipedInputStream?中讀取該數(shù)據(jù)。
就這樣,這些類提供了一個針對我問題的解決方案。清單 3 顯示了使用一個線程通過?GZIPOutputStream?將數(shù)據(jù)從輸入流復制到PipedOutputStream?的代碼。然后,相關(guān)聯(lián)的?PipedInputStream?將提供對來自另一個線程的壓縮數(shù)據(jù)的讀取權(quán),如圖 3 所示:
圖 3. 管道式流解決方案
清單 3. 管道式流解決方案
private static InputStream pipedCompress (final InputStream in)throws IOException {PipedInputStream source = new PipedInputStream ();final OutputStream out =new GZIPOutputStream (new PipedOutputStream (source));new Thread () {public void run () {try {Streams.io (in, out);out.close ();} catch (IOException ex) {ex.printStackTrace ();}}}.start ();return source; }理論上,這可能是個好技術(shù):通過使用線程(一個執(zhí)行壓縮,另一個處理產(chǎn)生的數(shù)據(jù)),應用程序可以從硬件 SMP(對稱多處理)或 SMT(對稱多線程)中受益。另外,這一解決方案僅涉及兩個緩沖區(qū)寫操作:I/O 循環(huán)將數(shù)據(jù)從輸入流讀入緩沖區(qū),然后通過壓縮流寫入PipedOutputStream?。接著,輸出流將數(shù)據(jù)存儲在內(nèi)部緩沖區(qū)中,與?PipedInputStream?共享緩沖區(qū)以供應用程序讀取。而且,因為數(shù)據(jù)通過固定緩沖區(qū)流動,所以從不需要將它們完全讀入內(nèi)存中。事實上,在任何給定時刻,緩沖區(qū)都只存儲小部分的工作集。
不過,實際上,它的性能很糟糕。管道式流需要利用同步,從而引起兩個線程之間激烈爭奪同步。它們的內(nèi)部緩沖區(qū)太小,無法有效地處理大量數(shù)據(jù)或隱藏鎖爭用。其次,持久共享緩沖區(qū)會阻礙許多簡單的高速緩存策略共享 SMP 機器上的工作負載。最后,線程的使用使得異常處理極其困難:沒有辦法將可能出現(xiàn)的任何?IOException?下推到管道中以便閱讀器處理。總之,這一解決方案太難處理,根本不實際。
工程解決方案
現(xiàn)在,我們將研究另一種解決該問題的工程方案。這種解決方案提供了一個特地為解決這類問題而設計的框架,該框架提供了對數(shù)據(jù)的InputStream?訪問,這些數(shù)據(jù)是從遞增地向?OutputStream?寫入數(shù)據(jù)的源中產(chǎn)生的。遞增地寫入數(shù)據(jù)這一事實很重要。如果源在單個原子操作中將所有數(shù)據(jù)都寫入?OutputStream?,而且如果不使用線程,則我們基本上又回到了蠻力技術(shù)的老路上。不過,如果可以訪問源以遞增地寫入其數(shù)據(jù),則我們就實現(xiàn)了在蠻力和管道式流解決方案之間的良好平衡。該解決方案不僅提供了在任何時候只在內(nèi)存中保存少量數(shù)據(jù)的管道式優(yōu)點,同時也提供了避免線程的蠻力技術(shù)的優(yōu)點。
圖 4 演示了完整的解決方案。我們將在本文的剩余部分研究?該解決方案的源代碼。
圖 4. 工程解決方案
輸出引擎
清單 4 提供了一個描述數(shù)據(jù)源的接口?OutputEngine?。正如我所說的,這些源遞增地將數(shù)據(jù)寫入輸出流:
清單 4. 輸出引擎
package org.merlin.io; import java.io.*; /*** An incremental data source that writes data to an OutputStream.** @author Copyright (c) 2002 Merlin Hughes <merlin@merlin.org>** This program is free software; you can redistribute* it and/or modify it under the terms of the GNU* General Public License as published by the Free* Software Foundation; either version 2* of the License, or (at your option) any later version.*/ public interface OutputEngine {public void initialize (OutputStream out) throws IOException;public void execute () throws IOException;public void finish () throws IOException; }initialize()?方法向該引擎提供一個流,應該向這個流寫入數(shù)據(jù)。然后,重復調(diào)用?execute()?方法將數(shù)據(jù)寫入該流中。當數(shù)據(jù)寫完時,引擎會關(guān)閉該流。最后,當引擎應該關(guān)閉時,將調(diào)用?finish()?。這會發(fā)生在引擎關(guān)閉其輸出流的前后。
I/O 流引擎
輸出引擎解決了讓我費力處理的問題,它是一個通過輸出流過濾器將數(shù)據(jù)從輸入流復制到目標輸出流的引擎。這滿足了遞增性的特性,因為它可以一次讀寫單個緩沖區(qū)。
清單 5 到 10 中的代碼實現(xiàn)了這樣的一個引擎。通過輸入流和輸入流工廠來構(gòu)造它。清單 11 是一個生成過濾后的輸出流的工廠;例如,它會返回包裝了目標輸出流的?GZIPOutputStream?。
清單 5. I/O 流引擎
package org.merlin.io; import java.io.*; /*** An output engine that copies data from an InputStream through* a FilterOutputStream to the target OutputStream.** @author Copyright (c) 2002 Merlin Hughes <merlin@merlin.org>** This program is free software; you can redistribute it and/or* modify it under the terms of the GNU General Public License* as published by the Free Software Foundation; either version 2* of the License, or (at your option) any later version.*/ public class IOStreamEngine implements OutputEngine {private static final int DEFAULT_BUFFER_SIZE = 8192;private InputStream in;private OutputStreamFactory factory;private byte[] buffer;private OutputStream out;該類的構(gòu)造器只初始化各種變量和將用于傳輸數(shù)據(jù)的緩沖區(qū)。
清單 6. 構(gòu)造器
public IOStreamEngine (InputStream in, OutputStreamFactory factory) {this (in, factory, DEFAULT_BUFFER_SIZE);}public IOStreamEngine(InputStream in, OutputStreamFactory factory, int bufferSize) {this.in = in;this.factory = factory;buffer = new byte[bufferSize];}在?initialize()?方法中,該引擎調(diào)用其工廠來封裝與其一起提供的?OutputStream?。該工廠通常將一個過濾器連接至?OutputStream?。
清單 7. initialize() 方法
public void initialize (OutputStream out) throws IOException {if (this.out != null) {throw new IOException ("Already initialised");} else {this.out = factory.getOutputStream (out);}}在?execute()?方法中,引擎從?InputStream?中讀取一個緩沖區(qū)的數(shù)據(jù),然后將它們寫入已封裝的?OutputStream?;或者,如果輸入結(jié)束,它會關(guān)閉?OutputStream?。
清單 8. execute() 方法
public void execute () throws IOException {if (out == null) {throw new IOException ("Not yet initialised");} else {int amount = in.read (buffer);if (amount < 0) {out.close ();} else {out.write (buffer, 0, amount);}}}最后,當關(guān)閉引擎時,它就關(guān)閉其?InputStream?。
清單 9. 關(guān)閉 InputStream
public void finish () throws IOException {in.close ();}內(nèi)部?OutputStreamFactory?接口(下面清單 10 中所示)描述可以返回過濾后的?OutputStream?的類。
清單 10. 內(nèi)部輸出流工廠接口
public static interface OutputStreamFactory {public OutputStream getOutputStream (OutputStream out)throws IOException;} }清單 11 顯示將提供的流封裝到?GZIPOutputStream?中的一個示例工廠:
清單 11. GZIP 輸出流工廠
public class GZIPOutputStreamFactoryimplements IOStreamEngine.OutputStreamFactory {public OutputStream getOutputStream (OutputStream out)throws IOException {return new GZIPOutputStream (out);} }該 I/O 流引擎及其輸出流工廠框架通常足以支持大多數(shù)的輸出流過濾需要。
輸出引擎輸入流
最后,我們還需要一小段代碼來完成這個解決方案。清單 12 到 16 中的代碼提供了讀取由輸出引擎所寫數(shù)據(jù)的輸入流。事實上,這段代碼有兩個部分:主類是一個從內(nèi)部緩沖區(qū)讀取數(shù)據(jù)的輸入流。與此緊密耦合的是一個輸出流(如清單 17 所示),它把輸出引擎所寫的數(shù)據(jù)填充到內(nèi)部讀緩沖區(qū)。
主輸入流類將用其內(nèi)部輸出流來初始化輸出引擎。然后,每當它的緩沖區(qū)為空時,它會自動執(zhí)行該引擎來接收更多數(shù)據(jù)。輸出引擎將數(shù)據(jù)寫入其輸出流中,這將重新填充輸入流的內(nèi)部緩沖區(qū),以允許需要內(nèi)部緩沖區(qū)數(shù)據(jù)的應用程序高效地讀取數(shù)據(jù)。
清單 12. 輸出引擎輸入流
package org.merlin.io; import java.io.*; /*** An input stream that reads data from an OutputEngine.** @author Copyright (c) 2002 Merlin Hughes <merlin@merlin.org>** This program is free software; you can redistribute it and/or* modify it under the terms of the GNU General Public License* as published by the Free Software Foundation; either version 2* of the License, or (at your option) any later version.*/ public class OutputEngineInputStream extends InputStream {private static final int DEFAULT_INITIAL_BUFFER_SIZE = 8192;private OutputEngine engine;private byte[] buffer;private int index, limit, capacity;private boolean closed, eof;該輸入流的構(gòu)造器獲取一個輸出引擎以從中讀取數(shù)據(jù)和一個可選的緩沖區(qū)大小。該流首先初始化其本身,然后初始化輸出引擎。
清單 13. 構(gòu)造器
public OutputEngineInputStream (OutputEngine engine) throws IOException {this (engine, DEFAULT_INITIAL_BUFFER_SIZE); } public OutputEngineInputStream (OutputEngine engine, int initialBufferSize)throws IOException {this.engine = engine;capacity = initialBufferSize;buffer = new byte[capacity];engine.initialize (new OutputStreamImpl ()); }代碼的主要讀部分是一個相對簡單的基于字節(jié)數(shù)組的輸入流,與?ByteArrayInputStream?類非常相似。然而,每當需要數(shù)據(jù)而該流為空時,它都會調(diào)用輸出引擎的?execute()?方法來重新填寫讀緩沖區(qū)。然后,將這些新數(shù)據(jù)返回給調(diào)用程序。因而,這個類將對輸出引擎所寫的數(shù)據(jù)反復讀取,直到它讀完為止,此時將設置?eof?標志并且該流將返回已到達文件末尾的信息。
清單 14. 讀取數(shù)據(jù)
private byte[] one = new byte[1];public int read () throws IOException {int amount = read (one, 0, 1);return (amount < 0) ? -1 : one[0] & 0xff;}public int read (byte data[], int offset, int length)throws IOException {if (data == null) {throw new NullPointerException ();} else if((offset < 0) || (length < 0) || (offset + length > data.length)) {throw new IndexOutOfBoundsException ();} else if (closed) {throw new IOException ("Stream closed");} else {while (index >= limit) {if (eof)return -1;engine.execute ();}if (limit - index < length)length = limit - index;System.arraycopy (buffer, index, data, offset, length);index += length;return length;}}public long skip (long amount) throws IOException {if (closed) {throw new IOException ("Stream closed");} else if (amount <= 0) {return 0;} else {while (index >= limit) {if (eof)return 0;engine.execute ();}if (limit - index < amount)amount = limit - index;index += (int) amount;return amount;}}public int available () throws IOException {if (closed) {throw new IOException ("Stream closed");} else {return limit - index;}}當操作數(shù)據(jù)的應用程序關(guān)閉該流時,它調(diào)用輸出引擎的?finish()?方法,以便可以釋放其正在使用的任何資源。
清單 15. 釋放資源
public void close () throws IOException {if (!closed) {closed = true;engine.finish ();}}當輸出引擎將數(shù)據(jù)寫入其輸出流時,調(diào)用?writeImpl()?方法。它將這些數(shù)據(jù)復制到讀緩沖區(qū),并更新讀限制索引;這將使新數(shù)據(jù)可自動地用于讀方法。
在單次循環(huán)中,如果輸出引擎寫入的數(shù)據(jù)比緩沖區(qū)中可以保存的數(shù)據(jù)多,則緩沖區(qū)的容量會翻倍。然而,這不能頻繁發(fā)生;緩沖區(qū)應該快速擴展到足夠的大小,以便進行狀態(tài)穩(wěn)定的操作。
清單 16. writeImpl() 方法
private void writeImpl (byte[] data, int offset, int length) {if (index >= limit)index = limit = 0;if (limit + length > capacity) {capacity = capacity * 2 + length;byte[] tmp = new byte[capacity];System.arraycopy (buffer, index, tmp, 0, limit - index);buffer = tmp;limit -= index;index = 0;}System.arraycopy (data, offset, buffer, limit, length);limit += length;}下面清單 17 中顯示的內(nèi)部輸出流實現(xiàn)表示了一個流將數(shù)據(jù)寫入內(nèi)部輸出流緩沖區(qū)。該代碼驗證參數(shù)都是可接受的,并且如果是這樣的話,它調(diào)用?writeImpl()?方法。
清單 17. 內(nèi)部輸出流實現(xiàn)
private class OutputStreamImpl extends OutputStream {public void write (int datum) throws IOException {one[0] = (byte) datum;write (one, 0, 1);}public void write (byte[] data, int offset, int length)throws IOException {if (data == null) {throw new NullPointerException ();} else if((offset < 0) || (length < 0) || (offset + length > data.length)) {throw new IndexOutOfBoundsException ();} else if (eof) {throw new IOException ("Stream closed");} else {writeImpl (data, offset, length);}}最后,當輸出引擎關(guān)閉其輸出流,表明它已寫入了所有的數(shù)據(jù)時,該輸出流設置輸入流的?eof?標志,表明已經(jīng)讀取了所有的數(shù)據(jù)。
清單 18. 設置輸入流的 eof 標志
public void close () {eof = true;}} }敏感的讀者可能注意到我應該將?writeImpl()?方法的主體直接放在輸出流實現(xiàn)中:內(nèi)部類有權(quán)訪問所有包含類的私有成員。然而,對這些字段的內(nèi)部類訪問比由包含類的直接方法的訪問在效率方面稍許差一些。所以,考慮到效率以及為了使類之間的相關(guān)性最小化,我使用額外的助手方法。
回頁首
應用工程解決方案:在讀取期間壓縮數(shù)據(jù)
清單 19 演示了這個類框架的使用來解決我最初的問題:在我讀取數(shù)據(jù)時壓縮它們。該解決方案歸結(jié)為創(chuàng)建一個與輸入流相關(guān)聯(lián)的IOStreamEngine?和一個?GZIPOutputStreamFactory?,然后將?OutputEngineInputStream?與這個?GZIPOutputStreamFactory?相連。自動執(zhí)行流的初始化和連接,然后可以直接從結(jié)果流中讀取壓縮數(shù)據(jù)。當處理完成且關(guān)閉流時,輸出引擎自動關(guān)閉,并且它關(guān)閉初始輸入流。
清單 19. 應用工程解決方案
private static InputStream engineCompress (InputStream in)throws IOException {return new OutputEngineInputStream(new IOStreamEngine (in, new GZIPOutputStreamFactory ()));}雖然為解決這類問題而設計的解決方案應該產(chǎn)生十分清晰的代碼,這一點沒有什么可驚奇的,但是通常要充分留意以下教訓:無論問題大小,應用良好的設計技術(shù)都幾乎肯定會產(chǎn)生更為清晰、更便于維護的代碼。
回頁首
測試性能
從效率看,?IOStreamEngine?將數(shù)據(jù)讀入其內(nèi)部緩沖區(qū),然后通過壓縮過濾器將它們寫入?OutputStreamImpl?。這將數(shù)據(jù)直接寫入OutputEngineInputStream?,以便它們可供讀取。總共只執(zhí)行兩次緩沖區(qū)復制,這意味著我應該從管道式流解決方案的緩沖區(qū)復制效率和蠻力解決方案的無線程效率的結(jié)合中獲益。
要測試實際的性能,我編寫了一個簡單的測試工具(請參閱所附?資源中的?test.PerformanceTest?),它使用這三個推薦的解決方案,通過使用一個空過濾器來讀取一塊啞元數(shù)據(jù)。在運行 Java 2 SDK,版本 1.4.0 的 800 MHz Linux 機器上,達到了下列性能:
管道式流解決方案?
15KB:23ms;15MB:22100ms?
蠻力解決方案?
15KB:0.35ms;15MB:745ms?
工程解決方案?
15KB:0.16ms;15MB:73ms
該問題的工程解決方案很明顯比基于標準 Java API 的另兩個方法都更有效。
順便提一下,考慮到如果輸出引擎能夠遵守這樣的約定:在將數(shù)據(jù)寫入其輸出流后,它不修改從中寫入數(shù)據(jù)的數(shù)組而返回,那么我能提供一個只使用一次緩沖區(qū)復制操作的解決方案。可是,輸出引擎很少會遵守這種約定。如果需要,輸出引擎只要通過實現(xiàn)適當?shù)臉擞洺绦蚪涌?#xff0c;就能宣稱它支持這種方式的操作。
回頁首
應用工程解決方案:讀取編碼的字符數(shù)據(jù)
任何可以用“提供對將數(shù)據(jù)反復寫入?OutputStream?的實體的讀訪問權(quán)”表述的問題,都可以用這一框架解決。在這一節(jié)和下一節(jié)中,我們將研究這樣的問題示例及其有效的解決方案。
首先,考慮要讀取 UTF-8 編碼格式的字符流的情況:?InputStreamReader?類讓您將以二進制編碼的字符數(shù)據(jù)作為一系列 Unicode 字符讀取;它表示了從字節(jié)輸入流到字符輸入流的關(guān)口。?OutputStreamWriter?類讓您將一系列二進制編碼格式的 Unicode 字符寫入輸出流;它表示從字符輸出流到字節(jié)輸入流的關(guān)口。?String?類的?getBytes()?方法將字符串轉(zhuǎn)換成經(jīng)編碼的字節(jié)數(shù)組。然而,這些類中沒有一個能直接讓您讀取 UTF-8 編碼格式的字符流。
清單 20 到 24 中的代碼演示了以與?IOStreamEngine?類極其相似的方式使用?OutputEngine?框架的一種解決方案。我們并不是從輸入流讀取和通過輸出流過濾器進行寫操作,而是從字符流讀取,并通過所選的字符進行編碼的?OutputStreamWriter?進行寫操作。
清單 20. 讀取編碼的字符數(shù)據(jù)
package org.merlin.io; import java.io.*; /*** An output engine that copies data from a Reader through* a OutputStreamWriter to the target OutputStream.** @author Copyright (c) 2002 Merlin Hughes <merlin@merlin.org>** This program is free software; you can redistribute it and/or* modify it under the terms of the GNU General Public License* as published by the Free Software Foundation; either version 2* of the License, or (at your option) any later version.*/ public class ReaderWriterEngine implements OutputEngine {private static final int DEFAULT_BUFFER_SIZE = 8192;private Reader reader;private String encoding;private char[] buffer;private Writer writer;該類的構(gòu)造器接受要從中讀取的字符流、要使用的編碼以及可選的緩沖區(qū)大小。
清單 21. 構(gòu)造器
public ReaderWriterEngine (Reader in, String encoding) {this (in, encoding, DEFAULT_BUFFER_SIZE);}public ReaderWriterEngine(Reader reader, String encoding, int bufferSize) {this.reader = reader;this.encoding = encoding;buffer = new char[bufferSize];}當該引擎初始化時,它將以所選編碼格式寫字符的?OutputStreamWriter?連接至提供的輸出流。
清單 22. 初始化輸出流寫程序
public void initialize (OutputStream out) throws IOException {if (writer != null) {throw new IOException ("Already initialised");} else {writer = new OutputStreamWriter (out, encoding);}}當執(zhí)行該引擎時,它從輸入字符流中讀取數(shù)據(jù),然后將它們寫入?OutputStreamWriter?,接著 OutputStreamWriter 將它們以所選的編碼格式傳遞給相連的輸出流。至此,該框架使數(shù)據(jù)可供讀取。
清單 23. 讀取數(shù)據(jù)
public void execute () throws IOException {if (writer == null) {throw new IOException ("Not yet initialised");} else {int amount = reader.read (buffer);if (amount < 0) {writer.close ();} else {writer.write (buffer, 0, amount);}}}當引擎執(zhí)行完時,它關(guān)閉其輸入。
清單 24. 關(guān)閉輸入
public void finish () throws IOException {reader.close ();} }在這種與壓縮不同的情況中,Java I/O 包不提供對?OutputStreamWriter?之下的字符編碼類的低級別訪問。因此,這是在 Java 平臺 1.4 之前的發(fā)行版上讀取編碼格式的字符流的唯一有效解決方案。從版本 1.4 開始,?java.nio.charset?包確實提供了與流無關(guān)的字符編碼和譯碼能力。然而,這個包不能滿足我們對基于輸入流的解決方案的要求。
回頁首
應用工程解決方案:讀取序列化的 DOM 文檔
最后,讓我們研究該框架的最后一種用法。清單 25 到 29 中的代碼提供了一個用來讀取序列化格式的 DOM 文檔或文檔子集的解決方案。這一代碼的潛在用途可能是對部分 DOM 文檔執(zhí)行確認性重新解析。
清單 25. 讀取序列化的 DOM 文檔
package org.merlin.io; import java.io.*; import java.util.*; import org.w3c.dom.*; import org.w3c.dom.traversal.*; /*** An output engine that serializes a DOM tree using a specified* character encoding to the target OutputStream.** @author Copyright (c) 2002 Merlin Hughes <merlin@merlin.org>** This program is free software; you can redistribute it and/or* modify it under the terms of the GNU General Public License* as published by the Free Software Foundation; either version 2* of the License, or (at your option) any later version.*/ public class DOMSerializerEngine implements OutputEngine {private NodeIterator iterator;private String encoding;private OutputStreamWriter writer;構(gòu)造器獲取要在上面進行循環(huán)的 DOM 節(jié)點,或預先構(gòu)造的節(jié)點迭代器(這是 DOM 2 的一部分),以及一個用于序列化格式的編碼。
清單 26. 構(gòu)造器
public DOMSerializerEngine (Node root) {this (root, "UTF-8");}public DOMSerializerEngine (Node root, String encoding) {this (getIterator (root), encoding);}private static NodeIterator getIterator (Node node) {DocumentTraversal dt= (DocumentTraversal)(node.getNodeType () ==Node.DOCUMENT_NODE) ? node : node.getOwnerDocument ();return dt.createNodeIterator (node, NodeFilter.SHOW_ALL, null, false);}public DOMSerializerEngine (NodeIterator iterator, String encoding) {this.iterator = iterator;this.encoding = encoding;}初始化期間,該引擎將適當?shù)?OutputStreamWriter?連接至目標輸出流。
清單 27. initialize() 方法
public void initialize (OutputStream out) throws IOException {if (writer != null) {throw new IOException ("Already initialised");} else {writer = new OutputStreamWriter (out, encoding);}}在執(zhí)行階段,該引擎從節(jié)點迭代器中獲得下一個節(jié)點,然后將其序列化至?OutputStreamWriter?。當獲取了所有節(jié)點后,引擎關(guān)閉它的流。
清單 28. execute() 方法
public void execute () throws IOException {if (writer == null) {throw new IOException ("Not yet initialised");} else {Node node = iterator.nextNode ();closeElements (node);if (node == null) {writer.close ();} else {writeNode (node);writer.flush ();}}}當該引擎關(guān)閉時,沒有要釋放的資源。
清單 29. 關(guān)閉
public void finish () throws IOException {}// private void closeElements (Node node) throws IOException ...// private void writeNode (Node node) throws IOException ... }序列化每個節(jié)點的其它內(nèi)部細節(jié)不太有趣;這一過程主要涉及根據(jù)節(jié)點的類型和 XML 1.0 規(guī)范寫出節(jié)點,所以我將在本文中省略這一部分的代碼。請參閱附帶的?源代碼,獲取完整的詳細信息。
回頁首
結(jié)束語
我所提供的是一個有用的框架,它利用標準輸入流 API 讓您能有效讀取由只能寫入輸出流的系統(tǒng)產(chǎn)生的數(shù)據(jù)。它讓我們讀取經(jīng)壓縮或編碼的數(shù)據(jù)及序列化文檔等。雖然可以使用標準 Java API 實現(xiàn)這一功能,但使用這些類的效率根本不行。應該充分注意到,這種解決方案比最簡單的蠻力解決方案更有效(即使在數(shù)據(jù)不大的情況下)。將數(shù)據(jù)寫入?ByteArrayOutputStream?以便進行后續(xù)處理的任何應用程序都可能從這一框架中受益。
字節(jié)數(shù)組流的拙劣性能和管道式流難以置信的蹩腳性能,實際上都是我下一篇文章的主題。在那篇文章中,我將研究重新實現(xiàn)這些類,并比這些類的原創(chuàng)者更加關(guān)注它們的性能。只要 API 約定稍微寬松一點,性能就可能改進一百倍了。
我討厭洗碗。不過,正如大多數(shù)我自認為是較好(雖然常常還是微不足道)的想法一樣,這些類背后的想法都是在我洗碗時冒出來的。我時常發(fā)現(xiàn)撇開實際代碼,回頭看看并且把問題的范圍考慮得更廣些,可能會得出一個更好的解決方案,它最終為您提供的方法可能比您找出的容易方法更好。這些解決方案常常會產(chǎn)生更清晰、更有效而且更可維護的代碼。
我真的擔心我們有了洗碗機的那一天。
?
徹底轉(zhuǎn)變流,第 2 部分:優(yōu)化 Java 內(nèi)部 I/O
http://www.ibm.com/developerworks/cn/java/j-io2/
在?本系列的第一篇文章中,您學習了解決從只能寫出數(shù)據(jù)的源讀取數(shù)據(jù)的問題的一些不同方法。在可能的解決方案中,我們研究了怎樣使用字節(jié)數(shù)組流、管道流以及直接處理該問題的定制框架。定制方法顯然是最有效率的解決方案;但是,分析其它幾種方法有助于看清標準 Java 流的一些問題。具體地說,字節(jié)數(shù)組輸出流并不提供可提供對它的內(nèi)容進行只讀訪問的高效機制,管道流的性能通常很差。
為了處理這些問題,我們將在本文中實現(xiàn)功能同樣齊全的替換類,但在實現(xiàn)時更強調(diào)性能。讓我們先來簡要地討論一下同步問題,因為它與 I/O 流有關(guān)。
同步問題
一般來說,我推薦在不是特別需要同步的情況下避免不必要地使用同步。顯然,如果多個線程需并發(fā)地訪問一個類,那么這個類需確保線程安全。但是,在許多情況下并不需要并發(fā)的訪問,同步成了不必要的開銷。例如,對流的并發(fā)訪問自然是不確定的 ― 您無法預測哪些數(shù)據(jù)被先寫入,也無法預測哪個線程讀了哪些數(shù)據(jù) ― 也就是說,在多數(shù)情況下,對流的并發(fā)訪問是沒用的。所以,對所有的流強制同步是不提供實際好處的花費。如果某個應用程序要求線程安全,那么通過應用程序自己的同步原語可以強制線程安全。
事實上,Collection 類的 API 作出了同樣的選擇:在缺省的情況下,set、list 等等都不是線程安全的。如果應用程序想使用線程安全的 Collection,那么它可以使用?Collections?類來創(chuàng)建一個線程安全的包裝器來包裝非線程安全的 Collection。如果這種作法是有用的,那么應用程序可以使用完全相同的機制來包裝流,以使它線程安全;例如,?OutputStream out = Streams.synchronizedOutputStream (byteStream)?。請參閱附帶的?源代碼中的?Streams?類,這是一個實現(xiàn)的示例。
所以,對于我所認為的多個并發(fā)線程無法使用的類,我沒用同步來為這些類提供線程安全。在您廣泛采用這種方式前,我推薦您研究一下 Java 語言規(guī)范(Java Language Specification)的?Threads and Locks那一章(請參閱?參考資料),以理解潛在的缺陷;具體地說,在未使用同步的情況下無法確保讀寫的順序,所以,對不同步的只讀方法的并發(fā)訪問可能導致意外的行為,盡管這種訪問看起來是無害的。
回頁首
更好的字節(jié)數(shù)組輸出流
當您需要把未知容量的數(shù)據(jù)轉(zhuǎn)儲到內(nèi)存緩沖區(qū)時,?ByteArrayOutputStream?類是使用效果很好的流。當我為以后再次讀取而存儲一些數(shù)據(jù)時,我經(jīng)常使用這個類。但是,使用?toByteArray()?方法來取得對結(jié)果數(shù)據(jù)的讀訪問是很低效的,因為它實際返回的是內(nèi)部字節(jié)數(shù)組的副本。對于小容量的數(shù)據(jù),使用這種方式不會有太大問題;然而,隨著容量增大,這種方式的效率被不必要地降低了。這個類必須復制數(shù)據(jù),因為它不能強制對結(jié)果字節(jié)數(shù)組進行只讀訪問。如果它返回它的內(nèi)部緩沖區(qū),那么在一般的情況下,接收方無法保證該緩沖區(qū)未被同一數(shù)組的另一個接收方并發(fā)地修改。
StringBuffer?類已解決了類似的問題;它提供可寫的字符緩沖區(qū),它還支持高效地返回能從內(nèi)部字符數(shù)組直接讀取的只讀?String?。因為StringBuffer?類控制著對它的內(nèi)部數(shù)組的寫訪問,所以它僅在必要時才復制它的數(shù)組;也就是說,當它導出了?String?且后來調(diào)用程序修改了StringBuffer?的時候。如果沒有發(fā)生這樣的修改,那么任何不必要的復制都不會被執(zhí)行。通過支持能夠強制適當?shù)脑L問控制的字節(jié)數(shù)組的包裝器,新的 I/O 框架以類似的方式解決了這個問題。
我們可以使用相同的通用機制為需要使用標準流 API 的應用程序提供高效的數(shù)據(jù)緩沖和再次讀取。我們的示例給出了可替代ByteArrayOutputStream?類的類,它能高效地導出對內(nèi)部緩沖區(qū)的只讀訪問,方法是返回直接讀取內(nèi)部字節(jié)數(shù)組的只讀?InputStream?。
我們來看一下代碼。清單 1 中的構(gòu)造函數(shù)分配了初始緩沖區(qū),以存儲寫到這個流的數(shù)據(jù)。為了存儲更多的數(shù)據(jù),該緩沖區(qū)將按需自動地擴展。
清單 1. 不同步的字節(jié)數(shù)組輸出流
package org.merlin.io; import java.io.*; /*** An unsynchronized ByteArrayOutputStream alternative that efficiently* provides read-only access to the internal byte array with no * unnecessary copying.** @author Copyright (c) 2002 Merlin Hughes <merlin@merlin.org>*/ public class BytesOutputStream extends OutputStream {private static final int DEFAULT_INITIAL_BUFFER_SIZE = 8192;// internal bufferprivate byte[] buffer;private int index, capacity;// is the stream closed?private boolean closed;// is the buffer shared?private boolean shared;public BytesOutputStream () {this (DEFAULT_INITIAL_BUFFER_SIZE);}public BytesOutputStream (int initialBufferSize) {capacity = initialBufferSize;buffer = new byte[capacity];}清單 2 顯示的是寫方法。這些方法按需擴展內(nèi)部緩沖區(qū),然后把新數(shù)據(jù)復制進來。在擴展內(nèi)部緩沖區(qū)時,我們使緩沖區(qū)的大小增加了一倍再加上存儲新數(shù)據(jù)所需的容量;這樣,為了存儲任何所需的數(shù)據(jù),緩沖區(qū)的容量成指數(shù)地增長。為了提高效率,如果您知道您將寫入的數(shù)據(jù)的預期容量,那么您應該指定相應的初始緩沖區(qū)的大小。?close()?方法只是設置了一個合適的標志。
清單 2. 寫方法
public void write (int datum) throws IOException {if (closed) {throw new IOException ("Stream closed");} else {if (index >= capacity) {// expand the internal buffercapacity = capacity * 2 + 1;byte[] tmp = new byte[capacity];System.arraycopy (buffer, 0, tmp, 0, index);buffer = tmp;// the new buffer is not sharedshared = false;}// store the bytebuffer[index ++] = (byte) datum;}}public void write (byte[] data, int offset, int length) throws IOException {if (data == null) {throw new NullPointerException ();} else if ((offset < 0) || (offset + length > data.length) || (length < 0)) {throw new IndexOutOfBoundsException ();} else if (closed) {throw new IOException ("Stream closed");} else {if (index + length > capacity) {// expand the internal buffercapacity = capacity * 2 + length;byte[] tmp = new byte[capacity];System.arraycopy (buffer, 0, tmp, 0, index);buffer = tmp;// the new buffer is not sharedshared = false;}// copy in the subarraySystem.arraycopy (data, offset, buffer, index, length);index += length;}}public void close () {closed = true;}清單 3 中的字節(jié)數(shù)組抽取方法返回內(nèi)部字節(jié)數(shù)組的副本。因為我們無法防止調(diào)用程序把數(shù)據(jù)寫到結(jié)果數(shù)組,所以我們無法安全地返回對內(nèi)部緩沖區(qū)的直接引用。
清單 3. 轉(zhuǎn)換成字節(jié)數(shù)組
public byte[] toByteArray () {// return a copy of the internal bufferbyte[] result = new byte[index];System.arraycopy (buffer, 0, result, 0, index);return result;}當方法提供對存儲的數(shù)據(jù)的只讀訪問的時候,它們可以安全地高效地直接使用內(nèi)部字節(jié)數(shù)組。清單 4 顯示了兩個這樣的方法。?writeTo()?方法把這個流的內(nèi)容寫到輸出流;它直接從內(nèi)部緩沖區(qū)進行寫操作。?toInputStream()?方法返回了可被高效地讀取數(shù)據(jù)的輸入流。它所返回的BytesInputStream?(這是?ByteArrayInputStream?的非同步替代品。)能直接從我們的內(nèi)部字節(jié)數(shù)組讀取數(shù)據(jù)。在這個方法中,我們還設置了標志,以表示內(nèi)部緩沖區(qū)正被輸入流共享。這一點很重要,因為這樣做可以防止在內(nèi)部緩沖區(qū)正被共享時這個流被修改。
清單 4. 只讀訪問方法
public void writeTo (OutputStream out) throws IOException {// write the internal buffer directlyout.write (buffer, 0, index);}public InputStream toInputStream () {// return a stream reading from the shared internal buffershared = true;return new BytesInputStream (buffer, 0, index);}可能會覆蓋共享數(shù)據(jù)的唯一的一個方法是顯示在清單 5 中的?reset()?方法,該方法清空了這個流。所以,如果?shared?等于 true 且?reset()?被調(diào)用,那么我們創(chuàng)建新的內(nèi)部緩沖區(qū),而不是重新設置寫索引。
清單 5. 重新設置流
public void reset () throws IOException {if (closed) {throw new IOException ("Stream closed");} else {if (shared) {// create a new buffer if it is sharedbuffer = new byte[capacity];shared = false;}// reset indexindex = 0;}} }回頁首
更好的字節(jié)數(shù)組輸入流
用?ByteArrayInputStream?類來提供對內(nèi)存中的二進制數(shù)據(jù)基于流的讀訪問是很理想的。但是,有時候,它的兩個設計特點使我覺得需要一個替代它的類。第一,這個類是同步的;我已講過,對于多數(shù)應用程序來說沒有這個必要。第二,如果在執(zhí)行?mark()?前調(diào)用它所實現(xiàn)的reset()?方法,那么 reset() 將忽略初始讀偏移。這兩點都不是缺陷;但是,它們不一定總是人們所期望的。
清單 6 中的?BytesInputStream?類是不同步的較為普通的字節(jié)數(shù)組輸入流類。
清單 6. 不同步的字節(jié)數(shù)組輸入流
package org.merlin.io; import java.io.*; /*** An unsynchronized ByteArrayInputStream alternative.** @author Copyright (c) 2002 Merlin Hughes <merlin@merlin.org>*/ public class BytesInputStream extends InputStream {// buffer from which to readprivate byte[] buffer;private int index, limit, mark;// is the stream closed?private boolean closed;public BytesInputStream (byte[] data) {this (data, 0, data.length);}public BytesInputStream (byte[] data, int offset, int length) {if (data == null) {throw new NullPointerException ();} else if ((offset < 0) || (offset + length > data.length) || (length < 0)) {throw new IndexOutOfBoundsException ();} else {buffer = data;index = offset;limit = offset + length;mark = offset;}}public int read () throws IOException {if (closed) {throw new IOException ("Stream closed");} else if (index >= limit) {return -1; // EOF} else {return buffer[index ++] & 0xff;}}public int read (byte data[], int offset, int length) throws IOException {if (data == null) {throw new NullPointerException ();} else if ((offset < 0) || (offset + length > data.length) || (length < 0)) {throw new IndexOutOfBoundsException ();} else if (closed) {throw new IOException ("Stream closed");} else if (index >= limit) {return -1; // EOF} else {// restrict length to available dataif (length > limit - index)length = limit - index;// copy out the subarraySystem.arraycopy (buffer, index, data, offset, length);index += length;return length;}}public long skip (long amount) throws IOException {if (closed) {throw new IOException ("Stream closed");} else if (amount <= 0) {return 0;} else {// restrict amount to available dataif (amount > limit - index)amount = limit - index;index += (int) amount;return amount;}}public int available () throws IOException {if (closed) {throw new IOException ("Stream closed");} else {return limit - index;}}public void close () {closed = true;}public void mark (int readLimit) {mark = index;}public void reset () throws IOException {if (closed) {throw new IOException ("Stream closed");} else {// reset indexindex = mark;}}public boolean markSupported () {return true;} }回頁首
使用新的字節(jié)數(shù)組流
清單 7 中的代碼演示了怎樣使用新的字節(jié)數(shù)組流來解決第一篇文章中處理的問題(讀一些壓縮形式的數(shù)據(jù)):
清單 7. 使用新的字節(jié)數(shù)組流
public static InputStream newBruteForceCompress (InputStream in) throws IOException {BytesOutputStream sink = new BytesOutputStream ();OutputStream out = new GZIPOutputStream (sink);Streams.io (in, out);out.close ();return sink.toInputStream (); }回頁首
更好的管道流
雖然標準的管道流既安全又可靠,但在性能方面不能令人滿意。幾個因素導致了它的性能問題:
- 對于不同的使用情況,大小為 1024 字節(jié)的內(nèi)部緩沖區(qū)并不都適用;對于大容量的數(shù)據(jù),該緩沖區(qū)太小了。
- 基于數(shù)組的操作只是反復調(diào)用低效的一個字節(jié)一個字節(jié)地復制操作。該操作本身是同步的,從而導致非常嚴重的鎖爭用。
- 如果管道變空或變滿而在這種狀態(tài)改變時一個線程阻塞了,那么,即使僅有一個字節(jié)被讀或?qū)?#xff0c;該線程也被喚醒。在許多情況下,線程將使用這一個字節(jié)并立即再次阻塞,這將導致只做了很少有用的工作。
最后一個因素是 API 提供的嚴格的約定的后果。對于最通用的可能的應用程序中使用的流來說,這種嚴格的約定是必要的。但是,對于管道流實現(xiàn),提供一種更寬松的約定是可能的,這個約定犧牲嚴格性以換取性能的提高:
- 僅當緩沖區(qū)的可用數(shù)據(jù)(對阻塞的讀程序而言)或可用空間(對寫程序而言)達到指定的某個?滯后閾值或發(fā)生異常事件(例如管道關(guān)閉)時,阻塞的讀程序和寫程序才被喚醒。這將提高性能,因為僅當線程能完成適度的工作量時它們才被喚醒。
- 只有一個線程可以從管道讀取數(shù)據(jù),只有一個線程可以把數(shù)據(jù)寫到管道。否則,管道無法可靠地確定讀程序線程或?qū)懗绦蚓€程何時意外死亡。
這個約定可完全適合典型應用程序情形中獨立的讀程序線程和寫程序線程;需要立即喚醒的應用程序可以使用零滯后級別。我們將在后面看到,這個約定的實現(xiàn)的操作速度比標準 API 流的速度快兩個數(shù)量級(100 倍)。
我們可以使用幾個可能的 API 中的一個來開發(fā)這些管道流:我們可以模仿標準類,顯式地連接兩個流;我們也可以開發(fā)一個?Pipe?類并從這個類抽取輸出流和輸入流。我們不使用這兩種方式而是使用更簡單的方式:創(chuàng)建一個?PipeInputStream?,然后抽取關(guān)聯(lián)的輸出流。
這些流的一般操作如下:
- 我們把內(nèi)部數(shù)組用作環(huán)緩沖區(qū)(請看圖 1):這個數(shù)組中維護著一個讀索引和一個寫索引;數(shù)據(jù)被寫到寫索引所指的位置,數(shù)據(jù)從讀索引所指的位置被讀取;當兩個索引到達緩沖區(qū)末尾時,它們回繞到緩沖區(qū)起始點。任一個索引不能超越另一個索引。當寫索引到達讀索引時,管道是滿的,不能再寫任何數(shù)據(jù)。當讀索引到達寫索引時,管道是空的,不能再讀任何數(shù)據(jù)。
- 同步被用來確保兩個協(xié)作線程看到管道狀態(tài)的最新值。Java 語言規(guī)范對內(nèi)存訪問的順序的規(guī)定是很寬容的,因此,無法使用無鎖緩沖技術(shù)。
圖 1. 環(huán)緩沖區(qū)
在下面的代碼清單中給出的是實現(xiàn)這些管道流的代碼。清單 8 顯示了這個類所用的構(gòu)造函數(shù)和變量。您可以從這個?InputStream?中抽取相應的OutputStream?(請看清單 17 中的代碼)。在構(gòu)造函數(shù)中您可以指定內(nèi)部緩沖區(qū)的大小和滯后級別;這是緩沖區(qū)容量的一部分,在相應的讀程序線程或?qū)懗绦蚓€程被立即喚醒前必須被使用或可用。我們維護兩個變量,?reader?和?writer?,它們與讀程序線程和寫程序線程相對應。我們用它們來發(fā)現(xiàn)什么時候一個線程已死亡而另一個線程仍在訪問流。
清單 8. 一個替代的管道流實現(xiàn)
package org.merlin.io; import java.io.*; /*** An efficient connected stream pair for communicating between* the threads of an application. This provides a less-strict contract* than the standard piped streams, resulting in much-improved* performance. Also supports non-blocking operation.** @author Copyright (c) 2002 Merlin Hughes <merlin@merlin.org>*/ public class PipeInputStream extends InputStream {// default valuesprivate static final int DEFAULT_BUFFER_SIZE = 8192;private static final float DEFAULT_HYSTERESIS = 0.75f;private static final int DEFAULT_TIMEOUT_MS = 1000;// flag indicates whether method applies to reader or writerprivate static final boolean READER = false, WRITER = true;// internal pipe bufferprivate byte[] buffer;// read/write indexprivate int readx, writex;// pipe capacity, hysteresis levelprivate int capacity, level;// flagsprivate boolean eof, closed, sleeping, nonBlocking;// reader/writer threadprivate Thread reader, writer;// pending exceptionprivate IOException exception;// deadlock-breaking timeoutprivate int timeout = DEFAULT_TIMEOUT_MS;public PipeInputStream () {this (DEFAULT_BUFFER_SIZE, DEFAULT_HYSTERESIS);}public PipeInputStream (int bufferSize) {this (bufferSize, DEFAULT_HYSTERESIS);}// e.g., hysteresis .75 means sleeping reader/writer is not// immediately woken until the buffer is 75% full/emptypublic PipeInputStream (int bufferSize, float hysteresis) {if ((hysteresis < 0.0) || (hysteresis > 1.0))throw new IllegalArgumentException ("Hysteresis: " + hysteresis);capacity = bufferSize;buffer = new byte[capacity];level = (int) (bufferSize * hysteresis);}清單 9 中的配置方法允許您配置流的超時值和非阻塞模式。超時值的單位是毫秒,它表示阻塞的線程在過了這段時間后將被自動喚醒;這對于打破在一個線程死亡的情況下可能發(fā)生的死鎖是必要的。在非阻塞模式中,如果線程阻塞,那么?InterruptedIOException?將被拋出。
清單 9. 管道配置
public void setTimeout (int ms) {this.timeout = ms;}public void setNonBlocking (boolean nonBlocking) {this.nonBlocking = nonBlocking;}清單 10 中的讀方法都遵循相當標準的模式:如果我們還沒有讀線程的引用,那么我們先取得它,然后我們驗證輸入?yún)?shù),核對流未被關(guān)閉或沒有異常待處理,確定可以讀取多少數(shù)據(jù),最后把數(shù)據(jù)從內(nèi)部的環(huán)緩沖區(qū)復制到讀程序的緩沖區(qū)。清單 12 中的?checkedAvailable()?方法在返回前自動地等待,直到出現(xiàn)一些可用的數(shù)據(jù)或流被關(guān)閉。
清單 10. 讀數(shù)據(jù)
private byte[] one = new byte[1];public int read () throws IOException {// read 1 byteint amount = read (one, 0, 1);// return EOF / the bytereturn (amount < 0) ? -1 : one[0] & 0xff;}public synchronized int read (byte data[], int offset, int length) throws IOException {// take a reference to the reader threadif (reader == null)reader = Thread.currentThread ();// check parametersif (data == null) {throw new NullPointerException ();} else if ((offset < 0) || (offset + length > data.length) || (length < 0)) { // check indicesthrow new IndexOutOfBoundsException ();} else {// throw an exception if the stream is closedclosedCheck ();// throw any pending exceptionexceptionCheck ();if (length <= 0) {return 0;} else {// wait for some data to become available for readingint available = checkedAvailable (READER);// return -1 on EOFif (available < 0)return -1;// calculate amount of contiguous data in pipe bufferint contiguous = capacity - (readx % capacity);// calculate how much we will read this timeint amount = (length > available) ? available : length;if (amount > contiguous) {// two array copies needed if data wrap around the buffer endSystem.arraycopy (buffer, readx % capacity, data, offset, contiguous);System.arraycopy (buffer, 0, data, offset + contiguous, amount - contiguous);} else {// otherwise, one array copy neededSystem.arraycopy (buffer, readx % capacity, data, offset, amount);}// update indices with amount of data readprocessed (READER, amount);// return amount readreturn amount;}}}public synchronized long skip (long amount) throws IOException {// take a reference to the reader threadif (reader == null)reader = Thread.currentThread ();// throw an exception if the stream is closedclosedCheck ();// throw any pending exceptionexceptionCheck ();if (amount <= 0) {return 0;} else {// wait for some data to become available for skippingint available = checkedAvailable (READER);// return 0 on EOFif (available < 0)return 0;// calculate how much we will skip this timeif (amount > available)amount = available;// update indices with amount of data skippedprocessed (READER, (int) amount);// return amount skippedreturn amount;}}當數(shù)據(jù)從這個管道被讀取或數(shù)據(jù)被寫到這個管道時,清單 11 中的方法被調(diào)用。該方法更新有關(guān)的索引,如果管道達到它的滯后級別,該方法自動地喚醒阻塞的線程。
清單 11. 更新索引
private void processed (boolean rw, int amount) {if (rw == READER) {// update read index with amount readreadx = (readx + amount) % (capacity * 2);} else {// update write index with amount writtenwritex = (writex + amount) % (capacity * 2);}// check whether a thread is sleeping and we have reached the// hysteresis thresholdif (sleeping && (available (!rw) >= level)) {// wake sleeping threadnotify ();sleeping = false;}}在管道有可用空間或可用數(shù)據(jù)(取決于?rw?參數(shù))前,清單 12 中的?checkedAvailable()?方法一直等待,然后把空間的大小或數(shù)據(jù)的多少返回給調(diào)用程序。在這個方法內(nèi)還核對流未被關(guān)閉、管道未被破壞等。
清單 12. 檢查可用性
public synchronized int available () throws IOException {// throw an exception if the stream is closedclosedCheck ();// throw any pending exceptionexceptionCheck ();// determine how much can be readint amount = available (READER);// return 0 on EOF, otherwise the amount readablereturn (amount < 0) ? 0 : amount;}private int checkedAvailable (boolean rw) throws IOException {// always called from synchronized(this) methodtry {int available;// loop while no data can be read/writtenwhile ((available = available (rw)) == 0) {if (rw == READER) { // reader// throw any pending exceptionexceptionCheck ();} else { // writer// throw an exception if the stream is closedclosedCheck ();}// throw an exception if the pipe is brokenbrokenCheck (rw);if (!nonBlocking) { // blocking mode// wake any sleeping threadif (sleeping)notify ();// sleep for timeout ms (in case of peer thread death)sleeping = true;wait (timeout);// timeout means that hysteresis may not be obeyed} else { // non-blocking mode// throw an InterruptedIOExceptionthrow new InterruptedIOException ("Pipe " + (rw ? "full" : "empty"));}}return available;} catch (InterruptedException ex) {// rethrow InterruptedException as InterruptedIOExceptionthrow new InterruptedIOException (ex.getMessage ());}}private int available (boolean rw) {// calculate amount of space used in pipeint used = (writex + capacity * 2 - readx) % (capacity * 2);if (rw == WRITER) { // writer// return amount of space available for writing return capacity - used;} else { // reader// return amount of data in pipe or -1 at EOFreturn (eof && (used == 0)) ? -1 : used;}}清單 13 中的方法關(guān)閉這個流;該方法還提供對讀程序或?qū)懗绦蜿P(guān)閉流的支持。阻塞的線程被自動喚醒,該方法還檢查各種其它情況是否正常。
清單 13. 關(guān)閉流
public void close () throws IOException {// close the read end of this pipeclose (READER);}private synchronized void close (boolean rw) throws IOException {if (rw == READER) { // reader// set closed flagclosed = true;} else if (!eof) { // writer// set eof flageof = true;// check if data remain unreadif (available (READER) > 0) {// throw an exception if the reader has already closed the pipeclosedCheck ();// throw an exception if the reader thread has diedbrokenCheck (WRITER);}}// wake any sleeping threadif (sleeping) {notify ();sleeping = false;}}清單 14 中的方法檢查這個流的狀態(tài)。如果有異常待處理,那么流被關(guān)閉或管道被破壞(也就是說,讀程序線程或?qū)懗绦蚓€程已死亡),異常被拋出。
清單 14. 檢查流狀態(tài)
private void exceptionCheck () throws IOException {// throw any pending exceptionif (exception != null) {IOException ex = exception;exception = null;throw ex; // could wrap ex in a local exception}}private void closedCheck () throws IOException {// throw an exception if the pipe is closedif (closed)throw new IOException ("Stream closed");}private void brokenCheck (boolean rw) throws IOException {// get a reference to the peer threadThread thread = (rw == WRITER) ? reader : writer;// throw an exception if the peer thread has diedif ((thread != null) && !thread.isAlive ())throw new IOException ("Broken pipe");}當數(shù)據(jù)被寫入這個管道時,清單 15 中的方法被調(diào)用。總的來說,它類似于讀方法:我們先取得寫程序線程的副本,然后檢查流是否被關(guān)閉,接著進入把數(shù)據(jù)復制到管道的循環(huán)。和前面一樣,該方法使用?checkedAvailable()?方法,checkedAvailable() 自動阻塞,直到管道中有可用的容量。
清單 15. 寫數(shù)據(jù)
private synchronized void writeImpl (byte[] data, int offset, int length) throws IOException {// take a reference to the writer threadif (writer == null)writer = Thread.currentThread ();// throw an exception if the stream is closedif (eof || closed) {throw new IOException ("Stream closed");} else {int written = 0;try {// loop to write all the datado {// wait for space to become available for writingint available = checkedAvailable (WRITER);// calculate amount of contiguous space in pipe bufferint contiguous = capacity - (writex % capacity);// calculate how much we will write this timeint amount = (length > available) ? available : length;if (amount > contiguous) {// two array copies needed if space wraps around the buffer endSystem.arraycopy (data, offset, buffer, writex % capacity, contiguous);System.arraycopy (data, offset + contiguous, buffer, 0, amount - contiguous);} else {// otherwise, one array copy neededSystem.arraycopy (data, offset, buffer, writex % capacity, amount);}// update indices with amount of data writtenprocessed (WRITER, amount);// update amount written by this methodwritten += amount;} while (written < length);// data successfully written} catch (InterruptedIOException ex) {// write operation was interrupted; set the bytesTransferred// exception field to reflect the amount of data writtenex.bytesTransferred = written;// rethrow exceptionthrow ex;}}}如清單 16 所示,這個管道流實現(xiàn)的特點之一是寫程序可設置一個被傳遞給讀程序的異常。
清單 16. 設置異常
private synchronized void setException (IOException ex) throws IOException {// fail if an exception is already pendingif (exception != null)throw new IOException ("Exception already set: " + exception);// throw an exception if the pipe is brokenbrokenCheck (WRITER);// take a reference to the pending exceptionthis.exception = ex;// wake any sleeping threadif (sleeping) {notify ();sleeping = false;}}清單 17 給出這個管道的有關(guān)輸出流的代碼。?getOutputStream()?方法返回?OutputStreamImpl?,OutputStreamImpl 是使用前面給出的方法來把數(shù)據(jù)寫到內(nèi)部管道的輸出流。OutputStreamImpl 類繼承了?OutputStreamEx?,OutputStreamEx 是允許為讀線程設置異常的輸出流類的擴展。
清單 17. 輸出流
public OutputStreamEx getOutputStream () {// return an OutputStreamImpl associated with this pipereturn new OutputStreamImpl ();}private class OutputStreamImpl extends OutputStreamEx {private byte[] one = new byte[1];public void write (int datum) throws IOException {// write one byte using internal arrayone[0] = (byte) datum;write (one, 0, 1);}public void write (byte[] data, int offset, int length) throws IOException {// check parametersif (data == null) {throw new NullPointerException ();} else if ((offset < 0) || (offset + length > data.length) || (length < 0)) {throw new IndexOutOfBoundsException ();} else if (length > 0) {// call through to writeImpl()PipeInputStream.this.writeImpl (data, offset, length);}}public void close () throws IOException {// close the write end of this pipePipeInputStream.this.close (WRITER);}public void setException (IOException ex) throws IOException {// set a pending exceptionPipeInputStream.this.setException (ex);}}// static OutputStream extension with setException() methodpublic static abstract class OutputStreamEx extends OutputStream {public abstract void setException (IOException ex) throws IOException;} }回頁首
使用新的管道流
清單 18 演示了怎樣使用新的管道流來解決上一篇文章中的問題。請注意,寫程序線程中出現(xiàn)的任何異常均可在流中被傳遞。
清單 18. 使用新的管道流
public static InputStream newPipedCompress (final InputStream in) throws IOException {PipeInputStream source = new PipeInputStream ();final PipeInputStream.OutputStreamEx sink = source.getOutputStream ();new Thread () {public void run () {try {GZIPOutputStream gzip = new GZIPOutputStream (sink);Streams.io (in, gzip);gzip.close ();} catch (IOException ex) {try {sink.setException (ex);} catch (IOException ignored) {}}}}.start ();return source; }回頁首
性能結(jié)果
在下面的表中顯示的是這些新的流和標準流的性能,測試環(huán)境是運行 Java 2 SDK,v1.4.0 的 800MHz Linux 機器。性能測試程序與我在上一篇文章中用的相同:
管道流?
15KB:21ms;15MB:20675ms?
新的管道流?
15KB:0.68ms;15MB:158ms?
字節(jié)數(shù)組流?
15KB:0.31ms;15MB:745ms?
新的字節(jié)數(shù)組流?
15KB:0.26ms;15MB:438ms
與上一篇文章中的性能差異只反映了我的機器中不斷變化的環(huán)境負載。您可以從這些結(jié)果中看到,在大容量數(shù)據(jù)方面,新的管道流的性能遠好于蠻力解決方案;但是,新的管道流的速度仍然只有我們分析的工程解決方案的速度的一半左右。顯然,在現(xiàn)代的 Java 虛擬機中使用多個線程的開銷遠比以前小得多。
回頁首
結(jié)束語
我們分析了兩組可替代標準 Java API 的流的流:?BytesOutputStream?和?BytesInputStream?是字節(jié)數(shù)組流的非同步替代者。因為這些類的預期的用例涉及單個線程的訪問,所以不采用同步是合理的選擇。實際上,執(zhí)行時間的縮短(最多可縮短 40%)很可能與同步的消滅沒有多大關(guān)系;性能得到提高的主要原因是在提供只讀訪問時避免了不必要的復制。第二個示例?PipeInputStream?可替代管道流;為了減少超過 99% 的執(zhí)行時間,這個流使用寬松的約定、改進的緩沖區(qū)大小和基于數(shù)組的操作。在這種情況下無法使用不同步的代碼;Java 語言規(guī)范排除了可靠地執(zhí)行這種代碼的可能性,否則,在理論上是可以實現(xiàn)最少鎖定的管道。
字節(jié)數(shù)組流和管道流是基于流的應用程序內(nèi)部通信的主要選擇。雖然新的 I/O API 提供了一些其它選擇,但是許多應用程序和 API 仍然依賴標準流,而且對于這些特殊用途來說,新的 I/O API 并不一定有更高的效率。通過適當?shù)販p少同步的使用、有效地采用基于數(shù)組的操作以及最大程度地減少不必要的復制,性能結(jié)果得到了很大的提高,從而提供了完全適應標準流框架的更高效的操作。在應用程序開發(fā)的其它領(lǐng)域中采用相同的步驟往往能取得類似地性能提升。
?
轉(zhuǎn)載于:https://www.cnblogs.com/davidwang456/p/4076648.html
總結(jié)
- 上一篇: RabbitMQ和kafka从几个角度简
- 下一篇: Java 7之多线程- Semaphor