Exporting data from HDFS into MySQL using Sqoop

Sqoop is an Apache project that is part of the broader Hadoop ecosystem. In many ways Sqoop is similar to distcp (see the Moving data efficiently between clusters using Distributed Copy recipe of this chapter). Both are built on top of MapReduce and take advantage of its parallelism and fault tolerance. Instead of moving data between clusters, however, Sqoop was designed to move data into and out of relational databases, using a JDBC driver to connect.

Its functionality is extensive. This recipe will show how to use Sqoop to export data from HDFS to MySQL using the weblog entries as an example.

Getting ready

This example uses Sqoop v1.3.0.

If you are using CDH3, you already have Sqoop installed. If you are not running CDH3 you can find instructions for your distro at https://ccp.cloudera.com/display/CDHDOC/Sqoop+Installation.

This recipe assumes that you have a MySQL instance up and running that is reachable from your Hadoop cluster. The mysql.user table must be configured to accept a user connecting from the machine where you will be running Sqoop. Visit http://dev.mysql.com/doc/refman/5.5/en/installing.html for more information on installing and configuring MySQL.

The MySQL JDBC driver JAR file has been copied to $SQOOP_HOME/lib. The driver can be downloaded from http://dev.mysql.com/downloads/connector/j/.
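For reference, placing the driver might look like the following; the connector filename is only an illustration, as the exact version will depend on what you download:

# after downloading and unpacking Connector/J from the MySQL site,
# copy the driver JAR into Sqoop's lib directory
cp mysql-connector-java-*-bin.jar $SQOOP_HOME/lib/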

Follow the Importing and exporting data into HDFS using the Hadoop shell commands recipe of this chapter to load the weblog_entries.txt file into HDFS.
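If you skipped that recipe, loading the file takes only a couple of shell commands; the local filename and HDFS path below are assumptions that mirror the --export-dir used in step 3:

# create the target directory in HDFS and copy the local file into it
hadoop fs -mkdir /data/weblogs/05102012
hadoop fs -put weblog_entries.txt /data/weblogs/05102012/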

How to do it...

Complete the following steps to transfer data from HDFS to a MySQL table:

  1. Create a new database in the MySQL instance:
    CREATE DATABASE logs;
  2. Create the weblogs_from_hdfs table:
    USE logs;
    CREATE TABLE weblogs_from_hdfs (
        md5             VARCHAR(32),
        url             VARCHAR(64),
        request_date    DATE,
        request_time    TIME,
        ip              VARCHAR(15)
    );
  3. Export the weblog_entries.txt file from HDFS to MySQL:
    sqoop export -m 1 \
        --connect jdbc:mysql://<HOST>:<PORT>/logs \
        --username hdp_usr --password test1 \
        --table weblogs_from_hdfs \
        --export-dir /data/weblogs/05102012 \
        --input-fields-terminated-by '\t' \
        --mysql-delimiters

    The output is as follows:

    INFO mapreduce.ExportJobBase: Beginning export of weblogs_from_hdfs
    input.FileInputFormat: Total input paths to process : 1
    input.FileInputFormat: Total input paths to process : 1
    mapred.JobClient: Running job: job_201206222224_9010
    INFO mapred.JobClient:   Map-Reduce Framework
    INFO mapred.JobClient:     Map input records=3000
    INFO mapred.JobClient:     Spilled Records=0
    INFO mapred.JobClient:     Total committed heap usage (bytes)=85000192
    INFO mapred.JobClient:     Map output records=3000
    INFO mapred.JobClient:     SPLIT_RAW_BYTES=133
    INFO mapreduce.ExportJobBase: Transferred 248.3086 KB in 12.2398 seconds (20.287 KB/sec)
    INFO mapreduce.ExportJobBase: Exported 3000 records.
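Once the job completes, a quick sanity check from the MySQL shell might look like the following; the row count should match the "Exported 3000 records" line in the Sqoop output:

mysql> USE logs;
mysql> SELECT COUNT(*) FROM weblogs_from_hdfs;
mysql> SELECT * FROM weblogs_from_hdfs LIMIT 5;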

How it works...

Sqoop loads the JDBC driver defined in the --connect statement from $SQOOP_HOME/lib, where $SQOOP_HOME is the full path to the location where Sqoop is installed. The --username and --password options are used to authenticate the user issuing the command against the MySQL instance. The mysql.user table must have an entry for the --username and the host of each node in the Hadoop cluster; otherwise, Sqoop will throw an exception indicating that the host is not allowed to connect to the MySQL server.

mysql> USE mysql;
mysql> select user, host from user;
+---------------+-----------+
| user          | host      |
+---------------+-----------+
| hdp_usr       | hdp01     |
| hdp_usr       | hdp02     |
| hdp_usr       | hdp03     |
| hdp_usr       | hdp04     |
| root          | 127.0.0.1 |
| root          | ::1       |
| root          | localhost |
+---------------+-----------+
7 rows in set (1.04 sec)

In this example, we connected to the MySQL server using hdp_usr. Our cluster has four machines, hdp01, hdp02, hdp03, and hdp04.
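If these entries do not yet exist, they can be created from the MySQL prompt. The following is only a sketch; it reuses the example username, password, and hostname from this recipe, which you should replace with your own values:

mysql> CREATE USER 'hdp_usr'@'hdp01' IDENTIFIED BY 'test1';
mysql> GRANT ALL PRIVILEGES ON logs.* TO 'hdp_usr'@'hdp01';
mysql> FLUSH PRIVILEGES;

Repeat the CREATE USER and GRANT statements for hdp02, hdp03, and hdp04, or use a wildcard host pattern such as 'hdp%' if your security policy allows it.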

The --table argument identifies the MySQL table that will receive the data from HDFS. This table must be created before running the Sqoop export command. Sqoop uses the table's metadata (the number of columns and their types) to validate the data coming from the HDFS folder and to create INSERT statements. For example, the export job can be thought of as reading each line of the weblog_entries.txt file in HDFS and producing the following output:

INSERT INTO weblogs_from_hdfs
VALUES('aabba15edcd0c8042a14bf216c5', '/jcwbtvnkkujo.html', '2012-05-10', '21:25:44', '148.113.13.214');

INSERT INTO weblogs_from_hdfs
VALUES('e7d3f242f111c1b522137481d8508ab7', '/ckyhatbpxu.html', '2012-05-10', '21:11:20', '4.175.198.160');

INSERT INTO weblogs_from_hdfs
VALUES('b8bd62a5c4ede37b9e77893e043fc1', '/rr.html', '2012-05-10', '21:32:08', '24.146.153.181');
...

By default, Sqoop export creates INSERT statements. If the --update-key argument is specified, UPDATE statements will be created instead. If the preceding example had used the argument --update-key md5, the generated statements would have looked like the following:

UPDATE weblogs_from_hdfs SET url='/jcwbtvnkkujo.html', request_date='2012-05-10', request_time='21:25:44', ip='148.113.13.214' WHERE md5='aabba15edcd0c8042a14bf216c5';

UPDATE weblogs_from_hdfs SET url='/ckyhatbpxu.html', request_date='2012-05-10', request_time='21:11:20', ip='4.175.198.160' WHERE md5='e7d3f242f111c1b522137481d8508ab7';

UPDATE weblogs_from_hdfs SET url='/rr.html', request_date='2012-05-10', request_time='21:32:08', ip='24.146.153.181' WHERE md5='b8bd62a5c4ede37b9e77893e043fc1';

In the case where a row matching the --update-key value is not found, setting --update-mode to allowinsert will insert the row instead.
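As a sketch, an upsert-style version of the export in step 3 might look like the following; apart from the two additional arguments, the values are the same placeholders used earlier. Note that for allowinsert to behave as a true upsert in MySQL, the md5 column would typically need a primary key or unique index, which the table created in step 2 does not define:

sqoop export -m 1 \
    --connect jdbc:mysql://<HOST>:<PORT>/logs \
    --username hdp_usr --password test1 \
    --table weblogs_from_hdfs \
    --export-dir /data/weblogs/05102012 \
    --input-fields-terminated-by '\t' \
    --update-key md5 \
    --update-mode allowinsert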

The -m argument sets the number of map tasks that read the file splits from HDFS. Each mapper has its own connection to the MySQL server and inserts up to 100 records per statement. After it has completed 100 INSERT statements, that is, 10,000 records in total, it commits the current transaction. It is possible that a map task failure could cause data inconsistency, resulting in insert collisions or duplicated data. These issues can be overcome with the --staging-table argument, which causes the job to insert into a staging table and then, in one transaction, move the data from the staging table to the table specified by the --table argument. The table named by --staging-table must have the same schema as the one named by --table, and it must be empty before the job runs unless the --clear-staging-table argument is also used.
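A staged version of the export might be sketched as follows; the staging table name is an assumption, and the table is created here with the same schema as weblogs_from_hdfs before the job runs:

mysql> CREATE TABLE weblogs_from_hdfs_staging LIKE weblogs_from_hdfs;

sqoop export -m 1 \
    --connect jdbc:mysql://<HOST>:<PORT>/logs \
    --username hdp_usr --password test1 \
    --table weblogs_from_hdfs \
    --staging-table weblogs_from_hdfs_staging \
    --clear-staging-table \
    --export-dir /data/weblogs/05102012 \
    --input-fields-terminated-by '\t'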

See also

  • Importing data from MySQL into HDFS using Sqoop