dbtのsource freshnessの実行結果をelementaryに収集させる

データレイクの可用性を可視化したい

データ品質には様々な項目がありますが、可用性はその中でも重要な項目です。データレイクありきで、dbtを使って生成するDWHやデータマートの可用性の可視化は以下のエントリで試していました。

DWHやデータマートと同様にデータレイクも可用性は重要です。「DWHやデータマートの生成バッチが朝の7時に走るのに、データレイクへのデータの同期が遅れて10時にならないと同期が終わらなかった」というようなことがあればデータレイクの役割をきちんと果せていないことになります。DAMAのデータ品質の定義に沿うと、CurrencyやPunctualityを満たしていない、ということになります。このエントリではこれらをまとめて(大雑把に)可用性と呼ぶことにします。

データソースの可用性の指標の定義として、dbtを使っているシステムであれば例えばsource freshnessを使うことができるでしょう。source freshnessを正常にパスできないのであれば、データソースの可用性は低いと言えそうです。これをデータソースの可用性の指標として利用する場合、source freshnessがパスしたかどうかを時系列でトラッキングしたくなります。

elementaryを使ってsource freshnessの実行履歴を収集する

dbtを使っている場合、データ品質に関する指標はelementaryを使うと簡単に集められることが多いです。以前の記事もelementaryを使って楽をさせてもらっていました。source freshnessの実行履歴も同様に簡単に集められるはず...と思いましたが、実際にできたテーブルを見てみると空になっていました。公式ドキュメントを読んでみると

Unlike dbt and Elementary tests, the results of the command dbt source-freshness are not automatically collected. You can collect the results using Elementary CLI tool.

と書いてあり、CLIを使って自分で収集しろ、ということのようです。だ、だるい。。。

とはいえ、他の項目と同じようにsource freshnessもelementaryの枠組みで実行履歴が収集できると便利なので、なんとかします。やることとしては

  • source freshnessを実行しているdbtのjobの最新のrunのidを取得
  • 最新のrunのidを元にそのrunの生成物の中からsources.jsonを取得、適当なパスに配置
  • edrコマンドでelementaryのデータの保存先にアップロード(収集)させる

という感じです。curlでもいいですが、dbt cloudのAPIを何回か叩くので、dbt-cloud-cliがあると簡単です。

DBT_CLOUD_API_TOKEN=xxxxxxx
latest_run_id=$(dbt-cloud run list --account-id 1234 --project-id 5678 --job-id 9999 --order-by -created_at | jq -r ".data[0].id")
mkdir target
dbt-cloud run get-artifact --account-id 1234 --run-id "${latest_run_id}" --path sources.json > target/sources.json
edr run-operation upload-source-freshness --project-dir .

edr run-operationはdbtっぽいディレクトリ構造をしていないと動いてくれないので、target/sources.jsonのようにファイルを配置してあげるのがポイントです。

これで特定時点であるデータソースのsource freshnessが成功していたかどうか、という履歴が溜まっていくので、本来やりたかったデータレイクの可用性を可視化に繋げられます。以下のようなSQLであるデータソースが特定時点でsource freshnessが成功していたか分かるので、あとは適当に可視化すれば完成です。

SELECT
  unique_id,
  DATE(created_at, "Asia/Tokyo") AS created_at,
  status
FROM
  `my-project.my_elementary.dbt_source_freshness_results`
ORDER BY
  unique_id,
  created_at DESC