大数据调度平台Airflow(五):Airflow使用
目錄
Airflow使用
一、Airflow調度Shell命令
1.首先我們需要創(chuàng)建一個python文件,導入需要的類庫
2.實例化DAG
3、定義Task
4、設置task依賴關系
5、上傳python配置腳本
6、重啟Airflow
7、執(zhí)行airflow
二、DAG調度觸發(fā)時間
三、DAG catchup 參數(shù)設置
四、DAG調度周期設置
五、DAG任務依賴設置
1、DAG任務依賴設置一
2、???????DAG任務依賴設置二
3、???????DAG任務依賴設置三
4、???????DAG任務依賴設置四
5、???????DAG任務依賴設置五
Airflow使用
上文說到使用Airflow進行任務調度大體步驟如下:
- 創(chuàng)建python文件,根據(jù)實際需要,使用不同的Operator
- 在python文件不同的Operator中傳入具體參數(shù),定義一系列task
- 在python文件中定義Task之間的關系,形成DAG
- 將python文件上傳執(zhí)行,調度DAG,每個task會形成一個Instance
- 使用命令行或者WEBUI進行查看和管理
以上python文件就是Airflow python腳本,使用代碼方式指定DAG的結構
一、Airflow調度Shell命令
下面我們以調度執(zhí)行shell命令為例,來講解Airflow使用。
1.首先我們需要創(chuàng)建一個python文件,導入需要的類庫
# 導入 DAG 對象,后面需要實例化DAG對象
from airflow import DAG# 導入BashOperator Operators,我們需要利用這個對象去執(zhí)行流程
from airflow.operators.bash import BashOperator
注意:以上代碼可以在開發(fā)工具中創(chuàng)建,但是需要在使用的python3.7環(huán)境中導入安裝Airflow包。
D:\ProgramData\Anaconda3\envs\python37\Scripts>pip install apache-airflow==2.1.3 -i https://pypi.tuna.tsinghua.edu.cn/simple
2.實例化DAG
from datetime import datetime, timedelta# default_args中定義一些參數(shù),在實例化DAG時可以使用,使用python dic 格式定義
default_args = {'owner': 'airflow', # 擁有者名稱'start_date': datetime(2022, 3, 25), # 第一次開始執(zhí)行的時間,為 UTC 時間'retries': 1, # 失敗重試次數(shù)'retry_delay': timedelta(minutes=5), # 失敗重試間隔
}dag = DAG(dag_id = 'myairflow_execute_bash', #DAG id ,必須完全由字母、數(shù)字、下劃線組成default_args = default_args, #外部定義的 dic 格式的參數(shù)schedule_interval = timedelta(days=1) # 定義DAG運行的頻率,可以配置天、周、小時、分鐘、秒、毫秒
)
注意:
- 實例化DAG有三種方式
第一種方式:
with DAG("my_dag_name") as dag:op=XXOperator(task_id="task")
第二種方式(以上采用這種方式):
my_dag = DAG("my_dag_name")
op = XXOperator(task_id="task", dag=my_dag)
第三種方式:
@dag(start_date=days_ago(2))
def generate_dag():op = XXOperator(task_id="task")
dag = generate_dag()
- baseoperator基礎參數(shù)說明:
可以參照:
http://airflow.apache.org/docs/apache-airflow/stable/_api/airflow/models/baseoperator/index.html#module-airflow.models.baseoperator查看baseopartor中更多參數(shù)。
- DAG參數(shù)說明
可以參照:
http://airflow.apache.org/docs/apache-airflow/stable/_api/airflow/models/dag/index.html
查看DAG參數(shù)說明,也可以直接在開發(fā)工具點擊DAG進入源碼看下對應參數(shù)有哪些。
3、定義Task
當實例化Operator時會生成Task任務,從一個Operator中實例化出來對象的過程被稱為一個構造方法,每個構造方法中都有“task_id”充當任務的唯一標識符。
下面我們定義三個Operator,也就是三個Task,每個task_id 不能重復。
# operator 支持多種類型, 這里使用 BashOperator
first = BashOperator(task_id='first',bash_command='echo "run first task"',dag=dag
)middle = BashOperator(task_id='middle',bash_command='echo "run middle task"',dag=dag
)last = BashOperator(task_id='last',bash_command='echo "run last task"',dag=dag,retries=3
)
注意:
- 每個operator中可以傳入對應的參數(shù),覆蓋DAG默認的參數(shù),例如:last task中“retries”=3 就替代了默認的1。任務參數(shù)的優(yōu)先規(guī)則如下:①.顯示傳遞的參數(shù) ②.default_args字典中存在的值③.operator的默認值(如果存在)。
- BashOperator使用方式參照:http://airflow.apache.org/docs/apache-airflow/stable/howto/operator/bash.html#howto-operator-bashoperator
4、設置task依賴關系
#使用 set_upstream、set_downstream 設置依賴關系,不能出現(xiàn)環(huán)形鏈路,否則報錯
# middle.set_upstream(first) # middle會在first執(zhí)行完成之后執(zhí)行
# last.set_upstream(middle) # last 會在 middle執(zhí)行完成之后執(zhí)行#也可以使用位移符來設置依賴關系
first >> middle >>last # first 首先執(zhí)行,middle次之,last最后
# first >> [middle,last] # first首先執(zhí)行,middle ,last并行執(zhí)行
注意:當執(zhí)行腳本時,如果在DAG中找到一條環(huán)形鏈路(例如:A->B->C-A)會引發(fā)異常。更多DAG task依賴關系可參照官網(wǎng):http://airflow.apache.org/docs/apache-airflow/stable/concepts/dags.html#task-dependencies?
5、上傳python配置腳本
到目前為止,python配置如下:
# 導入 DAG 對象,后面需要實例化DAG對象
from airflow import DAG# 導入BashOperator Operators,我們需要利用這個對象去執(zhí)行流程
from airflow.example_dags.example_bash_operator import dagfrom airflow.operators.bash import BashOperatorfrom datetime import datetime, timedelta# default_args中定義一些參數(shù),在實例化DAG時可以使用,使用python dic 格式定義
default_args = {'owner': 'airflow', # 擁有者名稱'start_date': datetime(2021, 9, 4), # 第一次開始執(zhí)行的時間,為 UTC 時間'retries': 1, # 失敗重試次數(shù)'retry_delay': timedelta(minutes=5), # 失敗重試間隔
}dag = DAG(dag_id = 'myairflow_execute_bash', #DAG id ,必須完全由字母、數(shù)字、下劃線組成default_args = default_args, #外部定義的 dic 格式的參數(shù)schedule_interval = timedelta(days=1) # 定義DAG運行的頻率,可以配置天、周、小時、分鐘、秒、毫秒
)# operator 支持多種類型, 這里使用 BashOperator
first = BashOperator(task_id='first',bash_command='echo "run first task"',dag=dag
)middle = BashOperator(task_id='middle',bash_command='echo "run middle task"',dag=dag
)last = BashOperator(task_id='last',bash_command='echo "run last task"',dag=dag,retries=3
)#使用 set_upstream、set_downstream 設置依賴關系,不能出現(xiàn)環(huán)形鏈路,否則報錯
# middle.set_upstream(first) # middle會在first執(zhí)行完成之后執(zhí)行
# last.set_upstream(middle) # last 會在 middle執(zhí)行完成之后執(zhí)行#也可以使用位移符來設置依賴關系
first >> middle >>last # first 首先執(zhí)行,middle次之,last最后
# first >> [middle,last] # first首先執(zhí)行,middle ,last并行執(zhí)行
?將以上python配置文件上傳到$AIRFLOW_HOME/dags目錄下,默認$AIRFLOW_HOME為安裝節(jié)點的“/root/airflow”目錄,當前目錄下的dags目錄需要手動創(chuàng)建。
6、重啟Airflow
“ps aux|grep webserver”和“ps aux|grep scheduler”找到對應的airflow進程殺掉,重新啟動Airflow。重啟之后,可以在airflow webui看到對應的DAG ID ”myairflow_execute_bash”。
7、執(zhí)行airflow
按照如下步驟執(zhí)行DAG,首先打開工作流,然后“Trigger DAG”執(zhí)行,隨后可以看到任務執(zhí)行成功。
查看task執(zhí)行日志:
二、DAG調度觸發(fā)時間
在Airflow中,調度程序會根據(jù)DAG文件中指定的“start_date”和“schedule_interval”來運行DAG。特別需要注意的是Airflow計劃程序在計劃時間段的末尾觸發(fā)執(zhí)行DAG,而不是在開始時刻觸發(fā)DAG,例如:
default_args = {'owner': 'airflow', # 擁有者名稱'start_date': datetime(2022, 3, 25), # 第一次開始執(zhí)行的時間,為 UTC 時間'retries': 1, # 失敗重試次數(shù)'retry_delay': timedelta(minutes=5), # 失敗重試間隔
}dag = DAG(dag_id = 'myairflow_execute_bash', #DAG id ,必須完全由字母、數(shù)字、下劃線組成default_args = default_args, #外部定義的 dic 格式的參數(shù)schedule_interval = timedelta(days=1) # 定義DAG運行的頻率,可以配置天、周、小時、分鐘、秒、毫秒
)
以上配置的DAG是從世界標準時間2022年3月24號開始調度,每隔1天執(zhí)行一次,這個DAG的具體運行時間如下圖:?
| 自動調度DAG 執(zhí)行日期 | 自動調度DAG實際執(zhí)行觸發(fā)時間 |
| 2022-03-24,00:00:00+00:00 | 2022-03-25,00:00:00+00:00 |
| 2022-03-25,00:00:00+00:00 | 2022-03-26,00:00:00+00:00 |
| 2022-03-26,00:00:00+00:00 | 2022-03-27,00:00:00+00:00 |
| 2022-03-27,00:00:00+00:00 | 2022-03-28,00:00:00+00:00 |
| 2022-03-28,00:00:00+00:00 | 2022-03-29,00:00:00+00:00 |
| ... ... | ... ... |
以上表格中以第一條數(shù)據(jù)為例解釋,Airflow正常調度是每天00:00:00 ,假設當天日期為2022-03-24,正常我們認為只要時間到了2022-03-24 00:00:00 就會執(zhí)行,改調度時間所處于的調度周期為2022-03-24 00:00:00 ~ 2022-03-25 00:00:00 ,在Airflow中實際上是在調度周期末端觸發(fā)執(zhí)行,也就是說2022-03-24 00:00:00 自動觸發(fā)執(zhí)行時刻為 2022-03-25 00:00:00。?
如下圖,在airflow中,“execution_date”不是實際運行時間,而是其計劃周期的開始時間戳。例如:execution_date 是2021-09-04 00:00:00 的DAG 自動調度運行的實際時間為2021-09-05 00:00:00。當然除了自動調度外,我們還可以手動觸發(fā)執(zhí)行DAG執(zhí)行,要判斷DAG運行時計劃調度(自動調度)還是手動觸發(fā),可以查看“Run Type”。
三、DAG catchup 參數(shù)設置
在Airflow的工作計劃中,一個重要的概念就是catchup(追趕),在實現(xiàn)DAG具體邏輯后,如果將catchup設置為True(默認就為True),Airflow將“回填”所有過去的DAG run,如果將catchup設置為False,Airflow將從最新的DAG run時刻前一時刻開始執(zhí)行 DAG run,忽略之前所有的記錄。
例如:現(xiàn)在某個DAG每隔1分鐘執(zhí)行一次,調度開始時間為2001-01-01 ,當前日期為2021-10-01 15:23:21,如果catchup設置為True,那么DAG將從2001-01-01 00:00:00 開始每分鐘都會運行當前DAG。如果catchup 設置為False,那么DAG將從2021-10-01 15:22:20(當前2021-10-01 15:23:21前一時刻)開始執(zhí)行DAG run。
舉例:有first ,second,third三個shell命令任務,按照順序調度,每隔1分鐘執(zhí)行一次,首次執(zhí)行時間為2000-01-01。
設置catchup 為True(默認),DAG python配置如下:
from airflow import DAGfrom airflow.operators.bash import BashOperatorfrom datetime import datetime, timedeltadefault_args = {'owner': 'airflow', # 擁有者名稱'start_date': datetime(2001, 1, 1), # 第一次開始執(zhí)行的時間,為 UTC 時間'retries': 1, # 失敗重試次數(shù)'retry_delay': timedelta(minutes=5), # 失敗重試間隔
}
dag = DAG(dag_id = 'catchup_test1 ', #DAG id ,必須完全由字母、數(shù)字、下劃線組成default_args = default_args, #外部定義的 dic 格式的參數(shù)schedule_interval = timedelta(minutes=1), # 定義DAG運行的頻率,可以配置天、周、小時、分鐘、秒、毫秒catchup=True # 執(zhí)行DAG時,將開始時間到目前所有該執(zhí)行的任務都執(zhí)行,默認為True
)first = BashOperator(task_id='first',bash_command='echo "run first task"',dag=dag
)
middle = BashOperator(task_id='second',bash_command='echo "run second task"',dag=dag
)
last = BashOperator(task_id='third',bash_command='echo "run third task"',dag=dag,retries=3
)
first >> middle >>last
上傳python配置文件到$AIRFLOW_HOME/dags下,重啟airflow,DAG執(zhí)行調度如下:
設置catchup 為False,DAG python配置如下:
from airflow import DAGfrom airflow.operators.bash import BashOperatorfrom datetime import datetime, timedeltadefault_args = {'owner': 'airflow', # 擁有者名稱'start_date': datetime(2001, 1, 1), # 第一次開始執(zhí)行的時間,為 UTC 時間'retries': 1, # 失敗重試次數(shù)'retry_delay': timedelta(minutes=5), # 失敗重試間隔
}
dag = DAG(dag_id = 'catchup_test2', #DAG id ,必須完全由字母、數(shù)字、下劃線組成default_args = default_args, #外部定義的 dic 格式的參數(shù)schedule_interval = timedelta(minutes=1), # 定義DAG運行的頻率,可以配置天、周、小時、分鐘、秒、毫秒catchup=False # 執(zhí)行DAG時,將開始時間到目前所有該執(zhí)行的任務都執(zhí)行,默認為True
)first = BashOperator(task_id='first',bash_command='echo "run first task"',dag=dag
)
middle = BashOperator(task_id='second',bash_command='echo "run second task"',dag=dag
)
last = BashOperator(task_id='third',bash_command='echo "run third task"',dag=dag,retries=3
)
first >> middle >>last
上傳python配置文件到$AIRFLOW_HOME/dags下,重啟airflow,DAG執(zhí)行調度如下:
有兩種方式在Airflow中配置catchup:
- 全局配置
在airflow配置文件airflow.cfg的scheduler部分下,設置catchup_by_default=True(默認)或False,這個設置是全局性的設置。
- DAG文件配置
在python代碼配置中設置DAG對象的參數(shù):dag.catchup=True或False。
dag = DAG(dag_id = 'myairflow_execute_bash',
default_args = default_args,
catchup=False,schedule_interval = timedelta(days=1))
四、DAG調度周期設置
每個DAG可以有或者沒有調度執(zhí)行周期,如果有調度周期,我們可以在python代碼DAG配置中設置“schedule_interval”參數(shù)來指定調度DAG周期,可以通過以下三種方式來設置。
- 預置的Cron調度
Airflow預置了一些Cron調度周期,可以參照:
DAG Runs — Airflow Documentation,如下圖:
?
在python配置文件中使用如下:
default_args = {'owner': 'airflow', # 擁有者名稱'start_date': datetime(2021, 9, 4), # 第一次開始執(zhí)行的時間,為 UTC 時間'retries': 1, # 失敗重試次數(shù)'retry_delay': timedelta(minutes=5), # 失敗重試間隔
}dag = DAG(dag_id = 'cron_test', #DAG id ,必須完全由字母、數(shù)字、下劃線組成default_args = default_args, #外部定義的 dic 格式的參數(shù)schedule_interval = '@daily' # 使用預置的Cron調度,每天0點0分調度
- Cron
這種方式就是寫Linux系統(tǒng)的crontab定時任務命令,可以在https://crontab.guru/網(wǎng)站先生成對應的定時調度命令,其格式如下:
minute hour day month week
minute:表示分鐘,可以從0~59之間的任意整數(shù)。
hour:表示小時,可以是從0到23之間的任意整數(shù)。
day:表示日期,可以是1到31之間的任何整數(shù)。
month:表示月份,可以是從1到12之間的任何整數(shù)。
week:表示星期幾,可以是從0到7之間的任何整數(shù),這里的0或7代表星期日。
以上各個字段中還可以使用特殊符號代表不同意思:
星號(*):代表所有可能的值,例如month字段如果是星號,則表示在滿足其它字段的制約條件后每月都執(zhí)行該命令操作。
逗號(,):可以用逗號隔開的值指定一個列表范圍,例如,”1,2,5,7,8,9”
中杠(-):可以用整數(shù)之間的中杠表示一個整數(shù)范圍,例如”2-6”表示”2,3,4,5,6”
正斜線(/):可以用正斜線指定時間的間隔頻率,步長,例如”0-23/2”表示每兩小時執(zhí)行一次。
在python配置文件中使用如下:
default_args = {'owner': 'airflow', # 擁有者名稱'start_date': datetime(2021, 9, 4), # 第一次開始執(zhí)行的時間,為 UTC 時間'retries': 1, # 失敗重試次數(shù)'retry_delay': timedelta(minutes=5), # 失敗重試間隔
}dag = DAG(dag_id = 'cron_test', #DAG id ,必須完全由字母、數(shù)字、下劃線組成default_args = default_args, #外部定義的 dic 格式的參數(shù)schedule_interval = '* * * * *' # 使用Crontab 定時任務命令,每分鐘運行一次
)
- datetime.timedelta
timedelta是使用python timedelta 設置調度周期,可以配置天、周、小時、分鐘、秒、毫秒。在python配置文件中使用如下:
default_args = {'owner': 'airflow', # 擁有者名稱'start_date': datetime(2021, 9, 4), # 第一次開始執(zhí)行的時間,為 UTC 時間'retries': 1, # 失敗重試次數(shù)'retry_delay': timedelta(minutes=5), # 失敗重試間隔
}dag = DAG(dag_id = 'cron_test', #DAG id ,必須完全由字母、數(shù)字、下劃線組成default_args = default_args, #外部定義的 dic 格式的參數(shù)schedule_interval = timedelta(minutes=5) # 使用python timedelta 設置調度周期,可以配置天、周、小時、分鐘、秒、毫秒
)
五、???????DAG任務依賴設置
1、???????DAG任務依賴設置一
- DAG調度流程圖
- task執(zhí)行依賴
A >> B >>C
- 完整代碼
'''
airflow 任務依賴關系設置一'''
from airflow import DAGfrom airflow.operators.bash import BashOperatorfrom datetime import datetime, timedelta
default_args = {'owner': 'airflow', # 擁有者名稱'start_date': datetime(2021, 9, 22), # 第一次開始執(zhí)行的時間,為 UTC 時間'retries': 1, # 失敗重試次數(shù)'retry_delay': timedelta(minutes=5), # 失敗重試間隔
}dag = DAG(dag_id = 'dag_relation_1', #DAG id ,必須完全由字母、數(shù)字、下劃線組成default_args = default_args, #外部定義的 dic 格式的參數(shù)schedule_interval = timedelta(minutes=1) # 定義DAG運行的頻率,可以配置天、周、小時、分鐘、秒、毫秒
)A = BashOperator(task_id='A',bash_command='echo "run A task"',dag=dag
)B = BashOperator(task_id='B',bash_command='echo "run B task"',dag=dag
)C = BashOperator(task_id='C',bash_command='echo "run C task"',dag=dag,retries=3
)A >> B >>C
2、???????DAG任務依賴設置二
- DAG調度流程圖
- task執(zhí)行依賴
[A,B] >>C >>D
- 完整代碼
'''
airflow 任務依賴關系設置二'''
from airflow import DAGfrom airflow.operators.bash import BashOperatorfrom datetime import datetime, timedelta
default_args = {'owner': 'airflow', # 擁有者名稱'start_date': datetime(2021, 9, 22), # 第一次開始執(zhí)行的時間,為 UTC 時間'retries': 1, # 失敗重試次數(shù)'retry_delay': timedelta(minutes=5), # 失敗重試間隔
}dag = DAG(dag_id = 'dag_relation_2', #DAG id ,必須完全由字母、數(shù)字、下劃線組成default_args = default_args, #外部定義的 dic 格式的參數(shù)schedule_interval = timedelta(minutes=1) # 定義DAG運行的頻率,可以配置天、周、小時、分鐘、秒、毫秒
)A = BashOperator(task_id='A',bash_command='echo "run A task"',dag=dag
)B = BashOperator(task_id='B',bash_command='echo "run B task"',dag=dag
)C = BashOperator(task_id='C',bash_command='echo "run C task"',dag=dag,retries=3
)D = BashOperator(task_id='D',bash_command='echo "run D task"',dag=dag
)[A,B] >>C >>D
3、???????DAG任務依賴設置三
- DAG調度流程圖
- task執(zhí)行依賴
[A,B,C] >>D >>[E,F]
- 完整代碼
'''
airflow 任務依賴關系設置三'''
from airflow import DAGfrom airflow.operators.bash import BashOperatorfrom datetime import datetime, timedelta
default_args = {'owner': 'airflow', # 擁有者名稱'start_date': datetime(2021, 9, 22), # 第一次開始執(zhí)行的時間,為 UTC 時間'retries': 1, # 失敗重試次數(shù)'retry_delay': timedelta(minutes=5), # 失敗重試間隔
}dag = DAG(dag_id = 'dag_relation_3', #DAG id ,必須完全由字母、數(shù)字、下劃線組成default_args = default_args, #外部定義的 dic 格式的參數(shù)schedule_interval = timedelta(minutes=1) # 定義DAG運行的頻率,可以配置天、周、小時、分鐘、秒、毫秒
)A = BashOperator(task_id='A',bash_command='echo "run A task"',dag=dag
)B = BashOperator(task_id='B',bash_command='echo "run B task"',dag=dag
)C = BashOperator(task_id='C',bash_command='echo "run C task"',dag=dag,retries=3
)D = BashOperator(task_id='D',bash_command='echo "run D task"',dag=dag
)E = BashOperator(task_id='E',bash_command='echo "run E task"',dag=dag
)F = BashOperator(task_id='F',bash_command='echo "run F task"',dag=dag
)[A,B,C] >>D >>[E,F]
4、???????DAG任務依賴設置四
- DAG調度流程圖
?
- task執(zhí)行依賴
A >>B>>C>>D
A >>E>>F
- 完整代碼
'''
airflow 任務依賴關系設置四'''
from airflow import DAGfrom airflow.operators.bash import BashOperatorfrom datetime import datetime, timedelta
default_args = {'owner': 'airflow', # 擁有者名稱'start_date': datetime(2021, 9, 22), # 第一次開始執(zhí)行的時間,為 UTC 時間'retries': 1, # 失敗重試次數(shù)'retry_delay': timedelta(minutes=5), # 失敗重試間隔
}dag = DAG(dag_id = 'dag_relation_4', #DAG id ,必須完全由字母、數(shù)字、下劃線組成default_args = default_args, #外部定義的 dic 格式的參數(shù)schedule_interval = timedelta(minutes=1) # 定義DAG運行的頻率,可以配置天、周、小時、分鐘、秒、毫秒
)A = BashOperator(task_id='A',bash_command='echo "run A task"',dag=dag
)B = BashOperator(task_id='B',bash_command='echo "run B task"',dag=dag
)C = BashOperator(task_id='C',bash_command='echo "run C task"',dag=dag,retries=3
)D = BashOperator(task_id='D',bash_command='echo "run D task"',dag=dag
)E = BashOperator(task_id='E',bash_command='echo "run E task"',dag=dag
)F = BashOperator(task_id='F',bash_command='echo "run F task"',dag=dag
)A >>[B,C,D]
A >>[E,F]
5、???????DAG任務依賴設置五
- DAG調度流程圖
?
- task執(zhí)行依賴
A >>B>>E
C >>D>>E
- 完整代碼
'''
airflow 任務依賴關系設置五'''
from airflow import DAGfrom airflow.operators.bash import BashOperatorfrom datetime import datetime, timedelta
default_args = {'owner': 'airflow', # 擁有者名稱'start_date': datetime(2021, 9, 22), # 第一次開始執(zhí)行的時間,為 UTC 時間'retries': 1, # 失敗重試次數(shù)'retry_delay': timedelta(minutes=5), # 失敗重試間隔
}dag = DAG(dag_id = 'dag_relation_5', #DAG id ,必須完全由字母、數(shù)字、下劃線組成default_args = default_args, #外部定義的 dic 格式的參數(shù)schedule_interval = timedelta(minutes=1) # 定義DAG運行的頻率,可以配置天、周、小時、分鐘、秒、毫秒
)A = BashOperator(task_id='A',bash_command='echo "run A task"',dag=dag
)B = BashOperator(task_id='B',bash_command='echo "run B task"',dag=dag
)C = BashOperator(task_id='C',bash_command='echo "run C task"',dag=dag,retries=3
)D = BashOperator(task_id='D',bash_command='echo "run D task"',dag=dag
)E = BashOperator(task_id='E',bash_command='echo "run E task"',dag=dag
)A >>B>>E
C >>D>>E
- 📢博客主頁:https://lansonli.blog.csdn.net
- 📢歡迎點贊 👍 收藏 ?留言 📝 如有錯誤敬請指正!
- 📢本文由 Lansonli 原創(chuàng),首發(fā)于 CSDN博客🙉
- 📢大數(shù)據(jù)系列文章會每天更新,停下休息的時候不要忘了別人還在奔跑,希望大家抓緊時間學習,全力奔赴更美好的生活??
總結
以上是生活随笔為你收集整理的大数据调度平台Airflow(五):Airflow使用的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 大数据调度平台Airflow(三):Ai
- 下一篇: 大数据调度平台Airflow(八):Ai