最近の砂場活動その5: AWS Step Functionsで機械学習のワークフローの管理をする

はてなブログのHTTPS配信をやっていた同僚からAWS Step Functionsはいいぞ!というのを教えてもらいました(発表資料)。機械学習のワークフロー管理にもこれは便利そうだなーと思って、自分でも試してみました。やってる内容はN番煎じです...。

機械学習とワークフローの管理

状態を持つワークフローの管理、機械学習でも難しいので悩むところですね。例えば

  • データの取得
  • 前処理
  • 特徴量の生成
  • モデルの学習
  • 検証データに対する精度をトラッキングできるように記録
  • S3等に学習済みのモデルファイルを配置
  • 新しいデータに対して予測を行なう
  • 全てが終わったらslackに通知

などがぱっと上げられますが、ときどきどこかがエラーでこけます。エラーでこけていてもretryして解決するならretryして欲しいし、何度も失敗するようだったらexponential backoffしながらretryして欲しいです。あまりに何度も失敗するようなら系として失敗にして欲しいし、系として失敗したならslackで通知をしたりフローの中のどこでこけたのかが一目瞭然に分かって欲しいものです。この問題の解決方法としてフローのどこの処理を現在行なっていて、どのジョブは前回どういう状態だったか(成功したのか、何回失敗したのかなど)を記録しておくという方法が考えられます。しかし、これを自前でやるのは明らかに面倒です。こういったことを引き受けてくれるマネージドサービスの一つにAWS Step Functionsがあります。

機械学習のワークフローをStep Functionsで管理する

ML Newsで定期的に機械学習関係のワークフローを回しているので、これをStep Functionsで管理します。小さなアプリケーションなのであまり大したことはしていないですが、以下の2つのことをしています。

  • 1: Twitterで話題になっているURLをクローリング(特徴量で必要になる本文等を取得)
  • 2: 学習データから分類器を構築、新規のURLに対して分類*1、推薦リストを更新する

1が終わる前に2が始まっても仕方ないですし、1が何らかの理由で失敗していたら2は開始しないで欲しいです*2。こういった制御をStep Functionsにやらせます。Step Functionsでワークフローの管理をするには、ステートマシーンを書くだけです。見てもらったら分かると思うけど、ステートマシンはこういうやつです。

フローが図として分かるの最高だし、水色の進行中のところを見れば今どこの処理をやっているか分かります。赤の失敗のところを見ればどこでこけたかがログを見なくても一目で分かりますし、失敗したステートをクリックするとそのコンポーネントのエラーログを見ることができます。長大になっているワークフローのどこでこけているかCloudWatch Logsから根性で探すというのは地獄なので、かなり便利であることが分かりますね。

詳細は他の方が書いているエントリを見てくれ!!

参考までに今回ステートマシンを作ったCloudFormationのコードの断片を置いておきます。

ステートマシンを構築するCloudFormation

  MyStateMachine:
    Type: AWS::StepFunctions::StateMachine
    Properties:
      StateMachineName: HelloWorld-StateMachine
      RoleArn: 
        Fn::ImportValue:
          !Sub "${IAMStackName}:StepFunctionsRole"
      DefinitionString: !Sub |
        {
          "StartAt": "AddRecentUrls",
          "States": {
            "AddRecentUrls": {
              "Type": "Task",
              "Resource": "arn:aws:lambda:${AWS::Region}:${AWS::AccountId}:function:BatchJobTriggerResource",
              "Next": "WaitXSecondsForAddingRecentUrls"
            },
            "WaitXSecondsForAddingRecentUrls": {
              "Type": "Wait",
              "Seconds": 60,
              "Next": "GetJobStatusOfAddingRecentUrls"
            },
            "GetJobStatusOfAddingRecentUrls": {
              "Type": "Task",
              "Resource": "arn:aws:lambda:${AWS::Region}:${AWS::AccountId}:function:PollCheckJobFunction",
              "ResultPath": "$.status",
              "Next": "HasJobCompleteAddingRecentUrls"
            },
            "HasJobCompleteAddingRecentUrls": {
              "Type": "Choice",
              "Choices": [
                {
                  "Variable": "$.status",
                  "StringEquals": "FAILED",
                  "Next": "JobFailedAddingRecentUrls"
                },
                {
                  "Variable": "$.status",
                  "StringEquals": "SUCCEEDED",
                  "Next": "UpdateRecommendation"
                }
              ],
              "Default": "WaitXSecondsForAddingRecentUrls"
            },
            "JobFailedAddingRecentUrls": {
              "Type": "Fail",
              "Cause": "AWS Batch Job Failed",
              "Error": "DescribeJob returned FAILED"
            },
            "UpdateRecommendation": {
              "Type": "Task",
              "Resource": "arn:aws:lambda:${AWS::Region}:${AWS::AccountId}:function:UpdateRecommendationBatchJobTriggerResource",
              "Next": "WaitXSecondsForUpdatingRecommendation"
            },
            "WaitXSecondsForUpdatingRecommendation": {
              "Type": "Wait",
              "Seconds": 60,
              "Next": "GetJobStatusOfUpdatingRecommendation"
            },
            "GetJobStatusOfUpdatingRecommendation": {
              "Type": "Task",
              "Resource": "arn:aws:lambda:${AWS::Region}:${AWS::AccountId}:function:PollCheckJobFunction",
              "ResultPath": "$.status",
              "Next": "HasJobCompleteUpdatingRecommendation"
            },
            "HasJobCompleteUpdatingRecommendation": {
              "Type": "Choice",
              "Choices": [
                {
                  "Variable": "$.status",
                  "StringEquals": "FAILED",
                  "Next": "JobFailedUpdatingRecommendation"
                },
                {
                  "Variable": "$.status",
                  "StringEquals": "SUCCEEDED",
                  "Next": "JobSucceedUpdatingRecommendation"
                }
              ],
              "Default": "WaitXSecondsForUpdatingRecommendation"
            },
            "JobFailedUpdatingRecommendation": {
              "Type": "Fail",
              "Cause": "AWS Batch Job Failed",
              "Error": "DescribeJob returned FAILED"
            },
            "JobSucceedUpdatingRecommendation": {
              "Type": "Succeed"
            }
          }
        }

