AWS Batchとそのモニタリング方法について

最近、仕事や趣味でAWS Batchをよく使っています。仕事と趣味のそれぞれの用途は以下の通りです。

AWS Batchの使い方やモニタリング方法が大分こなれてきたので、エントリにまとめておきます。

AWS Batch is ...

フルマネージド型のバッチ処理サービスです。バッチジョブを次々に投下したい際に計算を行なうクラスタの管理を自分で行なうのはしんどいですが、AWS Batchを使うと面倒を見てくれます。必要な時にはインスタンスを増やして素早く終わらせ、不必要になったらインスタンスを落としてくれるので、コストを抑えることができます。AWS Black Beltの資料が分かりやすいと思います。

Black Beltの資料にもありますが、以下の概念を理解しておくとスムーズにいくと思います。

  • コンピューティング環境
    • VPCやサブネット、必要なVCpu数などを管理。リソースが足りなければ追加でEC2インスタンスを立ち上げ、不要になったら落としてくれる
  • ジョブキュー
    • どのコンピューティング環境にどういう優先順位でジョブを投入するかなどが指定できる
    • キューに状態が管理されているので、EC2が立ち上がりまくってAWS破産する心配がない
  • ジョブ定義
    • ECSのタスク定義と似たようなもの。最近、タイムアウトのサポートもされて安心感が増した

コンピューティング環境/ジョブキュー/ジョブ定義をそれぞれCloudFormationで書いた例も載せておきます。

CloudFormationの例

Resources:
  ComputeEnvironment:
    Type: "AWS::Batch::ComputeEnvironment"
    Properties:
      Type: MANAGED
      ServiceRole: !Sub "arn:aws:iam::${AWS::AccountId}:role/service-role/AWSBatchServiceRole"
      ComputeEnvironmentName: go-active-learning
      ComputeResources:
        MinvCpus: 0
        MaxvCpus: 256
        DesiredvCpus: 0
        SecurityGroupIds:
          - Fn::ImportValue:
              !Sub "${VPCStackName}:BatchSecurityGroup"
        Type: EC2
        Subnets:
          - Fn::ImportValue:
              !Sub "${VPCStackName}:PublicSubnetAZa"
          - Fn::ImportValue:
              !Sub "${VPCStackName}:PublicSubnetAZc"
        InstanceTypes:
          - optimal
        InstanceRole: !Sub "arn:aws:iam::${AWS::AccountId}:instance-profile/ecsInstanceRole"
      State: ENABLED
  UpdateRecommendationJobDefinition:
    Type: "AWS::Batch::JobDefinition"
    Properties:
      Type: container
      ContainerProperties:
        Command:
          - "/bin/sh"
          - "/app/update_recommendation"
        Memory: 2500
        Privileged: false
        Environment:
          - Name: "DB_USER"
            Value: "MY_USERNAME"
          - Name: "DB_PASSWORD"
            Value: "MY_PASSWORD"
        ReadonlyRootFilesystem: false
        Vcpus: 1
        Image: !Sub "${AWS::AccountId}.dkr.ecr.${AWS::Region}.amazonaws.com/go-active-learning:latest"
      JobDefinitionName: update_recommendation
      RetryStrategy:
        Attempts: 1
    DependsOn: ComputeEnvironment
  JobQueue:
    Type: "AWS::Batch::JobQueue"
    Properties:
      ComputeEnvironmentOrder:
        - ComputeEnvironment: !Ref ComputeEnvironment
          Order: 1
      Priority: 1
      State: ENABLED
      JobQueueName: go-active-learning
    DependsOn: ComputeEnvironment

AWS Batch is not ...

cronではありません。ジョブをkickするAPIがあるので、Lambdaなどから起動したりすることが多いですね。CloudWatch Eventのルールで起動させたり、AWS Step Functionsの中に挟むこともあるので、そういった方法での起動がメインになると思います。

