aggregateMessages

Aggregating data about vertices is very common in graph-based computations, for example, finding the total number of friends a user has on Facebook or the total number of followers a user has on Twitter. The aggregateMessages transformation is the primary aggregation function of GraphX. The following is the signature of the method:

aggregateMessages(scala.Function1<EdgeContext<VD, ED, A>, scala.runtime.BoxedUnit> sendMsg, scala.Function2<A, A, A> mergeMsg, TripletFields tripletFields, scala.reflect.ClassTag<A> evidence$11)

aggregateMessages works like a map-reduce operation. The sendMsg function can be considered the map function: it sends a message from the source vertex to the destination vertex or vice versa. It takes an EdgeContext object as its parameter, which exposes methods to send messages to the source and destination vertices. The mergeMsg function can be considered the reduce function: it aggregates the messages sent by sendMsg that arrive at the same vertex.

The TripletFields parameter specifies which parts of EdgeContext, such as the source attributes or the destination attributes, are used in the sendMsg function, so that the operation can be optimized. The default value of this parameter is TripletFields.All.

In this example, we will count how many directed edges have a given vertex as their destination. It is similar to counting how many followers a user has on Twitter.

The following is the definition of the sendMsg function:

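import java.io.Serializable;
import org.apache.spark.graphx.EdgeContext;
import scala.runtime.AbstractFunction1;
import scala.runtime.BoxedUnit;

public class AbsFunc4 extends AbstractFunction1<EdgeContext<String, String, Integer>, BoxedUnit> implements Serializable {
    @Override
    public BoxedUnit apply(EdgeContext<String, String, Integer> arg0) {
        // Send the message 1 to the destination vertex of this edge.
        arg0.sendToDst(1);
        return BoxedUnit.UNIT;
    }
}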

Here we are sending a message with the value 1 from the source to the destination vertex.

The mergeMsg function can be defined as follows:

import java.io.Serializable;

public class AbsFunc5 extends scala.runtime.AbstractFunction2<Integer, Integer, Integer> implements Serializable {
    @Override
    public Integer apply(Integer i1, Integer i2) {
        // Combine two messages that arrived at the same vertex.
        return i1 + i2;
    }
}

In this function, we are summing all the messages received at a vertex. This is analogous to a word count program in MapReduce.

Using the preceding user-defined functions, the aggregateMessages operation can be executed as follows:

VertexRDD<Integer> aggregateMessages = graph.aggregateMessages(new AbsFunc4(), new AbsFunc5(), TripletFields.All, scala.reflect.ClassTag$.MODULE$.apply(Integer.class));
aggregateMessages.toJavaRDD().collect().forEach(System.out::println);

As shown in the preceding example, the operation returns a VertexRDD, which contains each vertex ID and its aggregated result. Vertices that are not the destination of any directed edge are not included in the resulting RDD.
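
To make the send-and-merge behavior easier to follow, here is a minimal sketch that imitates it with plain Java collections. It does not use Spark or GraphX. The class AggregateMessagesSketch, the Edge type, and the countIncoming method are hypothetical names introduced only for this illustration, and the three sample edges are made up.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.BinaryOperator;

// Plain-Java imitation of the send/merge idea; this is NOT the GraphX API.
final class AggregateMessagesSketch {

    // Hypothetical edge type: a directed edge from srcId to dstId.
    static final class Edge {
        final long srcId;
        final long dstId;

        Edge(long srcId, long dstId) {
            this.srcId = srcId;
            this.dstId = dstId;
        }
    }

    // "Sends" the value 1 along every edge to its destination vertex and
    // combines messages that arrive at the same vertex with mergeMsg.
    static Map<Long, Integer> countIncoming(List<Edge> edges, BinaryOperator<Integer> mergeMsg) {
        Map<Long, Integer> result = new TreeMap<>();
        for (Edge e : edges) {
            // First message for a vertex is stored as-is; later ones are merged.
            result.merge(e.dstId, 1, mergeMsg);
        }
        return result;
    }

    public static void main(String[] args) {
        List<Edge> edges = Arrays.asList(
                new Edge(1L, 2L),
                new Edge(3L, 2L),
                new Edge(2L, 3L));
        // Vertex 1 has no incoming edge, so it does not appear in the result.
        System.out.println(countIncoming(edges, Integer::sum)); // prints {2=2, 3=1}
    }
}
```

In this sketch, vertex 1 receives no message and is therefore missing from the result map, which mirrors how vertices without incoming edges are left out of the VertexRDD.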
