Chapter 10. Tuning

HiveQL is a declarative language: you state the result you want, and Hive figures out how to translate the query into MapReduce jobs. Most of the time, you don’t need to understand how Hive works, freeing you to focus on the problem at hand. While the sophisticated process of query parsing, planning, optimization, and execution is the result of many years of hard engineering work by the Hive team, most of the time you can remain oblivious to it.

However, as you become more experienced with Hive, learning the theory behind it and its low-level implementation details will let you use Hive more effectively, especially where performance optimizations are concerned.

This chapter covers several different topics related to tuning Hive performance. Some tuning involves adjusting numeric configuration parameters (“turning the knobs”), while other tuning steps involve enabling or disabling specific features.

Using EXPLAIN

The first step to learning how Hive works (after reading this book…) is to use the EXPLAIN feature to learn how Hive translates queries into MapReduce jobs.

Consider the following example:

hive> DESCRIBE onecol;
number  int

hive> SELECT * FROM onecol;
5
5
4

hive> SELECT SUM(number) FROM onecol;
14

Now, put the EXPLAIN keyword in front of the last query to see the query plan and other information. The query will not be executed.

hive> EXPLAIN SELECT SUM(number) FROM onecol;

The output requires some explaining and practice to understand.

First, the abstract syntax tree is printed. This shows how Hive parsed the query into tokens and literals, as part of the first step in turning the query into the ultimate result:

ABSTRACT SYNTAX TREE:
(TOK_QUERY
  (TOK_FROM (TOK_TABREF (TOK_TABNAME onecol)))
  (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE))
  (TOK_SELECT
     (TOK_SELEXPR
       (TOK_FUNCTION sum (TOK_TABLE_OR_COL number))))))

(The indentation of the actual output was changed to fit the page.)

For those not familiar with parsers and tokenizers, this can look overwhelming. However, even if you are a novice in this area, you can study the output to get a sense for what Hive is doing with the SQL statement. (As a first step, ignore the TOK_ prefixes.)

Even though our query will write its output to the console, Hive will actually write the output to a temporary file first, as shown by this part of the output:

'(TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE))'

Next, we can see references to our column name number, our table name onecol, and the sum function.

A Hive job consists of one or more stages, with dependencies between different stages. As you might expect, more complex queries will usually involve more stages, and more stages usually require more processing time to complete.

A stage could be a MapReduce job, a sampling stage, a merge stage, a limit stage, or a stage for some other task Hive needs to do. By default, Hive executes these stages one at a time, although later we’ll discuss parallel execution in Parallel Execution.

Some stages will be short, like those that move files around. Other stages may also finish quickly if they have little data to process, even though they require a map or reduce task:

STAGE DEPENDENCIES:
  Stage-1 is a root stage
  Stage-0 is a root stage

The STAGE PLANS section is verbose and complex. Stage-1 is the bulk of the processing for this job and happens via a MapReduce job. A TableScan takes the input of the table and produces a single output column number. The Group By Operator applies the sum(number) and produces an output column _col0 (a synthesized name for an anonymous result). All this is happening on the map side of the job, under the Map Operator Tree:

STAGE PLANS:
  Stage: Stage-1
    Map Reduce
      Alias -> Map Operator Tree:
        onecol
          TableScan
            alias: onecol
            Select Operator
              expressions:
                    expr: number
                    type: int
              outputColumnNames: number
              Group By Operator
                aggregations:
                      expr: sum(number)
                bucketGroup: false
                mode: hash
                outputColumnNames: _col0
                Reduce Output Operator
                  sort order:
                  tag: -1
                  value expressions:
                        expr: _col0
                        type: bigint

On the reduce side, under the Reduce Operator Tree, we see the same Group By Operator, but this time it applies sum on _col0. Finally, in the reducer we see the File Output Operator, which shows that the output will be text, as indicated by the output format, HiveIgnoreKeyTextOutputFormat:

      Reduce Operator Tree:
        Group By Operator
          aggregations:
                expr: sum(VALUE._col0)
          bucketGroup: false
          mode: mergepartial
          outputColumnNames: _col0
          Select Operator
            expressions:
                  expr: _col0
                  type: bigint
            outputColumnNames: _col0
            File Output Operator
              compressed: false
              GlobalTableId: 0
              table:
                  input format: org.apache.hadoop.mapred.TextInputFormat
                  output format: 
                      org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat

Because this job has no LIMIT clause, Stage-0 is a no-op stage:

  Stage: Stage-0
    Fetch Operator
      limit: -1

Understanding the intricate details of how Hive parses and plans every query is not useful all of the time. However, it is very handy for analyzing complex or poorly performing queries, especially as we try various tuning steps. We can observe what effect these changes have at the “logical” level, in tandem with performance measurements.

EXPLAIN EXTENDED

Using EXPLAIN EXTENDED produces even more output. In an effort to “go green,” we won’t show the entire output, but we will show you the Reduce Operator Tree to demonstrate the different output:

 Reduce Operator Tree:
        Group By Operator
          aggregations:
                expr: sum(VALUE._col0)
          bucketGroup: false
          mode: mergepartial
          outputColumnNames: _col0
          Select Operator
            expressions:
                  expr: _col0
                  type: bigint
            outputColumnNames: _col0
            File Output Operator
              compressed: false
              GlobalTableId: 0
              directory: file:/tmp/edward/hive_2012-[long number]/-ext-10001
              NumFilesPerFileSink: 1
              Stats Publishing Key Prefix: 
                  file:/tmp/edward/hive_2012-[long number]/-ext-10001/
              table:
                  input format: org.apache.hadoop.mapred.TextInputFormat
                  output format: 
                      org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
                  properties:
                    columns _col0
                    columns.types bigint
                    escape.delim 
                    serialization.format 1
              TotalFiles: 1
              GatherStats: false
              MultiFileSpray: false

We encourage you to compare the two outputs for the Reduce Operator Tree.

Limit Tuning

The LIMIT clause is commonly used, often by people working with the CLI. However, in many cases a query with a LIMIT clause still executes in its entirety and only then returns the handful of requested results. Because this behavior is generally wasteful, it should be avoided when possible. Hive has a configuration property to enable sampling of source data for use with LIMIT:

<property>
  <name>hive.limit.optimize.enable</name>
  <value>true</value>
  <description>Whether to enable to optimization to
    try a smaller subset of data for simple LIMIT first.</description>
</property>

Once hive.limit.optimize.enable is set to true, two variables control its operation, hive.limit.row.max.size and hive.limit.optimize.limit.file:

<property>
  <name>hive.limit.row.max.size</name>
  <value>100000</value>
  <description>When trying a smaller subset of data for simple LIMIT, 
   how much size we need to guarantee each row to have at least.
  </description>
</property>

<property>
  <name>hive.limit.optimize.limit.file</name>
  <value>10</value>
  <description>When trying a smaller subset of data for simple LIMIT, 
   maximum number of files we can sample.</description>
</property>
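
These properties can also be set per session from the CLI before issuing a simple LIMIT query. The values below simply restate the defaults shown above, and fracture_ins is one of the example tables used later in this chapter:

hive> set hive.limit.optimize.enable=true;
hive> set hive.limit.row.max.size=100000;
hive> set hive.limit.optimize.limit.file=10;

hive> SELECT * FROM fracture_ins LIMIT 10;
...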

A drawback of this feature is the risk that useful input data will never get processed. For example, any query that requires a reduce step, such as most JOIN and GROUP BY operations and most calls to aggregate functions, may return very different results over the sampled subset. Perhaps this difference is acceptable in many cases, but it’s important to understand.

Optimized Joins

We discussed optimizing join performance in Join Optimizations and Map-side Joins. We won’t reproduce the details here, but just remind yourself that it’s important to know which table is the largest and put it last in the JOIN clause, or use the /*+ STREAMTABLE(table_name) */ hint.
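
As a quick sketch of the hint (the table names here are hypothetical, with clicks assumed to be by far the largest table):

hive> SELECT /*+ STREAMTABLE(c) */ c.user_id, u.name
    > FROM users u JOIN clicks c ON (u.user_id = c.user_id);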

If all but one table is small enough, typically to fit in memory, then Hive can perform a map-side join, eliminating the need for reduce tasks and even some map tasks. Sometimes even tables that do not fit in memory are good candidates, because removing the reduce phase outweighs the cost of bringing a semi-large table into each map task.
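
Here is a sketch of both approaches to a map-side join, again with hypothetical tables; dim_states is assumed to be small enough to load into memory in each map task:

hive> set hive.auto.convert.join=true;

hive> SELECT /*+ MAPJOIN(d) */ f.planner_id, d.state_name
    > FROM facts f JOIN dim_states d ON (f.state_id = d.state_id);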

Local Mode

Many Hadoop jobs need the full scalability benefits of Hadoop to process large data sets. However, there are times when the input to Hive is very small. In these cases, the overhead of launching tasks for queries consumes a significant percentage of the overall job execution time. In many of these cases, Hive can leverage the lighter weight of the local mode to perform all the tasks for the job on a single machine and sometimes in the same process. The reduction in execution times can be dramatic for small data sets.

You can explicitly enable local mode temporarily, as in this example:

hive> set oldjobtracker=${hiveconf:mapred.job.tracker};

hive> set mapred.job.tracker=local;

hive> set mapred.tmp.dir=/home/edward/tmp;

hive> SELECT * FROM people WHERE firstname = 'bob';
...

hive> set mapred.job.tracker=${oldjobtracker};

You can also tell Hive to automatically apply this optimization by setting hive.exec.mode.local.auto to true, perhaps in your $HOME/.hiverc.
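
For example, your $HOME/.hiverc could contain just this line, so every session you start is willing to fall back to local mode for small inputs:

set hive.exec.mode.local.auto=true;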

To set this property permanently for all users, change the value in your $HIVE_HOME/conf/hive-site.xml:

<property>
  <name>hive.exec.mode.local.auto</name>
  <value>true</value>
  <description> 
    Let hive determine whether to run in local mode automatically 
  </description>
</property>

Parallel Execution

Hive converts a query into one or more stages. Stages could be a MapReduce stage, a sampling stage, a merge stage, a limit stage, or other possible tasks Hive needs to do. By default, Hive executes these stages one at a time. However, a particular job may consist of some stages that are not dependent on each other; if those stages are executed in parallel, the overall job may complete much faster.

Setting hive.exec.parallel to true enables parallel execution. Be careful in a shared cluster, however. If a job is running more stages in parallel, it will increase its cluster utilization:

<property>
  <name>hive.exec.parallel</name>
  <value>true</value>
  <description>Whether to execute jobs in parallel</description>
</property>
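
You can also enable it for a single session from the CLI. The related property hive.exec.parallel.thread.number caps how many stages run at once; 8 is its usual default, but confirm the value for your release:

hive> set hive.exec.parallel=true;
hive> set hive.exec.parallel.thread.number=8;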

Strict Mode

Strict mode is a setting in Hive that prevents users from issuing queries that could have unintended and undesirable effects.

Setting the property hive.mapred.mode to strict disables three types of queries.
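
You can turn strict mode on for a session before trying the examples that follow:

hive> set hive.mapred.mode=strict;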

First, queries on partitioned tables are not permitted unless they include a partition filter in the WHERE clause, limiting their scope. In other words, you’re prevented from running queries that would scan all partitions. The rationale for this limitation is that partitioned tables often hold very large data sets that may be growing rapidly. An unrestricted query could consume unacceptably large resources over such a large table:

hive> SELECT DISTINCT(planner_id) FROM fracture_ins WHERE planner_id=5;
FAILED: Error in semantic analysis: No Partition Predicate Found for
Alias "fracture_ins" Table "fracture_ins"

The following enhancement adds a filter on the partition column, hit_date, to the WHERE clause:

hive> SELECT DISTINCT(planner_id) FROM fracture_ins
    > WHERE planner_id=5 AND hit_date=20120101;
... normal results ...

The second type of restricted query is one that uses an ORDER BY clause without a LIMIT clause. Because ORDER BY sends all results to a single reducer to perform the ordering, forcing the user to specify a LIMIT clause prevents the reducer from running for an extended period of time:

hive> SELECT * FROM fracture_ins WHERE hit_date>2012 ORDER BY planner_id;
FAILED: Error in semantic analysis: line 1:56 In strict mode,
limit must be specified if ORDER BY is present planner_id

To issue this query, add a LIMIT clause:

hive> SELECT * FROM fracture_ins WHERE hit_date>2012 ORDER BY planner_id
    > LIMIT 100000;
... normal results ...

The third and final type of query prevented is a Cartesian product. Users coming from the relational database world may expect that a JOIN written with a WHERE clause instead of an ON clause will be optimized by the query planner, effectively converting the WHERE clause into an ON clause. Unfortunately, Hive does not perform this optimization, so a runaway query will occur if the tables are large:

hive> SELECT * FROM fracture_act JOIN fracture_ads
    > WHERE fracture_act.planner_id = fracture_ads.planner_id;
FAILED: Error in semantic analysis: In strict mode, cartesian product
is not allowed. If you really want to perform the operation,
set hive.mapred.mode=nonstrict

Here is a properly constructed query with JOIN and ON clauses:

hive> SELECT * FROM fracture_act JOIN fracture_ads
    > ON (fracture_act.planner_id = fracture_ads.planner_id);
... normal results ...

Tuning the Number of Mappers and Reducers

Hive is able to parallelize queries by breaking the query into one or more MapReduce jobs, each of which might have multiple mapper and reducer tasks, at least some of which can run in parallel. Determining the optimal number of mappers and reducers depends on many variables, such as the size of the input and the operation being performed on the data.

A balance is required. Having too many mapper or reducer tasks causes excessive overhead in starting, scheduling, and running the job, while too few tasks means the inherent parallelism of the cluster is underutilized.

When running a Hive query that has a reduce phase, the CLI prints information about how the number of reducers can be tuned. Let’s see an example that uses a GROUP BY query, because they always require a reduce phase. In contrast, many other queries are converted into map-only jobs:

hive> SELECT pixel_id, count(1) FROM fracture_ins WHERE hit_date=20120119
    > GROUP BY pixel_id;
Total MapReduce jobs = 1
Launching Job 1 out of 1
Number of reduce tasks not specified. Estimated from input data size: 3
In order to change the average load for a reducer (in bytes):
  set hive.exec.reducers.bytes.per.reducer=<number>
In order to limit the maximum number of reducers:
  set hive.exec.reducers.max=<number>
In order to set a constant number of reducers:
  set mapred.reduce.tasks=<number>
...

Hive is determining the number of reducers from the input size. This can be confirmed using the dfs -count command, which works something like the Linux du -s command; it computes a total size for all the data under a given directory:

[edward@etl02 ~]$ hadoop dfs -count /user/media6/fracture/ins/* | tail -4
 1  8  2614608737 hdfs://.../user/media6/fracture/ins/hit_date=20120118
 1  7  2742992546 hdfs://.../user/media6/fracture/ins/hit_date=20120119
 1 17  2656878252 hdfs://.../user/media6/fracture/ins/hit_date=20120120
 1  2   362657644 hdfs://.../user/media6/fracture/ins/hit_date=20120121

(We’ve reformatted the output and elided some details for space.)

The default value of hive.exec.reducers.bytes.per.reducer is 1 GB. Changing this value to 750 MB causes Hive to estimate four reducers for this job:

hive> set hive.exec.reducers.bytes.per.reducer=750000000;

hive> SELECT pixel_id,count(1) FROM fracture_ins WHERE hit_date=20120119
    > GROUP BY pixel_id;
Total MapReduce jobs = 1
Launching Job 1 out of 1
Number of reduce tasks not specified. Estimated from input data size: 4
...

This default typically yields good results. However, there are cases where a query’s map phase will create significantly more data than the input size; in that case, estimating from the input size selects too few reducers. Likewise, the map function might filter a large portion of the data from the data set, and then fewer reducers may be justified.

A quick way to experiment is to set the number of reducers to a fixed value, rather than allowing Hive to calculate it. Recall that Hive estimated three reducers for our example query. Set mapred.reduce.tasks to different numbers and determine whether more or fewer reducers result in faster run times. Remember that benchmarking like this is complicated by external factors such as other users running jobs simultaneously. Hadoop also has a few seconds of overhead to start up and schedule map and reduce tasks. When executing performance tests, it’s important to keep these factors in mind, especially if the jobs are small.
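
For example, to force a particular run of our query to use ten reducers (the number is arbitrary; try several):

hive> set mapred.reduce.tasks=10;

hive> SELECT pixel_id, count(1) FROM fracture_ins WHERE hit_date=20120119
    > GROUP BY pixel_id;
...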

The hive.exec.reducers.max property is useful for controlling resource utilization on shared clusters when dealing with large jobs. A Hadoop cluster has a fixed number of map and reduce “slots” to allocate to tasks. One large job could reserve all of the slots and block other jobs from starting. Setting hive.exec.reducers.max can stop a query from taking too many reducer resources. It is a good idea to set this value in your $HIVE_HOME/conf/hive-site.xml. A suggested formula is to set the value to the result of this calculation:

(Total Cluster Reduce Slots * 1.5) / (avg number of queries running)

The 1.5 multiplier is a fudge factor to prevent underutilization of the cluster.
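
For example, on a hypothetical cluster with 100 reduce slots and an average of three queries running at once, the formula gives (100 * 1.5) / 3 = 50, which you could then record in hive-site.xml:

<property>
  <name>hive.exec.reducers.max</name>
  <value>50</value>
  <!-- (100 reduce slots * 1.5) / 3 concurrent queries = 50 -->
</property>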

JVM Reuse

JVM reuse is a Hadoop tuning parameter that is very relevant to Hive performance, especially in scenarios where it’s hard to avoid small files and in scenarios with lots of tasks, most of which have short execution times.

The default configuration of Hadoop will typically launch map or reduce tasks in a forked JVM. The JVM start-up may create significant overhead, especially when launching jobs with hundreds or thousands of tasks. Reuse allows a JVM instance to be reused up to N times for the same job. This value is set in Hadoop’s mapred-site.xml (in $HADOOP_HOME/conf):

<property>
  <name>mapred.job.reuse.jvm.num.tasks</name>
  <value>10</value>
  <description>How many tasks to run per jvm. If set to -1, there is no limit.
  </description>
</property>

A drawback of this feature is that JVM reuse will keep reserved task slots open until the job completes, in case they are needed for reuse. If an “unbalanced” job has some reduce tasks that run considerably longer than the others, the reserved slots will sit idle, unavailable for other jobs, until the last task completes.

Indexes

Indexes may be used to accelerate GROUP BY queries.

Since v0.8.0, Hive contains an implementation of bitmap indexes. The main use case for bitmap indexes is when a given column has comparatively few distinct values. See Bitmap Indexes for more information.
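
As a sketch, with a hypothetical employees table and a low-cardinality country column, a bitmap index is declared with the AS 'BITMAP' handler and then populated with a REBUILD:

hive> CREATE INDEX employees_country_bitmap
    > ON TABLE employees (country)
    > AS 'BITMAP'
    > WITH DEFERRED REBUILD;

hive> ALTER INDEX employees_country_bitmap ON employees REBUILD;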

Dynamic Partition Tuning

As explained in Dynamic Partition Inserts, dynamic partition INSERT statements enable a succinct SELECT statement to create many new partitions for insertion into a partitioned table.

This is a very powerful feature; however, if the number of partitions is high, a large number of output handles must be created on the system. This is a somewhat uncommon use case for Hadoop, which typically creates a few files at once and streams large amounts of data to them.

Out of the box, Hive is configured to prevent dynamic partition inserts from creating more than 1,000 or so partitions. While it can be bad for a table to have too many partitions, it is generally better to raise this limit and allow these queries to work.

First, it is always good to set the dynamic partition mode to strict in your hive-site.xml, as discussed in Strict Mode. When strict mode is on, at least one partition has to be static, as demonstrated in Dynamic Partition Inserts:

<property>
  <name>hive.exec.dynamic.partition.mode</name>
  <value>strict</value>
  <description>In strict mode, the user must specify at least one
static partition in case the user accidentally overwrites all
partitions.</description>
</property>
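
For example, with strict mode on, a dynamic partition INSERT must pin at least one partition column to a static value. The tables and columns here are hypothetical:

hive> set hive.exec.dynamic.partition=true;

hive> INSERT OVERWRITE TABLE sales PARTITION (country = 'US', state)
    > SELECT order_id, amount, state
    > FROM staged_sales
    > WHERE country = 'US';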

Then, increase the other relevant properties to allow queries that will create a large number of dynamic partitions, for example:

<property>
  <name>hive.exec.max.dynamic.partitions</name>
  <value>300000</value>
  <description>Maximum number of dynamic partitions allowed to be
created in total.</description>
</property>

<property>
  <name>hive.exec.max.dynamic.partitions.pernode</name>
  <value>10000</value>
  <description>Maximum number of dynamic partitions allowed to be
created in each mapper/reducer node.</description>
</property>

Another setting controls how many files a DataNode will allow to be open at once. It must be set in the DataNode’s $HADOOP_HOME/conf/hdfs-site.xml.

In Hadoop v0.20.2, the default value is 256, which is too low. The value affects the maximum number of threads and other resources consumed, so setting it to a very high number is not recommended. Note also that in Hadoop v0.20.2, changing this variable requires restarting the DataNode to take effect:

<property>
  <name>dfs.datanode.max.xcievers</name>
  <value>8192</value>
</property>

Speculative Execution

Speculative execution is a feature of Hadoop that launches a certain number of duplicate tasks. While this consumes more resources computing duplicate copies of data that may be discarded, the goal of this feature is to improve overall job progress by getting individual task results faster, and by detecting and then blacklisting slow-running TaskTrackers.

Hadoop speculative execution is controlled in the $HADOOP_HOME/conf/mapred-site.xml file by the following two variables:

<property>
  <name>mapred.map.tasks.speculative.execution</name>
  <value>true</value>
  <description>If true, then multiple instances of some map tasks
  may be executed in parallel.</description>
</property>

<property>
  <name>mapred.reduce.tasks.speculative.execution</name>
  <value>true</value>
  <description>If true, then multiple instances of some reduce tasks
  may be executed in parallel.</description>
</property>

However, Hive provides its own variable to control reduce-side speculative execution:

<property>
  <name>hive.mapred.reduce.tasks.speculative.execution</name>
  <value>true</value>
  <description>Whether speculative execution for
  reducers should be turned on. </description>
</property>

It is hard to give a concrete recommendation about tuning these speculative execution variables. If you are very sensitive to deviations in runtime, you may wish to turn these features on. However, if you have long-running map or reduce tasks due to large amounts of input, the waste could be significant.
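
If you decide the duplicated work is not worthwhile for a particular long-running job, you can turn off the reduce-side speculation that Hive controls for just that session:

hive> set hive.mapred.reduce.tasks.speculative.execution=false;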

Single MapReduce MultiGROUP BY

Another special optimization attempts to combine multiple GROUP BY operations in a query into a single MapReduce job. For this optimization to work, a common set of GROUP BY keys is required:

<property>
  <name>hive.multigroupby.singlemr</name>
  <value>false</value>
  <description>Whether to optimize multi group by query to generate single M/R
  job plan. If the multi group by query has common group by keys, it will be
  optimized to generate single M/R job.</description>
</property>
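
For example, a multi-table INSERT whose GROUP BY clauses share the same key is the kind of query this optimization targets. The destination tables here are hypothetical:

hive> set hive.multigroupby.singlemr=true;

hive> FROM fracture_ins
    > INSERT OVERWRITE TABLE hits_by_day
    >   SELECT hit_date, count(1) GROUP BY hit_date
    > INSERT OVERWRITE TABLE planners_by_day
    >   SELECT hit_date, count(DISTINCT planner_id) GROUP BY hit_date;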

Virtual Columns

Hive provides two virtual columns: one for the filename of the mapper’s input split and the other for the block offset within the file. These are helpful when diagnosing queries where Hive is producing unexpected or null results. By projecting these “columns,” you can see which file, and where in it, is causing problems:

hive> set hive.exec.rowoffset=true;

hive> SELECT INPUT__FILE__NAME, BLOCK__OFFSET__INSIDE__FILE, line
    > FROM hive_text WHERE line LIKE '%hive%' LIMIT 2;
har://file/user/hive/warehouse/hive_text/folder=docs/
data.har/user/hive/warehouse/hive_text/folder=docs/README.txt   2243
          http://hive.apache.org/

har://file/user/hive/warehouse/hive_text/folder=docs/
data.har/user/hive/warehouse/hive_text/folder=docs/README.txt   3646
- Hive 0.8.0 ignores the hive-default.xml file, though we continue

(We wrapped the long output and put a blank line between the two output rows.)

A third virtual column provides the row offset within the block. It must be enabled explicitly:

<property>
  <name>hive.exec.rowoffset</name>
  <value>true</value>
  <description>Whether to provide the row offset virtual column</description>
</property>

Now it can be used in queries:

hive> SELECT INPUT__FILE__NAME, BLOCK__OFFSET__INSIDE__FILE,
    >  ROW__OFFSET__INSIDE__BLOCK
    > FROM hive_text WHERE line LIKE '%hive%' limit 2;
file:/user/hive/warehouse/hive_text/folder=docs/README.txt      2243    0
file:/user/hive/warehouse/hive_text/folder=docs/README.txt      3646    0