Mackerel Meetup #12で異常検知機能について発表しました

タイトルの通りですが、Mackerel Meetup #12で登壇してきました。

f:id:syou6162:20180802195908j:plain

ユーザーの皆さんからご要望を直接聞けるので、Meetupは開発者としてもとてもありがたい場になっています。参加してくださった皆さま、ありがとうございました。私が発表したスライドはこちらです。

発表時間が20分だったこともあり詳細は大分割愛していますが、異常検知の手法の詳細や異常検知のような機械学習を作る際の社内の体制をどう作っていったかといった話は過去の発表スライドにありますので、ご興味ある方はこちらも是非ご参照ください。

はてな社内でKaggleハッカソンを行ないました(TakingDataリベンジマッチ編)

先週末、はてな社内でKaggleハッカソンを行ないました。丸一日、各自好きなKaggleのコンペに取り組んで、得られた知見を共有するという会です。

自分は以前TalkingDataというコンペに参加していたのですが、データサイズが結構大きく、一月くらいやってみたももの試行錯誤に四苦八苦してしまい、途中で離脱していました...。このハッカソンでは、そういったデータセットでも何とかできるようになろう!ということを目標にして参加しました。もちろん1日だけではさすがに時間が足りないので、ハッカソン前の10日くらいは定時後にちまちま作業をやっていました。

以下はハッカソン終了後に使った発表資料です。Kaggle上位の人にとっては当たり前のことしか書いてないかもしれませんが、社内でこういった知見をじわじわと貯めていくことが大事だと思っています。なお、ハッカソン終了後にAWSのでかいインスタンスを借りてやりなおしたところ、無事目標のTop 10%入り相当のスコアに到達しました!!

モチベーション

TalkingDataのデータセットについて

  • データサイズ
    • 学習用データサイズがかなり大きい(約1.8億件)
    • ちなみにテストセットも1800万件程度ある
  • 特徴量
    • カラム数自体は少ない、文字列ではなくハッシュ化された数値が与えられる
      • ip, app, device, os, channel, click_time
    • かなりスパース(特にip特徴量)。pd.get_dummiesとかやると即死
  • ラベル
    • is_attributed: アプリをダウンロードしたか(二値)
    • 正例が50万件以下、割合では0.02%程度でextremely unbalanced data
  • 評価指標はAUC

敗因分析

  • 主に2点
    • データの巨大さに負けて、特徴量エンジニアリングの工夫がなかなかできなかった
    • クロスバリデーションなしで学習が6時間くらいかかり、パラメーターチューニングも満足にできない
  • 以下の2つについて取り組んだ
    • Negative down sampling
    • Bayesian optimization

Negative down sampling

  • 正例と同じ分量になるだけ正例をサンプリングして学習データにする方法
  • 元々の学習データ全体と比べて1%以下のデータ量になり、試行錯誤がとてもやりやすい
    • 計算時間もメモリ量も
  • 負側のサンプリングは何回も別のセットを作ることができるので、その回数だけ学習器を作ってbagging(例: rank averaging)することで最終的な精度向上も目指すことも
  • TalkingData AdTracking Fraud Detection Challenge (1st place solution)

Bayesian optimization(モチベーション)

  • KaggleをやりだすようになってからLightGBMという決定木ベースのものを使うように
    • 使いやすさ(スケーリングなどが多少不要になる。欠損値もよしなに)や精度面でもよい
  • 一方で、これまで使うことが多かったSVMやロジステック回帰と比べて、ハイパーパラメータが多く、チューニングが大変になった
    • 学習率、葉の数、binの数、samplingのratioなどなど10くらいある
  • ハイパーパラメータで時間を食ってしまうため、試行錯誤の回数を増やせない
  • ベイズ最適化でチューニングの回数を減らせないか

ハイパーパラメータの例

それぞれのパラメータを4-5個の選択肢を用意して真面目にCVすると結構大変。

params = {
    'learning_rate': 0.01,
    'num_leaves': 255,  # 2^max_depth - 1
    'max_depth': 8,  # -1 means no limits
    'min_child_samples': 200,  # Minimum number of data need in a child(min_data_in_leaf)
    'max_bin': 255,  # Number of bucketed bin for feature values
    'subsample': 0.9,  # Subsample ratio of the training instance.
    'subsample_freq': 1,  # frequence of subsample, <=0 means no enable
    'colsample_bytree': 0.5,  # Subsample ratio of columns when constructing each tree.
    'min_child_weight': 0,  # Minimum sum of instance weight(hessian) needed in a child(leaf)
    'scale_pos_weight': 1,
    'reg_lambda': 0.0,  # L2 regularization term on weights
}

脱線: ハイパラチューニングでそこまで精度が劇的に上がることは少ないのでは?

  • それはその通り
  • しかし、チューニング結果を人が全て見れるわけではないケースも社内で増えつつある
    • 例: 異常検知などの再学習
  • 以下を同時に叶えたい
    • チューニングをミスって極端に悪くならないようにしたい
    • 多数のハイパラのグリッドサーチはメモリも計算時間もかかるからはしょりたい
  • Kaggleで多少精度が欲しいときに役に立つ…かも?

Baysian optimization(方法論)

  • ベイズ的最適化(Bayesian Optimization)の入門とその応用
  • 入力をハイパーパラメター、出力をスコア(例: AUCやloglossなど)とする関数回帰の問題と見なす
  • ガウス過程は関数回帰をやってくれる代表選手であり、関数空間の事後分布を計算できる
  • 次にどこハイパーパラメータを試行錯誤するか、事後分布の広がりが一番大きい点を優先的に探す
  • グリッドサーチやランダムサーチと比べて、データにどのような特徴があるを考えてハイパーパラメータの探索を行なうため、学習回数を減らせる
  • 補足: やってること、どこかで見たことありますよね
    • 多椀バンディットのUCBとかと考え方が似ている(し、実際に関係ある)

