Text mining with Spark

Let's see some simple text mining techniques in Spark. For this example we will be using the packages nltk and pattern.

To install the required libraries, do the following:

pip install pattern nltk 

In the next example we take a list of sentences as our corpus, parallelize this list as a Spark RDD object, and then pass the sentences through a standard textual pre-processing pipeline.

The RDD (Resilient Distributed Dataset) is the standard data format accepted by Spark. On creation, Spark takes the input data, cuts it up into smaller chunks, and distributes these chunks across the cluster. Spark offers other formats as well, such as DataFrames, which resemble the pandas data frame, but for the moment we will stick with RDDs.
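To make the idea of cutting the data into chunks concrete, here is a minimal plain-Python sketch. The function name split_into_partitions and the even-slicing rule are assumptions made for illustration; Spark's own partitioning logic is more involved and places the chunks on different machines.

```python
def split_into_partitions(data, num_partitions):
    """Cut a list into num_partitions contiguous, roughly equal chunks."""
    n = len(data)
    return [
        data[i * n // num_partitions:(i + 1) * n // num_partitions]
        for i in range(num_partitions)
    ]

# Hypothetical five-element corpus split into two partitions
sentences = ['s1', 's2', 's3', 's4', 's5']
partitions = split_into_partitions(sentences, 2)
print(partitions)  # [['s1', 's2'], ['s3', 's4', 's5']]
```

Each chunk can then be processed independently, which is what lets Spark run the same function on many parts of the data at once.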

To work with Spark, we will now use Jupyter Notebook, an interactive notebook that is especially useful for experimenting with new technologies and techniques.

To install Jupyter do the following:

pip install jupyter

Next we will create the directory ~/notebooks where we will store all the notebooks:

mkdir ~/notebooks 
cd ~/notebooks 
jupyter notebook  

The last command launches the notebook server, which is then accessible at http://localhost:8888.

Next, use the New button to create a Python2 notebook. We will write and execute our code from this notebook.

To load the pyspark package, we need to add the python folder found in the Spark installation directory to the Python path:

import sys 
spark_home = '/etc/spark' 
sys.path.insert(0, spark_home + "/python") 
 
from pyspark import SparkConf, SparkContext 
 
urlMaster = 'spark://Arjuns-MacBook-Pro.local:7077' 
 
conf = ( 
    SparkConf() 
        .setAppName('spark.app') 
        .setMaster(urlMaster) 
) 
sc = SparkContext(conf=conf) 

This creates a SparkContext instance for us, which is our gateway for communicating with the Spark master.

Next we build the test corpus from nltk.corpus.brown:

from nltk.corpus import brown 
sentences = brown.sents()[:1000] 
corpus = sc.parallelize(sentences).map(lambda s: ' '.join(s)) 

Here we loaded a list of 1,000 sentences and, using the SparkContext object, parallelized it, distributing the list across the cluster as an RDD object. Each sentence is then joined back into a single string.
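brown.sents() yields each sentence as a list of tokens, which is why the map step joins them into one string before parsing. A small plain-Python illustration, using a hand-written token list rather than real corpus output:

```python
# Hypothetical tokenized sentence, in the shape brown.sents() returns
tokens = ['The', 'jury', 'praised', 'the', 'election', '.']

def join_tokens(s):
    # Same operation as the lambda passed to map(): tokens -> one string
    return ' '.join(s)

sentence = join_tokens(tokens)
print(sentence)  # The jury praised the election .
```

Note that punctuation stays separated by a space, since the tokens are simply glued together with single spaces.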

Next, we will use the pattern package to chunk each sentence into noun and verb phrases:

from itertools import chain 
from pattern.text.en import parsetree 
 
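def get_chunks(sentence):
    # parsetree() may split the text into several parsed sentences;
    # collect the chunks of all of them into one flat list
    return list(chain.from_iterable(
        parsed.chunks for parsed in parsetree(sentence)
    ))

# Apply the chunker to every sentence in the RDD
chunks = corpus.map(get_chunks)

print chunks.take(2)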

That should give you the following output:

[[Chunk('The Fulton County Grand Jury/NP'), Chunk('said/VP'), Chunk('Friday an investigation/NP'), Chunk('of/PP'), Chunk('Atlanta/NP'), Chunk('recent primary election/NP'), Chunk('produced/VP'), Chunk('no evidence/NP'), Chunk('that any irregularities/NP'), Chunk('took/VP'), Chunk('place/NP')], [Chunk('The jury/NP'), Chunk('further said/VP'), Chunk('in/PP'), Chunk('term-end presentments/NP'), Chunk('that/PP'), Chunk('the City Executive Committee/NP'), Chunk('had/VP'), Chunk('over-all charge/NP'), Chunk('of/PP'), Chunk('the election/NP'), Chunk('deserves/VP'), Chunk('the praise/NP'), Chunk('thanks/NP'), Chunk('of/PP'), Chunk('the City/NP'), Chunk('of/PP'), Chunk('Atlanta/NP'), Chunk('for/PP'), Chunk('the manner/NP'), Chunk('in/PP'), Chunk('which the election/NP'), Chunk('was conducted/VP')]]  

As you can see, PySpark doesn't add much complexity to the usual process. If you are familiar with functional programming, you will feel right at home: writing PySpark code is much like writing functional Python code. The only new concept is the RDD API, whose operations (map, flatMap, filter, and so on) are simple but powerful tools.
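For readers coming from plain Python, the RDD operations used most in this section behave much like their local counterparts. The sketch below mimics map, flatMap, and filter on an ordinary list; the helper names are made up for illustration, and unlike Spark nothing here is distributed or lazily evaluated.

```python
from itertools import chain

def local_map(func, items):
    # One output element per input element
    return [func(x) for x in items]

def local_flat_map(func, items):
    # func returns a sequence per element; the results are flattened
    return list(chain.from_iterable(func(x) for x in items))

def local_filter(pred, items):
    # Keep only the elements for which pred is true
    return [x for x in items if pred(x)]

lines = ['the jury said', 'the election was conducted']
words = local_flat_map(lambda line: line.split(), lines)
long_words = local_filter(lambda w: len(w) > 3, words)
upper = local_map(lambda w: w.upper(), long_words)
print(upper)  # ['JURY', 'SAID', 'ELECTION', 'CONDUCTED']
```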

For example, let's say we want to create a word count for all the nouns present in the corpus. First, we will get all the noun-like words from the chunks:

def match_noun_like_pos(pos):
    # True for any part-of-speech tag that starts with "N"
    import re
    return re.match('^N.*', pos) is not None

noun_like = (chunks
    .flatMap(lambda sentence_chunks: sentence_chunks)   # one record per chunk
    .filter(lambda chunk: chunk.part_of_speech == 'NP') # keep noun phrases
    .flatMap(lambda chunk: chunk.words)                 # one record per word
    .filter(lambda word: match_noun_like_pos(word.part_of_speech))
    .map(lambda word: word.string.lower())
)

print noun_like.take(10)

That should give you the following output:

[u'fulton', u'county', u'grand', u'jury', u'friday', u'investigation', u'atlanta', u'primary', u'election', u'evidence']  
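The filter on word tags works because, assuming the Penn Treebank tag set, the noun tags (NN, NNS, NNP, NNPS) all begin with N. A quick plain-Python check of the matching function on a few sample tags:

```python
import re

def match_noun_like_pos(pos):
    # True for any tag starting with "N"
    return re.match('^N.*', pos) is not None

# Sample tags chosen for illustration: three noun tags, then
# a verb (VBD), an adjective (JJ), and a preposition (IN)
sample_tags = ['NN', 'NNS', 'NNP', 'VBD', 'JJ', 'IN']
noun_tags = [tag for tag in sample_tags if match_noun_like_pos(tag)]
print(noun_tags)  # ['NN', 'NNS', 'NNP']
```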

Next we will do a word count on the noun_like words:

noun_word_count = (noun_like
    .map(lambda word: (word, 1))                # emit (word, 1) pairs
    .reduceByKey(lambda a, b: a + b)            # sum the counts per word
    .sortBy(lambda d: d[1], ascending=False)    # most frequent first
)

print noun_word_count.take(10)

That should give you the following output:

[(u'state', 85), (u'city', 58), (u'administration', 52), (u'president', 52), (u'mr.', 52), (u'year', 46), (u'committee', 39), (u'bill', 39), (u'states', 37), (u'county', 35)]  

As you may have noticed, we have just implemented the MapReduce pattern in Spark: map emits a (word, 1) pair for each word, and reduceByKey sums the counts for each distinct word. There is much more that can be achieved using Spark, and we highly recommend exploring it further.
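The stages of that word count can be spelled out in plain Python. This is only a single-machine sketch with a made-up word list; in Spark the grouping step (the shuffle) happens across the cluster.

```python
from collections import defaultdict
from functools import reduce

# Hypothetical input, standing in for the noun_like words
words = ['state', 'city', 'state', 'county', 'city', 'state']

# Map: emit a (word, 1) pair for every word
pairs = [(word, 1) for word in words]

# Shuffle: group the emitted values by key
grouped = defaultdict(list)
for key, value in pairs:
    grouped[key].append(value)

# Reduce: combine the values of each key, as reduceByKey does
counts = {
    key: reduce(lambda a, b: a + b, values)
    for key, values in grouped.items()
}

# Sort by count, highest first, as sortBy(..., ascending=False) does
ranked = sorted(counts.items(), key=lambda d: d[1], reverse=True)
print(ranked)  # [('state', 3), ('city', 2), ('county', 1)]
```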
