使用AirFlow调度MaxCompute
簡介:?airflow是Airbnb開源的一個用python編寫的調度工具,基于有向無環圖(DAG),airflow可以定義一組有依賴的任務,按照依賴依次執行,通過python代碼定義子任務,并支持各種Operate操作器,靈活性大,能滿足用戶的各種需求。本文主要介紹使用Airflow的python Operator調度MaxCompute 任務
背景
airflow是Airbnb開源的一個用python編寫的調度工具,基于有向無環圖(DAG),airflow可以定義一組有依賴的任務,按照依賴依次執行,通過python代碼定義子任務,并支持各種Operate操作器,靈活性大,能滿足用戶的各種需求。本文主要介紹使用Airflow的python Operator調度MaxCompute 任務
一、環境準備
- Python 2.7.5 ?PyODPS支持Python2.6以上版本
- Airflow apache-airflow-1.10.7
1.安裝MaxCompute需要的包
pip install setuptools>=3.0
pip install requests>=2.4.0
pip install greenlet>=0.4.10 ?# 可選,安裝后能加速Tunnel上傳。
pip install cython>=0.19.0 ?# 可選,不建議Windows用戶安裝。
pip install pyodps
注意:如果requests包沖突,先卸載再安裝對應的版本
2.執行如下命令檢查安裝是否成功
python -c "from odps import ODPS"
二、開發步驟
1.在Airflow家目錄編寫python調度腳本Airiflow_MC.py
# -*- coding: UTF-8 -*-
import sys
import os
from odps import ODPS
from odps import options
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta
from configparser import ConfigParser
import time
reload(sys)
sys.setdefaultencoding('utf8')
#修改系統默認編碼。
# MaxCompute參數設置
options.sql.settings = {'options.tunnel.limit_instance_tunnel': False, 'odps.sql.allow.fullscan': True}
cfg = ConfigParser()
cfg.read("odps.ini")
print(cfg.items())
odps = ODPS(cfg.get("odps","access_id"),cfg.get("odps","secret_access_key"),cfg.get("odps","project"),cfg.get("odps","endpoint"))
default_args = {
? ?'owner': 'airflow',
? ?'depends_on_past': False,
? ?'retry_delay': timedelta(minutes=5),
? ?'start_date':datetime(2020,1,15)
? ?# 'email': ['airflow@example.com'],
? ?# 'email_on_failure': False,
? ?# 'email_on_retry': False,
? ?# 'retries': 1,
? ?# 'queue': 'bash_queue',
? ?# 'pool': 'backfill',
? ?# 'priority_weight': 10,
? ?# 'end_date': datetime(2016, 1, 1),
}
dag = DAG(
? ?'Airiflow_MC', default_args=default_args, schedule_interval=timedelta(seconds=30))
def read_sql(sqlfile):
? ?with io.open(sqlfile, encoding='utf-8', mode='r') as f:
? ? ? ?sql=f.read()
? ?f.closed
? ?return sql
def get_time():
? ?print '當前時間是{}'.format(time.time())
? ?return time.time()
def mc_job ():
? ?project = odps.get_project() ?# 取到默認項目。
? ?instance=odps.run_sql("select * from long_chinese;")
? ?print(instance.get_logview_address())
? ?instance.wait_for_success()
? ?with instance.open_reader() as reader:
? ? ? ?count = reader.count
? ?print("查詢表數據條數:{}".format(count))
? ?for record in reader:
? ? ? ?print record
? ?return count
t1 = PythonOperator (
? ?task_id = 'get_time' ,
? ?provide_context = False ,
? ?python_callable = get_time,
? ?dag = dag )
t2 = PythonOperator (
? ?task_id = 'mc_job' ,
? ?provide_context = False ,
? ?python_callable = mc_job ,
? ?dag = dag )
t2.set_upstream(t1)
2.提交
python Airiflow_MC.py
3.進行測試
# print the list of active DAGs
airflow list_dags
# prints the list of tasks the "tutorial" dag_id
airflow list_tasks Airiflow_MC
# prints the hierarchy of tasks in the tutorial DAG
airflow list_tasks Airiflow_MC --tree
#測試task
airflow test Airiflow_MC get_time 2010-01-16
airflow test Airiflow_MC mc_job 2010-01-16
4.運行調度任務
登錄到web界面點擊按鈕運行
5.查看任務運行結果
1.點擊view log
2.查看結果
?原文鏈接
本文為阿里云原創內容,未經允許不得轉載。
總結
以上是生活随笔為你收集整理的使用AirFlow调度MaxCompute的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 关于质量标准化的思考和实践
- 下一篇: 阿里云RDS深度定制-XA Crash