CloudWatch Eventから3時間毎にLambdaを起動して、そのLambaでAWS BatchのジョブをkickするCloudFormationの例を置いておきます。

CloudFormationの例

  UpdateRecommendationBatchJobTrigger:
    Type: AWS::Lambda::Function
    Properties:
      FunctionName: UpdateRecommendationBatchJobTriggerResource
      Description: Starts a job of AWS Batch desu
      Role: 
        Fn::ImportValue: !Sub "${IAMStackName}:BatchJobTriggerFunctionRole"
      Handler: index.lambda_handler
      Runtime: python2.7
      MemorySize: 128
      Timeout: 30
      Environment:
        Variables:
          JOB_NAME: update_recommendation
          JOB_QUEUE_NAME: go-active-learning
          JOB_DEFINITION_ARN: !Ref UpdateRecommendationJobDefinition
      Code:
        ZipFile: |
          from __future__ import print_function
          from datetime import datetime as dt
          import json
          import os
          import boto3
          batch = boto3.client('batch')
          def lambda_handler(event, context):
              try:
                  response = batch.submit_job(
                      jobName=os.environ['JOB_NAME'],
                      jobQueue=os.environ['JOB_QUEUE_NAME'],
                      jobDefinition=os.environ['JOB_DEFINITION_ARN'],
                      parameters={
                          'Arg1': dt.now().strftime('%H:%M:%S'),
                          'Arg2': 'from-AWS-Lambda'
                      }
                  )
                  print(response)
                  return response['jobId']
              except Exception as e:
                  print(e)
                  raise e
  UpdateRecommendationBatchJobTriggerRule:
    Type: AWS::Events::Rule
    Properties:
      Name: UpdateRecommendationBatchJobTriggerRule
      ScheduleExpression: rate(3 hours)
      Targets:
        - Id: UpdateRecommendationBatchJobTrigger
          Arn: !GetAtt UpdateRecommendationBatchJobTrigger.Arn
      State: "ENABLED"
  UpdateRecommendationPermissionForEventsToInvokeLambda:
    Type: AWS::Lambda::Permission
    Properties:
      FunctionName: !Ref UpdateRecommendationBatchJobTrigger 
      SourceArn: !GetAtt UpdateRecommendationBatchJobTriggerRule.Arn
      Action: lambda:InvokeFunction
      Principal: events.amazonaws.com

モニタリング方法

AWS Batchの概念が分かって実際のジョブ投入などができたら、正しく動作できているか継続的にモニタリングしたいですね。モニタリングはMackerelを使うと簡単/便利にできます(注: この記事を書いている人はMackerelの中の人です)。

Job Queue Statusのモニタリング

Job Queueに入っている各ステータスのジョブの個数を監視することができます。aws-batchプラグインが用意されているので、インストールすれば簡単にモニタリングできます。mkrを使うとプラグインを簡単にインストールできます。

このプラグインを使うと、こんなグラフが得られます。

ジョブが捌ける速度より投入される速度が早い場合、QueueのPendingのステータスにいる個数が増えていきます。Mackerelのメトリック監視を入れることでMaxVcpuを増やすなどの判断をすることができますね。

成功/失敗したタスクのモニタリング

Job Queueのステータスのモニタリングはこれまで書いたものでできますが、ジョブがSUCCEEDEDになったかFAILEDになったかはモニタリングすることができません。CloudWatch EventのAWS Batchイベントストリームを使うと状態変化を受け取ることができます。

ひとまずFAILEDになったかを知ることができればいいので、状態変化があったらslackに投稿するようにしています。同僚のid:astjさんが書いてくれたいいやつです。

