Let's pause at MLlib, which complements the other NLP libraries written in Scala. MLlib is important primarily because of its scalability; it supports a subset of data preparation and text processing algorithms, particularly in the area of feature construction (http://spark.apache.org/docs/latest/ml-features.html).
Although the preceding analysis can already give powerful insights, the piece of information missing from it is term frequency information. Term frequencies are relatively more important in information retrieval, where a collection of documents needs to be searched and ranked in relation to a few terms, with the top documents returned to the user.
TF-IDF is a standard technique where term frequencies are offset by the frequencies of the terms in the corpus. Spark has an implementation of TF-IDF that uses a hash function to identify terms. This approach avoids the need to compute a global term-to-index map, but can be subject to hash collisions, the probability of which is determined by the number of buckets in the hash table. The default feature dimension is 2^20 = 1,048,576.
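To make the hashing trick concrete, here is a minimal sketch of the idea (our own illustration, not MLlib's internal code): a term's hash code is reduced modulo the feature dimension to give its index, so distinct terms can occasionally share a bucket:

import org.apache.spark.mllib.feature.HashingTF

// Illustration of the hashing trick: a term's hash code taken modulo the
// number of buckets gives its column index; two terms may collide.
val numFeatures = 1 << 20                                // 2^20, the default
def termIndex(term: String): Int =
  ((term.## % numFeatures) + numFeatures) % numFeatures  // non-negative modulo

// A smaller dimension trades memory for a higher collision probability:
val smallTF = new HashingTF(1 << 18)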
In the Spark implementation, each document is a line in the dataset. We can convert it into an RDD of iterables and compute the hashed term frequencies with the following code:
scala> import org.apache.spark.mllib.feature.HashingTF
import org.apache.spark.mllib.feature.HashingTF

scala> import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.mllib.linalg.Vector

scala> val hashingTF = new HashingTF
hashingTF: org.apache.spark.mllib.feature.HashingTF = org.apache.spark.mllib.feature.HashingTF@61b975f7

scala> val documents: RDD[Seq[String]] = sc.textFile("shakepeare").map(_.split("\\W+").toSeq)
documents: org.apache.spark.rdd.RDD[Seq[String]] = MapPartitionsRDD[263] at map at <console>:34

scala> val tf = hashingTF transform documents
tf: org.apache.spark.rdd.RDD[org.apache.spark.mllib.linalg.Vector] = MapPartitionsRDD[264] at map at HashingTF.scala:76
While computing hashingTF requires only a single pass over the data, applying IDF needs two passes: the first to compute the IDF vector and the second to scale the term frequencies by IDF:
scala> tf.cache
res26: tf.type = MapPartitionsRDD[268] at map at HashingTF.scala:76

scala> import org.apache.spark.mllib.feature.IDF
import org.apache.spark.mllib.feature.IDF

scala> val idf = new IDF(minDocFreq = 2) fit tf
idf: org.apache.spark.mllib.feature.IDFModel = org.apache.spark.mllib.feature.IDFModel@514bda2d

scala> val tfidf = idf transform tf
tfidf: org.apache.spark.rdd.RDD[org.apache.spark.mllib.linalg.Vector] = MapPartitionsRDD[272] at mapPartitions at IDF.scala:178

scala> tfidf take(10) foreach println
(1048576,[3159,3543,84049,582393,787662,838279,928610,961626,1021219,1021273],[3.9626355004005083,4.556357737874695,8.380602528651274,8.157736974683708,11.513471982269106,9.316247404932888,10.666174121881904,11.513471982269106,8.07948477778396,11.002646358503116])
(1048576,[267794,1021219],[8.783442874448122,8.07948477778396])
(1048576,[0],[0.5688129477150906])
(1048576,[3123,3370,3521,3543,96727,101577,114801,116103,497275,504006,508606,843002,962509,980206],[4.207164322003765,2.9674322162952897,4.125144122691999,2.2781788689373474,2.132236195047438,3.2951341639027754,1.9204575904855747,6.318664992090735,11.002646358503116,3.1043838099579815,5.451238364272918,11.002646358503116,8.43769700104158,10.30949917794317])
(1048576,[0,3371,3521,3555,27409,89087,104545,107877,552624,735790,910062,943655,962421],[0.5688129477150906,3.442878442319589,4.125144122691999,4.462482535201062,5.023254392629403,5.160262034409286,5.646060083831103,4.712188947797486,11.002646358503116,7.006282204641219,6.216822672821767,11.513471982269106,8.898512204232908])
(1048576,[3371,3543,82108,114801,149895,279256,582393,597025,838279,915181],[3.442878442319589,2.2781788689373474,6.017670811187438,3.8409151809711495,7.893585399642122,6.625632265652778,8.157736974683708,10.414859693600997,9.316247404932888,11.513471982269106])
(1048576,[3123,3555,413342,504006,690950,702035,980206],[4.207164322003765,4.462482535201062,3.4399651117812313,3.1043838099579815,11.513471982269106,11.002646358503116,10.30949917794317])
(1048576,[0],[0.5688129477150906])
(1048576,[97,1344,3370,100898,105489,508606,582393,736902,838279,1026302],[2.533299776544098,23.026943964538212,2.9674322162952897,0.0,11.225789909817326,5.451238364272918,8.157736974683708,10.30949917794317,9.316247404932888,11.513471982269106])
(1048576,[0,1344,3365,114801,327690,357319,413342,692611,867249,965170],[4.550503581720725,23.026943964538212,2.7455719545259836,1.9204575904855747,8.268278849083533,9.521041817578901,3.4399651117812313,0.0,6.661441718349489,0.0])
Each output line is a sparse vector in (size, [indices], [values]) form: the document is represented by a set of hashed terms and their TF-IDF scores.
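Tying this back to the ranking use case, here is a minimal sketch (the helper names are ours) of scoring the corpus lines against an ad hoc query by cosine similarity in the hashed space. A real ranker would also IDF-scale the query; for brevity, the raw hashed query is used:

import org.apache.spark.mllib.linalg.{SparseVector, Vector}

// Dot product and norm over the sparse representations; both HashingTF and
// IDF produce sparse vectors here.
def dot(a: SparseVector, b: SparseVector): Double = {
  val bm = b.indices.zip(b.values).toMap
  a.indices.zip(a.values).map { case (i, v) => v * bm.getOrElse(i, 0.0) }.sum
}
def norm(v: SparseVector): Double = math.sqrt(v.values.map(x => x * x).sum)
def cosine(a: Vector, b: Vector): Double = (a, b) match {
  case (x: SparseVector, y: SparseVector) =>
    val d = norm(x) * norm(y)
    if (d == 0.0) 0.0 else dot(x, y) / d
  case _ => 0.0
}

// Hash the query with the same hashingTF so it lives in the same space,
// then keep the ten highest-scoring line IDs.
val query = hashingTF.transform("to be or not to be".split("\\W+").toSeq)
val top10 = tfidf.zipWithIndex
  .map { case (v, id) => (id, cosine(v, query)) }
  .top(10)(Ordering.by[(Long, Double), Double](_._2))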
LDA in Spark MLlib is a clustering mechanism in which the feature vectors represent the counts of words in a document. The model maximizes the probability of observing the word counts under the assumption that each document is a mixture of topics and the words in the document are generated independently for each topic based on a Dirichlet distribution (a generalization of the beta distribution to the multinomial case). The goal is to derive the (latent) distribution of topics and the parameters of the statistical word-generation model.
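To make the generative story concrete, here is a hedged sketch of it for a single document using Breeze, which ships with Spark; the vocabulary and the alpha/beta priors are made-up illustrative values, not anything the model learns from Enron:

import breeze.linalg.DenseVector
import breeze.stats.distributions.{Dirichlet, Multinomial}

val vocab = Array("gas", "energy", "meeting", "price", "agreement")
val k = 3
val alpha = DenseVector.fill(k)(0.1)            // document-topic prior
val beta  = DenseVector.fill(vocab.length)(0.1) // topic-word prior

// One word distribution per topic, and one topic mixture for the document.
val topicWord = Array.fill(k)(Dirichlet(beta).draw())
val theta = Dirichlet(alpha).draw()

// Each word: sample a topic from theta, then a word from that topic.
val doc = (1 to 10).map { _ =>
  val z = Multinomial(theta).draw()
  vocab(Multinomial(topicWord(z)).draw())
}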
The MLlib implementation is based on the 2009 LDA paper (http://www.jmlr.org/papers/volume10/newman09a/newman09a.pdf) and uses GraphX to implement a distributed Expectation-Maximization (EM) algorithm for assigning topics to documents.
Let's take the Enron e-mail corpus discussed in Chapter 7, Working with Graph Algorithms, where we tried to figure out the communications graph. For e-mail clustering, we need to extract the body of each e-mail and place it as a single line in the training file (the sed expression strips the header, that is, everything up to the first empty line, along with any remaining empty lines, and tr joins the body into a single line):
$ mkdir enron
$ cat /dev/null > enron/all.txt
$ for f in $(find maildir -name "*." -print); do cat $f | sed '1,/^$/d;/^$/d' | tr "\n\r" "  " >> enron/all.txt; echo "" >> enron/all.txt; done
$
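The same extraction can also be sketched in Spark itself (assuming the maildir tree is visible to the cluster; the glob and the splitting logic below are illustrative, not the exact pipeline above):

// Strip the header (everything up to the first blank line) and flatten
// each message body to a single line.
val bodies = sc.wholeTextFiles("maildir/*/*/*").map { case (_, raw) =>
  val parts = raw.split("\r?\n\r?\n", 2)   // header ends at the first blank line
  val body = if (parts.length > 1) parts(1) else ""
  body.replaceAll("[\r\n]+", " ")          // one message per line
}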
Now, let's use Scala/Spark to construct a corpus dataset containing the document ID, followed by a dense array of word counts in the bag:
$ spark-shell --driver-memory 8g --executor-memory 8g --packages com.github.fommil.netlib:all:1.1.2
Ivy Default Cache set to: /home/alex/.ivy2/cache
The jars for the packages stored in: /home/alex/.ivy2/jars
:: loading settings :: url = jar:file:/opt/cloudera/parcels/CDH-5.5.2-1.cdh5.5.2.p0.4/jars/spark-assembly-1.5.0-cdh5.5.2-hadoop2.6.0-cdh5.5.2.jar!/org/apache/ivy/core/settings/ivysettings.xml
com.github.fommil.netlib#all added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent;1.0
	confs: [default]
	found com.github.fommil.netlib#all;1.1.2 in central
	found net.sourceforge.f2j#arpack_combined_all;0.1 in central
	found com.github.fommil.netlib#core;1.1.2 in central
	found com.github.fommil.netlib#netlib-native_ref-osx-x86_64;1.1 in central
	...
downloading https://repo1.maven.org/maven2/net/sourceforge/f2j/arpack_combined_all/0.1/arpack_combined_all-0.1-javadoc.jar ...
	[SUCCESSFUL ] net.sourceforge.f2j#arpack_combined_all;0.1!arpack_combined_all.jar (513ms)
downloading https://repo1.maven.org/maven2/com/github/fommil/netlib/core/1.1.2/core-1.1.2.jar ...
	[SUCCESSFUL ] com.github.fommil.netlib#core;1.1.2!core.jar (18ms)
	...
:: resolution report :: resolve 3366ms :: artifacts dl 1821ms
	:: evicted modules:
	com.github.fommil.netlib#core;1.1 by [com.github.fommil.netlib#core;1.1.2] in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
	---------------------------------------------------------------------
	|      default     |   19  |   18  |   18  |   1   ||   17  |   17  |
	---------------------------------------------------------------------
...
scala> val enron = sc textFile("enron")
enron: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[1] at textFile at <console>:21

scala> enron.flatMap(_.split("\\W+")).map(_.toLowerCase).distinct.count
res0: Long = 529199

scala> val stopwords = scala.collection.immutable.TreeSet("", "i", "a", "an", "and", "are", "as", "at", "be", "but", "by", "for", "from", "had", "has", "he", "her", "him", "his", "in", "is", "it", "its", "not", "of", "on", "she", "that", "the", "to", "was", "were", "will", "with", "you")
stopwords: scala.collection.immutable.TreeSet[String] = TreeSet(, a, an, and, are, as, at, be, but, by, for, from, had, has, he, her, him, his, i, in, is, it, its, not, of, on, she, that, the, to, was, were, will, with, you)

scala> val terms = enron.flatMap(x => if (x.length < 8192) x.toLowerCase.split("\\W+") else Nil).filterNot(stopwords).map((_, 1)).reduceByKey(_+_).collect.sortBy(- _._2).slice(0, 1000).map(_._1)
terms: Array[String] = Array(enron, ect, com, this, hou, we, s, have, subject, or, 2001, if, your, pm, am, please, cc, 2000, e, any, me, 00, message, 1, corp, would, can, 10, our, all, sent, 2, mail, 11, re, thanks, original, know, 12, 713, http, may, t, do, 3, time, 01, ees, m, new, my, they, no, up, information, energy, us, gas, so, get, 5, about, there, need, what, call, out, 4, let, power, should, na, which, one, 02, also, been, www, other, 30, email, more, john, like, these, 03, mark, 04, attached, d, enron_development, their, see, 05, j, forwarded, market, some, agreement, 09, day, questions, meeting, 08, when, houston, doc, contact, company, 6, just, jeff, only, who, 8, fax, how, deal, could, 20, business, use, them, date, price, 06, week, here, net, 15, 9, 07, group, california,...

scala> def getBagCounts(bag: Seq[String]) = { for (term <- terms) yield { bag.count(_ == term) } }
getBagCounts: (bag: Seq[String])Array[Int]

scala> import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.linalg.Vectors

scala> val corpus = enron.map(x => { if (x.length < 8192) Some(x.toLowerCase.split("\\W+").toSeq) else None }).map(x => { Vectors.dense(getBagCounts(x.getOrElse(Nil)).map(_.toDouble).toArray) }).zipWithIndex.map(_.swap).cache
corpus: org.apache.spark.rdd.RDD[(Long, org.apache.spark.mllib.linalg.Vector)] = MapPartitionsRDD[14] at map at <console>:30

scala> import org.apache.spark.mllib.clustering.{LDA, DistributedLDAModel}
import org.apache.spark.mllib.clustering.{LDA, DistributedLDAModel}

scala> val ldaModel = new LDA().setK(10).run(corpus)
...

scala> ldaModel.topicsMatrix.transpose
res2: org.apache.spark.mllib.linalg.Matrix =
207683.78495933366   79745.88417942637    92118.63972404732   ...  (1000 total)
35853.48027575886    4725.178508682296    111214.8860582083   ...
135755.75666585402   54736.471356209106   93289.65563593085   ...
39445.796099155996   6272.534431534215    34764.02707696523   ...
329786.21570967307   602782.9591026317    42212.22143362559   ...
62235.09960154089    12191.826543794878   59343.24100019015   ...
210049.59592560542   160538.9650732507    40034.69756641789   ...
53818.14660186875    6351.853448001488    125354.26708575874  ...
44133.150537842856   4342.697652158682    154382.95646078113  ...
90072.97362336674    21132.629704311104   93683.40795807641   ...
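The run above relies on MLlib's defaults; when tuning is needed, LDA exposes the usual knobs. The values below are placeholders, not recommendations:

// Optional tuning (placeholder values). With the default EM optimizer,
// passing -1 lets MLlib pick its own defaults for the Dirichlet priors.
val tunedModel = new LDA()
  .setK(10)
  .setMaxIterations(100)      // EM iterations; the default is 20
  .setDocConcentration(-1)    // alpha, the document-topic prior
  .setTopicConcentration(-1)  // beta, the topic-word prior
  .setSeed(42L)               // for reproducibility
  .run(corpus)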
We can also list the words and their relative importance for each topic in descending order:
scala> ldaModel.describeTopics foreach { x: (Array[Int], Array[Double]) => { print(x._1.slice(0,10).map(terms(_)).mkString(":")); print("-> "); print(x._2.slice(0,10).map(_.toFloat).mkString(":")); println } }
com:this:ect:or:if:s:hou:2001:00:we-> 0.054606363:0.024220783:0.02096761:0.013669214:0.0132700335:0.012969772:0.012623918:0.011363528:0.010114557:0.009587474
s:this:hou:your:2001:or:please:am:com:new-> 0.029883621:0.027119286:0.013396418:0.012856948:0.01218803:0.01124849:0.010425644:0.009812181:0.008742722:0.0070441025
com:this:s:ect:hou:or:2001:if:your:am-> 0.035424445:0.024343235:0.015182628:0.014283071:0.013619815:0.012251413:0.012221165:0.011411696:0.010284024:0.009559739
would:pm:cc:3:thanks:e:my:all:there:11-> 0.047611523:0.034175437:0.022914853:0.019933242:0.017208714:0.015393614:0.015366959:0.01393391:0.012577525:0.011743208
ect:com:we:can:they:03:if:also:00:this-> 0.13815293:0.0755843:0.065043546:0.015290086:0.0121941045:0.011561104:0.011326733:0.010967959:0.010653805:0.009674695
com:this:s:hou:or:2001:pm:your:if:cc-> 0.016605735:0.015834121:0.01289918:0.012708308:0.0125788655:0.011726159:0.011477625:0.010578845:0.010555539:0.009609056
com:ect:we:if:they:hou:s:00:2001:or-> 0.05537054:0.04231919:0.023271963:0.012856676:0.012689817:0.012186356:0.011350313:0.010887237:0.010778923:0.010662295
this:s:hou:com:your:2001:or:please:am:if-> 0.030830953:0.016557815:0.014236835:0.013236604:0.013107091:0.0126846135:0.012257128:0.010862533:0.01027849:0.008893094
this:s:or:pm:com:your:please:new:hou:2001-> 0.03981197:0.013273305:0.012872894:0.011672661:0.011380969:0.010689667:0.009650983:0.009605533:0.009535899:0.009165275
this:com:hou:s:or:2001:if:your:am:please-> 0.024562683:0.02361607:0.013770585:0.013601272:0.01269994:0.012360005:0.011348433:0.010228578:0.009619628:0.009347991
To find the top documents per topic or the top topics per document, we need to convert this model to DistributedLDAModel or LocalLDAModel, which extend LDAModel:
scala> ldaModel.save(sc, "ldamodel")

scala> val sameModel = DistributedLDAModel.load(sc, "ldamodel")

scala> sameModel.topDocumentsPerTopic(10) foreach { x: (Array[Long], Array[Double]) => { print(x._1.mkString(":")); print("-> "); print(x._2.map(_.toFloat).mkString(":")); println } }
59784:50745:52479:60441:58399:49202:64836:52490:67936:67938-> 0.97146696:0.9713364:0.9661418:0.9661132:0.95249915:0.9519995:0.94945914:0.94944507:0.8977366:0.8791358
233009:233844:233007:235307:233842:235306:235302:235293:233020:233857-> 0.9962034:0.9962034:0.9962034:0.9962034:0.9962034:0.99620336:0.9954057:0.9954057:0.9954057:0.9954057
14909:115602:14776:39025:115522:288507:4499:38955:15754:200876-> 0.83963907:0.83415157:0.8319566:0.8303818:0.8291597:0.8281472:0.82739806:0.8272517:0.82579833:0.8243338
237004:71818:124587:278308:278764:278950:233672:234490:126637:123664-> 0.99929106:0.9968135:0.9964454:0.99644524:0.996445:0.99644494:0.99644476:0.9964447:0.99644464:0.99644417
156466:82237:82252:82242:341376:82501:341367:340197:82212:82243-> 0.99716955:0.94635135:0.9431836:0.94241136:0.9421047:0.9410431:0.94075173:0.9406304:0.9402021:0.94014835
335708:336413:334075:419613:417327:418484:334157:335795:337573:334160-> 0.987011:0.98687994:0.9865438:0.96953565:0.96953565:0.96953565:0.9588571:0.95852506:0.95832515:0.9581657
243971:244119:228538:226696:224833:207609:144009:209548:143066:195299-> 0.7546907:0.7546907:0.59146744:0.59095955:0.59090924:0.45532238:0.45064417:0.44945204:0.4487876:0.44833568
242260:214359:126325:234126:123362:233304:235006:124195:107996:334829-> 0.89615464:0.8961442:0.8106028:0.8106027:0.8106023:0.8106023:0.8106021:0.8106019:0.76834095:0.7570231
209751:195546:201477:191758:211002:202325:197542:193691:199705:329052-> 0.913124:0.9130985:0.9130918:0.9130672:0.5525752:0.5524637:0.5524494:0.552405:0.55240136:0.5026157
153326:407544:407682:408098:157881:351230:343651:127848:98884:129351-> 0.97206575:0.97206575:0.97206575:0.97206575:0.97206575:0.9689198:0.968068:0.9659192:0.9657442:0.96553063
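The complementary view, the top topics per document, is available on DistributedLDAModel in Spark 1.5+; a short sketch:

// For each document, the k most probable topics and their weights;
// topTopicsPerDocument returns an RDD of (docID, topicIndices, weights).
sameModel.topTopicsPerDocument(3).take(5).foreach { case (docID, topics, weights) =>
  println(docID + " -> " + topics.zip(weights.map(_.toFloat)).mkString(":"))
}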