Chapter 12: Orchestrating Amazon EMR Jobs with AWS Step Functions and Apache Airflow/MWAA

In the previous few chapters, we explained how you can leverage Amazon EMR for on-demand ETL jobs or for long-running clusters that either execute a real-time streaming application or serve as a backend for interactive notebook development. But when we build a data pipeline to automate data ingestion, cleansing, or transformation, we look for orchestration tools with which we can build workflows that are kicked off either on a schedule or by an event.

There are two primary orchestration tools that are very popular for building data pipelines with Amazon EMR: AWS Step Functions and Apache Airflow. AWS also provides a managed offering of Airflow, called Amazon Managed Workflows for Apache Airflow (MWAA).

In this chapter, we will provide an overview of AWS Step Functions and MWAA services and then explain how you can leverage them to orchestrate a data pipeline that can create EMR clusters, submit jobs, and terminate clusters as required. The following are the high-level topics we will be covering:

  • Overview of AWS Step Functions
  • Integrating AWS Step Functions to orchestrate EMR jobs
  • Overview of Apache Airflow and MWAA
  • Integrating Airflow to trigger EMR jobs

Getting an overview of these orchestration options will give you a starting point from which you can build more complex data pipelines that integrate not only Amazon EMR jobs but also other AWS and non-AWS services to automate your workflows.

Technical requirements

In this chapter, we will showcase the features of AWS Step Functions and MWAA and demonstrate how you can integrate them to trigger EMR jobs. So, before getting started, make sure you have the following requirements to hand.

  • An AWS account with access to create Amazon S3, Amazon EMR, AWS Step Functions, and MWAA resources
  • An IAM user with access to create IAM roles, which will be used to trigger or execute jobs

Now let's get an overview of these orchestration tools and learn how we can integrate them.

Overview of AWS Step Functions

AWS Step Functions is a serverless workflow service that natively integrates with several AWS services, which means you can create a workflow that invokes the actions of any of the supported AWS services.

AWS Step Functions provides both a visual interface and a JSON-based definition approach to designing workflows. With the visual interface, you can drag and drop different AWS service actions and modify their parameters to compose a workflow. In addition to the visual interface, Step Functions lets you code your workflow with a JSON-based definition called a state machine, where each step is referred to as a state. Step Functions also provides sample projects for frequently used patterns, which you can copy and modify for your use case.

You can use AWS Step Functions to automate IT business processes, build data or machine learning pipelines, or design real-time, event-driven streaming applications. When you start designing a workflow using Step Functions, you can choose either of the following workflow types:

  • Standard: This is great for long-running batch workflows, such as data analytics pipelines or IT process automation, that need durable and auditable execution. Standard workflows can run for up to one year and are billed based on the number of state transitions.
  • Express: This is useful when you need to build an event-driven workflow that handles a much higher volume of requests than a batch workload running on a schedule. Express workflows have a maximum execution duration of 5 minutes and are billed based on the number of requests and the duration of the workflow.

When you design a workflow using Step Functions, each step or state can be any of the following types (a short example combining a few of them follows the list):

  • Pass: This passes its input straight through to the next state without performing any work.
  • Task: This allows you to invoke the actions of other AWS services, such as invoking an AWS Lambda function or triggering an EMR Spark job.
  • Choice: This is similar to switch-case statements in programming languages, where you define conditions and route to different states based on parameter values.
  • Wait: This enables you to introduce a delay (a number of seconds or until a timestamp) into the workflow.
  • Succeed: This enables you to terminate the workflow successfully.
  • Fail: This enables you to terminate the workflow with a failed status.
  • Parallel: This is an important type that enables you to run multiple branches of the workflow in parallel.
  • Map: This is also an important type, which iterates a set of workflow steps over each item in an input array.
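To make these state types a little more concrete, the following is a minimal, illustrative snippet (it is not part of the state machine built later in this chapter; the state names and the $.status field are assumptions) that combines the Choice, Wait, Succeed, and Fail types to react to a job's status. In a real workflow, a Task state that refreshes $.status would sit inside the loop between the Wait and the Choice:

"Check_Job_Status":{
   "Type":"Choice",
   "Choices":[
      {"Variable":"$.status", "StringEquals":"SUCCEEDED", "Next":"Job_Succeeded"},
      {"Variable":"$.status", "StringEquals":"FAILED", "Next":"Job_Failed"}
   ],
   "Default":"Wait_And_Recheck"
},
"Wait_And_Recheck":{"Type":"Wait", "Seconds":60, "Next":"Check_Job_Status"},
"Job_Succeeded":{"Type":"Succeed"},
"Job_Failed":{"Type":"Fail", "Error":"JobFailed", "Cause":"The job did not complete successfully"}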

Out of all the state types that AWS Step Functions supports, the Task type is the most commonly used, as it enables you to invoke the actions of other AWS services to build the workflow. Now, let's learn how you can leverage AWS Step Functions to build a workflow using Amazon EMR actions.

Integrating AWS Step Functions to orchestrate EMR jobs

AWS Step Functions supports the createCluster, createCluster.sync, terminateCluster, terminateCluster.sync, addStep, addStep.sync, cancelStep, setClusterTerminationProtection, modifyInstanceFleetByName, and modifyInstanceGroupByName EMR actions, which provide great flexibility for building workflows on top of EMR.

Let's assume that you would like to build a workflow that is triggered as soon as a file arrives in S3 and whose objective is to execute a Spark + Hudi job that processes the input file. The workflow should create a transient EMR cluster, submit a Spark job that performs the ETL transforms, and then, upon completion of the job, terminate the cluster. You can easily build this workflow using the createCluster, addStep, and terminateCluster actions of AWS Step Functions; a sketch of how the S3-based trigger might be wired is shown right after this paragraph.
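Although the chapter's state machine definition covers only the EMR actions, the file-arrival trigger itself can be wired through Amazon EventBridge. The following is a minimal sketch of an EventBridge event pattern, assuming EventBridge notifications are enabled on the bucket and that input files land under an input/ prefix (both are assumptions); the rule's target would be the state machine, invoked through an IAM role that is allowed to call states:StartExecution:

{
   "source":["aws.s3"],
   "detail-type":["Object Created"],
   "detail":{
      "bucket":{"name":["<bucket-name>"]},
      "object":{"key":[{"prefix":"input/"}]}
   }
}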

The following JSON definition is an example of a Step Functions state of the Task type that invokes the EMR createCluster.sync action with the parameters required to create the cluster:

"Launch_EMR_Cluster":{

"Type":"Task",

"Resource":"arn:aws:states:::elasticmapreduce:createCluster.sync",

"Parameters":{

    "Name":"StepFn-EMR-Hudi",

    "ServiceRole":"EMR_DefaultRole",

    "JobFlowRole":"EMR_EC2_DefaultRole",

    "EbsRootVolumeSize":10,

    "ReleaseLabel":"emr-6.4.0",

    "Applications":[{"Name":"Hadoop"},{"Name":"Spark"},{"Name":"Hive"},{"Name":"Livy"}],

    "LogUri":"s3://<bucket-name>/emr/logs",

    "ManagedScalingPolicy":{

       "ComputeLimits":{

          "MaximumCapacityUnits":2,

          "MaximumCoreCapacityUnits":2,

          "MaximumOnDemandCapacityUnits":2,

          "MinimumCapacityUnits":1,

          "UnitType":"InstanceFleetUnits"

       }

    },

    "VisibleToAllUsers":true,

    "Instances":{

       "KeepJobFlowAliveWhenNoSteps":true,

       "Ec2KeyName":"<key-pair-name>",

       "Ec2SubnetId":"<subnet-id>",

       "InstanceFleets":[

          {

             "InstanceFleetType":"MASTER",

             "Name":"Master",

             "TargetOnDemandCapacity":1,

             "InstanceTypeConfigs":[{"InstanceType":"m5.xlarge"}]

          },

          {

             "InstanceFleetType":"CORE",

             "TargetOnDemandCapacity":1,

    "InstanceTypeConfigs":[{"InstanceType":"m5.xlarge"}]

          }

       ]

    }

},

"Next":"Copy_Hudi_JARs"

There are a few variables, including <bucket-name>, <key-pair-name>, and <subnet-id>, that you must replace before integrating this state.

The following JSON definition is an example of a Step Functions state that adds a step, or job, to the EMR cluster. You may observe that it references the $.ClusterId value returned by the previous createCluster action to submit the job to the same cluster:

"Trigger_Spark_Job":{

"Type":"Task",

"ResultPath":"$.Result",

"Catch":[

    {

       "ErrorEquals":["States.ALL"],

       "ResultPath":"$.error-info",

       "Next":"Terminate_EMR_Cluster"

    }

],

"Resource":"arn:aws:states:::elasticmapreduce:addStep.sync",

"Parameters":{

    "ClusterId.$":"$.ClusterId",

    "Step":{

       "Name":"Spark Transform Step",

       "ActionOnFailure":"CONTINUE",

       "HadoopJarStep":{

          "Jar":"command-runner.jar",

          "Args":["spark-submit", "--deploy-mode", "cluster", "--jars", "/usr/lib/hudi/hudi-spark-bundle.jar", "--conf", "spark.serializer=org.apache.spark.serializer.KryoSerializer", "s3://<bucket-name>/<script-path-name>.py"

          ]

       }

    }

},

"Next":"Terminate_EMR_Cluster"

},

As you can see, it receives the PySpark script path as a parameter. You must replace the <bucket-name> and <script-path-name> variables before integrating this into your parent state machine definition.

After all of your ETL transformation steps are complete, if you plan to integrate the terminateCluster step, you can refer to the following JSON definition, which invokes the EMR terminateCluster action:

"Terminate_EMR_Cluster":{

"Type":"Task",

"Resource":"arn:aws:states:::elasticmapreduce:terminateCluster.sync",

"Parameters":{"ClusterId.$":"$.ClusterId"},

"End":true

}

Please note that the .sync suffix on these EMR actions means that Step Functions waits for the underlying operation to complete (for example, for the cluster to be created or the step to finish) before moving on to the next state.

The complete state machine definition is available here: https://github.com/PacktPublishing/Simplify-Big-Data-Analytics-with-Amazon-EMR-/blob/main/chapter_12/emr-cluster-job-step-functions.json. You can download it, modify it, and deploy it in your AWS account. The following is a snapshot of the Step Functions visual representation of the workflow while it is being executed. Notice that it has an additional step after creating the cluster, which copies the Hudi JARs. If you recollect from Chapter 11, Implementing UPSERT on S3 Data Lake with Apache Hudi, we performed that JAR copy manually by SSHing to the master node; the same operation can be automated with a Step Functions task.

Figure 12.1 – AWS Step Functions visual workflow for EMR jobs

The workflow visualization uses color-coding to represent the status of the tasks: green indicates that a task has completed successfully, blue indicates that a task is currently executing, and red means that a task failed with an error. During execution, you can click each task to see its status, the input parameters passed to it, and the output or error it produced.

After learning how AWS Step Functions can be integrated to trigger EMR jobs, next, let's get an overview of Apache Airflow and MWAA and see how they help to trigger EMR jobs.

Overview of Apache Airflow and MWAA

Apache Airflow is an open source workflow management framework that allows you to build workflows using the Python programming language. It has the following fundamental differences compared to AWS Step Functions:

  • Being an Apache open source project, Airflow not only supports AWS services, but also supports integration with other public cloud providers and open source projects such as Apache Sqoop, Apache Spark, and many more.
  • AWS Step Functions provides a low-code, JSON-based definition, whereas Airflow is more popular with programmers as you need to design a workflow by writing Python scripts.
  • AWS Step Functions provides a serverless offering, whereas Airflow needs infrastructure provisioned to act as a cluster on top of which you can run multiple jobs.

From a use case perspective, Airflow is a great fit when your workflow involves both AWS and non-AWS services. For example, suppose not all your applications are in AWS; a few are on-premises and a few are in another cloud, and you would like to build a workflow that invokes an on-premises job, then triggers an Amazon EMR job, and then invokes a job in the other cloud. In this case, AWS Step Functions is less suitable because you need to invoke non-AWS jobs, and Airflow is a great fit for that; it is very popular in the open source world for designing complex workflows.

To take away the infrastructure provisioning and management overhead, AWS offers Amazon Managed Workflows for Apache Airflow (MWAA), which is highly available, secure, and scales to meet your resource requirements. It is the same as open source Airflow, with support for the same open source integrations, but as a managed offering you can create an environment with just a few clicks. It also integrates with Amazon CloudWatch and AWS CloudTrail for monitoring and auditing capabilities.

Now, let's see how you can integrate Airflow to invoke EMR jobs using Python.

Integrating Airflow to trigger EMR jobs

Airflow provides the following operators and sensors to interact with Amazon EMR clusters:

  • EmrCreateJobFlowOperator: This operator enables you to create an EMR cluster (job flow).
  • EmrJobFlowSensor: This sensor checks the status of an EMR cluster.
  • EmrAddStepsOperator: With this, you can add steps to an EMR cluster.
  • EmrStepSensor: This sensor checks the status of an existing step in your EMR cluster.
  • EmrModifyClusterOperator: This is used to modify an existing cluster.
  • EmrTerminateJobFlowOperator: This enables you to terminate an existing cluster.

As explained, you can design a workflow in Airflow using the Python programming language, where you define each action and then the sequence of execution. The following is sample Python code that uses Airflow's EmrCreateJobFlowOperator to trigger an EMR create cluster action:

cluster_create_action = EmrCreateJobFlowOperator(
    task_id='create_job_flow_task',
    job_flow_overrides=<JOB_FLOW_OVERRIDES>
)

You can pass a cluster configuration through the <JOB_FLOW_OVERRIDES> parameter that follows a JSON structure.
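As an illustration only, a minimal <JOB_FLOW_OVERRIDES> dictionary might look like the following sketch. It mirrors the cluster parameters used in the Step Functions example earlier in this chapter, and the cluster name, roles, subnet, and instance types are placeholders you should adapt to your environment:

JOB_FLOW_OVERRIDES = {
    "Name": "Airflow-EMR-Hudi",                 # placeholder cluster name
    "ReleaseLabel": "emr-6.4.0",
    "Applications": [{"Name": "Hadoop"}, {"Name": "Spark"}, {"Name": "Hive"}, {"Name": "Livy"}],
    "LogUri": "s3://<bucket-name>/emr/logs",
    "ServiceRole": "EMR_DefaultRole",
    "JobFlowRole": "EMR_EC2_DefaultRole",
    "Instances": {
        "KeepJobFlowAliveWhenNoSteps": True,    # keep the cluster up until explicitly terminated
        "Ec2SubnetId": "<subnet-id>",
        "InstanceGroups": [
            {"Name": "Primary", "InstanceRole": "MASTER",
             "InstanceType": "m5.xlarge", "InstanceCount": 1},
            {"Name": "Core", "InstanceRole": "CORE",
             "InstanceType": "m5.xlarge", "InstanceCount": 1},
        ],
    },
}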

After the cluster is created, if you need to add a step to it, you can use the EmrAddStepsOperator, as shown in the following sample code, which takes the EMR step details through a JSON-style configuration passed in the <SPARK_STEPS> variable (a sketch of <SPARK_STEPS> itself follows the code):

add_step_action = EmrAddStepsOperator(
    task_id='add_steps',
    job_flow_id="{{ task_instance.xcom_pull(task_ids='create_job_flow_task', key='return_value') }}",
    aws_conn_id='aws_default',
    steps=<SPARK_STEPS>,
)
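Similarly, the <SPARK_STEPS> variable is a list of EMR step definitions. A minimal sketch, reusing the command-runner.jar pattern from the Step Functions example (the bucket and script path are placeholders), might look like this:

SPARK_STEPS = [
    {
        "Name": "Spark Transform Step",
        "ActionOnFailure": "CONTINUE",
        "HadoopJarStep": {
            "Jar": "command-runner.jar",
            "Args": [
                "spark-submit", "--deploy-mode", "cluster",
                "s3://<bucket-name>/<script-path-name>.py",
            ],
        },
    }
]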

After you have defined all your steps, you can define the sequence of execution by writing something like the following:

cluster_create_action >> add_step_action
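To show how these pieces might fit together end to end, the following is a minimal DAG sketch that creates a cluster, submits a step, waits for it with EmrStepSensor, and then terminates the cluster. This is not the book's reference implementation: the import paths assume a recent apache-airflow-providers-amazon release, and the DAG ID, task IDs, schedule, and placeholder configuration dictionaries are illustrative assumptions:

from datetime import datetime

from airflow import DAG
from airflow.providers.amazon.aws.operators.emr import (
    EmrAddStepsOperator,
    EmrCreateJobFlowOperator,
    EmrTerminateJobFlowOperator,
)
from airflow.providers.amazon.aws.sensors.emr import EmrStepSensor

# Populate these as sketched above; empty placeholders keep the file parseable
JOB_FLOW_OVERRIDES: dict = {}
SPARK_STEPS: list = []

with DAG(
    dag_id="emr_transient_cluster_pipeline",   # illustrative name
    start_date=datetime(2022, 1, 1),
    schedule_interval=None,                    # trigger manually or from an external event
    catchup=False,
) as dag:
    # Create the transient EMR cluster
    cluster_create_action = EmrCreateJobFlowOperator(
        task_id="create_job_flow_task",
        job_flow_overrides=JOB_FLOW_OVERRIDES,
        aws_conn_id="aws_default",
    )

    # Submit the Spark step; the cluster ID is pulled from the create task's XCom
    add_step_action = EmrAddStepsOperator(
        task_id="add_steps",
        job_flow_id="{{ task_instance.xcom_pull(task_ids='create_job_flow_task', key='return_value') }}",
        steps=SPARK_STEPS,
        aws_conn_id="aws_default",
    )

    # Wait for the first submitted step to reach a terminal state
    watch_step = EmrStepSensor(
        task_id="watch_step",
        job_flow_id="{{ task_instance.xcom_pull(task_ids='create_job_flow_task', key='return_value') }}",
        step_id="{{ task_instance.xcom_pull(task_ids='add_steps', key='return_value')[0] }}",
        aws_conn_id="aws_default",
    )

    # Terminate the cluster even if the step fails, so no idle cluster is left behind
    terminate_cluster = EmrTerminateJobFlowOperator(
        task_id="terminate_job_flow_task",
        job_flow_id="{{ task_instance.xcom_pull(task_ids='create_job_flow_task', key='return_value') }}",
        aws_conn_id="aws_default",
        trigger_rule="all_done",
    )

    cluster_create_action >> add_step_action >> watch_step >> terminate_cluster

Note that EmrAddStepsOperator returns a list of step IDs, which is why the sensor indexes the first element of the value it pulls from XCom.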

These are a few sample code blocks that explain how you can write Python scripts to define a workflow in Airflow. You can refer to the Airflow documentation to learn more about its operators and integration patterns.

Summary

Over the course of this chapter, we have provided an overview of AWS Step Functions, Apache Airflow, and MWAA. In addition, we have shared example code blocks that explain how you can define a Step Functions state machine or write Python code to design an Airflow workflow.

That concludes this chapter! Hopefully, it has given you an idea of how to integrate these services to design workflows and provides a starting point for designing more complex data or machine learning pipelines. In the next chapter, we will explain how you can migrate your on-premises Hadoop workloads to Amazon EMR.

Test your knowledge

Before moving on to the next chapter, test your knowledge with the following questions:

  1. Assume you are designing a data pipeline that needs to process two input files as two parallel steps and then invoke a common ETL process to aggregate the output of these parallel steps. You have decided to leverage AWS Step Functions to orchestrate the pipeline. Which state types will you integrate, and how?
  2. Assume you have a few Hadoop workloads running on-premises and a few Spark ETL jobs running in Amazon EMR. To simplify orchestration and monitoring, you are looking for an orchestration tool. While comparing different options, you found that AWS Step Functions and MWAA are the two best options. Which of them is better suited to your workload and why?

