くれなゐの雑記

例を上げて 自分で手を動かして学習できる入門記事を多めに書いています

OpenFOAMをairflowでジョブスケジューリングしてみる

動機

普段はCentOSにtorqueを使ってジョブスケジューリングして計算していた。 ある日普段使用しているPCも空いてる時間は計算を回そうと思い、torqueをインストールしようとしたが、Ubuntuのaptで入らなかった。 入れようと思えば入れれるが、せっかくなので最近のツールも使ってみようと思い、試してみた。

ジョブスケジューリングするツールの選択

調べてもよくわからなかった。 なんかモダンっぽいGUIで、並列実行ができて、Pythonでいろいろかけそうだったのでこの記事を見てAirflowを選んだ。 Python3に対応しており、最近もしっかりメンテナンスされているのもいい。

qiita.com

実行環境

kurenaif@ubuntu:~$ screenfetch 
                          ./+o+-       kurenaif@ubuntu
                  yyyyy- -yyyyyy+      OS: Ubuntu 18.10 cosmic
               ://+//////-yyyyyyo      Kernel: x86_64 Linux 4.18.0-10-generic
           .++ .:/++++++/-.+sss/`      Uptime: 29m
         .:++o:  /++++++++/:--:/-      Packages: 1637
        o:+o+:++.`..```.-/oo+++++/     Shell: bash 4.4.19
       .:+o:+o/.          `+sssoo+/    Resolution: 1920x983
  .++/+:+oo+o:`             /sssooo.   DE: GNOME 
 /+++//+:`oo+o               /::--:.   WM: GNOME Shell
 \+/+o+++`o++o               ++////.   WM Theme: Adwaita
  .++.o+++oo+:`             /dddhhh.   GTK Theme: Yaru [GTK2/3]
       .+.o+oo:.          `oddhhhh+    Icon Theme: Yaru
        \+.++o+o``-````.:ohdhhhhh+     Font: Ubuntu 11
         `:o+++ `ohhhhhhhhyo++os:      CPU: Intel Core i7-4790 @ 4x 3.6GHz [100.0°C]
           .o:`.syhhhhhhh/.oo++o`      GPU: svgadrmfb
               /osyyyyyyo++ooo+++/     RAM: 952MiB / 3921MiB
                   ````` +oo+++o\:    
                          `oo++.      
mysql: 5.7.24
airflow: v1.10.0
OpenFOAM: dev

Airflowのインストール

Airflowのインストール

リブセンスのAirflowの記事のページと、airflowの公式ドキュメントを参考にしてインストールした。

$ sudo apt install python3-pip
$ export  SLUGIFY_USES_TEXT_UNIDECODE=yes 
$ pip3 install apache-airflow

ここでairflowを実行しようとすると

$ airflow initdb
airflow: command not found

おや、新しい環境なのでパスが通っていないらしい pip3 show apache-airflowコマンドでパスを調べて… パスを通す

$ airflow initdb
$ airflow version
[2018-11-04 10:46:53,290] {__init__.py:51} INFO - Using executor SequentialExecutor
  ____________       _____________
 ____    |__( )_________  __/__  /________      __
____  /| |_  /__  ___/_  /_ __  /_  __ \_ | /| / /
___  ___ |  / _  /   _  __/ _  / / /_/ /_ |/ |/ /
 _/_/  |_/_/  /_/    /_/    /_/  \____/____/|__/
   v1.10.0

v1.10.0 が入ったらしい

mysqlのインストール、準備

今回は、リブセンスのAirflowの記事と同じ構成にするので、続けてmysqlceleryのパッケージをインストールしてゆく。

$ sudo apt install libmysqlclient-dev
$ sudo apt install mysql-server mysql-client
$ sudo mysql_secure_installation
$ pip3 install "apache-airflow[mysql, celery]"

これでとりあえず必要なものはインストールできたので、mysqlの設定をしていく airflowというユーザー名で作ると設定が楽なので、airflowというユーザー名と、airflowというDBを作る。(ちなみに、airflowのパスワードをairflowにすると一番何も考えずに設定ができるが、セキュリティ的にアレ)

雑なパスワード設定の方法は

qiita.com

を参照する。

mysql> create user airflow@localhost identified by {パスワード}
mysql> create database airflow;
mysql> grant all on airflow.* to airflow@localhost;

ここで、dbを見てみよう

$ mysql -u airflow -p
Enter password: 
(メッセージ略)
mysql> show databases;
+--------------------+
| Database           |
+--------------------+
| information_schema |
| airflow            |
+--------------------+
2 rows in set (0.00 sec)

OK ユーザーとデータベースの作成ができた。

airflowの設定

構成として、

とすべてMySQLの構成で行う。 これが一番設定が楽だった。

今回は、以下に示す設定を変更した。

executor = CeleryExecutor
sql_alchemy_conn = mysql://airflow:{パスワード}@localhost:3306/airflow
load_examples = False
catchup_by_default = False
worker_concurrency = 3
executor

今回はCeleryExecutorを使います。 Localでしか使わないので、LocalExecutorと悩んだのですが、なんか動かなかったので今回はこれで。

sql_alchemy_conn

CeleryExecutorではデフォルトのsqliteは使えません。

mysql://{ユーザ名}:{パスワード}@{アドレス}:{ポート}/{DB名}

というフォーマットで指定します。

load_examples

これは趣味なのですが、examplesをloadするとすごいloadされるので僕は切りました。

catchup_by_default

catchupという機能をデフォルトでonにするかどうかです。 リブセンスのAirflowの記事がわかりやすいです。

worker_concurrency

一つのワーカーの並列実行する最大のプロセス数だと思っています。 4スレッドなので3を与えました。合ってるかわかりません。

設定の反映

満足行く設定ができたら、設定を反映します

airflow initdb

このときに

Exception: Global variable explicit_defaults_for_timestamp needs to be on (1) for mysql

のようなエラーが出るかもしれません。その時は以下の記事を読んでみてください。

MySQL - 5.6 系で TIMESTAMP 型デフォルト値警告! - mk-mode BLOG

設定の確認

$ airflow webserver

コマンドでウェブサーバーを起動してみましょう。 ブラウザを立ち上げて、 ipaddress:8080 で管理画面っぽいやつが出るはずです。

f:id:kurenaif:20181105204523p:plain

計算するケースの準備

今回はOpenFOAMチュートリアルから5ケース使います デスクトップに、5ケース分置きます

$ cd Desktop/
$ cp -r ~/OpenFOAM/OpenFOAM-dev/tutorials/compressible/rhoSimpleFoam/aerofoilNACA0012/ .
$ cp -r ~/OpenFOAM/OpenFOAM-dev/tutorials/compressible/rhoPimpleFoam/RAS/angledDuct .
$ cp -r ~/OpenFOAM/OpenFOAM-dev/tutorials/multiphase/interFoam/RAS/damBreak/damBreak/ .
$ cp -r ~/OpenFOAM/OpenFOAM-dev/tutorials/combustion/chemFoam/gri/ .
$ cp -r ~/OpenFOAM/OpenFOAM-dev/tutorials/compressible/rhoCentralFoam/shockTube/ .

DAGファイルの作成

airflowでは、DAGファイルというpythonスクリプトを書いて、 airflow.cfg にかかれている dags_folder のフォルダに入れます。

airflowの公式ページのチュートリアルに簡単なスクリプトチュートリアルがあるので、 一度目を通してみてください。ここでは、いきなりOpenFOAMの計算を流します。

下記のソースコードを適当なファイル名で保存し、dags_folderに入れます。(デフォルトでは ~/airflow/dags になっていると思います。)

これでファイルの準備は完了です。

2点、チュートリアルと大きく違っており、

  • start_date は本来はn日前みたいな表記は非推奨なのですが、今回は1回実行でキャッチアップなしなので、これで大丈夫です。
  • schedule_interval は今回は一回実行のみなので、 @once を与えています。

また、今回はタスクの依存関係は特にないので、set_upstreamは不要です。 あとは基本的にこのファイルを見ればおおよそやっていることがわかると思います。 ファイルがちゃんとできているかどうかは

python3 file_name.py

とすると、チェックできます。 BashOperatorの変数への代入は、今回のケースは不要です

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime.today() - timedelta(7),
    'email': None,
    'email_on_failure': False,
    'email_on_retry': False,
}

dag = DAG('cfd-test', default_args=default_args, schedule_interval = '@once')

# t1, t2 and t3 are examples of tasks created by instantiating operators

aerofoilNACA0012 = """
source ~/OpenFOAM/OpenFOAM-dev/etc/bashrc
cd ~/Desktop/aerofoilNACA0012/
./Allrun
"""
task_angledDuct = BashOperator(
    task_id='aerofoilNACA0012',
    bash_command=aerofoilNACA0012,
    dag=dag)

angledDuct = """
source ~/OpenFOAM/OpenFOAM-dev/etc/bashrc
cd ~/Desktop/angledDuct/
./Allrun
"""
task_angledDuct = BashOperator(
    task_id='angledDuct',
    bash_command=angledDuct,
    dag=dag)

damBreak = """
source ~/OpenFOAM/OpenFOAM-dev/etc/bashrc
cd ~/Desktop/damBreak/
./Allrun
"""
task_damBreak = BashOperator(
    task_id='damBreak',
    bash_command=damBreak,
    dag=dag)

gri = """
source ~/OpenFOAM/OpenFOAM-dev/etc/bashrc
cd ~/Desktop/gri/
./Allrun
"""
task_gri = BashOperator(
    task_id='gri',
    bash_command=gri,
    dag=dag)

shockTube = """
source ~/OpenFOAM/OpenFOAM-dev/etc/bashrc
cd ~/Desktop/shockTube/
./Allrun
"""
task_shockTube = BashOperator(
    task_id='shockTube',
    bash_command=shockTube,
    dag=dag)

airflowの起動、実行

今回の構成では、以下の3つのairflowのツールを実行し、その後タスクをトリガーします。

3つターミナルを起動し、(systemdで裏で実行してもいいです)

airflow webserver
airflow worker
airflow scheduler

を起動します。それぞれ何を表すかは、リブセンスのairflowの記事がとてもわかりやすかったです。

localhost:8080 につなぐと、

f:id:kurenaif:20181105204541p:plain

のような画面があるので、

一番左のOFFになってるやつを ONにしましょう。 これでトリガーするとジョブが流れるようになります。

f:id:kurenaif:20181105204549p:plain

どんなタスクが中に入っているかは、ON/OFFのスイッチの右の cfd-test をクリックすると、Graph ViewやTree Viewでタスクの一覧がみえます

f:id:kurenaif:20181105204603p:plain

f:id:kurenaif:20181105204614p:plain

実行方法は、CUIとウェブページを使う方法があり、

CUIの方法は

airflow trigger_dag cfd-test

GUIの方法は、 ウェブページのLinksの再生ボタンみたいなやつを押せばトリガーされます。

f:id:kurenaif:20181105204620p:plain

実行が終わると、再生ボタンを押したページのRecent Tasksの数字が増えたり、DAG Runsの数字が増えたり、

Treeviewの緑のランプが増えたり

f:id:kurenaif:20181105204628p:plain

ガントチャートを確認できたり(途中でも確認できます)

f:id:kurenaif:20181105204634p:plain

します。

これで実行完了です。

angledDuctをみてみると…

$ ls
0  10  Allrun  constant  log.blockMesh  log.rhoPimpleFoam  system

できていますね。

所感

並列処理なジョブを回すときに、どうやって制限するのかよくわからなかったタスクのresourcesフィールドに、cpusみたいなのがあって、それに3とか入れても平気で他と並列処理してるしウーンという感じ

GUIでジョブの進行状況が見れたり、Pythonスクリプトで簡単にジョブファイルを作れるのはいいですね。 いろいろ応用できそうです。

今後も少し運用してみます。