ジョブ管理ツールである Airflow を試したかったのですが、以前に構築した環境とは別に、まっさらな状態で使いたかったので、
Dockerを使って環境構築をしてみました。
以前 Airflow を使った内容は下記にまとめてあります。
(ローカルに直接 Airflow をインストールしています)
*参考
*環境
- MacOS
- Python 2.7.14
- Docker 18.06.0-ce
- Airflow 1.8.0
*docker-compose をダウンロード
下記URLにある docker-airflow のリポジトリからdocker-compose-CeleryExecutor.yml
をダウンロードして、ローカルの任意のディレクトリに配置します。version: '2.1'
services:
redis:
image: 'redis:3.2.7'
# command: redis-server --requirepass redispass
postgres:
image: postgres:9.6
environment:
- POSTGRES_USER=airflow
- POSTGRES_PASSWORD=airflow
- POSTGRES_DB=airflow
# Uncomment these lines to persist data on the local filesystem.
# - PGDATA=/var/lib/postgresql/data/pgdata
# volumes:
# - ./pgdata:/var/lib/postgresql/data/pgdata
webserver:
image: puckel/docker-airflow:1.10.0-4
restart: always
depends_on:
- postgres
- redis
environment:
- LOAD_EX=n
- FERNET_KEY=46BKJoQYlPPOexq0OhDZnIlNepKFf87WFwLbfzqDDho=
- EXECUTOR=Celery
# - POSTGRES_USER=airflow
# - POSTGRES_PASSWORD=airflow
# - POSTGRES_DB=airflow
# - REDIS_PASSWORD=redispass
volumes:
- ./dags:/usr/local/airflow/dags
# Uncomment to include custom plugins
# - ./plugins:/usr/local/airflow/plugins
ports:
- "8080:8080"
command: webserver
healthcheck:
test: ["CMD-SHELL", "[ -f /usr/local/airflow/airflow-webserver.pid ]"]
interval: 30s
timeout: 30s
retries: 3
flower:
image: puckel/docker-airflow:1.10.0-4
restart: always
depends_on:
- redis
environment:
- EXECUTOR=Celery
# - REDIS_PASSWORD=redispass
ports:
- "5555:5555"
command: flower
scheduler:
image: puckel/docker-airflow:1.10.0-4
restart: always
depends_on:
- webserver
volumes:
- ./dags:/usr/local/airflow/dags
# Uncomment to include custom plugins
# - ./plugins:/usr/local/airflow/plugins
environment:
- LOAD_EX=n
- FERNET_KEY=46BKJoQYlPPOexq0OhDZnIlNepKFf87WFwLbfzqDDho=
- EXECUTOR=Celery
# - POSTGRES_USER=airflow
# - POSTGRES_PASSWORD=airflow
# - POSTGRES_DB=airflow
# - REDIS_PASSWORD=redispass
command: scheduler
worker:
image: puckel/docker-airflow:1.10.0-4
restart: always
depends_on:
- scheduler
volumes:
- ./dags:/usr/local/airflow/dags
# Uncomment to include custom plugins
# - ./plugins:/usr/local/airflow/plugins
environment:
- FERNET_KEY=46BKJoQYlPPOexq0OhDZnIlNepKFf87WFwLbfzqDDho=
- EXECUTOR=Celery
# - POSTGRES_USER=airflow
# - POSTGRES_PASSWORD=airflow
# - POSTGRES_DB=airflow
# - REDIS_PASSWORD=redispass
command: worker
*Docker を起動
ダウンロードした yaml ファイルの任意のディレクトリに配置したら、そのディレクトリで下記コマンドを実行します。$ docker-compose -f docker-compose-CeleryExecutor.yml up -d
しばらく時間がかかりますが、
done
と表示されれば完了です。...
Status: Downloaded newer image for puckel/docker-airflow:1.10.0-4
Creating airflow_postgres_1 ... done
Creating airflow_redis_1 ... done
Creating airflow_flower_1 ... done
Creating airflow_webserver_1 ... done
Creating airflow_scheduler_1 ... done
Creating airflow_worker_1 ... done
下記コマンドを実行して 6個のコンテナが起動していることを確認します。
$ docker ps
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
7504be202f53 puckel/docker-airflow:1.10.0-4 "/entrypoint.sh work…" 22 seconds ago Up 21 seconds 5555/tcp, 8080/tcp, 8793/tcp airflow_worker_1
04d62b99498d puckel/docker-airflow:1.10.0-4 "/entrypoint.sh sche…" 23 seconds ago Up 22 seconds 5555/tcp, 8080/tcp, 8793/tcp airflow_scheduler_1
03ffe586438d puckel/docker-airflow:1.10.0-4 "/entrypoint.sh webs…" 24 seconds ago Up 22 seconds (health: starting) 5555/tcp, 8793/tcp, 0.0.0.0:8080->8080/tcp airflow_webserver_1
f0df99f82a6b puckel/docker-airflow:1.10.0-4 "/entrypoint.sh flow…" 24 seconds ago Up 22 seconds 8080/tcp, 0.0.0.0:5555->5555/tcp, 8793/tcp airflow_flower_1
69f9e927018d postgres:9.6 "docker-entrypoint.s…" 25 seconds ago Up 23 seconds 5432/tcp airflow_postgres_1
767722108d21 redis:3.2.7 "docker-entrypoint.s…" 25 seconds ago Up 23 seconds 6379/tcp airflow_redis_1
コンテナの起動と同時に Airflow も起動されているので、下記にアクセスすると Airflow の画面が表示されます。(まだ何もジョブが登録されていない状態です)
http://localhost:8080
*DAG を作成
DAG(Directed acyclic graph)とはグラフ理論における閉路のない有向グラフのことです。AirflowではこのDAGによって依存がないタスクが先に実行され、依存関係が解消されたタスクが順に実行されます。Airflow を起動すると、ディレクトリに
dags
のフォルダが作成されるので、この配下に Python でDAGの定義ファイルを作成します。今回は、
hello.py
を作成しました。from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
"""
DAGを定義
+ 実行開始日やリトライ回数を指定
"""
default_args = {
'owner': 'root',
'depends_on_past': False,
'start_date': datetime(2018, 11, 3, 20, 0, 0),
'schedule_interval': timedelta(days=1),
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
"""
DAGのインスタンスを作成
"""
dag = DAG('first_dag', default_args=default_args)
"""
DAGに紐づくタスクを作成
+ BashOperatorでシェルを実行
"""
t1 = BashOperator(
task_id='t1',
bash_command='echo t1',
dag=dag)
t2 = BashOperator(
task_id='t2',
bash_command='echo t2',
retries=3,
dag=dag)
t3 = BashOperator(
task_id='t3',
bash_command='echo "{{ params.message }}"',
params={'message': 'Hello AirFlow!!'},
dag=dag)
t4 = BashOperator(
task_id='t4',
bash_command='echo t4',
dag=dag)
"""
依存関係を定義
+ set_upstreamで待つタスクを指定
"""
t2.set_upstream(t1)
t3.set_upstream(t1)
t4.set_upstream([t2, t3])
*DAGを実行
Airflowの画面にアクセスすると、作成したDAGが表示されます。http://localhost:8080
Tree View で依存関係を確認することができます。
Graph View で依存関係をグラフで確認することができます。
スケジュール実行させたい場合は、DAG一覧に表示されているDAG名の左にあるボタンをクリックしてONにしておきます。
*Dockerコンテナの停止
Airflow の確認が終わったら下記コマンドでDockerコンテナを停止しておきます。$ docker-compose -f docker-compose-CeleryExecutor.yml down
*所感
Airflow 用のDockerが用意されていたので、簡単に環境を構築することができて便利でした。今回は簡単な定義ファイルの作成や動作確認しかしていませんが、触ってもっと詳しく調べて使いこなせるようにしたいと思います。