App Engine の Pipelines API について調べたので、その内容を書き残しておきます。
*参考
*App Engine Pipelines API とは
複数で時間のかかる App Engine のワークフローを柔軟につなぎあわせて実行するためのAPIです。使用を最小限に抑えながら多数のワークフローを同時に実行することができます。Datastore や memcache などのデータソースで動作します。また、同期実行・非同期実行を指定することができ、非同期実行する場合は子パイプラインを生成することで実現できます。
パイプラインが失敗して再試行される可能性があるので、冪等性(何回やっても結果が同じ)である必要があります。
*インストール
下記コマンドを実行します。pip install GoogleAppEnginePipeline -t lib/
インストール後、app.yaml に下記を追加します。
handlers:
- url: /_ah/pipeline.*
script: pipeline.handlers._APP
*同期パイプラインの実行
パイプラインを開始するハンドラを定義すると、選択したURLでリクエストを受け取ります。パイプラインを定義する際は、Pipelineオブジェクトから run() を継承します。パイプラインはオブジェクト生成後に start() で実行します。
import logging
import webapp2
import pipeline
# ハンドラを定義
class RunPipelineHandler(webapp2.RequestHandler):
def get(self):
square_stage = SquarePipeline(10)
# パイプラインの実行
square_stage.start()
# パイプラインを定義
class SquarePipeline(pipeline.Pipeline):
def run(self, number):
return number * number
*パイプラインの実行確認
start()からpipeline_idが発行されるので、このpipeline_idを使ってhas_finalized
プロパティで実行結果を確認することができます。True だった場合は実行が完了しています。
import logging
import webapp2
import pipeline
class RunPipelineHandler(webapp2.RequestHandler):
def get(self):
square_stage = SquarePipeline(10)
# パイプラインの実行
square_stage.start()
pipeline_id = square_stage.pipeline_id
stage = SquarePipeline.from_id(pipeline_id)
# 実行結果を確認
if stage.has_finalized:
logging.info('Finalized')
else:
logging.info('Not finalized')
class SquarePipeline(pipeline.Pipeline):
def run(self, number):
return number * number
全ての処理が完了したことを確認する場合は、
finalized()
をオーバーライドすることで結果を取得することができます。import logging
import webapp2
import pipeline
class RunPipelineHandler(webapp2.RequestHandler):
def get(self):
square_stage = SquarePipeline(10)
square_stage.start()
class SquarePipeline(pipeline.Pipeline):
def run(self, number):
return number * number
# パイプライン作業完了後に呼び出される
def finalized(self):
logging.info('All done! Square is %s', self.outputs.default.value)
*パイプラインのテスト
start_test() で実行結果をテストすることができます。class RunPipelineHandler(webapp2.RequestHandler):
def get(self):
square_stage = SquarePipeline(10)
# テストを実行
square_stage.start_test()
assert stage.outputs.square.value == 100
*複数のパイプラインに接続
yield を使ってパイプラインを呼び出すことで、パイプラインを安全に接続することができます。yield 呼び出しによって開始されたパイプラインは、タスクキューで実行されるApp Engineの個別のタスクになります。import logging
import webapp2
import pipeline
class RunPipelineHandler(webapp2.RequestHandler):
def get(self):
stage = TwiceSquaredPipeline(10)
stage.start()
class SquarePipeline(pipeline.Pipeline):
def run(self, number):
return number * number
class TwiceSquaredPipeline(pipeline.Pipeline):
def run(self, number):
# 複数のパイプラインを接続
first_square = yield SquarePipeline(number)
second_square = yield SquarePipeline(first_square)
yield LogResult(second_square)
class LogResult(pipeline.Pipeline):
def run(self, number):
logging.info('All done! Value is %s', number)
routes = [
webapp2.Route('/pipeline-test/', handler='main.RunPipelineHandler')
]
APP = webapp2.WSGIApplication(routes)
*指定順序でパイプラインを実行
withキーワードを使って実行順序を強制することができます。最初のパイプラインの実行が完了したあと、次のパイプラインが実行されます。
Pipelineライブラリは InOrder() と After() のがあり、InOrder() を使う場合は順番にパイプラインを書くとその順序通りに実行します。
class FanOutFanInPipeline(pipeline.Pipeline):
def run(self, count):
results = []
for i in xrange(0, count):
result = yield SquarePipeline(i)
results.append(result)
result = yield Sum(*results)
with pipeline.InOrder():
# 順番に実行
yield UpdateDashboard()
yield EmailTeam()
After() を使う場合はネストさせ、特定の処理実行後に次のパイプラインを実行させるといったことができます。
class LogWaitLogAfter(pipeline.Pipeline):
def run(self, message1, message2, delay):
first = yield LogMessage(message1)
with pipeline.After(first):
delay = yield Delay(seconds=delay)
with pipeline.After(delay)
yield LogMessage(message2)
yield LogMessage('This would happen after the first message')
yield LogMessage('This would happen immediately on run')
*非同期パイプラインの実行
async プロパティを True に設定することで、パイプラインを非同期実行させることができます。パイプラインが実行されると、ステータスが変わるまでRun状態になります。complete()
は非同期実行でのみ使えるメソッドで、パイプラインの実行が全て完了した場合にパイプラインを完了の状態にするメソッドです。class AsyncPipeline(pipeline.Pipeline):
# 非同期実行に設定
async = True
def callback(self):
self.complete()
*コールバックURL
コールバックURLを生成するためにはget_callback_url()
を呼び出します。このメソッドを呼ぶことで下記形式のURLが生成されます。choiceを指定することでパラメーターを指定することもできます。
/_ah/pipeline/callback?choice=approve&pipeline_id=fd789852183b4310b5f1353205a967fe
コールバックURLはコンソールに記録され、URLにアクセスすると
callback()
が実行されます。class AsyncPipeline(pipeline.Pipeline):
async = True
public_callbacks = True
def run(self):
# URLを生成
url = self.get_callback_url(choice='approve')
logging.info('Callback URL: %s' % url)
def callback(self, choice):
if choice == 'approve':
logging.info('Pipeline Complete')
self.complete()
*所感
実際の動作確認はまだしていませんが、使い方を覚えてしまえば簡単にワークフローを実行させることができそうです。Pipeline UI で実行結果や子パイプラインの確認ができる点も便利です。業務で触ってより理解を深めていきたいと思います。
Sign up here with your email
ConversionConversion EmoticonEmoticon