In the previous two chapters, we learned how to implement a batch ETL pipeline with Amazon EMR and real-time streaming with Spark Streaming. In this chapter, we will learn how to implement UPSERT or merge on your Amazon S3 data lake using the Apache Hudi framework integrated with Apache Spark.
Amazon S3 is immutable by default, which means you cannot update the content of an object or file in place. Instead, you have to read its content, modify it, and write a new object. As data lake and lakehouse architectures become popular, organizations increasingly look for update capabilities on Amazon S3 and other object stores. Frameworks such as Apache Hudi, Apache Iceberg, and AWS Lake Formation Governed Tables have started offering ACID transactions and UPSERT capabilities on data lakes.
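The read-modify-write pattern described above can be sketched with a toy in-memory object store. This is a model of the semantics only, not the S3 API, and the key and record names are illustrative:

```python
# Toy model of an immutable object store: objects can only be written
# whole, never edited in place -- mirroring Amazon S3's semantics.
class ImmutableObjectStore:
    def __init__(self):
        self._objects = {}

    def put(self, key, body):
        # A write always replaces the entire object with a new one.
        self._objects[key] = body

    def get(self, key):
        return self._objects[key]

def update_record(store, key, record_id, new_quantity):
    # "Updating" one record means reading the full object, modifying it
    # in memory, and writing a brand-new object back under the same key.
    records = store.get(key)
    merged = [
        {**r, "quantity": new_quantity} if r["id"] == record_id else r
        for r in records
    ]
    store.put(key, merged)
    return merged

store = ImmutableObjectStore()
store.put("inventory.json", [{"id": "100", "quantity": 25},
                             {"id": "102", "quantity": 29}])
update_record(store, "inventory.json", "102", 28)
print(store.get("inventory.json"))
```

Frameworks such as Hudi automate exactly this kind of rewrite for you, while also tracking the metadata needed to make it transactional.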
Apache Hudi is a popular open source framework that is integrated into Amazon EMR and AWS Glue and is also very popular within the open source community.
In this chapter, we will learn how you can integrate the Apache Hudi framework with Apache Spark to update and delete data from your data lake. To showcase this capability, we will use an EMR notebook so that you can learn how you can do interactive development on a long-running EMR cluster. The following are the topics that we will cover in this chapter:
An overview of Apache Hudi and its integration with Spark and Amazon EMR gives you a starting point for adding the UPSERT feature to a data lake. This can help you meet General Data Protection Regulation (GDPR) compliance or other regulatory requirements that compel you to update or delete data based on certain filter criteria.
In this chapter, we will showcase interactive development using an EMR notebook and the Apache Spark and Apache Hudi frameworks. So, before getting started, make sure you have the following:
Now, let's dive deep into the use case and hands-on implementation steps starting with the overview of Apache Hudi.
Check out the following video to see the Code in Action at https://bit.ly/3svY3i9
Apache Hudi is an open source framework that is popular for providing record-level transaction support on top of data lakes. The Hudi framework integrates with open file formats such as Parquet and stores additional metadata to support its operations.
Apache Hudi provides several capabilities and the following are the most popular ones:
Hudi supports both read and write-heavy workloads. When you write data to an Amazon S3 data lake using Hudi APIs, you have the option to specify either of the following storage types:
After writing the data with the appropriate storage type, you can query it using any of the following logical views that Hudi offers:
The following diagram shows how CoW works for different insert and update transactions:
Let's explain the preceding diagram:
As you can see, the snapshot shows all the records because the merging happens instantly, during the write operation itself.
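The CoW behavior can be modeled in a few lines of plain Python. This is a toy sketch of the semantics only, not Hudi's actual file layout: every commit rewrites the data file with the incoming records merged in, so a snapshot read is just a read of the latest file:

```python
# Toy copy-on-write: each commit rewrites the file with incoming
# records merged in, keyed on the record key.
def cow_commit(base_records, incoming_records, key="product_id"):
    merged = {r[key]: r for r in base_records}   # existing file contents
    for r in incoming_records:                   # merge happens at write time
        merged[r[key]] = r
    return list(merged.values())                 # the newly written file

file_v1 = cow_commit([], [{"product_id": "100", "qty": 25},
                          {"product_id": "102", "qty": 30}])
# An update rewrites the whole file; a snapshot read simply reads it.
file_v2 = cow_commit(file_v1, [{"product_id": "102", "qty": 29}])
print(file_v2)
```

The cost of the rewrite is paid on every write, which is why CoW suits read-heavy workloads.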
Now, let's learn how the MoR table type handles the same transactions. The following diagram represents the flow:
As explained earlier, CoW merges the delta data instantly during the write, whereas MoR defers the merging until read time. As you can see in the preceding diagram, MoR also offers an additional view, called the Read Optimized view, which queries only the latest compacted base files; it is faster to read, but it may not reflect the most recent delta commits until compaction runs.
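The MoR behavior can likewise be modeled in plain Python. Again, this is a toy sketch of the semantics, not Hudi's internal file formats: writes append to a delta log, the snapshot view merges at read time, and the read-optimized view skips the merge entirely:

```python
# Toy merge-on-read: writes append to a delta log; merging with the
# base file is deferred until query time.
def mor_write(delta_log, incoming_records):
    delta_log.extend(incoming_records)           # cheap append-only write
    return delta_log

def snapshot_read(base_records, delta_log, key="product_id"):
    merged = {r[key]: r for r in base_records}
    for r in delta_log:                          # merge happens at read time
        merged[r[key]] = r
    return list(merged.values())

def read_optimized(base_records):
    return list(base_records)                    # base file only, no merge

base = [{"product_id": "100", "qty": 25}, {"product_id": "102", "qty": 30}]
log = mor_write([], [{"product_id": "102", "qty": 29}])
print(snapshot_read(base, log))   # sees the update
print(read_optimized(base))       # still sees qty 30 until compaction
```

The cheap appends make MoR a better fit for write-heavy workloads, at the cost of slower or slightly stale reads.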
Now that we understand how Hudi works, let's look at some of the popular use cases for which the Hudi framework is most appropriate.
The following are some of the popular use cases for which the Hudi framework is widely adopted:
Starting with the 5.28.0 release, Amazon EMR supports integration with Hudi, which means it installs the Hudi-related libraries when you create an EMR cluster with Hive, Spark, or Presto.
Like other Hive metastore tables, if you register your Hudi dataset with a Hive metastore then you can query your Hudi table using the Hive, Spark SQL, or Presto query engines. In addition, you can also integrate Hudi tables with your AWS Glue catalog.
As explained at the beginning of this chapter, Hudi provides two options when you write to a dataset: one is CoW and the other is MoR. When you register a table as MoR, then in your metastore, you will see two separate tables. One table has the original name that you specified, and another additional table will have a suffix of _rt to provide a real-time view of the data.
If you are using Spark to register a table with Hudi, then you should set the HIVE_SYNC_ENABLED_OPT_KEY option to true. Alternatively, you can also use the hive_sync_tool CLI utility to register your Hudi data as a metastore table in Hive or as a Glue catalog metastore.
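In the DataFrame API, enabling Hive sync amounts to adding the Hive sync keys to your Hudi write options. The following is a minimal sketch; the database and table names are illustrative, and you should check the key names against the Hudi version on your EMR release:

```python
# Hive sync options passed alongside the regular Hudi write options.
# Database/table names here are examples, not fixed values.
hive_sync_options = {
    'hoodie.datasource.hive_sync.enable': 'true',   # HIVE_SYNC_ENABLED_OPT_KEY
    'hoodie.datasource.hive_sync.database': 'default',
    'hoodie.datasource.hive_sync.table': 'product_inventory',
    'hoodie.datasource.hive_sync.partition_fields': 'category',
}
# These would be merged into the write call, for example:
#   df.write.format('org.apache.hudi').options(**hive_sync_options)...
print(sorted(hive_sync_options))
```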
Before getting started with our use case, we need to create an EMR cluster and then create an EMR notebook that points to the EMR cluster we have created. Let's assume this EMR cluster is a long-running cluster that is active to support your development workloads as you plan to do interactive development with EMR notebooks.
Now let's learn how to create the EMR cluster and notebook.
As explained in Chapter 5, Setting Up and Configuring EMR Clusters, to create an EMR cluster, follow these steps:
The following is a screenshot of the EMR release and applications we have selected:
After a few minutes, you will notice the cluster status change to Running while the default Setup hadoop debugging step runs; once that step completes, the status changes to Waiting, which means all the resources are provisioned and we are ready to submit jobs to the cluster.
Let's see how to create an EMR notebook that points to our EMR cluster.
To create an EMR notebook, follow these steps:
The following screenshot shows the Create notebook form in the EMR console:
Clicking on the Create notebook button will take you to the EMR Notebook's detail page, with a status of Starting. In a few minutes, the status will change to Ready.
Now you can click the Open in Jupyter button on the notebook detail page, as shown in the following screenshot:
This will take you to Jupyter Notebook, where you can create interactive notebooks for development. Jupyter Notebook provides options to create a notebook with kernels such as PySpark, SparkR, and Python 3, or to open a Linux command-line terminal.
As a next step, before moving on to the Spark and Hudi implementation, let's first create an Amazon S3 bucket that we can use to store Hudi datasets.
Please refer to the following steps to create the S3 bucket, which we have followed in previous chapters too:
We have called our bucket hudi-data-repository and kept everything else as the default values.
The following screenshot shows the AWS console, using which we have created the bucket:
As we have now created all the resources, next we will dive deep into our use case implementation with Spark and Hudi.
Our EMR cluster and notebook are now ready for use. Let's learn how to do interactive development using an EMR notebook.
For interactive development, we are considering a use case where we will integrate the Hudi framework with Spark to do UPSERT (update/merge) operations on top of an S3 data lake.
Let's navigate to our EMR notebook to get started.
To get started, in Jupyter Notebook, choose New and then PySpark, as shown in the following screenshot:
This will create a new PySpark notebook. You can write code in each cell and execute the cells one at a time, which makes development and debugging easy.
Next, we will learn how to integrate Hudi libraries with the notebook.
By default, Hudi libraries are not available in our EMR notebook. To make them available, you need to copy the Hudi Java ARchive (JAR) files from the EMR cluster's master node to HDFS so that the EMR notebook can refer to them. Follow these steps to do so:
The following screenshot shows the cluster details page, from where you can copy the master node's public DNS URL:
For our implementation, we used PuTTY to SSH to the EMR master node, and to do that, we specified the master public DNS URL in the Session | Host Name field and then specified the EC2 key pair private key (.ppk) under Connection | SSH | Auth, as shown in the following screenshot:
When you click Open and try to connect for the first time, PuTTY will ask you to confirm whether you trust the connection. Click Yes, as shown in the following screenshot:
After you click Yes, you will be prompted to enter the login user name, for which you should type hadoop. That will connect successfully, as shown in the following screenshot:
Once you are successfully logged in, you can execute the following commands to copy the Hudi JAR files from the master node's local filesystem to HDFS.
First, create a new directory path in HDFS. We have used the /applications/hudi/lib path in HDFS, but you can use your own path. You need to keep a note of this path so that you can use it in your EMR notebook:
hdfs dfs -mkdir -p /applications/hudi/lib
Then execute the following command to copy the hudi-spark-bundle.jar file to HDFS:
hdfs dfs -copyFromLocal /usr/lib/hudi/hudi-spark-bundle.jar /applications/hudi/lib/hudi-spark-bundle.jar
Finally, execute the following command to copy the spark-avro.jar file to HDFS:
hdfs dfs -copyFromLocal /usr/lib/spark/external/lib/spark-avro.jar /applications/hudi/lib/spark-avro.jar
Once these two files are copied to HDFS, you can refer to them in your EMR notebook.
After you have the Hudi JARs available in HDFS, you can navigate back to your EMR PySpark notebook and execute the following command:
%%configure -f
{
"conf": {
"spark.jars":"hdfs:///applications/hudi/lib/hudi-spark-bundle.jar,hdfs:///applications/hudi/lib/spark-avro.jar",
"spark.sql.hive.convertMetastoreParquet":"false",
"spark.serializer":"org.apache.spark.serializer.KryoSerializer"
}
}
This makes the Hudi JARs available to SparkContext so that the Spark code can use its libraries. The following screenshot shows the output you should see in your notebook:
After our setup is complete, let's look at a few Hudi APIs and example scripts using which you can create CoW or MoR tables and can do transactions on top of the tables.
Now that our notebook is ready, with all the required JARs, let's dive into Hudi APIs that enable ACID transactions on top of our data lake. We will use PySpark scripts to interact with Hudi functions.
For our use case, we have created an example product inventory dataset with product_id, category, product_name, quantity_available, and last_update_time fields. We will write the input data to our S3 data lake in Hudi format, then update and delete some records, and finally query the data to validate the transactional updates.
Let's now work through this step by step.
The first step for us is to ingest new product inventory data in our data lake. product_id is the unique key to identify a product and the category field is used to partition the data in S3.
With the following code, we create a Spark DataFrame using some product records:
# Create a DataFrame that represents Product Inventory
inputDF = spark.createDataFrame(
[
("100", "Furniture", "Product 1", "25", "2021-12-01T09:51:39.340396Z"),
("101", "Cosmetic", "Product 2", "20", "2021-12-01T10:14:58.597216Z"),
("102", "Furniture", "Product 3", "30", "2021-12-01T11:51:40.417052Z"),
("103", "Electronics", "Product 4", "10", "2021-12-01T11:51:40.519832Z"),
("104", "Electronics", "Product 5", "50", "2021-12-01T11:58:00.512679Z")
],
["product_id", "category", "product_name", "quantity_available", "last_update_time"]
)
Next, we create the hudiOptions configuration variable, which specifies the Hudi parameters that represent the table name, record key, partition key, and more:
# Specify common DataSourceWriteOptions in the single hudiOptions variable
hudiOptions = {
'hoodie.table.name': 'product_inventory',
'hoodie.datasource.write.recordkey.field': 'product_id',
'hoodie.datasource.write.partitionpath.field': 'category',
'hoodie.datasource.write.precombine.field': 'last_update_time',
'hoodie.datasource.hive_sync.enable': 'true',
'hoodie.datasource.hive_sync.table': 'product_inventory',
'hoodie.datasource.hive_sync.partition_fields': 'category',
'hoodie.datasource.hive_sync.partition_extractor_class': 'org.apache.hudi.hive.MultiPartKeysValueExtractor'
}
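The options above create a CoW table, which is Hudi's default storage type. If you wanted MoR instead, you could add the table type key to the same options dictionary. The following is a hedged sketch; verify the exact key name against the Hudi version on your EMR release:

```python
# Same write options as before, with the storage type set explicitly.
# COPY_ON_WRITE is the default; MERGE_ON_READ defers merging to read time.
hudiOptionsMoR = {
    'hoodie.table.name': 'product_inventory',
    'hoodie.datasource.write.recordkey.field': 'product_id',
    'hoodie.datasource.write.partitionpath.field': 'category',
    'hoodie.datasource.write.precombine.field': 'last_update_time',
    'hoodie.datasource.write.table.type': 'MERGE_ON_READ',
}
print(hudiOptionsMoR['hoodie.datasource.write.table.type'])
```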
Next, we can write the product records to Amazon S3 using the hudiOptions configuration variable:
# Write the product Inventory DataFrame as a Hudi dataset to S3
inputDF.write.format('org.apache.hudi') \
    .option('hoodie.datasource.write.operation', 'insert') \
    .options(**hudiOptions) \
    .mode('overwrite') \
    .save('s3://hudi-data-repository/product-inventory/')
As you can see, the hoodie.datasource.write.operation parameter has a value of insert to indicate that this is a new insert. The following screenshot shows the output of the execution:
After the successful execution of the preceding code, you can navigate to your S3 bucket to validate the data being written in Hudi format with partition folders for the category field. The following screenshot shows the output:
After the initial data is written, we can try to update and delete a few records.
Before updating records, let's first query the data and validate how it looks. The following code block reads data from s3://hudi-data-repository/product-inventory with /*/* so that it reads all partitions:
# Read the Hudi dataset from S3 and validate your field output
HudiProductDF = spark.read.format('org.apache.hudi').load('s3://hudi-data-repository/product-inventory' + '/*/*')
HudiProductDF.select("product_id", "category", "product_name", "quantity_available", "last_update_time").show()
After executing this code, you should see the following output, which shows all five products you have ingested:
Next, let's assume you have sold one unit of product ID 102 and would like to update its quantity to 29. The following code shows how to do this. As you can see, the hoodie.datasource.write.operation parameter value is set to upsert:
# Update quantity of product_id 102
from pyspark.sql.functions import lit
newDF = inputDF.filter(inputDF.product_id == 102).withColumn('quantity_available', lit('29'))
newDF.write \
    .format('org.apache.hudi') \
    .option('hoodie.datasource.write.operation', 'upsert') \
    .options(**hudiOptions) \
    .mode('append') \
    .save('s3://hudi-data-repository/product-inventory/')
Next, let's assume you have stopped selling product ID 101 and would like to delete it from your inventory table. The following code block shows how to delete a product using Hudi. As you can see, we have passed an additional parameter, hoodie.datasource.write.payload.class, with a value of org.apache.hudi.common.model.EmptyHoodieRecordPayload to represent deleting the record:
# Delete product record with ID 101
deleteDF = inputDF.filter(inputDF.product_id==101)
deleteDF.write \
    .format('org.apache.hudi') \
    .option('hoodie.datasource.write.operation', 'upsert') \
    .option('hoodie.datasource.write.payload.class', 'org.apache.hudi.common.model.EmptyHoodieRecordPayload') \
    .options(**hudiOptions) \
    .mode('append') \
    .save('s3://hudi-data-repository/product-inventory/')
After executing both the update and delete transactions, we can validate the output by executing the following PySpark script, which reads the updated data from S3:
# Read from S3 to validate the update and delete record
HudiProductNewDF = spark.read.format('org.apache.hudi').load('s3://hudi-data-repository/product-inventory' + '/*/*')
HudiProductNewDF.select("product_id", "category", "product_name", "quantity_available", "last_update_time") \
    .orderBy("product_id").show()
The following screenshot shows the output of the execution. The record with product_id 101 is missing and the record with product_id 102 has a quantity value of 29:
So far, we have learned how to create Hudi tables, ingest new data, and update and delete datasets using Spark and Hudi. Next, let's learn how to query incremental data and some of the additional metadata attributes Hudi provides.
In our previous execution, when we tried to query data, we defined specific columns to validate our data. Now let's try to print the complete dataset and make a note of the additional attributes Hudi appends:
# List all columns on the dataframe to showcase additional metadata fields Hudi appends
HudiProductNewDF.show()
The following screenshot highlights the additional attributes Hudi appends, all of which are prefixed with _hoodie (for example, _hoodie_commit_time, _hoodie_record_key, and _hoodie_partition_path):
As you can see in this output, all the records have a _hoodie_commit_time value of 20211205222848, except product_id 102, which has a commit time of 20211205225705 because we updated that record after our initial data ingestion.
Next, let's try to query the incremental data by listing all the records changed after a certain timestamp, such as 20211205222848. The following code block shows how to query incremental data using Hudi:
# Incremental query output, that fetches change data beyond certain time
incrementalQueryOptions = {
'hoodie.datasource.query.type': 'incremental',
'hoodie.datasource.read.begin.instanttime': "20211205222848",
}
incQueryDF = spark.read.format('org.apache.hudi').options(**incrementalQueryOptions) \
    .load('s3://hudi-data-repository/product-inventory')
incQueryDF.show()
The following screenshot highlights the output we get after executing the incremental query and, as you can see, there is one record, that is, product_id 102:
In this section, we have walked you through the Spark and Hudi code using which you can insert, update, delete, and query Hudi datasets. Having the UPSERT capability on top of our S3 data lake provides a lot of flexibility in terms of meeting compliance requirements or having a golden copy for querying. This should give you a starting point for your Hudi implementation, and you can refer to the Hudi documentation for additional information.
Note
During this chapter, we have executed Hudi scripts using an EMR notebook, but you can automate the execution by saving the script into a Python file and submitting it as an EMR step job, as explained in Chapter 9, Implementing Batch ETL Pipeline with Amazon EMR and Apache Spark.
Over the course of this chapter, we have dived deep into Apache Hudi and looked at its features, use cases, and how it is integrated with AWS and Amazon EMR.
We have covered how to create an EMR notebook that points to a long-running EMR cluster and how to use the notebook for interactive development. To showcase interactive development, we explained a small use case using Spark and Hudi, which can enable you to do UPSERT transactions on top of a data lake.
That concludes this chapter! Hopefully, this has helped you get an idea of how to use EMR notebooks for interactive development. In the next chapter, we will explain how to build a workflow to orchestrate a data pipeline using Amazon EMR.
Before moving on to the next chapter, test your knowledge with the following questions:
Here are a few resources you can refer to for further reading: