© The Author(s), under exclusive license to APress Media, LLC, part of Springer Nature 2021
R. C. L'Esteve, The Definitive Guide to Azure Data Engineering, https://doi.org/10.1007/978-1-4842-7182-7_8

8. Build Custom Logs in SQL Database for Pipeline Activity Metrics

Ron C. L’Esteve, Chicago, IL, USA

In the previous chapters, I demonstrated how to load data into ADLS Gen2 and then from ADLS Gen2 into a Synapse Analytics dedicated SQL pool using Data Factory. In this chapter, I will demonstrate how to leverage the pipelines that have been built to implement a process for tracking and persisting the log activity of every pipeline run.

Azure Data Factory is a robust cloud-based ELT tool that is capable of accommodating multiple scenarios for logging pipeline audit data. These range from out-of-the-box services such as Log Analytics and Azure Monitor to more custom methods of extracting the pipeline metrics and passing them to another process. In this chapter, I will show you how to implement three of these custom logging options:
  1. Option 1 – Create a Stored Procedure activity: Updating pipeline status and datetime columns in a static pipeline parameter table using ADF can be achieved by using a Stored Procedure activity.

  2. Option 2 – Create a CSV log file in Data Lake Storage Gen2: Generating a metadata CSV file for every parquet file that is created and storing the logs as CSV files in hierarchical folders in ADLS Gen2 can be achieved using a Copy data activity.

  3. Option 3 – Create a log table in Azure SQL Database: Creating a pipeline log table in Azure SQL Database and storing the pipeline activity as records in the table can be achieved by using a Copy data activity.
Figure 8-1 illustrates these three options and provides a visual representation of the data flow from the Copy-Table activity to each of the logging methods.
Figure 8-1. ADF options for logging custom pipeline data

Option 1: Create a Stored Procedure Activity

The Stored Procedure activity will be used to invoke a stored procedure in the database where your pipeline_parameter table resides.

For this scenario, you might want to maintain your pipeline_status and pipeline_date details as columns in your adf_db.dbo.pipeline_parameter table rather than in a separate log table. The downside to this method is that it does not retain historical log data; it simply updates the values in the original pipeline_parameter table based on a lookup that matches the incoming files to records in the pipeline_parameter table. This gives you a quick, though not necessarily robust, view of the status and load date across all items in the pipeline parameter table. A specific use case for this approach might be logging the most recent folder date based on the incoming folder and file names and timestamps.
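If your pipeline_parameter table does not already include these two columns, a minimal sketch for adding them might look like the following (the NVARCHAR(500) and DATETIME types are assumptions chosen to match the stored procedure shown later in this option):
-- Add status and load date columns to the existing parameter table; adjust names and sizes to your design.
ALTER TABLE [dbo].[pipeline_parameter]
  ADD pipeline_status NVARCHAR(500) NULL,
      pipeline_date   DATETIME NULL;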

Begin by adding a Stored Procedure activity and connecting it to the preceding Copy data activity within the ForEach loop activity, shown in Figure 8-2, to ensure that the process iterates and logs each table using the stored procedure. Notice the green arrow and line connector, which indicate that the stored procedure runs only on the success of the Copy data activity.
Figure 8-2. ADF option 1 for logging data using a stored procedure

Next, add the following stored procedure to the database where your pipeline parameter table resides. This procedure simply looks up the destination table name in the pipeline parameter table and updates the status and datetime for each table once the Copy data activity succeeds. You could just as easily handle errors with a second stored procedure linked to the failure constraint of the Copy activity; a sketch of one follows the main procedure below:
SET quoted_identifier ON
go
CREATE PROCEDURE [dbo].[Sql2adls_data_files_loaded] @dst_name NVARCHAR(500)
AS
    SET nocount ON
    -- turns off messages sent back to client after DML is run, keep this here
    DECLARE @Currentday DATETIME = Getdate();
    UPDATE [dbo].[pipeline_parameter]
    SET    pipeline_status = 'success',
           pipeline_date = @Currentday
    WHERE  dst_name = @dst_name;
go
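If you also want to capture failures as mentioned above, a companion procedure for the failure path can mirror the same pattern. The following is a minimal sketch (the procedure name and the 'failed' status value are assumptions rather than part of the original pipeline):
CREATE PROCEDURE [dbo].[Sql2adls_data_files_failed] @dst_name NVARCHAR(500)
AS
    SET nocount ON
    -- Mirrors the success procedure but stamps a failed status instead.
    DECLARE @Currentday DATETIME = Getdate();
    UPDATE [dbo].[pipeline_parameter]
    SET    pipeline_status = 'failed',
           pipeline_date = @Currentday
    WHERE  dst_name = @dst_name;
go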
After creating your stored procedure, confirm that it has been created in your corresponding database. Notice the confirmation in Figure 8-3, which is a screen capture within SSMS.
Figure 8-3. SSMS view showing stored procedure used by ADF option 1
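If you prefer to verify the procedure from a query window rather than Object Explorer, a quick check such as the following will also confirm that it exists:
-- Returns a row for the procedure if it was created successfully in this database.
SELECT name, create_date
FROM   sys.procedures
WHERE  name = 'Sql2adls_data_files_loaded';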

Next, return to the Data Factory pipeline and configure the Stored Procedure activity. In the Stored Procedure tab, select the stored procedure that was just created. Also add a new stored procedure parameter that references the destination name, which was configured in the Copy activity, as illustrated in Figure 8-4.
Figure 8-4. ADF stored procedure details and parameters
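The value supplied to that parameter is dynamic content. Since the ForEach items already expose the destination name as dst_name (it is referenced as @{item().dst_name} elsewhere in this pipeline), the parameter value would typically be an expression along the lines of the following:
@{item().dst_name}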

After saving, publishing, and running the pipeline, notice that the pipeline_date and pipeline_status columns have been updated as a result of the ADF Stored Procedure activity, shown in the pipeline_parameter table view in Figure 8-5. This is meant to be a lightweight pattern that provides the status and load date for each table in a centralized location, which is useful because ADF's own monitoring does not always provide robust details about the problematic tables or columns.
Figure 8-5. SSMS view of updated pipeline_date and pipeline_status columns from ADF stored procedure
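Because the status and load date now live alongside the rest of the parameter metadata, a simple query such as this one, run against the same adf_db database, gives a one-stop view across all tables:
-- Review the latest load status and load date for every table driven by the parameter table.
SELECT dst_name, pipeline_status, pipeline_date
FROM   [dbo].[pipeline_parameter]
ORDER  BY pipeline_date DESC;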

Option 2: Create a CSV Log File in Data Lake Storage Gen2

Since your Copy-Table activity generates snappy-compressed parquet files in hierarchical ADLS Gen2 folders, you may also want to create a CSV file containing pipeline run details for every parquet file in your ADLS Gen2 account. For this scenario, you could set up a Data Factory Event Grid trigger that listens for these metadata files and then triggers a process to transform the source table and load it into a curated zone, approximating a near-real-time ELT process.

Begin designing the ADF pipeline to create a CSV log file in your ADLS Gen2 account by adding a Copy data activity that creates the log files and connecting it to the Copy-Table activity, shown in Figure 8-6. As with the previous process, which looped through each table, this process will generate a metadata file with a .csv extension in a metadata folder for each table.
Figure 8-6. ADF option 2 for logging data using a Copy data activity to create a CSV file

To configure the source dataset, select the on-premises SQL Server as the source. Next, add the query shown in Figure 8-7 as the source query. Notice that this query combines pipeline system variables, Copy-Table activity output metrics, and user-defined parameters.
Figure 8-7. ADF option 2 source query to extract pipeline activity metrics

Recall the note from previous chapters that source queries are embedded as code in the ADF pipeline merely for visual demonstration purposes. It is always a better and more efficient practice to wrap the source query in a stored procedure and then call that stored procedure, which keeps the code in a centralized location and eases maintenance.

Here is the corresponding source code used in Figure 8-7:
SELECT '@{pipeline().DataFactory}' AS datafactory_name,
       '@{pipeline().Pipeline}' AS pipeline_name,
       '@{pipeline().RunId}' AS runid,
       '@{item().src_name}' AS source,
       '@{item().dst_name}' AS destination,
       '@{pipeline().TriggerType}' AS triggertype,
       '@{pipeline().TriggerId}' AS triggerid,
       '@{pipeline().TriggerName}' AS triggername,
       '@{pipeline().TriggerTime}' AS triggertime,
       '@{activity('copy-TABLE').output.rowsCopied}' AS rowscopied,
       '@{activity('copy-TABLE').output.rowsRead}' AS rowsread,
       '@{activity('copy-TABLE').output.usedParallelCopies}' AS no_parallelcopies,
       '@{activity('copy-TABLE').output.copyDuration}' AS copyduration_in_secs,
       '@{activity('copy-TABLE').output.effectiveIntegrationRuntime}' AS effectiveintegrationruntime,
       '@{activity('copy-TABLE').output.executionDetails[0].source.type}' AS source_type,
       '@{activity('copy-TABLE').output.executionDetails[0].sink.type}' AS sink_type,
       '@{activity('copy-TABLE').output.executionDetails[0].status}' AS execution_status,
       '@{activity('copy-TABLE').output.executionDetails[0].start}' AS copyactivity_start_time,
       '@{utcnow()}' AS copyactivity_end_time,
       '@{activity('copy-TABLE').output.executionDetails[0].detailedDurations.queuingDuration}' AS copyactivity_queuingduration_in_secs,
       '@{activity('copy-TABLE').output.executionDetails[0].detailedDurations.timeToFirstByte}' AS copyactivity_timetofirstbyte_in_secs,
       '@{activity('copy-TABLE').output.executionDetails[0].detailedDurations.transferDuration}' AS copyactivity_transferduration_in_secs
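Following the earlier recommendation to centralize source queries, one way to move this logic into a stored procedure is to let ADF resolve the @{...} expressions first and pass the resulting values in as parameters; the procedure then simply returns them as a row for the Copy activity to land in the sink. The procedure name and the trimmed-down parameter list below are illustrative assumptions, created on the source SQL Server:
CREATE PROCEDURE [dbo].[usp_get_pipeline_metrics]
    @datafactory_name NVARCHAR(500),
    @pipeline_name    NVARCHAR(500),
    @runid            NVARCHAR(500),
    @rowscopied       NVARCHAR(500)
    -- ...remaining metric parameters follow the same pattern
AS
    SET nocount ON
    -- Echo the resolved ADF values back as a single row for the Copy activity sink.
    SELECT @datafactory_name AS datafactory_name,
           @pipeline_name    AS pipeline_name,
           @runid            AS runid,
           @rowscopied       AS rowscopied;
go
The ADF source query could then become a call such as EXEC dbo.usp_get_pipeline_metrics @datafactory_name = '@{pipeline().DataFactory}', @pipeline_name = '@{pipeline().Pipeline}', and so on, which keeps the query logic in one maintainable place.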
The sink will be a CSV (DelimitedText) dataset configured with a .csv file extension, as shown in Figure 8-8.
Figure 8-8. ADF sink dataset for CSV

Figure 8-9 shows the connection configuration to use for the CSV dataset.
Figure 8-9. ADF sink dataset connection properties for CSV

The following parameterized path will ensure that the file is generated in the correct folder structure. Here is the code shown in Figure 8-9:
@{item().server_name}/@{item().src_db}/@{item().src_schema}/@{item().dst_name}/metadata/@{formatDateTime(utcnow(),'yyyy-MM-dd')}/@{item().dst_name}.csv
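As an illustration only, with hypothetical values of server001, WideWorldImporters, dbo, and Customer for the server, source database, schema, and destination table, this path would resolve to something like:
server001/WideWorldImporters/dbo/Customer/metadata/2021-05-15/Customer.csv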
After saving, publishing, and running the pipeline, notice that the metadata folder has been created within the following folder structure:
Server > database > schema > destination table > metadata > date
Figure 8-10 shows this folder as you will see it in ADLS Gen2.
Figure 8-10. ADLS Gen2 folders generated by ADF pipeline option 2

Open the metadata folder and notice that a folder has been created for each day the pipeline runs, containing the CSV files, as shown in Figure 8-11.
Figure 8-11. ADLS Gen2 folders by timestamp created by ADF pipeline option 2

Finally, notice in Figure 8-12 that a metadata .csv file with the name of the table has been created.
Figure 8-12. ADLS Gen2 metadata file containing ADF pipeline metrics from option 2

Download and open the file and notice that all of the query results have been populated in the .csv file, as shown in Figure 8-13.
Figure 8-13. ADLS Gen2 metadata file containing details for ADF pipeline metrics from option 2

Option 3: Create a Log Table in Azure SQL Database

The final scenario in this chapter involves creating a log table in the database where the parameter table resides and then writing the pipeline activity data as records into that table. For this option, start by adding a Copy data activity connected to the Copy-Table activity, shown in Figure 8-14.
Figure 8-14. ADF option 3 for logging data using a Copy data activity written to a log table

Next, create the following table in your ADF_DB database. This table will store and capture the pipeline and Copy activity details:
SET ansi_nulls ON
go
SET quoted_identifier ON
go
CREATE TABLE [dbo].[pipeline_log]
  (
     [log_id]                                [INT] IDENTITY(1, 1) NOT NULL,
     [parameter_id]                          [INT] NULL,
     [datafactory_name]                      [NVARCHAR](500) NULL,
     [pipeline_name]                         [NVARCHAR](500) NULL,
     [runid]                                 [NVARCHAR](500) NULL,
     [source]                                [NVARCHAR](500) NULL,
     [destination]                           [NVARCHAR](500) NULL,
     [triggertype]                           [NVARCHAR](500) NULL,
     [triggerid]                             [NVARCHAR](500) NULL,
     [triggername]                           [NVARCHAR](500) NULL,
     [triggertime]                           [NVARCHAR](500) NULL,
     [rowscopied]                            [NVARCHAR](500) NULL,
     [dataread]                              [INT] NULL,
     [no_parallelcopies]                     [INT] NULL,
     [copyduration_in_secs]                  [NVARCHAR](500) NULL,
     [effectiveintegrationruntime]           [NVARCHAR](500) NULL,
     [source_type]                           [NVARCHAR](500) NULL,
     [sink_type]                             [NVARCHAR](500) NULL,
     [execution_status]                      [NVARCHAR](500) NULL,
     [copyactivity_start_time]               [NVARCHAR](500) NULL,
     [copyactivity_end_time]                 [NVARCHAR](500) NULL,
     [copyactivity_queuingduration_in_secs]  [NVARCHAR](500) NULL,
     [copyactivity_transferduration_in_secs] [NVARCHAR](500) NULL,
     CONSTRAINT [PK_pipeline_log] PRIMARY KEY CLUSTERED ( [log_id] ASC ) WITH (
     statistics_norecompute = OFF, ignore_dup_key = OFF) ON [PRIMARY]
  )
ON [PRIMARY]
go
Similar to the last pipeline option, configure the on-premises SQL Server as the source and use the query code provided in option 2 as the source query, shown in Figure 8-15.
Figure 8-15. ADF option 3 source query to extract pipeline activity metrics

The sink will be a connection to the SQL database pipeline log table that was created earlier.

After saving, publishing, and running the pipeline, notice that the pipeline Copy activity records have been captured in the dbo.pipeline_log table, as shown in Figure 8-16.
Figure 8-16. SSMS view of pipeline_log table results, which logged ADF option 3 pipeline data to Azure SQL Database
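With the metrics accumulating as rows, the log is easy to interrogate directly in SSMS. For example, a quick look at the ten longest-running copies might use a query along these lines (the duration column is stored as NVARCHAR, hence the TRY_CAST):
-- Surface the ten longest-running copies with their row counts and status.
SELECT TOP (10) pipeline_name,
       source,
       destination,
       rowscopied,
       copyduration_in_secs,
       execution_status,
       copyactivity_start_time
FROM   [dbo].[pipeline_log]
ORDER  BY TRY_CAST(copyduration_in_secs AS INT) DESC;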

Summary

In this chapter, I demonstrated how to log ADF pipeline data using a custom approach that extracts metrics for each table or file from the ADF pipeline and then writes the results to the pipeline_parameter table, the pipeline_log table, or a CSV file in ADLS Gen2. This custom logging process can be added to multiple ADF pipeline activity steps for a robust and reusable logging and auditing process that can easily be queried in SSMS through custom SQL queries and even linked to Power BI dashboards and reports for robust reporting on the logged data. In the next chapter, I will demonstrate how to log error details from your ADF pipeline into an Azure SQL table.
