java mapreduce 实例_MapReduce -- JAVA 实例(一)计算总数
MapReduce
===========================
將任務(wù)細化,讓不同的節(jié)點處理不同部分。處理完后,再把各自的結(jié)果進行統(tǒng)一。它通過鍵值對來處理數(shù)據(jù)。但鍵和值的類型都有要求。
通過JAVA編寫的 MapReduce 程序中。值的類型必須實現(xiàn)了 Writable, 因為它是要被寫入文件中的。而實現(xiàn)了
WritableComparable
接口的類,即可以是鍵也可以是值。因為它是可寫的,所以可以是值;而它又是可進行比較的,所以作為鍵。
一個可執(zhí)行的 MapReduce JAVA 程序應(yīng)該包括如下內(nèi)容:
1.一個 main 方法,用來定義整個流程,接收的參數(shù)等。
2.定義 Mapper, 定義如何劃分數(shù)據(jù)生成的值也是鍵值對形式。
3.定義 Reducer, 定義如何處理結(jié)果。最后生成的值也是鍵值對形式。
4.定義一個 Job, 將 Mapper, Reducer 等必要的值設(shè)置進去。
MapReduce 通過操作 鍵/值 對來處理數(shù)據(jù),一般形式是:
map: 將 (K1, V1) 的輸入轉(zhuǎn)化成 list(K2, V2) 的輸出
reduce: 將 (K2, list(V2)) 的輸入轉(zhuǎn)化成 list(K3, V3) 的輸出
比如,一個網(wǎng)絡(luò)游戲,有多個區(qū),而角色又分幾個種族。這時要分析每個區(qū)每個種族分別有多少。
整個分析的輸入就是所有的這些數(shù)據(jù),可能是數(shù)據(jù)庫數(shù)據(jù),CVS 形式的數(shù)據(jù)表。
經(jīng)過 Map 方法后,數(shù)據(jù)分析任務(wù)分配給不同的 DataNode 。每個 DataNode 上的數(shù)據(jù)可能是:A區(qū) X 族:
5W 人; A區(qū) Y 族: 6W 人; B 區(qū) X 族 4W 人...
當然,這些數(shù)據(jù)是 鍵/值 對形式存儲的。
然后,這些數(shù)據(jù)再經(jīng)過一個被稱為洗牌的過程,將不同種族的 鍵/值 對放到不同的 DataNode 上。
再經(jīng)過 Reduce, 得到最終結(jié)果:X 族 8W; Y 族: 6W 當然,結(jié)果形式還是 鍵/值 對。
Mapper
-----------------------------------------------------
Mapper 的定義是由一個類來實現(xiàn)的。它必須繼承 MapReduceBase 基類并實現(xiàn) Mapper 接口。
Mapper 只有一個方法:map.用于處理單獨的鍵值對。運行的時候鍵值對是由 Task
傳遞過來的。所以這里只需要定義如何處理就行,不用關(guān)心誰調(diào)用。
Reducer
-----------------------------------------------------
Reducer 的定義是由一個類來實現(xiàn)的。它必須繼承 MapReduceBase 基類并實現(xiàn) Reducer
接口。
當 Reducer 任務(wù)接收來自各個 mapper
的輸出時,它按鍵值對中的鍵,對數(shù)據(jù)進行排序,并將相同的值的值歸并。然后調(diào)用 reduce() 方法。
案例:專利引用計算
************************************************************************
http://www.nber.org/patents/ 上有專利相關(guān)的數(shù)據(jù)。我們構(gòu)造 MapReduce
程序來分析相關(guān)的結(jié)果。
選用該數(shù)據(jù)是因為該案例的數(shù)據(jù)結(jié)構(gòu)和當前一些社會網(wǎng)絡(luò)圖差不多,數(shù)據(jù)形式較普遍。
專利引用數(shù)據(jù)集
http://www.nber.org/patents/acite75_99.zip
專利描述數(shù)據(jù)集:
http://www.nber.org/patents/apat63_99.zip
[root@localhost mapreduce1]# wget
http://www.nber.org/patents/acite75_99.zip
[root@localhost mapreduce1]# wget
http://www.nber.org/patents/apat63_99.zip
[root@localhost mapreduce1]# unzip
acite75_99.zip
[root@localhost mapreduce1]# unzip apat63_99.zip
[root@localhost mapreduce1]# cat cite75_99.txt | wc -l
16522439
[root@localhost mapreduce1]# cat apat63_99.txt | wc -l
2923923
專利引用數(shù)據(jù)有 16522439 條。專利描述有 2923923 條。
[root@localhost mapreduce1]# head -n 5
cite75_99.txt
"CITING","CITED"
3858241,956203
3858241,1324234
3858241,3398406
3858241,3557384
上面顯示的是專利之間的引用關(guān)系。每行表示一條數(shù)據(jù)。前面的數(shù)字是專業(yè)號,后面是被引用的專利號。所以上面的數(shù)據(jù)可以看到,3858241
引用了 其它的四個專利。當然后面還有很多數(shù)據(jù),這其實就是一個多對多的關(guān)系。
[root@localhost mapreduce1]# head -n 5
apat63_99.txt
"PATENT","GYEAR","GDATE","APPYEAR","COUNTRY","POSTATE","ASSIGNEE","ASSCODE","CLAIMS","NCLASS","CAT","SUBCAT","CMADE","CRECEIVE","RATIOCIT","GENERAL","ORIGINAL","FWDAPLAG","BCKGTLAG","SELFCTUB","SELFCTLB","SECDUPBD","SECDLWBD"
3070801,1963,1096,,"BE","",,1,,269,6,69,,1,,0,,,,,,,
3070802,1963,1096,,"US","TX",,1,,2,6,63,,0,,,,,,,,,
3070803,1963,1096,,"US","IL",,1,,2,6,63,,9,,0.3704,,,,,,,
3070804,1963,1096,,"US","OH",,1,,2,6,63,,3,,0.6667,,,,,,,
上面的數(shù)據(jù)是一個專利的描述。各字段分別是:PATENT 專利號,GYEAR 批準年,GDATE 批準日,APPYEAR
申請年,COUNTRY 第一發(fā)明人國家,POSTATE 第一發(fā)明人所在州(如果國家為美國),ASSIGNEE
專利權(quán)人,ASSCODE專 利權(quán)人類型,CLAIMS 聲明數(shù)目,NCLASS專利類型
需求:
解析各個專利被其它哪些專利引用了.
上傳要解析的文件到 HDFS:
[root@localhost sunyutest]# ../bin/hadoop fs -put
mapreduce1/cite75_99.txt .
查看是否上傳成功
[root@localhost sunyutest]# ../bin/hadoop fs -lsr .
-rw-r--r-- ?1 root supergroup
264075431 2013-08-19 12:21
/user/root/cite75_99.txt
-rw-r--r-- ?1 root supergroup
91014095 2013-08-15 14:44
/user/root/putmerg.txt
編寫代碼:
import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.KeyValueTextInputFormat;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class MyJob extends Configured implements Tool {
public static class MapClass extends MapReduceBase
implements
Mapper {
@Override
public void map(Text key, Text value,
OutputCollector output, Reporter repoter)
throws IOException {
output.collect(value, key);
}
}
public static class Reduce extends MapReduceBase
implements
Reducer {
@Override
public void reduce(Text key, Iterator values,
OutputCollector output, Reporter repoter)
throws IOException {
String csv = "";
while (values.hasNext()) {
if (csv.length() > 0) {
csv += ",";
}
csv += values.next().toString();
}
output.collect(key, new Text(csv));
}
}
@Override
public int run(String[] args) throws Exception {
Configuration conf = getConf();
JobConf job = new JobConf(conf, MyJob.class);
Path in = new Path(args[0]);
Path out = new Path(args[1]);
FileInputFormat.setInputPaths(job, in);
FileOutputFormat.setOutputPath(job, out);
job.setJobName("MyJob");
job.setMapperClass(MapClass.class);
job.setReducerClass(Reduce.class);
job.setInputFormat(KeyValueTextInputFormat.class);
job.setOutputFormat(TextOutputFormat.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
job.set("key.value.separator.in.input.line", ",");
JobClient.runJob(job);
return 0;
}
public static void main(String[] args) throws Exception
{
int res = ToolRunner.run(new Configuration(), new MyJob(),
args);
System.exit(res);
}
}
編譯 JAVA 代碼
[root@localhost sunyutest]# javac -classpath
/home/hadoop/hadoop-core-1.2.0.jar -d mapreduce1/classes/
mapreduce1/src/MyJob.java
生成 JAR 包
[root@localhost sunyutest]# jar -cvf mapreduce1/mapreduce1.jar
-C mapreduce1/classes/ .
Hadoop 執(zhí)行 JAR
[root@localhost sunyutest]# /home/hadoop/bin/./hadoop jar
mapreduce1/mapreduce1.jar MyJob /user/root/cite75_99.txt
/user/root/mapreduceresult1
過程:
13/08/19 12:23:50 INFO util.NativeCodeLoader: Loaded the
native-hadoop library
13/08/19 12:23:50 WARN snappy.LoadSnappy: Snappy native
library not loaded
13/08/19 12:23:50 INFO mapred.FileInputFormat: Total input
paths to process : 1
13/08/19 12:23:50 INFO mapred.JobClient: Running job:
job_201308151016_0003
13/08/19 12:23:51 INFO mapred.JobClient: ?map
0% reduce 0%
13/08/19 12:24:02 INFO mapred.JobClient: ?map
19% reduce 0%
13/08/19 12:24:24 INFO mapred.JobClient: ?map
62% reduce 0%
13/08/19 12:24:27 INFO mapred.JobClient: ?map
73% reduce 0%
13/08/19 12:24:30 INFO mapred.JobClient: ?map
86% reduce 16%
13/08/19 12:24:33 INFO mapred.JobClient: ?map
98% reduce 16%
13/08/19 12:24:36 INFO mapred.JobClient: ?map
100% reduce 16%
13/08/19 12:24:45 INFO mapred.JobClient: ?map
100% reduce 25%
13/08/19 12:25:01 INFO mapred.JobClient: ?map
100% reduce 97%
13/08/19 12:25:03 INFO mapred.JobClient: ?map
100% reduce 100%
13/08/19 12:25:04 INFO mapred.JobClient: Job complete:
job_201308151016_0003
13/08/19 12:25:04 INFO mapred.JobClient: Counters: 29
13/08/19 12:25:04 INFO mapred.JobClient: ?Job Counters
13/08/19 12:25:04 INFO mapred.JobClient: ?Launched reduce tasks=1
13/08/19 12:25:04 INFO mapred.JobClient: ?SLOTS_MILLIS_MAPS=94446
13/08/19 12:25:04 INFO mapred.JobClient: ?Total time spent by all reduces waiting after
reserving slots (ms)=0
13/08/19 12:25:04 INFO mapred.JobClient: ?Total time spent by all maps waiting after
reserving slots (ms)=0
13/08/19 12:25:04 INFO mapred.JobClient: ?Launched map tasks=4
13/08/19 12:25:04 INFO mapred.JobClient: ?Data-local map tasks=4
13/08/19 12:25:04 INFO mapred.JobClient: ?SLOTS_MILLIS_REDUCES=47599
13/08/19 12:25:04 INFO mapred.JobClient: ?File Input Format Counters
13/08/19 12:25:04 INFO mapred.JobClient: ?Bytes Read=264087719
13/08/19 12:25:04 INFO mapred.JobClient: ?File Output Format Counters
13/08/19 12:25:04 INFO mapred.JobClient: ?Bytes Written=0
13/08/19 12:25:04 INFO mapred.JobClient: ?FileSystemCounters
13/08/19 12:25:04 INFO mapred.JobClient: ?FILE_BYTES_READ=735648003
13/08/19 12:25:04 INFO mapred.JobClient: ?HDFS_BYTES_READ=264088111
13/08/19 12:25:04 INFO mapred.JobClient: ?FILE_BYTES_WRITTEN=1033040702
13/08/19 12:25:04 INFO mapred.JobClient: ?Map-Reduce Framework
13/08/19 12:25:04 INFO mapred.JobClient: ?Map output materialized bytes=297120333
13/08/19 12:25:04 INFO mapred.JobClient: ?Map input records=16522439
13/08/19 12:25:04 INFO mapred.JobClient: ?Reduce shuffle bytes=297120333
13/08/19 12:25:04 INFO mapred.JobClient: ?Spilled Records=57431615
13/08/19 12:25:04 INFO mapred.JobClient: ?Map output bytes=264075431
13/08/19 12:25:04 INFO mapred.JobClient: ?Total committed heap usage
(bytes)=875757568
13/08/19 12:25:04 INFO mapred.JobClient: ?CPU time spent (ms)=104660
13/08/19 12:25:04 INFO mapred.JobClient: ?Map input bytes=264075431
13/08/19 12:25:04 INFO mapred.JobClient: ?SPLIT_RAW_BYTES=392
13/08/19 12:25:04 INFO mapred.JobClient: ?Combine input records=0
13/08/19 12:25:04 INFO mapred.JobClient: ?Reduce input records=16522439
13/08/19 12:25:04 INFO mapred.JobClient: ?Reduce input groups=3258984
13/08/19 12:25:04 INFO mapred.JobClient: ?Combine output records=0
13/08/19 12:25:04 INFO mapred.JobClient: ?Physical memory (bytes)
snapshot=1098424320
13/08/19 12:25:04 INFO mapred.JobClient: ?Reduce output records=0
13/08/19 12:25:04 INFO mapred.JobClient: ?Virtual memory (bytes) snapshot=5465493504
13/08/19 12:25:04 INFO mapred.JobClient: ?Map output records=16522439
執(zhí)行完后再看HDFS中的文件:
[root@localhost sunyutest]# ../bin/hadoop fs -lsr
/user/root/mapreduceresult1/
-rw-r--r-- ?1 root supergroup
0 2013-08-19 12:40
/user/root/mapreduceresult1/_SUCCESS
drwxr-xr-x ?- root supergroup
0 2013-08-19 12:39
/user/root/mapreduceresult1/_logs
drwxr-xr-x ?- root supergroup
0 2013-08-19 12:39
/user/root/mapreduceresult1/_logs/history
-rw-r--r-- ?1 root supergroup
23426
2013-08-19 12:39
/user/root/mapreduceresult1/_logs/history/job_201308151016_0006_1376887156678_root_MyJob
-rw-r--r-- ?1 root supergroup
47406
2013-08-19 12:39
/user/root/mapreduceresult1/_logs/history/job_201308151016_0006_conf.xml
-rw-r--r-- ?1 root supergroup
158078539 2013-08-19 12:40
/user/root/mapreduceresult1/part-00000
發(fā)現(xiàn)結(jié)果文件 mapreduceresult1 已經(jīng)有了,將它里面的
part-00000 下載到本地文件系統(tǒng)中:
[root@localhost sunyutest]# ../bin/hadoop fs -get
/user/root/mapreduceresult1/part-00000 .
查看前十行:
[root@localhost sunyutest]# head -n 10 part-00000
"CITED" "CITING"
1 3964859,4647229
10000 4539112
100000 5031388
1000006 4714284
1000007 4766693
1000011 5033339
1000017 3908629
1000026 4043055
1000033 4190903,4975983
可以看到,4190903, 4975983 兩個專利都引用了 1000033 專利。
這時,回過頭來看前面的 JAVA 代碼:
執(zhí)行整個邏輯的類是 MyJob, Hadoop 要求 Mapper 和 Reducer 必須是它們自身的靜態(tài)類,所以在
MyJob 里面分別定義了這兩個類。而且將它定義成了MyJob 的內(nèi)部類。這樣是為了簡單代碼,方便管理。
代碼邏輯中,核心代碼在 run() 中。它實例化一個 JobConf 對象,并通過
JobClient.runJob(job);啟動作業(yè)。實際上是 JobClient 和 JobTracker
通信,讓該作業(yè)在集群上啟動。JobConf 保存作業(yè)運行所需的全部配置參數(shù)。
Mapper 類的核心方法是 map(), Reducer 的是
reduce() 方法。
每個 map() 方法的調(diào)用都會傳入一個類型為 K1, V1 的鍵值對,它是由 mapper 生成,并通過
OutputCollector 對象的 collect() 方法來輸出,所以一定要調(diào)用:
output.collect((K2) k, (V2) v);
Reducer 中的 reduce() 方法的每次調(diào)用都被賦予
K2 類型的鍵以及 V2 類型的一組值。K2, V2 的類型必須和 mapper 中輸出的類型一樣。reduce() 可能會遍歷 V2
列表中的所有值:
while (values.hasNext()) {
if (csv.length() > 0) {
csv += ",";
}
csv += values.next().toString();
}
處理完后,reduce() 使用 OutputCollector 來輸出 K3/V3
的鍵/值對:output.collect(key, new Text(csv));
要注意的是 mapper 和 reducer
中使用的數(shù)據(jù)類型必須和定義 JobConf 時設(shè)置的一樣:
job.setInputFormat(KeyValueTextInputFormat.class);
job.setOutputFormat(TextOutputFormat.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
*************************************************************************
但從上面JAVA代碼可以看到,各類的參數(shù),設(shè)定任務(wù)還是有些麻煩。
Hadoop 0.2
版本中,將許多類重寫了。
上面代碼中看到的,大多數(shù)類或接口都是在包 org.apache.hadoop.mapred 中。
新API將許多都移到了 org.apache.hadoop.mapreduce 中。而且增加了上下文對象
Context,用來 OutputCollector, 和 Reporter
對象。
通過 Context.write 來輸出結(jié)果,而不再是 OutputCollector.collect()。
另外,之前 Mapper 和 Reduce 都是接口,所以我們自定義的類都要 extends MapReduceBase
然后 implements Mapper 或者 Reducer。
現(xiàn)在在 org.apache.hadoop.mapreduce 中增加了 Mapper 和 Reducer
抽象類,我們自定義的類直接繼承它就行了。另外,拋出的異常,也由 IOException
變?yōu)?IOException 和 InterruptedException。
MyJob 的新API代碼是:
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import
org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import
org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import
org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class MyJobNew extends Configured implements Tool
{
public static class MapClass extends
Mapper
Text> {
public void map(LongWritable key, Text value, Context
context)
throws IOException,
InterruptedException {
String[] citation = value.toString().split(",");
context.write(new Text(citation[1]), new
Text(citation[0]));
}
}
public static class Reduce extends
Reducer
Text> {
public void reduce(Text key,
Iterable values, Context
context)
throws IOException,
InterruptedException {
String csv = "";
for (Text val : values) {
if (csv.length() > 0) {
csv += ",";
}
csv += val.toString();
}
context.write(key, new Text(csv));
}
}
@Override
public int run(String[] args) throws Exception {
Configuration conf = getConf();
Job job = new Job(conf, "MyJobNew");
Path in = new Path(args[0]);
Path out = new Path(args[1]);
FileInputFormat.setInputPaths(job, in);
FileOutputFormat.setOutputPath(job, out);
job.setMapperClass(MapClass.class);
job.setReducerClass(Reduce.class);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
System.exit(job.waitForCompletion(true) ? 0 : 1);
return 0;
}
public static void main(String[] args) throws Exception
{
int res = ToolRunner.run(new Configuration(), new MyJobNew(),
args);
System.exit(res);
}
}
總結(jié)
以上是生活随笔為你收集整理的java mapreduce 实例_MapReduce -- JAVA 实例(一)计算总数的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: java idea 模块_IDEA搭建j
- 下一篇: java 异常 最佳实践_关于JAVA异