ES-Hadoop学习之ES和HDFS数据交换
ES作為強大的搜索引擎,HDFS是分布式文件系統。ES可以將自身的Document導入到HDFS中用作備份,ES也可以將存儲在HDFS上的結構化文件導入為ES的中的Document。而ES-Hadoop正是這兩者之間的一個connector
1,將數據從ES導出到HDFS
1.1,數據準備,在ES中創建Index和Type,并創建document。在我的例子中,Index是mydata,type是person,創建了兩條如下圖所示的document
1.2 在項目中引入ES-Hadoop庫
<dependency> <groupId>org.elasticsearch</groupId> <artifactId>elasticsearch-hadoop</artifactId> <version>5.5.2</version> </dependency>值得注意的是,上面的dependency只會引入ES-Hadoop相關的Jar包,和Hadoop相關的包,例如hadoop-common, hadoop-hdfs等等,依然還需要添加依賴。
1.3,創建從ES到Hadoop的數據遷移的Mapper類
這個Mapper非常簡單,它并沒有對從ES獲取的數據進行任何的處理,只是寫到了context中。map方法中,參數key的值,就是ES中document的id的值,參數value是一個LinkedMapWritable,它包含的就是一個document的內容。只是在這個mapper中,我們沒有處理document,而是直接輸出。
1.4,創建從ES到Hadoop的數據遷移的Job類
package com.wjm.es_hadoop.example1; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.elasticsearch.hadoop.mr.EsInputFormat; import org.elasticsearch.hadoop.mr.LinkedMapWritable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class E2HJob01 {private static Logger LOG = LoggerFactory.getLogger(E2HJob01.class); public static void main(String[] args) {try {Configuration conf = new Configuration(); conf.setBoolean("mapred.map.tasks.speculative.execution", false); conf.setBoolean("mapred.reduce.tasks.speculative.execution", false); //ElasticSearch節點 conf.set("es.nodes", "192.168.8.194:9200"); //ElaticSearch Index/Type conf.set("es.resource", "mydata/person/"); if (args.length != 1) {LOG.error("error : " + args.length); System.exit(2); }Job job = Job.getInstance(conf, "JOBE2H01"); job.setJarByClass(E2HJob01.class); job.setInputFormatClass(EsInputFormat.class); job.setMapperClass(E2HMapper01.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(LinkedMapWritable.class); FileOutputFormat.setOutputPath(job, new Path(args[0])); System.out.println(job.waitForCompletion(true)); } catch (Exception e) {LOG.error(e.getMessage(), e); }} }這個Job有兩點需要注意一下:
1,它沒有reducer,因為就是數據的透傳,不需要reduce過程。
2,InputFormatClass被設置為EsInputFormat,正是這個類,負責將從ES讀出的數據,轉換成mapper的輸入參數(Text,LinkedMapWritable)
1.5,打包運行
以下面的命令來啟動MapReduce任務:
hadoop jar es-hadoop-1.0-SNAPSHOT.jar com.wjm.es_hadoop.example1.E2HJob01 hdfs://bigdata-191:8020/wangjinming
執行完這個命令之后,看到/wangjinming目錄下面產生了文件
查看其中一個文件,會發現數據被分為兩列,第一列為id,第二列為document的內容
另外,在運行hadoop jar命令的時候,需要把es-hadoop的jar包放到hadoop jar能訪問到的classpath下面。我查了一些方法都沒成功,最后使用了一個笨方法,用hadoop classpath方法查看hadoop的classpath有哪些,然后將es-hadoop相關的jar包copy到其中一個目錄下。
2,將數據從HDFS中導入到ES中。
2.1,數據準備。創建下面的這樣一個文件并put到hdfs文件系統中(我放在hdfs://bigdata-191:8020/input/perosn)
{"id":"3", "name":"jerry", "age":"23", "info":"hello hadoop"}
{"id":"4", "name":"russell", "age":"15", "info":"hello elasticsearch"}
2.2,Mapper編寫
package com.wjm.es_hadoop.example1; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import java.io.IOException; class H2EMapper01 extends Mapper<LongWritable, Text, NullWritable, Text> {@Override protected void setup(Context context) throws IOException, InterruptedException {super.setup(context); }@Override public void run(Context context) throws IOException, InterruptedException {super.run(context); }@Override protected void map(LongWritable key, Text value, Context context)throws IOException, InterruptedException {context.write(NullWritable.get(), value); }@Override protected void cleanup(Context context) throws IOException,InterruptedException {super.cleanup(context); }}這個Mapper也很簡單,只是把從HDFS中讀取到的數據透傳給ES。因為Mapper的input是一個HDFS文件,所以,mapper的入參跟其他從hdfs多數據的mapper沒有任何區別。寫入到context的是,入參的key值是沒有意義的,所以忽略掉,直接把Text類型的value寫入到context就可以了。
2.3,編寫job
這個Job有幾個需要注意的地方
es.input.json參數設置為true告訴ES-Hadoop,mapper輸出的結果是一個json格式的Text。
es.mapping.id參數指定json對象中那種field對應的值為es中document的id
OutputFormatClass被設置為EsOutputFormat,正是這個類負責將MapReduce的輸出結果(一個json格式的Text)轉換為ES的ID和document的內容
2.4,執行命令
hadoop jar es-hadoop-1.0-SNAPSHOT.jar com.wjm.es_hadoop.example1.H2EJob01 hdfs://bigdata-191:8020/input/person
命令成功執行之后,可以通過ES的命令看到數據已經在ES中創建了相應的document
總結
以上是生活随笔為你收集整理的ES-Hadoop学习之ES和HDFS数据交换的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 高性能线程间队列 DISRUPTOR 简
- 下一篇: elasticsearch-jdbc实现