dbtのモデルとConnected Sheetsの依存関係をexposureで表現して、データ管理を効率的に行なおう

以下のConnected Sheets版です。これはかなり便利なものができたと、自画自賛してます。

背景

Tableauと比べると、Connected Sheetsはアドホックな分析で使われることが多いと思います。実際、筆者が所属している会社でも「Tableauは公式のダッシュボード、それ以外のアドホックな分析はConnected Sheetsで行なってね」と案内しています。とはいえ、公式のダッシュボードにするためにはdbtなどを使ってデータソースを信頼できる形に整備する必要などがあり、それを待っていられないからConnected Sheetsで分析を行なう、というケースは当然出てき得ると思います。Connected Sheetsでアドホックで作ったつもりのダッシュボードがいつの間にかたくさんの人に見られて半公式化してしまっているという状況は(望ましくないものの)ありがちな状況だと思います。

dbtで作ったテーブルがConnected Sheetsから参照されている場合、一定程度利用されているスプレッドシートからのテーブルの参照状況はデータ基盤を管理する人間としては把握しておきたいでしょう。参照状況を把握しておくことで、テーブルの変更による混乱を避けるための材料が手に入るからです。

Connected Sheetsをdbtのexposureとして取り込む

こういったConnected Sheetsからの参照状況はdbtのexposureとして取り込んでおくと便利です。先行してTableauのワークブックで同様のことを社内で行なっていますが「dbt CloudのCIのjobは通る(=参照しているTableauのワークブックがいない)から、このテーブルを消しても安心だな」といった形で不要なテーブルの削除を進めることができています。

後述するスクリプトを使うと、以下のようなexposureを自動生成できます。

version: 2
exposures:
- name: 123456789abcde
  label: hogeプロジェクトのダッシュボード
  type: dashboard
  url: https://docs.google.com/spreadsheets/d/123456789abcde
  owner:
    name: me@example.com
    email: me@example.com
  depends_on:
  - ref('fact_order')
  meta:
    total_queries_count: 50000
    queried_weeks_count: 10
- name: 987654321abcde
...

Connected Sheetsのexposureを自動生成するスクリプト(クリックで開きます)

import collections
from decimal import ROUND_HALF_UP, Decimal
from typing import Any, Dict

import yaml
from google.cloud import bigquery

sheet_name_by_sheet_id = {
    "123456789abcde": "hogeプロジェクトのダッシュボード",
}

# yamlには大まかクエリ回数のみ記録したいため、最上位の桁で丸める
def round_to_highest_digit(number):
    if number == 0:
        return 0

    length = len(str(abs(number)))
    highest_digit = int(str(number)[0])
    return highest_digit * 10 ** (length - 1)


def get_connected_sheet_info(
    client: bigquery.Client,
) -> Dict[str, Any]:
    duration_days = 93  # 3ヶ月
    infomation_schema_jobs = "region-us.INFORMATION_SCHEMA.JOBS_BY_ORGANIZATION"
    audit_log = "my-project.logging_export_gcp_audit_bigquery.cloudaudit_googleapis_com_data_access"
    elementary_dbt_models = "my-project.my_elementary.dbt_models"

    query = f"""
        WITH job_id_and_sheet_id AS (
            SELECT
                REGEXP_EXTRACT(protopayload_auditlog.resourceName, r'^projects/[\w-]+/jobs/([\w-]+)$') AS job_id,
                JSON_VALUE(protopayload_auditlog.metadataJson, "$.firstPartyAppMetadata.sheetsMetadata.docId") AS sheet_id,
            FROM
                `{audit_log}`
            WHERE
                protopayload_auditlog.serviceName = "bigquery.googleapis.com"
                AND JSON_VALUE(protopayload_auditlog.metadataJson, "$.firstPartyAppMetadata.sheetsMetadata.docId") IS NOT NULL
                AND timestamp > TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL {duration_days} DAY)
        ),
        jobs_with_sheet_id AS (
            SELECT
                sheet_id,
                jobs.user_email,
                jobs.referenced_tables,
                jobs.creation_time,                
            FROM
                `{infomation_schema_jobs}` AS jobs
            INNER JOIN
                job_id_and_sheet_id
            USING (job_id)
            WHERE
                creation_time > TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL {duration_days} DAY)
            UNION ALL
            # スプレッドシートからパラメータ付きでクエリする場合、BigQuery Scriptingとして実行される
            # BigQuery Scriptingで実行されたクエリはreferenced_tablesを含まれないため、子どものジョブを取得する必要がある
            SELECT
                sheet_id,
                jobs.user_email,
                jobs.referenced_tables,
                jobs.creation_time,                
            FROM
                `{infomation_schema_jobs}` AS jobs
            INNER JOIN
                job_id_and_sheet_id
            ON
                jobs.parent_job_id = job_id_and_sheet_id.job_id
            WHERE
                creation_time > TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL {duration_days} DAY)
        ),
        sheet_owner AS (
            SELECT
                sheet_id,
                user_email AS sheet_owner,
            FROM
                jobs_with_sheet_id
            QUALIFY ROW_NUMBER() OVER (PARTITION BY sheet_id ORDER BY creation_time DESC) = 1 # 直近のクエリ発行者をシートのオーナーと見なす
        ),
        queries_count_by_sheet_id AS (
            SELECT
                sheet_id,
                COUNT(*) AS cnt
            FROM
                jobs_with_sheet_id
            GROUP BY
                sheet_id
        ),
        # シートからクエリされている週の数を取得する。一定期間以上クエリされていないシートは除外したいため
        queries_week_count_by_sheet_id AS (
            SELECT
                sheet_id,
                COUNT(DISTINCT(DATE_TRUNC(creation_time, WEEK))) AS weeks_count,
            FROM
                jobs_with_sheet_id
            GROUP BY
                sheet_id
        ),
        referenced_tables_by_sheet_id AS (
        SELECT
            DISTINCT sheet_id, reference_tables.table_id
        FROM
            jobs_with_sheet_id,
            UNNEST(referenced_tables) AS reference_tables
        )
        SELECT
            queries_count_by_sheet_id.sheet_id,
            "https://docs.google.com/spreadsheets/d/" || queries_count_by_sheet_id.sheet_id AS sheet_url,
            sheet_owner.sheet_owner,
            referenced_tables_by_sheet_id.table_id,
            queries_count_by_sheet_id.cnt,
            queries_week_count_by_sheet_id.weeks_count,
        FROM
            queries_count_by_sheet_id
        INNER JOIN
            sheet_owner
        USING (sheet_id)
        INNER JOIN
            referenced_tables_by_sheet_id
        USING (sheet_id)
        INNER JOIN
            queries_week_count_by_sheet_id
        USING (sheet_id)
        WHERE
            cnt > 10
            AND weeks_count >= 4 # 4週間以上クエリされているシートのみを対象とする
            # ここは必須ではないが、dbtのモデルに関連するtableのみに絞るため入れている
            AND referenced_tables_by_sheet_id.table_id IN (SELECT alias FROM `{elementary_dbt_models}`)
        ORDER BY queries_count_by_sheet_id.cnt DESC
    """

    # クエリ回数順にyamlに出力するため、OrderedDictを使用する
    connected_sheet_info = collections.OrderedDict()
    for _, item in client.query(query).result().to_dataframe().iterrows():
        connected_sheet_info[item["sheet_id"]] = {
            "sheet_url": item["sheet_url"],
            "sheet_owner": item["sheet_owner"],
            "referenced_tables": connected_sheet_info.get(item["sheet_id"], {}).get(
                "referenced_tables", []
            )
            + [f"ref('{item['table_id']}')"],
            "queries_count": item["cnt"],
            "weeks_count": item["weeks_count"],
        }
    return connected_sheet_info


