TL;DR
- 巨大なプロジェクトでdbt-osmosisを実行しようとすると時間がかかるが、それを短縮するための
--catalog-file
というオプションが存在する catalog.json
はdbt docs generate
の成果物であるが、巨大なプロジェクトの場合は実行に非常に時間がかかるかOOMで失敗することもある- dbt-osmosisを実行したい限られたnodesに対してのみ
dbt docs generate
を適用したいが、範囲を絞るオプションは現在のところ存在しない manifest.json
とINFORMATION_SCHEMA.COLUMNS
を使って、最小限のcatalog.json
を生成するスクリプトを書いた
背景
巨大なプロジェクトに対してのdbt-osmosisとcatalog-fileオプション
dbt-osmosisでカラムのmetadataを付与する際、対象が小さければ特に問題なく終わりますが、対象が大きい場合は実行に時間がかかる or 終わらない場合があります。というのも、対象の全てのモデルに対して、dbt-osmosisがINFORMATION_SCHEMAを実行するからです。
このような巨大なプロジェクトに対してdbt-osmosisを実行する場合、毎回INFORMATION_SCHEMAを実行しなくて済むように--catalog-file
というオプションが用意されています。
catalog.jsonを生成するdbt docs generateの実行時間が長い & 対象を絞れない問題
では、オプションで指定するこのcatalog.json
は誰が作っているかというと、dbt docs generate
です。
このコマンドはdbtのmanifest.json
内に登場するモデルの情報を元に(BigQueryなどの)DWHに実際のリソースの情報を問い合せて
- カラム名の一覧やその型
- テーブルサイズや行数
などの情報を取得します。大きめのdbtプロジェクトではdbt docs generate
が30分かかることなどはあまり珍しくないと思います(dbtのモデル数が多い場合も問題ですが、日付別テーブルのメタデータの取得で時間がかかることが経験的に多いように思います)。dbt-osmosisを実行してカラムのメタデータを付与したいだけなのに、30分毎回待たされるのは非常にストレスです。
dbt run
などのサブコマンドでは--select
オプションで対象を絞ることができますが、dbt docs generate
に対しては効果がありません。
dbt cloudではcatalog.json
などの成果物をAPI経由で取得することができますが、巨大なプロジェクトでdbt docs generate
を実行しようとすると、そもそもdbt cloud側でThis run exceeded your account's run memory limits
と怒られてしまい、そもそも成果物を生成できません。手元でdbt docs generate
を実行すると5GB以上メモリを食っていることがあり、dbt cloudで実行できないのも納得です...。
解決方法: manifest.jsonとINFORMATION_SCHEMA.COLUMNSを使って、最小限のcatalog.jsonを生成
うーーーんと考えたところ、対象になるモデルの情報はmanifest.json
で使いやすいような形になっているので、dbt-osmosisで必要なカラムの情報はINFORMATION_SCHEMA.COLUMNS
で補完してcatalog.json
を生成すればよい、という考えに至りました。
より具体的な作戦は以下の通りです。
- dbt cloudで毎日生成されている最新の
manifest.json
を取得する manifest.json
をparseして、dbt-osmosisの実行に必要なモデルの情報のみを取り出す- モデルの情報を元に、
INFORMATION_SCHEMA.COLUMNS
を使ってカラム名などを取得する - これらの情報を結合し、
catalog.json
を生成する
実際のスクリプトは以下のようになります。
% DBT_CLOUD_API_TOKEN=abcdefg % python scripts/generate_catalog_for_osmosis.py \ --account_id 123 \ --dbt_project_id 456 \ --job_definition_id 789 \ --path models/my_model | \ jq . > my_catalog.json
import json import sys import os from typing import Dict, Any import argparse import requests from google.cloud import bigquery def get_manifest( account_id: int, dbt_project_id: int, job_definition_id: int, api_token: str, ) -> Dict[str, Any]: headers = {"Authorization": f"Bearer {api_token}"} response = requests.get( f"https://cloud.getdbt.com/api/v2/accounts/{account_id}/runs/?project_id={dbt_project_id}&job_definition_id={job_definition_id}&limit=1&order_by=-created_at", headers=headers, ) run_id = json.loads(response.text)["data"][0]["id"] response = requests.get( f"https://cloud.getdbt.com/api/v2/accounts/{account_id}/runs/{run_id}/artifacts/manifest.json", headers=headers, ) return json.loads(response.text) def get_columns_info_from_information_schema( client: bigquery.Client, gcp_project_id: str, dataset: str, table_name: str, ) -> Dict[str, Any]: query = f""" SELECT * FROM `{gcp_project_id}`.{dataset}.INFORMATION_SCHEMA.COLUMNS WHERE table_name = "{table_name}" """ columns = {} for _, item in client.query(query).result().to_dataframe().iterrows(): columns[item["column_name"]] = { "type": item["data_type"], "index": item["ordinal_position"], "comment": None, } return columns def build_metadata(manifest: Dict[str, Any]) -> Dict[str, Any]: return { "dbt_schema_version": "https://schemas.getdbt.com/dbt/catalog/v1.json", "dbt_version": manifest["metadata"]["dbt_version"], "generated_at": manifest["metadata"]["generated_at"], "invocation_id": manifest["metadata"]["invocation_id"], "env": {} } def main(): parser = argparse.ArgumentParser() parser.add_argument( "--account_id", type=int, required=True, ) parser.add_argument( "--dbt_project_id", type=int, required=True, ) parser.add_argument( "--job_definition_id", type=int, required=True, ) parser.add_argument( "--path", type=str, required=True, ) args, _ = parser.parse_known_args() api_token = os.environ.get("DBT_CLOUD_API_TOKEN") client = bigquery.Client() manifest = get_manifest(args.account_id, args.dbt_project_id, args.job_definition_id, api_token) result = { "metadata": build_metadata(manifest), "sources": {}, "errors": None } nodes = {} for key, node in manifest['nodes'].items(): if not ("original_file_path" in node and node["original_file_path"].startswith(args.path)): continue print(key, file=sys.stderr) columns = get_columns_info_from_information_schema( client, gcp_project_id = node["database"], dataset = node["schema"], table_name = node["name"], ) nodes[key] = {} nodes[key]["columns"] = columns nodes[key]["metadata"] = { "database": node["database"], "schema": node["schema"], "name": node["name"], "comment": None, } result["nodes"] = nodes print(json.dumps(result)) if __name__ == "__main__": main()
このスクリプトで生成されたカタログファイルをdbt-osmosisに食わせれば、必要最小限の範囲だけカラムdescriptionを伝播させる、ということがさっとできるようになりました。
% dbt-osmosis yaml refactor \ --skip-add-tags \ --skip-merge-meta \ --catalog-file my_catalog.json \ models/my_model