Coding for HDFS operations

Let's now look at some code to perform operations on the HDFS file system and interact with Hadoop, such as copying a file from the local to HDFS and vice versa, deleting a file, and reading a file from HDFS:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

//To copy a file from local drive to hdfs
public void copyLocalFileToHDFS (String source, String dest) throws IOException {
  Configuration conf = new Configuration();
  conf.addResource(new Path("<hadoop conf dir path>/core-site.xml"));
  conf.addResource(new Path("<hadoop conf dir path>/hdfs-site.xml"));
  conf.addResource(new Path("<hadoop conf dir path>/mapred-site.xml"));
  FileSystem fileSystem = FileSystem.get(conf);
  Path srcPath = new Path(source);
  Path dstPath = new Path(dest);
  if (!(fileSystem.exists(dstPath))) {
    System.out.println("No such destination " + dstPath);
    return;
  }
  String filename = source.substring(source.lastIndexOf('/') + 1, source.length());
  try{
    fileSystem.copyFromLocalFile(srcPath, dstPath);
    System.out.println("File " + filename + "copied to " + dest);
  }catch(Exception e){
    System.err.println("Exception caught! :" + e);
    System.exit(1);
  }finally{
  fileSystem.close();
  }
}

//copy a file from HDFS to local drive
public void copyHDFSFileToLocal (String source, String dest) throws IOException {
  Configuration conf = new Configuration();
  conf.addResource(new Path("<hadoop conf dir path>/core-site.xml"));
  conf.addResource(new Path("<hadoop conf dir path>/hdfs-site.xml"));
  conf.addResource(new Path("<hadoop conf dir path>/mapred-site.xml"));
  FileSystem fileSystem = FileSystem.get(conf);
  Path srcPath = new Path(source);
  Path dstPath = new Path(dest);
  if (!(fileSystem.exists(dstPath))) {
    System.out.println("No such destination " + dstPath);
    return;
  }
  String filename = source.substring(source.lastIndexOf('/') + 1, source.length());
  try{
    fileSystem.copyToLocalFile(srcPath, dstPath)
    System.out.println("File " + filename + "copied to " + dest);
  }catch(Exception e){
    System.err.println("Exception caught! :" + e);
    System.exit(1);
  }finally{
    fileSystem.close();
  }
}

//delete a file from hdfs
public void deleteAfileOnHDFS(String file) throws IOException {
  Configuration conf = new Configuration();
  conf.addResource(new Path("<hadoop conf dir path>/core-site.xml"));
  conf.addResource(new Path("<hadoop conf dir path>/hdfs-site.xml"));
  conf.addResource(new Path("<hadoop conf dir path>/mapred-site.xml"));
  FileSystem fileSystem = FileSystem.get(conf);
  Path path = new Path(file);
  if (!fileSystem.exists(path)) {
    System.out.println("File " + file + " does not exists");
  return;
  }
  fileSystem.delete(new Path(file), true);
  fileSystem.close();
}

//get block locations of a file on HDFS
public void getBlockLocationsOfHDFSFile(String source) throws IOException{
  Configuration conf = new Configuration();
  conf.addResource(new Path("<hadoop conf dir path>/core-site.xml"));
  conf.addResource(new Path("<hadoop conf dir path>/hdfs-site.xml"));
  conf.addResource(new Path("<hadoop conf dir path>/mapred-site.xml"));
  FileSystem fileSystem = FileSystem.get(conf);
  Path srcPath = new Path(source);
  if (!(ifExists(srcPath))) {
    System.out.println("No such destination " + srcPath);
    return;
  }
  String filename = source.substring(source.lastIndexOf('/') + 1, source.length());
  FileStatus fileStatus = fileSystem.getFileStatus(srcPath);
  BlockLocation[] blkLocations = fileSystem.getFileBlockLocations(fileStatus, 0, fileStatus.getLen());
  int blkCount = blkLocations.length;
  System.out.println("File :" + filename + "stored at:");
  for (int i=0; i < blkCount; i++) {
    String[] hosts = blkLocations[i].getHosts();
    System.out.format("Host %d: %s %n", i, hosts);
  }
}

//create a directory on HDFS
public void createFileOnHDFS(String dir) throws IOException {
  Configuration conf = new Configuration();
  conf.addResource(new Path("<hadoop conf dir path>/core-site.xml"));
  conf.addResource(new Path("<hadoop conf dir path>/hdfs-site.xml"));
  conf.addResource(new Path("<hadoop conf dir path>/mapred-site.xml"));
  FileSystem fileSystem = FileSystem.get(conf);
  Path path = new Path(dir);
  if (fileSystem.exists(path)) {
    System.out.println("Dir " + dir + " already exists!");
    return;
  }
  fileSystem.mkdirs(path);
  fileSystem.close();
}

Likewise, we can add functions like these to our own HDFS client to perform similar operations. The full set of APIs provided by Hadoop is documented at https://hadoop.apache.org/docs/current/api/overview-summary.html.
