Friday, June 24, 2011

JavaScript Map-Reduce Jobs in Hadoop using Mozilla Rhino

If you read my blog, you know that I am a huge fan of scripting, especially in areas of enterprise architecture that don’t classically see that kind of dynamism in their component logic.  One application of a script engine that recently piqued my interest was writing Map-Reduce jobs in JavaScript within Hadoop.
After playing with technologies like Pig and Hive, I realized that these frameworks don’t exist so much to provide a SQL-like interface to Hadoop as to allow analysts and developers to quickly explore their datasets without needing to compile an implementation in Java.  During my Cloudera Developer training, I noticed that the biggest barrier to entry into the Hadoop ecosystem was the need for a solid Java background.  Many of the students had C++ or .NET backgrounds.  More importantly, I personally found it tedious to write and compile a Hadoop job every time I wanted to create some variation of an existing Map-Reduce capability.
But what about Hadoop Streaming?  Sure, this is certainly a viable answer, and you could even write your application in Python or JavaScript (using Node.js).  I think my biggest problem with Hadoop Streaming is that it is very brittle.  The separator between the key and value is a tab character!  If you are using structured data formats native to Hadoop, you lose the ability to automatically marshal them back into a rich format (think Avro and getting real objects as values!).  If you are writing more complex applications using the Streaming API and need to reference external libraries, you may find yourself in a sticky situation.  Every “Task Node” (a DataNode running a TaskTracker) may need to have its environment preconfigured manually, rather than having scripts shipped through the distributed cache (for instance, what if you wanted to use a Python library like NLTK?).
What if, instead of using a SQL-like language or an imperative language that has to be compiled, we used a scripting language?  Believe it or not, this is extremely simple to set up, since Java already has a specification for hosting scripting languages (JSR 223).  In this post, I’m going to demonstrate how you can use JavaScript to write Map-Reduce jobs in Hadoop using the Mozilla Rhino script engine.
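If you have never worked with JSR 223, here is a minimal, standalone sketch of the idea (the class and function names are just for illustration): look up a registered engine, evaluate a script body, and invoke one of its functions from Java.

import javax.script.Invocable;
import javax.script.ScriptEngine;
import javax.script.ScriptEngineManager;

public class Jsr223Demo {

  public static void main(String[] args) throws Exception {
    //Locate any registered "JavaScript" engine; on a stock
    //JDK 6 this resolves to the bundled Rhino implementation.
    ScriptEngine engine =
        new ScriptEngineManager().getEngineByName("JavaScript");

    //Parse and evaluate a script body, then call one of its
    //functions directly from Java.
    engine.eval("function greet(name) { return 'Hello, ' + name; }");
    Object result = ((Invocable) engine).invokeFunction("greet", "Hadoop");

    System.out.println(result); //prints: Hello, Hadoop
  }
}
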
Before we can start crunching data in JavaScript, however, we need to set up a “hosting environment” that will allow us to use Mozilla Rhino within the context of Hadoop.  This will simply be a generic Map-Reduce job that delegates both the map and reduce functions to JavaScript functions running in Rhino.
JsMapReduceBase.java

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;

import javax.script.Invocable;
import javax.script.ScriptEngine;
import javax.script.ScriptEngineManager;

import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;

/**
 * Provides the scaffolding necessary for hosting
 * a script engine in Hadoop.
 */
public abstract class JsMapReduceBase extends MapReduceBase {

  protected static final ScriptEngineManager mgr 
                            = new ScriptEngineManager();

  protected ScriptEngine jsEngine;
  protected static final Text outputKey = new Text();
  protected static final Text outputValue = new Text();
  
 /**
  * Configure the Mapper/Reducer using the Job Context.
  * In our case, we want to pull the Mapper and Reducer
  * JavaScript functions from the Distributed Cache.
  * @param job Map-Reduce Job Context
  */
  @Override
  public void configure(JobConf job) {
    super.configure(job);
    
    //Create a new Script Engine using the JSR 223 API.
    //Under the hood, Java will locate any registered 
    //implementation of the Script Language "JavaScript", 
    //which will be the Rhino engine.
    jsEngine = mgr.getEngineByName("JavaScript");

    try {
      //Using the script from the cache, load the script
      //body into the engine (parsing and evaluating).
      jsEngine.eval(getScript(job));
      //We are going to register an instance of a Text key
      //which we will reuse in the Hadoop job,
      jsEngine.put("oKey", outputKey);
      //as well as a Text value.
      jsEngine.put("oValue", outputValue);

    } catch (Exception e) {

      System.err.print(e.getMessage());
    }
  }
  
 /**
  * Clean up any resources used during the Map-Reduce task
  */
  @Override
  public void close() throws IOException {
    super.close();
  }
  
 /**
  * Pull the JavaScript script from the distributed
  * cache.
  * @param job Map-Reduce Job Context
  */
  private String getScript(JobConf job) throws Exception {
    
    StringBuilder sb = new StringBuilder();
    
    Path[] cacheFiles;
    
    cacheFiles = DistributedCache.getLocalCacheFiles(job);
    
    for(Path p : cacheFiles){
    
      String file = readFile(p, job);
      
      System.err.println(file);
      
      sb.append(file).append(" \n");
    }
    
    return sb.toString();
  }
  
 /**
  * Read the contents of a file into a string.
  * @param path Path to the file
  * @param job Map-Reduce Job Context
  * @return Body of the file as a string
  */
  private String readFile(Path path, JobConf job) 
     throws Exception {

    FileSystem fs = FileSystem.get(job);
    FSDataInputStream in = fs.open(path);
    BufferedReader br = new BufferedReader(new InputStreamReader(in));
    
    StringBuilder sb = new StringBuilder();
    
    String line;
    while ( (line = br.readLine()) != null) {
      //Preserve line breaks so that single-line JavaScript
      //comments do not swallow the rest of the script.
      sb.append(line).append("\n");
    }
    br.close();
    return sb.toString();
  }
  
 /**
  * Call a function on the Script Engine.
  * @param functionName Name of the function to call
  * @param args An array of arguments to pass to the function
  *        representing the function argument signature.
  * @return The result (if any) from the script function
  */ 
  protected Object callFunction(
    String functionName, Object... args)
    throws Exception {

    return ((Invocable)jsEngine)
               .invokeFunction(functionName, args);
  }
  
 /**
  * Call a method on a script object within the Script Engine.
  * @param objectName The reference name of the object whose
  *        method will be called (e.g.: in foo.bar(),
  *        we want 'foo').
  * @param methodName Name of the method to call (bar())
  * @param args An array of arguments to pass to the method
  *        representing the method's argument signature.
  * @return The result (if any) from the script method
  */ 
  protected Object callMethod(
    String objectName, String methodName, Object... args) 
    throws Exception {

    return ((Invocable)jsEngine)
               .invokeMethod(objectName, methodName, args);
  }
}

We will also need to implement the Mapper and Reducer interfaces:

JsMapper.java

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

/**
 * A very simple implementation of the Mapper interface
 * that delegates the 'map' task to a Script Engine.
 * Keep in mind that this implementation currently uses
 * the Text class for input and output Key-Value pairs,
 * but could easily be changed to use anything.
 */
public class JsMapper extends JsMapReduceBase 
   implements Mapper<Object, Text, Text, Text> {
  
/**
 * Perform a map on the given key-value pair, delegating the
 * call to JavaScript.
 * @param key The Key
 * @param value The Value
 * @param output The output collector we use to send key-value
 *        pairs to the reducer.
 * @param reporter A mechanism we can use for 
 *        'reporting' our progress.
 */
  @Override
  public void map(
      Object key, 
      Text value, 
      OutputCollector<Text, Text> output,
      Reporter reporter) throws IOException {
    
    try {
      //Delegate the call to the "map()" function in JavaScript
      //Note: this was hard-coded to keep the demo simple.
      callFunction("map", key, value, output, reporter);
    } catch (Exception e) {
      //Surface script errors so Hadoop will fail and retry the task.
      throw new IOException(e.getMessage(), e);
    }
  }
  
}

JsReducer.java

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;

/**
 * A very simple implementation of the Reducer interface
 * that delegates the 'reduce' task to a Script Engine.
 * Keep in mind that this implementation currently uses
 * the Text class for input and output Key-Value pairs,
 * but could easily be changed to use anything.
 */
public class JsReducer extends JsMapReduceBase 
    implements Reducer<Text, Text, Text, Text> {

/**
 * Perform a reduce on the given key and its collection of
 * values, delegating the call to JavaScript.
 * @param key The Key
 * @param values A Value Collection for the corresponding key
 * @param output The output collector we use to save key-value
 *        pairs to HDFS.
 * @param reporter A mechanism we can use for 
 *        'reporting' our progress.
 */
  @Override
  public void reduce(
           Text key, 
           Iterator<Text> values,
           OutputCollector<Text, Text> output, 
           Reporter reporter) throws IOException {
    
    try {
      //Delegate the call to the "reduce()" function in JavaScript
      //Note: this was hard-coded to keep the demo simple.
      callFunction("reduce", key, values, output, reporter);
    } catch (Exception e) {
      //Surface script errors so Hadoop will fail and retry the task.
      throw new IOException(e.getMessage(), e);
    }
  }
}

Finally, we need to create a Generic Job Runner for executing the Map-Reduce job.

JsJobDriver.java

import java.net.URI;

import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;

/**
 * Here is a pretty simple Job Driver for loading
 * the correct scripts and input directories for the
 * Map-Reduce job.  You will probably notice
 * that I was running this on the Cloudera VM
 * (the user directory is 'training').
 * The dataset I was processing is a list of METAR
 * and TAF reporting weather stations that you can
 * find at this URL:
 * http://aviationweather.gov/adds/metars/stations.txt
 *
 * Ideally, you will want to pass in the location of 
 * the input directory and map/reduce JavaScript files.
 */
public class JsJobDriver {
  
  //Default paths for my application
  public static String inputPaths =
    "hdfs://localhost/stationsInput.txt";
  public static String outputPath = 
    "hdfs://localhost/stationsOutput";
  public static String mapJsFile = 
    "hdfs://localhost/user/training/map.js";
  public static String reduceJsFile = 
    "hdfs://localhost/user/training/reduce.js";

 /**
  * Start the Job
  * @param args Console input arguments
  */
  public static void main(String... args) 
     throws Exception {
    
    //If we have two inputs, they are the map-reduce
    //scripts.
    if(args.length == 2){
      mapJsFile = args[0];
      reduceJsFile = args[1];
    }
    
    //If we have four inputs, we are getting the
    //input and output paths, and javascript map and reduce
    //scripts.
    if(args.length == 4){
      inputPaths = args[0];
      outputPath = args[1];
      mapJsFile = args[2];
      reduceJsFile = args[3];
    }
    
    JobConf conf = new JobConf(JsMapper.class);
    conf.setJobName("Js Test.");
    
    FileInputFormat.setInputPaths(conf, new Path(inputPaths));
    FileOutputFormat.setOutputPath(conf, new Path(outputPath));
    
    //Associate the correct Mappers and Reducers
    conf.setMapperClass(JsMapper.class);
    conf.setMapOutputKeyClass(Text.class);
    conf.setMapOutputValueClass(Text.class);
    
    conf.setReducerClass(JsReducer.class);
    conf.setOutputKeyClass(Text.class);
    conf.setOutputValueClass(Text.class);
    
    //Store our scripts in the cache.
    DistributedCache.addCacheFile(new URI(mapJsFile), conf);
    DistributedCache.addCacheFile(new URI(reduceJsFile), conf);
    
    //Run the job
    JobClient.runJob(conf);
  }
}
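
To actually run this, you would package the classes above into a jar and stage the two scripts in HDFS.  A hypothetical invocation might look something like the following (the jar name is made up, and the paths simply mirror the defaults in the driver):

hadoop fs -put map.js /user/training/map.js
hadoop fs -put reduce.js /user/training/reduce.js

hadoop jar js-mapreduce.jar JsJobDriver \
  hdfs://localhost/stationsInput.txt \
  hdfs://localhost/stationsOutput \
  hdfs://localhost/user/training/map.js \
  hdfs://localhost/user/training/reduce.js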

That's all it takes to get scripting support in Hadoop! In my opinion, it is actually pretty trivial, and a lot cleaner than using some of the other frameworks.

Let me show you some JavaScript MapReduce now:

Identity Mapper
map.js

function map(key, value, output, reporter){
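  //oKey and oValue are the reusable Text instances that were
  //bound to the engine in JsMapReduceBase.configure().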
  oKey.set(key);
  oValue.set(value);
  output.collect(oKey, oValue);
}

Identity Reducer
reduce.js

function reduce(key, values, output, reporter){
  oKey.set(key); 
  while(values.hasNext()){
    output.collect(oKey, values.next()); 
  } 
}

I realize that these JavaScript examples are trivial; I will get into more complex examples in another post. My goal here is simply to emphasize that there are easy ways to get greater extensibility out of Hadoop without having to use the Streaming API, Hive, or Pig. In my next post, I'm going to take this example one step further and create a web-based interface for writing Map-Reduce jobs on Hadoop, much like many of us have enjoyed with CouchDB.

In the meantime, good luck, and have a great time coding.

Richard
