Chapter 12. Trino in Production

After learning about and installing Trino, first as a simple exploratory setup in Chapter 2 and then as a deployment in Chapter 5, you now get to dive into further details. After all, simply installing and configuring a cluster is a very different task from keeping it up and running day and night, with different users, changing data sources, and widely varying usage.

In this chapter, you therefore explore the aspects you need to understand in order to be a successful operator of your Trino clusters.

Monitoring with the Trino Web UI

As discussed in “Trino Web UI”, the Trino Web UI is accessible on every Trino cluster coordinator and can be used to inspect and monitor the Trino cluster and processed queries. The detailed information provided can be used to better understand and tune the Trino system overall as well as individual queries.

Note

The Trino Web UI exposes information from the Trino system tables, discussed in “Trino System Tables”.

When you first navigate to the Trino Web UI address, you see the main dashboard shown in Figure 12-1. It displays Trino cluster information in the top section and a list of queries in the bottom section.

Figure 12-1. Trino Web UI main dashboard

Cluster-Level Details

Let’s first discuss the Trino cluster information:

Running Queries

The total number of queries currently running in the Trino cluster. This accounts for all users. For example, if Alice is running two queries and Bob is running five queries, the total number in this box shows seven queries.

Queued Queries

The total number of queued queries for the Trino cluster for all users. Queued queries are waiting for the coordinator to schedule them, based on the resource group configuration.

Blocked Queries

The number of blocked queries in the cluster. A blocked query is unable to be processed due to missing available splits or other resources. You learn more about query states in the next sections.

Active Workers

The number of active worker nodes in the cluster. Any worker nodes added or removed, manually or by auto scaling, are registered in the discovery service, and the displayed number is updated accordingly.

Runnable Drivers

The average number of runnable drivers in the cluster, as described in Chapter 4.

Reserved Memory

The total amount of reserved memory in bytes in Trino.

Rows/Sec

The total number of rows processed per second across all queries running in the cluster.

Bytes/Sec

The total number of bytes processed per second across all queries running in the cluster.

Worker Parallelism

The total amount of worker parallelism, which is the total amount of thread CPU time across all workers, across all queries running in the cluster.

Query List

The bottom section of the Trino Web UI dashboard lists the recently run queries. An example screenshot is displayed in Figure 12-2. The number of available queries in this history list depends on the Trino cluster configuration.

Figure 12-2. List of queries in the Trino Web UI

Above the list itself, controls are available to select the criteria that determine which queries are listed. This allows you to locate specific queries even when a cluster is very busy and runs dozens or hundreds of queries.

The input field allows you to type criteria text to search for specific queries. The criteria include the username of the query initiator, the query source, the query ID, a resource group, the SQL text of the query, and the query state.

The State filter beside the text input field allows you to include or exclude queries based on their state—running, queued, finished, and failed. The filter for failed queries can be refined further to include or exclude specific failure reasons—internal, external, resource, and user errors.

Controls on the left allow you to determine the sort order of the displayed queries, how often the list is reordered as the data changes, and the maximum number of queries to show.

Each row underneath the query criteria represents a single query. The left column in the row displays information about the query. The right column displays the SQL text and the state of the query. An example of the query summary is available in Figure 12-3.

Figure 12-3. Information for a specific query in the Trino Web UI

Let’s take a closer look at the query details. The same information is displayed for each query run. The text at the top left is the query ID. In this example, the value is 20190803_224130_00010_iukvw. Looking closer, you may notice that the date and time (UTC) make up the beginning of the ID, using the format YYYYMMDD_HHMMSS. The latter part is an incremental counter for the query. The counter value 00010 simply means it was the 10th query run since the coordinator started. The final piece, iukvw, is a random identifier for the coordinator. Both the random identifier and the counter value are reset if the coordinator is restarted. The time at the top right is the local time when the query was run.

The next three values in the example—ec2-user, trino-cli, and global—represent the end user running the query, the source of the query, and the resource group used to run the query. In this example, the user is the default ec2-user, and we were using the trino-cli to submit the query. If you specify the --user flag when running the Trino CLI, the value changes to what you specified. The source may also be something other than trino-cli; for example, it may display trino-jdbc when an application connects to Trino with the JDBC driver. The client can also set it to any desired value with the --source flag for the Trino CLI, or a JDBC connection string property.
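For example, the following hypothetical invocation sets both values explicitly; the user and source names are made up for illustration:

$ trino --user alice --source reporting-app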

The grid of values below is not well labeled in the Trino Web UI, but it contains some important information about the query, as explained in Table 12-1.

Table 12-1. Grid of values for a specific query
Value Description

Completed Splits

The number of completed splits for the query. The example shows 25 completed splits. At the beginning of query execution, this value is 0. It increases during query execution as splits complete.

Running Splits

The number of running splits for the query. When the query is completed, this value is always 0. However, during execution, this number changes as splits run and complete.

Queued Splits

The number of queued splits for the query. When the query is completed, this value is always 0. However, during execution, this number changes as splits move between the queued and running states.

Wall Time

The total wall time spent executing the query. This value continues to grow even if you’re paging results.

Total Wall Time

This value is the same as the wall time, except that it also includes queued time. The wall time excludes any time during which the query is queued. This is the total time you’d observe, from when you submit the query to when you finish receiving results.

CPU Time

The total CPU time spent processing the query. This value is often larger than the wall time because parallel execution between workers and threads on workers all count separately and add up. For example, if four CPUs each spend 1 second processing a query, the resulting total CPU time is 4 seconds.

Current Total Reserved Memory

The total memory currently reserved by the query during execution. For completed queries, this value is 0.

Peak Total Memory

The peak total memory usage during the query execution. Certain operations during the query execution may require a lot of memory, and it is useful to know what the peak was.

Cumulative User Memory

The cumulative user memory used throughout the query processing. This does not mean all the memory was used at the same time. It is the cumulative amount of memory used over the time of processing.

Tip

Many of the icons and values in the Trino Web UI have pop-up tooltips that are visible if you hover your mouse cursor over the image. This is helpful if you are unsure of what a particular value represents.

Next, you need to learn more about the different states of query processing, displayed above the query text itself. The most common states are RUNNING, FINISHED, USER CANCELLED, and USER ERROR. The states RUNNING and FINISHED are self-explanatory. USER CANCELLED means that the query was killed by the user. USER ERROR, on the other hand, signifies that the SQL query statement submitted by the user contained a syntactic or semantic error.

The BLOCKED state occurs when a query that is running becomes blocked while waiting on something such as resources or additional splits to process. Seeing a query go to and from this state is normal. However, if a query gets stuck in this state, there are many potential reasons, and this may indicate a problem with the query or the Trino cluster. If you find a query that appears to be stuck in this state, first check the memory use and configuration of the system. It may be that this query requires an unusually high amount of memory or is computationally expensive. Additionally, if the client is not retrieving the results or cannot read the results fast enough, this back pressure can put the query into a BLOCKED state.

The QUEUED state occurs when a query is started, or stopped from processing, and put into a waiting state as part of the rules defined for resource groups. The query is simply waiting to be executed.

You may also see a query in the PLANNING state. This typically occurs for larger, complex queries that require a lot of planning and optimizations for running the query. If you see this often, and planning seems to take a noticeable amount of time for queries, you should investigate possible reasons, such as insufficient memory availability or processing power of the coordinator.

Query Details View

So far you have seen information about the Trino cluster overall and higher-level information about the queries. The Web UI offers even more details about each query. Simply click the name of the specific query, as shown in Figure 12-3, to access the Query Details view.

The Query Details view contains a lot of information about a specific query. Let’s explore enough for you to feel comfortable using it.

Note

The Query Details view is often used by Trino developers and users with in-depth knowledge of Trino. Interpreting it fully requires you to be very familiar with the Trino code and internals. Checking out this view can still be useful for normal users, and over time you learn more and acquire more expertise.

The Query Details view uses several tabs for viewing more detailed information about the Trino query. Apart from the tabs, the query ID and the state are always visible. You can see an example header of the view with the tabs in Figure 12-4.

Figure 12-4. Query Details header and tabs

Overview

The Overview page includes the information visible in the Query Details section of the query list and much more detail in numerous sections:

  • Session

  • Execution

  • Resource Utilization Summary

  • Timeline

  • Query

  • Prepare Query

  • Stages

  • Tasks

The Stages section, shown in Figure 12-5, displays information on the query stages.

Figure 12-5. Stages section in the Overview tab of the Query Details page

This particular query was the SELECT count(*) FROM lineitem query. Because it is a simpler query, it consists of only two stages. Stage 0 is the single-task stage that runs on the coordinator and is responsible for combining the results from the tasks in stage 1 and performing the final aggregation. Stage 1 is a distributed stage that runs tasks on each of the workers. This stage is responsible for reading the data and computing the partial aggregation.

The following list explains the numerical values from the Stages section, available for each stage:

TIME—SCHEDULED

The amount of time the stage remained scheduled before all tasks for the stage were completed.

TIME—BLOCKED

The amount of time the stage was blocked while waiting for data.

TIME—CPU

The total amount of CPU time of the tasks in the stage.

MEMORY—CUMULATIVE

The cumulative memory used throughout the stage. This does not mean all the memory was used at the same time. It is the cumulative amount of memory used over the time of processing.

MEMORY—CURRENT

The current total reserved memory used for the stage. For completed queries, this value is 0.

MEMORY—BUFFERS

The current amount of memory consumed by data waiting to be processed.

MEMORY—PEAK

The peak total memory during the stage. Certain operations during the query execution may require a lot of memory, and it is useful to know what the peak was.

TASKS—PENDING

The number of pending tasks for the stage. When the query is completed, this value is always 0.

TASKS—RUNNING

The number of running tasks for the stage. When the query is completed, this value is always 0. During execution, the value changes as tasks run and complete.

TASKS—BLOCKED

The number of blocked tasks for the stage. When the query is completed, this value is always 0. However, during execution this number will change as tasks move between blocked and running states.

TASKS—TOTAL

The total number of tasks for the stage.

SCHEDULED TIME SKEW, CPU TIME SKEW, TASK SCHEDULED TIME, and TASK CPU TIME

These histogram charts show the distribution and changes of scheduled time, CPU time, task scheduled time, and task CPU time for multiple tasks across workers. This allows you to diagnose utilization of the workers during the execution of a longer-running, distributed query.

The section below the Stages section describes more details of the tasks, displayed in Figure 12-6.

Figure 12-6. Tasks information in the Query Details page

Let’s examine the values in the tasks list; take a look at Table 12-2.

Table 12-2. Description of the columns in the tasks list in Figure 12-6
Column Description

ID

The task identifier in the format stage-id.task-id. For example, ID 0.0 indicates Task 0 of Stage 0, and 1.2 indicates Task 2 of Stage 1.

Host

The IP address of the worker node where the task is run.

State

The state of the task, which can be PENDING, RUNNING, or BLOCKED.

Pending Splits

The number of pending splits for the task. This value changes as the task is running and shows 0 when the task is finished.

Running Splits

The number of running splits for the task. This value changes as the task is running and shows 0 when the task is finished.

Blocked Splits

The number of blocked splits for the task. The value changes as the task is running and shows 0 when the task is finished.

Completed Splits

The number of completed splits for the task. The value changes as the task is running and equals the total number of splits run when the task is finished.

Rows

The number of rows processed in the task. This value increases as the task runs.

Rows/s

The number of rows processed per second in the task.

Bytes

The number of bytes processed in the task. This value increases as the task runs.

Bytes/s

The number of bytes processed per second in the task.

Elapsed

The total elapsed wall time since the task was scheduled.

CPU Time

The total CPU time consumed by the task.

Buffered

The current amount of buffered data waiting to be processed.

If you examine some of these values carefully, you’ll notice how they roll up. For example, the total CPU time for all tasks in a stage equals the CPU time listed for the stage to which they belong. The total CPU time for all stages equals the CPU time listed for the query.

Live Plan

The Live Plan tab allows you to view query processing performed by the Trino cluster, in real time, while it is executing. You can see an example in Figure 12-7.

Figure 12-7. Live plan example for the count(*) query on lineitem

During query execution, the counters in the plan are updated while the query execution progresses. The values in the plan are the same as described for the Overview tab, but they are overlaid in real time on the query execution plan. Looking at this view is useful to visualize where a query is stuck or spending a lot of time, in order to diagnose or improve a performance issue.

Stage Performance

The Stage Performance view provides a detailed visualization of the stage performance after query processing is finished. An example is displayed in Figure 12-8.

The view can be thought of as a drill-down from the Live Plan view, where you can see the operator pipeline of the task within the stage. The values in the plan are the same as described for the Overview tab. Looking at this view is useful to see where a query is stuck or spending a lot of time, in order to diagnose or fix a performance issue. You can click on each individual operator to access detailed information.

Figure 12-8. Trino Web UI view for stage performance of the count(*) lineitem query

Splits

The Splits view shows a timeline for the creation and processing of splits during the query execution.

JSON

The JSON tab provides all query detail information in JSON format. The information is updated based on the snapshot for which it is retrieved.

Note

Parts of the Web UI are helpful for copying lengthy strings to the system clipboard. Keep an eye out for the clipboard icon. By clicking it, the associated string is copied to the system clipboard for you to paste somewhere else.

Tuning Trino SQL Queries

In “Query Planning”, you learned about the cost-based optimizer in Trino. Recall that SQL is a declarative language in which the user writes a SQL query to specify the data they want. This is unlike an imperative program. With SQL, the user does not specify how to process the data to get the result. It is left to the query planner and optimizer to determine the sequence of steps to process the data for the desired result. The sequence of steps is referred to as the query plan.

In most cases, the end user submitting the SQL queries can rely on Trino to plan, optimize, and execute a SQL query efficiently to get the results fast. As an end user, you should not have to worry about the details.

However, sometimes you are not getting the performance you expect, so you need to be able to tune Trino queries. You need to identify whether a specific execution is an outlier single query that is not performing well, or whether multiple queries of similar properties are not performing well.

Let’s start with tuning an individual query, assuming the rest of the queries you run are fine on the system. When examining a poorly performing query, you should first look to see if the tables that the query references have data statistics. At the time of this writing, the only tables that provide data statistics to Trino are those used with the Hive connector. It is expected that additional connectors will start to provide data statistics. You can inspect the available statistics with the SHOW STATS command:

trino:ontime> SHOW STATS FOR flights;

Joins in SQL are one of the most expensive operations. You need to focus on joins when tuning the performance of your query, and determine the join order by running an EXPLAIN on the query:

trino:ontime> EXPLAIN
               SELECT f.uniquecarrier, c.description, count(*) AS ct
               FROM postgresql.airline.carrier c,
                    hive.ontime.flights_orc f
               WHERE c.code = f.uniquecarrier
               GROUP BY f.uniquecarrier, c.description
               ORDER BY count(*) DESC
               LIMIT 10;

As a general rule, you want the smaller input to a join to be on the build side. This is the input to the hash join for which a hash table is built. Because the build side requires reading in the entire input and building a hash table in memory, you want this to be the smaller of the inputs. Being able to determine whether Trino got the join order correct requires some domain knowledge of the data to further investigate. For example, if you know nothing about the data, you may have to run some experimental queries to obtain additional information.

If you have determined that the join order is nonoptimal, you can override the join reordering strategy by setting a toggle to use the syntactic order of the tables listed in the SQL query. This can be configured in the config.properties file as the property optimizer.join-reordering-strategy. However, if you want to override a single query, you often want to just set the session property join_reordering_strategy (see “Session Information and Configuration”). The allowed values for this property are AUTOMATIC, ELIMINATE_CROSS_JOINS, and NONE. Setting the value to ELIMINATE_CROSS_JOINS or NONE performs an override of the cost-based optimizer. ELIMINATE_CROSS_JOINS is a good compromise since it reorders joins only to eliminate cross joins, which is good practice, and otherwise stays with the lexical order suggested by the query author:

...
FROM postgresql.airline.carrier c,
hive.ontime.flights_orc f
...
...
FROM hive.ontime.flights_orc f,
postgresql.airline.carrier c
...
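For example, a minimal sketch of overriding the strategy for the current session only, and resetting it back to the default afterward:

trino:ontime> SET SESSION join_reordering_strategy = 'NONE';
trino:ontime> RESET SESSION join_reordering_strategy;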

Besides join optimizations, Trino includes some heuristic-based optimizations. These optimizations are not costed and do not always produce the best results. They can take advantage of the fact that Trino is a distributed query engine in which aggregations are performed in parallel. This means that an aggregation can be split into multiple smaller parts that can then be distributed to multiple workers, run in parallel, and be combined in a final step from the partial results.

A common optimization in Trino and other SQL engines is to push partial aggregations before the join to reduce the amount of data going into the join. Using it can be configured with the push_aggregation_through_join property. Because the aggregation produces partial results, a final aggregation is still performed after the join. The benefit of using this optimization depends on the actual data, and the optimization can even result in a slower query. For example, if the join is highly selective, then it may be more performant to run the aggregation after the join completes. To experiment, you can simply turn this optimization off by setting the property to false for the current session.
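To experiment, a sketch of disabling the optimization for the current session, assuming your Trino version exposes the property under this name:

trino:ontime> SET SESSION push_aggregation_through_join = false;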

Another common heuristic is to compute a partial aggregation before the final aggregation:

trino:ontime> EXPLAIN SELECT count(*) FROM flights_orc;
                             Query Plan
---------------------------------------------------------------------
 - Output[_col0]
         Layout: [count:bigint]
         _col0 := count
     - Aggregate(FINAL)
             Layout: [count:bigint]
             count := "count"("count_3")
         - LocalExchange[SINGLE] ()
                 Layout: [count_3:bigint]
             - RemoteExchange[GATHER]
                     Layout: [count_3:bigint]
                 - Aggregate(PARTIAL)
                         Layout: [count_3:bigint]
                         count_3 := "count"(*)
                     - TableScan[hive:ontime:flights_orc]
                             Layout: []
(1 row)

While this is generally a good heuristic, the amount of memory to use for the hash table can be tuned. For example, if the table has a lot of rows with few distinct values in the grouping keys, this optimization works well and reduces the amount of data early, before it is distributed over the network. However, if there is a higher number of distinct values, the hash tables need to be larger in order to reduce the amount of data. By default, the memory used for the hash table is 16 MB, but this can be adjusted by setting task.max-partial-aggregation-memory in the config.properties file. However, with too high a count of distinct grouping keys, the partial aggregation does nothing to reduce the network transfer, and it may even slow down the query.
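For example, a sketch of doubling the default in config.properties; the 32MB value is purely illustrative and should be validated against your workload:

task.max-partial-aggregation-memory=32MB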

Memory Management

Getting the memory configuration and management for your Trino cluster right is not an easy task. Many constantly changing factors influence the memory needs:

  • Number of workers

  • Memory of coordinator and worker

  • Number and type of data sources

  • Characteristics of running queries

  • Number of users

For Trino, a multiuser system using a cluster of workers, resource management is a fairly challenging problem to solve. Ultimately, you have to pick a starting point and then monitor the system and adjust to the current and upcoming needs. Let’s look at some details and talk about recommendations and guidelines around memory management and monitoring in Trino.

Note

All memory management discussed applies to the JVM running the Trino server. The values are allocations within the JVMs on the workers, and the JVM configuration needs to consider the size of these values to allow allocation.

Depending on the number of concurrent queries, the JVM heap size needs to be adjusted to a much larger value. The next example provides some insight.

All of the preceding factors combine into what we call the workload. Tuning the cluster’s memory relies heavily on the workload being run.

For example, most query shapes contain multiple joins, aggregations, and window functions. If the query size of the workload is small, you can set lower memory limits per query and increase the concurrency—and the other way around for larger query sizes. For context, query size is a product of query shape and amount of input data. Trino provides a way to manage memory utilization across the cluster by setting certain properties at the time of deployment in config.properties:

  • query.max-memory-per-node

  • query.max-total-memory-per-node

  • query.max-memory

  • query.max-total-memory

Memory management in Trino is separated into two kinds of memory allocations:

User memory

User memory is the memory allocated for operations the user can directly reason about from the query, such as aggregations and sorting.

System memory

System memory allocation is based on the execution implementation of the query engine itself and includes read, write, and shuffle buffers, table scans, and other operations.

With this separation in mind, you can examine the memory properties some more:

query.max-memory-per-node

The maximum user memory a query can utilize on a specific worker for processing aggregations, input data allocation, etc.

query.max-total-memory-per-node

The maximum allowed total of user and system memory, required to be larger than query.max-memory-per-node. When the memory consumed by a query in user and system allocations exceeds this limit, it is killed.

query.max-memory

The maximum user memory a query can utilize across all workers in the cluster.

query.max-total-memory

The maximum utilization of memory by a query for user and system allocations for the entire cluster, as a result necessarily greater than query.max-memory.

If a query ends up exceeding these limits and as a result is killed, error codes expose the reason:

  • EXCEEDED_LOCAL_MEMORY_LIMIT means that query.max-memory-per-node or query.max-total-memory-per-node was exceeded.

  • EXCEEDED_GLOBAL_MEMORY_LIMIT means that query.max-memory or query.max-total-memory was exceeded.

Let’s look at a real-world example for a small cluster of one coordinator and ten workers and their characteristics:

  • One coordinator

  • Ten workers; typically workers are all identical system specifications

  • Physical memory per worker: 50 GB

  • Max JVM heap size configured in -Xmx in jvm.config to 38 GB

  • query.max-memory-per-node: 13 GB

  • query.max-total-memory-per-node: 16 GB

  • memory.heap-headroom-per-node: 9 GB

  • query.max-memory: 50 GB

  • query.max-total-memory: 60 GB
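Expressed in the configuration files, these values correspond to excerpts like the following:

jvm.config:

-Xmx38G

config.properties:

query.max-memory-per-node=13GB
query.max-total-memory-per-node=16GB
memory.heap-headroom-per-node=9GB
query.max-memory=50GB
query.max-total-memory=60GB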

Let’s break these numbers down a bit more.

The total available memory on each worker is ~50 GB and we leave ~12 GB for the operating system, agents/daemons, and components running outside the JVM on the system. These systems include monitoring and other systems that allow you to manage the machine and keep it functioning well. As a result, we determine to set the JVM max heap size to 38 GB.

When query size and shape are small, concurrency can be set higher. In the preceding example, we are assuming query size and shape to be medium to large and are also accounting for data skew. query.max-memory is set to 50 GB, which applies at the overall cluster level. While looking at max-memory, we also consider initial-hash-partitions; this should ideally be a number less than or equal to the number of workers.

If we set that to 8 with max-memory 50 GB, we get 50/8, so about 6.25 GB per worker. Looking at the local limit max-memory-per-node set to 13 GB, we keep some headroom for data skew by allowing two times the memory consumption per node. These numbers vary significantly based on how the data is organized and what type of queries are typically run—basically, the workload for the cluster. In addition, the infrastructure used for the cluster, such as the available machine sizes and numbers, has a big impact on the selection of the ideal configuration.

A configuration can be set to help avoid a deadlock situation: query.low-memory-killer.policy. This can be set to total-reservation or total-reservation-on-blocked-nodes. When set to total-reservation, Trino kills the largest running query on the cluster to free up resources. On the other hand, total-reservation-on-blocked-nodes kills the query that is utilizing the most memory on the nodes that are blocked.
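For example, in config.properties:

query.low-memory-killer.policy=total-reservation-on-blocked-nodes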

As you can see from the example, you just end up making some assumptions to get started. And then you adjust based on what happens with your actual workloads.

For example, running a cluster for interactive ad hoc queries from users with a visualization tool can create many small queries. An increase of users then ends up increasing the number of queries and the concurrency of them. This typically does not require a change of the memory configuration, but just a scaling up of the number of workers in the cluster.

On the other hand, if that same cluster gets a new data source added that exposes massive amounts of data for complex queries that most likely blow the limits, you have to adjust memory.

This gets us to another point that is worth mentioning. Following the best-practice recommendation, in a typical Trino cluster infrastructure setup, all workers are the same. All use the same virtual machine (VM) image or container size with identical hardware specifications. As a result, changing the memory on these workers typically means that the new value is either too large for the physically available memory, or too small to make good use of the overall system. Adjusting the memory therefore creates the need to replace all worker nodes in the cluster. Depending on your cluster infrastructure, for example, virtual machines in a private cloud or containers in a Kubernetes cluster from a public cloud provider, this process can be more or less laborious and time-consuming.

This leads us to one last point worth mentioning here. Your assessment of the workloads can reveal that they are widely different: lots of queries are small, fast, ad hoc queries with a small memory footprint, while others are massive, long-running analytical processes, maybe even using very different data sources. These workload differences indicate very different memory configurations, and even very different worker configurations and maybe even monitoring needs. In this scenario, you really should take the next step and separate the workloads by using different Trino clusters.

Task Concurrency

To improve performance for your Trino cluster, certain task-related properties may need to be adjusted from the default settings. In this section, we discuss two common properties you may have to tune. However, you can find several others in the Trino documentation. All these properties are set in the config.properties file:

Task worker threads

The default value is set to the number of CPUs of the machine multiplied by 2. For example, a dual hex core machine uses 2 × 6 × 2, so 24 worker threads. If you observe that all threads are being used and the CPU utilization in the machine is still low, you can try to improve CPU utilization and thus performance by increasing this number via the task.max-worker-threads setting. The recommendation is to slowly increment this number, as setting it too high can have a diminishing return or adverse effects due to increased memory usage and additional context switching.

Task operator concurrency

Operators such as joins and aggregations are processed in parallel by partitioning the data locally and executing the operators in parallel. For example, the data is locally partitioned on the GROUP BY columns, and then multiple aggregation operators are performed in parallel. The default concurrency for these parallel operations is 16. This value can be adjusted by setting the task.concurrency property. If you are running many concurrent queries, the default value may result in reduced performance due to the overhead of context switching. For clusters that run only a smaller number of concurrent queries, a higher value can help increase parallelism and therefore performance.
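As a sketch, both properties go in config.properties; the values shown are illustrative assumptions and should be changed only in small increments:

# Increase from the default of 2 x CPU count if threads are busy but CPU is underutilized
task.max-worker-threads=40
# Default is 16; a higher value can help clusters running few concurrent queries
task.concurrency=32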

Worker Scheduling

To improve performance of your Trino cluster, certain scheduling-related properties may need to be adjusted from the default settings. You can tune three common configurations:

  • Splits per task

  • Splits per node

  • Local scheduling

Several others are explained in the Trino documentation.

Scheduling Splits per Task and per Node

Each worker node processes a maximum number of splits. By default, the maximum number of splits processed per worker node is 100. This value can be adjusted with the node-scheduler.max-splits-per-node property. You may want to adjust this if you’re finding that the workers have maxed out this value and are still underutilized. Increasing the value improves performance, particularly when a lot of splits exist. In addition, you can consider increasing the node-scheduler.max-pending-splits-per-task property. This value should not exceed node-scheduler.max-splits-per-node. It ensures that work is queued up and ready when workers finish their splits in process.
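A sketch of both properties in config.properties, with illustrative values that keep the pending count below the per-node maximum:

node-scheduler.max-splits-per-node=200
node-scheduler.max-pending-splits-per-task=100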

Local Scheduling

When scheduling splits on worker nodes, the Trino scheduler uses two methods. The method to use depends on the deployment of Trino. For example, if workers run on the same machines as distributed storage, it is optimal to schedule splits on the worker nodes where the data is located. Scheduling splits where the data is not located requires the data to be transferred over the network to the worker node for processing. Therefore, you see an increase in performance when scheduling splits on the same node where the data is stored.

The default method legacy does not account for the data location when scheduling splits. The improved flat method does account for the data location when scheduling splits, and it can be used by setting node-scheduler.network-topology. The Trino scheduler uses 50% of the queue for scheduling local splits.

The common use case for using the flat method occurs when Trino is installed and collocated on HDFS data nodes or when using RubiX for caching (see “Performance Improvements with RubiX”). RubiX caches data from the distributed storage on the worker nodes. Therefore, scheduling local splits is advantageous.
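Enabling the flat method is a single line in config.properties:

node-scheduler.network-topology=flat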

Network Data Exchange

Another important factor affecting the performance of your Trino cluster is the network configuration and setup within the cluster and the closeness to the data sources. Trino supports network-specific properties that can be adjusted from the defaults to adapt to your specific scenario.

In addition to improving performance, sometimes other network-related issues require tuning in order for queries to perform well. Let’s discuss some of the common properties you may have to tune.

Concurrency

Trino exchange clients make requests to the upstream tasks producing data. The default number of threads used by the exchange clients is 25. This value can be adjusted by setting the property exchange.client-threads.

Setting a larger number of threads can improve performance for larger clusters or clusters configured for high concurrency. The reason is that when more data is produced, additional concurrency to consume the data can decrease latency. Keep in mind that these additional threads increase the amount of memory needed.

Buffer Sizes

Data sent and received during the exchange is kept in buffers on the target and source sides of the exchange. The buffer sizes for each side can be adjusted independently.

On the source side, data is written by the tasks to a buffer waiting to be requested from the downstream exchange client. The default buffer size is 32 MB. It can be adjusted by setting the sink.max-buffer-size property. Increasing the value may increase throughput for a larger cluster.

On the target side, data is written to a buffer before it is processed by the downstream task. The default buffer size is 32 MB. It can be adjusted by setting the property exchange.max-buffer-size. Setting a higher value can improve query performance by allowing retrieval of more data over the network before back pressure is applied. This is especially true for larger clusters.
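A sketch of these exchange-related properties in config.properties; the values are illustrative increases over the defaults for a larger cluster:

exchange.client-threads=50
sink.max-buffer-size=64MB
exchange.max-buffer-size=64MB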

Tuning Java Virtual Machine

In Chapter 2, you used the configuration file etc/jvm.config, which contains command-line options for the JVM. The Trino launcher uses this file when starting the JVM running Trino. Compared to the earlier mentioned configuration, a more suitable configuration for production usage uses higher memory values:

-server
-XX:+UseG1GC
-XX:+ExplicitGCInvokesConcurrent
-XX:+ExitOnOutOfMemoryError
-XX:+UseGCOverheadLimit
-XX:+HeapDumpOnOutOfMemoryError
-XX:-UseBiasedLocking
-Djdk.attach.allowAttachSelf=true
-Xms64G
-Xmx64G
-XX:G1HeapRegionSize=32M
-XX:ReservedCodeCacheSize=512M
-Djdk.nio.maxCachedBufferSize=2000000

Typically you need to increase the maximum memory allocation pool for the JVM with the Xmx value, in this case, upped to 64 GB. The Xms parameter sets the initial, minimal memory allocation. The general recommendation is to set both Xmx and Xms to the same value.

In the preceding configuration example, memory allocation is set to 64 GB. The actual value you use depends on the machines used by your cluster. The general recommendation is to set both the Xmx and Xms to 80% of the total memory of the system. This allows for plenty of headroom for other system processes running. Further details about the memory management of Trino and related configuration can be found in “Memory Management”.

For large Trino deployments, memory allocations of 200 GB and beyond are not uncommon.

While small processes such as monitoring can run on the same machine as Trino, it’s highly discouraged to share the system with other resource-intensive software. For example, Apache Spark and Trino should not be run on the same set of hardware.

If you suspect Java garbage collection (GC) issues, you can set additional parameters to help you with debugging:

-XX:+PrintGCApplicationConcurrentTime
-XX:+PrintGCApplicationStoppedTime
-XX:+PrintGCCause
-XX:+PrintGCDateStamps
-XX:+PrintGCTimeStamps
-XX:+PrintGCDetails
-XX:+PrintReferenceGC
-XX:+PrintClassHistogramAfterFullGC
-XX:+PrintClassHistogramBeforeFullGC
-XX:PrintFLSStatistics=2
-XX:+PrintAdaptiveSizePolicy
-XX:+PrintSafepointStatistics
-XX:PrintSafepointStatisticsCount=1

These options can be helpful when troubleshooting a full GC pause. In combination with the advancements of JVM and the engineering of the Trino query engine, a GC pause should be a very rare event. If it does happen, it should be investigated. First steps to fix these issues are often an upgrade of the JVM version and the Trino version used, since both receive performance improvements regularly.

Warning

JVM and garbage collection algorithms and configuration are complex topics. Documentation on GC tuning is available from Oracle and other JVM vendors. We strongly recommend adjusting these settings in small changes in test environments, before attempting to roll them out to production systems. Also keep in mind that Trino currently requires Java 11. Older or newer JVM versions, as well as JVM versions from different vendors, can have different behavior.

Resource Groups

Resource groups are a powerful concept in Trino used to limit resource utilization on the system. The resource group configuration consists of two main pieces: the resource group properties and the selector rules.

A resource group is a named collection of properties that define available cluster resources. You can think of a resource group as a separate section in the cluster that is isolated from other resource groups. The group is defined by CPU and memory limits, concurrency limits, queuing priorities, and priority weights for selecting queued queries to run.

The selector rules, on the other hand, allow Trino to assign an incoming query request to a specific resource group.

The default resource group manager uses a file-based configuration and needs to be configured in etc/resource-groups.properties:

resource-groups.configuration-manager=file
resource-groups.config-file=etc/resource-groups.json

As you can see, the actual configuration uses a JSON file. The content of the file defines the resource groups as well as the selector rules. Note that the JSON file can be any path that is accessible to Trino and that resource groups need to be configured only on the coordinator:

{
  "rootGroups": [],
  "selectors": [],
  "cpuQuotaPeriod": ""
}

cpuQuotaPeriod is optional.

Let’s look at the definition of two resource groups to get started:

  "rootGroups": [
    {
      "name": "ddl",
      "maxQueued": 100,
      "hardConcurrencyLimit": 10,
      "softMemoryLimit": "10%",
    },
    {
      "name": "ad-hoc",
      "maxQueued": 50,
      "hardConcurrencyLimit": 1,
      "softMemoryLimit": "100%",
          }
  ]

The example defines two resource groups named ddl and ad-hoc. Each group sets a maximum number of concurrent queries and a limit on the total amount of distributed memory. For a given group, if the concurrency or memory limit is reached, any new query is placed in the queue. Once the total memory usage goes down or a query completes, the resource group chooses a query from the queue to schedule to run. Each group also has a maximum number of queries to queue. If this limit is reached, any new queries are rejected and the client receives an error indicating that.

In our example, the ad-hoc group is designed for all queries that are not DDL queries. This group allows only one query to run concurrently, with up to 50 queries queued. The group has a memory limit of up to 100%, meaning the single running query could use all the available memory in the cluster.

DDL queries have their own group, with the idea that these types of queries are relatively short and lightweight and should not be starved by longer-running ad hoc SQL queries. In this group, you specify that there should be no more than 10 DDL queries running concurrently, and the total amount of distributed memory used by all running queries should be no more than 10% of the Trino cluster’s memory. This allows a DDL query to be executed without having to wait in the ad hoc query line.

Now that the two groups are defined, you need to define the selector rules. When a new query arrives in the coordinator, it is assigned to a particular group. Let’s take a look at the example:

"selectors": [
    {
      "queryType": "DATA_DEFINITION",
      "group": "ddl"

    },
    {
      "group": "ad-hoc"
    }
  ]

The first selector matches any query type of DATA_DEFINITION and assigns it to the ddl resource group. The second selector matches all other queries and places those queries in the ad-hoc resource group.

The order of the selectors is important because they are processed sequentially, and the first match assigns the query to the group. In order to match, all properties specified in the selector must match. For example, if we switched the order of the selectors, all queries, including DDL, would be assigned to the ad-hoc resource group, and no queries would ever be assigned to the ddl resource group.

Resource Group Definition

Let’s take a closer look at the following configuration properties for resource groups:

name

The required name of the resource group, referenced by the selector rule for assignment.

maxQueued

The required maximum number of queries queued for the group.

hardConcurrencyLimit

The required maximum number of concurrent queries that can be running in the group.

softMemoryLimit

The required maximum amount of distributed memory that can be used by concurrently running queries. Once this limit is reached, new queries are queued until memory usage decreases. Both absolute values (GB) and percentages (%) can be used.

softCpuLimit

The optional soft limit on the amount of CPU time that can be used within a time period as defined by the cpuQuotaPeriod property. Once this limit is reached, a penalty is applied to the running queries.

hardCpuLimit

The optional hard limit on the amount of CPU time that can be used within a time period as defined by the cpuQuotaPeriod property. Once this limit is reached, queries are rejected until the next quota period.

schedulingPolicy

The policy that determines how the next query is selected from the queue in the resource group for processing. Details are provided in the following section.

schedulingWeight

This optional property is to be used in conjunction with schedulingPolicy.

jmxExport

Flag to cause resource groups to be exposed via JMX. Defaults to false.

subGroups

A container for additional nested resource groups.

Scheduling Policy

The schedulingPolicy value noted in the preceding list can be configured to various values to be run in the following modes:

Fair

Setting schedulingPolicy to fair schedules queries in a first-in, first-out (FIFO) queue. If the resource group has subgroups, the subgroups with queued queries alternate.

Priority

Setting schedulingPolicy to query_priority schedules queued queries based on a managed priority queue. The priority of the query is specified by the client by using the query_priority session property (see “Session Information and Configuration”). If the resource group has subgroups, the subgroups must also specify query_priority.

Weighted

Setting schedulingPolicy to weighted_fair is used to choose the resource group subgroup to start the next query. The schedulingWeight property is used in conjunction with this: queries are chosen in proportion to the schedulingWeight of the subgroups.
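As a sketch, the following resource group uses weighted_fair scheduling across two hypothetical subgroups; the group and subgroup names are made up for illustration. With these weights, the dashboards subgroup is selected for the next query three times as often as the reports subgroup:

{
  "name": "analytics",
  "maxQueued": 100,
  "hardConcurrencyLimit": 10,
  "softMemoryLimit": "80%",
  "schedulingPolicy": "weighted_fair",
  "subGroups": [
    {
      "name": "dashboards",
      "maxQueued": 50,
      "hardConcurrencyLimit": 5,
      "softMemoryLimit": "50%",
      "schedulingWeight": 3
    },
    {
      "name": "reports",
      "maxQueued": 50,
      "hardConcurrencyLimit": 5,
      "softMemoryLimit": "50%",
      "schedulingWeight": 1
    }
  ]
}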

Selector Rules Definition

Selector rules are required to define the group property, since it determines the resource group to which a query is assigned. It is good practice to have the last selector in the file define only a group. It then acts as an explicit catchall group.

Optional properties and regular expressions can be used to refine the selector rules:

user

Matches against a username value. Regular expressions may be used to match against multiple names.

source

Matches against the source value. For example, this may be trino-cli or trino-jdbc. Regular expressions may be used.

queryType

Matches against the type of query. The available options are DATA_DEFINITION, DELETE, DESCRIBE, EXPLAIN, INSERT, and SELECT.

clientTags

Matches against the client tags specified by the client submitting the query.
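For example, a sketch of a selector combining several of these properties; the user pattern and tag value are illustrative assumptions. It assigns queries from any user whose name starts with analyst, submitted from a CLI source and tagged adhoc-queries, to the ad-hoc group defined earlier:

{
  "user": "analyst.*",
  "source": ".*cli.*",
  "clientTags": ["adhoc-queries"],
  "group": "ad-hoc"
}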

To set the source or client tags from the Trino CLI, you can use the --source and --client-tags options:

$ trino --user mfuller --source mfuller-cli
$ trino --user mfuller --client-tags adhoc-queries

Conclusion

Well done! You have come a long way with Trino. In this chapter, you immersed yourself in the details of monitoring, tuning, and adjusting Trino. Of course, there is always more to learn, and typically these skills are improved with practice. Be sure to connect with other users to exchange ideas and learn from their experience and join the Trino community (see “Community Chat”).

And in Chapter 13, you get to find out what others have achieved with Trino already.
