Having seen yet another artificial example, let's turn to a more interesting one, which we will use to investigate some of the core properties studied in the previous section. The data we will be considering in this chapter can be found at http://networkrepository.com/, an open network data repository with a vast amount of interesting data. First, we will load a relatively small data set retrieved from Twitter, which can be downloaded from http://networkrepository.com/rt-occupywallstnyc.php. Download the zip file available on this page, rt_occupywallstnyc.zip, and unpack it to access the file rt_occupywallstnyc.edges. The file is in CSV format with commas as separators. Each row represents a retweet of a tweet concerning the Occupy Wall Street movement in New York City. The first two columns show Twitter user IDs, and the third represents an ID for the retweet (judging by its magnitude, possibly a Unix timestamp); that is, the user in the second column retweeted a tweet from the respective user in the first.
The first ten lines of the file look as follows:
3212,221,1347929725
3212,3301,1347923714
3212,1801,1347714310
3212,1491,1347924000
3212,1483,1347923691
3212,1872,1347939690
1486,1783,1346181381
2382,3350,1346675417
2382,1783,1342925318
2159,349,1347911999
For instance, we can see that the tweets from user 3212 have been retweeted at least six times. However, since we don't know whether the file is ordered in any way, and it contains roughly 3.6k vertices, we should let GraphX answer such questions for us.
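To make the manual inspection above concrete, here is a small Spark-free sketch in plain Scala that parses only the ten sample lines quoted above and counts retweets per source user. The object and value names are our own hypothetical choices, and the sketch deliberately works on a local collection; on the full file, this kind of question is exactly what GraphX is for.

```scala
object SampleRetweetCounts {
  // The ten sample lines quoted above (not the full file)
  val lines: Seq[String] = Seq(
    "3212,221,1347929725", "3212,3301,1347923714", "3212,1801,1347714310",
    "3212,1491,1347924000", "3212,1483,1347923691", "3212,1872,1347939690",
    "1486,1783,1346181381", "2382,3350,1346675417", "2382,1783,1342925318",
    "2159,349,1347911999"
  )

  // Parse each line into (source user, retweeting user, third column)
  val edges: Seq[(Long, Long, Long)] = lines.map { line =>
    val fields = line.split(",")
    (fields(0).toLong, fields(1).toLong, fields(2).toLong)
  }

  // How often each source user was retweeted within these ten lines
  val retweetsPerSource: Map[Long, Int] =
    edges.groupBy(_._1).map { case (src, es) => src -> es.size }

  // Every user ID occurring as a source or a target is a vertex
  val vertexIds: Set[Long] = edges.flatMap(e => Seq(e._1, e._2)).toSet
}
```

Within these ten lines, user 3212 is retweeted six times, and 13 distinct users appear.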
To build a graph, we will proceed by first creating an RDD of edges from this file, that is, RDD[Edge[Long]], by using basic Spark functionality:
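import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD

// Parse each CSV line "source,target,third column" into an Edge[Long]
val edges: RDD[Edge[Long]] =
  sc.textFile("./rt_occupywallstnyc.edges").map { line =>
    val fields = line.split(",")
    Edge(fields(0).toLong, fields(1).toLong, fields(2).toLong)
  }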
Recall that vertex IDs in GraphX are of the Long type (VertexId is an alias for Long), which is why we convert the values to Long after loading the text file and splitting each line by comma. We parse the third column as Long too, so our edge data type in this case is Long. Here, we assume that the file resides in the same folder that we started spark-shell in; adapt the path to your needs, if necessary. Having such an edge RDD, we can now use the fromEdges method of the Graph companion object as follows:
val rtGraph: Graph[String, Long] = Graph.fromEdges(edges, defaultValue = "")
It may not come as a surprise that we need to supply edges to this method, but the defaultValue keyword deserves some explanation. Note that so far, we only have knowledge of edges, and while the vertex IDs are implicitly available as sources and targets of edges, we still have not settled on a vertex data type VD, which every GraphX graph needs. The defaultValue argument supplies the vertex data assigned to every vertex, and its type fixes VD. In our case, we chose an empty string, which explains the signature Graph[String, Long] of rtGraph.
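Conceptually, fromEdges derives the vertex set from the edges and attaches the default value to each vertex. The following plain-Scala sketch mimics that idea on a few of the sample edges with a hypothetical helper, verticesFromEdges. It illustrates the semantics only and is not how GraphX implements it internally (GraphX builds a distributed VertexRDD).

```scala
object FromEdgesSketch {
  // Hypothetical helper mimicking the idea behind Graph.fromEdges: every ID that
  // occurs as a source or target becomes a vertex carrying the same defaultValue
  def verticesFromEdges[VD](edges: Seq[(Long, Long)], defaultValue: VD): Map[Long, VD] =
    edges
      .flatMap { case (src, dst) => Seq(src, dst) }
      .distinct
      .map(id => id -> defaultValue)
      .toMap

  // Three of the sample edges (source user, retweeting user)
  val sampleEdges: Seq[(Long, Long)] = Seq((3212L, 221L), (3212L, 3301L), (2382L, 1783L))

  // VD is inferred as String from the default value, just as for rtGraph
  val vertices: Map[Long, String] = verticesFromEdges(sampleEdges, defaultValue = "")
}
```

The three edges yield five vertices, all carrying the empty string as their data.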
With this first real-world graph loaded, let's check some basic properties. Using the notation from earlier, the order (number of vertices) and degree (number of edges) of the graph can be computed as follows:
val order = rtGraph.numVertices
val degree = rtGraph.numEdges
The preceding code will yield 3,609 and 3,936, respectively. As for the degrees of individual vertices, GraphX provides the degrees method on Graph, which returns a VertexRDD[Int] pairing each vertex ID with its degree (in-degree plus out-degree). Let's compute the average degree of our retweet graph:
val avgDegree = rtGraph.degrees.map(_._2).reduce(_ + _) / order.toDouble
The result of this operation should be roughly 2.18, which means that each vertex has about two edges connected to it on average. The notation used in this concise operation may seem a bit dense, mostly due to the many wildcards, so let's dissect it. First, we call degrees, as discussed. Afterwards, we extract the degrees only by mapping each pair to its second item; that is, we forget the vertex IDs. This leaves us with an RDD of integer values, which we can sum up by reducing with addition. The last step is to convert order to Double, to make sure we get floating-point division, and to divide the sum by it. The next code listing shows the same four steps expanded in more detail:
val vertexDegrees: VertexRDD[Int] = rtGraph.degrees
val degrees: RDD[Int] = vertexDegrees.map(v => v._2)
val sumDegrees: Int = degrees.reduce((v1, v2) => v1 + v2 )
val avgDegreeAlt = sumDegrees / order.toDouble
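Since degrees counts every edge once at its source and once at its target, the sum of all degrees is twice the number of edges. This gives a quick cross-check of the average degree from the order and number of edges computed earlier:

```latex
\bar{d} = \frac{1}{|V|} \sum_{v \in V} \deg(v) = \frac{2\,|E|}{|V|} = \frac{2 \cdot 3936}{3609} \approx 2.18
```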
Next, we compute the in-degree and out-degree of each vertex of this directed graph by simply calling inDegrees and outDegrees, respectively. To make things more interesting, let's compute the maximum in-degree, as well as the minimum out-degree, over all the vertices present in the graph, and return the corresponding vertex IDs as well. We tackle the maximum in-degree first:
val maxInDegree: (Long, Int) = rtGraph.inDegrees.reduce(
(v1,v2) => if (v1._2 > v2._2) v1 else v2
)
Carrying out this computation, you should see that the vertex with ID 1783 has in-degree 401, meaning that the user with this ID retweeted 401 times in this data set. So, an interesting follow-up question to ask is, "From how many different users has this user retweeted?" Again, we can answer this very quickly by counting the distinct sources of all the edges that point to this target:
rtGraph.edges.filter(e => e.dstId == 1783).map(_.srcId).distinct().count()
Executing this command should return 34, so on average, user 1783 retweeted about 12 tweets (401 / 34 ≈ 11.8) from each user they retweeted at all in this data set. This in turn means that we have found a meaningful example of a multigraph: there are pairs of vertices in this graph with many different edges between them. Answering the question of minimum out-degree is now straightforward:
val minOutDegree: (Long, Int) = rtGraph.outDegrees.reduce(
(v1,v2) => if (v1._2 < v2._2) v1 else v2
)
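The answer is 1 in this case, but be careful when interpreting it: outDegrees only contains vertices with at least one outgoing edge, so its minimum is at least 1 by construction. Users whose tweets were never retweeted in this data set have out-degree 0 and simply do not appear in outDegrees.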
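The degree computations of the last few listings can be replayed on the ten sample lines with plain Scala collections. This is a local sketch with hypothetical names, not GraphX code; like inDegrees and outDegrees, the two maps below only contain vertices that have at least one incoming or outgoing edge. On this tiny sample, user 1783 has in-degree 2 from two different users, so the multigraph effect seen in the full data does not show up here.

```scala
object SampleDegrees {
  // The ten sample edges (source user, retweeting user) quoted earlier
  val edges: Seq[(Long, Long)] = Seq(
    (3212L, 221L), (3212L, 3301L), (3212L, 1801L), (3212L, 1491L),
    (3212L, 1483L), (3212L, 1872L), (1486L, 1783L), (2382L, 3350L),
    (2382L, 1783L), (2159L, 349L)
  )

  val vertexIds: Set[Long] = edges.flatMap { case (src, dst) => Seq(src, dst) }.toSet

  // Only vertices with at least one incoming (resp. outgoing) edge get an entry
  val inDegrees: Map[Long, Int] = edges.groupBy(_._2).map { case (dst, es) => dst -> es.size }
  val outDegrees: Map[Long, Int] = edges.groupBy(_._1).map { case (src, es) => src -> es.size }

  // Same reduce pattern as in the text: keep the pair with the larger in-degree
  val maxInDegree: (Long, Int) =
    inDegrees.toSeq.reduce((v1, v2) => if (v1._2 > v2._2) v1 else v2)

  // Number of distinct users the top retweeter retweeted from
  val distinctSourcesOfMax: Int =
    edges.filter(_._2 == maxInDegree._1).map(_._1).distinct.size

  // Minimum over outDegrees only vs. over all vertices (0 if a vertex is absent)
  val minOutDegree: Int = outDegrees.values.min
  val minOutDegreeAllVertices: Int =
    vertexIds.toSeq.map(id => outDegrees.getOrElse(id, 0)).min
}
```

The two minimums differ (1 versus 0), which is exactly the caveat about outDegrees discussed above.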
Recall that a triplet of a property graph consists of an edge and its data, as well as both of the joining vertices and their respective data. In Spark GraphX, this concept is implemented in a class called EdgeTriplet, in which we can retrieve the edge data as attr and vertex data and IDs naturally through srcAttr, dstAttr, srcId, and dstId. To get triplets for our retweet graph, we can simply call the following:
val triplets: RDD[EdgeTriplet[String, Long]] = rtGraph.triplets
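To see what a triplet bundles together, here is a local sketch in plain Scala. The Triplet case class is a hypothetical stand-in that reuses the field names of GraphX's EdgeTriplet; building it amounts to joining each edge with the vertex data of its source and its target.

```scala
object TripletSketch {
  // Hypothetical local stand-in for GraphX's EdgeTriplet, reusing its field names
  final case class Triplet[VD, ED](srcId: Long, srcAttr: VD, dstId: Long, dstAttr: VD, attr: ED)

  // Vertex data as produced by Graph.fromEdges(..., defaultValue = "")
  val vertices: Map[Long, String] = Map(3212L -> "", 221L -> "", 1486L -> "", 1783L -> "")
  // Two of the sample edges as (srcId, dstId, attr)
  val edges: Seq[(Long, Long, Long)] =
    Seq((3212L, 221L, 1347929725L), (1486L, 1783L, 1346181381L))

  // Join every edge with the data of both of its endpoints
  val triplets: Seq[Triplet[String, Long]] = edges.map { case (src, dst, attr) =>
    Triplet(src, vertices(src), dst, vertices(dst), attr)
  }

  // The same "dst retweeted attr from src" format used for tweetStrings,
  // written with string interpolation
  val tweetStrings: Seq[String] =
    triplets.map(t => s"${t.dstId} retweeted ${t.attr} from ${t.srcId}")
}
```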
Triplets often prove practical, as we can directly retrieve the corresponding edge and vertex data, which would otherwise live in separate RDDs in the graph. For instance, we can quickly transform the generated triplets to give us somewhat readable data for each retweet by executing the following:
val tweetStrings = triplets.map(
t => t.dstId + " retweeted " + t.attr + " from " + t.srcId
)
tweetStrings.take(5)
This returns an array of five strings, each reading "<dstId> retweeted <attr> from <srcId>" with the corresponding IDs and edge data filled in.
When we discussed the friendGraph example earlier, we noted that mapEdges was, in certain regards, superior to first calling edges and then mapping them. The same holds true for vertices and triplets as well. Let's say we want to change the vertex data of our graph to simply be the vertex IDs instead of the previously chosen default value. This can be achieved most quickly and efficiently by mapping the vertices as follows:
val vertexIdData: Graph[Long, Long] = rtGraph.mapVertices( (id, _) => id)
Similarly, instead of retrieving triplets first, we can start equally well from our initial graph and directly transform triplets using mapTriplets, returning a Graph object with modified edge data. To achieve the same effect as with the preceding tweetStrings but keeping the graph structure intact, we can run the following:
val mappedTripletsGraph = rtGraph.mapTriplets(
t => t.dstId + " retweeted " + t.attr + " from " + t.srcId
)
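The key property of mapVertices and mapTriplets is that the graph structure stays intact and only the attached data changes. The following plain-Scala sketch mirrors both calls on local collections; LocalEdge and the object name are hypothetical, and the sketch only illustrates the semantics, not GraphX's distributed implementation.

```scala
object MapSketch {
  // Hypothetical local edge type mirroring GraphX's Edge(srcId, dstId, attr)
  final case class LocalEdge[ED](srcId: Long, dstId: Long, attr: ED)

  // Vertex data as after Graph.fromEdges(..., defaultValue = ""), plus two sample edges
  val vertices: Map[Long, String] = Map(3212L -> "", 221L -> "", 1486L -> "", 1783L -> "")
  val edges: Seq[LocalEdge[Long]] =
    Seq(LocalEdge(3212L, 221L, 1347929725L), LocalEdge(1486L, 1783L, 1346181381L))

  // Analogue of rtGraph.mapVertices((id, _) => id): same vertex IDs, new data
  val vertexIdData: Map[Long, Long] = vertices.map { case (id, _) => id -> id }

  // Analogue of the mapTriplets call: each edge keeps its endpoints and only
  // its attribute is recomputed from the edge's IDs and data
  val mappedEdges: Seq[LocalEdge[String]] = edges.map { e =>
    LocalEdge(e.srcId, e.dstId, s"${e.dstId} retweeted ${e.attr} from ${e.srcId}")
  }
}
```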
As a last example of basic graph processing functionality, we will now look at subgraphs of a given graph and at how to join additional data into a graph. Consider the task of extracting information on all the Twitter users in our graph that have been retweeted at least 10 times. We have already seen how to obtain out-degrees from rtGraph.outDegrees. To make this information accessible in our original graph, we need to join it to the graph. For this purpose, GraphX provides the outerJoinVertices method. We need to provide an RDD of (VertexId, U) pairs to join with, where U is the data type being joined, and a function that determines how to combine the existing vertex data with the joined values. If we call the RDD to join other, the signature looks as follows on paper:
def outerJoinVertices[U, VD2](other: RDD[(VertexId, U)])
(mapFunc: (VertexId, VD, Option[U]) => VD2): Graph[VD2, ED]
Note that since we carry out an outer join, not all IDs in the original graph may have a corresponding value in other, which is why we see the Option type in the respective map function. Doing this for our concrete example at hand works as follows:
val outDegreeGraph: Graph[Long, Long] =
rtGraph.outerJoinVertices[Int, Long](rtGraph.outDegrees)(
mapFunc = (id, origData, outDeg) => outDeg.getOrElse(0).toLong
)
We join our original graph with the out-degree VertexRDD, and as the map function, we simply discard the original vertex data and replace it with the out-degree. If no out-degree is available for a vertex, which is the case for users who were never retweeted, we set it to 0 by using getOrElse to resolve the Option.
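The role of the Option in the map function is easiest to see on a local example. The following sketch defines a hypothetical plain-Scala analogue of outerJoinVertices over Maps (not the GraphX method itself) and applies the same map function as above; the out-degree values are taken from the ten sample lines.

```scala
object OuterJoinSketch {
  // Vertex data as produced by Graph.fromEdges(..., defaultValue = "")
  val vertices: Map[Long, String] = Map(3212L -> "", 221L -> "", 2382L -> "", 1783L -> "")

  // Like rtGraph.outDegrees: only users with outgoing edges have an entry
  val outDegrees: Map[Long, Int] = Map(3212L -> 6, 2382L -> 2)

  // Hypothetical local analogue of outerJoinVertices: every original vertex is
  // kept, and mapFunc receives Some(value) or None depending on `other`
  def outerJoinVertices[VD, U, VD2](vs: Map[Long, VD], other: Map[Long, U])(
      mapFunc: (Long, VD, Option[U]) => VD2): Map[Long, VD2] =
    vs.map { case (id, data) => id -> mapFunc(id, data, other.get(id)) }

  // Same mapFunc as in the text: drop the old data, default missing degrees to 0
  val outDegreeVertices: Map[Long, Long] =
    outerJoinVertices(vertices, outDegrees)((id, origData, outDeg) => outDeg.getOrElse(0).toLong)
}
```

Users 221 and 1783 have no entry in outDegrees, so they receive None and end up with 0.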
Next, we want to retrieve the subgraph of outDegreeGraph in which each vertex has at least 10 retweets. A subgraph of a graph consists of a subset of the original vertices and edges. In GraphX, we specify a subgraph through predicates on edges, vertices, or both; by a predicate, we mean an expression evaluated on each vertex or edge that returns either true or false. The signature of the subgraph method on graphs is defined as follows:
def subgraph(
epred: EdgeTriplet[VD,ED] => Boolean = (x => true),
vpred: (VertexId, VD) => Boolean = ((v, d) => true)): Graph[VD, ED]
Note that since defaults are provided for both predicates, we can choose to supply only one of vpred or epred. An edge is kept only if it satisfies epred and both of its endpoints satisfy vpred. In our concrete example, we want to restrict to vertices with an out-degree of at least 10, which can be done as follows:
val tenOrMoreRetweets = outDegreeGraph.subgraph(
vpred = (id, deg) => deg >= 10
)
tenOrMoreRetweets.vertices.count
tenOrMoreRetweets.edges.count
The resulting graph has a mere 10 vertices and 5 edges. Its average degree is 2 · 5 / 10 = 1.0, about half of the 2.18 we computed for the full graph, so these influencers are only sparsely connected to each other.
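To see why a vertex predicate also removes edges, here is a plain-Scala sketch of the subgraph semantics on a hypothetical toy graph (the vertex IDs and out-degree values are made up for illustration). Unlike GraphX, where epred receives a full EdgeTriplet, this simplified local version passes only the (src, dst) pair to epred.

```scala
object SubgraphSketch {
  // Hypothetical toy graph: vertex ID -> out-degree, plus directed edges
  val vertices: Map[Long, Int] = Map(1L -> 12, 2L -> 15, 3L -> 3, 4L -> 11)
  val edges: Seq[(Long, Long)] = Seq((1L, 2L), (2L, 4L), (1L, 3L), (3L, 4L), (4L, 1L))

  // Local analogue of Graph.subgraph: keep vertices passing vpred, then keep an
  // edge only if epred holds AND both of its endpoints survived vpred
  def subgraph(
      vs: Map[Long, Int],
      es: Seq[(Long, Long)],
      epred: ((Long, Long)) => Boolean = _ => true,
      vpred: (Long, Int) => Boolean = (_, _) => true): (Map[Long, Int], Seq[(Long, Long)]) = {
    val keptVertices = vs.filter { case (id, d) => vpred(id, d) }
    val keptEdges = es.filter { case e @ (src, dst) =>
      epred(e) && keptVertices.contains(src) && keptVertices.contains(dst)
    }
    (keptVertices, keptEdges)
  }

  // Only vpred is given, as in the text; epred falls back to its default
  val (tenOrMoreVertices, tenOrMoreEdges) =
    subgraph(vertices, edges, vpred = (_, deg) => deg >= 10)
}
```

Vertex 3 fails the predicate, so both edges touching it disappear, even though no edge predicate was given.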
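To close this section, an interesting technique to know is that of masking. Calling mask on a graph restricts it to the vertices and edges that also appear in another graph, while keeping the vertex and edge data of the graph on which it is called. For instance, to look at the influencers in tenOrMoreRetweets with the original vertex data of rtGraph instead of their out-degrees, we can mask the original graph by tenOrMoreRetweets, as follows:
val influencersOrigData: Graph[String, Long] = rtGraph.mask(tenOrMoreRetweets)
Note that mask does not produce a complement: the result only contains vertices and edges that are also in tenOrMoreRetweets. To obtain the subgraph of users with fewer than 10 retweets instead, we negate the predicate, for example with outDegreeGraph.subgraph(vpred = (id, deg) => deg < 10).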
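The semantics of mask can be made concrete with a local sketch. According to the GraphX API documentation, mask keeps only the vertices and edges that also exist in the other graph and retains the attributes of the graph it is called on. The toy graphs below are hypothetical, and the two filters mimic that behavior with plain Scala collections; the last filter shows that a complement has to be computed explicitly.

```scala
object MaskSketch {
  // Hypothetical "full" graph with String vertex data, like rtGraph
  val fullVertices: Map[Long, String] = Map(1L -> "a", 2L -> "b", 3L -> "c", 4L -> "d")
  val fullEdges: Seq[(Long, Long)] = Seq((1L, 2L), (2L, 4L), (1L, 3L), (3L, 4L), (4L, 1L))

  // A smaller graph with different vertex data (out-degrees), e.g. from subgraph
  val otherVertices: Map[Long, Int] = Map(1L -> 12, 2L -> 15, 4L -> 11)
  val otherEdges: Seq[(Long, Long)] = Seq((1L, 2L), (2L, 4L), (4L, 1L))

  // Analogue of mask: keep what also exists in `other`, but keep our own data
  val maskedVertices: Map[Long, String] =
    fullVertices.filter { case (id, _) => otherVertices.contains(id) }
  val maskedEdges: Seq[(Long, Long)] = fullEdges.filter(e => otherEdges.contains(e))

  // The complement is not what mask returns; it needs a negated filter
  val complementVertices: Map[Long, String] =
    fullVertices.filter { case (id, _) => !otherVertices.contains(id) }
}
```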