ハッカソンでの初期状態

  • LB(leaderboard)のprivateスコアが0.9691844
  • 特徴量は基本的なもの
    • ip, app, device, os, channel ,click_time
    • ['ip', 'device', 'os']でグルーピングしたときのappの異なり数など
      • この辺がめっちゃ効くので、工夫したい
  • 分類器はLightGBM。ハイパラチューニングはそんなに頑張ってない
    • が、メモリに乗る学習データがせいぜい8000万件、学習もCVなしで(確か)6時間かかる… 😇

初手

Negative Down Samplingをやってみる

  • 1.8億件の中から負例を正例と同数サンプリング
    • 学習データは約100万件程度まで落ちる
    • 特徴量は変えない
  • LightGBMを使って学習
    • めっちゃ早くなる(パラメータによるけど、1分くらいで終わる)
  • AUCが落ちる 😇 (0.95程度まで落下)

RankAveragingをやってみる

  • もしかしてbagging必須?!と思ってひとまずやってみる
  • rank averagingをやってみる
  • 若干上がるが、0.96に全く届かない…

教訓: aggregation系の特徴量が入っている場合は特徴量生成でサンプリングするとダメ

  • 「この事例と同じip/deviceを持っている事例の数」といったaggregation(sum/cumsum/ratio/{prev,next}_click_time_diffなど)系の操作が入る特徴量を使う場合、サンプリング後のデータフレームに対して特徴量を作っていたのがいけなかった
    • ipのようなsparseな特徴量の場合、サンプリングすると出る/出ないで結果が大きく変わってしまうため
  • ダメな例: 元データ => negative down sampling => 特徴量生成(10分) => 学習(1分)
  • よい例: 元データ => 特徴量生成(1時間) => negative down sampling => 学習(1分)

Negative down samplingのやり方を改善した結果

  • 学習データ100万件のみで元の精度と同じ結果までいけた
  • rank averagingをすることで多少上がった(0.97に乗るくらい)
    • とはいえ、割とすぐにサチる(5〜10くらいで十分)
  • 試行錯誤がとてもやりやすくなった 🎉
    • パラメータ変えたいときにも特徴量生成待たなくてよくなった
    • 特徴量足したいときにも新しく列を追加するだけでよくなった

特徴量生成

  • 特徴量を変えて実験をする度に毎回1時間待っているのはダルい
  • 列毎に結果を吐いて学習時に列と足し込んでいく形式でやると時間をかけずにやれて便利
  • BigQueryでこういった特徴量を作れるとよさそうですね
    • 30分程度でできるらしい。11th solution

Bayesian optimizationやってみる

  • 色々パッケージがあるけど、skoptを使ってみる
  • ベイズ最適化自体の計算は軽い
  • LightGBMのようにハイパーパラメータが多く、人手で結果を見てられない場合はまあまあ有用?
    • 例: dailyで再学習が必要な場合
    • 人手で結果を毎度見れるようなものでは普通のグリッドサーチで十分か
space = [
    Integer(3, 31, name='max_depth'),
    Integer(7, 1023, name='num_leaves'),
    Integer(50, 1000, name='min_child_samples'),
    Real(1, 999, "log-uniform", name='scale_pos_weight'),
    Real(0.6, 0.9, name='subsample'),
    Real(0.4, 0.9, name='colsample_bytree'),
    Real(0.000001, 0.1, "log-uniform", name='learning_rate'),
    Real(0.000001, 0.1, "log-uniform", name='reg_lambda')
]

res = gp_minimize(baysian_optimization_objective, space, n_calls=100)
print(res)
print(res.fun)
print(res.x)

ベイズ最適化所感

  • ベイズ最適化を使っただけで精度が上がった、というのはtalkingdataでは特になかった
  • cross-validationやってられん!という場合には使ってもよいかも
  • 方法論としては深層学習のチューニングにも使える

最終結果

  • LBのprivateスコアが0.9815761(525位/Top 13.2%)
    • あと一歩足りなかった…
  • negative down samplingで高速にiterationを回せるようになった後であれこれ追加した特徴量が一番効いた

その他面白いやつを共有

pickleの代わりにfeatherを使う

  • 特徴量を追加したDataFrameをpickleでファイルに保存したい
  • pickleのread/write時に結構なメモリ量を消費する
    • せっかく特徴量を作ったのにOOMで死んで水の泡 😇
  • featherだともっと省メモリ、高速に読み書きできる
    • 5分くらいかかっていたのが1分くらいで終わる

pandas芸

  • 特定の条件でグルーピングした上で、そのグループ内で次のクリック時間の計算を効率的に
  • SQLのwindow関数とかでも同じようなことができる