上記のStep Functionsを定期的に起動するLambdaとCloud Watch Events Rule

  StepFunctionsTrigger:
    Type: AWS::Lambda::Function
    Description: "データ取得→学習および推薦リストの作成をやるStepFunctionsをkickするLambda Function"
    Properties:
      FunctionName: StepFunctionsTrigger
      Role: 
        Fn::ImportValue: !Sub "${IAMStackName}:StepFunctionsRole"
      Handler: index.lambda_handler
      Runtime: python3.6
      MemorySize: 128
      Timeout: 10
      Environment:
        Variables:
          STATE_MACHINE_ARN: !Ref MyStateMachine
      Code:
        ZipFile: |
          import os
          import boto3
          from datetime import datetime as dt
          
          client = boto3.client('stepfunctions')
          def lambda_handler(event, context):
              try:
                  response = client.start_execution(
                      stateMachineArn=os.environ['STATE_MACHINE_ARN'],
                      name=dt.now().strftime('%Y_%m_%d_%H_%M_%S'),
                  )
              except Exception as e:
                  raise e
  StepFunctionsTriggerRule:
    Type: AWS::Events::Rule
    Properties:
      Name: StepFunctionsTriggerRule
      ScheduleExpression: rate(3 hours)
      Targets:
        - Id: StepFunctionsTrigger
          Arn: !GetAtt StepFunctionsTrigger.Arn
      State: "ENABLED"
  StepFunctionsPermissionForEventsToInvokeLambda:
    Type: AWS::Lambda::Permission
    Properties:
      FunctionName: !Ref StepFunctionsTrigger 
      SourceArn: !GetAtt StepFunctionsTriggerRule.Arn
      Action: lambda:InvokeFunction
      Principal: events.amazonaws.com

AWS Step Functionsのモニタリング

ステートマシンの状態遷移を見ているのは楽しいですが、監視はモニタリングツールに任せたいですね。Mackerelをお使いの方はプラグインを入れてもらうとすぐにモニタリングできます。

実行時間や失敗した数などがメトリックとして取れるので、適宜監視を仕込むと安心です。

f:id:mackerelio:20180420191310p:plain

AWS Step Functionsの類似ツール

ワークフロー管理はAWS Step Functionsが初めて出したわけではなく、同じようなものがすでにいくつかあります。

これらのツールを使ったことがあるわけではないので的を外しているかもしれませんが、AWS Step Functionsを使うと

  • AWS Batch/Lambda/ECSのタスク/EMRなど既存のAWSスタックとの連携がスムーズにできる
  • マネージトサービスなので、ワークフロー監視システム自体の管理をする必要がない

といったところが利点かなと思います。

Amazon Web Services 基礎からのネットワーク&サーバー構築 改訂版

Amazon Web Services 基礎からのネットワーク&サーバー構築 改訂版

*1:バッチでまとめてやっています

*2:これまではCloudWatch Eventで適当に回していて、これくらい立ったら1が終わってるから2をやればよかろうとやっていたり、1が失敗しても2は問答無用で走る形になっていました...

AWS Batchとそのモニタリング方法について

最近、仕事や趣味でAWS Batchをよく使っています。仕事と趣味のそれぞれの用途は以下の通りです。

AWS Batchの使い方やモニタリング方法が大分こなれてきたので、エントリにまとめておきます。

AWS Batch is ...

フルマネージド型のバッチ処理サービスです。バッチジョブを次々に投下したい際に計算を行なうクラスタの管理を自分で行なうのはしんどいですが、AWS Batchを使うと面倒を見てくれます。必要な時にはインスタンスを増やして素早く終わらせ、不必要になったらインスタンスを落としてくれるので、コストを抑えることができます。AWS Black Beltの資料が分かりやすいと思います。

Black Beltの資料にもありますが、以下の概念を理解しておくとスムーズにいくと思います。

  • コンピューティング環境
    • VPCやサブネット、必要なVCpu数などを管理。リソースが足りなければ追加でEC2インスタンスを立ち上げ、不要になったら落としてくれる
  • ジョブキュー
    • どのコンピューティング環境にどういう優先順位でジョブを投入するかなどが指定できる
    • キューに状態が管理されているので、EC2が立ち上がりまくってAWS破産する心配がない
  • ジョブ定義
    • ECSのタスク定義と似たようなもの。最近、タイムアウトのサポートもされて安心感が増した

コンピューティング環境/ジョブキュー/ジョブ定義をそれぞれCloudFormationで書いた例も載せておきます。

CloudFormationの例

