でーたさいえんすって何それ食えるの?

JuliaとかRとかPythonとかと戯れていたい

R版metaflowの使い方

2019年12月にpython版が公開され、2020年8月にR版が公開されたので早速チュートリアルを参考にしながら使ってみてポイントをメモとして残しておく。

触ってみた所感は、DS人材のようにプロダクションReadyなソースをつくることに比重を置いていない人に対しても扱えそうで良さそう。ただし、このフレームワークだけでプロダクションReadyな状態に仕上げることは難しそう。なので、mlflowのような足りない部分を補うライブラリと併用する必要はありそう。とは言っても、metaflowで管理されているだけである程度安定して運用できそうではあるので、PoCでしばらく運用してみるという段階であればmetaflowだけの利用でも事足りるのではないかな、と感じた。

metaflowとは

NETFLIXが開発したMLワークフロー管理用フレームワーク
類似したものはAirflow、Luigi、kedro、RではrecipesやDrakeといったものがあるが、これらと比べて機能責務が狭く利用できるようになるまでの学習コストは低い印象。

metaflowを使うことで
- 処理の実行
- 実行開始と終了といった実行ステータスの管理
- 処理の並列化
- エラーハンドリング
といった機能を簡単に実装することができる。
更に、NETFLIXではawsでへデプロイしていたのかawsと連携したスケールアップ/アウトを簡単に行うための仕組みも含まれている。

もともとはPython用モジュールで、これをRから呼び出せるようにラップしたものがR版のmetaflowとなっている。
なので、R版metaflowの実行にはPythonの実行環境も必須となる。

基本的な使い方・考え方

関数で処理を定義し、第一引数で指定したオブジェクトを次の処理に引き渡して処理をつなぐイメージ。
この各処理のことをstepとよび、stepの開始から終了までをflowと呼んでいる。
チュートリアルに従うとselfとされていて、Pythonでクラスを定義してメソッドを利用しているような感覚。
この処理を連結させる部分はtidyverseの考え方と親和性が高く、パイプで処理ステップを連結させることが当たり前に想定されている。

例えば、チュートリアルのフローをそのまま掲載すると次のように書くことになる。
なので、Global envに処理がベタっと書かれたソースから必然的にモジュール化されたソースになるような仕組みとなっており、ML系のソースを運用のために整えるには都合が良さそう。

#  A flow where Metaflow prints 'Hi'.
#  Run this flow to validate that Metaflow is installed correctly.

library(metaflow)

# This is the 'start' step. All flows must have a step named 
# 'start' that is the first step in the flow.
start <- function(self){
    print("HelloFlow is starting.")
}

# A step for metaflow to introduce itself.
hello <- function(self){
    print("Metaflow says: Hi!") 
}

# This is the 'end' step. All flows must have an 'end' step, 
# which is the last step in the flow.
end <- function(self){
     print("HelloFlow is all done.")
}

metaflow("HelloFlow") %>%
    step(step = "start", r_function = start, next_step = "hello") %>%
    step(step = "hello", r_function = hello,  next_step = "end") %>%
    step(step = "end", r_function = end) %>% 
    run()

stepには空のstepを入れることも可能で、step(step = "end")のようにr_functionを指定しなくても良い。
処理完了や、ある程度の塊の処理が終わったところを通知させるためにログを吐かせるのに活用できそう。

最後のstepでselfに引き渡しておいたオブジェクトがすべて結果としてartifactとして保存される。 例えば最終stepで

self$result1 <- 'foo'
self$result2 <- c(1, 2, 3)

みたいにもたせておけば、このフローの実行結果としてresult1とresult2がartifactとして.metaflow内に格納される。
これは再利用が可能な上に、後で紹介するように別なフローから呼び出して再利用したりもできる。

各flowの実行結果はmetadataが保存される。 プロジェクトのルートディレクトリに.metaflowというディレクトリが作成されて、その下に実行したflowのデータが保持される(これをartifactと呼ぶ)。具体的にHelloFlowというフローを実行すると.metaflow/HelloFlow という感じで作られる。
このディレクトリの中にある各フローのartifactに対して実行毎にメタデータが保存され、その中間生成オブジェクトの情報も保存される。 これはmlflowのartifactをローカルファイルシステムとして保持させている状態と同じような状態で、メタデータjson形式で持っている。なので万が一スクリプト側でどう扱っていいか困ったときには.metaflowの中に実際に入ってデータを覗いてみることもできそう。

使い方のポイント

実行

フローを定義したソースをターミナルからRscriptで実行するかRStudioのjobsとして流すことで実行できる。
※RStudioでCtrl+Sで実行しても良いが、namespaceが切り替わってしまい実行後にGlobal Environmentにもどすのが面倒なのであまり積極的には使わない方が良さそう。

ターミナルからの実行時に与えるパラメータで実行方法が変えられる

  • run:通常の実行
    • run --with retry:stepの再実行を有効にしてフローを実行
  • resume:失敗したところから再実行
    • resume --origin-run-id 12345:特定の実行IDに対して再実行(この例だとIDが12345)
    • resume Step001:特定のステップから再実行(この例だとステップ名がStep001のところから再開)

例えばリトライを有効にして通常実行するならば、次のような感じで実行することになる。

Rscript flow01.R run --with retry

ジョブのパラメータ化

parameter()で実行パラメータを定義することができる。
さらに、ここでパラメータ化したものはソースに追記なしでターミナルからの実行パラメータオプションとしても利用できるようになる

metaflow("test_flow") %>%
  parameter("genre") %%
  step(step = "start", r_function = proc1, next_step = "end") %>% 
  step(step = "end") %>%
  run()

のように定義されているとすると、genreがパラメータとなっておりコンソールから実行するときは次のようにパラメータを与えることができるようになる。

Rscript flow02.R run -genre 'Sci-Fi'

stepの分岐

stepを定義するときにnext_stepを複数指定することでstepを分岐させることができ、 step(..., join = TRUE)としたstepで結果を纏めることができる。
joinするせずにstepを繋げれば、joinまで並列処理されることになる。各ステップが独立したRのプロセスとして処理されるので、Rでの並列処理のコードを書かずに簡単に並列化できるのが便利。
考え方としては、map/reduceを扱うようなイメージ。

stepの並列化

Rのforech、またはPythonのjoblibのような感じでグループごとに処理を並列処理がmetaflowの仕組みで実現が可能。
次のstepにわたすときにforechオプションを指定するだけで並列化ができる。
iterateする各要素はself$inputとして次のstepで実行する関数で取り出せる。
結果をまとめるための関数には、前の処理結果を受け取るために2つ目の引数を用意しておくのが実装としてやりやすいっぽい。 具体的には次のように関数を用意しておくのが良さそう。

f1 <- function(self, inputs){
    # some process 
}

joinのstepでは、2つ目の引数にiterateした各結果がリストとして入っているので関数内で取り出して統合する必要がある。
ここが、考え方が面倒だけど入れ子構造のリストになっているのでpurrrを活用して書くと良さそう。

例えば、おなじみirisデータについてSpecies毎にSepal.Lengthの平均を計算して、結果をまとめてから1つのvectorとしてprintする処理を考えると、次のように書くことになる。

library(metaflow)

s <- function(self){
  self$df <- iris
  self$i <- unique(iris$Species)
}

p1 <- function(self){
  library(magrittr)
  library(dplyr)
  tmp_df <- self$df %>%
    filter(Species == self$i)
  self$mean_SepalLength <- mean(tmp_df$Sepal.Length)
}

p2 <- function(self, inputs){
  library(magrittr)
  purrr::map(inputs, function(x)x$mean_SepalLength) %>% unlist %>% print
}

