Docker環境でAirflowを使ってみました




ジョブ管理ツールである Airflow を試したかったのですが、以前に構築した環境とは別に、まっさらな状態で使いたかったので、
Dockerを使って環境構築をしてみました。

以前 Airflow を使った内容は下記にまとめてあります。
(ローカルに直接 Airflow をインストールしています)


*参考



*環境

  • MacOS
  • Python 2.7.14
  • Docker 18.06.0-ce
  • Airflow 1.8.0


*docker-compose をダウンロード

下記URLにある docker-airflow のリポジトリからdocker-compose-CeleryExecutor.ymlをダウンロードして、ローカルの任意のディレクトリに配置します。
version: '2.1'
services:
    redis:
        image: 'redis:3.2.7'
        # command: redis-server --requirepass redispass

    postgres:
        image: postgres:9.6
        environment:
            - POSTGRES_USER=airflow
            - POSTGRES_PASSWORD=airflow
            - POSTGRES_DB=airflow
        # Uncomment these lines to persist data on the local filesystem.
        #     - PGDATA=/var/lib/postgresql/data/pgdata
        # volumes:
        #     - ./pgdata:/var/lib/postgresql/data/pgdata

    webserver:
        image: puckel/docker-airflow:1.10.0-4
        restart: always
        depends_on:
            - postgres
            - redis
        environment:
            - LOAD_EX=n
            - FERNET_KEY=46BKJoQYlPPOexq0OhDZnIlNepKFf87WFwLbfzqDDho=
            - EXECUTOR=Celery
            # - POSTGRES_USER=airflow
            # - POSTGRES_PASSWORD=airflow
            # - POSTGRES_DB=airflow
            # - REDIS_PASSWORD=redispass
        volumes:
            - ./dags:/usr/local/airflow/dags
            # Uncomment to include custom plugins
            # - ./plugins:/usr/local/airflow/plugins
        ports:
            - "8080:8080"
        command: webserver
        healthcheck:
            test: ["CMD-SHELL", "[ -f /usr/local/airflow/airflow-webserver.pid ]"]
            interval: 30s
            timeout: 30s
            retries: 3

    flower:
        image: puckel/docker-airflow:1.10.0-4
        restart: always
        depends_on:
            - redis
        environment:
            - EXECUTOR=Celery
            # - REDIS_PASSWORD=redispass
        ports:
            - "5555:5555"
        command: flower

    scheduler:
        image: puckel/docker-airflow:1.10.0-4
        restart: always
        depends_on:
            - webserver
        volumes:
            - ./dags:/usr/local/airflow/dags
            # Uncomment to include custom plugins
            # - ./plugins:/usr/local/airflow/plugins
        environment:
            - LOAD_EX=n
            - FERNET_KEY=46BKJoQYlPPOexq0OhDZnIlNepKFf87WFwLbfzqDDho=
            - EXECUTOR=Celery
            # - POSTGRES_USER=airflow
            # - POSTGRES_PASSWORD=airflow
            # - POSTGRES_DB=airflow
            # - REDIS_PASSWORD=redispass
        command: scheduler

    worker:
        image: puckel/docker-airflow:1.10.0-4
        restart: always
        depends_on:
            - scheduler
        volumes:
            - ./dags:/usr/local/airflow/dags
            # Uncomment to include custom plugins
            # - ./plugins:/usr/local/airflow/plugins
        environment:
            - FERNET_KEY=46BKJoQYlPPOexq0OhDZnIlNepKFf87WFwLbfzqDDho=
            - EXECUTOR=Celery
            # - POSTGRES_USER=airflow
            # - POSTGRES_PASSWORD=airflow
            # - POSTGRES_DB=airflow
            # - REDIS_PASSWORD=redispass
        command: worker


*Docker を起動

ダウンロードした yaml ファイルの任意のディレクトリに配置したら、そのディレクトリで下記コマンドを実行します。
$ docker-compose -f docker-compose-CeleryExecutor.yml up -d

しばらく時間がかかりますが、doneと表示されれば完了です。
...
Status: Downloaded newer image for puckel/docker-airflow:1.10.0-4
Creating airflow_postgres_1 ... done
Creating airflow_redis_1    ... done
Creating airflow_flower_1    ... done
Creating airflow_webserver_1 ... done
Creating airflow_scheduler_1 ... done
Creating airflow_worker_1    ... done

