After learning about and installing Trino, first as a simple exploratory setup in Chapter 2 and then as a deployment in Chapter 5, you now get to dive into further details. After all, simply installing and configuring a cluster is a very different task from keeping it up and running day and night, with different users, changing data sources, and varying usage patterns.
In this chapter you therefore explore other aspects you need to master in order to be a successful operator of your Trino clusters.
As discussed in “Trino Web UI”, the Trino Web UI is accessible on every Trino cluster coordinator and can be used to inspect and monitor the Trino cluster and processed queries. The detailed information provided can be used to better understand and tune the Trino system overall as well as individual queries.
The Trino Web UI exposes information from the Trino system tables, discussed in “Trino System Tables”.
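Since the Web UI renders data from the system tables, you can retrieve the same information with plain SQL. As a minimal sketch, assuming a running cluster reachable from your client, the following queries list the registered nodes and the queries shown in the Web UI query list:

```sql
-- Workers and coordinator currently registered in the cluster
SELECT * FROM system.runtime.nodes;

-- Recent and currently running queries, as shown in the Web UI query list
SELECT query_id, state, user, source
FROM system.runtime.queries;
```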
When you first navigate to the Trino Web UI address, you see the main dashboard shown in Figure 12-1. It displays Trino cluster information in the top section and a list of queries in the bottom section.
Let’s first discuss the Trino cluster information:
The total number of queries currently running in the Trino cluster. This accounts for all users. For example, if Alice is running two queries and Bob is running five queries, the total number in this box shows seven queries.
The total number of queued queries for the Trino cluster for all users. Queued queries are waiting for the coordinator to schedule them, based on the resource group configuration.
The number of blocked queries in the cluster. A blocked query is unable to be processed due to missing available splits or other resources. You learn more about query states in the next sections.
The number of active worker nodes in the cluster. Any worker nodes added or removed, manually or by auto scaling, are registered in the discovery service, and the displayed number is updated accordingly.
The average number of runnable drivers in the cluster, as described in Chapter 4.
The total amount of reserved memory in bytes in Trino.
The total number of rows processed per second across all queries running in the cluster.
The total number of bytes processed per second across all queries running in the cluster.
The total amount of worker parallelism, which is the total amount of thread CPU time across all workers, across all queries running in the cluster.
The bottom section of the Trino Web UI dashboard lists the recently run queries. An example screenshot is displayed in Figure 12-2. The number of available queries in this history list depends on the Trino cluster configuration.
Above the list itself, controls are available to select the criteria that determine which queries are listed. This allows you to locate specific queries even when a cluster is very busy and runs dozens or hundreds of queries.
The input field allows you to type criteria text to use in order to search for a specific query. The criteria include the username of the query initiator, the query source, the query ID, a resource group, or even the SQL text of the query and the query state.
The State filter beside the text input field allows you to include or exclude queries based on the state of the query—running, queued, finished, and failed queries. Failed queries can be further detailed to include or exclude for specific failure reasons—internal, external, resource, and user errors.
Controls on the left allow you to determine the sort order of the displayed queries, how often the list reorders as the data changes, and the maximum number of queries to show.
Each row underneath the query criteria represents a single query. The left column in the row displays information about the query. The right column displays the SQL text and the state of the query. An example of the query summary is available in Figure 12-3.
Let’s take a closer look at the query details. Each query displays the same set of information. The text at the top left is the query ID. In this example, the value is 20190803_224130_00010_iukvw. Looking closer, you may notice that the date and time (UTC) make up the beginning of the ID, using the format YYYYMMDD_HHMMSS. The latter half is an incremental counter for the query: the counter value 00010 simply means it was the 10th query run since the coordinator started. The final piece, iukvw, is a random identifier for the coordinator. Both this random identifier and the counter value are reset if the coordinator is restarted. The time at the top right is the local time when the query was run.
The next three values in the example—ec2-user, trino-cli, and global—represent the end user running the query, the source of the query, and the resource group used to run the query. In this example, the user is the default ec2-user, and we used the trino-cli to submit the query. If you specify the --user flag when running the Trino CLI, the value changes to what you specified. The source may also be something other than trino-cli; for example, it may display trino-jdbc when an application connects to Trino with the JDBC driver. The client can also set it to any desired value with the --source flag for the Trino CLI, or the JDBC connection string property.
The grid of values below is not well labeled in the Trino Web UI, but it contains some important information about the query, as explained in Table 12-1.
Value | Description |
---|---|
Completed Splits | The number of completed splits for the query. The example shows 25 completed splits. At the beginning of query execution, this value is 0. It increases during query execution as splits complete. |
Running Splits | The number of running splits for the query. When the query is completed, this value is always 0. However, during execution, this number changes as splits run and complete. |
Queued Splits | The number of queued splits for the query. When the query is completed, this value is always 0. However, during execution, this number changes as splits move between queued and running states. |
Wall Time | The total wall time spent executing the query. This value continues to grow even if you’re paging results. |
Total Wall Time | The same as the wall time, except that it also includes queued time. The wall time excludes any time during which the query is queued. This is the total time you’d observe, from when you submit the query to when you finish receiving results. |
CPU Time | The total CPU time spent processing the query. This value is often larger than the wall time because parallel execution between workers and threads on workers all count separately and add up. For example, if four CPUs spend 1 second to process a query, the resulting total CPU time is 4 seconds. |
Current Total Reserved Memory | The current total reserved memory used for the time of query execution. For completed queries, this value is 0. |
Peak Total Memory | The peak total memory usage during the query execution. Certain operations during the query execution may require a lot of memory, and it is useful to know what the peak was. |
Cumulative User Memory | The cumulative user memory used throughout the query processing. This does not mean all the memory was used at the same time; it is the cumulative amount of memory used over the time of processing. |
Many of the icons and values in the Trino Web UI have pop-up tooltips that are visible if you hover your mouse cursor over the image. This is helpful if you are unsure of what a particular value represents.
Next you need to learn more about the different states of query processing, displayed above the query text itself. The most common states are RUNNING, FINISHED, USER CANCELLED, and USER ERROR. The states RUNNING and FINISHED are self-explanatory and exactly what they say. USER CANCELLED means that the query was killed by the user. USER ERROR, on the other hand, signifies that the SQL query statement submitted by the user contained a syntactic or semantic error.
The BLOCKED state occurs when a running query becomes blocked while waiting on something such as resources or additional splits to process. Seeing a query go to and from this state is normal. However, if a query gets stuck in this state, there are many potential reasons, and this may indicate a problem with the query or the Trino cluster. If you find a query that appears to be stuck in this state, first check the memory use and configuration of the system. It may be that this query requires an unusually high amount of memory or is computationally expensive. Additionally, if the client is not retrieving the results or cannot read the results fast enough, this back pressure can put the query into a BLOCKED state.
The QUEUED state occurs when a query is started, or stopped from processing, and put into a waiting stage as part of the rules defined for resource groups. The query is simply waiting to be executed.
You may also see a query in the PLANNING state. This typically occurs for larger, complex queries that require a lot of planning and optimization before running. If you see this often, and planning seems to take a noticeable amount of time for queries, you should investigate possible reasons, such as insufficient memory availability or processing power of the coordinator.
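The query states are also exposed in the system tables, so you can check for stuck queries without the Web UI. A small sketch, assuming access to the system catalog:

```sql
-- Find queries that are currently queued or still being processed
SELECT query_id, state, user, query
FROM system.runtime.queries
WHERE state IN ('QUEUED', 'RUNNING')
ORDER BY created DESC;
```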
So far you have seen information about the Trino cluster overall and higher-level information about the queries. The Web UI offers even more details about each query. Simply click the name of the specific query, as shown in Figure 12-3, to access the Query Details view.
The Query Details view contains a lot of information about a specific query. Let’s explore enough for you to feel comfortable using it.
The Query Details view is often used by Trino developers and users with in-depth knowledge of Trino. This level of sophistication requires you to be very familiar with the Trino code and internals. Checking out this view may still be useful for normal users. Over time, you learn more and acquire more expertise.
The Query Details view uses several tabs for viewing more detailed information about the Trino query. Apart from the tabs, the query ID and the state are always visible. You can see an example header of the view with the tabs in Figure 12-4.
The Overview page includes the information visible in the Query Details section of the query list and much more detail in numerous sections:
Session
Execution
Resource Utilization Summary
Timeline
Query
Prepare Query
Stages
Tasks
The Stages section, shown in Figure 12-5, displays information on the query stages.
This particular query was the SELECT count(*) FROM lineitem query. Because it is a simpler query, it consists of only two stages. Stage 0 is the single-task stage that runs on the coordinator and is responsible for combining the results from the tasks in stage 1 and performing the final aggregation. Stage 1 is a distributed stage that runs tasks on each of the workers. This stage is responsible for reading the data and computing the partial aggregation.
The following list explains the numerical values from the Stages section, available for each stage:
The amount of time the stage remained scheduled before all tasks for the stage were completed.
The amount of time the stage was blocked while waiting for data.
The total amount of CPU time of the tasks in the stage.
The cumulative memory used throughout the stage. This does not mean all the memory was used at the same time. It is the cumulative amount of memory used over the time of processing.
The current total reserved memory used for the stage. For completed queries, this value is 0.
The current amount of memory consumed by data waiting for processing.
The peak total memory during the stage. Certain operations during the query execution may require a lot of memory, and it is useful to know what the peak was.
The number of pending tasks for the stage. When the query is completed, this value is always 0.
The number of running tasks for the stage. When the query is completed, this value is always 0. During execution, the value changes as tasks run and complete.
The number of blocked tasks for the stage. When the query is completed, this value is always 0. However, during execution this number will change as tasks move between blocked and running states.
The number of completed tasks for the query.
These histogram charts show the distribution and changes of scheduled time, CPU time, task scheduled time, and task CPU time for multiple tasks across workers. This allows you to diagnose utilization of the workers during the execution of a longer-running, distributed query.
The section below the Stages section describes more details of the tasks, displayed in Figure 12-6.
Let’s examine the values in the tasks list; take a look at Table 12-2.
Column | Description |
---|---|
ID | The task identifier. |
Host | The IP address of the worker node where the task runs. |
State | The state of the task. |
Pending Splits | The number of pending splits for the task. This value changes as the task runs and shows 0 when the task is finished. |
Running Splits | The number of running splits for the task. This value changes as the task runs and shows 0 when the task is finished. |
Blocked Splits | The number of blocked splits for the task. This value changes as the task runs and shows 0 when the task is finished. |
Completed Splits | The number of completed splits for the task. This value changes as the task runs and equals the total number of splits run when the task is finished. |
Rows | The number of rows processed in the task. This value increases as the task runs. |
Rows/s | The number of rows processed per second in the task. |
Bytes | The number of bytes processed in the task. This value increases as the task runs. |
Bytes/s | The number of bytes processed per second in the task. |
Elapsed | The total elapsed wall time for the scheduled task. |
CPU Time | The total CPU time for the scheduled task. |
Buffered | The current amount of buffered data waiting for processing. |
If you examine some of these values carefully, you’ll notice how they roll up. For example, the total CPU time for all tasks in a stage equals the CPU time listed for the stage to which they belong. The total CPU time for all stages equals the CPU time listed for the query.
The Live Plan tab allows you to view query processing performed by the Trino cluster, in real time, while it is executing. You can see an example in Figure 12-7.
During query execution, the counters in the plan are updated while the query execution progresses. The values in the plan are the same as described for the Overview tab, but they are overlaid in real time on the query execution plan. Looking at this view is useful to visualize where a query is stuck or spending a lot of time, in order to diagnose or improve a performance issue.
The Stage Performance view provides a detailed visualization of the stage performance after query processing is finished. An example is displayed in Figure 12-8.
The view can be thought of as a drill-down from the Live Plan view, where you can see the operator pipeline of the task within the stage. The values in the plan are the same as described for the Overview tab. Looking at this view is useful to see where a query is stuck or spending a lot of time, in order to diagnose or fix a performance issue. You can click on each individual operator to access detailed information.
In “Query Planning”, you learned about the cost-based optimizer in Trino. Recall that SQL is a declarative language in which the user writes a SQL query to specify the data they want. This is unlike an imperative program. With SQL, the user does not specify how to process the data to get the result. It is left to the query planner and optimizer to determine the sequence of steps to process the data for the desired result. The sequence of steps is referred to as the query plan.
In most cases, the end user submitting the SQL queries can rely on Trino to plan, optimize, and execute a SQL query efficiently to get the results fast. As an end user, you should not have to worry about the details.
However, sometimes you are not getting the performance you expect, so you need to be able to tune Trino queries. You need to identify whether a specific execution is an outlier single query that is not performing well, or whether multiple queries of similar properties are not performing well.
Let’s start with tuning an individual query, assuming the rest of the queries you run are fine on the system. When examining a poorly performing query, you should first look to see if the tables that the query references have data statistics. At the time of this writing, the only tables that provide data statistics to Trino are those used with the Hive connector. It is expected that additional connectors will start to provide data statistics.
trino:ontime> SHOW STATS FOR flights;
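If the statistics come back mostly empty, they may simply not have been collected yet. With the Hive connector, you can gather them with the ANALYZE statement; the table name here follows the examples used in this chapter:

```sql
ANALYZE hive.ontime.flights_orc;
```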
Joins in SQL are among the most expensive operations. You need to focus on joins when tuning the performance of your query, and you can determine the join order by running EXPLAIN on the query:
trino:ontime> EXPLAIN SELECT f.uniquecarrier, c.description, count(*) AS ct FROM postgresql.airline.carrier c, hive.ontime.flights_orc f WHERE c.code = f.uniquecarrier GROUP BY f.uniquecarrier, c.description ORDER BY count(*) DESC LIMIT 10;
As a general rule, you want the smaller input to a join to be on the build side. This is the input to the hash join for which a hash table is built. Because the build side requires reading in the entire input and building a hash table in memory, you want this to be the smaller of the inputs. Being able to determine whether Trino got the join order correct requires some domain knowledge of the data to further investigate. For example, if you know nothing about the data, you may have to run some experimental queries to obtain additional information.
If you have determined that the join order is nonoptimal, you can override the join-reordering strategy by setting a toggle to use the syntactic order of the tables listed in the SQL query. This can be configured in the config.properties file with the property optimizer.join-reordering-strategy. However, if you want to override the strategy for a single query, you can instead set the session property join_reordering_strategy (see “Session Information and Configuration”). The allowed values for this property are AUTOMATIC, ELIMINATE_CROSS_JOINS, and NONE. Setting the value to ELIMINATE_CROSS_JOINS or NONE overrides the cost-based optimizer. ELIMINATE_CROSS_JOINS is a good compromise, since it reorders joins only to eliminate cross joins, which is good practice, and otherwise stays with the lexical order suggested by the query author:
... FROM postgresql.airline.carrier c, hive.ontime.flights_orc f ...
... FROM hive.ontime.flights_orc f, postgresql.airline.carrier c ...
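To make Trino honor the written table order for the current session, for example when comparing the two variants above, you can set the session property directly:

```sql
SET SESSION join_reordering_strategy = 'NONE';
```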
Besides join optimizations, Trino includes some heuristic-based optimizations. These optimizations are not cost-based and do not always produce the best results. They can take advantage of the fact that Trino is a distributed query engine in which aggregations are performed in parallel. This means that an aggregation can be split into multiple smaller parts that can then be distributed to multiple workers, run in parallel, and combined in a final step from the partial results.
A common optimization in Trino and other SQL engines is to push partial aggregations before the join to reduce the amount of data going into the join. Its use can be configured with the push_aggregation_through_join property. Because the aggregation produces partial results, a final aggregation is still performed after the join. The benefit of this optimization depends on the actual data, and the optimization can even result in a slower query. For example, if the join is highly selective, it may be more performant to run the aggregation after the join completes. To experiment, you can simply turn this optimization off by setting the property to false for the current session.
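Disabling the optimization for such an experiment takes a single statement in the current session:

```sql
SET SESSION push_aggregation_through_join = false;
```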
Another common heuristic is to compute a partial aggregation before the final aggregation:
trino:ontime> EXPLAIN SELECT count(*) FROM flights_orc;
                             Query Plan
---------------------------------------------------------------------
 - Output[_col0]
       Layout: [count:bigint]
       _col0 := count
   - Aggregate(FINAL)
         Layout: [count:bigint]
         count := "count"("count_3")
     - LocalExchange[SINGLE] ()
           Layout: [count_3:bigint]
       - RemoteExchange[GATHER]
             Layout: [count_3:bigint]
         - Aggregate(PARTIAL)
               Layout: [count_3:bigint]
               count_3 := "count"(*)
           - TableScan[hive:ontime:flights_orc]
                 Layout: []
(1 row)
While this is generally a good heuristic, the amount of memory used for the hash table can be tuned. For example, if the table has many rows with few distinct values in the grouping keys, this optimization works well and reduces the amount of data early, before it is distributed over the network. However, if there is a higher number of distinct values, the hash table needs to be larger in order to reduce the amount of data. By default, the memory used for the hash table is 16 MB, but this can be adjusted by setting task.max-partial-aggregation-memory in the config.properties file. With too high a count of distinct grouping keys, however, the partial aggregation does little to reduce the network transfer, and it may even slow down the query.
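For a workload with many distinct grouping keys, you might experiment with a larger buffer in config.properties; the value shown here is purely illustrative, not a recommendation:

```properties
# Raise the partial-aggregation hash table budget from the 16MB default
task.max-partial-aggregation-memory=64MB
```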
Getting the memory configuration and management for your Trino cluster right is not an easy task. Many constantly changing factors influence the memory needs:
Number of workers
Memory of coordinator and worker
Number and type of data sources
Characteristics of running queries
Number of users
For Trino, a multiuser system using a cluster of workers, resource management is a fairly challenging problem to solve. Ultimately, you have to pick a starting point and then monitor the system and adjust to the current and upcoming needs. Let’s look at some details and talk about recommendations and guidelines around memory management and monitoring in Trino.
All memory management discussed applies to the JVM running the Trino server. The values are allocations within the JVMs on the workers, and the JVM configuration needs to consider the size of these values to allow allocation.
Depending on the number of concurrent queries, the JVM needs to be adjusted to a much larger value. The next example provides some insight.
All of the preceding factors combine into what we call the workload. Tuning the cluster’s memory relies heavily on the workload being run.
For example, most query shapes contain multiple joins, aggregations, and window functions. If the query size of the workload is small, you can set lower memory limits per query and increase the concurrency—and the other way around for larger query sizes. For context, query size is a product of query shape and amount of input data. Trino provides a way to manage memory utilization across the cluster by setting certain properties at the time of deployment in config.properties:
query.max-memory-per-node
query.max-total-memory-per-node
query.max-memory
query.max-total-memory
Memory management in Trino is separated into two kinds of memory allocations:
User memory allocation is driven by query operations such as aggregations and sorting.
System memory allocation is based on the execution implementation of the query engine itself and includes read, write, and shuffle buffers, table scans, and other operations.
With this separation in mind, you can examine the memory properties some more:
query.max-memory-per-node
The maximum user memory a query can utilize on a specific worker for processing aggregations, input data allocation, etc.
query.max-total-memory-per-node
The maximum allowed total of user and system memory, required to be larger than query.max-memory-per-node. When the memory consumed by a query in user and system allocations exceeds this limit, the query is killed.
query.max-memory
The maximum user memory a query can utilize across all workers in the cluster.
query.max-total-memory
The maximum memory a query can utilize for user and system allocations across the entire cluster, as a result necessarily greater than query.max-memory.
If a query ends up exceeding these limits and as a result is killed, error codes expose the reason:
EXCEEDED_LOCAL_MEMORY_LIMIT means that query.max-memory-per-node or query.max-total-memory-per-node was exceeded.
EXCEEDED_GLOBAL_MEMORY_LIMIT means that query.max-memory or query.max-total-memory was exceeded.
Let’s look at a real-world example for a small cluster of one coordinator and ten workers and their characteristics:
One coordinator
Ten workers; typically, all workers have identical system specifications
Physical memory per worker: 50 GB
Max JVM heap size, configured with -Xmx in jvm.config: 38 GB
query.max-memory-per-node: 13 GB
query.max-total-memory-per-node: 16 GB
memory.heap-headroom-per-node: 9 GB
query.max-memory: 50 GB
query.max-total-memory: 60 GB
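Put together, the example translates into configuration files roughly like the following; the heap size goes into jvm.config, and the remaining values into config.properties:

```properties
# jvm.config
-Xmx38G

# config.properties
query.max-memory-per-node=13GB
query.max-total-memory-per-node=16GB
memory.heap-headroom-per-node=9GB
query.max-memory=50GB
query.max-total-memory=60GB
```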
Let’s break these numbers down a bit more.
The total available memory on each worker is ~50 GB and we leave ~12 GB for the operating system, agents/daemons, and components running outside the JVM on the system. These systems include monitoring and other systems that allow you to manage the machine and keep it functioning well. As a result, we determine to set the JVM max heap size to 38 GB.
When query size and shape are small, concurrency can be set higher. In the preceding example, we assume query size and shape to be medium to large, and we also account for data skew. query.max-memory is set to 50 GB, which applies at the overall cluster level. While looking at max-memory, we also consider initial-hash-partitions; this should ideally be a number less than or equal to the number of workers. If we set it to 8 with max-memory at 50 GB, we get 50/8, so about 6.25 GB per worker. Looking at the local limit max-memory-per-node, set to 13 GB, we keep some headroom for data skew by allowing roughly two times the expected memory consumption per node.
These numbers vary significantly based on how the data is organized and what type of queries are typically run—basically, the workload for the cluster. In addition, the
infrastructure used for the cluster, such as the available machine sizes and numbers, has a big impact on the selection of the ideal configuration.
A configuration can be set to help avoid a deadlock situation: query.low-memory-killer.policy. This can be set to total-reservation or total-reservation-on-blocked-nodes. When set to total-reservation, Trino kills the largest running query on the cluster to free up resources. On the other hand, total-reservation-on-blocked-nodes kills the query that is utilizing the most memory on the nodes that are blocked.
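Enabling the second policy is a one-line addition to config.properties:

```properties
query.low-memory-killer.policy=total-reservation-on-blocked-nodes
```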
As you can see from the example, you just end up making some assumptions to get started. And then you adjust based on what happens with your actual workloads.
For example, running a cluster for interactive ad hoc queries from users with a visualization tool can create many small queries. An increase of users then ends up increasing the number of queries and the concurrency of them. This typically does not require a change of the memory configuration, but just a scaling up of the number of workers in the cluster.
On the other hand, if that same cluster gets a new data source added that exposes massive amounts of data for complex queries that most likely blow the limits, you have to adjust memory.
This gets us to another point that is worth mentioning. Following the best practice recommendation, in a typical Trino cluster infrastructure setup, all workers are the same. All use the same virtual machine (VM) image or container size with identical hardware specifications. As a result, changing the memory on these workers typically means that the new value is either too large for the physically available memory, or too small to make good use of the overall system. Adjusting the memory therefore creates the need to replace all worker nodes in the cluster. Depending on your cluster infrastructure (for example, virtual machines in a private cloud, or containers in a Kubernetes cluster from a public cloud provider), this process can be more or less laborious and fast to implement.
This leads us to one last point worth mentioning here. Your assessment of the workloads can reveal that they are widely different: lots of queries are small, fast, ad hoc queries with little memory footprint, and others are massive, long-running processes with a bunch of analysis in there, maybe even using very different data sources. These workload differences indicate very different memory configuration, and even very different worker configuration and maybe even monitoring needs. In this scenario, you really should take the next step and separate the workloads by using different Trino clusters.
To improve performance for your Trino cluster, certain task-related properties may need to be adjusted from the default settings. In this section, we discuss two common properties you may have to tune. However, you can find several others in the Trino documentation. All these properties are set in the config.properties file:
The default value is set to the number of CPUs of the machine multiplied by 2. For example, a dual hex-core machine uses 2 Ă— 6 Ă— 2, so 24 worker threads. If you observe that all threads are being used but CPU utilization on the machine is still low, you can try to improve CPU utilization, and thus performance, by increasing this number via the task.max-worker-threads setting. The recommendation is to increment this number slowly, as setting it too high can have diminishing returns or adverse effects due to increased memory usage and additional context switching.
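For example, on the dual hex-core machine mentioned above, a cautious first step beyond the default of 24 threads might look like this in config.properties; the value is an illustrative assumption, not a recommendation:

```properties
# Modest increase over the default of 2 x cores x 2 = 24
task.max-worker-threads=32
```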
Operators such as joins and aggregations are processed in parallel by partitioning the data locally and executing the operators in parallel. For example, the data is locally partitioned on the GROUP BY columns, and then multiple aggregation operators are executed in parallel. The default concurrency for these parallel operations is 16. This value can be adjusted by setting the task.concurrency property. If you are running many concurrent queries, the default value may result in reduced performance due to the overhead of context switching. For clusters that run only a smaller number of concurrent queries, a higher value can help increase parallelism and therefore performance.
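For instance, a cluster dedicated to a high number of concurrent, smaller queries might lower the value in config.properties; the number here is only an illustrative starting point to experiment from:

```properties
# Reduce per-operator parallelism below the default of 16
task.concurrency=8
```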
To improve performance of your Trino cluster, certain scheduling related properties may need to be adjusted from the default settings. You can tune three common configurations:
Splits per task
Splits per node
Local scheduling
Several others are explained in the Trino documentation.
Each worker node processes a maximum number of splits. By default, the maximum number of splits processed per worker node is 100. This value can be adjusted with the node-scheduler.max-splits-per-node property. You may want to adjust this if you find that the workers have maxed out this value and are still underutilized. Increasing the value improves performance, particularly when a lot of splits exist. In addition, you can consider increasing the node-scheduler.max-pending-splits-per-task property. This value should not exceed node-scheduler.max-splits-per-node. It ensures that work is queued up for when the workers finish the tasks in process.
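A sketch of both properties in config.properties, with illustrative values chosen so that the pending limit stays below the per-node maximum:

```properties
# Allow more concurrent splits per worker than the default of 100
node-scheduler.max-splits-per-node=200
# Keep pending work queued, staying below max-splits-per-node
node-scheduler.max-pending-splits-per-task=150
```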
When scheduling splits on worker nodes, the Trino scheduler uses two methods. The method to use depends on the deployment of Trino. For example, if workers run on the same machines as distributed storage, it is optimal to schedule splits on the worker nodes where the data is located. Scheduling splits where the data is not located requires the data to be transferred over the network to the worker node for processing. Therefore, you see an increase in performance when scheduling splits on the same node where the data is stored.
The default method, legacy, does not account for the data location when scheduling splits. The improved flat method does account for the data location when scheduling splits, and it can be used by setting node-scheduler.network-topology. The Trino scheduler uses 50% of the queue for scheduling local splits.
The common use case for using the flat method occurs when Trino is installed and collocated on HDFS data nodes or when using RubiX for caching (see “Performance Improvements with RubiX”). RubiX caches data from the distributed storage on the worker nodes. Therefore, scheduling local splits is advantageous.
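For example, to enable the flat method when Trino workers are collocated with HDFS data nodes or RubiX caching, set the property in etc/config.properties:

```
# etc/config.properties
# Schedule splits on the worker nodes where the data is located
node-scheduler.network-topology=flat
```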
Another important factor affecting the performance of your Trino cluster is the network configuration and setup within the cluster and the closeness to the data sources. Trino supports network-specific properties that can be adjusted from the defaults to adapt to your specific scenario.
In addition to improving performance, sometimes other network-related issues require tuning in order for queries to perform well. Let’s discuss some of the common properties you may have to tune.
Trino exchange clients make requests to the upstream tasks producing data. The default number of threads used by the exchange clients is 25. This value can be adjusted by setting the exchange.client-threads property.
Setting a larger number of threads can improve performance for larger clusters or clusters configured for high concurrency. The reason is that when more data is produced, additional concurrency to consume the data can decrease latency. Keep in mind that these additional threads increase the amount of memory needed.
Data sent and received during the exchange is kept in buffers on the target and source sides of the exchange. The buffer sizes for each side can be adjusted independently.
On the source side, data is written by the tasks to a buffer waiting to be requested from the downstream exchange client. The default buffer size is 32 MB. It can be adjusted by setting the sink.max-buffer-size property. Increasing the value may increase throughput for a larger cluster.
On the target side, data is written to a buffer before it is processed by the downstream task. The default buffer size is 32 MB. It can be adjusted by setting the exchange.max-buffer-size property. Setting a higher value can improve query performance by allowing retrieval of more data over the network before back pressure is applied. This is especially true for larger clusters.
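Putting these exchange-related properties together, a sketch of etc/config.properties for a larger, high-concurrency cluster might look as follows; the specific values are illustrative assumptions, not recommendations:

```
# etc/config.properties
# More exchange client threads to consume data with lower latency
# (increases memory usage)
exchange.client-threads=32
# Larger source-side buffer for higher throughput
sink.max-buffer-size=64MB
# Larger target-side buffer before back pressure is applied
exchange.max-buffer-size=64MB
```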
In Chapter 2, you used the configuration file etc/jvm.config, which contains command-line options for the JVM. The Trino launcher uses this file when starting the JVM running Trino. Compared to the configuration mentioned earlier, a more suitable configuration for production usage uses higher memory values:
-server
-XX:+UseG1GC
-XX:+ExplicitGCInvokesConcurrent
-XX:+ExitOnOutOfMemoryError
-XX:+UseGCOverheadLimit
-XX:+HeapDumpOnOutOfMemoryError
-XX:-UseBiasedLocking
-Djdk.attach.allowAttachSelf=true
-Xms64G
-Xmx64G
-XX:G1HeapRegionSize=32M
-XX:ReservedCodeCacheSize=512M
-Djdk.nio.maxCachedBufferSize=2000000
Typically you need to increase the maximum memory allocation pool for the JVM with the Xmx value, in this case, upped to 64 GB. The Xms parameter sets the initial, minimal memory allocation. The general recommendation is to set both Xmx and Xms to the same value.
In the preceding configuration example, memory allocation is set to 64 GB. The actual value you use depends on the machines used by your cluster. The general recommendation is to set both Xmx and Xms to 80% of the total memory of the system. This allows plenty of headroom for other system processes running. Further details about the memory management of Trino and related configuration can be found in “Memory Management”.
For large Trino deployments, memory allocations of 200 GB and beyond are not uncommon.
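For example, applying the 80% guideline to a hypothetical machine with 256 GB of RAM yields a heap of roughly 200 GB in etc/jvm.config:

```
# etc/jvm.config fragment for a machine with 256 GB RAM (illustrative)
# 80% of total memory, with Xms and Xmx set to the same value
-Xms200G
-Xmx200G
```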
While small processes such as monitoring can run on the same machine as Trino, it’s highly discouraged to share the system with other resource-intensive software. For example, Apache Spark and Trino should not be run on the same set of hardware.
If you suspect Java garbage collection (GC) issues, you can set additional parameters to help you with debugging:
-XX:+PrintGCApplicationConcurrentTime
-XX:+PrintGCApplicationStoppedTime
-XX:+PrintGCCause
-XX:+PrintGCDateStamps
-XX:+PrintGCTimeStamps
-XX:+PrintGCDetails
-XX:+PrintReferenceGC
-XX:+PrintClassHistogramAfterFullGC
-XX:+PrintClassHistogramBeforeFullGC
-XX:PrintFLSStatistics=2
-XX:+PrintAdaptiveSizePolicy
-XX:+PrintSafepointStatistics
-XX:PrintSafepointStatisticsCount=1
These options can be helpful when troubleshooting a full GC pause. In combination with the advancements of JVM and the engineering of the Trino query engine, a GC pause should be a very rare event. If it does happen, it should be investigated. First steps to fix these issues are often an upgrade of the JVM version and the Trino version used, since both receive performance improvements regularly.
JVM and garbage collection algorithms and configuration are complex topics. Documentation on GC tuning is available from Oracle and other JVM vendors. We strongly recommend adjusting these settings in small changes in test environments, before attempting to roll them out to production systems. Also keep in mind that Trino currently requires Java 11. Older or newer JVM versions, as well as JVM versions from different vendors, can have different behavior.
Resource groups are a powerful concept in Trino used to limit resource utilization on the system. The resource group configuration consists of two main pieces: the resource group properties and the selector rules.
A resource group is a named collection of properties that define available cluster resources. You can think of a resource group as a separate section in the cluster that is isolated from other resource groups. The group is defined by CPU and memory limits, concurrency limits, queuing priorities, and priority weights for selecting queued queries to run.
The selector rules, on the other hand, allow Trino to assign an incoming query request to a specific resource group.
The default resource group manager uses a file-based configuration and needs to be configured in etc/resource-groups.properties:
resource-groups.configuration-manager=file
resource-groups.config-file=etc/resource-groups.json
As you can see, the actual configuration uses a JSON file. The content of the file defines the resource groups as well as the selector rules. Note that the JSON file can be any path that is accessible to Trino and that resource groups need to be configured only on the coordinator:
{
  "rootGroups": [],
  "selectors": [],
  "cpuQuotaPeriod": ""
}
cpuQuotaPeriod is optional.
Let’s look at the definition of two resource groups to get started:
"rootGroups": [
  {
    "name": "ddl",
    "maxQueued": 100,
    "hardConcurrencyLimit": 10,
    "softMemoryLimit": "10%"
  },
  {
    "name": "ad-hoc",
    "maxQueued": 50,
    "hardConcurrencyLimit": 1,
    "softMemoryLimit": "100%"
  }
]
The example defines two resource groups named ddl and ad-hoc. Each group has a set maximum number of concurrent queries and a total distributed memory limit. For a given group, if the concurrency or memory limit is reached, any new query is placed in the queue. Once the total memory usage goes down or a query completes, the resource group chooses a query from the queue to schedule to run. Each group also has a maximum number of queries to queue. If this limit is reached, any new queries are rejected and the client receives an error indicating that.
In our example, the ad-hoc group is designed for all queries that are not DDL queries. This group allows only one query to run concurrently, with up to 50 queries queued. The group has a memory limit of up to 100%, meaning the running query can use all the available memory in the cluster.
DDL queries have their own group, with the idea that these types of queries are relatively short and lightweight and should not be starved by longer-running ad hoc SQL queries. In this group, you specify that no more than 10 DDL queries run concurrently, and that the total amount of distributed memory used by all running queries is no more than 10% of the Trino cluster's memory. This allows a DDL query to be executed without having to wait in the ad hoc query line.
Now that the two groups are defined, you need to define the selector rules. When a new query arrives in the coordinator, it is assigned to a particular group. Let’s take a look at the example:
"selectors": [
  {
    "queryType": "DATA_DEFINITION",
    "group": "ddl"
  },
  {
    "group": "ad-hoc"
  }
]
The first selector matches any query of type DATA_DEFINITION and assigns it to the ddl resource group. The second selector matches all other queries and places those queries in the ad-hoc resource group.
The order of the selectors is important because they are processed sequentially, and the first match assigns the query to the group. In order to match, all properties specified must match. For example, if we switch the order of the selectors, then all queries, including DDL, are assigned to the ad-hoc resource group, and no queries are ever assigned to the ddl resource group.
Let’s take a closer look at the following configuration properties for resource groups:
name
The required name of the resource group, referenced by the selector rule for assignment.
maxQueued
The required maximum number of queries queued for the group.
hardConcurrencyLimit
The required maximum number of concurrent queries that can be running in the group.
softMemoryLimit
The required maximum amount of distributed memory that can be used by concurrently running queries. Once this limit is reached, new queries are queued until memory usage decreases. Both absolute values (GB) and percentages (%) can be used.
softCpuLimit
The optional soft limit on the amount of CPU time that can be used within a time period as defined by the cpuQuotaPeriod property. Once this limit is reached, a penalty is applied to the running queries.
hardCpuLimit
The optional hard limit on the amount of CPU time that can be used within a time period as defined by the cpuQuotaPeriod property. Once this limit is reached, queries are rejected until the next quota period.
schedulingPolicy
The policy used to schedule new queries to select a query from the queue in the resource group and process it. Details are provided in the following section.
schedulingWeight
This optional property is to be used in conjunction with schedulingPolicy.
jmxExport
Flag to cause resource groups to be exposed via JMX. Defaults to false.
subGroups
The optional list of nested subgroups contained in this resource group.

The schedulingPolicy value noted in the preceding list can be configured to various values to run in the following modes:
Setting schedulingPolicy to fair schedules queries in a first-in, first-out (FIFO) queue. If the resource group has subgroups, the subgroups with queued queries alternate.

Setting schedulingPolicy to query_priority schedules queued queries based on a managed priority queue. The priority of the query is specified by the client by using the query_priority session property (see “Session Information and Configuration”). If the resource group has subgroups, the subgroups must also specify query_priority.

Setting schedulingPolicy to weighted_fair is used to choose the resource group subgroup to start the next query. The schedulingWeight property is used in conjunction with this: queries are chosen in proportion to the schedulingWeight of the subgroups.
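To illustrate, here is a hedged sketch of a parent group using weighted_fair with two subgroups. The group names, limits, and weights are invented for illustration and are not from the text:

```
"rootGroups": [
  {
    "name": "analytics",
    "maxQueued": 100,
    "hardConcurrencyLimit": 10,
    "softMemoryLimit": "80%",
    "schedulingPolicy": "weighted_fair",
    "subGroups": [
      {
        "name": "dashboards",
        "maxQueued": 50,
        "hardConcurrencyLimit": 5,
        "softMemoryLimit": "40%",
        "schedulingWeight": 3
      },
      {
        "name": "reports",
        "maxQueued": 50,
        "hardConcurrencyLimit": 5,
        "softMemoryLimit": "40%",
        "schedulingWeight": 1
      }
    ]
  }
]
```

With these weights, queued queries from the dashboards subgroup are selected to run three times as often as those from the reports subgroup.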
Selector rules are required to define the group property, since it determines the resource group to which a query is assigned. It is a good practice to have the last selector in the file define only a group. It then acts as an explicit catchall group.
Optional properties and regular expressions can be used to refine the selector rules:
user
Matches against a username value. Regular expressions may be used to match against multiple names.
source
Matches against the source value. For example, this may be trino-cli or trino-jdbc. Regular expressions may be used.
queryType
Matches against the type of query. The available options are DATA_DEFINITION, DELETE, DESCRIBE, EXPLAIN, INSERT, and SELECT.
clientTags
Matches against the client tags specified by the client submitting the query.
To set the source or client tags from the Trino CLI, you can use the --source and --client-tags options:
$ trino --user mfuller --source mfuller-cli
$ trino --user mfuller --client-tags adhoc-queries
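Tying this back to selector rules, a hedged sketch of selectors that would match the CLI invocations above; the regular expression and tag value mirror those examples and are otherwise illustrative:

```
"selectors": [
  {
    "source": ".*-cli",
    "group": "ad-hoc"
  },
  {
    "clientTags": ["adhoc-queries"],
    "group": "ad-hoc"
  },
  {
    "group": "ad-hoc"
  }
]
```

The first rule uses a regular expression to match any source ending in -cli, the second matches queries submitted with the adhoc-queries client tag, and the final rule defines only a group, acting as the explicit catchall.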
Well done! You have come a long way with Trino. In this chapter, you immersed yourself in the details of monitoring, tuning, and adjusting Trino. Of course, there is always more to learn, and typically these skills are improved with practice. Be sure to connect with other users to exchange ideas and learn from their experience and join the Trino community (see “Community Chat”).
And in Chapter 13, you get to find out what others have achieved with Trino already.