Resources:
  ComputeEnvironment:
    Type: "AWS::Batch::ComputeEnvironment"
    Properties:
      Type: MANAGED
      ServiceRole: !Sub "arn:aws:iam::${AWS::AccountId}:role/service-role/AWSBatchServiceRole"
      ComputeEnvironmentName: go-active-learning
      ComputeResources:
        MinvCpus: 0
        MaxvCpus: 256
        DesiredvCpus: 0
        SecurityGroupIds:
          - Fn::ImportValue:
              !Sub "${VPCStackName}:BatchSecurityGroup"
        Type: EC2
        Subnets:
          - Fn::ImportValue:
              !Sub "${VPCStackName}:PublicSubnetAZa"
          - Fn::ImportValue:
              !Sub "${VPCStackName}:PublicSubnetAZc"
        InstanceTypes:
          - optimal
        InstanceRole: !Sub "arn:aws:iam::${AWS::AccountId}:instance-profile/ecsInstanceRole"
      State: ENABLED
  UpdateRecommendationJobDefinition:
    Type: "AWS::Batch::JobDefinition"
    Properties:
      Type: container
      ContainerProperties:
        Command:
          - "/bin/sh"
          - "/app/update_recommendation"
        Memory: 2500
        Privileged: false
        Environment:
          - Name: "DB_USER"
            Value: "MY_USERNAME"
          - Name: "DB_PASSWORD"
            Value: "MY_PASSWORD"
        ReadonlyRootFilesystem: false
        Vcpus: 1
        Image: !Sub "${AWS::AccountId}.dkr.ecr.${AWS::Region}.amazonaws.com/go-active-learning:latest"
      JobDefinitionName: update_recommendation
      RetryStrategy:
        Attempts: 1
    DependsOn: ComputeEnvironment
  JobQueue:
    Type: "AWS::Batch::JobQueue"
    Properties:
      ComputeEnvironmentOrder:
        - ComputeEnvironment: !Ref ComputeEnvironment
          Order: 1
      Priority: 1
      State: ENABLED
      JobQueueName: go-active-learning
    DependsOn: ComputeEnvironment

AWS Batch is not ...

cronではありません。ジョブをkickするAPIがあるので、Lambdaなどから起動したりすることが多いですね。CloudWatch Eventのルールで起動させたり、AWS Step Functionsの中に挟むこともあるので、そういった方法での起動がメインになると思います。

CloudWatch Eventから3時間毎にLambdaを起動して、そのLambaでAWS BatchのジョブをkickするCloudFormationの例を置いておきます。

CloudFormationの例

  UpdateRecommendationBatchJobTrigger:
    Type: AWS::Lambda::Function
    Properties:
      FunctionName: UpdateRecommendationBatchJobTriggerResource
      Description: Starts a job of AWS Batch desu
      Role: 
        Fn::ImportValue: !Sub "${IAMStackName}:BatchJobTriggerFunctionRole"
      Handler: index.lambda_handler
      Runtime: python2.7
      MemorySize: 128
      Timeout: 30
      Environment:
        Variables:
          JOB_NAME: update_recommendation
          JOB_QUEUE_NAME: go-active-learning
          JOB_DEFINITION_ARN: !Ref UpdateRecommendationJobDefinition
      Code:
        ZipFile: |
          from __future__ import print_function
          from datetime import datetime as dt
          import json
          import os
          import boto3
          batch = boto3.client('batch')
          def lambda_handler(event, context):
              try:
                  response = batch.submit_job(
                      jobName=os.environ['JOB_NAME'],
                      jobQueue=os.environ['JOB_QUEUE_NAME'],
                      jobDefinition=os.environ['JOB_DEFINITION_ARN'],
                      parameters={
                          'Arg1': dt.now().strftime('%H:%M:%S'),
                          'Arg2': 'from-AWS-Lambda'
                      }
                  )
                  print(response)
                  return response['jobId']
              except Exception as e:
                  print(e)
                  raise e
  UpdateRecommendationBatchJobTriggerRule:
    Type: AWS::Events::Rule
    Properties:
      Name: UpdateRecommendationBatchJobTriggerRule
      ScheduleExpression: rate(3 hours)
      Targets:
        - Id: UpdateRecommendationBatchJobTrigger
          Arn: !GetAtt UpdateRecommendationBatchJobTrigger.Arn
      State: "ENABLED"
  UpdateRecommendationPermissionForEventsToInvokeLambda:
    Type: AWS::Lambda::Permission
    Properties:
      FunctionName: !Ref UpdateRecommendationBatchJobTrigger 
      SourceArn: !GetAtt UpdateRecommendationBatchJobTriggerRule.Arn
      Action: lambda:InvokeFunction
      Principal: events.amazonaws.com

モニタリング方法

AWS Batchの概念が分かって実際のジョブ投入などができたら、正しく動作できているか継続的にモニタリングしたいですね。モニタリングはMackerelを使うと簡単/便利にできます(注: この記事を書いている人はMackerelの中の人です)。

Job Queue Statusのモニタリング

Job Queueに入っている各ステータスのジョブの個数を監視することができます。aws-batchプラグインが用意されているので、インストールすれば簡単にモニタリングできます。mkrを使うとプラグインを簡単にインストールできます。

このプラグインを使うと、こんなグラフが得られます。

ジョブが捌ける速度より投入される速度が早い場合、QueueのPendingのステータスにいる個数が増えていきます。Mackerelのメトリック監視を入れることでMaxVcpuを増やすなどの判断をすることができますね。

成功/失敗したタスクのモニタリング

Job Queueのステータスのモニタリングはこれまで書いたものでできますが、ジョブがSUCCEEDEDになったかFAILEDになったかはモニタリングすることができません。CloudWatch EventのAWS Batchイベントストリームを使うと状態変化を受け取ることができます。