def main():
    client = bigquery.Client()

    connected_sheet_info = get_connected_sheet_info(
        client,
    )

    result = {"version": 2, "exposures": []}

    for sheet_id, item in connected_sheet_info.items():
        tmp = {
            # workbookの名前はユニークである必要があるため、sheet_idを使用する。dbtの制約からアンダーバーに変換する
            "name": sheet_id.replace("-", "_"),
            "label": sheet_name_by_sheet_id.get(sheet_id, sheet_id),
            "type": "dashboard",
            "url": item["sheet_url"],
            "owner": {
                "name": item["sheet_owner"],
                "email": item["sheet_owner"],
            },
            "depends_on": item["referenced_tables"],
            "meta": {
                # 定期的なyamlの更新により、yamlの修正行が不用意に増えるのを防ぎたいため大まかに丸める
                "total_queries_count": round_to_highest_digit(item["queries_count"]),
                "queried_weeks_count": int(
                    Decimal(item["weeks_count"]).quantize(Decimal("1e1"), ROUND_HALF_UP)
                ),
            },
        }
        result["exposures"].append(tmp)

    print(yaml.dump(result, allow_unicode=True, sort_keys=False))


if __name__ == "__main__":
    main()

見所

あまり難しいスクリプトではないものの、工夫した点や注意すべき点があるので、メモがてら書き残しておきます。

Connected Sheetsからのクエリか判断する

Connected Sheetsから発行されているクエリはINFORMATION_SCHEMAでは判断することができず、監査ログから特定する形になります。shiozakiさんのエントリが参考になります。

また、Connected Sheetsからクエリが発行されているからといって、該当のスプレッドシートを人間がよく参照しているかどうかは分かりません。もしそういった情報を含め判断したい場合、Google Workspaceのログイベントも参照するとよいでしょう(今回のスクリプトではやっていないです)。

BigQuery Scripting経由で発行されたクエリでもreferenced_tablesの情報を取得する

Connected Sheetsは割と自由にSQLが書けるため、以下のようなBigQuery Scriptingを記述できます。

declare first_date, last_date date;
SELECT
...

ユーザーからするとBigQuery Scriptingは便利な側面がありつつ、INFORMATION_SCHEMA.JOBSreferenced_tablesNULLになってしまうという問題に気付きました。

諦めるか...と思っていたんですが、識者に情報を教えてもらいました。parent_job_idでBigQuery Scriptingから発行されたクエリの情報を取れるので、スクリプト内もその知識を使っています。@takegueさん、ありがとうございます!

クエリ回数やクエリされた週の数をメタデータとして加える

dbtのモデルを変更する際にConnected Sheetsへの影響度合いを見る際、Connected Sheetsがどれくらいクエリを叩いているか、最近できたものなのか割と前から使っているものなのかが分かると利用者へのアプローチも割と決めやすいです。そのため、exposureのメタデータとしてそういった情報を加えるようにしました。

また、exposureはGitHub Actionsなどで自動生成 / 更新したいですが、exactな回数をyamlに記載すると更新の度に大量のdiffが発生するため、回数は適当に丸めるようにしてみています。

まとめ

アドホックな分析で使われることがConnected Sheetsをdbtのexposureとして取り込む方法について書きました。Tableau / Connected Sheetsときたので、Looker Explore版を書くのを読者の宿題とします(自分が書くのはもう飽きた...)。