十八、可视化任务调度系统airflow
最近工作需要,使用airflow搭建了公司的ETL系統(tǒng),順帶在公司分享了一次airflow,整理成文,Enjoy!
1. airflow 介紹
1.1 airflow 是什么
Airflow is a platform to programmatically author, schedule and monitor workflows.
airflow 是一個編排、調(diào)度和監(jiān)控workflow的平臺,由Airbnb開源,現(xiàn)在在Apache Software Foundation 孵化。airflow 將workflow編排為tasks組成的DAGs,調(diào)度器在一組workers上按照指定的依賴關(guān)系執(zhí)行tasks。同時,airflow 提供了豐富的命令行工具和簡單易用的用戶界面以便用戶查看和操作,并且airflow提供了監(jiān)控和報警系統(tǒng)。
1.2 airflow 核心概念
通過將DAGs和Operators結(jié)合起來,用戶就可以創(chuàng)建各種復雜的 workflow了。
1.3 其它概念
2. 示例
先來看一個簡單的DAG。圖中每個節(jié)點表示一個task,所有tasks組成一個DAG,各個tasks之間的依賴關(guān)系可以根據(jù)節(jié)點之間的線看出來。
DAGs
2.1 實例化DAG
# -*- coding: UTF-8 -*-## 導入airflow需要的modules from airflow import DAG from datetime import datetime, timedeltadefault_args = {'owner': 'lxwei','depends_on_past': False, # 如上文依賴關(guān)系所示'start_date': datetime(2018, 1, 17), # DAGs都有個參數(shù)start_date,表示調(diào)度器調(diào)度的起始時間'email': ['lxwei@github.com'], # 用于alert'email_on_failure': True,'email_on_retry': False,'retries': 3, # 重試策略'retry_delay': timedelta(minutes=5) }dag = DAG('example-dag', default_args=default_args, schedule_interval='0 0 * * *')在創(chuàng)建DAGs時,我們可以顯示的給每個Task傳遞參數(shù),但通過default_args,我們可以定義一個默認參數(shù)用于創(chuàng)建tasks。
注意,schedule_interval 跟官方文檔不一致,官方文檔的方式已經(jīng)被deprecated。
2.2 定義依賴關(guān)系
這個依賴關(guān)系是我自己定義的,key表示某個taskId,value里的每個元素也表示一個taskId,其中,key依賴value里的所有task。
"dependencies": {"goods_sale_2": ["goods_sale_1"], # goods_sale_2 依賴 goods_sale1"shop_sale_1_2": ["shop_sale_1_1"],"shop_sale_2_2": ["shop_sale_2_1"],"shop_sale_2_3": ["shop_sale_2_2"],"etl_task": ["shop_info", "shop_sale_2_3", "shop_sale_realtime_1", "goods_sale_2", "shop_sale_1_2"],"goods_sale_1": ["timelySalesCheck", "productDaySalesCheck"],"shop_sale_1_1": ["timelySalesCheck", "productDaySalesCheck"],"shop_sale_realtime_1": ["timelySalesCheck", "productDaySalesCheck"],"shop_sale_2_1": ["timelySalesCheck", "productDaySalesCheck"],"shop_info": ["timelySalesCheck", "productDaySalesCheck"] }2.3 定義tasks和依賴關(guān)系
首先,實例化operators,構(gòu)造tasks。如代碼所示,其中,EtlTask、MySQLToWebDataTransfer、MySQLSelector 是自定義的三種Operator,根據(jù)taskType實例化operator,并存放到taskDict中,便于后期建立tasks之間的依賴關(guān)系。
for taskConf in tasksConfs:taskType = taskConf.get("taskType")if taskType == "etlTask":task = EtlTask(task_id=taskConf.get("taskId"),httpConnId=httpConn,etlId=taskConf.get("etlId"),dag=dag)taskDict[taskConf.get("taskId")] = taskelif taskType == "MySQLToWebDataTransfer":task = MySqlToWebdataTransfer(task_id = taskConf.get("taskId"),sql= taskConf.get("sql"),tableName=taskConf.get("tableName"),mysqlConnId =mysqlConn,httpConnId=httpConn,dag=dag)taskDict[taskConf.get("taskId")] = taskelif taskType == "MySQLSelect":task = StatusChecker(task_id = taskConf.get("taskId"),mysqlConnId = mysqlConn,sql = taskConf.get("sql"),dag = dag)taskDict[taskConf.get("taskId")] = taskelse:logging.error("error. TaskType is illegal.")構(gòu)建tasks之間的依賴關(guān)系,其中,dependencies中定義了上面的依賴關(guān)系,A >> B 表示A是B的父節(jié)點,相應的,A << B 表示A是B的子節(jié)點。
for sourceKey in dependencies:destTask = taskDict.get(sourceKey)sourceTaskKeys = dependencies.get(sourceKey)for key in sourceTaskKeys:sourceTask = taskDict.get(key)if (sourceTask != None and destTask != None):sourceTask >> destTask3. 常用命令
命令行輸入airflow -h,得到幫助文檔
backfill Run subsections of a DAG for a specified date range list_tasks List the tasks within a DAG clear Clear a set of task instance, as if they never ran pause Pause a DAG unpause Resume a paused DAG trigger_dag Trigger a DAG run pool CRUD operations on pools variables CRUD operations on variables kerberos Start a kerberos ticket renewer render Render a task instance's template(s) run Run a single task instance initdb Initialize the metadata database list_dags List all the DAGs dag_state Get the status of a dag run task_failed_deps Returns the unmet dependencies for a task instancefrom the perspective of the scheduler. In other words,why a task instance doesn't get scheduled and thenqueued by the scheduler, and then run by an executor). task_state Get the status of a task instance serve_logs Serve logs generate by worker test Test a task instance. This will run a task withoutchecking for dependencies or recording it's state inthe database. webserver Start a Airflow webserver instance resetdb Burn down and rebuild the metadata database upgradedb Upgrade the metadata database to latest version scheduler Start a scheduler instance worker Start a Celery worker node flower Start a Celery Flower version Show the version connections List/Add/Delete connections其中,使用較多的是backfill、run、test、webserver、scheduler。其他操作在web界面操作更方便。另外,initdb 用于初始化metadata,使用一次即可;resetdb會重置metadata,清除掉數(shù)據(jù)(如connection數(shù)據(jù)), 需要慎用。
4. 問題
在使用airflow過程中,曾把DAGs里的task拆分得很細,這樣的話,如果某個task失敗,重跑的代價會比較低。但是,在實踐中發(fā)現(xiàn),tasks太多時,airflow在調(diào)度tasks會很低效,airflow一直處于選擇待執(zhí)行的task的過程中,會長時間沒有具體task在執(zhí)行,從而整體執(zhí)行效率大幅降低。
5. 總結(jié)
airflow 很好很強大。如果只是簡單的ETL之類的工作,可以很容易的編排。調(diào)度靈活,而且監(jiān)控和報警系統(tǒng)完備,可以很方便的投入生產(chǎn)環(huán)節(jié)。
6. 參閱
airflow 官網(wǎng)
github
總結(jié)
以上是生活随笔為你收集整理的十八、可视化任务调度系统airflow的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 计算机汉字编码输入码,汉字编码、输入系统
- 下一篇: 【时序】M4竞赛冠军方案:一种用于时间序