Importing data from MySQL into HDFS using Sqoop

Sqoop is an Apache project that is part of the broader Hadoop ecosystem. In many ways Sqoop is similar to distcp (see the Moving data efficiently between clusters using Distributed Copy recipe in this chapter). Both are built on top of MapReduce and take advantage of its parallelism and fault tolerance. Instead of moving data between clusters, however, Sqoop was designed to move data into and out of relational databases, using a JDBC driver to connect.

Its functionality is extensive. This recipe will show how to use Sqoop to import data from MySQL to HDFS using the weblog entries as an example.

Getting ready

This example uses Sqoop v1.3.0.

If you are using CDH3, you already have Sqoop installed. If you are not running CDH3, you can find instructions for your distro at https://ccp.cloudera.com/display/CDHDOC/Sqoop+Installation.

This recipe assumes that you have a MySQL instance up and running that can reach your Hadoop cluster. The mysql.user table must be configured to accept a user connecting from the machine where you will be running Sqoop. Visit http://dev.mysql.com/doc/refman/5.5/en/installing.html for more information on installing and configuring MySQL.

The MySQL JDBC driver JAR file must be copied to $SQOOP_HOME/lib. The driver can be downloaded from http://dev.mysql.com/downloads/connector/j/.
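
For example, assuming the connector JAR has been downloaded to the current directory (the filename depends on the version you download), it can be copied into place with a command along these lines:

cp mysql-connector-java-<VERSION>-bin.jar $SQOOP_HOME/lib/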

How to do it...

Complete the following steps to transfer data from a MySQL table to an HDFS file:

  1. Create a new database in the MySQL instance:
    CREATE DATABASE logs;
  2. Create and load the weblogs table:
    USE logs;
    CREATE TABLE weblogs(
        md5             VARCHAR(32),
        url             VARCHAR(64),
        request_date    DATE,
        request_time    TIME,
        ip              VARCHAR(15)
    );
    LOAD DATA INFILE '/path/weblog_entries.txt' INTO TABLE weblogs
    FIELDS TERMINATED BY '\t' LINES TERMINATED BY '\n';
  3. Select a count of rows from the weblogs table:
    mysql> select count(*) from weblogs;

    The output would be:

    +----------+
    | count(*) |
    +----------+
    |     3000 |
    +----------+
    1 row in set (0.01 sec)
  4. Import the data from MySQL to HDFS:
    sqoop import -m 1 --connect jdbc:mysql://<HOST>:<PORT>/logs --username hdp_usr --password test1 --table weblogs --target-dir /data/weblogs/import

    The output would be:

    INFO orm.CompilationManager: Writing jar file:
    /tmp/sqoop-jon/compile/f57ad8b208643698f3d01954eedb2e4d/weblogs.jar
    WARN manager.MySQLManager: It looks like you are importing from mysql.
    WARN manager.MySQLManager: This transfer can be faster! Use the --direct
    WARN manager.MySQLManager: option to exercise a MySQL-specific fast path.
    ...
    INFO mapred.JobClient:     Map input records=3000
    INFO mapred.JobClient:     Spilled Records=0
    INFO mapred.JobClient:     Total committed heap usage (bytes)=85000192
    INFO mapred.JobClient:     Map output records=3000
    INFO mapred.JobClient:     SPLIT_RAW_BYTES=87
    INFO mapreduce.ImportJobBase: Transferred 245.2451 KB in 13.7619 seconds (17.8206 KB/sec)
    INFO mapreduce.ImportJobBase: Retrieved 3000 records.

How it works...

Sqoop loads the JDBC driver defined in the --connect statement from $SQOOP_HOME/lib, where $SQOOP_HOME is the full path to the location where Sqoop is installed. The --username and --password options are used to authenticate the user issuing the command against the MySQL instance. The mysql.user table must have an entry for the --username value and the host of each node in the Hadoop cluster; otherwise, Sqoop will throw an exception indicating that the host is not allowed to connect to the MySQL server.

mysql> USE mysql;
mysql> select host, user from user;

The output would be:

+------------+-----------+
| user       | host      |
+------------+-----------+
| hdp_usr    | hdp01     |
| hdp_usr    | hdp02     |
| hdp_usr    | hdp03     |
| hdp_usr    | hdp04     |
| root       | 127.0.0.1 |
| root       | ::1       |
| root       | localhost |
+------------+-----------+
7 rows in set (1.04 sec)

In this example, we connected to the MySQL server using hdp_usr. Our cluster has four machines, hdp01, hdp02, hdp03, and hdp04.
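
If the hdp_usr account is missing an entry for one of these hosts, an entry can be added with a GRANT statement similar to the following, shown here using the hostnames and password from this example; in practice you may want to grant narrower privileges than ALL:

mysql> GRANT ALL PRIVILEGES ON logs.* TO 'hdp_usr'@'hdp01' IDENTIFIED BY 'test1';

A similar statement would be issued for hdp02, hdp03, and hdp04.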

The --table argument tells Sqoop which table to import. In our case, we are looking to import the weblogs table into HDFS. The --target-dir argument is passed the folder path in HDFS where the imported table will be stored:

hadoop fs -ls /data/weblogs/import

The output would be:

-rw-r--r--   1 hdp_usr hdp_grp      0 2012-06-08 23:47 /data/weblogs/import/_SUCCESS
drwxr-xr-x   - hdp_usr hdp_grp      0 2012-06-08 23:47 /data/weblogs/import/_logs
-rw-r--r--   1 hdp_usr hdp_grp 251131 2012-06-08 23:47 /data/weblogs/import/part-m-00000
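
The imported rows can be spot-checked directly from HDFS. For example, the following command prints the first few records of the part file produced by this import:

hadoop fs -cat /data/weblogs/import/part-m-00000 | head -n 5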

By default, the imported data is split on the table's primary key. If the table being imported does not have a primary key, either -m 1 or the --split-by argument must be used to tell Sqoop how to split the data. In the preceding example, the -m argument was used. The -m argument controls the number of mappers used to import the data; since -m was set to 1, a single mapper was used. Each mapper produces its own part file.
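
For example, because the weblogs table has no primary key, an import using four mappers could split on the md5 column; an illustrative variant of the preceding command would be:

sqoop import -m 4 --split-by md5 --connect jdbc:mysql://<HOST>:<PORT>/logs --username hdp_usr --password test1 --table weblogs --target-dir /data/weblogs/import_split

This would produce four part files, part-m-00000 through part-m-00003. Note that md5 is used here only for illustration; splits on a numeric or date column are generally more balanced, and some Sqoop versions restrict splitting on text columns.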

This one line hides an incredible amount of complexity. Sqoop uses the metadata stored by the database to generate a DBWritable class for the table, with a field for each column. This class is used by DBInputFormat, a Hadoop input format with the ability to read the results of arbitrary queries run against a database. In the preceding example, a MapReduce job is started using the DBInputFormat class to retrieve the contents of the weblogs table. The entire weblogs table is scanned and stored in /data/weblogs/import.
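
The generated class can also be produced on its own, without importing any data, using Sqoop's codegen tool; a command along these lines generates the weblogs class so you can inspect exactly what Sqoop builds from the table metadata:

sqoop codegen --connect jdbc:mysql://<HOST>:<PORT>/logs --username hdp_usr --password test1 --table weblogs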

There's more...

There are many useful options for configuring how Sqoop imports data. Sqoop can import data as Avro data files or SequenceFiles using the --as-avrodatafile and --as-sequencefile arguments, respectively. The data can also be compressed during the import using the -z or --compress argument. The default codec is gzip, but any Hadoop compression codec can be used by supplying the --compression-codec <CODEC> argument (see the Compressing data using LZO recipe in Chapter 2, HDFS). Another useful option is --direct. This argument instructs Sqoop to use native import/export tools if they are supported by the configured database. In the preceding example, if --direct had been added as an argument, Sqoop would have used mysqldump to export the weblogs table from MySQL quickly. The --direct argument is important enough that, in the preceding example, the following warning message was logged:

WARN manager.MySQLManager: It looks like you are importing from mysql.
WARN manager.MySQLManager: This transfer can be faster! Use the --direct
WARN manager.MySQLManager: option to exercise a MySQL-specific fast path.
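
Putting this together, a variant of the earlier import that uses the direct path might look like the following; the target directory is illustrative, and --direct requires the mysqldump utility to be available on the nodes that run the map tasks:

sqoop import --direct -m 1 --connect jdbc:mysql://<HOST>:<PORT>/logs --username hdp_usr --password test1 --table weblogs --target-dir /data/weblogs/import_direct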

See also

  • Exporting data from HDFS into MySQL using Sqoop