Titan with Cassandra

In this section, the Cassandra NoSQL database will be used as a storage mechanism for Titan. Although Cassandra does not use Hadoop, it is a large-scale, cluster-based database in its own right, and can scale to very large cluster sizes. This section will follow the same process as the HBase section: a graph will be created and stored in Cassandra using the Titan Gremlin shell, it will then be checked using Gremlin, and the stored data will be checked in Cassandra. The raw Titan Cassandra graph-based data will then be accessed from Spark. The first step, then, is to install Cassandra on each node in the cluster.

Installing Cassandra

Create a repo file that will allow the community version of DataStax Cassandra to be installed using the Linux yum command. Root access is required for this, so the su command has been used to switch to the root user. Create the repo file on all the nodes:

[hadoop@hc2nn lib]$ su -
[root@hc2nn ~]# vi /etc/yum.repos.d/datastax.repo

[datastax]
name=DataStax Repo for Apache Cassandra
baseurl=http://rpm.datastax.com/community
enabled=1
gpgcheck=0

Now, install Cassandra on each node in the cluster using the Linux yum command:

[root@hc2nn ~]# yum -y install dsc20-2.0.13-1 cassandra20-2.0.13-1

Set up the Cassandra configuration under /etc/cassandra/conf by altering the cassandra.yaml file:

[root@hc2nn ~]# cd /etc/cassandra/conf   ; vi cassandra.yaml

I have made the following changes to specify my cluster name, the server seed IP addresses, the RPC address, and the snitch value. Seed nodes are the nodes that other nodes try to contact first when joining the cluster. In this case, the NameNode (103) and node2 (108) have been used as seeds. The snitch determines how Cassandra understands the network topology for routing requests and placing replicas:

cluster_name: 'Cluster1'
seeds: "192.168.1.103,192.168.1.108"
listen_address:
rpc_address: 0.0.0.0
endpoint_snitch: GossipingPropertyFileSnitch
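
Because the GossipingPropertyFileSnitch is used, each node also advertises its data centre and rack, which Cassandra reads from the cassandra-rackdc.properties file in the same configuration directory. A minimal sketch of that file is shown here; the DC1 and RAC1 values are assumptions that match the nodetool output later in this section:

dc=DC1
rack=RAC1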

Cassandra can now be started on each node as root using the service command:

[root@hc2nn ~]# service cassandra start

Log files can be found under /var/log/cassandra, and the data is stored under /var/lib/cassandra. The nodetool command can be used on any Cassandra node to check the status of the Cassandra cluster:

[root@hc2nn cassandra]# nodetool status
Datacenter: DC1
===============
Status=Up/Down
|/ State=Normal/Leaving/Joining/Moving
--  Address        Load       Tokens  Owns (effective)  Host ID                               Rack
UN  192.168.1.105  63.96 KB   256     37.2%             f230c5d7-ff6f-43e7-821d-c7ae2b5141d3  RAC1
UN  192.168.1.110  45.86 KB   256     39.9%             fc1d80fe-6c2d-467d-9034-96a1f203c20d  RAC1
UN  192.168.1.109  45.9 KB    256     40.9%             daadf2ee-f8c2-4177-ae72-683e39fd1ea0  RAC1
UN  192.168.1.108  50.44 KB   256     40.5%             b9d796c0-5893-46bc-8e3c-187a524b1f5a  RAC1
UN  192.168.1.103  70.68 KB   256     41.5%             53c2eebd-a66c-4a65-b026-96e232846243  RAC1

The Cassandra CQL shell command, called cqlsh, can be used to access the cluster and create objects. The shell is invoked next, and its banner shows that Cassandra version 2.0.13 is installed:

[hadoop@hc2nn ~]$ cqlsh
Connected to Cluster1 at localhost:9160.
[cqlsh 4.1.1 | Cassandra 2.0.13 | CQL spec 3.1.1 | Thrift protocol 19.39.0]
Use HELP for help.
cqlsh>

Next, the CQL shell is used to create a keyspace called keyspace1, switch to it, and confirm that it exists by querying the system.schema_keyspaces table:

cqlsh> CREATE KEYSPACE keyspace1 WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 };

cqlsh> USE keyspace1;

cqlsh:keyspace1> SELECT * FROM system.schema_keyspaces;

 keyspace_name | durable_writes | strategy_class                               | strategy_options
---------------+----------------+----------------------------------------------+----------------------------
     keyspace1 |           True | org.apache.cassandra.locator.SimpleStrategy  | {"replication_factor":"1"}
        system |           True | org.apache.cassandra.locator.LocalStrategy   |                         {}
 system_traces |           True | org.apache.cassandra.locator.SimpleStrategy  | {"replication_factor":"2"}

Since Cassandra is installed and working, it is now time to create a Titan graph using Cassandra for storage. This will be tackled in the next section using the Titan Gremlin shell, and it will follow the same format as the previous HBase section.

The Gremlin Cassandra script

As with the previous Gremlin script, this Cassandra version creates the same simple graph. The difference lies in the configuration: the backend storage type is defined as Cassandra, and the hostnames are defined to be the Cassandra seed nodes. The keyspace and the port number are specified, and finally, the graph is created:

cassConf = new BaseConfiguration();
cassConf.setProperty("storage.backend","cassandra");
cassConf.setProperty("storage.hostname","hc2nn,hc2r1m2");
cassConf.setProperty("storage.port","9160")
cassConf.setProperty("storage.keyspace","titan")
titanGraph = TitanFactory.open(cassConf);

From this point, the script is the same as the previous HBase example, so I will not repeat it; it will be available in the download package as cassandra_create.bash. The same checks, using the previous configuration, can be carried out in the Gremlin shell. They return the same results as before, proving that the graph has been stored:

gremlin> g = titanGraph.traversal()

gremlin> g.V().has('name','Mike').valueMap();
==>[name:[Mike], age:[48]]

gremlin> g.V().has('name','Flo').valueMap();
==>[name:[Flo], age:[52]]

Using the Cassandra CQL shell, and the Titan keyspace, it can be seen that a number of Titan tables have been created in Cassandra:

[hadoop@hc2nn ~]$ cqlsh
cqlsh> use titan;
cqlsh:titan> describe tables;
edgestore        graphindex        system_properties systemlog  txlog
edgestore_lock_  graphindex_lock_  system_properties_lock_  titan_ids

It can also be seen that the data exists in the edgestore table within Cassandra:

cqlsh:titan> select * from edgestore;
 key                | column1            | value
--------------------+--------------------+------------------------------------------------
 0x0000000000004815 |               0x02 |                                     0x00011ee0
 0x0000000000004815 |             0x10c0 |                           0xa0727425536fee1ec0
.......
 0x0000000000001005 |             0x10c8 |                       0x00800512644c1b149004a0
 0x0000000000001005 | 0x30c9801009800c20 |   0x000101143c01023b0101696e6465782d706ff30200

This assures me that a Titan graph has been created in the Gremlin shell, and is stored in Cassandra. Now, I will try to access the data from Spark.

The Spark Cassandra connector

In order to access Cassandra from Spark, I will download the DataStax Spark Cassandra connector and driver libraries. Information on version matching can be found at http://mvnrepository.com/artifact/com.datastax.spark/.

The version compatibility section of this URL shows which Cassandra connector version should be used with each Cassandra and Spark version; the connector version should match the Spark version that is being used. The libraries themselves can be sourced from http://mvnrepository.com/artifact/com.datastax.spark/spark-cassandra-connector_2.10.

By following the previous URL and selecting a library version, you will see a compile dependencies table associated with the library, which indicates all of the other dependent libraries and the versions that you will need. These URLs therefore show both which version of the Cassandra connector library to use with each version of Spark, and which libraries the connector depends upon. Be careful to choose just (and all of) the library versions that are required. The following libraries are those needed for use with Spark 1.3.1:

[hadoop@hc2r1m2 titan_cass]$ pwd ; ls *.jar
/home/hadoop/spark/titan_cass

spark-cassandra-connector_2.10-1.3.0-M1.jar
cassandra-driver-core-2.1.5.jar
cassandra-thrift-2.1.3.jar
libthrift-0.9.2.jar
cassandra-clientutil-2.1.3.jar
guava-14.0.1.jar
joda-time-2.3.jar
joda-convert-1.2.jar

Accessing Cassandra with Spark

Now that I have the Cassandra connector library and all of its dependencies in place, I can begin to think about the Scala code required to connect to Cassandra. The first thing to do, given that I am using SBT as a development tool, is to set up the SBT build configuration file. Mine looks like this:

[hadoop@hc2r1m2 titan_cass]$ pwd ; more titan.sbt
/home/hadoop/spark/titan_cass

name := "Spark Cass"
version := "1.0"
scalaVersion := "2.10.4"
libraryDependencies += "org.apache.hadoop" % "hadoop-client" % "2.3.0"
libraryDependencies += "org.apache.spark" %% "spark-core"  % "1.3.1"
libraryDependencies += "com.datastax.spark" % "spark-cassandra-connector"  % "1.3.0-M1" fr
om "file:///home/hadoop/spark/titan_cass/spark-cassandra-connector_2.10-1.3.0-M1.jar"
libraryDependencies += "com.datastax.cassandra" % "cassandra-driver-core"  % "2.1.5" from
"file:///home/hadoop/spark/titan_cass/cassandra-driver-core-2.1.5.jar"
libraryDependencies += "org.joda"  % "time" % "2.3" from "file:///home/hadoop/spark/titan_
cass/joda-time-2.3.jar"
libraryDependencies += "org.apache.cassandra" % "thrift" % "2.1.3" from "file:///home/hado
op/spark/titan_cass/cassandra-thrift-2.1.3.jar"
libraryDependencies += "com.google.common" % "collect" % "14.0.1" from "file:///home/hadoo
p/spark/titan_cass/guava-14.0.1.jar
resolvers += "Cloudera Repository" at "https://repository.cloudera.com/artifactory/clouder
a-repos/"
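
Assuming that SBT is installed, the application JAR can then be built from the project directory. The artefact path shown here follows from the name, version, and scalaVersion values in titan.sbt, and matches the JAR_PATH variable used in the run script later:

[hadoop@hc2r1m2 titan_cass]$ sbt package
[hadoop@hc2r1m2 titan_cass]$ ls target/scala-2.10/*.jar
target/scala-2.10/spark-cass_2.10-1.0.jar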

The Scala script for the Cassandra connector example, called spark3_cass.scala, now looks like the following code. First, the package name is defined, then the classes are imported for Spark and the Cassandra connector. Next, the application object spark3_cass is defined, along with its main method:

package nz.co.semtechsolutions

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf

import com.datastax.spark.connector._

object spark3_cass
{

  def main(args: Array[String]) {

A Spark configuration object is created using a Spark URL and application name. The Cassandra connection host is added to the configuration. Then, the Spark context is created using the configuration object:

    val sparkMaster = "spark://hc2nn.semtech-solutions.co.nz:7077"
    val appName = "Spark Cass 1"
    val conf = new SparkConf()

    conf.setMaster(sparkMaster)
    conf.setAppName(appName)

    conf.set("spark.cassandra.connection.host", "hc2r1m2")

    val sparkCxt = new SparkContext(conf)

The Cassandra keyspace and table names to be checked are defined. Then, the Spark context method called cassandraTable is used to connect to Cassandra and obtain the contents of the edgestore table as an RDD. The size of this RDD is then printed, and the script exits. We won't look at this data at this time, because all that is needed is to prove that a connection to Cassandra can be made:

    val keySpace =  "titan"
    val tableName = "edgestore"

    val cassRDD = sparkCxt.cassandraTable( keySpace, tableName )

    println( "Cassandra Table Rows : " + cassRDD.count )

    println( " >>>>> Script Finished <<<<< " )

  } // end main

} // end spark3_cass
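
Although the table contents are not examined here, each element of the RDD returned by cassandraTable is a CassandraRow object. A minimal, hypothetical extension of the example above that prints a few raw rows might look like this:

    // Print the first five rows of the edgestore table as raw CassandraRow values
    cassRDD.take(5).foreach( row => println( row ) )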

As in the previous examples, the Spark submit command has been placed in a Bash script called run_titan.bash.cass. This script, shown next, looks similar to many others used already. The point to note here is the --jars option, which lists all of the JAR files used so that they are available at run time. The order of the JAR files in this option has been chosen to avoid class exception errors:

[hadoop@hc2r1m2 titan_cass]$ more run_titan.bash.cass

#!/bin/bash

SPARK_HOME=/usr/local/spark
SPARK_BIN=$SPARK_HOME/bin
SPARK_SBIN=$SPARK_HOME/sbin

JAR_PATH=/home/hadoop/spark/titan_cass/target/scala-2.10/spark-cass_2.10-1.0.jar
CLASS_VAL=$1

CASS_HOME=/home/hadoop/spark/titan_cass/

CASS_JAR1=$CASS_HOME/spark-cassandra-connector_2.10-1.3.0-M1.jar
CASS_JAR2=$CASS_HOME/cassandra-driver-core-2.1.5.jar
CASS_JAR3=$CASS_HOME/cassandra-thrift-2.1.3.jar
CASS_JAR4=$CASS_HOME/libthrift-0.9.2.jar
CASS_JAR5=$CASS_HOME/cassandra-clientutil-2.1.3.jar
CASS_JAR6=$CASS_HOME/guava-14.0.1.jar
CASS_JAR7=$CASS_HOME/joda-time-2.3.jar
CASS_JAR8=$CASS_HOME/joda-convert-1.2.jar

cd $SPARK_BIN

./spark-submit \
  --jars $CASS_JAR8,$CASS_JAR7,$CASS_JAR5,$CASS_JAR4,$CASS_JAR3,$CASS_JAR6,$CASS_JAR2,$CASS_JAR1 \
  --class $CLASS_VAL \
  --master spark://hc2nn.semtech-solutions.co.nz:7077 \
  --executor-memory 100M \
  --total-executor-cores 50 \
  $JAR_PATH

This application is invoked using the previous Bash script. It connects to Cassandra, selects the data, and returns a count of 218 rows for the Cassandra table:

[hadoop@hc2r1m2 titan_cass]$ ./run_titan.bash.cass nz.co.semtechsolutions.spark3_cass

Cassandra Table Rows : 218
 >>>>> Script Finished <<<<<

This proves that the raw Cassandra-based Titan table data can be accessed from Apache Spark. However, as in the HBase example, this is raw table-based Titan data rather than data in Titan graph form. The next step, examined in the next section, will be to use Apache Spark as a processing engine for the Titan database.