ひとまずFAILEDになったかを知ることができればいいので、状態変化があったらslackに投稿するようにしています。同僚のid:astjさんが書いてくれたいいやつです。

状態変化をslackに通知するCloudFormationの例

  # AWS Batch ジョブの状態変化を Slack 通知する Lambda Function
  BatchJobStatusToSlack:
    Type: "AWS::Lambda::Function"
    Properties:
      FunctionName: "BatchJobStatusToSlack"
      Description: "AWS Batch job 状態変化を Slack に通知します"
      Handler: "index.handler"
      Role: !ImportValue "iam:LambdaBasicExecutionRoleARN"
      Runtime: "python3.6"
      Environment:
        Variables:
          MY_ENV: !Ref Environment
          SLACK_WEBHOOK_URL: "https://hooks.slack.com/services/..."
      Code:
        ZipFile: |
          # -*- coding: utf-8 -*-
          import json
          import os
          import urllib.request


          def post_to_slack(payload):
              url = os.getenv('SLACK_WEBHOOK_URL')
              if url is None:
                  raise ValueError('webhook url is not defined')
              json_data = json.dumps(payload).encode('utf-8')
              request = urllib.request.Request(url, data=json_data, method='POST')
              with urllib.request.urlopen(request) as response:
                  return response.read().decode('utf-8')


          def color_mapping(status):
              try:
                  return {
                      'PENDING': 'warning',
                      'STARTING': '#66ccff',
                      'RUNNING': '#aa99ff',
                      'SUCCEEDED': 'good',
                      'FAILED': 'danger',
                  }[status]
              except KeyError:
                  return '#cccccc'


          def handler(event, context):
              account = os.getenv('MY_ENV')
              status = event['detail']['status']
              job_id = event['detail']['jobId']
              params = json.dumps(event['detail']['parameters'])
              url = 'https://%s.console.aws.amazon.com/batch/home#/jobs/queue/%s/job/%s' % (event['region'], event['detail']['jobQueue'].replace('/', '~F'), job_id)
              payload = {
                  'username': 'event_notifier',
                  'attachments': [{
                      'color': color_mapping(status),
                      'fallback': '[%s] JobID %s status changed (%s)' % (status, job_id, account),
                      'title': '[%s] Job status changed (%s)' % (status, account),
                      "title_link": url,
                      "text": '```%s```' % params,
                      'mrkdwn_in': ['text'],
                  }]
              }
              print(post_to_slack(payload))
      Timeout: "10"
      TracingConfig:
        Mode: "PassThrough"
  RulesBatchJobStatusToSlack:
    Type: "AWS::Events::Rule"
    Properties:
      State: "ENABLED"
      EventPattern:
        source:
          - "aws.batch"
        detail-type:
          - "Batch Job State Change"
        detail:
          jobQueue:
            - !Ref MyJobQueue
          status:
            - "FAILED"
      Targets:
        -
          Arn: !GetAtt [BatchJobStatusToSlack, Arn]
          Id: "ToSlackLambda"
  LambdaInvokePermission:
    Type: "AWS::Lambda::Permission"
    Properties:
      Action: "lambda:InvokeFunction"
      Principal: "events.amazonaws.com"
      SourceArn: !GetAtt [RulesBatchJobStatusToSlack, Arn]
      FunctionName: !GetAtt [BatchJobStatusToSlack, Arn]

ECSのモニタリング

AWS Batchの裏側ではECSが起動しています。aws-ecsプラグインを使うことで、クラスタのリソース(CPUとMemory)の使用状況をモニタリングすることができます。

使用状況を継続的にモニタリングすることで、EC2のスペックを上げ下げする判断材料になります。グラフの例はこんな感じです。3時間毎に10分かからず終わるジョブなので、ぴょんぴょんしたかわいいグラフになっています。

困ること

真面目に使うとこの辺が困ってきますが、自前でやるよりは大分便利ですね...。

  • テストでAWS Batchのmockをやってくれるツールがない
    • 例えばdynamodbだとツールがある
  • コンソール画面がまだこなれていない
    • 例えばジョブの完了日時でソートができない。大量にジョブが並んでいるときに目的のものを探すのに時間がかかる
  • コンソール画面からは設定できるが、CloudFormationからは設定できない項目がある
    • 例えばタイムアウト設定など

参考

Amazon Web Services 基礎からのネットワーク&サーバー構築 改訂版

Amazon Web Services 基礎からのネットワーク&サーバー構築 改訂版

pg_activityで刺さっているクエリを殺す

サービス運用しているとクエリが刺さってパフォーマンスが悪くなってしまっているとき、ありますよね。根本対応は別途やるとして、今はひとまずこの刺さっているクエリを殺して凌ぎたい。PostgreSQLではpg_stat_activityというテーブルに現在実行中の情報が入っています。

SQLで殺してもいいですが、同僚に教えてもらったpg_activityというツールが便利だったのでメモしておきます。

cliのツールですが、tigなどと同じような形で現在実行中の中から殺したいクエリを選んで、というのを視覚的に行なえるので便利です。

インストール

pg_activityはPythonで書かれているツールなので、pipでインストールします。ansibleでインストールしているので、以下のように書きました。amazon linux2上での動作を確認していますが、他でも似たようにやれば動くと思います。

---
- tasks:
    - name: "yum install"
      yum: name={{item}} state=installed
      with_items:
        - gcc
        - postgresql
        - postgresql-devel
        - python3
        - python3-pip
        - python3-devel
      become: true
    - name : install pg_activity
      shell: pip3 install pg_activity
      become: true

