データレイクの可用性を可視化したい
データ品質には様々な項目がありますが、可用性はその中でも重要な項目です。データレイクありきで、dbtを使って生成するDWHやデータマートの可用性の可視化は以下のエントリで試していました。
DWHやデータマートと同様にデータレイクも可用性は重要です。「DWHやデータマートの生成バッチが朝の7時に走るのに、データレイクへのデータの同期が遅れて10時にならないと同期が終わらなかった」というようなことがあればデータレイクの役割をきちんと果せていないことになります。DAMAのデータ品質の定義に沿うと、CurrencyやPunctualityを満たしていない、ということになります。このエントリではこれらをまとめて(大雑把に)可用性と呼ぶことにします。
データソースの可用性の指標の定義として、dbtを使っているシステムであれば例えばsource freshnessを使うことができるでしょう。source freshness
を正常にパスできないのであれば、データソースの可用性は低いと言えそうです。これをデータソースの可用性の指標として利用する場合、source freshness
がパスしたかどうかを時系列でトラッキングしたくなります。
elementaryを使ってsource freshnessの実行履歴を収集する
dbtを使っている場合、データ品質に関する指標はelementaryを使うと簡単に集められることが多いです。以前の記事もelementaryを使って楽をさせてもらっていました。source freshness
の実行履歴も同様に簡単に集められるはず...と思いましたが、実際にできたテーブルを見てみると空になっていました。公式ドキュメントを読んでみると
Unlike dbt and Elementary tests, the results of the command dbt source-freshness are not automatically collected. You can collect the results using Elementary CLI tool.
と書いてあり、CLIを使って自分で収集しろ、ということのようです。だ、だるい。。。
とはいえ、他の項目と同じようにsource freshness
もelementaryの枠組みで実行履歴が収集できると便利なので、なんとかします。やることとしては
source freshness
を実行しているdbtのjobの最新のrunのidを取得- 最新のrunのidを元にそのrunの生成物の中からsources.jsonを取得、適当なパスに配置
- edrコマンドでelementaryのデータの保存先にアップロード(収集)させる
という感じです。curlでもいいですが、dbt cloudのAPIを何回か叩くので、dbt-cloud-cliがあると簡単です。
DBT_CLOUD_API_TOKEN=xxxxxxx latest_run_id=$(dbt-cloud run list --account-id 1234 --project-id 5678 --job-id 9999 --order-by -created_at | jq -r ".data[0].id") mkdir target dbt-cloud run get-artifact --account-id 1234 --run-id "${latest_run_id}" --path sources.json > target/sources.json edr run-operation upload-source-freshness --project-dir .
edr run-operation
はdbtっぽいディレクトリ構造をしていないと動いてくれないので、target/sources.json
のようにファイルを配置してあげるのがポイントです。
これで特定時点であるデータソースのsource freshness
が成功していたかどうか、という履歴が溜まっていくので、本来やりたかったデータレイクの可用性を可視化に繋げられます。以下のようなSQLであるデータソースが特定時点でsource freshness
が成功していたか分かるので、あとは適当に可視化すれば完成です。
SELECT unique_id, DATE(created_at, "Asia/Tokyo") AS created_at, status FROM `my-project.my_elementary.dbt_source_freshness_results` ORDER BY unique_id, created_at DESC