AirFlow官方入门DAG示例
經(jīng)過前兩篇文章的簡單介紹之后,我們安裝了自己的AirFlow以及簡單了解了DAG的定義文件.現(xiàn)在我們要實現(xiàn)自己的一個DAG.
1. 啟動Web服務(wù)器
使用如下命令啟用:
airflow webserver現(xiàn)在可以通過將瀏覽器導(dǎo)航到啟動Airflow的主機上的8080端口來訪問Airflow UI,例如:http://localhost:8080/admin/
備注
Airflow附帶了許多示例DAG。 請注意,在你自己的`dags_folder`中至少有一個DAG定義文件之前,這些示例可能無法正常工作。你可以通過更改`airflow.cfg`中的`load_examples`設(shè)置來隱藏示例DAG。2. 第一個AirFlow DAG
現(xiàn)在一切都準(zhǔn)備好了,我們開始寫一些代碼,來實現(xiàn)我們的第一個DAG。 我們將首先創(chuàng)建一個Hello World工作流程,其中除了向日志發(fā)送"Hello world!"之外什么都不做。
創(chuàng)建你的dags_folder,那就是你的DAG定義文件存儲目錄---$AIRFLOW_HOME/dags。在該目錄中創(chuàng)建一個名為hello_world.py的文件。
AIRFLOW_HOME ├── airflow.cfg ├── airflow.db ├── airflow-webserver.pid ├── dags │ ├── hello_world.py │ └── hello_world.pyc └── unittests.cfg將以下代碼添加到dags/hello_world.py中:
# -*- coding: utf-8 -*-import airflow from airflow import DAG from airflow.operators.bash_operator import BashOperator from airflow.operators.python_operator import PythonOperator from datetime import timedelta#------------------------------------------------------------------------------- # these args will get passed on to each operator # you can override them on a per-task basis during operator initializationdefault_args = {'owner': 'jifeng.si','depends_on_past': False,'start_date': airflow.utils.dates.days_ago(2),'email': ['1203745031@qq.com'],'email_on_failure': False,'email_on_retry': False,'retries': 1,'retry_delay': timedelta(minutes=5) }#------------------------------------------------------------------------------- # dagdag = DAG('example_hello_world_dag',default_args=default_args,description='my first DAG',schedule_interval=timedelta(days=1))#------------------------------------------------------------------------------- # first operatordate_operator = BashOperator(task_id='date_task',bash_command='date',dag=dag)#------------------------------------------------------------------------------- # second operatorsleep_operator = BashOperator(task_id='sleep_task',depends_on_past=False,bash_command='sleep 5',dag=dag)#------------------------------------------------------------------------------- # third operatordef print_hello():return 'Hello world!'hello_operator = PythonOperator(task_id='hello_task',python_callable=print_hello,dag=dag)#------------------------------------------------------------------------------- # dependenciessleep_operator.set_upstream(date_operator) hello_operator.set_upstream(date_operator)該文件創(chuàng)建一個簡單的DAG,只有三個運算符,兩個BaseOperator(一個打印日期一個休眠5秒),另一個為PythonOperator在執(zhí)行任務(wù)時調(diào)用print_hello函數(shù)。
3. 測試代碼
使用如下命令測試一下我們寫的代碼的正確性
python ~/opt/airflow/dags/hello_world.py如果你的腳本沒有拋出異常,這意味著你代碼中沒有錯誤,并且你的Airflow環(huán)境是健全的。
下面測試一下我們的DAG中的Task.使用如下命令查看我們example_hello_world_dagDAG下有什么Task:
xiaosi@yoona:~$ airflow list_tasks example_hello_world_dag可以看到我們有三個Task:
date_task hello_task sleep_task下面分別測試一下這幾個Task:
(1) 測試date_task
xiaosi@yoona:~$ airflow test example_hello_world_dag date_task 20170803(2) 測試hello_task
xiaosi@yoona:~$ airflow test example_hello_world_dag hello_task 20170803如果沒有問題,我們就可以運行我們的DAG了.
4. 運行DAG
為了運行你的DAG,打開另一個終端,并通過如下命令來啟動Airflow調(diào)度程序:
airflow scheduler備注
調(diào)度程序?qū)l(fā)送任務(wù)進行執(zhí)行。默認(rèn)Airflow設(shè)置依賴于一個名為`SequentialExecutor`的執(zhí)行器,它由調(diào)度程序自動啟動。在生產(chǎn)中,你可以使用更強大的執(zhí)行器,如`CeleryExecutor`。當(dāng)你在瀏覽器中重新加載Airflow UI時,應(yīng)該會在Airflow UI中看到你的hello_world?DAG。
為了啟動DAG Run,首先打開工作流(off鍵),然后單擊Trigger Dag按鈕(Links 第一個按鈕),最后單擊Graph View按鈕(Links 第三個按鈕)以查看運行進度:
你可以重新加載圖形視圖,直到兩個任務(wù)達(dá)到狀態(tài)成功。完成后,你可以單擊hello_task,然后單擊View Log查看日志。如果一切都按預(yù)期工作,日志應(yīng)該顯示一些行,其中之一是這樣的:
[2017-08-03 09:46:43,236] {base_task_runner.py:95} INFO - Subtask: [2017-08-03 09:46:43,235] {python_operator.py:81} INFO - Done. Returned value was: Hello world![2017-08-03 09:46:47,378] {jobs.py:2083} INFO - Task exited with return code 0更多多資訊或疑問內(nèi)容請關(guān)注?微信公眾號 “讓夢飛起來”?或添加小編微信,?后臺回復(fù) “Python” ,領(lǐng)取更多資料哦
? ?? ? ? ? ? ? ?? ? ? ? ? ? ? ? ? ??
總結(jié)
以上是生活随笔為你收集整理的AirFlow官方入门DAG示例的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: python新建txt文件,并逐行写入数
- 下一篇: yum搭建lnmp的最简单方法