Configuring and using EMR-Spark clusters

In this section, we will present two simple examples of EMR clusters suitable for basic Spark development. In the first example, we will spin up an EMR cluster, start the Spark shell, and do some Spark-Scala work. In the second example, we will spin up an EMR cluster and run a simple Spark program.

Follow the step-by-step instructions specified next for this hands-on exercise:

  1. Log in to the AWS Management Console and open the Amazon EMR console and click on the Create cluster button:
  1. We will use the Create Cluster - Quick Options for selecting the options for our cluster. Specify a name for the cluster (FirstEMRSparkClusterUsingQuickOptions). Choose Launch mode as Cluster

In the Software configuration section, ensure you have selected the latest available version of EMR and select the Spark option:

Leave the defaults for Hardware configuration unchanged. We will be creating a three-node cluster with suggested instance types:

Select your EC2 key pair from the dropdown list. Click on the Create cluster button:

  1. It can take a few minutes for the cluster to start up. You should see the following screen with a Starting message displayed (in green):
  1. After the cluster is up and running, the Starting message changes to Waiting:
  1. You can click on the AWS CLI export button to capture the EMR create cluster command that you can use to create this cluster (in the future) from the command line in one step:
  1. You can view the nodes and their status by clicking on the Hardware tab:
  1. You can view the defined jobs or steps by clicking on the Steps tab:
  1. Click on the Clusters link on the left-side menu for a listing of your clusters and their status:
  1. You can also list all the EMR clusters using AWS CLI command. Note the ID value for the cluster:
Aurobindos-MacBook-Pro-2:Downloads aurobindosarkar$ aws emr list-clusters
  1. Go to the IAM console and edit the default security group inbound rules to include port 22. SSH into the EMR Master node as shown here (copy the public DNS for the master node from the Summary tab):
ssh -i ./AWSBook2EdKeyPair.pem [email protected]
  1. You can display the details of your cluster using the describe-cluster command and specifying the cluster ID (copy the cluster ID value from the EMR Clusters console). The output provides cluster-level details such as status, hardware and software configuration, VPC settings, bootstrap actions, and instance groups.
[hadoop@ip-172-31-23-43 ~]$ aws emr describe-cluster --cluster-id j-1O96RAZBTX0VI

{
"Cluster": {
"Status": {
"Timeline": {
"ReadyDateTime": 1517139426.457,
"CreationDateTime": 1517139044.608
},
"State": "WAITING",
"StateChangeReason": {
"Message": "Cluster ready after last step completed."
}
},
"Ec2InstanceAttributes": {
"EmrManagedMasterSecurityGroup": "sg-c086ddbc",
"RequestedEc2AvailabilityZones": [],
"RequestedEc2SubnetIds": [
"subnet-3305a255"
],
"Ec2SubnetId": "subnet-3305a255",
"IamInstanceProfile": "EMR_EC2_DefaultRole",
"Ec2KeyName": "AWSBook2EdKeyPair",
"Ec2AvailabilityZone": "us-west-2a",
"EmrManagedSlaveSecurityGroup": "sg-8482d9f8"
},
"Name": "FirstEMRSparkClusterUsingQuickOptions",
"ServiceRole": "EMR_DefaultRole",
"Tags": [],
"TerminationProtected": false,
"ReleaseLabel": "emr-5.11.1",
"NormalizedInstanceHours": 0,
"InstanceCollectionType": "INSTANCE_GROUP",
"Applications": [
{
"Version": "3.7.2",
"Name": "Ganglia"
},
{
"Version": "2.2.1",
"Name": "Spark"
},
{
"Version": "0.7.3",
"Name": "Zeppelin"
}
],
"KerberosAttributes": {},
"MasterPublicDnsName": "ec2-34-216-77-144.us-west-2.compute.amazonaws.com",
"ScaleDownBehavior": "TERMINATE_AT_TASK_COMPLETION",
"InstanceGroups": [
{
"RequestedInstanceCount": 2,
"Status": {
"Timeline": {
"ReadyDateTime": 1517139426.484,
"CreationDateTime": 1517139044.625
},
"State": "RUNNING",
"StateChangeReason": {
"Message": ""
}
},
"Name": "Core Instance Group",
"InstanceGroupType": "CORE",
"EbsBlockDevices": [],
"ShrinkPolicy": {},
"Id": "ig-2BEMVHG1BGSXT",
"Configurations": [],
"InstanceType": "m3.xlarge",
"Market": "ON_DEMAND",
"RunningInstanceCount": 2
},
{
"RequestedInstanceCount": 1,
"Status": {
"Timeline": {
"ReadyDateTime": 1517139395.559,
"CreationDateTime": 1517139044.624
},
"State": "RUNNING",
"StateChangeReason": {
"Message": ""
}
},
"Name": "Master Instance Group",
"InstanceGroupType": "MASTER",
"EbsBlockDevices": [],
"ShrinkPolicy": {},
"Id": "ig-3VB29AP5DG07X",
"Configurations": [],
"InstanceType": "m3.xlarge",
"Market": "ON_DEMAND",
"RunningInstanceCount": 1
}
],
"VisibleToAllUsers": true,
"BootstrapActions": [],
"LogUri": "s3n://aws-logs-450394462648-us-west-2/elasticmapreduce/",
"AutoTerminate": false,
"Id": "j-1O96RAZBTX0VI",
"Configurations": [
{
"Properties": {
"maximizeResourceAllocation": "true"
},
"Classification": "spark"
}
]
}
}
  1. You can start the Spark shell as shown here:
