Solr and Hadoop

Apache Hadoop and the big data ecosystem have exploded in popularity, and most developers are at least loosely familiar with them. Needless to say, many pieces of the Hadoop ecosystem work together to form a big data platform. It's mostly an a-la-carte world in which you combine the pieces you want, each having different uses or making different trade-offs between ease of coding and performance. What does Solr have to do with Hadoop, you may ask? Read on.

HDFS

As an alternative to a standard filesystem, Solr can store its indexes in the Hadoop Distributed File System (HDFS). HDFS acts like a shared filesystem for Solr, somewhat like networked storage (for example, a SAN), but is implemented at the application layer instead of at the OS or hardware layer. HDFS offers almost limitless growth, and you can increase storage incrementally without restarting or reconfiguring the server processes supporting it. HDFS provides redundancy too, although this is extra-redundant with SolrCloud replication. Ideally, Solr nodes should run on the same machines as HDFS data nodes. If you already have HDFS for other purposes, be it for MapReduce jobs or something else, then this may be particularly appealing; otherwise, it is probably more complex than it's worth. Solr's HDFS integration is built in; see the Solr Reference Guide for the details and read it thoroughly. If you've already got HDFS running, it's really quite easy to get Solr to use it.
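
If you want a feel for the configuration involved, the following is a minimal sketch of an HDFS-backed <directoryFactory/>; the hostname, path, and cache setting are illustrative only, and the Reference Guide lists the full set of solr.hdfs.* options:

<directoryFactory name="DirectoryFactory" class="solr.HdfsDirectoryFactory">
  <str name="solr.hdfs.home">hdfs://namenode-host:8020/solr</str>
  <str name="solr.hdfs.confdir">/etc/hadoop/conf</str>
  <bool name="solr.hdfs.blockcache.enabled">true</bool>
</directoryFactory>

The Reference Guide also covers the matching hdfs lock type setting, which you will see in use later in this chapter.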

A nice bonus of using HDFS is the option of using the autoAddReplicas feature in Solr 4.10. With this feature, Solr will respond to a node failure by automatically adding replacement replicas on other nodes to maintain the desired replicationFactor. Although it may work with any shared filesystem, only HDFS is supported right now. A planned benefit of shared filesystems is having leaders and replicas use the same index, thereby saving space and freeing replicas from much of the impact of concurrent indexing activity. See SOLR-6237 (https://issues.apache.org/jira/browse/SOLR-6237) for the current status. Once that feature is released, using HDFS will be far more compelling!
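
As a hedged illustration of how you would turn this on, autoAddReplicas is set per collection at creation time through the Collections API; the collection name and shard/replica counts below are placeholders:

curl "http://localhost:8983/solr/admin/collections?action=CREATE&name=mycollection&numShards=2&replicationFactor=2&autoAddReplicas=true"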

Indexing via MapReduce

Solr includes a contrib module named map-reduce that provides Hadoop MapReduce components for indexing documents into new Solr indexes stored on HDFS. It includes a cool go-live feature that will then merge those generated indexes into Solr's existing indexes in HDFS without any downtime. This module provides the most value when your data is already in HDFS and there is a lot of data and/or CPU-intensive work in generating the resulting Solr input documents, such as text extraction from common document formats. Using the MapReduce paradigm, this work is parallelized across a cluster of Hadoop nodes in a fault-tolerant manner, and even the resulting Solr indexes get built by this process. Even if you are doing trivial text processing on tweets, you can benefit from this module by leveraging your large Hadoop cluster to index all of your data faster than you could with just your Solr cluster. Furthermore, your search performance is not going to be heavily impacted by the ongoing indexing activity, because that activity happens on separate machines.

To learn more about the map-reduce module, start with MapReduceIndexerTool, a command-line program and façade to the module's nuts and bolts. It isn't documented in Solr's Reference Guide; if you Google it, you will wind up at its documentation within the Cloudera Search documentation: http://www.cloudera.com/content/cloudera/en/documentation/cloudera-search/v1-latest/Cloudera-Search-User-Guide/csug_mapreduceindexertool.html.

Morphlines

This module and two other related contrib modules use an Extract-Transform-Load (ETL) framework called Morphlines, open sourced by Cloudera. Unlike some other popular ETL tools, this one is lightweight and developer friendly (no IDE required) and has a data model matching Lucene/Solr: string keys supporting multiple values. Morphlines enables MapReduceIndexerTool to support any input format and custom transformations just by editing a configuration file. If the numerous built-in Morphline commands aren't sufficient, you can of course write your own commands to plug in. Morphlines does not require Hadoop or even Solr, though you will see strong associations with both. If you have non-trivial steps to construct a Solr input document for indexing, then you should definitely check it out. The best source of information is available at http://kitesdk.org/docs/current/kite-morphlines/index.html.
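
To give you a feel for the format before we use it later in this chapter, here is a bare-bones sketch of a Morphline configuration; the command names come from the Kite Morphlines reference, and the id and field values are made up for illustration:

morphlines : [
  {
    id : myMorphline
    importCommands : ["org.kitesdk.**", "org.apache.solr.**"]
    commands : [
      # read each line of the input as a record
      { readLine { charset : UTF-8 } }
      # add a constant field to every record
      { addValues { data_source : "morphline-demo" } }
      # hand the record off to Solr for indexing
      { loadSolr { solrLocator : ${SOLR_LOCATOR} } }
    ]
  }
]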

Running a Solr build using Hadoop

The source code for this example is available at /examples/9/solr-map-reduce-example.

Note

The original code was published by SolrCloud lead developer Mark Miller: https://github.com/markrmiller/solr-map-reduce-example.

Make sure you don't have any Solr or Hadoop processes running before you start performing the following steps:

  1. Start the example via the ./run-example.sh script. You'll see that the Hadoop and Solr distributions are downloaded, and various processes related to them are run.
  2. When it completes, browse to http://127.0.0.1:8042 and you'll see the Hadoop NodeManager web UI. Assuming all went well, you can query some Twitter data via http://localhost:8983/solr/collection1/select?q=*:* or from the command line, as shown below.
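
If you prefer the command line, a quick sanity check of that same query might look like this (assuming you have curl installed):

curl "http://localhost:8983/solr/collection1/select?q=*:*&rows=1&wt=json&indent=true"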

Looking at the storage

So what makes this special, or different from a traditional SolrCloud setup? Well, first off, the Lucene data files aren't stored on your local filesystem; instead, they are on a locally running HDFS cluster. If you browse to one of the shards at http://localhost:8983/solr/#/collection1, you will see that the filesystem implementation is listed as org.apache.solr.core.HdfsDirectoryFactory, and that the data is being stored on the HDFS cluster.

How did that happen? If you dig around in run-example.sh, you'll see that when Solr was started, some additional HDFS-related parameters were passed in:

 -Dsolr.directoryFactory=solr.HdfsDirectoryFactory -Dsolr.lock.type=hdfs -Dsolr.hdfs.home=hdfs://127.0.0.1:8020/solr1 -Dsolr.hdfs.confdir=$hadoop_conf_dir

The solr.directoryFactory parameter told Solr to change the default <directoryFactory/> setting in solrconfig.xml to an HDFS directory factory instead of the traditional filesystem one. Additionally, solr.lock.type overrides the native lock type to enable HDFS's own locking semantics.
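
This works because the stock example solrconfig.xml references these settings through property substitution with non-HDFS defaults; it looks roughly like the following, though the exact defaults can differ between Solr versions:

<directoryFactory name="DirectoryFactory"
                  class="${solr.directoryFactory:solr.NRTCachingDirectoryFactory}"/>

<lockType>${solr.lock.type:native}</lockType>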

HDFS works very similarly to the Unix filesystems that you are already used to, except that instead of the solr.home parameter pointing to a local directory, the solr.hdfs.home parameter is the URL of a directory in HDFS. In this case, the data is stored in /solr1/ at the root of the HDFS server running locally on port 8020. Since we are running two Solr servers in our local environment, if you look at run-example.sh, you'll see that the second Solr is started with the same parameters except for the HDFS location used to store the second shard:

-Dsolr.hdfs.home=hdfs://127.0.0.1:8020/solr2

Let's take a look at the HDFS cluster. From the root of ./examples/9/solr-map-reduce-example, run the following command to list all the files and directories on our HDFS cluster:

./hadoop-2.2.0/bin/hadoop fs -fs hdfs://127.0.0.1:8020 -ls /

The command is pretty self-explanatory; we are using the Hadoop client to make a file-listing call. The location of the filesystem is passed in as a URL starting with hdfs://, which shows how HDFS is meant, from the ground up, to be used in a distributed manner. The -ls / part of the command lists all the files in the root directory. The output should be something like this:

Found 5 items
drwxr-xr-x   - epugh supergroup          0 2014-11-12 09:53 /indir
drwxr-xr-x   - epugh supergroup          0 2014-11-12 09:54 /outdir
drwxr-xr-x   - epugh supergroup          0 2014-11-12 09:53 /solr1
drwxr-xr-x   - epugh supergroup          0 2014-11-12 09:53 /solr2
drwxr-xr-x   - epugh supergroup          0 2014-11-12 09:53 /tmp

Ignore /indir and /outdir for now. As you can see, we have two Solr data directories rooted on the HDFS cluster at /solr1 and /solr2. With HDFS, as the size of the data in these directories grows, you just add more hardware to the HDFS cluster, and HDFS handles distributing the additional data to the new hardware. Let's look in a bit more detail at the data files stored in HDFS:

hadoop-2.2.0/bin/hadoop fs -fs hdfs://127.0.0.1:8020 -ls /solr1/collection1/core_node2/data/index

As you can see, we have our write lock file, but this time in HDFS, as well as some Lucene segment files (to save space, only the first few files are listed):

Found 13 items
-rw-r--r--   1 epugh supergroup          0 2014-11-12 09:53 /solr1/collection1/core_node2/data/index/HdfsDirectory@798a0aac lockFactory=org.apache.solr.store.hdfs.HdfsLockFactory@5ef215db-write.lock
-rwxr-xr-x   1 epugh supergroup      42943 2014-11-12 09:54 /solr1/collection1/core_node2/data/index/_0.fdt
-rwxr-xr-x   1 epugh supergroup         89 2014-11-12 09:54 /solr1/collection1/core_node2/data/index/_0.fdx
-rwxr-xr-x   1 epugh supergroup       2951 2014-11-12 09:54 /solr1/collection1/core_node2/data/index/_0.fnm

So let's see if we can use the power of HDFS to quickly add another Solr process. We've added a script called add-third-solr.sh that fires up another Solr server process and adds it to the cluster of Solr servers supporting collection1. List the root directory in HDFS and you'll see that the data is now being stored in HDFS under /solr3. Look at the log file at solr-4.10.1/example3/example3.log to see the behavior. Keen-eyed readers will notice, however, that the /solr3 directory is a complete copy of the original /solr1 or /solr2 directory that holds the leader's data. As we mentioned in the introduction, once SOLR-6237 is wrapped up, replicas will be able to share the leader's index, which will drastically reduce the amount of disk storage required, making HDFS a much more compelling option than traditional SolrCloud storage.

One last thing to keep in mind when considering HDFS is that it is primarily oriented towards storing fewer, larger files that can be split up and stored on multiple servers, rather than many small, rapidly changing files. If you have a near real-time scenario with many small incremental changes to your data, then you will want to make sure that the overhead of HDFS isn't prohibitive. On the other hand, if you periodically build an index, optimize it, and leave it alone, then the strengths of HDFS may meet your needs perfectly.

The data ingestion process

This demo uses source Twitter data stored in a format called Avro. Apache Avro is a powerful data serialization framework, like Thrift and ProtoBuf. What makes Avro great is that each .avro file includes the schema that describes the data stored inside that file. This means that you can interrogate any .avro file and figure out how to read back the data. It makes the format very stable, with none of the generated stubs that Thrift or ProtoBuf require for writing and reading the data. Go ahead and open the sample-statuses-20120906-141433-medium.avro file with a text editor. You'll see the schema for the data embedded in the binary file, as shown here:

{"type":"record","name":"Doc","doc":"adoc","fields":[{"name":"id","type":"string"},{"name":"user_friends_count","type":["int","null"]},{"name":"user_location","type":["string","null"]},{"name":"user_description","type":["string","null"]},{"name":"user_statuses_count","type":["int","null"]},{"name":"user_followers_count","type":["int","null"]},{"name":"user_name","type":["string","null"]},{"name":"user_screen_name","type":["string","null"]},{"name":"created_at","type":["string","null"]},{"name":"text","type":["string","null"]},{"name":"retweet_count","type":["int","null"]},{"name":"retweeted","type":["boolean","null"]},{"name":"in_reply_to_user_id","type":["long","null"]},{"name":"source","type":["string","null"]},{"name":"in_reply_to_status_id","type":["long","null"]},{"name":"media_url_https","type":["string","null"]},{"name":"expanded_url","type":["string","null"]}]}

Remember how we performed an ls command on the HDFS cluster? Similar commands in run-example.sh are used to insert the sample data into the cluster:

samplefile=sample-statuses-20120906-141433-medium.avro
$hadoop_distrib/bin/hadoop --config $hadoop_conf_dir fs -mkdir hdfs://127.0.0.1/indir
$hadoop_distrib/bin/hadoop --config $hadoop_conf_dir fs -put $samplefile hdfs://127.0.0.1/indir/$samplefile

Want to try it yourself? The following command uploads a copy of the sample data into /indir, which is the staging directory for inbound data to be processed by Hadoop:

./hadoop-2.2.0/bin/hadoop fs -fs hdfs://127.0.0.1:8020 -put sample-statuses-20120906-141433-medium.avro hdfs://127.0.0.1/indir/sample-statuses-20120906-141433-medium2.avro
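
You can then confirm the upload with the same listing command used earlier, this time pointed at /indir:

./hadoop-2.2.0/bin/hadoop fs -fs hdfs://127.0.0.1:8020 -ls /indir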

Once the data is loaded, we are ready to run the Morphline process in our Hadoop cluster to import the data into Solr. The files stored in the HDFS cluster under /indir will be processed into Lucene indexes that are placed into /outdir in HDFS. Let's look at /outdir to see one of the segment files:

./hadoop-2.2.0/bin/hadoop fs -fs hdfs://127.0.0.1:8020 -ls /outdir/results/part-00000/data

Morphlines works by taking a configuration file that describes the steps in the data processing pipeline. Open up the readAvroContainer.conf file in your text editor. You can see that this is a DSL for data processing. And yes, it's YAFF (Yet Another File Format) called HOCON (Human-Optimized Config Object Notation), so it may take you a bit to get used to it.

The key bit to understand is that the file declares a set of commands:

  • readAvroContainer, extractAvroPaths, convertTimestamp, sanitizeUnknownSolrFields, and finally loadSolr. Each of these commands is documented in the readAvroContainer.conf file; however, you can guess from the names that the first step of the Morphline is to read records from the .avro files that were uploaded into the /indir directory.
  • The second step, extractAvroPaths, is interesting, as it uses a basic path expression, akin to XPath, to map the fields in the Avro document to the names of the fields in Solr (see the sketch after this list). In this simple example, it's somewhat redundant, since there is a 1-to-1 mapping between field names in Avro and in the Solr schema, but you can see how this would be useful in mapping Avro documents to an existing Solr schema.
  • convertTimestamp is a nice example of the types of manipulations that you always have to do when importing data into Solr. Solr is very fussy about getting all dates in the full ISO standard format of yyyy-MM-dd'T'HH:mm:ss'Z', and this function makes it easy.
  • sanitizeUnknownSolrFields takes advantage of the RESTful APIs added to Solr to query the schema service for the list of fields that Solr knows about, and then filters the documents being sent to drop any fields that Solr doesn't know about.
  • Lastly, the loadSolr command pops the documents into your Solr. The amazing thing is that what you might think of as the complex part turns out to be the easiest step:
    loadSolr {
      solrLocator : ${SOLR_LOCATOR}
    }
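
To make the extractAvroPaths step concrete, here is a hedged sketch of what such a mapping can look like; the field names are taken from the Avro schema shown earlier, but the exact expressions in readAvroContainer.conf may differ:

extractAvroPaths {
  flatten : false
  paths : {
    id : /id
    user_screen_name : /user_screen_name
    created_at : /created_at
    text : /text
  }
}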

Unfortunately, like many things with Hadoop, the command line used to invoke the Morphline-based load is quite intimidating. To make things simpler, look at the run-just-morphline.sh script, which only runs the Morphline; it assumes you've already run the run-example.sh script to start up the Hadoop and Solr processes. Most of the parameters are pretty self-explanatory. The really interesting one is the --go-live parameter. It tells the MapReduce process to take all the index data generated in /outdir, use the information in ZooKeeper to deploy that data to the right locations in HDFS for the Solr servers (for us, that's /solr1 and /solr2), and then merge it into the live indexes.
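
For reference, the heart of run-just-morphline.sh is an invocation along these lines; the JAR path, ZooKeeper address, and HDFS URLs here are assumptions based on this example's defaults, so check the script itself for the exact values and any extra flags it passes:

hadoop --config $hadoop_conf_dir jar solr-4.10.1/dist/solr-map-reduce-*.jar \
  --morphline-file readAvroContainer.conf \
  --zk-host 127.0.0.1:9983 \
  --collection collection1 \
  --output-dir hdfs://127.0.0.1:8020/outdir \
  --go-live \
  --verbose \
  hdfs://127.0.0.1:8020/indir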

Looking at the console output from running the script, you can see the MapReduce jobs running all the steps of the Morphline; it gives you a sense of how MapReduce works! If you are debugging the Morphline, add the --dry-run parameter to the command. When you run with it, the Morphline will output to the console the documents that it would otherwise have inserted into Solr, giving you a way of debugging what is going on.

Want to know whether it's working? Look at the Gen (generation) value in the Solr admin for one of your shards, and then rerun the Morphline process. You will see the Gen value increment by one. One thing to note is that while the number of live documents in the index won't go up, remaining at 2104, the total number of documents in the index will continue to grow, because new segment files are appended on each run. This can be a bit confusing, since Solr considers the shard to be optimized despite the existence of ostensibly deleted documents in the index!
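
One way to see this for yourself is Solr's Luke request handler, which reports both numDocs (live documents) and maxDoc (the total, including deleted documents); the core name below assumes this example's collection1:

curl "http://localhost:8983/solr/collection1/admin/luke?numTerms=0&wt=json&indent=true"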
