Hadoop辅助工具——Flume、Sqoop
前言
在一個(gè)完整的離線大數(shù)據(jù)處理系統(tǒng)中,除了hdfs+mapreduce+hive組成分析系統(tǒng)的核心之外,還需要數(shù)據(jù)采集、結(jié)果數(shù)據(jù)導(dǎo)出、任務(wù)調(diào)度等不可或缺的輔助系統(tǒng),而這些輔助工具在hadoop生態(tài)體系中都有便捷的開(kāi)源框架,如圖所示:
1. Flume日志采集框架
1.1 Flume介紹
1.1.1 概述
- Flume是一個(gè)分布式、可靠、和高可用的海量日志采集、聚合和傳輸?shù)南到y(tǒng)。
- Flume可以采集文件,socket數(shù)據(jù)包、文件、文件夾、kafka等各種形式源數(shù)據(jù),又可以將采集到的數(shù)據(jù)(下沉sink)輸出到HDFS、hbase、hive、kafka等眾多外部存儲(chǔ)系統(tǒng)中
- 一般的采集需求,通過(guò)對(duì)flume的簡(jiǎn)單配置即可實(shí)現(xiàn)
- Flume針對(duì)特殊場(chǎng)景也具備良好的自定義擴(kuò)展能力
因此,flume可以適用于大部分的日常數(shù)據(jù)采集場(chǎng)景。
1.1.2 運(yùn)行機(jī)制
a)?Source:采集組件,用于跟數(shù)據(jù)源對(duì)接,以獲取數(shù)據(jù)
b)?Sink:下沉組件,用于往下一級(jí)agent傳遞數(shù)據(jù)或者往最終存儲(chǔ)系統(tǒng)傳遞數(shù)據(jù)
c)?Channel:傳輸通道組件,用于從source將數(shù)據(jù)傳遞到sink
?
1.1.3 Flume采集系統(tǒng)結(jié)構(gòu)圖
?1. 簡(jiǎn)單結(jié)構(gòu)——單個(gè)agent采集數(shù)據(jù)
?
2. 復(fù)雜結(jié)構(gòu)——多級(jí)agent之間串聯(lián)
?
?
1.2 Flume實(shí)戰(zhàn)案例
1.2.1 Flume的安裝部署
1、Flume的安裝非常簡(jiǎn)單,只需要解壓即可,當(dāng)然,前提是已有hadoop環(huán)境。上傳安裝包到數(shù)據(jù)源所在節(jié)點(diǎn)上
然后解壓 ?tar -zxvf apache-flume-1.6.0-bin.tar.gz
然后進(jìn)入flume的目錄,修改conf下的flume-env.sh,在里面配置JAVA_HOME
2、根據(jù)數(shù)據(jù)采集的需求配置采集方案,描述在配置文件中(文件名可任意自定義)
3、指定采集方案配置文件,在相應(yīng)的節(jié)點(diǎn)上啟動(dòng)flume agent
先用一個(gè)最簡(jiǎn)單的例子來(lái)測(cè)試一下程序環(huán)境是否正常
1、先在flume的conf目錄下新建一個(gè)配置文件(采集方案)
vi ??netcat-logger.properties
# 定義這個(gè)agent中各組件的名字 a1.sources = r1 a1.sinks = k1 a1.channels = c1# 描述和配置source組件:r1 a1.sources.r1.type = netcat a1.sources.r1.bind = localhost a1.sources.r1.port = 44444# 描述和配置sink組件:k1 a1.sinks.k1.type = logger# 描述和配置channel組件,此處使用是內(nèi)存緩存的方式 a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100# 描述和配置source channel sink之間的連接關(guān)系 a1.sources.r1.channels = c1 a1.sinks.k1.channel = c12、啟動(dòng)agent去采集數(shù)據(jù)
bin/flume-ng agent -c conf -f conf/netcat-logger.conf -n a1 -Dflume.root.logger=INFO,console-c conf ??指定flume自身的配置文件所在目錄
-f conf/netcat-logger.con ?指定我們所描述的采集方案
-n a1 ?指定我們這個(gè)agent的名字
3、測(cè)試
先要往agent的source所監(jiān)聽(tīng)的端口上發(fā)送數(shù)據(jù),讓agent有數(shù)據(jù)可采。隨便在一個(gè)能跟agent節(jié)點(diǎn)聯(lián)網(wǎng)的機(jī)器上。
telnet anget-hostname ?port ??(telnet localhost 44444)?
1.2.2 采集案例
1、采集日志目錄中的文件到HDFS
結(jié)構(gòu)示意圖:
采集需求:某服務(wù)器的某特定目錄下,會(huì)不斷產(chǎn)生新的文件,每當(dāng)有新文件出現(xiàn),就需要把文件采集到HDFS中去
根據(jù)需求,首先定義以下3大要素
spooldir特性:
1、監(jiān)視一個(gè)目錄,只要目錄中出現(xiàn)新文件,就會(huì)采集文件中的內(nèi)容
2、采集完成的文件,會(huì)被agent自動(dòng)添加一個(gè)后綴:COMPLETED
3、所監(jiān)視的目錄中不允許重復(fù)出現(xiàn)相同文件名的文件
配置文件編寫(xiě):
#定義三大組件的名稱(chēng) agent1.sources = source1 agent1.sinks = sink1 agent1.channels = channel1# 配置source組件 agent1.sources.source1.type = spooldir agent1.sources.source1.spoolDir = /home/hadoop/logs/ agent1.sources.source1.fileHeader = false#配置攔截器 agent1.sources.source1.interceptors = i1 agent1.sources.source1.interceptors.i1.type = host agent1.sources.source1.interceptors.i1.hostHeader = hostname# 配置sink組件 agent1.sinks.sink1.type = hdfs agent1.sinks.sink1.hdfs.path =hdfs://hdp-node-01:9000/weblog/flume-collection/%y-%m-%d/%H-%M agent1.sinks.sink1.hdfs.filePrefix = access_log agent1.sinks.sink1.hdfs.maxOpenFiles = 5000 agent1.sinks.sink1.hdfs.batchSize= 100 agent1.sinks.sink1.hdfs.fileType = DataStream agent1.sinks.sink1.hdfs.writeFormat =Text agent1.sinks.sink1.hdfs.rollSize = 102400 agent1.sinks.sink1.hdfs.rollCount = 1000000 agent1.sinks.sink1.hdfs.rollInterval = 60 #agent1.sinks.sink1.hdfs.round = true #agent1.sinks.sink1.hdfs.roundValue = 10 #agent1.sinks.sink1.hdfs.roundUnit = minute agent1.sinks.sink1.hdfs.useLocalTimeStamp = true # Use a channel which buffers events in memory agent1.channels.channel1.type = memory agent1.channels.channel1.keep-alive = 120 agent1.channels.channel1.capacity = 500000 agent1.channels.channel1.transactionCapacity = 600# Bind the source and sink to the channel agent1.sources.source1.channels = channel1 agent1.sinks.sink1.channel = channel1Channel參數(shù)解釋:
capacity:默認(rèn)該通道中最大的可以存儲(chǔ)的event數(shù)量
trasactionCapacity:每次最大可以從source中拿到或者送到sink中的event數(shù)量
keep-alive:event添加到通道中或者移出的允許時(shí)間
?
測(cè)試階段,啟動(dòng)flume agent的命令:
bin/flume-ng agent -c ./conf -f ./dir-hdfs.conf -n agent1 -Dflume.root.logger=DEBUG,console-D后面跟的是log4j的參數(shù),用于測(cè)試觀察
生產(chǎn)中,啟動(dòng)flume,應(yīng)該把flume啟動(dòng)在后臺(tái):
nohup bin/flume-ng agent -c ./conf -f ./dir-hdfs.conf -n agent1 1>/dev/null 2>&1 &2、采集文件到HDFS
采集需求:比如業(yè)務(wù)系統(tǒng)使用log4j生成的日志,日志內(nèi)容不斷增加,需要把追加到日志文件中的數(shù)據(jù)實(shí)時(shí)采集到hdfs
根據(jù)需求,首先定義以下3大要素
配置文件編寫(xiě):
agent1.sources = source1 agent1.sinks = sink1 agent1.channels = channel1# Describe/configure tail -F source1 agent1.sources.source1.type = exec agent1.sources.source1.command = tail -F /home/hadoop/logs/access_log agent1.sources.source1.channels = channel1#configure host for source agent1.sources.source1.interceptors = i1 agent1.sources.source1.interceptors.i1.type = host agent1.sources.source1.interceptors.i1.hostHeader = hostname# Describe sink1 agent1.sinks.sink1.type = hdfs #a1.sinks.k1.channel = c1 agent1.sinks.sink1.hdfs.path =hdfs://hdp-node-01:9000/weblog/flume-collection/%y-%m-%d/%H-%M agent1.sinks.sink1.hdfs.filePrefix = access_log agent1.sinks.sink1.hdfs.maxOpenFiles = 5000 agent1.sinks.sink1.hdfs.batchSize= 100 agent1.sinks.sink1.hdfs.fileType = DataStream agent1.sinks.sink1.hdfs.writeFormat =Text agent1.sinks.sink1.hdfs.rollSize = 102400 agent1.sinks.sink1.hdfs.rollCount = 1000000 agent1.sinks.sink1.hdfs.rollInterval = 60 agent1.sinks.sink1.hdfs.round = true agent1.sinks.sink1.hdfs.roundValue = 10 agent1.sinks.sink1.hdfs.roundUnit = minute agent1.sinks.sink1.hdfs.useLocalTimeStamp = true# Use a channel which buffers events in memory agent1.channels.channel1.type = memory agent1.channels.channel1.keep-alive = 120 agent1.channels.channel1.capacity = 500000 agent1.channels.channel1.transactionCapacity = 600# Bind the source and sink to the channel agent1.sources.source1.channels = channel1 agent1.sinks.sink1.channel = channel13、兩個(gè)agent級(jí)聯(lián)
1.3 更多source和sink組件
Flume支持眾多的source和sink類(lèi)型,詳細(xì)手冊(cè)可參考官方文檔?http://flume.apache.org/FlumeUserGuide.html
2. sqoop數(shù)據(jù)遷移工具
2.1 概述
sqoop是apache旗下一款“Hadoop和關(guān)系數(shù)據(jù)庫(kù)服務(wù)器之間傳送數(shù)據(jù)”的工具。
導(dǎo)入數(shù)據(jù):MySQL,Oracle導(dǎo)入數(shù)據(jù)到Hadoop的HDFS、HIVE、HBASE等數(shù)據(jù)存儲(chǔ)系統(tǒng);
導(dǎo)出數(shù)據(jù):從Hadoop的文件系統(tǒng)中導(dǎo)出數(shù)據(jù)到關(guān)系數(shù)據(jù)庫(kù)mysql等
2.2 工作機(jī)制
將導(dǎo)入或?qū)С雒罘g成mapreduce程序來(lái)實(shí)現(xiàn),在翻譯出的mapreduce中主要是對(duì)inputformat和outputformat進(jìn)行定制。
2.3 sqoop實(shí)戰(zhàn)及原理
2.3.1 sqoop安裝
安裝sqoop的前提是已經(jīng)具備java和hadoop的環(huán)境
1、下載并解壓
最新版下載地址http://ftp.wayne.edu/apache/sqoop/1.4.6/
2、修改配置文件
$ cd $SQOOP_HOME/conf
$ mv sqoop-env-template.sh sqoop-env.sh
打開(kāi)sqoop-env.sh并編輯下面幾行:
export HADOOP_COMMON_HOME=/home/hadoop/apps/hadoop-2.6.1/ export HADOOP_MAPRED_HOME=/home/hadoop/apps/hadoop-2.6.1/ export HIVE_HOME=/home/hadoop/apps/hive-1.2.13、加入mysql的jdbc驅(qū)動(dòng)包
cp ?~/app/hive/lib/mysql-connector-java-5.1.28.jar ??$SQOOP_HOME/lib/
4、驗(yàn)證啟動(dòng)
$ cd $SQOOP_HOME/bin
$ sqoop-version
預(yù)期的輸出:
15/12/17 14:52:32 INFO sqoop.Sqoop: Running Sqoop version: 1.4.6
Sqoop 1.4.6?git commit id 5b34accaca7de251fc91161733f906af2eddbe83
Compiled by abe on Fri Aug 1 11:19:26 PDT 2015
到這里,整個(gè)Sqoop安裝工作完成。
?
驗(yàn)證sqoop到mysql業(yè)務(wù)庫(kù)之間的連通性:
bin/sqoop-list-databases --connect jdbc:mysql://localhost:3306 --username root --password root
bin/sqoop-list-tables --connect jdbc:mysql://localhost:3306/userdb?--username root --password root
2.4 Sqoop的數(shù)據(jù)導(dǎo)入
“導(dǎo)入工具”導(dǎo)入單個(gè)表從RDBMS到HDFS。表中的每一行被視為HDFS的記錄。所有記錄都存儲(chǔ)為文本文件的文本數(shù)據(jù)(或者Avro、sequence文件等二進(jìn)制數(shù)據(jù))?
2.4.1 語(yǔ)法
下面的語(yǔ)法用于將數(shù)據(jù)導(dǎo)入HDFS
$ sqoop import (generic-args) (import-args)2.4.2 示例
表數(shù)據(jù)
在mysql中有一個(gè)庫(kù)userdb中三個(gè)表:emp,?emp_add和emp_conn
表emp:
| id | name | deg | salary | dept |
| 1201 | gopal | manager | 50,000 | TP |
| 1202 | manisha | Proof reader | 50,000 | TP |
| 1203 | khalil | php dev | 30,000 | AC |
| 1204 | prasanth | php dev | 30,000 | AC |
| 1205 | kranthi | admin | 20,000 | TP |
表emp_add:
| id | hno | street | city |
| 1201 | 288A | vgiri | jublee |
| 1202 | 108I | aoc | sec-bad |
| 1203 | 144Z | pgutta | hyd |
| 1204 | 78B | old city | sec-bad |
| 1205 | 720X | hitec | sec-bad |
表emp_conn:
| id | phno | |
| 1201 | 2356742 | gopal@tp.com |
| 1202 | 1661663 | manisha@tp.com |
| 1203 | 8887776 | khalil@ac.com |
| 1204 | 9988774 | prasanth@ac.com |
| 1205 | 1231231 | kranthi@tp.com |
導(dǎo)入表表數(shù)據(jù)到HDFS
下面的命令用于從MySQL數(shù)據(jù)庫(kù)服務(wù)器中的emp表導(dǎo)入HDFS
如果成功執(zhí)行,那么會(huì)得到下面的輸出
14/12/22 15:24:54 INFO sqoop.Sqoop: Running Sqoop version: 1.4.5 14/12/22 15:24:56 INFO manager.MySQLManager: Preparing to use a MySQL streaming resultset. INFO orm.CompilationManager: Writing jar file: /tmp/sqoop-hadoop/compile/cebe706d23ebb1fd99c1f063ad51ebd7/emp.jar ----------------------------------------------------- O mapreduce.Job: map 0% reduce 0% 14/12/22 15:28:08 INFO mapreduce.Job: map 100% reduce 0% 14/12/22 15:28:16 INFO mapreduce.Job: Job job_1419242001831_0001 completed successfully ----------------------------------------------------- ----------------------------------------------------- 14/12/22 15:28:17 INFO mapreduce.ImportJobBase: Transferred 145 bytes in 177.5849 seconds (0.8165 bytes/sec) 14/12/22 15:28:17 INFO mapreduce.ImportJobBase: Retrieved 5 records.為了驗(yàn)證在HDFS導(dǎo)入的數(shù)據(jù),請(qǐng)使用以下命令查看導(dǎo)入的數(shù)據(jù)
$ $HADOOP_HOME/bin/hadoop fs -cat /user/hadoop/emp/part-m-00000emp表的數(shù)據(jù)和字段之間用逗號(hào)(,)表示。
1201, gopal, manager, 50000, TP 1202, manisha, preader, 50000, TP 1203, kalil, php dev, 30000, AC 1204, prasanth, php dev, 30000, AC 1205, kranthi, admin, 20000, TP導(dǎo)入到HDFS指定目錄
在導(dǎo)入表數(shù)據(jù)到HDFS使用Sqoop導(dǎo)入工具,我們可以指定目標(biāo)目錄。
以下是指定目標(biāo)目錄選項(xiàng)的Sqoop導(dǎo)入命令的語(yǔ)法。
--target-dir <new or exist directory in HDFS>下面的命令是用來(lái)導(dǎo)入emp_add表數(shù)據(jù)到'/queryresult'目錄。
bin/sqoop import \ --connect jdbc:mysql://hdp-node-01:3306/test \ --username root \ --password root \ --target-dir /queryresult \ --fields-terminated-by ‘\001’ \ --table emp --split-by id --m 1注意:如果報(bào)錯(cuò),說(shuō)emp類(lèi)找不到,則可以手動(dòng)從sqoop生成的編譯目錄(/tmp/sqoop-root/compile)中,找到這個(gè)emp.class和emp.jar,拷貝到sqoop的lib目錄下:
如果設(shè)置了 --m 1,則意味著只會(huì)啟動(dòng)一個(gè)maptask執(zhí)行數(shù)據(jù)導(dǎo)入
如果不設(shè)置 --m 1,則默認(rèn)為啟動(dòng)4個(gè)map task執(zhí)行數(shù)據(jù)導(dǎo)入,則需要指定一個(gè)列來(lái)作為劃分map task任務(wù)的依據(jù)
?
下面的命令是用來(lái)驗(yàn)證 /queryresult?目錄中 emp_add表導(dǎo)入的數(shù)據(jù)形式。
$HADOOP_HOME/bin/hadoop fs -cat /queryresult/part-m-*它會(huì)用逗號(hào)(,)分隔emp_add表的數(shù)據(jù)和字段。
1201, 288A, vgiri, jublee 1202, 108I, aoc, sec-bad 1203, 144Z, pgutta, hyd 1204, 78B, oldcity, sec-bad 1205, 720C, hitech, sec-bad導(dǎo)入關(guān)系表到HIVE
bin/sqoop import --connect jdbc:mysql://hdp-node-01:3306/test --username root --password root --table emp --hive-import --split-by id --m 1導(dǎo)入表數(shù)據(jù)子集
我們可以導(dǎo)入表的使用Sqoop導(dǎo)入工具,"where"子句的一個(gè)子集。它執(zhí)行在各自的數(shù)據(jù)庫(kù)服務(wù)器相應(yīng)的SQL查詢(xún),并將結(jié)果存儲(chǔ)在HDFS的目標(biāo)目錄。
where子句的語(yǔ)法如下。
--where <condition>下面的命令用來(lái)導(dǎo)入emp_add表數(shù)據(jù)的子集。子集查詢(xún)檢索員工ID和地址,居住城市為:Secunderabad
bin/sqoop import \ --connect jdbc:mysql://hdp-node-01:3306/test \ --username root \ --password root \ --where "city ='sec-bad'" \ --target-dir /wherequery \ --table emp_add \--m 1按需導(dǎo)入
bin/sqoop import \ --connect jdbc:mysql://hdp-node-01:3306/test \ --username root \ --password root \ --target-dir /wherequery2 \ --query 'select id,name,deg from emp WHERE id>1207 and $CONDITIONS' \ --split-by id \ --fields-terminated-by '\t' \ --m 2下面的命令用來(lái)驗(yàn)證數(shù)據(jù)從emp_add表導(dǎo)入/wherequery目錄
$HADOOP_HOME/bin/hadoop fs -cat /wherequery/part-m-*它用逗號(hào)(,)分隔 emp_add表數(shù)據(jù)和字段。
1202, 108I, aoc, sec-bad 1204, 78B, oldcity, sec-bad 1205, 720C, hitech, sec-bad增量導(dǎo)入
增量導(dǎo)入是僅導(dǎo)入新添加的表中的行的技術(shù)。
sqoop支持兩種增量MySql導(dǎo)入到hive的模式,
一種是append,即通過(guò)指定一個(gè)遞增的列,比如:
--incremental append ?--check-column num_id --last-value 0
另種是可以根據(jù)時(shí)間戳,比如:
--incremental lastmodified --check-column created --last-value '2012-02-01 11:0:00'
就是只導(dǎo)入created 比'2012-02-01 11:0:00'更大的數(shù)據(jù)。
?
1/ append模式
它需要添加‘incremental’, ‘check-column’, 和 ‘last-value’選項(xiàng)來(lái)執(zhí)行增量導(dǎo)入。
下面的語(yǔ)法用于Sqoop導(dǎo)入命令增量選項(xiàng)。
--incremental <mode> --check-column <column name> --last value <last check column value>假設(shè)新添加的數(shù)據(jù)轉(zhuǎn)換成emp表如下:
1206, satish p, grp des, 20000, GR
下面的命令用于在EMP表執(zhí)行增量導(dǎo)入。
bin/sqoop import \ --connect jdbc:mysql://hdp-node-01:3306/test \ --username root \ --password root \ --table emp --m 1 \ --incremental append \ --check-column id \ --last-value 1208以下命令用于從emp表導(dǎo)入HDFS?emp/?目錄的數(shù)據(jù)驗(yàn)證。
$ $HADOOP_HOME/bin/hadoop fs -cat /user/hadoop/emp/part-m-* 1201, gopal, manager, 50000, TP 1202, manisha, preader, 50000, TP 1203, kalil, php dev, 30000, AC 1204, prasanth, php dev, 30000, AC 1205, kranthi, admin, 20000, TP 1206, satish p, grp des, 20000, GR下面的命令是從表emp 用來(lái)查看修改或新添加的行
$HADOOP_HOME/bin/hadoop fs -cat /emp/part-m-*1 1206, satish p, grp des, 20000, GR2.5 Sqoop的數(shù)據(jù)導(dǎo)出
1/ 將數(shù)據(jù)從HDFS把文件導(dǎo)出到RDBMS數(shù)據(jù)庫(kù)
導(dǎo)出前,目標(biāo)表必須存在于目標(biāo)數(shù)據(jù)庫(kù)中。
語(yǔ)法
以下是export命令語(yǔ)法。
$ sqoop export (generic-args) (export-args)示例
數(shù)據(jù)是在HDFS?中“EMP/”目錄的emp_data文件中。所述emp_data如下:
1201, gopal, manager, 50000, TP 1202, manisha, preader, 50000, TP 1203, kalil, php dev, 30000, AC 1204, prasanth, php dev, 30000, AC 1205, kranthi, admin, 20000, TP 1206, satish p, grp des, 20000, GR1、首先需要手動(dòng)創(chuàng)建mysql中的目標(biāo)表
$ mysql mysql> USE db; mysql> CREATE TABLE employee ( id INT NOT NULL PRIMARY KEY, name VARCHAR(20), deg VARCHAR(20),salary INT,dept VARCHAR(10));2、然后執(zhí)行導(dǎo)出命令
bin/sqoop export \ --connect jdbc:mysql://hdp-node-01:3306/test \ --username root \ --password root \ --table employee \ --export-dir /user/hadoop/emp/3、驗(yàn)證表mysql命令行。
mysql>select * from employee; 如果給定的數(shù)據(jù)存儲(chǔ)成功,那么可以找到數(shù)據(jù)在如下的employee表。 +------+--------------+-------------+-------------------+--------+ | Id | Name | Designation | Salary | Dept | +------+--------------+-------------+-------------------+--------+ | 1201 | gopal | manager | 50000 | TP | | 1202 | manisha | preader | 50000 | TP | | 1203 | kalil | php dev | 30000 | AC | | 1204 | prasanth | php dev | 30000 | AC | | 1205 | kranthi | admin | 20000 | TP | | 1206 | satish p | grp des | 20000 | GR | +------+--------------+-------------+-------------------+--------+?
轉(zhuǎn)載于:https://www.cnblogs.com/zhangchao162/p/9896805.html
創(chuàng)作挑戰(zhàn)賽新人創(chuàng)作獎(jiǎng)勵(lì)來(lái)咯,堅(jiān)持創(chuàng)作打卡瓜分現(xiàn)金大獎(jiǎng)總結(jié)
以上是生活随笔為你收集整理的Hadoop辅助工具——Flume、Sqoop的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: 微信小程序+微信公众号开发总结
- 下一篇: php实现异步请求