Data engineering pipelines often go awry, especially when ingesting “dirty” data from external systems. However, in a traditional data lake design, it is hard to undo updates that added objects to a table. In addition, it is challenging to audit data changes, which is critical both for data compliance and for simple debugging to understand how data has changed over time. Other data workloads, such as machine learning training, require faithfully reproducing an old version of the data (e.g., to compare a new and an old training algorithm on the same data). For example, if an upstream pipeline modifies the source data, data scientists are often caught unaware by the change and struggle to reproduce their experiments.
All of these issues created significant challenges for data engineers and data scientists before Delta Lake, requiring them to design complex remediations for data pipeline errors or to haphazardly duplicate datasets. In this chapter, we will explore the capabilities of Delta Lake that allow data practitioners to go back in time and address these challenges. It is uncertain whether time travel to the past is physically possible, but data practitioners can time travel programmatically with Delta Lake. Delta Lake automatically versions all data stored in the data lake, and we can time travel to any version. It also allows us to create a copy of an existing Delta table at a specific version using the clone command, which helps with a few time travel use cases. To understand these capabilities and how time travel works, we first need to unpack the file structure of Delta tables and dive deep into each of its components.
In Chapter 2, one of our sections focused on unpacking the transaction log. But recall that the transaction log is just one part of the Delta table. In this section, we will focus on the components that make up the Delta table.
A Delta table is stored within a directory and is composed of the file types shown in Figure 2-1.
As an example, we have a Delta table named customer_data located in the filePath dbfs:/user/hive/warehouse/deltaguide.db.
To visualize the Delta file system, see the following snippet from our example. You can also follow along with us by using the notebooks present here.
Root of the Delta directory: This is the Hive warehouse location where we created the Delta table. Using the command below, we can explore the contents of the Delta directory. For the full output, please refer to this location.
%fs ls dbfs:/user/hive/warehouse/deltaguide.db/customer_data
path | name | size |
dbfs:/dbfs:/..filePath../customer_data/_delta_log/ | _delta_log/ | 0 |
dbfs:/dbfs:/..filePath../customer_data/part-00000-41…-c001.snappy.parquet | part-00000-41...-c001.snappy.parquet | 542929260 |
dbfs:/dbfs:/..filePath../customer_data/part-00000-45...-c002.snappy.parquet | part-00000-45...-c002.snappy.parquet | 461126371 |
dbfs:/dbfs:/..filePath../customer_data/part-00000-62...-c000.snappy.parquet | part-00000-62...-c000.snappy.parquet | 540468009 |
dbfs:/dbfs:/..filePath../customer_data/part-00001-40...-c002.snappy.parquet | part-00001-40...-c002.snappy.parquet | 541858629 |
dbfs:/dbfs:/..filePath../customer_data/part-00001-a7...-c001.snappy.parquet | part-00001-a7...-c001.snappy.parquet | 542859315 |
dbfs:/dbfs:/..filePath../customer_data/part-00001-c3...-c000.snappy.parquet | part-00001-c3...-c000.snappy.parquet | 541186721 |
When a user creates a Delta Lake table, that table’s transaction log is automatically created in the _delta_log subdirectory. As users make changes to that table, those changes are recorded as ordered, atomic commits in the transaction log. Each commit is written out as a JSON file, starting with 000000.json. Additional changes to the table generate subsequent JSON files in ascending numerical order, so that the next commit is written out as 000001.json, the following as 000002.json, and so on. (These names are abbreviated here; as the directory listings in this chapter show, the actual file names are zero-padded to 20 digits, for example 00000000000000000000.json.)
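The naming scheme can be sketched in a few lines of Python. This is only an illustration, assuming the 20-digit zero-padded names that appear in the directory listings in this chapter; the helper names commit_file_name and version_from_file_name are hypothetical and are not part of any Delta Lake API.

```python
def commit_file_name(version: int) -> str:
    """Return the JSON commit file name for a table version (assumes 20-digit padding)."""
    if version < 0:
        raise ValueError("table versions start at 0")
    return f"{version:020d}.json"


def version_from_file_name(name: str) -> int:
    """Recover the table version from a commit file name such as 00000000000000000021.json."""
    stem, _, extension = name.partition(".")
    if extension != "json" or not stem.isdigit():
        raise ValueError(f"not a commit file: {name}")
    return int(stem)


print(commit_file_name(0))   # 00000000000000000000.json
print(commit_file_name(21))  # 00000000000000000021.json
print(version_from_file_name("00000000000000000021.json"))  # 21
```

Because the names are zero-padded to a fixed width, sorting them as plain strings also sorts them by version, which is convenient when listing the _delta_log directory.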
Figure 2-2 shows the logical diagram of Delta Transaction Log Protocol.
The state of a table at a given version is called a snapshot and includes the following properties:
Version of the Delta log protocol: This is required to correctly read or write the table
Metadata of the table (e.g., the schema, a unique identifier, partition columns, and other configuration properties)
Set of files present in the table, along with metadata about those files
Set of tombstones for files that were recently deleted
Set of application-specific transactions that have been successfully committed to the table
We will explore each of these properties in the upcoming sections. To give you a mental map of how this looks from a file system perspective, here is a snippet.
%fs ls dbfs:/filePath/customer_t2/_delta_log
path | name | size |
dbfs:/.../customer_t2/_delta_log/.s3-optimization-0 | .s3-optimization-0 | 0 |
dbfs:/.../customer_t2/_delta_log/.s3-optimization-1 | .s3-optimization-1 | 0 |
dbfs:/.../customer_t2/_delta_log/.s3-optimization-2 | .s3-optimization-2 | 0 |
dbfs:/.../customer_t2/_delta_log/00000000000000000010.checkpoint.parquet | 00000000000000000010.checkpoint.parquet | 120950 |
: : : |
: : : |
: : : |
dbfs:/.../customer_t2/_delta_log/00000000000000000020.checkpoint.parquet | 00000000000000000020.checkpoint.parquet | 94341 |
dbfs:/.../customer_t2/_delta_log/00000000000000000020.crc | 00000000000000000020.crc | 97 |
dbfs:/.../customer_t2/_delta_log/00000000000000000020.json | 00000000000000000020.json | 347291 |
dbfs:/.../customer_t2/_delta_log/00000000000000000021.crc | 00000000000000000021.crc | 95 |
dbfs:/.../customer_t2/_delta_log/00000000000000000021.json | 00000000000000000021.json | 41461 |
dbfs:/.../customer_t2/_delta_log/_last_checkpoint | _last_checkpoint | 26 |
As previously noted, Delta Lake is an open-source storage layer that runs on top of your existing data lake and is fully compatible with Apache Spark APIs. Delta Lake uses versioned Parquet files to store data in your cloud storage, enabling Delta Lake to leverage the efficient compression and encoding schemes that are native to Parquet. Apart from the versions, Delta Lake also stores a transaction log to keep track of all the commits made to the table or blob store directory to provide ACID transactions.
You can use your favorite Apache Spark APIs to read and write data with Delta Lake. As noted in the previous chapter, to create a Delta table, write a DataFrame out in the delta format.
%python
# Generate Spark DataFrame
data = spark.range(0, 5)
# Write the table in parquet format
data.write.format("parquet").save("/table_pq")
# Write the table in delta format
data.write.format("delta").save("/table_delta")
%scala
// Generate Spark DataFrame
val data = spark.range(0, 5)
// Write the table in parquet format
data.write.format("parquet").save("/table_pq")
// Write the table in delta format
data.write.format("delta").save("/table_delta")
Once you write the files in the delta format, you will notice that a number of files are created under a folder in the format recognized by Spark as a Parquet table and/or stored in a metastore (e.g., Hive, Glue, etc.).
Note that the files of a metastore-defined table are stored under the default hive/warehouse location, though you have the option to specify a location of your choice at the time of writing the Delta files, as noted in the previous example.
To review the underlying file structure, list the contents of each table’s directory.
The following is the shell command for the Parquet table previously generated.
%sh ls -R /dbfs/table_pq/
/dbfs/table_pq/:
part-00000-775edc03-7c05-4190-a964-fcdfc0428014-c000.snappy.parquet
part-00001-94940738-a4e6-4b4b-b30f-797a76b32087-c000.snappy.parquet
part-00003-0e5d49e6-4dd0-4746-8944-f64cba9df97c-c000.snappy.parquet
part-00004-bed772f9-1045-4e3a-8048-e179096e3c25-c000.snappy.parquet
part-00006-e28f9fa4-516b-4cf6-acdd-22d70e8841f2-c000.snappy.parquet
part-00007-5276d994-b9ca-4da8-b594-4bbfafbd7dcb-c000.snappy.parquet
The following is the shell command for the Delta table previously generated.
%sh ls -R /dbfs/table_delta/
/dbfs/table_delta/:
_delta_log
part-00000-775edc03-7c05-4190-a964-fcdfc0428014-c000.snappy.parquet
part-00001-94940738-a4e6-4b4b-b30f-797a76b32087-c000.snappy.parquet
part-00003-0e5d49e6-4dd0-4746-8944-f64cba9df97c-c000.snappy.parquet
part-00004-bed772f9-1045-4e3a-8048-e179096e3c25-c000.snappy.parquet
part-00006-e28f9fa4-516b-4cf6-acdd-22d70e8841f2-c000.snappy.parquet
part-00007-5276d994-b9ca-4da8-b594-4bbfafbd7dcb-c000.snappy.parquet
/dbfs/table_delta/_delta_log:
00000000000000000000.crc
00000000000000000000.json
Because the majority of data practitioners are familiar with Parquet and its structure, let’s simplify our understanding of Delta by comparing it to Parquet.
The only difference between the Parquet and Delta tables is the _delta_log folder, which is the Delta transaction log (more on this in Chapter 2).
In both Parquet and Delta tables, the data itself is in the form of snappy compressed Parquet part files, which is the common format of Spark tables.
Apache Parquet is a columnar storage format that is optimized for BI-type queries (group by, aggregates, joins, etc.). Because of this, it is the default storage format for Apache Spark when it writes its data files to storage.
Snappy compression was developed by Google to provide high compression/decompression speeds with reasonable compression. By default, Spark tables are snappy compressed Parquet files.
By default, the reference implementation of Delta Lake stores data files in directories named after the partition values for the data in that file (i.e., part1=value1/part2=value2/...). Each data file name also carries a globally unique identifier as a suffix to ensure the uniqueness of each file, which is why the part files in the listing have such long names. This is especially important because directories in cloud object stores are not actual folders but logical representations of folders. For more information on the implications, read more at How to list and delete files faster in Databricks.
Any time data is modified in a Delta table, new files are created as a version, which is a snapshot of the Delta table at that specific time and is the result of a set of actions performed by the user.
Let’s start by looking at the Delta table created in the previous section by running the following commands to review the path column of the add metadata.
%python
# Read first transaction
j0 = spark.read.json("/table_delta/_delta_log/00000000000000000000.json")
# Review Add Information
j0.select("add.path").where("add is not null").show(20, False)
%scala
// Read first transaction
val j0 = spark.read.json("/table_delta/_delta_log/00000000000000000000.json")
// Review Add Information (Scala uses lowercase false)
j0.select("add.path").where("add is not null").show(20, false)
Metadata in the transaction log includes all of the files that make up the table version (version 0). The output of the preceding commands is:
+-------------------------------------------------------------------+
|path                                                               |
+-------------------------------------------------------------------+
|part-00000-775edc03-7c05-4190-a964-fcdfc0428014-c000.snappy.parquet|
|part-00001-94940738-a4e6-4b4b-b30f-797a76b32087-c000.snappy.parquet|
|part-00003-0e5d49e6-4dd0-4746-8944-f64cba9df97c-c000.snappy.parquet|
|part-00004-bed772f9-1045-4e3a-8048-e179096e3c25-c000.snappy.parquet|
|part-00006-e28f9fa4-516b-4cf6-acdd-22d70e8841f2-c000.snappy.parquet|
|part-00007-5276d994-b9ca-4da8-b594-4bbfafbd7dcb-c000.snappy.parquet|
+-------------------------------------------------------------------+
Notice how this matches the file listing you see when you run the ls command. While this file listing is pretty fast for small amounts of data, as noted in Chapter 2, listFrom can be quite slow on cloud object storage, especially for petabyte-scale data lakes.
When reviewing the table history (see Table 3-2), let’s focus on the operationMetrics for version 0 (table creation).
%sql
-- Describe table history using file path
DESCRIBE HISTORY delta.`/table_delta`
Table 3-x transposes the values of the preceding figure.
Column Name | Value |
version | 0 |
timestamp | userID |
userName | vini[dot]jaiswal[at]databricks.com |
operation | WRITE |
operationParameters | {"mode": "ErrorIfExists", "partitionBy": "[]"} |
job | null |
notebook | {"notebookId": "9342327"} |
clusterId | |
readVersion | null |
isolationLevel | WriteSerializable |
isBlindAppend | true |
operationMetrics | {"numFiles": "6", "numOutputBytes": "2713", "numOutputRows": "5"} |
userMetadata | null |
Of particular interest in Table 3-x is the operationMetrics column for table version 0.
{"numFiles": "6", "numOutputBytes": "2713", "numOutputRows": "5"}
Notice how numFiles corresponds to the six files listed in the previous add.path query as well as the file listing (ls).
What happens when we add new data to this table? The following command will add 4 new rows to our table.
%python
# Add 4 new rows of data to our Delta table
data = spark.range(6, 10)
data.write.format("delta").mode("append").save("/table_delta")
%scala
// Add 4 new rows of data to our Delta table
val data = spark.range(6, 10)
data.write.format("delta").mode("append").save("/table_delta")
We’re now going to review the data, the file system, the added files, and so on.
You can confirm that there are a total of 9 rows in the table by running the following command.
%python
spark.read.format("delta").load("/table_delta").count()
%scala
spark.read.format("delta").load("/table_delta").count()
But what does the underlying file system look like? When we re-run our file listing, as expected, there are more files and a new JSON and CRC file within _delta_log corresponding to a new version of the table.
%sh ls -R /dbfs/table_delta/
/dbfs/table_delta/:
_delta_log
part-00000-0267038a-f818-4823-8261-364fd7401501-c000.snappy.parquet
part-00000-775edc03-7c05-4190-a964-fcdfc0428014-c000.snappy.parquet
part-00001-94940738-a4e6-4b4b-b30f-797a76b32087-c000.snappy.parquet
part-00001-cd3a1a49-0a0a-4284-8a1b-283d83590dff-c000.snappy.parquet
part-00003-0e5d49e6-4dd0-4746-8944-f64cba9df97c-c000.snappy.parquet
part-00003-56bc7589-6266-4ce3-9515-62caf9af9109-c000.snappy.parquet
part-00004-bed772f9-1045-4e3a-8048-e179096e3c25-c000.snappy.parquet
part-00005-3d298fe6-8795-4558-92c7-70e0520a3d47-c000.snappy.parquet
part-00006-e28f9fa4-516b-4cf6-acdd-22d70e8841f2-c000.snappy.parquet
part-00007-5276d994-b9ca-4da8-b594-4bbfafbd7dcb-c000.snappy.parquet
part-00007-60de290b-7147-4ec4-b535-ca9abf4ce2d1-c000.snappy.parquet
/dbfs/table_delta/_delta_log:
00000000000000000000.crc
00000000000000000000.json
00000000000000000001.crc
00000000000000000001.json
Let’s follow up by looking at Delta table version 1 by running the following commands to review the path column of the add metadata.
%python
# Read version 1
j1 = spark.read.json("/table_delta/_delta_log/00000000000000000001.json")
# Review Add Information
j1.select("add.path").where("add is not null").show(20, False)
%scala
// Read version 1
val j1 = spark.read.json("/table_delta/_delta_log/00000000000000000001.json")
// Review Add Information (Scala uses lowercase false)
j1.select("add.path").where("add is not null").show(20, false)
The add metadata in the version 1 commit lists the files that were added in table version 1. The output of the preceding commands is:
+-------------------------------------------------------------------+
|path                                                               |
+-------------------------------------------------------------------+
|part-00000-0267038a-f818-4823-8261-364fd7401501-c000.snappy.parquet|
|part-00001-cd3a1a49-0a0a-4284-8a1b-283d83590dff-c000.snappy.parquet|
|part-00003-56bc7589-6266-4ce3-9515-62caf9af9109-c000.snappy.parquet|
|part-00005-3d298fe6-8795-4558-92c7-70e0520a3d47-c000.snappy.parquet|
|part-00007-60de290b-7147-4ec4-b535-ca9abf4ce2d1-c000.snappy.parquet|
+-------------------------------------------------------------------+
Interestingly, this output notes that only 5 files were needed to hold the 4 new rows of data; recall there were 6 files for the initial 5 rows of data.
This can be confirmed by reviewing the table history; let’s focus on the operationMetrics for version 1 (the append).
%sql
-- Describe table history using file path
DESCRIBE HISTORY delta.`/table_delta`
Below is the value of the operationMetrics column for version 1, which confirms that there are only 5 files and that the number of output rows is 4.
{"numFiles": "5", "numOutputBytes": "2232", "numOutputRows": "4"}
Notice how numFiles corresponds to the five files listed in the previous add.path query as well as the five new files in the file listing (ls).
You can further validate this by running the following query:
%sql
-- Run this statement first as Delta will do a format check
SET spark.databricks.delta.formatCheck.enabled = false
%python
delta_path = "/table_delta/"
# Files listed in add.path metadata
files = [delta_path + "part-00000-0267038a-f818-4823-8261-364fd7401501-c000.snappy.parquet",
         delta_path + "part-00001-cd3a1a49-0a0a-4284-8a1b-283d83590dff-c000.snappy.parquet",
         delta_path + "part-00003-56bc7589-6266-4ce3-9515-62caf9af9109-c000.snappy.parquet",
         delta_path + "part-00005-3d298fe6-8795-4558-92c7-70e0520a3d47-c000.snappy.parquet",
         delta_path + "part-00007-60de290b-7147-4ec4-b535-ca9abf4ce2d1-c000.snappy.parquet"]
# Show values stored in these files
spark.read.format("parquet").load(files).show()
# Results
+---+
| id|
+---+
| 8|
| 6|
| 7|
| 9|
+---+
As noted, these five files correspond to the four rows (id values 6, 7, 8, and 9) added to our Delta table.
What happens when we remove data from this table? The following command will DELETE some of the values from our Delta table. Note that we will dive deeper into data modifications such as delete, update, and merge in Chapter 5: Data Modifications in Delta tables. But this example is an interesting showcase of what is happening in the file system.
%sql
-- Delete from Delta table where id <= 2
DELETE FROM delta.`/table_delta` WHERE id <= 2
You can confirm there are a total of 6 rows in the table (the 3 values 0, 1, and 2 were removed) by running the following command.
%python
spark.read.format("delta").load("/table_delta").count()
%scala
spark.read.format("delta").load("/table_delta").count()
But what does the underlying file system look like? Let’s re-run our file listing; as expected, there are more files and a new JSON and CRC file within _delta_log corresponding to a new version of the table.
%sh ls -R /dbfs/table_delta/
/dbfs/table_delta/:
_delta_log
part-00000-0267038a-f818-4823-8261-364fd7401501-c000.snappy.parquet
part-00000-74e6c7ea-7321-44f0-9fb6-55257677cb1f-c000.snappy.parquet
part-00000-775edc03-7c05-4190-a964-fcdfc0428014-c000.snappy.parquet
part-00001-94940738-a4e6-4b4b-b30f-797a76b32087-c000.snappy.parquet
part-00001-cd3a1a49-0a0a-4284-8a1b-283d83590dff-c000.snappy.parquet
part-00003-0e5d49e6-4dd0-4746-8944-f64cba9df97c-c000.snappy.parquet
part-00003-56bc7589-6266-4ce3-9515-62caf9af9109-c000.snappy.parquet
part-00004-bed772f9-1045-4e3a-8048-e179096e3c25-c000.snappy.parquet
part-00005-3d298fe6-8795-4558-92c7-70e0520a3d47-c000.snappy.parquet
part-00006-e28f9fa4-516b-4cf6-acdd-22d70e8841f2-c000.snappy.parquet
part-00007-5276d994-b9ca-4da8-b594-4bbfafbd7dcb-c000.snappy.parquet
part-00007-60de290b-7147-4ec4-b535-ca9abf4ce2d1-c000.snappy.parquet
/dbfs/table_delta/_delta_log:
00000000000000000000.crc
00000000000000000000.json
00000000000000000001.crc
00000000000000000001.json
00000000000000000002.crc
00000000000000000002.json
As expected, there is a new transaction log file (000...00002.json), but now there are 12 files in our Delta table folder after we deleted three rows (previously there were 11 files). Let’s review Delta table version 2 by running the following commands to review the path column of the add and remove metadata.
There were three files removed from our table; the following query reads the version 2 transaction log and specifically extracts the path of removed files.
%python
# Read version 2
j2 = spark.read.json("/table_delta/_delta_log/00000000000000000002.json")
# Remove information
j2.select("remove.path").where("remove is not null").show(20, False)
%scala
// Read version 2
val j2 = spark.read.json("/table_delta/_delta_log/00000000000000000002.json")
// Remove information (Scala uses lowercase false)
j2.select("remove.path").where("remove is not null").show(20, false)
# Result
+-------------------------------------------------------------------+
|path                                                               |
+-------------------------------------------------------------------+
|part-00004-bed772f9-1045-4e3a-8048-e179096e3c25-c000.snappy.parquet|
|part-00003-0e5d49e6-4dd0-4746-8944-f64cba9df97c-c000.snappy.parquet|
|part-00001-94940738-a4e6-4b4b-b30f-797a76b32087-c000.snappy.parquet|
+-------------------------------------------------------------------+
It is important to note that in the transaction log the deletion is a tombstone, i.e., we have not deleted the files. That is, these files are identified as removed so that when you query the table, Delta will not include the removed files.
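To make the tombstone idea concrete, here is a minimal sketch in plain Python that replays add and remove actions to work out which files are live at a given version. It is an illustration under stated assumptions, not Delta Lake's implementation: the file names are hypothetical, each action carries only a path (real actions carry more fields), and the helpers add, remove, and replay are invented for this example. The counts mirror versions 0 to 2 of /table_delta.

```python
import json


def add(path):
    """Serialize a minimal 'add' action (real actions carry more fields)."""
    return json.dumps({"add": {"path": path}})


def remove(path):
    """Serialize a minimal 'remove' action, i.e. a tombstone."""
    return json.dumps({"remove": {"path": path}})


# Hypothetical file names; the counts mirror versions 0 to 2 of /table_delta.
commits = {
    0: [add(f"v0-{i}.parquet") for i in range(6)],
    1: [add(f"v1-{i}.parquet") for i in range(5)],
    2: [remove(f"v0-{i}.parquet") for i in (1, 3, 4)] + [add("v2-0.parquet")],
}


def replay(commits, up_to_version):
    """Replay commits in order and return (live files, tombstoned files)."""
    live, tombstones = set(), set()
    for version in sorted(commits):
        if version > up_to_version:
            break
        for line in commits[version]:
            action = json.loads(line)
            if "add" in action:
                live.add(action["add"]["path"])
                tombstones.discard(action["add"]["path"])
            elif "remove" in action:
                live.discard(action["remove"]["path"])
                tombstones.add(action["remove"]["path"])
    return live, tombstones


live, tombstones = replay(commits, up_to_version=2)
print(len(live), len(tombstones))  # 9 3
```

Nothing in replay touches storage: a tombstoned file is simply excluded from the live set, which is why the file listing still shows it.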
This can be confirmed by reviewing the table history; let’s focus on the operationMetrics for version 2 (the delete).
%sql
-- Describe table history using file path
DESCRIBE HISTORY delta.`/table_delta`
Below is the value of the operationMetrics column for version 2, which confirms that 3 files were removed, 1 file was added, and 3 rows were deleted.
{"numRemovedFiles": "3", "numDeletedRows": "3", "numAddedFiles": "1", "numCopiedRows": "0"}
As stated earlier, the files may be removed in the transaction log, but they are not removed from the file system. The Delta data files are handled in this fashion to support multiversion concurrency control (MVCC). For example, as seen in Figure 3-x, suppose user 1 is querying version 1 of the table while user 2 is deleting the values (0, 1, 2), which creates table version 2. If the data files that represent version 1 (the files that contained the values 0, 1, and 2) were physically deleted, user 1’s query would irrevocably fail.
This is especially important for long-running queries on data lakes, where the queries and operations may take a long time. It is important to note that user 1 had executed the read query before version 2 of the table was committed. If at a later time, as seen in Figure 3-x, another user (user 3) queries the same table, then by default the three files representing the values {0, 1, 2} would not be included and the query would return only six rows.
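The reader behavior described here can be imitated with a toy in-memory model. The sketch below is an assumption-laden illustration rather than Delta Lake's actual implementation: ToyTable, its methods, and the file names f0 to f9 are all hypothetical, and "storage" is just a dictionary from which files are never deleted.

```python
from dataclasses import dataclass, field


@dataclass
class ToyTable:
    """In-memory stand-in for a table directory plus its transaction log."""
    storage: dict = field(default_factory=dict)  # file name -> rows; nothing is ever deleted
    log: list = field(default_factory=list)      # log[v] = (added files, removed files)

    def commit(self, added, removed=()):
        """Write new files, record the commit, and return the new table version."""
        self.storage.update(added)
        self.log.append((set(added), set(removed)))
        return len(self.log) - 1

    def files_at(self, version):
        """Replay the log up to and including the given version."""
        live = set()
        for added, removed in self.log[: version + 1]:
            live = (live - removed) | added
        return live

    def read(self, version=None):
        """Read the rows of a pinned version, or of the latest version by default."""
        version = len(self.log) - 1 if version is None else version
        return sorted(row for name in self.files_at(version) for row in self.storage[name])


table = ToyTable()
table.commit({"f0": [0], "f1": [1], "f2": [2], "f3": [3], "f4": [4]})  # version 0
pinned = table.commit({"f5": [6], "f6": [7], "f7": [8], "f8": [9]})     # version 1; user 1 pins it
table.commit({"f9": []}, removed={"f0", "f1", "f2"})                    # version 2: user 2's delete

print(table.read(pinned))  # [0, 1, 2, 3, 4, 6, 7, 8, 9]
print(table.read())        # [3, 4, 6, 7, 8, 9]
```

User 1's pinned read still succeeds after the delete commits only because the removed files remain in storage; a later reader such as user 3 resolves the latest version and sees six rows.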
To further validate that the data is still there, run the following query; the three files (files) correspond to the three removed files in the previous section.
%sql
-- Run this statement first as Delta will do a format check
SET spark.databricks.delta.formatCheck.enabled = false
%python
delta_path = "/table_delta/"
# Files listed in remove.path metadata
files = [delta_path + "part-00004-bed772f9-1045-4e3a-8048-e179096e3c25-c000.snappy.parquet",
         delta_path + "part-00003-0e5d49e6-4dd0-4746-8944-f64cba9df97c-c000.snappy.parquet",
         delta_path + "part-00001-94940738-a4e6-4b4b-b30f-797a76b32087-c000.snappy.parquet"]
# Show values stored in these files
spark.read.format("parquet").load(files).show()
# Results
+---+
| id|
+---+
| 2|
| 1|
| 0|
+---+
Even though we had deleted three rows, we’ve also added a file in the file system as recorded in the transaction log.
%python
# Read version 2
j2 = spark.read.json("/table_delta/_delta_log/00000000000000000002.json")
# Review Add Information
j2.select("add.path").where("add is not null").show(20, False)
%scala
// Read version 2
val j2 = spark.read.json("/table_delta/_delta_log/00000000000000000002.json")
// Review Add Information (Scala uses lowercase false)
j2.select("add.path").where("add is not null").show(20, false)
To reiterate, even though we deleted three rows, as noted in Add Information there is an additional file!
+-------------------------------------------------------------------+
|path                                                               |
+-------------------------------------------------------------------+
|part-00000-74e6c7ea-7321-44f0-9fb6-55257677cb1f-c000.snappy.parquet|
+-------------------------------------------------------------------+
In fact, this file is empty, as you can confirm by reading it with the query below; this is consistent with the numCopiedRows value of 0 in the operation metrics.
%python
# Read the add file from version 2
spark.read.format("parquet").load("/table_delta/part-00000-74e6c7ea-7321-44f0-9fb6-55257677cb1f-c000.snappy.parquet").show()
%scala
// Read the add file from version 2
spark.read.format("parquet").load("/table_delta/part-00000-74e6c7ea-7321-44f0-9fb6-55257677cb1f-c000.snappy.parquet").show()
# Result
+---+
| id|
+---+
+---+
During our delete operation, no rows were copied into the newly added file, as was expected for this small example.
For larger datasets, it is common that not all the rows will be removed from a file.
Extending the current scenario, let’s say that the values {3-8} are stored in 1.parquet and you run a DELETE statement to remove the value id = 4. In this example:
Within the transaction log, the 1.parquet file is added to the remove column.
But if we stopped here, we would be removing not only {4} but also {3, 5-8}.
Therefore, a new file is created (in this case, the 2.parquet file) that contains the values that were not deleted, {3, 5-8}.
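The steps above can be sketched in plain Python. This is a simplified illustration of the copy-on-write idea, assuming a single data file held in memory; the function copy_on_write_delete and its parameters are hypothetical and do not correspond to a Delta Lake API.

```python
def copy_on_write_delete(path, rows, should_delete, new_path):
    """Plan a delete against one immutable data file.

    Returns (actions, new_file_rows). The old file is never edited in place:
    it is tombstoned, and the surviving rows are rewritten to a new file.
    """
    kept = [row for row in rows if not should_delete(row)]
    if len(kept) == len(rows):
        return [], None  # no row matched, so the file is left untouched
    actions = [{"remove": {"path": path}}, {"add": {"path": new_path}}]
    return actions, kept


actions, new_rows = copy_on_write_delete(
    "1.parquet", [3, 4, 5, 6, 7, 8], lambda value: value == 4, "2.parquet"
)
print(actions)   # [{'remove': {'path': '1.parquet'}}, {'add': {'path': '2.parquet'}}]
print(new_rows)  # [3, 5, 6, 7, 8]
```

The original 1.parquet is only marked as removed, so a reader pinned to the earlier version can still find the value 4 in it.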
To demonstrate this scenario, let’s compact our current Delta table. If you’re running this on Databricks, you can use the OPTIMIZE command; more on this command in Chapter 12: Performance Tuning.
%sql
-- Optimize our Delta table
OPTIMIZE delta.`/table_delta/`
This command is used to tune the performance of your Delta tables, and it also has the benefit of compacting your files. In this case, this means that instead of 9 separate files representing our 6 remaining values, we now have only one file. The optimize operation itself is recorded as version 3 in the table history.
%sql
-- Table history
DESCRIBE HISTORY delta.`/table_delta/`
-- Abridged results
+-------+---------+----------------------------+
|version|operation|operationMetrics            |
+-------+---------+----------------------------+
|3      |OPTIMIZE |[numRemovedFiles -> 9, ...  |
|2      |DELETE   |[numRemovedFiles -> 3, ...  |
|1      |WRITE    |[numFiles -> 5, ...         |
|0      |WRITE    |[numFiles -> 6, ...         |
+-------+---------+----------------------------+
The results from running the OPTIMIZE command can be seen below (these are also recorded in the operationMetrics column within the table history).
path: /table_delta/
Metrics:
{"numFilesAdded": 1, "numFilesRemoved": 9, "filesAdded": {"min": 507, "max": 507, "avg": 507, "totalFiles": 1, "totalSize": 507}, "filesRemoved": {"min": 308, "max": 481, "avg": 423, "totalFiles": 9, "totalSize": 3810}, "partitionsOptimized": 0, "zOrderStats": null, "numBatches": 1}
The two metrics most interesting for this scenario are numFilesAdded and numFilesRemoved, which denote that there is now only 1 file that contains all the values and that 9 files were removed.
The following query validates the files that were removed.
%python
# Read version 3
j3 = spark.read.json("/table_delta/_delta_log/00000000000000000003.json")
# Remove Information (filter on the remove column, not the add column)
j3.select("remove.path").where("remove is not null").show(20, False)
%scala
// Read version 3
val j3 = spark.read.json("/table_delta/_delta_log/00000000000000000003.json")
// Remove Information (filter on the remove column, not the add column)
j3.select("remove.path").where("remove is not null").show(20, false)
-- Results
+-------------------------------------------------------------------+
|path                                                               |
+-------------------------------------------------------------------+
|part-00000-775edc03-7c05-4190-a964-fcdfc0428014-c000.snappy.parquet|
|part-00000-0267038a-f818-4823-8261-364fd7401501-c000.snappy.parquet|
|part-00000-74e6c7ea-7321-44f0-9fb6-55257677cb1f-c000.snappy.parquet|
|part-00006-e28f9fa4-516b-4cf6-acdd-22d70e8841f2-c000.snappy.parquet|
|part-00007-5276d994-b9ca-4da8-b594-4bbfafbd7dcb-c000.snappy.parquet|
|part-00001-cd3a1a49-0a0a-4284-8a1b-283d83590dff-c000.snappy.parquet|
|part-00003-56bc7589-6266-4ce3-9515-62caf9af9109-c000.snappy.parquet|
|part-00005-3d298fe6-8795-4558-92c7-70e0520a3d47-c000.snappy.parquet|
|part-00007-60de290b-7147-4ec4-b535-ca9abf4ce2d1-c000.snappy.parquet|
+-------------------------------------------------------------------+
The following query allows you to validate the files that were added.
%python
# Add Information
j3.select("add.path").where("add is not null").show(20, False)
%scala
// Add Information (Scala uses lowercase false)
j3.select("add.path").where("add is not null").show(20, false)
-- Results
+-------------------------------------------------------------------+
|path                                                               |
+-------------------------------------------------------------------+
|part-00000-f661f932-f54f-4b38-8ed3-51f7770d3e55-c000.snappy.parquet|
+-------------------------------------------------------------------+
You can query the file and validate that it contains all the expected values {3, 4, 6, 7, 8, 9}.
%sql
-- Temporarily disable the Delta format check
SET spark.databricks.delta.formatCheck.enabled = false
%python
# Python (the same call also works in a %scala cell)
spark.read.parquet("/table_delta/part-00000-f661f932-f54f-4b38-8ed3-51f7770d3e55-c000.snappy.parquet").show()
-- Results
+---+
| id|
+---+
| 4|
| 8|
| 3|
| 6|
| 9|
| 7|
+---+
So now that all of the data is in one file, what happens when we delete a value in our table?
%sql
-- Delete the row where id = 4 from our Delta table
DELETE FROM delta.`/table_delta/` WHERE id = 4
As we know this is version 4 of our table history, let’s review the transaction log using the following query.
%python
# Read version 4
j4 = spark.read.json("/table_delta/_delta_log/00000000000000000004.json")
# Remove Information (filter on the remove column, not the add column)
j4.select("remove.path").where("remove is not null").show(20, False)
%scala
// Read version 4
val j4 = spark.read.json("/table_delta/_delta_log/00000000000000000004.json")
// Remove Information (filter on the remove column, not the add column)
j4.select("remove.path").where("remove is not null").show(20, false)
-- Results
+-------------------------------------------------------------------+
|path                                                               |
+-------------------------------------------------------------------+
|part-00000-f661f932-f54f-4b38-8ed3-51f7770d3e55-c000.snappy.parquet|
+-------------------------------------------------------------------+
Notice how the remove.path value is the same file that was added in version 3. But as you can see from the following query, a new file is created.
%python
# Add Information
j4.select("add.path").where("add is not null").show(20, False)
%scala
// Add Information (Scala uses lowercase false)
j4.select("add.path").where("add is not null").show(20, false)
-- Result
+-------------------------------------------------------------------+
|path                                                               |
+-------------------------------------------------------------------+
|part-00000-68bf51ea-6e00-4591-b267-62fcd36602ed-c000.snappy.parquet|
+-------------------------------------------------------------------+
If you query this file, observe that it contains only the expected values {3, 6, 7, 8, 9}.
%python
# Read the V4 added file
spark.read.parquet("/table_delta/part-00000-68bf51ea-6e00-4591-b267-62fcd36602ed-c000.snappy.parquet").show()
%scala
// Read the V4 added file
spark.read.parquet("/table_delta/part-00000-68bf51ea-6e00-4591-b267-62fcd36602ed-c000.snappy.parquet").show()
-- Result
+---+
| id|
+---+
| 8|
| 3|
| 6|
| 9|
| 7|
+---+
To reiterate, instead of deleting the value 4 from the original file, Delta creates a new file with the values that were not deleted and keeps the old file in place.
What you’re seeing here is multiversion concurrency control (MVCC) in action, where there are two separate files associated with two different table versions. But what if you do want to query the previous data? Well, this is an excellent segue to Delta time travel options.
In Chapter 1 we unpacked what was happening with the transaction log to provide ACID transactional semantics. In the previous section, we unpacked what was happening with the data files in relation to the transaction log in light of Delta transactional protocol with particular focus on multiversion concurrency control (MVCC). An interesting byproduct of MVCC is that all of the files from previous transactions still reside and are accessible in storage. That is, the positive byproduct of the implementation of MVCC in Delta is data versioning or time travel.
At a high level, Delta automatically versions the big data that you store in your data lake, and you can access any historical version of that data. This temporal data management simplifies your data pipeline by making it easy to audit, roll back data in case of accidental bad writes or deletes, and reproduce experiments and reports. You can standardize on a clean, centralized, versioned big data repository in your own cloud storage for your analytics.
Data versioning is an important tool to ensure data reliability to address common challenges with changing data.
Auditing data changes is critical both for data compliance and for simple debugging to understand how data has changed over time. Organizations moving from traditional data systems to big data technologies and the cloud struggle in such scenarios.
During model training, data scientists run various experiments with different parameters on a given set of data. When scientists revisit their experiments after a period of time to reproduce the models, the source data has typically been modified by upstream pipelines. Often, they are caught unaware by such upstream data changes and hence struggle to reproduce their experiments. Some scientists and organizations work around this by creating multiple copies of the data, leading to increased storage costs. The same is true for analysts generating reports.
Data pipelines can sometimes write bad data for downstream consumers. This can happen because of issues ranging from infrastructure instabilities to messy data to bugs in the pipeline. For pipelines that do simple appends to directories or a table, rollbacks can easily be addressed by date-based partitioning. With updates and deletes, this can become very complicated, and data engineers typically have to engineer a complex pipeline to deal with such scenarios.
Delta’s time travel capabilities simplify building data pipelines for the preceding use cases which we will further explore in the following sections. As noted previously, as you write into a Delta table or directory, every operation is automatically versioned. You can access the different versions of the data two different ways:
Using a timestamp
Using a version number
In the following examples, we will use the generated TPC-DS dataset so we can work with the following example customer table (customer_t1). To review the historical data, run the DESCRIBE HISTORY command.
%sql
DESCRIBE HISTORY customer_t1;
The following results are an abridged version of the table, initially focusing only on the version, timestamp, and operation columns.
-- Abridged Results
+-------+-------------------+---------+
|version|          timestamp|operation|
+-------+-------------------+---------+
|     19|2020-10-30 18:15:03|    WRITE|
|     18|2020-10-30 18:10:47|   DELETE|
|     17|2020-10-30 08:41:47|    WRITE|
|     16|2020-10-30 08:37:29|   DELETE|
|     15|2020-10-30 05:34:29|  RESTORE|
|     14|2020-10-30 02:50:07|    WRITE|
|     13|2020-10-30 02:37:17|   DELETE|
|     12|2020-10-30 02:31:36|    WRITE|
|     11|2020-10-28 23:33:34|   DELETE|
+-------+-------------------+---------+
The following sections describe the various techniques to work with your versioned data.
You can provide the timestamp or date string as an option to the DataFrame reader using the following syntax.
%sql
-- Query metastore-defined Delta table by timestamp
SELECT * FROM my_table TIMESTAMP AS OF "2019-01-01"
SELECT * FROM my_table TIMESTAMP AS OF date_sub(current_date(), 1)
SELECT * FROM my_table TIMESTAMP AS OF "2019-01-01 01:30:00.000"
-- Query Delta table by file path by timestamp
SELECT * FROM delta.`<path-to-delta>` TIMESTAMP AS OF "2019-01-01"
SELECT * FROM delta.`<path-to-delta>` TIMESTAMP AS OF date_sub(current_date(), 1)
SELECT * FROM delta.`<path-to-delta>` TIMESTAMP AS OF "2019-01-01 01:30:00.000"
%python
# Load into Spark DataFrame from Delta table by timestamp
df = (spark.read
  .format("delta")
  .option("timestampAsOf", "2019-01-01")
  .load("/path/to/my/table"))
%scala
// Load into Spark DataFrame from Delta table by timestamp
val df = spark.read
  .format("delta")
  .option("timestampAsOf", "2019-01-01")
  .load("/path/to/my/table")
Specific to the customer_t1 table, the following queries will allow you to query by two different timestamps. The following SQL queries will use the metastore-defined Delta table syntax.
%sql
-- Row count as of timestamp t1
SELECT COUNT(1) FROM customer_t1 TIMESTAMP AS OF "2020-10-30T18:15:03.000+0000"
-- Row count as of timestamp t2
SELECT COUNT(1) FROM customer_t1 TIMESTAMP AS OF "2020-10-30T18:10:47.000+0000"
%python
# timestamps
t1 = "2020-10-30T18:15:03.000+0000"
t2 = "2020-10-30T18:10:47.000+0000"
DELTA_PATH = "/demo/customer_t1"
# Row count as of timestamp t1
(spark.read
  .format("delta")
  .option("timestampAsOf", t1)
  .load(DELTA_PATH)
  .count())
# Row count as of timestamp t2
(spark.read
  .format("delta")
  .option("timestampAsOf", t2)
  .load(DELTA_PATH)
  .count())
%scala
// timestamps
val t1 = "2020-10-30T18:15:03.000+0000"
val t2 = "2020-10-30T18:10:47.000+0000"
val DELTA_PATH = "/demo/customer_t1"
// Row count as of timestamp t1
spark.read.format("delta").option("timestampAsOf", t1).load(DELTA_PATH).count()
// Row count as of timestamp t2
spark.read.format("delta").option("timestampAsOf", t2).load(DELTA_PATH).count()
The preceding queries return the following results.
Timestamp | Row Count |
2020-10-30T18:15:03.000+0000 | 65000000 |
2020-10-30T18:10:47.000+0000 | 58500000 |
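A timestamp does not have to match a commit time exactly. To our understanding, Delta resolves a timestamp to the latest version committed at or before it. The following plain-Python sketch mimics that lookup against a few rows copied from the abridged history output; the `version_as_of` helper is hypothetical and only illustrates the rule, it is not part of any Delta API.

```python
from datetime import datetime

# (version, commit timestamp) pairs copied from the abridged DESCRIBE HISTORY output.
HISTORY = [
    (19, "2020-10-30 18:15:03"),
    (18, "2020-10-30 18:10:47"),
    (17, "2020-10-30 08:41:47"),
    (16, "2020-10-30 08:37:29"),
]

TIMESTAMP_FORMAT = "%Y-%m-%d %H:%M:%S"


def version_as_of(history, timestamp):
    """Return the latest version committed at or before `timestamp`, or None."""
    target = datetime.strptime(timestamp, TIMESTAMP_FORMAT)
    candidates = [
        version
        for version, committed in history
        if datetime.strptime(committed, TIMESTAMP_FORMAT) <= target
    ]
    return max(candidates) if candidates else None


print(version_as_of(HISTORY, "2020-10-30 18:15:03"))  # 19
print(version_as_of(HISTORY, "2020-10-30 18:12:00"))  # 18
```

The second call falls between two commits, so it resolves to version 18, the table state that was current at that moment.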
If the reader code is in a library that you don’t have access to, and if you are passing input parameters to the library to read data, you can still travel back in time for a table by passing the timestamp in yyyyMMddHHmmssSSS format to the path:
spark.read.format("delta").load(<path-to-delta>@yyyyMMddHHmmssSSS)
For example, to query for the timestamp 2020-10-30T18:15:03.000+0000, use the following Python and Scala syntax.
%python
# Delta table base path
BASE_PATH = "/demo/customer_t1"
# Include timestamp in yyyyMMddHHmmssSSS format
DELTA_PATH = BASE_PATH + "@20201030181503000"
# Get row count of the Delta table by timestamp using @ parameter
spark.read.format("delta").load(DELTA_PATH).count()
%scala
// Delta table base path
val BASE_PATH = "/demo/customer_t1"
// Include timestamp in yyyyMMddHHmmssSSS format
val DELTA_PATH = BASE_PATH + "@20201030181503000"
// Get row count of the Delta table by timestamp using @ parameter
spark.read.format("delta").load(DELTA_PATH).count()
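Typing the yyyyMMddHHmmssSSS suffix by hand is error-prone, so a small helper can build it from a `datetime`. This is a sketch only; the `timestamp_path` function name is our own and not part of Delta or Spark.

```python
from datetime import datetime


def timestamp_path(base_path, moment):
    """Append '@yyyyMMddHHmmssSSS' for `moment` to a Delta table path."""
    millis = moment.microsecond // 1000  # the SSS part is milliseconds
    return f"{base_path}@{moment:%Y%m%d%H%M%S}{millis:03d}"


print(timestamp_path("/demo/customer_t1", datetime(2020, 10, 30, 18, 15, 3)))
# /demo/customer_t1@20201030181503000
```

The resulting string can be passed to `load()` in the same way as the hard-coded path in the preceding examples.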
In Delta, every write has a version number, and you can use the version number to travel back in time as well.
%sql
-- Query metastore-defined Delta table by version
SELECT COUNT(*) FROM my_table VERSION AS OF 5238
SELECT COUNT(*) FROM my_table@v5238
-- Query Delta table by file path by version
SELECT count(*) FROM delta.`/path/to/my/table@v5238`
%python
# Query Delta table by version using versionAsOf
df = (spark.read
  .format("delta")
  .option("versionAsOf", "5238")
  .load("/path/to/my/table"))
# Query Delta table by version using @ parameter
df = (spark.read
  .format("delta")
  .load("/path/to/my/table@v5238"))
%scala
// Query Delta table by version using versionAsOf
val df = spark.read
  .format("delta")
  .option("versionAsOf", "5238")
  .load("/path/to/my/table")
// Query Delta table by version using @ parameter
val dfByPath = spark.read
  .format("delta")
  .load("/path/to/my/table@v5238")
Specific to the customer_t1 table, the following queries will allow you to query by two different versions. The following SQL queries will use the metastore-defined Delta table syntax.
%sql
-- Query metastore-defined Delta table by version 19
SELECT COUNT(1) FROM customer_t1 VERSION AS OF 19
SELECT COUNT(1) FROM customer_t1@v19
-- Query metastore-defined Delta table by version 18
SELECT COUNT(1) FROM customer_t1 VERSION AS OF 18
SELECT COUNT(1) FROM customer_t1@v18
%python
# Delta table base path
DELTA_PATH = "/demo/customer_t1"
# Row count of Delta table by version 19
(spark.read
  .format("delta")
  .option("versionAsOf", 19)
  .load(DELTA_PATH)
  .count())
# Row count of Delta table by version 18
(spark.read
  .format("delta")
  .option("versionAsOf", 18)
  .load(DELTA_PATH)
  .count())
# Row count of Delta table by @ parameter for version 18
(spark.read
  .format("delta")
  .load(DELTA_PATH + "@v18")
  .count())
%scala
// Delta table base path
val DELTA_PATH = "/demo/customer_t1"
// Row count of Delta table by version 19
spark.read.format("delta").option("versionAsOf", 19).load(DELTA_PATH).count()
// Row count of Delta table by version 18
spark.read.format("delta").option("versionAsOf", 18).load(DELTA_PATH).count()
// Row count of Delta table by @ parameter for version 18
spark.read.format("delta").load(DELTA_PATH + "@v18").count()
The preceding queries return the following results.
Version | Row Count |
19 | 65000000 |
18 | 58500000 |
In this section, we will focus on how to apply time travel for various use cases, and why it’s important to them.
Time travel helps you troubleshoot ETL pipelines and data quality issues, and fix accidentally broken data pipelines. This is the most common use case for time travel, and we will explore it throughout the book.
Time travel offers a verifiable data lineage that is useful for governance, audit, and compliance purposes. Because the transaction log is the definitive record of every change ever made to a table, you can trace the origin of an inadvertent change or a bug in a pipeline back to the exact action that caused it. If your GDPR pipeline job had a bug that accidentally deleted user information, you can fix the pipeline. Retention and vacuum allow you to act on data subject requests well within the required timeframe.
Time travel also makes it easy to do rollbacks in case of bad writes. Due to a previous error, it may be necessary to roll back to a previous version of the table before continuing forward with data processing.
Time travel also plays an important role in machine learning and data science. Reproducibility of models and experiments is a key consideration for data scientists, because they often create hundreds of models before they put one into production, and in that time-consuming process would like to go back to earlier models.
With AS OF queries, you can now pin the snapshot of a continuously updating Delta table for multiple downstream jobs.
Time travel simplifies time series analytics. You can use timestamp and AS OF queries to extract meaningful statistics associated with the selected data points and the shift in associated variables over time.
Next we will provide further details for each use case. Note, we will cover more advanced use cases which include time travel in chapter 14.
The governance use case described here is about protecting people’s information with regard to GDPR (General Data Protection Regulation) and CCPA (California Consumer Privacy Act). The role of governance is to set the direction of protecting information in the form of policies, standards, and guidelines.
One of the most common governance scenarios is the need to perform a delete request as part of GDPR or CCPA compliance. For data engineers, this means that you need to locate the records in your Delta Lake and delete them within the specified time period. Users can run DESCRIBE HISTORY to see metadata around the changes that were made. An example of this scenario is how Starbucks implements governance, risk and control for their Delta lake.
A delete request is recorded in the system where the user has requested that their information be removed from the data lake. This information is often recorded in its own table or store for auditing and lineage purposes.
This table (customer_delete_keys) will need to join with the customer table (customer_t1) to identify which users in the table need to be removed.
You will then delete this data from the system to address this delete request.
With Delta Lake, you can achieve the above listed goals by running the following steps. You can also use this notebook for the whole code.
As noted earlier, the customer_delete_keys table contains the set of customer keys that need to be removed. To review the table, run the following query.
%sql
SELECT * FROM customer_delete_keys
%python
# Query table using metastore defined table
sql("SELECT * FROM customer_delete_keys").show()
# Query table from file path
spark.read.format("delta").load("/customer_delta_keys").show()
%scala
// Query table using metastore defined table
sql("SELECT * FROM customer_delete_keys").show()
// Query table from file path
spark.read.format("delta").load("/customer_delta_keys").show()
Note, going forward in this section, we will only use metastore-defined SQL queries. In Python and Scala, you can also use the syntax:
sql("<sql-statement-here>").show()
A partial result of the above queries can be seen in the output below.
+----------------+-------------+
|   c_customer_id|c_customer_sk|
+----------------+-------------+
|AAAAAAAAKHJBCKBA|     27400570|
|AAAAAAAAFIJBCKBA|     27400581|
|AAAAAAAAGIJBCKBA|     27400582|
|AAAAAAAAHIJBCKBA|     27400583|
|...             |          ...|
+----------------+-------------+
The following SQL query will allow you to quickly identify the records that need to be removed from customer_t1.
%sql
-- Identify the records
SELECT COUNT(*)
FROM customer_t1
WHERE c_customer_id IN (SELECT c_customer_id FROM customer_delete_keys)
-- Result
+--------+
|count(1)|
+--------+
| 6500000|
+--------+
Once you have identified the matches, you can use the DELETE statement to remove those records from the Delta table.
%sql
-- Delete identified records
DELETE FROM customer_t1
WHERE c_customer_id IN (SELECT c_customer_id FROM customer_delete_keys)
Fortunately this process is pretty straightforward as you can remove the SELECT clause and replace it with a DELETE clause. You can validate this by reviewing the transaction log via the DESCRIBE HISTORY command (See Figure 3-x).
%sql
DESCRIBE HISTORY customer_t1;
Below is the transposed table of the DELETE transaction so we can inspect information.
Column Name | Column Value |
version | 13 |
operation | DELETE |
operationParameters | [predicate -> ["exists(t1.`c_customer_id`)"]] |
operationMetrics | [ numRemovedFiles -> 23, numDeletedRows -> 6500000, numAddedFiles -> 100, numCopiedRows -> 42250002 ] |
An auditor’s role is to review and verify that delivery conforms to those guidelines. For auditing purposes, you can review the Delta transaction log, which tracks all of the changes to your Delta table. In this case, for version 13 of the table:
Through the operation column, a DELETE activity was recorded.
The operationParameters denote how the delete occurred, i.e., the delete was done by the c_customer_id column.
From an auditing perspective, the important callout is the 6,500,000 rows deleted (numDeletedRows) under operationMetrics. This allows us to determine the magnitude of impact on the records.
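The audit check described above can also be scripted. The sketch below works on plain Python dictionaries shaped like DESCRIBE HISTORY rows, with values copied from the version 13 row; treating the metric values as strings is an assumption about how the rows are returned, and `deleted_rows_by_version` is a hypothetical helper rather than a Delta API.

```python
# History rows shaped like DESCRIBE HISTORY output; values copied from the
# version 13 row above. Metric values are kept as strings here, which is an
# assumption about how the rows are returned.
history = [
    {
        "version": 13,
        "operation": "DELETE",
        "operationMetrics": {
            "numRemovedFiles": "23",
            "numDeletedRows": "6500000",
            "numAddedFiles": "100",
            "numCopiedRows": "42250002",
        },
    },
    {"version": 12, "operation": "WRITE", "operationMetrics": {}},
]


def deleted_rows_by_version(history):
    """Map each DELETE version to the number of rows it removed."""
    return {
        row["version"]: int(row["operationMetrics"].get("numDeletedRows", 0))
        for row in history
        if row["operation"] == "DELETE"
    }


print(deleted_rows_by_version(history))  # {13: 6500000}
```

In a notebook, the same function could be fed with the collected rows of the history DataFrame, converted to dictionaries first.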
It is important to note that when a DELETE is executed on a Delta table, the most current version of the table no longer reads this data, as it is explicitly excluded. If you recall earlier in the chapter, the Remove column of the transaction log will contain the files that should no longer be included.
For some organizations, this meets their compliance requirements as the data is no longer accessible by default. But depending on your legal requirements, there are additional considerations:
If the data must be removed immediately, run the VACUUM command to remove the data files.
Instead of deleting, UPDATE the data (e.g., update the name column with some other value such as a GUID) so you can keep the associated IDs without having any personally identifiable information.
Place personally identifiable information into a separate table so that GDPR delete requests would result in only updating or deleting the information from this demographics table with little to no impact on your fact data.
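As an illustration of the UPDATE option, the following plain-Python sketch replaces a name column with a random GUID while keeping the customer key intact. The rows, the `c_first_name` column, and the `pseudonymize` helper are all hypothetical; in practice the same idea would be expressed as an UPDATE on the Delta table.

```python
import uuid


def pseudonymize(rows, column):
    """Return copies of `rows` with `column` replaced by a random GUID."""
    return [{**row, column: str(uuid.uuid4())} for row in rows]


# Hypothetical customer rows; only the key and the name matter for the sketch.
customers = [
    {"c_customer_sk": 27400570, "c_first_name": "Alice"},
    {"c_customer_sk": 27400581, "c_first_name": "Bob"},
]

masked = pseudonymize(customers, "c_first_name")
print([row["c_customer_sk"] for row in masked])  # [27400570, 27400581]
```

Keep in mind that an update only changes the current version of the table: earlier versions still hold the original values until their data files are vacuumed.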
It is important to note that this example is showcasing a simplified governance use case. You will need to work with your legal department to determine your actual requirements. The key takeaway for our discussion here is that Delta Lake time travel allows you to safely remove data from your Delta Lake, reverse it in case there are mistakes, and track the progress through the Delta Lake transaction log.
For more information on governance:
Time travel also makes it easy to do rollbacks so you can reset your data to a previous snapshot. If a user accidentally inserted new data or your pipeline job inadvertently deleted user information, you can easily fix it by restoring the previous version of the table.
The following code sample takes our existing customer_t1 table and restores it to version 17. Let’s start by reviewing the history of our table.
%sql
DESCRIBE HISTORY customer_t1
--- abridged results
+-------+---------+---------------------------------------------+
|version|operation|operationParameters                          |
+-------+---------+---------------------------------------------+
|18     |DELETE   |[predicate -> ["exists(t1.`c_customer_id`)"]]|
|17     |WRITE    |[mode -> Overwrite, partitionBy -> []]       |
+-------+---------+---------------------------------------------+
To roll back, first identify the version of the table you want to return to, then load that version using an AS OF read and write it back to the table.
Behind the scenes, Delta records the write as a new version that replaces the latest snapshot. In this scenario, you can see from the abridged history results that version 18 of the table contains a DELETE operation that we want to roll back. The following code allows you to perform the rollback.
%python
# Restore customer_t1 table to Version 17
# Load Version 17 data using df
df = spark.read.format("delta").option("versionAsOf", "17").load("/customer_t1")
# Overwrite Version 17 as the current version
df.write.format("delta").mode("overwrite").save("/customer_t1")
%scala
// Restore customer_t1 table to Version 17
// Load Version 17 data using df
val df = spark.read.format("delta").option("versionAsOf", "17").load("/customer_t1")
// Overwrite Version 17 as the current version
df.write.format("delta").mode("overwrite").save("/customer_t1")
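The read-then-overwrite pattern does not erase history: the rollback itself becomes a new version. The toy class below, written in plain Python with hypothetical names, sketches that behavior under the simplifying assumption that each version is a full snapshot of the rows.

```python
class VersionedTable:
    """Toy versioned table: every write appends a new immutable snapshot."""

    def __init__(self):
        self.snapshots = []

    def overwrite(self, rows):
        self.snapshots.append(list(rows))
        return len(self.snapshots) - 1  # version number of the new snapshot

    def read(self, version=None):
        index = len(self.snapshots) - 1 if version is None else version
        return list(self.snapshots[index])

    def rollback_to(self, version):
        # Mirrors the read-then-overwrite pattern: load the old snapshot
        # and write it back as the newest version.
        return self.overwrite(self.read(version))


table = VersionedTable()
good = table.overwrite([1, 2, 3])   # version 0
bad = table.overwrite([1])          # version 1: an unwanted delete
restored = table.rollback_to(good)  # version 2: same rows as version 0

print(restored, table.read())  # 2 [1, 2, 3]
print(table.read(bad))         # [1]
```

The unwanted version stays readable after the rollback, which is why the table history can later be used to audit both the mistake and its correction.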
You can validate this by either querying the data or reviewing the table history. The following is how you would do this using SQL.
%sql
-- Get the row counts between two table versions and calculate the difference
SELECT a.v19_cnt, b.v17_cnt, a.v19_cnt - b.v17_cnt AS difference
FROM
  -- Get V19 row count
  (SELECT COUNT(1) AS v19_cnt FROM customer_t1 VERSION AS OF 19) a
CROSS JOIN
  -- Get V17 row count
  (SELECT COUNT(1) AS v17_cnt FROM customer_t1 VERSION AS OF 17) b
-- Result
+--------+--------+----------+
| v19_cnt| v17_cnt|difference|
+--------+--------+----------+
|65000000|65000000| 0|
+--------+--------+----------+
Comparable code can be written in Python and Scala, as shown below.
%python
# Get V19 row count
v19_cnt = sql("SELECT COUNT(1) AS v19_cnt FROM customer_t1 VERSION AS OF 19").first()[0]
# Get V17 row count
v17_cnt = sql("SELECT COUNT(1) AS v17_cnt FROM customer_t1 VERSION AS OF 17").first()[0]
# Calculate the difference
print("Difference in row count between v19 and v17: %s" % (v19_cnt - v17_cnt))
%scala
// Get V19 row count
val v19_cnt = sql("SELECT COUNT(1) AS v19_cnt FROM customer_t1 VERSION AS OF 19").first()(0).toString.toInt
// Get V17 row count
val v17_cnt = sql("SELECT COUNT(1) AS v17_cnt FROM customer_t1 VERSION AS OF 17").first()(0).toString.toInt
// Calculate the difference
println("Difference in row count between v19 and v17: " + (v19_cnt - v17_cnt))
You will get the same result for both the Python and Scala examples.
-- Result
Difference in row count between v19 and v17: 0
You can also validate the restore by reviewing the table history; note that there is now version 19 that records the overwrite statement.
%sql
DESCRIBE HISTORY customer_t1
--- abridged results
+-------+---------+---------------------------------------------+
|version|operation|operationParameters                          |
+-------+---------+---------------------------------------------+
|19     |WRITE    |[mode -> Overwrite, partitionBy -> []]       |
|18     |DELETE   |[predicate -> ["exists(t1.`c_customer_id`)"]]|
|17     |WRITE    |[mode -> Overwrite, partitionBy -> []]       |
+-------+---------+---------------------------------------------+
Note that if you are using Databricks with at least DBR 7.4, this step is simpler with the RESTORE command, which performs the same steps as above in a single atomic command.
%sql
-- Restore table as of version 17
RESTORE customer_t1 VERSION AS OF 17
In addition to the previous queries comparing the count of two different versions, once you run the RESTORE, additional information is stored in the transaction log and shown under the operationMetrics column map.
%sql
DESCRIBE HISTORY customer_t1
-- Abridged results
+-------+---------+---------------------------------------------+
|version|operation|operationParameters                          |
+-------+---------+---------------------------------------------+
|20     |RESTORE  |[version -> 17, timestamp ->]                |
|19     |WRITE    |[mode -> Overwrite, partitionBy -> []]       |
|18     |DELETE   |[predicate -> ["exists(t1.`c_customer_id`)"]]|
|17     |WRITE    |[mode -> Overwrite, partitionBy -> []]       |
+-------+---------+---------------------------------------------+
We highlight record 20 from the output above, the RESTORE operation on the Delta table, to explain the restore-based operationMetrics parameters.
{"numRestoredFiles": "31", "removedFilesSize": "3453647999", "numRemovedFiles": "31", "restoredFilesSize": "3453647999", "numOfFilesAfterRestore": "31", "tableSizeAfterRestore": "3453647999"}
For more information about the RESTORE command, see Restore a Delta table.
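The restore metrics are reported as strings, so they need converting before you can do arithmetic on them. The following sketch parses the metrics copied from the output above with the standard `json` module and derives the table size in GiB; the variable names are our own.

```python
import json

# The operationMetrics map for the RESTORE commit, copied from the output above.
raw = (
    '{"numRestoredFiles": "31", "removedFilesSize": "3453647999", '
    '"numRemovedFiles": "31", "restoredFilesSize": "3453647999", '
    '"numOfFilesAfterRestore": "31", "tableSizeAfterRestore": "3453647999"}'
)

# Every value arrives as a string, so convert before doing arithmetic.
metrics = {name: int(value) for name, value in json.loads(raw).items()}

size_gib = metrics["tableSizeAfterRestore"] / 2**30
print(metrics["numRestoredFiles"], round(size_gib, 2))  # 31 3.22
```

In this example the restore removed 31 files and brought back 31 files of the same total size, which matches the earlier observation that versions 19 and 17 have identical row counts.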
We need to keep additional considerations in mind with respect to time travel.
Data retention:
An important consideration is data retention: how far back you can travel depends on the retention limits you set and how much data is retained.
To time travel to a previous version, you must retain both the log and the data files for that version.
Vacuum:
The ability to time travel back to a version older than the retention period is lost after running vacuum.
By default, `vacuum()` retains all the data needed for the last 7 days.
When you run VACUUM with a specified retention of 0 hours, it throws an exception.
Delta protects you with this check to save you from permanent changes to your data, or from removing files that a downstream reader or application is still referencing.
If you are certain that there are no operations being performed on this table, such as insert/upsert/delete/optimize, then you may turn off this check by setting: spark.databricks.delta.retentionDurationCheck.enabled = false
VACUUM doesn’t clean up log files; log files are automatically cleaned up after checkpoints are written.
To fully satisfy a delete request, you also need to remove the data from your blob storage. As a best practice, apply the retention policy when you first set up the ETL pipeline so that you don’t have to worry about it later.
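The retention rules above can be summarized in a small plain-Python model. It is a deliberate simplification and not Delta's implementation: `check_retention` and `is_vacuum_eligible` are hypothetical helpers, and the safety check is modeled as a comparison against the 7-day default only.

```python
from datetime import datetime, timedelta

DEFAULT_RETENTION_HOURS = 7 * 24  # the 7-day default described above


def check_retention(retention_hours, check_enabled=True):
    """Simplified stand-in for the retention safety check."""
    if check_enabled and retention_hours < DEFAULT_RETENTION_HOURS:
        raise ValueError(
            f"retention of {retention_hours}h is below the "
            f"{DEFAULT_RETENTION_HOURS}h default"
        )
    return retention_hours


def is_vacuum_eligible(removed_at, now, retention_hours=DEFAULT_RETENTION_HOURS):
    """A removed data file can be vacuumed once it is older than the retention window."""
    return now - removed_at > timedelta(hours=retention_hours)


removed_at = datetime(2020, 10, 30, 18, 10, 47)  # commit time of the version 18 DELETE

print(is_vacuum_eligible(removed_at, datetime(2020, 11, 10)))  # True
print(is_vacuum_eligible(removed_at, datetime(2020, 11, 1)))   # False
```

Once a file becomes eligible and VACUUM removes it, any version that depended on that file can no longer be read, which is the practical limit on how far back you can travel.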
In this chapter, we focused on a powerful benefit of Delta’s MVCC (multiversion concurrency control) implementation: data versioning, or time travel. In addition to showing common challenges with changing data, we explained the Delta time machine and its properties, how time travel works, and what operations you can perform while traveling back in time.
As alluded to earlier in the chapter, time travel allows developers to debug what happened with their Delta table, improving developer productivity tremendously. One of the common reasons developers need to debug their table is due to data modification such as delete, update, and merge. This brings us to our next chapter, Chapter 4: Data Modifications in Delta tables.