最近、仕事や趣味でAWS Batchをよく使っています。仕事と趣味のそれぞれの用途は以下の通りです。
- 仕事: 機械学習を用いた異常検知システムのパラメータ学習
- PyCon mini Osakaで異常検知システム構築の裏側について発表しました - yasuhisa's blog
- 学習データ取得と混合ガウス分布のパラメータ推定が元気に動いています
- 趣味: ML Newsという機械学習関連のエントリをまとめてくれる君
- 元になるエントリをTwitterから取得する部分
- 機械学習に関連するエントリか判定する分類器の学習
- 取得してきたエントリの判定
AWS Batchの使い方やモニタリング方法が大分こなれてきたので、エントリにまとめておきます。
AWS Batch is ...
フルマネージド型のバッチ処理サービスです。バッチジョブを次々に投下したい際に計算を行なうクラスタの管理を自分で行なうのはしんどいですが、AWS Batchを使うと面倒を見てくれます。必要な時にはインスタンスを増やして素早く終わらせ、不必要になったらインスタンスを落としてくれるので、コストを抑えることができます。AWS Black Beltの資料が分かりやすいと思います。
Black Beltの資料にもありますが、以下の概念を理解しておくとスムーズにいくと思います。
- コンピューティング環境
- VPCやサブネット、必要なVCpu数などを管理。リソースが足りなければ追加でEC2インスタンスを立ち上げ、不要になったら落としてくれる
- ジョブキュー
- どのコンピューティング環境にどういう優先順位でジョブを投入するかなどが指定できる
- キューに状態が管理されているので、EC2が立ち上がりまくってAWS破産する心配がない
- ジョブ定義
- ECSのタスク定義と似たようなもの。最近、タイムアウトのサポートもされて安心感が増した
コンピューティング環境/ジョブキュー/ジョブ定義をそれぞれCloudFormationで書いた例も載せておきます。
CloudFormationの例
Resources: ComputeEnvironment: Type: "AWS::Batch::ComputeEnvironment" Properties: Type: MANAGED ServiceRole: !Sub "arn:aws:iam::${AWS::AccountId}:role/service-role/AWSBatchServiceRole" ComputeEnvironmentName: go-active-learning ComputeResources: MinvCpus: 0 MaxvCpus: 256 DesiredvCpus: 0 SecurityGroupIds: - Fn::ImportValue: !Sub "${VPCStackName}:BatchSecurityGroup" Type: EC2 Subnets: - Fn::ImportValue: !Sub "${VPCStackName}:PublicSubnetAZa" - Fn::ImportValue: !Sub "${VPCStackName}:PublicSubnetAZc" InstanceTypes: - optimal InstanceRole: !Sub "arn:aws:iam::${AWS::AccountId}:instance-profile/ecsInstanceRole" State: ENABLED UpdateRecommendationJobDefinition: Type: "AWS::Batch::JobDefinition" Properties: Type: container ContainerProperties: Command: - "/bin/sh" - "/app/update_recommendation" Memory: 2500 Privileged: false Environment: - Name: "DB_USER" Value: "MY_USERNAME" - Name: "DB_PASSWORD" Value: "MY_PASSWORD" ReadonlyRootFilesystem: false Vcpus: 1 Image: !Sub "${AWS::AccountId}.dkr.ecr.${AWS::Region}.amazonaws.com/go-active-learning:latest" JobDefinitionName: update_recommendation RetryStrategy: Attempts: 1 DependsOn: ComputeEnvironment JobQueue: Type: "AWS::Batch::JobQueue" Properties: ComputeEnvironmentOrder: - ComputeEnvironment: !Ref ComputeEnvironment Order: 1 Priority: 1 State: ENABLED JobQueueName: go-active-learning DependsOn: ComputeEnvironment
AWS Batch is not ...
cronではありません。ジョブをkickするAPIがあるので、Lambdaなどから起動したりすることが多いですね。CloudWatch Eventのルールで起動させたり、AWS Step Functionsの中に挟むこともあるので、そういった方法での起動がメインになると思います。
- [アップデート]AWS BatchがCloudWatch Eventsに対応しました!時間起動も可能に | Developers.IO
- この辺、CloudFormationからはまだできない気がします...
CloudWatch Eventから3時間毎にLambdaを起動して、そのLambaでAWS BatchのジョブをkickするCloudFormationの例を置いておきます。
CloudFormationの例
UpdateRecommendationBatchJobTrigger: Type: AWS::Lambda::Function Properties: FunctionName: UpdateRecommendationBatchJobTriggerResource Description: Starts a job of AWS Batch desu Role: Fn::ImportValue: !Sub "${IAMStackName}:BatchJobTriggerFunctionRole" Handler: index.lambda_handler Runtime: python2.7 MemorySize: 128 Timeout: 30 Environment: Variables: JOB_NAME: update_recommendation JOB_QUEUE_NAME: go-active-learning JOB_DEFINITION_ARN: !Ref UpdateRecommendationJobDefinition Code: ZipFile: | from __future__ import print_function from datetime import datetime as dt import json import os import boto3 batch = boto3.client('batch') def lambda_handler(event, context): try: response = batch.submit_job( jobName=os.environ['JOB_NAME'], jobQueue=os.environ['JOB_QUEUE_NAME'], jobDefinition=os.environ['JOB_DEFINITION_ARN'], parameters={ 'Arg1': dt.now().strftime('%H:%M:%S'), 'Arg2': 'from-AWS-Lambda' } ) print(response) return response['jobId'] except Exception as e: print(e) raise e UpdateRecommendationBatchJobTriggerRule: Type: AWS::Events::Rule Properties: Name: UpdateRecommendationBatchJobTriggerRule ScheduleExpression: rate(3 hours) Targets: - Id: UpdateRecommendationBatchJobTrigger Arn: !GetAtt UpdateRecommendationBatchJobTrigger.Arn State: "ENABLED" UpdateRecommendationPermissionForEventsToInvokeLambda: Type: AWS::Lambda::Permission Properties: FunctionName: !Ref UpdateRecommendationBatchJobTrigger SourceArn: !GetAtt UpdateRecommendationBatchJobTriggerRule.Arn Action: lambda:InvokeFunction Principal: events.amazonaws.com
モニタリング方法
AWS Batchの概念が分かって実際のジョブ投入などができたら、正しく動作できているか継続的にモニタリングしたいですね。モニタリングはMackerelを使うと簡単/便利にできます(注: この記事を書いている人はMackerelの中の人です)。
Job Queue Statusのモニタリング
Job Queueに入っている各ステータスのジョブの個数を監視することができます。aws-batchプラグインが用意されているので、インストールすれば簡単にモニタリングできます。mkrを使うとプラグインを簡単にインストールできます。
このプラグインを使うと、こんなグラフが得られます。
ジョブが捌ける速度より投入される速度が早い場合、QueueのPendingのステータスにいる個数が増えていきます。Mackerelのメトリック監視を入れることでMaxVcpuを増やすなどの判断をすることができますね。
成功/失敗したタスクのモニタリング
Job Queueのステータスのモニタリングはこれまで書いたものでできますが、ジョブがSUCCEEDED
になったかFAILED
になったかはモニタリングすることができません。CloudWatch EventのAWS Batchイベントストリームを使うと状態変化を受け取ることができます。
ひとまずFAILEDになったかを知ることができればいいので、状態変化があったらslackに投稿するようにしています。同僚のid:astjさんが書いてくれたいいやつです。
状態変化をslackに通知するCloudFormationの例
# AWS Batch ジョブの状態変化を Slack 通知する Lambda Function BatchJobStatusToSlack: Type: "AWS::Lambda::Function" Properties: FunctionName: "BatchJobStatusToSlack" Description: "AWS Batch job 状態変化を Slack に通知します" Handler: "index.handler" Role: !ImportValue "iam:LambdaBasicExecutionRoleARN" Runtime: "python3.6" Environment: Variables: MY_ENV: !Ref Environment SLACK_WEBHOOK_URL: "https://hooks.slack.com/services/..." Code: ZipFile: | # -*- coding: utf-8 -*- import json import os import urllib.request def post_to_slack(payload): url = os.getenv('SLACK_WEBHOOK_URL') if url is None: raise ValueError('webhook url is not defined') json_data = json.dumps(payload).encode('utf-8') request = urllib.request.Request(url, data=json_data, method='POST') with urllib.request.urlopen(request) as response: return response.read().decode('utf-8') def color_mapping(status): try: return { 'PENDING': 'warning', 'STARTING': '#66ccff', 'RUNNING': '#aa99ff', 'SUCCEEDED': 'good', 'FAILED': 'danger', }[status] except KeyError: return '#cccccc' def handler(event, context): account = os.getenv('MY_ENV') status = event['detail']['status'] job_id = event['detail']['jobId'] params = json.dumps(event['detail']['parameters']) url = 'https://%s.console.aws.amazon.com/batch/home#/jobs/queue/%s/job/%s' % (event['region'], event['detail']['jobQueue'].replace('/', '~F'), job_id) payload = { 'username': 'event_notifier', 'attachments': [{ 'color': color_mapping(status), 'fallback': '[%s] JobID %s status changed (%s)' % (status, job_id, account), 'title': '[%s] Job status changed (%s)' % (status, account), "title_link": url, "text": '```%s```' % params, 'mrkdwn_in': ['text'], }] } print(post_to_slack(payload)) Timeout: "10" TracingConfig: Mode: "PassThrough" RulesBatchJobStatusToSlack: Type: "AWS::Events::Rule" Properties: State: "ENABLED" EventPattern: source: - "aws.batch" detail-type: - "Batch Job State Change" detail: jobQueue: - !Ref MyJobQueue status: - "FAILED" Targets: - Arn: !GetAtt [BatchJobStatusToSlack, Arn] Id: "ToSlackLambda" LambdaInvokePermission: Type: "AWS::Lambda::Permission" Properties: Action: "lambda:InvokeFunction" Principal: "events.amazonaws.com" SourceArn: !GetAtt [RulesBatchJobStatusToSlack, Arn] FunctionName: !GetAtt [BatchJobStatusToSlack, Arn]
ECSのモニタリング
AWS Batchの裏側ではECSが起動しています。aws-ecsプラグインを使うことで、クラスタのリソース(CPUとMemory)の使用状況をモニタリングすることができます。
使用状況を継続的にモニタリングすることで、EC2のスペックを上げ下げする判断材料になります。グラフの例はこんな感じです。3時間毎に10分かからず終わるジョブなので、ぴょんぴょんしたかわいいグラフになっています。
困ること
真面目に使うとこの辺が困ってきますが、自前でやるよりは大分便利ですね...。
- テストでAWS Batchのmockをやってくれるツールがない
- 例えばdynamodbだとツールがある
- コンソール画面がまだこなれていない
- 例えばジョブの完了日時でソートができない。大量にジョブが並んでいるときに目的のものを探すのに時間がかかる
- コンソール画面からは設定できるが、CloudFormationからは設定できない項目がある
- 例えばタイムアウト設定など
参考
Amazon Web Services 基礎からのネットワーク&サーバー構築 改訂版
- 作者: 玉川憲,片山暁雄,今井雄太,大澤文孝
- 出版社/メーカー: 日経BP社
- 発売日: 2017/04/13
- メディア: 単行本
- この商品を含むブログを見る