5. Graph Processing Paradigms

As discussed in Chapter 1, “Introduction: Why Look Beyond Hadoop Map-Reduce?,” giant 4 (graph processing) requires specialized paradigms for processing. One such paradigm is Bulk Synchronous Parallel (BSP), proposed by Leslie Valiant (1990). There are several realizations of BSP in the literature, with Google’s Pregel being the precursor to some of the other tools. Apache Giraph is the open source equivalent of Pregel, and Apache Hama is another similar effort. We will look at a few graph-processing tools, starting with Pregel, but first we discuss why graph-processing paradigms are needed.

Facebook has recently opened up a search feature (officially known as Graph Search) that allows people to search for entities related to people who are in their network. For example, one can search for “all people who are working at Impetus in my network” or for “all entrepreneurs who have businesses in the big data area” or for “people in my network who like the movie Singham.” This is a potentially powerful feature that requires constructing and maintaining what is known as a graph of entities and people, as illustrated in this article: http://spectrum.ieee.org/telecom/internet/the-making-of-facebooks-graph-search/?utm_source=techalert&utm_medium=email&utm_campaign=080813. This is just a simple example of processing/analyzing data in the form of large graphs. Another example is DBpedia, a semantic web knowledge base derived from Wikipedia—it contains more than 3 million objects (nodes) and 400 million facts (edges) (Sakr 2013).

Graph database systems such as Neo4j have been proposed to handle transactional workloads on graphs. They allow querying, storing, and managing graphs. But the commonly occurring query patterns on large graphs require random accesses over the whole graph, which is inefficient when the graph does not fit into main memory; scaling such workloads across a cluster of nodes is nontrivial, limiting the size of graphs that can be processed by Neo4j. It should be noted, however, that graph databases are mainly for online transaction processing (OLTP) scenarios, whereas graph-processing systems address online analytical processing (OLAP) scenarios.

Pregel, Giraph, and GraphLab are the frameworks recently proposed to bridge this gap—they process very large graphs across a cluster of nodes with scalability and fault tolerance.

Pregel: Graph-Processing Framework Based on BSP

Pregel was the first realization of the BSP for graph processing. It was built by Google for processing social and other graphs. The main motivation for Pregel was that there was no large graph-processing framework that would work across a distributed system and tolerate failures. Some of the earlier systems such as LEDA (Mehlhorn et al. 1997) or GraphBase (Knuth 1993) were single-node realizations, limiting the size of the graph that can be processed. A few others, such as CGMgraph (Chan et al. 2005) or Parallel BGL (Gregor and Lumsdaine 2005) (which was an attempt at parallelizing the Boost Graph Library), worked on a cluster of nodes and could potentially process larger graphs. But they could not handle node/network failures. Thus, Pregel was proposed as a scalable and fault-tolerant platform for processing large graphs.

Computations in Pregel comprise an input phase, when the graph is initialized; a series of iterations, known as supersteps; and a barrier for synchronizing the supersteps. Each vertex in the graph is associated with a user-defined compute function and a value, which can be examined and modified by the associated compute function. Users override the Compute() method of the Vertex class. Other methods of the Vertex class allow the compute function to query/modify its own associated value or the values associated with its edges, or to send messages to other vertices. At each superstep, Pregel ensures that the user-defined compute function is invoked in parallel on each vertex. The vertices can send messages through the edges and exchange values with other vertices. The values associated with the vertices and edges are the only state that persists across supersteps; this simplifies graph distribution and failure handling. A global barrier moves the computation forward to the next superstep only after all compute functions have terminated.
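The vertex-centric API looks roughly as follows (a sketch after the C++ API in the Pregel paper; supporting types such as MessageIterator are elided):

template <typename VertexValue, typename EdgeValue, typename MessageValue>
class Vertex {
 public:
  // Overridden by the user; invoked once per active vertex per superstep.
  virtual void Compute(MessageIterator* msgs) = 0;

  const string& vertex_id() const;
  int64 superstep() const;

  // Query/modify the value associated with this vertex and its edges.
  const VertexValue& GetValue();
  VertexValue* MutableValue();
  OutEdgeIterator GetOutEdgeIterator();

  // Messages sent here are delivered at the next superstep.
  void SendMessageTo(const string& dest_vertex, const MessageValue& message);
  void VoteToHalt();
};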

All messages sent to a Vertex v at superstep t are available when its Compute() method is invoked at superstep t+1. The messages are available through an iterator, but without any ordering guarantee; the only guarantee is that messages will be delivered without duplication. The overhead of message passing can be mitigated to some extent by using combiners. The user can subclass the Combiner class and implement a virtual function, Combine(). This is useful for commutative and associative operators, such as the summation of incoming rank contributions in page rank.

Pregel also has the concept of aggregators, which can be viewed as a global communication mechanism. A vertex can send a value to an aggregator at a superstep t. The aggregator aggregates the value with values from other vertices and makes the result available to all vertices at superstep t+1. Examples of aggregators are min, max, and sum.

Pregel also allows topology changes—for instance, in case of a clustering algorithm, a group of vertices (cluster) can be replaced by a single vertex. A spanning tree construction process can remove nontree edges. Thus, a vertex can issue requests to add or remove edges or other vertices.

Pregel has been implemented on top of Big Table or the Google File System (GFS). Pregel partitions the graph into a set of partitions based on hash(ID) mod N, where ID is the vertex ID and N is the number of partitions. The architecture is master–slave, with any of the nodes executing copies of the user program capable of playing the role of the master. The workers use a naming service (that of the underlying system, Big Table or GFS) to discover the master and register with it. The master decides the number of partitions (based on a user-defined parameter) and assigns one or more partitions to the workers. Each worker is responsible for maintaining computations on its portion of the graph, including executing the compute functions and passing messages to neighbors.

Pregel achieves fault tolerance through check-pointing—the master instructs workers to save computation state, which can include vertex and edge values as well as the incoming messages. The master itself saves the aggregate state to disk.

Similar Efforts

Efforts have been made to use the Hadoop Map-Reduce (MR) paradigm to process graphs. GBASE (Kang et al. 2011) and Surfer (Chen et al. 2010) are two notable examples of graph-processing frameworks over Hadoop MR. GBASE proposes a new block compression scheme for storing homogeneous regions of the graph (partitions) that can save up to 50 times the space using standard compression techniques such as Gzip. GBASE supports two types of queries on the graphs:

Global queries, which involve operations on the whole graph—queries such as PageRank, Random Walk with Restart (RWR), discovery of connected components, and so on.

Targeted queries, which involve operations on subgraphs. GBASE formulates targeted queries as matrix-vector multiplication, which is similar to SQL joins.

Pegasus (Kang et al. 2009) is another effort similar to GBASE—it comes from the same group. Pegasus was the first system to realize whole graph operations over a matrix-vector multiplication primitive known as GIM-V, for Generalized Iterative Matrix-Vector multiplication. All the previously mentioned global queries, including PageRank, RWR, connected components, and diameter estimation, can be formulated using GIM-V. Pegasus is built over Hadoop MR and is available for download at www.cs.cmu.edu/~pegasus/.
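For intuition, PageRank in this formulation becomes an iterated matrix-vector multiplication (following the Pegasus paper):

p' = (c E^T + (1 − c) U) p

where E is the row-normalized adjacency matrix, U is a matrix with every element equal to 1/n, c is the damping factor (typically 0.85), n is the number of vertices, and p is the current PageRank vector. GIM-V generalizes the three ingredients of this product: combining a matrix element with a vector element, summing the partial results, and assigning the sum back into the vector.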

Surfer proposed a partitioning scheme for large graphs—the naive Hadoop Distributed File System (HDFS) storage is flat (it does not understand the graph structure), which results in high communication costs even for simple graph computations such as computing two-hop neighbors. The partitioning scheme is bandwidth-aware and targeted at cloud computing, addressing the variability of machine bandwidth in the cloud. Surfer also proposes an iterative propagation primitive and shows that it significantly outperforms the MR primitive for graph processing.

Stratosphere is a recently published work (Ewen et al. 2012). The key idea behind Stratosphere is to add support for iterative processing into existing data flow systems. They distinguish between two types of iterations:

Bulk iterations, where a new partial value is computed in the next iteration using the previous iteration’s results, optionally together with loop-invariant data. This is the type of iteration in algorithms such as page rank, in machine learning (ML) algorithms such as stochastic gradient descent (SGD), and in clustering algorithms like k-means. These are perfectly suited for systems such as Spark.

Incremental iterations, where the result of each iteration differs only partially from that of the previous iteration, and sparse computational dependencies exist between elements. Examples are connected components algorithms, belief propagation, shortest paths, and so forth. These are perfectly suited for systems such as GraphLab.

Stratosphere provides support for both kinds of iterations in a single data flow system without much performance penalty. The authors are able to show performance comparable to systems such as Spark and Giraph for bulk and incremental iterative algorithms, respectively.

Open Source Pregel Implementations

The Pregel paper cited previously has inspired several research groups to come up with their own implementations of Pregel and release them as open source. (Google has not yet open sourced Pregel.) Some of the efforts in this space include Phoebus, Giraph, and GoldenORB.

Giraph

Giraph is possibly the most prominent open source realization of Pregel. The computation model is exactly as described for Pregel. Giraph computations run as Hadoop jobs, and they use ZooKeeper, a widely used distributed coordination library, for master election among the workers. The graph is partitioned across the worker processes. The master orchestrates the execution of the supersteps and also determines when a job terminates—when no vertices are active and no messages are waiting to be delivered. ZooKeeper facilitates saving the master state in the cluster, which allows any worker to take over as the new master in case of master failures. Master failover is one of the features Giraph adds over and above Pregel.

The other interesting feature in Giraph is its capability to allow several input sources, including HDFS files and Hive tables. If the input graph is grouped by vertices (with their directed edges) in the form of an adjacency list, this is handled in Giraph by implementing the VertexInputFormat class. If the order does not matter (which implies that every input record is an edge, as in relational inputs), this is handled by implementing the EdgeInputFormat class, in conjunction with the VertexValueInputFormat, which is required to read in the vertex values. To read in data from Hive tables, the corresponding classes to use are HiveVertexInputFormat and HiveEdgeInputFormat. Instead of the usual GiraphRunner class, the GiraphHiveRunner class can be used to make all access to Hive easier.

The computation starts by executing the MasterCompute.compute() method, the first part of executing the superstep. The aggregated value from the earlier superstep is available for each aggregator through the getAggregatedValue() method.

The other feature over Pregel is the capability of Giraph to run out-of-core, meaning that Giraph can be configured to use disks to store graph partitions or messages for very large graphs. Both out-of-core graph storage (with the giraph.useOutOfCoreGraph=true parameter) and out-of-core messages (with giraph.useOutOfCoreMessages=true) can be enabled in Giraph. Algorithms that generate huge messages, such as clique computations, might use the latter, whereas algorithms such as belief propagation might use the former.
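As a hypothetical invocation (the flag names follow common GiraphRunner usage—-vif/-vip for the vertex input format and path, -op for the output path, -w for the number of workers, and -ca for custom configuration—but the jar, class names, and paths here are placeholders):

hadoop jar giraph-examples.jar org.apache.giraph.GiraphRunner \
  org.apache.giraph.examples.SimplePageRankComputation \
  -vif org.apache.giraph.io.formats.JsonLongDoubleFloatDoubleVertexInputFormat \
  -vip /input/graph -op /output/pagerank -w 4 \
  -ca giraph.useOutOfCoreGraph=true \
  -ca giraph.useOutOfCoreMessages=true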

The example given next shows the code of the compute function for implementing a page rank algorithm in Giraph (Sakr 2013):

public class SimplePageRankVertex extends
    Vertex<LongWritable, DoubleWritable, FloatWritable, DoubleWritable> {
  public void compute(Iterator<DoubleWritable> msgIterator) {
    if (getSuperstep() >= 1) {
      // Sum up the rank contributions received from in-neighbors.
      double myRank = 0;
      while (msgIterator.hasNext()) {
        myRank += msgIterator.next().get();
      }
      setVertexValue(new DoubleWritable(
          (0.15f / getNumVertices()) + 0.85f * myRank));
    }
    if (getSuperstep() < 30) {
      // Distribute this vertex's rank evenly over its out-edges.
      long edges = getOutEdgeIterator().size();
      sendMsgToAllEdges(new DoubleWritable(getVertexValue().get() / edges));
    } else {
      voteToHalt();
    }
  }
}

GoldenORB

GoldenORB is supported by Ravel, a start-up based out of Austin, Texas. GoldenORB is similar to Giraph in that it is another open source realization of the Pregel system for graph processing. The computing model is exactly the same as in Giraph and Pregel. A class known as an OrbRunner is similar to the GiraphRunner class of Giraph.

Phoebus

Phoebus is yet another open source Pregel implementation. It is written in Erlang, a declarative functional language for distributed and parallel programming. A full distributed setup of Phoebus requires a distributed file system—it has been made to work with HDFS as the underlying file system. It is a work in progress, in the sense that fault tolerance and error handling are still not incorporated—it does not handle even worker failures. In contrast, Giraph is able to handle even master failures.

Apache Hama

Apache Hama is one of the open source systems similar to Giraph and is a realization of the BSP paradigm. It has been optimized with respect to performance for linear algebra applications. Apache Hama even outperforms Message Passing Interface (MPI) for very large matrices when there are faulty nodes in the system (Seo et al. 2010).

Stanford GPS

Stanford’s Graph Processing System (GPS) is yet another BSP implementation modeled after Pregel. However, it has made certain significant contributions, distinct from some of the other open source Pregel-type systems explained earlier. One of the key ideas in GPS is its capability to partition the graph dynamically based on the communication pattern—if there are too many messages between certain vertices, GPS reorganizes the graph partitions to collocate those vertices. GPS also provides a scheme known as Large Adjacency List Partitioning (LALP), in which the adjacency lists of high-degree vertices are partitioned across a cluster of nodes. This is in some sense similar to the partitioning achieved by GraphLab, which is, however, based on edge-cuts rather than vertex-cuts as in GPS. The GPS paper published by Salihoglu and Widom (2013) shows that dynamic repartitioning can improve performance by a factor of two for a sufficiently large number of iterations. The third contribution made by GPS is a new application programming interface (API), master.compute(), for global computations, executed at the beginning of each BSP superstep. The master.compute() computation is scheduled on the master and is typically the kind of computation that checks the terminating condition of the iterations, for instance. This is useful in multistage graph computations—those involving multiple stages of BSP supersteps—such as finding the connected components of a graph, finding the largest of these components, and finally running an iterative partitioning algorithm on that component.

GraphLab

Pregel, due to its BSP basis, is suited only for some graph computations—those that do not need too much communication/interaction between vertices. Page rank is perfectly compatible with such a computation model. However, this might not be the case when we consider a graph coloring problem. The graph coloring problem is to find a coloring of the vertices of a graph such that no two adjacent vertices share the same color. A simplistic solution is to let each vertex pick the smallest color not used by its neighbors at each superstep of the BSP. This program can be slow to converge, because two adjacent vertices might repeatedly pick the same color, getting into a loop that requires some randomness to break. The same is the case with a few other ML algorithms, such as belief propagation. Elidan (2006) has shown that convergence of the belief propagation algorithm can be significantly sped up by asynchronously reading the values, instead of waiting for the tick (superstep) to complete. This highlights the nonsuitability of BSP for certain graph problems and motivates the need for asynchronous approaches.

Although Pregel provides a good graph-parallel abstraction, is easy to reason about, and ensures deterministic computation, it leaves it to the user to architect the movement of data. Further, like all BSP systems, it suffers from the curse of the slow jobs—even a single slow job (which could be due to load fluctuations or other reasons) can slow down the whole computation. One alternative to BSP is Piccolo (Power and Li 2010), a purely asynchronous system for graph processing. A Piccolo program comprises user-defined kernel functions that can execute in parallel across many machines, and control functions that create tables and launch and manage kernel functions. The kernel functions have access to distributed shared memory across the cluster (using get and put primitives on key-value tables). Other than the shared state, the kernel functions do not need any synchronization and can execute concurrently. This results in significant speedups, because the waiting time for synchronization is avoided. The programming model of Piccolo is, however, the other extreme: pure asynchrony. For certain ML algorithms, such as Gibbs sampling or statistical simulations, purely asynchronous graph processing might result in nonconvergence or instability (Gonzalez et al. 2011).

This is the precise motivation for GraphLab (Low et al. 2010)—the capability to process large graphs efficiently without sacrificing the serializability of vertex programs. The first version of GraphLab was targeted at multicore processors and designed for shared-memory settings. It also proposed a set of concurrent access models that provide a range of sequential consistency guarantees for the vertex programs.

GraphLab: Multicore Version

The data model of GraphLab comprises the data graph and a shared table, which is a map from keys to arbitrary data blocks. Users can associate data with both vertices and edges, which is slightly different from Pregel, where users could associate only a single value with each vertex or edge. The shared table can be viewed as a data structure for global shared state.

Users can also associate two types of computation (function) with the vertices: One is the update function, which is local to the vertex on which it is defined; the other is the sync mechanism. We can view the update as a map function, and the sync mechanism as a reduce operation. The important difference is that the sync mechanism can run concurrently with the update function, which cannot happen in the MR paradigm.

The update functions are stateless and operate on a small neighborhood of the vertex with which they are associated, known as the scope of the function. The scope of the update function is the data in its vertex, the data in its inbound and outbound edges, and the data in the neighboring vertices. The sync mechanism provides access to the global state. The user defines the key (an entry in the shared table), supplies the initial value of the key, and specifies a fold function, an apply function, and an optional merge function for parallel tree reductions. The fold function is used to aggregate data across all vertices and obeys the same consistency rules as the update function. The merge function is used to combine the results of multiple parallel folds. The apply function can be used to rescale the aggregate before it is written back to the shared table. For certain ML algorithms, the sync mechanism is used to detect termination (keep track of convergence).
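As a schematic illustration of the fold/merge/apply division of labor (simplified; these are not the exact multicore-GraphLab signatures, and vertex_data is a stand-in type), a sum over all vertex values could look like this:

// Schematic sync for a shared-table entry holding the sum of all
// vertex values (illustrative only).
struct vertex_data { double value; };

struct sum_sync {
  // fold: accumulates one vertex's value into the running aggregate,
  // under the same consistency rules as update functions.
  static void fold(const vertex_data& vdata, double& acc) {
    acc += vdata.value;
  }
  // merge: combines two partial folds, enabling a parallel tree reduction.
  static void merge(double& left, const double& right) {
    left += right;
  }
  // apply: optionally rescales the aggregate before it is written back
  // to the shared table under the user-defined key.
  static void apply(double& shared_table_entry, const double& folded) {
    shared_table_entry = folded;
  }
};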

Three consistency models are supported by GraphLab for vertex computations. These are important to balance performance and consistency—it must be noted that for the highest performance, the update functions should all be executing in parallel. But this might result in conflicting updates at shared edges/vertices. Thus, some updates might be delayed, say, in neighboring vertices, resulting in nonconflicting updates. The precise notion of delay is captured as the consistency model. The full consistency model ensures that when an update function is executing, no other function will execute in the scope of the original update function. Thus, parallel execution of the update functions occurs only in the vertices that do not have common neighbors. The edge consistency model is weaker and only ensures that no other function will read or modify data on the vertex and on adjacent edges. The parallel execution can occur on nonadjacent vertices. The vertex consistency model is the weakest and consequently permits the maximum parallelism. It only ensures that no other function will operate on the vertex when the update function is executing. It permits parallelism even on neighboring vertices.

Sequential consistency for the update functions is defined analogous to the way it is defined in databases. A GraphLab program is sequentially consistent if for every parallel execution of update functions, there exists a sequential execution that produces equivalent results. It can be observed that the full consistency model ensures sequential consistency by itself. The edge consistency model also ensures sequential consistency if the update functions executing in parallel do not modify the data on adjacent vertices. The vertex consistency model can also be made sequentially consistent by requiring that the update functions operate only on their respective vertex data.

Scheduling is the process of specifying the order of execution of the update functions. Scheduling can be involved, so GraphLab provides two base schedulers. The synchronous scheduler ensures that all vertices are updated simultaneously. The round-robin scheduler updates the vertices sequentially, using the most recently available data. GraphLab also allows users to build custom schedulers using the concept of a set scheduler: the user only needs to specify the set of vertices and the corresponding update functions, and GraphLab constructs the schedule, ensuring safety, by rewriting it in the form of a Directed Acyclic Graph (DAG). The first version of GraphLab was implemented on multicore machines using the pthreads library.

Distributed GraphLab

The distributed version of GraphLab was first reported in a paper by Low et al. (2010). The main focus was to make GraphLab work in a distributed setting. Let us refer to this version as the DG (Distributed GraphLab). DG solves some of the complex challenges that arise in a distributed system—how to partition the graph, how to ensure consistency semantics despite the latency of a distributed setting, and so on. The partitioning of the graph into k-partitions (where k is much greater than the number of machines in the cluster) is achieved through random hashing or distributed planar graph partitioning techniques. Each of the k-partitions of the graph is known as an atom and is stored in a distributed file system such as HDFS. An atom file comprises a series of graph-generating commands such as AddVertex and AddEdge. The atom also stores the ghosts: a set of edges that cut across partition boundaries along with the incident vertices. A meta-graph that comprises the atoms (and their locations in the file system) and the connectivity information is stored as an atom index file. The meta-graph is partitioned across the set of physical nodes, with each node constructing its local portion of the graph from the respective atom files. The ghosts are also instantiated as distributed cache entries. Cache coherence is achieved through a versioning mechanism.

A DG engine is responsible for ensuring the appropriate consistency model, scheduling, executing update functions and sync mechanisms, and so on. Two types of engines have been built as part of DG: the chromatic engine and the locking engine. The chromatic engine uses graph coloring to achieve the desired consistency semantics. For instance, a vertex coloring of the graph1 is used, with simultaneous execution of all vertices of the same color. This happens in a BSP-superstep-like barrier synchronization step known as a color-step. After the color-step, a different color is chosen, and simultaneous execution of all vertices of that color forms the next color-step. This ensures edge consistency semantics. The vertex consistency model is trivially satisfied by assigning all vertices the same color. The full consistency model is ensured by constructing a second-order coloring of the graph (no vertex shares a color with any vertex within distance 2).

1 Vertex coloring assigns a color to each vertex of a graph such that no adjacent vertices have the same color.

Although the chromatic engine is elegant, it is inflexible and requires graph coloring protocols for efficiency. The distributed locking engine instead uses locks to ensure consistency semantics. For example, full consistency semantics is ensured by acquiring a write lock on the central vertex and its one-hop neighbors. Vertex consistency requires only a write lock on the vertex itself, whereas edge consistency requires a write lock on the vertex and read locks on the adjacent vertices. Since locking schemes are prone to deadlocks, DG uses machine IDs to impose a canonical order on lock acquisition, which avoids deadlock (a sketch of this idea follows the list below). Moreover, DG uses optimizations to reduce locking overheads:

• Caching the ghost information allows valid cache accesses for unmodified entries.

• Batching the synchronization/lock requests allows a machine to request several locks across scopes simultaneously.
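To illustrate the canonical-ordering idea (a generic sketch, not DG code; acquire_lock_on is a hypothetical primitive): if every machine acquires the locks for a scope in ascending machine-ID order, no cyclic wait—and hence no deadlock—can arise.

#include <algorithm>
#include <vector>

// Hypothetical primitive: blocks until the lock owned by machine `id`
// for the current scope is granted.
void acquire_lock_on(int id);

void lock_scope(std::vector<int> owner_ids) {
  // Canonical order: always ascending machine ID, so two machines
  // contending for the same scopes never wait on each other cyclically.
  std::sort(owner_ids.begin(), owner_ids.end());
  for (int id : owner_ids) {
    acquire_lock_on(id);
  }
}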

Another important issue in a distributed setting that DG must address is fault tolerance. DG provides an asynchronous check-pointing mechanism based on the classic Chandy-Lamport algorithm (Chandy and Lamport 1985). DG has been evaluated by building an alternating least squares (ALS)-based collaborative filtering for Netflix movie recommendations, a video co-segmenting system based on the Loopy Belief Propagation and Gaussian Mixture Model algorithms, and a CoEM algorithm based on the Named Entity Recognition application. They have been able to show significant speedups over Hadoop, matching an equivalent handcrafted MPI realization.

PowerGraph (PG)

A different way of understanding the graph-processing abstraction is to view any graph computation as comprising three possible phases/steps: Gather, Apply, and Scatter (GAS). In the Gather phase, a vertex collects information about its neighbors (their page ranks, for instance); in the Apply phase, the vertex’s computation is run using the values read in the Gather phase; and in the final Scatter phase, information about the vertex (its modified page rank, for example) is sent out to its neighbors. It must be noted that in Pregel, the superstep barrier falls after the Apply phase: the Gather phase is realized using message passing (with message combiners as an optimization), and the Apply and Scatter phases are both realized inside the user-defined vertex function. Contrastingly, the Gather phase of GraphLab is realized asynchronously—the vertex function can access values of the neighboring vertices as and when required through the shared-memory abstraction. PowerGraph (PG) explicitly allows users to define all three of the GAS phases inside the vertex program. Importantly, it allows the vertex program itself to be distributed over the cluster of nodes for efficiency. The question that might arise in the reader’s mind is, “Why should the vertex program be distributed?”

The answer to this question also provides the exact motivation for PG. Many real-life graphs, such as the Facebook/LinkedIn/Twitter graphs, are natural graphs and follow a power law degree distribution: there are many vertices with very few connections, while very few vertices have a large number of connections. Popular icons might be followed by millions of followers on Twitter, for example, whereas the ordinary user might have only hundreds of followers. (According to this report, the average number of followers per Twitter user is 208: http://expandedramblings.com/index.php/march-2013-by-the-numbers-a-few-amazing-twitter-stats/#.Ut4RS9K6aIU.) The power law distribution implies significant skew, meaning that the usual way of distributing the graph over the network (random hashing, as in Pregel/GraphLab) can be inefficient—the computation, communication, and storage associated with the few high-degree vertices become bottlenecks, while the many low-degree vertices are lightly loaded. This is why the vertex program itself might have to be partitioned across the cluster.

Vertex programs in PG implement a GASVertexProgram interface and explicitly define the gather, sum, apply, and scatter functions (a skeleton is sketched below). The gather and sum functions are used to collect information about neighbors, similar to map and reduce. The gather function is invoked on the set of edges adjacent to the central vertex; the gather_nbrs parameter determines the appropriate edges and can have the values none, in, out, and all. The results of the gather function are accumulated using a commutative and associative sum operator into a temporary accumulator, which is passed to the Apply phase. The apply function uses this accumulator to compute the value for the vertex—the complexity of this function and the size of the accumulator determine scalability. The scatter function is invoked in parallel on the adjacent edges, as determined by the scatter_nbrs parameter, analogous to gather_nbrs.
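The shape of such a program can be sketched as follows (illustrative stand-in types; the names mirror the paper’s GAS decomposition rather than any particular release’s exact API):

// Illustrative stand-in types; the real framework supplies these.
struct vertex_type { double data; };
struct edge_type   { vertex_type* source; vertex_type* target; };
enum edge_dir_type { NONE, IN_EDGES, OUT_EDGES, ALL_EDGES };
typedef double gather_type;

class my_vertex_program {  // sketch of a GASVertexProgram
public:
  // gather_nbrs: which adjacent edges to gather over.
  edge_dir_type gather_nbrs() const { return IN_EDGES; }
  // gather: invoked on each selected edge.
  gather_type gather(const vertex_type& vertex, const edge_type& edge) const {
    return edge.source->data;  // e.g., read a neighbor's value
  }
  // sum: commutative, associative combination of gather results.
  gather_type sum(gather_type left, gather_type right) const {
    return left + right;
  }
  // apply: compute the new vertex value from the accumulator.
  void apply(vertex_type& vertex, const gather_type& total) {
    vertex.data = total;
  }
  // scatter_nbrs/scatter: push information out along selected edges.
  edge_dir_type scatter_nbrs() const { return OUT_EDGES; }
  void scatter(const vertex_type& vertex, edge_type& edge) const {
    // may update edge data or signal the neighbor for recomputation
  }
};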

The PG maintains a set of active vertices on which to execute the vertex program. The order is not guaranteed, with only a guarantee that the vertex program executes on all active vertices eventually. This allows PG to balance determinism and parallelism—it must be noted that a completely deterministic program can be sequential, limiting the parallelism and hence efficiency. Both synchronous (BSP-like) and asynchronous (GraphLab-like) execution is supported by PG. In synchronous mode, PG executes vertex programs synchronously on all active vertices, with a barrier at the end. The synchronous mode is similar to the operation of Pregel and leads to deterministic execution, often with limited efficiency. PG enforces serializability of the vertex program in asynchronous mode, unlike Piccolo, which is purely asynchronous and could lead to nondeterminism. PG addresses the problem of inefficient locking in GraphLab by introducing the notion of parallel locking. Whereas GraphLab used the approach of Dijkstra (2002) for sequentially acquiring locks, PG realizes the implementation of Chandy and Misra (1984) for parallel locking.

An important contribution of PG is its approach to partitioning the graph. Traditionally, graph partitioning has been based on edge-cut, a process of partitioning the vertices evenly across the nodes while minimizing the cross-cutting edges. This has been shown to perform rather poorly for natural power law graphs compared to multilevel schemes (Abou-Rjeili and Karypis 2006). Multilevel schemes construct an approximation of the original graph (a process known as coarsening) whose size is much smaller than the original graph. They then partition this smaller graph (even simple algorithms can do well because of its small size). The final phase is a refinement phase, in which the solution is projected back onto the original graph in a series of successive refinements. The approach of PG for partitioning is to evenly partition the edges of the graph among the set of nodes (using random hashing or a greedy approach), allowing vertices to span machines in the cluster. Edge data is stored exactly once, implying that a change in edge data is local. However, changes in vertex data need synchronization across the set of machines spanned by that vertex. The replicas of a vertex are synchronized using a master–slave scheme, with a randomly assigned master as the coordinator having exclusive write access. Slaves have read-only copies of the vertex data.

Consequently, PG is able to reduce the communication/computation imbalance in partitioning power law graphs and outperforms both GraphLab and Pregel. A set of independent performance studies published by Elser and Montresor (2013) helps us evaluate the performance of graph-processing frameworks. They implemented a distributed solution of the k-core decomposition problem (Montresor et al. 2011) on Apache Hadoop over the MR paradigm, as well as on top of Stratosphere, Apache Hama, Giraph, and GraphLab (using the GAS interface). The results have shown that the Apache Hadoop implementation was the slowest for most input sizes, with GraphLab being the fastest framework; the remaining frameworks (Giraph, Hama, and Stratosphere) fall somewhere in between the two extremes. This is especially true for large graphs with billions of vertices. For smaller graphs, Stratosphere was the fastest. However, another point to note is that only the Apache Hadoop implementation tolerates failures and recovers from errors. Among the other frameworks, GraphLab and Stratosphere can detect node failures, but are not able to recover from them at this point. We ran some experiments to verify the fault tolerance of GraphLab programs. We found that the distributed snapshot algorithm helps in restarting the computation, but there is currently no support for auto-restart of computations—one has to manually restart the computation using the stored snapshots. Another independent study has shown that GraphLab has lower horizontal scalability than platforms such as Giraph or Stratosphere, due to its single-file graph-loading process. This study was published at Super Computing 2013 (Guo et al. 2013).

Realizing the Page Rank Algorithm in GraphLab

Any program on top of GraphLab can be implemented in the following steps:

1. Define the data stored in the vertex and edge.

2. Define the type of graph.

3. Load the graph.

4. Finalize the graph.

5. Write the vertex program.

Let us take a sample program, PageRank, and see its workflow. GraphLab provides us with a data structure named distributed_graph, defined in the graphlab namespace. graphlab::distributed_graph is templatized with two template arguments:

VertexData: The type of data to be stored in vertices

EdgeData: The type of data to be stored in edges

Specific to our page rank program, we define a structure describing a web page, consisting of the page rank and the page name. This structure serves as the vertex data. For serialization, we also include save and load functions:

• The save function’s signature is save(graphlab::oarchive& oarc). This function writes the page rank and page name into the output archive object; an oarchive object writes to the ostream it was given a reference to.

• The load function is of void type and does the reverse of save, reading the page name and page rank back from an input archive (graphlab::iarchive).

Edge data can be considered empty, since we don’t require anything to be stored on the edges.
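A minimal sketch of such a vertex structure (closely following the GraphLab tutorial’s example; requires <string> and the GraphLab serialization headers):

struct web_page {
  std::string pagename;  // name of the page
  double pagerank;       // current rank estimate
  web_page() : pagerank(0.0) {}
  explicit web_page(std::string name) : pagename(name), pagerank(0.0) {}
  // Serialize to the output archive (which writes to its ostream).
  void save(graphlab::oarchive& oarc) const {
    oarc << pagename << pagerank;
  }
  // Deserialize: the exact reverse of save.
  void load(graphlab::iarchive& iarc) {
    iarc >> pagename >> pagerank;
  }
};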

After defining vertex data and edge data, using typedef, we define the graph:

typedef graphlab::distributed_graph<web_page, graphlab::empty> graph_type;

Then we have to populate the graph. We can use hard-coded values if the number of vertices is small. Otherwise, we can define a line parser. The line parser has the following signature:

bool line_parser(graph_type& graph, const std::string&
filename, const std::string& textline)

The graph is given as input to this function in the form of a text file. Let us assume each line contains first the identity of the vertex, then the name of the page, followed by the links to which the page connects. We parse each line and use the functions GraphLab provides to materialize the graph: add_vertex and add_edge.

add_vertex is defined in distributed_graph and has vertex_id_type vid and VertexData as its parameters; it creates a vertex with the given vertex ID containing the given vertex data. Each vertex can be added exactly once. It is a Boolean function, returning true on success and false on failure. vertex_id_type is the vertex identifier type.

add_edge is defined in distributed_graph and has the signature add_edge(vertex_id_type source, vertex_id_type target, EdgeData& edata). This function creates the edge between source and target. It is of Boolean type, returning true on success and false on failure (a self-edge, for instance, is not allowed).
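A sketch of the parser for lines of the form “<vertex id> <page name> <out-link id> <out-link id> …” (close to the GraphLab tutorial; error handling omitted):

#include <sstream>
#include <string>

bool line_parser(graph_type& graph,
                 const std::string& filename,
                 const std::string& textline) {
  std::stringstream strm(textline);
  graphlab::vertex_id_type vid;
  std::string pagename;
  // First the vertex ID, then the page name.
  strm >> vid >> pagename;
  graph.add_vertex(vid, web_page(pagename));
  // The remaining tokens are the IDs of the pages this page links to.
  while (true) {
    graphlab::vertex_id_type other_vid;
    strm >> other_vid;
    if (strm.fail()) break;
    graph.add_edge(vid, other_vid);
  }
  return true;
}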

After defining the parser, we have to load graph data from the file. This is done by the following function:

graphlab::distributed_graph::load(std::string path, line_parser_type line_parser)

The load() function assumes each line in the file is independent and passes each line to the line parser we defined. We can call load multiple times, provided that each vertex is added no more than once.

Vertex Program

The vertex program is the key to GraphLab. It is the primary user-defined computation. The vertex program is said to have three phases, namely, Gather (G), Apply (A), and Scatter (S). As per the founders of GraphLab, “A unique instance of the vertex program is run on each vertex in the graph and can interact with the neighboring vertex programs through the gather and scatter functions as well as by signaling neighboring vertex-programs.”

In the Gather phase, the gather function is called on each of the vertex’s adjacent edges. In the Apply phase, the summed values returned by gather are applied to each vertex. Finally, the responsibilities of the Scatter phase are updating edge data, signaling (messaging) adjacent vertices, and updating the gather cache state when caching is enabled. The scatter function is similar to the gather function except that nothing is returned.

Coming to the implementation of these concepts in our page rank program, we can define these functions in a class that extends graphlab::ivertex_program<graph_type, double> (double being the return type of gather) and graphlab::IS_POD_TYPE.

Inheriting from graphlab::IS_POD_TYPE marks the type as a Plain Old Data (POD) type and makes the serializer treat it as such.

The class is defined as

class pagerank_program : public graphlab::ivertex_program<graph_type, double>,
                         public graphlab::IS_POD_TYPE

Inside this class, we will now define the gather function, whose return type is double and whose arguments include the following types:

icontext_type: This is a class whose object acts as a mediator between the vertex program and the GraphLab execution environment. Each of the vertex program’s methods is passed a reference to the engine’s context. The context is similar to the Spark context we have explained before; it allows the vertex program to access information about the current execution and to send information to other vertices. This class is templatized over the graph type, gather type, and message type.

vertex_type: This is the vertex object that provides access to the vertex data and information about the vertex.

The gather function, which returns gather_type (double in our page rank example), has a reference to context type, constant reference to vertex type, and reference to edge type as arguments:

• The edge_type represents an edge in the graph and provides access to the data associated with that edge, as well as the source and target distributed::vertex_type objects.

• The context, as discussed, is used to interact with the engine.

• We must also specify the vertex on which the computation is to be run.

In short, the gather function holds the responsibility of collecting information about adjacent vertices and edges.

The apply function is of void type and has reference to the context type, vertex type, and gather type as arguments. This function is invoked when the Gather phase has completed. After gathering information from adjacent vertices and edges, the apply function applies all the information on vertex data and hence the vertex data is modified.

The scatter function is similar to the gather function but returns nothing. (In this example, the propagation of page ranks occurs through the gather function, so the Scatter phase is not needed.)

Let us discuss these functions specific to page rank.

In Gather Phase

On each vertex, the Gather phase is called to get the page ranks of adjacent vertices. Since we are concerned with incoming links to each web page when calculating page rank, we take the page ranks of incoming links only. According to the page rank algorithm, the following part is computed in the Gather phase; the gather function is executed on each incoming edge of each vertex:

Vertex v; i = 0;
    For each INEDGE E:
        i = i + (E.source.pagerank / E.source.NUMBER_OF_OUT_EDGES)
    End

We return each term computed in the for loop, and the engine sums them up for us.

In Apply Phase

PageRank of v = 0.85*acc+0.15.

The preceding computation is done and the page rank of each vertex is updated; that is, the vertex data is written. The values returned by the gather function are summed up in the Gather phase, and the data on the vertex is updated in the Apply phase. We do not really need the Scatter phase in this example.
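Putting the Gather and Apply phases together, the vertex program can be sketched as follows (a condensed sketch in the spirit of the GraphLab toolkit example, with the constants from the pseudocode above):

class pagerank_program :
    public graphlab::ivertex_program<graph_type, double>,
    public graphlab::IS_POD_TYPE {
public:
  // Gather runs over in-edges: each in-neighbor contributes its rank
  // divided by its out-degree; the engine sums the returned values.
  edge_dir_type gather_edges(icontext_type& context,
                             const vertex_type& vertex) const {
    return graphlab::IN_EDGES;
  }
  double gather(icontext_type& context, const vertex_type& vertex,
                edge_type& edge) const {
    return edge.source().data().pagerank / edge.source().num_out_edges();
  }
  // Apply writes the new rank into the vertex data.
  void apply(icontext_type& context, vertex_type& vertex,
             const gather_type& total) {
    vertex.data().pagerank = 0.85 * total + 0.15;
  }
  // No scatter is needed in this example.
  edge_dir_type scatter_edges(icontext_type& context,
                              const vertex_type& vertex) const {
    return graphlab::NO_EDGES;
  }
};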

Main():

Having coded all the necessary functions, we use GraphLab APIs in the main function to interact with the engine provided by GraphLab.

We have to initialize the distributed communication layer, which acts as the primary means of communication between GraphLab processes in distributed mode. As an example of using the distributed control object, say dc, dc.cout() provides a wrapper around standard cout.

After calling the load function in main(), we finalize the graph; once the graph has been finalized, no more changes can be made to it. Next is the computation of page rank on the graph we just created, for which we require the initialization of the engine.

GraphLab has an asynchronous consistent engine and a synchronous engine. The omni engine lets the user select which engine to use through an argument passed at initialization. The omni engine is a templatized class with the vertex program as the type name. It is initialized as

graphlab::omni_engine<pagerank_program> engine(dc, graph, "async");

This selects the asynchronous engine. Briefly, this engine executes vertex programs asynchronously and ensures that adjacent vertices are never executed simultaneously. The engine initializes the vertex programs and vertex data, and then spawns one thread per CPU core. Each thread does the following tasks:

• Extracts the next task from the scheduler. We have many options for schedulers, such as first in, first out (FIFO) schedulers, multiqueued, chromatic, and round robin schedulers.

• Acquires locks on the vertex, ensuring that no adjacent vertex executes at the same time.

• Executes the Gather phase.

• Executes the Apply phase.

• Executes the Scatter phase.

• Releases the locks.
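Putting these steps together, a condensed main() might look like the following sketch (assuming the mpi_tools initialization helpers shipped with GraphLab; the input path is a placeholder):

int main(int argc, char** argv) {
  // Initialize the distributed communication layer.
  graphlab::mpi_tools::init(argc, argv);
  graphlab::distributed_control dc;

  // Build and finalize the graph.
  graph_type graph(dc);
  graph.load("pages.txt", line_parser);  // placeholder input path
  graph.finalize();

  // Create the engine, schedule every vertex, and run to completion.
  graphlab::omni_engine<pagerank_program> engine(dc, graph, "async");
  engine.signal_all();
  engine.start();

  graphlab::mpi_tools::finalize();
  return 0;
}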

The full code of the page rank algorithm in GraphLab is given in Appendix A, “Code Sketches.”

Realizing Stochastic Gradient Descent in GraphLab

This section describes the working semantics of sgd.cpp, which is located in home_folder_of_graphlab/toolkits/collaborative_filtering/. Let us consider the problem Q(x) = ΣQi(x). Here, Q is an objective function that is a sum of functions Qi, each of which depends on the ith observation of the training data set.

Consider risk minimization problems, where Qi(x) corresponds to the loss function at the ith observation and Q(x) is the empirical risk, which is to be minimized. To optimize the empirical risk, we use a first-order optimization algorithm, namely gradient descent. The stochastic gradient descent algorithm approximates the gradient of Q(x) by the gradient at a single example:

x := x − γ∇Qi(x)

where γ is the learning rate.

When we feed the algorithm training data, we iterate through the examples, applying this update for each one.

Implementation:

We will follow the same approach in implementing the SGD method.

In this program, we use the Eigen library for the computations involving matrices and vectors. In the graph we are constructing, the vertex data holds a vector of type VectorXd, defined in the Eigen library. Each row and each column of the input matrix corresponds to a different vertex.

Our problem is to predict the ratings users give to particular movies. The input data is of Train, Validate, and Predict types. Each vertex is represented by a vector of latent parameters. We have to find these latent parameters—in other words, we have to find each “x” (described previously) that minimizes the empirical loss.

Vertex data is a structure consisting of a constant that specifies the number of latent values to use and a vector of latent parameters specific to the vertex. The structure also contains a constructor that randomizes the vertex data, along with functions to load and save the data from/to the binary archive.

The structure edge_data stores the corresponding entry of the matrix and also the most recent error estimate. An enum is defined for the role of the data on the edge. After defining edge data and vertex data, we can now define the graph.
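A sketch of that edge structure (modeled on the collaborative filtering toolkit’s code; field names here are illustrative):

struct edge_data : public graphlab::IS_POD_TYPE {
  // Role of this observation in the data split.
  enum data_role_type { TRAIN, VALIDATE, PREDICT };
  double obs;           // the observed rating (the matrix entry)
  double error;         // the most recent prediction error estimate
  data_role_type role;  // Train, Validate, or Predict
  edge_data(double obs = 0, data_role_type role = PREDICT)
      : obs(obs), error(0), role(role) {}
};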

Let us discuss the parser function used in distributed graph construction, which returns a Boolean. It takes a reference to the graph, a constant reference to a string (the file name), and a constant reference to a string (the text line) as input arguments. The function first checks the role of the data, that is, Train, Validate, or Predict. A string stream is created to parse the line, and the source and target vertex_id_type values are initialized. The rating is initialized to 0. For the training and validation data sets, the rating value is read from the line; for the prediction data set, this step is skipped. On success, true is returned.

For successful implementation of this algorithm, we also need to define some functions, including these:

get_other_vertex(), of return type graph_type::vertex_type, has references to edge_type and vertex_type as its arguments. Given a vertex and an edge, it returns the other endpoint of that edge.

extract_l2_error() is of return type double and has a constant reference to edge_type as its only argument. It computes the prediction error associated with that edge.

Then we need to define the class gather_type, which has a constructor that initializes the latent parameter vector. We also define save and load functions in the class to write and read the values from the binary archive, as well as the operator +=, which takes a constant reference to another object of the same class as an argument and adds the vector held by that object.

With this background, we can now discuss the vertex program, which has been explained previously. As discussed earlier, the class sgd_vertex_program extends graphlab::ivertex_program<graph_type, gather_type, message_type>, where message_type is defined to be vec_type, that is, an Eigen vector.

We define the convergence tolerance, lambda, gamma, MAXVAL, and MINVAL, which are static and of double type. MINVAL and MAXVAL give the range of predicted values, gamma is the learning rate, lambda is the regularization parameter used in calculating the change in gradient, and the convergence tolerance is the convergence criterion.

We also define the function gather_edges, which has a reference to the context and a constant reference to the vertex type as its arguments and specifies the set of edges on which the gather runs; here it selects all the edges of the vertex.

Gather Phase:

The Gather phase is implemented by the gather function, which takes a reference to the context type and edge type and a constant reference to the vertex type. In this phase, we take a particular user node and get a copy of the item node. We take the dot product of the user and item latent vectors to find the prediction; the error is then computed between the predicted value and the rating on the edge.

An init function is defined to store the change in the gradient of an item node, which is then applied in the apply() function.

Apply Phase:

For training data, we will update the linear model here.

Updating here means that we compute the change in gradient for the user node and the item node. For the user node, we update the gradient using the cumulative sum of gradient updates computed in gather. For the item node, we update using the sum received through the init function.
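Concretely, assuming the standard squared-loss matrix factorization formulation (an assumption; the toolkit’s exact regularization details may differ), with p_u and q_i the latent vectors of the user and item vertices and r_ui the observed rating, the per-rating updates take the form

e_ui = r_ui − p_u · q_i
p_u := p_u + γ(e_ui · q_i − λ p_u)
q_i := q_i + γ(e_ui · p_u − λ q_i)

where γ is the learning rate and λ is the regularization parameter (lambda) defined earlier.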

Scatter Phase:

In the Scatter phase, we want to reschedule neighbors. For this, we define a function that returns all edges of the vertex present in the context type. In this regard, we have to discuss the signal function defined in graphlab::icontext. This function signals a vertex with a particular message; it is used to notify neighboring vertices when future computation on them is warranted (for example, when there is a significant change in a page’s rank and the neighbors’ page ranks have to be recomputed). In the SGD vertex program, we also have a function to signal all vertices on one side of the bipartite graph.

Main():

The main() function that uses GraphLab APIs is similar to the main function explained for the page rank algorithm.

The full code of the SGD is given in Appendix A.

References

Abou-Rjeili, Amine, and George Karypis. 2006. “Multilevel Algorithms for Partitioning Power-Law Graphs.” In Proceedings of the 20th International Conference on Parallel and Distributed Processing (IPDPS ‘06). IEEE Computer Society, Washington, DC, 124-124.

Chan, Albert, Frank K. H. A. Dehne, and Ryan Taylor. 2005. “CGMGRAPH/CGMLIB: Implementing and Testing CGM Graph Algorithms on PC Clusters and Shared Memory Machines.” IJHPCA 19(1):81-97.

Chandy, K. M., and J. Misra. 1984. “The Drinking Philosophers Problem.” ACM Transactions on Programming Languages and Systems 6(4):632-646.

Chandy, K. Mani, and Leslie Lamport. 1985. “Distributed Snapshots: Determining Global States of Distributed Systems.” ACM Transactions on Computer Systems 3(1):63-75.

Chen, Rishan, Xuetian Weng, Bingsheng He, and Mao Yang. 2010. “Large Graph Processing in the Cloud.” In Proceedings of the 2010 ACM SIGMOD International Conference on Management of Data (SIGMOD ’10). ACM, New York, NY, 1123-1126.

Dijkstra, Edsger W. 2002. “Hierarchical Ordering of Sequential Processes.” In The Origin of Concurrent Programming. Per Brinch Hansen, ed. Springer-Verlag New York, Inc., New York, NY, 198-227.

Elidan, Gal. 2006. “Residual Belief Propagation: Informed Scheduling for Asynchronous Message Passing.” In Proceedings of the Twenty-Second Conference on Uncertainty in Artificial Intelligence. AUAI Press, Arlington, Virginia.

Elser, Benedikt, and Alberto Montresor. 2013. “An Evaluation Study of BigData Frameworks for Graph Processing.” IEEE BigData Conference 2013, Santa Clara, California. IEEE, Washington, DC, 60-67.

Ewen, Stephan, Kostas Tzoumas, Moritz Kaufmann, and Volker Markl. 2012. “Spinning Fast Iterative Data Flows.” Proc. VLDB Endow 5(11):1268-1279.

Gonzalez, J., Y. Low, A. Gretton, and C. Guestrin. 2011. “Parallel Gibbs Sampling: From Colored Fields to Thin Junction Trees.” In Fourteenth International Conference on Artificial Intelligence and Statistics, Fort Lauderdale, FL. JMLR 15:324-332.

Gregor, Douglas, and Andrew Lumsdaine. 2005. “The Parallel BGL: A Generic Library for Distributed Graph Computations.” In Proceedings of Parallel Object-Oriented Scientific Computing (POOSC).

Guo, Yong, Marcin Biczak, Ana Lucia Varbanescu, Alexandru Iosup, Claudio Martella, and Theodore L. Willke. 2013. “Towards Benchmarking Graph-Processing Platforms.” Poster, Super Computing 2013, Denver, Colorado. Available at http://sc13.supercomputing.org/sites/default/files/PostersArchive/tech_posters/post152s2-file2.pdf.

Kang, U., Charalampos E. Tsourakakis, and Christos Faloutsos. 2009. “PEGASUS: A Peta-Scale Graph Mining System Implementation and Observations.” In Proceedings of the 2009 Ninth IEEE International Conference on Data Mining (ICDM ‘09). IEEE Computer Society, Washington, DC, 229-238.

Kang, U., Hanghang Tong, Jimeng Sun, Ching-Yung Lin, and Christos Faloutsos. 2011. “GBASE: A Scalable and General Graph Management System.” In Proceedings of the 17th ACM SIGKDD International Conference on Knowledge Discovery and Data Mining (KDD ‘11). ACM, New York, NY, 1091-1099.

Knuth, Donald E. 1993. The Stanford GraphBase: A Platform for Combinatorial Computing. ACM, New York, NY.

Low, Y., J. Gonzalez, A. Kyrola, D. Bickson, C. Guestrin, and J. M. Hellerstein. 2010. “GraphLab: A New Parallel Framework for Machine Learning.” In Proceedings of Uncertainty in Artificial Intelligence (UAI). AUAI Press, Corvallis, Oregon, 340-349.

Mehlhorn, Kurt, Stefan Näher, and Christian Uhrig. 1997. “The LEDA Platform of Combinatorial and Geometric Computing.” In Proceedings of the 24th International Colloquium on Automata, Languages and Programming (ICALP ‘97). Pierpaolo Degano, Roberto Gorrieri, and Alberto Marchetti-Spaccamela, eds. Springer-Verlag, London, UK, 7-16.

Montresor, Alberto, Francesco De Pellegrini, and Daniele Miorandi. 2011. “Distributed k-Core Decomposition.” In Proceedings of the 30th Annual ACM SIGACT-SIGOPS Symposium on Principles of Distributed Computing (PODC ‘11). ACM, New York, NY, 207-208.

Power, Russell, and Jinyang Li. 2010. “Piccolo: Building Fast, Distributed Programs with Partitioned Tables.” In Proceedings of the 9th USENIX Conference on Operating Systems Design and Implementation (OSDI ‘10). USENIX Association, Berkeley, CA, 1-14.

Sakr, Sherif. 2013. “Processing Large-Scale Graph Data: A Guide to Current Technology.” IBM Developerworks.

Salihoglu, Semih, and Jennifer Widom. 2013. “GPS: A Graph Processing System.” In Proceedings of the 25th International Conference on Scientific and Statistical Database Management (SSDBM). Alex Szalay, Tamas Budavari, Magdalena Balazinska, Alexandra Meliou, and Ahmet Sacan, eds. ACM, New York, NY, Article 22, 12 pages.

Seo, Sangwon, Edward J. Yoon, Jaehong Kim, Seongwook Jin, Jin-Soo Kim, and Seungryoul Maeng. 2010. “HAMA: An Efficient Matrix Computation with the MapReduce Framework.” In Proceedings of the 2010 IEEE Second International Conference on Cloud Computing Technology and Science (CLOUDCOM ‘10). IEEE Computer Society, Washington, DC, 721-726.

Valiant, Leslie G. 1990. “A Bridging Model for Parallel Computation.” Communications of the ACM 33(8):103-111.
