While Graph for Scala may be considered a DSL for graph operations and querying, one should turn to GraphX for scalability. GraphX is built on top of the powerful Spark framework. As an example of Spark/GraphX operations, I'll use the CMU Enron e-mail dataset (about 2 GB). The actual semantic analysis of the e-mail content is not going to be important to us until the next chapters. The dataset can be downloaded from the CMU site. It has e-mails from the mailboxes of 150 users, primarily Enron managers, and about 517,401 e-mails between them. The e-mails may be considered as an indication of a relation (edge) between two people: each e-mail is an edge between a source (`From:`) and a destination (`To:`) vertex.
Since GraphX requires the data in RDD format, I'll have to do some preprocessing. Luckily, it is extremely easy with Scala—this is why Scala is the perfect language for semi-structured data. Here is the code:
```scala
package org.akozlov.chapter07

import scala.io.Source
import scala.util.hashing.{MurmurHash3 => Hash}
import scala.util.matching.Regex

import java.util.{Date => javaDateTime}
import java.io.File

import net.liftweb.json._
import Extraction._
import Serialization.{read, write}

object EnronEmail {

  // Matches Enron e-mail addresses anywhere in the message body
  val emailRe = """[a-zA-Z0-9_.+-]+@enron.com""".r.unanchored

  def emails(s: String) = {
    for (email <- emailRe findAllIn s) yield email
  }

  // Shift MurmurHash3 output into the non-negative Long range for node IDs
  def hash(s: String) = {
    java.lang.Integer.MAX_VALUE.toLong + Hash.stringHash(s)
  }

  // Extracts the Message-ID, Date, From, and Subject headers
  val messageRe =
    """(?:Message-ID:\s+)(<[A-Za-z0-9_.+-@]+>)(?s)(?:.*?)(?m)
      |(?:Date:\s+)(.*?)$(?:.*?)
      |(?:From:\s+)([a-zA-Z0-9_.+-]+@enron.com)(?:.*?)
      |(?:Subject: )(.*?)$""".stripMargin.r.unanchored

  case class Relation(from: String, fromId: Long, to: String, toId: Long,
    source: String, messageId: String, date: javaDateTime, subject: String)

  implicit val formats = Serialization.formats(NoTypeHints)

  def getFileTree(f: File): Stream[File] =
    f #:: (if (f.isDirectory) f.listFiles().toStream.flatMap(getFileTree)
           else Stream.empty)

  def main(args: Array[String]) {
    getFileTree(new File(args(0))).par.map {
      file => {
        """\.$""".r findFirstIn file.getName match {
          case Some(x) =>
            try {
              val src = Source.fromFile(file, "us-ascii")
              val message = try src.mkString finally src.close()
              message match {
                case messageRe(messageId, date, from, subject) =>
                  val fromLower = from.toLowerCase
                  // Emit one JSON relation per distinct recipient
                  for (to <- emails(message).filter(_ != fromLower).toList.distinct)
                    println(write(Relation(fromLower, hash(fromLower), to, hash(to),
                      file.toString, messageId, new javaDateTime(date), subject)))
                case _ =>
              }
            } catch {
              case e: Exception => System.err.println(e)
            }
          case _ =>
        }
      }
    }
  }
}
```
First, we use the `MurmurHash3` class to generate node IDs of type `Long`, as they are required for each node in GraphX. The `emailRe` and `messageRe` regexes are used to match the file content and extract the required fields. Scala allows you to parallelize such programs without much work: note the `par` call in `getFileTree(new File(args(0))).par.map` in the `main` method, which makes the loop parallel. While processing the whole Enron dataset can take up to an hour even on a 3 GHz processor, adding parallelization reduces it to about 8 minutes on a 32-core Intel Xeon E5-2630 2.4 GHz CPU Linux machine (it took 15 minutes on an Apple MacBook Pro with a 2.3 GHz Intel Core i7).
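By default, `.par` uses a thread pool sized to the number of cores. If you need to cap the parallelism, a minimal sketch using the Scala 2.10/2.11-era parallel collections API (the pool size of 8 is an arbitrary choice, and `getFileTree` is assumed to be in scope from the listing above):

```scala
import java.io.File
import scala.collection.parallel.ForkJoinTaskSupport
import scala.concurrent.forkjoin.ForkJoinPool

// Cap the worker pool used for the parallel file traversal
val files = getFileTree(new File("Enron/maildir")).par
files.tasksupport = new ForkJoinTaskSupport(new ForkJoinPool(8))
files.foreach(file => println(file.getName)) // processed by at most 8 threads
```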
Running the code will produce a set of JSON records that can be loaded into Spark (to run it, you'll need to put joda-time and lift-json library jars on the classpath), as follows:
```
# (mkdir Enron; cd Enron; wget -O - http://www.cs.cmu.edu/~./enron/enron_mail_20150507.tgz | tar xzvf -)
...
# sbt --error "run-main org.akozlov.chapter07.EnronEmail Enron/maildir" > graph.json
# spark-shell --driver-memory 2g --executor-memory 2g
...
scala> val df = sqlContext.read.json("graph.json")
df: org.apache.spark.sql.DataFrame = [date: string, from: string, fromId: bigint, messageId: string, source: string, subject: string, to: string, toId: bigint]
```
Nice! Spark was able to figure out the fields and types on its own. If Spark were not able to parse all of the records, there would be a `_corrupt_record` field containing the unparsed records (one of them is the `[success]` line at the end of the dataset, which can be filtered out with `grep -Fv [success]`). You can see them with the following command:
scala> df.select("_corrupt_record").collect.foreach(println) ...
The nodes (people) and edges (relations) datasets can be extracted with the following commands:
```
scala> import org.apache.spark._
...
scala> import org.apache.spark.graphx._
...
scala> import org.apache.spark.rdd.RDD
...
scala> val people: RDD[(VertexId, String)] = df.select("fromId", "from").unionAll(df.select("toId", "to")).na.drop.distinct.map(x => (x.get(0).toString.toLong, x.get(1).toString))
people: org.apache.spark.rdd.RDD[(org.apache.spark.graphx.VertexId, String)] = MapPartitionsRDD[146] at map at <console>:28

scala> val relationships = df.select("fromId", "toId", "messageId", "subject").na.drop.distinct.map(x => Edge(x.get(0).toString.toLong, x.get(1).toString.toLong, (x.get(2).toString, x.get(3).toString)))
relationships: org.apache.spark.rdd.RDD[org.apache.spark.graphx.Edge[(String, String)]] = MapPartitionsRDD[156] at map at <console>:28

scala> val graph = Graph(people, relationships).cache
graph: org.apache.spark.graphx.Graph[String,(String, String)] = org.apache.spark.graphx.impl.GraphImpl@7b59aa7b
```
Node IDs in GraphX

As we saw in Graph for Scala, specifying the edges is sufficient for defining the nodes and the graph. In Spark/GraphX, nodes need to be extracted explicitly, and each node needs to be associated with an ID of the `Long` type. While this potentially limits the flexibility and the number of unique nodes, it improves efficiency. In this particular example, generating node IDs as a hash of the e-mail string was sufficient, as no collisions were detected, but the generation of unique IDs is usually a hard problem to parallelize.
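A quick way to verify the no-collision claim is to compare the number of distinct addresses with the number of distinct IDs; a sketch (not part of the original session) using the `people` RDD constructed above:

```scala
// If the hash collided, two addresses would share an ID and the counts would differ
val distinctAddresses = people.map(_._2).distinct.count
val distinctIds = people.map(_._1).distinct.count
assert(distinctAddresses == distinctIds,
  s"hash collisions detected: ${distinctAddresses - distinctIds}")
```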
The first GraphX graph is ready! It took a bit more work than Graph for Scala, but now it's totally ready for distributed processing. A few things to note: first, we needed to explicitly convert the fields to `Long` and `String`, as the `Edge` constructor needed help in figuring out the types. Second, Spark might need to optimize the number of partitions (likely, it created too many):
```
scala> graph.vertices.getNumPartitions
res1: Int = 200

scala> graph.edges.getNumPartitions
res2: Int = 200
```
To repartition, there are two calls: `repartition` and `coalesce`. The latter tries to avoid a shuffle, as follows:
```
scala> val graph = Graph(people.coalesce(6), relationships.coalesce(6))
graph: org.apache.spark.graphx.Graph[String,(String, String)] = org.apache.spark.graphx.impl.GraphImpl@5dc7d016

scala> graph.vertices.getNumPartitions
res10: Int = 6

scala> graph.edges.getNumPartitions
res11: Int = 6
```
However, this might limit parallelism if one performs computations over a large cluster. Finally, it's a good idea to use the `cache` method, which pins the data structure in memory:
```
scala> graph.cache
res12: org.apache.spark.graphx.Graph[String,(String, String)] = org.apache.spark.graphx.impl.GraphImpl@5dc7d016
```
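`cache` is shorthand for persisting at the default `MEMORY_ONLY` level. For graphs that do not fit in memory, a sketch of setting explicit storage levels at construction time instead (using the same `people` and `relationships` RDDs as above):

```scala
import org.apache.spark.storage.StorageLevel

// MEMORY_AND_DISK spills partitions to disk instead of recomputing them
val graphOnDisk = Graph(people, relationships,
  defaultVertexAttr = "",
  edgeStorageLevel = StorageLevel.MEMORY_AND_DISK,
  vertexStorageLevel = StorageLevel.MEMORY_AND_DISK)
```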
It took a few more commands to construct a graph in Spark, but four is not too bad. Let's compute some statistics (and show the power of Spark/GraphX) in the following table; a sketch of commands that could produce such counts follows the table:
Computing basic statistics on the Enron e-mail graph:

| Statistics | Value for Enron |
| --- | --- |
| Total # of relations (pairwise communications) | 3,035,021 |
| Number of e-mails (message IDs) | 371,135 |
| Number of connected pairs | 217,867 |
| Number of one-way communications | 193,183 |
| Number of distinct subject lines | 110,273 |
| Total # of nodes | 23,607 |
| Number of destination-only nodes | 17,264 |
| Number of source-only nodes | 611 |
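As promised, a sketch of one way to compute counts of these kinds with the `graph` and `df` defined earlier; the specific expressions are illustrative assumptions rather than the original commands:

```scala
graph.numEdges                                  // total # of relations
df.select("messageId").distinct.count           // number of e-mails (message IDs)

// Distinct ordered and unordered communicating pairs
val ordered = df.select("fromId", "toId").na.drop.distinct
  .map(r => (r.getLong(0), r.getLong(1)))
val unordered = ordered.map { case (a, b) => if (a < b) (a, b) else (b, a) }.distinct
unordered.count                                 // connected pairs
ordered.subtract(ordered.map(_.swap)).count     // one-way communications

df.select("subject").distinct.count             // distinct subject lines
graph.numVertices                               // total # of nodes
val sources = ordered.map(_._1).distinct
val dests = ordered.map(_._2).distinct
dests.subtract(sources).count                   // destination-only nodes
sources.subtract(dests).count                   // source-only nodes
```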
One of the most straightforward ways to estimate people's importance in an organization is to look at the number of connections, that is, the number of incoming and outgoing communications. The GraphX graph has built-in `inDegrees` and `outDegrees` methods. To rank the e-mail addresses with respect to the number of incoming e-mails, run:
```
scala> people.join(graph.inDegrees).sortBy(_._2._2, ascending=false).take(10).foreach(println)
(268746271,([email protected],18523))
(1608171805,([email protected],15867))
(1578042212,([email protected],13878))
(960683221,([email protected],13717))
(3784547591,([email protected],12980))
(1403062842,([email protected],12082))
(2319161027,([email protected],12018))
(969899621,([email protected],10777))
(1362498694,([email protected],10296))
(4151996958,([email protected],10160))
```
To rank the e-mail addresses with respect to the number of outgoing e-mails, run:
```
scala> people.join(graph.outDegrees).sortBy(_._2._2, ascending=false).take(10).foreach(println)
(1578042212,([email protected],139786))
(2822677534,([email protected],106442))
(3035779314,([email protected],94666))
(2346362132,([email protected],90570))
(861605621,([email protected],74319))
(14078526,([email protected],58797))
(2058972224,([email protected],58718))
(871077839,([email protected],57559))
(3852770211,([email protected],50106))
(241175230,([email protected],40425))
…
```
Let's apply some more complex algorithms to the Enron dataset.
Connected components determine whether the graph is naturally partitioned into several parts. In the Enron relationship graph, this would mean that two or more groups communicate mostly among themselves:
```
scala> val groups = org.apache.spark.graphx.lib.ConnectedComponents.run(graph).vertices.map(_._2).distinct.cache
groups: org.apache.spark.rdd.RDD[org.apache.spark.graphx.VertexId] = MapPartitionsRDD[2404] at distinct at <console>:34

scala> groups.count
res106: Long = 18

scala> people.join(groups.map(x => (x, x))).map(x => (x._1, x._2._1)).sortBy(_._1).collect.foreach(println)
(332133,[email protected])
(81833994,[email protected])
(115247730,[email protected])
(299810291,[email protected])
(718200627,[email protected])
(847455579,[email protected])
(919241773,[email protected])
(1139366119,[email protected])
(1156539970,[email protected])
(1265773423,[email protected])
(1493879606,[email protected])
(1511379835,[email protected])
(2114016426,[email protected])
(2200225669,[email protected])
(2914568776,[email protected])
(2934799198,[email protected])
(2975592118,[email protected])
(3678996795,[email protected])
```
We see 18 groups. Each of the groups can be counted and extracted by filtering on the component ID. For instance, the group associated with <[email protected]> can be found by running a SQL query on the DataFrame:
scala> df.filter("fromId = 919241773 or toId = 919241773").select("date","from","to","subject","source").collect.foreach(println) [2000-09-19T18:40:00.000Z,[email protected],[email protected],NO ACTION REQUIRED - TEST,Enron/maildir/dasovich-j/all_documents/1567.] [2000-09-19T18:40:00.000Z,[email protected],[email protected],NO ACTION REQUIRED - TEST,Enron/maildir/dasovich-j/notes_inbox/504.]
This group is based on a single e-mail sent on September 19, 2000, from <[email protected]> to <etc.survey@enron>. The e-mail is listed twice only because it ended up in two different folders (and has two distinct message IDs). Only the first group, the largest subgraph, contains more than two e-mail addresses in the organization.
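To confirm that all but the largest group are tiny, one can count the vertices in each component; a small sketch (not part of the original session):

```scala
// Count how many vertices fall into each connected component
val componentSizes = org.apache.spark.graphx.lib.ConnectedComponents.run(graph)
  .vertices.map { case (_, componentId) => (componentId, 1L) }
  .reduceByKey(_ + _)
componentSizes.sortBy(_._2, ascending = false).take(5).foreach(println)
```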
The triangle counting algorithm is relatively straightforward and can be computed in the following three steps (as described in the GraphX documentation):

1. Compute the set of neighbors for each vertex.
2. For each edge, compute the intersection of the two sets and send the count to both vertices.
3. Compute the sum at each vertex and divide by two, since each triangle is counted twice.

Before running it, we need to convert the multigraph to an undirected graph with `srcId < dstId`, which is a precondition for the algorithm:
```
scala> val unedges = graph.edges.map(e => if (e.srcId < e.dstId) (e.srcId, e.dstId) else (e.dstId, e.srcId)).map(x => Edge(x._1, x._2, 1)).cache
unedges: org.apache.spark.rdd.RDD[org.apache.spark.graphx.Edge[Int]] = MapPartitionsRDD[87] at map at <console>:48

scala> val ungraph = Graph(people, unedges).partitionBy(org.apache.spark.graphx.PartitionStrategy.EdgePartition1D, 10).cache
ungraph: org.apache.spark.graphx.Graph[String,Int] = org.apache.spark.graphx.impl.GraphImpl@77274fff

scala> val triangles = org.apache.spark.graphx.lib.TriangleCount.run(ungraph).cache
triangles: org.apache.spark.graphx.Graph[Int,Int] = org.apache.spark.graphx.impl.GraphImpl@6aec6da1

scala> people.join(triangles.vertices).map(t => (t._2._2,t._2._1)).sortBy(_._1, ascending=false).take(10).foreach(println)
(31761,[email protected])
(24101,[email protected])
(23522,[email protected])
(21694,[email protected])
(20847,[email protected])
(18460,[email protected])
(17951,[email protected])
(16929,[email protected])
(16390,[email protected])
(16197,[email protected])
```
While there is no direct relationship between the triangle count and the importance of people in the organization, the people with higher triangle count arguably are more social—even though a clique or a strongly connected component count might be a better measure.
In the mathematical theory of directed graphs, a subgraph is said to be strongly connected if every vertex is reachable from every other vertex. It could happen that the whole graph is just one strongly connected component, but on the other end of the spectrum, each vertex could be its own strongly connected component.

If you contract each strongly connected component to a single vertex, you get a new directed graph that is guaranteed to have no cycles, that is, to be acyclic (a sketch of this contraction follows the session below).
The algorithm for SCC detection is already built into GraphX:
```
scala> val components = org.apache.spark.graphx.lib.StronglyConnectedComponents.run(graph, 100).cache
components: org.apache.spark.graphx.Graph[org.apache.spark.graphx.VertexId,(String, String)] = org.apache.spark.graphx.impl.GraphImpl@55913bc7

scala> components.vertices.map(_._2).distinct.count
res2: Long = 17980

scala> people.join(components.vertices.map(_._2).distinct.map(x => (x, x))).map(x => (x._1, x._2._1)).sortBy(_._1).collect.foreach(println)
(332133,[email protected])
(466265,[email protected])
(471258,[email protected])
(497810,[email protected])
(507806,[email protected])
(639614,[email protected])
(896860,[email protected])
(1196652,[email protected])
(1240743,[email protected])
(1480469,[email protected])
(1818533,[email protected])
(2337461,[email protected])
(2918577,[email protected])
```
There are 17,980 strongly connected components, with an average of only 23,607 / 17,980 ≈ 1.3 users per component.
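As an illustration of the contraction mentioned earlier, here is a sketch (not part of the original session) that builds the edge list of the condensed graph from the `components` computed above; the result is guaranteed to be acyclic:

```scala
// Map each edge endpoint to its component ID, drop self-loops, and dedupe
val compOf = components.vertices                 // (vertexId, componentId)
val dagEdges = graph.edges.map(e => (e.srcId, e.dstId))
  .join(compOf).map { case (_, (dst, srcComp)) => (dst, srcComp) }
  .join(compOf).map { case (_, (srcComp, dstComp)) => (srcComp, dstComp) }
  .filter { case (s, d) => s != d }
  .distinct
```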
The PageRank algorithm gives us an estimate of how important a person is by analyzing the links, which in this case are the e-mails. For example, let's run PageRank on the Enron e-mail graph:
```
scala> val ranks = graph.pageRank(0.001).vertices
ranks: org.apache.spark.graphx.VertexRDD[Double] = VertexRDDImpl[955] at RDD at VertexRDD.scala:57

scala> people.join(ranks).map(t => (t._2._2,t._2._1)).sortBy(_._1, ascending=false).take(10).foreach(println)
(32.073722548483325,[email protected])
(29.086568868043248,[email protected])
(28.14656912897315,[email protected])
(26.57894933459292,[email protected])
(25.865486865014493,[email protected])
(23.86746232662471,[email protected])
(22.489814482022275,[email protected])
(21.968039409295585,[email protected])
(20.903053536275547,[email protected])
(20.39124651779771,[email protected])
```
Ostensibly, these are the go-to people. PageRank tends to emphasize the incoming edges, and Tana Jones returns to the top of the list, compared to ninth place in the triangle counting.
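Note that GraphX provides two PageRank entry points: the tolerance-based version used above, which iterates until convergence, and a fixed-iteration one. A quick sketch (the tolerance and iteration count here are arbitrary choices):

```scala
// Dynamic PageRank: iterate until each vertex's rank changes by less than tol
val dynamicRanks = graph.pageRank(tol = 0.001).vertices
// Static PageRank: run a fixed number of iterations regardless of convergence
val staticRanks = graph.staticPageRank(numIter = 10).vertices
```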
SVD++ is a recommendation engine algorithm developed specifically for the Netflix competition by Yehuda Koren and team in 2008; the original paper is still out there in the public domain and can be Googled as `kdd08koren.pdf`. The specific implementation comes from the .NET MyMediaLite library by Zeno Gantner (https://github.com/zenogantner/MyMediaLite), who granted the Apache Foundation an Apache 2.0 license. Let's assume we have a set of users (on the left) and items (on the right):
The preceding diagram is a graphical representation of the recommendation problem. The nodes on the left represent users. The nodes on the right represent items. User 1 recommends items A and C, while users 2 and 3 recommend only a single item, A. The rest of the edges are missing. The common question is to find the recommendation ranking for the rest of the items; the edges may also have a weight, or recommendation strength, attached to them. The graph is usually sparse. Such a graph is also often called bipartite, as the edges only go from one set of nodes to another set of nodes (a user does not recommend another user).
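To make the setup concrete, here is a small sketch that builds the toy graph from the diagram in GraphX; the item IDs (101, 103) are arbitrary, and the user IDs are negated, mirroring the convention used for the Enron graph below:

```scala
import org.apache.spark.graphx.{Edge, Graph}

// Users get negative IDs, items positive ones, so the two sets cannot collide
val vertices = sc.parallelize(Seq(
  (-1L, "user 1"), (-2L, "user 2"), (-3L, "user 3"),
  (101L, "item A"), (103L, "item C")))
val ratings = sc.parallelize(Seq(
  Edge(-1L, 101L, 1.0),  // user 1 recommends item A
  Edge(-1L, 103L, 1.0),  // user 1 recommends item C
  Edge(-2L, 101L, 1.0),  // user 2 recommends item A
  Edge(-3L, 101L, 1.0))) // user 3 recommends item A
val bipartite = Graph(vertices, ratings)
```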
For the recommendation engine, we typically need two types of nodes: users and items. The recommendations are based on a rating matrix of (user, item, rating) tuples. One of the implementations of the recommendation algorithm is based on the Singular Value Decomposition (SVD) of the preceding matrix. The final score has four components: the baseline, which is the sum of the average for the whole matrix, the average for the user, and the average for the item, as follows:

$$b_{ui} = \mu + b_u + b_i$$

Here, $\mu$, $b_u$, and $b_i$ can be understood as the averages for the whole population, the user (among all of that user's recommendations), and the item (among all the users). The final part is the (dot) product of two rows of the factor matrices:

$$\hat{r}_{ui} = \mu + b_u + b_i + q_i^T p_u$$
The problem is posed as a minimization problem (refer to Chapter 4, Supervised and Unsupervised Learning):

$$\min_{b_*,\,q_*,\,p_*} \sum_{(u,i) \in \mathcal{K}} \left(r_{ui} - \mu - b_u - b_i - q_i^T p_u\right)^2 + \lambda \left(b_u^2 + b_i^2 + \lVert q_i \rVert^2 + \lVert p_u \rVert^2\right)$$
Here, $\lambda$ is a regularization coefficient, also discussed in Chapter 4, Supervised and Unsupervised Learning. So, each user is associated with a set of numbers ($b_u$, $p_u$), and each item with ($b_i$, $q_i$). In this particular implementation, the optimal coefficients are found by gradient descent. This is the basis of SVD optimization. In linear algebra, SVD takes an arbitrary $m \times n$ matrix $A$ and represents it as a product $A = U \Sigma V^T$ of an orthogonal matrix $U$, a diagonal matrix $\Sigma$, and a unitary matrix $V$; that is, the columns are mutually orthogonal. If one takes the $r$ largest entries of the matrix $\Sigma$, the product is reduced to the product of a very tall $m \times r$ matrix and a wide $r \times n$ matrix, where $r$ is called the rank of the decomposition. If the remaining values are small, the new $r \times (m + n)$ numbers approximate the original numbers for the relation, $A$. If $m$ and $n$ are large to start with (in practical online shopping situations, $m$, the number of items, can be in the hundreds of thousands, and $n$, the number of users, in the hundreds of millions), the savings can be substantial. For example, for $r = 10$, $m = 100{,}000$, and $n = 100{,}000{,}000$, the savings in storage are as follows:

$$\frac{m \, n}{r \, (m + n)} = \frac{100{,}000 \times 100{,}000{,}000}{10 \times (100{,}000 + 100{,}000{,}000)} \approx 10{,}000$$
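For reference, the gradient descent mentioned above is typically implemented as stochastic updates per observed rating $r_{ui}$, following Koren's formulation (the GraphX implementation may differ in details); with learning rate $\gamma$ and prediction error $e_{ui}$:

$$e_{ui} = r_{ui} - \hat{r}_{ui}$$
$$b_u \leftarrow b_u + \gamma\,(e_{ui} - \lambda\,b_u), \qquad b_i \leftarrow b_i + \gamma\,(e_{ui} - \lambda\,b_i)$$
$$q_i \leftarrow q_i + \gamma\,(e_{ui}\,p_u - \lambda\,q_i), \qquad p_u \leftarrow p_u + \gamma\,(e_{ui}\,q_i - \lambda\,p_u)$$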
SVD can also be viewed as PCA for matrices with zero mean (centered columns). In the Enron case, we can treat senders as users and recipients as items (we'll need to reassign the node IDs), as follows:
```
scala> val rgraph = graph.partitionBy(org.apache.spark.graphx.PartitionStrategy.EdgePartition1D, 10).mapEdges(e => 1).groupEdges(_+_).cache
rgraph: org.apache.spark.graphx.Graph[String,Int] = org.apache.spark.graphx.impl.GraphImpl@2c1a48d6

scala> val redges = rgraph.edges.map(e => Edge(-e.srcId, e.dstId, Math.log(e.attr.toDouble))).cache
redges: org.apache.spark.rdd.RDD[org.apache.spark.graphx.Edge[Double]] = MapPartitionsRDD[57] at map at <console>:36

scala> import org.apache.spark.graphx.lib.SVDPlusPlus
import org.apache.spark.graphx.lib.SVDPlusPlus

scala> implicit val conf = new SVDPlusPlus.Conf(10, 50, 0.0, 10.0, 0.007, 0.007, 0.005, 0.015)
conf: org.apache.spark.graphx.lib.SVDPlusPlus.Conf = org.apache.spark.graphx.lib.SVDPlusPlus$Conf@15cdc117

scala> val (svd, mu) = SVDPlusPlus.run(redges, conf)
svd: org.apache.spark.graphx.Graph[(Array[Double], Array[Double], Double, Double),Double] = org.apache.spark.graphx.impl.GraphImpl@3050363d
mu: Double = 1.3773578970633769

scala> val svdRanks = svd.vertices.filter(_._1 > 0).map(x => (x._2._3, x._1))
svdRanks: org.apache.spark.rdd.RDD[(Double, org.apache.spark.graphx.VertexId)] = MapPartitionsRDD[1517] at map at <console>:31

scala> val svdRanks = svd.vertices.filter(_._1 > 0).map(x => (x._1, x._2._3))
svdRanks: org.apache.spark.rdd.RDD[(org.apache.spark.graphx.VertexId, Double)] = MapPartitionsRDD[1520] at map at <console>:31

scala> people.join(svdRanks).sortBy(_._2._2, ascending=false).map(x => (x._2._2, x._2._1)).take(10).foreach(println)
(8.864218804309887,[email protected])
(5.935146713012661,[email protected])
(5.740242927715701,[email protected])
(5.441934324464593,[email protected])
(4.910272928389445,[email protected])
(4.701529779800544,[email protected])
(4.4046392452058045,[email protected])
(4.374738019256556,[email protected])
(4.303078586979311,[email protected])
(3.8295412053860867,[email protected])
```
The `svdRanks` values are the user-independent part of the prediction. The distribution lists take priority, as these are usually used for mass e-mailing. To get the user-specific part, we need to provide the user ID:
```
scala> import com.github.fommil.netlib.BLAS.{getInstance => blas}

scala> def topN(uid: Long, num: Int) = {
     |   val usr = svd.vertices.filter(uid == -_._1).collect()(0)._2
     |   val recs = svd.vertices.filter(_._1 > 0).map(v => (v._1, mu + usr._3 + v._2._3 + blas.ddot(usr._2.length, v._2._1, 1, usr._2, 1)))
     |   people.join(recs).sortBy(_._2._2, ascending=false).map(x => (x._2._2, x._2._1)).take(num)
     | }
topN: (uid: Long, num: Int)Array[(Double, String)]

scala> def top5(x: Long): Array[(Double, String)] = topN(x, 5)
top5: (x: Long)Array[(Double, String)]

scala> people.join(graph.inDegrees).sortBy(_._2._2, ascending=false).map(x => (x._1, x._2._1)).take(10).toList.map(t => (t._2, top5(t._1).toList)).foreach(println)
([email protected],List((4.866184418005094E66,[email protected]), (3.9246829664352734E66,[email protected]), (3.9246829664352734E66,[email protected]), (3.871029763863491E66,[email protected]), (3.743135924382312E66,[email protected])))
([email protected],List((2.445163626935533E66,[email protected]), (1.9584692804232504E66,[email protected]), (1.9105427465629028E66,[email protected]), (1.9105427465629028E66,[email protected]), (1.8931872324048717E66,[email protected])))
([email protected],List((2.8924566115596135E66,[email protected]), (2.3157345904446663E66,[email protected]), (2.2646318970030287E66,[email protected]), (2.2646318970030287E66,[email protected]), (2.2385865127706285E66,[email protected])))
([email protected],List((6.1758464471309754E66,[email protected]), (5.279291610047078E66,[email protected]), (4.967589820856654E66,[email protected]), (4.909283344915057E66,[email protected]), (4.869177440115682E66,[email protected])))
([email protected],List((5.7702834706832735E66,[email protected]), (4.703038082326939E66,[email protected]), (4.703038082326939E66,[email protected]), (4.579565962089777E66,[email protected]), (4.4298763869135494E66,[email protected])))
([email protected],List((9.198688613290757E67,[email protected]), (8.078107057848099E67,[email protected]), (6.922806078209984E67,[email protected]), (6.787266892881456E67,[email protected]), (6.420473603137515E67,[email protected])))
([email protected],List((1.302856119148208E66,[email protected]), (1.0678968544568682E66,[email protected]), (1.031255083546722E66,[email protected]), (1.009319696608474E66,[email protected]), (9.901391892701356E65,[email protected])))
([email protected],List((9.770393472845669E65,[email protected]), (7.97370292724488E65,[email protected]), (7.97370292724488E65,[email protected]), (7.751983820970696E65,[email protected]), (7.500175024539423E65,[email protected])))
([email protected],List((6.856103529420811E65,[email protected]), (5.611272903720188E65,[email protected]), (5.611272903720188E65,[email protected]), (5.436280144720843E65,[email protected]), (5.2621103015001885E65,[email protected])))
([email protected],List((5.0579114162531735E65,[email protected]), (4.136838933824579E65,[email protected]), (4.136838933824579E65,[email protected]), (4.0110663808847004E65,[email protected]), (3.8821438267917902E65,[email protected])))

scala> people.join(graph.outDegrees).sortBy(_._2._2, ascending=false).map(x => (x._1, x._2._1)).take(10).toList.map(t => (t._2, top5(t._1).toList)).foreach(println)
([email protected],List((2.8924566115596135E66,[email protected]), (2.3157345904446663E66,[email protected]), (2.2646318970030287E66,[email protected]), (2.2646318970030287E66,[email protected]), (2.2385865127706285E66,[email protected])))
([email protected],List((3.135142195254243E65,[email protected]), (3.135142195254243E65,[email protected]), (2.773512892785554E65,[email protected]), (2.350799070225962E65,[email protected]), (2.2055288158758267E65,[email protected])))
([email protected],List((5.773492048248794E66,[email protected]), (5.067434612038159E66,[email protected]), (4.389028076992449E66,[email protected]), (4.1791711984241975E66,[email protected]), (4.009544764149938E66,[email protected])))
([email protected],List((2.834710591578977E68,[email protected]), (2.488253676819922E68,[email protected]), (2.1516048969715738E68,[email protected]), (2.0405329247770104E68,[email protected]), (1.9877213034021861E68,[email protected])))
([email protected],List((3.453167402163105E64,[email protected]), (3.208849221485621E64,[email protected]), (3.208849221485621E64,[email protected]), (3.0374270093157086E64,[email protected]), (2.886581252384442E64,[email protected])))
([email protected],List((5.1729089729525785E66,[email protected]), (4.220843848723133E66,[email protected]), (4.220843848723133E66,[email protected]), (4.1044435240204605E66,[email protected]), (3.9709951893268635E66,[email protected])))
([email protected],List((2.513139130001457E65,[email protected]), (2.1037756300035247E65,[email protected]), (2.0297519350719265E65,[email protected]), (1.9587139280519927E65,[email protected]), (1.947164483486155E65,[email protected])))
([email protected],List((4.516267307013845E66,[email protected]), (3.653408921875843E66,[email protected]), (3.653408921875843E66,[email protected]), (3.590298037045689E66,[email protected]), (3.471781765250177E66,[email protected])))
([email protected],List((2.0719309635087482E66,[email protected]), (1.732651408857978E66,[email protected]), (1.732651408857978E66,[email protected]), (1.6348480059915056E66,[email protected]), (1.5880693846486309E66,[email protected])))
([email protected],List((5.596589595417286E66,[email protected]), (4.559474243930487E66,[email protected]), (4.559474243930487E66,[email protected]), (4.4421474044331
```
Here, we computed the top five recommended e-mail-to lists for the users with the top in-degrees and out-degrees.
The SVD++ implementation is only 159 lines of Scala code and can be the basis for some further improvements. SVD++ includes a part based on implicit user feedback and item similarity information. Finally, the Netflix-winning solution also took into consideration the fact that user preferences are time-dependent, but this part has not been implemented in GraphX yet.