テーブル比較のクエリを型化するスクリプトを作った

前提

  • DWHやデータマートのリファクタリング、あるいは軽微なカラム追加をした際、意図せず差分が起きていないかを確認したい場面は多いでしょう
    • 「一個カラムを追加するために一個JOINを増すだけ」と思ったら、JOIN先のテーブルに重複があってfan-outしていた、ということはありがち
  • 比較には行数比較だけでなく、レコードの中身も比較した上で差分がない、ということを確認したいです
    • 数件しか違わなくても、その数件の違いが重要ということもよくあります
  • テーブル同士の差分比較をするにはEXCEPT DISTINCTを両方向からしてあげる、というのが常套手段です
  • ...ということが分かっている人には以下は無用なスクリプトです
  • チーム開発をしていると、件数のみのチェックや検証のやり方が違っており、認知負荷を下げるためにやり方と統一したい、という場合もあります

テーブル比較のクエリを型化するスクリプトを作った

これまでが前提で、それをやるスクリプトを作りました。

スクリプト(クリックで開きます)

import argparse
import sys

import pandas as pd
from google.cloud import bigquery

pd.set_option("display.max_rows", None)
pd.set_option("display.max_columns", None)
pd.set_option("display.max_colwidth", None)

DIFF_COUNT_COLUMN_NAME = "diff_count"


def build_cte_query(
    original_table: str,
    comparison_table: str,
    original_table_except_columns: list[str],
    comparison_table_except_columns: list[str],
    original_table_where_clause: str,
    comparison_table_where_clause: str,
) -> str:
    original_except_columns_str = (
        ""
        if len(original_table_except_columns) == 0
        else f"EXCEPT({', '.join(original_table_except_columns)})"
    )
    comparison_except_columns_str = (
        ""
        if len(comparison_table_except_columns) == 0
        else f"EXCEPT({', '.join(comparison_table_except_columns)})"
    )

    original_table_where_clause = (
        f"AND {original_table_where_clause}" if original_table_where_clause else ""
    )
    comparison_table_where_clause = (
        f"AND {comparison_table_where_clause}" if comparison_table_where_clause else ""
    )

    query = f"""
        WITH original_table AS (
            SELECT
                * {original_except_columns_str}
            FROM
                `{original_table}`
            WHERE
                TRUE {original_table_where_clause}
        ), comparison_table AS (
            SELECT
                * {comparison_except_columns_str}
            FROM
                `{comparison_table}`
            WHERE
                TRUE {comparison_table_where_clause}
        )
    """
    return query


def build_count_query(
    original_table: str,
    comparison_table: str,
    original_table_except_columns: list[str],
    comparison_table_except_columns: list[str],
    original_table_where_clause: str,
    comparison_table_where_clause: str,
) -> str:
    cte = build_cte_query(
        original_table=original_table,
        comparison_table=comparison_table,
        original_table_except_columns=original_table_except_columns,
        comparison_table_except_columns=comparison_table_except_columns,
        original_table_where_clause=original_table_where_clause,
        comparison_table_where_clause=comparison_table_where_clause,
    )
    query = f"""
        {cte}

        SELECT
            "{original_table}のみにあるレコード" AS _description, COUNT(*) AS {DIFF_COUNT_COLUMN_NAME},
        FROM
            (SELECT * FROM original_table EXCEPT DISTINCT SELECT * FROM comparison_table)
        UNION ALL
        SELECT
            "{comparison_table}のみにあるレコード" AS _description, COUNT(*) AS {DIFF_COUNT_COLUMN_NAME},
        FROM
            (SELECT * FROM comparison_table EXCEPT DISTINCT SELECT * FROM original_table)
    """
    return query


def build_raw_query(
    original_table: str,
    comparison_table: str,
    original_table_except_columns: list[str],
    comparison_table_except_columns: list[str],
    original_table_where_clause: str,
    comparison_table_where_clause: str,
    limit: int,
) -> str:
    cte = build_cte_query(
        original_table=original_table,
        comparison_table=comparison_table,
        original_table_except_columns=original_table_except_columns,
        comparison_table_except_columns=comparison_table_except_columns,
        original_table_where_clause=original_table_where_clause,
        comparison_table_where_clause=comparison_table_where_clause,
    )
    limit_clause = f"LIMIT {limit}" if limit else ""
    query = f"""
        {cte}

        SELECT
            "{original_table}のみにあるレコード" AS _description, *
        FROM
            (SELECT * FROM original_table EXCEPT DISTINCT SELECT * FROM comparison_table)
        UNION ALL
        SELECT
            "{comparison_table}のみにあるレコード" AS _description, *
        FROM
            (SELECT * FROM comparison_table EXCEPT DISTINCT SELECT * FROM original_table)
        {limit_clause}
    """
    return query


def check_tables_difference_stats(
    client: bigquery.Client,
    original_table: str,
    comparison_table: str,
    original_table_except_columns: list[str],
    comparison_table_except_columns: list[str],
    original_table_where_clause: str,
    comparison_table_where_clause: str,
    show_query: bool,
) -> bool:
    query = build_count_query(
        original_table=original_table,
        comparison_table=comparison_table,
        original_table_except_columns=original_table_except_columns,
        comparison_table_except_columns=comparison_table_except_columns,
        original_table_where_clause=original_table_where_clause,
        comparison_table_where_clause=comparison_table_where_clause,
    )
    if show_query:
        print("```sql")
        print(query)
        print("```\n")

    result = client.query(query=query).result().to_dataframe()
    if result[DIFF_COUNT_COLUMN_NAME].sum() == 0:
        print(f"`{original_table}`と`{comparison_table}`の差分はありません")
        return True

    print(
        f"`{original_table}`と`{comparison_table}`との間に以下の件数の差分がありました\n"
    )
    print("```")
    print(result)
    print("```\n")
    return False


