最近の砂場活動その24: Embulkでのデータ転送を差分転送にする

差分転送するモチベーション

機械学習を使った情報推薦を行なうために、RDSのテーブルをBigQueryに定期転送しています。細かいことは気にしたくなかったので、一日一回の洗い替え(全データ送信で全部上書き)していましたが、もう少し鮮度を上げたくなりました(新しい情報に対して推薦ができないため)。何も考えずに定期転送の頻度を上げると

  • 1: 転送のためのCPUコスト
  • 2: AWSからGCPへのデータ転送量

が気になってきます。個人の趣味プロジェクトでは、特に2が大きい。先月のAWSの利用料金を見て、涙を流していました...。というわけで、情報の鮮度は上げつつもう少し効率的に定期転送するべく、Embulkでの差分転送をすることにしました。

やり方

差分だけBigQueryに転送する

基本的にはメルカリメソッドそのままです。いつもお世話になっております。

updated_atのような最終更新日時が分かるようなカラムがあると仮定します。updated_atが一日前より新しいレコードをBigQueryのdiff_xxxに流し込んでいきます(ここは洗い替えでいい)。

in:
  type: postgresql
  host: my-db.rds.amazonaws.com
  port: 5432
  database: my-database
  user: user
  password: password
  query: |
    SELECT
      *
    FROM
      example
    WHERE
      updated_at > CURRENT_TIMESTAMP - INTERVAL '1 day'
out:
  type: bigquery
  mode: replace
  auth_method: json_key
  json_keyfile: aaa.json
  dataset: source__db
  table: diff_example
  location: asia-northeast1
  formatter:
    type: csv
    delimiter: "\0"
    quote:
    quote_policy: NONE
    header_line: false
  source_format: NEWLINE_DELIMITED_JSON

ここでは、一日より高頻度に転送することを仮定しています。一日以上転送が失敗するようなことがあれば、データ欠損が起こることに注意。もっと厳密にやりたければ、Embulkがincremental loadingに対応しているので、それを使うとよいでしょう。前回どこまで転送したかlast_recordで情報を持っているので、それをstateとして転送完了したら例えばS3に書き出して、次に転送するときはS3からlast_recordを読み込んで、という形でやるとより正確に差分転送できます。

updated_atのようなカラムがテーブルに存在しない場合、状況が難しくなります。user_idが単調増加するなどであれば、それをWHERE句の条件に使うなどもできますが、そうでない場合は素朴には差分転送できない場合があるかもしれません。新規にupdated_atのようなカラムを追加することを検討するほうが早いかも...(気軽にALTER TABLEできないケースはもちろんあると思う)。

差分をマージする

差分がBigQueryに転送できたら、既存のものと結果をマージします。基本的にはUNION ALLでくっ付けるだけですが、incremental loadingしていない場合はレコードの重複が起きます。そこで重複排除のSQLを書きます。

SELECT
  *
FROM (
  SELECT
    *
  FROM
    `my-project.source__db.diff_example`
  UNION ALL
  SELECT
    *
  FROM
    `my-project.source__db.example` )
WHERE
  TRUE QUALIFY ROW_NUMBER() OVER (PARTITION BY id ORDER BY updated_at DESC) = 1

重複排除するには最近BigQueryでサポートされたQUALIFY演算子が便利です。サブクエリを一段減らせるので、可読性が増します。WHERE TRUE QUALIFYはもうちょいどうにかなって欲しいけど...。

あとはこれまで準備してきたパーツをdigdagでワークフローとしてまとめれば完成です。やってることは

  • 1: 差分だけBigQueryに転送
  • 2: 差分を既存のものとマージ
  • 3: 差分のテーブルは不要なので、削除

で、簡単ですね。

timezone: Asia/Tokyo

+sync_diff:
  sh>: java -jar /bin/embulk run ../embulk/example.yml

+merge:
  sh>: bq query --batch --max_rows 1 --replace --allow_large_results --use_legacy_sql=false --destination_table source__db.example "$(cat ../sql/merge_example.sql)"

+delete_diff_table:
  sh>: bq rm --force source__db.diff_example

これでコストを下げつつ、データ転送の頻度を上げることでBigQueryでより鮮度の高い情報を見ることができるようになりました。よりリアルタイム性を求めるならばCDC(Change Data Capture)するのが最適でしょうが、個人プロジェクトでそこまでの要件は必要ないため、ひとまずこれで満足しておきます。