In this section, the Cassandra NoSQL database will be used as a storage mechanism for Titan. Although Cassandra does not use Hadoop, it is a large-scale, cluster-based database in its own right, and can scale to very large cluster sizes. This section will follow the same process as the HBase section: a graph will be created and stored in Cassandra using the Titan Gremlin shell, it will then be checked using Gremlin, and the stored data will be checked in Cassandra. Finally, the raw Titan Cassandra graph-based data will be accessed from Spark. The first step, then, is to install Cassandra on each node in the cluster.
Create a repo file that will allow the community version of DataStax Cassandra to be installed using the Linux yum command. Root access is required for this, so the su command has been used to switch the user to root. Create the repo file on all of the nodes:
[hadoop@hc2nn lib]$ su -
[root@hc2nn ~]# vi /etc/yum.repos.d/datastax.repo

[datastax]
name = DataStax Repo for Apache Cassandra
baseurl=http://rpm.datastax.com/community
enabled=1
gpgcheck=0
Now, install Cassandra on each node in the cluster using the Linux yum
command:
[root@hc2nn ~]# yum -y install dsc20-2.0.13-1 cassandra20-2.0.13-1
Set up the Cassandra configuration under /etc/cassandra/conf by altering the cassandra.yaml file:
[root@hc2nn ~]# cd /etc/cassandra/conf ; vi cassandra.yaml
I have made the following changes to specify my cluster name, the server seed IP addresses, the RPC address, and the snitch value. Seed nodes are the nodes that the other nodes will try to connect to first when joining the cluster. In this case, the NameNode (192.168.1.103) and node2 (192.168.1.108) have been used as seeds. The snitch method manages network topology and routing:
cluster_name: 'Cluster1'
seeds: "192.168.1.103,192.168.1.108"
listen_address:
rpc_address: 0.0.0.0
endpoint_snitch: GossipingPropertyFileSnitch
Cassandra can now be started on each node as root using the service command:
[root@hc2nn ~]# service cassandra start
Log files can be found under /var/log/cassandra, and the data is stored under /var/lib/cassandra. The nodetool command can be used on any Cassandra node to check the status of the Cassandra cluster:
[root@hc2nn cassandra]# nodetool status
Datacenter: DC1
===============
Status=Up/Down
|/ State=Normal/Leaving/Joining/Moving
--  Address        Load      Tokens  Owns (effective)  Host ID                               Rack
UN  192.168.1.105  63.96 KB  256     37.2%             f230c5d7-ff6f-43e7-821d-c7ae2b5141d3  RAC1
UN  192.168.1.110  45.86 KB  256     39.9%             fc1d80fe-6c2d-467d-9034-96a1f203c20d  RAC1
UN  192.168.1.109  45.9 KB   256     40.9%             daadf2ee-f8c2-4177-ae72-683e39fd1ea0  RAC1
UN  192.168.1.108  50.44 KB  256     40.5%             b9d796c0-5893-46bc-8e3c-187a524b1f5a  RAC1
UN  192.168.1.103  70.68 KB  256     41.5%             53c2eebd-a66c-4a65-b026-96e232846243  RAC1
The Cassandra CQL shell command, called cqlsh, can be used to access the cluster and create objects. The shell is invoked next, and it shows that Cassandra version 2.0.13 is installed:
[hadoop@hc2nn ~]$ cqlsh
Connected to Cluster1 at localhost:9160.
[cqlsh 4.1.1 | Cassandra 2.0.13 | CQL spec 3.1.1 | Thrift protocol 19.39.0]
Use HELP for help.
cqlsh>
Next, a keyspace called keyspace1 is created, and then used, via the CQL shell:
cqlsh> CREATE KEYSPACE keyspace1 WITH REPLICATION =
       { 'class' : 'SimpleStrategy', 'replication_factor' : 1 };
cqlsh> USE keyspace1;
cqlsh:keyspace1> SELECT * FROM system.schema_keyspaces;

 keyspace_name | durable_writes | strategy_class                              | strategy_options
---------------+----------------+---------------------------------------------+----------------------------
     keyspace1 |           True | org.apache.cassandra.locator.SimpleStrategy | {"replication_factor":"1"}
        system |           True | org.apache.cassandra.locator.LocalStrategy  | {}
 system_traces |           True | org.apache.cassandra.locator.SimpleStrategy | {"replication_factor":"2"}
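A replication_factor of 1 means each row is stored on only one node, which is acceptable for this test cluster but offers no redundancy. As a hedged sketch (keyspace2 is a hypothetical name, and the DC1 data center name is taken from the nodetool output shown earlier), a production keyspace would typically use NetworkTopologyStrategy with a higher replication factor:

cqlsh> CREATE KEYSPACE keyspace2 WITH REPLICATION =
       { 'class' : 'NetworkTopologyStrategy', 'DC1' : 3 };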
Now that Cassandra is installed and working, it is time to create a Titan graph using Cassandra for storage. This will be tackled in the next section using the Titan Gremlin shell, and will follow the same format as the previous HBase section.
As with the previous Gremlin script, this Cassandra version creates the same simple graph. The difference in this script is the configuration: the backend storage type is defined as Cassandra, the hostnames are defined to be the Cassandra seed nodes, and the keyspace and port number are specified. Finally, the graph is created:
cassConf = new BaseConfiguration();
cassConf.setProperty("storage.backend","cassandra");
cassConf.setProperty("storage.hostname","hc2nn,hc2r1m2");
cassConf.setProperty("storage.port","9160");
cassConf.setProperty("storage.keyspace","titan");
titanGraph = TitanFactory.open(cassConf);
From this point, the script is the same as the previous HBase example, so I will not repeat it here. The script is available in the download package as cassandra_create.bash. The same checks, using the previous configuration, can be carried out in the Gremlin shell. They return the same results as the previous checks, and so prove that the graph has been stored:
gremlin> g = titanGraph.traversal()
gremlin> g.V().has('name','Mike').valueMap();
==>[name:[Mike], age:[48]]
gremlin> g.V().has('name','Flo').valueMap();
==>[name:[Flo], age:[52]]
Using the Cassandra CQL shell, and the Titan keyspace, it can be seen that a number of Titan tables have been created in Cassandra:
[hadoop@hc2nn ~]$ cqlsh
cqlsh> use titan;
cqlsh:titan> describe tables;

edgestore        graphindex        system_properties        systemlog  txlog
edgestore_lock_  graphindex_lock_  system_properties_lock_  titan_ids
It can also be seen that the data exists in the edgestore table within Cassandra:
cqlsh:titan> select * from edgestore;

 key                | column1            | value
--------------------+--------------------+------------------------------------------------
 0x0000000000004815 |               0x02 | 0x00011ee0
 0x0000000000004815 |             0x10c0 | 0xa0727425536fee1ec0
.......
 0x0000000000001005 |             0x10c8 | 0x00800512644c1b149004a0
 0x0000000000001005 | 0x30c9801009800c20 | 0x000101143c01023b0101696e6465782d706ff30200
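The keys and values shown are raw byte arrays: Titan serializes its graph structures into opaque blobs, which cqlsh renders as 0x-prefixed hexadecimal strings. As a minimal sketch (the toCqlHex helper is hypothetical, not part of Titan or the Cassandra driver), this is how any byte array maps onto the strings displayed above:

```scala
// Hypothetical helper: render a byte array the way cqlsh displays
// blob columns, as a 0x-prefixed lowercase hex string.
def toCqlHex(bytes: Array[Byte]): String =
  "0x" + bytes.map(b => f"${b & 0xff}%02x").mkString

// A key like the first one listed in the edgestore output above
println(toCqlHex(Array[Byte](0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x48, 0x15)))
// prints 0x0000000000004815
```

Nothing in these blobs is meant to be decoded by hand; the point is simply that any client reading edgestore directly, Spark included, sees bytes rather than graph vertices and edges.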
This assures me that a Titan graph has been created in the Gremlin shell, and is stored in Cassandra. Now, I will try to access the data from Spark.
In order to access Cassandra from Spark, I will download the DataStax Spark Cassandra connector and driver libraries. Information and version matching on this can be found at http://mvnrepository.com/artifact/com.datastax.spark/.
The version compatibility section at this URL shows which Cassandra connector version should be used with each Cassandra and Spark version; the connector version must match the Spark version in use. The libraries themselves can be sourced at http://mvnrepository.com/artifact/com.datastax.spark/spark-cassandra-connector_2.10.
By following the previous URL and selecting a library version, you will see a compile-dependencies table associated with the library, which indicates all of the other dependent libraries, and their versions, that you will need. The following libraries are those needed for use with Spark 1.3.1. Be careful to choose just (and all of) those library versions that are required:
[hadoop@hc2r1m2 titan_cass]$ pwd ; ls *.jar
/home/hadoop/spark/titan_cass

spark-cassandra-connector_2.10-1.3.0-M1.jar
cassandra-driver-core-2.1.5.jar
cassandra-thrift-2.1.3.jar
libthrift-0.9.2.jar
cassandra-clientutil-2.1.3.jar
guava-14.0.1.jar
joda-time-2.3.jar
joda-convert-1.2.jar
Now that I have the Cassandra connector library and all of its dependencies in place, I can begin to think about the Scala code required to connect to Cassandra. The first thing to do, given that I am using SBT as a development tool, is to set up the SBT build configuration file. Mine looks like this:
[hadoop@hc2r1m2 titan_cass]$ pwd ; more titan.sbt
/home/hadoop/spark/titan_cass

name := "Spark Cass"

version := "1.0"

scalaVersion := "2.10.4"

libraryDependencies += "org.apache.hadoop" % "hadoop-client" % "2.3.0"

libraryDependencies += "org.apache.spark" %% "spark-core" % "1.3.1"

libraryDependencies += "com.datastax.spark" % "spark-cassandra-connector" % "1.3.0-M1" from "file:///home/hadoop/spark/titan_cass/spark-cassandra-connector_2.10-1.3.0-M1.jar"

libraryDependencies += "com.datastax.cassandra" % "cassandra-driver-core" % "2.1.5" from "file:///home/hadoop/spark/titan_cass/cassandra-driver-core-2.1.5.jar"

libraryDependencies += "org.joda" % "time" % "2.3" from "file:///home/hadoop/spark/titan_cass/joda-time-2.3.jar"

libraryDependencies += "org.apache.cassandra" % "thrift" % "2.1.3" from "file:///home/hadoop/spark/titan_cass/cassandra-thrift-2.1.3.jar"

libraryDependencies += "com.google.common" % "collect" % "14.0.1" from "file:///home/hadoop/spark/titan_cass/guava-14.0.1.jar"

resolvers += "Cloudera Repository" at "https://repository.cloudera.com/artifactory/cloudera-repos/"
The Scala script for the Cassandra connector example, called spark3_cass.scala, now looks like the following code. First, the package name is defined. Then, the classes are imported for Spark and the Cassandra connector. Next, the application object spark3_cass is defined, along with the main method:
package nz.co.semtechsolutions

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf

import com.datastax.spark.connector._

object spark3_cass
{
  def main(args: Array[String]) {
A Spark configuration object is created using a Spark URL and application name. The Cassandra connection host is added to the configuration. Then, the Spark context is created using the configuration object:
    val sparkMaster = "spark://hc2nn.semtech-solutions.co.nz:7077"
    val appName = "Spark Cass 1"
    val conf = new SparkConf()

    conf.setMaster(sparkMaster)
    conf.setAppName(appName)

    conf.set("spark.cassandra.connection.host", "hc2r1m2")

    val sparkCxt = new SparkContext(conf)
The Cassandra keyspace and table names that are to be checked are defined. Then, the Spark context method called cassandraTable is used to connect to Cassandra and obtain the contents of the edgestore table as an RDD. The size of this RDD is then printed, and the script exits. We won't look at this data at this time, because all that was needed was to prove that a connection to Cassandra could be made:
    val keySpace  = "titan"
    val tableName = "edgestore"

    val cassRDD = sparkCxt.cassandraTable( keySpace, tableName )

    println( "Cassandra Table Rows : " + cassRDD.count )
    println( " >>>>> Script Finished <<<<< " )

  } // end main

} // end spark3_cass
As in the previous examples, the spark-submit command has been placed in a Bash script called run_titan.bash.cass. This script, shown next, looks similar to many others used already. The point to note here is the --jars option, which lists all of the JAR files used, so that they are available at runtime. The order of the JAR files in this option has been chosen to avoid class exception errors:
[hadoop@hc2r1m2 titan_cass]$ more run_titan.bash

#!/bin/bash

SPARK_HOME=/usr/local/spark
SPARK_BIN=$SPARK_HOME/bin
SPARK_SBIN=$SPARK_HOME/sbin

JAR_PATH=/home/hadoop/spark/titan_cass/target/scala-2.10/spark-cass_2.10-1.0.jar
CLASS_VAL=$1

CASS_HOME=/home/hadoop/spark/titan_cass/

CASS_JAR1=$CASS_HOME/spark-cassandra-connector_2.10-1.3.0-M1.jar
CASS_JAR2=$CASS_HOME/cassandra-driver-core-2.1.5.jar
CASS_JAR3=$CASS_HOME/cassandra-thrift-2.1.3.jar
CASS_JAR4=$CASS_HOME/libthrift-0.9.2.jar
CASS_JAR5=$CASS_HOME/cassandra-clientutil-2.1.3.jar
CASS_JAR6=$CASS_HOME/guava-14.0.1.jar
CASS_JAR7=$CASS_HOME/joda-time-2.3.jar
CASS_JAR8=$CASS_HOME/joda-convert-1.2.jar

cd $SPARK_BIN

./spark-submit \
  --jars $CASS_JAR8,$CASS_JAR7,$CASS_JAR5,$CASS_JAR4,$CASS_JAR3,$CASS_JAR6,$CASS_JAR2,$CASS_JAR1 \
  --class $CLASS_VAL \
  --master spark://hc2nn.semtech-solutions.co.nz:7077 \
  --executor-memory 100M \
  --total-executor-cores 50 \
  $JAR_PATH
This application is invoked using the previous Bash script. It connects to Cassandra, selects the data, and returns a count of 218 rows for the Cassandra table:
[hadoop@hc2r1m2 titan_cass]$ ./run_titan.bash.cass nz.co.semtechsolutions.spark3_cass

Cassandra Table Rows : 218
 >>>>> Script Finished <<<<<
This proves that the raw Cassandra-based Titan table data can be accessed from Apache Spark. However, as in the HBase example, this is raw table-based Titan data, and not the data in Titan graph form. The next step will be to use Apache Spark as a processing engine for the Titan database. This will be examined in the next section.