pg_activityを動かしてみる

こんな感じで動かせます。

% pg_activity --username=my_user_name --host=my_rds_host --dbname=my_db_name --rds

殺したいクエリがいたら、それを選択してkを押すと殺せます。

試してみる

意図的にですが、クエリを殺す例をやってみます。ターミナルAでトランザクションを張って、テーブルをロックしてみます。

go-active-learning=> BEGIN;
BEGIN
go-active-learning=> LOCK TABLE example IN EXCLUSIVE MODE;
LOCK TABLE

別のターミナルBで更新系のクエリを発行してみます。そうするとまあ当然ブロックされる。

go-active-learning=> BEGIN;
BEGIN
go-active-learning=> UPDATE example SET label = 1 where id = 8508;

この状態でpg_activityを見にいくとこんな状態になっていることが分かります。

この状態でUPDATE ...を選択してkを押すとこのクエリを殺せます。ターミナルBでは無慈悲なメッセージが流れていて、殺せていることが分かります。

go-active-learning=> UPDATE example SET label = 1 where id = 8508;
FATAL:  terminating connection due to administrator command
LINE 1: UPDATE example SET label = 1 where id = 8508;
               ^
SSL connection has been closed unexpectedly
The connection to the server was lost. Attempting reset: Succeeded.

クエリが刺さらないのが一番ですが、もし刺さってしまったときにどう対処すればいいか押さえておくと安心できますね。

10年戦えるデータ分析入門 SQLを武器にデータ活用時代を生き抜く (Informatics &IDEA)

10年戦えるデータ分析入門 SQLを武器にデータ活用時代を生き抜く (Informatics &IDEA)

最近の砂場活動その4: CloudWatch Logs/Kinesis Firehose/S3/Athenaを利用してログ分析をやってみる

Fargateを使うようになり、EC2サーバーを管理しなくなってよくなりました(東京リージョン、7月ですね!)。一方、これまでサーバーにログとして吐かれていたファイルの閲覧や集計に困ることも増えてきました(sshでログインして、ログファイルをgrepなどができない)。AWSのサービスを組み合わせることでこれが解決できないか、ということをやってみました。個人運用のWebサービスにはオーバーキルですが、仕事で使うことも見越して...という感じです。素人が試行錯誤しながらやっているので、もっとよいやり方があると思います。

やりたいこと

一言でいうとNginxのアクセスログをいい感じで集計できるように、というのがやりたいことです。

FargateでWebアプリケーションサーバーとNginxを同一のタスクで動かしています。Nginxは静的ファイルへの振り分けとログ取りが主な目的です。Nginxのログをファイルに出力していると、タスクが終了した際にログが揮発してしまうので、標準出力に吐くようにします。そうすると、CloudWatch Logsに吐かれます。これをS3まで持っていければ、AthenaでSQLを書いたり、その結果をRedashで可視化することができます。今回はS3にログを運ぶやり方を探っていきます。

ログはltsv形式で吐いています。こんな感じ。

Nginxの設定ファイル

log_format tsv_go_active_learning "time:$time_local"
                                  "\thost:$remote_addr"
                                  "\tvhost:$host"
                                  "\tforwardedfor:$http_x_forwarded_for"
                                  "\tmethod:$request_method"
                                  "\turi:$request_uri"
                                  "\tprotocol:$server_protocol"
                                  "\tstatus:$status"
                                  "\tsize:$body_bytes_sent"
                                  "\tua:$http_user_agent"
                                  "\tapptime:$upstream_response_time"
                                  "\treqtime:$request_time"
                                  "\tupstream:$upstream_addr"
                                  "\tupstream_status:$upstream_status"
                                  "\thostname:$hostname"
                                  ;

upstream local-backend {
    server localhost:7778;
}

server {
    listen 7777 default_server;
    proxy_connect_timeout 10;
    proxy_read_timeout    10;
    proxy_send_timeout    10;
    access_log /dev/stdout tsv_go_active_learning;

    location / {
        root /www/app;
        proxy_set_header Host $http_host;
    }
    location /api {
        proxy_pass http://local-backend;
    }
}

CloudWatch LogsからS3までログを運ぶ手段

お手軽な方法としては、Lambdaを使って定期的にS3にエクスポートするのが常套手段のようです。

Lambdaを書けばいいのはお手軽ですが、リアルタイムに集計したい場合や一日のログが大きすぎてLambdaの制限時間の5分で終わらない!というの場合には困りそうですね。ある程度のログが溜まったら、それをトリガーにしてS3にエクスポートして欲しいです。調べてみると、こういう用途はどうやらKinesis Firehoseが得意なようです。CloudWatch Logsにはサブスクリプションという機能があり、これを使うとKinesis Firehoseにデータを流すことができます。

Kinesis Firehose、最初は何やってくれるのかさっぱり分からないですが、やってくれることとしては

  • リアルタイムにじゃんじゃか流れてくるデータを受け止める土管
  • [optional] 流れてきたデータをLambdaで変換できる
  • 一定量、もしくは一定時間経過したらAWSのデータストアに横流ししてくれる
    • 流す先はS3/Redshift/Amazon Elasticsearch Serviceなどの分析基盤となるサービスが中心
    • 流す処理は自分でコードを書く必要がない、firehoseが面倒を見てくれる

