Importing data from MongoDB into HDFS

This recipe will use the MongoInputFormat class to load data from a MongoDB collection into HDFS.

Getting ready

The easiest way to get started with the Mongo Hadoop Adaptor is to clone the mongo-hadoop project from GitHub and build the project configured for a specific version of Hadoop. A Git client must be installed to clone this project.

This recipe assumes that you are using the CDH3 distribution of Hadoop.

The official Git Client can be found at http://git-scm.com/downloads.

GitHub for Windows can be found at http://windows.github.com/.

GitHub for Mac can be found at http://mac.github.com/.

The Mongo Hadoop Adaptor can be found on GitHub at https://github.com/mongodb/mongo-hadoop. This project needs to be built for a specific version of Hadoop. The resulting JAR file must be installed on each node in the $HADOOP_HOME/lib folder.

The Mongo Java Driver must also be installed in the $HADOOP_HOME/lib folder on each node. It can be downloaded from https://github.com/mongodb/mongo-java-driver/downloads.

How to do it...

Complete the following steps to copy data from MongoDB into HDFS:

  1. Clone the mongo-hadoop repository:
    git clone https://github.com/mongodb/mongo-hadoop.git
  2. Switch to the stable release 1.0 branch:
    git checkout release-1.0
  3. Set the Hadoop version which mongo-hadoop should target. In the folder that mongo-hadoop was cloned to, open the build.sbt file with a text editor. Change the following line:
    hadoopRelease in ThisBuild := "default"

    to

    hadoopRelease in ThisBuild := "cdh3"
  4. Build mongo-hadoop:
    ./sbt package

    This will create a file named mongo-hadoop-core_cdh3u3-1.0.0.jar in the core/target folder.

  5. Download the Mongo Java Driver Version 2.8.0 from https://github.com/mongodb/mongo-java-driver/downloads.
  6. Copy mongo-hadoop and the MongoDB Java Driver to $HADOOP_HOME/lib on each node:
    cp mongo-hadoop-core_cdh3u3-1.0.0.jar mongo-2.8.0.jar $HADOOP_HOME/lib
  7. Create a Java MapReduce program that will read the weblog entries from a MongoDB collection and write them to HDFS:
    import java.io.*;
    
    import org.apache.commons.logging.*;
    import org.apache.hadoop.conf.*;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.*;
    import org.apache.hadoop.mapreduce.lib.output.*;
    import org.apache.hadoop.mapreduce.*;
    import org.bson.*;
    
    import com.mongodb.hadoop.*;
    import com.mongodb.hadoop.util.*;
    
    public class ImportWeblogsFromMongo {
    
       private static final Log log = LogFactory.getLog(ImportWeblogsFromMongo.class);
    
       public static class ReadWeblogsFromMongo extends Mapper<Object, BSONObject, Text, Text>{
    
          public void map(Object key, BSONObject value, Context context) throws IOException, InterruptedException{
    
             System.out.println("Key: " + key);
             System.out.println("Value: " + value);
    
             String md5 = value.get("md5").toString();
             String url = value.get("url").toString();
             String date = value.get("date").toString();
             String time = value.get("time").toString();
             String ip = value.get("ip").toString();
             String output = "\t" + url + "\t" + date + "\t" +
                             time + "\t" + ip;
             context.write( new Text(md5), new Text(output));
          }
       }
    
       public static void main(String[] args) throws Exception{
    
          final Configuration conf = new Configuration();
          MongoConfigUtil.setInputURI(conf,
                "mongodb://<HOST>:<PORT>/test.weblogs");
          MongoConfigUtil.setCreateInputSplits(conf, false);
          System.out.println("Configuration: " + conf);
    
          final Job job = new Job(conf, "Mongo Import");
    
          Path out = new Path("/data/weblogs/mongo_import");
          FileOutputFormat.setOutputPath(job, out);
          job.setJarByClass(ImportWeblogsFromMongo.class);
          job.setMapperClass(ReadWeblogsFromMongo.class);
          job.setOutputKeyClass(Text.class);
          job.setOutputValueClass(Text.class);
    
          job.setInputFormatClass(MongoInputFormat.class);
          job.setOutputFormatClass(TextOutputFormat.class);
    
          job.setNumReduceTasks(0);
    
          System.exit(job.waitForCompletion(true) ? 0 : 1 );
          
       }
    
    }

    This map-only job uses several classes provided by the Mongo Hadoop Adaptor. Data read from MongoDB is presented to the mapper as a BSONObject, a class that represents a binary-encoded JSON document. MongoDB uses BSON to efficiently serialize, transfer, and store data. The Mongo Hadoop Adaptor also provides the convenient MongoConfigUtil class to help set up the job to connect to MongoDB as if it were a filesystem. A sketch of narrowing the import with a query follows these steps.

  8. Export the program as a runnable JAR file and run the job:
    hadoop jar ImportWeblogsFromMongo.jar
  9. Verify that the weblogs were imported from MongoDB:
    hadoop fs -ls /data/weblogs/mongo_import
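
The import does not have to pull the entire collection. As a minimal sketch of narrowing the job to a subset of documents with MongoConfigUtil's query support, the following shows only the configuration portion of the driver; the field name ip and the value used are hypothetical:

    import org.apache.hadoop.conf.Configuration;

    import com.mongodb.BasicDBObject;
    import com.mongodb.hadoop.util.MongoConfigUtil;

    public class ImportWeblogsQuerySketch {

       public static void main(String[] args) {

          Configuration conf = new Configuration();
          MongoConfigUtil.setInputURI(conf,
                "mongodb://<HOST>:<PORT>/test.weblogs");

          // Only documents matching this query are handed to the mappers,
          // so the filtering happens inside MongoDB rather than in map().
          // The field name "ip" and its value are illustrative only.
          BasicDBObject query = new BasicDBObject("ip", "127.0.0.1");
          MongoConfigUtil.setQuery(conf, query);

          MongoConfigUtil.setCreateInputSplits(conf, false);

          // ...the remaining job setup is identical to ImportWeblogsFromMongo
       }
    }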

How it works...

The Mongo Hadoop Adaptor provides the MongoInputFormat and MongoOutputFormat classes, which expose MongoDB collections as sources and sinks for MapReduce jobs. These abstractions make working with MongoDB similar to working with any Hadoop-compatible filesystem.
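
For the output direction, a job can point at a collection in much the same way. The following is a minimal, illustrative sketch of writing tab-delimited weblog lines from HDFS back into MongoDB through MongoOutputFormat; the input path, the assumed record layout, and the target collection test.weblogs_from_hdfs are hypothetical:

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
    import org.bson.BSONObject;
    import org.bson.BasicBSONObject;

    import com.mongodb.hadoop.MongoOutputFormat;
    import com.mongodb.hadoop.io.BSONWritable;
    import com.mongodb.hadoop.util.MongoConfigUtil;

    public class ExportWeblogsToMongoSketch {

       public static class WeblogMapper
             extends Mapper<LongWritable, Text, Text, BSONWritable> {

          @Override
          public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {

             // Assumed (hypothetical) layout: md5<TAB>url<TAB>date<TAB>time<TAB>ip
             String[] fields = value.toString().split("\t");
             if (fields.length < 5) {
                return; // skip malformed lines
             }
             BSONObject doc = new BasicBSONObject();
             doc.put("md5", fields[0]);
             doc.put("url", fields[1]);
             doc.put("date", fields[2]);
             doc.put("time", fields[3]);
             doc.put("ip", fields[4]);

             // Emit the md5 as the key and the BSON document as the value
             context.write(new Text(fields[0]), new BSONWritable(doc));
          }
       }

       public static void main(String[] args) throws Exception {

          Configuration conf = new Configuration();
          // Hypothetical target collection; replace <HOST>:<PORT> as appropriate
          MongoConfigUtil.setOutputURI(conf,
                "mongodb://<HOST>:<PORT>/test.weblogs_from_hdfs");

          Job job = new Job(conf, "Mongo Export Sketch");
          job.setJarByClass(ExportWeblogsToMongoSketch.class);
          job.setMapperClass(WeblogMapper.class);
          job.setNumReduceTasks(0);

          job.setOutputKeyClass(Text.class);
          job.setOutputValueClass(BSONWritable.class);

          job.setInputFormatClass(TextInputFormat.class);
          FileInputFormat.addInputPath(job,
                new Path("/data/weblogs/weblog_entries.txt"));
          job.setOutputFormatClass(MongoOutputFormat.class);

          System.exit(job.waitForCompletion(true) ? 0 : 1);
       }
    }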
