Clustering

Given a graph, a natural question to ask is if there are any subgraphs to it that naturally belong together, that is, that cluster the graph in some way. This question can be addressed in many ways, one of which we have already implemented ourselves, namely by studying connected components. Instead of using our own implementation, let's use GraphX's built-in version this time. To do so, we can simply call connectedComponents directly on the graph itself:

val actorComponents = actorGraph.connectedComponents().cache 
actorComponents.vertices.map(_._2).distinct().count

As in our own implementation, the vertex data of the graph contains cluster IDs, which correspond to the minimum available vertex ID within the cluster. This allows us to directly count connected components, by collecting distinct cluster IDs. The answer for our restricted cluster graph is 173. Computing components, we cache the graph so we can further use it for other computations. For instance, we might ask how large the connected components are, for example by computing the maximum and the minimum cluster size in terms of vertices. We can do this by using the cluster ID as key and reducing each group by counting its items:

val clusterSizes =actorComponents.vertices.map(
v => (v._2, 1)).reduceByKey(_ + _)
clusterSizes.map(_._2).max
clusterSizes.map(_._2).min

It turns out the largest cluster spans a respectable group of 193,518 actors, while the smallest consists of a mere three actors. Next, let's ignore the fact that the graph in question does not actually have directionality, since appearing in a movie together is symmetric, and act as if the edge pairs were directed. We don't have to impose anything here, since an edge in Spark GraphX always has a source and a target. This allows us to study strongly connected components as well. We can call this algorithm similarly to that for connected components, but in this case we have to specify a number of iterations as well. The reason for this is that it's much more computationally demanding to "trace" directed edges in the same way we did for connected components and convergence is slower.

Let's settle for just one iteration to carry out the computation, since it is very expensive:

val strongComponents = actorGraph.stronglyConnectedComponents(numIter = 1)
strongComponents.vertices.map(_._2).distinct().count

This computation might take a few minutes to complete. In case you have problems running even this example on your machine, consider further restricting actorGraph.

Next, let's compute triangles for the actor graph, yet another way to cluster it. To do so, we need to slightly prepare the graph, namely we have to canonicalize the edges and specify a graph partition strategy. To canonicalize a graph means to get rid of loops and duplicate edges and make sure that the source ID is always smaller than the target ID for all the edges:

val canonicalGraph = actorGraph.mapEdges(
e => 1).removeSelfEdges().convertToCanonicalEdges()

Graph partition strategies, like RDD partitions we have already encountered, are concerned with the question of how to distribute a graph across the cluster efficiently. Of course, what efficiently means depends in large part on what we do with our graph. Roughly speaking, there are two basic partition strategies, namely vertex cut and edge cut. Vertex cut strategy means enforce split edges in a disjointed manner by cutting vertices, that is, vertices are repeated across partitions, if necessary. Edge cut strategy does the opposite in that vertices are unique throughout the cluster, but we may duplicate edges. GraphX has four partition strategies that are all based on vertex cut.  We will not discuss them here in detail, but rather just use RandomVertexCut, which hashes vertex IDs so that all same-direction edges between vertices are located on the same partition.

Note that when creating a graph without specifying a partition strategy, the graph is distributed by simply adopting the structure of the underlying EdgeRDD that has been provided for construction. Depending on your use-case, this might not be ideal, for instance because edge partitions might be strongly imbalanced.

To partition canonicalGraph and continue with triangle counting, we now partition our graph using said strategy as follows:

val partitionedGraph = canonicalGraph.partitionBy(PartitionStrategy.RandomVertexCut)

Computing triangles is conceptually simple. We first collect all neighboring vertices for each vertex and then compute the intersection of these sets for each edge. The logic is, if both source and target vertex sets contain the same third vertex, the three form a triangle. As a last step, we send the count of the intersection set to both source and target, thereby counting each triangle twice and we simply divide by two to get a triangle count per vertex. Doing the triangle count now boils down to running:

import org.apache.spark.graphx.lib.TriangleCount
val triangles = TriangleCount.runPreCanonicalized(partitionedGraph)

In fact, instead of canonicalising actorGraph explicitly, we could simply have gotten away with just imposing triangleCount directly on the initial graph, that is, by computing the following:

actorGraph.triangleCount()

Equivalently, we can also import TriangleCount and call it on our actor graph as follows:

import org.apache.spark.graphx.lib.TriangleCount
TriangleCount.run(actorGraph)

Note, however, that these last two equivalent operations will in fact canonicalize the graph in question the same way we did, and canonicalisation is a computationally very expensive operation. So, whenever you see the chance to already load your graph in canonical form, the first approach shown will be more efficient.

..................Content has been hidden....................

You can't read the all page of ebook, please click here login for view all page.
Reset
3.148.107.193