Chapter 2. Real-time Analytics for Real-time Business Insights: Presto & Apache Pinot

Like a salad, data is best when you consume it fresh: it gives you the most benefit as soon as it’s made. Fresh data, also known as real-time data, is a series of events generated by users, machines, or sensors, and can be loosely defined as data that was created just a second ago. What counts as fresh depends on the business. For LinkedIn, fresh data is the user clickstream, which feeds its flagship analytics product for members: “Who’s Viewed Your Profile.” For Uber, fresh data drives the user experience: booking a ride in seconds that shows up in minutes, with pricing based on real-time forecasts of local demand (also known as surge pricing). For other companies, fresh data could uncover fraudulent purchases.

Real-time datastores like Apache Pinot were designed to handle low-latency queries over constantly changing fresh data, in addition to static data. Their common characteristics are near real-time ingestion with Apache Kafka and horizontal scalability to deliver consistently low-latency OLAP queries on any amount of data. This chapter will introduce Pinot, show how to extend Presto with the Presto-Pinot connector, and finally share some hard-won production best practices and tuning configurations for getting the best experience out of Pinot.

Introducing Apache Pinot

Apache Pinot is a real-time distributed OLAP datastore, designed to deliver real-time analytics with low latency. It can ingest from real-time stream data sources (such as Apache Kafka) and batch data sources (such as Hadoop HDFS, Amazon S3, Azure ADLS, and Google Cloud Storage). It answers queries in single-digit to low tens of milliseconds, and its excellent compute efficiency helps it scale to site-facing online traffic.

Pinot was first developed at LinkedIn, where it runs at large scale, serving tens of thousands of analytical queries per second while ingesting real-time data from streaming data sources. LinkedIn needed to analyze its latest data in real time, but in 2012 the amount of data was growing faster than the team could analyze it. They looked to the database landscape of the day for a solution. On one hand, operational, transactional databases like RDBMSs and NoSQL databases could be scaled to provide fast reads and writes of individual records for a large number of users, but had trouble with the combination of analytic queries and high ingest rates. On the other hand, existing OLAP databases relied on batch loads and batch queries of data, so they were designed neither to handle low-latency queries nor to scale to serving thousands of concurrent queries per second. Thus they embarked on developing Pinot.

Note

Apache Pinot was named after Pinot, a grape varietal that is notoriously difficult to grow and to make wine from. Pinot Noir is considered one of the most complex wines. The analogy fits the real-time data environment: complex and challenging, yet, when done well, powerful in its results.

With Presto connected to Pinot, fresh data can be consumed by applications, data analysts, and scientists using standard ANSI SQL.

A closer look at Pinot

Pinot is a distributed system consisting of many different components, as shown in Figure 2-1. In general, each component is independently scalable and has unique responsibilities. Pinot requires a Zookeeper installation to help it propagate state across components for high availability. We will first cover the Pinot controllers and servers, then see how the data is physically distributed across the cluster, and finally close with how queries are executed in Pinot.

Figure 2-1. The Pinot distributed system

Pinot controller

The Pinot controller manages the state of the Pinot cluster: it is the brain that plans the distribution of data across Pinot servers and rebalances the data as servers come and go. It also stores table metadata like the schema and indexing information. The Pinot controller uses Apache Helix (another open source project out of LinkedIn) to save its state and ensure high availability. Helix is a general cluster management library that uses Apache Zookeeper under the covers.

Pinot servers

Pinot servers are the workhorses of a Pinot cluster: they are responsible for both storing the data and answering queries on it. They are instructed by the controller to ingest new data, either from a real-time stream like Kafka or by bulk loading data from a distributed file system like GCS/S3/HDFS. Servers store the data in the form of data segments, which will be elaborated on next. A note on the naming: Pinot just chooses to call them “Servers”, but they are simply Java processes that can be containerized or virtualized. That is, Pinot supports modern infrastructure like Kubernetes and the cloud equally well, in addition to on-premises bare metal.

Pinot data distribution

Logically a Pinot table is split into segments. A segment represents a horizontal slice of a table including all of its indices. Each segment can contain many tens of thousands of rows and is usually sized to be a couple of gigabytes. The segments store the data in a columnar compressed manner. Pinot segments support efficient indices and encoding techniques to make querying super fast. These segments are the atomic unit of data and are not re-packaged once created. Pinot segments are deleted once they are past the configured table retention.
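To make the retention rule concrete, here is a minimal sketch of the check that decides whether a segment is past its table's retention. The function and parameter names are illustrative assumptions, not Pinot's actual API; the real logic lives inside Pinot.

```python
# Hypothetical sketch: decide whether a segment has aged out of the
# configured table retention and should be deleted. Names are
# illustrative, not Pinot's actual API.
MILLIS_PER_DAY = 24 * 3600 * 1000

def is_past_retention(segment_end_time_millis, now_millis, retention_days):
    """True if the newest row in the segment is older than the retention window."""
    return now_millis - segment_end_time_millis > retention_days * MILLIS_PER_DAY
```

Because segments are atomic and immutable, retention is enforced segment by segment: once every row in a segment is older than the window, the whole segment is dropped.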

All aspects of the data distribution, like the segment size, retention, indices, and replication factor, are fully configurable on a per-table basis, either when creating the Pinot table or afterwards.

The controller is responsible for choreographing the initial assignment, replication, and migration of the segments across the servers. It keeps track of the up-to-date physical location of the segments in the form of what it calls a “Routing Table” (not to be confused with the networking routing table).

Pinot Brokers and Querying

The Pinot Broker is the component in Pinot responsible for serving an HTTP POST endpoint for querying via a SQL-like language. It is stateless, and indeed the cluster can have multiple brokers, scaled according to the query traffic. The controller keeps track of all the live brokers. A client initiates a query by first asking the controller for the list of live brokers and then making an HTTP POST call to one of them. An example Pinot SQL query might look like: SELECT name, COUNT(*) FROM table_name WHERE country = 'US' AND time_column_seconds > 1599516412 GROUP BY name ORDER BY COUNT(*) DESC LIMIT 10, which returns the top 10 American names in the table after a certain point in time.
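As a sketch of what such a client does, the snippet below builds the JSON body for the HTTP POST. The broker address and the /query/sql endpoint path are assumptions made for illustration; check your deployment for the actual host, port, and endpoint.

```python
import json

# A minimal sketch of querying a Pinot broker over HTTP. The broker
# address and endpoint path below are assumptions for illustration.
BROKER_URL = "http://broker_host:8099/query/sql"  # hypothetical address

def build_query_payload(sql):
    """Serialize a Pinot SQL query into the JSON body of the HTTP POST."""
    return json.dumps({"sql": sql})

payload = build_query_payload(
    "SELECT name, COUNT(*) FROM table_name "
    "WHERE country = 'US' AND time_column_seconds > 1599516412 "
    "GROUP BY name ORDER BY COUNT(*) DESC LIMIT 10"
)

# To actually send it (requires a live broker and the requests package):
# requests.post(BROKER_URL, data=payload,
#               headers={"Content-Type": "application/json"})
```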

The broker proceeds similarly to the Presto coordinator in that it first compiles the query into an internal operator tree. The lower part of the operator tree is converted into an internal query for the Pinot servers. For the example Pinot query above, the servers would do the filtering and the partial aggregation. The broker then asks the controller for the current Routing Table of the Pinot table being queried, to learn which segment is on which server. If there are any partitioning columns in the table, they are used to prune the list of servers to query. The server-specific internal query is scattered to all of the Pinot servers, as shown in Figure 2-2. The servers then respond with partial results, much the same way as Presto workers do. Just like the Presto coordinator, the broker stitches the results together by executing the upper part of the operator tree: for the example above, the partial GROUP BYs returned by the servers are re-aggregated, sorted on the COUNT, and the top 10 results retained.

Figure 2-2. Query Processing at the Pinot Broker

Why Use Pinot via Presto?

It’s a natural question to ask: why connect Pinot data with Presto querying in the first place? Why would we want to query Pinot via Presto instead of querying it directly via its SQL endpoint?

Consider the common use case of visualizing real-time data contained in your Pinot cluster in a dashboard. Typically you will use a third-party tool like Looker, Tableau, Redash, or Superset for such visualizations and dashboards. These tools expect full ANSI SQL and ODBC connectivity, and they already have very rich and deep support for Presto. Pinot does not yet have full ANSI SQL support, nor does it support standard database access APIs like ODBC/JDBC. In fact, the only way to integrate these tools natively with Pinot is to write a custom Pinot client for them.

Another great benefit of accessing Pinot via Presto is that it unlocks advanced SQL capabilities like subqueries, UNIONs and JOINs that are unavailable in Pinot. You are not chained to the single-table query capabilities of Pinot and can instead utilize the full power of Presto SQL.

Accessing Pinot via Presto prevents data silos and framework lock-in. It allows you to enrich the data stored in Pinot with other data sources. A fairly common use case I encountered at Uber is to store fact event data in Pinot and use Presto to join it with dimension tables stored in Hive. As your data usage story evolves, you can easily migrate a table from one framework to another. For example, say you find out that you are doing mostly single-key searches in Pinot and that they would be better served by a key-value system like Cassandra. You can load the Pinot table into Cassandra and just change the catalog name to point to the Cassandra connector. And voila, the same single-key searches are now sped up by being served from Cassandra.

Lastly, it’s important to point out that Pinot isn’t the only real-time OLAP system out there. Other systems like Druid and Elasticsearch can serve the same use cases as Pinot with different tradeoffs in performance and query capabilities. The great thing is that you can access all of these data sources via Presto, so you can easily try them out side by side without having to change your queries each time you want to evaluate a new system. The users don’t see any difference in the queries they write; for all they care, they are simply writing Presto SQL. This flexibility eases the adoption curve and allows you to gradually move your workload from Druid to Pinot, or vice versa!

Tip

Apache Druid is another popular real-time datastore. By using the Presto-Druid connector, you can realize benefits similar to those we discuss for Pinot. For Presto-Druid specifics, see the Presto documentation for the Druid connector.

To summarize, accessing Pinot via Presto unlocks the data stored in Pinot and enables it to be used for deep insights and integration with many third-party tools. Having convinced you that Presto+Pinot is a winning combination, let’s dive right into setting up Presto to talk to Pinot.

Setting up Presto+Pinot

Having seen how Presto unlocks the true power of Pinot, let’s get our hands dirty by wiring them up.

To get started, please choose one of the many options listed at https://docs.pinot.apache.org/basics/getting-started for deploying Pinot in your environment. Your choice will depend on how you have deployed Presto. If you are running Presto on bare metal, the easiest approach is to run the Pinot Docker container on the same machine as the Presto coordinator. If you have Presto running in a cloud provider, you can also install Pinot there. The only important consideration is that Presto must have network connectivity to Pinot. Pinot supports installation on all three public clouds, on bare metal, on Kubernetes, and as a standalone Docker container. The Pinot quick-start options bundle in Zookeeper and don’t require any other dependencies. They come preinstalled with a table, baseballStats, that we will use to demonstrate Presto-Pinot capabilities.

Connecting a Pinot cluster to Presto

As discussed earlier, the Pinot controller is the brains of the Pinot cluster. So to make Presto access Pinot, all we need to do is to create a Pinot catalog that tells Presto about the controller URIs.

You can connect multiple Pinot clusters to your Presto installation, but each cluster must be added as a separate catalog. For example, say you have a Pinot cluster with controllers running at controller_host1:9000 and controller_host2:9000. You can expose this as a catalog called mypinotcluster by creating a file called etc/catalog/mypinotcluster.properties, with the contents:

# cat etc/catalog/mypinotcluster.properties
connector.name=pinot
pinot.controller-urls=controller_host1:9000,controller_host2:9000

Presto will randomly pick one of the above controllers.

The Pinot catalog property file

Like other connectors in the Presto ecosystem, catalogs are configured using a catalog property file. This catalog property file specifies the connector name (“pinot” in this case) and any other connector specific configuration. The Pinot connector has a plethora of configuration options that can be configured by adding them to the above catalog file, or as session properties under the “pinot.” namespace.

Exposing Pinot tables as Presto tables

After adding a Pinot catalog, the tables in the Pinot cluster will automatically show up in Presto. What happens under the hood is the Presto coordinator asks the Pinot controller for all the tables and simply exposes them as Presto tables.

Just to recap, a fully qualified Presto table has the form catalog.schema.tablename. Pinot does not have a concept of a schema, so you can use anything as the schema name. However, for output purposes, the Pinot connector reports a single schema named ‘default’ containing all of the tables. To refer to a table, you can use any schema name, including ‘default’. For example, this is how the preloaded table baseballStats bundled with the Pinot distribution shows up. Note that it appears in the schema ‘default’.

presto:default> show schemas from pinot;
       Schema
--------------------
 default
 information_schema
(2 rows)

Query 20200901_071445_00043_gutrt, FINISHED, 1 node
Splits: 19 total, 19 done (100.00%)
0:00 [2 rows, 35B] [20 rows/s, 351B/s]

presto:default> show tables from pinot.default;
     Table
---------------
 baseballstats
 
(1 row)
Query 20200901_071453_00044_gutrt, FINISHED, 1 node
Splits: 19 total, 19 done (100.00%)
0:00 [1 rows, 30B] [12 rows/s, 388B/s]

Note

Presto is case-insensitive, and table names and column names are typically listed in lowercase. Pinot, on the other hand, is case-sensitive. Presto maps the case-sensitive Pinot table name to an all-lowercase version. That is, the Presto-Pinot connector maps the Presto table Baseballstats (or baseballstats) to the case-sensitive Pinot table baseballStats.

Having seen how Pinot tables appear in Presto, let’s now see how the Pinot data types are mapped to Presto types. Most Pinot data types have straightforward Presto analogues: for example, the Pinot STRING type corresponds to VARCHAR in Presto, the Pinot INT type corresponds to INTEGER in Presto, and so on. Converting the Pinot TIME and DATE_TIME types to the Presto DATE and TIMESTAMP types requires a little more work, which we will discuss later. Some Pinot types don’t have a Presto analogue and are coerced: for example, the Pinot FLOAT type is converted into a Presto DOUBLE, and Pinot multi-value types are coerced into a Presto VARCHAR. Pinot columns are typically divided into metric and dimension columns: you can GROUP BY on dimension columns, while metric columns are typically aggregated. The listing below shows how to view the Presto schema of a Pinot table.

presto:default> describe pinot.any_schema_name_works.baseballstats;
         Column          |  Type   | Extra |  Comment
-------------------------+---------+-------+-----------
 homeruns                | integer |       | METRIC
 playerstint             | integer |       | METRIC
...
 yearid                  | integer |       | DIMENSION
 hits                    | integer |       | METRIC
 runsbattedin            | integer |       | METRIC
...
(25 rows)

Query 20200901_071514_00046_gutrt, FINISHED, 1 node
Splits: 19 total, 19 done (100.00%)
0:00 [25 rows, 2.37KB] [270 rows/s, 25.7KB/s]
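The type mapping described above can be sketched as a small lookup table. This covers only the cases mentioned in this chapter; the authoritative mapping lives in the connector source.

```python
# Sketch of the Pinot-to-Presto type mapping for the cases discussed
# in the text; the connector source is the authoritative reference.
PINOT_TO_PRESTO = {
    "STRING": "VARCHAR",
    "INT": "INTEGER",
    "FLOAT": "DOUBLE",  # coerced: the connector widens FLOAT to DOUBLE
}

def presto_type(pinot_type, multi_value=False):
    """Return the Presto type that a Pinot column is exposed as."""
    if multi_value:
        return "VARCHAR"  # multi-value Pinot columns are coerced to VARCHAR
    return PINOT_TO_PRESTO[pinot_type]
```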

Now that we have wired up our Pinot cluster to Presto, we can get down to the business of actually querying it! Before we do, though, it’s worth discussing exactly how the Presto-Pinot connector queries Pinot.

How Presto queries Pinot

Figure 2-3. Pushdown in Presto-Pinot connector

The Presto-Pinot connector works in concert with Presto’s query optimizer and attempts to identify the maximal operator subtree that can be pushed down into Pinot. It acts after all of the other Presto optimizer rules have executed, to ensure that it can find good candidates to push down. As shown in Figure 2-3, the pushdown creates a special table scan node in Presto that embeds the operator subtree pushed into Pinot. This special scan node takes care of querying Pinot.

The actual scan can happen in one of two modes depending on the operator tree pushed into the Pinot table scan node as shown in Figure 2-4. Presto prefers querying the Pinot broker if possible, but otherwise Presto can also masquerade as a Pinot broker by directly contacting the Pinot servers and aggregating their results. These two querying modes are referred to as “Direct-to-Broker” and “Direct-to-Server” respectively. The “Direct-to-Broker” mode is preferred because it’s more efficient and allows leveraging some of the optimizations in the Pinot broker, such as partition aware server pruning.

Figure 2-4. Executing the Pinot Table Scan depending on the pushed down operator tree

Presto-Pinot checks two conditions to decide whether it can use the Direct-To-Broker mode. First, it checks whether the operator subtree can even be supported by the Pinot broker; the presence of certain unsupported expressions like substr can rule Direct-To-Broker out. Second, the operator subtree should either include an aggregation or have a “short” limit. A limit is considered “short” if it is below the config value pinot.non-aggregate-limit-for-broker-queries, which is set to 25,000 by default. If both conditions are satisfied, the connector proceeds in Direct-To-Broker mode; otherwise it falls back to Direct-To-Server.
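The decision just described can be modeled as a small function. The parameter names are illustrative assumptions; the real decision is made inside the connector's optimizer integration.

```python
# Simplified model of the two Direct-To-Broker checks. Parameter names
# are illustrative; the real logic lives in the connector.
DEFAULT_BROKER_LIMIT = 25_000  # pinot.non-aggregate-limit-for-broker-queries

def choose_query_mode(has_unsupported_expression, has_aggregation,
                      limit=None, broker_limit=DEFAULT_BROKER_LIMIT):
    """Return the mode the connector would pick for a pushed-down subtree."""
    if has_unsupported_expression:       # e.g. substr is not broker-safe
        return "direct-to-server"
    if has_aggregation:                  # aggregations can go to the broker
        return "direct-to-broker"
    if limit is not None and limit < broker_limit:  # a "short" limit
        return "direct-to-broker"
    return "direct-to-server"
```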

Direct-to-Broker queries

In this mode, Presto-Pinot constructs a Pinot query for the operator subtree that it wants to push down. A broker is chosen and a single Presto split is created with this constructed query and the chosen broker. The Presto worker executing this singleton split queries the Pinot broker’s HTTP endpoint, retrieves the results and converts them into the format expected by Presto.

Direct-to-Server queries

In this mode, Presto-Pinot effectively masquerades as a Pinot broker. But first, it must obtain the Routing Table from a real Pinot broker. Armed with the routing table, it knows which segments and servers to query. Presto plans the query as a multi-split query, where each split is responsible for querying no more than a single Pinot server. By default, the connector creates a single split for each segment on the server (configurable via the pinot.num-segments-per-split config). All Presto workers execute the splits in parallel: a worker executing a split queries the specified Pinot server for the specified segments using the Pinot-internal API, the very same API used between the Pinot broker and the Pinot server. In this mode, the filters are pushed down into the Pinot server-specific query, but aggregations are not.
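Split planning in this mode can be sketched as follows. The routing-table shape (server mapped to a list of segment names) and the function name are illustrative assumptions; pinot.num-segments-per-split is the real config knob mentioned above.

```python
# Sketch of Direct-to-Server split planning. The routing-table shape
# (server -> list of segment names) is an assumption for illustration.
def plan_splits(routing_table, num_segments_per_split=1):
    """Create one split per batch of segments, each targeting one server."""
    splits = []
    for server, segments in routing_table.items():
        for i in range(0, len(segments), num_segments_per_split):
            splits.append((server, segments[i:i + num_segments_per_split]))
    return splits

# With the default of one segment per split, three segments on server1
# and one on server2 yield four splits in total.
splits = plan_splits({"server1": ["seg_0", "seg_1", "seg_2"],
                      "server2": ["seg_3"]})
```

Raising pinot.num-segments-per-split trades parallelism for fewer, larger requests to each server.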

It’s important to note that once the Presto workers have executed the splits, whether in Direct-to-Broker or Direct-To-Server mode, the rest of the query processing proceeds normally in Presto, as dictated by the part of the operator tree that could not be pushed into Pinot.

Presto-Pinot querying in action

Having understood the core theory behind how Presto-Pinot queries Pinot, let’s get a better feel for it by trying out a few queries. Explaining a Presto query over a Pinot table will print out the query plan being used, including the detail about whether Pinot is being queried in Direct-To-Broker or Direct-To-Server mode.

Let’s first consider a simple aggregation query over the previously shown baseballStats table. This query simply prints the number of times a player name is repeated in the table.

EXPLAIN SELECT playerName, count(1) FROM baseballStats GROUP BY playerName
- Output[playerName, _col1] => [playername:varchar, count:bigint]
    Estimates: {rows: ? (?), cpu: ?, memory: 0.00, network: ?}
    playerName := playername
    _col1 := count
  - RemoteStreamingExchange[GATHER] => [playername:varchar, count:bigint]
      Estimates: {rows: ? (?), cpu: ?, memory: 0.00, network: ?}
    - TableScan[TableHandle {connectorId='pinot', connectorHandle='PinotTableHandle{connectorId=pinot, schemaName=default, tableName=baseballStats, isQueryShort=Optional[true], expectedColumnHandles=Optional[[PinotColumnHandle{columnName=playerName, dataType=varchar, type=REGULAR}, PinotColumnHandle{columnName=count, dataType=bigint, type=DERIVED}]],
pinotQuery=Optional[GeneratedPinotQuery{query=SELECT count(*) FROM baseballStats GROUP BY playerName TOP 10000, format=PQL, table=baseballStats, expectedColumnIndices=[0, 1], groupByClauses=1, haveFilter=false, isQueryShort=true}]}'
        Estimates: {rows: ? (?), cpu: ?, memory: 0.00, network: 0.00}
        count := PinotColumnHandle{columnName=count, dataType=bigint, type=DERIVED}
        playername := PinotColumnHandle{columnName=playerName, dataType=varchar, type=REGULAR}

The bolded parts of the explain output show that the query has been pushed down entirely into Pinot and effectively translated into the broker PQL query SELECT count(*) FROM baseballStats GROUP BY playerName TOP 10000. You can tell that it is sent to the broker by the isQueryShort=true bit. As you can see, Presto has assumed that the PQL query will return no more than 10000 unique players. If TOP 10000 were not specified, Pinot would assume we want only the top 10 players. This limit can be changed via the connector configuration pinot.topn-large, which defaults to 10000.

And now let’s consider something more complex. Let’s say we want to find the average runs scored by a player’s team if that player has ever hit a home run. This query is a self-join of the baseballStats table with different filters on each side of the join.

EXPLAIN SELECT a.playerName, AVG(b.runs) FROM baseballstats a JOIN baseballstats b ON a.teamid = b.teamid WHERE a.homeRuns > 0 GROUP BY a.playerName
- Output[playername, _col1] => [playername:varchar, avg:double]
      _col1 := avg
   - RemoteStreamingExchange[GATHER] => [playername:varchar, avg:double]
        - Project[projectLocality = LOCAL] => [playername:varchar, avg:double]
            - Aggregate(FINAL)[playername][$hashvalue] => [playername:varchar, $hashvalue:bigint, avg:double]
                    avg := ""presto.default.avg""((avg_36))
                - LocalExchange[HASH][$hashvalue] (playername) => [playername:varchar, avg_36:row(field0 double, field1 bigint), $hashvalue:bigint]
                    - RemoteStreamingExchange[REPARTITION][$hashvalue_37] => [playername:varchar, avg_36:row(field0 double, field1 bigint), $hashvalue_37:bigint]
                        - Aggregate(PARTIAL)[playername][$hashvalue_43] => [playername:varchar, $hashvalue_43:bigint, avg_36:row(field0 double, field1 bigint)]
                                avg_36 := ""presto.default.avg""((expr_27))
                            - Project[projectLocality = LOCAL] => [playername:varchar, expr_27:bigint, $hashvalue_43:bigint]
                                    expr_27 := CAST(numberofgames_3 AS bigint)
                                    $hashvalue_43 := combine_hash(BIGINT 0, COALESCE($operator$hash_code(playername), BIGINT 0))
                                - InnerJoin[(""teamid"" = ""teamid_8"")][$hashvalue_38, $hashvalue_40] => [playername:varchar, numberofgames_3:integer]
                                        Distribution: PARTITIONED
                                    - RemoteStreamingExchange[REPARTITION][$hashvalue_38] => [teamid:varchar, playername:varchar, $hashvalue_38:bigint]
                                            Estimates: {rows: ? (?), cpu: ?, memory: 0.00, network: ?}
                                        - ScanProject[table = TableHandle {connectorId='pinot', connectorHandle='PinotTableHandle{connectorId=pinot, schemaName=default, tableName=baseballStats, isQueryShort=Optional[false], expectedColumnHandles=Optional[[PinotColumnHandle{columnName=teamID, dataType=varchar, type=REGULAR}, PinotColumnHandle{columnName=playerName, dataType=varchar, type=REGULAR}]], 
pinotQuery=Optional[GeneratedPinotQuery{query=SELECT teamID, playerName FROM baseballStats__TABLE_NAME_SUFFIX_TEMPLATE__ WHERE (homeRuns > 0)__TIME_BOUNDARY_FILTER_TEMPLATE__ LIMIT 2147483647, format=PQL, table=baseballStats, expectedColumnIndices=[0, 1], groupByClauses=0, haveFilter=true, isQueryShort=false}]}'}, projectLocality = LOCAL] => [teamid:varchar, playername:varchar, $hashvalue_39:bigint]
                                                Estimates: {rows: ? (?), cpu: ?, memory: 0.00, network: 0.00}/{rows: ? (?), cpu: ?, memory: 0.00, network: 0.00}
                                                $hashvalue_39 := combine_hash(BIGINT 0, COALESCE($operator$hash_code(teamid), BIGINT 0))
                                                playername := PinotColumnHandle{columnName=playerName, dataType=varchar, type=REGULAR}
                                                teamid := PinotColumnHandle{columnName=teamID, dataType=varchar, type=REGULAR}
                                    - LocalExchange[HASH][$hashvalue_40] (teamid_8) => [numberofgames_3:integer, teamid_8:varchar, $hashvalue_40:bigint]
                                            Estimates: {rows: ? (?), cpu: ?, memory: 0.00, network: ?}
                                        - RemoteStreamingExchange[REPARTITION][$hashvalue_41] => [numberofgames_3:integer, teamid_8:varchar, $hashvalue_41:bigint]
                                                Estimates: {rows: ? (?), cpu: ?, memory: 0.00, network: ?}
                                            - ScanProject[table = TableHandle {connectorId='pinot', connectorHandle='PinotTableHandle{connectorId=pinot, schemaName=default, tableName=baseballStats, isQueryShort=Optional[false], expectedColumnHandles=Optional[[PinotColumnHandle{columnName=numberOfGames, dataType=integer, type=REGULAR}, PinotColumnHandle{columnName=teamID, dataType=varchar, type=REGULAR}]], 
pinotQuery=Optional[GeneratedPinotQuery{query=SELECT numberOfGames, teamID FROM baseballStats__TABLE_NAME_SUFFIX_TEMPLATE____TIME_BOUNDARY_FILTER_TEMPLATE__ LIMIT 2147483647, format=PQL, table=baseballStats, expectedColumnIndices=[0, 1], groupByClauses=0, haveFilter=false, isQueryShort=false}]}'}, projectLocality = LOCAL] => [numberofgames_3:integer, teamid_8:varchar, $hashvalue_42:bigint]
                                                    Estimates: {rows: ? (?), cpu: ?, memory: 0.00, network: 0.00}/{rows: ? (?), cpu: ?, memory: 0.00, network: 0.00}
                                                    $hashvalue_42 := combine_hash(BIGINT 0, COALESCE($operator$hash_code(teamid_8), BIGINT 0))
                                                    teamid_8 := PinotColumnHandle{columnName=teamID, dataType=varchar, type=REGULAR}
                                                    numberofgames_3 := PinotColumnHandle{columnName=numberOfGames, dataType=integer, type=REGULAR}

This query is planned as a join of two Pinot “Direct to Server” queries, as can be seen from the presence of isQueryShort=false. The left side is a PQL query with the filter homeRuns > 0 pushed down. As dictated by the query, this filter is absent on the right side. Also note how the actual join and aggregation happen in Presto as usual.

Date/Time handling in Pinot vs. Presto

Presto has a rich set of expressions for working with timestamps, time zones, and time intervals. Date and time are also of central importance in Pinot: so much so that each Pinot table is required to have at least one designated timestamp column, specified when creating the table. Since working with time is so common, we will go through several examples to illustrate some common recipes.

Presto can work with time stored either as a numeric value (like seconds since the Unix epoch) or as one of Pinot’s TIME and DATE_TIME types.

Time stored as a numeric type

Below we give examples of each, using the Meetup sample data included in Pinot’s distribution, in which event_time is the event timestamp in milliseconds since the epoch.

Let’s start with something simple: Just a count of the number of meetup events taking place each day:

EXPLAIN SELECT DATE_TRUNC('day', FROM_UNIXTIME(event_time/1000.0)), count(1) FROM meetupRsvp GROUP BY 1
- Output[_col0, _col1] => [date_trunc:timestamp, count:bigint]
        Estimates: {rows: ? (?), cpu: ?, memory: 0.00, network: ?}
        _col0 := date_trunc
        _col1 := count
    - RemoteStreamingExchange[GATHER] => [date_trunc:timestamp, count:bigint]
            Estimates: {rows: ? (?), cpu: ?, memory: 0.00, network: ?}
        - TableScan[TableHandle {connectorId='pinot', connectorHandle='PinotTableHandle{connectorId=pinot, schemaName=default, tableName=meetupRsvp, isQueryShort=Optional[true], expectedColumnHandles=Optional[[PinotColumnHandle{columnName=date_trunc, dataType=timestamp, type=DERIVED}, PinotColumnHandle{columnName=count, dataType=bigint, type=DERIVED}]], pinotQuery=Optional[GeneratedPinotQuery{query=SELECT count(*) FROM meetupRsvp GROUP BY dateTimeConvert(DIV(event_time, 1000.0), '1:SECONDS:EPOCH', '1:MILLISECONDS:EPOCH', '1:DAYS') TOP 10000, format=PQL, table=meetupRsvp, expectedColumnIndices=[0, 1], groupByClauses=1, haveFilter=false, isQueryShort=true}]}'}] => [date_trunc:timestamp, count:bigint]
                Estimates: {rows: ? (?), cpu: ?, memory: 0.00, network: 0.00}
                count := PinotColumnHandle{columnName=count, dataType=bigint, type=DERIVED}
                date_trunc := PinotColumnHandle{columnName=date_trunc, dataType=timestamp, type=DERIVED}

As you can see, this was translated into the Pinot’s dateTimeConvert UDF query to the broker. Next let’s count the total number of events in the last 24 hours:

EXPLAIN SELECT COUNT(1) FROM meetupRsvp WHERE event_time < CAST(TO_UNIXTIME(current_timestamp - interval '24' hour) * 1000 AS BIGINT)
            
	- Output[_col0] => [count:bigint]
        Estimates: {rows: ? (?), cpu: ?, memory: 0.00, network: ?}
        _col0 := count
    - RemoteStreamingExchange[GATHER] => [count:bigint]
            Estimates: {rows: ? (?), cpu: ?, memory: 0.00, network: ?}
        - TableScan[TableHandle {connectorId='pinot', connectorHandle='PinotTableHandle{connectorId=pinot, schemaName=default, tableName=meetupRsvp, isQueryShort=Optional[true], expectedColumnHandles=Optional[[PinotColumnHandle{columnName=count, dataType=bigint, type=DERIVED}]], pinotQuery=Optional[GeneratedPinotQuery{query=SELECT count(*) FROM meetupRsvp WHERE (event_time < 1598897257351), format=PQL, table=meetupRsvp, expectedColumnIndices=[0], groupByClauses=0, haveFilter=true, isQueryShort=true}]}]}] => [count:bigint]
                Estimates: {rows: ? (?), cpu: ?, memory: 0.00, network: 0.00}
                count := PinotColumnHandle{columnName=count, dataType=bigint, type=DERIVED}

In this case, Presto has planned it as a Direct-to-Broker query and has folded the constant expression cast(to_unixtime(current_timestamp - interval '24' hour) * 1000 as BIGINT) into the number 1598897257351 before sending the query to Pinot.
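The constant folding boils down to simple epoch arithmetic, sketched below. The function name is an illustrative assumption; the arithmetic is what Presto's planner effectively computes.

```python
# Sketch of the constant folding at work: at planning time Presto
# evaluates the time expression once and ships a single
# epoch-millisecond constant to Pinot instead of the expression.
def last_24h_cutoff_millis(now_millis):
    """Fold cast(to_unixtime(now - interval '24' hour) * 1000 as bigint)."""
    return now_millis - 24 * 3600 * 1000

# Evaluated when current_timestamp corresponded to epoch milliseconds
# 1598983657351, this yields 1598897257351, the constant visible in the
# generated Pinot query above.
```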

Time stored as a Pinot TIME or DATE_TIME type

Pinot can also store time in special TIME and DATE_TIME type fields that have a notion of granularities. The Pinot connector, however, only supports the following two combinations:

  • A Pinot TIME (or DATE_TIME) field with unit “MILLISECONDS”, format “EPOCH”, and granularity “1” maps to a Presto TIMESTAMP type, if the config pinot.infer-timestamp-type-in-schema = true. (It’s false by default.)

  • A Pinot TIME (or DATE_TIME) field with unit “DAYS”, format “EPOCH”, and granularity “1” maps to a Presto DATE type, if the config pinot.infer-date-type-in-schema = true. (It’s false by default.)

These restrictions exist because Presto internally stores timestamps as milliseconds since the epoch and dates as whole days since the epoch. Loading a table with any other combination (for example, a Pinot TIME field with unit MILLISECONDS, format EPOCH, and granularity 1000) would fail, and that Pinot table would not be queryable via Presto.
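For reference, enabling these mappings is a catalog-level change. A minimal sketch of a Pinot catalog properties file with both inference flags turned on might look like the following (the controller URL is illustrative; the two infer-* property names are the ones discussed above):

```properties
connector.name=pinot
pinot.controller-urls=pinot-controller:9000
pinot.infer-timestamp-type-in-schema=true
pinot.infer-date-type-in-schema=true
```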

Since the resulting Presto types are already DATE/TIMESTAMP types that Presto natively understands, this eliminates the need for the from_unixtime/to_unixtime conversions shown earlier. Assuming timestamp_column is the name of the column that maps to a Presto TIMESTAMP column, the queries above can be written more simply as follows:

  • SELECT date_trunc('day', timestamp_column), COUNT(1) FROM baseballStats GROUP BY 1

  • SELECT COUNT(1) FROM baseballStats WHERE timestamp_column < current_timestamp - interval '24' hour

The Presto-Pinot connector does not yet fully support timezones, but that feature is in active development.

Troubleshooting Common Issues

These are some of the most common issues I have seen with the Presto-Pinot connector at Uber, which not only runs some of the largest Presto and Pinot clusters but also developed the connector. For each issue, I also share a solution.

A large class of issues stems from Presto-Pinot incorrectly choosing the Direct-To-Server or Direct-To-Broker query mode when it really should be using the other one. Let’s cover the problems caused by choosing each of these two modes incorrectly, in turn.

Direct-To-Broker query mode causing issues

The heuristic to choose Direct-To-Broker can sometimes misfire and cause issues. There are two common symptoms associated with this:

  • The query runs out of memory (OOMs). This typically happens when the aggregation result is not small, such as when you mistakenly group by a high-cardinality column. This blows up the result set that Presto fetches from Pinot and requires more memory than Presto can afford for the singleton Direct-To-Broker split.

  • The query times out after 15 or 30 seconds: The query is too heavyweight for the Pinot broker. It’s either doing some complex aggregation or simply scanning too many segments.

The fix is simple: force the query to go via the Direct-To-Server mode. The recommended way to do this is to set the session property pinot.forbid_broker_queries = true on a per-query basis. A heavier hammer is to disable Direct-To-Broker globally with the catalog config property pinot.forbid-broker-queries = true.
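As a sketch, forcing one problematic query down the Direct-To-Server path from a Presto client looks like this (the grouping column is illustrative):

```sql
-- Per-session override: subsequent queries in this session bypass the broker
SET SESSION pinot.forbid_broker_queries = true;

-- The high-cardinality aggregation now runs as a Direct-To-Server query,
-- streaming rows from the Pinot servers into Presto for aggregation
SELECT member_id, COUNT(*) FROM meetupRsvp GROUP BY member_id;
```

Because it is a session property, the override disappears when the session ends, leaving other workloads on the catalog unaffected.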

Direct-To-Server query mode causing issues

Recall that Direct-To-Server is usually the fallback when Direct-To-Broker cannot be chosen. Problems with this mode typically show up as the following symptoms:

  • Increased load on Pinot clusters due to Presto: This manifests as other Pinot queries timing out or getting rejected. It is typically caused by Direct-To-Server querying bypassing the rate limits and query-processing smarts (like segment pruning) that the Pinot broker puts in place.

  • Presto-Pinot queries take a long time to finish: A Presto-Pinot query creates a split per segment per server. This is usually beneficial because it increases parallelism and the query finishes faster. But sometimes it creates so many splits that they end up stuck waiting for resources in Presto’s queue.

One approach to fixing these issues is to make Direct-To-Server a bit easier on Pinot by increasing the number of segments per split. This can be done by increasing the session property pinot.num_segments_per_split on a per-query basis. Setting pinot.num_segments_per_split to 0 is equivalent to setting it to infinity: it creates just a single split for all the segments on a given server.
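For example, to coarsen the splits for a single session (the value 8 is illustrative; tune it for your workload and cluster size):

```sql
-- Pack 8 segments into each Direct-To-Server split instead of one,
-- reducing the total number of splits queued in Presto
SET SESSION pinot.num_segments_per_split = 8;

SELECT playerName, COUNT(*) FROM baseballStats GROUP BY playerName;
```

The trade-off is lower parallelism per query in exchange for fewer outstanding requests against the Pinot servers.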

If the above does not help, we have to get our hands dirty and rework the query so that Direct-To-Broker can be chosen. There are two common remedies:

  • Consider removing any expressions that are preventing Direct-To-Broker. Perhaps you are using a UDF that is not supported by the Pinot broker. This may be fixed by a simple local rework of the query: for example, consider modifying something like “SELECT COUNT(lower(playerName)) FROM baseballStats” to just “SELECT COUNT(playerName) FROM baseballStats” to allow the query to be planned as a broker query. The two forms return the same result because Pinot does not have the concept of NULLs: lower(playerName) is always non-NULL, and thus can be optimized away from inside COUNT.

  • Introduce an artificial no-op LIMIT: Recall that Presto-Pinot defines a short non-aggregate query as a query with a limit under 25K. This threshold is configurable via the configuration property pinot.non-aggregate-limit-for-broker-queries (or alternatively the session property pinot.non_aggregate_limit_for_broker_queries). Queries without a limit are treated as “large” and are thus never planned as Direct-To-Broker. Introduce a fake limit under this threshold to force Direct-To-Broker: for example, change “SELECT playerName from baseballStats where playerState = ‘CA’” to “SELECT playerName from baseballStats where playerState = ‘CA’ LIMIT 24000” if you know that the number of players actually in California is less than 24K, making the limit a no-op.

If Direct-To-Server query mode causes many problems for you, you can also forbid it outright with the config pinot.forbid-segment-queries = true in the catalog properties file. With this configuration, queries are either planned as Direct-To-Broker or fail right away.

Note

Open Source Community

Like Presto, Pinot is an open source project and welcomes your involvement. Both Pinot itself and the Presto-Pinot connector are actively developed, and the community would love your feedback and questions in order to improve them. There is a dedicated troubleshooting channel in the Pinot Slack workspace to help you integrate Pinot with Presto. For more in-depth information on Pinot, head over to pinot.apache.org or check out the SIGMOD 2018 paper “Pinot: Realtime OLAP for 530 Million Users.”

Summary

This chapter introduced you to extending Presto for real-time business insights using real-time datastores. We walked through an implementation with Presto and Pinot via the Presto-Pinot connector.

1 This is an example of a Pinot SQL query. Pinot also supports an older language, the Pinot Query Language (PQL), which has a few differences; for example, a PQL query would have used the keyword TOP instead of the ORDER-BY-DESC-LIMIT construct.

2 The author speaks from firsthand experience, having tried to integrate Pinot with Superset directly and then realizing that it is better to simply leverage Superset’s excellent integration with Presto.