metaflow("test02") %>%
  step(step = "start", r_function = s, next_step = "proc1", foreach = "i") %>%
  step(step = "proc1", r_function = p1, next_step = "join") %>%
  step(step = "join", r_function = p2, next_step = "end", join = TRUE) %>%
  step(step = 'end') %>% 
  run()

フローの再実行

実行が失敗した場合に対して頑健になるようcatch/retryの仕組みが用意されている。
catch/retryを追加するにはstepにdecoratorとして渡せば良く、decoratorは複数渡すことが可能。

  • retry:decorator('retry') で再実行するようになる。再実行方法のオプションは次のものがある。
    • times:再実行回数
    • minutes_between_retries:再実行までの待機時間

エラー発生時の処理を定義するには、さらにdecorator('catch')を追加する。
catchのdecoratorにはvarが指定でき、varで指定した名前のスロットにエラーが入る。正常終了していれば、このスロットはNULLとなっている。
具体例として前にあった並列処理の例にdecoratorを追加して次のようにすると、joinするstepで呼ばれる関数p2のinputsにはvarで指定したcompute_failedの値がエラー発生時に入る。

library(metaflow)

s <- function(self){
  self$df <- iris
  self$i <- unique(iris$Species)
}

p1 <- function(self){
  library(magrittr)
  library(dplyr)
  tmp_df <- self$df %>%
    filter(Species == self$i)
  self$mean_SepalLength <- mean(tmp_df$Sepal.Length)
}

p2 <- function(self, inputs){
  library(magrittr)
  purrr::map(inputs, function(x)x$mean_SepalLength) %>% unlist %>% print
}

metaflow("test02") %>%
  step(step = "start", r_function = s, next_step = "proc1", foreach = "i") %>%
  step(step = "proc1", 
  decorator("catch", var="compute_failed"),
  r_function = p1, next_step = "join") %>%
  step(step = "join", r_function = p2, next_step = "end", join = TRUE) %>%
  step(step = 'end') %>% 
  run()

ここではエラーが発生しないコードだが仮にp1でエラーが発生していたとすると、p2の引数inputsにエラーオブジェクトが含まれていることになり、次のようにis.null()でエラーが起きていたかどうか確認ができる。

p2 <- function(self, inputs){
  for(input in inputs){
    if(!is.null(input$compute_failed)){
      print("Exception happended!!")
    }
  }
  library(magrittr)
  purrr::map(inputs, function(x)x$mean_SepalLength) %>% unlist %>% print
}

更にdecorator('timeout')を追加することで処理がハングアップしていると判定してプロセスを終了させるまでのタイムアウトまでの時間を指定できる。

フロー結果の再利用

既に実行済みのflowはあとで再利用が可能。また、別なflowで利用することも可能。 例えばtestflow01というflowで結果はresult01となっているflowを実行済みだとすると、次のようなコードで実行済みの結果を再利用することができる。

flow <- flow_client$new("testflow01")
run <- run_client$new(flow, flow$latest_successful_run)
run$artifact("result01")

使う上での注意点

  • 2020年8月時点ではWindowsネイティブサポートが出ていない
  • 他ライブラリとちがい実行stepのDAGを可視化するライブラリはまだ用意されていないらしい。
  • 各ステップ毎に環境が独立しているから、ライブラリや自作関数はそれぞれロードし直す必要がある

  • AICを使った変数選択をするstep関数とかぶってしまうので、名前空間を明示的に書くかconflictedを利用すると良さそう

  • step名にスペースは入れるとエラーになってしっまう
  • metaflow('class_name')でつくったclass_nameがRのenvironmentとしてオブジェクトが生まれてしまう
    例えば、helloという関数を定義した状態でmetaflow('hello')とflow名を設定すると、hello関数がenvironmentオブジェクトで上書きされて想定通り動かない。
  • metaflowで処理を実行するとnamespaceが自動的にuser:{username}になってしまう
    →REPLで実行するとenvironmentが意図せず切り替わるのでRStudioだとsuggest機能がうまく働かなくなってしまいちょっと面倒