val words = Array("one", "two", "two", "three", "three", "three")
val wordPairsRDD = sc.parallelize(words).map(word => (word, 1))
val wordCountsWithReduce = wordPairsRDD.reduceByKey(_ + _).collect()
Points to Ponder
Resilient Distributed Dataset (RDD) is the fundamental data structure of Spark. RDDs are immutable and fault tolerant in nature. They are distributed collections of objects.
In an RDD, the dataset is divided into logical partitions, which are computed on different nodes across the cluster.
Spark operations are categorized into two types: Transformations and Actions (see the sketch after these points).
A Pair RDD is a type of RDD in which the data is organized as <key, value> pairs.
The SparkContext object is created by the Spark driver.
Spark is much faster than conventional MapReduce because of its in-memory execution.
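As a quick sketch of the difference between the two (reusing the word-count pair RDD from the example above), transformations only build up a lineage of computations, while actions trigger their execution:
scala> val pairs = sc.parallelize(Array("one", "two", "two")).map(w => (w, 1))   // transformation: nothing runs yet
scala> val counts = pairs.reduceByKey(_ + _)                                     // still a transformation, still lazy
scala> counts.collect()                                                          // action: the job actually executes here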
6.1.4 Spark Libraries: Spark SQL
The rst library that we are going to study is referred to as Spark SQL. It is named as such because
it works with your data similar to an SQL process. It allows developers to write declarative code,
letting the engine use as much of the data and storage structure as it can to optimize the result
and distributed query behind the scenes. The goal here is to allow the user not to worry about
the distributed nature and focus on the business use-case.
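To make the declarative style concrete, here is a minimal sketch, assuming a Spark 1.x shell in which the SQLContext (sqlcontext) and the DataFrame dfs created later in this section are available. The user states what result is wanted and the engine plans the distributed query:
scala> dfs.registerTempTable("emp")                                   // expose the DataFrame as a temporary table
scala> sqlcontext.sql("SELECT name FROM emp WHERE age > 23").show()   // declarative query; Spark optimizes and distributes it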
How does Spark SQL compare to its key competitors?
Let us consider Hive. Hive is slower and it often requires complex custom user-defined functions simply to extend its functionality. Unit testing in Hive can also present its own challenges. However, Hive is more mature. If you already have an existing Hive database, then there are mechanisms in Spark to take advantage of the existing Hive table structures, even running 10 to 100 times faster than pure Hive.
Impala, on the other hand, is an established C++ tool which tends to beat Spark in direct
performance benchmarks. It was built from scratch for more specific cases than Spark’s general
engine, so it is especially optimized for this task.
Other than Hive, there are other natively supported data sources, such as JSON and Parquet. If a source is not natively supported, it is still fairly simple to pull in an external library through Spark packages and gain support for Avro, Redshift, CSV and a number of other data sources.
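For example, CSV support can be pulled in through a Spark package. The sketch below assumes a Spark 1.x shell started as spark-shell --packages com.databricks:spark-csv_2.10:1.5.0; the package version and the file path are illustrative:
scala> val csvDF = sqlcontext.read.format("com.databricks.spark.csv").option("header", "true").option("inferSchema", "true").load("/usr/local/emp.csv")   // "header": first line holds column names; "inferSchema": guess column types
scala> csvDF.show()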
Spark SQL has three main components, and they are briefly explained below.
DataFrame: A Spark DataFrame is a distributed collection of data organized into named columns. It provides operations to filter, group or compute aggregates, and it can be used with Spark SQL. DataFrames can be constructed from structured data files, existing RDDs, tables in Hive or external databases.
Conceptually, a DataFrame is equivalent to a relational table, but with good optimization techniques applied under the hood. As shown below, a DataFrame can also be built directly from an existing RDD.
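A minimal sketch of building a DataFrame from an existing RDD in the Spark 1.x shell, using a hypothetical Employee case class, looks like this:
scala> val sqlcontext = new org.apache.spark.sql.SQLContext(sc)
scala> import sqlcontext.implicits._                                 // brings in the rdd.toDF() conversion
scala> case class Employee(id: Int, name: String, age: Int)          // hypothetical schema
scala> val empRDD = sc.parallelize(Seq(Employee(100, "James", 25), Employee(101, "Munmun", 28)))
scala> val empDF = empRDD.toDF()                                     // existing RDD -> DataFrame with columns id, name, age
scala> empDF.show()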
Data Sources: With the addition of the data sources API, Spark SQL now makes it easier to compute over structured data stored in a wide variety of formats, including Parquet, JSON and Apache Avro.
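For instance, a DataFrame can be written out as Parquet and read back through the same API. A minimal sketch, assuming the DataFrame dfs built from emp.json later in this section, with an illustrative output path:
scala> dfs.write.parquet("/usr/local/emp_parquet")                       // save the DataFrame as Parquet files
scala> val parquetDF = sqlcontext.read.parquet("/usr/local/emp_parquet") // read the Parquet files back into a DataFrame
scala> parquetDF.show()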
JDBC Server: The built-in JDBC server makes it easy to connect to the structured data stored
in relational database tables and perform big data analytics using traditional BI tools.
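A rough sketch of how this is typically used: Spark ships with a Thrift JDBC/ODBC server that can be started from the Spark installation directory and then queried from any JDBC client such as beeline (the host and port below are the usual defaults and are assumptions here):
./sbin/start-thriftserver.sh                    # start Spark's JDBC/ODBC server
beeline -u jdbc:hive2://localhost:10000         # connect with the beeline JDBC client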
What is SQLContext?
SQLContext is a class used for initializing the functionalities of Spark SQL. A SparkContext object (sc) is required to initialize a SQLContext object.
scala> val sqlcontext = new org.apache.spark.sql.SQLContext(sc)
Let's create a JSON file named emp.json inside '/usr/local', as shown below.
{"id" : "100", "name" : "James", "age" : "25"}
{"id" : "101", "name" : "Munmun", "age" : "28"}
{"id" : "102", "name" : "Joy", "age" : "39"}
{"id" : "103", "name" : "Mohor", "age" : "23"}
{"id" : "104", "name" : "Puneet", "age" : "23"}
{"id" : "105", "name" : "Aishwarya", "age" : "26"}
scala> val dfs = sqlcontext.read.json("/usr/local/emp.json")
scala> dfs.show()
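With the six records above, dfs.show() prints a table roughly like the following (Spark's JSON reader orders the columns alphabetically, and every column is a string because all values in the file are quoted):
+---+---+---------+
|age| id|     name|
+---+---+---------+
| 25|100|    James|
| 28|101|   Munmun|
| 39|102|      Joy|
| 23|103|    Mohor|
| 23|104|   Puneet|
| 26|105|Aishwarya|
+---+---+---------+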
scala> dfs.printSchema()
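Since every value in emp.json is quoted, all three columns are inferred as strings, so the schema comes out roughly as:
root
 |-- age: string (nullable = true)
 |-- id: string (nullable = true)
 |-- name: string (nullable = true)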
Use the following command to fetch the name column from among the three columns of the DataFrame.
scala> dfs.select("name").show()
Use the following command to nd the employees whose age is greater than 23 (age > 23).
scala> dfs.filter(dfs("age") > 23).show()
Use the following command for counting the number of employees who are of the same age.
scala> dfs.groupBy("age").count().show()
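For the sample data only the age 23 occurs twice, so the grouped counts come out roughly as follows (row order is not guaranteed):
+---+-----+
|age|count|
+---+-----+
| 23|    2|
| 25|    1|
| 26|    1|
| 28|    1|
| 39|    1|
+---+-----+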
Spark SQL with Hive
scala> import org.apache.spark.sql.hive.HiveContext
scala> val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
scala> import hiveContext._
// to show all the Hive tables from the Spark shell
scala> val hive_tables = hiveContext.sql("show tables").collect.foreach(println)
// to show all the Hive databases from the Spark shell
scala> val hive_dbs = hiveContext.sql("show databases").collect.foreach(println)
// create a table named 'empSpark' in Hive from the Spark shell
val create_table = hiveContext.sql("create table empSpark(accno string, dt string, ctrycd int, groupid int, name string, ctry string) row format delimited fields terminated by ',' stored as textfile")
// put a text file emp.txt in HDFS with the above schema and load it into table 'empSpark' from Spark
val load_data = hiveContext.sql("load data inpath '/ana/data/employee/emp.txt' into table empSpark")
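For illustration, each line of emp.txt is a comma-delimited record matching the table schema (accno, dt, ctrycd, groupid, name, ctry); the values below are purely hypothetical:
A1001,2019-05-17,91,2,James,India
A1002,2019-05-17,1,3,Munmun,USA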
// to display all the rows in table 'empSpark'
val emp_rows = hiveContext.sql("select * from empSpark").collect.foreach(println)
// to display the country value of every row in table 'empSpark' (a distinct variant is sketched below)
val emp_country = hiveContext.sql("select ctry from empSpark").collect.foreach(println)
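To list each country only once, the same query can be written with distinct (a minimal sketch following the pattern above):
val emp_country_distinct = hiveContext.sql("select distinct ctry from empSpark").collect.foreach(println)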