
Building an Elasticsearch index offline using Hadoop Pig



In this post, I explore the possibility of generating an Elasticsearch index offline, directly using Lucene and Hadoop Pig.


Recently I started using Apache Lucene and, while I was highly impressed by the library itself, deploying it for a large-scale project requires something that can take care of the distribution, replication and administration of your indexes. The market has two major competitors at the moment: Apache Solr and Elasticsearch. The purpose of this post is not to explore the differences between the two, both have their advantages and disadvantages; take a look at this link for more details.

The Context

What I’m interested in here is figuring out how to generate an index for Elasticsearch (ES) without having to rely on a running instance of ES.


Using the Elasticsearch Hadoop plugin, it is already possible to take advantage of Pig to index your data into ES, but it requires that an instance of ES be running. That means you need at least one Elasticsearch server up and running, ready to ingest your data as quickly as possible while Pig streams it in. This works well if you have time and/or not so much data that it overwhelms the server. But when I tried it with loads of data, the server either crashed or the jobs timed out. I was able to make it work by running enough instances in parallel to ingest the data but, at least to me, that’s not a viable solution in production environments.


What next?


On GitHub, Twitter published a number of tools for Hadoop called Elephant Bird. Among these tools is a Pig UDF (user defined function) that can be used to generate a Lucene index directly from Pig with whatever schema you wish. This works really well for generating huge indexes in parallel on Hadoop.


So, why not generate a Lucene index directly and push it into ES?


The specifics of an Elasticsearch index

I don’t want to go into the details of ES here, there is an extensive user guide on their website. In short, Elasticsearch is built on top of Lucene. It has a few extra features which it integrates into the index by adding extra fields; you can read about these here. The three main fields it adds are “_source”, “_all” and “_uid”. The “_source” field is, basically, a compressed copy of the original document you indexed (minus the fields you chose to discard); you can turn this off. The second field, “_all”, is an indexed field that concatenates all indexable fields; again, you can turn this off if you don’t need it. The final field, “_uid”, is the unique id of your document within the index and is required by ES.


ES also has a very useful sharding system that lets you distribute your index across shards, instances, nodes, etc. On the filesystem, by default, each shard is a separate Lucene index within its own subdirectory.


What happens when you drop in an index straight up?


First, let’s create a specific mapping:

curl -XPOST localhost:9200/test -d '{
    "settings": {
        "number_of_replicas": 0,
        "number_of_shards": 1
    },
    "mappings": {
        "uf": {
            "_source": {
                "enabled": false
            },
            "_all": {
                "enabled": false
            },
            "properties": {
                "field1": {
                    "type": "integer",
                    "store": "yes"
                },
                "field2": {
                    "type": "integer",
                    "store": "yes"
                }
            }
        }
    }
}'



Which will create an index called test, with a single type called “uf” and with two integer-typed fields “field1” and “field2”. For this test, the index doesn’t have any replicas and is composed of only a single shard. It also explicitly deactivates the “_all” and “_source” fields. The index data is, by default, located in a subdirectory of the untarred ES archive.


The first thing I tried, after stopping ES, was simply replacing the data in the “data/cluster/nodes/0/indices/test/0/index” directory with a Lucene index I had just created, containing 100 documents with those two fields. After restarting ES, it had successfully loaded the 100 documents (they were reported by _stats). But when I tried a default query, I was greeted with this error:

java.lang.NullPointerException
	at org.elasticsearch.search.fetch.FetchPhase.execute(FetchPhase.java:154)
	at org.elasticsearch.search.SearchService.executeFetchPhase(SearchService.java:326)
	[...]



Checking out the source code of FetchPhase.java, it was obvious the system was looking for “_uid”: since I had deactivated “_source”, it was loading a default FieldsVisitor subclass, and all of those seem to expect “_uid”.


I repeated the experiment after adding a “_uid” string field to my Lucene index, with values generated by java.util.UUID. The doc says “the internal _uid field is the unique identifier of a document within an index and is composed of the type and the id”, so I knew I was missing the type, but let’s see. This time I received this error:

java.lang.StringIndexOutOfBoundsException: String index out of range: -1
	at java.lang.String.substring(String.java:1937)
	at org.elasticsearch.index.mapper.Uid.createUid(Uid.java:117)
	[...]



So I checked Uid.java, where it is quite obvious that “_uid” is actually a concatenation of the type and the id with “#” as a separator. I regenerated my index once again, this time prefixing the result of java.util.UUID with “uf#”. I restarted ES, made a query, and EUREKA! It worked! I was able to list documents, query, update and add, i.e. all operations seemed to work the way they should.
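
For reference, here is a minimal sketch of how such a test index can be built directly against Lucene 4.4. This is not the exact code I used; the field values and the output path are made up for illustration:

import java.io.File;
import java.util.UUID;

import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.IntField;
import org.apache.lucene.document.StringField;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.store.FSDirectory;
import org.apache.lucene.util.Version;

public class BuildTestIndex {
        public static void main(String[] args) throws Exception {
                FSDirectory dir = FSDirectory.open(new File("index"));
                IndexWriterConfig cfg = new IndexWriterConfig(Version.LUCENE_44,
                                new StandardAnalyzer(Version.LUCENE_44));
                IndexWriter writer = new IndexWriter(dir, cfg);

                for (int i = 0; i < 100; i++) {
                        Document doc = new Document();
                        doc.add(new IntField("field1", i, Field.Store.YES));
                        doc.add(new IntField("field2", i * 2, Field.Store.YES));
                        // "_uid" must be "<type>#<id>", "uf" being the type defined in the mapping
                        doc.add(new StringField("_uid", "uf#" + UUID.randomUUID().toString(), Field.Store.YES));
                        writer.addDocument(doc);
                }
                writer.close();
        }
}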


Generating the index with Hadoop Pig

As mentioned, I am going to use one of the elements of Twitter’s tool package Elephant Bird to generate the index. To be more specific, in this open-sourced code, there is a class LuceneIndexOutputFormat that you can extend to output from Pig to a Lucene index using the format you desire. It is fairly straightforward to use in a Pig script. Here is the simplest example:

register 'lucene.jar';

a = load 'input' using PigStorage('\t') as (field1:int, field2:int);
store a into 'index' using com.poudro.pig.storage.LuceneIndexStorage('com.poudro.mapreduce.output.TestESOF');



As you can see, this script stores its output through LuceneIndexStorage, which internally calls TestESOF. This class is where the actual documents are generated. Here is an example of this class:

package com.poudro.mapreduce.output;

import java.io.IOException;
import java.util.UUID;

import org.apache.hadoop.conf.Configuration;
import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.IntField;
import org.apache.lucene.document.StringField;
import org.apache.lucene.util.Version;
import org.apache.pig.data.Tuple;

public class TestESOF<K, V> extends LuceneIndexOutputFormat<K, V> {

        // Field instances are created once and reused across documents
        private Field field1 = new IntField("field1", 0, Field.Store.YES);
        private Field field2 = new IntField("field2", 0, Field.Store.YES);
        private Field uid = new StringField("_uid", "", Field.Store.YES);

        private Document d = null;

        @Override
        protected Document buildDocument(K key, V value) throws IOException {
                if (d == null)
                {
                        d = new Document();
                        d.add(field1);
                        d.add(field2);
                        d.add(uid);
                }

                Tuple tuple = (Tuple) value;
                field1.setIntValue((Integer) tuple.get(0));
                field2.setIntValue((Integer) tuple.get(1));
                // "_uid" is "<type>#<id>", "uf" being the type defined in the mapping
                uid.setStringValue("uf#" + UUID.randomUUID().toString());

                return d;
        }

        @Override
        protected Analyzer newAnalyzer(Configuration conf) {
                return new StandardAnalyzer(Version.LUCENE_44);
        }
}



Notice the newAnalyzer method at the bottom, which you can use to define the analyzer you need when generating your documents. For specific details, please look at Elephant Bird’s source code.
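
For instance, if the schema contained a hypothetical free-text field, say “title”, the newAnalyzer override could return a per-field analyzer instead. A quick sketch (the “title” field is an assumption, not part of the example above):

import java.util.HashMap;
import java.util.Map;

import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.analysis.en.EnglishAnalyzer;
import org.apache.lucene.analysis.miscellaneous.PerFieldAnalyzerWrapper;
import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.lucene.util.Version;

public class Analyzers {
        // StandardAnalyzer for every field, EnglishAnalyzer for the hypothetical "title" field
        public static Analyzer forIndexing() {
                Map<String, Analyzer> perField = new HashMap<String, Analyzer>();
                perField.put("title", new EnglishAnalyzer(Version.LUCENE_44));
                return new PerFieldAnalyzerWrapper(new StandardAnalyzer(Version.LUCENE_44), perField);
        }
}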


So now you have your index generated using Pig.

Integrating the two

Well, there’s not much more to do. The next step is simply extracting the index from HDFS and setting it up in ES. In my example above, I generated a single-shard index in ES; of course, this works with as many shards as you want. By controlling the number of reducers in Pig you can output the correct number of shards, or you can merge the output indexes into a smaller number, as sketched below.
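
Merging part indexes can be done with Lucene itself through IndexWriter.addIndexes. Here is a rough sketch, assuming the part indexes have already been copied off HDFS and their local paths are passed on the command line after the target path:

import java.io.File;

import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FSDirectory;
import org.apache.lucene.util.Version;

public class MergeIndexes {
        public static void main(String[] args) throws Exception {
                // args[0] is the target directory, the rest are part indexes to merge into it
                Directory target = FSDirectory.open(new File(args[0]));
                IndexWriterConfig cfg = new IndexWriterConfig(Version.LUCENE_44,
                                new StandardAnalyzer(Version.LUCENE_44));
                IndexWriter writer = new IndexWriter(target, cfg);

                Directory[] parts = new Directory[args.length - 1];
                for (int i = 1; i < args.length; i++) {
                        parts[i - 1] = FSDirectory.open(new File(args[i]));
                }
                writer.addIndexes(parts);
                writer.forceMerge(1); // optional: collapse everything into a single segment
                writer.close();
        }
}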


Also, it should be fairly straightforward to add the “_source” field by reusing Elasticsearch’s internals, packaging them into your jar. The same approach should work for the other ES-specific fields.


That’s all, thanks for reading… :)




06 Oct 2013