状態変化をslackに通知するCloudFormationの例

  # AWS Batch ジョブの状態変化を Slack 通知する Lambda Function
  BatchJobStatusToSlack:
    Type: "AWS::Lambda::Function"
    Properties:
      FunctionName: "BatchJobStatusToSlack"
      Description: "AWS Batch job 状態変化を Slack に通知します"
      Handler: "index.handler"
      Role: !ImportValue "iam:LambdaBasicExecutionRoleARN"
      Runtime: "python3.6"
      Environment:
        Variables:
          MY_ENV: !Ref Environment
          SLACK_WEBHOOK_URL: "https://hooks.slack.com/services/..."
      Code:
        ZipFile: |
          # -*- coding: utf-8 -*-
          import json
          import os
          import urllib.request


          def post_to_slack(payload):
              url = os.getenv('SLACK_WEBHOOK_URL')
              if url is None:
                  raise ValueError('webhook url is not defined')
              json_data = json.dumps(payload).encode('utf-8')
              request = urllib.request.Request(url, data=json_data, method='POST')
              with urllib.request.urlopen(request) as response:
                  return response.read().decode('utf-8')


          def color_mapping(status):
              try:
                  return {
                      'PENDING': 'warning',
                      'STARTING': '#66ccff',
                      'RUNNING': '#aa99ff',
                      'SUCCEEDED': 'good',
                      'FAILED': 'danger',
                  }[status]
              except KeyError:
                  return '#cccccc'


          def handler(event, context):
              account = os.getenv('MY_ENV')
              status = event['detail']['status']
              job_id = event['detail']['jobId']
              params = json.dumps(event['detail']['parameters'])
              url = 'https://%s.console.aws.amazon.com/batch/home#/jobs/queue/%s/job/%s' % (event['region'], event['detail']['jobQueue'].replace('/', '~F'), job_id)
              payload = {
                  'username': 'event_notifier',
                  'attachments': [{
                      'color': color_mapping(status),
                      'fallback': '[%s] JobID %s status changed (%s)' % (status, job_id, account),
                      'title': '[%s] Job status changed (%s)' % (status, account),
                      "title_link": url,
                      "text": '```%s```' % params,
                      'mrkdwn_in': ['text'],
                  }]
              }
              print(post_to_slack(payload))
      Timeout: "10"
      TracingConfig:
        Mode: "PassThrough"
  RulesBatchJobStatusToSlack:
    Type: "AWS::Events::Rule"
    Properties:
      State: "ENABLED"
      EventPattern:
        source:
          - "aws.batch"
        detail-type:
          - "Batch Job State Change"
        detail:
          jobQueue:
            - !Ref MyJobQueue
          status:
            - "FAILED"
      Targets:
        -
          Arn: !GetAtt [BatchJobStatusToSlack, Arn]
          Id: "ToSlackLambda"
  LambdaInvokePermission:
    Type: "AWS::Lambda::Permission"
    Properties:
      Action: "lambda:InvokeFunction"
      Principal: "events.amazonaws.com"
      SourceArn: !GetAtt [RulesBatchJobStatusToSlack, Arn]
      FunctionName: !GetAtt [BatchJobStatusToSlack, Arn]

ECSのモニタリング

AWS Batchの裏側ではECSが起動しています。aws-ecsプラグインを使うことで、クラスタのリソース(CPUとMemory)の使用状況をモニタリングすることができます。

使用状況を継続的にモニタリングすることで、EC2のスペックを上げ下げする判断材料になります。グラフの例はこんな感じです。3時間毎に10分かからず終わるジョブなので、ぴょんぴょんしたかわいいグラフになっています。

困ること

真面目に使うとこの辺が困ってきますが、自前でやるよりは大分便利ですね...。

  • テストでAWS Batchのmockをやってくれるツールがない
    • 例えばdynamodbだとツールがある
  • コンソール画面がまだこなれていない
    • 例えばジョブの完了日時でソートができない。大量にジョブが並んでいるときに目的のものを探すのに時間がかかる
  • コンソール画面からは設定できるが、CloudFormationからは設定できない項目がある
    • 例えばタイムアウト設定など

参考

Amazon Web Services 基礎からのネットワーク&サーバー構築 改訂版

Amazon Web Services 基礎からのネットワーク&サーバー構築 改訂版