[hadoop@ip-172-31-23-43 ~]$ spark-shell
  1. For illustration purposes, we will show a few processing steps in the Spark shell (using Scala). First, download a book (in the .txt format) from:  http://www.gutenberg.org/ebooks/35688. We will use the downloaded file as input. Go to the S3 console to create an S3 bucket. Specify the Bucket name (in our example, we have named it aurobindo-book). Click on the Next button: 
  1. Leave the default selections unchanged and click on the Next button:
  1. Review the Bucket parameters and click on the Create bucket button:
  1. You should see the newly created S3 bucket listed on the S3 console as shown in the following screenshot:
  1. Upload the input file to the newly created S3 bucket as shown here:
Aurobindos-MacBook-Pro-2:Downloads aurobindosarkar$ aws s3 mv pg35688.txt s3://aurobindo-book/aliceinwonderland.txt
  1. Go to the specific S3 bucket's contents to see the file listed:
  1. Switch back to the Spark shell. Read the input file into a Spark RDD (Resilient Distributed Dataset) and display the first five lines (in the RDD) as shown next:
scala> val fileRDD = sc.textFile("s3://aurobindo-book/aliceinwonderland.txt")

scala> fileRDD.take(5).foreach(println)
The Project Gutenberg EBook of Alice in Wonderland, by Alice Gerstenberg
This eBook is for the use of anyone anywhere at no cost and with
almost no restrictions whatsoever. You may copy it, give it away or
re-use it under the terms of the Project Gutenberg License included
  1. The logic used in the following code basically computes the word count for all the words in the book:
scala> val counts = fileRDD.flatMap(line => line.toLowerCase().replace(".", " ").replace(",", " ").split(" ")).map(word => (word, 1L)).reduceByKey(_ + _)
  1. We display the results for 10 words as shown here:
scala> counts.take(10).foreach(println)
(goals,1)
(brother,1)
(lessons;,1)
(dashes,3)
(alice,369)
(therefore,1)
(rate,1)
(kettles,1)
(hay,3)
(_picked_,1)
  1. Next, we sort the list and show the top 10 words and their count:
scala> val sorted_counts = counts.collect().sortBy(wc => -wc._2)

scala> sorted_counts.take(10).foreach(println)
(,4969)
(the,834)
(and,459)
(you,441)
(to,398)
(a,386)
(of,381)
(alice,369)
(i,328)
(it,236)
  1. We can store the sorted results in an S3 bucket as shown here:
