機械学習のモデルを学習する、あるいは推論APIを作成するにあたってSageMakerはとても便利ですが、 定期実行する機能をSageMaker自身では持っていない という問題があります。
このエントリでは、Serverless Framework(以降、Serverless)を使って素早くSageMakerの定期実行環境を構築する方法を紹介します。
以下が構成図です。
この環境を構築は、下記を前提としているのでご留意ください。
- VPC、サブネットは既に作成しているものを利用する
- 学習用のデータはこの環境以外からアップロードされる
またコードはすべて以下のリポジトリにアップロードしており、これをベースに説明をしていきます。
環境
- serverless framework: 1.49.0
- sagemaker(Python SDK): 1.35.0
- boto3: 1.9.199
CFでSageMaker、Fargate、S3環境を作成
Serverlessの resources
にCloudFormationを書いてそれぞれの環境を作成します。
それぞれの命名などは custom
に記載しています。
serverless.yml
custom: resource_base: ${self:service}-${self:custom.stage} fargate: dockerfile: train.dockerfile ecr_uri: ${self:custom.config.account_id}.dkr.ecr.ap-northeast-1.amazonaws.com/${self:custom.fargate.ecr_repo} tag: ${self:custom.stage} cluster_name: ${self:custom.resource_base}-ecs-cluster container_name: ${self:custom.resource_base}-ecs-container ecr_repo: ${self:service}-ecr-image s3: bucket: ${self:custom.resource_base}-bucket sagemaker: endpoint_name: ${self:custom.resource_base}-endpoint
前述したようにVPCとサブネットは事前に生成されている前提なので、 env.yml
に記載してください。
env.yml
account_id: xxxxxxxxxxxxxx vpc_id: vpc-xxxxxxxx subnet_ids: - subnet-xxxxxxx - subnet-xxxxxxx
serverless.yml
custom: config: # ここだけ設定すればOK account_id: ${file(./env.yml):account_id} vpc_id: ${file(./env.yml):vpc_id} subnet_ids: ${file(./env.yml):subnet_ids}
CloudFormationでの環境構築に関しては、下のようにファイルを分けて記載しています。
serverless.yml
resources: - ${file(./cloudformations/sagemaker.yml)} - ${file(./cloudformations/fargate.yml)} - ${file(./cloudformations/s3.yml)}
それぞれのファイルをここに記載すると、かなり長くなってしまうのでGitHubをご参照ください。
リソース構築はデプロイすればOKです。
sls deploy
# => 構築OK
うまくいけば下記のように作成されます。(ロールはスクショ省略してます)
S3
Fargate Cluster, Service
Fargate Task Definition
ECR Repository
データアップロード
S3リポジトリが作成されたのでデータをアップロードします。 今回はサンプルとしてUCI Machine Learning RepositoryのWine Qualityを用います。
前述したように、このデータは外部からアップロードされる前提なので、今回は適当なシェルスクリプトでアップロードします。
wget https://archive.ics.uci.edu/ml/machine-learning-databases/wine-quality/winequality-red.csv aws s3 cp winequality-red.csv s3://scheduled-sagemaker-app-dev-bucket/inputs/raw/data.csv
データがアップロードできました。
ECR Repositoryにpush
次にFargateで使うDocker Imageをpushします。
Dockerファイルは前処理が行えて、Sagemakerを操作できるようなイメージにしておきます。
train.dockerfile
FROM python:3.7 RUN pip install -U pip boto3 sagemaker pandas numpy s3fs sklearn WORKDIR /root/workspace ADD train.py train.py CMD ["python", "train.py"]
train.py
は前処理と学習、デプロイを行うスクリプトで後述します。
ECRへのpushはServerless Puginを使って行う方法もありますが、複雑になるので、今回はコンソールでコマンドを確認しながらシェルスクリプトを書きました。
./continainer/push_ecr.sh
#!/bin/bash # Variables IMAGE=scheduled-sagemaker-app-ecr-image TAG=dev # stageによって切り替える DOCKERFILE=train.dockerfile ECR_URI=xxxxxxxxx.dkr.ecr.ap-northeast-1.amazonaws.com/scheduled-sagemaker-app-ecr-image # Build image docker build -t "${IMAGE}:${TAG}" -f "${DOCKERFILE}" . # Docker login $(aws ecr get-login --no-include-email --region ap-northeast-1) # Tag that image docker tag "${IMAGE}:${TAG}" "${ECR_URI}:${TAG}" # Push docker push "${ECR_URI}:${TAG}"
cd container chmod +x push_ecr.sh ./push_ecr.sh
コンソールでECRにデプロイできていることが確認できます。
前処理、学習、デプロイの詳細
前処理、学習、デプロイは前述したDocker Image内の train.py
で行っています。
前処理はtrainデータとvalidationデータを分割し、SageMakerのデータ形式に合わせるだけ、学習はXGBoostで回帰モデルを作るのみになっています。
train.py
def preprocess(): # load df = pd.read_csv(S3_INPUT_PATH, sep=';') # transform y_col = 'quality' x_cols = [c for c in df.columns if c != y_col] df = df[[y_col] + x_cols] # split train_data, val_data = train_test_split(df, test_size=(VAL_SIZE), random_state=RANDOM_STATE) # upload train_data.to_csv(S3_TRAIN_PATH, header=False, index=False) val_data.to_csv(S3_VALID_PATH, header=False, index=False) def train(): boto_session = boto3.Session(region_name=REGION_NAME) sagemaker_session = sagemaker.Session(boto_session=boto_session) xgb = sagemaker.estimator.Estimator( image_name=TRAIN_IMAGE_NAME, role=SM_ROLE, train_instance_count=TRAIN_INSTANCE_COUNT, train_instance_type=TRAIN_INSTANCE_TYPE, output_path=S3_OUTPUT_PATH, sagemaker_session=sagemaker_session ) # train train_params = { "eta": 0.2, "objective":"reg:linear", "max_depth":"5", "eval_metric": 'rmse', "num_round":100 } xgb.set_hyperparameters(**train_params) xgb.fit({ 'train': s3_input(s3_data=S3_TRAIN_PATH, content_type="csv"), 'validation': s3_input(s3_data=S3_VALID_PATH, content_type="csv") }) # deploy deploy_params = { "initial_instance_count": DEPLOY_INSTANCE_COUNT, "instance_type": DEPLOY_INSTANCE_TYPE, "endpoint_name": DEPLOY_ENDPOINT_NAME, "update_endpoint": True } xgb.deploy(**deploy_params) if __name__ == "__main__": preprocess() train()
ハイパーパラメータは環境変数として渡しました。スクリプト全体はGitHubをご参照ください。
scheduled-sagemaker-sample/train.py at master · ikedaosushi/scheduled-sagemaker-sample · GitHub
Lambdaからの呼び出し
準備したFargateをLambdaから呼び出します。必要な情報は provider.environment
で環境変数として渡します。
serverless.yml
provider: #...(略) environment: CLUSTER_NAME: ${self:custom.fargate.cluster_name} TASK_DEFINITION: !Ref ECSTaskDefinition ECS_CONTAINER_NAME: ${self:custom.fargate.container_name} S3_BUCKET: ${self:custom.s3.bucket} SM_ROLE: !GetAtt SageMakerServiceRole.Arn DEPLOY_ENDPOINT_NAME: ${self:custom.sagemaker.endpoint_name} REGION_NAME: ${self:custom.region} SUBNET_IDS: !Join - "," - ${self:custom.config.subnet_ids} functions: train: handler: handler.train
handlerではboto3を使ってFargateを呼び出します。
handler.py
def train(event, context): fargate = boto3.client("ecs", region_name=REGION_NAME) resp = fargate.run_task( cluster=CLUSTER_NAME, taskDefinition=TASK_DEFINITION, launchType='FARGATE', networkConfiguration={ 'awsvpcConfiguration': { 'subnets': SUBNET_IDS.split(",") , 'assignPublicIp': 'ENABLED' } }, overrides={ "containerOverrides": [ { 'name': ECS_CONTAINER_NAME, 'environment': [ {"name": k, "value": v} for k, v in CONTAINER_ENV.items() ] } ] } ) url_base = "https://ap-northeast-1.console.aws.amazon.com/ecs/home?region=ap-northeast-1#/clusters/" cluster_name = CLUSTER_NAME task_id = resp['tasks'][0]['taskArn'].split("/")[-1] url = url_base + cluster_name + "/tasks/" + task_id + "/details" message = f"In order to check task, go to {url}" return { "ok": True, "message": message }
デプロイしたあと invoke
して動作を確認します。
sls deploy sls invoke -f train # { # "ok": true, # "message": "In order to check task, go to https://ap-northeast-1.console.aws.amazon.com/ecs/home?region=ap-northeast-1#/clusters/scheduled-sagemaker-app-dev-ecs-cluster/tasks/05d35352-7db0-49f1-8d22-c15f2dbb42e1/details" # }
コンソールからFargateのタスクが生成されて実行されていることが確認できます。
また同様にコンソールからSageMaker上で学習がされていることとエンドポイントが作成(or更新)されていることが確認できます。
定期実行
若干長い道のりになりましたが、ServerlessからSageMakerが実行できることが確認できたので、あとは functions
の events.schedule[]
に設定するだけです。
serverless.yml
functions: train: handler: handler.train events: - schedule: rate: cron(0 15 * * ? *) # 毎朝AM0:00に実行
sls deploy
これで毎朝AM0:00にSageMakerの学習、デプロイをする定期バッチを作ることができました。
Clean Up
検証が終わって使ったものを消したい方向けです。 Serverlessはコマンドでほとんどのリソースは削除できます。
sls remove
SageMakerのEndpointはコマンドでは削除されないので、コンソールから削除してください。 Endpointは削除しない限り料金が発生するのでご注意ください。
さいごに
SageMakerの定期実行バッチをServerless FrameworkとFargateでつくる方法を紹介しました。Fargateの部分はAWS Batchに置き換えても同様に実現できます。AWS Batchの場合は以前近い内容のブログを書いたのでそちらなど参考にしていただけると思います。
AWS BatchではなくFargateを使った理由は、ただ単にFargateの勉強をしたかっただけなので、カスタマイズ要件が特になければAWS Batchを使うかなと思います。
繰り返しになりますが、コードはすべてGitHubで公開しています。
間違いや質問などあれば気軽にコメントください。