def show_raw_tables_difference(
    client: bigquery.Client,
    original_table: str,
    comparison_table: str,
    original_table_except_columns: list[str],
    comparison_table_except_columns: list[str],
    original_table_where_clause: str,
    comparison_table_where_clause: str,
    show_query: bool,
    limit: int,
) -> tuple[str, str]:
    query = build_raw_query(
        original_table=original_table,
        comparison_table=comparison_table,
        original_table_except_columns=original_table_except_columns,
        comparison_table_except_columns=comparison_table_except_columns,
        original_table_where_clause=original_table_where_clause,
        comparison_table_where_clause=comparison_table_where_clause,
        limit=limit,
    )
    if show_query:
        print("```sql")
        print(query)
        print("```\n")

    result = client.query(query=query).result().to_dataframe()

    if len(result) == 0:
        print(f"`{original_table}`と`{comparison_table}`の差分はありません")
        return

    print(f"`{original_table}`と`{comparison_table}`との間に以下の差分がありました\n")
    print("```csv")
    result.to_csv(sys.stdout, index=False)
    print("```\n")


def main():
    parser = argparse.ArgumentParser()
    parser.add_argument("--original_table", type=str, required=True)
    parser.add_argument("--original_table_except_columns", type=str)
    parser.add_argument("--original_table_where_clause", type=str)

    parser.add_argument("--comparison_table", type=str, required=True)
    parser.add_argument("--comparison_table_except_columns", type=str)
    parser.add_argument("--comparison_table_where_clause", type=str)

    parser.add_argument("--show_query", action="store_true")
    parser.add_argument("--no_show_raw_records", action="store_true")
    parser.add_argument("--max_raw_records", type=int)

    args = parser.parse_args()
    client = bigquery.Client()

    original_table_except_columns = (
        args.original_table_except_columns.split(",")
        if args.original_table_except_columns
        else []
    )
    comparison_table_except_columns = (
        args.comparison_table_except_columns.split(",")
        if args.comparison_table_except_columns
        else []
    )

    exist_no_diff = check_tables_difference_stats(
        client=client,
        original_table=args.original_table,
        comparison_table=args.comparison_table,
        original_table_except_columns=original_table_except_columns,
        comparison_table_except_columns=comparison_table_except_columns,
        original_table_where_clause=args.original_table_where_clause,
        comparison_table_where_clause=args.comparison_table_where_clause,
        show_query=args.show_query,
    )
    if (not exist_no_diff) and (not args.no_show_raw_records):
        print("\n---\n")
        show_raw_tables_difference(
            client=client,
            original_table=args.original_table,
            comparison_table=args.comparison_table,
            original_table_except_columns=original_table_except_columns,
            comparison_table_except_columns=comparison_table_except_columns,
            original_table_where_clause=args.original_table_where_clause,
            comparison_table_where_clause=args.comparison_table_where_clause,
            show_query=args.show_query,
            limit=args.max_raw_records,
        )


if __name__ == "__main__":
    main()

基本的な使い方は比較対象の2つのテーブルを引数に渡してあげるだけです。

% python check_tables_diff.py \
    --original_table my-project.my_dataset.my_mart \
    --comparison_table my-project-dev.my_dataset.my_mart

特に差分がない場合は以下のように簡潔に分かるようにしてます。

`my-project.my_dataset.my_mart`と`my-project-dev.my_dataset.my_mart`の差分はありません

また、差分がある場合には差分があったレコードを表示します。実行結果はGitHubにぺたっと貼ってもらいやすいようにMarkdownで出力してます。レコード表示の部分はcsvでの出力にしているので、もっと見やくしたい場合はスプレッドシートにコピペして見栄えを整えるなどしてください。また、差分が大き過ぎるとBigQueryからの実行結果の取得に時間がかかるため、defaultは最大10000件で制限してます。増やしたり減らしたりしたい場合は--max_raw_recordsで調整してください。

出力結果例(クリックで開きます)

my-project.my_dataset.my_martmy-project-dev.my_dataset.my_martとの間に以下の件数の差分がありました

                                                           _description  \
0  my-project-dev.my_dataset.my_martのみにあるレコード   
1      my-project.my_dataset.my_martのみにあるレコード   

   diff_count  
0        1  
1        1

my-project.my_dataset.my_martmy-project-dev.my_dataset.my_martとの間に以下の差分がありました

_description,col_a,col_b
my-project.my_dataset.my_mart,1,2
my-project-dev.my_dataset.my_mart,1,2

修正前と修正後でカラム数が増減している場合もあるでしょう。その場合は追加/削除したカラムを除外して差分を考えたいケースがあると思います。その場合は--original_table_except_columnsあるいは--comparison_table_except_columnsオプションを与えてあげればEXCEPTで除外します。

% python check_tables_diff.py \
    --original_table my-project.my_dataset.my_mart \
    --original_table_except_columns "additional_col_a,additional_col_b" \
    --comparison_table my-project-dev.my_dataset.my_mart \
    --comparison_table_except_columns "additional_col_c,additional_col_d"

また、比較対象がprodとdevで、devはコストのため直近のデータしか持っていない、という場合もあると思います。その場合はWHERE句で比較対象の条件を絞りたいことがあると思うので、--original_table_where_clause--comparison_table_where_clauseオプションで条件を指定してください。