mapreduce原理_Hbase Bulkload 原理面试必备
當(dāng)需要大批量的向Hbase導(dǎo)入數(shù)據(jù)時(shí),我們可以使用Hbase Bulkload的方式,這種方式是先生成Hbase的底層存儲(chǔ)文件 HFile,然后直接將這些 HFile 移動(dòng)到Hbase的存儲(chǔ)目錄下。它相比調(diào)用Hbase 的 put 接口添加數(shù)據(jù),處理效率更快并且對(duì)Hbase 運(yùn)行影響更小。
下面假設(shè)我們有一個(gè) CSV 文件,是存儲(chǔ)用戶購買記錄的。它一共有三列, order_id,consumer,product。我們需要將這個(gè)文件導(dǎo)入到Hbase里,其中 order_id 作為Hbase 的 row key。
| 12345 | bin/hbase org.apache.hadoop.hbase.mapreduce.ImportTsv -Dimporttsv.separator=$'\x01'-Dimporttsv.columns=HBASE_ROW_KEY,cf:consumer,cf:product -Dimporttsv.bulk.output= bin/hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles |
可以看到批量導(dǎo)入只需要上述兩部, 生成 HFile 文件 和 加載 HFile 文件。下面我們來深入了解其原理
底層實(shí)現(xiàn)原理
生成 HFile 是調(diào)用了 MapReduce 來實(shí)現(xiàn)的。它有兩種實(shí)現(xiàn)方式,雖然最后生成的 HFile 是一樣的,但中間過程卻是不一樣。現(xiàn)在我們先回顧下 MapReduce 的編程模型,主要分為下列組件:
InputFormat:負(fù)責(zé)讀取數(shù)據(jù)源,并且將數(shù)據(jù)源切割成多個(gè)分片,分片的數(shù)目等于Map的數(shù)目
Mapper:負(fù)責(zé)接收分片,生成中間結(jié)果,K 為數(shù)據(jù)的 key 值類型,V為數(shù)據(jù)的 value 值類型
Reducer:Mapper的數(shù)據(jù)會(huì)按照 key 值分組,Reducer接收的數(shù)據(jù)格式>
OutputFormat:負(fù)責(zé)將Reducer生成的數(shù)據(jù)持久化,比如存儲(chǔ)到 hdfs。
MapReduce 實(shí)現(xiàn) 一
MapReducer 程序中各個(gè)組件的實(shí)現(xiàn)類,如下所示:
InputFormat 類:TextInputFormat,數(shù)據(jù)輸出格式 LongWritable,Text(數(shù)據(jù)所在行號(hào),行數(shù)據(jù))
Mapper 類:TsvImporterTextMapper,數(shù)據(jù)輸出格式 ImmutableBytesWritable, Text(row key,行數(shù)據(jù))
Reduce 類:TextSortReducer,數(shù)據(jù)輸出格式 ImmutableBytesWritable, KeyValue (row key,單列數(shù)據(jù))
OutputFormat 類:HFileOutputFormat2,負(fù)責(zé)將結(jié)果持久化 HFile
執(zhí)行過程如下:
TextInputFormat 會(huì)讀取數(shù)據(jù)源文件,按照文件在 hdfs 的 Block 切割,每個(gè)Block對(duì)應(yīng)著一個(gè)切片
Mapper 會(huì)解析每行數(shù)據(jù),然后從中解析出 row key,生成(row key, 行數(shù)據(jù))
Reducer 會(huì)解析行數(shù)據(jù),為每列生成 KeyValue。這里簡單說下 KeyValue,它是 Hbase 存儲(chǔ)每列數(shù)據(jù)的格式, 詳細(xì)原理后面會(huì)介紹到。如果一個(gè) row key 對(duì)應(yīng)的列過多,它會(huì)將列分批處理。處理完一批數(shù)據(jù)之后,會(huì)寫入(null,null)這一條特殊的數(shù)據(jù),表示 HFileOutputFormat2 在持久化的過程中,需要新創(chuàng)建一個(gè) HFile。
這里簡單的說下 TextSortReducer,它的原理與下面的實(shí)現(xiàn)方式二,使用到的 PutSortReducer 相同,只不過從 Map 端接收到的數(shù)據(jù)為原始的行數(shù)據(jù)。如果 row key 對(duì)應(yīng)的數(shù)據(jù)過多時(shí),它也會(huì)使用 TreeSet 來去重,TreeSet 保存的數(shù)據(jù)最大字節(jié)數(shù),不能超過1GB。如果超過了,那么就會(huì)分批輸。
MapReduce 實(shí)現(xiàn) 二
MapReducer 程序中各個(gè)組件的實(shí)現(xiàn)類,如下所示:
InputFormat 類:TextInputFormat,數(shù)據(jù)輸出格式 LongWritable,Text(數(shù)據(jù)所在行號(hào),數(shù)據(jù))
Mapper 類:TsvImporterMapper,數(shù)據(jù)輸出格式 ImmutableBytesWritable,Put (row key,Put)
Combiner 類:PutCombiner
Reducer 類:PutSortReducer,數(shù)據(jù)輸出格式 ImmutableBytesWritable, KeyValue(row key,單列數(shù)據(jù))
OutputFormat 類:HFileOutputFormat2,負(fù)責(zé)將結(jié)果持久化 HFile
這里使用了 Combiner,它的作用是在 Map 端進(jìn)行一次初始的 reduce 操作,起到聚合的作用,這樣就減少了 Reduce 端與 Map 端的數(shù)據(jù)傳輸,提高了運(yùn)行效率。
執(zhí)行過程如下:
TextInputFormat 會(huì)讀取數(shù)據(jù)源文件,原理同實(shí)現(xiàn) 一
Mapper 會(huì)解析每行數(shù)據(jù),然后從中解析出 row key,并且生成 Put 實(shí)例。生成(row key, Put)
Combiner 會(huì)按照 row key 將多個(gè) Put 進(jìn)行合并,它也是分批合并的。
Reducer 會(huì)遍歷 Put 實(shí)例,為每列生成 KeyValue 并且去重。
這里講下PutSortReducer的具體實(shí)現(xiàn),下面的代碼經(jīng)過簡化,去掉了KeyValue中關(guān)于Tag的處理:
| 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556 | public class PutSortReducer extends Reducer<ImmutableBytesWritable, Put, ImmutableBytesWritable, KeyValue> { // the cell creator private CellCreator kvCreator; @Override protected void reduce( ImmutableBytesWritable row, java.lang.Iterable puts, Reducer ImmutableBytesWritable, KeyValue>.Context context) throws java.io.IOException, InterruptedException { // 這里指定了一個(gè)閾值,默認(rèn)為10GB過大。如果puts中不重復(fù)的數(shù)據(jù)過大,就會(huì)按照這個(gè)閾值分批處理 long threshold = context.getConfiguration().getLong( "putsortreducer.row.threshold", 1L * (1<<30)); Iterator iter = puts.iterator(); // 開始遍歷 puts列表 while (iter.hasNext()) { // 這個(gè)TreeSet就是用來去重的,比如向同個(gè)qualifier添加值 TreeSet map = new TreeSet<>(CellComparator.getInstance()); // 記錄map里保存的數(shù)據(jù)長度 long curSize = 0; // 遍歷 puts列表,直到不重復(fù)的數(shù)據(jù)不超過閾值 while (iter.hasNext() && curSize < threshold) { // 從列表中獲取值 Put p = iter.next(); // 遍歷這個(gè)Put的所有列值,一個(gè)Put包含了多列,這些列由Cell表示 for (List cells: p.getFamilyCellMap().values()) { for (Cell cell: cells) { KeyValue kv = null; kv = KeyValueUtil.ensureKeyValue(cell); } if (map.add(kv)) { // 如果這列值沒有重復(fù),那么添加到TreeSet中,并且更新curSize的值 curSize += kv.heapSize(); } } } } // 將map里的數(shù)據(jù),調(diào)用context.write方法輸出 int index = 0; for (KeyValue kv : map) { context.write(row, kv); if (++index % 100 == 0) context.setStatus("Wrote " + index); } // 如果還有,那么說明此行數(shù)據(jù)過大,那么就會(huì)輸出一條特殊的記錄(null, null) if (iter.hasNext()) { // force flush because we cannot guarantee intra-row sorted order context.write(null, null); } } }} |
從上面的代碼可以看到,PutSortReducer會(huì)使用到TreeSet去重,TreeSet會(huì)保存數(shù)據(jù),默認(rèn)不超過 1GB。如果當(dāng)Reducer的內(nèi)存設(shè)置過小時(shí),并且數(shù)據(jù)過大時(shí),是有可能會(huì)造成內(nèi)存溢出。如果遇到這種情況,可以通過減少閾值或者增大Reducer的內(nèi)存。
兩種實(shí)現(xiàn)方式比較
第一種方式實(shí)現(xiàn)簡單,它從Map 端傳遞到 Reduce 端的中間結(jié)果的數(shù)據(jù)格式很緊湊,如果是數(shù)據(jù)源重復(fù)的數(shù)據(jù)不多,建議使用這種。
第二種方式實(shí)現(xiàn)相對(duì)復(fù)雜,它從Map 端傳遞到 Reduce 端的中間結(jié)果的數(shù)據(jù)格式,使用 Put 來表示,它的數(shù)據(jù)存儲(chǔ)比原始的數(shù)據(jù)要大。但是它使用了 Combiner 來初步聚合,減小了 Map 端傳遞到 Reduce 端的數(shù)據(jù)大小。如果是數(shù)據(jù)源重復(fù)比較多,建議采用第二種方式。
Hbase 默認(rèn)采用第二種方式,如果用戶想使用第一種方式,需要在運(yùn)行命令時(shí),指定 importtsv.mapper.class 的值為 org.apache.hadoop.hbase.mapreduce.TsvImporterTextMapper。
數(shù)據(jù)源解析
Mapper 接收到數(shù)據(jù)后,需要解析每行數(shù)據(jù),從中讀取各列的值。它會(huì)按照分割符來切割數(shù)據(jù),然后根據(jù)指定的列格式,生成每列的數(shù)據(jù)。客戶在使用命令時(shí),通過 importtsv.separator 參數(shù)指定分隔符,通過 importtsv.columns 參數(shù)指定列格式。在客戶端指定的列名中, 有些會(huì)有著特殊含義,比如 HBASE_ROW_KEY 代表著該列是作為 row key,HBASE_TS_KEY 代表著該列作為數(shù)據(jù)的 timestamp,HBASE_ATTRIBUTES_KEY 代表著該列是屬性列等。
TsvParser 類負(fù)責(zé)解析數(shù)據(jù),它定義在 ImportTsv 類里。這里需要注意下,它不支持負(fù)責(zé)的 CSV 格式,只是簡單的根據(jù)分隔符作為列的劃分,根據(jù)換行符作為每條數(shù)據(jù)的劃分。
它的原理比較簡單,這里不再詳細(xì)介紹。
Reducer的數(shù)目選擇
我們知道MapReduce程序的一般瓶頸在于 reduce 階段,如果我們能夠適當(dāng)增加 reduce 的數(shù)目,一般能夠提高運(yùn)行效率(如果數(shù)據(jù)傾斜不嚴(yán)重)。我們還知道 Hbase 支持超大數(shù)據(jù)量的表,它會(huì)將表的數(shù)據(jù)自動(dòng)切割,分布在不同的服務(wù)上。這些數(shù)據(jù)切片在 Hbase 里,稱為Region, 每個(gè)Region只負(fù)責(zé)一段 row key 范圍的數(shù)據(jù)。
Hbase 在批量導(dǎo)入的時(shí)候,會(huì)去獲取表的 Region 分布情況,然后將 Reducer 的數(shù)目 設(shè)置為 Region 數(shù)目。如果在導(dǎo)入數(shù)據(jù)之前還沒有創(chuàng)建表,Hbase會(huì)自動(dòng)創(chuàng)建,但是創(chuàng)建的表的region數(shù)只有一個(gè)。所以在生成HFile之前,我們可以自行創(chuàng)建表,并指定 Reigion 的分布情況,那么就能提高 Reducer 的數(shù)目。
Reducer 的數(shù)目決定,是在 HFileOutputFormat2 的 configureIncrementalLoad 方法里。它會(huì)讀取表的 region 分布情況,然后調(diào)用 setNumReduceTasks 方法設(shè)置 reduce 數(shù)目。下面的代碼經(jīng)過簡化:
| 12345678910111213141516171819202122232425 | public class HFileOutputFormat2 extends FileOutputFormat<ImmutableBytesWritable, Cell> { public static void configureIncrementalLoad(Job job, TableDescriptor tableDescriptor, RegionLocator regionLocator) throws IOException { ArrayList singleTableInfo = new ArrayList<>(); singleTableInfo.add(new TableInfo(tableDescriptor, regionLocator)); configureIncrementalLoad(job, singleTableInfo, HFileOutputFormat2.class); } static void configureIncrementalLoad(Job job, List multiTableInfo, Class extends OutputFormat, ?>> cls) throws IOException { // 這里雖然支持多表,但是批量導(dǎo)入時(shí)只會(huì)使用單表 List regionLocators = new ArrayList<>( multiTableInfo.size()); for( TableInfo tableInfo : multiTableInfo ) { // 獲取region分布情況 regionLocators.add(tableInfo.getRegionLocator()); ...... } // 獲取region的row key起始大小 List startKeys = getRegionStartKeys(regionLocators, writeMultipleTables); // 設(shè)置reduce的數(shù)目 job.setNumReduceTasks(startKeys.size()); } } |
Hbase 數(shù)據(jù)存儲(chǔ)格式
Hbase的每列數(shù)據(jù)都是單獨(dú)存儲(chǔ)的,都是以 KeyValue 的形式。KeyValue 的數(shù)據(jù)格式如下圖所示:
| 123 | ----------------------------------------------- keylength | valuelength | key | value | Tags----------------------------------------------- |
其中 key 的格式如下:
| 123 | ---------------------------------------------------------------------------------------------- rowlength | row | columnfamilylength | columnfamily | columnqualifier | timestamp | keytype ---------------------------------------------------------------------------------------------- |
Tags的格式如下:
| 123 | ------------------------- tagslength | tagsbytes ------------------------- |
tagsbytes 可以包含多個(gè) tag,每個(gè) tag 的格式如下:
| 123 | ---------------------------------- taglength | tagtype | tagbytes---------------------------------- |
Reducer 會(huì)使用 CellCreator 類,負(fù)責(zé)生成 KeyValue。CellCreator 的原理很簡單,這里不再詳細(xì)介紹。
生成 HFile
HFileOutputFormat2 負(fù)責(zé)將Reduce的結(jié)果,持久化成 HFile 文件。持久化目錄的格式如下:
| 1234567 | .|---- column_family_1| |---- uuid_1| `---- uuid_2|---- column_family_2| |---- uuid3| `---- uuid4 |
每個(gè) column family 對(duì)應(yīng)一個(gè)目錄,這個(gè)目錄會(huì)有多個(gè) HFile 文件。
HFileOutputFormat2 會(huì)創(chuàng)建 RecordWriter 實(shí)例,所有數(shù)據(jù)的寫入都是通過 RecordWriter。
| 12345678910111213141516171819 | public class HFileOutputFormat2 extends FileOutputFormat<ImmutableBytesWritable, Cell> { @Override public RecordWritergetRecordWriter( final TaskAttemptContext context) throws IOException, InterruptedException { // 調(diào)用createRecordWriter方法創(chuàng)建 return createRecordWriter(context, this.getOutputCommitter(context)); } static RecordWriter createRecordWriter(final TaskAttemptContext context, final OutputCommitter committer) throws IOException { // 實(shí)例化一個(gè)匿名類 return new RecordWriter() { ...... } }} |
可以看到 createRecordWriter 方法,返回了一個(gè)匿名類。繼續(xù)看看這個(gè)匿名類的定義:
| 123456789101112 | // 封裝了StoreFileWriter,記錄了寫入的數(shù)據(jù)長度static class WriterLength { long written = 0; StoreFileWriter writer = null;}class RecordWriter<ImmutableBytesWritable, V>() { // key值為表名和column family組成的字節(jié),value為對(duì)應(yīng)的writer private final Map<byte[], WriterLength> writers = new TreeMap<>(Bytes.BYTES_COMPARATOR); // 是否需要?jiǎng)?chuàng)建新的HFile private boolean rollRequested = false;} |
從上面 WriterLength 類的定義,我們可以知道 RecordWriter的底層原理是調(diào)用了StoreFileWriter的接口。對(duì)于StoreFile,我們回憶下Hbase的寫操作,它接收客戶端的寫請(qǐng)求,首先寫入到內(nèi)存中MemoryStore,然后刷新到磁盤生成StoreFile。如果該表有兩個(gè)column family,就會(huì)有兩個(gè)MemoryStore和兩個(gè)StoreFile,對(duì)應(yīng)于不同的column family。所以 RecordWriter 類有個(gè)哈希表,記錄著每個(gè) column family 的 StoreFileWriter。(這里說的 StoreFile 也就是 HFile)
因?yàn)?HFile 支持不同的壓縮算法,不同的塊大小,RecordWriter 會(huì)根據(jù)配置,獲取HFile的格式,然后創(chuàng)建對(duì)應(yīng)的 StoreFileWriter。下面創(chuàng)建 StoreFileWriter 時(shí)只指定了文件目錄,StoreFileWriter會(huì)在這個(gè)目錄下,使用 uuid 生成一個(gè)唯一的文件名。
| 1234567891011121314151617181920212223242526272829303132333435363738 | class RecordWriter<ImmutableBytesWritable, V>() { // favoredNodes 表示創(chuàng)建HFile文件,希望盡可能在這些服務(wù)器節(jié)點(diǎn)上 private WriterLength getNewWriter(byte[] tableName, byte[] family, Configuration conf, InetSocketAddress[] favoredNodes) throws IOException { // 根據(jù)表名和column family生成唯一字節(jié) byte[] tableAndFamily = getTableNameSuffixedWithFamily(tableName, family); Path familydir = new Path(outputDir, Bytes.toString(family)); WriterLength wl = new WriterLength(); // 獲取HFile的壓縮算法 Algorithm compression = compressionMap.get(tableAndFamily); // 獲取bloom過濾器信息 BloomType bloomType = bloomTypeMap.get(tableAndFamily); // 獲取HFile其他的配置 ..... // 生成HFile的配置信息 HFileContextBuilder contextBuilder = new HFileContextBuilder() .withCompression(compression) .withChecksumType(HStore.getChecksumType(conf)) .withBytesPerCheckSum(HStore.getBytesPerChecksum(conf)) .withBlockSize(blockSize); HFileContext hFileContext = contextBuilder.build(); // 實(shí)例化 StoreFileWriter f (null == favoredNodes) { wl.writer = new StoreFileWriter.Builder(conf, new CacheConfig(tempConf), fs) .withOutputDir(familydir).withBloomType(bloomType) .withComparator(CellComparator.getInstance()).withFileContext(hFileContext).build(); } else { wl.writer = new StoreFileWriter.Builder(conf, new CacheConfig(tempConf), new HFileSystem(fs)) .withOutputDir(familydir).withBloomType(bloomType) .withComparator(CellComparator.getInstance()).withFileContext(hFileContext) .withFavoredNodes(favoredNodes).build(); } // 添加到 writers集合中 this.writers.put(tableAndFamily, wl); return wl; }} |
繼續(xù)看看 RecordWriter 的寫操作:
| 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768 | class RecordWriter<ImmutableBytesWritable, V>() { @Override public void write(ImmutableBytesWritable row, V cell) Cell kv = cell; // 收到空數(shù)據(jù),表示需要立即刷新到磁盤,并且創(chuàng)建新的HFile if (row == null && kv == null) { // 刷新到磁盤 rollWriters(null); return; } // 根據(jù)table和column family生成唯一值 byte[] tableAndFamily = getTableNameSuffixedWithFamily(tableNameBytes, family); // 獲取對(duì)應(yīng)的writer WriterLength wl = this.writers.get(tableAndFamily); if (wl == null) { // 如果為空,那么先創(chuàng)建對(duì)應(yīng)的文件目錄 Path writerPath = null; writerPath = new Path(outputDir, Bytes.toString(family)); fs.mkdirs(writerPath); } // 檢測(cè)當(dāng)前HFile的大小是否超過了最大值,默認(rèn)為10GB if (wl != null && wl.written + length >= maxsize) { this.rollRequested = true; } // 如果當(dāng)前HFile過大,那么需要將它刷新到磁盤 if (rollRequested && Bytes.compareTo(this.previousRow, rowKey) != 0) { rollWriters(wl); } // 創(chuàng)建writer if (wl == null || wl.writer == null) { if (conf.getBoolean(LOCALITY_SENSITIVE_CONF_KEY, DEFAULT_LOCALITY_SENSITIVE)) { // 如果開啟了位置感知,那么就會(huì)去獲取row所在的region的地址 HRegionLocation loc = null; loc = locator.getRegionLocation(rowKey); InetSocketAddress initialIsa = new InetSocketAddress(loc.getHostname(), loc.getPort()); // 創(chuàng)建writer,指定了偏向節(jié)點(diǎn) wl = getNewWriter(tableNameBytes, family, conf, new InetSocketAddress[] { initialIsa}) } else { // 創(chuàng)建writer wl = getNewWriter(tableNameBytes, family, conf, null); } } wl.writer.append(kv); wl.written += length; this.previousRow = rowKey; } private void rollWriters(WriterLength writerLength) throws IOException { if (writerLength != null) { // 關(guān)閉當(dāng)前writer closeWriter(writerLength); } else { // 關(guān)閉所有family對(duì)應(yīng)的writer for (WriterLength wl : this.writers.values()) { closeWriter(wl); } } this.rollRequested = false; } private void closeWriter(WriterLength wl) throws IOException { if (wl.writer != null) { close(wl.writer); } wl.writer = null; wl.written = 0; }} |
RecordWriter在寫入數(shù)據(jù)時(shí),如果遇到一條 row key 和 value 都為 null 的數(shù)據(jù)時(shí),這條數(shù)據(jù)有著特殊的含義,表示writer應(yīng)該立即 flush。在每次創(chuàng)建RecordWriter時(shí),它會(huì)根據(jù)此時(shí)row key 的值,找到所屬 Region 的服務(wù)器地址,然后盡量在這臺(tái)服務(wù)器上,創(chuàng)建新的HFile文件。
加載 HFile
上面生成完 HFile 之后,我們還需要調(diào)用第二條命令完成加載 HFile 過程。這個(gè)過程分為兩步,切割數(shù)據(jù)量大的 HFile 文件和發(fā)送加載請(qǐng)求讓服務(wù)器完成。
切割 HFile
首先它會(huì)遍歷目錄下的每個(gè) HFile ,
首先檢查 HFile 里面數(shù)據(jù)的 family 在 Hbase 表里是否存在。
獲取HFile 數(shù)據(jù)的起始 row key,找到 Hbase 里對(duì)應(yīng)的 Region,然后比較兩者之間的 row key 范圍
如果 HFile 的 row key 范圍比 Region 大,也就是 HFile 的結(jié)束 row key 比這個(gè) Region 的 結(jié)束 row Key 大,那么需要將這個(gè) HFile 切割成兩份,切割值為 Region 的結(jié)束 row key。
繼續(xù)從上一部切割生成的兩份HFile中,選擇第二份 HFile(它的row key 大于 Regioin 的結(jié)束 row key),將它繼續(xù)按照第二步切割,直到所有HFile的 row key范圍都能在一個(gè)Region里。
在割切HFile的過程中,還會(huì)檢查 column family 對(duì)應(yīng)的 HFile數(shù)目。如果一個(gè) column family 對(duì)應(yīng)的 HFile 數(shù)目過多,默認(rèn)數(shù)目為32,程序就會(huì)報(bào)錯(cuò)。但是這個(gè)值通過指定 hbase.mapreduce.bulkload.max.hfiles.perRegion.perFamily,來設(shè)置更大的值。
發(fā)送加載請(qǐng)求
當(dāng)完成了HFile的切割后,最后的導(dǎo)入動(dòng)作是發(fā)送 BulkLoadHFileRequest 請(qǐng)求給 Hbase 服務(wù)端。Hbase 服務(wù)端會(huì)處理該請(qǐng)求,完成HFile加載。
其他
至于我研究 Hbase Bulkload 的原因,是在使用過程中發(fā)生了 Out Of Memory 的錯(cuò)誤。雖然經(jīng)過排查,發(fā)現(xiàn)和 Hbase Bulkload 的原理沒什么關(guān)系,不過在此也順便提一下,希望能幫到遇到類似情況的人。首先說下我使用的Hadoop 版本是 CDH 5.12.2。
經(jīng)過排查,發(fā)現(xiàn)是因?yàn)?Hbase Bulkload 底層用的 MapReduce 模式為本地模式,而不是集群 Yarn 的方式。我們知道 MapReduce 程序選擇哪一種方式,可以通過 mapreduce.framework.name 配置項(xiàng)指定。雖然在 CDH 的 Yarn 配置頁面里,設(shè)置了該配置為 yarn,但是 Hbase Bulkload 仍然使用本地模式。后來發(fā)現(xiàn) Yarn 組件下有個(gè) Gateway 的角色實(shí)例,這是個(gè)特殊的角色,它負(fù)責(zé) Yarn 客戶端的配置部署。而恰好這臺(tái)主機(jī)沒有安裝,所以在使用 Hbase Bulkload 時(shí),沒有讀取到 Yarn 的配置。解決方法是在 CDH 界面添加 Gateway 實(shí)例就好了。
總結(jié)
以上是生活随笔為你收集整理的mapreduce原理_Hbase Bulkload 原理面试必备的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: php用wordanalysis抓取姓名
- 下一篇: 电脑桌面归纳小窗口_电脑一分钟小技巧:如