Reading data from a local filesystem in Spark is straightforward. Let's walk through it with examples, as follows:
First, create (or reuse) the Maven project described in the previous chapter and add a Java class with a main method for our application. We will start by creating a JavaSparkContext:
SparkConf conf = new SparkConf().setMaster("local").setAppName("Local File system Example");
JavaSparkContext jsc = new JavaSparkContext(conf);
To read a text file in Spark, SparkContext provides the textFile method. The following is the method's signature:
textFile(String path)
This method can be used to read the file from local filesystems, as follows:
JavaRDD<String> localFile = jsc.textFile("absolutepathOfTheFileOnLocalFileSystem");
As shown previously, the file is read as an RDD of strings. Now we can run transformations and/or actions on the file's data. For example, a word count on this data can be executed as follows:
JavaPairRDD<String, Integer> wordCountLocalFile = localFile
    .flatMap(x -> Arrays.asList(x.split(" ")).iterator())
    .mapToPair(x -> new Tuple2<String, Integer>(x, 1))
    .reduceByKey((x, y) -> x + y);
wordCountLocalFile.collect();
The textFile method can also be used to read multiple files, as follows:
- To read more than one file:
jsc.textFile("absolutepathOfTheFile1,absolutepathOfTheFile2");
- To read all the files in a directory:
jsc.textFile("absolutepathOfTheDir/*");
- To read all the files in multiple directories:
jsc.textFile("absolutepathOfTheDir1/*,absolutepathOfTheDir2/*");
The textFile method also provides an overload that lets users specify the minimum number of partitions:
textFile(String path, int minPartitions)
The minPartitions parameter helps to calculate the number of splits the file is distributed into.
The Spark textFile method internally uses Hadoop's FileInputFormat[1] to calculate the input splits. The following formula is used to calculate the input split size:
Math.max(minSize, Math.min(goalSize, blockSize));
Where minSize is the value of mapred.min.split.size (mapreduce.input.fileinputformat.split.minsize in newer Hadoop APIs);

goalSize = totalSize / (numSplits == 0 ? 1 : numSplits); (totalSize is the total size of the input data, and numSplits is the minPartitions parameter value)

blockSize is the block size of the file, which is considered to be 32 MB (fs.local.block.size) in the case of local filesystems and 64 MB/128 MB (dfs.blocksize) in the case of HDFS.
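The split-size calculation above can be sketched as a small standalone method. This is a minimal illustration of the formula, not Spark's actual code; the class name, computeSplitSize, and the sample sizes are all illustrative:

```java
public class SplitSizeSketch {
    // Mirrors Hadoop's FileInputFormat split-size formula:
    // Math.max(minSize, Math.min(goalSize, blockSize))
    static long computeSplitSize(long goalSize, long minSize, long blockSize) {
        return Math.max(minSize, Math.min(goalSize, blockSize));
    }

    public static void main(String[] args) {
        long totalSize = 1024L * 1024 * 1024; // 1 GB of input data (illustrative)
        int numSplits = 8;                    // the minPartitions value passed to textFile
        long goalSize = totalSize / (numSplits == 0 ? 1 : numSplits); // 128 MB
        long minSize = 1;                     // a typical default for mapred.min.split.size
        long blockSize = 32L * 1024 * 1024;   // 32 MB local filesystem block size

        // goalSize (128 MB) exceeds blockSize (32 MB), so each split is capped at 32 MB
        System.out.println(computeSplitSize(goalSize, minSize, blockSize));
    }
}
```

Note how the block size acts as an upper bound here: asking for only 8 partitions of a 1 GB file still yields 32 MB splits on a local filesystem.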
SparkContext provides another pair of methods to read the files:
wholeTextFiles(String path)
wholeTextFiles(String path, int minPartitions)
This method returns a pair RDD with the filename as the key and the content of the file as the value:
JavaPairRDD<String, String> localFile = jsc.wholeTextFiles("absolutepathOfTheFile");
Similar to the textFile method, multiple files can be read using the wholeTextFiles method, as follows:
- To read more than one file:
jsc.wholeTextFiles("absolutepathOfTheFile1,absolutepathOfTheFile2");
- To read all the files in a directory:
jsc.wholeTextFiles("absolutepathOfTheDir/*");
- To read all the files in multiple directories:
jsc.wholeTextFiles("absolutepathOfTheDir1/*,absolutepathOfTheDir2/*");
To save the output back to the local filesystem, the saveAsTextFile() method can be used as follows:
wordCountLocalFile.saveAsTextFile("absolutePathOfTheOutputDir");
If the output directory already exists, Spark throws the following exception:

Exception in thread "main" org.apache.hadoop.mapred.FileAlreadyExistsException
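A common way to avoid this exception is to delete the output directory before writing. The plain-Java helper below sketches that cleanup; the class name, the deleteRecursively helper, and the path are illustrative, not part of Spark's API:

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Comparator;
import java.util.stream.Stream;

public class OutputDirCleanup {
    // Recursively deletes a directory if it exists, so a subsequent
    // saveAsTextFile() call does not fail with FileAlreadyExistsException.
    static void deleteRecursively(Path dir) throws IOException {
        if (!Files.exists(dir)) {
            return; // nothing to clean up
        }
        try (Stream<Path> walk = Files.walk(dir)) {
            // Delete the deepest paths first, so children go before parents
            walk.sorted(Comparator.reverseOrder())
                .forEach(p -> p.toFile().delete());
        }
    }

    public static void main(String[] args) throws IOException {
        Path outputDir = Paths.get("absolutePathOfTheOutputDir"); // illustrative path
        deleteRecursively(outputDir);
        // Now it is safe to call:
        // wordCountLocalFile.saveAsTextFile("absolutePathOfTheOutputDir");
    }
}
```

Note that this trades the exception for silent overwriting, so it is only appropriate when the previous run's output is genuinely disposable.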