Cloud Functionsを使って、BigQueryのクエリ結果をSlackに定期的に投稿する

N番煎じネタです。Google Apps Scriptでやる例をよく見る気がするけど、Cloud Functionsを使ってやりたかったのじゃ。

以下のような設定をyamlに書いておくと、クエリ結果をテンプレートに従ってテキストに展開して、定期的にSlackに投稿(cron likeな設定ができる)してくれる。通知を増やしたければ、こういうyamlファイルを真似して書けばいいだけ。

description: テスト用のサンプルです!
webhook_url: https://hooks.slack.com/services/XXX/YYY/ZZZ
cron: "45 9 * * 1" # 月曜の9:45
sql: |-
  SELECT
    name,
    created_at,
  FROM
    `my-project.my_dataset.my_table`
  ORDER BY
    created_at
  LIMIT
    3
template: |-
  こんにちは〜。サンプル情報だよ
  {%- for row in data %}
  - name: {{ row.name }}
  - created_at: {{ row.created_at }}
  {%- endfor %}

利用例

こんな感じで使ってる or 使おうと思ってます。

  • 定時前に昨日のBigQueryのスキャン量やスキャン回数が多いユーザートップ3を通知
    • BigQuery警察
  • N日前とのユーザーエンゲージメントスコアの増減を週1で流す
  • 特定施策のKPI(例: XXXをクリックしたユニークユーザー数)を定例ミーティングの前に流す
  • FAQサイト内でヒット件数が0件だったユーザーが検索したクエリを隔週で流す
    • FAQサイトのカバレッジ改善

Slackに頑張って全部の数字やグラフを流すのではなく、大雑把な数字とその変化だけ流して興味を持ってもらい、あとはData Studioに誘導してリッチなグラフを見てもらう、という感じでやっています。

中身

起動するための手順としてはこんな感じで動かしてます。この辺読めば大体分かると思います。

  • 1: Cloud Schedulerに定期実行スケジュールなどを登録
  • 2: Cloud SchedulerがCloud Pub/Subトピックにメッセージを送信
  • 3: Cloud Pub/SubをトリガーとするCloud Functionsを起動
  • 4: Cloud Functions内でBigQueryのクエリを実行、テンプレートに従ってテキストに変換し、Slackに投稿

Cloud Schedulerに登録

1のCloud Schedulerに登録はこんな感じのシェルスクリプトでやってます(create_or_update_scheduler_jobとか名前を付けておく)。途中で出てくるyqjqのyaml版って感じのやつです。テンプレートやSQLは改行込みで書きたい & jsonで書きたくない(yamlでやりたい)ため、yqを使っています。

#!/bin/bash
set -eu -o pipefail

[[ $# == "0" ]] && exit 1

FILENAME=$1

SCHEDULER_JOBS_COMMAND="create"
SCHEDULER_JOB_NAME="bq_slack_notification_${FILENAME}"

SCHEDULE=$(cat "templates/${FILENAME}".yml | yq -r '.cron')
DESCRIPTION=$(cat "templates/${FILENAME}".yml | yq -r '.description')

if gcloud scheduler jobs describe "${SCHEDULER_JOB_NAME}" > /dev/null; then
  SCHEDULER_JOBS_COMMAND="update"
fi

gcloud scheduler jobs "${SCHEDULER_JOBS_COMMAND}" \
  pubsub "${SCHEDULER_JOB_NAME}" \
  --description "${DESCRIPTION}" \
  --schedule "${SCHEDULE}" \
  --time-zone "Asia/Tokyo" \
  --topic bq_slack_notification_topic \
  --message-body "$(printf '{"filename":"templates/%s.yml"}' "$FILENAME")"

Cloud Pub/Subトピックにmessage-bodyの内容をscheduleに従って投稿してくれる。トピックはgcloud pubsub topics create bq_slack_notification_topicなどで作ります。

余談ですが、Cloud Schedulerはtime-zone指定できるのいいですね。自分でタイムゾーン変換しなくてよい。CloudWatch Eventsも指定できるようにならないかなぁ...。

Cloud FunctionsでBigQueryのクエリ実行結果をSlackに投稿

Pub/Subのeventで設定ファイル名がfilenameで渡されてくるので、それを読んでBigQueryでクエリ叩いて、 Jinjaのテンプレートに従ってテキストに変換してSlackに投稿するだけです。eventはbase64でエンコードされているので、デコードしてから使う。

import requests
import json
import json
import base64
import yaml
import os
import sys
from google.cloud import bigquery
from jinja2 import Template


def bq_slack_notification(event, context):
    dir_path = os.path.dirname(os.path.realpath(__file__))
    event_data = json.loads(base64.b64decode(event['data']).decode('utf-8'))
    with open(dir_path + "/" + event_data["filename"], 'r') as f:
        try:
            yml = yaml.safe_load(f)
        except yaml.YAMLError as exc:
            print(exc)
            sys.exit(1)
    template = Template(yml["template"])

    bq = bigquery.Client('my-project')
    data = bq.query(yml["sql"]).to_dataframe()

    txt = template.render(data=data.itertuples())
    requests.post(yml["webhook_url"], data=json.dumps({'text': txt}))

テンプレートファイルを置いたらCloud {Scheduler, Functions}に反映するラッパーをMakefileに用意しておきます。設定ファイル名がsample.ymlならmake deploy schedule-job-sampleと叩けば反映完了。お手軽。

deploy:
  gcloud functions deploy bq_slack_notification --runtime python37 \
        --trigger-topic bq_slack_notification_topic

schedule-job-%:
  $(eval FILENAME := $(subst -,_, $*))
  ./create_or_update_scheduler_job $(FILENAME)

最終的なディレクトリ構成

こんな感じです。

% tree .
.
├── Makefile
├── create_or_update_scheduler_job
├── main.py
├── requirements.txt
└── templates
    ├── sampleA.yml
    └── sampleB.yml

1 directory, 12 files