train_df['click_time'] = (train_df['click_time'].astype(np.int64, copy=False) // 10 ** 9).astype(np.int32, copy=False)
train_df['next_click'] = (train_df.groupby(['ip', 'app', 'device', 'os']).click_time.shift(-1) - train_df.click_time).astype(np.float32, copy=False)

validationデータの切り方

  • 学習データをシャッフルしてvalidationを作ると、validationでのAUCとtestでのAUCが結構違う
    • 時系列要素が強いデータなので
  • 最後の1日をvalidationにするとtestとのAUCの乖離が小さくなるので安心
  • 手元のvalidationとLBのスコアが乖離しないようにまずやりましょう

参考

前処理大全[データ分析のためのSQL/R/Python実践テクニック]

前処理大全[データ分析のためのSQL/R/Python実践テクニック]

最近の砂場活動その5: AWS Step Functionsで機械学習のワークフローの管理をする

はてなブログのHTTPS配信をやっていた同僚からAWS Step Functionsはいいぞ!というのを教えてもらいました(発表資料)。機械学習のワークフロー管理にもこれは便利そうだなーと思って、自分でも試してみました。やってる内容はN番煎じです...。

機械学習とワークフローの管理

状態を持つワークフローの管理、機械学習でも難しいので悩むところですね。例えば

  • データの取得
  • 前処理
  • 特徴量の生成
  • モデルの学習
  • 検証データに対する精度をトラッキングできるように記録
  • S3等に学習済みのモデルファイルを配置
  • 新しいデータに対して予測を行なう
  • 全てが終わったらslackに通知

などがぱっと上げられますが、ときどきどこかがエラーでこけます。エラーでこけていてもretryして解決するならretryして欲しいし、何度も失敗するようだったらexponential backoffしながらretryして欲しいです。あまりに何度も失敗するようなら系として失敗にして欲しいし、系として失敗したならslackで通知をしたりフローの中のどこでこけたのかが一目瞭然に分かって欲しいものです。この問題の解決方法としてフローのどこの処理を現在行なっていて、どのジョブは前回どういう状態だったか(成功したのか、何回失敗したのかなど)を記録しておくという方法が考えられます。しかし、これを自前でやるのは明らかに面倒です。こういったことを引き受けてくれるマネージドサービスの一つにAWS Step Functionsがあります。

機械学習のワークフローをStep Functionsで管理する

ML Newsで定期的に機械学習関係のワークフローを回しているので、これをStep Functionsで管理します。小さなアプリケーションなのであまり大したことはしていないですが、以下の2つのことをしています。

  • 1: Twitterで話題になっているURLをクローリング(特徴量で必要になる本文等を取得)
  • 2: 学習データから分類器を構築、新規のURLに対して分類*1、推薦リストを更新する

1が終わる前に2が始まっても仕方ないですし、1が何らかの理由で失敗していたら2は開始しないで欲しいです*2。こういった制御をStep Functionsにやらせます。Step Functionsでワークフローの管理をするには、ステートマシーンを書くだけです。見てもらったら分かると思うけど、ステートマシンはこういうやつです。

フローが図として分かるの最高だし、水色の進行中のところを見れば今どこの処理をやっているか分かります。赤の失敗のところを見ればどこでこけたかがログを見なくても一目で分かりますし、失敗したステートをクリックするとそのコンポーネントのエラーログを見ることができます。長大になっているワークフローのどこでこけているかCloudWatch Logsから根性で探すというのは地獄なので、かなり便利であることが分かりますね。

詳細は他の方が書いているエントリを見てくれ!!

参考までに今回ステートマシンを作ったCloudFormationのコードの断片を置いておきます。

ステートマシンを構築するCloudFormation

  MyStateMachine:
    Type: AWS::StepFunctions::StateMachine
    Properties:
      StateMachineName: HelloWorld-StateMachine
      RoleArn: 
        Fn::ImportValue:
          !Sub "${IAMStackName}:StepFunctionsRole"
      DefinitionString: !Sub |
        {
          "StartAt": "AddRecentUrls",
          "States": {
            "AddRecentUrls": {
              "Type": "Task",
              "Resource": "arn:aws:lambda:${AWS::Region}:${AWS::AccountId}:function:BatchJobTriggerResource",
              "Next": "WaitXSecondsForAddingRecentUrls"
            },
            "WaitXSecondsForAddingRecentUrls": {
              "Type": "Wait",
              "Seconds": 60,
              "Next": "GetJobStatusOfAddingRecentUrls"
            },
            "GetJobStatusOfAddingRecentUrls": {
              "Type": "Task",
              "Resource": "arn:aws:lambda:${AWS::Region}:${AWS::AccountId}:function:PollCheckJobFunction",
              "ResultPath": "$.status",
              "Next": "HasJobCompleteAddingRecentUrls"
            },
            "HasJobCompleteAddingRecentUrls": {
              "Type": "Choice",
              "Choices": [
                {
                  "Variable": "$.status",
                  "StringEquals": "FAILED",
                  "Next": "JobFailedAddingRecentUrls"
                },
                {
                  "Variable": "$.status",
                  "StringEquals": "SUCCEEDED",
                  "Next": "UpdateRecommendation"
                }
              ],
              "Default": "WaitXSecondsForAddingRecentUrls"
            },
            "JobFailedAddingRecentUrls": {
              "Type": "Fail",
              "Cause": "AWS Batch Job Failed",
              "Error": "DescribeJob returned FAILED"
            },
            "UpdateRecommendation": {
              "Type": "Task",
              "Resource": "arn:aws:lambda:${AWS::Region}:${AWS::AccountId}:function:UpdateRecommendationBatchJobTriggerResource",
              "Next": "WaitXSecondsForUpdatingRecommendation"
            },
            "WaitXSecondsForUpdatingRecommendation": {
              "Type": "Wait",
              "Seconds": 60,
              "Next": "GetJobStatusOfUpdatingRecommendation"
            },
            "GetJobStatusOfUpdatingRecommendation": {
              "Type": "Task",
              "Resource": "arn:aws:lambda:${AWS::Region}:${AWS::AccountId}:function:PollCheckJobFunction",
              "ResultPath": "$.status",
              "Next": "HasJobCompleteUpdatingRecommendation"
            },
            "HasJobCompleteUpdatingRecommendation": {
              "Type": "Choice",
              "Choices": [
                {
                  "Variable": "$.status",
                  "StringEquals": "FAILED",
                  "Next": "JobFailedUpdatingRecommendation"
                },
                {
                  "Variable": "$.status",
                  "StringEquals": "SUCCEEDED",
                  "Next": "JobSucceedUpdatingRecommendation"
                }
              ],
              "Default": "WaitXSecondsForUpdatingRecommendation"
            },
            "JobFailedUpdatingRecommendation": {
              "Type": "Fail",
              "Cause": "AWS Batch Job Failed",
              "Error": "DescribeJob returned FAILED"
            },
            "JobSucceedUpdatingRecommendation": {
              "Type": "Succeed"
            }
          }
        }

上記のStep Functionsを定期的に起動するLambdaとCloud Watch Events Rule

  StepFunctionsTrigger:
    Type: AWS::Lambda::Function
    Description: "データ取得→学習および推薦リストの作成をやるStepFunctionsをkickするLambda Function"
    Properties:
      FunctionName: StepFunctionsTrigger
      Role: 
        Fn::ImportValue: !Sub "${IAMStackName}:StepFunctionsRole"
      Handler: index.lambda_handler
      Runtime: python3.6
      MemorySize: 128
      Timeout: 10
      Environment:
        Variables:
          STATE_MACHINE_ARN: !Ref MyStateMachine
      Code:
        ZipFile: |
          import os
          import boto3
          from datetime import datetime as dt
          
          client = boto3.client('stepfunctions')
          def lambda_handler(event, context):
              try:
                  response = client.start_execution(
                      stateMachineArn=os.environ['STATE_MACHINE_ARN'],
                      name=dt.now().strftime('%Y_%m_%d_%H_%M_%S'),
                  )
              except Exception as e:
                  raise e
  StepFunctionsTriggerRule:
    Type: AWS::Events::Rule
    Properties:
      Name: StepFunctionsTriggerRule
      ScheduleExpression: rate(3 hours)
      Targets:
        - Id: StepFunctionsTrigger
          Arn: !GetAtt StepFunctionsTrigger.Arn
      State: "ENABLED"
  StepFunctionsPermissionForEventsToInvokeLambda:
    Type: AWS::Lambda::Permission
    Properties:
      FunctionName: !Ref StepFunctionsTrigger 
      SourceArn: !GetAtt StepFunctionsTriggerRule.Arn
      Action: lambda:InvokeFunction
      Principal: events.amazonaws.com

AWS Step Functionsのモニタリング

ステートマシンの状態遷移を見ているのは楽しいですが、監視はモニタリングツールに任せたいですね。Mackerelをお使いの方はプラグインを入れてもらうとすぐにモニタリングできます。

実行時間や失敗した数などがメトリックとして取れるので、適宜監視を仕込むと安心です。

f:id:mackerelio:20180420191310p:plain

AWS Step Functionsの類似ツール

ワークフロー管理はAWS Step Functionsが初めて出したわけではなく、同じようなものがすでにいくつかあります。

これらのツールを使ったことがあるわけではないので的を外しているかもしれませんが、AWS Step Functionsを使うと

  • AWS Batch/Lambda/ECSのタスク/EMRなど既存のAWSスタックとの連携がスムーズにできる
  • マネージトサービスなので、ワークフロー監視システム自体の管理をする必要がない

といったところが利点かなと思います。

Amazon Web Services 基礎からのネットワーク&サーバー構築 改訂版

Amazon Web Services 基礎からのネットワーク&サーバー構築 改訂版

*1:バッチでまとめてやっています

*2:これまではCloudWatch Eventで適当に回していて、これくらい立ったら1が終わってるから2をやればよかろうとやっていたり、1が失敗しても2は問答無用で走る形になっていました...

AWS Batchとそのモニタリング方法について

最近、仕事や趣味でAWS Batchをよく使っています。仕事と趣味のそれぞれの用途は以下の通りです。

AWS Batchの使い方やモニタリング方法が大分こなれてきたので、エントリにまとめておきます。

AWS Batch is ...

フルマネージド型のバッチ処理サービスです。バッチジョブを次々に投下したい際に計算を行なうクラスタの管理を自分で行なうのはしんどいですが、AWS Batchを使うと面倒を見てくれます。必要な時にはインスタンスを増やして素早く終わらせ、不必要になったらインスタンスを落としてくれるので、コストを抑えることができます。AWS Black Beltの資料が分かりやすいと思います。

Black Beltの資料にもありますが、以下の概念を理解しておくとスムーズにいくと思います。

  • コンピューティング環境
    • VPCやサブネット、必要なVCpu数などを管理。リソースが足りなければ追加でEC2インスタンスを立ち上げ、不要になったら落としてくれる
  • ジョブキュー
    • どのコンピューティング環境にどういう優先順位でジョブを投入するかなどが指定できる
    • キューに状態が管理されているので、EC2が立ち上がりまくってAWS破産する心配がない
  • ジョブ定義
    • ECSのタスク定義と似たようなもの。最近、タイムアウトのサポートもされて安心感が増した

コンピューティング環境/ジョブキュー/ジョブ定義をそれぞれCloudFormationで書いた例も載せておきます。

CloudFormationの例

Resources:
  ComputeEnvironment:
    Type: "AWS::Batch::ComputeEnvironment"
    Properties:
      Type: MANAGED
      ServiceRole: !Sub "arn:aws:iam::${AWS::AccountId}:role/service-role/AWSBatchServiceRole"
      ComputeEnvironmentName: go-active-learning
      ComputeResources:
        MinvCpus: 0
        MaxvCpus: 256
        DesiredvCpus: 0
        SecurityGroupIds:
          - Fn::ImportValue:
              !Sub "${VPCStackName}:BatchSecurityGroup"
        Type: EC2
        Subnets:
          - Fn::ImportValue:
              !Sub "${VPCStackName}:PublicSubnetAZa"
          - Fn::ImportValue:
              !Sub "${VPCStackName}:PublicSubnetAZc"
        InstanceTypes:
          - optimal
        InstanceRole: !Sub "arn:aws:iam::${AWS::AccountId}:instance-profile/ecsInstanceRole"
      State: ENABLED
  UpdateRecommendationJobDefinition:
    Type: "AWS::Batch::JobDefinition"
    Properties:
      Type: container
      ContainerProperties:
        Command:
          - "/bin/sh"
          - "/app/update_recommendation"
        Memory: 2500
        Privileged: false
        Environment:
          - Name: "DB_USER"
            Value: "MY_USERNAME"
          - Name: "DB_PASSWORD"
            Value: "MY_PASSWORD"
        ReadonlyRootFilesystem: false
        Vcpus: 1
        Image: !Sub "${AWS::AccountId}.dkr.ecr.${AWS::Region}.amazonaws.com/go-active-learning:latest"
      JobDefinitionName: update_recommendation
      RetryStrategy:
        Attempts: 1
    DependsOn: ComputeEnvironment
  JobQueue:
    Type: "AWS::Batch::JobQueue"
    Properties:
      ComputeEnvironmentOrder:
        - ComputeEnvironment: !Ref ComputeEnvironment
          Order: 1
      Priority: 1
      State: ENABLED
      JobQueueName: go-active-learning
    DependsOn: ComputeEnvironment

AWS Batch is not ...

cronではありません。ジョブをkickするAPIがあるので、Lambdaなどから起動したりすることが多いですね。CloudWatch Eventのルールで起動させたり、AWS Step Functionsの中に挟むこともあるので、そういった方法での起動がメインになると思います。

CloudWatch Eventから3時間毎にLambdaを起動して、そのLambaでAWS BatchのジョブをkickするCloudFormationの例を置いておきます。

CloudFormationの例

  UpdateRecommendationBatchJobTrigger:
    Type: AWS::Lambda::Function
    Properties:
      FunctionName: UpdateRecommendationBatchJobTriggerResource
      Description: Starts a job of AWS Batch desu
      Role: 
        Fn::ImportValue: !Sub "${IAMStackName}:BatchJobTriggerFunctionRole"
      Handler: index.lambda_handler
      Runtime: python2.7
      MemorySize: 128
      Timeout: 30
      Environment:
        Variables:
          JOB_NAME: update_recommendation
          JOB_QUEUE_NAME: go-active-learning
          JOB_DEFINITION_ARN: !Ref UpdateRecommendationJobDefinition
      Code:
        ZipFile: |
          from __future__ import print_function
          from datetime import datetime as dt
          import json
          import os
          import boto3
          batch = boto3.client('batch')
          def lambda_handler(event, context):
              try:
                  response = batch.submit_job(
                      jobName=os.environ['JOB_NAME'],
                      jobQueue=os.environ['JOB_QUEUE_NAME'],
                      jobDefinition=os.environ['JOB_DEFINITION_ARN'],
                      parameters={
                          'Arg1': dt.now().strftime('%H:%M:%S'),
                          'Arg2': 'from-AWS-Lambda'
                      }
                  )
                  print(response)
                  return response['jobId']
              except Exception as e:
                  print(e)
                  raise e
  UpdateRecommendationBatchJobTriggerRule:
    Type: AWS::Events::Rule
    Properties:
      Name: UpdateRecommendationBatchJobTriggerRule
      ScheduleExpression: rate(3 hours)
      Targets:
        - Id: UpdateRecommendationBatchJobTrigger
          Arn: !GetAtt UpdateRecommendationBatchJobTrigger.Arn
      State: "ENABLED"
  UpdateRecommendationPermissionForEventsToInvokeLambda:
    Type: AWS::Lambda::Permission
    Properties:
      FunctionName: !Ref UpdateRecommendationBatchJobTrigger 
      SourceArn: !GetAtt UpdateRecommendationBatchJobTriggerRule.Arn
      Action: lambda:InvokeFunction
      Principal: events.amazonaws.com

モニタリング方法

AWS Batchの概念が分かって実際のジョブ投入などができたら、正しく動作できているか継続的にモニタリングしたいですね。モニタリングはMackerelを使うと簡単/便利にできます(注: この記事を書いている人はMackerelの中の人です)。

Job Queue Statusのモニタリング

Job Queueに入っている各ステータスのジョブの個数を監視することができます。aws-batchプラグインが用意されているので、インストールすれば簡単にモニタリングできます。mkrを使うとプラグインを簡単にインストールできます。

このプラグインを使うと、こんなグラフが得られます。

ジョブが捌ける速度より投入される速度が早い場合、QueueのPendingのステータスにいる個数が増えていきます。Mackerelのメトリック監視を入れることでMaxVcpuを増やすなどの判断をすることができますね。

成功/失敗したタスクのモニタリング

Job Queueのステータスのモニタリングはこれまで書いたものでできますが、ジョブがSUCCEEDEDになったかFAILEDになったかはモニタリングすることができません。CloudWatch EventのAWS Batchイベントストリームを使うと状態変化を受け取ることができます。

ひとまずFAILEDになったかを知ることができればいいので、状態変化があったらslackに投稿するようにしています。同僚のid:astjさんが書いてくれたいいやつです。

状態変化をslackに通知するCloudFormationの例

  # AWS Batch ジョブの状態変化を Slack 通知する Lambda Function
  BatchJobStatusToSlack:
    Type: "AWS::Lambda::Function"
    Properties:
      FunctionName: "BatchJobStatusToSlack"
      Description: "AWS Batch job 状態変化を Slack に通知します"
      Handler: "index.handler"
      Role: !ImportValue "iam:LambdaBasicExecutionRoleARN"
      Runtime: "python3.6"
      Environment:
        Variables:
          MY_ENV: !Ref Environment
          SLACK_WEBHOOK_URL: "https://hooks.slack.com/services/..."
      Code:
        ZipFile: |
          # -*- coding: utf-8 -*-
          import json
          import os
          import urllib.request


          def post_to_slack(payload):
              url = os.getenv('SLACK_WEBHOOK_URL')
              if url is None:
                  raise ValueError('webhook url is not defined')
              json_data = json.dumps(payload).encode('utf-8')
              request = urllib.request.Request(url, data=json_data, method='POST')
              with urllib.request.urlopen(request) as response:
                  return response.read().decode('utf-8')


          def color_mapping(status):
              try:
                  return {
                      'PENDING': 'warning',
                      'STARTING': '#66ccff',
                      'RUNNING': '#aa99ff',
                      'SUCCEEDED': 'good',
                      'FAILED': 'danger',
                  }[status]
              except KeyError:
                  return '#cccccc'


          def handler(event, context):
              account = os.getenv('MY_ENV')
              status = event['detail']['status']
              job_id = event['detail']['jobId']
              params = json.dumps(event['detail']['parameters'])
              url = 'https://%s.console.aws.amazon.com/batch/home#/jobs/queue/%s/job/%s' % (event['region'], event['detail']['jobQueue'].replace('/', '~F'), job_id)
              payload = {
                  'username': 'event_notifier',
                  'attachments': [{
                      'color': color_mapping(status),
                      'fallback': '[%s] JobID %s status changed (%s)' % (status, job_id, account),
                      'title': '[%s] Job status changed (%s)' % (status, account),
                      "title_link": url,
                      "text": '```%s```' % params,
                      'mrkdwn_in': ['text'],
                  }]
              }
              print(post_to_slack(payload))
      Timeout: "10"
      TracingConfig:
        Mode: "PassThrough"
  RulesBatchJobStatusToSlack:
    Type: "AWS::Events::Rule"
    Properties:
      State: "ENABLED"
      EventPattern:
        source:
          - "aws.batch"
        detail-type:
          - "Batch Job State Change"
        detail:
          jobQueue:
            - !Ref MyJobQueue
          status:
            - "FAILED"
      Targets:
        -
          Arn: !GetAtt [BatchJobStatusToSlack, Arn]
          Id: "ToSlackLambda"
  LambdaInvokePermission:
    Type: "AWS::Lambda::Permission"
    Properties:
      Action: "lambda:InvokeFunction"
      Principal: "events.amazonaws.com"
      SourceArn: !GetAtt [RulesBatchJobStatusToSlack, Arn]
      FunctionName: !GetAtt [BatchJobStatusToSlack, Arn]

ECSのモニタリング

AWS Batchの裏側ではECSが起動しています。aws-ecsプラグインを使うことで、クラスタのリソース(CPUとMemory)の使用状況をモニタリングすることができます。

使用状況を継続的にモニタリングすることで、EC2のスペックを上げ下げする判断材料になります。グラフの例はこんな感じです。3時間毎に10分かからず終わるジョブなので、ぴょんぴょんしたかわいいグラフになっています。

困ること

真面目に使うとこの辺が困ってきますが、自前でやるよりは大分便利ですね...。

  • テストでAWS Batchのmockをやってくれるツールがない
    • 例えばdynamodbだとツールがある
  • コンソール画面がまだこなれていない
    • 例えばジョブの完了日時でソートができない。大量にジョブが並んでいるときに目的のものを探すのに時間がかかる
  • コンソール画面からは設定できるが、CloudFormationからは設定できない項目がある
    • 例えばタイムアウト設定など

参考

Amazon Web Services 基礎からのネットワーク&サーバー構築 改訂版

Amazon Web Services 基礎からのネットワーク&サーバー構築 改訂版

pg_activityで刺さっているクエリを殺す

サービス運用しているとクエリが刺さってパフォーマンスが悪くなってしまっているとき、ありますよね。根本対応は別途やるとして、今はひとまずこの刺さっているクエリを殺して凌ぎたい。PostgreSQLではpg_stat_activityというテーブルに現在実行中の情報が入っています。

SQLで殺してもいいですが、同僚に教えてもらったpg_activityというツールが便利だったのでメモしておきます。

cliのツールですが、tigなどと同じような形で現在実行中の中から殺したいクエリを選んで、というのを視覚的に行なえるので便利です。

インストール

pg_activityはPythonで書かれているツールなので、pipでインストールします。ansibleでインストールしているので、以下のように書きました。amazon linux2上での動作を確認していますが、他でも似たようにやれば動くと思います。

---
- tasks:
    - name: "yum install"
      yum: name={{item}} state=installed
      with_items:
        - gcc
        - postgresql
        - postgresql-devel
        - python3
        - python3-pip
        - python3-devel
      become: true
    - name : install pg_activity
      shell: pip3 install pg_activity
      become: true

pg_activityを動かしてみる

こんな感じで動かせます。

% pg_activity --username=my_user_name --host=my_rds_host --dbname=my_db_name --rds

殺したいクエリがいたら、それを選択してkを押すと殺せます。

試してみる

意図的にですが、クエリを殺す例をやってみます。ターミナルAでトランザクションを張って、テーブルをロックしてみます。

go-active-learning=> BEGIN;
BEGIN
go-active-learning=> LOCK TABLE example IN EXCLUSIVE MODE;
LOCK TABLE

別のターミナルBで更新系のクエリを発行してみます。そうするとまあ当然ブロックされる。

go-active-learning=> BEGIN;
BEGIN
go-active-learning=> UPDATE example SET label = 1 where id = 8508;

この状態でpg_activityを見にいくとこんな状態になっていることが分かります。

この状態でUPDATE ...を選択してkを押すとこのクエリを殺せます。ターミナルBでは無慈悲なメッセージが流れていて、殺せていることが分かります。

go-active-learning=> UPDATE example SET label = 1 where id = 8508;
FATAL:  terminating connection due to administrator command
LINE 1: UPDATE example SET label = 1 where id = 8508;
               ^
SSL connection has been closed unexpectedly
The connection to the server was lost. Attempting reset: Succeeded.

クエリが刺さらないのが一番ですが、もし刺さってしまったときにどう対処すればいいか押さえておくと安心できますね。

10年戦えるデータ分析入門 SQLを武器にデータ活用時代を生き抜く (Informatics &IDEA)

10年戦えるデータ分析入門 SQLを武器にデータ活用時代を生き抜く (Informatics &IDEA)

最近の砂場活動その4: CloudWatch Logs/Kinesis Firehose/S3/Athenaを利用してログ分析をやってみる

Fargateを使うようになり、EC2サーバーを管理しなくなってよくなりました(東京リージョン、7月ですね!)。一方、これまでサーバーにログとして吐かれていたファイルの閲覧や集計に困ることも増えてきました(sshでログインして、ログファイルをgrepなどができない)。AWSのサービスを組み合わせることでこれが解決できないか、ということをやってみました。個人運用のWebサービスにはオーバーキルですが、仕事で使うことも見越して...という感じです。素人が試行錯誤しながらやっているので、もっとよいやり方があると思います。

やりたいこと

一言でいうとNginxのアクセスログをいい感じで集計できるように、というのがやりたいことです。

FargateでWebアプリケーションサーバーとNginxを同一のタスクで動かしています。Nginxは静的ファイルへの振り分けとログ取りが主な目的です。Nginxのログをファイルに出力していると、タスクが終了した際にログが揮発してしまうので、標準出力に吐くようにします。そうすると、CloudWatch Logsに吐かれます。これをS3まで持っていければ、AthenaでSQLを書いたり、その結果をRedashで可視化することができます。今回はS3にログを運ぶやり方を探っていきます。

ログはltsv形式で吐いています。こんな感じ。

Nginxの設定ファイル

log_format tsv_go_active_learning "time:$time_local"
                                  "\thost:$remote_addr"
                                  "\tvhost:$host"
                                  "\tforwardedfor:$http_x_forwarded_for"
                                  "\tmethod:$request_method"
                                  "\turi:$request_uri"
                                  "\tprotocol:$server_protocol"
                                  "\tstatus:$status"
                                  "\tsize:$body_bytes_sent"
                                  "\tua:$http_user_agent"
                                  "\tapptime:$upstream_response_time"
                                  "\treqtime:$request_time"
                                  "\tupstream:$upstream_addr"
                                  "\tupstream_status:$upstream_status"
                                  "\thostname:$hostname"
                                  ;

upstream local-backend {
    server localhost:7778;
}

server {
    listen 7777 default_server;
    proxy_connect_timeout 10;
    proxy_read_timeout    10;
    proxy_send_timeout    10;
    access_log /dev/stdout tsv_go_active_learning;

    location / {
        root /www/app;
        proxy_set_header Host $http_host;
    }
    location /api {
        proxy_pass http://local-backend;
    }
}

CloudWatch LogsからS3までログを運ぶ手段

お手軽な方法としては、Lambdaを使って定期的にS3にエクスポートするのが常套手段のようです。

Lambdaを書けばいいのはお手軽ですが、リアルタイムに集計したい場合や一日のログが大きすぎてLambdaの制限時間の5分で終わらない!というの場合には困りそうですね。ある程度のログが溜まったら、それをトリガーにしてS3にエクスポートして欲しいです。調べてみると、こういう用途はどうやらKinesis Firehoseが得意なようです。CloudWatch Logsにはサブスクリプションという機能があり、これを使うとKinesis Firehoseにデータを流すことができます。

Kinesis Firehose、最初は何やってくれるのかさっぱり分からないですが、やってくれることとしては

  • リアルタイムにじゃんじゃか流れてくるデータを受け止める土管
  • [optional] 流れてきたデータをLambdaで変換できる
  • 一定量、もしくは一定時間経過したらAWSのデータストアに横流ししてくれる
    • 流す先はS3/Redshift/Amazon Elasticsearch Serviceなどの分析基盤となるサービスが中心
    • 流す処理は自分でコードを書く必要がない、firehoseが面倒を見てくれる

が挙げられます。これまでのよくあるパターンとしては、fluentdで集約(aggregate)してS3などに流すというのがありましたが、集約サーバーの運用に手間がかかるというのがありました。Kinesis Firehoseを使うと、この手間をmanagedサービスが受け持ってくれるので手間が減らせるようです。

脱線: Kinesis Data Streams

もっと汎用的な土管としてはKinesis Streamsがあります。Firehoseは出力先(コンシューマーというらしい)がS3/Redshift/Amazon Elasticsearch Serviceなどに限定されていましたが、Streamsでは自前のプログラムから触ることができ、自由度が高くなります。仕事で作っているMackerelでもKinesis Streamsはお世話になっていて、mackerel-agentから投稿されるデータを引き受けてくれています。Kinesis Streamsで受け取ったデータは、Lambdaがコンシューマーとなり、RedisやDynamoDbにデータを書き込んでいます。

Kinesis Streamsは汎用的に使える一方で

  • シャード数は自分で管理する必要がある
  • コンシューマーのコードは自分で書く必要がある

といった手間が増えます。コンシューマーの書き込み先がS3/Redshift/Amazon Elasticsearch Serviceなどであれば、これらの手間を肩代わりしてくれるKinesis Firehoseを使うとよいようです。

Kinesis FirehoseとCloudWatch LogsのサブスクリプションでログをS3に転送する

はい、あとはやるだけですね。CloudFormationで構成管理しているので、その断片を貼っておきます。やっていることは大まかにはこんな感じです。

  • CloudWatch Logsに流れてくるデータをsubscribeしておく
    • subscribe先はDeliveryStream
  • CloudWatch Logsに前回から50MBs以上データが溜まるか60秒以上経過していたらエクスポートを開始
  • エクスポートする際に変換をLambdaでやる
    • subscribeするとjson形式になるので、元のltsv形式に戻すだけの変換です
    • AWSが用意してくれているblueprintがあるので、それをちょっと変えればいいだけ
    • こういったLambdaの構成管理にはSAMが便利でした

Firehoseの構成管理

  DeliveryStream:
    Type: 'AWS::KinesisFirehose::DeliveryStream'
    Properties:
      ExtendedS3DestinationConfiguration:
        BucketARN: !Join
          - ''
          - - 'arn:aws:s3:::'
            - Fn::ImportValue:
                !Sub "${S3StackName}:NginxLogsS3Bucket"
        BufferingHints:
          IntervalInSeconds: '60'
          SizeInMBs: '50'
        CompressionFormat: UNCOMPRESSED
        Prefix: firehose/
        RoleARN:
          Fn::ImportValue: !Sub "${IAMStackName}:DeliveryRole"
        ProcessingConfiguration:
          Enabled: 'true'
          Processors:
            - Parameters:
                - ParameterName: LambdaArn
                  ParameterValue:
                    Fn::ImportValue: !Sub "${SamStackName}:LambdaExtractMessageFromCloudWatch"
              Type: Lambda
  SubscriptionFilter:
    Type: "AWS::Logs::SubscriptionFilter"
    Properties:
      RoleArn:
        Fn::ImportValue:
          !Sub "${IAMStackName}:CloudWatchIAMRole"
      LogGroupName:
        !Ref NginxLogGroup
      FilterPattern: ""
      DestinationArn:
        Fn::GetAtt:
          - DeliveryStream
          - Arn

1分くらい待っているとS3のオブジェクトができあがってきます。日付が関連付けられています。

S3のファイルをAthenaで分析

S3にデータが集まれば、あとはAthenaでなんとでもなります。例えば処理時間がかかっているuriを列挙するのは、こんな感じです。最初にテーブルを作る。フルスキャンが走らないようにパーティションを切りましょう。

テーブル定義と分析のSQL

CREATE EXTERNAL TABLE IF NOT EXISTS go_active_learning_nginx.go_active_learning_nginx (
  `time` date,
  `host` string,
  `vhost` string,
  `forwardedfor` string,
  `method` string,
  `uri` string,
  `protocol` string,
  `status` int,
  `size` int,
  `ua` string,
  `apptime` double,
  `reqtime` double,
  `upstream` string,
  `upstream_status` int,
  `hostname` string
) PARTITIONED BY (
  year string,
  month string,
  day string
)
ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
WITH SERDEPROPERTIES (
  'serialization.format' = '1'
) LOCATION 's3://my-bucket/firehose/'
TBLPROPERTIES ('has_encrypted_data'='false');

ALTER TABLE go_active_learning_nginx ADD PARTITION (year='2018', month='06', day='09') location 's3://my-bucket/firehose/2018/06/09/';
SELECT AVG(reqtime) as t, uri FROM go_active_learning_nginx WHERE year='2018' GROUP BY uri ORDER BY t DESC LIMIT 100;

こんな感じでブラウザで実行できます。

このままAthenaでやってもいいし、Redashで可視化してダッシュボードにまとめてもよいです。今回はS3に出力しましたが、RedshiftやAmazon Elasticsearch Serviceに出力して分析やってみたいですね(お財布事情が...)。

まとめ

大量にくるストリームデータをリアルタイムに分析したい場合、Kinesis Firehoseを間に挟むと便利ということが分かりました。自分のサイトはアクセスが雀の涙のような規模ですが、仕事で扱うようなデータ規模だと集約サーバーがmanagedになっているありがたさが出てきそうですね。

S3を中心として(?)、各サービスで連携を取りながら使えるようになっているAWSすごいなぁ。

10年戦えるデータ分析入門 SQLを武器にデータ活用時代を生き抜く (Informatics &IDEA)

10年戦えるデータ分析入門 SQLを武器にデータ活用時代を生き抜く (Informatics &IDEA)

Go言語でWebアプリを書くときにオートリロードどうするといいの問題

Go言語を書く際、成果物がシングルバイナリになるのは便利です。deployするときや他人に使ってもらうときに、それだけ渡せば使ってもらえるので。cliツールやapiサーバーを書くときにはこの方式で困っていなかったのですが、いわゆるWebアプリをGo言語で書くときのベストプラックティスが分からなかったのでエントリにしておきます。

前提

  • Go言語側は重厚なフレームワークは特に使わない
    • net/httphtml/templateといった標準ライブラリを使う
  • フロント側はVue.js

シングルバイナリを作るまでの過程

以下の過程をMakefileに書いてmake buildとやってシングルバイナリを作っていました。

  • webpackでJavaScript関係をbundle.jsという感じで一つのファイルにまとめる
  • go-assets-builderを使って、index.htmlbundle.jsを一つのasset(asset.goみたいなのができる)にまとめる
  • 全てgoのファイルになったので、go buildでシングルバイナリに固める

シングルバイナリがすぐできるならば問題ないですが、自然言語処理を内部で使っているアプリケーションのため、成果物が70Mb以上のサイズになります(kagomeを使わせてもらっています)。htmlやjsをちょっと変えて確認したいだけなのに、make buildで20秒くらいかかるのはなかなかストレスですね。なんとかしたい。ちなみにファイルに変更があったら自動でmake buildするには、reflexなどのファイル変更を監視する系のツールを使うといいというのを教えてもらいました。

解決案1: ローカルと本番で処理を分ける

ローカルではindex.htmlbundle.jsを呼ばれる度に静的ファイルを読み込むということをすれば、テンプレートが書き換えられる度にシングルバイナリを作りなおす必要はなくなります。本番環境ではさきほどまでに述べた方法で全部goのファイルにしてシングルバイナリを作ります。

問題自体は解決しますが、localと本番で処理を分岐するのは考えることも増えるし、コードも増えるのでちょっとイマイチですね...。

解決案2: 静的ファイルはNginxで返すようにして、アプリケーションサーバーはAPIサーバーの役割に徹する

自分はこれを選択しました。SPAになっているので、index.htmlbundle.jsはNginxで返します。Vue.jsから叩かれるエンドポイントはgoがAPIサーバーっぽく処理させます。Nginxの設定ファイルだとこんな感じ。

upstream local-backend {
    server localhost:7778; # goのアプリケーションサーバーです
}

server {
    listen 7777 default_server;
    proxy_connect_timeout 600;
    proxy_read_timeout    600;
    proxy_send_timeout    600;

    location / {
        root /www/app;
    }
    location /api {
        proxy_pass http://local-backend;
    }
}

手元でNginxを立てるのは面倒なので、その場合はwebpackの機能を使います。webpack.config.jsに以下のように書くと、特定のpath以下だとapiサーバーにリクエストをproxyしてくれます。webpack便利やん。

module.exports = {
  entry: './src/js/app',
  output: {
    path: __dirname + '/static',
    filename: 'bundle.js'
  },
  ...,
  devServer: {
    host: 'localhost',
    port: 7777,
    contentBase: __dirname + '/static',
    proxy: {
      '/api': {
        target: 'http://localhost:7778'
      }
    }
  }
};

これでhtmlやjsを書き換えただけなのにシングルバイナリ作りが始まってなかなか修正された様子が見れない、といったことがなくなりました。やったー。全体の様子はこの辺です。Webサイト自体はこちらです。

Goの変数でテンプレートを出し分けなどをやりたい場合、素のhtmlでない場合が多いのでこのパターンは使えないというのが悩ましいですね。

まとめ

Go言語でWebアプリケーションを書くときにオートリロードをするときに困ったこと、自分なりの解決策を書いてみました。もっといい方法がある!!という方は是非教えてください。

みんなのGo言語[現場で使える実践テクニック]

みんなのGo言語[現場で使える実践テクニック]

  • 作者: 松木雅幸,mattn,藤原俊一郎,中島大一,牧大輔,鈴木健太
  • 出版社/メーカー: 技術評論社
  • 発売日: 2016/09/09
  • メディア: Kindle版
  • この商品を含むブログを見る