が挙げられます。これまでのよくあるパターンとしては、fluentdで集約(aggregate)してS3などに流すというのがありましたが、集約サーバーの運用に手間がかかるというのがありました。Kinesis Firehoseを使うと、この手間をmanagedサービスが受け持ってくれるので手間が減らせるようです。

脱線: Kinesis Data Streams

もっと汎用的な土管としてはKinesis Streamsがあります。Firehoseは出力先(コンシューマーというらしい)がS3/Redshift/Amazon Elasticsearch Serviceなどに限定されていましたが、Streamsでは自前のプログラムから触ることができ、自由度が高くなります。仕事で作っているMackerelでもKinesis Streamsはお世話になっていて、mackerel-agentから投稿されるデータを引き受けてくれています。Kinesis Streamsで受け取ったデータは、Lambdaがコンシューマーとなり、RedisやDynamoDbにデータを書き込んでいます。

Kinesis Streamsは汎用的に使える一方で

  • シャード数は自分で管理する必要がある
  • コンシューマーのコードは自分で書く必要がある

といった手間が増えます。コンシューマーの書き込み先がS3/Redshift/Amazon Elasticsearch Serviceなどであれば、これらの手間を肩代わりしてくれるKinesis Firehoseを使うとよいようです。

Kinesis FirehoseとCloudWatch LogsのサブスクリプションでログをS3に転送する

はい、あとはやるだけですね。CloudFormationで構成管理しているので、その断片を貼っておきます。やっていることは大まかにはこんな感じです。

  • CloudWatch Logsに流れてくるデータをsubscribeしておく
    • subscribe先はDeliveryStream
  • CloudWatch Logsに前回から50MBs以上データが溜まるか60秒以上経過していたらエクスポートを開始
  • エクスポートする際に変換をLambdaでやる
    • subscribeするとjson形式になるので、元のltsv形式に戻すだけの変換です
    • AWSが用意してくれているblueprintがあるので、それをちょっと変えればいいだけ
    • こういったLambdaの構成管理にはSAMが便利でした

Firehoseの構成管理

  DeliveryStream:
    Type: 'AWS::KinesisFirehose::DeliveryStream'
    Properties:
      ExtendedS3DestinationConfiguration:
        BucketARN: !Join
          - ''
          - - 'arn:aws:s3:::'
            - Fn::ImportValue:
                !Sub "${S3StackName}:NginxLogsS3Bucket"
        BufferingHints:
          IntervalInSeconds: '60'
          SizeInMBs: '50'
        CompressionFormat: UNCOMPRESSED
        Prefix: firehose/
        RoleARN:
          Fn::ImportValue: !Sub "${IAMStackName}:DeliveryRole"
        ProcessingConfiguration:
          Enabled: 'true'
          Processors:
            - Parameters:
                - ParameterName: LambdaArn
                  ParameterValue:
                    Fn::ImportValue: !Sub "${SamStackName}:LambdaExtractMessageFromCloudWatch"
              Type: Lambda
  SubscriptionFilter:
    Type: "AWS::Logs::SubscriptionFilter"
    Properties:
      RoleArn:
        Fn::ImportValue:
          !Sub "${IAMStackName}:CloudWatchIAMRole"
      LogGroupName:
        !Ref NginxLogGroup
      FilterPattern: ""
      DestinationArn:
        Fn::GetAtt:
          - DeliveryStream
          - Arn

1分くらい待っているとS3のオブジェクトができあがってきます。日付が関連付けられています。

S3のファイルをAthenaで分析

S3にデータが集まれば、あとはAthenaでなんとでもなります。例えば処理時間がかかっているuriを列挙するのは、こんな感じです。最初にテーブルを作る。フルスキャンが走らないようにパーティションを切りましょう。

テーブル定義と分析のSQL

CREATE EXTERNAL TABLE IF NOT EXISTS go_active_learning_nginx.go_active_learning_nginx (
  `time` date,
  `host` string,
  `vhost` string,
  `forwardedfor` string,
  `method` string,
  `uri` string,
  `protocol` string,
  `status` int,
  `size` int,
  `ua` string,
  `apptime` double,
  `reqtime` double,
  `upstream` string,
  `upstream_status` int,
  `hostname` string
) PARTITIONED BY (
  year string,
  month string,
  day string
)
ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
WITH SERDEPROPERTIES (
  'serialization.format' = '1'
) LOCATION 's3://my-bucket/firehose/'
TBLPROPERTIES ('has_encrypted_data'='false');

ALTER TABLE go_active_learning_nginx ADD PARTITION (year='2018', month='06', day='09') location 's3://my-bucket/firehose/2018/06/09/';
SELECT AVG(reqtime) as t, uri FROM go_active_learning_nginx WHERE year='2018' GROUP BY uri ORDER BY t DESC LIMIT 100;

こんな感じでブラウザで実行できます。

このままAthenaでやってもいいし、Redashで可視化してダッシュボードにまとめてもよいです。今回はS3に出力しましたが、RedshiftやAmazon Elasticsearch Serviceに出力して分析やってみたいですね(お財布事情が...)。

まとめ

大量にくるストリームデータをリアルタイムに分析したい場合、Kinesis Firehoseを間に挟むと便利ということが分かりました。自分のサイトはアクセスが雀の涙のような規模ですが、仕事で扱うようなデータ規模だと集約サーバーがmanagedになっているありがたさが出てきそうですね。

S3を中心として(?)、各サービスで連携を取りながら使えるようになっているAWSすごいなぁ。

10年戦えるデータ分析入門 SQLを武器にデータ活用時代を生き抜く (Informatics &IDEA)

