Databricks is a data engineering product built on top of Apache Spark that provides a unified, cloud-optimized platform for performing Extract, Transform, and Load (ETL), Machine Learning (ML), and Artificial Intelligence (AI) tasks on large volumes of data.
Azure Databricks, as its name suggests, is the Databricks integration with Azure. It provides fully managed Spark clusters, an interactive workspace for data visualization and exploration, and integration with data sources such as Azure Blob Storage, Azure Data Lake Storage, Azure Cosmos DB, and Azure SQL Data Warehouse.
Azure Databricks can process data from multiple, diverse data sources, such as SQL or NoSQL databases, structured or unstructured data, and streaming sources, and can scale out to as many servers as required to accommodate any data growth.
By the end of the chapter, you will have learned how to configure Databricks, work with storage accounts, process data using Scala, store processed data in Delta Lake, and visualize the data in Power BI.
In this chapter, we’ll cover the following recipes:
For this chapter, you will need the following:
In this recipe, we’ll learn how to configure the Azure Databricks environment by creating an Azure Databricks workspace, cluster, and cluster pools.
To get started, log in to https://portal.azure.com using your Azure credentials.
An Azure Databricks workspace is the starting point for writing solutions in Azure Databricks. A workspace is where you create clusters, write notebooks, schedule jobs, and manage the Azure Databricks environment.
An Azure Databricks workspace can be created in an Azure-managed virtual network or customer-managed virtual network. In this recipe, we will create a Databricks cluster in an Azure-managed network. Let’s get started:
Figure 7.1 – Creating a Databricks resource
Figure 7.2 – Creating a Databricks workspace
Once the resource is created, go to the Databricks workspace that we created (go to portal.azure.com, click All resources, and search for packtadedatabricks). Perform the following steps to create a Databricks cluster:
Figure 7.3 – Launching the workspace
Figure 7.4 – Creating a cluster
There are two types of clusters: Interactive and Automated. Interactive clusters are created manually by users so that they can interactively analyze data while working on, or even developing, a data engineering solution. Automated clusters are created automatically when a job starts and are terminated when the job completes.
Figure 7.5 – Creating a cluster configuration
There are two major cluster modes: Standard and High Concurrency. Standard cluster mode uses single-user clusters, optimized to run tasks one at a time. The High Concurrency cluster mode is optimized to run multiple tasks in parallel; however, it only supports R, Python, and SQL workloads, and doesn’t support Scala.
These autoscaling options allow Databricks to provision as many nodes as required to process a task within the limit, as specified by the Min workers and Max workers options.
The Terminate after option terminates the cluster when there's no activity for the specified amount of time. In our case, the cluster will auto-terminate after 10 minutes of inactivity. This option helps save costs.
There are two types of cluster nodes: Worker type and Driver type. The Driver type node is responsible for maintaining a notebook's state information, interpreting the commands run from a notebook or a library, and coordinating with the Spark executors. The Worker type nodes are the Spark executor nodes, responsible for distributed data processing.
The Advanced options section can be used to set Spark configuration parameters, environment variables, and tags, configure Secure Shell (SSH) access to the clusters, enable logging, and run custom initialization scripts at cluster creation time.
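For example, a Spark configuration parameter set under Advanced options can be read back (or overridden for the session) from a notebook. Here is a minimal Scala sketch; spark.sql.shuffle.partitions is just an illustrative, standard Spark setting:
// Read a Spark configuration value set via the cluster's Advanced options
val shufflePartitions = spark.conf.get("spark.sql.shuffle.partitions")
println(s"spark.sql.shuffle.partitions = $shufflePartitions")
// Session-level overrides are also possible from a notebook
spark.conf.set("spark.sql.shuffle.partitions", "16")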
Azure Databricks pools reduce cluster startup and autoscaling time by keeping a set of idle, ready-to-use instances, eliminating the need to provision instances on demand. To create Azure Databricks pools, execute the following steps:
Figure 7.6 – Creating a Databricks cluster pool
Min Idle specifies the number of instances that are kept idle and available without terminating; the Idle Instance Auto Terminate setting doesn't apply to these instances. Max Capacity caps the total number of instances in the pool, idle and running included, which helps with managing cloud instance quotas and costs.
The Azure Databricks runtime is a set of core components or software that runs on your clusters. There are different runtimes, depending on the type of workload you have.
Figure 7.7 – Attaching a Databricks cluster to a pool
Figure 7.8 – Databricks clusters attached to a pool
We can add multiple clusters to a pool. Whenever a cluster, such as dbcluster01, requires an instance, it attempts to allocate one of the pool's idle instances. If an idle instance isn't available, the pool expands to acquire new instances, as long as the total number of instances stays under the maximum capacity.
Azure Key Vault is a useful service for storing keys and secrets used by various other services and applications. Integrating Azure Key Vault with Databricks is important because you can store the credentials of resources such as a SQL database or a data lake account inside the key vault. Once integrated, Databricks can reference the key vault, obtain the credentials, and access the database or data lake account. In this recipe, we will cover how to integrate Databricks with Azure Key Vault.
Create a Databricks workspace and a cluster as explained in the Configuring the Azure Databricks environment recipe of this chapter.
Log in to portal.azure.com, click Create a resource, search for Key Vault, and click Create. Provide the key vault details, as shown in the following screenshot, and click Review + create:
Figure 7.9 – Creating a key vault
Perform the following steps to integrate Databricks with Azure Key Vault:
Figure 7.10 – The Databricks URL
Figure 7.11 – The key vault resource ID
Figure 7.12 – Key vault scope creation
You will receive confirmation that a secret scope called datalakekey has been successfully added. This completes the integration between Databricks and Azure Key Vault.
Upon completion of the preceding steps, all users with access to the Databricks workspace will be able to extract secrets and keys from the key vault and use them in a Databricks notebook to perform the desired operations.
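For example, a notebook can list the secret names available in the new scope and read a value. A minimal Scala sketch (the secret name sqlpassword is hypothetical):
// List the names of the secrets stored in the datalakekey scope
dbutils.secrets.list("datalakekey").foreach(secret => println(secret.key))
// Read a secret value; Databricks redacts it if printed in a notebook
val sqlPassword = dbutils.secrets.get(scope = "datalakekey", key = "sqlpassword")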
Behind the scenes, Databricks uses a service principal to access the key vault. When we create the scope in the Databricks portal, Azure grants the Databricks service principal the permissions it requires on the key vault. You can verify this using the following steps:
Figure 7.13 – Verifying permissions
Accessing data from Azure Data Lake is one of the fundamental steps of performing data processing in Databricks. In this recipe, we will learn how to mount an Azure Data Lake container in Databricks using a service principal, with Azure Key Vault storing the service principal ID and secret that are used to perform the mount.
Create a Databricks workspace and a cluster, as explained in the Configuring the Azure Databricks environment recipe of this chapter.
Create a key vault in Azure and integrate it with Azure Databricks, as explained in the Integrating Databricks with Azure Key Vault recipe.
Create an Azure Data Lake account, as explained in the Provisioning an Azure Storage account using the Azure portal recipe of Chapter 1, Creating and Managing Data in Azure Data Lake.
In the Azure portal, go to the Azure Data Lake Storage account we created (click All resources, then search for packtadestoragev2). Click Containers, then click + Container:
Figure 7.14 – Adding a container
Provide a container name of databricks and click Create:
Figure 7.15 – Creating a container
Mounting the container in Databricks will involve the following high-level steps:
The detailed steps are as follows:
Figure 7.16 – App registration
Figure 7.17 – Registering an application
Figure 7.18 – Copying the Application ID
Figure 7.19 – Adding a client secret
Figure 7.20 – The secret value
Figure 7.21 – Creating secrets
Figure 7.22 – Storing secrets
Figure 7.23 – Secrets added
Figure 7.24 – Adding role assignment
Figure 7.25 – Storage Blob Data Contributor
Figure 7.26 – Adding members to the Storage Blob Data Contributor role
Figure 7.27 – Selecting members for the Storage Blob Data Contributor role
Figure 7.28 – Creating a notebook
Figure 7.29 – The notebook name
// Retrieve the service principal credentials from the datalakekey secret scope
val appsecret = dbutils.secrets.get(scope = "datalakekey", key = "appsecret")
val ApplicationID = dbutils.secrets.get(scope = "datalakekey", key = "ApplicationID")
val DirectoryID = dbutils.secrets.get(scope = "datalakekey", key = "DirectoryID")
// OAuth token endpoint of the Azure AD tenant
val endpoint = "https://login.microsoftonline.com/" + DirectoryID + "/oauth2/token"
// OAuth configuration used by the ABFS driver to authenticate as the service principal
val configs = Map(
  "fs.azure.account.auth.type" -> "OAuth",
  "fs.azure.account.oauth.provider.type" -> "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
  "fs.azure.account.oauth2.client.id" -> ApplicationID,
  "fs.azure.account.oauth2.client.secret" -> appsecret,
  "fs.azure.account.oauth2.client.endpoint" -> endpoint)
// Optionally, you can add <directory-name> to the source URI of your mount point.
dbutils.fs.mount(
  source = "abfss://databricks@packtadestoragev2.dfs.core.windows.net/",
  mountPoint = "/mnt/datalakestorage",
  extraConfigs = configs)
Upon running the script, the data lake container will be successfully mounted:
Figure 7.30 – Mounting the data lake container notebook
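To confirm that the mount is usable, you can list the mount point's contents from a notebook; the mount can also be removed when no longer needed. A minimal sketch:
// List the contents of the newly mounted data lake container
display(dbutils.fs.ls("/mnt/datalakestorage"))
// Unmount the container when it is no longer required
// dbutils.fs.unmount("/mnt/datalakestorage")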
On Azure AD, we registered an application, which created a service principal. Service principals are entities that applications can use to authenticate themselves to Azure services. By granting the application access on the container, we granted permission to the service principal that was created. We stored the service principal's credentials (the application ID, directory ID, and secret) in Azure Key Vault to ensure secure access to them. While mounting the data lake container, Databricks retrieves these credentials from the key vault and uses the security context of the service principal to access the Azure Data Lake account. This makes for a very secure method of accessing a data lake account for the following reasons:
Databricks notebooks are the fundamental components for performing data processing tasks in Databricks. In this recipe, we will read, filter, and clean a Comma-Separated Values (CSV) file and gain insights from it using a Databricks notebook written in Scala.
Create a Databricks workspace and a cluster, as explained in the Configuring the Azure Databricks environment recipe.
Download the covid-data.csv file from https://github.com/PacktPublishing/Azure-Data-Engineering-Cookbook-2nd-edition/blob/main/chapter07/covid-data.csv.
Let’s process some data using Scala in a Databricks notebook by following the steps provided here:
Figure 7.31 – Creating a notebook
Figure 7.32 – Creating the processdata notebook
Figure 7.33 – Uploading data
Figure 7.34 – Uploading data
Figure 7.35 – Uploading the covid-data.csv file
// Read the uploaded CSV file into a DataFrame, treating the first row as a
// header and inferring column types; replace <user_email> with the DBFS
// upload path shown for your user
val covid_raw_data = spark.read.format("csv")
  .option("header", "true")
  .option("inferSchema", "true")
  .load("/FileStore/shared_uploads/<user_email>/covid_data.csv")
The result of the command is provided here:
Figure 7.36 – Reading the CSV data
display(covid_raw_data)
The result of the command is provided here:
Figure 7.37 – Displaying the CSV data
covid_raw_data.count()
The result of the command is provided here:
Figure 7.38 – Displaying the row count
val covid_remove_duplicates = covid_raw_data.dropDuplicates()
The result of the command is provided here:
Figure 7.39 – Removing duplicates
covid_remove_duplicates.printSchema()
The result of the preceding command is provided in the following screenshot:
Figure 7.40 – Removing duplicates
val covid_selected_columns = covid_remove_duplicates.select(
  "iso_code", "location", "continent", "date",
  "new_deaths_per_million", "people_fully_vaccinated", "population")
The result of the preceding command is provided in the following screenshot:
Figure 7.41 – Loading selected columns
val covid_clean_data = covid_selected_columns.na.drop()
covid_clean_data.count()
The result of the preceding command is provided in the following screenshot:
Figure 7.42 – Removing NULL values
covid_clean_data.createOrReplaceTempView("covid_view")
The result of the preceding command is provided in the following screenshot:
Figure 7.43 – Creating a temporary view
%sql
SELECT iso_code, location, continent,
  SUM(new_deaths_per_million) AS death_sum,
  MAX(people_fully_vaccinated * 100 / population) AS percentage_vaccinated
FROM covid_view
WHERE population > 1000000
GROUP BY iso_code, location, continent
ORDER BY death_sum DESC
The result of the command is provided here:
Figure 7.44 – Insights using the SQL query
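The same insight can also be computed with the DataFrame API instead of SQL. A minimal Scala sketch against the covid_clean_data DataFrame created earlier:
import org.apache.spark.sql.functions.{col, max, sum}

// Equivalent of the SQL query above, expressed with the DataFrame API
val insights = covid_clean_data
  .filter(col("population") > 1000000)
  .groupBy("iso_code", "location", "continent")
  .agg(
    sum("new_deaths_per_million").as("death_sum"),
    max(col("people_fully_vaccinated") * 100 / col("population")).as("percentage_vaccinated"))
  .orderBy(col("death_sum").desc)
display(insights)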
Figure 7.45 – Generating a bar graph
Figure 7.46 – Plot options
Figure 7.47 – Customizing the plot
Figure 7.48 – Visual insights
DataFrames are the fundamental objects used to store runtime data during data processing in Databricks. DataFrames are in-memory objects and extremely well-optimized for performing advanced analytics operations.
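DataFrames are also lazily evaluated, so a DataFrame that is reused across several actions can be explicitly cached in memory. A minimal sketch, assuming the covid_clean_data DataFrame from this recipe:
// Mark the cleansed DataFrame for in-memory caching
covid_clean_data.cache()
// The first action materializes the cache; later actions reuse it
covid_clean_data.count()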
A CSV file was loaded to the Databricks File System (DBFS) storage, which is the default local storage available when a Databricks workspace is created. We can perform the same activities in a data lake account too, by uploading the CSV file to the data lake container and mounting the data lake container, as explained in the Mounting an Azure Data Lake container in Databricks recipe.
After loading the data to a DataFrame, we were able to cleanse the data by performing operations such as removing unwanted columns, dropping duplicates, and deleting rows with NULL values easily using Spark functions. Finally, by creating a temporary view out of the DataFrame, we were able to analyze the DataFrame’s data using SQL queries and get visual insights using Databricks' visualization capabilities.
Data processing can be performed using notebooks, but to operationalize it, we need to execute it at a specific scheduled time, depending on the demands of the use case or problem statement. Once a notebook has been created, you can schedule it to run at a preferred frequency using job clusters. This recipe demonstrates how to schedule a notebook using job clusters.
Create a Databricks workspace, as explained in the Configuring the Azure Databricks environment recipe.
In the following steps, we will import the SampleJob.dbc notebook file into the Databricks workspace and schedule it to be run daily:
Figure 7.49 – Creating folder insights
Figure 7.50 – Create a Job folder
Figure 7.51 – Importing a notebook
Figure 7.52 – Importing a notebook
Figure 7.53 – The imported notebook
Figure 7.54 – Jobs in the Create menu
Figure 7.55 – Create Job
Figure 7.56 – Editing the cluster configuration
Figure 7.57 – Save the cluster configuration
Figure 7.58 – Edit schedule configuration
Figure 7.59 – Setting the schedule configuration
Figure 7.60 – Run now
Figure 7.61 – Job runs
Figure 7.62 – Job runs
Figure 7.63 – A completed run
Figure 7.64 – The notebook output
Figure 7.65 – Adding a new task
Figure 7.66 – Adding a new task
Figure 7.67 – Multiple tasks
The imported notebook was set to run on a specific schedule using the Databricks job scheduling capabilities. While scheduling the job, New Cluster was selected instead of picking an existing cluster from the workspace. Picking New Cluster means a cluster is created each time the job runs and destroyed once the job completes. It also means every run waits an additional 2 minutes or so for its cluster to be created.
Adding multiple notebooks to the same job via additional tasks allows us to reuse the job cluster created for the first notebook execution, and the second task needn’t wait for another cluster to be created. Usage of multiple tasks and the dependency option allows us to orchestrate complex data processing flows using Databricks notebooks.
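Notebooks can also be chained programmatically from a driver notebook with dbutils.notebook.run, an alternative way to sequence dependent steps. A minimal Scala sketch (the notebook paths and parameters are hypothetical):
// Run a notebook synchronously with a 600-second timeout and an example parameter
val result = dbutils.notebook.run("/Jobs/SampleJob", 600, Map("run_date" -> "2022-02-19"))
println(s"First notebook returned: $result")
// The downstream notebook starts only after the first one completes
dbutils.notebook.run("/Jobs/DownstreamJob", 600, Map("upstream_result" -> result))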
Delta Lake databases are databases in Databricks that comply with the Atomicity, Consistency, Isolation, and Durability (ACID) properties. Delta Lake tables are tables in Delta Lake databases that use Parquet files to store data and are highly optimized for performing analytic operations. Delta Lake tables can be used in a data processing notebook to store preprocessed or processed data, and the data stored in them can be easily consumed by visualization tools such as Power BI.
In this recipe, we will create a Delta Lake database and Delta Lake table, load data from a CSV file, and perform additional operations such as UPDATE, DELETE, and MERGE on the table.
Create a Databricks workspace and a cluster, as explained in the Configuring the Azure Databricks environment recipe of this chapter.
Download the covid-data.csv file from this link: https://github.com/PacktPublishing/Azure-Data-Engineering-Cookbook-2nd-edition/blob/main/chapter07/covid-data.csv.
Upload the covid-data.csv file to the workspace, as explained in step 1 to step 6 of the How to do it… section of the Processing data using notebooks recipe.
In this recipe, let’s create a Delta Lake database and tables, and process data using the following steps:
Figure 7.68 – A new notebook
Figure 7.69 – Delta notebook creation
CREATE DATABASE covid
The output is displayed in the following screenshot:
Figure 7.70 – Delta database creation
CREATE TEMPORARY VIEW covid_data
USING CSV
OPTIONS (path "/FileStore/shared_uploads/<user_email>/covid_data.csv", header "true", mode "FAILFAST")
The output is displayed in the following screenshot:
Figure 7.71 – Temporary view creation
The table name is provided in <database name>.<table name> format to ensure it belongs to the Delta Lake database we created. To insert the data from the view into the new table, the table is created using the CREATE TABLE command, followed by AS and a SELECT statement against the view:
CREATE OR REPLACE TABLE covid.covid_data_delta
USING DELTA
LOCATION '/FileStore/shared_uploads/<user_email>/covid_data_delta'
AS
SELECT iso_code,location,continent,date,new_deaths_per_million,people_fully_vaccinated,population FROM covid_data
The output is displayed in the following screenshot:
Figure 7.72 – Creating the Delta table
Figure 7.73 – The created Delta table
DELETE FROM covid.covid_data_delta WHERE population IS NULL OR people_fully_vaccinated IS NULL OR new_deaths_per_million IS NULL OR location IS NULL
The output is displayed in the following screenshot:
Figure 7.74 – Deleting a few rows in a Delta table
DELETE FROM covid.covid_data_delta;
SELECT COUNT(*) FROM covid.covid_data_delta;
The output is displayed in the following screenshot:
Figure 7.75 – Deleting all rows
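Before querying an older version, it helps to inspect the table's change history, which lists each version number, timestamp, and operation. A minimal Scala sketch (DESCRIBE HISTORY is standard Delta Lake SQL):
// Show the table's version history: version, timestamp, and operation performed
display(spark.sql("DESCRIBE HISTORY covid.covid_data_delta"))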
SELECT * FROM covid.covid_data_delta VERSION AS OF 0;
The output is displayed in the following screenshot:
Figure 7.76 – Check out an older version of the Delta table
RESTORE TABLE covid.covid_data_delta TO VERSION AS OF 0
The output is displayed in the following screenshot:
Figure 7.77 – Restoring the table to an older version
UPDATE covid.covid_data_delta SET population = population * 1.2 WHERE continent = 'Asia';
DELETE FROM covid.covid_data_delta WHERE continent = 'Europe';
The output is displayed in the following screenshot:
Figure 7.78 – Update and delete
This can be achieved using the following code:
MERGE INTO covid.covid_data_delta target
USING covid.covid_data_delta TIMESTAMP AS OF "2022-02-19 16:45:00" source
ON target.location = source.location AND target.date = source.date
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED
THEN INSERT *
The MERGE command uses an older version of covid_data_delta as the source and merges it into the current version of the table, which is the target to be updated or inserted into. Instead of specifying version numbers, we can also use timestamps to obtain older versions of the table: covid_data_delta TIMESTAMP AS OF "2022-02-19 16:45:00" takes the version of the table as of February 19, 2022, 16:45:00 UTC. WHEN MATCHED THEN UPDATE SET * updates all the columns of a row when the condition specified in the ON clause matches. When the condition doesn't match, the rows are inserted from the older version into the current version of the table. The output, as expected, shows that the rows that were deleted in step 11 were successfully reinserted:
Figure 7.79 – The merge statement
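If you prefer the DataFrame API over SQL, the same UPSERT can be expressed with the DeltaTable Scala API that ships with Delta Lake. A minimal sketch, assuming the same table and timestamp as the SQL example above:
import io.delta.tables.DeltaTable

// The current state of the table is the merge target
val deltaTarget = DeltaTable.forName(spark, "covid.covid_data_delta")
// Time travel to the older snapshot that serves as the merge source
val olderVersion = spark.sql(
  "SELECT * FROM covid.covid_data_delta TIMESTAMP AS OF '2022-02-19 16:45:00'")

deltaTarget.as("t")
  .merge(olderVersion.as("s"), "t.location = s.location AND t.date = s.date")
  .whenMatched().updateAll()
  .whenNotMatched().insertAll()
  .execute()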
Delta tables offer advanced capabilities for processing data, such as support for UPDATE, DELETE, and MERGE statements. MERGE statements and the versioning capabilities of Delta tables are very powerful in ETL scenarios, where we need to perform UPSERT (update if it matches, insert if it doesn’t) operations against various tables.
These capabilities for supporting data modifications and row versioning are possible because Delta tables maintain changes to the table in a transaction log, stored as JSON files. The transaction log lives in the same location where the table was created, in a subfolder called _delta_log. By default, the log files are retained for 30 days, which can be controlled using the delta.logRetentionDuration table property; because older versions are reconstructed from the transaction log, this property also determines how far back a table's history can be queried.
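For example, the retention window can be adjusted via a table property, and the JSON transaction log can be inspected directly in DBFS. A minimal Scala sketch (the 60-day interval and the <user_email> path segment are illustrative):
// Extend transaction log retention to 60 days (illustrative value)
spark.sql("ALTER TABLE covid.covid_data_delta SET TBLPROPERTIES ('delta.logRetentionDuration' = 'interval 60 days')")
// List the JSON transaction log files stored in the _delta_log subfolder
display(dbutils.fs.ls("/FileStore/shared_uploads/<user_email>/covid_data_delta/_delta_log"))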
Delta Lake databases are commonly used to store processed data in Delta Lake tables, which is then ready to be consumed by reporting-layer applications such as Power BI. Delta Lake tables are best suited for handling analytic workloads from Power BI, as Delta Lake tables use Parquet files as storage, which offer optimal performance for analytic workloads.
In this recipe, we will use Power BI Desktop, connect to a Delta Lake table, and build a simple report in Power BI.
Create a Databricks workspace and a cluster as explained in the Configuring the Azure Databricks environment recipe.
Download the latest version of Power BI Desktop from https://powerbi.microsoft.com/en-us/downloads/ and install Power BI Desktop on your machine.
Perform the following steps to connect a Delta Lake table to a Power BI report and create visualizations:
Figure 7.80 – Compute
Figure 7.81 – Advanced options for the cluster
Figure 7.82 – The JDBC/ODBC connection string
Figure 7.83 – Importing the notebook
Figure 7.84 – Import notebook
Figure 7.85 – Run all cells
Figure 7.86 – Start Power BI Desktop
Figure 7.87 – Starting Power BI Desktop
Figure 7.88 – Connecting to Databricks
Figure 7.89 – Connecting to the Delta database
Figure 7.90 – Connecting to Azure
Figure 7.91 – Connecting to the Delta table
Figure 7.92 – Loading the Delta table
Figure 7.93 – Adding a clustered column chart
Figure 7.94 – Adding another visual
Figure 7.95 – Get insights using visuals
We used Azure AD credentials to sign in to Azure Databricks and extracted the connection string from the Databricks cluster. When a connection request comes from Power BI, Databricks authenticates it using Azure AD credentials. For the authentication to succeed, the Databricks cluster needs to be up and running. Once authenticated, Delta Lake tables are accessible from Power BI just like any other database tables. We added two simple visuals to explore the data visually; clicking on one visual automatically filters the other, which makes it easy to draw insights from the data.