{"id":1371,"date":"2021-12-15T00:19:30","date_gmt":"2021-12-15T00:19:30","guid":{"rendered":"https:\/\/salarydistribution.com\/machine-learning\/2021\/12\/15\/process-amazon-redshift-data-and-schedule-a-training-pipeline-with-amazon-sagemaker-processing-and-amazon-sagemaker-pipelines\/"},"modified":"2021-12-15T00:19:30","modified_gmt":"2021-12-15T00:19:30","slug":"process-amazon-redshift-data-and-schedule-a-training-pipeline-with-amazon-sagemaker-processing-and-amazon-sagemaker-pipelines","status":"publish","type":"post","link":"https:\/\/salarydistribution.com\/machine-learning\/2021\/12\/15\/process-amazon-redshift-data-and-schedule-a-training-pipeline-with-amazon-sagemaker-processing-and-amazon-sagemaker-pipelines\/","title":{"rendered":"Process Amazon Redshift data and schedule a training pipeline with Amazon SageMaker Processing and Amazon SageMaker Pipelines"},"content":{"rendered":"<div id=\"\">\n<p>Customers in many different domains tend to work with multiple sources for their data: object-based storage like <a href=\"https:\/\/aws.amazon.com\/s3\/\" target=\"_blank\" rel=\"noopener noreferrer\">Amazon Simple Storage Service<\/a> (Amazon S3), relational databases like <a href=\"https:\/\/aws.amazon.com\/rds\/\" target=\"_blank\" rel=\"noopener noreferrer\">Amazon Relational Database Service<\/a> (Amazon RDS), or data warehouses like <a href=\"https:\/\/aws.amazon.com\/redshift\/\" target=\"_blank\" rel=\"noopener noreferrer\">Amazon Redshift<\/a>. Machine learning (ML) practitioners are often driven to work with objects and files instead of databases and tables from the different frameworks they work with. 
They also prefer local copies of such files in order to reduce the latency of accessing them.<\/p>\n<p>Nevertheless, ML engineers and data scientists might be required to directly extract data from data warehouses with SQL-like queries to obtain the datasets that they can use for training their models.<\/p>\n<p>In this post, we use the <a href=\"https:\/\/aws.amazon.com\/sagemaker\/\" target=\"_blank\" rel=\"noopener noreferrer\">Amazon SageMaker<\/a> Processing API to run a query against an Amazon Redshift cluster, create CSV files, and perform distributed processing. As an extra step, we also train a simple model to predict the total sales for new events, and build a pipeline with <a href=\"https:\/\/aws.amazon.com\/sagemaker\/pipelines\/\" target=\"_blank\" rel=\"noopener noreferrer\">Amazon SageMaker Pipelines<\/a> to schedule it.<\/p>\n<h2>Prerequisites<\/h2>\n<p>This post uses the sample data that is available when creating a <a href=\"https:\/\/aws.amazon.com\/redshift\/free-trial\/\" target=\"_blank\" rel=\"noopener noreferrer\">Free Tier cluster in Amazon Redshift<\/a>. As a prerequisite, you should create your cluster and attach to it an <a href=\"http:\/\/aws.amazon.com\/iam\" target=\"_blank\" rel=\"noopener noreferrer\">AWS Identity and Access Management<\/a> (IAM) role with the correct permissions. For instructions on creating the cluster with the sample dataset, see <a href=\"https:\/\/docs.aws.amazon.com\/redshift\/latest\/gsg\/sample-data-load.html\" target=\"_blank\" rel=\"noopener noreferrer\">Using a sample dataset<\/a>. For instructions on associating the role with the cluster, see <a href=\"https:\/\/docs.aws.amazon.com\/redshift\/latest\/mgmt\/data-api.html#data-api-access\" target=\"_blank\" rel=\"noopener noreferrer\">Authorizing access to the Amazon Redshift Data API<\/a>.<\/p>\n<p>You can then use your IDE of choice to open the notebooks. 
This content has been developed and tested using <a href=\"https:\/\/aws.amazon.com\/sagemaker\/studio\/\" target=\"_blank\" rel=\"noopener noreferrer\">SageMaker Studio<\/a> on an ml.t3.medium instance. For more information about using Studio, refer to the SageMaker Studio documentation.<\/p>\n<h2>Define the query<\/h2>\n<p>Now that your Amazon Redshift cluster is up and running, and loaded with the sample dataset, we can define the query to extract data from our cluster. According to the documentation for the <a href=\"https:\/\/docs.aws.amazon.com\/redshift\/latest\/dg\/c_sampledb.html\" target=\"_blank\" rel=\"noopener noreferrer\">sample database<\/a>, this application helps analysts track sales activity for the fictional TICKIT website, where users buy and sell tickets online for sporting events, shows, and concerts. In particular, analysts can identify ticket movement over time, success rates for sellers, and the best-selling events, venues, and seasons.<\/p>\n<p>Analysts may be tasked with solving a very common ML problem: predicting the number of tickets sold given the characteristics of an event. Because we have two fact tables and five dimensions in our sample database, we have plenty of data to work with. For the sake of this example, we use information about the venue in which the event takes place, as well as its date. 
The SQL query looks like the following:<\/p>\n<div class=\"hide-language\">\n<pre><code class=\"lang-sql\">SELECT SUM(s.qtysold) AS total_sold, e.venueid, e.catid, d.caldate, d.holiday\nFROM sales s, event e, date d\nWHERE s.eventid = e.eventid AND e.dateid = d.dateid\nGROUP BY e.venueid, e.catid, d.caldate, d.holiday<\/code><\/pre>\n<\/p><\/div>\n<p>We can <a href=\"https:\/\/docs.aws.amazon.com\/redshift\/latest\/gsg\/rs-gsg-sample-data-load-query.html\" target=\"_blank\" rel=\"noopener noreferrer\">run this query in the query editor<\/a> to test the outcomes and change it to include additional information if needed.<\/p>\n<p><a href=\"https:\/\/d2908q01vomqb2.cloudfront.net\/f1f836cb4ea6efb2a0b1b99f41ad8b103eff4b59\/2021\/12\/10\/ML-5901-image001.png\"><img decoding=\"async\" loading=\"lazy\" class=\"alignnone size-full wp-image-31672\" src=\"https:\/\/d2908q01vomqb2.cloudfront.net\/f1f836cb4ea6efb2a0b1b99f41ad8b103eff4b59\/2021\/12\/10\/ML-5901-image001.png\" alt=\"\" width=\"2876\" height=\"1318\"><\/a><\/p>\n<h2>Extract the data from Amazon Redshift and process it with SageMaker Processing<\/h2>\n<p>Now that we\u2019re happy with our query, we need to make it part of our training pipeline.<\/p>\n<p>A typical training pipeline consists of three phases:<\/p>\n<ul>\n<li><strong>Preprocessing<\/strong> \u2013 This phase reads the raw dataset and transforms it into the input format that the model requires for training<\/li>\n<li><strong>Training<\/strong> \u2013 This phase reads the processed dataset and uses it to train the model<\/li>\n<li><strong>Model registration<\/strong> \u2013 In this phase, we save the model for later use<\/li>\n<\/ul>\n<p>Our first task is to use a <a href=\"https:\/\/docs.aws.amazon.com\/sagemaker\/latest\/dg\/processing-job.html\" target=\"_blank\" rel=\"noopener noreferrer\">SageMaker Processing job<\/a> to load the dataset from Amazon Redshift, preprocess it, and store it in Amazon S3 for the training step to pick 
up. SageMaker Processing allows us to read data directly from different sources, including Amazon S3, <a href=\"https:\/\/aws.amazon.com\/athena\/\" target=\"_blank\" rel=\"noopener noreferrer\">Amazon Athena<\/a>, and Amazon Redshift. It also allows us to configure access to the cluster by providing the cluster and database information, and to use our previously defined SQL query as part of a <a href=\"https:\/\/docs.aws.amazon.com\/sagemaker\/latest\/APIReference\/API_RedshiftDatasetDefinition.html\" target=\"_blank\" rel=\"noopener noreferrer\">RedshiftDatasetDefinition<\/a>. We use the SageMaker Python SDK to create this object; you can check its definition and the required parameters in the SageMaker Python SDK documentation. See the following code:<\/p>\n<div class=\"hide-language\">\n<pre><code class=\"lang-python\">from sagemaker.dataset_definition.inputs import RedshiftDatasetDefinition\n\nrdd = RedshiftDatasetDefinition(\n    cluster_id=\"THE-NAME-OF-YOUR-CLUSTER\",\n    database=\"THE-NAME-OF-YOUR-DATABASE\",\n    db_user=\"YOUR-DB-USERNAME\",\n    query_string=\"THE-SQL-QUERY-FROM-THE-PREVIOUS-STEP\",\n    cluster_role_arn=\"THE-IAM-ROLE-ASSOCIATED-TO-YOUR-CLUSTER\",\n    output_format=\"CSV\",\n    output_s3_uri=\"WHERE-IN-S3-YOU-WANT-TO-STORE-YOUR-DATA\"\n)<\/code><\/pre>\n<\/p><\/div>\n<p>Then, you can define the <a href=\"https:\/\/docs.aws.amazon.com\/sagemaker\/latest\/APIReference\/API_DatasetDefinition.html\" target=\"_blank\" rel=\"noopener noreferrer\">DatasetDefinition<\/a>. 
This object is responsible for defining how SageMaker Processing uses the dataset loaded from Amazon Redshift:<\/p>\n<div class=\"hide-language\">\n<pre><code class=\"lang-python\">from sagemaker.dataset_definition.inputs import DatasetDefinition\n\ndd = DatasetDefinition(\n    data_distribution_type='ShardedByS3Key', # This tells SM Processing to shard the data across instances\n    local_path='\/opt\/ml\/processing\/input\/data\/', # Where SM Processing will save the data in the container\n    redshift_dataset_definition=rdd # Our RedshiftDatasetDefinition from the previous step\n)<\/code><\/pre>\n<\/p><\/div>\n<p>Finally, you can use this object as the input of your processor of choice. For this post, we wrote a very simple scikit-learn script that cleans the dataset, performs some transformations, and splits the dataset for training and testing. You can check the code in the file <code>processing.py<\/code>.<\/p>\n<p>We can now instantiate the <a href=\"https:\/\/sagemaker.readthedocs.io\/en\/stable\/frameworks\/sklearn\/sagemaker.sklearn.html#sagemaker.sklearn.processing.SKLearnProcessor\" target=\"_blank\" rel=\"noopener noreferrer\">SKLearnProcessor<\/a> object, where we define the framework version that we plan on using, the number and type of instances that we spin up as part of our processing cluster, and the execution role that contains the right permissions. Then, we can pass the parameter <code>dataset_definition<\/code> as the input of the <code>run()<\/code> method. This method accepts our <code>processing.py<\/code> script as the code to run, given some inputs (namely, our <code>RedshiftDatasetDefinition<\/code>), generates some outputs (a train and a test dataset), and stores both to Amazon S3. 
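<\/p>
<p>The <code>processing.py<\/code> script itself is not shown in the post. The following is a hypothetical minimal sketch of what such a script might do, assuming the query results arrive as headerless CSV shards containing the five columns selected by our SQL query; the helper name <code>preprocess<\/code> and the derived features are illustrative assumptions, not the actual code:<\/p>

```python
# Hypothetical sketch of a processing script (illustrative, not the post's actual code).
# Assumes headerless CSV shards with the five columns produced by the SQL query.
import os

import pandas as pd
from sklearn.model_selection import train_test_split

COLUMNS = ["total_sold", "venueid", "catid", "caldate", "holiday"]

def preprocess(input_dir, train_dir, test_dir, test_size=0.2):
    # Read and concatenate every CSV shard that SageMaker Processing placed locally
    shards = [
        pd.read_csv(os.path.join(input_dir, name), names=COLUMNS)
        for name in sorted(os.listdir(input_dir))
        if name.endswith(".csv")
    ]
    df = pd.concat(shards, ignore_index=True).dropna()

    # Derive simple numeric features from the calendar date, then drop it
    dates = pd.to_datetime(df["caldate"])
    df["month"] = dates.dt.month
    df["weekday"] = dates.dt.weekday
    df["holiday"] = df["holiday"].astype(str).str.lower().isin(["t", "true", "1"]).astype(int)
    df = df.drop(columns=["caldate"])

    # Split and persist the two datasets for the training step to pick up
    train, test = train_test_split(df, test_size=test_size, random_state=42)
    os.makedirs(train_dir, exist_ok=True)
    os.makedirs(test_dir, exist_ok=True)
    train.to_csv(os.path.join(train_dir, "train.csv"), index=False)
    test.to_csv(os.path.join(test_dir, "test.csv"), index=False)
    return len(train), len(test)

if __name__ == "__main__":
    # Container paths used by the processor configuration shown below
    preprocess("/opt/ml/processing/input/data",
               "/opt/ml/processing/train",
               "/opt/ml/processing/test")
```

<p>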
We run this operation synchronously thanks to the parameter <code>wait=True<\/code>:<\/p>\n<div class=\"hide-language\">\n<pre><code class=\"lang-python\">from sagemaker.sklearn import SKLearnProcessor\nfrom sagemaker.processing import ProcessingInput, ProcessingOutput\nfrom sagemaker import get_execution_role\n\nskp = SKLearnProcessor(\n    framework_version='0.23-1',\n    role=get_execution_role(),\n    instance_type='ml.m5.large',\n    instance_count=1\n)\nskp.run(\n    code='processing\/processing.py',\n    inputs=[ProcessingInput(\n        dataset_definition=dd,\n        destination='\/opt\/ml\/processing\/input\/data\/',\n        s3_data_distribution_type='ShardedByS3Key'\n    )],\n    outputs = [\n        ProcessingOutput(\n            output_name=\"train\", \n            source=\"\/opt\/ml\/processing\/train\"\n        ),\n        ProcessingOutput(\n            output_name=\"test\", \n            source=\"\/opt\/ml\/processing\/test\"\n        ),\n    ],\n    wait=True\n)<\/code><\/pre>\n<\/p><\/div>\n<p>With the outputs created by the processing job, we can move to the training step, by means of the <code>sagemaker.sklearn.SKLearn()<\/code> Estimator:<\/p>\n<div class=\"hide-language\">\n<pre><code class=\"lang-python\">from sagemaker.sklearn import SKLearn\n\ns = SKLearn(\n    entry_point='training\/script.py',\n    framework_version='0.23-1',\n    instance_type='ml.m5.large',\n    instance_count=1,\n    role=get_execution_role()\n)\ns.fit({\n    'train':skp.latest_job.outputs[0].destination, \n    'test':skp.latest_job.outputs[1].destination\n})<\/code><\/pre>\n<\/p><\/div>\n<p>To learn more about the SageMaker Training API and Scikit-learn Estimator, see <a href=\"https:\/\/sagemaker.readthedocs.io\/en\/stable\/frameworks\/sklearn\/using_sklearn.html\" target=\"_blank\" rel=\"noopener noreferrer\">Using Scikit-learn with the SageMaker Python SDK<\/a>.<\/p>\n<h2>Define a training pipeline<\/h2>\n<p>Now that we have proven that we can read data from 
Amazon Redshift, preprocess it, and use it to train a model, we can define a pipeline that reproduces these steps, and schedule it to run. To do so, we use <a href=\"https:\/\/aws.amazon.com\/sagemaker\/pipelines\/\" target=\"_blank\" rel=\"noopener noreferrer\">SageMaker Pipelines<\/a>. Pipelines is the first purpose-built, easy-to-use continuous integration and continuous delivery (CI\/CD) service for ML. With Pipelines, you can create, automate, and manage end-to-end ML workflows at scale.<\/p>\n<p>Pipelines are composed of steps. These steps define the actions that the pipeline takes, and the relationships between steps, expressed through step properties. We already know that our pipeline is composed of three steps:<\/p>\n<ul>\n<li>A processing step that runs the Amazon Redshift query and preprocesses the dataset<\/li>\n<li>A training step that trains the model on the processed dataset<\/li>\n<li>A model step that registers the trained model for later use<\/li>\n<\/ul>\n<p>Furthermore, to make the pipeline definition dynamic, Pipelines allows us to define parameters, which are values that we can provide at runtime when the pipeline starts.<\/p>\n<p>The following code is a snippet that shows the definition of a processing step. The step requires the definition of a processor, which is very similar to the one defined previously during the preprocessing discovery phase, but this time using the parameters of Pipelines. 
The other parameters (code, inputs, and outputs) are the same as previously defined:<\/p>\n<div class=\"hide-language\">\n<pre><code class=\"lang-python\">#### PROCESSING STEP ####\n\nfrom sagemaker.workflow.parameters import ParameterString, ParameterInteger\nfrom sagemaker.workflow.steps import ProcessingStep\n\n# PARAMETERS\nprocessing_instance_type = ParameterString(name='ProcessingInstanceType', default_value='ml.m5.large')\nprocessing_instance_count = ParameterInteger(name='ProcessingInstanceCount', default_value=2)\n\n# PROCESSOR\nskp = SKLearnProcessor(\n    framework_version='0.23-1',\n    role=get_execution_role(),\n    instance_type=processing_instance_type,\n    instance_count=processing_instance_count\n)\n\n# DEFINE THE STEP\nprocessing_step = ProcessingStep(\n    name='ProcessingStep',\n    processor=skp,\n    code='processing\/processing.py',\n    inputs=[ProcessingInput(\n        dataset_definition=dd,\n        destination='\/opt\/ml\/processing\/input\/data\/',\n        s3_data_distribution_type='ShardedByS3Key'\n    )],\n    outputs = [\n        ProcessingOutput(output_name=\"train\", source=\"\/opt\/ml\/processing\/output\/train\"),\n        ProcessingOutput(output_name=\"test\", source=\"\/opt\/ml\/processing\/output\/test\"),\n    ]\n)<\/code><\/pre>\n<\/p><\/div>\n<p>Very similarly, we can define the training step, but we use the outputs from the processing step as inputs:<\/p>\n<div class=\"hide-language\">\n<pre><code class=\"lang-python\"># TRAININGSTEP\nfrom sagemaker.workflow.steps import TrainingStep\nfrom sagemaker.inputs import TrainingInput\n\ntraining_step = TrainingStep(\n    name='TrainingStep',\n    estimator=s,\n    inputs={\n        \"train\": TrainingInput(s3_data=processing_step.properties.ProcessingOutputConfig.Outputs[\"train\"].S3Output.S3Uri),\n        \"test\": TrainingInput(s3_data=processing_step.properties.ProcessingOutputConfig.Outputs[\"test\"].S3Output.S3Uri)\n    }\n)<\/code><\/pre>\n<\/p><\/div>\n<p>Finally, let\u2019s add the model step, which registers the model to SageMaker for later use (for real-time endpoints and batch transform):<\/p>\n<div class=\"hide-language\">\n<pre><code class=\"lang-python\"># MODELSTEP\nfrom sagemaker.workflow.steps import CreateModelStep\nfrom sagemaker.inputs import CreateModelInput\n\nmodel_step
= CreateModelStep(\n    name=\"Model\",\n    model=model, # a sagemaker Model object created from the training step's artifacts\n    inputs=CreateModelInput(instance_type='ml.m5.xlarge')\n)<\/code><\/pre>\n<\/p><\/div>\n<p>With all the pipeline steps now defined, we can define the pipeline itself as a pipeline object comprising a series of those steps. Steps without data dependencies between them run in parallel, and conditional branching is possible with <code>ConditionStep<\/code>. Then we can update and insert (<em>upsert<\/em>) the definition to Pipelines with the <code>.upsert()<\/code> command:<\/p>\n<div class=\"hide-language\">\n<pre><code class=\"lang-python\">#### PIPELINE ####\nfrom sagemaker.workflow.pipeline import Pipeline\n\npipeline = Pipeline(\n    name = 'Redshift2Pipeline',\n    parameters = [\n        processing_instance_type, processing_instance_count,\n        training_instance_type, training_instance_count,\n        inference_instance_type\n    ],\n    steps = [\n        processing_step, \n        training_step,\n        model_step\n    ]\n)\npipeline.upsert(role_arn=get_execution_role())<\/code><\/pre>\n<\/p><\/div>\n<p>After we upsert the definition, we can start the pipeline with the pipeline object\u2019s <code>start()<\/code> method, and wait for the end of its run:<\/p>\n<div class=\"hide-language\">\n<pre><code class=\"lang-python\">execution = pipeline.start()\nexecution.wait()<\/code><\/pre>\n<\/p><\/div>\n<p>After the pipeline starts running, we can view the run on the SageMaker console. In the navigation pane, under <strong>Components and registries<\/strong>, choose <strong>Pipelines<\/strong>. Choose the <code>Redshift2Pipeline<\/code> pipeline, and then choose the specific run to see its progress. You can choose each step to see additional details such as its outputs and logs. 
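<\/p>
<p>Because the instance types and counts are declared as pipeline parameters, their default values can be overridden for a single run by passing a dictionary to <code>start()<\/code>. The keys must match the declared parameter names; the values below are illustrative:<\/p>

```python
# Override the pipeline parameter defaults for one execution.
# Keys must match the names declared with ParameterString/ParameterInteger.
run_parameters = {
    "ProcessingInstanceType": "ml.m5.xlarge",
    "ProcessingInstanceCount": 4,
}

# With the pipeline object from above, this would start the overridden run:
# execution = pipeline.start(parameters=run_parameters)
```

<p>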
Typically, this pipeline should take about 10 minutes to complete.<\/p>\n<p><a href=\"https:\/\/d2908q01vomqb2.cloudfront.net\/f1f836cb4ea6efb2a0b1b99f41ad8b103eff4b59\/2021\/12\/10\/ML-5901-image002-1.png\"><img decoding=\"async\" loading=\"lazy\" class=\"alignnone wp-image-31674 size-full\" src=\"https:\/\/d2908q01vomqb2.cloudfront.net\/f1f836cb4ea6efb2a0b1b99f41ad8b103eff4b59\/2021\/12\/10\/ML-5901-image002-1.png\" alt=\"\" width=\"500\" height=\"605\"><\/a><\/p>\n<h2>Conclusions<\/h2>\n<p>In this post, we created a SageMaker pipeline that reads data from Amazon Redshift natively, without requiring additional configuration or services, processed it with SageMaker Processing, and trained a scikit-learn model. From here, the pipeline can be extended with additional steps, adapted to other datasets, or scheduled to run on a regular basis.<\/p>\n<hr>\n<h3>About the Author<\/h3>\n<p><strong><a href=\"https:\/\/d2908q01vomqb2.cloudfront.net\/f1f836cb4ea6efb2a0b1b99f41ad8b103eff4b59\/2021\/12\/10\/David-G.png\"><img decoding=\"async\" loading=\"lazy\" class=\"size-full wp-image-31675 alignleft\" src=\"https:\/\/d2908q01vomqb2.cloudfront.net\/f1f836cb4ea6efb2a0b1b99f41ad8b103eff4b59\/2021\/12\/10\/David-G.png\" alt=\"\" width=\"100\" height=\"105\"><\/a>Davide Gallitelli<\/strong> is a Specialist Solutions Architect for AI\/ML in the EMEA region. He is based in Brussels and works closely with customers throughout Benelux. He has been a developer since he was very young, starting to code at the age of 7. 
He started learning AI\/ML at university, and has fallen in love with it since then.<\/p>\n<p>       <!-- '\"` -->\n      <\/div>\n","protected":false},"excerpt":{"rendered":"<p>https:\/\/aws.amazon.com\/blogs\/machine-learning\/process-amazon-redshift-data-and-schedule-a-training-pipeline-with-amazon-sagemaker-processing-and-amazon-sagemaker-pipelines\/<\/p>\n","protected":false},"author":0,"featured_media":1372,"comment_status":"open","ping_status":"open","sticky":false,"template":"","format":"standard","meta":[],"categories":[3],"tags":[],"_links":{"self":[{"href":"https:\/\/salarydistribution.com\/machine-learning\/wp-json\/wp\/v2\/posts\/1371"}],"collection":[{"href":"https:\/\/salarydistribution.com\/machine-learning\/wp-json\/wp\/v2\/posts"}],"about":[{"href":"https:\/\/salarydistribution.com\/machine-learning\/wp-json\/wp\/v2\/types\/post"}],"replies":[{"embeddable":true,"href":"https:\/\/salarydistribution.com\/machine-learning\/wp-json\/wp\/v2\/comments?post=1371"}],"version-history":[{"count":0,"href":"https:\/\/salarydistribution.com\/machine-learning\/wp-json\/wp\/v2\/posts\/1371\/revisions"}],"wp:featuredmedia":[{"embeddable":true,"href":"https:\/\/salarydistribution.com\/machine-learning\/wp-json\/wp\/v2\/media\/1372"}],"wp:attachment":[{"href":"https:\/\/salarydistribution.com\/machine-learning\/wp-json\/wp\/v2\/media?parent=1371"}],"wp:term":[{"taxonomy":"category","embeddable":true,"href":"https:\/\/salarydistribution.com\/machine-learning\/wp-json\/wp\/v2\/categories?post=1371"},{"taxonomy":"post_tag","embeddable":true,"href":"https:\/\/salarydistribution.com\/machine-learning\/wp-json\/wp\/v2\/tags?post=1371"}],"curies":[{"name":"wp","href":"https:\/\/api.w.org\/{rel}","templated":true}]}}