SequenceFile Files
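SequenceFile is a flat file format that Hadoop designed for storing key-value pairs in binary form. A number of solutions to the HDFS small-files problem have been built on top of it. Their basic idea is to merge the small files into one large file and, at the same time, build an index of where each small file sits inside it. These solutions also involve another Hadoop file format, MapFile. A SequenceFile does not guarantee that its key-value pairs are stored in any particular key order, and, in the Hadoop version examined here, it does not support appending to an existing file (not to be confused with the Writer's append() method, which adds records while the file is being written).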
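The merge-and-index idea described above can be sketched in plain Java. This is only an illustration under stated assumptions, not Hadoop code: the class SmallFilePackSketch and its methods are hypothetical, the "large file" is an in-memory byte buffer rather than an HDFS file, and the index is a simple map from file name to (offset, length).

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch: packs small "files" into one buffer and indexes their positions.
class SmallFilePackSketch {
    private final ByteArrayOutputStream merged = new ByteArrayOutputStream();
    // file name -> {offset in merged buffer, length}
    private final Map<String, int[]> index = new LinkedHashMap<>();

    void add(String name, byte[] content) {
        index.put(name, new int[] { merged.size(), content.length });
        merged.write(content, 0, content.length);
    }

    // Returns the content of one small file, or null if the name is not indexed.
    byte[] read(String name) {
        int[] location = index.get(name);
        if (location == null) {
            return null;
        }
        byte[] all = merged.toByteArray();
        return Arrays.copyOfRange(all, location[0], location[0] + location[1]);
    }

    int mergedSize() {
        return merged.size();
    }

    public static void main(String[] args) {
        SmallFilePackSketch pack = new SmallFilePackSketch();
        pack.add("a.txt", "alpha".getBytes(StandardCharsets.UTF_8));
        pack.add("b.txt", "beta".getBytes(StandardCharsets.UTF_8));
        System.out.println(new String(pack.read("b.txt"), StandardCharsets.UTF_8));
    }
}
```

In a real deployment the file name would typically serve as the key and the file content as the value of each record, with MapFile supplying the lookup index.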
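In a SequenceFile, each key-value pair is treated as one record. Because compression is organized around records, SequenceFile supports three compression types (SequenceFile.CompressionType):
NONE: records are not compressed;
RECORD: only the value of each record is compressed;
BLOCK: all the records in a block are compressed together;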
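To get a rough feel for how the three types differ in output size, the sketch below compares them on a batch of small, similar values. It is an illustration under assumptions, not Hadoop code: java.util.zip.Deflater stands in for a Hadoop compression codec, the class CompressionTypeSketch and the sample data are hypothetical, and only values are measured (keys, headers and sync markers are ignored).

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.zip.Deflater;

// Hypothetical sketch: compares NONE, RECORD and BLOCK style compression of values.
class CompressionTypeSketch {
    // Compresses a byte array with zlib/deflate.
    static byte[] deflate(byte[] data) {
        Deflater deflater = new Deflater();
        deflater.setInput(data);
        deflater.finish();
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        byte[] chunk = new byte[256];
        while (!deflater.finished()) {
            int n = deflater.deflate(chunk);
            out.write(chunk, 0, n);
        }
        deflater.end();
        return out.toByteArray();
    }

    // Made-up sample values that share most of their content.
    static byte[][] sampleValues(int count) {
        byte[][] values = new byte[count][];
        for (int i = 0; i < count; i++) {
            values[i] = ("status=OK;host=node" + i).getBytes(StandardCharsets.UTF_8);
        }
        return values;
    }

    // NONE: values are stored as they are.
    static int sizeNone(byte[][] values) {
        int total = 0;
        for (byte[] v : values) {
            total += v.length;
        }
        return total;
    }

    // RECORD: every value is compressed on its own.
    static int sizeRecord(byte[][] values) {
        int total = 0;
        for (byte[] v : values) {
            total += deflate(v).length;
        }
        return total;
    }

    // BLOCK: the values of many records are compressed together.
    static int sizeBlock(byte[][] values) {
        ByteArrayOutputStream all = new ByteArrayOutputStream();
        for (byte[] v : values) {
            all.write(v, 0, v.length);
        }
        return deflate(all.toByteArray()).length;
    }

    public static void main(String[] args) {
        byte[][] values = sampleValues(100);
        System.out.println("NONE=" + sizeNone(values)
                + " RECORD=" + sizeRecord(values)
                + " BLOCK=" + sizeBlock(values));
    }
}
```

For short, repetitive values like these, compressing many records together lets the compressor exploit redundancy across records, which per-record compression cannot do. The exact numbers depend on the data and the codec.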
Corresponding to these three compression types, Hadoop provides three kinds of Writer:
SequenceFile.Writer: writes key-value pairs (records) without compressing any of them;
[java]
public static class Writer implements java.io.Closeable {
  ...
  void init(Path name, Configuration conf, FSDataOutputStream out,
            Class keyClass, Class valClass, boolean compress,
            CompressionCodec codec, Metadata metadata) throws IOException {
    this.conf = conf;
    this.out = out;
    this.keyClass = keyClass;
    this.valClass = valClass;
    this.compress = compress;
    this.codec = codec;
    this.metadata = metadata;

    // Key and uncompressed-value serializers both write into the shared buffer
    SerializationFactory serializationFactory = new SerializationFactory(conf);
    this.keySerializer = serializationFactory.getSerializer(keyClass);
    this.keySerializer.open(buffer);
    this.uncompressedValSerializer = serializationFactory.getSerializer(valClass);
    this.uncompressedValSerializer.open(buffer);

    // With a codec, values pass through a compressing stream before reaching the buffer
    if (this.codec != null) {
      ReflectionUtils.setConf(this.codec, this.conf);
      this.compressor = CodecPool.getCompressor(this.codec);
      this.deflateFilter = this.codec.createOutputStream(buffer, compressor);
      this.deflateOut = new DataOutputStream(new BufferedOutputStream(deflateFilter));
      this.compressedValSerializer = serializationFactory.getSerializer(valClass);
      this.compressedValSerializer.open(deflateOut);
    }
  }

  public synchronized void append(Object key, Object val) throws IOException {
    if (key.getClass() != keyClass)
      throw new IOException("wrong key class: " + key.getClass().getName() + " is not " + keyClass);
    if (val.getClass() != valClass)
      throw new IOException("wrong value class: " + val.getClass().getName() + " is not " + valClass);

    buffer.reset();

    // Serialize the key first so its length can be read off the buffer
    keySerializer.serialize(key);
    int keyLength = buffer.getLength();
    if (keyLength < 0)
      throw new IOException("negative length keys not allowed: " + key);

    // Append the value to the same buffer, compressed or not
    if (compress) {
      deflateFilter.resetState();
      compressedValSerializer.serialize(val);
      deflateOut.flush();
      deflateFilter.finish();
    } else {
      uncompressedValSerializer.serialize(val);
    }

    // Write the record out
    checkAndWriteSync();                                // sync marker, if due
    out.writeInt(buffer.getLength());                   // total record length
    out.writeInt(keyLength);                            // key length
    out.write(buffer.getData(), 0, buffer.getLength()); // key bytes followed by value bytes
  }

  public synchronized void appendRaw(byte[] keyData, int keyOffset, int keyLength,
                                     ValueBytes val) throws IOException {
    if (keyLength < 0)
      throw new IOException("negative length keys not allowed: " + keyLength);

    int valLength = val.getSize();

    checkAndWriteSync();

    out.writeInt(keyLength + valLength);      // total record length
    out.writeInt(keyLength);                  // key length
    out.write(keyData, keyOffset, keyLength); // key bytes
    val.writeUncompressedBytes(out);          // value bytes
  }
  ...
}
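The last lines of append() above fix the on-disk layout of one record: a 4-byte total length, a 4-byte key length, then the key bytes and the value bytes. The sketch below reproduces just that layout in plain Java so it can be run without Hadoop. The class RecordLayoutSketch and its methods are hypothetical, raw UTF-8 bytes stand in for Hadoop's serialized keys and values, and sync markers are left out.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical sketch of the per-record byte layout written by Writer.append().
class RecordLayoutSketch {
    static byte[] encodeRecord(byte[] key, byte[] value) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(key.length + value.length); // total record length
            out.writeInt(key.length);                // key length
            out.write(key);                          // key bytes
            out.write(value);                        // value bytes
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // Reads one record back; returns {key, value} as strings.
    static String[] decodeRecord(byte[] record) {
        // ByteBuffer is big-endian by default, matching DataOutputStream.writeInt
        ByteBuffer in = ByteBuffer.wrap(record);
        int recordLength = in.getInt();
        int keyLength = in.getInt();
        byte[] key = new byte[keyLength];
        byte[] value = new byte[recordLength - keyLength];
        in.get(key);
        in.get(value);
        return new String[] {
            new String(key, StandardCharsets.UTF_8),
            new String(value, StandardCharsets.UTF_8)
        };
    }

    public static void main(String[] args) {
        byte[] record = encodeRecord(
                "k1".getBytes(StandardCharsets.UTF_8),
                "hello".getBytes(StandardCharsets.UTF_8));
        String[] kv = decodeRecord(record);
        System.out.println(record.length + " bytes: " + kv[0] + " -> " + kv[1]);
    }
}
```

Because the value length is never stored explicitly, a reader recovers it as the total record length minus the key length, which is what decodeRecord does.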
SequenceFile.RecordCompressWriter: compresses only the value of each key-value pair (record) when writing;
[java]
static class RecordCompressWriter extends Writer {
  ...
  public synchronized void append(Object key, Object val) throws IOException {
    if (key.getClass() != keyClass)
      throw new IOException("wrong key class: " + key.getClass().getName() + " is not " + keyClass);
    if (val.getClass() != valClass)
      throw new IOException("wrong value class: " + val.getClass().getName() + " is not " + valClass);

    buffer.reset();

    // The key is serialized uncompressed
    keySerializer.serialize(key);
    int keyLength = buffer.getLength();
    if (keyLength < 0)
      throw new IOException("negative length keys not allowed: " + key);

    // The value always goes through the compressing stream
    deflateFilter.resetState();
    compressedValSerializer.serialize(val);
    deflateOut.flush();
    deflateFilter.finish();

    // Write the record out
    checkAndWriteSync();                                // sync marker, if due
    out.writeInt(buffer.getLength());                   // total record length
    out.writeInt(keyLength);                            // key length
    out.write(buffer.getData(), 0, buffer.getLength()); // key bytes followed by compressed value bytes
  }

  public synchronized void appendRaw(byte[] keyData, int keyOffset,
                                     int keyLength, ValueBytes val) throws IOException {
    if (keyLength < 0)
      throw new IOException("negative length keys not allowed: " + keyLength);

    int valLength = val.getSize();

    checkAndWriteSync();

    out.writeInt(keyLength + valLength);      // total record length
    out.writeInt(keyLength);                  // key length
    out.write(keyData, keyOffset, keyLength); // key bytes
    val.writeCompressedBytes(out);            // value bytes, already compressed
  }
  ...
}
SequenceFile.BlockCompressWriter: compresses a batch of key-value pairs (records) into one block when writing;
[java]
static class BlockCompressWriter extends Writer {
  ...
  void init(int compressionBlockSize) throws IOException {
    this.compressionBlockSize = compressionBlockSize;
    // Redirect the serializers to separate key and value buffers
    keySerializer.close();
    keySerializer.open(keyBuffer);
    uncompressedValSerializer.close();
    uncompressedValSerializer.open(valBuffer);
  }

  // Compresses one buffer and writes it out, prefixed with its compressed length
  private synchronized void writeBuffer(DataOutputBuffer uncompressedDataBuffer) throws IOException {
    deflateFilter.resetState();
    buffer.reset();
    deflateOut.write(uncompressedDataBuffer.getData(), 0, uncompressedDataBuffer.getLength());
    deflateOut.flush();
    deflateFilter.finish();

    WritableUtils.writeVInt(out, buffer.getLength());
    out.write(buffer.getData(), 0, buffer.getLength());
  }

  // Compresses the buffered records and flushes them as one block
  public synchronized void sync() throws IOException {
    if (noBufferedRecords > 0) {
      super.sync();

      // Number of records in the block
      WritableUtils.writeVInt(out, noBufferedRecords);

      // Key lengths and keys
      writeBuffer(keyLenBuffer);
      writeBuffer(keyBuffer);

      // Value lengths and values
      writeBuffer(valLenBuffer);
      writeBuffer(valBuffer);

      out.flush();

      // Reset the internal state for the next block
      keyLenBuffer.reset();
      keyBuffer.reset();
      valLenBuffer.reset();
      valBuffer.reset();
      noBufferedRecords = 0;
    }
  }

  public synchronized void append(Object key, Object val) throws IOException {
    if (key.getClass() != keyClass)
      throw new IOException("wrong key class: " + key + " is not " + keyClass);
    if (val.getClass() != valClass)
      throw new IOException("wrong value class: " + val + " is not " + valClass);

    // Buffer the key and record its length
    int oldKeyLength = keyBuffer.getLength();
    keySerializer.serialize(key);
    int keyLength = keyBuffer.getLength() - oldKeyLength;
    if (keyLength < 0)
      throw new IOException("negative length keys not allowed: " + key);
    WritableUtils.writeVInt(keyLenBuffer, keyLength);

    // Buffer the value and record its length
    int oldValLength = valBuffer.getLength();
    uncompressedValSerializer.serialize(val);
    int valLength = valBuffer.getLength() - oldValLength;
    WritableUtils.writeVInt(valLenBuffer, valLength);

    ++noBufferedRecords;

    // Flush once the buffered keys and values reach the block size
    int currentBlockSize = keyBuffer.getLength() + valBuffer.getLength();
    if (currentBlockSize >= compressionBlockSize) {
      sync();
    }
  }

  public synchronized void appendRaw(byte[] keyData, int keyOffset, int keyLength,
                                     ValueBytes val) throws IOException {
    if (keyLength < 0)
      throw new IOException("negative length keys not allowed");

    int valLength = val.getSize();

    // Buffer the raw key and value together with their lengths
    WritableUtils.writeVInt(keyLenBuffer, keyLength);
    keyBuffer.write(keyData, keyOffset, keyLength);
    WritableUtils.writeVInt(valLenBuffer, valLength);
    val.writeUncompressedBytes(valBuffer);

    ++noBufferedRecords;

    // Flush once the buffered keys and values reach the block size
    int currentBlockSize = keyBuffer.getLength() + valBuffer.getLength();
    if (currentBlockSize >= compressionBlockSize) {
      sync();
    }
  }
  ...
}
In the source code, the block size compressionBlockSize defaults to 1000000, and it can also be set through the configuration parameter io.seqfile.compress.blocksize.
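The buffering rule used by BlockCompressWriter (collect records, then flush when the buffered keys plus values reach compressionBlockSize) can be tried out with the simplified sketch below. It rests on several assumptions: BlockBufferSketch is a hypothetical class, java.util.zip.Deflater replaces the Hadoop codec, the key-length and value-length buffers are omitted, and "flushing" only stores the compressed buffers in a list instead of writing to a file.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.zip.Deflater;

// Hypothetical sketch of block buffering with a size threshold.
class BlockBufferSketch {
    private final int compressionBlockSize;
    private final ByteArrayOutputStream keyBuffer = new ByteArrayOutputStream();
    private final ByteArrayOutputStream valBuffer = new ByteArrayOutputStream();
    private int noBufferedRecords = 0;

    // For inspection: record count of every flushed block, and the compressed buffers.
    final List<Integer> flushedRecordCounts = new ArrayList<>();
    final List<byte[]> compressedBuffers = new ArrayList<>();

    BlockBufferSketch(int compressionBlockSize) {
        this.compressionBlockSize = compressionBlockSize;
    }

    static byte[] deflate(byte[] data) {
        Deflater deflater = new Deflater();
        deflater.setInput(data);
        deflater.finish();
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        byte[] chunk = new byte[256];
        while (!deflater.finished()) {
            int n = deflater.deflate(chunk);
            out.write(chunk, 0, n);
        }
        deflater.end();
        return out.toByteArray();
    }

    void append(byte[] key, byte[] val) {
        keyBuffer.write(key, 0, key.length);
        valBuffer.write(val, 0, val.length);
        ++noBufferedRecords;
        // Same threshold test as in the excerpt: keys plus values, uncompressed
        if (keyBuffer.size() + valBuffer.size() >= compressionBlockSize) {
            sync();
        }
    }

    // Compresses keys and values separately and resets the buffers.
    void sync() {
        if (noBufferedRecords > 0) {
            compressedBuffers.add(deflate(keyBuffer.toByteArray()));
            compressedBuffers.add(deflate(valBuffer.toByteArray()));
            flushedRecordCounts.add(noBufferedRecords);
            keyBuffer.reset();
            valBuffer.reset();
            noBufferedRecords = 0;
        }
    }

    public static void main(String[] args) {
        BlockBufferSketch writer = new BlockBufferSketch(10);
        for (int i = 0; i < 4; i++) {
            writer.append(("k" + i).getBytes(StandardCharsets.UTF_8),
                          ("v" + i).getBytes(StandardCharsets.UTF_8));
        }
        writer.sync(); // flush the partially filled last block
        System.out.println(writer.flushedRecordCounts);
    }
}
```

Note that the threshold is measured on uncompressed bytes, so the size of a block on disk is normally smaller than the configured block size.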
Corresponding to the three compression types, there are three SequenceFile file formats:
1). Uncompressed SequenceFile
2). Record-Compressed SequenceFile
3). Block-Compressed SequenceFile
Reposted from: https://www.cnblogs.com/mfryf/p/7072446.html