{"id":805,"date":"2021-09-05T15:52:51","date_gmt":"2021-09-05T15:52:51","guid":{"rendered":"https:\/\/salarydistribution.com\/machine-learning\/2021\/09\/05\/define-and-run-machine-learning-pipelines-on-step-functions-using-python-workflow-studio-or-states-language\/"},"modified":"2021-09-05T15:52:51","modified_gmt":"2021-09-05T15:52:51","slug":"define-and-run-machine-learning-pipelines-on-step-functions-using-python-workflow-studio-or-states-language","status":"publish","type":"post","link":"https:\/\/salarydistribution.com\/machine-learning\/2021\/09\/05\/define-and-run-machine-learning-pipelines-on-step-functions-using-python-workflow-studio-or-states-language\/","title":{"rendered":"Define and run Machine Learning pipelines on Step Functions using Python, Workflow Studio, or States Language"},"content":{"rendered":"<div id=\"\">\n<p>You can use various tools to define and run machine learning (ML) pipelines or DAGs (Directed Acyclic Graphs). Some popular options include <a href=\"http:\/\/aws.amazon.com\/step-functions\" target=\"_blank\" rel=\"noopener noreferrer\">AWS Step Functions<\/a>, Apache Airflow, KubeFlow Pipelines (KFP), TensorFlow Extended (TFX), Argo, Luigi, and <a href=\"https:\/\/aws.amazon.com\/sagemaker\/pipelines\/\" target=\"_blank\" rel=\"noopener noreferrer\">Amazon SageMaker Pipelines<\/a>. All these tools help you compose pipelines in various languages (JSON, YAML, Python, and more), followed by viewing and running them using a workflow orchestrator. 
A deep comparison of these options is out of scope for this post; choosing among them involves selecting and benchmarking tools for your specific use case.<\/p>\n<p>In this post, we discuss how to author end-to-end ML pipelines in Step Functions using three different methods:<\/p>\n<ul>\n<li>The Step Functions Data Science SDK<\/li>\n<li>Step Functions Workflow Studio<\/li>\n<li>The Amazon States Language<\/li>\n<\/ul>\n<h2>Solution overview<\/h2>\n<p>In this post, we create a simple workflow that involves a training step, creating a model, configuring an endpoint, and deploying the model.<\/p>\n<p><a href=\"https:\/\/d2908q01vomqb2.cloudfront.net\/f1f836cb4ea6efb2a0b1b99f41ad8b103eff4b59\/2021\/08\/31\/ML4675-image001.png\"><img decoding=\"async\" loading=\"lazy\" class=\"size-full wp-image-27584 aligncenter\" src=\"https:\/\/d2908q01vomqb2.cloudfront.net\/f1f836cb4ea6efb2a0b1b99f41ad8b103eff4b59\/2021\/08\/31\/ML4675-image001.png\" alt=\"\" width=\"556\" height=\"99\"><\/a><\/p>\n<p>You can also create more complex workflows involving other steps such as <a href=\"https:\/\/aws.amazon.com\/blogs\/aws\/amazon-sagemaker-processing-fully-managed-data-processing-and-model-evaluation\/\" target=\"_blank\" rel=\"noopener noreferrer\">Amazon SageMaker Processing<\/a> or automatic model tuning (HPO). 
You can also use Step Functions to integrate with other AWS services such as <a href=\"http:\/\/aws.amazon.com\/lambda\" target=\"_blank\" rel=\"noopener noreferrer\">AWS Lambda<\/a>, <a href=\"https:\/\/aws.amazon.com\/dynamodb\/\" target=\"_blank\" rel=\"noopener noreferrer\">Amazon DynamoDB<\/a>, <a href=\"https:\/\/aws.amazon.com\/glue\" target=\"_blank\" rel=\"noopener noreferrer\">AWS Glue<\/a>, <a href=\"http:\/\/aws.amazon.com\/emr\" target=\"_blank\" rel=\"noopener noreferrer\">Amazon EMR<\/a>, <a href=\"http:\/\/aws.amazon.com\/athena\" target=\"_blank\" rel=\"noopener noreferrer\">Amazon Athena<\/a>, <a href=\"https:\/\/aws.amazon.com\/eks\/\" target=\"_blank\" rel=\"noopener noreferrer\">Amazon Elastic Kubernetes Service<\/a> (Amazon EKS), and <a href=\"https:\/\/aws.amazon.com\/fargate\" target=\"_blank\" rel=\"noopener noreferrer\">AWS Fargate<\/a>. For more information on supported services, see <a href=\"https:\/\/docs.aws.amazon.com\/step-functions\/latest\/dg\/connect-supported-services.html\" target=\"_blank\" rel=\"noopener noreferrer\">Supported AWS Service Integrations for Step Functions<\/a>. We provide guidance on other similar pipeline tools later in this post.<\/p>\n<p>In this post, we use the MNIST dataset, which is a widely used dataset for handwritten digit classification. It consists of 70,000 labeled 28\u00d728 pixel grayscale images of hand-written digits. The dataset is split into 60,000 training images and 10,000 test images. There are 10 classes (one for each of the 10 digits). 
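The model definition that follows flattens its convolutional feature maps into a 320-dimensional vector; that constant falls out of the 28×28 image size and the layer arithmetic. A quick stdlib-only sanity check of that arithmetic (a sketch; the layer sizes mirror the `Net` class shown next):

```python
# Trace the feature-map side length through the MNIST model's conv/pool stack.
# Layer sizes follow the post's Net class: two 5x5 convs (no padding),
# each followed by 2x2 max pooling.

def conv_out(size, kernel, stride=1, padding=0):
    """Output side length of a square conv/pool layer (floor division)."""
    return (size + 2 * padding - kernel) // stride + 1

size = 28                      # MNIST images are 28x28
size = conv_out(size, 5)       # conv1: 28 -> 24
size = conv_out(size, 2, 2)    # max_pool2d(2): 24 -> 12
size = conv_out(size, 5)       # conv2: 12 -> 8
size = conv_out(size, 2, 2)    # max_pool2d(2): 8 -> 4
channels = 20                  # conv2 outputs 20 channels
flat = channels * size * size  # 20 * 4 * 4 = 320, matching nn.Linear(320, 50)
print(flat)  # -> 320
```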
The code used here closely follows a <a href=\"https:\/\/github.com\/aws\/amazon-sagemaker-examples\/tree\/master\/step-functions-data-science-sdk\/training_pipeline_pytorch_mnist\" target=\"_blank\" rel=\"noopener noreferrer\">similar use case<\/a> where the task is to classify each input image as one of the 10 digits (0\u20139).<\/p>\n<p>The main training code uses a class from the <a href=\"https:\/\/github.com\/aws\/amazon-sagemaker-examples\/blob\/master\/step-functions-data-science-sdk\/training_pipeline_pytorch_mnist\/mnist.py\" target=\"_blank\" rel=\"noopener noreferrer\">standard PyTorch example<\/a> for the model definition:<\/p>\n<div class=\"hide-language\">\n<pre><code class=\"lang-python\">class Net(nn.Module):\n    def __init__(self):\n        super(Net, self).__init__()\n        self.conv1 = nn.Conv2d(1, 10, kernel_size=5)\n        self.conv2 = nn.Conv2d(10, 20, kernel_size=5)\n        self.conv2_drop = nn.Dropout2d()\n        self.fc1 = nn.Linear(320, 50)\n        self.fc2 = nn.Linear(50, 10)\n\n    def forward(self, x):\n        x = F.relu(F.max_pool2d(self.conv1(x), 2))\n        x = F.relu(F.max_pool2d(self.conv2_drop(self.conv2(x)), 2))\n        x = x.view(-1, 320)\n        x = F.relu(self.fc1(x))\n        x = F.dropout(x, training=self.training)\n        x = self.fc2(x)\n        return F.log_softmax(x, dim=1)<\/code><\/pre>\n<\/p><\/div>\n<p>The main training function works for both single-instance and distributed training, which it detects by checking the script arguments:<\/p>\n<div class=\"hide-language\">\n<pre><code class=\"lang-python\">is_distributed = len(args.hosts) &gt; 1 and args.backend is not None\nif is_distributed:\n    # Initialize the distributed environment.\n    world_size = len(args.hosts)\n    os.environ['WORLD_SIZE'] = str(world_size)\n    host_rank = args.hosts.index(args.current_host)\n    os.environ['RANK'] = str(host_rank)\n    dist.init_process_group(backend=args.backend, rank=host_rank, world_size=world_size)\n    logger.info(\"Initialized the distributed environment: '{}' backend on {} nodes. \".format(\n        args.backend, dist.get_world_size()) + 'Current host rank is {}. Number of gpus: {}'.format(\n        dist.get_rank(), args.num_gpus))<\/code><\/pre>\n<\/p><\/div>\n<p>The number of hosts is conveniently stored in an <a href=\"https:\/\/aws.amazon.com\/sagemaker\/\" target=\"_blank\" rel=\"noopener noreferrer\">Amazon SageMaker<\/a> environment variable, which can also be passed in as an argument:<\/p>\n<div class=\"hide-language\">\n<pre><code class=\"lang-python\">parser.add_argument('--hosts', type=list, default=json.loads(os.environ['SM_HOSTS']))<\/code><\/pre>\n<\/p><\/div>\n<p>Next, we load the datasets from the default data directory:<\/p>\n<div class=\"hide-language\">\n<pre><code class=\"lang-python\">train_loader = _get_train_data_loader(args.batch_size, args.data_dir, is_distributed, **kwargs)\ntest_loader = _get_test_data_loader(args.test_batch_size, args.data_dir, **kwargs)\n<\/code><\/pre>\n<\/p><\/div>\n<p>We then enter the main training loop:<\/p>\n<div class=\"hide-language\">\n<pre><code class=\"lang-python\">for epoch in range(1, args.epochs + 1):\n    model.train()<\/code><\/pre>\n<\/p><\/div>\n<p>Finally, we save the model:<\/p>\n<div class=\"hide-language\">\n<pre><code class=\"lang-python\">def save_model(model, model_dir):\n    logger.info(\"Saving the model.\")\n    path = os.path.join(model_dir, 'model.pth')\n    # recommended way from http:\/\/pytorch.org\/docs\/master\/notes\/serialization.html\n    torch.save(model.cpu().state_dict(), path)<\/code><\/pre>\n<\/p><\/div>\n<p>This code is stored in a file called <code>mnist.py<\/code> and used in later steps (see the full code on <a href=\"https:\/\/github.com\/aws\/amazon-sagemaker-examples\/blob\/master\/step-functions-data-science-sdk\/training_pipeline_pytorch_mnist\/mnist.py\" target=\"_blank\" rel=\"noopener 
GitHub">
noreferrer\">GitHub<\/a>). The following are two important takeaways in connection with the pipeline:<\/p>\n<ul>\n<li>You can use the data input location on <a href=\"http:\/\/aws.amazon.com\/s3\" target=\"_blank\" rel=\"noopener noreferrer\">Amazon Simple Storage Service<\/a> (Amazon S3) as a parameter for the training step in a pipeline. This data is delivered to the training container, and its local path is stored in an environment variable (for example, <code>SM_CHANNEL_TRAINING<\/code>).<\/li>\n<li>The model is shown here as being saved locally in <code>model_dir<\/code>; the local path of the model directory (<code>\/opt\/ml\/model<\/code>) is stored in an environment variable (<code>SM_MODEL_DIR<\/code>). At the end of the SageMaker training job, the model is copied to an Amazon S3 location so that model- and endpoint-related pipeline steps can access it.<\/li>\n<\/ul>\n<p>Now let\u2019s look at our three methods to author end-to-end ML pipelines in Step Functions.<\/p>\n<h2>Use the Step Functions Data Science SDK<\/h2>\n<p>The Step Functions Data Science SDK is an open-source library that lets you create workflows entirely in Python. 
Installing this SDK is as simple as entering the following code:<\/p>\n<div class=\"hide-language\">\n<pre><code class=\"lang-bash\">pip install stepfunctions<\/code><\/pre>\n<\/p><\/div>\n<p>The SDK allows you to do the following:<\/p>\n<ul>\n<li>Create steps that accomplish tasks<\/li>\n<li>Chain those steps together into workflows<\/li>\n<li>Branch out to run steps in parallel or based on conditions<\/li>\n<li>Include retry, succeed, or fail steps<\/li>\n<li>Review a graphical representation and definition for your workflow<\/li>\n<li>Create a workflow in Step Functions<\/li>\n<li>Start and review runs in Step Functions<\/li>\n<\/ul>\n<p>We use only a subset of these capabilities in this post.<\/p>\n<p>To get started, we first create a PyTorch estimator with the <code>mnist.py<\/code> file. We configure the estimator with the training script, an <a href=\"http:\/\/aws.amazon.com\/iam\" target=\"_blank\" rel=\"noopener noreferrer\">AWS Identity and Access Management<\/a> (IAM) role, the number of training instances, the training instance type, and hyperparameters:<\/p>\n<div class=\"hide-language\">\n<pre><code class=\"lang-python\">from sagemaker.pytorch import PyTorch\n\nestimator = PyTorch(entry_point='mnist.py',\n                    role=sagemaker_execution_role,\n                    framework_version='1.2.0',\n                    py_version='py3',\n                    train_instance_count=2,\n                    train_instance_type='ml.c4.xlarge',\n                    hyperparameters={\n                        'epochs': 6\n                    })<\/code><\/pre>\n<\/p><\/div>\n<p>The Data Science SDK provides two ways of defining a pipeline. Firstly, you can use individual steps. 
For example, you can define a training step with the following code:<\/p>\n<div class=\"hide-language\">\n<pre><code class=\"lang-python\">training_step = steps.TrainingStep(\"Train Step\", estimator=estimator, ...)<\/code><\/pre>\n<\/p><\/div>\n<p>Then you create a model step:<\/p>\n<div class=\"hide-language\">\n<pre><code class=\"lang-python\">model_step = steps.ModelStep(\"Save model\", model=training_step.get_expected_model(), ...)<\/code><\/pre>\n<\/p><\/div>\n<p>Finally, you chain all the steps using the following code:<\/p>\n<div class=\"hide-language\">\n<pre><code class=\"lang-python\">workflow_definition = steps.Chain([training_step, model_step, endpoint_config_step, endpoint_step])<\/code><\/pre>\n<\/p><\/div>\n<p>For more information, see <a href=\"https:\/\/github.com\/aws\/amazon-sagemaker-examples\/blob\/master\/step-functions-data-science-sdk\/machine_learning_workflow_abalone\/machine_learning_workflow_abalone.ipynb\" target=\"_blank\" rel=\"noopener noreferrer\">Build a machine learning workflow using Step Functions and SageMaker<\/a>.<\/p>\n<p>Alternatively, you can use a <a href=\"https:\/\/aws-step-functions-data-science-sdk.readthedocs.io\/en\/stable\/pipelines.html#stepfunctions.template.pipeline.train.TrainingPipeline\" target=\"_blank\" rel=\"noopener noreferrer\">standard training pipeline class<\/a> that is built into the SDK:<\/p>\n<div class=\"hide-language\">\n<pre><code class=\"lang-python\">pipeline = TrainingPipeline(\n                estimator=estimator,\n                role=workflow_execution_role,\n                inputs=inputs,\n                s3_bucket=bucket\n            )<\/code><\/pre>\n<\/p><\/div>\n<p>The workflow execution role allows you to create and run workflows in Step Functions. 
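The exact permissions attached to this role depend on your account setup, but its trust policy must allow Step Functions to assume it. A minimal sketch of that trust policy document, built with standard-library Python (the permissions you attach to the role are left to your setup):

```python
import json

# Trust policy allowing AWS Step Functions (states.amazonaws.com) to assume
# the workflow execution role. Attach SageMaker and related permissions to the
# role separately so the pipeline steps can call those services.
trust_policy = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {"Service": "states.amazonaws.com"},
            "Action": "sts:AssumeRole",
        }
    ],
}
trust_policy_json = json.dumps(trust_policy, indent=2)
print(trust_policy_json)
```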
The following code creates the workflow and renders it:<\/p>\n<p><a href=\"https:\/\/d2908q01vomqb2.cloudfront.net\/f1f836cb4ea6efb2a0b1b99f41ad8b103eff4b59\/2021\/08\/31\/ML4675-image003.jpg\"><img decoding=\"async\" loading=\"lazy\" class=\"alignnone size-full wp-image-27585\" src=\"https:\/\/d2908q01vomqb2.cloudfront.net\/f1f836cb4ea6efb2a0b1b99f41ad8b103eff4b59\/2021\/08\/31\/ML4675-image003.jpg\" alt=\"\" width=\"1120\" height=\"200\"><\/a><\/p>\n<p>Finally, you can create and run the workflow using <code>pipeline.create()<\/code> and <code>pipeline.execute()<\/code>.<\/p>\n<p>An example output from the <code>execute()<\/code> statement looks as follows, and provides you with a link to Step Functions where you can view and monitor your execution:<\/p>\n<p><a href=\"https:\/\/console.aws.amazon.com\/states\/home?region=us-east-1#\/executions\/details\/arn:aws:states:us-east-1:497456752804:execution:training-pipeline-2021-06-24-15-28-55:training-pipeline-2021-06-24-15-29-31\" target=\"_blank\" rel=\"noopener noreferrer\">arn:aws:states:us-east-1:account-number:execution:training-pipeline-generated-date:training-pipeline-generated-time<\/a>.<\/p>\n<p>You can also render the current state of your workflow as it runs from a notebook using the following code:<\/p>\n<div class=\"hide-language\">\n<pre><code class=\"lang-python\">execution.render_progress()<\/code><\/pre>\n<\/p><\/div>\n<h2>Use Step Functions Workflow Studio<\/h2>\n<p>To use Step Functions Workflow Studio, complete the following steps:<\/p>\n<ol>\n<li>On the Step Functions console, choose <strong>Create state machine<\/strong>.<\/li>\n<li>Select <strong>Design your workflow visually<\/strong>.<\/li>\n<li>Choose <strong>Next<\/strong>.<br \/><a href=\"https:\/\/d2908q01vomqb2.cloudfront.net\/f1f836cb4ea6efb2a0b1b99f41ad8b103eff4b59\/2021\/08\/31\/ML4675-image005.jpg\"><img decoding=\"async\" loading=\"lazy\" class=\"alignnone size-full wp-image-27586\" 
src=\"https:\/\/d2908q01vomqb2.cloudfront.net\/f1f836cb4ea6efb2a0b1b99f41ad8b103eff4b59\/2021\/08\/31\/ML4675-image005.jpg\" alt=\"\" width=\"2180\" height=\"1070\"><\/a><\/li>\n<li>Enter and filter your SageMaker steps, then drag and drop them to the training step.<br \/><a href=\"https:\/\/d2908q01vomqb2.cloudfront.net\/f1f836cb4ea6efb2a0b1b99f41ad8b103eff4b59\/2021\/08\/31\/ML4675-image007.jpg\"><img decoding=\"async\" loading=\"lazy\" class=\"alignnone size-full wp-image-27587\" src=\"https:\/\/d2908q01vomqb2.cloudfront.net\/f1f836cb4ea6efb2a0b1b99f41ad8b103eff4b59\/2021\/08\/31\/ML4675-image007.jpg\" alt=\"\" width=\"2686\" height=\"762\"><\/a><\/li>\n<li>In a similar fashion, drag and drop the following states in order:\n<ol type=\"a\">\n<li>Create Model<\/li>\n<li>Create Endpoint Config<\/li>\n<li>Create Endpoint<\/li>\n<\/ol>\n<\/li>\n<\/ol>\n<p>Your workflow should now look like the following diagram.<\/p>\n<p><a href=\"https:\/\/d2908q01vomqb2.cloudfront.net\/f1f836cb4ea6efb2a0b1b99f41ad8b103eff4b59\/2021\/08\/31\/ML4675-image009.jpg\"><img decoding=\"async\" loading=\"lazy\" class=\"alignnone size-full wp-image-27588\" src=\"https:\/\/d2908q01vomqb2.cloudfront.net\/f1f836cb4ea6efb2a0b1b99f41ad8b103eff4b59\/2021\/08\/31\/ML4675-image009.jpg\" alt=\"\" width=\"456\" height=\"706\"><\/a><\/p>\n<p>Now, let\u2019s configure each of these steps.<\/p>\n<ol start=\"6\">\n<li>Choose <strong>SageMaker CreateTrainingJob<\/strong> and edit the API parameters in the <strong>Form <\/strong>box.<br \/><a href=\"https:\/\/d2908q01vomqb2.cloudfront.net\/f1f836cb4ea6efb2a0b1b99f41ad8b103eff4b59\/2021\/08\/31\/ML4675-image011.jpg\"><img decoding=\"async\" class=\"alignnone size-full wp-image-27589\" src=\"https:\/\/d2908q01vomqb2.cloudfront.net\/f1f836cb4ea6efb2a0b1b99f41ad8b103eff4b59\/2021\/08\/31\/ML4675-image011.jpg\" alt=\"\" height=\"532\"><\/a><\/li>\n<li>Use the following JSON object if you are following this example:<\/li>\n<\/ol>\n<div 
class=\"hide-language\">\n<pre><code class=\"lang-json\">{\n  \"AlgorithmSpecification\": {\n    \"TrainingImage\": \"763104351884.dkr.ecr.us-east-1.amazonaws.com\/pytorch-training:1.2.0-cpu-py3\",\n    \"TrainingInputMode\": \"File\"\n  },\n  \"HyperParameters\": {\n    \"epochs\": \"6\",\n    \"sagemaker_submit_directory.$\": \"$$.Execution.Input.sourcedir\",\n    \"sagemaker_program.$\": \"$$.Execution.Input.trainfile\"\n  },\n  \"InputDataConfig\": [\n    {\n      \"ChannelName\": \"training\",\n      \"DataSource\": {\n        \"S3DataSource\": {\n          \"S3DataType\": \"S3Prefix\",\n          \"S3Uri\": \"s3:\/\/sagemaker-us-east-1-497456752804\/sagemaker\/DEMO-pytorch-mnist\",\n          \"S3DataDistributionType\": \"FullyReplicated\"\n        }\n      }\n    }\n  ],\n  \"StoppingCondition\": {\n    \"MaxRuntimeInSeconds\": 1000\n  },\n  \"ResourceConfig\": {\n    \"InstanceCount\": 2,\n    \"InstanceType\": \"ml.c4.xlarge\",\n    \"VolumeSizeInGB\": 30\n  },\n  \"OutputDataConfig\": {\n    \"S3OutputPath\": \"s3:\/\/sagemaker-us-east-1-497456752804\/stepfunctions-workflow-training-job-v1\/models\"\n  },\n  \"RoleArn\": \"arn:aws:iam::497456752804:role\/telecomstack-SagemakerExecutionRole-AHSGUPY5EQIK\",\n  \"TrainingJobName.$\": \"States.Format('trainingjob-{}',$$.Execution.Name)\"\n}\n<\/code><\/pre>\n<\/p><\/div>\n<ol start=\"8\">\n<li>Select <strong>Wait for task to complete<\/strong>.<\/li>\n<li>Edit the API parameters for <code>CreateModel<\/code>:<\/li>\n<\/ol>\n<div class=\"hide-language\">\n<pre><code class=\"lang-json\">{\n  \"ExecutionRoleArn.$\": \"$.RoleArn\",\n  \"ModelName.$\": \"States.Format('model-{}',$$.Execution.Name)\",\n  \"PrimaryContainer\": {\n    \"Image.$\": \"$.AlgorithmSpecification.TrainingImage\",\n    \"Environment\": {\n      \"SAGEMAKER_PROGRAM.$\": \"$$.Execution.Input.trainfile\",\n      \"SAGEMAKER_SUBMIT_DIRECTORY.$\": \"$$.Execution.Input.sourcedir\"\n    },\n    \"ModelDataUrl.$\": 
\"$.ModelArtifacts.S3ModelArtifacts\"\n  }\n}\n<\/code><\/pre>\n<\/p><\/div>\n<ol start=\"10\">\n<li>Edit the API parameters for <code>CreateEndpointConfig<\/code>:<\/li>\n<\/ol>\n<div class=\"hide-language\">\n<pre><code class=\"lang-json\">{\n  \"EndpointConfigName.$\": \"States.Format('config-{}',$$.Execution.Name)\",\n  \"ProductionVariants\": [\n    {\n      \"InitialInstanceCount\": 1,\n      \"InitialVariantWeight\": 1,\n      \"InstanceType\": \"ml.m4.xlarge\",\n      \"ModelName.$\": \"States.Format('model-{}',$$.Execution.Name)\",\n      \"VariantName\": \"AllTraffic\"\n    }\n  ]\n}<\/code><\/pre>\n<\/p><\/div>\n<ol start=\"11\">\n<li>Edit the API parameters for <code>CreateEndpoint<\/code>:<\/li>\n<\/ol>\n<div class=\"hide-language\">\n<pre><code class=\"lang-json\">{\n  \"EndpointConfigName.$\": \"States.Format('config-{}',$$.Execution.Name)\",\n  \"EndpointName.$\": \"States.Format('endpoint-{}',$$.Execution.Name)\"\n}\n<\/code><\/pre>\n<\/p><\/div>\n<ol start=\"12\">\n<li>Choose <strong>Next<\/strong>, review the generated code, and choose <strong>Next<\/strong>.<\/li>\n<\/ol>\n<p>Step Functions can look at the resources you use and create a role. However, you may see the following message:<\/p>\n<p>\u201cStep Functions cannot generate an IAM policy if the RoleArn for SageMaker is from a Path. 
Hardcode the SageMaker RoleArn in your state machine definition, or choose an existing role with the proper permissions for Step Functions to call SageMaker.\u201d<\/p>\n<p>We use the role that we created in the Data Science SDK section instead.<\/p>\n<ol start=\"13\">\n<li>Select <strong>Use existing role<\/strong> and use the role <code>StepFunctionsWorkflowExecutionRole<\/code>.<br \/><a href=\"https:\/\/d2908q01vomqb2.cloudfront.net\/f1f836cb4ea6efb2a0b1b99f41ad8b103eff4b59\/2021\/08\/31\/ML4675-image013.jpg\"><img decoding=\"async\" loading=\"lazy\" class=\"alignnone size-full wp-image-27590\" src=\"https:\/\/d2908q01vomqb2.cloudfront.net\/f1f836cb4ea6efb2a0b1b99f41ad8b103eff4b59\/2021\/08\/31\/ML4675-image013.jpg\" alt=\"\" width=\"1754\" height=\"638\"><\/a><\/li>\n<li>Choose <strong>Create state machine<\/strong>.<\/li>\n<li>When you receive the message that the state machine was successfully created, run it with the following input:<\/li>\n<\/ol>\n<div class=\"hide-language\">\n<pre><code class=\"lang-json\">{\n    \"trainfile\":\"mnist.py\",\n    \"sourcedir\":\"s3:\/\/path\/to\/sourcedir.tar.gz\"\n}\n<\/code><\/pre>\n<\/p><\/div>\n<p><a href=\"https:\/\/d2908q01vomqb2.cloudfront.net\/f1f836cb4ea6efb2a0b1b99f41ad8b103eff4b59\/2021\/08\/31\/ML4675-image015.jpg\"><img decoding=\"async\" class=\"alignnone size-full wp-image-27591\" src=\"https:\/\/d2908q01vomqb2.cloudfront.net\/f1f836cb4ea6efb2a0b1b99f41ad8b103eff4b59\/2021\/08\/31\/ML4675-image015.jpg\" alt=\"\" height=\"840\"><\/a><\/p>\n<p>Monitor and wait for the run to finish.<\/p>\n<h2>Use the Amazon States Language<\/h2>\n<p>Both the methods we just discussed are great ways to quickly prototype a state machine on Step Functions. When you need to edit the Step Functions definition directly, you can use the Amazon States Language. 
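Because a States Language definition is plain JSON, you can sanity-check it with a few lines of standard-library Python before creating the state machine: for example, verifying that `StartAt` names a real state and that every `Next` target exists. A minimal sketch (the toy two-state definition below is an illustration; a real definition like the one that follows would be loaded from a file):

```python
import json

def lint_state_machine(definition: str) -> list:
    """Return a list of problems found in a States Language definition."""
    doc = json.loads(definition)
    states = doc.get("States", {})
    problems = []
    start = doc.get("StartAt")
    if start not in states:
        problems.append(f"StartAt '{start}' is not a defined state")
    for name, state in states.items():
        nxt = state.get("Next")
        if nxt is not None and nxt not in states:
            problems.append(f"State '{name}' points to undefined state '{nxt}'")
        if nxt is None and not state.get("End") and state.get("Type") not in ("Succeed", "Fail", "Choice"):
            problems.append(f"State '{name}' has neither Next nor End")
    return problems

# Toy definition mirroring the shape of the pipeline in this post.
definition = json.dumps({
    "StartAt": "SageMaker CreateTrainingJob",
    "States": {
        "SageMaker CreateTrainingJob": {"Type": "Task", "Next": "SageMaker CreateModel"},
        "SageMaker CreateModel": {"Type": "Task", "End": True},
    },
})
print(lint_state_machine(definition))  # -> []
```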
See the following code:<\/p>\n<div class=\"hide-language\">\n<pre><code class=\"lang-python\">{\n  \"Comment\": \"This is your state machine\",\n  \"StartAt\": \"SageMaker CreateTrainingJob\",\n  \"States\": {\n    \"SageMaker CreateTrainingJob\": {\n      \"Type\": \"Task\",\n      \"Resource\": \"arn:aws:states:::sagemaker:createTrainingJob.sync\",\n      \"Parameters\": {\n        \"AlgorithmSpecification\": {\n          \"TrainingImage\": \"763104351884.dkr.ecr.us-east-1.amazonaws.com\/pytorch-training:1.2.0-cpu-py3\",\n          \"TrainingInputMode\": \"File\"\n        },\n        \"HyperParameters\": {\n          \"epochs\": \"6\",\n          \"sagemaker_submit_directory.$\": \"$$.Execution.Input.sourcedir\",\n          \"sagemaker_program.$\": \"$$.Execution.Input.trainfile\"\n        },\n        \"InputDataConfig\": [\n          {\n            \"ChannelName\": \"training\",\n            \"DataSource\": {\n              \"S3DataSource\": {\n                \"S3DataType\": \"S3Prefix\",\n                \"S3Uri\": \"s3:\/\/sagemaker-us-east-1-497456752804\/sagemaker\/DEMO-pytorch-mnist\",\n                \"S3DataDistributionType\": \"FullyReplicated\"\n              }\n            }\n          }\n        ],\n        \"StoppingCondition\": {\n          \"MaxRuntimeInSeconds\": 1000\n        },\n        \"ResourceConfig\": {\n          \"InstanceCount\": 2,\n          \"InstanceType\": \"ml.c4.xlarge\",\n          \"VolumeSizeInGB\": 30\n        },\n        \"OutputDataConfig\": {\n          \"S3OutputPath\": \"s3:\/\/sagemaker-us-east-1-497456752804\/stepfunctions-workflow-training-job-v1\/models\"\n        },\n        \"RoleArn\": \"arn:aws:iam::497456752804:role\/telecomstack-SagemakerExecutionRole-AHSGUPY5EQIK\",\n        \"TrainingJobName.$\": \"States.Format('trainingjob-{}',$$.Execution.Name)\"\n      },\n      \"Next\": \"SageMaker CreateModel\"\n    },\n    \"SageMaker CreateModel\": {\n      \"Type\": \"Task\",\n      \"Resource\": 
\"arn:aws:states:::sagemaker:createModel\",\n      \"Parameters\": {\n        \"ExecutionRoleArn.$\": \"$.RoleArn\",\n        \"ModelName.$\": \"States.Format('model-{}',$$.Execution.Name)\",\n        \"PrimaryContainer\": {\n          \"Image.$\": \"$.AlgorithmSpecification.TrainingImage\",\n          \"Environment\": {\n            \"SAGEMAKER_PROGRAM.$\": \"$$.Execution.Input.trainfile\",\n            \"SAGEMAKER_SUBMIT_DIRECTORY.$\": \"$$.Execution.Input.sourcedir\"\n          },\n          \"ModelDataUrl.$\": \"$.ModelArtifacts.S3ModelArtifacts\"\n        }\n      },\n      \"Next\": \"SageMaker CreateEndpointConfig\"\n    },\n    \"SageMaker CreateEndpointConfig\": {\n      \"Type\": \"Task\",\n      \"Resource\": \"arn:aws:states:::sagemaker:createEndpointConfig\",\n      \"Parameters\": {\n        \"EndpointConfigName.$\": \"States.Format('config-{}',$$.Execution.Name)\",\n        \"ProductionVariants\": [\n          {\n            \"InitialInstanceCount\": 1,\n            \"InitialVariantWeight\": 1,\n            \"InstanceType\": \"ml.m4.xlarge\",\n            \"ModelName.$\": \"States.Format('model-{}',$$.Execution.Name)\",\n            \"VariantName\": \"AllTraffic\"\n          }\n        ]\n      },\n      \"Next\": \"SageMaker CreateEndpoint\"\n    },\n    \"SageMaker CreateEndpoint\": {\n      \"Type\": \"Task\",\n      \"Resource\": \"arn:aws:states:::sagemaker:createEndpoint\",\n      \"Parameters\": {\n        \"EndpointConfigName.$\": \"States.Format('config-{}',$$.Execution.Name)\",\n        \"EndpointName.$\": \"States.Format('endpoint-{}',$$.Execution.Name)\"\n      },\n      \"End\": true\n    }\n  }\n}\n<\/code><\/pre>\n<\/p><\/div>\n<p>You can create a new Step Functions state machine on the Step Functions console by selecting <strong>Write your workflow in code<\/strong>.<\/p>\n<p><a href=\"https:\/\/d2908q01vomqb2.cloudfront.net\/f1f836cb4ea6efb2a0b1b99f41ad8b103eff4b59\/2021\/08\/31\/ML4675-image017.jpg\"><img decoding=\"async\" 
class=\"alignnone size-full wp-image-27592\" src=\"https:\/\/d2908q01vomqb2.cloudfront.net\/f1f836cb4ea6efb2a0b1b99f41ad8b103eff4b59\/2021\/08\/31\/ML4675-image017.jpg\" alt=\"\" height=\"938\"><\/a><\/p>\n<p>A successful run shows each state in green.<\/p>\n<p><a href=\"https:\/\/d2908q01vomqb2.cloudfront.net\/f1f836cb4ea6efb2a0b1b99f41ad8b103eff4b59\/2021\/08\/31\/ML4675-image019.jpg\"><img decoding=\"async\" class=\"alignnone wp-image-27596 size-full\" src=\"https:\/\/d2908q01vomqb2.cloudfront.net\/f1f836cb4ea6efb2a0b1b99f41ad8b103eff4b59\/2021\/08\/31\/ML4675-image019.jpg\" alt=\"\" height=\"494\"><\/a><\/p>\n<p>Each state points to resources in SageMaker. In the following screenshot, the link under <strong>Resource<\/strong> points to the model created as a result of the <code>CreateModel<\/code> step.<\/p>\n<p><a href=\"https:\/\/d2908q01vomqb2.cloudfront.net\/f1f836cb4ea6efb2a0b1b99f41ad8b103eff4b59\/2021\/08\/31\/ML4675-image021.jpg\"><img decoding=\"async\" class=\"alignnone size-full wp-image-27594\" src=\"https:\/\/d2908q01vomqb2.cloudfront.net\/f1f836cb4ea6efb2a0b1b99f41ad8b103eff4b59\/2021\/08\/31\/ML4675-image021.jpg\" alt=\"\" height=\"638\"><\/a><\/p>\n<h2>When to use what?<\/h2>\n<p>The following table summarizes the supported service integrations.<\/p>\n<p>Although most of what you typically need for your pipelines is included in the Step Functions Data Science SDK, you may need to integrate with other supported services that are supported by other choices shown in the preceding table.<\/p>\n<p>In addition, consider the skillsets in your existing team\u2014teams that are used to working with a particular tool may prefer sticking to the same for maximizing productivity. 
This applies not only to the Step Functions options explored here, but also to others such as the <a href=\"https:\/\/aws.amazon.com\/cdk\/\" target=\"_blank\" rel=\"noopener noreferrer\">AWS Cloud Development Kit<\/a> (AWS CDK), <a href=\"https:\/\/aws.amazon.com\/serverless\/sam\/\" target=\"_blank\" rel=\"noopener noreferrer\">AWS Serverless Application Model<\/a> (AWS SAM), Airflow, KubeFlow, and SageMaker Pipelines. Specifically regarding Pipelines, consider that data scientists and ML engineers may benefit from working on a single platform where they can not only maintain and run pipelines, but also manage models, endpoints, notebooks, and other resources.<\/p>\n<p>Lastly, consider a hybrid set of services so that you use the right tool for each job. For example:<\/p>\n<ul>\n<li>You can use <a href=\"http:\/\/aws.amazon.com\/codepipeline\" target=\"_blank\" rel=\"noopener noreferrer\">AWS CodePipeline<\/a> along with Step Functions for orchestrating ML pipelines that require custom containers. CodePipeline invokes Step Functions and passes the container image URI and the unique container image tag as parameters to Step Functions. For more information, see <a href=\"https:\/\/aws.amazon.com\/blogs\/machine-learning\/build-a-ci-cd-pipeline-for-deploying-custom-machine-learning-models-using-aws-services\/\" target=\"_blank\" rel=\"noopener noreferrer\">Build a CI\/CD pipeline for deploying custom machine learning models using AWS services<\/a>.<\/li>\n<li>You can use Kubeflow pipelines to define the training pipeline, and SageMaker to host trained models in the cloud. 
For more information, see <a href=\"https:\/\/aws.amazon.com\/blogs\/machine-learning\/cisco-uses-amazon-sagemaker-and-kubeflow-to-create-a-hybrid-machine-learning-workflow\/\" target=\"_blank\" rel=\"noopener noreferrer\">Cisco uses Amazon SageMaker and Kubeflow to create a hybrid machine learning workflow<\/a>.<\/li>\n<li>You can use Pipelines for automating feature engineering pipelines using <a href=\"https:\/\/aws.amazon.com\/sagemaker\/data-wrangler\/\" target=\"_blank\" rel=\"noopener noreferrer\">SageMaker Data Wrangler<\/a> and <a href=\"https:\/\/aws.amazon.com\/sagemaker\/feature-store\/\" target=\"_blank\" rel=\"noopener noreferrer\">SageMaker Feature Store<\/a>. Pipelines is a purpose-built CI\/CD tool for ML model building and deployment that includes not only workflow orchestration, but also features such as a model registry, lineage tracking, and projects. When upstream processes already use Step Functions (for example, to prepare data with other AWS services), consider a hybrid architecture in which both Step Functions and Pipelines are used. 
For more information, see <a href=\"https:\/\/aws.amazon.com\/blogs\/machine-learning\/automate-feature-engineering-pipelines-with-amazon-sagemaker\/\" target=\"_blank\" rel=\"noopener noreferrer\">Automate feature engineering pipelines with Amazon SageMaker<\/a>.<\/li>\n<li>When developers and data scientists need to write infrastructure as code with unit testing, consider using tools like the <a href=\"https:\/\/aws-step-functions-data-science-sdk.readthedocs.io\/en\/latest\/\" target=\"_blank\" rel=\"noopener noreferrer\">Step Functions Data Science SDK<\/a> and <a href=\"https:\/\/aws.amazon.com\/managed-workflows-for-apache-airflow\/\" target=\"_blank\" rel=\"noopener noreferrer\">Apache Airflow<\/a>, because you can use Python to define end-to-end architectures and pipelines.<\/li>\n<\/ul>\n<h2>Summary<\/h2>\n<p>In this post, we looked at three different ways of authoring and running Step Functions pipelines, specifically for end-to-end ML workflows. Choosing a pipelining tool for ML is an important decision for a team, and one that needs to consider the context, existing skillsets, connections to other teams in the organization that have those skillsets, available service integrations, service limits, and applicable quotas. 
Contact your AWS team for guidance through these decisions; we are eager to help!<\/p>\n<p>For further reading, check out the resources linked throughout this post.<\/p>\n<hr>\n<h3>About the Author<\/h3>\n<p><strong><a href=\"https:\/\/d2908q01vomqb2.cloudfront.net\/f1f836cb4ea6efb2a0b1b99f41ad8b103eff4b59\/2019\/02\/14\/Shreyas-Subramanian-100.jpg\"><img decoding=\"async\" loading=\"lazy\" class=\"size-full wp-image-7674 alignleft\" src=\"https:\/\/d2908q01vomqb2.cloudfront.net\/f1f836cb4ea6efb2a0b1b99f41ad8b103eff4b59\/2019\/02\/14\/Shreyas-Subramanian-100.jpg\" alt=\"\" width=\"100\" height=\"134\"><\/a>Shreyas Subramanian<\/strong> is an AI\/ML Specialist Solutions Architect who helps customers use machine learning to solve their business challenges on the AWS Cloud.<\/p>\n      <\/div>\n","protected":false},"excerpt":{"rendered":"<p>https:\/\/aws.amazon.com\/blogs\/machine-learning\/define-and-run-machine-learning-pipelines-on-step-functions-using-python-workflow-studio-or-states-language\/<\/p>\n","protected":false},"author":0,"featured_media":806,"comment_status":"open","ping_status":"open","sticky":false,"template":"","format":"standard","meta":[],"categories":[3],"tags":[],"_links":{"self":[{"href":"https:\/\/salarydistribution.com\/machine-learning\/wp-json\/wp\/v2\/posts\/805"}],"collection":[{"href":"https:\/\/salarydistribution.com\/machine-learning\/wp-json\/wp\/v2\/posts"}],"about":[{"href":"https:\/\/salarydistribution.com\/machine-learning\/wp-json\/wp\/v2\/types\/post"}],"replies":[{"embeddable":true,"href":"https:\/\/salarydistribution.com\/machine-learning\/wp-json\/wp\/v2\/comments?post=805"}],"version-history":[{"count":0,"href":"https:\/\/salarydistribution.com\/machine-learning\/wp-json\/wp\/v2\/posts\/805\/revisions"}],"wp:featuredmedia":[{"embeddable":true,"href":"https:\/\/salarydistribution.com\/machine-learning\/wp-json\/wp\/v2\/media\/806"}],"wp:attachment":[{"href":"https:\/\/salarydistribution.com\/machine-learning\/wp-json\/wp
\/v2\/media?parent=805"}],"wp:term":[{"taxonomy":"category","embeddable":true,"href":"https:\/\/salarydistribution.com\/machine-learning\/wp-json\/wp\/v2\/categories?post=805"},{"taxonomy":"post_tag","embeddable":true,"href":"https:\/\/salarydistribution.com\/machine-learning\/wp-json\/wp\/v2\/tags?post=805"}],"curies":[{"name":"wp","href":"https:\/\/api.w.org\/{rel}","templated":true}]}}