dbt-osmosisを必要最小限の箇所だけ動かせるように自前でcatalog.jsonを構成する

TL;DR

  • 巨大なプロジェクトでdbt-osmosisを実行しようとすると時間がかかるが、それを短縮するための--catalog-fileというオプションが存在する
  • catalog.jsondbt docs generateの成果物であるが、巨大なプロジェクトの場合は実行に非常に時間がかかるかOOMで失敗することもある
  • dbt-osmosisを実行したい限られたnodesに対してのみdbt docs generateを適用したいが、範囲を絞るオプションは現在のところ存在しない
  • manifest.jsonINFORMATION_SCHEMA.COLUMNSを使って、最小限のcatalog.jsonを生成するスクリプトを書いた

背景

巨大なプロジェクトに対してのdbt-osmosisとcatalog-fileオプション

dbt-osmosisでカラムのmetadataを付与する際、対象が小さければ特に問題なく終わりますが、対象が大きい場合は実行に時間がかかる or 終わらない場合があります。というのも、対象の全てのモデルに対して、dbt-osmosisがINFORMATION_SCHEMAを実行するからです。

このような巨大なプロジェクトに対してdbt-osmosisを実行する場合、毎回INFORMATION_SCHEMAを実行しなくて済むように--catalog-fileというオプションが用意されています。

catalog.jsonを生成するdbt docs generateの実行時間が長い & 対象を絞れない問題

では、オプションで指定するこのcatalog.jsonは誰が作っているかというと、dbt docs generateです。

このコマンドはdbtのmanifest.json内に登場するモデルの情報を元に(BigQueryなどの)DWHに実際のリソースの情報を問い合せて

  • カラム名の一覧やその型
  • テーブルサイズや行数

などの情報を取得します。大きめのdbtプロジェクトではdbt docs generateが30分かかることなどはあまり珍しくないと思います(dbtのモデル数が多い場合も問題ですが、日付別テーブルのメタデータの取得で時間がかかることが経験的に多いように思います)。dbt-osmosisを実行してカラムのメタデータを付与したいだけなのに、30分毎回待たされるのは非常にストレスです。

dbt runなどのサブコマンドでは--selectオプションで対象を絞ることができますが、dbt docs generateに対しては効果がありません。

dbt cloudではcatalog.jsonなどの成果物をAPI経由で取得することができますが、巨大なプロジェクトでdbt docs generateを実行しようとすると、そもそもdbt cloud側でThis run exceeded your account's run memory limitsと怒られてしまい、そもそも成果物を生成できません。手元でdbt docs generateを実行すると5GB以上メモリを食っていることがあり、dbt cloudで実行できないのも納得です...。

解決方法: manifest.jsonとINFORMATION_SCHEMA.COLUMNSを使って、最小限のcatalog.jsonを生成

うーーーんと考えたところ、対象になるモデルの情報はmanifest.jsonで使いやすいような形になっているので、dbt-osmosisで必要なカラムの情報はINFORMATION_SCHEMA.COLUMNSで補完してcatalog.jsonを生成すればよい、という考えに至りました。

より具体的な作戦は以下の通りです。

  • dbt cloudで毎日生成されている最新のmanifest.jsonを取得する
  • manifest.jsonをparseして、dbt-osmosisの実行に必要なモデルの情報のみを取り出す
  • モデルの情報を元に、INFORMATION_SCHEMA.COLUMNSを使ってカラム名などを取得する
  • これらの情報を結合し、catalog.jsonを生成する

実際のスクリプトは以下のようになります。

% DBT_CLOUD_API_TOKEN=abcdefg
% python scripts/generate_catalog_for_osmosis.py \
  --account_id 123 \
  --dbt_project_id 456 \
  --job_definition_id 789 \
  --path models/my_model | \
  jq . > my_catalog.json
import json
import sys
import os
from typing import Dict, Any
import argparse

import requests
from google.cloud import bigquery


def get_manifest(
    account_id: int,
    dbt_project_id: int,
    job_definition_id: int,
    api_token: str,
) -> Dict[str, Any]: 
    headers = {"Authorization": f"Bearer {api_token}"}
    response = requests.get(
        f"https://cloud.getdbt.com/api/v2/accounts/{account_id}/runs/?project_id={dbt_project_id}&job_definition_id={job_definition_id}&limit=1&order_by=-created_at",
        headers=headers,
    )

    run_id = json.loads(response.text)["data"][0]["id"]
    response = requests.get(
        f"https://cloud.getdbt.com/api/v2/accounts/{account_id}/runs/{run_id}/artifacts/manifest.json",
        headers=headers,
    )
    return json.loads(response.text)


def get_columns_info_from_information_schema(
    client: bigquery.Client,
    gcp_project_id: str,
    dataset: str,
    table_name: str,
) -> Dict[str, Any]:
    query = f"""
        SELECT
        *
        FROM
        `{gcp_project_id}`.{dataset}.INFORMATION_SCHEMA.COLUMNS
        WHERE
        table_name = "{table_name}"
    """

    columns = {}
    for _, item in client.query(query).result().to_dataframe().iterrows():
        columns[item["column_name"]] = {
            "type": item["data_type"],
            "index": item["ordinal_position"],
            "comment": None,
        }
    return columns


def build_metadata(manifest: Dict[str, Any]) -> Dict[str, Any]:
    return {
        "dbt_schema_version": "https://schemas.getdbt.com/dbt/catalog/v1.json",
        "dbt_version": manifest["metadata"]["dbt_version"],
        "generated_at": manifest["metadata"]["generated_at"],
        "invocation_id": manifest["metadata"]["invocation_id"],
        "env": {}
    }


def main():
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--account_id",
        type=int,
        required=True,
    )
    parser.add_argument(
        "--dbt_project_id",
        type=int,
        required=True,
    )
    parser.add_argument(
        "--job_definition_id",
        type=int,
        required=True,
    )
    parser.add_argument(
        "--path",
        type=str,
        required=True,
    )
    args, _ = parser.parse_known_args()

    api_token = os.environ.get("DBT_CLOUD_API_TOKEN")
    client = bigquery.Client()

    manifest = get_manifest(args.account_id, args.dbt_project_id, args.job_definition_id, api_token)
    result = {
        "metadata": build_metadata(manifest),
        "sources": {},
        "errors": None
    }

    nodes = {}
    for key, node in manifest['nodes'].items():
        if not ("original_file_path" in node and node["original_file_path"].startswith(args.path)):
            continue
        print(key, file=sys.stderr)

        columns = get_columns_info_from_information_schema(
            client,
            gcp_project_id = node["database"],
            dataset = node["schema"],
            table_name = node["name"],
        )

        nodes[key] = {}
        nodes[key]["columns"] = columns
        nodes[key]["metadata"] = {
            "database": node["database"],
            "schema": node["schema"],
            "name": node["name"],
            "comment": None,
        }
    result["nodes"] = nodes
    print(json.dumps(result))


if __name__ == "__main__":
    main()

このスクリプトで生成されたカタログファイルをdbt-osmosisに食わせれば、必要最小限の範囲だけカラムdescriptionを伝播させる、ということがさっとできるようになりました。

% dbt-osmosis yaml refactor \
  --skip-add-tags \
  --skip-merge-meta \
  --catalog-file my_catalog.json \
  models/my_model