AWS Lambdaで行えないような重い処理はAWS Batchを使うと簡単に行うことができますが、定期実行したりエンドポイントを作る仕組みはAWS BatchにはまだなくCloudWatchやAPI Gatewayと組み合わせる必要がありますがServerlessと組み合わせることで簡単に実現できるので紹介します。
つくるもの
今回は次のサンプルを作ってみます。
- 毎日AWS Batchで実行する定期バッチ
- AWS Batchを実行するAPI
環境
- serverless: 1.41.1
- aws-cli: 1.16.144
ファイル構成
最終的なファイル構成は次のようになります。 AWS Batchに関するファイルだけフォルダを分けています。
tree -L 2 ├── batch │ ├── app.dockerfile │ ├── ecr_deploy.sh │ └── sample.py ├── handler_.py └── serverless.yml
動かすバッチ
今回はサンプルとして引数を表示するスクリプトをバッチとして実行します。
batch/samle.py
import sys if __name__ == "__main__": print(sys.argv)
事前準備: ECRにDockerイメージの登録
事前準備として、ECRにDockerイメージを登録しておきます。
ECRリポジトリを作成します。
aws ecr create-repository --repository-name serverless-batch-example # { # "repository": { # "repositoryArn": "arn:aws:ecr:ap-northeast-1:xxxxxxxxxxxxx:repository/serverless-batch-example", # "registryId": "xxxxxxxxxxxxx", # "repositoryName": "serverless-batch-example", # # これをメモ # "repositoryUri": "xxxxxxxxxxxxx.dkr.ecr.ap-northeast-1.amazonaws.com/serverless-batch-example", # "createdAt": 1556336732.0 # } # }
作成できました。repositoryUriは使うのでメモっておきましょう。 それではDockerイメージを作って登録します。
次のようにDockerfileとデプロイ用のshell scriptを作成します。
batch/app.docker
FROM python:3.7 ADD sample.py sample.py
batch/ecr_push.sh
#!/bin/bash # Variables IMAGE=serverless-batch-example TAG=dev DOCKERFILE=app.dockerfile ## さっきメモしたrepositoryUriを貼り付け ECR_URI=xxxxxxxxxxxxxx.dkr.ecr.ap-northeast-1.amazonaws.com/serverless-batch-example # Build image docker build -t "${IMAGE}:${TAG}" -f "${DOCKERFILE}" . # Docker login $(aws ecr get-login --no-include-email --region ap-northeast-1) # Tag that image docker tag "${IMAGE}:${TAG}" "${ECR_URI}:${TAG}" # Push docker push "${ECR_URI}:${TAG}"
あとはこのshell scriptを実行します。
cd batch chmod +x ecr_push.sh ./ecr_push.sh # => ecrにDockerイメージのデプロイが行われる
コンソールから登録されていることを確認できます。
AWS Batch/Lambda実装
それでは前置きが長くなりましたが、Lambdaのアプリケーションを作成していきます。
sls create -t aws-python3 -n serverless-batch-example
CloudFormation部分
ServerlessではresourceフィールドでCloudFormationを使ってAWS BatchなどのStackを作成できるので活用しました。 設定にあたって以下の記事を参考にさせてもらいました。
ちょっと長くなりますが、serverless.ymlは次のようになります。前提として VPC・Subnet・Security Groupは既存のものを使う という設計にしています。 クラスメソッドさんのブログではVPCから作っていましたが、どちらかというとVPCなどはすでにあるものを使うほうがユースケースとして多いと思ったので、今回そうしました。 長いですが、皆さんの環境に合わせないといけない部分はcustomフィールドにすべて集約させているので、そこだけ編集してもらえればあとは使い回せると思います。
serverless.yml
service: serverless-batch-example # 設定はここだけ custom: default_stage: dev region: ap-northeast-1 stage: ${opt:stage, self:custom.default_stage} batch: compute_env: ${self:service}-compute-env-${self:custom.stage} job_queue: ${self:service}-job-queue-${self:custom.stage} job_queue_arn: !Ref SlsJobQueue job_definition: ${self:service}-job-definition-${self:custom.stage} commands: - python - sample.py - Ref::first_arg # 引数のプレースホルダを設定 - Ref::second_arg subnet_ids: subnet-xxxxxxxx,subnet-xxxxxxx,subnet-xxxxxxxxx # 設定したいsubnetを記載 security_groups: sg-xxxxxxxxxxxxxxx # 設定したいsecurity groupを記載 # メモっておいたrepositoryUriを設定(タグがある場合は :TAG で追加) repo_uri: xxxxxxxxxxxxxxx.dkr.ecr.ap-northeast-1.amazonaws.com/serverless-batch-example provider: name: aws runtime: python3.7 stage: ${self:custom.stage} region: ${self:custom.region} environment: BATCH_JOB_QUEUE_ARN: ${self:custom.batch.job_queue_arn} BATCH_JOB_DEFINITION: ${self:custom.batch.job_definition} iamRoleStatements: - Effect: Allow Action: batch:* Resource: arn:aws:batch:* functions: batch: handler: handler.batch resources: Parameters: SubnetIds: Type: List<AWS::EC2::Subnet::Id> Default: ${self:custom.subnet_ids} SecurityGroupIds: Type: List<AWS::EC2::SecurityGroup::Id> Default: ${self:custom.security_groups} Resources: BatchServiceRole: Type: AWS::IAM::Role Properties: AssumeRolePolicyDocument: Version: '2012-10-17' Statement: - Effect: Allow Principal: Service: - batch.amazonaws.com Action: - sts:AssumeRole ManagedPolicyArns: - arn:aws:iam::aws:policy/service-role/AWSBatchServiceRole ecsInstanceRole: Type: AWS::IAM::Role Properties: AssumeRolePolicyDocument: Version: '2012-10-17' Statement: - Effect: Allow Principal: Service: - ec2.amazonaws.com Action: - sts:AssumeRole ManagedPolicyArns: - arn:aws:iam::aws:policy/service-role/AmazonEC2ContainerServiceforEC2Role ecsInstanceProfile: Type: AWS::IAM::InstanceProfile Properties: Roles: - !Ref ecsInstanceRole SlsComputeEnv: # ComputeEnvironment Type: AWS::Batch::ComputeEnvironment Properties: Type: MANAGED ServiceRole: !GetAtt BatchServiceRole.Arn ComputeEnvironmentName: ${self:custom.batch.compute_env} ComputeResources: MaxvCpus: 256 # 最大vCPU数 MinvCpus: 0 # 最小vCPU数 SecurityGroupIds: !Ref SecurityGroupIds InstanceRole: !GetAtt ecsInstanceProfile.Arn Subnets: !Ref SubnetIds Type: EC2 InstanceTypes: - optimal State: ENABLED SlsJobQueue: # JobQueue Type: AWS::Batch::JobQueue Properties: JobQueueName: ${self:custom.batch.job_queue} ComputeEnvironmentOrder: - Order: 1 ComputeEnvironment: !Ref SlsComputeEnv State: ENABLED Priority: 1 SlsJobDefinition: # JobDefinition Type: AWS::Batch::JobDefinition Properties: Type: container JobDefinitionName: ${self:custom.batch.job_definition} ContainerProperties: Command: ${self:custom.batch.commands} Memory: 4048 Vcpus: 2 Image: ${self:custom.repo_uri}
ポイントとしてはAWS Batchにjobを送信するにあたって、Job DefinitionのARNが必要なのでCloudFormationの Ref
関数を使って設定しています。
これでとりあえずデプロイするとAWS Batchの環境が作られます。
sls deploy
Compute Environment
Job Queue
Job Definition
function部分
実行する関数は次のように実装しました。環境変数からAWS BatchのJobの情報を読んでboto3を使ってJobを送信するだけの関数です。
handler.py
import json import os import boto3 BATCH_JOB_QUEUE_ARN = os.environ.get("BATCH_JOB_QUEUE_ARN") BATCH_JOB_DEFINITION = os.environ.get("BATCH_JOB_DEFINITION") def batch(event, context): client = boto3.client('batch') job_name = "lambda_example_job" result = client.submit_job( jobName = job_name, jobQueue = BATCH_JOB_QUEUE_ARN, jobDefinition = BATCH_JOB_DEFINITION, parameters = { "first_arg": "hello", "second_arg": "world" } ) response = { "statusCode": 200, "body": json.dumps(result) } return response
実装できたので、もう一度デプロイします。
sls deploy
デプロイできたら関数を呼び出してみましょう。
sls invoke -f batch
エラーが起こらなければjobが送信されているはずです。
定期実行化/API化
ここまでできれば、あとはfunctionsフィールドに追加するだけなので簡単です。 例えば次のように設定します。
serverless.yml
functions: batch: handler: handler.batch batch_daily: # 1日おきに実行 handler: handler.batch events: - schedule: rate(1 day) batch_api: # APIエンドポイントを作成 handler: handler.batch events: - http: path: batch method: get
定期実行
APIエンドポイント
無事できているようです。 さらに細かい設定に関してはServerlessのドキュメントをチェックしてみてください。
さいごに
AWS BatchとServerlessを使って高速に定期実行バッチを作ることができました。 Serverlessの良いところはCloudFormationで作成したResourceとLambdaで実装するアプリケーションの情報共有が簡単にできるところだと思います。 AWS Batchの使い所としては、簡単な処理はLambdaで実装しつつ、複雑なデータ処理など必ずLambdaでは足りない場合が出てくるので、そうしたときにLambdaからプロキシするような形で使っていくのが1つの良い使い方ではないかと思います。
今回使ったコードはすべてGitHubにあげてあります。 github.com
何かコメントや改善点などありましたら教えてください!