Best practices and design patterns for building machine learning workflows with Amazon SageMaker Pipelines

Amazon SageMaker Pipelines is a fully managed AWS service for building and orchestrating machine learning (ML) workflows. SageMaker Pipelines offers ML application developers the ability to orchestrate different steps of the ML workflow, including data loading, data transformation, training, tuning, and deployment. You can use SageMaker Pipelines to orchestrate ML jobs in SageMaker, and its integration with the larger AWS ecosystem also allows you to use resources like AWS Lambda functions, Amazon EMR jobs, and more. This enables you to build a customized and reproducible pipeline for the specific requirements of your ML workflows.

In this post, we provide some best practices to maximize the value of SageMaker Pipelines and make the development experience seamless. We also discuss some common design scenarios and patterns when building SageMaker Pipelines and provide examples for addressing them.

Best practices for SageMaker Pipelines

In this section, we discuss some best practices that can be followed while designing workflows using SageMaker Pipelines. Adopting them can improve the development process and streamline the operational management of SageMaker Pipelines.

Use Pipeline Session for lazy loading of the pipeline

Pipeline Session enables lazy initialization of pipeline resources (the jobs are not started until pipeline runtime). The PipelineSession context inherits the SageMaker Session and implements convenient methods for interacting with other SageMaker entities and resources, such as training jobs, endpoints, and input datasets in Amazon Simple Storage Service (Amazon S3). When defining SageMaker Pipelines, you should use PipelineSession instead of the regular SageMaker Session:

import sagemaker
from sagemaker.workflow.pipeline_context import PipelineSession
from sagemaker.sklearn.processing import SKLearnProcessor

role = sagemaker.get_execution_role()
pipeline_session = PipelineSession()
sklearn_processor = SKLearnProcessor(
    framework_version="0.20.0",
    instance_type="ml.m5.xlarge",
    instance_count=1,
    base_job_name="sklearn-abalone-process",
    role=role,
    sagemaker_session=pipeline_session,
)
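
With PipelineSession, a call like sklearn_processor.run() doesn't start a job immediately; it only captures the job arguments, which are executed when the pipeline itself runs. The following minimal sketch illustrates this under assumed names (the S3 path, script name, and step and pipeline names are placeholders):

from sagemaker.processing import ProcessingInput
from sagemaker.workflow.steps import ProcessingStep
from sagemaker.workflow.pipeline import Pipeline

# With PipelineSession, run() only captures the job arguments;
# no processing job is started at definition time
processor_args = sklearn_processor.run(
    inputs=[
        ProcessingInput(
            source="s3://my-bucket/abalone/abalone-dataset.csv",  # placeholder S3 path
            destination="/opt/ml/processing/input",
        )
    ],
    code="preprocess.py",  # placeholder script name
)

step_process = ProcessingStep(
    name="AbaloneProcess",
    step_args=processor_args,
)

# The processing job runs only when the pipeline is created and started
pipeline = Pipeline(
    name="AbalonePipeline",
    steps=[step_process],
    sagemaker_session=pipeline_session,
)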

Run pipelines in local mode for cost-effective and fast iterations during development

You can run a pipeline in local mode using the LocalPipelineSession context. In this mode, the pipeline and jobs are run locally using resources on the local machine, instead of SageMaker managed resources. Local mode provides a cost-effective way to iterate on the pipeline code with a smaller subset of data. After the pipeline is tested locally, it can be scaled to run using the PipelineSession context.

import sagemaker
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.workflow.pipeline_context import LocalPipelineSession

local_pipeline_session = LocalPipelineSession()
role = sagemaker.get_execution_role()
sklearn_processor = SKLearnProcessor(
    framework_version="0.20.0",
    instance_type="ml.m5.xlarge",
    instance_count=1,
    base_job_name="sklearn-abalone-process",
    role=role,
    sagemaker_session=local_pipeline_session,
)
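
To run the whole workflow locally, pass the same LocalPipelineSession to the Pipeline object; switching it to a PipelineSession later is the only change needed to move to SageMaker managed resources. A minimal sketch, assuming step_process was built with local_pipeline_session and using a placeholder pipeline name:

from sagemaker.workflow.pipeline import Pipeline

# Steps defined with local_pipeline_session run on the local machine
pipeline = Pipeline(
    name="LocalAbalonePipeline",  # placeholder name
    steps=[step_process],
    sagemaker_session=local_pipeline_session,
)

pipeline.create(role_arn=role)
# The execution and its jobs run locally
execution = pipeline.start()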

Manage a SageMaker pipeline through versioning

Versioning of artifacts and pipeline definitions is a common requirement in the development lifecycle. You can create multiple versions of the pipeline by naming pipeline objects with a unique prefix or suffix, the most common being a timestamp, as shown in the following code:

from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.pipeline_context import PipelineSession
import time

current_time = time.strftime("%Y-%m-%d-%H-%M-%S", time.gmtime())
pipeline_name = "pipeline_" + current_time
pipeline_session = PipelineSession()
pipeline = Pipeline(
    name=pipeline_name,
    steps=[step_process, step_train, step_eval, step_cond],
    sagemaker_session=pipeline_session,
)
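
Each uniquely named pipeline is registered as its own entity, so creating the new version and starting a run follows the usual pattern. A brief sketch, assuming the execution role defined earlier:

# Register this pipeline version and start a run
pipeline.upsert(role_arn=role)
execution = pipeline.start()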

Organize and track SageMaker pipeline runs by integrating with SageMaker Experiments

SageMaker Pipelines can be easily integrated with SageMaker Experiments for organizing and tracking pipeline runs. This is achieved by specifying PipelineExperimentConfig at the time of creating a pipeline object. With this configuration object, you can specify an experiment name and a trial name. The run details of a SageMaker pipeline are organized under the specified experiment and trial. If you don't explicitly specify an experiment name, the pipeline name is used as the experiment name. Similarly, if you don't explicitly specify a trial name, the pipeline run ID is used as the trial or run group name. See the following code:

Pipeline(
    name="MyPipeline",
    parameters=[...],
    pipeline_experiment_config=PipelineExperimentConfig(
        experiment_name=ExecutionVariables.PIPELINE_NAME,
        trial_name=ExecutionVariables.PIPELINE_EXECUTION_ID,
    ),
    steps=[...]
)

Securely run SageMaker pipelines within a private VPC

To secure the ML workloads, it's a best practice to deploy the jobs orchestrated by SageMaker Pipelines in a secure network configuration within a private VPC, with private subnets and security groups. To ensure and enforce the usage of this secure environment, you can implement the following AWS Identity and Access Management (IAM) policy for the SageMaker execution role (this is the role assumed by the pipeline during its run). You can also add a policy to run the jobs orchestrated by SageMaker Pipelines in network isolation mode.

# IAM policy to enforce execution within a private VPC

{
    "Action": [
        "sagemaker:CreateProcessingJob",
        "sagemaker:CreateTrainingJob",
        "sagemaker:CreateModel"
    ],
    "Resource": "*",
    "Effect": "Deny",
    "Condition": {
        "Null": {
            "sagemaker:VpcSubnets": "true"
        }
    }
}

# IAM policy to enforce execution in network isolation mode
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Deny",
            "Action": [
                "sagemaker:Create*"
            ],
            "Resource": "*",
            "Condition": {
                "StringNotEqualsIfExists": {
                    "sagemaker:NetworkIsolation": "true"
                }
            }
        }
    ]
}
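
For these policies to allow the pipeline's jobs to run, the jobs themselves must be launched with a VPC configuration and, where required, network isolation. One way to do this is to pass a NetworkConfig to the processors and estimators used in the pipeline steps; the following is a sketch in which the subnet and security group IDs are placeholders:

from sagemaker.network import NetworkConfig
from sagemaker.sklearn.processing import SKLearnProcessor

# Placeholder subnet and security group IDs for the private VPC
network_config = NetworkConfig(
    subnets=["subnet-0123456789abcdef0"],
    security_group_ids=["sg-0123456789abcdef0"],
    enable_network_isolation=True,
)

sklearn_processor = SKLearnProcessor(
    framework_version="0.23-1",
    instance_type="ml.m5.xlarge",
    instance_count=1,
    role=role,
    sagemaker_session=pipeline_session,
    network_config=network_config,  # jobs now run in the private VPC with network isolation
)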

For an example of a pipeline implementation with these security controls in place, refer to Orchestrating Jobs, Model Registration, and Continuous Deployment with Amazon SageMaker in a secure environment.

Monitor the cost of pipeline runs using tags

Using SageMaker Pipelines on its own is free; you pay for the compute and storage resources you spin up as part of the individual pipeline steps like processing, training, and batch inference. To aggregate the costs per pipeline run, you can include tags in every pipeline step that creates a resource. These tags can then be referenced in Cost Explorer to filter and aggregate the total pipeline run cost, as shown in the following example:

sklearn_processor = SKLearnProcessor(
    framework_version="0.20.0",
    instance_type="ml.m5.xlarge",
    instance_count=1,
    base_job_name="sklearn-abalone-process",
    role=role,
    tags=[{'Key': 'pipeline-cost-tag', 'Value': '<<tag_parameter>>'}]
)

step_process = ProcessingStep(
    name="AbaloneProcess",
    processor=sklearn_processor,
    ...
)

From Cost Explorer, you can now get the cost filtered by the tag:

import boto3

# Cost Explorer client
client = boto3.client("ce")

response = client.get_cost_and_usage(
    TimePeriod={
        'Start': '2023-07-01',
        'End': '2023-07-15'
    },
    Metrics=['BLENDED_COST', 'USAGE_QUANTITY', 'UNBLENDED_COST'],
    Granularity='MONTHLY',
    Filter={
        'And': [  # combine the usage-type and tag filters
            {
                'Dimensions': {
                    'Key': 'USAGE_TYPE',
                    'Values': ['SageMaker:Pipeline']
                }
            },
            {
                'Tags': {
                    'Key': 'keyName',
                    'Values': ['keyValue']
                }
            }
        ]
    }
)

Design patterns for some common scenarios

In this section, we discuss design patterns for some common use cases with SageMaker Pipelines.

Run a lightweight Python function using a Lambda step

Python functions are omnipresent in ML workflows; they're used in preprocessing, postprocessing, evaluation, and more. Lambda is a serverless compute service that lets you run code without provisioning or managing servers. With Lambda, you can run code in your preferred language, including Python. You can use this to run custom Python code as part of your pipeline. A Lambda step enables you to run Lambda functions as part of your SageMaker pipeline. Start with the following code:

%%writefile lambdafunc.py

import json

def lambda_handler(event, context):
    str1 = event["str1"]
    str2 = event["str2"]
    str3 = str1 + str2
    return {
        "str3": str3
    }

Create the Lambda function using the SageMaker Python SDK's Lambda helper:

from sagemaker.lambda_helper import Lambda

def create_lambda(function_name, script, handler):
    response = Lambda(
        function_name=function_name,
        execution_role_arn=role,
        script=script,
        handler=handler,
        timeout=600,
        memory_size=10240,
    ).upsert()

    function_arn = response['FunctionArn']
    return function_arn

fn_arn = create_lambda("func", "lambdafunc.py", handler="lambdafunc.lambda_handler")

Call the Lambda step:

from sagemaker.lambda_helper import Lambda
from sagemaker.workflow.lambda_step import (
    LambdaStep,
    LambdaOutput,
    LambdaOutputTypeEnum
)

str3 = LambdaOutput(output_name="str3", output_type=LambdaOutputTypeEnum.String)

# Lambda step
step_lambda1 = LambdaStep(
    name="LambdaStep1",
    lambda_func=Lambda(
        function_arn=fn_arn
    ),
    inputs={
        "str1": "Hello",
        "str2": " World"
    },
    outputs=[str3],
)

Pass data between steps

Input data for a pipeline step is either an accessible data location or data generated by one of the previous steps in the pipeline. You can provide this information as a ProcessingInput parameter. Let's look at a few scenarios of how you can use ProcessingInput.

Scenario 1: Pass the output (primitive data types) of a Lambda step to a processing step

Primitive data types refer to scalar data types like string, integer, Boolean, and float.

The following code snippet defines a Lambda function that returns a dictionary of variables with primitive data types. Your Lambda function code returns a JSON of key-value pairs when invoked from the Lambda step within the SageMaker pipeline.

def handler(event, context):
    ...
    return {
        "output1": "string_value",
        "output2": 1,
        "output3": True,
        "output4": 2.0,
    }

In the pipeline definition, you can then define SageMaker pipeline parameters of a specific data type and set the variable to the output of the Lambda function:

import sagemaker
from sagemaker.lambda_helper import Lambda
from sagemaker.workflow.lambda_step import (
    LambdaStep,
    LambdaOutput,
    LambdaOutputTypeEnum
)
from sagemaker.workflow.steps import ProcessingStep
from sagemaker.workflow.pipeline_context import PipelineSession
from sagemaker.sklearn.processing import SKLearnProcessor

role = sagemaker.get_execution_role()
pipeline_session = PipelineSession()

# 1. Define the output params of the Lambda step

str_outputParam = LambdaOutput(output_name="output1", output_type=LambdaOutputTypeEnum.String)
int_outputParam = LambdaOutput(output_name="output2", output_type=LambdaOutputTypeEnum.Integer)
bool_outputParam = LambdaOutput(output_name="output3", output_type=LambdaOutputTypeEnum.Boolean)
float_outputParam = LambdaOutput(output_name="output4", output_type=LambdaOutputTypeEnum.Float)

# 2. Lambda step invoking the Lambda function and returning the output

step_lambda = LambdaStep(
    name="MyLambdaStep",
    lambda_func=Lambda(
        function_arn="arn:aws:lambda:us-west-2:123456789012:function:sagemaker_test_lambda",
        session=PipelineSession(),
        ),
    inputs={"arg1": "foo", "arg2": "foo1"},
    outputs=[
        str_outputParam, int_outputParam, bool_outputParam, float_outputParam
        ],
)

# 3. Extract the output of the Lambda

str_outputParam = step_lambda.properties.Outputs["output1"]

# 4. Use it in a subsequent step, for example, a processing step

sklearn_processor = SKLearnProcessor(
    framework_version="0.23-1",
    instance_type="ml.m5.xlarge",
    instance_count=1,
    sagemaker_session=pipeline_session,
    role=role
)

processor_args = sklearn_processor.run(
    code="code/preprocess.py",  # Python script to run
    arguments=["--input-args", str_outputParam]
)

step_process = ProcessingStep(
    name="processstep1",
    step_args=processor_args,
)

Scenario 2: Pass the output (non-primitive data types) of a Lambda step to a processing step

Non-primitive data types refer to non-scalar data types (for example, NamedTuple). You may have a scenario where you have to return a non-primitive data type from a Lambda function. To do this, you have to convert your non-primitive data type to a string:

# Lambda function code returning a non-primitive data type

from collections import namedtuple

def lambda_handler(event, context):
    Outputs = namedtuple("Outputs", "sample_output")
    named_tuple = Outputs(
                    [
                        {'output1': 1, 'output2': 2},
                        {'output3': 'foo', 'output4': 'foo1'}
                    ]
                )
    return {
        "named_tuple_string": str(named_tuple)
    }

# Pipeline step that uses the Lambda output as a "Parameter Input"

output_ref = step_lambda.properties.Outputs["named_tuple_string"]

You can then use this string as an input to a subsequent step in the pipeline. To use the named tuple in the code, use eval() to parse the Python expression in the string:

# Decipher the string in your processing logic code

import argparse
from collections import namedtuple

Outputs = namedtuple("Outputs", "sample_output")

if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--named_tuple_string", type=str, required=True)
    args = parser.parse_args()
    # use eval to obtain the named tuple from the string
    named_tuple = eval(args.named_tuple_string)

Scenario 3: Pass the output of a step through a property file

You can also store the output of a processing step in a property JSON file for downstream consumption in a ConditionStep or another ProcessingStep. You can use the JsonGet function to query a property file. See the following code:

from sagemaker.processing import ProcessingOutput
from sagemaker.workflow.properties import PropertyFile

# 1. Define a Processor with a ProcessingOutput
sklearn_processor = SKLearnProcessor(
    framework_version="0.23-1",
    instance_type="ml.m5.xlarge",
    instance_count=1,
    base_job_name="sklearn-abalone-preprocess",
    sagemaker_session=session,
    role=sagemaker.get_execution_role(),
)

step_args = sklearn_processor.run(
    outputs=[
        ProcessingOutput(
            output_name="hyperparam",
            source="/opt/ml/processing/evaluation"
        ),
    ],
    code="./local/preprocess.py",
    arguments=["--input-data", "s3://my-input"],
)

# 2. Define a PropertyFile where the output_name matches the one used in the Processor
hyperparam_report = PropertyFile(
    name="AbaloneHyperparamReport",
    output_name="hyperparam",
    path="hyperparam.json",
)

Let's assume the property file's contents were the following:

{
    "hyperparam": {
        "eta": {
            "value": 0.6
        }
    }
}

In this case, it can be queried for a specific value and used in subsequent steps using the JsonGet function:

from sagemaker.workflow.functions import JsonGet

# 3. Query the property file
eta = JsonGet(
    step_name=step_process.name,
    property_file=hyperparam_report,
    json_path="hyperparam.eta.value",
)
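
The queried value can then drive branching logic in the pipeline, as mentioned above for ConditionStep. A brief sketch, where the threshold, the step name, and step_train (standing in for a previously defined training step) are illustrative:

from sagemaker.workflow.conditions import ConditionLessThanOrEqualTo
from sagemaker.workflow.condition_step import ConditionStep

# Branch the pipeline based on the value read from the property file
cond_lte = ConditionLessThanOrEqualTo(left=eta, right=0.8)  # illustrative threshold

step_cond = ConditionStep(
    name="CheckEtaCondition",  # illustrative step name
    conditions=[cond_lte],
    if_steps=[step_train],  # steps to run when the condition holds
    else_steps=[],
)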

Parameterize a variable in the pipeline definition

Parameterizing variables so that they can be used at runtime is often desirable, for example, to construct an S3 URI. You can parameterize a string such that it's evaluated at runtime using the Join function. The following code snippet shows how to define the variable using the Join function and use it to set the output location in a processing step:

from sagemaker.workflow.functions import Join
from sagemaker.workflow.parameters import ParameterString
from sagemaker.workflow.execution_variables import ExecutionVariables

# define the variable to store the S3 URI
s3_location = Join(
    on="/",
    values=[
        "s3:/",
        ParameterString(
            name="MyBucket",
            default_value=""
        ),
        "training",
        ExecutionVariables.PIPELINE_EXECUTION_ID
    ]
)

# define the processing step
sklearn_processor = SKLearnProcessor(
    framework_version="1.2-1",
    instance_type="ml.m5.xlarge",
    instance_count=processing_instance_count,
    base_job_name=f"{base_job_prefix}/sklearn-abalone-preprocess",
    sagemaker_session=pipeline_session,
    role=role,
)

# use the S3 URI as the output location in the processing step
processor_run_args = sklearn_processor.run(
    outputs=[
        ProcessingOutput(
            output_name="train",
            source="/opt/ml/processing/train",
            destination=s3_location,
        ),
    ],
    code="code/preprocess.py"
)

step_process = ProcessingStep(
    name="PreprocessingJob",
    step_args=processor_run_args,
)
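
For the MyBucket parameter to be overridable at runtime, it also needs to be declared in the pipeline definition (ideally as a single ParameterString object referenced both in the Join and in the parameters list). A short sketch under that assumption, with placeholder names:

from sagemaker.workflow.parameters import ParameterString
from sagemaker.workflow.pipeline import Pipeline

# Declare the parameter once and reference it in both the Join and the pipeline
bucket_param = ParameterString(name="MyBucket", default_value="my-default-bucket")

pipeline = Pipeline(
    name="ParameterizedPipeline",  # placeholder name
    parameters=[bucket_param],
    steps=[step_process],
    sagemaker_session=pipeline_session,
)

# Override the bucket at runtime
execution = pipeline.start(parameters={"MyBucket": "my-training-bucket"})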

Run parallel code over an iterable

Some ML workflows run code in parallel for-loops over a static set of items (an iterable). It can either be the same code that gets run on different data, or a different piece of code that needs to be run for each item. For example, if you have a very large number of rows in a file and want to speed up the processing time, you can rely on the former pattern. If you want to perform different transformations on specific sub-groups in the data, you might have to run a different piece of code for every sub-group. The following two scenarios illustrate how you can design SageMaker pipelines for this purpose.

Scenario 1: Implement processing logic on different portions of data

You can run a processing job with multiple instances (by setting instance_count to a value greater than 1). This distributes the input data from Amazon S3 across all the processing instances. You can then use a script (process.py) to work on a specific portion of the data based on the instance number and the corresponding element in the list of items. The programming logic in process.py can be written so that a different module or piece of code is run depending on the list of items it processes. The following example defines a processor that can be used in a ProcessingStep; a sketch of such a process.py follows the processor definition:

sklearn_processor = FrameworkProcessor(
    estimator_cls=sagemaker.sklearn.estimator.SKLearn,
    framework_version="0.23-1",
    instance_type="ml.m5.4xlarge",
    instance_count=4,  # number of parallel executions / instances
    base_job_name="parallel-step",
    sagemaker_session=session,
    role=role,
)

step_args = sklearn_processor.run(
    code="process.py",
    arguments=[
        "--items",
        list_of_items,  # data structure containing a list of items
    ],
    inputs=[
        ProcessingInput(
            source="s3://sagemaker-us-east-1-xxxxxxxxxxxx/abalone/abalone-dataset.csv",
            destination="/opt/ml/processing/input"
        )
    ],
)
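
Inside the container, each instance can determine which share of the items belongs to it. The following is a minimal sketch of what such a process.py could look like, assuming the items are passed as a comma-separated string and that the processing resource configuration at /opt/ml/config/resourceconfig.json exposes the current host and the full host list:

# process.py - illustrative sketch of splitting work across processing instances
import argparse
import json

RESOURCE_CONFIG = "/opt/ml/config/resourceconfig.json"

def main():
    parser = argparse.ArgumentParser()
    parser.add_argument("--items", type=str, required=True)  # assumed comma-separated list of items
    args = parser.parse_args()
    items = args.items.split(",")

    # Determine this instance's index among all processing hosts
    with open(RESOURCE_CONFIG) as f:
        config = json.load(f)
    hosts = sorted(config["hosts"])
    instance_index = hosts.index(config["current_host"])

    # Each instance processes only its slice of the items
    my_items = items[instance_index::len(hosts)]
    for item in my_items:
        print(f"Instance {instance_index} processing item: {item}")
        # item-specific logic goes here

if __name__ == "__main__":
    main()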

Scenario 2: Run a sequence of steps

When you have a sequence of steps that need to be run in parallel, you can define each sequence as an independent SageMaker pipeline. The runs of these SageMaker pipelines can then be triggered from a Lambda function that is part of a LambdaStep in the parent pipeline. The following piece of code illustrates the scenario where two different SageMaker pipeline runs are triggered:

import boto3

def lambda_handler(event, context):
    items = [1, 2]
    # SageMaker client
    sm_client = boto3.client("sagemaker")

    # name of the pipeline that needs to be triggered.
    # if there are multiple, you can fetch the available pipelines using the boto3 API
    # and trigger the appropriate one based on your logic.
    pipeline_name = "child-pipeline-1"

    # trigger a pipeline run for each item
    response_ppl = sm_client.start_pipeline_execution(
                        PipelineName=pipeline_name,
                        PipelineExecutionDisplayName=pipeline_name + '-item-%d' % items[0],
                    )
    pipeline_name = "child-pipeline-2"
    response_ppl = sm_client.start_pipeline_execution(
                        PipelineName=pipeline_name,
                        PipelineExecutionDisplayName=pipeline_name + '-item-%d' % items[1],
                    )
    return

Conclusion

In this post, we discussed some best practices for the efficient use and maintenance of SageMaker pipelines. We also provided certain patterns that you can adopt while designing workflows with SageMaker Pipelines, whether you're authoring new pipelines or migrating ML workflows from other orchestration tools. To get started with SageMaker Pipelines for ML workflow orchestration, refer to the code samples on GitHub and Amazon SageMaker Model Building Pipelines.

About the Authors

Pinak Panigrahi works with customers to build machine learning driven solutions to solve strategic business problems on AWS. When not occupied with machine learning, he can be found taking a hike, reading a book, or watching sports.

Meenakshisundaram Thandavarayan works for AWS as an AI/ML Specialist. He has a passion to design, create, and promote human-centered data and analytics experiences. Meena focuses on developing sustainable systems that deliver measurable, competitive advantages for strategic customers of AWS. Meena is a connector, design thinker, and strives to drive business to new ways of working through innovation, incubation, and democratization.