scala> sc.parallelize(sorted_counts).saveAsTextFile("s3://aurobindo-book/wordcount-alice-in-wonderland")
  1. Finally, we go back to the EMR console and terminate the cluster (by clicking on the Terminate button).
  1. You should see a terminating message temporarily on the console. After a few minutes, the cluster is terminated:

In our second example, we create an EMR cluster as we did in the first example. We want to get comfortable spinning up cluster, perform the computations required, and terminate the cluster immediately after.

  1. Use your favorite editor to create a Spark Python program (containing the following code) and name it wordcount.py. Upload the program to a S3 bucket:
from __future__ import print_function
from pyspark import SparkContext
import sys

if __name__ == "__main__":
sc = SparkContext(appName="WordCount")
text_file = sc.textFile(sys.argv[1])

# Compute the word count
counts = text_file.flatMap(lambda line: line.split(" ")).map(lambda word: (word, 1)).reduceByKey(lambda a, b: a + b)

#Save the results to S3
counts.saveAsTextFile(sys.argv[2])

sc.stop()
  1. From the EMR console, create a new EMR cluster and name it MSecondEMRSparkClusterUsingQuickOptions

Choose Spark in the Software configuration section choose Application options as shown here: 

Choose the default selections for Hardware configuration as shown:

Select the EC2 key pair as shown:

  1. List all the EMR clusters using AWS CLI command. Note the ID value for the cluster:
Aurobindos-MacBook-Pro-2:Downloads aurobindosarkar$ aws emr list-clusters --active
{    "Clusters": [
{
"Status": {
"Timeline": {
"ReadyDateTime": 1517146171.077,
"CreationDateTime": 1517145843.445
},
"State": "WAITING",
"StateChangeReason": {
"Message": "Cluster ready after last step failed."
}
},
"NormalizedInstanceHours": 96,
"Id": "j-2996LIDHV5QLJ",
"Name": "SecondEMRSparkClusterUsingQuickOptions"
}
]
}
  1. After your cluster is up and running, click on the Add step button (from the Steps tab):
  1. Specify the parameters in the Add step screen to include the step type to be Spark application, give a name for the application, choose Cluster as the Deploy mode parameter, specify the location of the wordcount.py program on S3 (s3://aurobindo-book/wordcount.py) , and specify the input and output S3 buckets as arguments to the program as shown here. Then, click on the Add button:
aws emr add-steps --cluster-id j-2996LIDHV5QLJ --steps Type=spark,Name=SparkWordCountApp,Args=[--deploy-mode,cluster,--master,yarn,--conf,spark.yarn.submit.waitAppCompletion=false,s3://aurobindo-book/wordcount.py,s3://aurobindo-book/aliceinwonderland.txt,s3://aurobindo-
book/pywordcount],ActionOnFailure=CONTINUE
{
"StepIds": [
"s-2R55949CWV0KH"
]
}
  1. You should see the newly created step in the Steps tab on the Clusters console. The Status field will change from Pending to Running to Completed:
  1. Go to the S3 console to check the contents of the output. Click on the pywordcount bucket (output S3 bucket), download one of the files and view the contents downloaded file:
(,4969)
(the,834)
(and,459)
(you,441)
(to,398)
(a,386)
(of,381)
(alice,369)
(i,328)
(it,236)
(in,217)
(_],208)
(queen,164)
(with,142)
(is,120)
(that,118)
...
(there,30)
(think,30)
(up,30)
(said,29)
(did,29)
(down,28)
(gutenberg,28)
(been,28)
(never,28)
(time,28)
(way,28)
  1. Apache Spark provides browser-based UIs for its jobs. In order to enable the Spark UI, click on the Enable Web Connection link (in the Summary tab) as shown here:
8. You should see the following screen with instructions of enabling Spark UI.
For more details on enabling Spark Web UIs, refer to the AWS documentation: https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-spark-history.html
..................Content has been hidden....................

You can't read the all page of ebook, please click here login for view all page.
Reset
18.116.63.231