GraphX coding

This section will examine Apache Spark GraphX programming in Scala, using the family relationship graph data sample, which was shown in the last section. This data will be stored on HDFS, and will be accessed as a list of vertices and edges. Although this data set is small, the graphs that you build in this way could be very large. I have used HDFS for storage because, if your graph grows to big data scale, you will need some type of distributed and redundant storage. As this chapter shows by way of example, that could be HDFS. Using the Apache Spark SQL module, the storage could also be Apache Hive; see Chapter 4, Apache Spark SQL, for details.

Environment

I have used the hadoop Linux account on the server hc2nn to develop the Scala-based GraphX code. The structure for SBT compilation follows the same pattern as the previous examples, with the code tree existing in a subdirectory named graphx, where an sbt configuration file called graph.sbt resides:

[hadoop@hc2nn graphx]$ pwd
/home/hadoop/spark/graphx

[hadoop@hc2nn graphx]$ ls
   src   graph.sbt          project     target

The source code lives, as expected, under a subtree of this level called src/main/scala, and contains five code samples:

[hadoop@hc2nn scala]$ pwd
/home/hadoop/spark/graphx/src/main/scala

[hadoop@hc2nn scala]$ ls
graph1.scala  graph2.scala  graph3.scala  graph4.scala  graph5.scala

In each graph-based example, the Scala file uses the same code to load the data from HDFS and to create a graph; each file then demonstrates a different facet of GraphX-based graph processing. As a different Spark module is being used in this chapter, the sbt configuration file graph.sbt has been changed to support this work:

[hadoop@hc2nn graphx]$ more graph.sbt

name := "Graph X"


version := "1.0"

scalaVersion := "2.10.4"

libraryDependencies += "org.apache.hadoop" % "hadoop-client" % "2.3.0"

libraryDependencies += "org.apache.spark" %% "spark-core"  % "1.0.0"

libraryDependencies += "org.apache.spark" %% "spark-graphx" % "1.0.0"

// If using CDH, also add Cloudera repo
resolvers += "Cloudera Repository" at https://repository.cloudera.com/artifactory/cloudera-repos/

The contents of the graph.sbt file are shown above, via the Linux more command. There are only two changes to note from the previous examples: the value of name has been changed to reflect the content and, more importantly, the Spark GraphX 1.0.0 library has been added as a library dependency.

Two data files have been placed on HDFS, under the /data/spark/graphx/ directory. They contain the vertex and edge data that will be used in this section to make up a graph. As the Hadoop file system ls command shows next, the files are called graph1_edges.csv and graph1_vertex.csv:

[hadoop@hc2nn scala]$ hdfs dfs -ls /data/spark/graphx
Found 2 items
-rw-r--r--   3 hadoop supergroup        129 2015-03-01 13:52 /data/spark/graphx/graph1_edges.csv
-rw-r--r--   3 hadoop supergroup         59 2015-03-01 13:52 /data/spark/graphx/graph1_vertex.csv

The vertex file, shown next via the Hadoop file system cat command, contains just six lines, representing the graph used in the last section. Each vertex represents a person, and has a vertex ID number, a name, and an age value:

[hadoop@hc2nn scala]$ hdfs dfs -cat /data/spark/graphx/graph1_vertex.csv
1,Mike,48
2,Sarah,45
3,John,25
4,Jim,53
5,Kate,22
6,Flo,52

The edge file contains a set of directed edge values in the form of source vertex ID, destination vertex ID, and relationship. So, the first record creates a Sister relationship from Flo (6) to Mike (1):

[hadoop@hc2nn scala]$  hdfs dfs -cat /data/spark/graphx/graph1_edges.csv
6,1,Sister
1,2,Husband
2,1,Wife
5,1,Daughter
5,2,Daughter
3,1,Son
3,2,Son
4,1,Friend
1,5,Father
1,3,Father
2,5,Mother
2,3,Mother

Having explained the sbt environment and the HDFS-based data, we are now ready to examine some of the GraphX code samples. As in the previous examples, the code can be compiled and packaged as follows from the graphx subdirectory. This creates a JAR called graph-x_2.10-1.0.jar, from which the example applications can be run:

[hadoop@hc2nn graphx]$ pwd
/home/hadoop/spark/graphx

[hadoop@hc2nn graphx]$  sbt package

Loading /usr/share/sbt/bin/sbt-launch-lib.bash
[info] Set current project to Graph X (in build file:/home/hadoop/spark/graphx/)
[info] Compiling 5 Scala sources to /home/hadoop/spark/graphx/target/scala-2.10/classes...
[info] Packaging /home/hadoop/spark/graphx/target/scala-2.10/graph-x_2.10-1.0.jar ...
[info] Done packaging.
[success] Total time: 30 s, completed Mar 3, 2015 5:27:10 PM

Creating a graph

This section will explain the generic Scala code, up to the point of creating a GraphX graph, from the HDFS-based data. This will save time, as the same code is reused in each example. Once this is explained, I will concentrate on the actual graph-based manipulation in each code example.

The generic code starts by importing the Spark context, graphx, and RDD functionality for use in the Scala code:

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf

import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD

Then, an application object is defined, which extends the App trait; the object name changes, for each example, from graph1 to graph5. This name will be used as the class when running the application with spark-submit:

object graph1 extends App
{

The data files are defined in terms of the HDFS server and port, the path that they reside under in HDFS and their file names. As already mentioned, there are two data files that contain the vertex and edge information:

  val hdfsServer = "hdfs://hc2nn.semtech-solutions.co.nz:8020"
  val hdfsPath   = "/data/spark/graphx/"
  val vertexFile = hdfsServer + hdfsPath + "graph1_vertex.csv"
  val edgeFile   = hdfsServer + hdfsPath + "graph1_edges.csv"

The Spark Master URL is defined, as is the application name, which will appear in the Spark user interface when the application runs. A new Spark configuration object is created, and the URL and name are assigned to it:

  val sparkMaster = "spark://hc2nn.semtech-solutions.co.nz:7077"
  val appName = "Graph 1"
  val conf = new SparkConf()
  conf.setMaster(sparkMaster)
  conf.setAppName(appName)

A new Spark context is created using the configuration that was just defined:

  val sparkCxt = new SparkContext(conf)

The vertex information from the HDFS-based file is then loaded into an RDD-based structure called vertices, using the sparkCxt.textFile method. The data is stored as a Long VertexId, and as strings representing the person's name and age. The data lines are split by commas, as this is CSV-based data:

  val vertices: RDD[(VertexId, (String, String))] =
      sparkCxt.textFile(vertexFile).map { line =>
        val fields = line.split(",")
        ( fields(0).toLong, ( fields(1), fields(2) ) )
  }

Similarly, the HDFS-based edge data is loaded into an RDD-based data structure called edges. The CSV-based data is again split by comma values. The first two data values are converted into Long values, as they represent the source and destination vertex IDs. The final value, representing the relationship of the edge, is left as a string. Note that each record in the RDD structure edges is actually now an Edge record:

  val edges: RDD[Edge[String]] =
      sparkCxt.textFile(edgeFile).map { line =>
        val fields = line.split(",")
        Edge(fields(0).toLong, fields(1).toLong, fields(2))
  }

A default value is defined in case a connection or a vertex is missing; the graph is then constructed from the RDD-based structures: vertices, edges, and the default record:

  val default = ("Unknown", "Missing")
  val graph = Graph(vertices, edges, default)

This creates a GraphX-based structure called graph, which can now be used for each of the examples. Remember that although these data samples are small, you can create extremely large graphs using this approach. Many of these algorithms are iterative applications, for instance, PageRank and Triangle Count, and as a result, the programs will generate many iterative Spark jobs.
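
Since algorithms such as PageRank and triangle counting revisit the same graph on every iteration, it can be worth caching the graph in memory before running them. The following one-line call is an addition of mine, not part of the original example code:

  // Cache the graph so that iterative algorithms do not recompute the vertex
  // and edge RDDs on every iteration.
  graph.cache()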

Example 1 – counting

The graph has been loaded, and we know the data volumes in the data files, but what about the content of the actual graph itself, in terms of vertices and edges? It is very simple to extract this information by calling the count function on the graph's vertices and edges, as shown here:

  println( "vertices : " + graph.vertices.count )
  println( "edges    : " + graph.edges.count )

Running the graph1 example, using the example name and the JAR file created previously, will provide the count information. The master URL is supplied to connect to the Spark cluster, and some default parameters are supplied for the executor memory and the total executor cores:

spark-submit \
  --class graph1 \
  --master spark://hc2nn.semtech-solutions.co.nz:7077 \
  --executor-memory 700M \
  --total-executor-cores 100 \
  /home/hadoop/spark/graphx/target/scala-2.10/graph-x_2.10-1.0.jar

The Spark cluster job called graph1 provides the following output, which is as expected, and matches the data files:

vertices : 6
edges    : 12
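
As a related sketch, which is not part of the original graph1 code, GraphX can also report the number of edges attached to each vertex via the degrees method; the output format below is my own:

  // Per-vertex degree counts; each record is (vertexId, degree).
  graph.degrees.collect().foreach {
    case (id, degree) => println( s"vertex $id degree $degree" )
  }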

Example 2 – filtering

What happens if we need to create a subgraph from the main graph, and filter by the person's age or relationships? The example code from the second example Scala file, graph2, shows how this can be done:

  val c1 = graph.vertices.filter { case (id, (name, age)) => age.toLong > 40 }.count

  val c2 = graph.edges.filter { case Edge(from, to, property) =>
    property == "Father" || property == "Mother" }.count

  println( "Vertices count : " + c1 )
  println( "Edges    count : " + c2 )

Two example counts have been created from the main graph. The first filters the person-based vertices on age, only taking those people who are older than 40. Notice that the age value, which was stored as a string, has been converted into a long for the comparison. The second filters the edges on the relationship property, keeping only Mother or Father. The two count values, c1 and c2, are created and printed, as the Spark output shows here:

Vertices count : 4
Edges    count : 4
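
The counts above only measure the filtered data; if an actual filtered graph is needed, GraphX provides the subgraph method, which takes vertex and edge predicates. The following is a minimal sketch along the same lines, rather than code from the original graph2 example; the value name over40 is mine:

  // Build a subgraph containing only the people older than 40; edges are kept
  // only when both of their endpoints satisfy the vertex predicate.
  val over40 = graph.subgraph(
    vpred = (id, attr) => attr._2.toLong > 40
  )
  println( "subgraph vertices : " + over40.vertices.count )
  println( "subgraph edges    : " + over40.edges.count )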

Example 3 – PageRank

The PageRank algorithm provides a ranking value for each of the vertices in a graph. It makes the assumption that the vertices that are connected to the most edges are the most important ones. Search engines use PageRank to provide ordering for the page display during a web search:

  val tolerance = 0.0001
  val ranking = graph.pageRank(tolerance).vertices
  val rankByPerson = vertices.join(ranking).map {
    case (id, ( (person,age) , rank )) => (rank, id, person)
  }

The previous example code creates a tolerance value, and calls the graph pageRank method using it. The vertices are then ranked into a new value, ranking. In order to make the ranking more meaningful, the ranking values are joined with the original vertices RDD. The rankByPerson value then contains the rank, the vertex ID, and the person's name.

The PageRank result, held in rankByPerson, is then printed record by record, using a case statement to identify the record contents, and a format statement to print them. I did this because I wanted to control the format of the rank value, which can vary:

  rankByPerson.collect().foreach {
    case (rank, id, person) =>
      println ( f"Rank $rank%1.2f id $id person $person")
  }

The output from the application is then shown here. As expected, Mike and Sarah have the highest rank, as they have the most relationships:

Rank 0.15 id 4 person Jim
Rank 0.15 id 6 person Flo
Rank 1.62 id 2 person Sarah
Rank 1.82 id 1 person Mike
Rank 1.13 id 3 person John
Rank 1.13 id 5 person Kate
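
If ordered output is wanted, the collected results can simply be sorted by rank before printing. This is a small usage sketch of mine, not part of the original example code:

  // Print the people in descending rank order; sorting is done on the driver
  // after collect, which is fine for a graph of this size.
  rankByPerson.collect().sortWith( _._1 > _._1 ).foreach {
    case (rank, id, person) =>
      println ( f"Rank $rank%1.2f id $id person $person" )
  }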

Example 4 – triangle counting

The triangle count algorithm provides a per-vertex count of the number of triangles associated with that vertex. For instance, vertex Mike (1) is connected to Kate (5), who is connected to Sarah (2); Sarah is connected to Mike (1), and so a triangle is formed. This can be useful for route finding, where minimal, triangle-free, spanning tree graphs need to be generated for route planning.

The code to execute a triangle count, and print it, is simple, as shown next. The graph triangleCount method is executed for the graph vertices. The result is saved in the value tCount, and then printed:

  val tCount = graph.triangleCount().vertices
  println( tCount.collect().mkString("\n") )

The results of the application job show that the vertices Jim (4) and Flo (6) have no triangles, whereas Mike (1) and Sarah (2) have the most, as expected, since they have the most relationships:

(4,0)
(6,0)
(2,4)
(1,4)
(3,2)
(5,2)
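
Because each triangle is counted once at each of its three member vertices, the per-vertex counts can be turned into a total of distinct triangles by summing them and dividing by three. This is a small sketch of mine, not part of the original example code:

  // Sum the per-vertex triangle counts and divide by three to get the number
  // of distinct triangles in the graph.
  val totalTriangles = tCount.map { case (id, count) => count.toLong }.reduce( _ + _ ) / 3
  println( "total triangles : " + totalTriangles )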

Example 5 – connected components

When a large graph is created from the data, it might contain unconnected subgraphs, that is, subgraphs that are isolated from each other and contain no bridging or connecting edges between them. The connected components algorithms provide a measure of this connectivity. Depending upon your processing, it might be important to know that all the vertices are connected.

The Scala code for this example calls two graph methods: connectedComponents and stronglyConnectedComponents. The strong method requires a maximum iteration count, which has been set to 1000. Both methods act on the graph vertices:

  val iterations = 1000
  val connected  = graph.connectedComponents().vertices
  val connectedS = graph.stronglyConnectedComponents(iterations).vertices

The component results are then joined with the original vertex records, so that the component IDs can be associated with the vertex information, such as the person's name:

  val connByPerson = vertices.join(connected).map {
    case (id, ( (person,age) , conn )) => (conn, id, person)
  }

  val connByPersonS = vertices.join(connectedS).map {
    case (id, ( (person,age) , conn )) => (conn, id, person)
  }

The results are then output using a case statement, and formatted printing:

  connByPerson.collect().foreach {
    case (conn, id, person) =>
      println ( f"Weak $conn  $id $person" )
  }

As expected for the connectedComponents algorithm, the results show a single component ID (1) for every vertex. This means that all the vertices are members of a single graph, as the graph diagram earlier in the chapter showed:

Weak 1  4 Jim
Weak 1  6 Flo
Weak 1  2 Sarah
Weak 1  1 Mike
Weak 1  3 John
Weak 1  5 Kate

The stronglyConnectedComponents method gives a measure of the connectivity in a graph, taking into account the direction of the relationships between vertices. The results for the stronglyConnectedComponents algorithm are printed as follows:

  connByPersonS.collect().foreach {
    case (conn, id, person) =>
      println ( f"Strong $conn  $id $person" )
  }

You might notice from the graph that the Sister and Friend relationships act from vertices Flo (6) and Jim (4) to Mike (1), as the edge and vertex data shows here:

6,1,Sister
4,1,Friend

1,Mike,48
4,Jim,53
6,Flo,52

So, the strong method output shows that most vertices belong to a single graph component, signified by the component ID of 1. However, vertices 4 and 6 are not reachable from the rest of the graph due to the direction of their relationships, and so each forms its own component, identified by its own vertex ID:

Strong 4  4 Jim
Strong 6  6 Flo
Strong 1  2 Sarah
Strong 1  1 Mike
Strong 1  3 John
Strong 1  5 Kate
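
A quick way to confirm programmatically that the whole graph forms a single weakly connected component is to count the distinct component IDs. This is a small sketch of mine, building on the connected value created earlier, and not part of the original example code:

  // One distinct component ID means that every vertex is reachable from every
  // other vertex when edge direction is ignored.
  val componentCount = connected.map { case (id, component) => component }.distinct.count
  println( "weakly connected components : " + componentCount )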