aggregateMessages

The core aggregation operation in GraphX is aggregateMessages. It applies a user-defined sendMsg function to each edge triplet in the graph and then uses a mergeMsg function to combine the messages that arrive at each vertex. Many graph algorithms that need to exchange information between neighboring vertices are built on aggregateMessages.

The following is the signature for this API:

def aggregateMessages[Msg: ClassTag](
    sendMsg: EdgeContext[VD, ED, Msg] => Unit,
    mergeMsg: (Msg, Msg) => Msg,
    tripletFields: TripletFields = TripletFields.All)
  : VertexRDD[Msg]

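The two key functions are sendMsg and mergeMsg. sendMsg runs once per edge triplet and determines what, if anything, gets sent to the source vertex of the edge, to its destination vertex, or to both. mergeMsg then combines the messages that a vertex receives from all of its edges, two at a time, into a single aggregated value, so it should be commutative and associative. The optional tripletFields argument tells GraphX which parts of the triplet sendMsg actually reads, which lets it avoid shipping data that is not needed.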
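To make the two directions concrete, here is a minimal sketch that sends a message to both ends of every edge and sums the results, which gives the total degree of each vertex. The object name DegreeSketch, the helper totalDegrees, and the four-edge graph are made up for illustration; only Graph.fromEdges, sendToSrc, sendToDst, and aggregateMessages come from GraphX.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx.{Edge, Graph, VertexRDD}

object DegreeSketch {
  // Counts, for each vertex of a small hypothetical graph, how many edges touch it.
  def totalDegrees(sc: SparkContext): Map[Long, Int] = {
    // Illustrative edges only; the edge attribute is not used by the aggregation.
    val edges = sc.parallelize(Seq(
      Edge(1L, 2L, "follows"),
      Edge(2L, 3L, "follows"),
      Edge(3L, 1L, "follows"),
      Edge(1L, 3L, "follows")))

    // Graph.fromEdges creates every vertex mentioned by an edge with a default attribute.
    val graph: Graph[Int, String] = Graph.fromEdges(edges, defaultValue = 0)

    val degrees: VertexRDD[Int] = graph.aggregateMessages[Int](
      // sendMsg: each edge sends 1 to its source and 1 to its destination.
      ctx => { ctx.sendToSrc(1); ctx.sendToDst(1) },
      // mergeMsg: add up whatever arrives at a vertex.
      _ + _)

    degrees.collect().toMap
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("degree-sketch").setMaster("local[*]")
    val sc = new SparkContext(conf)
    try println(totalDegrees(sc)) finally sc.stop()
  }
}
```

Dropping the sendToSrc call would turn the result into in-degrees, and dropping sendToDst instead would give out-degrees, so the choice of send method alone decides which vertices accumulate values.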

The following is a simple example that calls aggregateMessages on a graph named graph. Each edge sends the message 1 to its destination vertex, and the merge strategy at each vertex is simply to add up all the messages it receives, so the result is the in-degree of every vertex:

scala> graph.aggregateMessages[Int](_.sendToDst(1), _ + _).collect
res207: Array[(org.apache.spark.graphx.VertexId, Int)] = Array((4,2), (6,2), (8,3), (10,4), (2,3), (1,3), (3,3), (7,2), (9,2), (5,2))
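Messages do not have to be plain counters. As a sketch of a slightly richer aggregation, the following computes the average attribute of each vertex's in-neighbors by sending a (count, value) pair along every edge. The graph, the object name AverageInNeighborSketch, and the helper averageSourceAttr are hypothetical and are not the graph used above. TripletFields.Src is passed because sendMsg reads only the source attribute.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx.{Edge, Graph, TripletFields, VertexId}

object AverageInNeighborSketch {
  // For each vertex, averages the attributes of the vertices that point to it.
  def averageSourceAttr(sc: SparkContext): Map[VertexId, Double] = {
    // Illustrative data: vertex id paired with a numeric attribute.
    val vertices = sc.parallelize(Seq((1L, 10.0), (2L, 20.0), (3L, 60.0)))
    val edges = sc.parallelize(Seq(Edge(1L, 3L, 1), Edge(2L, 3L, 1), Edge(3L, 1L, 1)))
    val graph = Graph(vertices, edges)

    val sums = graph.aggregateMessages[(Int, Double)](
      // sendMsg: forward a count of 1 and the source attribute to the destination.
      ctx => ctx.sendToDst((1, ctx.srcAttr)),
      // mergeMsg: add counts and totals component by component.
      (a, b) => (a._1 + b._1, a._2 + b._2),
      // Only the source vertex attribute is read, so only that field is requested.
      TripletFields.Src)

    // Turn each (count, total) pair into an average on the driver.
    sums.collect().map { case (id, (count, total)) => id -> total / count }.toMap
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("avg-sketch").setMaster("local[*]")
    val sc = new SparkContext(conf)
    try println(averageSourceAttr(sc)) finally sc.stop()
  }
}
```

Vertex 2 has no incoming edges in this sketch, so it receives no messages and does not appear in the result: aggregateMessages returns entries only for vertices that received at least one message.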