Argo Workflowsの代替としてのCloud Workflowsの調査

夏休みの自由研究です。軽く触ってみました程度の技術調査なので、あまり当てにしないでください...。

Argo WorkflowsからCloud Workflowsへの移行のモチベーション

仕事やプライベートでデータ基盤や機械学習のワークフローエンジンとして、Argo Workflowsを最近ずっと使っている。単体だと足りない部分も多少あったが、補助のスクリプトを自前で書くことで概ね満足した使い勝手になっている。

一人や少人数のチームで使う分にはいいのだが、Argo WorkflowsはKubernetesネイティブなワークフローエンジンということもあり、ある程度はKubernetesの知識が必要とされる。GitOpsなどを利用して、普段の生活ではそれほどKubernetesを意識せず元々のやりたいことであるワークフロー管理に集中できる形にはしているが、調査やメンテナンスの過程ではやはりどうしてもKubernetesの知識が必要になる。

グループとしてタスクを進めていきたい場合、オンボーディングコストなどを考慮してスピードが出るならKubernetesでないツールも検討したい。ワークフローエンジンのコントロールプレイン相当をマネージドでやってくれるCloud Workflowsはどうなんだろうと思い、軽く触ってみることにした。

ワークフローエンジン上で動かしている既存のジョブ

データエンジニアあるあるだと思うけど、以下のようなバッチを動かしているという前提。

  • BigQueryのスキャン量通知ボット
    • いわゆるBigQuery警察的なもの
  • テーブル削除系
    • 監査ログなどを使って、90 ~ 180日以上参照されていないテーブルは自動的に削除する
  • BigQueryのスロット管理系
    • 利用人数が多い時間帯はflex slotで多めにスロットを購入する
  • SQLによるデータ品質の監視

必要とされる要件としては以下のようにまとめることができる。

  • 一個一個のバッチは割とシンプルなスクリプトであることが多い
    • 数はじわじわ増えていきがちなので、それぞれのバッチの成否やログは簡単に追いたい
  • 実行時間はめっちゃシビアというわけではない
    • 10分が20分になったとしても即死はしない
    • が、適切にタイムアウトやretryはワークフローエンジン側で設定したい
  • 繰り返し処理や条件分岐などが書きやすい
    • 特定の日はこうする、などの要件入りやすい
    • sprigJinjaのような便利テンプレート関数が使えると助かる
    • 成否に応じて、通知などを適切にしゅっとできる

Cloud Workflowsとは

基本的には公式のドキュメントを見てもらうとして、↑に書いたようなバッチジョブを想定した場合の特徴を列挙していく。

  • GCPのクラウドサービスを中心にワークフローを構成できる
    • イベントの発火はCloud Schedulerだったり、pubsubからイベントを受け取ったり
    • ジョブの実行はCloud FunctionsやCloud Runなど
      • ちょっと前まではCloud Runでもエンドポイントを生やさないといけなかったので、バッチジョブは使いにくいイメージがあったが、先日Cloud Run jobsが出たので、使いやすくなっていそう
      • 一方で、Batchとの使い分けはあんまり分かってない。GPUなど計算リソースを必要とするかどうかって感じかな?
    • BigQueryでのSQLの実行や翻訳など、よくあるユースケースについてはconnectorが用意されているので、比較的簡単に動かせる
      • 逆に言うと、これらのケースでカバーできないようなものはCloud Runなど自前でコンテナを使ってやることが多そう。自分のケースはconnectorで片づくことがほとんどないため、Cloud Run jobsを多用することになる
  • 基本的にはワークフローの管理のみ
    • コンピュートやストレージやスケジューリングは担当しない
      • Kubernetesの知識はいらない
    • 糊付けの役割に徹する、AWSでいうところのStep Functionsと同じ
  • Terraformも公式から提供されているので、IaCもやりやすい
    • メインのワークフロー部分はyamlで書いて、templatefileで読み込む形がよさそう

ワークフローを動かしてみる

やれることは概ね分かったが、実際に使ってみないと肌触りは分からない。簡単な例でいいので、動かしてみることにした。ワークフローエンジンはGUIでぽちぽちやっていると後から地獄を見ることになるのは分かっているので、IaCでやる。ありがたいことに、Cloud Workflows + Cloud Run Jobsの構成をTerraformでやる方法を公開してくれている人がいたので、それを真似しながらやった。

これをベースに並行してブロックを実行したり、条件分岐させるようなテンプレートを書いてみた。個々のステップの処理はほぼ意味のないもの。

ワークフローのyamlの例(クリックすると中身が展開されます)

