大量のBigQueryの日付別テーブルから特定の条件でレコードを削除する

この記事はdatatech-jp Advent Calendar 2023の12日目の投稿です。本日は12/18ですが、Advent Calendarの空きがあったのでねじこみました。

背景

データ分析のコンテキストではBigQueryのテーブルに破壊的操作をする際、以下の場合が多いと思います。

  • CREATE OR REPLACE TABLEで洗い替え
  • INSERTで既存テーブルにappend onlyで行の追加
    • 全期間を毎度集計すると大変なので、直近の分析結果を積み上げていくようなタイプ

しかし、データ基盤の運営をしていると特定の条件に基づくレコードを削除しないといけない場合もあります。対象が単独であれば、DELETE文を実行していけばいいですが、対象が大量の日付別テーブルであった場合はちょっと面倒です。なぜならば

  • SELECTでクエリするときに使うワイルドカードでのDELETEは実行できない
    • 一つ一つの日付のテーブルに対してDELETEを実行していく必要があります
  • 日付別テーブルが大量にあると、対象の全てにDELETEを実行するのに時間がかかる場合があります
    • 数年間テーブルが運用されていると、日付別テーブルが1000個を越えていることは珍しくないです
    • 一日のテーブルに含まれているレコード数が多い場合、一個のテーブルにそれなりに時間がかかります
      • SELECT文では数秒で結果が返ってくる場合でも、DELETE文のような破壊的操作をする場合は数分かかることもありえる

日付別テーブルではなく、パーティショニングテーブルになっていれば一回のDELETE文の実行だけで済むので話は簡単ですが

  • 歴史的経緯によって仕方なく日付別テーブルで運用している
  • GA4やfirebase analyticsのイベントは日付別テーブルとして取り込まれ、利用者観点だとそれは受け入れるしかない

などの状況では、大量の日付別テーブルを頑張って特定の条件に合致するものだけを削除しないといけないケースはありえると思います。

困ること: 実行時間が長い

大量の日付別テーブルが対象になる & DELETEの実行は重い、ということで、実行時間が割とバカになりません。データ量や対象の期間にもよりますが、素朴にやると実行時間が数時間や数日かかってしまう、という場合もあると思います。

自分も最初bash内のbqコマンドで素朴に削除を行なおうと思っていましたが、案の定時間はかかるし、xargsの並列実行で解決させようとすると、後述する再実行時のケアなどがやりにくくなるため、断念しました。

脱線: レコードの削除時に考慮したいこと

日付別テーブルの削除に限らずですが、削除のような破壊的操作を行なう際は取り返しがつかないことが多いので*1、事前の準備をしっかりとしておきましょう。考慮しておくべきポイントをいくつか書いてみます。

dry-runモードで何が実行されるかを分かるようにしておく

実際の削除は結構緊張感があるオペレーションになることが多いと思うので、事前にどういうコマンドが実行されるか、削除対象のレコードがどれくらいになるかが分かるとよいでしょう。削除予定のコマンドや削除対象のレコード数が事前に分かれば、レビュアーも安心できます。

後述のコードではそこまでやってないですが、削除対象のレコード数が全体の何%を占めているかなども表示されるとより安心ですね(削除の条件が間違っていて、削除予定の行数が予定より多すぎる、などに気付けるため)。

バックアップ用にテーブルをコピーしておく

何かあって間違っていた場合に元の状態に素早く戻せるようになっているとよいです。削除の実行前にテーブルをバックアップ用のデータセットなどに分割しておくとよいでしょう。

ログはしっかりめに残しておく

大量のテーブルに対してオペレーションする場合、少数の異常系でどういう問題があったか分かるようにしておくことが大切です。どこかのテーブルで何かが起きていてしまった場合、ログがないと途方にくれてしまうことになると思います。ログがないことにより復旧や再実行の手順が面倒になることもあるので、ログはしっかり出すようにしましょう。

削除に必要な最小の権限に絞ったサービスアカウント経由で実行する

コードがミスっていて削除対象外のテーブルを消してしまうと、大惨事になることが多いでしょう。コードレビューはもちろん行なうとして、それでももしミスっていた場合にはそもそも権限的に削除できないようになっているとより安心です。

作業用のサービスアカウントを発行した上で対象のデータセットなどに対してのみ削除が行なえる権限を付与し、impersonateなどを活用して、そのサービスアカウントで削除を実行すれば万が一間違ったコマンドを実行したとしても致命的なことにならないで済みます。

複数回コマンドが実行されても問題ない冪等な設計にしておく

大量のテーブルを対象にしていると、一部のテーブルでBigQuery側でInternal Errorなどが出ることが想定されますし、想定しておくべきです。そうなった場合、コマンドを再度実行することになるので、再実行されても問題ない冪等な設計にしておきましょう。また、再実行は必要な箇所のみできるように例えば削除対象のレコード数が存在しない場合には削除はスキップする、などが考慮できていると再実行もやりやすくなると思います。

復旧用の手順もまとめておく

ここまで書いたことを実行していたとしても想定外のことが起きることはありえるでしょう。バックアップテーブルなどを用意している場合、そのテーブルからどうやって復旧できるかなどを事前に手順書として書いておけると安心です。想定外のことが起きた場合、心理的にも動揺していることが多いですし、通常のメンタルのときにしっかりと手順をまとめておけるとよいです。復旧作業を行なう際は二次被害を防ぐためにも、複数人でのオペレーションを行なうなどができるようにしておくといいですね。

テーブルの利用者に予めレコードの削除が起こることを伝える

