This section examines Apache Spark GraphX programming in Scala, using the family relationship graph data sample shown in the last section. This data is stored on HDFS and accessed as a list of vertices and edges. Although this data set is small, graphs built in this way could be very large. I have used HDFS for storage because a graph that grows to big data scale needs some type of distributed and redundant storage. As this chapter shows by way of example, that could be HDFS. Using the Apache Spark SQL module, the storage could also be Apache Hive; see Chapter 4, Apache Spark SQL, for details.
I have used the hadoop Linux account on the server hc2nn
to develop the Scala-based GraphX code. The structure for SBT compilation follows the same pattern as the previous examples, with the code tree existing in a subdirectory named graphx
, where an sbt
configuration file called graph.sbt
resides:
[hadoop@hc2nn graphx]$ pwd
/home/hadoop/spark/graphx
[hadoop@hc2nn graphx]$ ls
src  graph.sbt  project  target
The source code lives, as expected, under a subtree of this level called src/main/scala
, and contains five code samples:
[hadoop@hc2nn scala]$ pwd
/home/hadoop/spark/graphx/src/main/scala
[hadoop@hc2nn scala]$ ls
graph1.scala  graph2.scala  graph3.scala  graph4.scala  graph5.scala
In each graph-based example, the Scala file uses the same code to load data from HDFS, and to create a graph; but then, each file provides a different facet of GraphX-based graph processing. As a different Spark module is being used in this chapter, the sbt
configuration file graph.sbt
has been changed to support this work:
[hadoop@hc2nn graphx]$ more graph.sbt

name := "Graph X"

version := "1.0"

scalaVersion := "2.10.4"

libraryDependencies += "org.apache.hadoop" % "hadoop-client" % "2.3.0"

libraryDependencies += "org.apache.spark" %% "spark-core" % "1.0.0"

libraryDependencies += "org.apache.spark" %% "spark-graphx" % "1.0.0"

// If using CDH, also add Cloudera repo
resolvers += "Cloudera Repository" at "https://repository.cloudera.com/artifactory/cloudera-repos/"
The contents of the graph.sbt
file are shown above, via the Linux more
command. There are only two changes to note from previous examples. The value of name has changed to represent the content and, more importantly, the Spark GraphX 1.0.0 library has been added as a library dependency.
Two data files have been placed on HDFS, under the /data/spark/graphx/
directory. They contain the data that will be used for this section in terms of the vertices, and edges that make up a graph. As the Hadoop file system ls
command shows next, the files are called graph1_edges.csv
and graph1_vertex.csv
:
[hadoop@hc2nn scala]$ hdfs dfs -ls /data/spark/graphx
Found 2 items
-rw-r--r--   3 hadoop supergroup   129 2015-03-01 13:52 /data/spark/graphx/graph1_edges.csv
-rw-r--r--   3 hadoop supergroup    59 2015-03-01 13:52 /data/spark/graphx/graph1_vertex.csv
The vertex
file, shown next, via a Hadoop file system cat
command, contains just six lines, representing the graph used in the last section. Each vertex represents a person, and has a vertex ID number, a name and an age value:
[hadoop@hc2nn scala]$ hdfs dfs -cat /data/spark/graphx/graph1_vertex.csv
1,Mike,48
2,Sarah,45
3,John,25
4,Jim,53
5,Kate,22
6,Flo,52
The edge file contains a set of directed edge values in the form of source vertex ID, destination vertex ID, and relationship. So, record one forms a Sister relationship between Flo
and Mike
:
[hadoop@hc2nn scala]$ hdfs dfs -cat /data/spark/graphx/graph1_edges.csv
6,1,Sister
1,2,Husband
2,1,Wife
5,1,Daughter
5,2,Daughter
3,1,Son
3,2,Son
4,1,Friend
1,5,Father
1,3,Father
2,5,Mother
2,3,Mother
Having explained the sbt environment, and the HDFS-based data, we are now ready to examine some of the GraphX code samples. As in the previous examples, the code can be compiled, and packaged as follows from the graphx
subdirectory. This creates a JAR called graph-x_2.10-1.0.jar
from which the example applications can be run:
[hadoop@hc2nn graphx]$ pwd
/home/hadoop/spark/graphx
[hadoop@hc2nn graphx]$ sbt package
Loading /usr/share/sbt/bin/sbt-launch-lib.bash
[info] Set current project to Graph X (in build file:/home/hadoop/spark/graphx/)
[info] Compiling 5 Scala sources to /home/hadoop/spark/graphx/target/scala-2.10/classes...
[info] Packaging /home/hadoop/spark/graphx/target/scala-2.10/graph-x_2.10-1.0.jar ...
[info] Done packaging.
[success] Total time: 30 s, completed Mar 3, 2015 5:27:10 PM
This section will explain the generic Scala code, up to the point of creating a GraphX graph from the HDFS-based data. This will save time, as the same code is reused in each example. Once this is explained, I will concentrate on the actual graph-based manipulation in each code example.
The generic code starts by importing the Spark context, graphx, and RDD functionality for use in the Scala code:
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD
Then, an application is defined, which extends the App
class, and the application name changes, for each example, from graph1
to graph5
. This application name will be used when running the application using spark-submit
:
object graph1 extends App {
The data files are defined in terms of the HDFS server and port, the path that they reside under in HDFS and their file names. As already mentioned, there are two data files that contain the vertex
and edge
information:
val hdfsServer = "hdfs://hc2nn.semtech-solutions.co.nz:8020"
val hdfsPath   = "/data/spark/graphx/"
val vertexFile = hdfsServer + hdfsPath + "graph1_vertex.csv"
val edgeFile   = hdfsServer + hdfsPath + "graph1_edges.csv"
The Spark Master URL is defined, as is the application name, which will appear in the Spark user interface when the application runs. A new Spark configuration object is created, and the URL and name are assigned to it:
val sparkMaster = "spark://hc2nn.semtech-solutions.co.nz:7077"
val appName = "Graph 1"
val conf = new SparkConf()
conf.setMaster(sparkMaster)
conf.setAppName(appName)
A new Spark context is created using the configuration that was just defined:
val sparkCxt = new SparkContext(conf)
The vertex information from the HDFS-based file is then loaded into an RDD-based structure called vertices
using the sparkCxt.textFile
method. The data is stored as a long VertexId
, and strings to represent the person's name and age. The data lines are split by commas as this is CSV based-data:
val vertices: RDD[(VertexId, (String, String))] =
  sparkCxt.textFile(vertexFile).map { line =>
    val fields = line.split(",")
    ( fields(0).toLong, ( fields(1), fields(2) ) )
  }
Similarly, the HDFS-based edge data is loaded into an RDD-based data structure called edges
. The CSV-based data is again split by comma values. The first two data values are converted into Long values, as they represent the source and destination vertex IDs. The final value, representing the relationship of the edge, is left as a string. Note that each record in the RDD structure edges is actually now an Edge
record:
val edges: RDD[Edge[String]] =
  sparkCxt.textFile(edgeFile).map { line =>
    val fields = line.split(",")
    Edge(fields(0).toLong, fields(1).toLong, fields(2))
  }
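The loading code above assumes that every line is well formed. As a minimal sketch of how malformed lines could be skipped instead of failing the job, the following plain Scala helpers parse one line at a time and return an Option. The object name CsvParsing, the helper names, and the tuple used in place of GraphX's Edge are all assumptions made for illustration; none of them come from the example files.

```scala
import scala.util.Try

object CsvParsing {
  // Vertex record shape used in this section: (id, (name, age)), with age kept as a string.
  type VertexRec = (Long, (String, String))
  // Hypothetical stand-in for GraphX's Edge[String]: (srcId, dstId, relationship).
  type EdgeRec = (Long, Long, String)

  // Returns None unless the line has exactly three fields and a numeric ID.
  def parseVertex(line: String): Option[VertexRec] =
    line.split(",").map(_.trim) match {
      case Array(id, name, age) => Try(id.toLong).toOption.map(v => (v, (name, age)))
      case _                    => None
    }

  // Returns None unless the line has exactly three fields and two numeric IDs.
  def parseEdge(line: String): Option[EdgeRec] =
    line.split(",").map(_.trim) match {
      case Array(src, dst, rel) =>
        for {
          s <- Try(src.toLong).toOption
          d <- Try(dst.toLong).toOption
        } yield (s, d, rel)
      case _ => None
    }
}
```

Helpers of this kind could be passed to flatMap rather than map on the result of textFile, so that lines returning None are simply dropped; the edge tuple would then be converted into an Edge record.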
A default value is defined in case an edge refers to a vertex that is missing from the vertex data. The graph is then constructed from the RDD-based structures: vertices
, edges
, and the default
record:
val default = ("Unknown", "Missing")
val graph = Graph(vertices, edges, default)
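GraphX uses the default attribute for any vertex ID that is mentioned by an edge but has no record of its own in the vertex data. The following plain Scala sketch imitates that rule with ordinary collections, so that the effect can be seen without a cluster. The object name, the function name fillMissing, and the vertex ID 7 used in the usage example are hypothetical.

```scala
object DefaultVertexSketch {
  type Attr = (String, String)

  // Every vertex ID mentioned by an edge gets an entry; IDs without a
  // vertex record of their own receive the default attribute.
  def fillMissing(vertices: Map[Long, Attr],
                  edges: Seq[(Long, Long, String)],
                  default: Attr): Map[Long, Attr] = {
    val referenced = edges.flatMap { case (src, dst, _) => Seq(src, dst) }.toSet
    val missing = referenced -- vertices.keySet
    vertices ++ missing.map(id => id -> default)
  }
}
```

For example, calling fillMissing with a single vertex record for Mike (1) and one hypothetical edge from vertex 7 to vertex 1 gives vertex 7 the attribute ("Unknown", "Missing"), while Mike keeps his own name and age.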
This creates a GraphX-based structure called graph
, which can now be used for each of the examples. Remember that although these data samples are small, you can create extremely large graphs using this approach. Many of these algorithms are iterative applications, for instance, PageRank and Triangle Count, and as a result, the programs will generate many iterative Spark jobs.
The graph has been loaded, and we know the data volumes in the data files, but what about the data content in terms of vertices, and edges in the actual graph itself? It is very simple to extract this information by using the vertices, and the edges count function as shown here:
println( "vertices : " + graph.vertices.count )
println( "edges : " + graph.edges.count )
Running the graph1
example, using the example name and the JAR file created previously, will provide the count information. The master URL is supplied to connect to the Spark cluster, and some default parameters are supplied for the executor memory, and the total executor cores:
spark-submit \
  --class graph1 \
  --master spark://hc2nn.semtech-solutions.co.nz:7077 \
  --executor-memory 700M \
  --total-executor-cores 100 \
  /home/hadoop/spark/graphx/target/scala-2.10/graph-x_2.10-1.0.jar
The Spark cluster job called graph1
provides the following output, which is as expected and matches the data files:
vertices : 6
edges : 12
What happens if we need to create a subgraph from the main graph, and filter by the person's age or relationships? The example code from the second example Scala file, graph2
, shows how this can be done:
val c1 = graph.vertices.filter { case (id, (name, age)) => age.toLong > 40 }.count

val c2 = graph.edges.filter { case Edge(from, to, property) =>
  property == "Father" || property == "Mother" }.count

println( "Vertices count : " + c1 )
println( "Edges count : " + c2 )
The two example counts have been created from the main graph. The first filters the person-based vertices on age, only taking those people who are older than 40. Notice that the age
value, which was stored as a string, has been converted into a long for comparison. The second example filters the edges on the relationship property of Mother
or Father
. The two count values, c1
and c2
, are created and printed, as the Spark output shows here:
Vertices count : 4
Edges count : 4
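The counts above come from filtering vertices and edges separately. A true subgraph also has to drop any edge that loses one of its endpoints. The following is a minimal sketch of that rule using plain Scala collections and the sample data from this section; the object and value names are assumptions for illustration.

```scala
object SubgraphSketch {
  // Vertex data from graph1_vertex.csv: id -> (name, age).
  val vertices: Map[Long, (String, String)] = Map(
    1L -> (("Mike", "48")), 2L -> (("Sarah", "45")), 3L -> (("John", "25")),
    4L -> (("Jim", "53")), 5L -> (("Kate", "22")), 6L -> (("Flo", "52")))

  // Edge data from graph1_edges.csv: (source, destination, relationship).
  val edges: Seq[(Long, Long, String)] = Seq(
    (6L, 1L, "Sister"), (1L, 2L, "Husband"), (2L, 1L, "Wife"),
    (5L, 1L, "Daughter"), (5L, 2L, "Daughter"), (3L, 1L, "Son"),
    (3L, 2L, "Son"), (4L, 1L, "Friend"), (1L, 5L, "Father"),
    (1L, 3L, "Father"), (2L, 5L, "Mother"), (2L, 3L, "Mother"))

  // Keep the vertices for people older than 40 ...
  val keptVertices: Map[Long, (String, String)] =
    vertices.filter { case (_, (_, age)) => age.toLong > 40 }

  // ... then keep only the edges whose two endpoints both survive.
  val keptEdges: Seq[(Long, Long, String)] =
    edges.filter { case (src, dst, _) =>
      keptVertices.contains(src) && keptVertices.contains(dst)
    }
}
```

With the sample data, this leaves Mike, Sarah, Jim, and Flo, connected by the Sister, Husband, Wife, and Friend edges. In GraphX itself, the graph subgraph method takes an edge predicate and a vertex predicate and applies the same rule to the distributed graph.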
The PageRank algorithm provides a ranking value for each of the vertices in a graph. It assumes that the vertices with the most incoming edges, particularly edges from other highly ranked vertices, are the most important ones. Search engines use PageRank to order the pages displayed during a web search:
val tolerance = 0.0001
val ranking = graph.pageRank(tolerance).vertices
val rankByPerson = vertices.join(ranking).map {
  case (id, ( (person,age) , rank )) => (rank, id, person)
}
The previous example code creates a tolerance
value, and calls the graph pageRank
method using it. The resulting vertex ranks are stored in a new value called ranking. To make the ranking more meaningful, the ranking values are joined with the original vertices RDD. The rankByPerson
value then contains the rank, vertex ID, and person's name.
The PageRank result, held in rankByPerson
, is then printed record by record, using a case statement to identify the record contents, and a format statement to print the contents. I did this because I wanted to define the format of the rank value, which can vary:
rankByPerson.collect().foreach {
  case (rank, id, person) =>
    println ( f"Rank $rank%1.2f id $id person $person")
}
The output from the application is then shown here. As expected, Mike
and Sarah
have the highest rank, as they have the most relationships:
Rank 0.15 id 4 person Jim
Rank 0.15 id 6 person Flo
Rank 1.62 id 2 person Sarah
Rank 1.82 id 1 person Mike
Rank 1.13 id 3 person John
Rank 1.13 id 5 person Kate
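To see where these numbers come from, the rank update can be imitated on the sample edges with plain Scala collections. The sketch below is not the GraphX implementation. It assumes the unnormalised update rank(v) = reset + (1 - reset) * sum of rank(u) / outDegree(u) over every edge from u to v, a reset probability of 0.15, a starting rank of 1.0, and a fixed number of sweeps instead of a tolerance. The object and function names are hypothetical.

```scala
object PageRankSketch {
  // Directed edges from graph1_edges.csv, without the relationship label.
  val edges: Seq[(Long, Long)] = Seq(
    (6L, 1L), (1L, 2L), (2L, 1L), (5L, 1L), (5L, 2L), (3L, 1L),
    (3L, 2L), (4L, 1L), (1L, 5L), (1L, 3L), (2L, 5L), (2L, 3L))

  val ids: Seq[Long] = edges.flatMap { case (s, d) => Seq(s, d) }.distinct.sorted

  // Number of outgoing edges per source vertex.
  val outDegree: Map[Long, Int] =
    edges.groupBy(_._1).map { case (id, es) => id -> es.size }

  // Repeats the update a fixed number of times, starting every vertex at 1.0.
  def ranks(iterations: Int = 100, reset: Double = 0.15): Map[Long, Double] = {
    val start: Map[Long, Double] = ids.map(v => v -> 1.0).toMap
    (1 to iterations).foldLeft(start) { (rank, _) =>
      ids.map { v =>
        // Each in-neighbour u passes on its rank divided by its out-degree.
        val incoming = edges.collect { case (u, d) if d == v => rank(u) / outDegree(u) }.sum
        v -> (reset + (1 - reset) * incoming)
      }.toMap
    }
  }
}
```

Under these assumptions, Jim (4) and Flo (6) stay at the reset value of 0.15 because no edge points at them, and the remaining ranks settle close to the values printed above, with Mike (1) highest.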
The triangle count algorithm provides, for each vertex, a count of the number of triangles that pass through it. For instance, vertex Mike
(1) is connected to Kate
(5), who is connected to Sarah
(2); Sarah
is connected to Mike
(1) and so, a triangle is formed. This can be useful for route finding, where minimum, triangle-free, spanning tree graphs need to be generated for route planning.
The code to execute a triangle count, and print it, is simple, as shown next. The graph triangleCount
method is executed for the graph vertices. The result is saved in the value tCount
, and then printed:
val tCount = graph.triangleCount().vertices
println( tCount.collect().mkString(" ") )
The results of the application job show that the vertices called Jim
(4) and Flo
(6) have no triangles, whereas Mike
(1) and Sarah
(2) have the most, as expected, as they have the most relationships:
(4,0) (6,0) (2,4) (1,4) (3,2) (5,2)
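As a cross-check on the idea of a triangle, the sample edges can be counted by hand with plain Scala collections. This sketch is not how GraphX computes the result. It treats the graph as undirected, collapses the two directions of each relationship into a single link, and counts each distinct triangle once per vertex. The object and function names are assumptions.

```scala
object TriangleSketch {
  // Directed edges from graph1_edges.csv, without the relationship label.
  val edges: Seq[(Long, Long)] = Seq(
    (6L, 1L), (1L, 2L), (2L, 1L), (5L, 1L), (5L, 2L), (3L, 1L),
    (3L, 2L), (4L, 1L), (1L, 5L), (1L, 3L), (2L, 5L), (2L, 3L))

  // Undirected neighbour sets, so 1 -> 2 and 2 -> 1 collapse into one link.
  val neighbours: Map[Long, Set[Long]] =
    edges.flatMap { case (s, d) => Seq(s -> d, d -> s) }
      .groupBy(_._1)
      .map { case (id, pairs) => id -> pairs.map(_._2).toSet }

  // A triangle through v is a pair of neighbours a < b of v that are themselves linked.
  def trianglesAt(v: Long): Int = {
    val ns = neighbours(v).toSeq
    (for { a <- ns; b <- ns if a < b && neighbours(a).contains(b) } yield (a, b)).size
  }
}
```

Counted this way, Mike (1) and Sarah (2) each sit on two triangles, John (3) and Kate (5) on one each, and Jim (4) and Flo (6) on none. The output shown above reports exactly double these values for the vertices that have triangles. Every relationship that takes part in a triangle appears in both directions in the edge file, which may explain the doubling, although that is an inference rather than something the output itself confirms.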
When a large graph is created from the data, it might contain unconnected subgraphs, that is, subgraphs that are isolated from each other and contain no bridging or connecting edges between them. The connected components algorithm provides a measure of this connectivity. It might be important, depending upon your processing, to know that all the vertices are connected.
The Scala code, for this example, calls two graph methods: connectedComponents
, and stronglyConnectedComponents
. The strong method requires a maximum iteration count, which has been set to 1000
. Both methods act on the graph vertices, labelling each vertex with the ID of the component that it belongs to:
val iterations = 1000
val connected = graph.connectedComponents().vertices
val connectedS = graph.stronglyConnectedComponents(iterations).vertices
The component IDs are then joined with the original vertex records, so that they can be associated with the vertex information, such as the person's name:
val connByPerson = vertices.join(connected).map {
  case (id, ( (person,age) , conn )) => (conn, id, person)
}

val connByPersonS = vertices.join(connectedS).map {
  case (id, ( (person,age) , conn )) => (conn, id, person)
}

The results are then output using a case statement, and formatted printing:

connByPerson.collect().foreach {
  case (conn, id, person) =>
    println ( f"Weak $conn $id $person" )
}
As expected for the connectedComponents
algorithm, the results show that for each vertex, there is only one component. This means that all the vertices are the members of a single graph, as the graph diagram earlier in the chapter showed:
Weak 1 4 Jim
Weak 1 6 Flo
Weak 1 2 Sarah
Weak 1 1 Mike
Weak 1 3 John
Weak 1 5 Kate
The stronglyConnectedComponents
method gives a measure of the connectivity in a graph, taking into account the direction of the relationships between the vertices. The results of the stronglyConnectedComponents
algorithm are output as follows:
connByPersonS.collect().foreach {
  case (conn, id, person) =>
    println ( f"Strong $conn $id $person" )
}
You might notice from the graph that the relationships, Sister
and Friend
, act from vertices Flo
(6) and Jim
(4), to Mike
(1) as the edge and vertex data shows here:
6,1,Sister
4,1,Friend

1,Mike,48
4,Jim,53
6,Flo,52
So, the strong method output shows that for most vertices, there is only one graph component signified by the 1
in the second column. However, vertices 4
and 6
cannot be reached from any other vertex, because their only edges point away from them. Each therefore forms a component of its own, labelled with its own vertex ID:
Strong 4 4 Jim
Strong 6 6 Flo
Strong 1 2 Sarah
Strong 1 1 Mike
Strong 1 3 John
Strong 1 5 Kate
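The difference between the two results can be reproduced on the sample edges with plain Scala collections. The sketch below is not the GraphX implementation. It uses a simple reachability search and labels each component with the smallest vertex ID that it contains, which matches the labels in the output above. The object, function, and value names are assumptions.

```scala
object ComponentsSketch {
  // Directed edges from graph1_edges.csv, without the relationship label.
  val edges: Seq[(Long, Long)] = Seq(
    (6L, 1L), (1L, 2L), (2L, 1L), (5L, 1L), (5L, 2L), (3L, 1L),
    (3L, 2L), (4L, 1L), (1L, 5L), (1L, 3L), (2L, 5L), (2L, 3L))

  val ids: Seq[Long] = edges.flatMap { case (s, d) => Seq(s, d) }.distinct.sorted

  // Adjacency map: vertex -> set of vertices one step away.
  private def adjacency(pairs: Seq[(Long, Long)]): Map[Long, Set[Long]] =
    pairs.groupBy(_._1).map { case (id, ps) => id -> ps.map(_._2).toSet }

  // All vertices reachable from start (including start) by following links in next.
  def reach(start: Long, next: Map[Long, Set[Long]]): Set[Long] = {
    @annotation.tailrec
    def loop(frontier: Set[Long], seen: Set[Long]): Set[Long] =
      if (frontier.isEmpty) seen
      else {
        val found = frontier.flatMap(v => next.getOrElse(v, Set.empty[Long])) -- seen
        loop(found, seen ++ found)
      }
    loop(Set(start), Set(start))
  }

  val forward: Map[Long, Set[Long]] = adjacency(edges)
  val backward: Map[Long, Set[Long]] = adjacency(edges.map(_.swap))
  val undirected: Map[Long, Set[Long]] = adjacency(edges ++ edges.map(_.swap))

  // Weak: ignore edge direction. Label = smallest vertex ID in the component.
  val weak: Map[Long, Long] = ids.map(v => v -> reach(v, undirected).min).toMap

  // Strong: two vertices share a component only if each can reach the other.
  val strong: Map[Long, Long] =
    ids.map(v => v -> reach(v, forward).intersect(reach(v, backward)).min).toMap
}
```

On the sample data, weak maps every vertex to component 1, while strong maps Mike, Sarah, John, and Kate to component 1 and leaves Jim (4) and Flo (6) in components 4 and 6, as in the two outputs above.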