© The Author(s), under exclusive license to APress Media, LLC, part of Springer Nature 2021
R. C. L'Esteve, The Definitive Guide to Azure Data Engineering, https://doi.org/10.1007/978-1-4842-7182-7_6

6. Load Data Lake Storage Gen2 Files into a Synapse Analytics Dedicated SQL Pool

Ron C. L’Esteve, Chicago, IL, USA

Chapter 4 showed you how to create a dynamic, parameterized, and metadata-driven process to fully load data from an on-premises SQL Server to Azure Data Lake Storage Gen2. This chapter will demonstrate how to fully load all of the snappy-compressed parquet data files from ADLS Gen2 into an Azure Synapse Analytics dedicated SQL pool.

Azure Data Factory’s sink Copy activity allows three different copy methods for loading data into an Azure dedicated SQL pool, which is part of the Azure Synapse Analytics ecosystem. This chapter will explore these three methods using a dynamic and parameterized ADF pipeline:
  1. PolyBase

  2. Copy command

  3. Bulk insert

First, there is some prep work to do: creating the datasets and the pipelines demonstrated in the first few sections of the chapter. Toward the end of the chapter, the three load methods listed previously will be discussed and demonstrated.

Recreate the Pipeline Parameter Table

Begin by recreating the pipeline_parameter table in the ADF_DB database that you created in Chapter 4, making it much more robust in preparation for the ADF pipeline that will be built in this chapter.

The following is the code to recreate the table. It first drops the table if it exists and then creates it anew:
USE [ADF_DB]
go
/****** Object:  Table [dbo].[pipeline_parameter]    ******/
IF EXISTS (SELECT *
           FROM   sys.objects
           WHERE  object_id = Object_id(N'[dbo].[pipeline_parameter]')
                  AND type IN ( N'U' ))
  DROP TABLE [dbo].[pipeline_parameter]
go
/****** Object:  Table [dbo].[pipeline_parameter]  ******/
SET ansi_nulls ON
go
SET quoted_identifier ON
go
CREATE TABLE [dbo].[pipeline_parameter]
  (
     [parameter_id]                       [INT] IDENTITY(1, 1) NOT NULL,
     [server_name]                        [NVARCHAR](500) NULL,
     [src_type]                           [NVARCHAR](500) NULL,
     [src_schema]                         [NVARCHAR](500) NULL,
     [src_db]                             [NVARCHAR](500) NULL,
     [src_name]                           [NVARCHAR](500) NULL,
     [dst_type]                           [NVARCHAR](500) NULL,
     [dst_schema]                         [NVARCHAR](500) NULL,
     [dst_name]                           [NVARCHAR](500) NULL,
     [include_pipeline_flag]              [NVARCHAR](500) NULL,
     [partition_field]                    [NVARCHAR](500) NULL,
     [process_type]                       [NVARCHAR](500) NULL,
     [priority_lane]                      [NVARCHAR](500) NULL,
     [pipeline_date]                      [NVARCHAR](500) NULL,
     [pipeline_status]                    [NVARCHAR](500) NULL,
     [load_synapse]                       [NVARCHAR](500) NULL,
     [load_frequency]                     [NVARCHAR](500) NULL,
     [dst_folder]                         [NVARCHAR](500) NULL,
     [file_type]                          [NVARCHAR](500) NULL,
     [lake_dst_folder]                    [NVARCHAR](500) NULL,
     [spark_flag]                         [NVARCHAR](500) NULL,
     [distribution_type]                  [NVARCHAR](500) NULL,
     [load_sqldw_etl_pipeline_date]       [DATETIME] NULL,
     [load_sqldw_etl_pipeline_status]     [NVARCHAR](500) NULL,
     [load_sqldw_curated_pipeline_date]   [DATETIME] NULL,
     [load_sqldw_curated_pipeline_status] [NVARCHAR](500) NULL,
     [load_delta_pipeline_date]           [DATETIME] NULL,
     [load_delta_pipeline_status]         [NVARCHAR](500) NULL,
     PRIMARY KEY CLUSTERED ( [parameter_id] ASC ) WITH (statistics_norecompute = OFF, ignore_dup_key = OFF) ON [PRIMARY]
  )
ON [PRIMARY]
go

Notice from the columns listed in the code that quite a few new metadata fields have been added; these can be captured within the ADF pipelines by creating dynamic datasets and pipelines.
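To make the control table concrete, here is a hedged example of registering a single table for the Synapse load; every value (server, database, schema, table, folder, file type, and date) is hypothetical and should be replaced with your own metadata:
INSERT INTO [dbo].[pipeline_parameter]
            (server_name, src_type, src_schema, src_db, src_name,
             dst_type, dst_name, include_pipeline_flag, process_type,
             pipeline_date, load_synapse, dst_folder, file_type)
VALUES      ('onprem-sql01', 'SQL Server', 'dbo', 'WideWorldImporters', 'Customer',
             'ADLS', 'Customer', '1', 'FULL',
             '2021-01-01', '1', 'sales', 'parquet');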

Create the Datasets

In this next section, create a source dataset for the ADLS Gen2 snappy-compressed parquet files and a sink dataset for the Azure Synapse Analytics dedicated SQL pool.

Begin by creating three datasets and name the datasets as follows:
  1. DS_ADLS_TO_SYNAPSE

  2. DS_ADLS_TO_SYNAPSE_MI

  3. DS_SYNAPSE_ANALYTICS_DW

Then the following subsections will show how to create each one.

DS_ADLS_TO_SYNAPSE

Start by creating a source ADLS Gen2 dataset with parameterized paths. Remember that the pipeline_date column was added to the pipeline_parameter table that you created in Chapter 4; it captures the date when the data was loaded to ADLS Gen2. In this step, you are loading data from ADLS Gen2 into a Synapse Analytics dedicated SQL pool. You could either rerun the pipeline from Chapter 4 or manually enter a date in this pipeline_date column, which would ideally contain the latest folder date. Chapter 8 will discuss how to automate the insertion of the max folder date into this pipeline_date column so that it always holds the latest folder date to pass into the parameterized ADF pipeline. This will be achieved by using a Stored Procedure activity that runs immediately after a successful Copy activity.

Figure 6-1 illustrates how to set up the parameterized connection properties for reading the source ADLS Gen2 parquet directories and folders that are driven by the pipeline_parameter control table.
Figure 6-1. ADF parameterized connections for ADLS Gen2 parquet folders and files

Here is the code that has been added to the File path section in Figure 6-1:
@{item().dst_folder}
@{item().dst_name}/parquet/@{item().pipeline_date}/@{item().dst_name}.parquet
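For example, if a pipeline_parameter row contained the hypothetical values dst_folder = sales, dst_name = Customer, and pipeline_date = 2021-01-01, these expressions would resolve to the container sales and the file path Customer/parquet/2021-01-01/Customer.parquet.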
Figure 6-2 shows how to add the parameters that will be needed.
Figure 6-2. ADF parameters for ADLS Gen2 parquet folders and files

The linked service details are illustrated in Figure 6-3. An Azure Key Vault is being used to store the credential secrets. This will become relevant in later sections if authentication errors appear when the pipelines run.
Figure 6-3. ADF linked service connection properties using Azure Key Vault

DS_ADLS_TO_SYNAPSE_MI

The ADF dataset connection in Figure 6-4 uses Managed Identity connection credentials. The difference between this dataset and the previous one is that this linked service connection does not use an Azure Key Vault. Use it to test and switch between the Key Vault and non-Key Vault connections when errors appear later.
Figure 6-4. ADF dataset connection properties using Managed Identity

Here is the code that has been added to the File path section in Figure 6-4:
@{item().dst_folder}
@{item().dst_name}/parquet/@{item().pipeline_date}/@{item().dst_name}.parquet
Similar to the previous dataset, add the parameters as shown in Figure 6-5.
Figure 6-5. ADF parameters for ADLS Gen2 parquet folders and files – Managed Identity

The linked service details are in Figure 6-6. An Azure Key Vault is not being used here. Again, this will become relevant in later sections if authentication errors appear when the pipelines are executed.
Figure 6-6. ADF linked service connections

In this section, a new ADF linked service connection has been created using Managed Identity.

DS_SYNAPSE_ANALYTICS_DW

The sink connection will be to an Azure Synapse Analytics dedicated SQL pool, shown in Figure 6-7. Also, parameters are being used to specify the schema and table name from the pipeline_parameter table. This will be useful when the ForEach loop activity is used to create and load multiple tables with the same sink dataset.
Figure 6-7. ADF Synapse DW linked service connection properties

Here is the code that has been added to the schema and table name properties in Figure 6-7:
@{item().src_schema}
@{item().dst_name}

Create the Pipeline

Now that the datasets have been created, create a new pipeline. Within it, add a Lookup activity connected to a ForEach loop activity, as shown in Figure 6-8.
Figure 6-8. ADF pipeline canvas containing Lookup and ForEach loop activities

The lookup query shown in Figure 6-9 will get a list of tables that will need to be loaded to the Azure Synapse Analytics dedicated SQL pool. Note that currently there is a filter applied to the query, which would only include records WHERE load_synapse = 1.
Figure 6-9. ADF Lookup activity query setting

The code snippet that is included in Figure 6-9 is the following:
SELECT [server_name],
       [src_type],
       [src_schema],
       [src_db],
       [src_name],
       [dst_type],
       [dst_name],
       [include_pipeline_flag],
       [partition_field],
       [process_type],
       [priority_lane],
       [pipeline_date],
       [pipeline_status],
       [dst_folder],
       [file_type]
FROM   [dbo].[pipeline_parameter]
WHERE  load_synapse = 1
Within the settings of the ForEach loop activity, add the output value of the Lookup activity, as shown in Figure 6-10. Remember to leave the “Sequential” box unchecked so that multiple tables are processed in parallel. The default “Batch count,” if left blank, is 20, and the maximum is 50.
Figure 6-10. ADF ForEach activity settings
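For reference, the Items field of the ForEach settings typically holds an expression such as the following, where the Lookup activity name (assumed here to be Lookup1) must match whatever you named your Lookup activity:
@activity('Lookup1').output.value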

Also add one Copy activity to the ForEach loop activity, as shown in Figure 6-11. Click the pencil icon to view the Copy activity.
Figure 6-11. ADF ForEach activities

The source is set to DS_ADLS_TO_SYNAPSE, which uses an Azure Key Vault in the linked service connection. Add the dynamic parameters that will be needed. Note that the parameters were defined in the dataset. Figure 6-12 shows how and where to add these values.
Figure 6-12. ADF Copy activity source dataset properties

Finally, choose the DS_SYNAPSE_ANALYTICS_DW dataset as the sink and select “Bulk insert” with the “Auto create table” option enabled, as shown in Figure 6-13.
Figure 6-13. ADF Copy activity sink dataset properties

Because the pipeline is driven by the pipeline_parameter table, when any number (n) of tables/records are added to that table with the load_synapse flag set to 1, the pipeline will load all of those tables to the Azure Synapse Analytics dedicated SQL pool in parallel, using whichever copy method is selected.
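For example, a table that is already registered in the control table can be included in the Synapse load simply by flipping its flag; the table name below is hypothetical:
UPDATE [dbo].[pipeline_parameter]
SET    load_synapse = 1
WHERE  dst_name = 'Customer';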

Choose the Copy Method

Now, finally, we have arrived at the choice of copy method. There are three options for the sink copy method: Bulk insert, PolyBase, and Copy command. You will learn to use each of them in this section.

BULK INSERT

SQL Server provides the BULK INSERT statement to perform large imports of data into SQL Server using T-SQL efficiently, quickly, and with minimal logging.
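For context, the following is a minimal, hypothetical sketch of the T-SQL statement itself as it would be run directly against a SQL Server instance; the table name, file path, and options are assumptions for illustration, and when you pick the Bulk insert copy method, ADF performs the equivalent bulk load for you:
-- Hypothetical example of a T-SQL BULK INSERT run directly on SQL Server
BULK INSERT dbo.Customer
FROM 'C:\data\customer.csv'
WITH (
    FIELDTERMINATOR = ',',   -- column delimiter
    ROWTERMINATOR   = '\n',  -- row delimiter
    FIRSTROW        = 2,     -- skip the header row
    TABLOCK                  -- table lock to enable minimal logging where possible
);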

Within the Sink tab of the ADF Copy activity, set the copy method to Bulk insert. “Auto create table” automatically creates the table if it does not exist, using the schema from the source file. This isn’t supported when the sink specifies a stored procedure or when staging is enabled on the Copy activity. For this scenario, the source file is a snappy-compressed parquet file that does not contain incompatible data types such as VARCHAR(MAX), so there should be no issues with the “Auto create table” option.

Note that the pre-copy script runs before the table is created. So, in a scenario that uses “Auto create table” and the table does not yet exist, run the pipeline without the pre-copy script first to prevent errors, and then add the pre-copy script back once the table has been created for ongoing full loads. Figure 6-14 shows the sink settings along with where to add any pre-copy script, such as a TRUNCATE script.
Figure 6-14. ADF Copy activity sink pre-copy script

If the default “Auto create table” option does not meet your distribution needs, the “Add dynamic content” option can be leveraged to apply a distribution method specified per table in the pipeline_parameter table (for example, via the distribution_type column).

Here is the code that has been added to the Pre-copy script section in Figure 6-14:
TRUNCATE TABLE @{item().src_schema}.@{item().dst_name}
After you run the pipeline, it succeeds using the Bulk insert copy method, as shown in the activity run monitor illustrated in Figure 6-15.
Figure 6-15. ADF pipeline success for Bulk insert

Figure 6-16 shows the details of the Bulk insert Copy pipeline status.
Figure 6-16. ADF pipeline run details for Bulk insert

After querying the Synapse table, notice that it contains the same number of rows as the source, as shown in Figure 6-17.
Figure 6-17. Query Synapse Analytics dedicated SQL pool tables to verify Bulk insert ADF pipeline results
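As a quick check, you can compare row counts between the source table and the loaded Synapse table. A minimal sketch, assuming a hypothetical table named dbo.Customer was loaded:
SELECT COUNT(*) AS row_count
FROM   dbo.Customer;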

The Bulk insert method also works with an on-premises SQL Server as the source and a Synapse Analytics dedicated SQL pool as the sink.

PolyBase

Using PolyBase is an efficient way to load a large amount of data into Azure Synapse Analytics with high throughput. You'll see a large gain in throughput by using PolyBase instead of the default Bulk insert mechanism.

For this next exercise, select PolyBase, shown in Figure 6-18, to test this copy method.
Figure 6-18. ADF pipeline sink dataset properties to select PolyBase

PolyBase will need Managed Identity credentials: provision an Azure Active Directory user for the Data Factory's managed identity in the dedicated SQL pool and grant Data Factory full access to the database.
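A minimal sketch of granting that access, run against the dedicated SQL pool while connected as an Azure AD admin; the data factory name shown here is hypothetical, and db_owner is simply one role that satisfies the requirement:
-- Create a database user for the Data Factory's managed identity and grant it a role
CREATE USER [adf-demo-datafactory] FROM EXTERNAL PROVIDER;
EXEC sp_addrolemember N'db_owner', N'adf-demo-datafactory';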

For more details on verifying the access, review and run the following queries on a Synapse Analytics dedicated SQL pool:
select * from sys.database_scoped_credentials
select * from sys.database_role_members
select * from sys.database_principals
Also, when external tables, data sources, and file formats need to be created, the following queries can help with verifying that the required objects have been created:
select * from sys.external_tables
select * from sys.external_data_sources
select * from sys.external_file_formats
After configuring the pipeline and running it, you might notice the pipeline fail with the following error:
"ErrorCode=FailedDbOperation,'Type=Microsoft.DataTransfer.Common.Shared.HybridDeliveryException,Message=Error happened when loading data into SQL Data Warehouse.,Source=Microsoft.DataTransfer.ClientLibrary,''Type=System.Data.SqlClient.SqlException,Message=External file access failed due to internal error: 'Error occurred while accessing HDFS: Java exception raised on call to HdfsBridge_IsDirExist. Java exception message: HdfsBridge::isDirExist - Unexpected error encountered checking whether directory exists or not: AbfsRestOperationException: Operation failed: "This request is not authorized to perform this operation.", 403, HEAD, https://lake.dfs.core.windows.net/lake     //?upn=false&action=getAccessControl&timeout=90',Source=.Net SqlClient Data Provider,SqlErrorNumber=105019,Class=16,ErrorCode=-2146232060,State=1,Errors=[{Class=16,Number=105019,State=1,Message=External file access failed due to internal error: 'Error occurred while accessing HDFS: Java exception raised on call to HdfsBridge_IsDirExist. Java exception message: HdfsBridge::isDirExist - Unexpected error encountered checking whether directory exists or not: AbfsRestOperationException: Operation failed: "This request is not authorized to perform this operation.", 403, HEAD, https://lake.dfs.core.windows.net/lake     //?upn=false&action=getAccessControl&timeout=90',},],'",

After researching the error, the cause turns out to be that the original Azure Data Lake Storage linked service used by the source dataset DS_ADLS_TO_SYNAPSE stores its authentication credentials in an Azure Key Vault, an authentication method that is not supported for PolyBase and Copy command at this time.

Change the source dataset to DS_ADLS_TO_SYNAPSE_MI, which no longer uses an Azure Key Vault, and notice in Figure 6-19 that the pipeline succeeds using the PolyBase copy method.
Figure 6-19. ADF pipeline execution showing success after changing to Managed Identity

Copy Command

Copy command functions similarly to PolyBase, so the permissions needed for PolyBase are more than sufficient for Copy command as well. For more information on COPY INTO, revisit Chapter 5, which covers permissions, use cases, and the SQL syntax for COPY INTO. Figure 6-20 shows how to configure Copy command in the ADF sink activity.
Figure 6-20. Configure Copy command in ADF sink
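As a reminder of roughly what gets executed against the dedicated SQL pool with this copy method, here is a hedged COPY INTO sketch; the table name, folder path, and date are hypothetical and follow the parameterized path pattern used earlier in this chapter:
COPY INTO dbo.Customer
FROM 'https://lake.dfs.core.windows.net/lake/sales/Customer/parquet/2021-01-01/Customer.parquet'
WITH (
    FILE_TYPE  = 'PARQUET',
    CREDENTIAL = (IDENTITY = 'Managed Identity')
);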

Similar to the PolyBase copy method, when the source dataset uses an Azure Key Vault, you’ll notice the following slightly different error message:
ErrorCode=UserErrorSqlDWCopyCommandError,'Type=Microsoft.DataTransfer.Common.Shared.HybridDeliveryException,Message=SQL DW Copy Command operation failed with error 'Not able to validate external location because The remote server returned an error: (403) Forbidden.',Source=Microsoft.DataTransfer.ClientLibrary,''Type=System.Data.SqlClient.SqlException,Message=Not able to validate external location because The remote server returned an error: (403) Forbidden.,Source=.Net SqlClient Data Provider,SqlErrorNumber=105215,Class=16,ErrorCode=-2146232060,State=1,Errors=[{Class=16,Number=105215,State=1,Message=Not able to validate external location because The remote server returned an error: (403) Forbidden.,},],'", "failureType": "UserError", "target": "Copy data1", "details": []
Switch to the linked service that does not use an Azure Key Vault, and notice that the pipeline succeeds, as shown in Figure 6-21.
Figure 6-21. ADF pipeline execution results

Note that it is important to consider scheduling and triggering your ADF pipelines once they have been created and tested. Triggers determine when a pipeline execution is fired, based on the trigger type and the criteria defined in the trigger. There are three main types of Azure Data Factory triggers: the Schedule trigger, which executes the pipeline on a wall-clock schedule; the Tumbling window trigger, which executes the pipeline on a periodic interval and retains the pipeline state; and the event-based trigger, which responds to a blob-related event. Additionally, ADF features alerts that monitor pipeline and trigger failures and send notifications via email, text, and more.

Summary

In this chapter, I showed you how to create a source Azure Data Lake Storage Gen2 dataset and a sink Synapse Analytics dedicated SQL pool dataset along with an Azure Data Factory pipeline driven by a parameter table. You also learned how to load snappy compressed parquet files into a Synapse Analytics dedicated SQL pool by using three copy methods: Bulk insert, PolyBase, and Copy command. This chapter taught you about the various ingestion options that are available in Azure Data Factory.
