©  Raju Kumar Mishra 2018
Raju Kumar Mishra, PySpark Recipes, https://doi.org/10.1007/978-1-4842-3141-8_6

6. I/O in PySpark

Raju Kumar Mishra, Bangalore, Karnataka, India

File input/output (I/O) operations are an integral part of many software activities, including data analysis. A data scientist deals with many types of files, including text files, comma-separated values (CSV) files, JavaScript Object Notation (JSON) files, and many more. Such files are often stored in the Hadoop Distributed File System (HDFS), a distributed file system designed for large data sets.
This chapter covers the following recipes:
  • Recipe 6-1. Read a simple text file
  • Recipe 6-2. Write an RDD to a simple text file
  • Recipe 6-3. Read a directory
  • Recipe 6-4. Read data from HDFS
  • Recipe 6-5. Save an RDD to HDFS
  • Recipe 6-6. Read data from a sequential file
  • Recipe 6-7. Write data into a sequential file
  • Recipe 6-8. Read a CSV file
  • Recipe 6-9. Write an RDD to a CSV file
  • Recipe 6-10. Read a JSON file
  • Recipe 6-11. Write an RDD to a JSON file
  • Recipe 6-12. Read table data from HBase by using PySpark

Recipe 6-1. Read a Simple Text File

Problem

You want to read a simple text file.

Solution

You have a simple text file named shakespearePlays.txt. The file content is as follows:
  • Love’s Labour’s Lost
  • A Midsummer Night’s Dream
  • Much Ado About Nothing
  • As You Like It
The shakespearePlays.txt file has four lines. You want to read this file by using PySpark. After reading the file, you want to calculate the following:
  • Total number of lines in the file
  • Total number of characters in the file
To read a simple file, you can use two functions: textFile() and wholeTextFiles(). These two functions are defined on our SparkContext object.
The textFile() method reads a text file and returns an RDD of lines. Like a transformation, textFile() is evaluated lazily, so it does not read the data until the first action is called. As a consequence, if the file does not exist when textFile() is run, no error is thrown at that point; the error appears only when the first action is called.
Another method, wholeTextFiles(), works in a similar way to textFile(), except that it reads each file as a key/value pair. The file name is the key, and the entire content of the file is the value associated with that key.

How It Works

Let’s see how these built-in methods work.

Step 6-1-1. Reading a Text File by Using the textFile() Function

The textFile() function takes three inputs. The first input to textFile() is the path of the file that has to be read. The second argument is minPartitions, which defines the minimum number of data partitions in the RDD. The third argument is use_unicode, which defaults to True. If use_unicode is False, the lines are kept as str objects (encoded as UTF-8) instead of being decoded to unicode. Here is the textFile() function:
>>> playData = sc.textFile('/home/pysparkbook/pysparkBookData/shakespearePlays.txt',2)
Here is the output:
17/03/18 07:10:17 INFO MemoryStore: Block broadcast_8 stored as values in memory (estimated size 61.8 KB, free 208.0 KB)
17/03/18 07:10:17 INFO MemoryStore: Block broadcast_8_piece0 stored as bytes in memory (estimated size 19.3 KB, free 227.3 KB)
In the preceding code, we have provided the file path and set minPartitions to 2.
>>> playDataList = playData.collect()
>>> type(playDataList)
Here is the output:
<type 'list'>
We can see that the textFile() function has read the file and created the RDD. The elements of the RDD are lines:
>>> playDataList[0:4]
Here is the output:
  [u"Love's Labour's Lost",
   u"A Midsummer Night's Dream",
   u'Much Ado About Nothing',
   u'As You Like It']

Step 6-1-2. Reading a Text File by Using wholeTextFiles()

The wholeTextFiles() function also takes three inputs. The first input to wholeTextFiles() is the path of the file that has to be read. The second argument is minPartitions, which defines the minimum number of data partitions in the RDD. The third argument is use_unicode, which defaults to True. If use_unicode is False, the contents are kept as str objects (encoded as UTF-8) instead of being decoded to unicode. Let’s read the same text file, this time by using the wholeTextFiles() function:
>>> playData = sc.wholeTextFiles('/home/pysparkbook/pysparkBookData/shakespearePlays.txt',2)
Here is the output:
17/03/18 07:19:06 INFO MemoryStore: Block broadcast_11 stored as values in memory (estimated size 209.0 KB, free 446.0 KB)
17/03/18 07:19:06 INFO MemoryStore: Block broadcast_11_piece0 stored as bytes in memory (estimated size 19.4 KB, free 465.4 KB)
As we know, the wholeTextFiles() function reads the file as a key/value pair, in which the file name is the key, and the content is the value. Let’s get the file name:
>>> playData.keys().collect()
Here is the output:
[u'file:/home/pysparkbook/pysparkBookData/shakespearePlays.txt']
And now let’s fetch the content of the file:
>>> playData.values().collect()
Here is the output:
[u"Love's Labour's Lost",
 u"A Midsummer Night's Dream",
 u'Much Ado About Nothing',
 u'As You Like It']

Step 6-1-3. Counting the Number of Lines in a File

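We can count the number of lines in our file by using the count() function. We call count() on the RDD of lines created by textFile() in Step 6-1-1; the key/value RDD from wholeTextFiles() holds one element per file, not one per line. So we read the file with textFile() again first:
>>> playData = sc.textFile('/home/pysparkbook/pysparkBookData/shakespearePlays.txt',2)
>>> playData.count()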
Here is the output:
4

Step 6-1-4. Counting the Number of Characters on Each Line

To calculate the total number of characters in our file, we can calculate the number of characters in each line and then sum them. To calculate the total number of characters in each line, we can use the len() function. The len() function is defined in Python, and it calculates the number of elements in a sequence. For Python strings, the len() function will return the total number of characters in that string. Let’s observe the workings of the len() function in the following example:
>>> pythonString = "My python"
>>> len(pythonString)
Here is the output:
9
We have created a string named pythonString. Then we’ve assigned a value, My python, to pythonString. The string My python has nine characters, including the space. Therefore, the len() function on the pythonString variable has returned 9 as the output. Now let’s use the len() function in the RDD map() function:
>>> playDataLineLength = playData.map(lambda x : len(x))
>>> playDataLineLength.collect()
Here is the output:
[21, 25, 22, 14]
We now have the number of characters in each line. If we apply sum() on the playDataLineLength RDD, we will get the total number of characters in the file:
>>> totalNumberOfCharacters = playDataLineLength.sum()
>>> totalNumberOfCharacters
Here is the output:
82
Finally, we have calculated that the total number of characters in our file is 82. Remember, the count doesn’t include the newline character.
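The steps of this recipe can be gathered into one helper. The following is a minimal sketch rather than the book's own code: the function name count_lines_and_chars and the min_partitions parameter are hypothetical, and the function assumes that an existing SparkContext (such as sc in the PySpark shell) is passed in.

```python
def count_lines_and_chars(sc, path, min_partitions=2):
    """Return (number of lines, total number of characters) for a text file.

    `sc` is assumed to be an existing SparkContext. The function name and
    the `min_partitions` default are illustrative, not part of PySpark.
    """
    lines = sc.textFile(path, min_partitions)  # lazy: nothing is read yet
    line_count = lines.count()                 # first action triggers the read
    char_count = lines.map(len).sum()          # newline characters are not counted
    return line_count, char_count
```

In the shell, it could be called as count_lines_and_chars(sc, '/home/pysparkbook/pysparkBookData/shakespearePlays.txt'). Each action reads the file again, so calling lines.cache() before the first action may be worthwhile for larger files.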

Recipe 6-2. Write an RDD to a Simple Text File

Problem

You want to write an RDD to a simple text file.

Solution

In Recipe 6-1, you calculated the number of characters in each line as the RDD playDataLineLength. Now you want to save it in a text file.
We can save an RDD as a text file by using the saveAsTextFile() function. This method is defined on the RDD, not on SparkContext as we saw in the case of the textFile() and wholeTextFiles() functions. You have to provide the output directory. The file name is not required. The directory name you are providing must not already exist; otherwise, the write operation will fail. Because an RDD is split into partitions, PySpark writes one part file per partition, and the partitions are written in parallel.
The saveAsTextFile() function takes two inputs. The first input is path, which is the path of the directory where the RDD has to be saved. The second argument is compressionCodecClass, an optional argument with a default value of None. We can use compression codecs such as Gzip to compress the output files, which reduces the storage space they need and the amount of data that has to be moved.
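The compressionCodecClass argument can be illustrated with a small wrapper. This is a sketch under stated assumptions: the helper name save_text and its compress flag are hypothetical, and the codec is identified by the fully qualified class name of Hadoop's Gzip codec.

```python
# Fully qualified class name of the Hadoop Gzip codec.
GZIP_CODEC = "org.apache.hadoop.io.compress.GzipCodec"


def save_text(rdd, out_dir, compress=False):
    """Write `rdd` to `out_dir` as text part files, optionally gzip-compressed.

    `save_text` is an illustrative helper, not a PySpark function.
    `out_dir` must not exist yet, or saveAsTextFile() will fail.
    """
    if compress:
        rdd.saveAsTextFile(out_dir, compressionCodecClass=GZIP_CODEC)
    else:
        rdd.saveAsTextFile(out_dir)
```

With compress=True, the part files in the output directory are expected to carry a .gz extension.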

How It Works

So first let’s start with the code for counting the number of characters in each line. We have already done it, but for the sake of clarity, I have provided the code for calculating the number of characters in each line again.

Step 6-2-1. Counting the Number of Characters on Each Line

Let’s read the file and count the number of characters per line:
>>> playData = sc.textFile('/home/pysparkbook/pysparkBookData/shakespearePlays.txt',4)
>>> playDataLineLength = playData.map(lambda x : len(x))
>>> playDataLineLength.collect()
Here is the output:
[21, 25, 22, 14]
Our playDataLineLength RDD has as its elements the number of characters in each line.

Step 6-2-2. Saving the RDD to a File

Now that we have the counted RDD, we want to save that RDD into a directory called savedData:
>>> playDataLineLength.saveAsTextFile('/home/pysparkbook/savedData')
Let’s investigate the savedData directory. Inside the directory, there are five files. Four files (part-00000, part-00001, part-00002, and part-00003) hold the data, and the _SUCCESS file denotes that the data has been written successfully. Why is the data in four files? The playDataLineLength RDD is distributed in four partitions, and each partition is written to its own file by a separate task.
Let’s see what is inside those files. We will find that each data point is inside a separate file:
savedData$ cat part-00000
Here is the output:
21
savedData$ cat part-00001
Here is the output:
25
savedData$ cat part-00002
Here is the output:
22
savedData$ cat part-00003
Here is the output:
14
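The saved directory can also be read back into an RDD. The sketch below assumes an existing SparkContext is passed in as sc; the helper name load_ints is hypothetical.

```python
def load_ints(sc, saved_dir):
    """Read back a directory written by saveAsTextFile() as an RDD of ints.

    `load_ints` is an illustrative helper, not part of PySpark.
    """
    # textFile() accepts a directory and reads the part files inside it.
    # Every element comes back as a string, so it is converted explicitly.
    return sc.textFile(saved_dir).map(int)
```

For example, load_ints(sc, '/home/pysparkbook/savedData').collect() would return the line lengths as integers again.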

Recipe 6-3. Read a Directory

Problem

You want to read a directory.

Solution

In a directory, there are many files. You want to read the directory (all files at once).
Reading many files together from a directory is a very common task. To read a directory, we use the textFile() function or the wholeTextFiles() function. The textFile() function reads all the files in the directory and merges their lines into a single RDD. In contrast, the wholeTextFiles() function reads each file as a key/value pair, with the name of the file as the key, and the content of the file as the value.
You are provided with the directory name manyFiles. This directory consists of two files named playData1.txt and playData2.txt. Let’s investigate the content of these files one by one:
manyFiles$ cat playData1.txt
Here is the output:
Love's Labour's Lost
A Midsummer Night's Dream
manyFiles$ cat playData2.txt
Here is the output:
Much Ado About Nothing
As You Like It
Your job is to read all these files from the directory in one go.

How It Works

We will use both functions, textFile() and wholeTextFiles(), one at a time, to read the directory noted previously. Let’s start with textFile().

Step 6-3-1. Reading a Directory by Using textFile()

In previous recipes, we provided the absolute file path as the input to the textFile() function in order to read the file. The best part of the textFile() function is that just by changing the path input, we can change how this function reads data. In order to read all files from a directory, we have to provide the absolute path of the directory as input to textFile(). The following line of code reads all the files in the manyFiles directory by using the textFile() function:
>>> manyFilePlayData = sc.textFile('/home/pysparkbook/pysparkBookData/manyFiles',4)
>>> manyFilePlayData.collect()
Here is the output:
[u'Much Ado About Nothing',
 u'As You Like It',
 u"Love's Labour's Lost",
 u"A Midsummer Night's Dream"]
The output is very clear. The textFile() function has read all the files in the directory and merged the content in the files. It has created an RDD of the merged data.

Step 6-3-2. Reading a Directory by Using wholeTextFiles()

Let’s now read the same set of files by using the wholeTextFiles() function. As we did with textFile(), here we also provide the path of the directory as one of the inputs to the wholeTextFiles() function. The following code shows the use of the wholeTextFiles() function to read a directory:
>>> manyFilePlayDataKeyValue = sc.wholeTextFiles('/home/pysparkbook/pysparkBookData/manyFiles',4)
>>> manyFilePlayDataKeyValue.collect()
Here is the output:
[(u'file:/home/pysparkbook/pysparkBookData/manyFiles/playData2.txt', u'Much Ado About Nothing As You Like It '),
 (u'file:/home/pysparkbook/pysparkBookData/manyFiles/playData1.txt', u"Love's Labour's Lost A Midsummer Night's Dream ")]
The output has key/value pairs of file names and contents. From here, we can process the pairs further, as the problem at hand requires.
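As one example of such further processing, each (file name, content) pair can be reduced to a short file name and a line count. This is a minimal sketch; the function name lines_per_file is hypothetical, and it assumes the content uses ordinary line breaks.

```python
import posixpath


def lines_per_file(pair):
    """Turn one (path, content) pair from wholeTextFiles() into (file name, line count).

    `lines_per_file` is an illustrative helper, not part of PySpark.
    """
    path, content = pair
    # Keep only the last component of a path such as file:/.../playData1.txt
    file_name = posixpath.basename(path)
    # splitlines() ignores a trailing line break, so it counts real lines only
    return file_name, len(content.splitlines())
```

It could be applied as manyFilePlayDataKeyValue.map(lines_per_file).collect().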

Recipe 6-4. Read Data from HDFS

Problem

You want to read a file from HDFS by using PySpark.

Solution

HDFS is very good for storing high-volume files. You are given the file filamentData.csv in HDFS. This file is under the bookData directory. Our bookData is under the root directory of HDFS. You want to read this file by using PySpark.
To read a file from HDFS, we first need to know the fs.default.name property from the core-site.xml property file. (In Hadoop 2 and later, this property is named fs.defaultFS, and fs.default.name is deprecated.) The core-site.xml file is located in the Hadoop configuration directory. For us, the value of fs.default.name is hdfs://localhost:9746. The full path of our file in HDFS will be hdfs://localhost:9746/bookData/filamentData.csv.
We can use the textFile() function to read the file from HDFS by using the full path of the file.

How It Works

Reading a file from HDFS is as easy as reading data from a local machine. In the following code line, we use the textFile() function to read the required file:
>>> filamentdata = sc.textFile('hdfs://localhost:9746/bookData/filamentData.csv',4)
>>> filamentdata.take(4)
Here is the output:
[u'filamentA,100W,605',
 u'filamentB,100W,683',
 u'filamentB,100W,691',
 u'filamentB,200W,561']
The result confirms that we are able to read our file from HDFS.
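The full HDFS path is just the file system URI followed by the path inside HDFS. The helper below is a small sketch of that concatenation; the name hdfs_uri is hypothetical and not part of PySpark or Hadoop.

```python
def hdfs_uri(default_fs, path):
    """Join a file system URI such as 'hdfs://localhost:9746' with a path inside HDFS.

    `hdfs_uri` is an illustrative helper. It only normalizes the slash
    between the two parts; it does not check that the file exists.
    """
    return default_fs.rstrip("/") + "/" + path.lstrip("/")
```

For the values used in this recipe, hdfs_uri('hdfs://localhost:9746', '/bookData/filamentData.csv') yields the string that was passed to textFile() above.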

Recipe 6-5. Save RDD Data to HDFS

Problem

You want to save RDD data to HDFS.

Solution

RDD data can be saved to HDFS by using the saveAsTextFile() function. In Recipe 6-2, we saved the RDD in a file on the local file system. We are going to save the same RDD, playDataLineLength, to HDFS.
Similar to the way we worked with the textFile() function, we have to provide the full path of the file, including the NameNode URI, to saveAsTextFile() to write an RDD to HDFS.

How It Works

Because you are a keen reader and might not be looking for distractions, we’ll start with writing code that counts the total number of characters in each line.

Step 6-5-1. Counting the Number of Characters on Each Line

>>> playData = sc.textFile('/home/muser/bData/shakespearePlays.txt',4)
>>> playDataLineLength = playData.map(lambda x : len(x))
>>> playDataLineLength.collect()
Here is the output:
[21, 25, 22, 14]

Step 6-5-2. Saving an RDD to HDFS

The playDataLineLength RDD is written by using the following line of code:
>>> playDataLineLength.saveAsTextFile('hdfs://localhost:9746/savedData/')
We have saved the RDD in the savedData directory, which is inside the root directory of HDFS. Remember that the savedData directory didn’t exist before we saved the data; otherwise, the saveAsTextFile() function would throw an error. Now we are going to investigate the saved data. We will find five files in the savedData directory: part-00000, part-00001, part-00002, part-00003, and our _SUCCESS file. Let’s see the data of each file, one by one, by using the HDFS cat command. This command displays file data to the console:
$ hadoop fs -cat /savedData/part-00000
Here is the output:
21
$ hadoop fs -cat /savedData/part-00001
Here is the output:
25
$ hadoop fs -cat /savedData/part-00002
Here is the output:
22
$ hadoop fs -cat /savedData/part-00003
Here is the output:
14
Each file has a single data point because our RDD has four partitions.

Recipe 6-6. Read Data from a Sequential File

Problem

You want to read data from a sequential file.

Solution

A sequential file (a Hadoop SequenceFile, also called a sequence file) uses a key/value file format in which the keys and values are stored in binary form. This is a commonly used file format for Hadoop. The key and value types implement the Hadoop Writable interface.
We have data in a sequential file in HDFS, in the sequenceFileToRead directory inside the root directory. In the file inside the directory, we have the data in Table 6-1.
Table 6-1. Sequential File Data (table image not available)
We can read the sequence file by using the sequenceFile() method defined in the SparkContext class.

How It Works

The sequenceFile() function takes many arguments. Let me discuss some of them. The first argument is path, which is the path of the sequential file. The second argument is keyClass, which indicates the key class of data in the sequence file. The argument valueClass represents the data type of the values. Remember that the key and value classes implement the Hadoop Writable interface. In the following code, we pass only the path:
>>> simpleRDD = sc.sequenceFile('hdfs://localhost:9746/sequenceFileToRead')
>>> simpleRDD.collect()
Here is the output:
[(u'p', 20),
(u'q', 30),
(u'r', 20),
(u'm', 25)]
Finally, we have read our sequential file successfully.
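The keyClass and valueClass arguments can also be given explicitly. The following sketch assumes, purely for illustration, that the keys were written as Hadoop Text and the values as IntWritable; the actual classes of the file in this recipe are not stated in the text. The helper name is hypothetical, and sc is assumed to be an existing SparkContext.

```python
def read_text_int_sequence_file(sc, path):
    """Read a sequence file while naming the Writable classes explicitly.

    Assumption: keys were written as Text and values as IntWritable.
    `read_text_int_sequence_file` is an illustrative helper, not part of PySpark.
    """
    return sc.sequenceFile(
        path,
        keyClass="org.apache.hadoop.io.Text",
        valueClass="org.apache.hadoop.io.IntWritable",
    )
```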

Recipe 6-7. Write Data to a Sequential File

Problem

You want to write data into a sequential file.

Solution

Many times we like to save the results from PySpark processing to a sequence file. We have an RDD of subject data, as shown in Table 6-2, and you want to write it to a sequence file.
Table 6-2. Student Subjects Data (table image not available)

How It Works

In this recipe, we are first going to create an RDD and then save it into a sequence file.

Step 6-7-1. Creating a Paired RDD

First, we’ll create a list of tuples:
>>> subjectsData = [('si1','Python'),
...                 ('si3','Java'),
...                 ('si1','Java'),
...                 ('si2','Python'),
...                 ('si3','Ruby'),
...                 ('si4','C++'),
...                 ('si5','C'),
...                 ('si4','Python'),
...                 ('si2','Java')]
Next, we parallelize the data:
>>> subjectsPairedRDD = sc.parallelize(subjectsData, 4)
>>> subjectsPairedRDD.take(4)
Here is the output:
[('si1', 'Python'),
 ('si3', 'Java'),
 ('si1', 'Java'),
 ('si2', 'Python')]

Step 6-7-2. Saving the RDD as a Sequence File

Finally, we write our paired RDD into a sequence file:
>>> subjectsPairedRDD.saveAsSequenceFile('hdfs://localhost:9746/sequenceFiles')
Our sequence file is placed in HDFS, inside the sequenceFiles directory. Now let’s investigate the saved files:
$ hadoop fs -ls /sequenceFiles
We find five items:
-rw-r--r--   3 pysparkbook supergroup          0 2017-05-22 00:18 /sequenceFiles/_SUCCESS
-rw-r--r--   3 pysparkbook supergroup        114 2017-05-22 00:18 /sequenceFiles/part-00000
-rw-r--r--   3 pysparkbook supergroup        114 2017-05-22 00:18 /sequenceFiles/part-00001
-rw-r--r--   3 pysparkbook supergroup        111 2017-05-22 00:18 /sequenceFiles/part-00002
-rw-r--r--   3 pysparkbook supergroup        128 2017-05-22 00:18 /sequenceFiles/part-00003
Here we can see that the files have been saved in four parts.

Recipe 6-8. Read a CSV File

Problem

You want to read a CSV file.

Solution

As we have mentioned, CSV stands for comma-separated values. In a CSV file, each line contains fields, which are separated by a delimiter. Generally, a comma (,) is used as the separating delimiter. But a delimiter could be a different character too.
We have seen that if PySpark reads a file, it creates an RDD of lines. So the best way to read a CSV file is to read it by using the textFile() function and then to parse each line by using a CSV parser in Python.
In Python, we can use the csv module to parse CSV lines. This module provides all the functionality for handling CSV data. We also use the reader() function, which returns a reader object. The reader object iterates over lines.
You have been given a CSV file of filaments data, the same filament data we used in Chapter 5. The lines of the filament data file look like the following:
filamentA,100W,605
filamentB,100W,683
filamentB,100W,691
filamentB,200W,561
filamentA,200W,530
We can see that the fields in each line of the file are separated by commas. So parsing each line with a CSV parser will complete our job.
Note
You can read more about reading a CSV file by using PySpark at https://stackoverflow.com/questions/28782940/load-csv-file-with-spark.

How It Works

We will start with writing a Python function that will parse each line.

Step 6-8-1. Writing a Python Function to Parse CSV Lines

We are going to write a function named parseCSV(). This function will take each line and parse it to return a list:
>>> import csv
>>> import StringIO
>>> def parseCSV(csvRow):
...     # Wrap the line in a file-like object so that csv.reader() can consume it
...     data = StringIO.StringIO(csvRow)
...     dataReader = csv.reader(data)
...     # The reader yields one list of fields per line; we have a single line
...     return dataReader.next()
...
Our function takes one line at a time. Each line is passed to the StringIO() function defined in the StringIO module, which creates a file-like object. We pass this data object to the reader() function of the csv module, which returns a reader object. The next() function returns the list of fields that are separated by commas. Note that this is Python 2 code: the StringIO module and the reader’s next() method exist only in Python 2. Let’s check whether our function is working correctly:
>>> csvRow = "p,s,r,p"
>>> parseCSV(csvRow)
Here is the output:
['p', 's', 'r', 'p']
We have created a string, csvRow. We can see that the fields in our string are separated by commas. Now we are parsing that line by using our function and getting a list as a result. So it is confirmed that our function meets our expectations.
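The parseCSV() function above is written for Python 2. The following is a hedged Python 3 sketch of the same idea; the name parse_csv and the delimiter parameter are hypothetical additions. It relies on the fact that csv.reader() accepts any iterable of lines, so a one-element list can stand in for the file-like object.

```python
import csv


def parse_csv(csv_row, delimiter=","):
    """Parse one CSV line into a list of string fields (Python 3).

    `parse_csv` is an illustrative counterpart to parseCSV(), not the book's code.
    """
    # csv.reader() accepts any iterable of lines, so wrap the single line in a list
    reader = csv.reader([csv_row], delimiter=delimiter)
    return next(reader)


print(parse_csv("p,s,r,p"))    # ['p', 's', 'r', 'p']
print(parse_csv('a,"b,c",d'))  # ['a', 'b,c', 'd']
```

The second call shows why a CSV parser is preferable to a plain split on commas: a quoted field that contains a comma stays in one piece.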

Step 6-8-2. Creating a Paired RDD

We will read the file and parse its lines. The following code will read the file, parse the lines, and return the required data:
>>> filamentRDD = sc.textFile('/home/pysparkbook/pysparkBookData/filamentData.csv',4)
>>> filamentRDDCSV = filamentRDD.map(parseCSV)
>>> filamentRDDCSV.take(4)
Here is the output:
[['filamentA', '100W', '605'],
 ['filamentB', '100W', '683'],
 ['filamentB', '100W', '691'],
 ['filamentB', '200W', '561']]
The final output is a nested list. Each inner list holds the fields of one parsed line.

Recipe 6-9. Write an RDD to a CSV File

Problem

You want to write an RDD to a CSV file.

Solution

Writing data to a CSV file requires that we transform each list of fields into a string of comma-separated fields. A list can be transformed into such a string by joining its elements with commas.

How It Works

Step 6-9-1. Creating a Function to Convert a List into a String

Let’s create a function named createCSV(), which will take a list and return a string by joining the elements of the list, separated by commas.
>>> import csv
>>> import StringIO
>>> def createCSV(dataList):
...       data = StringIO.StringIO()
...       dataWriter = csv.writer(data,lineterminator='')
...       dataWriter.writerow(dataList)
...       return (data.getvalue())
The StringIO() function returns a file-like object. The writerow() function of the csv writer writes the list into that object as one comma-separated row, and getvalue() returns the written row as a string. Let’s observe the action of the createCSV() function:
>>> listData = ['p','q','r','s']
>>> createCSV(listData)
Here is the output:
'p,q,r,s'
The listData list has four elements. We provide listData as input to the createCSV() function. And, finally, we get a string, which is created by concatenating elements of the list separated by commas.
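As with parsing, createCSV() above is Python 2 code. A Python 3 sketch of the same technique follows; the name create_csv is hypothetical, and io.StringIO takes the place of the Python 2 StringIO module.

```python
import csv
import io


def create_csv(fields):
    """Join a list of fields into one CSV-formatted string (Python 3).

    `create_csv` is an illustrative counterpart to createCSV(), not the book's code.
    """
    buffer = io.StringIO()
    # lineterminator='' keeps the writer from appending a line break to the row
    csv.writer(buffer, lineterminator="").writerow(fields)
    return buffer.getvalue()


print(create_csv(["p", "q", "r", "s"]))  # p,q,r,s
print(create_csv(["a,b", "c"]))          # "a,b",c
```

Using csv.writer rather than ','.join() means that non-string values are converted automatically and that fields containing a comma are quoted.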

Step 6-9-2. Saving Data to a File

Our problem asks us to save the data to a file. Before we can save anything, we need an RDD to save. We are going to create an RDD from our simpleData nested list:
>>> simpleData = [['p',20],
...               ['q',30],
...               ['r',20],
...               ['m',25]]
>>> simpleRDD = sc.parallelize(simpleData,4)
>>> simpleRDD.take(4)
Here is the output:
[['p', 20],
 ['q', 30],
 ['r', 20],
 ['m', 25]]
We have created our required RDD. Now, using the map() function, we’ll transform our data into the required format for saving:
>>> simpleRDDLines = simpleRDD.map(createCSV)
>>> simpleRDDLines.take(4)
>>> simpleRDDLines.saveAsTextFile('/home/pysparkbook/csvData/')
We have saved our data as CSV-formatted part files in the csvData directory.

Recipe 6-10. Read a JSON File

Problem

You want to read a JSON file by using PySpark.

Solution

As noted previously in the chapter, JSON stands for JavaScript Object Notation. JSON is a data interchange format whose popularity has grown steadily, and nearly all programming languages support reading and writing it. A JSON object consists of field name/value pairs. The pairs are separated from one another by a comma (,), and each field name is separated from its value by a colon (:). Data scientists often get data from clients in JSON format.
You have been given a JSON file, tempData.json. JSON files have a .json extension. This file has data in the following format:
$ cat tempData.json
Here is the output:
{"Time":"6AM",  "Temperature":15}
{"Time":"8AM",  "Temperature":16}
{"Time":"10AM", "Temperature":17}
{"Time":"12AM", "Temperature":17}
{"Time":"2PM",  "Temperature":18}
{"Time":"4PM",  "Temperature":17}
{"Time":"6PM",  "Temperature":16}
{"Time":"8PM",  "Temperature":14}
Our JSON file consists of two keys, Time and Temperature, and their associated values. We can see that for a particular field, the field name and value are associated by a colon. And the two fields’ name/value pairs are separated by a comma.
Reading a JSON file in PySpark follows the same pattern as reading a CSV file. The textFile() function creates an RDD of lines, and we then parse the data line by line. So we need a parser for the lines; more precisely, we need a JSON parser.

How It Works

Many Python modules are available for JSON parsing, but we are going to use the json Python module. This module provides many utilities for working with JSON format.

Step 6-10-1. Creating a Function to Parse JSON Data

Let’s create a function that will parse JSON data and return a Python list for each line:
>>> import json
>>> def jsonParse(dataLine):
...      parsedDict = json.loads(dataLine)
...      valueData = parsedDict.values()
...      return(valueData)
The jsonParse() function will take a string in JSON format and return a list. Input to the jsonParse() function is passed to the json.loads() function. The json.loads() function takes a JSON string as an argument and returns a dictionary. Then we fetch the dictionary’s values in the next line and finally return them. Note that a Python 2 dictionary does not preserve insertion order, so the values may come back in a different order than they appear in the JSON string. We must test our function. First, we create a JSON string, jsonData:
>>> jsonData = '{"Time":"6AM",  "Temperature":15}'
Then we parse it:
>>> jsonParsedData = jsonParse(jsonData)
>>> print jsonParsedData
Here is the output:
[15, '6AM']
To parse our JSON string, we use the jsonParse() function and get the Python list jsonParsedData.
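Because jsonParse() relies on the order of the dictionary’s values, the position of Time and Temperature in the result is not under our control. A hedged alternative, written for Python 3, is to pick the fields by name. The function name json_parse and its fields parameter are hypothetical.

```python
import json


def json_parse(data_line, fields=("Time", "Temperature")):
    """Parse one JSON line and return the chosen fields in a fixed order.

    `json_parse` is an illustrative variant of jsonParse(), not the book's code.
    """
    record = json.loads(data_line)
    # Looking fields up by name makes the output order explicit and stable
    return [record[name] for name in fields]


print(json_parse('{"Time":"6AM",  "Temperature":15}'))  # ['6AM', 15]
```

A line that lacks one of the named fields raises a KeyError here, which surfaces malformed records instead of silently shifting columns.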

Step 6-10-2. Reading the File

We are going to read the file by using textFile():
>>> tempData = sc.textFile("/home/pysparkbook/pysparkBookData/tempData.json",4)
>>> tempData.take(4)
Here is the output:
[u'{"Time":"6AM",  "Temperature":15}',
 u'{"Time":"8AM",  "Temperature":16}',
 u'{"Time":"10AM", "Temperature":17}',
 u'{"Time":"12AM", "Temperature":17}']
We have read the file, and the output tells us that each element of the RDD is a line containing one JSON string.

Step 6-10-3. Creating a Paired RDD

Now we have to parse each line. This can be done by passing the jsonParse() function as an argument to the map() function on our RDD tempData:
>>> tempDataParsed = tempData.map(jsonParse)
>>> tempDataParsed.take(4)
Here is the output:
[[15, u'6AM'],
 [16, u'8AM'],
 [17, u'10AM'],
 [17, u'12AM']]
Finally, we have the required result.

Recipe 6-11. Write an RDD to a JSON File

Problem

You want to write an RDD to a JSON file.

Solution

JSON string objects can be created from a Python dictionary by using the json module.

How It Works

Step 6-11-1. Creating a Function That Takes a List and Returns a JSON String

We are going to create a function, createJSON(), that will take a Python list and return a JSON string:
>>> import json
>>> def createJSON(data):
...     dataDict = {}
...     dataDict['Name'] = data[0]
...     dataDict['Age'] = data[1]
...     return(json.dumps(dataDict))
Inside the createJSON() function, we first create a dataDict dictionary. Then we pass that dictionary to the dumps() function of the json module. This function returns a JSON string object. The following line of code will test our function:
>>> nameAgeList = ['Arun',22]
>>> createJSON(nameAgeList)
Here is the output:
'{"Age": 22, "Name": "Arun"}'
Our function returns a JSON string object.
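The key order in the string produced by createJSON() depends on how the Python version in use orders dictionary keys. If a stable order matters (for example, when comparing output files), json.dumps() can be asked to sort the keys. The following is a minimal sketch; the name create_json is hypothetical.

```python
import json


def create_json(data):
    """Turn a [name, age] list into a JSON string with keys in sorted order.

    `create_json` is an illustrative variant of createJSON(), not the book's code.
    """
    name, age = data
    # sort_keys=True makes the key order independent of the Python version
    return json.dumps({"Name": name, "Age": age}, sort_keys=True)


print(create_json(["Arun", 22]))  # {"Age": 22, "Name": "Arun"}
```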

Step 6-11-2. Saving Data in JSON Format

We are going to follow the same procedure as before. First, we will create an RDD. Then we will write that RDD to a file in JSON format.
>>> nameAgeData = [['Arun',22],
...                ['Bony',35],
...                ['Juna',29]]
>>> nameAgeRDD = sc.parallelize(nameAgeData,3)
>>> nameAgeRDD.collect()
Here is the output:
[['Arun', 22],
 ['Bony', 35],
 ['Juna', 29]]
We have created an RDD; now we have to transform this data into JSON strings. The createJSON() function is being passed to the map() function as an argument. The map() function works on our nameAgeRDD RDD.
>>> nameAgeJSON = nameAgeRDD.map(createJSON)
>>> nameAgeJSON.collect()
Here is the output:
['{"Age": 22, "Name": "Arun"}',
 '{"Age": 35, "Name": "Bony"}',
 '{"Age": 29, "Name": "Juna"}']
The nameAgeJSON RDD elements are JSON strings. Now, using the saveAsTextFile() function, we save the nameAgeJSON RDD to the jsonDir directory.
>>> nameAgeJSON.saveAsTextFile('/home/pysparkbook/jsonDir/')
We are going to investigate the jsonDir directory. We use the ls command to find that there are four files:
jsonDir$ ls
Here is the output:
part-00000
part-00001
part-00002
_SUCCESS
jsonDir$ cat part-00000
Here is the output:
{"Age": 22, "Name": "Arun"}
The part-00000 file contains the first element of the RDD.

Recipe 6-12. Read Table Data from HBase by Using PySpark

Problem

You want to read table data from HBase.

Solution

We have been given a data table named pysparkTable in HBase. You want to read that table by using PySpark. The data in pysparkTable is shown in Table 6-3.
Table 6-3. pysparkTable Data (table image not available)
Let’s explain this table data. The pysparkTable table consists of four rows and two column families, btcf1 and btcf2. Column btc1 is under column family btcf1, and column btc2 is under column family btcf2. Remember that the code presented later in this section works only with Spark 1.6 and earlier versions; it needs to be adapted to run on PySpark 2.x.
We are going to use the newAPIHadoopRDD() function, which is defined on SparkContext sc. This function returns a paired RDD. Table 6-4 lists the arguments of the newAPIHadoopRDD() function.
Table 6-4. Arguments of the newAPIHadoopRDD() Function (table image not available)

How It Works

Let’s first define all the arguments that have to be passed into our newAPIHadoopRDD() function:
>>> hostName = 'localhost'
>>> tableName = 'pysparkTable'
>>> ourInputFormatClass='org.apache.hadoop.hbase.mapreduce.TableInputFormat'
>>> ourKeyClass='org.apache.hadoop.hbase.io.ImmutableBytesWritable'
>>> ourValueClass='org.apache.hadoop.hbase.client.Result'
>>> ourKeyConverter='org.apache.spark.examples.pythonconverters.ImmutableBytesWritableToStringConverter'
>>> ourValueConverter='org.apache.spark.examples.pythonconverters.HBaseResultToStringConverter'
>>> configuration = {}
>>> configuration['hbase.mapreduce.inputtable'] = tableName
>>> configuration['hbase.zookeeper.quorum'] = hostName
Now it is time to call the newAPIHadoopRDD() function with its arguments:
>>> tableRDDfromHBase = sc.newAPIHadoopRDD(
...                        inputFormatClass = ourInputFormatClass,
...                        keyClass = ourKeyClass,
...                        valueClass = ourValueClass,
...                        keyConverter = ourKeyConverter,
...                        valueConverter = ourValueConverter,
...                        conf = configuration
...                     )
Let’s see how our paired RDD, tableRDDfromHBase, looks:
>>> tableRDDfromHBase.take(2)
Here is the output:
[(u'00001', u'{"qualifier" : "btc1", "timestamp" : "1496715394968", "columnFamily" : "btcf1", "row" : "00001", "type" : "Put", "value" : "c11"} {"qualifier" : "btc2", "timestamp" : "1496715408865", "columnFamily" : "btcf2", "row" : "00001", "type" : "Put", "value" : "c21"}'), (u'00002', u'{"qualifier" : "btc1", "timestamp" : "1496715423206", "columnFamily" : "btcf1", "row" : "00002", "type" : "Put", "value" : "c12"} {"qualifier" : "btc2", "timestamp" : "1496715436087", "columnFamily" : "btcf2", "row" : "00002", "type" : "Put", "value" : "c22"}')]
The paired RDD tableRDDfromHBase has the row ID as its key. The value is a string of JSON objects, one per cell, each giving the column family, the column qualifier, and the cell value. In a previous recipe, we solved the problem of reading JSON files, and the same parsing approach applies here.
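Each value in the output above holds more than one JSON object, so json.loads() cannot be applied to it directly. The following sketch, written for Python 3, decodes the objects one after another. The function name parse_hbase_cells is hypothetical, and the code assumes only that the objects are separated by whitespace (spaces or line breaks).

```python
import json


def parse_hbase_cells(value):
    """Split a string of whitespace-separated JSON objects into a list of dicts.

    `parse_hbase_cells` is an illustrative helper, not part of PySpark or HBase.
    """
    decoder = json.JSONDecoder()
    cells = []
    pos = 0
    while pos < len(value):
        # raw_decode() does not skip leading whitespace, so skip it here
        if value[pos].isspace():
            pos += 1
            continue
        # Decode one object and move to the position right after it
        cell, pos = decoder.raw_decode(value, pos)
        cells.append(cell)
    return cells
```

It could be applied to the paired RDD as tableRDDfromHBase.mapValues(parse_hbase_cells), which keeps the row ID as the key and replaces the string with a list of dictionaries.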
Note
Remember, the code in Recipe 6-12 works only with Spark 1.6 and earlier. You can get the code on GitHub at https://github.com/apache/spark/blob/ed9d80385486cd39a84a689ef467795262af919a/examples/src/main/python/hbase_inputformat.py.
There is another twist. We are using many classes, so we have to add some JARs while starting the PySpark shell. The following are the JAR files:
  • spark-examples-1.6.0-hadoop2.6.0.jar
  • hbase-client-1.2.4.jar
  • hbase-common-1.2.4.jar
The following is the command to start the PySpark shell:
pyspark --jars spark-examples-1.6.0-hadoop2.6.0.jar,hbase-client-1.2.4.jar,hbase-common-1.2.4.jar