部署Azkaban多节点分布式模式
簡單介紹:
Azkaban是由Linkedin公司推出的一個批量工作流任務調度器,用于在一個工作流內以一個特定的順序運行一組工作和流程。Azkaban使用job配置文件建立任務之間的依賴關系,并提供一個易于使用的web用戶界面維護和跟蹤你的工作流。 它有三個重要組件:
- 關系數據庫(目前僅支持mysql)
- web管理服務器-AzkabanWebServer
- 執行服務器-AzkabanExecutorServer
Azkaban使用MySQL來存儲它的狀態信息,Azkaban Executor Server和Azkaban Web Server均使用到了MySQL數據庫。
AzkabanExecutorServer在如下幾個方面使用到了數據庫:
- 獲取project的信息
- 執行工作流
- 存儲工作流運行日志
- 如果一個工作流在不同的執行器上運行,它將從DB中獲取狀態。
AzkabanWebServer在如下幾個方面使用到了數據庫:
- Project管理
- 跟蹤工作流執行進度
- 訪問歷史工作流的運行信息
- 定時執行工作流任務
- 記錄所有sla規則
?
AzkabanWebServer
AzkabanWebserver是整個Azkaban工作流系統的主要管理者,它負責project管理、用戶登錄認證、定時執行工作流、跟蹤工作流執 行進度等一系列任務。同時,它還提供Web服務操作的接口,利用該接口,用戶可以使用curl或其他ajax的方式,來執行azkaban的相關操作。操作包括:用戶登錄、創建project、上傳workflow、執行workflow、查詢workflow的執行進度、殺掉workflow等一系列操作,且這些操作的返回結果均是json的格式。
AzkabanExecutorServer
之所以將AzkabanWebServer和AzkabanExecutorServer分開,主要是因為在某個任務流失敗后,可以更方便的將重新執行。而且也更有利于Azkaban系統的升級。
?
注意:安裝sqoop的節點都要安裝azkaban
環境配置:由于azkaban3.0以上沒有相應的安裝包,需要從源碼進行編譯。編譯的環境需要安裝jdk8。
分布式模式:集群內應當安裝三個exec-server和一個web-server,相關組件分配如下:
bigdata243 ? ? ?azkaban-exec
bigdata244 ? ? ?azkaban-exec
bigdata245 ? ? ?azkaban-web-server azkaban-exec-server mysql
?
azkaban-web目錄
bin 啟動腳本存放目錄
conf 配置文件存放目錄(沒有的話從solo-server的目錄中拷貝過來)
lib 依賴jar包存放目錄
extlib 附加jar包存放目錄(沒有的話手動創建)
plugins 插件安裝目錄
web web資源文件
logs 日志存儲目錄
sql sql資源
?
azkaban-exec目錄
bin 啟動腳本存放目錄
conf 配置文件存放目錄(沒有的話從solo-server的目錄中拷貝過來)
lib 依賴jar包存放目錄
extlib 附加jar包存放目錄(沒有的話手動創建)
plugins 插件安裝目錄
?
編譯,安裝過程
官網下載:3.47版本
進入到azkaban下面編譯:[hadoop@bigdata245 azkaban-3.47.0]$ ./gradlew distTar
編譯結果為:
azkaban-common : 常用工具類。
azkaban-db : 對應的sql腳本
azkaban-hadoop-secutity-plugin : hadoop 有關kerberos插件
azkaban-solo-server: web和executor 一起的項目。
azkaban-web/executor-server:azkaban的 web和executor的server信息
azkaban-spi: azkaban存儲接口以及exception類
編譯完成后:db、web、exec、solo四個目錄的build/distributions/下生成其壓縮包
將壓縮包拷貝到:新建文件夾:mkdir azkaban
cp azkaban-db-0.1.0-SNAPSHOT.tar.gz /home/hadoop/app/azkaban/
cp azkaban-exec-server-0.1.0-SNAPSHOT.tar.gz /home/hadoop/app/azkaban/
cp azkaban-web-server-0.1.0-SNAPSHOT.tar.gz /home/hadoop/app/azkaban/
cp azkaban-solo-server-0.1.0-SNAPSHOT.tar.gz /home/hadoop/app/azkaban/
?
解壓重命名
tar -zxvf azkaban-web-server-0.1.0-SNAPSHOT.tar.gz
tar -zxvf azkaban-exec-server-0.1.0-SNAPSHOT.tar.gz
tar -zxvf azkaban-db-0.1.0-SNAPSHOT.tar.gz
tar -zxvf azkaban-solo-server-0.1.0-SNAPSHOT.tar.gz
mv azkaban-db-0.1.0-SNAPSHOT azkaban-db
mv azkaban-web-server-0.1.0-SNAPSHOT azkaban-web
mv azkaban-solo-server-0.1.0-SNAPSHOT azkaban-solo
mv azkaban-exec-server-0.1.0-SNAPSHOT azkaban-exec
創建Azkaban元數據庫:登錄mysql,執行如下語句
mysql> create database azkaban_matadata;
Query OK, 1 row affected (0.00 sec)
mysql> use azkaban_matadata;
Database changed
mysql> source /home/hadoop/app/azkaban/azkaban-db/create-all-sql-0.1.0-SNAPSHOT.sql (會創建所有表)
配置keystore
在azkaban-web/bin目錄下執行這條命令,在執行完這條命令之后,會生成一個文件:keystore.使用keytool創建SSL配置,keytool是JDK提供的一個工具,輸入如下命令,可以查看
[root@bigdata245 ~]# find / -name keytool
/home/hadoop/app/jdk1.8/bin/keytool
/home/hadoop/app/jdk1.8/jre/bin/keytool
?
執行命令創建SSL配置
[hadoop@bigdata245 bin]$ keytool -keystore keystore -alias jetty -genkey -keyalg RSA
輸入密鑰庫口令: azkaban
再次輸入新口令: azkaban
您的名字與姓氏是什么? [Unknown]: 略過
您的組織單位名稱是什么? [Unknown]: 略過
您的組織名稱是什么? [Unknown]: 略過
您所在的城市或區域名稱是什么? [Unknown]: 略過
您所在的省/市/自治區名稱是什么? [Unknown]: 略過
該單位的雙字母國家/地區代碼是什么? [Unknown]: CN
CN=Unknown, OU=Unknown, O=Unknown, L=Unknown, ST=Unknown, C=CN是否正確?
[否]: Y
輸入 <jetty> 的密鑰口令 (如果和密鑰庫口令相同, 按回車):
?
將azkaban-solo下的conf plugins 和sql文件夾拷貝到azkaban-web目錄下
[hadoop@bigdata245 azkaban-solo]$ cp -a conf/ plugins/ sql/ /home/hadoop/app/azkaban/azkaban-web/
?
配置web-server
配置azkaban-web/conf/azkaban.properties
# Azkaban Personalization Settings azkaban.name=bigdata245 # 服務器UI名稱,用于服務器上方顯示的名字 azkaban.label=Aliyun bigdata245 Azkaban # 描述信息 azkaban.color=#FF3601 # 顏色 azkaban.default.servlet.path=/index web.resource.dir=/home/hadoop/app/azkaban/azkaban-web/web/ #默認跟web目錄,設置為絕對路徑 default.timezone.id=Asia/Shanghai # 時區,默認為美國America/Los_Angeles # Azkaban UserManager class user.manager.class=azkaban.user.XmlUserManager #用戶權限管理默認類 user.manager.xml.file=/home/hadoop/app/azkaban/azkaban-web/conf/azkaban-users.xml #用戶配置,具體配置參見下文 # Loader for projects executor.global.properties=/home/hadoop/app/azkaban/azkaban-web/conf/global.properties #globa配置文件所在位置 azkaban.project.dir=projectsdatabase.type=mysql # 數據庫類型 mysql.port=3306 # 端口 mysql.host=245 # 數據庫連接IP mysql.database=azkaban_matadata # 數據庫實例名 mysql.user=root # 數據庫用戶名 mysql.password=P@ssw0rd # 數據庫密碼 mysql.numconnections=100 # 最大連接數 h2.path=./h2 h2.create.tables=true # Velocity dev mode velocity.dev.mode=false # Azkaban Jetty server properties. jetty.use.ssl=false jetty.maxThreads=25 #最大線程數 jetty.port=8081 #jetty端口 jetty.ssl.port=8443 #jetty ssl端口號 jetty.keystore=/home/hadoop/app/azkaban/azkaban-web/bin/keystore #ssl的文件名,絕對路徑 jetty.password=azkaban #ssl文件密碼 jetty.keypassword=azkaban #jetty主密碼與keystore文件相同 jetty.truststore=keystore #SSL文件名 jetty.trustpassword=azkaban #SSL文件密碼 # Azkaban Executor settings executor.port=12321 #執行服務器端口 # mail settings mail.sender= #發送郵箱 mail.host= #發送郵箱smtp地址 # User facing web server configurations used to construct the user facing server URLs. They are useful when there is a reverse proxy between Azkaban web servers and users. # enduser -> myazkabanhost:443 -> proxy -> localhost:8081 # when this parameters set then these parameters are used to generate email links. # if these parameters are not set then jetty.hostname, and jetty.port(if ssl configured jetty.ssl.port) are used. # azkaban.webserver.external_hostname=myazkabanhost.com # azkaban.webserver.external_ssl_port=443 # azkaban.webserver.external_port=8081 job.failure.email= job.success.email= lockdown.create.projects=false cache.directory=cache #緩存目錄 # JMX stats jetty.connector.stats=true executor.connector.stats=true # Azkaban plugin settings azkaban.jobtype.plugin.dir=/home/hadoop/app/azkaban/azkaban-web/plugins/jobtypes 端口號使用規則:jetty.ssl.port > jetty.port。但是使用jetty.ssl.port的前提是jetty.use.ssl=true。這個配置表示開啟ssl【Secure Sockets Layer】安全套接層,否則使用jetty.port端口。?
在azkaban-web/conf目錄下添加log4j.properties
[hadoop@bigdata245 conf]$ touch log4j.propertieslog4j.rootLogger=INFO,C log4j.appender.C=org.apache.log4j.ConsoleAppender log4j.appender.C.Target=System.err log4j.appender.C.layout=org.apache.log4j.PatternLayout log4j.appender.C.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n?
添加MySQL驅動在azkaban-web目錄下創建文件夾:mkdir extlib?
將lib目錄下的mysql驅動復制到extlib目錄下
[hadoop@bigdata245 azkaban-web]$ cp lib/mysql-connector-java-5.1.28.jar extlib/
?
添加管理員用戶以及密碼
進入azkaban-web/conf目錄,修改azkaban-users.xml,這個文件存放用戶登錄信息以及權限信息。同時增加管理員用戶admin
<user username="admin" password="admin" roles="admin"/>
azkaban-web目錄下創建logs文件用于存放日志文件 # mkdir logs
注意:多個執行器模式也就是分布式執行模式下運行,需要在webserver配置中啟用多個執行器模式。確認在azkaban.properties中具有以下屬性。azkaban.use.multiple.executors和azkaban.executorselector.comparator。*是必需的屬性。
注意:azkaban.use.multiple.executors?多重執行模式不予以尊重
配置多節點執行服務器在azkaban-web/conf/azkaban.properties里添加
azkaban.use.multiple.executors =true azkaban.executorselector.filters = StaticRemainingFlowSize,MinimumFreeMemory,CpuStatus azkaban.executorselector.comparator.NumberOfAssignedFlowComparator = 1 azkaban.executorselector.comparator.Memory = 1 azkaban.executorselector.comparator.LastDispatched = 1 azkaban.executorselector.comparator.CpuUsage = 1以確認使用的是分布式方式,隨后提交的job會根據情況自行選擇執行服務器,否則默認只使用本地執行服務器。?
?
?
?
?
?
?
?
?
配置exec-server
拷貝azkaban-web目錄下的conf和extlib到azkaban-web目錄下
cp -a conf/ extlib/ /home/hadoop/app/azkaban/azkaban-exec/
配置azkaban-web/conf/azkaban.properties
default.timezone.id=Asia/Shanghai # Loader for projects executor.global.properties=/home/hadoop/app/azkaban/azkaban-exec/conf/global.properties azkaban.project.dir=/home/hadoop/app/azkaban/azkaban-exec/bin/projects # Azkaban plugin settings azkaban.jobtype.plugin.dir=plugins/jobtypes database.type=mysql mysql.port=3306 mysql.host=245 mysql.database=azkaban_matadata mysql.user=root mysql.password=P@ssw0rd mysql.numconnections=100 # Azkaban Executor settings executor.maxThreads=50 executor.port=12321 executor.flow.threads=25 #分布式節點必配 azkaban.use.multiple.executors=true azkaban.executorselector.filters=StaticRemainingFlowSize,MinimumFreeMemory,CpuStatus azkaban.executorselector.comparator.NumberOfAssignedFlowComparator=1 azkaban.executorselector.comparator.Memory=1 azkaban.executorselector.comparator.LastDispatched=1 azkaban.executorselector.comparator.CpuUsage=1在azkaban-exec/conf目錄下添加log4j.properties
[hadoop@bigdata245 conf]$ touch log4j.propertieslog4j.rootLogger=INFO,Clog4j.appender.C=org.apache.log4j.ConsoleAppenderlog4j.appender.C.Target=System.errlog4j.appender.C.layout=org.apache.log4j.PatternLayoutlog4j.appender.C.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n?
在mysql的azkaban庫中添加各個執行服務器的ip/域名和端口:
配置多執行器模式的執行程序,目前沒有執行程序管理UI。需要在數據庫中配置執行程序。需要將所有執行程序插入mysql DB以進行執行程序設置。驗證執行程序表中的正確執行程序是否處于活動狀態。
>insert into executors(host,port) values("bigdata245",3306);>insert into executors(host,port) values("bigdata244",3306);>insert into executors(host,port) values("bigdata243",3306);?
啟動,先啟動exec-server(執行器),然后啟動web-server(web服務)
cd azkaban-exec/bin:./start-exec.sh
cd azkaban-web/bin:./start-web.sh
注意:在bin目錄下啟動會生成一堆文件,如果用腳本啟動注意修改配置路勁
啟動完成后,三臺節點下可以查看到對應的進程
AzkabanExecutorServer 3
AzkabanWebServer 1
問題1;
The last packet sent successfully to the server was 0 milliseconds ago.?The driver has not received any packets from the server.)
Caused by: java.net.ConnectException: Connection refused (Connection refused)
如果出現這兩個問題,去配置文件查看mysql鏈接是否出錯,還有mysql配置執行服務器的語句是否有問題
?
訪問Azkaban UI界面
http://bigdata245:8081/index
輸入用戶名密碼azkaban/azkaban登錄?
?
修改如下配置(azkaban默認啟動規則是在哪里啟動在哪里生成一堆文件)
exec/bin
[hadoop@bigdata243 bin]$ cat start-exec.sh
#!/bin/bash
script_dir=$(dirname $0)
# pass along command line arguments to the internal launch script.
${script_dir}/internal/internal-start-executor.sh "$@" >/home/hadoop/app/azkaban/azkaban-exec/bin/executorServerLog__`date +%F+%T`.out 2>&1 &
[hadoop@bigdata243 bin]$ pwd
/home/hadoop/app/azkaban/azkaban-exec/bin
web/bin
[hadoop@bigdata245 bin]$ pwd
/home/hadoop/app/azkaban/azkaban-web/bin
[hadoop@bigdata245 bin]$ cat start-web.sh
#!/bin/bash
script_dir=$(dirname $0)
${script_dir}/internal/internal-start-web.sh >/home/hadoop/app/azkaban/azkaban-web/bin/webServerLog_`date +%F+%T`.out 2>&1 &
?
配置azkaban-exec/conf/azkaban.properties
azkaban.project.dir=/home/hadoop/app/azkaban/azkaban-exec/bin/projects
?
配置azkaban-web/conf/azkaban.properties
azkaban.project.dir=/home/hadoop/app/azkaban/azkaban-web/bin/projects
?
Azkaban測試及使用
projects:最重要的部分,創建一個工程,所有flows將在工程中運行。?
Scheduling:顯示定時任務?
Executing:顯示當前運行的任務?
History:顯示歷史運行任務
主要介紹Projects部分,在創建工程前,我們先了解下之間的關系,一個工程包含一個或多個flows,一個flow包含多個job。job是你想在azkaban中運行的一個進程,可以是簡單的linux命令,可是java程序,也可以是復雜的shell腳本、或者python腳本,當然,如果你安裝相關插件,也可以運行插件。一個job可以依賴于另一個job,這種多個job和它們的依賴組成的圖表叫做flow。
web-server節點:負責項目作業管理(上傳和分發)?
exec-server節點:負責具體執行的executor會解析job文件
一、commond 類型單一Job
1.創建工程
Flows:工作流程,有多個job組成?
Permissions:權限管理?
Project Logs:工程日志
2.創建Job
job就是一個以.job結尾的文本文件,例如創建一個job,名為hello.job,用于打印hello azkaban
3.打包
將創建的job打包成.zip壓縮文件,注意只能是.zip格式?
4.使用Azkaban UI 界面創建project并上傳壓縮包
點擊Execute執行?
執行后,點擊Detail,查看日志
?
azkaban-exec/plugins/jobtypes/commonprivate.properties配置文件,內容中添加:azkaban.native.lib=false
關閉重啟服務
如果還不行,編譯源碼
源碼路徑:/home/hadoop/app/compile_azkaban3.47/azkaban-common/src/main/java/azkaban/jobExecutor/ProcessJob.java
修改如下:final boolean isExecuteAsUser = this.sysProps.getBoolean(EXECUTE_AS_USER, false);
?
重新編譯之后將azkaban/azkaban-exec-server/build/distributions目錄下的azkaban-exec-server-3.48.0-8-gdc851ec.tar.gz 解壓重命名,然后再修改配置替換舊的azkaban-exec-server,最后重啟exec和web服務即可
再次運行就好了
二、commond 類型多JOb 工作流 flow
1.創建項目
首先,創建一個項目,名為 Com_Job
2.job 創建
?
假設有這么一種場景:
(1).task1 依賴 task2
(2).task2 依賴 task3
(3).task3 依賴 task4
說明:假設task1是一個計算指標任務,task2 給 task1 提供執行需要的基礎數據
task3 給 task2 提供數據,以此類推。
?
3.flow 創建
?
多個jobs和它們的依賴組成flow。怎么創建依賴,只要指定dependencies參數就行了
定義4個job:
(1).run_task1.job:計算業務指標數據
(2).run_task2.job:計算task1所需要的數據
(3).run_task3.job:計算task2所需要的數據
(4).run_task4.job:從 slaves 中抽取源數據
?
依賴關系:
task1 依賴 task2,task2 依賴 task3,task3 依賴 task4
?
4個job文件內容如下(這里以執行python為例)
# run_task1.job
type = command
command = python /home/hadoop/pyshell/run_task1.py
dependencies = run_task2
?
# run_task2.job
type = command
command = python /home/hadoop/pyshell/run_task2.py
dependencies = run_task3
?
# run_task3.job
type = command
command = python /home/hadoop/pyshell/run_task3.py
dependencies = run_task4
?
# run_task4.job
type = command
command = python /home/hadoop/pyshell/run_task4.py
?
創建python腳本
[hadoop@bigdata245 pyshell]$ touch run_task1.py
[hadoop@bigdata245 pyshell]$ touch run_task2.py
[hadoop@bigdata245 pyshell]$ touch run_task3.py
[hadoop@bigdata245 pyshell]$ touch run_task4.py
?
4個文件內容如下
run_task1.py
?
#!/usr/bin/python3
# -*- coding: utf-8 -*-
print("task1:計算業務指標數據...")
?
run_task2.py
?
#!/usr/bin/python3
# -*- coding: utf-8 -*-
print("task2:計算基礎數據,為task1提供數據")
?
run_task3.py
?
#!/usr/bin/python3
# -*- coding: utf-8 -*-
print("task3:數據清洗,為task2提供數據")
?
run_task4.py
?
#!/usr/bin/python3
# -*- coding: utf-8 -*-
print("task4:從Slaves中抽取源數據")
3.將上述 job 打成zip包,上傳至 azkaban
上傳完成后,點擊右側Execute Flow按鈕,查看流程視圖?
Flow view:流程視圖??梢越?#xff0c;啟用某些job
Notification:定義任務成功或者失敗是否發送郵件
Failure Options:定義一個job失敗,剩下的job怎么執行
Concurrent:并行任務執行設置
Flow Parametters:參數設置。
4.執行
(1).執行一次,點擊右下角Execute?
(2).定時執行,點擊左下角Schedule?
設置完成后,執行右下角schedule,即完成調度配置,azkaban這里的配置與linux下的crontab類似?
想要查看job的調度列表,切換到Schedule菜單即可
5.查看項目flow中各個Job的執行情況
?
綠色代表成功,藍色是運行,紅色是失敗??梢圆榭磈ob運行時間,依賴和日志,點擊details可以查看各個job運行情況
三、MapReduce 任務
Azkaban 執行 MapReduce 任務,我們以 WordCount 為例
1.準備數據
[hadoop@bigdata245 ~]$ hadoop fs -mkdir -p /azkaban/input
[hadoop@bigdata245 data]$ hadoop fs -put words.txt /azkaban/input
使用hadoop提供的jar統計單詞數量
[hadoop@bigdata245 mapreduce]$ hadoop jar $HADOOP_HOME/share/hadoop/mapreduce/hadoop-mapreduce-examples-2.7.5.jar wordcount /azkaban/input/* /azkaban/outputs/
運行結果
?
2.創建項目
3.job創建
job
# mapreduce_wordcount.job
type = command
command=sh /home/hadoop/pyshell/wordcount.sh
?
4.打包上傳,執行
5.查看運行結果
azkaban上打印的日志顯示已經成功?
四、Hive 腳本任務
1.創建項目
hive_export_to_mysql
2.job創建
我們要完成,hive中創建表,加載數據,然后導出數據到mysql,分為兩個job?
hive_task1:將hive中的數據導出到mysql中?
hive_task2:hive中創建表,加載數據?
依賴關系:hive_task1 依賴 hive_task2
3.flow創建
job 文件內容如下
# hive_task1.job
type = command
command = sh /home/hadoop/pyshell/hive_task1.sh
dependencies = hive_task2
?
# hive_task2.job
type = command
command = sh /home/hadoop/pyshell/hive_task2.sh
?
腳本內容如下
?
[hadoop@bigdata245 pyshell]$ cat hive_task1.sh
#!/bin/bash
/home/hadoop/app/sqoop1/bin/sqoop export \
--connect jdbc:mysql://bigdata245:3306/sqoop \
--username root --password P@ssw0rd \
--table EMP \
--export-dir /user/hive/warehouse/test.db/emp \
--input-fields-terminated-by ',' \
--input-null-string 'null' --input-null-non-string 'null' \
-m 1
?
[hadoop@bigdata245 pyshell]$ cat hive_task2
#!/bin/bash
hive -f /home/hadoop/pyshell/test.sql
?
sql文件 test.sql內容如下
[hadoop@bigdata245 pyshell]$ cat test.sql
create database if not exists test;
use test;
drop table if exists emp;
create table emp(
empno int,
ename string,
job string
)
row format delimited fields terminated by ',';
load data local inpath '/home/hadoop/pyshell/emp.txt' overwrite into table emp;
?
emp.txt文件內容如下
[hadoop@bigdata245 pyshell]$ cat emp.txt
1001,Tom,Java
1002,Jack,PHP
1003,Harvey,BigData
1004,David,IOS
1005,Kett,DBA
4.打包上傳
5.執行,查看運行結果
執行前記得先在mysql中創建表emp,sql語句如下
DROP TABLE IF EXISTS `EMP`;
CREATE TABLE `EMP` (
`empno` int(11) DEFAULT NULL,
`ename` varchar(255) DEFAULT NULL,
`job` varchar(255) DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
SET FOREIGN_KEY_CHECKS=1;
?
?
?
?
?
總結
以上是生活随笔為你收集整理的部署Azkaban多节点分布式模式的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Linux--文件结构体struct f
- 下一篇: Linux 字符设备驱动结构(二)——