dbtのモデルとLooker Studioのレポートの依存関係をexposureで表現して、データ管理を効率的に行なおう

シリーズの第三弾です。読者の宿題にしてたけど、誰も書いてくれなさそうだったので結局自分で書きました。

背景

Looker StudioはGoogle Workspaceを使っていれば基本的に無料で使えますし*1、権限管理にGoogle Groupとも連携できるので、人気のBIの一つだと思います。私が初めて触ったBIもLooker Studioだったので、(API強化して欲しいとか不満は山のようにありつつも)何だかんだで憎めないし、さっとダッシュボード作りたいときはLooker Studioを使うことが多いです。会社によっては社内の公式のダッシュボードをLooker Studioで作っているところもあると思います。

dbtで作ったテーブルがConnected Sheetsから参照されている場合、一定程度利用されているスプレッドシートからのテーブルの参照状況はデータ基盤を管理する人間としては把握しておきたいでしょう。参照状況を把握しておくことで、テーブルの変更による混乱を避けるための材料が手に入るからです。

Looker Studioのレポートをdbtのexposureとして取り込む

こういったLooker Studioのレポートからの参照状況はdbtのexposureとして取り込んでおくと便利です。先行してTableauのワークブックやConnected Sheetsで同様のことを社内で行なっていますが「dbt CloudのCIのjobは通る(=参照しているダッシュボードがいない)から、このテーブルを消しても安心だな」といった形で不要なテーブルの削除を進めることができています。

後述するスクリプトを使うと、以下のようなexposureを自動生成できます。

version: 2
exposures:
- name: abcdefg_123456789
  label: 全社だっしゅぼーど
  type: dashboard
  url: https://lookerstudio.google.com/reporting/abcdefg-123456789
  owner:
    name: me@example.com
    email: me@example.com
  depends_on:
  - ref('fact_orders')
  meta:
    total_queries_count: 300
- name: 123456789_abcdefg
...

Looker Studioのレポートのexposureを自動生成するスクリプト(クリックで開きます)

import collections
from decimal import ROUND_HALF_UP, Decimal
from typing import Any, Dict

import yaml
from google.cloud import bigquery

unknown_report_name = "<閲覧権限がなかったため、レポート名が不明>"

report_name_by_report_id = {
    "abcdefg-123456789": "全社だっしゅぼーど",
}

deleted_report_ids = {}


# yamlには大まかクエリ回数のみ記録したいため、最上位の桁で丸める
def round_to_highest_digit(number):
    if number == 0:
        return 0

    length = len(str(abs(number)))
    highest_digit = int(str(number)[0])
    return highest_digit * 10 ** (length - 1)


def get_looker_studio_reports_info(
    client: bigquery.Client,
) -> Dict[str, Any]:
    duration_days = 93  # 3ヶ月
    infomation_schema_jobs = "region-us.INFORMATION_SCHEMA.JOBS_BY_PROJECT"
    elementary_dbt_models = "my-project.my_dataset.dbt_models"

    query = f"""
        WITH looker_stuido_audit_logs AS (
            SELECT
                creation_time,
                user_email,
                label,
                label.value AS report_id,
                referenced_table,
            FROM
                `{infomation_schema_jobs}`,
                UNNEST(labels) AS label,
                UNNEST(referenced_tables) AS referenced_table
            WHERE
                creation_time > TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL {duration_days} DAY)
                AND label.key = "looker_studio_report_id"
                AND label.value != "" # なぜか空文字の場合がある
        ),
        report_owner AS (
            SELECT
                report_id,
                user_email AS report_owner,
            FROM
                looker_stuido_audit_logs
            QUALIFY ROW_NUMBER() OVER (PARTITION BY report_id ORDER BY creation_time DESC) = 1 # 直近のクエリ発行者をレポートのオーナーと見なす
        ),
        queries_count_by_report_id AS (
            SELECT
                report_id,
                COUNT(*) AS cnt
            FROM
                looker_stuido_audit_logs
            GROUP BY
                report_id
        ),
        referenced_tables_by_report_id AS (
            SELECT
                DISTINCT report_id, referenced_table.table_id
            FROM
                looker_stuido_audit_logs
        )
        SELECT
            queries_count_by_report_id.report_id,
            "https://lookerstudio.google.com/reporting/" || queries_count_by_report_id.report_id AS report_url,
            report_owner.report_owner,
            referenced_tables_by_report_id.table_id,
            queries_count_by_report_id.cnt,
        FROM
            queries_count_by_report_id
        INNER JOIN
            report_owner
        USING
            (report_id)
        INNER JOIN
            referenced_tables_by_report_id
        USING
            (report_id)
        WHERE
            cnt > 5
            # ここは必須ではないが、dbtのモデルに関連するtableのみに絞るため入れている
            AND referenced_tables_by_report_id.table_id IN (SELECT alias FROM `{elementary_dbt_models}`)
        ORDER BY
            queries_count_by_report_id.cnt DESC
    """

    # クエリ回数順にyamlに出力するため、OrderedDictを使用する
    looker_studio_reports_info = collections.OrderedDict()
    for _, item in client.query(query).result().to_dataframe().iterrows():
        looker_studio_reports_info[item["report_id"]] = {
            "report_url": item["report_url"],
            "report_owner": item["report_owner"],
            "referenced_tables": looker_studio_reports_info.get(
                item["report_id"], {}
            ).get("referenced_tables", [])
            + [f"ref('{item['table_id']}')"],
            "queries_count": item["cnt"],
        }
    return looker_studio_reports_info


def main():
    client = bigquery.Client()

    looker_studio_reports_info = get_looker_studio_reports_info(
        client,
    )

    result = {"version": 2, "exposures": []}

    for report_id, item in looker_studio_reports_info.items():
        if report_id in deleted_report_ids:
            continue

        tmp = {
            # レポート名はユニークである必要があるため、report_idを使用する。dbtの制約からアンダーバーに変換する
            "name": report_id.replace("-", "_"),
            "label": report_name_by_report_id.get(report_id, report_id),
            "type": "dashboard",
            "url": item["report_url"],
            "owner": {
                "name": item["report_owner"],
                "email": item["report_owner"],
            },
            "depends_on": sorted(item["referenced_tables"]),
            "meta": {
                # 定期的なyamlの更新により、yamlの修正行が不用意に増えるのを防ぎたいため大まかに丸める
                "total_queries_count": round_to_highest_digit(item["queries_count"]),
            },
        }
        result["exposures"].append(tmp)
    print(yaml.dump(result, allow_unicode=True, sort_keys=False), end="")


if __name__ == "__main__":
    main()

まとめ

Tableau / Connected Sheets / Looker Studioと自社のほぼ全てのBIツールをこれでdbtのexposureに取り込めるようになりました。安全なdbtライフを2024年も過ごしていきましょう!

*1:最近はガバナンス面を強化したLooker Studio Proも出てはいるが