10年戦えるデータ分析入門 SQLを武器にデータ活用時代を生き抜く (Informatics &IDEA)

Go言語でWebアプリを書くときにオートリロードどうするといいの問題

Go言語を書く際、成果物がシングルバイナリになるのは便利です。deployするときや他人に使ってもらうときに、それだけ渡せば使ってもらえるので。cliツールやapiサーバーを書くときにはこの方式で困っていなかったのですが、いわゆるWebアプリをGo言語で書くときのベストプラックティスが分からなかったのでエントリにしておきます。

前提

  • Go言語側は重厚なフレームワークは特に使わない
    • net/httphtml/templateといった標準ライブラリを使う
  • フロント側はVue.js

シングルバイナリを作るまでの過程

以下の過程をMakefileに書いてmake buildとやってシングルバイナリを作っていました。

  • webpackでJavaScript関係をbundle.jsという感じで一つのファイルにまとめる
  • go-assets-builderを使って、index.htmlbundle.jsを一つのasset(asset.goみたいなのができる)にまとめる
  • 全てgoのファイルになったので、go buildでシングルバイナリに固める

シングルバイナリがすぐできるならば問題ないですが、自然言語処理を内部で使っているアプリケーションのため、成果物が70Mb以上のサイズになります(kagomeを使わせてもらっています)。htmlやjsをちょっと変えて確認したいだけなのに、make buildで20秒くらいかかるのはなかなかストレスですね。なんとかしたい。ちなみにファイルに変更があったら自動でmake buildするには、reflexなどのファイル変更を監視する系のツールを使うといいというのを教えてもらいました。

解決案1: ローカルと本番で処理を分ける

ローカルではindex.htmlbundle.jsを呼ばれる度に静的ファイルを読み込むということをすれば、テンプレートが書き換えられる度にシングルバイナリを作りなおす必要はなくなります。本番環境ではさきほどまでに述べた方法で全部goのファイルにしてシングルバイナリを作ります。

問題自体は解決しますが、localと本番で処理を分岐するのは考えることも増えるし、コードも増えるのでちょっとイマイチですね...。

解決案2: 静的ファイルはNginxで返すようにして、アプリケーションサーバーはAPIサーバーの役割に徹する

自分はこれを選択しました。SPAになっているので、index.htmlbundle.jsはNginxで返します。Vue.jsから叩かれるエンドポイントはgoがAPIサーバーっぽく処理させます。Nginxの設定ファイルだとこんな感じ。

upstream local-backend {
    server localhost:7778; # goのアプリケーションサーバーです
}

server {
    listen 7777 default_server;
    proxy_connect_timeout 600;
    proxy_read_timeout    600;
    proxy_send_timeout    600;

    location / {
        root /www/app;
    }
    location /api {
        proxy_pass http://local-backend;
    }
}

手元でNginxを立てるのは面倒なので、その場合はwebpackの機能を使います。webpack.config.jsに以下のように書くと、特定のpath以下だとapiサーバーにリクエストをproxyしてくれます。webpack便利やん。

module.exports = {
  entry: './src/js/app',
  output: {
    path: __dirname + '/static',
    filename: 'bundle.js'
  },
  ...,
  devServer: {
    host: 'localhost',
    port: 7777,
    contentBase: __dirname + '/static',
    proxy: {
      '/api': {
        target: 'http://localhost:7778'
      }
    }
  }
};

これでhtmlやjsを書き換えただけなのにシングルバイナリ作りが始まってなかなか修正された様子が見れない、といったことがなくなりました。やったー。全体の様子はこの辺です。Webサイト自体はこちらです。

Goの変数でテンプレートを出し分けなどをやりたい場合、素のhtmlでない場合が多いのでこのパターンは使えないというのが悩ましいですね。

まとめ

Go言語でWebアプリケーションを書くときにオートリロードをするときに困ったこと、自分なりの解決策を書いてみました。もっといい方法がある!!という方は是非教えてください。

みんなのGo言語[現場で使える実践テクニック]

みんなのGo言語[現場で使える実践テクニック]

  • 作者: 松木雅幸,mattn,藤原俊一郎,中島大一,牧大輔,鈴木健太
  • 出版社/メーカー: 技術評論社
  • 発売日: 2016/09/09
  • メディア: Kindle版
  • この商品を含むブログを見る

最近の砂場活動その3: フロントエンド編(Vue.js/SPA)

Vue.jsでフロントエンド再入門しました。仕事でjs触るけど、ゼロからあれこれやると分からないことが無限に発生しますね...。成果物はこちら。

サーバーサイドはGoで書いていて、フロント周りがVue.jsです。それっぽい見た目になってきたので、Route53でドメイン取得しました。

前回までのあらすじ

今年の目標で自分であれこれ触って壊して試せる砂場を作る活動というのをやってきました。先月までは主にインフラとかミドルウェア周りをやりました。

下回りは整ってきつつありますが、表側がとにかく素朴なhtmlになっていて、スマホで見たときの体験がよくなかったです。体験がよろしくないものは自分でも触らなくなるし、砂場でもっと遊ぶためにもフロント周りを少し整備するか...ということで今回のエントリの内容をやりました。

初手

仕事のリポジトリはフロント周りのツール(linterなど)が整備されているけど、自分でゼロからやっていくときには何が必要かよく分かっていませんね...。めちゃくちゃ素朴に始めたかったので、以下のお手軽なところからやりました。

  • CDNから毎回scriptを読み込む
  • jsのファイル用意するのも面倒だったので、htmlのscriptタグ内に素朴にjsを書く

