If you have a business intelligence-type workload with low latency requirements and multiple users, then you might consider using Impala for your database access. Apache Spark on Hive is aimed at batch processing and ETL chains. This section shows how to connect Spark to Hive, and how to use this configuration. First, I will develop an application that uses a local Hive Metastore, and show that it does not store and persist table data in Hive itself. I will then set up Apache Spark to connect to the Hive Metastore server, and store tables and data within Hive. I will start with the local Metastore server.
The following example Scala code shows how to create a Hive context, and create a Hive-based table using Apache Spark. First, the Spark configuration, context, SQL, and Hive classes are imported. Then, an object class called hive_ex1, and the main method, are defined. The application name is defined, and a Spark configuration object is created. The Spark context is then created from the configuration object:
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql._
import org.apache.spark.sql.hive.HiveContext

object hive_ex1 {

  def main(args: Array[String]) {

    val appName = "Hive Spark Ex 1"
    val conf    = new SparkConf()

    conf.setAppName(appName)

    val sc = new SparkContext(conf)
Next, I create a new Hive context from the Spark context, and import the Hive implicits and the Hive context SQL. The implicits allow for implicit conversions, and the SQL import allows me to run Hive context-based SQL:
    val hiveContext = new HiveContext(sc)

    import hiveContext.implicits._
    import hiveContext.sql
The next statement creates an empty table called adult2 in Hive. You will recognize the schema from the adult data that has already been used in this chapter:
    hiveContext.sql( "

      CREATE TABLE IF NOT EXISTS adult2
        (
          idx             INT,
          age             INT,
          workclass       STRING,
          fnlwgt          INT,
          education       STRING,
          educationnum    INT,
          maritalstatus   STRING,
          occupation      STRING,
          relationship    STRING,
          race            STRING,
          gender          STRING,
          capitalgain     INT,
          capitalloss     INT,
          nativecountry   STRING,
          income          STRING
        )

    ")
Next, a row count is taken from the table called adult2 via a COUNT(*), and the output value is printed:
    val resRDD = hiveContext.sql("SELECT COUNT(*) FROM adult2")

    resRDD.map(t => "Count : " + t(0)).collect().foreach(println)
As expected, there are no rows in the table.
Count : 0
It is also possible to create Hive-based external tables in Apache Spark Hive. The following HDFS file listing shows that the CSV file called adult.train.data2 exists in the HDFS directory called /data/spark/hive, and that it contains data:
[hadoop@hc2nn hive]$ hdfs dfs -ls /data/spark/hive
Found 1 items
-rw-r--r--   3 hadoop supergroup    4171350 2015-06-24 15:18 /data/spark/hive/adult.train.data2
Now, I adjust my Scala-based Hive SQL to create an external table called adult3 (if it does not exist), which has the same structure as the previous table. The row format in this table-create statement specifies a comma as the column delimiter, as would be expected for CSV data. The location option in this statement specifies the /data/spark/hive directory on HDFS for the data. So, there can be multiple files on HDFS, in this location, to populate this table. Each file would need to have the same data structure, matching this table structure:
    hiveContext.sql("

      CREATE EXTERNAL TABLE IF NOT EXISTS adult3
        (
          idx             INT,
          age             INT,
          workclass       STRING,
          fnlwgt          INT,
          education       STRING,
          educationnum    INT,
          maritalstatus   STRING,
          occupation      STRING,
          relationship    STRING,
          race            STRING,
          gender          STRING,
          capitalgain     INT,
          capitalloss     INT,
          nativecountry   STRING,
          income          STRING
        )
      ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
      LOCATION '/data/spark/hive'

    ")
A row count is then taken against the adult3 table, and the count is printed:
    val resRDD = hiveContext.sql("SELECT COUNT(*) FROM adult3")

    resRDD.map(t => "Count : " + t(0)).collect().foreach(println)
As you can see, the table now contains around 32,000 rows. Since this is an external table, the HDFS-based data has not been moved, and the row calculation has been derived from the underlying CSV-based data.
Count : 32561
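To make the external table mapping concrete, a few of the raw rows can be selected directly. This is a minimal sketch, assuming the hiveContext created earlier in this section; the column names match the adult3 schema defined above:

```scala
    // Sample a few raw rows from the external adult3 table. Because the
    // table is external, this reads the CSV data under /data/spark/hive
    // in place, without moving it into the Hive warehouse.
    val sampleRDD = hiveContext.sql(
      "SELECT idx, age, education, occupation, income FROM adult3 LIMIT 5")

    sampleRDD.collect().foreach(println)
```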
It occurs to me that I want to start stripping dimension data out of the raw CSV-based data in the external adult3 table. After all, Hive is a data warehouse, so a part of a general ETL chain using the raw CSV-based data would strip dimensions and objects from the data, and create new tables. If I consider the education dimension, and try to determine what unique values exist, then, for instance, the SQL would be as follows:
    val resRDD = hiveContext.sql("

      SELECT DISTINCT education AS edu FROM adult3
      ORDER BY edu

    ")

    resRDD.map(t => t(0)).collect().foreach(println)
And the ordered data matches the values that were derived earlier in this chapter using Spark SQL:
10th
11th
12th
1st-4th
5th-6th
7th-8th
9th
Assoc-acdm
Assoc-voc
Bachelors
Doctorate
HS-grad
Masters
Preschool
Prof-school
Some-college
This is useful, but what if I want to create dimension values, and then assign integer index values to each of the previous education dimension values? For instance, 10th would be 1, and 11th would be 2. I have set up a dimension CSV file for the education dimension on HDFS, as shown here. The contents just contain the list of unique values, and an index:
[hadoop@hc2nn hive]$ hdfs dfs -ls /data/spark/dim1/
Found 1 items
-rw-r--r--   3 hadoop supergroup        174 2015-06-25 14:08 /data/spark/dim1/education.csv

[hadoop@hc2nn hive]$ hdfs dfs -cat /data/spark/dim1/education.csv
1,10th
2,11th
3,12th
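Incidentally, a dimension file like this does not have to be hand-crafted; it could also be derived from the raw data itself. The following sketch assumes the hiveContext from the earlier listings, and a hypothetical output path of /data/spark/dim1_gen. It numbers the distinct education values with zipWithIndex, and writes them back to HDFS as CSV lines:

```scala
    // Derive the distinct, ordered education values from the raw data.
    val eduRDD = hiveContext.sql(
      "SELECT DISTINCT education AS edu FROM adult3 ORDER BY edu")
      .map(t => t(0).toString)

    // Number each value from 1, and format as "index,value" CSV lines.
    val dimRDD = eduRDD.zipWithIndex().map {
      case (edu, idx) => (idx + 1) + "," + edu
    }

    // Write the dimension data back to HDFS (hypothetical path).
    dimRDD.saveAsTextFile("/data/spark/dim1_gen")
```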
Now, I can run some Hive QL in my Apache Spark application to create an education dimension table. First, I drop the education table if it already exists, and then I create the table by parsing the HDFS CSV file:
    hiveContext.sql("  DROP TABLE IF EXISTS education  ")

    hiveContext.sql("

      CREATE TABLE IF NOT EXISTS education
        (
          idx     INT,
          name    STRING
        )
      ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
      LOCATION '/data/spark/dim1/'

    ")
I can then select the contents of the new education table to ensure that it looks correct:
    val resRDD = hiveContext.sql("  SELECT * FROM education  ")

    resRDD.map( t => t(0) + " " + t(1) ).collect().foreach(println)
This gives the expected list of indexes and the education dimension values:
1 10th
2 11th
3 12th
.........
16 Some-college
So, I have the beginnings of an ETL pipeline. The raw CSV data is being used as external tables, and the dimension tables are being created, which could then be used to convert the dimensions in the raw data to numeric indexes. I have now successfully created a Spark application, which uses a Hive context to connect to a Hive Metastore server, which allows me to create and populate tables.
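As a sketch of that next ETL step, the education dimension table could be joined back to the raw data, so that the education string in each adult3 row is replaced by its numeric index. This assumes the hiveContext, adult3, and education tables created above; the selected columns are just an illustrative subset:

```scala
    // Replace the education string in the raw data with its dimension
    // index, by joining adult3 to the education dimension table.
    val factRDD = hiveContext.sql("""
      SELECT a.idx, a.age, e.idx AS education_idx, a.occupation, a.income
      FROM adult3 a
      JOIN education e ON a.education = e.name
    """)

    // Print a handful of rows to confirm the numeric index appears.
    factRDD.map(t => t.mkString(" ")).take(5).foreach(println)
```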
I have the Hadoop stack Cloudera CDH 5.3 installed on my Linux servers. I am using it for HDFS access while writing this book, and I also have Hive and Hue installed and running (CDH install information can be found at the Cloudera website at http://cloudera.com/content/cloudera/en/documentation.html). When I check HDFS for the adult3 table, which should have been created under /user/hive/warehouse, I see the following:
[hadoop@hc2nn hive]$ hdfs dfs -ls /user/hive/warehouse/adult3
ls: `/user/hive/warehouse/adult3': No such file or directory
The Hive-based table does not exist in the expected place for Hive. I can confirm this by checking the Hue Metastore manager to see which tables exist in the default database. The following figure shows that my default database is currently empty. I have added red lines to show that I am currently looking at the default database, and that there is no data. Clearly, when I run an Apache Spark-based application with a Hive context, I am connecting to a Hive Metastore server. I know this because the log indicates that this is the case, and also because my tables created in this way persist when Apache Spark is restarted.
The Hive context within the application that was just run has used a local Hive Metastore server, and has stored its data in a local location; actually, in this case, under /tmp on HDFS. I now want to use the Hive-based Metastore server, so that I can create tables and data in Hive directly. The next section will show how this can be done.
I already mentioned that I am using Cloudera's CDH 5.3 Hadoop stack. I have Hive, HDFS, Hue, and Zookeeper running. I am using Apache Spark 1.3.1, installed under /usr/local/spark, in order to create and run applications (I know that CDH 5.3 is released with Spark 1.2, but I wanted to use DataFrames in this instance, which were available in Spark 1.3.x).
The first thing that I need to do to configure Apache Spark to connect to Hive, is to drop the Hive configuration file called hive-site.xml into the Spark configuration directory on all servers where Spark is installed:
[hadoop@hc2nn bin]# cp /var/run/cloudera-scm-agent/process/1237-hive-HIVEMETASTORE/hive-site.xml /usr/local/spark/conf
Then, given that I have installed Apache Hive via the CDH Manager, configured to use PostgreSQL for its Metastore, I need to install a PostgreSQL connector JAR for Spark, otherwise it won't know how to connect to Hive, and errors like this will occur:
15/06/25 16:32:24 WARN DataNucleus.Connection: BoneCP specified but not present in CLASSPATH (s)
Caused by: java.lang.RuntimeException: Unable to instantiate org.apache.hadoop.hive.metastore.
Caused by: java.lang.reflect.InvocationTargetException
Caused by: javax.jdo.JDOFatalInternalException: Error creating transactional connection factor
Caused by: org.datanucleus.exceptions.NucleusException: Attempt to invoke the "dbcp-builtin" pnectionPool gave an error : The specified datastore driver ("org.postgresql.Driver") was not f. Please check your CLASSPATH specification, and the name of the driver.
Caused by: org.datanucleus.store.rdbms.connectionpool.DatastoreDriverNotFoundException: The spver ("org.postgresql.Driver") was not found in the CLASSPATH. Please check your CLASSPATH specme of the driver.
I have stripped that error message down to just the pertinent parts; otherwise, it would have been many pages long. I have determined the version of PostgreSQL that I have installed, as follows. It appears to be version 9.0, determined from the Cloudera parcel-based JAR file:
[root@hc2nn jars]# pwd ; ls postgresql*
/opt/cloudera/parcels/CDH/jars
postgresql-9.0-801.jdbc4.jar
Next, I have used the https://jdbc.postgresql.org/ website to download the necessary PostgreSQL connector library. I have determined my Java version to be 1.7, as shown here, which affects which version of the library to use:
[hadoop@hc2nn spark]$ java -version
java version "1.7.0_75"
OpenJDK Runtime Environment (rhel-2.5.4.0.el6_6-x86_64 u75-b13)
OpenJDK 64-Bit Server VM (build 24.75-b04, mixed mode)
The site says that if you are using Java 1.7 or 1.8, then you should use the JDBC41 version of the library. So, I have sourced the postgresql-9.4-1201.jdbc41.jar file. The next step is to copy this file to the Apache Spark install lib directory, as shown here:
[hadoop@hc2nn lib]$ pwd ; ls -l postgresql*
/usr/local/spark/lib
-rw-r--r-- 1 hadoop hadoop 648487 Jun 26 13:20 postgresql-9.4-1201.jdbc41.jar
Now, the PostgreSQL library must be added to the Spark CLASSPATH, by adding an entry to the file called compute-classpath.sh, in the Spark bin directory, as shown here:
[hadoop@hc2nn bin]$ pwd ; tail compute-classpath.sh
/usr/local/spark/bin

# add postgresql connector to classpath
appendToClasspath "${assembly_folder}"/postgresql-9.4-1201.jdbc41.jar

echo "$CLASSPATH"
In my case, I encountered an error regarding Hive versions between CDH 5.3 Hive and Apache Spark, as shown here. I thought that the versions were close enough that I should be able to ignore this error:
Caused by: MetaException(message:Hive Schema version 0.13.1aa does not match metastore's schema version 0.13.0 Metastore is not upgraded or corrupt)
I decided, in this case, to switch off schema verification in my Spark version of the hive-site.xml file. This had to be done in all the Spark-based instances of this file, and then Spark was restarted. The change is shown here; the value is set to false:
  <property>
    <name>hive.metastore.schema.verification</name>
    <value>false</value>
  </property>
Now, when I run the same set of application-based SQL as in the last section, I can create objects in the Apache Hive default database. First, I will create the empty table called adult2 using the Spark-based Hive context:
    hiveContext.sql( "

      CREATE TABLE IF NOT EXISTS adult2
        (
          idx             INT,
          age             INT,
          workclass       STRING,
          fnlwgt          INT,
          education       STRING,
          educationnum    INT,
          maritalstatus   STRING,
          occupation      STRING,
          relationship    STRING,
          race            STRING,
          gender          STRING,
          capitalgain     INT,
          capitalloss     INT,
          nativecountry   STRING,
          income          STRING
        )

    ")
As you can see, when I run the application and check the Hue Metastore browser, the table adult2 now exists:
I have shown the table entry previously, and its structure is obtained by selecting the table entry called adult2 in the Hue default database browser:
Now, the Spark-based Hive QL for the external table called adult3 can be executed, and data access can be confirmed from Hue. In the last section, the necessary Hive QL was as follows:
    hiveContext.sql("

      CREATE EXTERNAL TABLE IF NOT EXISTS adult3
        (
          idx             INT,
          age             INT,
          workclass       STRING,
          fnlwgt          INT,
          education       STRING,
          educationnum    INT,
          maritalstatus   STRING,
          occupation      STRING,
          relationship    STRING,
          race            STRING,
          gender          STRING,
          capitalgain     INT,
          capitalloss     INT,
          nativecountry   STRING,
          income          STRING
        )
      ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
      LOCATION '/data/spark/hive'

    ")
As you can now see, the Hive-based table called adult3 has been created in the default database by Spark. The following figure is again generated from the Hue Metastore browser:
The following Hive QL has been executed from the Hue Hive query editor. It shows that the adult3 table is accessible from Hive. I have limited the rows to make the image presentable. I am not worried about the data, only the fact that I can access it:
The last thing that I will mention in this section, which will be useful when using Hive QL from Spark against Hive, is user-defined functions, or UDFs. As an example, I will consider the row_sequence function, which is used in the following Scala-based code:
    hiveContext.sql("

      ADD JAR /opt/cloudera/parcels/CDH-5.3.3-1.cdh5.3.3.p0.5/jars/hive-contrib-0.13.1-cdh5.3.3.jar

    ")

    hiveContext.sql("

      CREATE TEMPORARY FUNCTION row_sequence as 'org.apache.hadoop.hive.contrib.udf.UDFRowSequence'

    ")

    val resRDD = hiveContext.sql("

      SELECT row_sequence(), t1.edu
      FROM
        ( SELECT DISTINCT education AS edu FROM adult3 ) t1
      ORDER BY t1.edu

    ")
Either existing, or your own, JAR-based libraries can be made available to your Spark Hive session via the ADD JAR command. Then, the functionality within that library can be registered as a temporary function with CREATE TEMPORARY FUNCTION, using the package-based class name. Finally, the new function name can be incorporated in Hive QL statements.
This chapter has managed to connect an Apache Spark-based application to Hive, and run Hive QL against Hive, so that table and data changes persist in Hive. But why is this important? Well, Spark is an in-memory parallel processing system. It is an order of magnitude faster than Hadoop-based Map Reduce in processing speed. Apache Spark can now be used as a processing engine, whereas the Hive data warehouse can be used for storage. This combines fast, in-memory Spark-based processing speed with the big data scale, structured data warehouse storage available in Hive.
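As a final sketch of that division of labour, Spark can do the transformation work while Hive persists the result; for instance, a CREATE TABLE ... AS SELECT statement run through the Hive context stores a derived table back into the Hive warehouse. This assumes the hiveContext, adult3, and education tables from this section; the adult4 table name is hypothetical:

```scala
    // Use Spark as the processing engine, and Hive as the storage layer:
    // derive a summary table from adult3 and persist it in the Hive
    // warehouse as a new table.
    hiveContext.sql("""
      CREATE TABLE IF NOT EXISTS adult4 AS
      SELECT e.idx AS education_idx, COUNT(*) AS num_rows
      FROM adult3 a
      JOIN education e ON a.education = e.name
      GROUP BY e.idx
    """)
```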