下記コマンドを実行して 6個のコンテナが起動していることを確認します。
$ docker ps

CONTAINER ID        IMAGE                            COMMAND                  CREATED             STATUS                             PORTS                                        NAMES
7504be202f53        puckel/docker-airflow:1.10.0-4   "/entrypoint.sh work…"   22 seconds ago      Up 21 seconds                      5555/tcp, 8080/tcp, 8793/tcp                 airflow_worker_1
04d62b99498d        puckel/docker-airflow:1.10.0-4   "/entrypoint.sh sche…"   23 seconds ago      Up 22 seconds                      5555/tcp, 8080/tcp, 8793/tcp                 airflow_scheduler_1
03ffe586438d        puckel/docker-airflow:1.10.0-4   "/entrypoint.sh webs…"   24 seconds ago      Up 22 seconds (health: starting)   5555/tcp, 8793/tcp, 0.0.0.0:8080->8080/tcp   airflow_webserver_1
f0df99f82a6b        puckel/docker-airflow:1.10.0-4   "/entrypoint.sh flow…"   24 seconds ago      Up 22 seconds                      8080/tcp, 0.0.0.0:5555->5555/tcp, 8793/tcp   airflow_flower_1
69f9e927018d        postgres:9.6                     "docker-entrypoint.s…"   25 seconds ago      Up 23 seconds                      5432/tcp                                     airflow_postgres_1
767722108d21        redis:3.2.7                      "docker-entrypoint.s…"   25 seconds ago      Up 23 seconds                      6379/tcp                                     airflow_redis_1

コンテナの起動と同時に Airflow も起動されているので、下記にアクセスすると Airflow の画面が表示されます。(まだ何もジョブが登録されていない状態です)
http://localhost:8080


*DAG を作成

DAG(Directed acyclic graph)とはグラフ理論における閉路のない有向グラフのことです。AirflowではこのDAGによって依存がないタスクが先に実行され、依存関係が解消されたタスクが順に実行されます。

Airflow を起動すると、ディレクトリにdagsのフォルダが作成されるので、この配下に Python でDAGの定義ファイルを作成します。
今回は、hello.pyを作成しました。
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta

"""
DAGを定義
+ 実行開始日やリトライ回数を指定
"""
default_args = {
    'owner': 'root',
    'depends_on_past': False,
    'start_date': datetime(2018, 11, 3, 20, 0, 0),
    'schedule_interval': timedelta(days=1),
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

"""
DAGのインスタンスを作成
"""
dag = DAG('first_dag', default_args=default_args)

"""
DAGに紐づくタスクを作成
+ BashOperatorでシェルを実行
"""
t1 = BashOperator(
    task_id='t1',
    bash_command='echo t1',
    dag=dag)

t2 = BashOperator(
    task_id='t2',
    bash_command='echo t2',
    retries=3,
    dag=dag)

t3 = BashOperator(
    task_id='t3',
    bash_command='echo "{{ params.message }}"',
    params={'message': 'Hello AirFlow!!'},
    dag=dag)

t4 = BashOperator(
    task_id='t4',
    bash_command='echo t4',
    dag=dag)

"""
依存関係を定義
+ set_upstreamで待つタスクを指定
"""
t2.set_upstream(t1)
t3.set_upstream(t1)
t4.set_upstream([t2, t3])


*DAGを実行

Airflowの画面にアクセスすると、作成したDAGが表示されます。
http://localhost:8080











Linkの1番左の Trigger Dag ボタンを押すとすぐに実行することができます。正常に実行できると、DAG Runs に結果が緑色で表示されます。












Tree View で依存関係を確認することができます。
















Graph View で依存関係をグラフで確認することができます。

















スケジュール実行させたい場合は、DAG一覧に表示されているDAG名の左にあるボタンをクリックしてONにしておきます。


*Dockerコンテナの停止

Airflow の確認が終わったら下記コマンドでDockerコンテナを停止しておきます。
$ docker-compose -f docker-compose-CeleryExecutor.yml down


*所感

Airflow 用のDockerが用意されていたので、簡単に環境を構築することができて便利でした。
今回は簡単な定義ファイルの作成や動作確認しかしていませんが、触ってもっと詳しく調べて使いこなせるようにしたいと思います。