With more enterprises leaning on real-time data and analytics to drive business decisions, BigQuery and data warehousing techniques are critical. As Google Cloud's serverless, petabyte-scale data warehouse, BigQuery is often your first and last stop for data storage, large-scale analytics, and even SQL-based ML models.
The following recipes show examples of implementing data loading, scalable data querying, and streaming in BigQuery. Included are tips and tricks above and beyond standard SQL and specific to the BigQuery service and implementation.
All code samples for this chapter are located at https://github.com/ruiscosta/google-cloud-cookbook/chapter-8. You can follow along and copy the code for each individual recipe by going to the folder with that recipe's number.
You want to build a traditional pivot table in BigQuery, but need to do so in SQL.
You can build a query with IF or CASE statements. Alternatively, you can use the publicly available stored procedure fhoffa.x.pivot().
Implement a series of IF statements, in this case wrapped in the SUM() aggregation function since we want a raw count.
#standardSQL
SELECT
  datehour,
  SUM(IF(title LIKE '%google%', 1, NULL)) google_views,
  SUM(IF(title LIKE '%cloud%', 1, NULL)) cloud_views,
  SUM(IF(title LIKE '%maps%', 1, NULL)) maps_views
FROM `bigquery-public-data`.wikipedia.pageviews_2020
WHERE EXTRACT(date FROM datehour) = "2020-01-02"
GROUP BY datehour
ORDER BY datehour ASC
Outputs:
Alternatively you can use a publicly shared function, fhoffa.x.pivot().
CALL fhoffa.x.pivot(
  'bigquery-public-data.iowa_liquor_sales.sales'  # source table
  , 'fh-bigquery.temp.test'                       # destination table
  , ['date']                                      # row_ids
  , 'store_number'                                # pivot_col_name
  , 'sale_dollars'                                # pivot_col_value
  , 30                                            # max_columns
  , 'SUM'                                         # aggregation
  , ''                                            # optional_limit
);
If you are summing across a particular column, use the SUM() function as described above. If you're pulling out the values themselves, you would use AVG(), MAX(), or ANY_VALUE(). For example, if you wanted to capture a matching article name that looks like "google" and put it in the "google_views" column, you would use: MAX(IF(title LIKE '%google%', title, NULL)) google_views.
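To make this concrete, here is a minimal sketch of the earlier query adapted to pull values instead of counts. The column aliases are kept from the text above; note that MAX() returns only one matching title per hour (ANY_VALUE() behaves similarly):

#standardSQL
SELECT
  datehour,
  MAX(IF(title LIKE '%google%', title, NULL)) google_views,
  MAX(IF(title LIKE '%cloud%', title, NULL)) cloud_views,
  MAX(IF(title LIKE '%maps%', title, NULL)) maps_views
FROM `bigquery-public-data`.wikipedia.pageviews_2020
WHERE EXTRACT(date FROM datehour) = "2020-01-02"
GROUP BY datehour
ORDER BY datehour ASC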
Many pivot use-cases can be handled with the public stored procedure fhoffa.x.pivot(). More information can be found here: https://towardsdatascience.com/easy-pivot-in-bigquery-one-step-5a1f13c6c710
You have a BigQuery table to which you want to add a partitioning column and clustering columns to increase query performance and decrease query costs.
BigQuery Standard SQL allows you to create a new table with partitioning and clustering columns and populate it in the same statement with an AS clause. Note that this creates a new table; you cannot add partitioning to the existing table in place.
Validate that the source table doesn't have partitioning or clustering columns. Notice the lack of these columns in the output.
bq show bigquery-public-data:london_bicycles.cycle_hire
Outputs:
Dry-run a time-based query to see how much data is scanned.
# pre-partitioned dry run
QUERY='
SELECT duration, bike_id, start_date, start_station_name, end_date, end_station_name
FROM `bigquery-public-data.london_bicycles.cycle_hire`
WHERE EXTRACT(DATE FROM start_date) = "2016-04-03"
  AND EXTRACT(DATE FROM end_date) = "2016-04-03"
ORDER BY duration DESC
LIMIT 5
'
bq query --use_legacy_sql=false --location=EU --dry_run "$QUERY"
Create a new table in your dataset, specifying the partitioning and clustering columns.
CREATE OR REPLACE TABLE mydataset.cycle_hire_partitioned_clustered
PARTITION BY DATE(start_date)
CLUSTER BY bike_id
OPTIONS(
  expiration_timestamp=TIMESTAMP "2025-01-01 00:00:00 UTC",
  description="My partitioned and clustered cycle_hire table",
  labels=[("cookbook_query", "development")]
) AS
SELECT * FROM `bigquery-public-data.london_bicycles.cycle_hire`
Validate that the destination table now has partitioning and clustering columns.
bq show mydataset.cycle_hire_partitioned_clustered
Dry-run the same query on the new table.
# post-partitioned dry run
QUERY='
SELECT duration, bike_id, start_date, start_station_name, end_date, end_station_name
FROM mydataset.cycle_hire_partitioned_clustered
WHERE EXTRACT(DATE FROM start_date) = "2016-04-03"
  AND EXTRACT(DATE FROM end_date) = "2016-04-03"
ORDER BY duration DESC
LIMIT 5
'
bq query --use_legacy_sql=false --location=EU --dry_run "$QUERY"
# Should report that the query will process only about 2 MB
You want to return the top item from a sorted list in a BigQuery query, but your dataset is particularly large and causing issues with the ROW_NUMBER() OVER() windowing function since you’re forcing a full sort of the dataset.
You can apply a trick with ARRAY_AGG(x LIMIT 1)[OFFSET(0)] which will allow BigQuery to drop all data that isn’t the number 1 row, increasing query performance.
Run a query using the ROW_NUMBER() OVER() windowing function. This query should succeed but will take a while.
SELECT rental_id, duration, bike_id, end_date
FROM (
  SELECT
    rental_id,
    duration,
    bike_id,
    end_date,
    ROW_NUMBER() OVER (ORDER BY end_date ASC) rental_num
  FROM `bigquery-public-data`.london_bicycles.cycle_hire
)
WHERE rental_num = 1
Run the same query using the ARRAY_AGG() trick. This query should succeed more quickly.
SELECT rental.*
FROM (
  SELECT ARRAY_AGG(rentals ORDER BY rentals.end_date ASC LIMIT 1)[OFFSET(0)] rental
  FROM (
    SELECT rental_id, duration, bike_id, end_date
    FROM `bigquery-public-data`.london_bicycles.cycle_hire
  ) rentals
)
While ROW_NUMBER() has a scalable implementation that can perform a distributed sort across multiple BigQuery workers, the entire sort still needs to be calculated. On particularly large datasets, using ARRAY_AGG allows BigQuery to drop unneeded data in-flight.
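As a side note (not part of the original recipe), the same pattern generalizes to a top-N query: increase the LIMIT inside ARRAY_AGG and UNNEST the resulting array instead of taking OFFSET(0). A rough sketch:

SELECT rental.*
FROM (
  -- Keep only the 3 earliest rentals; everything else is dropped in-flight
  SELECT ARRAY_AGG(rentals ORDER BY rentals.end_date ASC LIMIT 3) top_rentals
  FROM (
    SELECT rental_id, duration, bike_id, end_date
    FROM `bigquery-public-data`.london_bicycles.cycle_hire
  ) rentals
), UNNEST(top_rentals) rental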
You are loading data into an existing BigQuery table that might introduce duplicate rows. This is common with repeated batch uploads, or when consolidating from a streaming ingest table that might contain “newer” versions of a given record or aggregation.
Using a MERGE statement with an ON ... WHEN NOT MATCHED clause in DML, you can easily give BigQuery a join key to de-duplicate on and insert only the new rows into your final table.
Create a production table.
bq --location=EU cp -f bigquery-public-data:london_bicycles.cycle_hire mydataset.cycle_hire
Create a loading table with new and duplicate data:
CREATE OR REPLACE TABLE mydataset.temp_loading_table AS
(
  --Grab 5 duplicate rows
  SELECT * FROM mydataset.cycle_hire LIMIT 5
)
UNION ALL
(
  --Add a new unique row
  SELECT 111147469109, 3180, 7054, '2015-09-03 12:45:00 UTC', 111, 'Park Lane, Hyde Park',
    '2015-09-03 11:52:00 UTC', 300, 'Serpentine Car Park, Hyde Park', NULL, NULL, NULL
)
Verify the counts in both tables:
--Number of rows in base table
SELECT COUNT(*) FROM mydataset.cycle_hire;

--Number of rows in loading table
SELECT COUNT(*) FROM mydataset.temp_loading_table;
Merge the data into your production table with an ON clause:
MERGE mydataset.cycle_hire rentals
USING mydataset.temp_loading_table temp
ON temp.rental_id = rentals.rental_id
WHEN NOT MATCHED THEN
  INSERT ROW
Verify only 1 row has been inserted:
--Number of rows now in base table
SELECT COUNT(*) FROM mydataset.cycle_hire;
This recipe shows how to insert data in a way that doesn't introduce duplicates. For an example of how to filter out duplicates inline in a query, see the following recipe; the same ARRAY_AGG pattern applies, just without any need for sorting in the ARRAY_AGG function.
It’s also worth noting that you can accomplish this with SELECT DISTINCT * FROM mydataset.mytable, which will ensure every record is entirely unique (not just the key), but this is often a more costly query.
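For reference, here is a minimal sketch of the inline ARRAY_AGG de-duplication mentioned above, assuming rental_id is the de-duplication key; because the duplicated rows are identical, no ORDER BY is needed and any one row per key will do:

SELECT deduped.*
FROM (
  -- Keep exactly one (arbitrary) row per rental_id
  SELECT ARRAY_AGG(t LIMIT 1)[OFFSET(0)] deduped
  FROM mydataset.cycle_hire t
  GROUP BY rental_id
)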
Your application is INSERTING updated rows or aggregations into a BigQuery table rather than UPDATING existing ones for performance reasons, and you need to select just the newest data. This is common in streaming applications and frameworks, such as Dataflow.
You could easily do this with a standard GROUP BY key(s) ORDER BY timestamp LIMIT 1 approach, but for a more scalable approach we implement our ARRAY_AGG(x LIMIT 1)[OFFSET(0)] trick from before so that older data is simply dropped by BigQuery workers in-flight.
Create a production table.
bq --location=eu cp -f bigquery-public-data:london_bicycles.cycle_hire mydataset.cycle_hire
Insert two "new" rows with the same rental_id but updated durations and end dates:
INSERT INTO mydataset.cycle_hire VALUES
(
  47469109, 3300, 7054, '2015-09-03 12:47:00 UTC', 111, 'Park Lane, Hyde Park',
  '2015-09-03 11:52:00 UTC', 300, 'Serpentine Car Park, Hyde Park', NULL, NULL, NULL
),
(
  47469109, 3660, 7054, '2015-09-03 12:53:00 UTC', 111, 'Park Lane, Hyde Park',
  '2015-09-03 11:52:00 UTC', 300, 'Serpentine Car Park, Hyde Park', NULL, NULL, NULL
)
View all three records
SELECT * FROM mydataset.cycle_hire WHERE rental_id = 47469109 ORDER BY end_date DESC
Use ARRAY_AGG to get the latest one
SELECT latest_record.*
FROM (
  SELECT
    rental_id,
    ARRAY_AGG(rentals ORDER BY end_date DESC LIMIT 1)[OFFSET(0)] latest_record
  FROM mydataset.cycle_hire rentals
  WHERE rental_id = 47469109
  GROUP BY rental_id
)
Remove the WHERE clause to query over the entire dataset.
This trick adds an outer SELECT clause, mostly to clean up the nested output from ARRAY_AGG, but it is a scalable way of cleaning up data when newer records exist. It can be hidden from end-user analysts or downstream programs by placing the query in a view or materialized view.
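For example, here is a hedged sketch of wrapping the query in a view (the view name is illustrative) so consumers always see only the latest record per rental_id:

-- Consumers query this view instead of the raw table
CREATE OR REPLACE VIEW mydataset.cycle_hire_latest AS
SELECT latest_record.*
FROM (
  SELECT
    rental_id,
    ARRAY_AGG(rentals ORDER BY end_date DESC LIMIT 1)[OFFSET(0)] latest_record
  FROM mydataset.cycle_hire rentals
  GROUP BY rental_id
)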
This use case is particularly common with streaming systems or other high-throughput systems that produce newer versions of records or aggregations. While BigQuery does support ACID-compliant UPDATE and DELETE DML statements, so individual records can be mutated and updated, this is generally a costly operation: the files backing BigQuery are immutable (an update triggers entire file rewrites), and the system overall is tuned for OLAP. For this reason, some scalable streaming systems opt to simply append new records to BigQuery and let BigQuery perform the deduplication as above.
You have accidentally deleted a table in BigQuery and need to recover it.
You can use snapshot decorators with the bq tool to recover the data. If you have already re-created the table, you can also use BigQuery time travel with FOR SYSTEM_TIME AS OF in SQL.
Create a dummy table:
CREATE OR REPLACE TABLE mydataset.cycle_hire AS SELECT * FROM `bigquery-public-data`.london_bicycles.cycle_hire
Note the UTC time.
date -u +%s
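If you're following along in a shell, you might also capture this value for the restore step below (the variable name is just illustrative):

# Capture the current UTC time, in seconds, before "deleting" the table
EPOCH_SECONDS=$(date -u +%s)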
“Accidentally delete the table”
DROP TABLE mydataset.cycle_hire
Restore the data into a temporary staging table. Note that we multiply the UNIX seconds by 1,000 since the snapshot decorator expects milliseconds.
# EPOCH_SECONDS is the UTC time noted above; the snapshot decorator takes milliseconds
bq --location=eu cp mydataset.cycle_hire@$((EPOCH_SECONDS * 1000)) mydataset.cycle_hire_restored
This fix should also work if you have already replaced the table, which occasionally happens if you have automated pipelines configured to create the table if it doesn't exist. In this situation, you can also use BigQuery time travel with FOR SYSTEM_TIME AS OF in SQL to recover the data:
SELECT *
FROM mydataset.cycle_hire_restored
FOR SYSTEM_TIME AS OF TIMESTAMP '2020-12-29 21:44:09.413928 UTC'
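If the table was replaced rather than dropped, one option is to materialize the recovered data back into a table with the same time-travel clause; this is a sketch with an illustrative destination table name, and the timestamp must predate the replacement:

-- Materialize the table as it existed at the given timestamp
CREATE OR REPLACE TABLE mydataset.cycle_hire_recovered AS
SELECT *
FROM mydataset.cycle_hire
FOR SYSTEM_TIME AS OF TIMESTAMP '2020-12-29 21:44:09.413928 UTC'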
You have well-formed JSON or Avro data on a Pub/Sub topic and you're looking to ingest it in real time into a BigQuery table.
Configuring and executing a Dataflow template can quickly spin up a persistent, streaming Dataflow job with little to no code to achieve this.
Sample a message from the sample public taxi data topic:
gcloud pubsub subscriptions create taxi-test-sub \
  --topic projects/pubsub-public-data/topics/taxirides-realtime

gcloud pubsub subscriptions pull projects/<your-project-id>/subscriptions/taxi-test-sub
This will return JSON message data:
{"ride_id":"7128ff90-62f7-42de-a93f-67d1f9e7c713","point_idx":2401,"latitude":40.74163,"longitude":-73.95092000000001,"timestamp":"2021-01-06T20:56:11.93239-05:00","meter_reading":55.330975,"meter_increment":0.02304497,"ride_status":"enroute","passenger_count":2}
Create the destination BigQuery tables: one for well-formed data and a dead-letter table. The table for well-formed data needs a schema matching the JSON data on the Pub/Sub topic.
cat <<EOF > schema.json
[
  { "name": "ride_id", "type": "STRING" },
  { "name": "point_idx", "type": "INTEGER" },
  { "name": "latitude", "type": "FLOAT" },
  { "name": "longitude", "type": "FLOAT" },
  { "name": "timestamp", "type": "TIMESTAMP" },
  { "name": "meter_reading", "type": "FLOAT" },
  { "name": "meter_increment", "type": "FLOAT" },
  { "name": "ride_status", "type": "STRING" },
  { "name": "passenger_count", "type": "INTEGER" }
]
EOF

bq mk --table mydataset.taxi_data schema.json
bq mk --table mydataset.taxi_deadletter
Create a temporary GCS bucket for the Dataflow Job:
BUCKET=gs://my-bucket
gsutil mb $BUCKET
In the Google Cloud Console, open the Dataflow page and click Create Job From Template:
Add a job name and choose “Pub/Sub Topic to BigQuery”. Note the other templates that are available.
For Input Pub/Sub Topic, point to the topic:
projects/pubsub-public-data/topics/taxirides-realtime
Add the BigQuery table and the temporary GCS bucket for the Dataflow job:
Optionally, click SHOW OPTIONAL PARAMETERS and fill in the dead-letter BigQuery table. Also take note of other parameters that are available.
Click RUN JOB
Give the job a couple of minutes to spin up, then click the "ReadPubSubTopic" and "WriteSuccessfulRecords" graph nodes to see that elements are flowing through the Dataflow job:
Validate that data is being written to BigQuery:
bq head mydataset.taxi_data
Make sure to stop the Dataflow job when you are finished so it doesn't continue to incur charges.
This recipe shows how to quickly get up and running with your first streaming pipeline. You can extend the functionality to Avro data using the Avro template. You can also provide a JavaScript transform in the additional parameters for lightweight transformations that don't require you to write the entire pipeline code. Any records that fail to be transformed or processed are written to the dead-letter table for later analysis (if you have your own topic, you can test this by publishing a malformed message; it should show up quickly in this table). Lastly, if you inspect the details of the BigQuery table in the UI, you'll note that some of your data is still in the streaming buffer but is nevertheless immediately available for queries.
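If you prefer the command line to the console, a roughly equivalent sketch using gcloud and the public classic template is shown below. The job name is illustrative, and the template path and parameter names (inputTopic, outputTableSpec, outputDeadletterTable) follow the public Dataflow templates documentation and may vary by template version, so verify them before running:

# Launch the "Pub/Sub Topic to BigQuery" classic template from the CLI
gcloud dataflow jobs run taxi-to-bq \
  --region us-central1 \
  --gcs-location gs://dataflow-templates/latest/PubSub_to_BigQuery \
  --staging-location gs://my-bucket/temp \
  --parameters inputTopic=projects/pubsub-public-data/topics/taxirides-realtime,outputTableSpec=<your-project-id>:mydataset.taxi_data,outputDeadletterTable=<your-project-id>:mydataset.taxi_deadletter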