{"id":310,"date":"2020-09-30T03:16:16","date_gmt":"2020-09-30T03:16:16","guid":{"rendered":"https:\/\/machine-learning.webcloning.com\/2020\/09\/30\/running-on-demand-serverless-apache-spark-data-processing-jobs-using-amazon-sagemaker-managed-spark-containers-and-the-amazon-sagemaker-sdk\/"},"modified":"2020-09-30T03:16:16","modified_gmt":"2020-09-30T03:16:16","slug":"running-on-demand-serverless-apache-spark-data-processing-jobs-using-amazon-sagemaker-managed-spark-containers-and-the-amazon-sagemaker-sdk","status":"publish","type":"post","link":"https:\/\/salarydistribution.com\/machine-learning\/2020\/09\/30\/running-on-demand-serverless-apache-spark-data-processing-jobs-using-amazon-sagemaker-managed-spark-containers-and-the-amazon-sagemaker-sdk\/","title":{"rendered":"Running on-demand, serverless Apache Spark data processing jobs using Amazon SageMaker managed Spark containers and the Amazon SageMaker SDK"},"content":{"rendered":"<div id=\"\">\n<p>Apache Spark is a unified analytics engine for large-scale, distributed data processing. Typically, businesses with Spark-based workloads on AWS use either their own stack built on top of <a href=\"https:\/\/aws.amazon.com\/ec2\/\" target=\"_blank\" rel=\"noopener noreferrer\">Amazon Elastic Compute Cloud<\/a> (Amazon EC2) or Amazon EMR to run and scale Apache Spark, Hive, Presto, and other big data frameworks. This works well for persistent workloads, where you want these Spark clusters to be up and running 24\/7. Otherwise, you have to design an architecture that spins the cluster up and down on a schedule or on demand.<\/p>\n<p>Amazon SageMaker Processing lets you run preprocessing, postprocessing, model evaluation, or other fairly generic transform workloads on fully managed infrastructure. Previously, Amazon SageMaker Processing included a built-in container for scikit-learn-style preprocessing. To use other libraries, such as Spark, you could bring your own Docker containers. 
Amazon SageMaker Processing jobs can also be part of an AWS Step Functions workflow for ML that includes pre- and postprocessing steps. For more information, see <a href=\"https:\/\/aws.amazon.com\/about-aws\/whats-new\/2020\/08\/aws-step-functions-adds-support-for-amazon-sagemaker-processing\/\" target=\"_blank\" rel=\"noopener noreferrer\">AWS Step Functions adds support for Amazon SageMaker Processing<\/a>.<\/p>\n<p>Many machine learning (ML) workflows involve preprocessing data with Spark (or other libraries) and then passing the resulting training data to a training step. The following workflow shows an extract, transform, and load (ETL) step that leads to model training and finally to model endpoint deployment using AWS Step Functions.<\/p>\n<p><img decoding=\"async\" loading=\"lazy\" class=\"size-full wp-image-16360 aligncenter\" src=\"https:\/\/d2908q01vomqb2.cloudfront.net\/f1f836cb4ea6efb2a0b1b99f41ad8b103eff4b59\/2020\/09\/25\/1-Flowchart-4.jpg\" alt=\"\" width=\"441\" height=\"531\"><\/p>\n<p>Including Spark steps in such workflows requires extra work to provision and set up these clusters. Alternatively, you can use AWS Glue, a fully managed ETL service that makes it easy for customers to write Python- or Scala-based scripts to preprocess data for ML training.<\/p>\n<p>We\u2019re happy to add a managed Spark container and associated SDK enhancements to Amazon SageMaker Processing, which lets you perform large-scale, distributed processing on Spark by simply submitting a PySpark or Java\/Scala Spark application. 
You can use this feature in Amazon SageMaker Studio and Amazon SageMaker notebook instances.<\/p>\n<p>To demonstrate, the following code example runs a PySpark script on Amazon SageMaker Processing by using the\u00a0<code>PySparkProcessor<\/code>:<\/p>\n<div class=\"hide-language\">\n<pre class=\"unlimited-height-code\"><code class=\"lang-python\">from sagemaker.spark.processing import PySparkProcessor\r\n\r\nspark_processor = PySparkProcessor(\r\n    base_job_name=\"sm-spark\",\r\n    framework_version=\"2.4\",\r\n    role=role,\r\n    instance_count=2,\r\n    instance_type=\"ml.c5.xlarge\",\r\n    max_runtime_in_seconds=1200,\r\n) \r\n\r\nspark_processor.run(\r\n    submit_app_py=\".\/path\/to\/your\/preprocess.py\",\r\n    arguments=['s3_input_bucket', bucket,\r\n               's3_input_key_prefix', input_prefix,\r\n               's3_output_bucket', bucket,\r\n               's3_output_key_prefix', input_preprocessed_prefix],\r\n    spark_event_logs_s3_uri='s3:\/\/' + bucket + '\/' + prefix + '\/spark_event_logs',\r\n    logs=False\r\n)\r\n<\/code><\/pre>\n<\/div>\n<p>We can look at this example in some more detail. 
The PySpark script <code>preprocess.py<\/code>, shown below, loads a large CSV file from Amazon Simple Storage Service (Amazon S3) into a Spark dataframe, fits a feature pipeline on this dataframe and uses it to transform the data, and then saves the training and validation splits as CSV files back to Amazon S3:<\/p>\n<div class=\"hide-language\">\n<pre class=\"unlimited-height-code\"><code class=\"lang-python\">import sys\r\nimport os\r\n\r\nfrom pyspark.sql import SparkSession\r\nfrom pyspark.ml import Pipeline\r\nfrom pyspark.sql.types import StructField, StructType, StringType, DoubleType\r\nfrom pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler\r\n\r\n\r\ndef csv_line(data):\r\n    # Format a (label, feature vector) pair as one CSV line, with the label first\r\n    r = ','.join(str(d) for d in data[1])\r\n    return str(data[0]) + \",\" + r\r\n\r\n\r\ndef main():\r\n    spark = SparkSession.builder.appName(\"PySparkApp\").getOrCreate()\r\n    \r\n    # Convert command-line args into a map of args\r\n    args_iter = iter(sys.argv[1:])\r\n    args = dict(zip(args_iter, args_iter))\r\n\r\n    spark.sparkContext._jsc.hadoopConfiguration().set(\"mapred.output.committer.class\",\r\n                                                      \"org.apache.hadoop.mapred.FileOutputCommitter\")\r\n    \r\n    # Define the schema corresponding to the input data. 
The input data does not contain a header row\r\n    schema = StructType([StructField(\"sex\", StringType(), True),\r\n                         StructField(\"length\", DoubleType(), True),\r\n                         StructField(\"diameter\", DoubleType(), True),\r\n                         StructField(\"height\", DoubleType(), True),\r\n                         StructField(\"whole_weight\", DoubleType(), True),\r\n                         StructField(\"shucked_weight\", DoubleType(), True),\r\n                         StructField(\"viscera_weight\", DoubleType(), True),\r\n                         StructField(\"shell_weight\", DoubleType(), True),\r\n                         StructField(\"rings\", DoubleType(), True)])\r\n\r\n    # Read the data from S3 into a DataFrame\r\n    total_df = spark.read.csv(('s3:\/\/' + os.path.join(args['s3_input_bucket'], args['s3_input_key_prefix'], 'abalone.csv')), header=False, schema=schema)\r\n\r\n    # StringIndexer maps the categorical sex column to numeric indices\r\n    sex_indexer = StringIndexer(inputCol=\"sex\", outputCol=\"indexed_sex\")\r\n    \r\n    # One-hot encode the string-indexed sex column (indexed_sex)\r\n    sex_encoder = OneHotEncoder(inputCol=\"indexed_sex\", outputCol=\"sex_vec\")\r\n\r\n    # VectorAssembler combines all the features into a single vector column so they can be saved easily in CSV format\r\n    assembler = VectorAssembler(inputCols=[\"sex_vec\",\r\n                                           \"length\",\r\n                                           \"diameter\",\r\n                                           \"height\",\r\n                                           \"whole_weight\",\r\n                                           \"shucked_weight\",\r\n                                           \"viscera_weight\",\r\n                                           \"shell_weight\"],\r\n                                outputCol=\"features\")\r\n    \r\n    # The pipeline consists of 
the steps added above\r\n    pipeline = Pipeline(stages=[sex_indexer, sex_encoder, assembler])\r\n    \r\n    # Fit the pipeline stages to the data (for example, StringIndexer learns its label-to-index mapping)\r\n    model = pipeline.fit(total_df)\r\n    \r\n    # Transform the dataset using the fitted pipeline\r\n    transformed_total_df = model.transform(total_df)\r\n    \r\n    # Randomly split the dataset into 80% training and 20% validation data\r\n    (train_df, validation_df) = transformed_total_df.randomSplit([0.8, 0.2])\r\n    \r\n    # Convert the training dataframe to an RDD of CSV lines (label first) and save it to S3\r\n    train_rdd = train_df.rdd.map(lambda x: (x.rings, x.features))\r\n    train_lines = train_rdd.map(csv_line)\r\n    train_lines.saveAsTextFile('s3:\/\/' + os.path.join(args['s3_output_bucket'], args['s3_output_key_prefix'], 'train'))\r\n    \r\n    # Convert the validation dataframe to an RDD of CSV lines (label first) and save it to S3\r\n    validation_rdd = validation_df.rdd.map(lambda x: (x.rings, x.features))\r\n    validation_lines = validation_rdd.map(csv_line)\r\n    validation_lines.saveAsTextFile('s3:\/\/' + os.path.join(args['s3_output_bucket'], args['s3_output_key_prefix'], 'validation'))\r\n\r\n\r\nif __name__ == \"__main__\":\r\n    main()\r\n\r\n<\/code><\/pre>\n<\/div>\n<p>You can start a Spark-based processing job by using the <code>PySparkProcessor<\/code> class, as shown below:<\/p>\n<div class=\"hide-language\">\n<pre class=\"unlimited-height-code\"><code class=\"lang-python\">from time import gmtime, strftime\r\n\r\nimport sagemaker\r\nfrom sagemaker import get_execution_role\r\nfrom sagemaker.spark.processing import PySparkProcessor\r\n\r\n# Session, default S3 bucket, and IAM execution role (get_execution_role works inside SageMaker Studio or a notebook instance)\r\nsagemaker_session = sagemaker.Session()\r\nbucket = sagemaker_session.default_bucket()\r\nrole = get_execution_role()\r\n\r\n# Build timestamped S3 key prefixes for this run\r\ntimestamp_prefix = strftime(\"%Y-%m-%d-%H-%M-%S\", gmtime())\r\nprefix = 'sagemaker\/spark-preprocess-demo\/' + timestamp_prefix\r\ninput_prefix_abalone = prefix + '\/input\/raw\/abalone'\r\ninput_preprocessed_prefix_abalone = prefix + '\/input\/preprocessed\/abalone'\r\nmodel_prefix = prefix + '\/model'\r\n\r\n# Upload the raw input dataset to S3\r\nsagemaker_session.upload_data(path='.\/data\/abalone.csv', bucket=bucket, 
key_prefix=input_prefix_abalone)\r\n\r\n# Run the processing job\r\nspark_processor = PySparkProcessor(\r\n    base_job_name=\"sm-spark\",\r\n    framework_version=\"2.4\",\r\n    role=role,\r\n    instance_count=2,\r\n    instance_type=\"ml.c5.xlarge\",\r\n    max_runtime_in_seconds=1200,\r\n)\r\n\r\nspark_processor.run(\r\n    submit_app_py=\".\/code\/preprocess.py\",\r\n    arguments=['s3_input_bucket', bucket,\r\n               's3_input_key_prefix', input_prefix_abalone,\r\n               's3_output_bucket', bucket,\r\n               's3_output_key_prefix', input_preprocessed_prefix_abalone],\r\n    spark_event_logs_s3_uri='s3:\/\/' + bucket + '\/' + prefix + '\/spark_event_logs',\r\n    logs=False\r\n)\r\n<\/code><\/pre>\n<\/div>\n<p>When you run this in Amazon SageMaker Studio or on an Amazon SageMaker notebook instance, the output shows the job name and the job\u2019s inputs and outputs:<\/p>\n<div class=\"hide-language\">\n<pre class=\"unlimited-height-code\"><code class=\"lang-python\">Job Name:  sm-spark-&lt;...&gt;\r\nInputs:  [{'InputName': 'code', 'S3Input': {'S3Uri': 's3:\/\/&lt;bucketname&gt;\/&lt;prefix&gt;\/preprocess.py', 'LocalPath': '\/opt\/ml\/processing\/input\/code', 'S3DataType': 'S3Prefix', 'S3InputMode': 'File', 'S3DataDistributionType': 'FullyReplicated', 'S3CompressionType': 'None'}}]\r\nOutputs:  [{'OutputName': 'output-1', 'S3Output': {'S3Uri': 's3:\/\/&lt;bucketname&gt;\/&lt;prefix&gt;', 'LocalPath': '\/opt\/ml\/processing\/spark-events\/', 'S3UploadMode': 'Continuous'}}]\r\n<\/code><\/pre>\n<\/div>\n<p>In Amazon SageMaker Studio, you can describe your processing jobs and view relevant details by right-clicking the processing job name and choosing <strong>Open in trial details<\/strong>.<\/p>\n<p><img decoding=\"async\" loading=\"lazy\" class=\"alignnone size-full wp-image-16361\" src=\"https:\/\/d2908q01vomqb2.cloudfront.net\/f1f836cb4ea6efb2a0b1b99f41ad8b103eff4b59\/2020\/09\/25\/2-Interface.jpg\" alt=\"\" width=\"900\" height=\"409\"><\/p>\n<p>You can 
also track the processing job\u2019s settings, logs, and metrics on the Amazon SageMaker console, as shown in the following screenshot.<\/p>\n<p><img decoding=\"async\" loading=\"lazy\" class=\"alignnone size-full wp-image-16359\" src=\"https:\/\/d2908q01vomqb2.cloudfront.net\/f1f836cb4ea6efb2a0b1b99f41ad8b103eff4b59\/2020\/09\/25\/2-Screenshot-5.jpg\" alt=\"\" width=\"900\" height=\"244\"><\/p>\n<p>After a job completes, if you specified <code>spark_event_logs_s3_uri<\/code> in the <code>run()<\/code> call, you can view the Spark UI by starting the history server:<\/p>\n<p><code>spark_processor.start_history_server()<\/code><\/p>\n<p>If you run this from an Amazon SageMaker notebook instance, the output includes a proxy URL where you can access the history server:<\/p>\n<div class=\"hide-language\">\n<pre class=\"unlimited-height-code\"><code class=\"lang-code\">Starting history server...\r\nHistory server is up on https:\/\/&lt;your-notebook&gt;.notebook.us-west-2.sagemaker.aws\/proxy\/15050\r\n<\/code><\/pre>\n<\/div>\n<p>Visiting this URL opens the history server web interface, as shown in the following screenshot:<\/p>\n<p><img decoding=\"async\" loading=\"lazy\" class=\"alignnone size-full wp-image-16362\" src=\"https:\/\/d2908q01vomqb2.cloudfront.net\/f1f836cb4ea6efb2a0b1b99f41ad8b103eff4b59\/2020\/09\/25\/3-HistoryServer.jpg\" alt=\"\" width=\"900\" height=\"219\"><\/p>\n<p>You can also specify additional Python and JAR file dependencies for your Spark jobs. 
For example, if you want to serialize an MLeap model, you can specify these additional dependencies in the call to the <code>run()<\/code> method of <code>PySparkProcessor<\/code>:<\/p>\n<div class=\"hide-language\">\n<pre class=\"unlimited-height-code\"><code class=\"lang-python\">spark_processor.run(\r\n    submit_app_py=\".\/code\/preprocess-mleap.py\",\r\n    submit_py_files=[\".\/spark-mleap\/mleap-0.15.0.zip\"],\r\n    submit_jars=[\".\/spark-mleap\/mleap-spark-assembly.jar\"],\r\n    arguments=['s3_input_bucket', bucket,\r\n               's3_input_key_prefix', input_prefix_abalone,\r\n               's3_output_bucket', bucket,\r\n               's3_output_key_prefix', input_preprocessed_prefix_abalone],\r\n    logs=False\r\n)\r\n<\/code><\/pre>\n<\/div>\n<p>Finally, overriding the Spark configuration is necessary for tasks such as tuning your Spark application or configuring the Hive metastore. You can override Spark, Hive, and Hadoop configurations using the Amazon SageMaker Python SDK.<\/p>\n<p>For example, the following code overrides <code>spark.executor.memory<\/code> and <code>spark.executor.cores<\/code>:<\/p>\n<div class=\"hide-language\">\n<pre class=\"unlimited-height-code\"><code class=\"lang-python\">spark_processor = PySparkProcessor(\r\n    base_job_name=\"sm-spark\",\r\n    framework_version=\"2.4\",\r\n    role=role,\r\n    instance_count=2,\r\n    instance_type=\"ml.c5.xlarge\",\r\n    max_runtime_in_seconds=1200,\r\n)\r\n\r\nconfiguration = [{\r\n    \"Classification\": \"spark-defaults\",\r\n    \"Properties\": {\"spark.executor.memory\": \"2g\", \"spark.executor.cores\": \"1\"},\r\n}]\r\n\r\nspark_processor.run(\r\n    submit_app_py=\".\/code\/preprocess.py\",\r\n    arguments=['s3_input_bucket', bucket,\r\n               's3_input_key_prefix', input_prefix_abalone,\r\n               's3_output_bucket', bucket,\r\n               's3_output_key_prefix', input_preprocessed_prefix_abalone],\r\n    configuration=configuration,\r\n    
logs=False\r\n)\r\n<\/code><\/pre>\n<\/div>\n<p>Try out <a href=\"https:\/\/github.com\/awslabs\/amazon-sagemaker-examples\/tree\/master\/sagemaker_processing\/spark_distributed_data_processing\" target=\"_blank\" rel=\"noopener noreferrer\">this example<\/a> yourself by opening the examples tab in your Amazon SageMaker notebook instance, or by cloning the Amazon SageMaker examples repository and navigating to the folder with the Amazon SageMaker Processing examples.<\/p>\n<p>Additionally, you can set up an end-to-end Spark workflow for your use cases using Amazon SageMaker and other AWS services.<\/p>\n<h2>Conclusion<\/h2>\n<p>Amazon SageMaker makes extensive use of Docker containers to let users build a runtime environment for data preparation, training, and inference code. Amazon SageMaker\u2019s built-in Spark container for Amazon SageMaker Processing provides a managed Spark runtime, including all library components and dependencies needed to run distributed data processing workloads. The example in this post shows how developers and data scientists can use the built-in Spark container on Amazon SageMaker to focus on the more important aspects of preparing and preprocessing data. 
Instead of spending time tuning, scaling, or managing Spark infrastructure, developers can focus on core implementation.<\/p>\n<hr>\n<h3>About the Authors<\/h3>\n<p><strong>\u00a0<img decoding=\"async\" loading=\"lazy\" class=\"size-full wp-image-16369 alignleft\" src=\"https:\/\/d2908q01vomqb2.cloudfront.net\/f1f836cb4ea6efb2a0b1b99f41ad8b103eff4b59\/2020\/09\/25\/Shreyas-Subramanian-100.jpg\" alt=\"\" width=\"100\" height=\"134\">Shreyas Subramanian<\/strong> is an AI\/ML Specialist Solutions Architect who helps customers use machine learning to solve their business challenges on the AWS platform.<\/p>\n<p>\u00a0<\/p>\n<p>\u00a0<\/p>\n<p>\u00a0<\/p>\n<p>\u00a0<\/p>\n<p><strong><img decoding=\"async\" loading=\"lazy\" class=\"size-full wp-image-16368 alignleft\" src=\"https:\/\/d2908q01vomqb2.cloudfront.net\/f1f836cb4ea6efb2a0b1b99f41ad8b103eff4b59\/2020\/09\/25\/Andrew-Packer-100.jpg\" alt=\"\" width=\"100\" height=\"133\"><\/strong><strong>Andrew Packer<\/strong> is a Software Engineer in Amazon AI, where he is excited about building scalable, distributed machine learning infrastructure for the masses. In his spare time, he likes playing guitar and exploring the PNW.<\/p>\n<p>\u00a0<\/p>\n<p>\u00a0<\/p>\n<p>\u00a0<\/p>\n<p>\u00a0<\/p>\n<p><strong><img decoding=\"async\" loading=\"lazy\" class=\"size-full wp-image-16370 alignleft\" src=\"https:\/\/d2908q01vomqb2.cloudfront.net\/f1f836cb4ea6efb2a0b1b99f41ad8b103eff4b59\/2020\/09\/25\/vidhi-kastuar-100.jpg\" alt=\"\" width=\"100\" height=\"135\"><\/strong><strong>Vidhi Kastuar<\/strong> is a Sr. Product Manager for Amazon SageMaker, focusing on making machine learning and artificial intelligence simple, easy to use, and scalable for all users and businesses. Prior to AWS, Vidhi was Director of Product Management at Veritas Technologies. 
For fun outside work, Vidhi loves to sketch and paint, work as a career coach, and spend time with her family and friends.<\/p>\n<\/p><\/div>\n","protected":false},"excerpt":{"rendered":"<p>https:\/\/aws.amazon.com\/blogs\/machine-learning\/running-on-demand-serverless-apache-spark-data-processing-jobs-using-amazon-sagemaker-managed-spark-containers-and-the-amazon-sagemaker-sdk\/<\/p>\n","protected":false},"author":0,"featured_media":311,"comment_status":"open","ping_status":"open","sticky":false,"template":"","format":"standard","meta":[],"categories":[3],"tags":[],"_links":{"self":[{"href":"https:\/\/salarydistribution.com\/machine-learning\/wp-json\/wp\/v2\/posts\/310"}],"collection":[{"href":"https:\/\/salarydistribution.com\/machine-learning\/wp-json\/wp\/v2\/posts"}],"about":[{"href":"https:\/\/salarydistribution.com\/machine-learning\/wp-json\/wp\/v2\/types\/post"}],"replies":[{"embeddable":true,"href":"https:\/\/salarydistribution.com\/machine-learning\/wp-json\/wp\/v2\/comments?post=310"}],"version-history":[{"count":0,"href":"https:\/\/salarydistribution.com\/machine-learning\/wp-json\/wp\/v2\/posts\/310\/revisions"}],"wp:featuredmedia":[{"embeddable":true,"href":"https:\/\/salarydistribution.com\/machine-learning\/wp-json\/wp\/v2\/media\/311"}],"wp:attachment":[{"href":"https:\/\/salarydistribution.com\/machine-learning\/wp-json\/wp\/v2\/media?parent=310"}],"wp:term":[{"taxonomy":"category","embeddable":true,"href":"https:\/\/salarydistribution.com\/machine-learning\/wp-json\/wp\/v2\/categories?post=310"},{"taxonomy":"post_tag","embeddable":true,"href":"https:\/\/salarydistribution.com\/machine-learning\/wp-json\/wp\/v2\/tags?post=310"}],"curies":[{"name":"wp","href":"https:\/\/api.w.org\/{rel}","templated":true}]}}