mapreduce 算法_MapReduce算法–了解数据联接第二部分
mapreduce 算法
自從我上一次發布以來已經有一段時間了,就像我上一次大休息一樣,我正在Coursera上一些課程。 這次是Scala中的函數式編程 原理和React式編程原理 。 我發現它們都是不錯的課程,如果有時間的話,建議您選一門。 在這篇文章中,我們將繼續介紹如何使用MapReduce實現數據密集型文本處理中的算法的系列,這次涵蓋了地圖端連接。 從名稱可以猜出,映射側聯接只在映射階段連接數據,而完全跳過簡化階段。 在上一篇有關數據聯接的文章中,我們介紹了減少側聯接 。 減少端連接很容易實現,但缺點是所有數據都通過網絡發送到減少器。 由于我們避免了跨網絡發送數據的開銷,因此地圖端連接可顯著提高性能。 但是,與減少側聯接不同,映射側聯接需要滿足非常特定的條件。 今天,我們將討論地圖端連接的要求以及如何實現它們。
地圖端加入條件
要利用地圖側聯接,我們的數據必須滿足以下條件之一:
我們將考慮第一種情況,其中有兩個(或更多)數據集需要連接,但是太大而無法容納到內存中。 我們將假設最壞的情況是,文件沒有按相同的順序排序或分區。
資料格式
在開始之前,讓我們看一下正在使用的數據。 我們將有兩個數據集:
兩個數據集均以逗號分隔,并且聯接鍵(GUID)位于第一位置。 加入后,我們希望將數據集2中的雇主信息附加到數據集1的末尾。 此外,我們希望將GUID保持在數據集1的第一個位置,但要從數據集2刪除GUID。
數據集1:
數據集2:
de68186a-1004-4211-a866-736f414eac61,Jacobs6df1882d-4c81-4155-9d8b-0c35b2d34284,Chief Auto Partsaef9422c-d08c-4457-9760-f2d564d673bc,Earthworks Yard Maintenance08db7c55-22ae-4199-8826-c67a5689f838,Ellman's Catalog Showrooms合并結果:
08db7c55-22ae-4199-8826-c67a5689f838,John,Gregory,258 Khale Street,Florence,SC,Ellman's Catalog Showrooms 6df1882d-4c81-4155-9d8b-0c35b2d34284,John,Schofield,65 Summit Park Avenue,Detroit,MI,Chief Auto Parts aef9422c-d08c-4457-9760-f2d564d673bc,Linda,Narvaez,3253 Davis Street,Atlanta,GA,Earthworks Yard Maintenance de68186a-1004-4211-a866-736f414eac61,Charles,Arnold,1764 Public Works Drive,Johnson City,TN,Jacobs現在,我們繼續介紹如何連接兩個數據集。
Map-Side連接具有大數據集
為了能夠執行地圖端連接,我們需要將數據按相同的鍵排序并具有相同數量的分區,這意味著任何記錄的所有鍵都在同一分區中。 盡管這似乎是一個艱巨的要求,但很容易解決。 Hadoop對所有鍵進行排序,并保證將具有相同值的鍵發送到相同的reducer。 因此,只需運行一個MapReduce作業,該作業只不過要通過您要連接的鍵輸出數據,并為所有數據集指定完全相同數量的化簡器,我們將以正確的形式獲取數據。 考慮到能夠進行地圖側連接所帶來的效率提高,可能值得花費額外的MapReduce作業。 在這一點上需要重復,至關重要的是,在“準備”階段,將對數據進行排序和分區時,所有數據集都必須指定完全相同數量的化簡。 在本文中,我們將獲取兩個數據集,并在兩個數據集上運行初始MapReduce作業以進行排序和分區,然后運行最終作業以執行地圖端聯接。 首先,讓我們介紹一下MapReduce作業,以相同的方式對數據進行排序和分區。
第一步:排序和分區
首先,我們需要創建一個Mapper ,該Mapper將簡單地選擇要根據給定索引進行排序的鍵:
public class SortByKeyMapper extends Mapper<LongWritable, Text, Text, Text> {private int keyIndex;private Splitter splitter;private Joiner joiner;private Text joinKey = new Text();@Overrideprotected void setup(Context context) throws IOException, InterruptedException {String separator = context.getConfiguration().get("separator");keyIndex = Integer.parseInt(context.getConfiguration().get("keyIndex"));splitter = Splitter.on(separator);joiner = Joiner.on(separator);}@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {Iterable<String> values = splitter.split(value.toString());joinKey.set(Iterables.get(values,keyIndex));if(keyIndex != 0){value.set(reorderValue(values,keyIndex));}context.write(joinKey,value);}private String reorderValue(Iterable<String> value, int index){List<String> temp = Lists.newArrayList(value);String originalFirst = temp.get(0);String newFirst = temp.get(index);temp.set(0,newFirst);temp.set(index,originalFirst);return joiner.join(temp);} }SortByKeyMapper只需通過從在配置參數keyIndex給定位置找到的給定文本行中提取值來簡單地設置joinKey的值。 同樣,如果keyIndex不等于零,我們交換在第一個位置和keyIndex位置中找到的值的順序。 盡管這是一個有問題的功能,但是我們稍后將討論為什么要這樣做。 接下來,我們需要一個Reducer :
public class SortByKeyReducer extends Reducer<Text,Text,NullWritable,Text> {private static final NullWritable nullKey = NullWritable.get();@Overrideprotected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {for (Text value : values) {context.write(nullKey,value);}} }SortByKeyReducer寫出給定鍵的所有值,但是會NullWritable鍵并寫一個NullWritable 。 在下一節中,我們將解釋為什么不使用密鑰。
第二步:Map-Side聯接
在執行地圖側連接時,記錄在到達映射器之前會被合并。 為此,我們使用CompositeInputFormat 。 我們還需要設置一些配置屬性。 讓我們看一下如何配置地圖側連接:
private static Configuration getMapJoinConfiguration(String separator, String... paths) {Configuration config = new Configuration();config.set("mapreduce.input.keyvaluelinerecordreader.key.value.separator", separator);String joinExpression = CompositeInputFormat.compose("inner", KeyValueTextInputFormat.class, paths);config.set("mapred.join.expr", joinExpression);config.set("separator", separator);return config;}首先,我們通過設置mapreduce.input.keyvaluelinerecordreader.key.value.separator屬性來指定用于分隔鍵和值的字符。 接下來,我們使用CompositeInputFormat.compose方法創建一個“聯接表達式”,通過使用單詞“ inner”指定內部聯接 ,然后指定要使用的輸入格式, KeyValueTextInput類以及最后一個String varargs,它們表示文件的路徑。 join(運行map-reduce作業以對數據進行排序和分區的輸出路徑)。 KeyValueTextInputFormat類將使用分隔符將第一個值設置為鍵,其余的將用作該值。
映射器的加入
連接源文件中的值后,將Mapper.map方法,該方法將接收鍵的Text對象(連接記錄中的鍵相同)和一個TupleWritable ,該TupleWritable由輸入文件中的連接值組成對于給定的密鑰。 請記住,我們希望最終輸出的第一個位置具有join-key,然后在一個定界的String中包含所有連接的值。 為此,我們有一個自定義的映射器,將我們的數據以正確的格式放置:
public class CombineValuesMapper extends Mapper<Text, TupleWritable, NullWritable, Text> {private static final NullWritable nullKey = NullWritable.get();private Text outValue = new Text();private StringBuilder valueBuilder = new StringBuilder();private String separator;@Overrideprotected void setup(Context context) throws IOException, InterruptedException {separator = context.getConfiguration().get("separator");}@Overrideprotected void map(Text key, TupleWritable value, Context context) throws IOException, InterruptedException {valueBuilder.append(key).append(separator);for (Writable writable : value) {valueBuilder.append(writable.toString()).append(separator);}valueBuilder.setLength(valueBuilder.length() - 1);outValue.set(valueBuilder.toString());context.write(nullKey, outValue);valueBuilder.setLength(0);} }在CombineValuesMapper我們將鍵和所有聯接的值附加到一個定界的String 。 在這里,我們終于可以看到為什么在以前的MapReduce作業中丟棄了join鍵的原因。 由于鍵是要連接的所有數據集的值中的第一個位置,因此我們的映射器自然會從連接的數據集中消除重復的鍵。 我們需要做的就是將給定的鍵插入StringBuilder ,然后附加包含在TupleWritable的值。
放在一起
現在,我們擁有所有代碼,可以在大型數據集上運行地圖端聯接。 讓我們看一下我們將如何一起運行所有作業。 如前所述,我們假設我們的數據未按相同的順序進行排序和分區,因此我們將需要運行N(在本例中為2)MapReduce作業,以獲取正確格式的數據。 在運行初始排序/分區作業之后,將執行執行實際聯接的最終作業。
public class MapSideJoinDriver {public static void main(String[] args) throws Exception {String separator = ",";String keyIndex = "0";int numReducers = 10;String jobOneInputPath = args[0];String jobTwoInputPath = args[1];String joinJobOutPath = args[2];String jobOneSortedPath = jobOneInputPath + "_sorted";String jobTwoSortedPath = jobTwoInputPath + "_sorted";Job firstSort = Job.getInstance(getConfiguration(keyIndex, separator));configureJob(firstSort, "firstSort", numReducers, jobOneInputPath, jobOneSortedPath, SortByKeyMapper.class, SortByKeyReducer.class);Job secondSort = Job.getInstance(getConfiguration(keyIndex, separator));configureJob(secondSort, "secondSort", numReducers, jobTwoInputPath, jobTwoSortedPath, SortByKeyMapper.class, SortByKeyReducer.class);Job mapJoin = Job.getInstance(getMapJoinConfiguration(separator, jobOneSortedPath, jobTwoSortedPath));configureJob(mapJoin, "mapJoin", 0, jobOneSortedPath + "," + jobTwoSortedPath, joinJobOutPath, CombineValuesMapper.class, Reducer.class);mapJoin.setInputFormatClass(CompositeInputFormat.class);List<Job> jobs = Lists.newArrayList(firstSort, secondSort, mapJoin);int exitStatus = 0;for (Job job : jobs) {boolean jobSuccessful = job.waitForCompletion(true);if (!jobSuccessful) {System.out.println("Error with job " + job.getJobName() + " " + job.getStatus().getFailureInfo());exitStatus = 1;break;}}System.exit(exitStatus);}MapSideJoinDriver對運行MapReduce作業進行基本配置。 有趣的一點是,排序/分區作業每個都指定10個化簡器,而最后一個作業明確將化簡器的數量設置為0,因為我們是在地圖端加入的,不需要化簡階段。 由于我們沒有任何復雜的依賴關系,因此將作業放入ArrayList并以線性順序運行作業(第24-33行)。
結果
最初,我們有2個文件; 第一個文件中的姓名和地址信息,第二個文件中的就業信息。 這兩個文件在第一列中都有唯一的ID。
文件一:
文件二:
.... 08db7c55-22ae-4199-8826-c67a5689f838,Ellman's Catalog Showrooms ....結果:
08db7c55-22ae-4199-8826-c67a5689f838,John,Gregory,258 Khale Street,Florence,SC,Ellman's Catalog Showrooms如我們在這里看到的,我們已經成功地將記錄合并在一起,并保持了文件格式,而結果中沒有重復的鍵。
結論
在本文中,我們演示了當兩個數據集都很大且無法容納到內存中時如何執行地圖端連接。 如果您覺得這需要大量工作才能完成,那么您是正確的。 盡管在大多數情況下,我們希望使用諸如Pig或Hive之類的高級工具,但了解對大型數據集執行地圖側聯接的機制很有幫助。 當您需要從頭開始編寫解決方案時,尤其如此。 謝謝你的時間。
資源資源
- Jimmy Lin和Chris Dyer 使用MapReduce進行的數據密集型處理
- Hadoop: Tom White 的權威指南
- 來自博客的源代碼和測試
- 編程蜂巢愛德華卡普里奧羅,院長Wampler和Jason拉瑟格倫
- 通過Alan Gates 編程Pig
- Hadoop API
- MRUnit用于單元測試Apache Hadoop映射減少工作
翻譯自: https://www.javacodegeeks.com/2014/02/mapreduce-algorithms-understanding-data-joins-part-ii.html
mapreduce 算法
總結
以上是生活随笔為你收集整理的mapreduce 算法_MapReduce算法–了解数据联接第二部分的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 思特威推出两颗高分辨率高速工业CMOS图
- 下一篇: 李想回应原iQOO产品经理宋紫薇加盟:理