差分転送するモチベーション
機械学習を使った情報推薦を行なうために、RDSのテーブルをBigQueryに定期転送しています。細かいことは気にしたくなかったので、一日一回の洗い替え(全データ送信で全部上書き)していましたが、もう少し鮮度を上げたくなりました(新しい情報に対して推薦ができないため)。何も考えずに定期転送の頻度を上げると
- 1: 転送のためのCPUコスト
- 2: AWSからGCPへのデータ転送量
が気になってきます。個人の趣味プロジェクトでは、特に2が大きい。先月のAWSの利用料金を見て、涙を流していました...。というわけで、情報の鮮度は上げつつもう少し効率的に定期転送するべく、Embulkでの差分転送をすることにしました。
やり方
差分だけBigQueryに転送する
基本的にはメルカリメソッドそのままです。いつもお世話になっております。
updated_at
のような最終更新日時が分かるようなカラムがあると仮定します。updated_at
が一日前より新しいレコードをBigQueryのdiff_xxx
に流し込んでいきます(ここは洗い替えでいい)。
in: type: postgresql host: my-db.rds.amazonaws.com port: 5432 database: my-database user: user password: password query: | SELECT * FROM example WHERE updated_at > CURRENT_TIMESTAMP - INTERVAL '1 day' out: type: bigquery mode: replace auth_method: json_key json_keyfile: aaa.json dataset: source__db table: diff_example location: asia-northeast1 formatter: type: csv delimiter: "\0" quote: quote_policy: NONE header_line: false source_format: NEWLINE_DELIMITED_JSON
ここでは、一日より高頻度に転送することを仮定しています。一日以上転送が失敗するようなことがあれば、データ欠損が起こることに注意。もっと厳密にやりたければ、Embulkがincremental loadingに対応しているので、それを使うとよいでしょう。前回どこまで転送したかlast_record
で情報を持っているので、それをstateとして転送完了したら例えばS3に書き出して、次に転送するときはS3からlast_record
を読み込んで、という形でやるとより正確に差分転送できます。
updated_at
のようなカラムがテーブルに存在しない場合、状況が難しくなります。user_id
が単調増加するなどであれば、それをWHERE句の条件に使うなどもできますが、そうでない場合は素朴には差分転送できない場合があるかもしれません。新規にupdated_at
のようなカラムを追加することを検討するほうが早いかも...(気軽にALTER TABLE
できないケースはもちろんあると思う)。
差分をマージする
差分がBigQueryに転送できたら、既存のものと結果をマージします。基本的にはUNION ALL
でくっ付けるだけですが、incremental loadingしていない場合はレコードの重複が起きます。そこで重複排除のSQLを書きます。
SELECT * FROM ( SELECT * FROM `my-project.source__db.diff_example` UNION ALL SELECT * FROM `my-project.source__db.example` ) WHERE TRUE QUALIFY ROW_NUMBER() OVER (PARTITION BY id ORDER BY updated_at DESC) = 1
重複排除するには最近BigQueryでサポートされたQUALIFY演算子が便利です。サブクエリを一段減らせるので、可読性が増します。WHERE TRUE QUALIFY
はもうちょいどうにかなって欲しいけど...。
BigQueryに追加されたQUALIFY演算子、便利。レンタルサイクルの各乗り始め駅からのtop3をフィルタとかがサブクエリなしに書ける。https://t.co/xS0b0pNJoO #gcpja pic.twitter.com/uaA8FTJBZR
— Yuta.H (@yutah_3) May 11, 2021
あとはこれまで準備してきたパーツをdigdagでワークフローとしてまとめれば完成です。やってることは
- 1: 差分だけBigQueryに転送
- 2: 差分を既存のものとマージ
- 3: 差分のテーブルは不要なので、削除
で、簡単ですね。
timezone: Asia/Tokyo +sync_diff: sh>: java -jar /bin/embulk run ../embulk/example.yml +merge: sh>: bq query --batch --max_rows 1 --replace --allow_large_results --use_legacy_sql=false --destination_table source__db.example "$(cat ../sql/merge_example.sql)" +delete_diff_table: sh>: bq rm --force source__db.diff_example
これでコストを下げつつ、データ転送の頻度を上げることでBigQueryでより鮮度の高い情報を見ることができるようになりました。よりリアルタイム性を求めるならばCDC(Change Data Capture)するのが最適でしょうが、個人プロジェクトでそこまでの要件は必要ないため、ひとまずこれで満足しておきます。