SageMaker Endpoint Deployment
A quick and easy guide for creating an AWS SageMaker endpoint for your model
Developing a machine learning (ML) model involves key steps, from data collection to model deployment. After refining algorithms and ensuring performance through testing, the final crucial step is deployment. This phase transforms innovation into utility, allowing others to benefit from the model’s predictive capabilities. The deployed ML model bridges the gap between development and real-world impact, providing tangible benefits to users and stakeholders.
This guide covers the basic steps required to deploy a custom ML model as a SageMaker endpoint. At this point, I assume that you already have a working model and wish to expose it to the rest of the world via an endpoint. The guide will walk you through deploying a PyTorch-based model that aims to predict anomalies in video clips. The model, aka AI VAD, is based on the paper “Attribute-based Representations for Accurate and Interpretable Video Anomaly Detection”, and its implementation can be found in the anomalib GitHub repository by OpenVINO. To read more about this interesting approach, see the Appendix section at the end of this blog.
At this point, I want to emphasize that in this case, we can’t use the PyTorchModel abstraction specifically built for deploying PyTorch models, for two reasons. The first is that the anomalib package is an additional dependency that is not included in the pre-built PyTorch SageMaker image. The second is that the model requires additional information learned during training that is not part of the PyTorch model’s weights.
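For context, if neither constraint applied, the whole deployment could be handled by the SageMaker Python SDK’s PyTorchModel. The sketch below shows roughly what that would look like; the bucket path, instance type, and framework versions are illustrative placeholders, not values from this project.

import sagemaker
from sagemaker.pytorch import PyTorchModel

# Illustrative placeholders only
pytorch_model = PyTorchModel(
    model_data="s3://my-bucket/model.tar.gz",  # archive containing the weights and inference.py
    role=sagemaker.get_execution_role(),       # IAM role with SageMaker permissions
    entry_point="inference.py",                # the serving script described below
    framework_version="2.0.0",
    py_version="py310",
)

# Deploys on top of the pre-built PyTorch inference image
predictor = pytorch_model.deploy(
    initial_instance_count=1,
    instance_type="ml.g5.xlarge",
)

Since we need both a custom image and extra artifacts, the rest of this guide wires those pieces up manually with boto3 instead.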
Below are the steps to achieve this goal:
- Write the Sagemaker model serving script
- Upload the Model to S3
- Upload a custom Docker image to AWS ECR
- Create a Model in SageMaker
- Create an Endpoint Configuration
- Create an Endpoint
- Invoke the Endpoint
Write the Sagemaker model serving script
The Sagemaker model serving script (inference.py) is an important component when creating a Sagemaker model. It bridges the gap between the machine learning model and real-world data: it processes incoming requests, runs the model’s predictions, and returns the results, thereby influencing an application’s decision-making process.
The inference.py script is composed of several key methods, each serving a unique purpose and collectively facilitating the model serving process. The four main ones are listed below.
- The model_fn method is tasked with loading the trained model. It reads the model artifacts that have been saved and returns a model object that can be used for predictions. This method is called only once when the SageMaker model server is started.
- The input_fn method takes request data and formats it into a form suitable for making predictions. For example, in the code below this function formats the data differently based on the source of the data (image bytes or a list of S3 URIs) and on whether the list of frames should be treated as one video clip.
- The predict_fn method takes the formatted request data and performs inference against the loaded model.
- Finally, the output_fn method takes the prediction result and formats it into a response message, for example by packing it as a JSON object.
The code for the Sagemaker model serving script can be found below.
import os
import json
import joblib
import torch
from PIL import Image
import numpy as np
import io
import boto3
from enum import Enum
from urllib.parse import urlsplit
from omegaconf import OmegaConf
from anomalib.data.utils import read_image, InputNormalizationMethod, get_transforms
from anomalib.models.ai_vad.torch_model import AiVadModel
device = "cuda"
class PredictMode(Enum):
    frame = 1
    batch = 2
    clip = 3
def model_fn(model_dir):
    """
    This function is the first to get executed upon a prediction request,
    it loads the model from the disk and returns the model object which will be used later for inference.
    """
    # Load the config file
    config = OmegaConf.load(os.path.join(model_dir, "ai_vad_config.yaml"))
    config_model = config.model

    # Load the model
    model = AiVadModel(
        box_score_thresh=config_model.box_score_thresh,
        persons_only=config_model.persons_only,
        min_bbox_area=config_model.min_bbox_area,
        max_bbox_overlap=config_model.max_bbox_overlap,
        enable_foreground_detections=config_model.enable_foreground_detections,
        foreground_kernel_size=config_model.foreground_kernel_size,
        foreground_binary_threshold=config_model.foreground_binary_threshold,
        n_velocity_bins=config_model.n_velocity_bins,
        use_velocity_features=config_model.use_velocity_features,
        use_pose_features=config_model.use_pose_features,
        use_deep_features=config_model.use_deep_features,
        n_components_velocity=config_model.n_components_velocity,
        n_neighbors_pose=config_model.n_neighbors_pose,
        n_neighbors_deep=config_model.n_neighbors_deep,
    )

    # Load the model weights
    model.load_state_dict(torch.load(os.path.join(model_dir, "ai_vad_weights.pth"), map_location=device), strict=False)

    # Load the memory banks
    velocity_estimator_memory_bank, pose_estimator_memory_bank, appearance_estimator_memory_bank = joblib.load(os.path.join(model_dir, "ai_vad_banks.joblib"))
    if velocity_estimator_memory_bank is not None:
        model.density_estimator.velocity_estimator.memory_bank = velocity_estimator_memory_bank
    if pose_estimator_memory_bank is not None:
        model.density_estimator.pose_estimator.memory_bank = pose_estimator_memory_bank
    if appearance_estimator_memory_bank is not None:
        model.density_estimator.appearance_estimator.memory_bank = appearance_estimator_memory_bank
    model.density_estimator.fit()

    # Move the entire model to device
    model = model.to(device)

    # Get the transforms
    transform_config = config.dataset.transform_config.eval if "transform_config" in config.dataset.keys() else None
    image_size = (config.dataset.image_size[0], config.dataset.image_size[1])
    center_crop = config.dataset.get("center_crop")
    center_crop = tuple(center_crop) if center_crop is not None else None
    normalization = InputNormalizationMethod(config.dataset.normalization)
    transform = get_transforms(config=transform_config, image_size=image_size, center_crop=center_crop, normalization=normalization)

    return model, transform
def input_fn(request_body, request_content_type):
    """
    The request_body is passed in by SageMaker and the content type is passed in
    via an HTTP header by the client (or caller).
    """
    print("input_fn-----------------------")
    if request_content_type in ("application/x-image", "image/x-image"):
        image = Image.open(io.BytesIO(request_body)).convert("RGB")
        numpy_array = np.array(image)
        print("numpy_array.shape", numpy_array.shape)
        print("input_fn-----------------------")
        return [numpy_array], PredictMode.frame
    elif request_content_type == "application/json":
        request_body_json = json.loads(request_body)
        s3_uris = request_body_json.get("images", [])
        if len(s3_uris) == 0:
            raise ValueError(f"Images is a required key and should contain at least a list of one S3 URI")
        s3 = boto3.client("s3")
        frame_paths = []
        for s3_uri in s3_uris:
            parsed_url = urlsplit(s3_uri)
            bucket_name = parsed_url.netloc
            object_key = parsed_url.path.lstrip('/')
            local_frame_path = f"/tmp/{s3_uri.replace('/', '_')}"
            # Download the frame from S3
            s3.download_file(bucket_name, object_key, local_frame_path)
            frame_paths.append(local_frame_path)
        frames = np.stack([torch.Tensor(read_image(frame_path)) for frame_path in frame_paths], axis=0)
        predict_mode = PredictMode.clip if request_body_json.get("clip", False) else PredictMode.batch
        print("frames.shape", frames.shape)
        print("predict_mode", predict_mode)
        print("input_fn-----------------------")
        return frames, predict_mode
    # If the request_content_type is not as expected, raise an exception
    raise ValueError(f"Content type {request_content_type} is not supported")
def predict_fn(input_data, model):
    """
    This function takes in the input data returned by input_fn and the model returned by model_fn.
    It gets executed after input_fn and its output is passed on to output_fn.
    """
    print("predict_fn-----------------------")

    model, transform = model
    frames, predict_mode = input_data

    processed_data = {}
    processed_data["image"] = [transform(image=frame)["image"] for frame in frames]
    processed_data["image"] = torch.stack(processed_data["image"])

    image = processed_data["image"].to(device)

    # Add one more dimension for a batch size of one clip
    if predict_mode == PredictMode.clip:
        image = image.unsqueeze(0)

    print("image.shape", image.shape)

    model.eval()
    with torch.no_grad():
        boxes, anomaly_scores, image_scores = model(image)

    print("boxes_len", [len(b) for b in boxes])

    processed_data["pred_boxes"] = [box.int() for box in boxes]
    processed_data["box_scores"] = [score.to(device) for score in anomaly_scores]
    processed_data["pred_scores"] = torch.Tensor(image_scores).to(device)

    print("predict_fn-----------------------")

    return processed_data
def output_fn(prediction, accept):
    """
    Post-processing function for model predictions. It gets executed after the predict_fn.
    """
    print("output_fn-----------------------")

    # Check if accept type is JSON
    if accept != "application/json":
        raise ValueError(f"Accept type {accept} is not supported")

    # Convert PyTorch Tensors to lists so they can be JSON serializable
    for key in prediction:
        # If torch.Tensor convert it to list
        if isinstance(prediction[key], torch.Tensor):
            prediction[key] = prediction[key].tolist()
        # If list, convert every tensor in the list
        elif isinstance(prediction[key], list):
            prediction[key] = [tensor.tolist() if isinstance(tensor, torch.Tensor) else tensor for tensor in prediction[key]]

    print("output_fn-----------------------")

    return json.dumps(prediction), accept
P.S. It is strongly recommended to test the model serving script before moving forward to the next step. This can be done easily by simulating the invocation pipeline as shown in the code below.
import json

from inference import model_fn, predict_fn, input_fn, output_fn

# payload should hold the raw bytes of a test image, e.g.:
# with open("test_frame.png", "rb") as f:
#     payload = f.read()

response, accept = output_fn(
    predict_fn(
        input_fn(payload, "application/x-image"),
        model_fn("../")
    ),
    "application/json"
)

json.loads(response).keys()
Upload the Model to S3
To create a SageMaker endpoint that loads the AI VAD PyTorch model in the exact same state, we need the following files:
- AI VAD PyTorch model’s weights (aka state_dict)
- Density estimator memory banks (which are not part of the model’s weights)
- A config file with the hyperparameters of the PyTorch model
- A Sagemaker model serving script (inference.py)
The code below demonstrates how to organize all the required files in one directory.
P.S., I overrode the built-in PyTorch Lightning ModelCheckpoint callback to ensure those memory banks are saved as part of the checkpoint (the implementation can be found here); a rough sketch of that idea is shown right after this paragraph, followed by the packaging code.
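The exact implementation lives in the linked fork; as a rough sketch only (assuming a recent PyTorch Lightning, where on_save_checkpoint mutates the checkpoint dictionary in place, and that the Lightning module exposes the torch model as pl_module.model), the idea looks like this:

from pytorch_lightning.callbacks import ModelCheckpoint

class AiVadModelCheckpoint(ModelCheckpoint):
    """Checkpoint callback that also stores the density estimator memory banks."""

    def on_save_checkpoint(self, trainer, pl_module, checkpoint):
        super().on_save_checkpoint(trainer, pl_module, checkpoint)
        # Attribute paths assumed to mirror the ones used in model_fn above
        density_estimator = pl_module.model.density_estimator
        checkpoint["velocity_estimator_memory_bank"] = density_estimator.velocity_estimator.memory_bank
        checkpoint["pose_estimator_memory_bank"] = density_estimator.pose_estimator.memory_bank
        checkpoint["appearance_estimator_memory_bank"] = density_estimator.appearance_estimator.memory_bank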
import torch
import joblib
import shutil
checkpoint = "results/ai_vad/ucsd/run/weights/lightning/model.ckpt"
config_path = "results/ai_vad/ucsd/run/config.yaml"
model_weights = torch.load(checkpoint)
model_state_dict = model_weights["state_dict"]
torch.save(model_state_dict, "../ai_vad_weights.pth")
velocity_estimator_memory_bank = None
pose_estimator_memory_bank = None
appearance_estimator_memory_bank = None
if "velocity_estimator_memory_bank" in model_weights:
velocity_estimator_memory_bank = model_weights["velocity_estimator_memory_bank"]
if "pose_estimator_memory_bank" in model_weights:
pose_estimator_memory_bank = model_weights["pose_estimator_memory_bank"]
if "appearance_estimator_memory_bank" in model_weights:
appearance_estimator_memory_bank = model_weights["appearance_estimator_memory_bank"]
banks = (velocity_estimator_memory_bank, pose_estimator_memory_bank, appearance_estimator_memory_bank)
joblib.dump(banks, "../ai_vad_banks.joblib")
shutil.copyfile(config_path, "../ai_vad_config.yaml")
Then, the four files were packed together into a tar.gz archive using the command below.
tar -czvf ../ai_vad_model.tar.gz -C ../ ai_vad_weights.pth ai_vad_banks.joblib ai_vad_config.yaml inference.py
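SageMaker extracts this archive into the directory passed to model_fn as model_dir (typically /opt/ml/model inside the container), so the four files should sit at the root of the archive rather than inside a subfolder. A quick sanity check of the layout:

import tarfile

# All four files should be listed at the top level of the archive
with tarfile.open("../ai_vad_model.tar.gz", "r:gz") as tar:
    print(tar.getnames())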
Lastly, the file was uploaded to S3 using boto3.
import boto3
from datetime import datetime
current_datetime = datetime.now().strftime('%Y-%m-%d-%H-%M-%S')
s3 = boto3.resource('s3')
s3.meta.client.upload_file("../ai_vad_model.tar.gz", "ai-vad", f"{current_datetime}/ai_vad_model.tar.gz")
Upload a custom Docker image to AWS ECR
As mentioned above, since we have an additional dependency that is not included in the pre-built PyTorch Sagemaker image (i.e., anomalib package), we created a new Docker image for that purpose. Before building the custom Docker image, authentication to the Amazon ECR repository is required.
REGION=<my_aws_region>
ACCOUNT=<my_aws_account>
# Authenticate Docker to the Amazon ECR registry that hosts the pre-built SageMaker images
aws ecr get-login-password --region $REGION | docker login --username AWS --password-stdin <docker_registry_url>.dkr.ecr.$REGION.amazonaws.com

# Log in to your private Amazon ECR registry
aws ecr get-login-password --region $REGION | docker login --username AWS --password-stdin $ACCOUNT.dkr.ecr.$REGION.amazonaws.com
The Dockerfile can be found below and the different Docker registry paths can be found here. Make sure to select the right registry path based on the model’s needs (CPU/GPU, Python version, etc.) and your AWS region. For example, if the region is us-east-1 the full Docker registry path should look similar to this:
763104351884.dkr.ecr.us-east-1.amazonaws.com/pytorch-inference:2.0.0-gpu-py310
# Use the SageMaker PyTorch image as the base image
FROM <docker_registry_url>.dkr.ecr.<my_aws_region>.amazonaws.com/pytorch-inference:2.0.0-gpu-py310
# Install the additional dependency
RUN pip install "git+https://github.com/hairozen/anomalib.git@ai-vad-inference-improvements"
Now, we can run the classic Docker build command to build this custom image.
docker build -t ai-vad-image .
The next step is to create the AWS ECR repository for the new image we built, tag it, and push the image to the AWS ECR repository.
# Create the AWS ECR repository
aws ecr create-repository --repository-name ai-vad-image
# Tag the image
docker tag ai-vad-image:latest $ACCOUNT.dkr.ecr.$REGION.amazonaws.com/ai-vad-image:latest
# Push the tagged image to the AWS ECR repository
docker push $ACCOUNT.dkr.ecr.$REGION.amazonaws.com/ai-vad-image:latest
Create a Model in SageMaker
This step is pretty straightforward. Code below.
import boto3
import sagemaker
sagemaker_client = boto3.client(service_name="sagemaker")
role = sagemaker.get_execution_role()
model_name = f"ai-vad-model-{current_datetime}"
primary_container = {
    "Image": f"{my_aws_account}.dkr.ecr.{my_aws_region}.amazonaws.com/ai-vad-image:latest",
    "ModelDataUrl": f"s3://ai-vad/{current_datetime}/ai_vad_model.tar.gz"
}

create_model_response = sagemaker_client.create_model(
    ModelName=model_name,
    ExecutionRoleArn=role,
    PrimaryContainer=primary_container)
Create an Endpoint Configuration
The next step is to create an endpoint configuration. Below you can find a basic one.
endpoint_config_name = f"ai-vad-model-config-{current_datetime}"
sagemaker_client.create_endpoint_config(
    EndpointConfigName=endpoint_config_name,
    ProductionVariants=[{
        "InstanceType": "ml.g5.xlarge",
        "InitialVariantWeight": 1,
        "InitialInstanceCount": 1,
        "ModelName": model_name,
        "VariantName": "AllTraffic"}])
Create an Endpoint
Now we are ready to create the endpoint itself.
endpoint_name = f"ai-vad-model-endpoint-{current_datetime}"
sagemaker_client.create_endpoint(
    EndpointName=endpoint_name,
    EndpointConfigName=endpoint_config_name)
Please note that it might take a few minutes until the status of the endpoint changes from “Creating” to “InService”. The current status can be checked as shown below.
response = sagemaker_client.describe_endpoint(EndpointName=endpoint_name)
response["EndpointStatus"]
Invoke the Endpoint
The moment of truth has arrived: it’s time to invoke the endpoint and test that everything works as expected.
from sagemaker.serializers import DataSerializer

with open(file_name, "rb") as f:
    payload = f.read()

predictor = sagemaker.predictor.Predictor(endpoint_name=endpoint_name)
predictor.serializer = DataSerializer(content_type="image/x-image")
predictor.predict(payload)
So, this is a nice check, but keep in mind that the predictor.predict function does not exercise the full invocation pipeline of the SageMaker serving script, which is:
output_fn(predict_fn(input_fn(request_body, request_content_type), model_fn(model_dir)), accept)
To test it as well, let’s invoke the model using an API call.
with open(file_name, "rb") as f:
    payload = f.read()

sagemaker_runtime = boto3.client("runtime.sagemaker")

response = sagemaker_runtime.invoke_endpoint(
    EndpointName=endpoint_name,
    ContentType="image/x-image",
    Body=payload
)
response = json.loads(response["Body"].read().decode())
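The same endpoint can also be exercised through the application/json path of input_fn by passing a list of S3 URIs and the optional clip flag. The bucket and object keys below are placeholders:

payload = json.dumps({
    "images": [
        "s3://my-bucket/frames/frame_000.png",
        "s3://my-bucket/frames/frame_001.png",
    ],
    "clip": True,  # treat the listed frames as a single video clip
})

response = sagemaker_runtime.invoke_endpoint(
    EndpointName=endpoint_name,
    ContentType="application/json",
    Body=payload,
)
response = json.loads(response["Body"].read().decode())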
Using the great visualization anomalib provides, we can draw the boxes and their labels for a given frame from the UCSDped2 dataset.
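The original post shows the annotated frame as an image. If you want to reproduce something similar without anomalib’s visualizer, the minimal sketch below draws the returned pred_boxes and box_scores on the frame with PIL; note that the boxes live in the coordinate space of the transformed (resized) image, so resize the frame to the model’s input size first if needed.

from PIL import Image, ImageDraw

# Draw each predicted box and its anomaly score on the (resized) frame
frame = Image.open(file_name).convert("RGB")
draw = ImageDraw.Draw(frame)
for box, score in zip(response["pred_boxes"][0], response["box_scores"][0]):
    x1, y1, x2, y2 = box
    draw.rectangle([x1, y1, x2, y2], outline="red", width=2)
    draw.text((x1, max(y1 - 10, 0)), f"{score:.2f}", fill="red")
frame.save("annotated_frame.png")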
Conclusion
OK, let’s quickly wrap up what we covered here. Deploying a SageMaker model for serving requires a series of steps.
First, the Sagemaker model serving script must be written to define the functionality and behavior of the model.
The model is then uploaded to Amazon S3 for storage and retrieval.
Additionally, a custom Docker image is uploaded to the AWS Elastic Container Registry (ECR) to containerize the model and its dependencies.
The next step involves creating a model in SageMaker, which associates the model artifacts stored in S3 with the Docker image stored in ECR.
An endpoint configuration is then created, defining the number and type of instances to use for hosting the model.
Finally, an endpoint is created to establish a live connection between the deployed model and client applications, allowing them to invoke the endpoint and make real-time predictions.
Through these steps, deploying a SageMaker model becomes a streamlined process that ensures efficient and reliable model serving.
Appendix
The Attribute-based Representations for Accurate and Interpretable Video Anomaly Detection paper, published in 2023 by Reiss et al., proposes a simple but highly effective method for video anomaly detection (VAD) using attribute-based representations.
The paper argues that traditional VAD methods, which often rely on deep learning, are hard to interpret, making it difficult for users to understand why the system flags certain frames or objects as anomalous.
To address this issue, the authors propose a method that represents each object in a video by its velocity, pose, and a deep feature representation. The velocity and pose attributes are easy to understand and interpret, and together with the deep features they are used to compute anomaly scores via a density-based approach.
The paper shows that this simple representation is sufficient to achieve state-of-the-art performance on several challenging VAD datasets, including ShanghaiTech, the largest and most complex VAD dataset.
In addition to being accurate, the authors also show that their method is interpretable. For example, they can provide users with a list of the objects in a video that are contributing most to its anomaly score, along with their velocity, pose, and deep information. This can help users to understand why the system is flagging the video as anomalous.
Overall, this paper is a significant contribution to the field of VAD. It proposes a simple, accurate, and interpretable method for VAD that can be used in a variety of applications.