main:
  params: [args]
  steps:
  - init:
      assign:
        - project_number: $${sys.get_env("GOOGLE_CLOUD_PROJECT_NUMBER")}
        - a: 0
        - b: 0
        - c: 0
  - parallel_test:
      parallel:
        shared: [a, b, c]
        branches:
          - test_cloudrun_jobs1:
              steps:
                - hoge:
                    call: googleapis.run.v1.namespaces.jobs.run
                    args:
                      name: $${"namespaces/" + project_number + "/jobs/integration-cloud-run-jobs"}
                      location: europe-west9
                    result: resp
                - call_a:
                    call: sys.log
                    args:
                      text: $${resp}
                      severity: INFO
                - set_a:
                    assign:
                      - a: $${resp}
          - test_cloudrun_jobs2:
              steps:
                - fuga:
                    call: googleapis.run.v1.namespaces.jobs.run
                    args:
                      name: $${"namespaces/" + project_number + "/jobs/integration-cloud-run-jobs"}
                      location: europe-west9
                    result: resp
                - call_b:
                    call: sys.log
                    args:
                      text: $${resp}
                      severity: INFO
                - set_b:
                    assign:
                      - b: $${resp}
  - test_cloudrun_jobs3:
      call: googleapis.run.v1.namespaces.jobs.run
      args:
        name: $${"namespaces/" + project_number + "/jobs/integration-cloud-run-jobs"}
        location: europe-west9
      result: resp
  - test_output:
      call: sys.log
      args:
        text: $${resp}
        severity: INFO
  - finish:
      return: "OK"

GUI上では以下のように可視化できる。

ワークフローの可視化

よかったポイント

  • 実行前にワークフローを実行するためのDAG(どういうブロックや条件分岐があるか)が見える
    • argo workflowsは実行中のものについてはDAGが見えるが、実行前には分からないので便利
  • Cloud Monitoring Metricsでワークフローのメトリックが取得できる
    • この辺はマネージドサービスならでは感。メトリックを元にアラートを飛ばすことも簡単にできそう

ワークフローに関するメトリックが一覧できるダッシュボード

改善して欲しいポイント

運用に乗せようと思うと、正直結構辛いポイントがまだまだ多かったので、つらつらと書いてみる。

ワークフローを走らせた時の実行結果が大分分かりにくかった。実際に動かしてみたときの画面はこういう感じ。

Cloud Workflowsの実行結果

ワークフローを走らせた後(特に失敗時)は以下のことを確認したいことが多い。現状の画面だとそれができない。

  • 下にArgo Workflowsで失敗した例を載せてますが、sub12のステップでコケたんだなということが分かる
  • Cloud Workflowsの場合、実行結果にDAGがないため、フラットなログからDAGのどのステップに該当するかを目grepして確認していくことになる
    • 障害対応など焦りがちな時にこういうのは辛い
    • ワークフローのyamlを書いた人と対応する人が違う場合にも辛い

Argo WorkflowsでのDAGの実行結果。失敗したノードが色付きで分かる

  • 各ステップの実行時間やretryの回数などがDAGから分からない
    • バッチが遅くなっているなーという時にDAGがそもそも出ないので、どのステップの実行時間が支配的かが全然分からない
    • sys.logで出せば分かるけど、正直その辺はワークフローで計算した上で画面に出して欲しい
      • こういうことを楽するためにワークフローエンジンを使っているので
  • Cloud Run jobsの実行ログが Cloud Workflowsの画面上からは分からない
    • 実行ログをsys.logに出力したところ、以下のようなJSONが返ってくる
      • {"apiVersion":"run.googleapis.com/v1","kind":"Execution","metadata":{"annotations":{"client.knative.dev/user-image":"asia-northeast1-docker.pkg.dev/my-project/integration-cloud-run-jobs/sample_image","run.googleapis.com/client-name":"gcloud","run.googleapis.com/client-version":"398.0.0"
    • Cloud Run jobsの裏側でknativeが動いているのは分かるが、肝心のCloud Run jobsの実行ログが出力されない...!
    • イチイチCloud Runの画面に行って、今回Cloud Workflowsで動かしたジョブに対応するものがどれかを自分で探さないといけない
      • Cloud Run jobsがまだプレビューでの提供ということもあるのだろうけど、これは本当に面倒なので、GAまでにはどうにかなっていて欲しい
  • ワークフローテンプレート内で利用可能なutility関数がもっと充実して欲しい

所感

改善して欲しいポイントに書きましたが、Argo Workflowsのような既存のバッチ用途のワークフローエンジンからの移行は正直まだまだ厳しいなと思いました(AWS Step Functionsのほうが使いやすかった)。特に運用に乗せたときに困るであろうことが容易に想像できるポイントが複数あったので、それの改善待ちかなーと思いました。とはいえ、この領域はどんどん進んでいくところではあるので、移行がしやすいように既存ジョブを整備しておこうとは思います。進化、待ってます!