最低限の装備だけど、Vue.jsが動くようになりました。サーバーサイドとのやり取りはaxiosというやつが便利でした。こんな感じ。

import axios from 'axios';

fetchList(listname) {
  this.examples = [];
  axios.get("/api/examples?listName=" + listname)
  .then(response => {
    this.examples = response.data;
  });
}

どんな感じかまあまあ掴めたのでよしとします。

webpackを導入

先週ちょうどAWSサミットで東京に出張に行っていたのですが、新幹線でも開発しようと思うとCDNから読み込んでたらダメですね。npmで手元に持ってきたモジュールを一つにまとめてscriptで読み込みたい、ということでwebpackを使うことにしました。以下の資料を参考に進めました。

webpack.config.jsが設定ファイルになっています。複雑なことはとにかくしたくなかったので、最低限の設定をやりました。

  • entry: モジュール間の依存関係を解析を始める箇所
  • output: モジュールを一つにまとめた後に吐くファイルの場所
  • resolve: Vue.jsを使っている場合は設定が必要

webpackでjs/css関連は一つのファイルにまとまったので、それをgoから読み出しましょう。go-bindataは最近よろしくないので、go-assets-builderを使って一つのgoのassetにまとめました。

% go-assets-builder --package=templates templates dist > lib/assets/templates.go

assetとして読み込んでstatic以下で呼べるように配置すれば、デプロイも簡単なままです。

func doServe(c *cli.Context) error {
    http.Handle("/static/", http.StripPrefix("/static/", http.FileServer(templates.Assets)))
    return http.ListenAndServe(addr, nil)
}

Componentの導入

あちこち同じパーツを使い回したい場所が出てきたので、componentを導入します。vueファイルにtemplate/script/styleのそれぞれ必要なものを書いていきます。componentを定義すれば、import Example from './Example.vue';という感じで他の箇所で定義したcomponentを使い回すことができます。この辺を参考にした気がします。

Bootstrapで見た目をマシにする

Twitterをまとめたタブ、slideshareをまとめたタブ...というのを作りたかったけど、自分でデザインすると明かにダサくなりそうでした。Bootstrapってやつを使うとある程度お手軽に見栄えのするが作れたなと思いましたが、自分で触ったことがなかったのでやってみることにしました。Vue.jsからも簡単に使うことができました。すでに用意されているComponentsから適当にそれっぽいものを選んでいると、それっぽくなるのでとにかくお手軽。説明だけではなく例も必ず載っているので、めっちゃ助かりました。

PCだと3カラムで表示されていますが、幅を狭くしたりスマホで見ると1カラムになったりnavibarが勝手に折り畳まれたりして便利ですね。

<meta name="viewport" content="width=device-width,initial-scale=1">を設定するのを忘れずに。

SPA導入

タブを切り変えた後にブラウザをリロードすると、どのタブにいたか分からなくなってしまうので困りました。それならばということでSPAにしました。Vue.jsは公式のrouterがあるので、それを使えばよいです。

サーバーサイドのコントローラーを書いているときのノリで、どのpathにきたらどのcomponentを呼び出すというのを書いてやればよいです。こんな感じ。

export default [
  {
    path: '/',
    redirect: '/list/general'
  },
  {
    path: '/list/:listname',
    component: ListExample
  },
  {
    path: '/recent-added-examples',
    component: RecentAddedExamples
  }
]

SPAにするとGoogle Analyticsで正確にトラッキングできない問題

何も考えずにGoogle Analyticsを入れたところ、訪問先が全部/になって悲しいことになりした。先人がいたので、それを使わせてもらいました。

きちんと取れるようになりました。

参考図書

大体この2冊で大丈夫でした。

React、Angular、Vue.js、React Nativeを使って学ぶ はじめてのフロントエンド開発

React、Angular、Vue.js、React Nativeを使って学ぶ はじめてのフロントエンド開発

  • 作者: 原一浩,taisa,小松大輔,永井孝,池内孝啓,新井正貴,橋本安司,日野洋一郎
  • 出版社/メーカー: 技術評論社
  • 発売日: 2018/05/09
  • メディア: 単行本(ソフトカバー)
  • この商品を含むブログを見る
速習Vue.js 速習シリーズ

速習Vue.js 速習シリーズ

PyCon mini Osakaで異常検知システム構築の裏側について発表しました

現在仕事で作っている異常検知システムについてPyCon mini Osakaで登壇してきました。異常検知というマイナーなトピックですが、多くの人に聞いてもらえてよかったです。

はい、はてなのMackerelチームの中の人です。

機械学習の人からすると「なんだただの混合ガウス分布か」と思われるかもしれませんが、異常検知のシステムを実際に作ろうとすると考えることが色々あります。今回の発表では

  • ユーザーのどのような要望から異常検知機能を作るに至ったか
  • 異常検知とはそもそも何か、どういった問題設定か
  • 異常検知手法にはどういったものがあるか、どのようなメリット/デメリットがあるか
    • どの手法だとコストをなるべく抑えながらサービス要件を満たせるか
  • 実際のサービス提供するためにどのようなアーキテクチャ設計にしたのか
    • 高頻度かつ不定期な学習Jobに対応するためにAWS Batchを採用
    • 誤検知を少なくするためには、荒い粒度ではなく生データが重要。独自の時系列DBを構築

といった内容について話しました。

合わせて読みたい

異常検知と変化検知 (機械学習プロフェッショナルシリーズ)

異常検知と変化検知 (機械学習プロフェッショナルシリーズ)