Interaction with Cassandra

Apache Cassandra is an open source NoSQL database. It falls in the Column family based database category. It is developed at Facebook and built on the concepts on Amazon's Dynamo. It is designed to store large amounts of data in a distributed fashion.

Cassandra is highly scalable with no single point of failure. It satisfies availability and partition tolerance categories of the CAP theorem with tunable consistency, that is, Cassandra is by default eventually consistent, however, consistency in Cassandra can be increased at the cost of availability. Cassandra is made to support very high-speed writes and due to its excellent features, it is one of the most widely used databases in the NoSQl World.

To read more about Cassandra, you can refer to https://academy.datastax.com/planet-cassandra.

To interact with Cassandra from Spark, spark-cassandra-connector is required. The following is the Maven template for spark-cassandra-connector:

<dependency> 
               <groupId>com.datastax.spark</groupId> 
               <artifactId>spark-cassandra-connector_2.11</artifactId> 
               <version>2.0.0-M1</version> 
</dependency> 
 
The preceding specified version works with Spark 2.x.x versions. Users can change the version as per their requirement.

Consider, we have a table in Cassandra with the following definition in keyspace named my_keyspace:

CREATE TABLE my_keyspace.emp ( 
    empid int PRIMARY KEY, 
    emp_dept text, 
    emp_name text 
) 

To read the data, an Employee POJO object is needed, as follows:

public class Employee implements Serializable{     
private Integer empid; private String emp_name; private String emp_dept; public String toString() { return "Employee [empid=" + empid + ", emp_name=" + emp_name + ",
emp_dept=" + emp_dept + "]"; } public Integer getEmpid() { return empid; } public void setEmpid(Integer empid) { this.empid = empid; } public String getEmp_name() { return emp_name; } public void setEmp_name(String emp_name) { this.emp_name = emp_name; } public String getEmp_dept() { return emp_dept; } public void setEmp_dept(String emp_dept) { this.emp_dept = emp_dept; } }
The name of the columns of the Cassandra table should exactly match the name of the fields on the class, or else it throws exceptions. Also, the object must be serializable.

To connect Spark to Cassandra, Cassandra configurations need to provide to Spark as follows:

SparkConf conf =new SparkConf().setMaster("local").setAppName("Cassandra Example"); 
conf.set("spark.cassandra.connection.host", "127.0.0.1"); 
 
JavaSparkContext jsc=new JavaSparkContext(conf); 
 

Now, the rows of the Cassandra table can be mapped to the preceding POJO class using the CassandraJavaUtil library provided by spark-cassandra-connector as follows:

JavaRDD<Employee> cassandraRDD = CassandraJavaUtil.javaFunctions(jsc).cassandraTable("my_keyspace", "emp",CassandraJavaUtil.mapRowTo(Employee.class)); 
 

As shown in the preceding snippet, Cassandra rows have mapped to Employee objects and this returns an RDD of employee objects. An example of the collect() action can be executed on this RDD as follows:

cassandraRDD.collect().forEach(System.out::println);     

To load only one column from the table, the CassandraJavaUtil library can be used as follows:

JavaRDD<String> selectEmpDept =CassandraJavaUtil.javaFunctions(jsc).cassandraTable("my_keyspace", "emp",CassandraJavaUtil.mapColumnTo(String.class)).select("emp_dept"); 

Here we have a used datatype of column emp_dept as an argument of the mapColumnTo method. To find out the other operations that can be executed on Cassandra data using the CassandraJavaUtil library, please refer to the following URL: https://github.com/datastax/spark-cassandra-connector.

To write the results back to Cassandra, the same library can be used, as follows:

CassandraJavaUtil.javaFunctions(cassandraTable) 
        .writerBuilder("my_keyspace", "emp", CassandraJavaUtil.mapToRow(Employee.class)).saveToCassandra(); 

Similar to the read operation, the POJO object is mapped to the column of the Cassandra table before persisting the results.

saveToCassandra() is an action.

This is a bit of a tedious way. Let's look into another way to load the Cassandra table as a dataset. A dataset is an RDD with schema. Using the dataset API, an RDD can be viewed as a tabular structure, as it loads schema as well as data.

To load a Cassandra table as a datasets, we need to create a SparkSession object. As explained in Chapter 1, Introduction to Spark, SparkSession is the single entry point for a Spark program. Prior to Spark 2.x, SparkContext, SQLContext, and so on needed to be created separately. However, in Spark 2.x, SparkSession performs all these roles.

We will discuss more about dataset and SparkSession while discussing Spark SQL in Chapter 8, Working with Spark SQL.

Let's start by creating a SparkSession object:

 SparkSession sparkSession = SparkSession.builder().master("local").appName("My App") 
                      .config("spark.cassandra.connection.host", "localhost").getOrCreate(); 

The next step is to load the data from Cassandra using the SparkSession object:

 Map<String, String> map = new HashMap<>(); 
     map.put("table", "table_name"); 
     map.put("keyspace", "keyspace_name"); 
 
Dataset<Row> cassDataset = sparkSession.read().format("org.apache.spark.sql.cassandra").options(map).load();

As per the preceding code, it accepts a map object where users can specify table properties.

Now, similar to the collect() function of RDD, the show() function can be executed on the dataset to read the data:

cassDataset.show(); 
The show() function is an action.

This will return the data along with column names. So, it is easier to interpret the data, as a schema is also provided. Also, temporary view can be created on the dataset, using which, users can run SQL on the data:

cassDataset.createOrReplaceTempView("cass_data_view"); 
 sparkSession.sql("Select * from cass_data_view"); 

Similar to Cassandra, Spark Connectors are available for other databases, for example, MongoDB, HBase, and so on, which can be leveraged to connect Spark to these databases.

In this section, we learnt about connecting Spark to various data sources and reading the data. In the next section, we will learn how to handle different types of structured data in Spark.

..................Content has been hidden....................

You can't read the all page of ebook, please click here login for view all page.
Reset
3.133.134.151