前提
- DWHやデータマートのリファクタリング、あるいは軽微なカラム追加をした際、意図せず差分が起きていないかを確認したい場面は多いでしょう
- 「一個カラムを追加するために一個JOINを増すだけ」と思ったら、JOIN先のテーブルに重複があってfan-outしていた、ということはありがち
- 比較には行数比較だけでなく、レコードの中身も比較した上で差分がない、ということを確認したいです
- 数件しか違わなくても、その数件の違いが重要ということもよくあります
- テーブル同士の差分比較をするには
EXCEPT DISTINCT
を両方向からしてあげる、というのが常套手段です - ...ということが分かっている人には以下は無用なスクリプトです
- チーム開発をしていると、件数のみのチェックや検証のやり方が違っており、認知負荷を下げるためにやり方と統一したい、という場合もあります
テーブル比較のクエリを型化するスクリプトを作った
これまでが前提で、それをやるスクリプトを作りました。
スクリプト(クリックで開きます)
import argparse import sys import pandas as pd from google.cloud import bigquery pd.set_option("display.max_rows", None) pd.set_option("display.max_columns", None) pd.set_option("display.max_colwidth", None) DIFF_COUNT_COLUMN_NAME = "diff_count" def build_cte_query( original_table: str, comparison_table: str, original_table_except_columns: list[str], comparison_table_except_columns: list[str], original_table_where_clause: str, comparison_table_where_clause: str, ) -> str: original_except_columns_str = ( "" if len(original_table_except_columns) == 0 else f"EXCEPT({', '.join(original_table_except_columns)})" ) comparison_except_columns_str = ( "" if len(comparison_table_except_columns) == 0 else f"EXCEPT({', '.join(comparison_table_except_columns)})" ) original_table_where_clause = ( f"AND {original_table_where_clause}" if original_table_where_clause else "" ) comparison_table_where_clause = ( f"AND {comparison_table_where_clause}" if comparison_table_where_clause else "" ) query = f""" WITH original_table AS ( SELECT * {original_except_columns_str} FROM `{original_table}` WHERE TRUE {original_table_where_clause} ), comparison_table AS ( SELECT * {comparison_except_columns_str} FROM `{comparison_table}` WHERE TRUE {comparison_table_where_clause} ) """ return query def build_count_query( original_table: str, comparison_table: str, original_table_except_columns: list[str], comparison_table_except_columns: list[str], original_table_where_clause: str, comparison_table_where_clause: str, ) -> str: cte = build_cte_query( original_table=original_table, comparison_table=comparison_table, original_table_except_columns=original_table_except_columns, comparison_table_except_columns=comparison_table_except_columns, original_table_where_clause=original_table_where_clause, comparison_table_where_clause=comparison_table_where_clause, ) query = f""" {cte} SELECT "{original_table}のみにあるレコード" AS _description, COUNT(*) AS {DIFF_COUNT_COLUMN_NAME}, FROM (SELECT * FROM original_table EXCEPT DISTINCT SELECT * FROM comparison_table) UNION ALL SELECT "{comparison_table}のみにあるレコード" AS _description, COUNT(*) AS {DIFF_COUNT_COLUMN_NAME}, FROM (SELECT * FROM comparison_table EXCEPT DISTINCT SELECT * FROM original_table) """ return query def build_raw_query( original_table: str, comparison_table: str, original_table_except_columns: list[str], comparison_table_except_columns: list[str], original_table_where_clause: str, comparison_table_where_clause: str, limit: int, ) -> str: cte = build_cte_query( original_table=original_table, comparison_table=comparison_table, original_table_except_columns=original_table_except_columns, comparison_table_except_columns=comparison_table_except_columns, original_table_where_clause=original_table_where_clause, comparison_table_where_clause=comparison_table_where_clause, ) limit_clause = f"LIMIT {limit}" if limit else "" query = f""" {cte} SELECT "{original_table}のみにあるレコード" AS _description, * FROM (SELECT * FROM original_table EXCEPT DISTINCT SELECT * FROM comparison_table) UNION ALL SELECT "{comparison_table}のみにあるレコード" AS _description, * FROM (SELECT * FROM comparison_table EXCEPT DISTINCT SELECT * FROM original_table) {limit_clause} """ return query def check_tables_difference_stats( client: bigquery.Client, original_table: str, comparison_table: str, original_table_except_columns: list[str], comparison_table_except_columns: list[str], original_table_where_clause: str, comparison_table_where_clause: str, show_query: bool, ) -> bool: query = build_count_query( original_table=original_table, comparison_table=comparison_table, original_table_except_columns=original_table_except_columns, comparison_table_except_columns=comparison_table_except_columns, original_table_where_clause=original_table_where_clause, comparison_table_where_clause=comparison_table_where_clause, ) if show_query: print("```sql") print(query) print("```\n") result = client.query(query=query).result().to_dataframe() if result[DIFF_COUNT_COLUMN_NAME].sum() == 0: print(f"`{original_table}`と`{comparison_table}`の差分はありません") return True print( f"`{original_table}`と`{comparison_table}`との間に以下の件数の差分がありました\n" ) print("```") print(result) print("```\n") return False def show_raw_tables_difference( client: bigquery.Client, original_table: str, comparison_table: str, original_table_except_columns: list[str], comparison_table_except_columns: list[str], original_table_where_clause: str, comparison_table_where_clause: str, show_query: bool, limit: int, ) -> tuple[str, str]: query = build_raw_query( original_table=original_table, comparison_table=comparison_table, original_table_except_columns=original_table_except_columns, comparison_table_except_columns=comparison_table_except_columns, original_table_where_clause=original_table_where_clause, comparison_table_where_clause=comparison_table_where_clause, limit=limit, ) if show_query: print("```sql") print(query) print("```\n") result = client.query(query=query).result().to_dataframe() if len(result) == 0: print(f"`{original_table}`と`{comparison_table}`の差分はありません") return print(f"`{original_table}`と`{comparison_table}`との間に以下の差分がありました\n") print("```csv") result.to_csv(sys.stdout, index=False) print("```\n") def main(): parser = argparse.ArgumentParser() parser.add_argument("--original_table", type=str, required=True) parser.add_argument("--original_table_except_columns", type=str) parser.add_argument("--original_table_where_clause", type=str) parser.add_argument("--comparison_table", type=str, required=True) parser.add_argument("--comparison_table_except_columns", type=str) parser.add_argument("--comparison_table_where_clause", type=str) parser.add_argument("--show_query", action="store_true") parser.add_argument("--no_show_raw_records", action="store_true") parser.add_argument("--max_raw_records", type=int) args = parser.parse_args() client = bigquery.Client() original_table_except_columns = ( args.original_table_except_columns.split(",") if args.original_table_except_columns else [] ) comparison_table_except_columns = ( args.comparison_table_except_columns.split(",") if args.comparison_table_except_columns else [] ) exist_no_diff = check_tables_difference_stats( client=client, original_table=args.original_table, comparison_table=args.comparison_table, original_table_except_columns=original_table_except_columns, comparison_table_except_columns=comparison_table_except_columns, original_table_where_clause=args.original_table_where_clause, comparison_table_where_clause=args.comparison_table_where_clause, show_query=args.show_query, ) if (not exist_no_diff) and (not args.no_show_raw_records): print("\n---\n") show_raw_tables_difference( client=client, original_table=args.original_table, comparison_table=args.comparison_table, original_table_except_columns=original_table_except_columns, comparison_table_except_columns=comparison_table_except_columns, original_table_where_clause=args.original_table_where_clause, comparison_table_where_clause=args.comparison_table_where_clause, show_query=args.show_query, limit=args.max_raw_records, ) if __name__ == "__main__": main()
基本的な使い方は比較対象の2つのテーブルを引数に渡してあげるだけです。
% python check_tables_diff.py \ --original_table my-project.my_dataset.my_mart \ --comparison_table my-project-dev.my_dataset.my_mart
特に差分がない場合は以下のように簡潔に分かるようにしてます。
`my-project.my_dataset.my_mart`と`my-project-dev.my_dataset.my_mart`の差分はありません
また、差分がある場合には差分があったレコードを表示します。実行結果はGitHubにぺたっと貼ってもらいやすいようにMarkdownで出力してます。レコード表示の部分はcsvでの出力にしているので、もっと見やくしたい場合はスプレッドシートにコピペして見栄えを整えるなどしてください。また、差分が大き過ぎるとBigQueryからの実行結果の取得に時間がかかるため、defaultは最大10000件で制限してます。増やしたり減らしたりしたい場合は--max_raw_records
で調整してください。
出力結果例(クリックで開きます)
my-project.my_dataset.my_mart
とmy-project-dev.my_dataset.my_mart
との間に以下の件数の差分がありました
_description \ 0 my-project-dev.my_dataset.my_martのみにあるレコード 1 my-project.my_dataset.my_martのみにあるレコード diff_count 0 1 1 1
my-project.my_dataset.my_mart
とmy-project-dev.my_dataset.my_mart
との間に以下の差分がありました
_description,col_a,col_b my-project.my_dataset.my_mart,1,2 my-project-dev.my_dataset.my_mart,1,2
修正前と修正後でカラム数が増減している場合もあるでしょう。その場合は追加/削除したカラムを除外して差分を考えたいケースがあると思います。その場合は--original_table_except_columns
あるいは--comparison_table_except_columns
オプションを与えてあげればEXCEPT
で除外します。
% python check_tables_diff.py \ --original_table my-project.my_dataset.my_mart \ --original_table_except_columns "additional_col_a,additional_col_b" \ --comparison_table my-project-dev.my_dataset.my_mart \ --comparison_table_except_columns "additional_col_c,additional_col_d"
また、比較対象がprodとdevで、devはコストのため直近のデータしか持っていない、という場合もあると思います。その場合はWHERE
句で比較対象の条件を絞りたいことがあると思うので、--original_table_where_clause
と--comparison_table_where_clause
オプションで条件を指定してください。