ジョブ管理ツール Airflowを使ってみました

ジョブを定期的に実行させるためには、cronだったりjenkinsがあるかと思いますが、
Airflowというものでジョブ管理ができることを知ったので、早速試してみました。

今回はAirflowの特徴についての調査と、簡単な操作をしてみました。






Airflowとは



Airbnbが提供しているOSSのタスクスケジューリング・モニタリングのフレームワークです。
バッチ処理で障害が発生した場合のリトライ処理やエラー通知、ログの収集といったことを、Airflowがしてくれます。
タスクの終了を待ってから次のタスクを実行するといった、タスクの依存関係をDAG(有向非巡回グラフ)を使って実現しています。




  • DAG(Directed acyclic graph)




グラフ理論における閉路のない有向グラフのことです。
ある頂点から出発し、辺をたどって出発した頂点に戻ってこないものを有向非巡回グラフといいます。

AirflowではこのDAGによって、依存がないタスクが先に実行され、依存関係が解消されたタスクが順に実行されます。





タスクの依存関係の定義とスケジューリングをPythonプログラムで書くため、
スケジューリングの自由度が増すといったメリットがあります。
書いた依存関係に基づいてワークフローを動的に生成してくれます。





BigQueryやDataFlowといったGCPコンポーネントが充実しているため、
簡単にデータベースを操作することができ、少ないコードで大量のETL処理を実行することができます。





つまり、特徴としては以下が挙げられます。




  • スケジュールとモニタリングに特化


  • タスクの依存関係をPythonで定義


  • 依存関係に基づいたワークフローを動的に作成


  • 未実行のタスクのみ実行するスケジューリングが可能


  • タスク毎の実行時間やエラーログが詳細に表示


  • DBやGCPコンポーネントが充実








参考



公式ドキュメント



Apache Airflow (incubating) Documentation — Airflow Documentation













自分の環境









Airflowのインストール



公式ドキュメントのクイックスタートに記載されている通りにコマンドを実行します。
Airflowをインストールして起動します。




$ export AIRFLOW_HOME=~/airflow

$ pip install airflow

$ airflow initdb

$ airflow webserver -p 8080





下記にアクセスするとAirflowが起動されていることを確認できます。

デフォルトの画面は依存関係の定義一覧が表示されています。



http://localhost:8080/admin/






f:id:mtomitomi:20180128193455p:plain






処理結果ツリー



依存関係の定義一覧からDAG列のどれかを選択すると、「DAG:」の画面に遷移します。(今回はtutorialを選択)

遷移後にデフォルトで表示されているTree Viewでは、どのようなワークフローになっているか確認することができます。



f:id:mtomitomi:20180128203915p:plain






定義の確認



「DAG:」の画面でCodeのタブをクリックすると、Pythonのコードが表示されます。

ここで初期パラメータ、DAG、タスクの定義などを確認することができます。




# 初期パラメータの定義
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': airflow.utils.dates.days_ago(2),
'email': ['airflow@airflow.com'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
}

# DAGの定義
dag = DAG(
'tutorial',
default_args=default_args,
description='A simple tutorial DAG',
schedule_interval=timedelta(days=1))

# タスクの定義
t3 = BashOperator(
task_id='templated',
depends_on_past=False,
bash_command=templated_command,
params={'my_param': 'Parameter I passed in'},
dag=dag)






DAGの詳細確認



「DAG:」の画面でDetailタブをクリックすると、DAGの詳細を確認することができます。



f:id:mtomitomi:20180128204846p:plain






タスクの実行



tutorial.py が正しく実行できるか、ローカルで確認してみます。




$ python anaconda3/lib/python3.6/site-packages/airflow/example_dags/tutorial.py





コードが正しく実行できたら、タスクの手動実行テストをしてみます。

ターミナルでairflow testコマンドを実行します。




$ airflow test {dag_id} {task_id} {実行日}

# 実際に実行したコマンド
$ airflow test tutorial sleep 2018-01-28





正常に実行できたら、画面から実行してみます。

依存関係の定義一覧からtutorialの実行ボタンを押します。

実行ボタンを押した後、Last Runの列に日時が入るので、これをクリックします。





f:id:mtomitomi:20180128210718p:plain





処理結果が表示されます。



f:id:mtomitomi:20180128210542p:plain






補足



実行ボタンの右にある記号をクリックすると、
「DAG:」画面のタブの記号に紐づいた画面に遷移します。



f:id:mtomitomi:20180128223116p:plain






f:id:mtomitomi:20180128225121p:plain







ログの画面です。



f:id:mtomitomi:20180128223152p:plain





実行したタスクです。
f:id:mtomitomi:20180128223308p:plain






使ってみた感想



今までの職場ではこのようにジョブを可視化していませんでしたが、
ジョブの依存関係がフローで見れると非常にわかりやすいと感じました。
また、インストール方法や操作が簡単だったことも魅力でした。

今回使ってはいないですが、GCPやDBのコンポーネントが充実していて便利そうだったので、
業務で使うと便利なのではないかと思いました。

機会があったら業務でも使ってみたいと思います。