{"id":809,"date":"2021-09-05T15:52:56","date_gmt":"2021-09-05T15:52:56","guid":{"rendered":"https:\/\/salarydistribution.com\/machine-learning\/2021\/09\/05\/schedule-an-amazon-sagemaker-data-wrangler-flow-to-process-new-data-periodically-using-aws-lambda-functions\/"},"modified":"2021-09-05T15:52:56","modified_gmt":"2021-09-05T15:52:56","slug":"schedule-an-amazon-sagemaker-data-wrangler-flow-to-process-new-data-periodically-using-aws-lambda-functions","status":"publish","type":"post","link":"https:\/\/salarydistribution.com\/machine-learning\/2021\/09\/05\/schedule-an-amazon-sagemaker-data-wrangler-flow-to-process-new-data-periodically-using-aws-lambda-functions\/","title":{"rendered":"Schedule an Amazon SageMaker Data Wrangler flow to process new data periodically using AWS Lambda functions"},"content":{"rendered":"<div id=\"\">\n<p>Data scientists can spend up to 80% of their time preparing data for machine learning (ML) projects. This preparation process is largely undifferentiated and tedious work, and can involve multiple programming APIs and custom libraries. Announced at <a href=\"https:\/\/www.youtube.com\/watch?v=PjDysgCvRqY\" target=\"_blank\" rel=\"noopener noreferrer\">AWS re:Invent 2020<\/a>, <a href=\"https:\/\/aws.amazon.com\/sagemaker\/data-wrangler\/\" target=\"_blank\" rel=\"noopener noreferrer\">Amazon SageMaker Data Wrangler<\/a> reduces the time it takes to aggregate and prepare data for ML from weeks to minutes. With Data Wrangler, you can simplify the process of data preparation and feature engineering. You can complete each step of the data preparation workflow, including data selection, cleansing, exploration, and visualization, from a single visual interface. 
For more information about how to prepare your datasets for ML training, inference, or other use cases, see <a href=\"https:\/\/aws.amazon.com\/blogs\/aws\/introducing-amazon-sagemaker-data-wrangler-a-visual-interface-to-prepare-data-for-machine-learning\/\" target=\"_blank\" rel=\"noopener noreferrer\">Introducing Amazon SageMaker Data Wrangler, a Visual Interface to Prepare Data for Machine Learning<\/a>.<\/p>\n<p>Data Wrangler natively connects to data sources such as <a href=\"https:\/\/aws.amazon.com\/s3\/\" target=\"_blank\" rel=\"noopener noreferrer\">Amazon Simple Storage Service<\/a> (Amazon S3), <a href=\"https:\/\/aws.amazon.com\/athena\/\" target=\"_blank\" rel=\"noopener noreferrer\">Amazon Athena<\/a>, <a href=\"https:\/\/aws.amazon.com\/redshift\/\" target=\"_blank\" rel=\"noopener noreferrer\">Amazon Redshift<\/a>, and Snowflake. Data Wrangler also integrates with multiple SageMaker features like <a href=\"https:\/\/aws.amazon.com\/sagemaker\/clarify\/\" target=\"_blank\" rel=\"noopener noreferrer\">SageMaker Clarify<\/a>, <a href=\"https:\/\/aws.amazon.com\/sagemaker\/feature-store\/\" target=\"_blank\" rel=\"noopener noreferrer\">Feature Store<\/a>, and <a href=\"https:\/\/aws.amazon.com\/sagemaker\/pipelines\/\" target=\"_blank\" rel=\"noopener noreferrer\">Pipelines<\/a>. The Data Wrangler UI has been launched as part of <a href=\"https:\/\/docs.aws.amazon.com\/sagemaker\/latest\/dg\/studio.html\" target=\"_blank\" rel=\"noopener noreferrer\">SageMaker Studio<\/a>, which is the primary IDE for SageMaker. You can explore your data within the Data Wrangler UI and create a data flow (.flow file) that defines an exportable series of ML data preparation steps, ready for integration with training and inference ML workflows. 
After you create a preprocessing flow based on your sample data, you need a way to transfer the pipeline logic into your production workflow to handle incoming data periodically.<\/p>\n<p>This post demonstrates how to schedule your data preparation to run automatically using <a href=\"https:\/\/aws.amazon.com\/lambda\/\" target=\"_blank\" rel=\"noopener noreferrer\">AWS Lambda<\/a> and an existing Data Wrangler .flow file. Lambda is a serverless compute service that lets you run your code with the right execution power and zero administration.<\/p>\n<p>Steps required to schedule your Data Wrangler flow to run regularly:<\/p>\n<ol>\n<li>Export your Data Wrangler .flow file as a SageMaker processing script.<\/li>\n<li>Create a Lambda function from the processing script.<\/li>\n<li>Schedule the Lambda function defined in the previous step to run using <a href=\"https:\/\/aws.amazon.com\/eventbridge\/\" target=\"_blank\" rel=\"noopener noreferrer\">Amazon EventBridge<\/a>.<\/li>\n<li>Optionally, you can parameterize the Data Wrangler .flow file based on the modularity required (which we demonstrate in this post).<\/li>\n<\/ol>\n<p>This post assumes you have an existing .flow file from your processing step using Data Wrangler.<\/p>\n<h2>Export your Data Wrangler .flow file<\/h2>\n<p>We use an existing .flow file that was generated from two data sources (Amazon S3 and Athena) for demonstration purposes. 
You can use any existing .flow file to follow along with this post.<\/p>\n<ol>\n<li>Choose (right-click) the .flow file.<\/li>\n<li>On the <strong>Open With<\/strong> menu, choose <strong>Flow<\/strong>.<br \/><a href=\"https:\/\/d2908q01vomqb2.cloudfront.net\/f1f836cb4ea6efb2a0b1b99f41ad8b103eff4b59\/2021\/08\/17\/ML-3777-image001.png\"><img decoding=\"async\" loading=\"lazy\" class=\"alignnone size-full wp-image-27263\" src=\"https:\/\/d2908q01vomqb2.cloudfront.net\/f1f836cb4ea6efb2a0b1b99f41ad8b103eff4b59\/2021\/08\/17\/ML-3777-image001.png\" alt=\"\" width=\"335\" height=\"342\"><\/a><\/li>\n<li>On the <strong>Export<\/strong> tab, choose <strong>Save to S3<\/strong>.<\/li>\n<\/ol>\n<p>The notebook can run the processing job and save the output to an Amazon S3 location.<br \/><a href=\"https:\/\/d2908q01vomqb2.cloudfront.net\/f1f836cb4ea6efb2a0b1b99f41ad8b103eff4b59\/2021\/08\/17\/ML-3777-image003.png\"><img decoding=\"async\" loading=\"lazy\" class=\"alignnone size-full wp-image-27264\" src=\"https:\/\/d2908q01vomqb2.cloudfront.net\/f1f836cb4ea6efb2a0b1b99f41ad8b103eff4b59\/2021\/08\/17\/ML-3777-image003.png\" alt=\"\" width=\"1023\" height=\"576\"><\/a><\/p>\n<p>Data Wrangler exports the entire set of processing steps into a notebook. To run this processing job on a regular basis, for example daily, monthly, or quarterly, we need to set up a Lambda function and schedule it to run at the required frequency. 
Lambda lets you run code without provisioning or managing servers.<\/p>\n<ol start=\"4\">\n<li>The first step to prepare the Lambda function is to export the notebook into an executable Python file.<br \/><a href=\"https:\/\/d2908q01vomqb2.cloudfront.net\/f1f836cb4ea6efb2a0b1b99f41ad8b103eff4b59\/2021\/08\/17\/ML-3777-image005.png\"><img decoding=\"async\" loading=\"lazy\" class=\"alignnone size-full wp-image-27265\" src=\"https:\/\/d2908q01vomqb2.cloudfront.net\/f1f836cb4ea6efb2a0b1b99f41ad8b103eff4b59\/2021\/08\/17\/ML-3777-image005.png\" alt=\"\" width=\"1431\" height=\"851\"><\/a><\/li>\n<\/ol>\n<p>After we have the Python script, we can modify the script to create a Lambda function.<\/p>\n<ol start=\"5\">\n<li>Open the .py script and remove comments that were automatically generated from the notebook, and rename the script <code>lambda_function.py<\/code>.<br \/><a href=\"https:\/\/d2908q01vomqb2.cloudfront.net\/f1f836cb4ea6efb2a0b1b99f41ad8b103eff4b59\/2021\/08\/17\/ML-3777-image007.png\"><img decoding=\"async\" loading=\"lazy\" class=\"alignnone size-full wp-image-27266\" src=\"https:\/\/d2908q01vomqb2.cloudfront.net\/f1f836cb4ea6efb2a0b1b99f41ad8b103eff4b59\/2021\/08\/17\/ML-3777-image007.png\" alt=\"\" width=\"1033\" height=\"689\"><\/a><\/li>\n<li>Bring all the import statements at the beginning of the script and add one helper function to the script, <code>lambda_handler()<\/code>.<\/li>\n<\/ol>\n<p>This is the main Python function for the Lambda function that is used to schedule the .flow file.<br \/><a href=\"https:\/\/d2908q01vomqb2.cloudfront.net\/f1f836cb4ea6efb2a0b1b99f41ad8b103eff4b59\/2021\/08\/17\/ML-3777-image009.png\"><img decoding=\"async\" loading=\"lazy\" class=\"alignnone size-full wp-image-27267\" src=\"https:\/\/d2908q01vomqb2.cloudfront.net\/f1f836cb4ea6efb2a0b1b99f41ad8b103eff4b59\/2021\/08\/17\/ML-3777-image009.png\" alt=\"\" width=\"932\" height=\"461\"><\/a><\/p>\n<p>See the following code:<\/p>\n<div 
class=\"hide-language\">\n<pre><code class=\"lang-python\">def lambda_handler(event, context):\n    \"\"\" main handler function for Lambda \"\"\"\n\n    job_info = run_flow()\n\n    api_response = {\n        'statusCode': 200,\n        'event': event,\n        'job_info': job_info\n    }\n\n    return api_response<\/code><\/pre>\n<\/p><\/div>\n<ol start=\"7\">\n<li>Wrap the auto-generated code within a function, <code>run_flow()<\/code>.<br \/><a href=\"https:\/\/d2908q01vomqb2.cloudfront.net\/f1f836cb4ea6efb2a0b1b99f41ad8b103eff4b59\/2021\/08\/17\/ML-3777-image011.png\"><img decoding=\"async\" loading=\"lazy\" class=\"alignnone size-full wp-image-27268\" src=\"https:\/\/d2908q01vomqb2.cloudfront.net\/f1f836cb4ea6efb2a0b1b99f41ad8b103eff4b59\/2021\/08\/17\/ML-3777-image011.png\" alt=\"\" width=\"335\" height=\"113\"><\/a><\/li>\n<\/ol>\n<p>This is to call the function we defined in the previous step.<\/p>\n<ol start=\"8\">\n<li>Modify one line where <code>iam_role<\/code> is defined. We need to <a href=\"https:\/\/docs.aws.amazon.com\/sagemaker\/latest\/dg\/automatic-model-tuning-ex-role.html\" target=\"_blank\" rel=\"noopener noreferrer\">find the SageMaker execution role<\/a> and update <code>iam_role<\/code> with the ARN for that role.<br \/><a href=\"https:\/\/d2908q01vomqb2.cloudfront.net\/f1f836cb4ea6efb2a0b1b99f41ad8b103eff4b59\/2021\/08\/17\/ML-3777-image013.png\"><img decoding=\"async\" loading=\"lazy\" class=\"alignnone size-full wp-image-27269\" src=\"https:\/\/d2908q01vomqb2.cloudfront.net\/f1f836cb4ea6efb2a0b1b99f41ad8b103eff4b59\/2021\/08\/17\/ML-3777-image013.png\" alt=\"\" width=\"879\" height=\"169\"><\/a><\/li>\n<li>Add one more line at the end of <code>run_flow()<\/code> function as the following:<\/li>\n<\/ol>\n<p><a href=\"https:\/\/d2908q01vomqb2.cloudfront.net\/f1f836cb4ea6efb2a0b1b99f41ad8b103eff4b59\/2021\/08\/17\/ML-3777-image015.png\"><img decoding=\"async\" loading=\"lazy\" class=\"alignnone size-full wp-image-27270\" 
src=\"https:\/\/d2908q01vomqb2.cloudfront.net\/f1f836cb4ea6efb2a0b1b99f41ad8b103eff4b59\/2021\/08\/17\/ML-3777-image015.png\" alt=\"\" width=\"1364\" height=\"536\"><\/a><\/p>\n<p>See the following code:<\/p>\n<div class=\"hide-language\">\n<pre><code class=\"lang-python\">def run_flow():\n\n    data_sources = []\n\n    data_sources.append(ProcessingInput(\n        input_name=\"Inpatient_Claim\",\n        dataset_definition=DatasetDefinition(\n            local_path=\"\/opt\/ml\/processing\/Inpatient_Claim\",\n            data_distribution_type=\"FullyReplicated\",\n            # You can override below to point to other database or use different queries\n            athena_dataset_definition=AthenaDatasetDefinition(\n                catalog=\"AwsDataCatalog\",\n                database=\"cms\",\n                query_string=\"select * from inpatient_claim where clm_from_dt between 20080100 and 20080200\",\n                output_s3_uri=\"s3:\/\/sagemaker-us-east-1-123456789\/athena\/Inpatient_Claim\/\",\n                output_format=\"PARQUET\"\n            )\n        )\n    ))\n\n    data_sources.append(ProcessingInput(\n        source=\"s3:\/\/sagemaker-us-east-1-123456789\/DW\/DE1_0_2008_Beneficiary_Summary_File_Sample_20.csv\",  # You can override this to point to other dataset on S3\n        destination=\"\/opt\/ml\/processing\/2008_Beneficiary_Summary\",\n        input_name=\"2008_Beneficiary_Summary\",\n        s3_data_type=\"S3Prefix\",\n        s3_input_mode=\"File\",\n        s3_data_distribution_type=\"FullyReplicated\"\n    ))\n\n    # SageMaker session\n    sess = sagemaker.Session()\n    print(f\"Data Wrangler sagemaker session: {sess}\")\n    \n    # You can configure this with your own bucket name, e.g.\n    # bucket = &lt;my-own-storage-bucket&gt;\n    bucket = sess.default_bucket()\n    print(f\"Data Wrangler export storage bucket: {bucket}\")\n\n    # unique flow export ID\n    flow_export_id = f\"{time.strftime('%Y-%m-%d-%H-%M-%S', 
time.gmtime())}-{str(uuid.uuid4())[:8]}\"\n    flow_export_name = f\"flow-{flow_export_id}\"\n\n    # Output name is auto-generated from the select node's ID + output name from the flow file.\n    output_name = \"8b392709-d2c4-4b8e-bdda-e75b2d14f35e.default\"\n\n    s3_output_prefix = f\"export-{flow_export_name}\/output\"\n    s3_output_path = f\"s3:\/\/{bucket}\/{s3_output_prefix}\"\n    print(f\"Flow S3 export result path: {s3_output_path}\")\n\n    s3_processing_output = ProcessingOutput(\n        output_name=output_name,\n        source=\"\/opt\/ml\/processing\/output\",\n        destination=s3_output_path,\n        s3_upload_mode=\"EndOfJob\"\n    )\n\n    # name of the flow file which should exist in the current notebook working directory\n    flow_file_name = \"cms.flow\"\n\n    with open(flow_file_name) as f:\n        flow = json.load(f)\n\n    # Upload flow to S3\n    s3_client = boto3.client(\"s3\")\n    s3_client.upload_file(flow_file_name, bucket, f\"data_wrangler_flows\/{flow_export_name}.flow\")\n\n    flow_s3_uri = f\"s3:\/\/{bucket}\/data_wrangler_flows\/{flow_export_name}.flow\"\n\n    print(f\"Data Wrangler flow {flow_file_name} uploaded to {flow_s3_uri}\")\n\n    # Input - Flow: cms.flow\n    flow_input = ProcessingInput(\n        source=flow_s3_uri,\n        destination=\"\/opt\/ml\/processing\/flow\",\n        input_name=\"flow\",\n        s3_data_type=\"S3Prefix\",\n        s3_input_mode=\"File\",\n        s3_data_distribution_type=\"FullyReplicated\"\n    )\n\n    print(f\"ProcessingInput defined\")\n\n    # IAM role for executing the processing job.\n    iam_role = 'arn:aws:iam::123456789:role\/service-role\/AmazonSageMaker-ExecutionRole-20191002T141534'  # sagemaker.get_execution_role()\n\n    # Unique processing job name. 
Please give a unique name every time you re-execute processing jobs\n    processing_job_name = f\"data-wrangler-flow-processing-{flow_export_id}\"\n\n    # Data Wrangler Container URL.\n    container_uri = \"663277389841.dkr.ecr.us-east-1.amazonaws.com\/sagemaker-data-wrangler-container:1.x\"\n\n    # Processing Job Instance count and instance type.\n    instance_count = 2\n    instance_type = \"ml.m5.4xlarge\"\n\n    # Size in GB of the EBS volume to use for storing data during processing\n    volume_size_in_gb = 30\n\n    # Content type for each output. Data Wrangler supports CSV as default and Parquet.\n    output_content_type = \"CSV\"\n\n    # Network Isolation mode; default is off\n    enable_network_isolation = False\n\n    # Output configuration used as processing job container arguments \n    output_config = {\n        output_name: {\n            \"content_type\": output_content_type\n        }\n    }\n\n    processor = Processor(\n        role=iam_role,\n        image_uri=container_uri,\n        instance_count=instance_count,\n        instance_type=instance_type,\n        network_config=NetworkConfig(enable_network_isolation=enable_network_isolation),\n        sagemaker_session=sess\n    )\n    print(f\"Processor defined\")\n\n    # Start Job\n\n    processor.run(\n        inputs=[flow_input] + data_sources, \n        outputs=[s3_processing_output],\n        arguments=[f\"--output-config '{json.dumps(output_config)}'\"],\n        wait=False,\n        logs=False,\n        job_name=processing_job_name\n    )\n    s3_job_results_path = f\"s3:\/\/{bucket}\/{s3_output_prefix}\/{processing_job_name}\"\n    print(f\"Job results are saved to S3 path: {s3_job_results_path}\")\n\n    job_result = sess.wait_for_processing_job(processing_job_name)\n    print(job_result)\n    return job_result<\/code><\/pre>\n<\/p><\/div>\n<h2>Create a Lambda function<\/h2>\n<p>Now that we have the required Python script, we can create a Lambda function. 
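Before packaging the script, it can help to sanity-check the handler's response shape locally. The following sketch stubs out `run_flow()` with a stand-in (so no SageMaker processing job is started); the stub's return value is an assumption for illustration only.

```python
import json

def run_flow():
    # Stand-in for the exported run_flow(); the real version launches the
    # processing job and returns the job description from SageMaker.
    return {"ProcessingJobStatus": "Completed"}

def lambda_handler(event, context):
    """Main handler function for Lambda."""
    job_info = run_flow()
    return {
        "statusCode": 200,
        "event": event,
        "job_info": job_info,
    }

# Invoke the handler locally with a dummy event and no context.
response = lambda_handler({"source": "local-test"}, None)
print(json.dumps(response))
```

If the handler returns a 200 status with the job info attached, the real `run_flow()` can be swapped back in before building the deployment package.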
For instructions on creating a function using the Python runtime, see <a href=\"https:\/\/docs.aws.amazon.com\/lambda\/latest\/dg\/lambda-python.html\" target=\"_blank\" rel=\"noopener noreferrer\">Building Lambda functions with Python<\/a>. For more information about getting started with Lambda, see <a href=\"https:\/\/aws.amazon.com\/getting-started\/hands-on\/run-serverless-code\/\" target=\"_blank\" rel=\"noopener noreferrer\">Run a Serverless \u201cHello, World!\u201d with AWS Lambda<\/a>.<\/p>\n<p>The Lambda <a href=\"https:\/\/docs.aws.amazon.com\/lambda\/latest\/dg\/python-package-create.html#python-package-create-with-dependency\" target=\"_blank\" rel=\"noopener noreferrer\">runtime dependency<\/a> can be any package, module, or other assembly dependency that isn\u2019t included with the Lambda runtime environment for your function\u2019s code. If your code has a dependency on standard Python math or logging libraries or Boto3, you don\u2019t need to include the libraries in your .zip file. 
However, some ML-related packages, for example Pandas, NumPy, and SageMaker, need to be packaged within the Lambda .zip file.<\/p>\n<ol>\n<li>Run the following commands to include the SageMaker package within the Lambda .zip file:<\/li>\n<\/ol>\n<div class=\"hide-language\">\n<pre><code class=\"lang-bash\"># Install the Amazon SageMaker modules in a specific folder\n$ pip install sagemaker --target sagemaker-installation\n# Remove tests and cache (to reduce size)\n$ find .\/sagemaker-installation -type d -name \"tests\" -exec rm -rfv {} +\n$ find .\/sagemaker-installation -type d -name \"__pycache__\" -exec rm -rfv {} +\n\n# create a zipfile to be used by the AWS Lambda function\n$ zip -r lambda-deployment.zip sagemaker-installation\n\n# add flow file and lambda_function.py\n$ zip -g lambda-deployment.zip lambda_function.py cms.flow<\/code><\/pre>\n<\/p><\/div>\n<p>Now you can use this .zip file to create a Lambda function.<\/p>\n<ol start=\"2\">\n<li>On the Lambda console, create a function with the option <strong>Author from scratch<\/strong>.<br \/><a href=\"https:\/\/d2908q01vomqb2.cloudfront.net\/f1f836cb4ea6efb2a0b1b99f41ad8b103eff4b59\/2021\/08\/17\/ML-3777-image017.png\"><img decoding=\"async\" loading=\"lazy\" class=\"alignnone size-full wp-image-27271\" src=\"https:\/\/d2908q01vomqb2.cloudfront.net\/f1f836cb4ea6efb2a0b1b99f41ad8b103eff4b59\/2021\/08\/17\/ML-3777-image017.png\" alt=\"\" width=\"1083\" height=\"567\"><\/a><\/li>\n<li>Upload your .zip file.<br \/><a href=\"https:\/\/d2908q01vomqb2.cloudfront.net\/f1f836cb4ea6efb2a0b1b99f41ad8b103eff4b59\/2021\/08\/17\/ML-3777-image019.png\"><img decoding=\"async\" loading=\"lazy\" class=\"alignnone size-full wp-image-27272\" src=\"https:\/\/d2908q01vomqb2.cloudfront.net\/f1f836cb4ea6efb2a0b1b99f41ad8b103eff4b59\/2021\/08\/17\/ML-3777-image019.png\" alt=\"\" width=\"1091\" height=\"388\"><\/a><\/li>\n<\/ol>\n<p>You can fulfill the requirement for NumPy within the Lambda runtime environment via Lambda 
layers.<\/p>\n<ol start=\"4\">\n<li>Add a base Python3.8 Scipy1x layer provided by AWS, which can be found via the Lambda function console.<br \/><a href=\"https:\/\/d2908q01vomqb2.cloudfront.net\/f1f836cb4ea6efb2a0b1b99f41ad8b103eff4b59\/2021\/08\/17\/ML-3777-image021.png\"><img decoding=\"async\" loading=\"lazy\" class=\"alignnone size-full wp-image-27273\" src=\"https:\/\/d2908q01vomqb2.cloudfront.net\/f1f836cb4ea6efb2a0b1b99f41ad8b103eff4b59\/2021\/08\/17\/ML-3777-image021.png\" alt=\"\" width=\"783\" height=\"629\"><\/a><\/li>\n<\/ol>\n<h2>Schedule your Lambda function using EventBridge<\/h2>\n<p>After you create and test your Lambda function, it\u2019s time to <a href=\"https:\/\/docs.aws.amazon.com\/eventbridge\/latest\/userguide\/eb-run-lambda-schedule.html\" target=\"_blank\" rel=\"noopener noreferrer\">schedule your function using EventBridge<\/a>. EventBridge is a serverless event bus service that makes it easy to connect your applications with data from a variety of sources. 
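Besides the console walkthrough below, the rule and target can also be created programmatically. The following is a hedged sketch; the rule name, target ID, and ARNs are illustrative, and in a real account `events_client` would be `boto3.client("events")`. Here a tiny fake client records the calls so the sketch runs offline.

```python
def schedule_flow_lambda(events_client, function_arn,
                         rule_name="schedule-data-wrangler-flow",
                         schedule="rate(1 hour)"):
    """Create (or update) an EventBridge rule that triggers the Lambda function."""
    rule = events_client.put_rule(
        Name=rule_name,
        ScheduleExpression=schedule,
        State="ENABLED",
    )
    events_client.put_targets(
        Rule=rule_name,
        Targets=[{"Id": "schedule-flow-target", "Arn": function_arn}],
    )
    return rule

# Offline demo: a fake events client that just records each API call.
class _FakeEvents:
    def __init__(self):
        self.calls = []

    def put_rule(self, **kwargs):
        self.calls.append(("put_rule", kwargs))
        return {"RuleArn": f"arn:aws:events:us-east-1:123456789:rule/{kwargs['Name']}"}

    def put_targets(self, **kwargs):
        self.calls.append(("put_targets", kwargs))
        return {"FailedEntryCount": 0}

fake = _FakeEvents()
rule = schedule_flow_lambda(
    fake, "arn:aws:lambda:us-east-1:123456789:function:Schedule_Flow")
print(rule["RuleArn"])
```

With a real client, you would additionally grant EventBridge permission to invoke the function (the Lambda `add_permission` API with principal `events.amazonaws.com`).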
For this post, we schedule data to be processed every hour and saved to a specific S3 bucket.<\/p>\n<ol>\n<li>On the Amazon EventBridge console, on the <strong>Rules<\/strong> page, choose <strong>Create rule<\/strong>.<br \/><a href=\"https:\/\/d2908q01vomqb2.cloudfront.net\/f1f836cb4ea6efb2a0b1b99f41ad8b103eff4b59\/2021\/08\/17\/ML-3777-image023.png\"><img decoding=\"async\" loading=\"lazy\" class=\"alignnone size-full wp-image-27274\" src=\"https:\/\/d2908q01vomqb2.cloudfront.net\/f1f836cb4ea6efb2a0b1b99f41ad8b103eff4b59\/2021\/08\/17\/ML-3777-image023.png\" alt=\"\" width=\"758\" height=\"311\"><\/a><\/li>\n<li>For <strong>Name<\/strong>, enter a name.<\/li>\n<li>For <strong>Description<\/strong>, enter an optional description.<\/li>\n<li>For <strong>Define pattern<\/strong>, select <strong>Schedule<\/strong>.<\/li>\n<li>Set the fixed rate to every 1 hour.<br \/><a href=\"https:\/\/d2908q01vomqb2.cloudfront.net\/f1f836cb4ea6efb2a0b1b99f41ad8b103eff4b59\/2021\/08\/17\/ML-3777-image025.png\"><img decoding=\"async\" loading=\"lazy\" class=\"alignnone size-full wp-image-27275\" src=\"https:\/\/d2908q01vomqb2.cloudfront.net\/f1f836cb4ea6efb2a0b1b99f41ad8b103eff4b59\/2021\/08\/17\/ML-3777-image025.png\" alt=\"\" width=\"660\" height=\"500\"><\/a><\/li>\n<li>For <strong>Target<\/strong>, choose <strong>Lambda function<\/strong>.<\/li>\n<li>For <strong>Function<\/strong>, choose <strong>Schedule_Flow<\/strong>.<br \/><a href=\"https:\/\/d2908q01vomqb2.cloudfront.net\/f1f836cb4ea6efb2a0b1b99f41ad8b103eff4b59\/2021\/08\/17\/ML-3777-image027.png\"><img decoding=\"async\" loading=\"lazy\" class=\"alignnone size-full wp-image-27276\" src=\"https:\/\/d2908q01vomqb2.cloudfront.net\/f1f836cb4ea6efb2a0b1b99f41ad8b103eff4b59\/2021\/08\/17\/ML-3777-image027.png\" alt=\"\" width=\"721\" height=\"429\"><\/a><\/li>\n<\/ol>\n<p>Alternatively, we can deploy the Lambda function and schedule using EventBridge with a serverless template file (see the following code). 
For an example of deploying a serverless template, see <a href=\"https:\/\/github.com\/aws-samples\/serverless-patterns\/tree\/main\/eventbridge-scheduled-lambda\" target=\"_blank\" rel=\"noopener noreferrer\">Amazon EventBridge Scheduled to AWS Lambda<\/a>. The required template file and an example <code>lambda_function.py<\/code> are available on the <a href=\"https:\/\/github.com\/aws-samples\/schedule-sagemaker-data-wrangler-flow\" target=\"_blank\" rel=\"noopener noreferrer\">GitHub repo<\/a>.<\/p>\n<div class=\"hide-language\">\n<pre><code class=\"lang-yaml\">AWSTemplateFormatVersion: '2010-09-09'\nTransform: AWS::Serverless-2016-10-31\nDescription: Create a Lambda function to run the processing job and schedule from a cron job in EventBridge\n\nResources:\n  ScheduledFunction:\n    Type: AWS::Serverless::Function\n    Properties:\n      CodeUri: \n      Handler: lambda_function.lambda_handler\n      Runtime: python3.8\n      MemorySize: 128\n\n      Events:\n        ScheduledFunction:\n          Type: Schedule\n          Properties:\n            Schedule: rate(1 hour)<\/code><\/pre>\n<\/p><\/div>\n<h2>Additional parameterization of the .flow file<\/h2>\n<p>Direct export of the .flow file works on the same data source and same query, but you can parameterize the exported script from the .flow file to handle changes in a data source, for example, to point to a different S3 bucket or a different file, or to change the query to pull incremental data from other data sources. 
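For example, the date window in the Athena query can be built by a small helper. The post shows its own `generate_query()` only as a screenshot, so this is a hypothetical reconstruction; the table and column names come from the example flow, and the integer date encoding is inferred from the query in the exported script.

```python
def generate_query(year, month, table="inpatient_claim", date_col="clm_from_dt"):
    """Build the Athena query string for one month of claims.

    The source data encodes dates as integers like 20080100, so the
    window for a month is [YYYYMM00, next-month 00].
    """
    start = year * 10000 + month * 100          # e.g. 20080100 for Jan 2008
    if month == 12:
        end = (year + 1) * 10000 + 100          # roll over to January
    else:
        end = year * 10000 + (month + 1) * 100  # e.g. 20080200
    return (f"select * from {table} "
            f"where {date_col} between {start} and {end}")

query_string = generate_query(2008, 1)
print(query_string)
# -> select * from inpatient_claim where clm_from_dt between 20080100 and 20080200
```

Calling such a helper inside `run_flow()` at runtime lets each scheduled invocation pull only the current window instead of the fixed range baked into the export.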
In our preceding example code, we had the following query pull data from an Athena table:<\/p>\n<div class=\"hide-language\">\n<pre><code class=\"lang-sql\">query_string = \"select * from inpatient_claim where clm_from_dt between 20080100 and 20080200\"<\/code><\/pre>\n<\/p><\/div>\n<p>We can easily generate this query string with an additional step using the <code>generate_query()<\/code> function.<br \/><a href=\"https:\/\/d2908q01vomqb2.cloudfront.net\/f1f836cb4ea6efb2a0b1b99f41ad8b103eff4b59\/2021\/08\/17\/ML-3777-image029.png\"><img decoding=\"async\" loading=\"lazy\" class=\"alignnone size-full wp-image-27277\" src=\"https:\/\/d2908q01vomqb2.cloudfront.net\/f1f836cb4ea6efb2a0b1b99f41ad8b103eff4b59\/2021\/08\/17\/ML-3777-image029.png\" alt=\"\" width=\"846\" height=\"206\"><\/a><\/p>\n<p>We can call this <code>generate_query()<\/code> function within the <code>run_flow()<\/code> function to create a new query during runtime.<br \/><a href=\"https:\/\/d2908q01vomqb2.cloudfront.net\/f1f836cb4ea6efb2a0b1b99f41ad8b103eff4b59\/2021\/08\/17\/ML-3777-image031.png\"><img decoding=\"async\" loading=\"lazy\" class=\"alignnone size-full wp-image-27278\" src=\"https:\/\/d2908q01vomqb2.cloudfront.net\/f1f836cb4ea6efb2a0b1b99f41ad8b103eff4b59\/2021\/08\/17\/ML-3777-image031.png\" alt=\"\" width=\"744\" height=\"355\"><\/a><\/p>\n<p>We can also specify the output S3 bucket instead of using the default one, and parameterize the other auto-generated output paths if needed.<br \/><a href=\"https:\/\/d2908q01vomqb2.cloudfront.net\/f1f836cb4ea6efb2a0b1b99f41ad8b103eff4b59\/2021\/08\/17\/ML-3777-image033.png\"><img decoding=\"async\" loading=\"lazy\" class=\"alignnone size-full wp-image-27279\" src=\"https:\/\/d2908q01vomqb2.cloudfront.net\/f1f836cb4ea6efb2a0b1b99f41ad8b103eff4b59\/2021\/08\/17\/ML-3777-image033.png\" alt=\"\" width=\"1648\" height=\"668\"><\/a><\/p>\n<h2>Clean up<\/h2>\n<p>When you\u2019re finished, we recommend deleting all the AWS resources created in this demo 
to avoid additional recurring costs.<\/p>\n<ol>\n<li>Delete the EventBridge rule.<\/li>\n<li>Delete the Lambda function.<\/li>\n<li>Delete the .flow files and output files in the S3 bucket.<\/li>\n<li><a href=\"https:\/\/docs.aws.amazon.com\/sagemaker\/latest\/dg\/data-wrangler-shut-down.html\" target=\"_blank\" rel=\"noopener noreferrer\">Shut down Data Wrangler<\/a>.<\/li>\n<\/ol>\n<h2>Conclusion<\/h2>\n<p>In this post, we demonstrated how to automate a Data Wrangler flow to run on a schedule using a Lambda function with EventBridge. Additionally, we showed how to parameterize the .flow file for different data sources or modify the query using custom functions within the Lambda function script. Most importantly, we showed how to automate an entire ML preprocessing workflow in only a few lines of code (about 10).<\/p>\n<p>To get started with Data Wrangler, see <a href=\"https:\/\/aws.amazon.com\/blogs\/aws\/introducing-amazon-sagemaker-data-wrangler-a-visual-interface-to-prepare-data-for-machine-learning\/\" target=\"_blank\" rel=\"noopener noreferrer\">Introducing Amazon SageMaker Data Wrangler, a Visual Interface to Prepare Data for Machine Learning<\/a>. You can also explore advanced topics like <a href=\"https:\/\/aws.amazon.com\/blogs\/aws\/introducing-amazon-sagemaker-data-wrangler-a-visual-interface-to-prepare-data-for-machine-learning\/\" target=\"_blank\" rel=\"noopener noreferrer\">cross-account access for Data Wrangler<\/a> or <a href=\"https:\/\/aws.amazon.com\/blogs\/machine-learning\/prepare-data-from-snowflake-for-machine-learning-with-amazon-sagemaker-data-wrangler\/\" target=\"_blank\" rel=\"noopener noreferrer\">connecting to Snowflake<\/a> as a data source. 
For the latest information on Data Wrangler, see the <a href=\"https:\/\/aws.amazon.com\/sagemaker\/data-wrangler\/\" target=\"_blank\" rel=\"noopener noreferrer\">product page<\/a>.<\/p>\n<p>Data Wrangler makes it easy to prepare, process, and explore data without much experience with scripting languages like Python or Spark. With native connections to Feature Store, Processing, and Pipelines, end-to-end ML model development and deployment are more user-friendly and intuitive.<\/p>\n<h2>Disclaimer<\/h2>\n<p>For our ML use case, we used the public data from <a href=\"https:\/\/www.cms.gov\/Research-Statistics-Data-and-Systems\/Downloadable-Public-Use-Files\/SynPUFs\/DE_Syn_PUF\" target=\"_blank\" rel=\"noopener noreferrer\">cms.gov<\/a> to create our demo .flow file to show an example where real data can come at regular intervals with a similar data model requiring a scheduled preprocessing job.<\/p>\n<h2>Citation<\/h2>\n<p>Centers for Medicare &amp; Medicaid Services. (2018). 2019 Health Insurance Exchange Public Use Files (Medicare Claims Synthetic Public Use Files) [<a href=\"https:\/\/www.cms.gov\/Research-Statistics-Data-and-Systems\/Downloadable-Public-Use-Files\/SynPUFs\/Downloads\/SynPUF_DUG.pdf\" target=\"_blank\" rel=\"noopener noreferrer\">Data file<\/a> and <a href=\"https:\/\/www.cms.gov\/files\/document\/de-10-codebook.pdf-0\" target=\"_blank\" rel=\"noopener noreferrer\">code book<\/a>]. Retrieved from https:\/\/www.cms.gov\/Research-Statistics-Data-and-Systems\/Downloadable-Public-Use-Files\/SynPUFs\/DE_Syn_PUF<\/p>\n<p>\u201cMedicare Claims Synthetic Public Use Files\u201d is made available under the <a href=\"https:\/\/opendatacommons.org\/licenses\/odbl\/1.0\/\" target=\"_blank\" rel=\"noopener noreferrer\">Open Database License<\/a>. 
Any rights in individual contents of the database are licensed under the <a href=\"https:\/\/opendatacommons.org\/licenses\/dbcl\/1.0\/\" target=\"_blank\" rel=\"noopener noreferrer\">Database Contents License<\/a>.<\/p>\n<hr>\n<h3>About the Authors<\/h3>\n<p><strong><a href=\"https:\/\/d2908q01vomqb2.cloudfront.net\/f1f836cb4ea6efb2a0b1b99f41ad8b103eff4b59\/2021\/08\/20\/Jayeeta-Ghosh2.png.jpg\"><img decoding=\"async\" loading=\"lazy\" class=\"wp-image-27370 size-full alignleft\" src=\"https:\/\/d2908q01vomqb2.cloudfront.net\/f1f836cb4ea6efb2a0b1b99f41ad8b103eff4b59\/2021\/08\/20\/Jayeeta-Ghosh2.png.jpg\" alt=\"\" width=\"100\" height=\"149\"><\/a> Jayeeta Ghosh<\/strong> is a Data Scientist within ML ProServe, who works on AI\/ML projects for AWS customers and helps solve customer business problems across industries using deep learning and cloud expertise.<\/p>\n<p>\u00a0<\/p>\n<p>\u00a0<\/p>\n<p>\u00a0<\/p>\n<p><strong><img decoding=\"async\" loading=\"lazy\" class=\"wp-image-27436 size-full alignleft\" src=\"https:\/\/d2908q01vomqb2.cloudfront.net\/f1f836cb4ea6efb2a0b1b99f41ad8b103eff4b59\/2021\/08\/24\/Chenyang-Peter-Liu-1.jpg\" alt=\"\" width=\"100\" height=\"134\">Chenyang (Peter) Liu<\/strong> is a Senior Software Engineer on the Amazon SageMaker Data Wrangler team. He is passionate about low-code machine learning systems with state-of-art techniques. 
In his spare time, he is a foodie and enjoys road trips.<\/p>\n<p>       <!-- '\"` -->\n      <\/div>\n","protected":false},"excerpt":{"rendered":"<p>https:\/\/aws.amazon.com\/blogs\/machine-learning\/schedule-an-amazon-sagemaker-data-wrangler-flow-to-process-new-data-periodically-using-aws-lambda-functions\/<\/p>\n","protected":false},"author":0,"featured_media":810,"comment_status":"open","ping_status":"open","sticky":false,"template":"","format":"standard","meta":[],"categories":[3],"tags":[],"_links":{"self":[{"href":"https:\/\/salarydistribution.com\/machine-learning\/wp-json\/wp\/v2\/posts\/809"}],"collection":[{"href":"https:\/\/salarydistribution.com\/machine-learning\/wp-json\/wp\/v2\/posts"}],"about":[{"href":"https:\/\/salarydistribution.com\/machine-learning\/wp-json\/wp\/v2\/types\/post"}],"replies":[{"embeddable":true,"href":"https:\/\/salarydistribution.com\/machine-learning\/wp-json\/wp\/v2\/comments?post=809"}],"version-history":[{"count":0,"href":"https:\/\/salarydistribution.com\/machine-learning\/wp-json\/wp\/v2\/posts\/809\/revisions"}],"wp:featuredmedia":[{"embeddable":true,"href":"https:\/\/salarydistribution.com\/machine-learning\/wp-json\/wp\/v2\/media\/810"}],"wp:attachment":[{"href":"https:\/\/salarydistribution.com\/machine-learning\/wp-json\/wp\/v2\/media?parent=809"}],"wp:term":[{"taxonomy":"category","embeddable":true,"href":"https:\/\/salarydistribution.com\/machine-learning\/wp-json\/wp\/v2\/categories?post=809"},{"taxonomy":"post_tag","embeddable":true,"href":"https:\/\/salarydistribution.com\/machine-learning\/wp-json\/wp\/v2\/tags?post=809"}],"curies":[{"name":"wp","href":"https:\/\/api.w.org\/{rel}","templated":true}]}}