Filter

Calling filter() restricts the vertex set to the vertices that satisfy the given predicate. The operation preserves the index, which keeps joins with the original RDD efficient, and it sets bits in a bitmask rather than allocating new memory:

def filter(pred: Tuple2[VertexId, VD] => Boolean): VertexRDD[VD] 
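To make the bitmask idea concrete, here is a minimal sketch in plain Scala. It is a toy model and not GraphX's internal data structure: ToyPartition, its fields, and its methods are hypothetical names chosen for illustration. The point it shows is that filtering can leave the id and attribute arrays untouched and only narrow a mask of visible slots.

```scala
import scala.collection.immutable.BitSet

// Toy stand-in for an indexed vertex partition (hypothetical, not the GraphX class):
// ids and attributes sit in fixed arrays, and a bitmask marks the visible slots.
final case class ToyPartition[VD](ids: Array[Long], attrs: Array[VD], mask: BitSet) {

  // Keep the arrays, and therefore the slot positions, as they are; only the
  // mask shrinks to the slots whose (id, attr) pair satisfies the predicate.
  def filter(pred: ((Long, VD)) => Boolean): ToyPartition[VD] =
    copy(mask = mask.filter(i => pred((ids(i), attrs(i)))))

  // Materialize the visible (id, attr) pairs in slot order.
  def toList: List[(Long, VD)] = mask.toList.map(i => (ids(i), attrs(i)))
}

object ToyPartitionDemo {
  def main(args: Array[String]): Unit = {
    val p = ToyPartition(Array(1L, 2L, 4L), Array("John", "Mark", "Liz"), BitSet(0, 1, 2))
    val onlyMark = p.filter { case (_, name) => name == "Mark" }
    println(onlyMark.toList)       // the visible pairs after filtering
    println(onlyMark.ids eq p.ids) // whether the id array is shared rather than copied
  }
}
```

Because the slot positions never change in this model, a filtered partition and the original can be matched slot by slot, which is the intuition behind the efficient joins mentioned above.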

Using filter, we can keep only the vertex for user Mark, selecting it either by vertexId or by the User.name attribute. We can also filter on the User.occupation attribute.

The following code shows all three filters:

scala> graph.vertices.filter(x => x._1 == 2).take(10)
res118: Array[(org.apache.spark.graphx.VertexId, User)] = Array((2,User(Mark,Doctor)))

scala> graph.vertices.filter(x => x._2.name == "Mark").take(10)
res119: Array[(org.apache.spark.graphx.VertexId, User)] = Array((2,User(Mark,Doctor)))

scala> graph.vertices.filter(x => x._2.occupation == "Doctor").take(10)
res120: Array[(org.apache.spark.graphx.VertexId, User)] = Array((4,User(Liz,Doctor)), (2,User(Mark,Doctor)), (9,User(Dan,Doctor)))
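The claim that a filtered vertex set joins efficiently with the original can be tried out in a standalone program. The sketch below is one way to do it, assuming Spark with GraphX is on the classpath. The User case class mirrors the values printed above, but John's occupation and the second edge are placeholders, and VertexFilterSketch and doctorNames are hypothetical names.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx.{Edge, Graph, VertexId, VertexRDD}

// Hypothetical attribute type matching the User(name, occupation) values printed above.
case class User(name: String, occupation: String)

object VertexFilterSketch {
  // Filters the vertices down to doctors, joins them back to the original
  // vertex set, and returns the names that survive the join.
  def doctorNames(sc: SparkContext): Set[String] = {
    val users = sc.parallelize(Seq[(VertexId, User)](
      (1L, User("John", "Accountant")), // John's occupation is a placeholder
      (2L, User("Mark", "Doctor")),
      (4L, User("Liz", "Doctor")),
      (9L, User("Dan", "Doctor"))))
    // Only the 1 -> 2 edge appears in the text; the other one is a placeholder.
    val edges = sc.parallelize(Seq(Edge(1L, 2L, "friend"), Edge(2L, 4L, "friend")))
    val graph = Graph(users, edges)

    // VertexRDD.filter returns a VertexRDD, not a plain RDD.
    val doctors: VertexRDD[User] =
      graph.vertices.filter { case (_, u) => u.occupation == "Doctor" }

    // Join the filtered set back to the original vertices on VertexId.
    val joined: VertexRDD[String] =
      graph.vertices.innerJoin(doctors) { (_, original, _) => original.name }

    joined.map(_._2).collect().toSet
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("vertex-filter-sketch").setMaster("local[*]")
    val sc = new SparkContext(conf)
    try println(doctorNames(sc)) finally sc.stop()
  }
}
```

The local[*] master is only there so the sketch runs without a cluster; in spark-shell the existing sc can be passed to doctorNames directly.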

We can also filter the edges, using either the source vertexId or the destination vertexId. For example, we can keep only the edges that originate from John (vertexId = 1). Note that filter on its own returns a lazily evaluated RDD; an action such as take is needed to see the matching edges:

scala> graph.edges.filter(x => x.srcId == 1)
res123: org.apache.spark.rdd.RDD[org.apache.spark.graphx.Edge[String]] = MapPartitionsRDD[2672] at filter at <console>:134

scala> graph.edges.filter(x => x.srcId == 1).take(10)
res124: Array[org.apache.spark.graphx.Edge[String]] = Array(Edge(1,2,friend), Edge(1,3,friend), Edge(1,10,friend))
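The examples above filter on the source vertexId; the sketch below covers the destination side and also shows subgraph, which is an alternative when the result should stay a Graph rather than a plain RDD of edges. It assumes Spark with GraphX on the classpath. The vertex attributes are reduced to names, every edge except 1 -> 2 is a placeholder, and EdgeFilterSketch and buildGraph are hypothetical names.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx.{Edge, Graph, VertexId}

object EdgeFilterSketch {
  // Builds a tiny graph; vertex attributes are plain names to keep the sketch short.
  def buildGraph(sc: SparkContext): Graph[String, String] = {
    val users = sc.parallelize(Seq[(VertexId, String)]((1L, "John"), (2L, "Mark"), (4L, "Liz")))
    val edges = sc.parallelize(Seq(
      Edge(1L, 2L, "friend"),  // appears in the output above
      Edge(2L, 4L, "friend"),  // placeholder
      Edge(4L, 1L, "friend"))) // placeholder
    Graph(users, edges)
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("edge-filter-sketch").setMaster("local[*]")
    val sc = new SparkContext(conf)
    try {
      val graph = buildGraph(sc)

      // RDD-level filter on the destination id; this only describes the work.
      val intoJohn = graph.edges.filter(e => e.dstId == 1L)
      // take is an action, so the filter actually runs here.
      println(intoJohn.take(10).mkString(", "))

      // Graph-level alternative: keep all vertices, but only John's outgoing edges.
      val fromJohn = graph.subgraph(epred = t => t.srcId == 1L)
      println(fromJohn.edges.count())
    } finally sc.stop()
  }
}
```

Filtering graph.edges is enough when only the edge list matters; subgraph is worth considering when later steps need graph operations on the reduced edge set.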