Saturday, June 25, 2011

Calculating Similarity (Part 3): Damerau-Levenshtein Distance

I promised this post a while ago but have unfortunately been too busy to complete it.  I noticed a couple of people had searched Google explicitly for this post, so that encouraged me to finish it!

According to Wikipedia:
"Damerau–Levenshtein distance (named after Frederick J. Damerau and Vladimir I. Levenshtein) is a "distance" (string metric) between two strings, i.e., finite sequence of symbols, given by counting the minimum number of operations needed to transform one string into the other, where an operation is defined as an insertion, deletion, or substitution of a single character, or a transposition of two adjacent characters." [1]
In Computer Science, we commonly call algorithms like Damerau-Levenshtein Distance "Edit Distance" algorithms, since the distance and the resultant matrix tell us the number of insertions, deletions, substitutions, and transpositions necessary to make two strings identical.
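
To make this concrete, here are a few hand-checked distances, written against the calculator implemented below:

DamerauLevenshteinDistance calc = new DamerauLevenshteinDistance();

calc.calculate("test", "tset");     //1: one transposition ('e' and 's')
calc.calculate("cat", "hat");       //1: one substitution ('c' -> 'h')
calc.calculate("snapple", "apple"); //2: two deletions ('s' and 'n')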

The algorithm for calculating the Damerau-Levenshtein distance is remarkably simple. Please follow the inline comments for a better understanding of how it works.

DamerauLevenshteinDistance.java

package com.berico.similarity;

/**
 * Damerau-Levenshtein Distance
 * Based on the algorithms provided at the following websites:
 * 
 * http://snippets.dzone.com/posts/show/6942
 * http://en.wikipedia.org/wiki/Damerau%E2%80%93Levenshtein_distance
 * 
 * @author Richard Clayton (Berico Technologies)
 * @date June 25, 2011
 */
public class DamerauLevenshteinDistance 
                   implements IDistanceCalculator {

  /**
   * Calculate the Damerau-Levenshtein Distance (edit distance)
   * between two strings.
   * @param source Source input string
   * @param target Target input string
   * @return The number of operations (insertions, deletions,
   *         substitutions, and transpositions) it would take
   *         to make the source string identical to the target
   *         string
   */
  public int calculate(String source, String target){
    //If both strings are empty, I'm of the opinion that
    //this is an error (technically the distance is zero).
    assert( !(source.isEmpty() && target.isEmpty()));
    
    //If the source string is empty, the distance is the
    //length of the target string.
    if(source.isEmpty()){
      return target.length();
    }
    
    //If the target string is empty, the distance is the
    //length of the source string.
    if(target.isEmpty()){
      return source.length();
    }
    
    //Delegate the calculation to the method that produces the matrix
    //and distance, but then only return the distance
    return calculateAndReturnFullResult(source, target).getDistance();
  }
  
  /**
   * Perform the distance calculation, but also return the
   * resulting matrix and distance.
   * @param source Source input string
   * @param target Target input string
   * @return A simple object with the matrix and distance
   */
  public DamerauLevenshteinDistanceResult 
           calculateAndReturnFullResult(String source, String target){

    //If both strings are empty, I'm of the opinion that
    //this is an error (technically the distance is zero).
    assert( !(source.isEmpty() && target.isEmpty()));
    
    //We are going to construct a matrix of distances
    int[][] distanceMatrix 
      = new int[source.length() + 1][target.length() + 1];
    
    //We need indexers from 0 to the length of the source string.
    //This sequential set of numbers will be the row "headers"
    //in the matrix.
    for(int sourceIndex = 0; 
      sourceIndex <= source.length(); 
      sourceIndex++){
      
      //Set the value of the first cell in the row
      //equivalent to the current value of the iterator
      distanceMatrix[sourceIndex][0] = sourceIndex;  
    }
    
    //We need indexers from 0 to the length of the target string.
    //This sequential set of numbers will be the 
    //column "headers" in the matrix.
    for(int targetIndex = 0;
      targetIndex <= target.length();
      targetIndex++){
      
      //Set the value of the first cell in the column
      //equivalent to the current value of the iterator
      distanceMatrix[0][targetIndex] = targetIndex;
    }
    
    //We'll use this to add a penalty
    //to some operations.
    int cost = 0;
    
    //Iterate over all characters in the source
    //string.
    for(int sourceIndex = 1; 
    sourceIndex <= source.length(); 
    sourceIndex++){
      
      //Iterate over all characters in the target
      //string.
      for(int targetIndex = 1;
      targetIndex <= target.length();
      targetIndex++){
        
        //If the current characters in both strings are equal
        if(source.charAt(sourceIndex - 1)
               == target.charAt(targetIndex - 1))
        {
          //There is no penalty.
          cost = 0;
        }
        else 
        {
          //Not equal, there is a penalty.
          cost = 1;
        }
        
        //We want to find the current distance by determining
        //the shortest path to a match (hence the 'minimum'
        //calculation on distances).
        distanceMatrix[sourceIndex][targetIndex] 
          = minimum(
           //Deletion: drop the current character
           //from the source string
           distanceMatrix[sourceIndex - 1][targetIndex] + 1, 
           //Insertion: add the current target character
           //to the source string
           distanceMatrix[sourceIndex][targetIndex - 1] + 1,
           //Substitution (or a match, when the cost is zero)
           distanceMatrix[sourceIndex - 1][targetIndex - 1] + cost);
        
        //We don't want to do the next series of calculations on
        //the first pass because we would get an index out of bounds
        //exception.
        if(sourceIndex == 1 || targetIndex == 1){
          continue;
        }
        
        //Transposition check: are the current and previous 
        //characters switched around (e.g.: t[se]t and t[es]t)?
        if(source.charAt(sourceIndex - 1) 
              == target.charAt(targetIndex - 2)
          && source.charAt(sourceIndex - 2) 
              == target.charAt(targetIndex - 1)){
          
          //What's the minimum cost between the current distance
          //and a transposition.
          distanceMatrix[sourceIndex][targetIndex] 
            = minimum(
               //Current cost
             distanceMatrix[sourceIndex][targetIndex],
             //Transposition
             distanceMatrix[sourceIndex - 2][targetIndex - 2] + cost);
        }
      }
    }
    
    //Return the matrix and distance as the result
    return new DamerauLevenshteinDistanceResult(distanceMatrix);
  }
  
  /**
   * Calculate the minimum value from an array of values.
   * @param values Array of values.
   * @return minimum value of the provided set.
   */
  private static int minimum(int... values){
    
    //Hopefully, everything should be smaller
    //than the max int value!
    int currentMinimum = Integer.MAX_VALUE;
    
    //Iterate over all provided values
    for(int value : values){
      
      //Take the minimum value between the current
      //minimum and the current value of the
      //iteration
      currentMinimum = Math.min(value, currentMinimum);
    }
    
    //return the minimum value.
    return currentMinimum;
  }
  
  /**
   * Simple container for the result of the Damerau-Levenshtein
   * Distance calculation
   * @author Richard Clayton (Berico Technologies)
   * @date June 25, 2011
   */
  public class DamerauLevenshteinDistanceResult {
    
    //Distance matrix
    private int[][] distanceMatrix;
    
    /**
     * Instantiate the object with the resulting distance matrix
     * @param distanceMatrix Matrix of distances between edits
     */
    public DamerauLevenshteinDistanceResult(int[][] distanceMatrix){
      this.distanceMatrix = distanceMatrix;
    }

    /**
     * Get the Distance Matrix
     * @return Matrix of edit distances
     */
    public int[][] getDistanceMatrix() {
      return distanceMatrix;
    }
    
    /**
     * Get the Edit Distance
     * @return number of changes to make before
     *         both strings are identical
     */
    public int getDistance(){
      return 
        distanceMatrix
          [distanceMatrix.length - 1][distanceMatrix[0].length - 1];
    }

    /**
     * Get a string representation of this class
     * @return A friendly display of the distance and matrix
     */
    @Override
    public String toString() {
      
      StringBuilder sb = new StringBuilder();
      
      sb.append(
         String.format(
           "Distance: %s \n", this.getDistance()));
      sb.append("Matrix: \n\n");
      
      for(int i = 0; i < this.distanceMatrix.length; i++){
        
        sb.append("| ");
        
        for(int j = 0; j < this.distanceMatrix[0].length; j++){
        
          sb.append(String.format("\t%s", this.distanceMatrix[i][j]));
        }
        
        sb.append(" |\n");
      }
      
      return sb.toString();
    }
  }
}
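
For reference, the calculator implements the IDistanceCalculator interface used throughout this series; here is a minimal sketch of the contract, inferred from how it is used in this post:

IDistanceCalculator.java

public interface IDistanceCalculator {

  /**
   * Calculate the distance between two strings.
   * @param source Source input string
   * @param target Target input string
   * @return The distance between the strings
   */
  int calculate(String source, String target);
}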

Here are some examples of using the distance calculator.

IDistanceCalculator distanceCalc = new DamerauLevenshteinDistance();
      
String distOne = "snapple";
String distTwo = "apple";
      
int editDistance = distanceCalc.calculate(distOne, distTwo);
      
System.out.println(
  String.format("The distance between %s and %s is %s",
    distOne, distTwo, editDistance));

And the output from the console:

The distance between snapple and apple is 2

I've also added a method for getting the full result back from the calculator (the matrix and the distance). There is an example below, but remember: you will not be able to access this method if you are using the interface's type to reference the calculator.

String distOne = "snapple";
String distTwo = "apple";

DamerauLevenshteinDistance distanceCalc2 
     = new DamerauLevenshteinDistance();

DamerauLevenshteinDistanceResult result 
     = distanceCalc2.calculateAndReturnFullResult(distOne, distTwo);

System.out.println(result);

And the output from the console:

Distance: 2 
Matrix: 

|  0 1 2 3 4 5 |
|  1 1 2 3 4 5 |
|  2 2 2 3 4 5 |
|  3 2 3 3 4 5 |
|  4 3 2 3 4 5 |
|  5 4 3 2 3 4 |
|  6 5 4 3 2 3 |
|  7 6 5 4 3 2 |
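
A quick note on reading the matrix: cell [i][j] holds the distance between the first i characters of the source string and the first j characters of the target string, which is why getDistance() simply returns the bottom-right cell.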

Once again, you can access the Eclipse project at the following link: http://dl.dropbox.com/u/12311372/StringSimilarity.zip.

If you have any questions or comments, I would love to hear them.

Richard

[1]. Wikipedia contributors. "Damerau–Levenshtein distance." Wikipedia, The Free Encyclopedia. Wikipedia, The Free Encyclopedia, 20 Jun. 2011. Web. 25 Jun. 2011.

Friday, June 24, 2011

JavaScript Map-Reduce Jobs in Hadoop using Mozilla Rhino

If you read my blog, you know that I am a huge fan of scripting, especially in areas of enterprise architecture that don’t classically see that kind of dynamism in their component logic.  One application of a script engine that recently piqued my interest was writing Map-Reduce jobs in JavaScript within Hadoop.
After playing with technologies like Pig and Hive, I realized that these frameworks don’t exist so much to provide a SQL-like interface to Hadoop as to allow analysts and developers to quickly explore their datasets without needing to compile an implementation in Java.  During my Cloudera Developer training, I noticed that the biggest barrier to entry into the Hadoop ecosystem was the need for a solid Java background.  Many of the students had C++ or .NET backgrounds.  More importantly, I personally found it tedious to write and compile a Hadoop job every time I wanted to create some variation of an existing Map-Reduce capability. 
But what about Hadoop Streaming?  Sure, it is certainly a viable answer, and you could even write your application in Python or JavaScript (using Node.js).  My biggest problem with Hadoop Streaming is that it is very brittle.  The separation between the key and value is a tab character!  If you are using structured data formats native to Hadoop, you lose the ability to automatically marshal them back into a rich format (think Avro and getting real objects as values!).  And if you are writing more complex applications with the Streaming API that reference external libraries, you may find yourself in a sticky situation: every “Task Node” (a DataNode running a TaskTracker) may need to have its environment preconfigured manually, rather than simply receiving scripts via the distributed cache (for instance, what if you wanted to use a Python library like NLTK?).
What if instead of using a SQL-like language or an imperative language that has to be compiled, we use a scripting language?  Believe it or not, this is extremely simple to set up since Java already has a specification for hosting scripting languages (JSR 223).  In this post, I’m going to demonstrate how you can use JavaScript to write MapReduce jobs in Hadoop using the Mozilla Rhino script engine.
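
To see how little ceremony JSR 223 requires, here is a minimal, self-contained sketch outside of Hadoop (on Java 6, asking for "JavaScript" returns the bundled Rhino engine):

RhinoQuickStart.java

import javax.script.Invocable;
import javax.script.ScriptEngine;
import javax.script.ScriptEngineManager;

public class RhinoQuickStart {

  public static void main(String[] args) throws Exception {
    
    //Locate a registered JavaScript engine (Rhino).
    ScriptEngine engine 
      = new ScriptEngineManager().getEngineByName("JavaScript");
    
    //Evaluate a script that defines a function...
    engine.eval("function greet(name){ return 'Hello, ' + name; }");
    
    //...then invoke that function directly from Java.
    Object result 
      = ((Invocable) engine).invokeFunction("greet", "Hadoop");
    
    System.out.println(result); //prints: Hello, Hadoop
  }
}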
Before we can start crunching data in JavaScript, however, we need to set up a “hosting environment” that will allow us to use Mozilla Rhino within the context of Hadoop.  This will simply be a generic Map-Reduce job that delegates the map and reduce functions to JavaScript functions evaluated by Rhino.
JsMapReduceBase.java

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;

import javax.script.Invocable;
import javax.script.ScriptEngine;
import javax.script.ScriptEngineManager;

import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;

/**
 * Provides the scaffolding necessary for hosting
 * a script engine in Hadoop.
 */
public abstract class JsMapReduceBase extends MapReduceBase {

  protected static final ScriptEngineManager mgr 
                            = new ScriptEngineManager();

  protected ScriptEngine jsEngine;
  protected static final Text outputKey = new Text();
  protected static final Text outputValue = new Text();
  
 /**
  * Configure the Mapper/Reducer using the Job Context.
  * In our case, we want to pull the Mapper and Reducer
  * JavaScript functions from the Distributed Cache.
  * @param job Map-Reduce Job Context
  */
  @Override
  public void configure(JobConf job) {
    super.configure(job);
    
    //Create a new Script Engine using the JSR 223 API.
    //Under the hood, Java will locate any registered 
    //implementation of the Script Language "JavaScript", 
    //which will be the Rhino engine.
    jsEngine = mgr.getEngineByName("JavaScript");
      
    try {
      //Using the script pulled from the cache, load the script
      //body into the engine (parsing and evaluating it).
      jsEngine.eval(getScript(job));
      //We register an instance of a Text key,
      //which we will reuse throughout the Hadoop job...
      jsEngine.put("oKey", outputKey);
      //...as well as a Text value.
      jsEngine.put("oValue", outputValue);
      
    } catch (Exception e) {
      
      System.err.print(e.getMessage());
    }
  }
  
 /**
  * Clean up any resources used during the Map-Reduce task
  */
  @Override
  public void close() throws IOException {
    super.close();
  }
  
 /**
  * Pull the JavaScript scripts from the distributed
  * cache.
  * @param job Map-Reduce Job Context
  * @return The concatenated contents of the cached scripts
  */
  private String getScript(JobConf job) throws Exception {
    
    StringBuilder sb = new StringBuilder();
    
    Path[] cacheFiles;
    
    cacheFiles = DistributedCache.getLocalCacheFiles(job);
    
    for(Path p : cacheFiles){
    
      String file = readFile(p, job);
      
      System.err.println(file);
      
      sb.append(file).append(" \n");
    }
    
    return sb.toString();
  }
  
 /**
  * Read the contents of a file into a string.
  * @param path Path to the file
  * @param job Map-Reduce Job Context
  * @return Body of the file as a string
  */
  private String readFile(Path path, JobConf job) 
     throws Exception {

    FileSystem fs = FileSystem.get(job);
    FSDataInputStream in = fs.open(path);
    BufferedReader br 
      = new BufferedReader(new InputStreamReader(in));
    
    StringBuilder sb = new StringBuilder();
    
    String line;
    while ((line = br.readLine()) != null) {
      //Preserve line breaks; otherwise a single-line
      //comment in the script would swallow the rest
      //of the file once the lines are concatenated.
      sb.append(line).append("\n");
    }
    br.close();
    return sb.toString();
  }
  
 /**
  * Call a function on the Script Engine.
  * @param functionName Name of the function to call
  * @param args An array of arguments to pass to the function,
  *        matching the function's argument signature.
  * @return The result (if any) from the script function
  */ 
  protected Object callFunction(
    String functionName, Object... args)
    throws Exception {

    return ((Invocable)jsEngine)
               .invokeFunction(functionName, args);
  }
  
 /**
  * Call a method on a script object within the Script Engine.
  * @param objectName The reference name of the object whose
  *        method will be called (e.g.: in foo.bar(),
  *        we want 'foo').
  * @param methodName Name of the method to call (bar())
  * @param args An array of arguments to pass to the method,
  *        matching the method's argument signature.
  * @return The result (if any) from the script method
  */ 
  protected Object callMethod(
    String objectName, String methodName, Object... args) 
    throws Exception {

    //Invocable.invokeMethod expects the script object itself
    //(not its name), so we retrieve it from the engine first.
    return ((Invocable)jsEngine)
               .invokeMethod(
                   jsEngine.get(objectName), methodName, args);
  }
}
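
One detail worth highlighting: the two jsEngine.put() calls in configure() are what expose oKey and oValue as global variables inside the hosted scripts; the JavaScript examples at the end of this post use them without declaring them for exactly that reason.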

We will also need to implement the Mapper and Reducer interfaces:

JsMapper.java

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

/**
 * A very simple implementation of the Mapper interface
 * that delegates the 'map' task to a Script Engine.
 * Keep in mind that this implementation currently uses
 * the Text class for input and output Key-Value pairs,
 * but could easily be changed to use anything.
 */
public class JsMapper extends JsMapReduceBase 
   implements Mapper<Object, Text, Text, Text> {
  
/**
 * Perform a map on the given key-value pair, delegating the
 * call to JavaScript.
 * @param key The Key
 * @param value The Value
 * @param output The output collector we use to send key-value
 *        pairs to the reducer.
 * @param reporter A mechanism we can use for 
 *        'reporting' our progress.
 */
  @Override
  public void map(
      Object key, 
      Text value, 
      OutputCollector<Text, Text> output,
      Reporter reporter) throws IOException {
    
    try {
      //Delegate the call to the "map()" function in JavaScript
      //Note: this was hard-coded to keep the demo simple.
      callFunction("map", key, value, output, reporter);
    } catch (Exception e) {
      //At a minimum, surface the error in the task logs.
      System.err.println(e.getMessage());
    }
  }
  
}

JsReducer.java

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;

/**
 * A very simple implementation of the Reducer interface
 * that delegates the 'reduce' task to a Script Engine.
 * Keep in mind that this implementation currently uses
 * the Text class for input and output Key-Value pairs,
 * but could easily be changed to use anything.
 */
public class JsReducer extends JsMapReduceBase 
    implements Reducer<Text, Text, Text, Text> {

/**
 * Perform a reduce on the given key and its collection of
 * values, delegating the call to JavaScript.
 * @param key The Key
 * @param values A Value Collection for the corresponding key
 * @param output The output collector we use to save key-value
 *        pairs to HDFS.
 * @param reporter A mechanism we can use for 
 *        'reporting' our progress.
 */
  @Override
  public void reduce(
           Text key, 
           Iterator<Text> values,
           OutputCollector<Text, Text> output, 
           Reporter reporter) throws IOException {
    
    try {
      //Delegate the call to the "reduce()" function in JavaScript
      //Note: this was hard-coded to keep the demo simple.
      callFunction("reduce", key, values, output, reporter);
    } catch (Exception e){
      //At a minimum, surface the error in the task logs.
      System.err.println(e.getMessage());
    }
  }
}

Finally, we need to create a Generic Job Runner for executing the Map-Reduce job.

JsJobDriver.java

import java.net.URI;

import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;

/**
 * Here is a pretty simple Job Driver for loading
 * the correct scripts and input directories for the
 * Map-Reduce job.  You will probably notice
 * that I was running this on the Cloudera VM
 * (the user directory is 'training').
 * The dataset I was processing is a list of METAR
 * and TAF reporting weather stations that you can
 * find at this url: 
 * http://aviationweather.gov/adds/metars/stations.txt
 *
 * Ideally, you will want to pass in the location of 
 * the input directory and map/reduce JavaScript files.
 */
public class JsJobDriver {
  
  //Default paths for my application
  public static String inputPaths =
    "hdfs://localhost/stationsInput.txt";
  public static String outputPath = 
    "hdfs://localhost/stationsOutput";
  public static String mapJsFile = 
    "hdfs://localhost/user/training/map.js";
  public static String reduceJsFile = 
    "hdfs://localhost/user/training/reduce.js";

 /**
  * Start the Job
  * @param args Console input arguments
  */
  public static void main(String... args) 
     throws Exception {
    
    //If we have two inputs, they are the map-reduce
    //scripts.
    if(args.length == 2){
      mapJsFile = args[0];
      reduceJsFile = args[1];
    }
    
    //If we have four inputs, we are getting the
    //input and output paths, and javascript map and reduce
    //scripts.
    if(args.length == 4){
      inputPaths = args[0];
      outputPath = args[1];
      mapJsFile = args[2];
      reduceJsFile = args[3];
    }
    
    JobConf conf = new JobConf(JsMapper.class);
    conf.setJobName("Js Test.");
    
    FileInputFormat.setInputPaths(conf, new Path(inputPaths));
    FileOutputFormat.setOutputPath(conf, new Path(outputPath));
    
    //Associate the correct Mappers and Reducers
    conf.setMapperClass(JsMapper.class);
    conf.setMapOutputKeyClass(Text.class);
    conf.setMapOutputValueClass(Text.class);
    
    conf.setReducerClass(JsReducer.class);
    conf.setOutputKeyClass(Text.class);
    conf.setOutputValueClass(Text.class);
    
    //Store our scripts in the cache.
    DistributedCache.addCacheFile(new URI(mapJsFile), conf);
    DistributedCache.addCacheFile(new URI(reduceJsFile), conf);
    
    //Run the job
    JobClient.runJob(conf);
  }
}
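
Assuming the classes above are packaged into a jar (the jar name below is hypothetical), the driver launches like any other Hadoop job, using the four optional arguments handled in main():

hadoop jar js-mapreduce.jar JsJobDriver \
    hdfs://localhost/stationsInput.txt \
    hdfs://localhost/stationsOutput \
    hdfs://localhost/user/training/map.js \
    hdfs://localhost/user/training/reduce.js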

That's all it takes to get scripting support in Hadoop! In my opinion, it's actually pretty trivial, and a lot cleaner than using some of the other frameworks.

Let me show you some JavaScript MapReduce now:

Identity Mapper
map.js

function map(key, value, output, reporter){
  oKey.set(key);
  oValue.set(value);
  output.collect(oKey, oValue);
}

Identity Reducer
reduce.js

function reduce(key, values, output, reporter){
  oKey.set(key); 
  while(values.hasNext()){
    output.collect(oKey, values.next()); 
  } 
}
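
These identity functions are deliberately minimal, but the same pattern carries real logic. As a quick sketch (untested, and assuming the Text-based plumbing above, where value.toString() yields the input line and the reducer's values parameter is a Java Iterator), a word count would look like this:

map.js

function map(key, value, output, reporter){
  //value is a Hadoop Text; toString() gives us the raw line.
  var words = value.toString().split(/\s+/);
  for(var i = 0; i < words.length; i++){
    if(words[i].length == 0) continue;
    oKey.set(words[i]);
    oValue.set("1");
    output.collect(oKey, oValue);
  }
}

reduce.js

function reduce(key, values, output, reporter){
  var sum = 0;
  //values is a Java Iterator, hence hasNext()/next().
  while(values.hasNext()){
    sum += parseInt(values.next().toString(), 10);
  }
  oKey.set(key);
  oValue.set(String(sum));
  output.collect(oKey, oValue);
}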

I realize that these JavaScript examples are fairly trivial; in another post, I will get into more complex ones. My goal here is simply to emphasize that there are easy ways to get greater extensibility out of Hadoop without having to use the Streaming API, Hive, or Pig. In my next post, I'm going to take this example one step further and create a web-based interface for writing Map-Reduce jobs on Hadoop, much like many of us have enjoyed with CouchDB.

In the meantime, good luck, and have a great time coding.

Richard