ジョブを定期的に実行させるためには、cronだったりjenkinsがあるかと思いますが、
Airflowというものでジョブ管理ができることを知ったので、早速試してみました。
今回はAirflowの特徴についての調査と、簡単な操作をしてみました。
Airflowとは
Airbnbが提供しているOSSのタスクスケジューリング・モニタリングのフレームワークです。
バッチ処理で障害が発生した場合のリトライ処理やエラー通知、ログの収集といったことを、Airflowがしてくれます。
タスクの終了を待ってから次のタスクを実行するといった、タスクの依存関係をDAG(有向非巡回グラフ)を使って実現しています。
- DAG(Directed acyclic graph)
グラフ理論における閉路のない有向グラフのことです。
ある頂点から出発し、辺をたどって出発した頂点に戻ってこないものを有向非巡回グラフといいます。
AirflowではこのDAGによって、依存がないタスクが先に実行され、依存関係が解消されたタスクが順に実行されます。
タスクの依存関係の定義とスケジューリングをPythonプログラムで書くため、
スケジューリングの自由度が増すといったメリットがあります。
書いた依存関係に基づいてワークフローを動的に生成してくれます。
BigQueryやDataFlowといったGCPのコンポーネントが充実しているため、
簡単にデータベースを操作することができ、少ないコードで大量のETL処理を実行することができます。
つまり、特徴としては以下が挙げられます。
スケジュールとモニタリングに特化
タスクの依存関係をPythonで定義
依存関係に基づいたワークフローを動的に作成
未実行のタスクのみ実行するスケジューリングが可能
タスク毎の実行時間やエラーログが詳細に表示
参考
公式ドキュメント
Apache Airflow (incubating) Documentation — Airflow Documentation
自分の環境
Airflowのインストール
公式ドキュメントのクイックスタートに記載されている通りにコマンドを実行します。
Airflowをインストールして起動します。
$ export AIRFLOW_HOME=~/airflow
$ pip install airflow
$ airflow initdb
$ airflow webserver -p 8080
下記にアクセスするとAirflowが起動されていることを確認できます。
デフォルトの画面は依存関係の定義一覧が表示されています。
処理結果ツリー
依存関係の定義一覧からDAG
列のどれかを選択すると、「DAG:」の画面に遷移します。(今回はtutorial
を選択)
遷移後にデフォルトで表示されているTree View
では、どのようなワークフローになっているか確認することができます。
定義の確認
「DAG:」の画面でCode
のタブをクリックすると、Pythonのコードが表示されます。
ここで初期パラメータ、DAG、タスクの定義などを確認することができます。
# 初期パラメータの定義
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': airflow.utils.dates.days_ago(2),
'email': ['airflow@airflow.com'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
# DAGの定義
dag = DAG(
'tutorial',
default_args=default_args,
description='A simple tutorial DAG',
schedule_interval=timedelta(days=1))
# タスクの定義
t3 = BashOperator(
task_id='templated',
depends_on_past=False,
bash_command=templated_command,
params={'my_param': 'Parameter I passed in'},
dag=dag)
DAGの詳細確認
「DAG:」の画面でDetail
タブをクリックすると、DAGの詳細を確認することができます。
タスクの実行
tutorial.py が正しく実行できるか、ローカルで確認してみます。
$ python anaconda3/lib/python3.6/site-packages/airflow/example_dags/tutorial.py
コードが正しく実行できたら、タスクの手動実行テストをしてみます。
ターミナルでairflow test
コマンドを実行します。
$ airflow test {dag_id} {task_id} {実行日}
# 実際に実行したコマンド
$ airflow test tutorial sleep 2018-01-28
正常に実行できたら、画面から実行してみます。
依存関係の定義一覧からtutorialの実行ボタンを押します。
実行ボタンを押した後、Last Run
の列に日時が入るので、これをクリックします。
処理結果が表示されます。
補足
実行ボタンの右にある記号をクリックすると、
「DAG:」画面のタブの記号に紐づいた画面に遷移します。
ログの画面です。
実行したタスクです。
使ってみた感想
今までの職場ではこのようにジョブを可視化していませんでしたが、
ジョブの依存関係がフローで見れると非常にわかりやすいと感じました。
また、インストール方法や操作が簡単だったことも魅力でした。
今回使ってはいないですが、GCPやDBのコンポーネントが充実していて便利そうだったので、
業務で使うと便利なのではないかと思いました。
機会があったら業務でも使ってみたいと思います。