Using Hive

If you have a business intelligence-type workload with low-latency requirements and multiple concurrent users, then you might consider using Impala for your database access. Apache Spark on Hive is aimed at batch processing and ETL chains. This section shows how to connect Spark to Hive, and how to use this configuration. First, I will develop an application that uses a local Hive Metastore, and show that it does not store and persist table data in Hive itself. I will then configure Apache Spark to connect to the Hive Metastore server, and store tables and data within Hive. I will start with the local Metastore server.

Local Hive Metastore server

The following example Scala code shows how to create a Hive context, and how to create a Hive-based table, using Apache Spark. First, the Spark configuration, context, SQL, and Hive classes are imported. Then, an object called hive_ex1 and its main method are defined. The application name is set, and a Spark configuration object is created. The Spark context is then created from the configuration object:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql._
import org.apache.spark.sql.hive.HiveContext

object hive_ex1 {

  def main(args: Array[String]) {

    val appName = "Hive Spark Ex 1"
    val conf    = new SparkConf()

    conf.setAppName(appName)

    val sc = new SparkContext(conf)

Next, I create a new Hive context from the Spark context, and import the Hive implicits, and the Hive context SQL. The implicits allow for implicit conversions, and the SQL include allows me to run Hive context-based SQL:

    val hiveContext = new HiveContext(sc)

    import hiveContext.implicits._
    import hiveContext.sql

The next statement creates an empty table called adult2 in Hive. You will recognize the schema from the adult data that has already been used in this chapter:

     hiveContext.sql( "    
         CREATE TABLE IF NOT EXISTS adult2
            (
              idx             INT,
              age             INT,
              workclass       STRING,
              fnlwgt          INT,
              education       STRING,
              educationnum    INT,
              maritalstatus   STRING,
              occupation      STRING,
              relationship    STRING,
              race            STRING,
              gender          STRING,
              capitalgain     INT,
              capitalloss     INT,
              nativecountry   STRING,
              income          STRING
            )
    
                     ")

Next, a row count is taken from the table called adult2 via a COUNT(*), and the output value is printed:

   val resRDD = hiveContext.sql("SELECT COUNT(*) FROM adult2")

   resRDD.map(t => "Count : " + t(0) ).collect().foreach(println)

As expected, there are no rows in the table.

Count : 0

It is also possible to create Hive-based external tables from Apache Spark. The following HDFS file listing shows that the CSV file called adult.train.data2 exists in the HDFS directory called /data/spark/hive, and that it contains data:

[hadoop@hc2nn hive]$ hdfs dfs -ls /data/spark/hive
Found 1 items
-rw-r--r--   3 hadoop supergroup    4171350 2015-06-24 15:18 /data/spark/hive/adult.train.data2

Now, I adjust my Scala-based Hive SQL to create an external table called adult3 (if it does not exist), which has the same structure as the previous table. The row format in this table-create statement specifies a comma as the column delimiter, as would be expected for CSV data. The location option in this statement specifies the /data/spark/hive directory on HDFS for the data. So, there can be multiple files on HDFS, in this location, used to populate this table. Each file would need to have the same data structure, matching this table structure:

    hiveContext.sql("

        CREATE EXTERNAL TABLE IF NOT EXISTS adult3
           (
             idx             INT,
             age             INT,
             workclass       STRING,
             fnlwgt          INT,
             education       STRING,
             educationnum    INT,
             maritalstatus   STRING,
             occupation      STRING,
             relationship    STRING,
             race            STRING,
             gender          STRING,
             capitalgain     INT,
             capitalloss     INT,
             nativecountry   STRING,
             income          STRING
           )
           ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
           LOCATION '/data/spark/hive'

                   ")

A row count is then taken against the adult3 table, and the count is printed:

    val resRDD = hiveContext.sql("SELECT COUNT(*) FROM adult3")

    resRDD.map(t => "Count : " + t(0) ).collect().foreach(println)

As you can see, the table now contains around 32,000 rows. Since this is an external table, the HDFS-based data has not been moved, and the row calculation has been derived from the underlying CSV-based data.

Count : 32561
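Since adult2 is a managed table and adult3 is an external table with the same schema, a natural next step would be to populate adult2 from adult3. This is not part of the original example; the following is a minimal, hedged sketch of that step using standard Hive QL through the same Hive context:

    // Hedged sketch (not part of the original example): copy the rows exposed by the
    // external, CSV-backed adult3 table into the managed adult2 table created earlier.
    hiveContext.sql("""

        INSERT INTO TABLE adult2
          SELECT * FROM adult3

                    """)

    // A count against adult2 should now match the external table's row count
    hiveContext.sql("SELECT COUNT(*) FROM adult2")
      .map( t => "Count : " + t(0) ).collect().foreach(println)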

It occurs to me that I want to start stripping dimension data out of the raw CSV-based data in the external adult3 table. After all, Hive is a data warehouse, so part of a general ETL chain based on this raw CSV data would be to extract the dimensions, and create new tables from them. If I consider the education dimension, and try to determine what unique values exist, then the SQL would be as follows:

    val resRDD = hiveContext.sql("

       SELECT DISTINCT education AS edu FROM adult3
         ORDER BY edu

                   ")

    resRDD.map(t => t(0) ).collect().foreach(println)

And the ordered data matches the values that were derived earlier in this chapter using Spark SQL:

 10th
 11th
 12th
 1st-4th
 5th-6th
 7th-8th
 9th
 Assoc-acdm
 Assoc-voc
 Bachelors
 Doctorate
 HS-grad
 Masters
 Preschool
 Prof-school
 Some-college

This is useful, but what if I want to create dimension values, and then assign integer index values to each of the education dimension values? For instance, 10th would be 1, and 11th would be 2. I have set up a dimension CSV file for the education dimension on HDFS, as shown here. The contents just contain the list of unique values, and an index for each:

[hadoop@hc2nn hive]$ hdfs dfs -ls /data/spark/dim1/
Found 1 items
-rw-r--r--   3 hadoop supergroup        174 2015-06-25 14:08 /data/spark/dim1/education.csv
[hadoop@hc2nn hive]$ hdfs dfs -cat /data/spark/dim1/education.csv
1,10th
2,11th
3,12th
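Such a dimension file does not have to be created by hand. As a hedged sketch, which is not part of the original example, the distinct education values already available in adult3 could be numbered and written out by Spark itself; the output path /data/spark/dim1_gen used here is hypothetical, and one is added to the zero-based zipWithIndex value to match the one-based indexes shown above:

    // Hedged sketch: derive an index,value CSV from the distinct education values
    // in the external adult3 table, and write it to a hypothetical HDFS directory.
    val eduRDD = hiveContext.sql("SELECT DISTINCT education AS edu FROM adult3 ORDER BY edu")
                            .map( t => t(0).toString )

    eduRDD.zipWithIndex()
          .map { case (name, idx) => (idx + 1) + "," + name }   // 1-based index, CSV line
          .coalesce(1)                                          // single output file
          .saveAsTextFile("/data/spark/dim1_gen")               // hypothetical output path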

Now, I can run some Hive QL in my Apache Spark application to create an education dimension table. First, I drop the education table if it already exists, and then I create the table by parsing the HDFS CSV file:

    hiveContext.sql("  DROP TABLE IF EXISTS education ")
    hiveContext.sql("

      CREATE TABLE IF NOT EXISTS  education
        (
          idx        INT,
          name       STRING
        )
        ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
        LOCATION '/data/spark/dim1/'
                   ")

I can then select the contents of the new education table to ensure that it looks correct.

val resRDD = hiveContext.sql(" SELECT * FROM education ")
resRDD.map( t => t(0)+" "+t(1) ).collect().foreach(println)

This gives the expected list of indexes and the education dimension values:

1 10th
2 11th
3 12th
………
16 Some-college

So, I have the beginnings of an ETL pipeline. The raw CSV data is being used via external tables, and dimension tables are being created, which could then be used to convert the dimensions in the raw data into numeric indexes. I have now successfully created a Spark application that uses a Hive context to connect to a Hive Metastore server, and that allows me to create and populate tables.
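To illustrate how the raw data and the dimension table might be used together, the following hedged sketch, which is not part of the original example, joins adult3 to the education dimension table, replacing the education string with its numeric index, and prints a small sample of the result:

    // Hedged sketch: replace the education string in the raw data with the
    // dimension index via a join against the education table.
    val joinRDD = hiveContext.sql("""

        SELECT a.idx, e.idx AS education_idx
          FROM adult3 a JOIN education e ON ( a.education = e.name )

                    """)

    joinRDD.map( t => t(0) + " " + t(1) ).take(10).foreach(println)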

I have the Hadoop stack Cloudera CDH 5.3 installed on my Linux servers. I am using it for HDFS access while writing this book, and I also have Hive and Hue installed and running (CDH install information can be found at the Cloudera website at http://cloudera.com/content/cloudera/en/documentation.html). When I check HDFS for the adult3 table, which should have been created under /user/hive/warehouse, I see the following:

[hadoop@hc2nn hive]$ hdfs dfs -ls /user/hive/warehouse/adult3
ls: `/user/hive/warehouse/adult3': No such file or directory

The Hive-based table does not exist in the expected place for Hive. I can confirm this by checking the Hue Metastore manager to see what tables exist in the default database. The following figure shows that my default database is currently empty. I have added red lines to the figure to show that I am currently looking at the default database, and that it contains no data. Clearly, when I run an Apache Spark-based application with a Hive context, I am connecting to a Hive Metastore server of some kind; I know this because the log indicates as much, and also because tables created in this way persist when Apache Spark is restarted.

[Figure: Hue Metastore browser showing an empty default database]

The Hive context within the application that was just run has used a local Hive Metastore server, and has stored its data in a local location; actually, in this case, under /tmp on HDFS. I now want to use the Hive-based Metastore server, so that I can create tables and data in Hive directly. The next section will show how this can be done.

A Hive-based Metastore server

I already mentioned that I am using Cloudera's CDH 5.3 Hadoop stack. I have Hive, HDFS, Hue, and Zookeeper running. I am using Apache Spark 1.3.1 installed under /usr/local/spark, in order to create and run applications (I know that CDH 5.3 is released with Spark 1.2, but I wanted to use DataFrames in this instance, which were available in Spark 1.3.x.).

The first thing that I need to do to configure Apache Spark to connect to Hive, is to drop the Hive configuration file called hive-site.xml into the Spark configuration directory on all servers where Spark is installed:

[hadoop@hc2nn bin]$ cp /var/run/cloudera-scm-agent/process/1237-hive-HIVEMETASTORE/hive-site.xml /usr/local/spark/conf

Then, given that I have installed Apache Hive via the CDH Manager, and that it uses PostgreSQL for its Metastore database, I need to install a PostgreSQL connector JAR for Spark; otherwise, Spark won't know how to connect to Hive, and errors like the following will occur:

15/06/25 16:32:24 WARN DataNucleus.Connection: BoneCP specified but not present in CLASSPATH (s)
Caused by: java.lang.RuntimeException: Unable to instantiate org.apache.hadoop.hive.metastore.
Caused by: java.lang.reflect.InvocationTargetException
Caused by: javax.jdo.JDOFatalInternalException: Error creating transactional connection factor
Caused by: org.datanucleus.exceptions.NucleusException: Attempt to invoke the "dbcp-builtin" pnectionPool gave an 
error : The specified datastore driver ("org.postgresql.Driver") was not f. Please check your CLASSPATH
specification, and the name of the driver.
Caused by: org.datanucleus.store.rdbms.connectionpool.DatastoreDriverNotFoundException: The spver
("org.postgresql.Driver") was not found in the CLASSPATH. Please check your CLASSPATH specme of the driver.

I have stripped that error message down to just the pertinent parts; otherwise, it would have been many pages long. Next, I determined the version of PostgreSQL that I have installed, as follows. It appears to be version 9.0, determined from the Cloudera parcel-based JAR file:

[root@hc2nn jars]# pwd ; ls postgresql*
/opt/cloudera/parcels/CDH/jars
postgresql-9.0-801.jdbc4.jar

Next, I have used the https://jdbc.postgresql.org/ website to download the necessary PostgreSQL connector library. I have determined my Java version to be 1.7, as shown here, which affects which version of library to use:

[hadoop@hc2nn spark]$ java -version
java version "1.7.0_75"
OpenJDK Runtime Environment (rhel-2.5.4.0.el6_6-x86_64 u75-b13)
OpenJDK 64-Bit Server VM (build 24.75-b04, mixed mode)

The site says that if you are using Java 1.7 or 1.8, then you should use the JDBC41 version of the library. So, I have sourced the postgresql-9.4-1201.jdbc41.jar file. The next step is to copy this file to the Apache Spark install lib directory, as shown here:

[hadoop@hc2nn lib]$ pwd ; ls -l postgresql*
/usr/local/spark/lib
-rw-r--r-- 1 hadoop hadoop 648487 Jun 26 13:20 postgresql-9.4-1201.jdbc41.jar

Now, the PostgreSQL library must be added to the Spark CLASSPATH, by adding an entry to the file called compute-classpath.sh, in the Spark bin directory, as shown here:

[hadoop@hc2nn bin]$ pwd ; tail compute-classpath.sh
/usr/local/spark/bin

# add postgresql connector to classpath
appendToClasspath "${assembly_folder}"/postgresql-9.4-1201.jdbc41.jar

echo "$CLASSPATH"

In my case, I encountered an error regarding a Hive version mismatch between the CDH 5.3 Hive and Apache Spark, as shown here. I thought that the versions were so close that I should be able to ignore this error:

Caused by: MetaException(message:Hive Schema version 0.13.1aa does not match metastore's schema version 0.13.0 Metastore is not upgraded or corrupt)

I decided, in this case, to switch off schema verification in my Spark version of the hive-site.xml file. This had to be done in all the Spark-based instances of this file, and then Spark restarted. The change is shown here; the value is set to false:

  <property>
    <name>hive.metastore.schema.verification</name>
    <value>false</value>
  </property>
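Once this change has been made, and Spark has been restarted, a quick sanity check of my own, not from the original text, is to list the tables that the Hive context can now see in the default database; if the configuration is correct, this should list whatever tables already exist in the CDH Hive default database, possibly none at this point:

    // Hedged check: list the tables visible via the Hive context once Spark is
    // connected to the CDH-based Hive Metastore.
    hiveContext.sql("SHOW TABLES").collect().foreach(println)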

Now, when I run the same application-based SQL as in the last section, I can create objects in the Apache Hive default database. First, I will create the empty table called adult2 using the Spark-based Hive context:

    hiveContext.sql( "

        CREATE TABLE IF NOT EXISTS adult2
           (
             idx             INT,
             age             INT,
             workclass       STRING,
             fnlwgt          INT,
             education       STRING,
             educationnum    INT,
             maritalstatus   STRING,
             occupation      STRING,
             relationship    STRING,
             race            STRING,
             gender          STRING,
             capitalgain     INT,
             capitalloss     INT,
             nativecountry   STRING,
             income          STRING
           )

                    ")

As you can see, when I run the application and check the Hue Metastore browser, the table adult2 now exists:

[Figure: Hue Metastore browser showing the adult2 table in the default database]

The previous figure shows the table entry; its structure, shown next, is obtained by selecting the table entry called adult2 in the Hue default database browser:

[Figure: structure of the adult2 table in the Hue default database browser]

Now, the Spark-based Hive QL for the external table adult3 can be executed, and data access can be confirmed from Hue. As in the last section, the necessary Hive QL is as follows:

    hiveContext.sql("

        CREATE EXTERNAL TABLE IF NOT EXISTS adult3
           (
             idx             INT,
             age             INT,
             workclass       STRING,
             fnlwgt          INT,
             education       STRING,
             educationnum    INT,
             maritalstatus   STRING,
             occupation      STRING,
             relationship    STRING,
             race            STRING,
             gender          STRING,
             capitalgain     INT,
             capitalloss     INT,
             nativecountry   STRING,
             income          STRING
           )
           ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
           LOCATION '/data/spark/hive'

                   ")

As you can now see, the Hive-based table called adult3 has been created in the default database by Spark. The following figure is again generated from the Hue Metastore browser:

[Figure: Hue Metastore browser showing the adult3 table in the default database]

The following Hive QL has been executed from the Hue Hive query editor. It shows that the adult3 table is accessible from Hive. I have limited the rows to make the image presentable. I am not worried about the data, only the fact that I can access it:

[Figure: Hue Hive query editor output showing rows selected from the adult3 table]

The last thing that I will mention in this section, which will be useful when using Hive QL from Spark against Hive, is user-defined functions, or UDFs. As an example, I will consider the row_sequence function, which is used in the following Scala-based code:

hiveContext.sql("

ADD JAR /opt/cloudera/parcels/CDH-5.3.3-1.cdh5.3.3.p0.5/jars/hive-contrib-0.13.1-cdh5.3.3.jar

  ")

hiveContext.sql("

CREATE TEMPORARY FUNCTION row_sequence as 'org.apache.hadoop.hive.contrib.udf.UDFRowSequence';

  ")

     val resRDD = hiveContext.sql("

          SELECT row_sequence(),t1.edu FROM
            ( SELECT DISTINCT education AS edu FROM adult3 ) t1
          ORDER BY t1.edu

                    ")

Either existing or your own JAR-based libraries can be made available to your Spark Hive session via the ADD JAR command. The functionality within that library can then be registered as a temporary function with CREATE TEMPORARY FUNCTION, using its package-based class name. Finally, the new function name can be incorporated into Hive QL statements.
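As with the earlier examples, the results of the row_sequence query can then be collected and printed; for instance, something like the following would list each education value together with its generated sequence number:

    resRDD.map( t => t(0) + " " + t(1) ).collect().foreach(println)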

This section has managed to connect an Apache Spark-based application to Hive, and to run Hive QL against Hive, so that table and data changes persist in Hive. But why is this important? Well, Spark is an in-memory parallel processing system, and it is an order of magnitude faster than Hadoop-based MapReduce in raw processing speed. Apache Spark can now be used as the processing engine, while the Hive data warehouse is used for storage, coupling fast, in-memory Spark-based processing with the big data scale, structured data warehouse storage available in Hive.