BigQueryは基本的に分析用に使われることが多いため、利用者サイドとしてもテーブルは洗い替えもしくは追記のみを想定している場合が多いと思います。削除はあまり行なわれないことが多いため、それ前提で分析を行なわれる場合もあり、削除があることを通知し忘れていると「あれ、分析をやり直したら何か行数が合わないぞ...?」となってしまうこともありえます。監査ログなどを見ながら、テーブルの利用者に対してどういった条件のレコードが削除される、というのを事前に告知できていると関連業務への影響も少なくできるでしょう。

Pythonで並列にDELETE文を実行させる

長々と色々書いてしまいましたが、ここまで書いたことを考慮して、大量のBigQueryの日付別テーブルから特定の条件でレコードを削除するためのスクリプトを書きました。ポイントは前述してますが、concurrent.futuresを使って並列に削除を走らせているあたりがポイント(?)です。

import sys
import logging
import argparse
import traceback
import concurrent
from typing import List
from dataclasses import dataclass


from google.cloud import bigquery
from google.cloud.exceptions import NotFound


logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
handler = logging.StreamHandler(sys.stdout)
handler.setFormatter(logging.Formatter("[%(asctime)s] [%(threadName)s] %(message)s"))
logger.addHandler(handler)


@dataclass
class BigQueryTable:
    project_name: str
    dataset_name: str
    table_name: str

    def __str__(self): 
        return f"{self.project_name}.{self.dataset_name}.{self.table_name}"


def backup_bigquery_table(
        client: bigquery.Client,
        source_table: BigQueryTable,
        destination_table: BigQueryTable,
        is_dry_run: bool,
) -> None:
    if is_dry_run:
        logger.info(f"Dry-run mode: planing copy {source_table} to {destination_table}")
        return
    try:
        client.get_table(str(destination_table))
        logger.info(f"Table {destination_table} already exists. Skip copying")
    except NotFound:
        logger.info(f"Table {destination_table} is not found.")
        logger.info(f"Start copying {source_table} to {destination_table}")
        client.copy_table(str(source_table), str(destination_table)).result()
        logger.info(f"Finished copying {source_table} to {destination_table}")


def get_plan_to_be_deleted_records_count(
        client: bigquery.Client,
        bigquery_table: BigQueryTable,
    ) -> int:
    dry_run_sql = f"""
        SELECT
            COUNT(*) AS plan_to_be_deleted_records_count
        FROM
            `{bigquery_table}`
        WHERE
            # 削除の条件を書く
    """
    query_job = client.query(dry_run_sql)
    plan_to_be_deleted_records_count = dict(list(query_job.result())[0])["plan_to_be_deleted_records_count"]
    return plan_to_be_deleted_records_count


def list_event_tables(
        client: bigquery.Client,
        project_name: str,
        dataset_name: str,
    ) -> List[str]:
    event_table_names = []
    for t in client.list_tables(f"{project_name}.{dataset_name}", max_results = 10000):
        if t.table_id.startswith("events_"):
            event_table_names.append(t.table_id)
    return event_table_names 


def delete_records(
        client: bigquery.Client,
        bigquery_table: BigQueryTable,
        is_dry_run: bool,
    ) -> None:
    # DELETEの操作は重いため、対象のレコードがなければ削除をスキップする
    plan_to_deleted_count = get_plan_to_be_deleted_records_count(client, bigquery_table)
    if plan_to_deleted_count == 0:
        logger.info(f"No target records in {bigquery_table}. Skip deleting")
        return

    delete_query = f"""
        DELETE
        FROM
            {bigquery_table}
        WHERE
            # 削除の条件を書く
    """
    if is_dry_run:
        logger.info(f"Dry-run mode: planing to execute following query\n\n{delete_query}")
        logger.info(f"Dry-run mode: planing to delete target records from {bigquery_table}, {plan_to_deleted_count} records will be deleted")
        return

    logger.info(f"Start deleting target records from {bigquery_table}, {plan_to_deleted_count} records will be deleted")
    for item in client.query(delete_query).result():
        logger.info(dict(item))
    logger.info(f"Finished deleting target records from {bigquery_table}")


def backup_and_delete_records(
        client: bigquery.Client,
        delete_target_table: BigQueryTable,
        backup_table: BigQueryTable,
        is_dry_run: bool,
    ) -> None:
    backup_bigquery_table(client, delete_target_table, backup_table, is_dry_run)
    delete_records(client, delete_target_table, is_dry_run)

def main():
    parser = argparse.ArgumentParser()
    parser.add_argument('--no_dry_run', action='store_true') # 引数を明示的に指定しなかった場合は削除せずdry_runだけするようにしたいので
    args = parser.parse_args()

    is_dry_run = not args.no_dry_run

    delete_target_project = "my-project"
    delete_target_dataset = "my-dataset"

    backup_project = "backup-project"
    backup_dataset = "backup-dataset"

    bigquery_client = bigquery.Client()

    with concurrent.futures.ThreadPoolExecutor(max_workers=20) as executor:
        futures = []

        for t in list_event_tables(bigquery_client, delete_target_project, delete_target_dataset):
            delete_target_table = BigQueryTable(delete_target_project, delete_target_dataset, t)
            backup_table = BigQueryTable(backup_project, backup_dataset, t)
            future = executor.submit(backup_and_delete_records, bigquery_client, delete_target_table, backup_table, is_dry_run)
            futures.append(future)

        for future in concurrent.futures.as_completed(futures):
            try:
                future.result()
            except Exception as e:
                logger.warning(f"An exception occurred: {e}")
                logger.warning(traceback.format_exc())


if __name__ == "__main__":
    main()

*1:BigQueryはタイムトラベルで戻る技もありますが、期限付きなのでこれは最後の手段と思ってちゃんと準備しておくのがよいと思います