天気予報でも使われる! ワークフローエンジンecflowのすすめ
事業開発部所属エンジニアの籏野です。
フォルシアではデータの取り込み・DBの構築といったバッチ処理についてフォルシア独自のツールを開発し、管理・実行していました。この独自ツールは、あらかじめ決められたフローを、設定を変えて実行するような作りになっており、タスク実行順の組み換えやアプリ独自の処理を追加するにはツール自体をアプリごとにカスタマイズする必要がありました。
このアプリごとのカスタマイズをより簡単に行えるよう、最近フォルシアでは「ecflow」というワークフローエンジンを導入し始めました。本記事では簡単なワークフローを作りながら、ecflowについて紹介したいと思います。
ecflowとは?
ecflowは欧州中期予報センター(ECMWF)が開発したワークフローエンジンであり、天気予報のためのプログラム実行を担っています。こちらの記事でも紹介していますが、複雑な依存関係を持った大量のタスクを処理できるだけでなく、タスク間の待ち時間が短いことが特徴です。
フォルシアはフロントでの検索速度だけでなくバッチの速さにも重きを置いているため、このオーバーヘッドが短縮されることは大きなメリットでした。
ecflowでは独自形式のファイルを組み合わせることで、一つのワークフローを構築していきます。どのようなファイルを用意する必要があるのか、具体的に紹介していきたいと思います。
※ecflowのインストールは本家のドキュメントを参考にしてください。
タスクの定義
今回は単純に「Hello!!」と出力するだけのワークフローを作ってみます。
ファイル構成は以下のようになります。
├── ecf_files │ └── echo.ecf ├── ecf_include │ ├── head.h │ └── tail.h └── test.py
.ecfファイル
ecflowで実行される処理は.ecfファイルに記載します。
例えば、今回作成したecho.ecfは以下のようになっており、任意の単語を出力できるようになっています。
%include <head.h> echo "%WORD%" %include <tail.h>
変数の埋め込み
.ecfファイルでは変数名を%で囲うことで任意の文字列を埋め込むことができます。例に挙げたecho.ecfでは%WORD%部分に任意の文字列を埋め込むことで、出力する文字列を設定できるようになっています。
include
%include <{{ file_name }}>と記載することで、任意の処理を各.ecfファイルに追加することができます。各タスクで共通に実行されるべき処理は別のファイルに切り出すことができるのです。
今回はecflowに対して、実行開始/終了を知らせる処理をhead.hとtail.hに切り出しています。
【head.h】
#!/bin/bash
set -eux
set -o pipefail
# ecflowとのやり取りに必要な変数
export ECF_PORT=%ECF_PORT%
export ECF_HOST=%ECF_HOST%
export ECF_NAME=%ECF_NAME%
export ECF_PASS=%ECF_PASS%
export ECF_TRYNO=%ECF_TRYNO%
export ECF_RID=$$
export PATH=/usr/local/ecflow-%ECF_VERSION%/bin:$PATH
# ecflowにタスク開始を知らせる。
ecflow_client --init=$$
# タスク中でエラーが発生した場合に実行する。
ERROR() {
set +e
wait
ecflow_client --abort=trap
trap 0
exit 0
}
trap ERROR 0
trap '{ echo "Killed by a signal"; ERROR ; }' 1 2 3 4 5 6 7 8 10 12 13 15
【tail.h】
wait # ecflowにタスク終了を知らせる。 ecflow_client --complete trap 0 exit 0
ワークフローの構築
用意した.ecfファイルを組み合わせて、一つのワークフローを構成する必要があります。そのためには「それぞれの.ecfファイルをどのような順番で実行するか」を記載したファイルを用意し、ecflowに読み込ませる必要があります。
しかし、フローが複雑になってくると、この設定を0から用意するのがかなり難しくなってきます。そこでecflowが用意しているPythonライブラリを利用します。
【test.py】
import os
from ecflow import Defs, Suite, Family, Task, Edit
THIS_DIR = os.path.dirname(os.path.abspath(__file__))
ECF_DIR = os.path.join(THIS_DIR, "ecf_files")
INCLUDE_DIR = os.path.join(THIS_DIR, "ecf_include")
print("Creating suite definition")
# Suite: 一つのワークフローを示す
suite = Suite(
"test",
Edit(
ECF_HOME=THIS_DIR,
ECF_FILES=ECF_DIR, # ecfファイルを置いたディレクトリ
ECF_INCLUDE=INCLUDE_DIR #includeするファイルを置いたディレクトリ
)
)
# Family: 複数のタスクやFamilyをまとめたもの
# 任意の名前を付ける(今回は"hello")
hello = Family("hello")
# Task: ecfファイルを読み込み処理を実行する
hello.add_task(
Task(
"echo", # ecfファイルを指定
Edit(
WORD="Hello!!" # %WORD%に埋め込む文字列
)
)
)
suite.add_family(hello)
defs = Defs()
defs.add_suite(suite)
print("Checking job creation: .ecf -> .job0")
print(defs.check_job_creation())
print("Saving definition to file 'test.def'")
defs.save_as_defs("test.def")
上記を実行すると以下のように新しいファイルが生成されます。
├── ecf_files │ └── echo.ecf ├── ecf_include │ ├── head.h │ └── tail.h ├── test │ └── hello │ └── echo.job0 ★NEW ├── test.def ★NEW └── test.py
.defファイル
新しく生成されたtest.defが、先に紹介した「それぞれの.ecfファイルをどのような順番で実行するか」を設定したファイルになります。このファイルをecflowが読み込むことでワークフローが構築されます。
#5.1.0
suite test
edit ECF_HOME '/home/forcia/ecflow_test'
edit ECF_FILES '/home/forcia/ecflow_test/ecf_files'
edit ECF_INCLUDE '/home/forcia/ecflow_test/ecf_include'
family hello
task echo
edit WORD 'Hello!!'
endfamily
endsuite
# enddef
.jobファイル
では、echo.job0とは何なのでしょうか?ファイルの中身は以下のようになっています。
#!/bin/bash
set -eux
set -o pipefail
# ecflowとのやり取りに必要な変数
export ECF_PORT=3141
export ECF_HOST=localhost
export ECF_NAME=/test/hello/echo
export ECF_PASS=XXXXXX
export ECF_TRYNO=0
export ECF_RID=$$
export PATH=/usr/local/ecflow-5.1.0/bin:$PATH
# ecflowにタスク開始を知らせる。
ecflow_client --init=$$
# タスク中でエラーが発生した場合に実行する。
ERROR() {
set +e
wait
ecflow_client --abort=trap
trap 0
exit 0
}
trap ERROR 0
trap '{ echo "Killed by a signal"; ERROR ; }' 1 2 3 4 5 6 7 8 10 12 13 15
echo "Hello!!"
wait
# ecflowにタスク終了を知らせる。
ecflow_client --complete
trap 0
exit 0
こちらを見てわかるように、echo.ecfでは%includeや%WORD%で記載されていた部分が展開されて通常のbashファイルが生成されています。
ecflowでは各変数を展開して生成されたファイルを、defファイルで指定した順番で実行することでワークフローを実行しているのです。
ワークフローの実行
生成されたdefファイルをecflowに読み込ませて実行してみます。
$ ecflow_client --load=test.def # 設定の読み込み $ ecflow_client --begin=test
今回の実行ログは./test/hello/echo.1に出力されます。
このファイルを確認するとHello!!と出力されており、無事タスクが実行されたことがわかります。
...略... + echo 'Hello!!' Hello!! ...略...
最後に
今回紹介したように、ecflowではワークフロー内のタスクが一つの実行ファイルとして生成されます。そのため、jobファイルを見ればタスク実行時に何が起きているのかが一発でわかり、デバッグ等もやりやすいです。
また、スクリプトに落とし込める処理は何でも実行できるので、タスク生成の自由度も高いのではないかと感じています。
今回の内容以外にもGUIによるワークフローの管理、トリガー設定、タスク失敗時の後処理・・・など、ecflowでできることはたくさんあります。それらについても、今後機会があれば紹介したいと思います。
籏野 拓
2018年新卒入社、事業開発部所属エンジニア。
ecflowをdeploy作業の自動化に適用中。



