Saturday, January 22, 2011

Calculating Similarity (Part 2): Jaccard, Sørensen and Jaro-Winkler Similarity

In my last post on calculating string similarity, I focused on one algorithm (Cosine Similarity) and some of the concepts involved in its calculation.  In this post, I want to introduce three more ways of calculating similarity.  All three methods I present today are a little less intensive to calculate than cosine similarity, and one in particular is probably more accurate.

The three algorithms I will be presenting today are:
  • Jaccard Similarity
  • Sørensen Similarity
  • Jaro-Winkler Similarity

I'm going to present the implementation of each of these algorithms, and then at the end of the post, the results they return for a couple example sets of strings. Let's begin in order, starting with Jaccard Similarity.

Jaccard Similarity is very easy to calculate and actually required no extra programming on my part to implement (all the utility functions required were created for the Cosine Similarity implementation).  The measure is sometimes called the Jaccard index or Jaccard Similarity Coefficient (the related Jaccard Distance is simply one minus the index), originating from the late 19th, early 20th century botanist Paul Jaccard (Paul Jaccard, 2011).

The equation is remarkably simple (the size of the intersection divided by the size of the union):

J(A, B) = |A ∩ B| / |A ∪ B|

(Jaccard index, 2011).

Here is my implementation of the Jaccard Similarity Coefficient:

/**
  * Find the Jaccard Similarity of two strings
  * @param stringOne the first string to compare
  * @param stringTwo the second string to compare
  * @return the Jaccard Similarity (intersect(A,B) / union(A, B))
  */
 @Override
 public double calculate(String stringOne, String stringTwo) {
   return (double) intersect(stringOne, stringTwo).size() /
          (double) union(stringOne, stringTwo).size();
 }

The intersect and union functions are both defined in the com.berico.similarity.CharacterVectorUtils class, which you can find, along with the rest of the Eclipse project, at the bottom of this post.
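The actual CharacterVectorUtils class isn't reproduced in this post, but a minimal sketch of how character-set intersect and union might look (the real implementation may differ) is:

```java
import java.util.HashSet;
import java.util.Set;

public class CharacterSetUtils {

    //Build the set of distinct characters in a string
    public static Set<Character> toSet(String s) {
        Set<Character> set = new HashSet<Character>();
        for (char c : s.toCharArray()) { set.add(c); }
        return set;
    }

    //Characters present in both strings
    public static Set<Character> intersect(String one, String two) {
        Set<Character> result = toSet(one);
        result.retainAll(toSet(two));
        return result;
    }

    //Characters present in either string
    public static Set<Character> union(String one, String two) {
        Set<Character> result = toSet(one);
        result.addAll(toSet(two));
        return result;
    }

    public static void main(String[] args) {
        //"apple" and "applet" share the distinct characters a, p, l, e
        System.out.println(CharacterSetUtils.intersect("apple", "applet").size()); // 4
        //Between them there are five distinct characters: a, p, l, e, t
        System.out.println(CharacterSetUtils.union("apple", "applet").size());     // 5
    }
}
```

With these, the Jaccard similarity of "apple" and "applet" works out to 4 / 5 = 0.8, which matches the output shown at the end of this post.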

The next similarity algorithm is Sørensen Similarity (or Sørensen index, or Sørensen Similarity Coefficient). The equation comes from Thorvald Sørensen, a turn-of-the-century Danish botanist (Thorvald Sørensen 2011). Like the Jaccard index, Sørensen Similarity is very easy to calculate.

The equation for Sørensen Similarity:

QS = 2C / (A + B)

(Sørensen similarity index, 2011)

In the equation, 'A' represents the size of the first sample, 'B' the size of the second, and 'C' the size of the intersection of the two samples (I don't know why someone posted it on Wikipedia that way, as opposed to [ 2 * |A ∩ B| / (|A| + |B|) ]).  Here is my implementation of the Sørensen similarity index:

/**
 * Calculate the Sorensen Similarity of two strings.
 * Equation: (2 * intersect(A, B)) / (|A| + |B|)
 * @param stringOne First String
 * @param stringTwo Second String
 * @return The Sorensen similarity of two strings.
 */
 @Override
 public double calculate(String stringOne, String stringTwo) {
   return  (double) (2 * intersect(stringOne, stringTwo).size()) /
           (double) (stringOne.length() + stringTwo.length());
 }
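To sanity-check the equation, here is a tiny standalone version (using distinct character sets, as the utility functions do) applied to "apple" and "applet":

```java
import java.util.HashSet;
import java.util.Set;

public class SorensenCheck {

    //Build the set of distinct characters in a string
    static Set<Character> toSet(String s) {
        Set<Character> set = new HashSet<Character>();
        for (char c : s.toCharArray()) { set.add(c); }
        return set;
    }

    //(2 * |A intersect B|) / (|A| + |B|)
    public static double sorensen(String one, String two) {
        Set<Character> intersect = toSet(one);
        intersect.retainAll(toSet(two));
        return (double) (2 * intersect.size())
             / (double) (one.length() + two.length());
    }

    public static void main(String[] args) {
        //2 * |{a,p,l,e}| / (5 + 6) = 8 / 11
        System.out.println(SorensenCheck.sorensen("apple", "applet")); // ≈ 0.727
    }
}
```

The result, 8 / 11 ≈ 0.727, matches the 72.7% shown in the output at the end of the post.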

Once again, a very simple algorithm to implement. The next algorithm is not so simple. The Jaro-Winkler Similarity index is the first algorithm I will demonstrate in which the order of occurrence is essential in determining similarity. In that respect, Jaro-Winkler similarity is very similar to the ubiquitous Levenshtein Distance, which is used in many Natural Language Processing applications like Lucene and PostgreSQL (Levenshtein distance, 2011). In many domains, the order in which properties occur is just as important as their existence; this is certainly true for strings. Because of this, Jaro-Winkler is a lot more accurate than Cosine Similarity.

The Jaro-Winkler Similarity Coefficient is an addition to the Jaro Distance; I don't know if Winkler worked in tandem with Jaro, but Winkler added an important component to the Jaro distance algorithm that weighted or penalized strings based on their similarity at the beginning of the string (the prefix). The algorithm for the Jaro-Winkler distance is a little complicated, in part because it requires iterating over the shorter string to determine whether a non-matching character fits within a predetermined "window" of the other string. For instance, all six characters of the strings "martha" and "marhta" are considered matches because the transposed "h" and "t" fall within 2 characters of each other.

Finding matches is probably the most complicated part of this algorithm, but fortunately for you, I have done all of the hard work:

/**
  * Instead of making two separate functions for matching
  * and transposes (which would be terrible, since you
  * find the transposes while doing the matching), I created
  * this little class to hold the results of both.
  */
 public static class MatchResults {
   public int numberOfMatches = 0;
   public int numberOfTransposes = 0;
 }

/**
  * Find all of the matching and transposed characters in
  * two strings
  * @param stringOne First String
  * @param stringTwo Second String
  * @return A Match Result with both the number of matches and
  * number of transposed characters
  */
 public static MatchResults determineMatchesAndTransposes(
                  String stringOne, String stringTwo){

   //Create the match result instance
   MatchResults matchResults = new MatchResults();
   //Find the matching window (how far left and right to
   //look for matches)
   int window = matchingWindow(stringOne, stringTwo);
   //We need to find the shortest and longest character string
   //because we iterate over the shortest
   char[] shortest, longest;
   //If string one is less than or equal to string two
   if(stringOne.length() <= stringTwo.length()){
     //use string one as the shortest
     shortest = stringOne.toCharArray();
     longest = stringTwo.toCharArray();
   } else {
     //otherwise use string two as the shortest
     shortest = stringTwo.toCharArray();
     longest = stringOne.toCharArray();
   }
    //we need to find the number of times we find a match
    //out of position (ex: the 4th character of string one
    //matches the 5th character of string two)
   int matchedOutOfPosition = 0; 
   //Iterate over the shortest string
   for(int i = 0; i < shortest.length; i++){
     //If we have an in-position match
     if(shortest[i] == longest[i]){
       //increment the number of matches
       matchResults.numberOfMatches++;
       //go to the next iteration
       continue;
     }
     //Set the boundary of how far back we
     //can look at the longest string
     //given the string's size and the
     //window size
     int backwardBoundary = 
       ((i - window) < 0)? 0 : i - window;
     //Set the boundary of how far we
     //can look ahead at the longest string
     //given the string's size and the
     //window size
     int forwardBoundary = 
       ((i + window) > (longest.length - 1))? 
          longest.length - 1 : i + window;
     //Starting at the back-most point, and
     //moving to the forward-most point of the
     //longest string, iterate looking for matches
     //of our current character on the shortest 
     //string
     for(int b = backwardBoundary; 
             b <= forwardBoundary; 
             b++){
        //If there's a match within our window
        if(longest[b] == shortest[i]){
          //increment the number of matches
          matchResults.numberOfMatches++;
          //but note that this is out of
          //position
          matchedOutOfPosition++;
          //break out of the inner loop;
          //we've found our match
          break;
        }
     }
   }
   //Determine the number of transposes by halving
   //the number of out-of-position matches.
   //This works because if we had two strings:
   //"martha" and "marhta", the 'th' and 'ht' are
   //transposed, but would be counted twice in the
   //process above.
   matchResults.numberOfTransposes = matchedOutOfPosition / 2;
   //return the match result
   return matchResults;
 }
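To see the matching logic in action on the "martha"/"marhta" example, here is a condensed, standalone version of the loop above:

```java
public class MatchDemo {

    //Condensed version of determineMatchesAndTransposes, returning
    //{ matches, transposes } for tracing purposes
    public static int[] matchesAndTransposes(String a, String b) {
        char[] shortest = (a.length() <= b.length()) ? a.toCharArray() : b.toCharArray();
        char[] longest  = (a.length() <= b.length()) ? b.toCharArray() : a.toCharArray();
        //window = [ max(|A|, |B|) / 2 ] - 1
        int window = (Math.max(a.length(), b.length()) / 2) - 1;
        int matches = 0, outOfPosition = 0;
        for (int i = 0; i < shortest.length; i++) {
            //in-position match
            if (shortest[i] == longest[i]) { matches++; continue; }
            //search the window of the longest string for the character
            int back = Math.max(0, i - window);
            int forward = Math.min(longest.length - 1, i + window);
            for (int j = back; j <= forward; j++) {
                if (longest[j] == shortest[i]) {
                    matches++;
                    outOfPosition++;
                    break;
                }
            }
        }
        //each transpose is counted twice (once per string)
        return new int[] { matches, outOfPosition / 2 };
    }

    public static void main(String[] args) {
        int[] r = MatchDemo.matchesAndTransposes("martha", "marhta");
        System.out.println(r[0] + " matches, " + r[1] + " transpose"); // 6 matches, 1 transpose
    }
}
```

With a window of (6 / 2) - 1 = 2, the out-of-position "h" and "t" are both found, so all six characters match, with the pair counted as a single transpose.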

Transposed characters in the Jaro-Winkler algorithm, however, do not get the full weight that a 1-for-1 match would. The penalty for transposed characters is a part of the Jaro Distance algorithm provided below:
dj = 1/3 * ( m/|s1| + m/|s2| + (m - t)/m )

(Jaro-Winkler distance, 2011)

dj = Jaro Distance
m = number of matching characters
t = number of transposed characters
|s1| = length of the first string
|s2| = length of the second string

The penalty for transposed characters can be seen by the (m - t) / m factor.  The more transposes found between the two strings, the smaller the overall matching weight.

/**
 * Determine the Jaro Distance.  Winkler stole some of Jaro's
 * thunder by adding some more math onto his algorithm and getting
 * equal parts credit!  However, we still need Jaro's Distance
 * to get the Jaro-Winkler Distance.
 * Equation: 1/3 * ((m / |A|) + (m / |B|) + ((m - t) / m))
 * Where: |A| = length of first string
 *        |B| = length of second string
 *         m  = number of matches
 *         t  = number of transposes
 * @param numMatches Number of matches
 * @param numTransposes Number of transposes
 * @param stringOneLength Length of String one
 * @param stringTwoLength Length of String two
 * @return Jaro Distance
 */
 public static double jaroDistance(
       int numMatches, int numTransposes, 
       int stringOneLength, int stringTwoLength){
   
   //I hate Java's facility for math.  
   //We have to cast these int's as doubles to
   //be able to properly retrieve the decimal result
   double third = 1d / 3d;
    // (m / |A|)
   double stringOneNorm = 
      (double)numMatches / (double)stringOneLength;
    // (m / |B|)
   double stringTwoNorm = 
      (double)numMatches / (double)stringTwoLength;
   // ((m - t) / m)
   double matchTransNorm = 
      ((double)numMatches - (double)numTransposes) / (double)numMatches;
   // 1/3 * ((|A| / m) + (|B| / m) + ((m - t) / m))
   return third * (stringOneNorm + stringTwoNorm + matchTransNorm);
 }
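Plugging in the "martha"/"marhta" numbers (6 matches, 1 transpose, both strings 6 characters long), we can check the arithmetic:

```java
public class JaroCheck {

    public static void main(String[] args) {
        //"martha" vs "marhta": m = 6 matches, t = 1 transpose,
        //|s1| = |s2| = 6
        double m = 6, t = 1, lenA = 6, lenB = 6;
        //1/3 * ( m/|s1| + m/|s2| + (m - t)/m )
        //= 1/3 * ( 1 + 1 + 5/6 ) = 17/18
        double jaro = (1d / 3d) * ((m / lenA) + (m / lenB) + ((m - t) / m));
        System.out.println(jaro); // ≈ 0.9444
    }
}
```

The Jaro distance for this pair is 17/18 ≈ 0.944; the third term, 5/6, is where the single transpose exacts its penalty.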

Another important equation to show is how we determine the matching window.  The matching window is how far to the left and right of our current position on the longest string we will look for matching characters.  The window can be calculated with the following equation:
window = [ max(|s1|, |s2|) / 2 ] - 1

(Jaro-Winkler distance, 2011)

As you can see, the window is generally a little less than half the size of the longest string.

/**
 * Determine the maximum window size to use when looking for matches.
 * The window is basically a little less than half the longest
 * string's length.
 * Equation: [ Max(|A|, |B|) / 2 ] - 1
 * @param stringOne First String
 * @param stringTwo Second String
 * @return Max window size
 */
 public static int matchingWindow(String stringOne, String stringTwo){
   return 
    (Math.max(stringOne.length(), stringTwo.length()) / 2) - 1;
 }

Winkler's contribution to the algorithm was the addition of the weighted prefix.  Winkler introduced the notion that strings starting with the same characters (exact, not out-of-order matching) should be more heavily weighted.  The caveat is that Winkler considers the maximum prefix size to be 4; all matching characters past the first four are weighted equally.  The number of matching characters in the prefix is multiplied by a constant, the standard being 0.1 (this is what Winkler used).

/**
  * Find the Winkler Common Prefix of two strings.  In layman's terms,
  * find the number of characters that match at the beginning of the
  * two strings, with a maximum of 4.
  * @param stringOne First string
  * @param stringTwo Second string
  * @return Integer between 0 and 4 representing the number of
  * matching characters at the beginning of both strings.
  */
 public static int winklerCommonPrefix(
                    String stringOne, String stringTwo){
  
   int commonPrefix = 0;
   //Find the shortest string (we don't want an index out of bounds
   //exception).
   int boundary = (stringOne.length() <= stringTwo.length())? 
                    stringOne.length() : stringTwo.length();
   //iterate until the boundary is hit (shortest string length)
   for(int i = 0; 
           i < boundary;
           i++){
      //If the character at the current position matches
      if(stringOne.charAt(i) == stringTwo.charAt(i)){
         //increment the common prefix
         commonPrefix++;
       } else {
          //otherwise, continue no further; we are done.
          break;
       }
      //If we hit our max number of matches, bust out
      //of the loop.
      if(commonPrefix == 4){ break; }
   }
   //return the number of matches at the beginning of 
   //both strings
   return commonPrefix;
 }
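For example, "martha" and "marhta" share the three-character prefix "mar", while "apple" and "applet" hit the four-character cap. A condensed, standalone version of the prefix logic shows both cases:

```java
public class PrefixCheck {

    //Count matching leading characters, capped at 4 (Winkler's maximum)
    static int winklerCommonPrefix(String one, String two) {
        int boundary = Math.min(one.length(), two.length());
        int prefix = 0;
        for (int i = 0; i < boundary && i < 4; i++) {
            if (one.charAt(i) == two.charAt(i)) {
                prefix++;
            } else {
                break;
            }
        }
        return prefix;
    }

    public static void main(String[] args) {
        System.out.println(PrefixCheck.winklerCommonPrefix("martha", "marhta")); // 3 ("mar")
        System.out.println(PrefixCheck.winklerCommonPrefix("apple", "applet"));  // 4 (capped)
    }
}
```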

Now that we understand Winkler's contribution, I can show you the full Jaro-Winkler equation:
dw = dj + (l * p * (1 - dj))

(Jaro-Winkler distance, 2011)
dw = Jaro-Winkler Distance
dj = Jaro Distance
l = the prefix length (number of starting characters in both strings that matched, max of 4)
p = the prefix weight (default = 0.1)

The best way to understand how this algorithm works is to look at its implementation in code:

//Bonus weighting for strings starting with the same characters
//(i.e., the prefix scaling factor)
public static double PREFIX_SCALING_FACTOR = 0.1;
  
/**
 * Calculate the Jaro-Winkler Similarity of two strings
 * @param stringOne First String
 * @param stringTwo Second String
 * @return Jaro-Winkler similarity value
 */
 @Override
 public double calculate(String stringOne, String stringTwo) {
   //Get Matches and Transposes
   MatchResults matchResults = 
      determineMatchesAndTransposes(stringOne, stringTwo);
   //Get the Jaro Distance
   double jaroDistance = 
      jaroDistance(
         matchResults.numberOfMatches, 
         matchResults.numberOfTransposes, 
         stringOne.length(), 
         stringTwo.length());
   //Find the Winkler common prefix length (maxes at 4 characters)
   int winklerCommonPrefix = 
      winklerCommonPrefix(stringOne, stringTwo);
   //Find the Jaro-Winkler Distance
   // = Jd + (l * p * ( 1 - Jd));
   double jaroWinklerDistance = 
      jaroDistance + (winklerCommonPrefix * PREFIX_SCALING_FACTOR)
         * (1 - jaroDistance);
   //Return the distance
   return jaroWinklerDistance;
 }
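Putting it all together for "martha" and "marhta" (a Jaro distance of 17/18, a common prefix of 3, and the standard 0.1 scaling factor):

```java
public class JaroWinklerCheck {

    public static void main(String[] args) {
        double jaro = 17d / 18d; //Jaro distance for "martha"/"marhta"
        int prefix = 3;          //common prefix "mar"
        double p = 0.1;          //standard prefix scaling factor
        //dw = dj + (l * p * (1 - dj))
        double jw = jaro + (prefix * p) * (1 - jaro);
        System.out.println(jw); // ≈ 0.9611
    }
}
```

The result, 17.3/18 ≈ 0.961, is the commonly cited Jaro-Winkler similarity for this pair of strings.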

And that's it. Jaro-Winkler in its entirety. Now let's see the differences between these similarity implementations. The following is the "runner" I'm using to execute these similarity functions (I refactored the old implementation of this class, which was displayed in the first post):

public class SimilarityRunner {

/**
 * @param args
 */
 public static void main(String[] args) {  
  
   String one = "apple";
   String two = "applet";
  
   printSimilarity(new CosineSimilarity(), one, two);
   printSimilarity(new JaccardSimilarity(), one, two);
   printSimilarity(new SorensenSimilarity(), one, two);
   printSimilarity(new JaroWinklerSimilarity(), one, two);
 }

 private static void printSimilarity(
       ISimilarityCalculator simCalculator, 
                    String one, String two){
  
   double percentSimilar = 
     simCalculator.calculate(one, two) * 100;

   System.out.println(
     String.format("[%s] %s and %s are %s%% similar",    
       simCalculator.getClass().getSimpleName(), 
       one, two, percentSimilar));
 }
 
}

I will be changing the values of strings 'one' and 'two' to demonstrate the differences between their calculations. Let's begin by using the strings currently set in the code: "apple" and "applet".

[CosineSimilarity] apple and applet are 93.54143466934852% similar
[JaccardSimilarity] apple and applet are 80.0% similar
[SorensenSimilarity] apple and applet are 72.72727272727273% similar
[JaroWinklerSimilarity] apple and applet are 96.66666666666667% similar

The strings "apple" and "applet" are obviously very similar. Both the Cosine and Jaro-Winkler similarity algorithms, in my opinion, do a good job of noting the similarity. Now let's look at the string "string" compared to its reverse, "gnirts":

[CosineSimilarity] string and gnirts are 100.00000000000003% similar
[JaccardSimilarity] string and gnirts are 100.0% similar
[SorensenSimilarity] string and gnirts are 100.0% similar
[JaroWinklerSimilarity] string and gnirts are 38.888888888888886% similar

Isn't that interesting...

As you can see, Jaro-Winkler does a much better job of determining the similarity of strings because it takes order into account. In my next installment of this series, I will tackle the Levenshtein distance algorithm, which not only tells you how similar two strings are, but also what needs to change to make the two strings identical (can anyone say DIFF?).

I hope you found this post useful and/or enjoyable. If you want the source code from this post, please look at the resources section below.

Resources:

I'm publishing all of my stuff on GitHub now. The following is the repository for the StringSimilarity stuff I have been presenting: https://github.com/rclayton/StringSimilarity

References:

Wikipedia contributors. "Jaccard index." Wikipedia, The Free Encyclopedia. Wikipedia, The Free Encyclopedia, 13 Dec. 2010. Web. 23 Jan. 2011.

Wikipedia contributors. "Jaro–Winkler distance." Wikipedia, The Free Encyclopedia. Wikipedia, The Free Encyclopedia, 9 Jan. 2011. Web. 23 Jan. 2011. 

Wikipedia contributors. "Levenshtein distance." Wikipedia, The Free Encyclopedia. Wikipedia, The Free Encyclopedia, 27 Dec. 2010. Web. 23 Jan. 2011.

Wikipedia contributors. "Paul Jaccard." Wikipedia, The Free Encyclopedia. Wikipedia, The Free Encyclopedia, 17 Nov. 2010. Web. 23 Jan. 2011. 

Wikipedia contributors. "Sørensen similarity index." Wikipedia, The Free Encyclopedia. Wikipedia, The Free Encyclopedia, 8 Nov. 2010. Web. 23 Jan. 2011.

Wikipedia contributors. "Thorvald Sørensen." Wikipedia, The Free Encyclopedia. Wikipedia, The Free Encyclopedia, 1 Jan. 2011. Web. 23 Jan. 2011.

Drools Syntax Highlighter Brush now hosted on GitHub

I've decided to move my Syntax Highlighter brush for Drools out of my Dropbox (gross, I know) and into a more permanent place.  If you are interested in downloading the file, or looking at the source, please visit: https://github.com/rclayton/shBrushDrools.  The file shBrushDrools.js is CC Attribution licensed.  You may do whatever you want with it, including bundle it with a commercial product (at no cost to you).

Also, to add some context, the Drools Syntax Highlighter brush was featured on a previous post from December:  http://www.gettingcirrius.com/2010/12/jboss-rules-drools-brush-for-syntax.html.

Tuesday, January 11, 2011

Distributed Processing Made Easy with GridGain 3.0

Parallel computing is a popular topic, as evidenced by the growing number of cores in CPUs as well as the software frameworks that take advantage of them. While some types of computer hardware have managed to keep up with rapid advancements in chip design (memory and graphics), other devices have failed to match the pace. We are still largely constrained by I/O limitations (both in terms of storage and network throughput). While it is possible to "scale up" the hardware of a single machine to meet the increasing demands of an application, the more practical solution is to distribute tasks amongst a cluster of machines.

The Java Community is blessed with a plethora of distributed processing frameworks. Perhaps the most ubiquitous in this category are Apache Hadoop and Terracotta. If you are not familiar with the two, Terracotta allows engineers to cluster core components of a JVM across multiple machines. Hadoop, on the other hand, is an ecosystem of tools centered around the distributed Map-Reduce algorithm; this is an oversimplification of both frameworks, but it should work for the context of this discussion.

The strategies employed by both Terracotta and Hadoop represent the two schools of thought in distributed frameworks. Terracotta's opinion is that the effort of distributing an application should be the burden of the framework, and the concern of parallelism separated from the application. In this case, turning a standard Java application into a grid application is transparent to the developer. Distributed processing in Apache Hadoop is a lot more deliberate. Hadoop is not designed to be deployed in some web container, coexisting with CRUD applications. Hadoop also is not designed to scale a POJO application. Hadoop is designed specifically to accomplish one goal: storing and processing massive amounts of data.

Unfortunately, there are many cases where we have a need existing somewhere in between the intended goals of both frameworks. In many cases, we may have some CRUD application that needs to distribute processing or offload computationally expensive tasks, but does not need a distributed JVM or cluster of specially purposed machines. This is not to say that Terracotta or Hadoop could not be made to serve this role with some effort.

An excellent framework that (sort of) exists between the goals of Terracotta and Hadoop is GridGain (http://www.gridgain.com).  GridGain allows engineers to easily adapt existing applications into ones capable of distributing their loads across a cluster of machines.  GridGain's strategy for achieving this functionality is a beautiful blend of annotations and advice (see Aspect-Oriented Programming) applied to your existing application code.  More importantly, GridGain is easy and enjoyable.  Using the framework feels very natural because the gentlemen at GridGain have gone to painstaking efforts to include the core features most of us use in standard Java development: Spring, JUnit, accessibility from an application container, etc.  I should also mention that the API is well designed; my litmus test for design is stability (it just works) and intuitiveness (understanding the abstraction without needing to consult documentation).  All of the example code in this post was written in only a couple of hours!

Here are some of the other features of GridGain:
  • Ad Hoc, Zero-Config Grids!  Grid Nodes broadcast their existence on the network and auto discover other nodes.  This allows you to easily scale your grid up or down just by starting or stopping nodes.
  • Map-Reduce capability.
  • Queryable In-Memory Cache.
  • Grid Event Model.  React to a number of events on the grid (cache insertions/deletions, nodes discovered, etc).
  • Distributed Thread-Pool.
  • Functional Language support for Java.
  • Native Scala support.
You can read more about GridGain's features in their White Paper: http://www.gridgain.com/media/gridgain_white_paper.pdf.

The power of GridGain is best seen in a code demonstration.  So instead of continuing to espouse the merits of the framework, why don't we just jump into some code?  The remainder of the post will center around 3 separate Grid Applications.  The post is rather long, so I've left a little roadmap to each individual application below:
  1. Using the Gridify attribute to distribute a task to the Grid.
  2. Broadcasting the execution of a classic thread task (Runnable) across all nodes in the Grid.
  3. Turning a classic Java function into a distributed Map-Reduce operation.
Sorry for the long-winded introduction.  Without further ado, here is our first demonstration: using the Gridify attribute to distribute a task to the Grid.

Demo 1.


The following is an example of distributing a method to a grid node in the Ad Hoc cluster. The method annotated with "Gridify" will be advised by an AOP framework with a proxy that distributes the work amongst the active grid nodes. Since this task is not associated with a TaskSplitter implementation, it can only be run on a single node (chosen at random on the network).

package com.berico.grid;

import org.gridgain.grid.GridException;
import org.gridgain.grid.gridify.Gridify;
import org.gridgain.grid.gridify.aop.spring.GridifySpringEnhancer;
import org.gridgain.grid.typedef.G;

/**
 * Demonstrates the use of GridGain's 
 * Gridify annotation for distributed
 * processing of a method.
 * @author Richard C. (Berico Technologies)
 */
public class GridifyExample {

  /**
   * Entry-point to the application
   * @param args Command Line arguments
   * @throws GridException 
   */
  public static void main(String[] args) 
   throws GridException {
    //Programmatically start a new
    //Grid node instance
    G.start();
    //Wrap the call so we can close
    //the connection regardless of
    //what happens.
    try {
      //Create a new instance of this
      //class.  Technically this is not
      //necessary; the annotation can be
      //applied to a static method.
      //We, however, need to use Spring AOP
      //to advise this class (proxying the
      //object with the Distributed functionality
      //at runtime, vice compile-time).
      GridifyExample example = new GridifyExample();
      //This step is unnecessary if AspectJ
      //or JBoss AOP is correctly configured
      //to proxy objects annotated with
      //Gridify
      example = GridifySpringEnhancer.enhance(example);
      //Call the distribute method.  Behind
      //the scenes, this method is proxied and
      //will be invoked using GridGain's
      //distributed api.
      example.distribute();
    }
    finally {
      //Stop the local grid node (whether
      //we succeed or fail).
      G.stop(true);
    }
  }

  /**
   * A node on the grid will be tasked with 
   * executing this method.  Since the 
   * "Gridify" annotation is not paired
   * with a task splitter, this will only
   * run on one grid node (randomly chosen).
   */
  @Gridify
  public void distribute(){
    System.out.println(
      "Calling Gridified Method.");
  }
}

The following image shows only one Grid Node outputting the message (keep in mind, Eclipse is serving as one of those nodes):


Full View:  http://dl.dropbox.com/u/12311372/GridifyExample.Output.PNG

Demo 2.

The previous example was not particularly useful. Sure, I can think of a couple of scenarios that might benefit from randomly distributing a single task to another node, but these scenarios are far less common than the need to perform map-reduce or queue a static job on multiple servers.

The next example demonstrates how to force all nodes in the Grid to execute a standard Java "Runnable" implementation. For this demonstration, our Runnable task will simply print a message to the console (don't worry, I'll provide a demonstration of working with state in the next example). Here is the Runnable implementation:

/**
 * An implementation of the standard 
 * Java Runnable Interface
 */
 public static class MyGridTask implements Runnable {
   //This will be executed by each Grid node.
   @Override
   public void run() {
     System.out.println("Got a Task!");
   }
 }


To run this task across all servers in our grid, all we need to do is submit the Runnable to the grid's run method:

package com.berico.grid;

import org.gridgain.grid.GridClosureCallMode;
import org.gridgain.grid.GridException;
import org.gridgain.grid.typedef.G;

/**
 * An Example of using a Zero-Config Grid
 * to broadcast the execution of a Java
 * Runnable.
 * @author Richard C (Berico Technologies)
 */
 public class RunnableExample {

  /**
   * Start a new Grid Node and 
   * broadcast a "Runnable" job on
   * all participating nodes. 
   * @param args Shell Arguments
   * @throws GridException
   */
   public static void main(String[] args) 
         throws GridException{

     //Start a new Grid Node instance
     G.start();
     try {
       //Get a handle to the Grid
       G.grid()
           //Broadcast a run request
           .run(GridClosureCallMode.BROADCAST, 
                new MyGridTask());
     } finally {
       //Stop the Grid Node instance
       G.stop(true);
     }
   }
 }

This is the output on the initiating node:

[23:34:52]    _____     _     _______      _         ____  ___    
[23:34:52]   / ___/____(_)___/ / ___/___ _(_)___    |_  / / _ \  
[23:34:52]  / (_ // __/ // _  / (_ // _ `/ // _ \  _/_ <_/ // /  
[23:34:52]  \___//_/ /_/ \_,_/\___/ \_,_/_//_//_/ /____(+)___/
[23:34:52] 
[23:34:52]        ---==++ C0MPUTE + DATA + CL0UD ++==---
[23:34:52]                 ver. 3.0.3c-20122010
[23:34:52]    2005-2010 Copyright (C) GridGain Systems, Inc.
[23:34:52] 
[23:34:52] Quiet mode.
[23:34:52]   ^-- To disable add -DGRIDGAIN_QUIET=false or "-v" to ggstart.{sh|bat}
[23:34:52] << Community Edition >>
[23:34:52] Language runtime: Java Platform API Specification ver. 1.6
[23:34:52] GRIDGAIN_HOME=null
[23:34:54] Node JOINED [nodeId8=165ec9f5, addr=[192.168.2.4], CPUs=2]
[23:34:54] '--ToPoLoGy SnApShOt [nodes=2, CPUs=2, hash=0xFCCE1F62]
[23:34:54] Node JOINED [nodeId8=ec1ddad5, addr=[192.168.2.4], CPUs=2]
[23:34:54] '--ToPoLoGy SnApShOt [nodes=3, CPUs=2, hash=0xB32DDF10]
[23:34:57] JVM: Sun Microsystems Inc., Java(TM) SE Runtime Environment ver. 1.6.0_23-b05
[23:34:57] OS: Windows 7 6.1 amd64, rclayton
[23:34:57] VM name: 20960@deathstar
[23:34:57] Local ports: TCP:47102 UDP:47200 TCP:47302 
[23:34:57] GridGain started OK [grid=default, nodeId8=1ad101c5, CPUs=2, addrs=[192.168.2.4]]
[23:34:57] ZZZzz zz z...
Got a Task!
[23:34:57] GridGain stopped OK [uptime=00:00:00:258]

And the output from all of the Grid Nodes:


Full View: http://dl.dropbox.com/u/12311372/RunnableExample.Output.PNG

Demo 3.

Of course, distributing work that doesn't return any data to nodes throughout the network isn't very exciting. More often than not, we are going to want to pass some data to the grid nodes and get a synthesized dataset back. GridGain has a number of mechanisms for doing this sort of task. Let's see an example very similar to a map-reduce job that you might submit to Hadoop.

In this scenario, we have a large set of keys (UUIDs) and we need to find the correct server that holds the data each key is associated with. This is a common problem we encounter when building distributed databases. My approach to "consistent key partitioning" is to take the integer formed from the 3rd and 4th bytes of a UUID (the lowest-order bytes) modulo the number of servers in the cluster.

server index = ((integer formed from concat(byte[2], byte[3])) % number of servers)

This strategy works surprisingly well at evenly distributing UUID keys amongst servers. This is because many UUID implementations (including many databases) increment the first 4 bytes of the UUID (keeping the trailing 12 bytes consistent), because it is more performant for indexing (though this strategy may not necessarily be standards compliant). To ensure we understand the exact algorithm being performed by the distributed nodes, review the implementation below. We're going to use a really simple class to store both the string representation of the UUID and, eventually, the index of the server that key is stored on. For the sake of brevity, and the fact that the class is used internally, I'm not going to use accessors and mutators on the data structure.

package com.berico.grid;

import java.io.Serializable;

/**
 * Associates a record's key to a server.
 * This is necessary when records are 
 * partitioned across a cluster of servers.
 * Note, I chose not to use getters or
 * setters; this class is used internally.
 * @author Richard C. (Berico Technologies)
 */
 public class KeyToServerAssociation implements Serializable {

   private static final long 
      serialVersionUID = 4299712504127354446L;

   //The Key of a record; UUID without
   //the dashes.
   public String key;
 
   //The index/id of the server
   //the key should be stored or
   //found.
   public int server;
 
   /**
    * Friendly toString
    */
   @Override
   public String toString(){
     StringBuilder sb = new StringBuilder();
     sb.append("[Key: ")
       .append(this.key)
       .append(" Server: ")
       .append(this.server)
       .append("]");
     return sb.toString();
   } 
}

And here is the static method we will use to calculate the association. This method exists in the LobUuidAssociationBuilder class (I will introduce it a little later).

/**
 * We want the 3rd and 4th bytes of the UUID, 
 * which is the least significant
 * 2 bytes of the 32-bit (4 byte) integer. 
 * Most databases increment this value
 * when they create their UUIDs.
 * The X's represent the part of 
 * the string we are targeting
 * on the GUID:  
 * 000000XX-0000-0000-0000-000000000000
 * @param association Key to Server Association
 */
 public static void associate(
         KeyToServerAssociation association){
   //Get the least significant 2 bytes by
   //parsing the 7th and 8th characters
   //of the UUID string
   Integer leastSignificantBytes = 
      Integer.valueOf(
        association.key.substring(6, 8), 16);
   //We will take the remainder of the 
   //least significant bytes divided by
   //the number of servers as the index
   //of the appropriate server in which
   //the record is stored.
   association.server = 
   leastSignificantBytes % NUMBER_OF_SERVERS;
   //For the sake of the demonstration,
   //print the server
   System.out.println(
     String.format(
       "Key associated with server: %s", 
       association));
 }
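To sanity-check the arithmetic, here is the same calculation as a tiny standalone sketch (the class and method names here are my own, not part of the project). For the first key in the sample output at the end of this post, 4f22c46355b24050ab09267bf26000fd, the 7th and 8th characters are "63", which is 0x63 = 99, and 99 % 16 = 3, matching the printed "Server: 3":

```java
import java.util.UUID;

public class KeyPartitionSketch {

    // Parse the 7th and 8th hex characters of a dashless UUID
    // (the two lowest-order bytes of its first 32-bit group) and
    // take the remainder modulo the number of servers.
    public static int serverFor(String dashlessUuid, int numberOfServers) {
        int lowBytes = Integer.valueOf(dashlessUuid.substring(6, 8), 16);
        return lowBytes % numberOfServers;
    }

    public static void main(String[] args) {
        // "63" hex == 99; 99 % 16 == 3
        System.out.println(
            serverFor("4f22c46355b24050ab09267bf26000fd", 16)); // prints 3
        // Any freshly generated key maps to some server in [0, 16)
        String key = UUID.randomUUID().toString().replace("-", "");
        System.out.println(serverFor(key, 16));
    }
}
```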

The goal of this application is to build a list of associations for a million or more keys. So we need an implementation in which we can submit a list of unassociated KeyToServerAssociation objects and receive an equivalent list back with the server indexes set. Our interface for this implementation is listed below:

package com.berico.grid;

import java.util.ArrayList;

import org.gridgain.grid.gridify.Gridify;

/**
 * Given a list of unassociated keys (meaning
 * only the UUID has been set, not the
 * server index), associate those keys
 * to the appropriate servers and return
 * the list.
 * @author Richard C. (Berico Technologies)
 */
 public interface IBulkAssociationBuilder {
 
   /**
    * Build the associations and return
    * it to the caller
    * @return Key to Server Association
    */
    @Gridify(taskClass = KeyAssociationTaskSplitter.class)
    ArrayList<KeyToServerAssociation> build(
      ArrayList<KeyToServerAssociation> unassociatedKeys);

}

Probably the first thing you will notice is the addition of some metadata to the "Gridify" annotation. When we start dealing with functions that can be distributed across multiple servers, we need a mechanism to decide how the task will be split amongst the server nodes, and, just as essential, how the results from all those nodes will be "reduced" back into a single result. This is the job of the task splitter. In our example, we extend GridGain's GridifyTaskSplitAdapter to create a custom task splitter for our grid application (this is the standard way to do this form of distributed processing with the framework).

package com.berico.grid;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

import org.gridgain.grid.GridException;
import org.gridgain.grid.GridJob;
import org.gridgain.grid.GridJobAdapterEx;
import org.gridgain.grid.GridJobResult;
import org.gridgain.grid.gridify.GridifyArgument;
import org.gridgain.grid.gridify.GridifyTaskSplitAdapter;

/**
 * This is GridGain's mechanism for splitting
 * tasks (you get to decide how jobs are 
 * partitioned), and optionally allows you
 * to reduce the results into some type of
 * value (in our case, a list of key-to-server
 * associations).
 * @author Richard C. (Berico Technologies)
 */
 public class KeyAssociationTaskSplitter 
        extends GridifyTaskSplitAdapter
               < List<KeyToServerAssociation> > {

   //For the Serializable interface
   private static final long 
      serialVersionUID = -6072502274009699541L;

   /**
    * Split the work traditionally done by the
    * build() method into a collection of grid
    * jobs that can be distributed to other
    * nodes.
    * @param gridSize Number of nodes in the grid
    * @param argument The Gridify AOP argument (wraps the advised instance)
    * @return A collection of grid jobs, one per node
    * @throws GridException
    */
   @Override
   protected Collection<? extends GridJob> 
         split(int gridSize, GridifyArgument argument)
              throws GridException {
  
     //Pull the association builder from the Gridify AOP argument
     LobUuidAssociationBuilder associationBuilder = 
         (LobUuidAssociationBuilder)argument.getTarget();
     //Total number of unassociated keys in the List
     int totalRecords = 
         associationBuilder.getUnassociatedKeys().size();
     //A Collection of Grid Jobs we're going to 
     //submit to the GridGain framework to distribute
     Collection<GridJobAdapterEx> jobs 
         = new ArrayList<GridJobAdapterEx>();
     //Get a local reference to the list of unassociated keys
     List<KeyToServerAssociation> associations 
         = associationBuilder.getUnassociatedKeys();
     //We are going to partition the unassociated keys list
     //evenly across the Grid Nodes.  To do this, we will
     //need to calculate the partition size by dividing
     //the total number of records by the number of 
     //server nodes in the grid (integer division; the
     //last partition will absorb any remainder)
     int partitionSize = totalRecords / gridSize;
     //We need to keep track of what server we are on
     //when we partition the list.  This variable will hold
     //that 'position'.
     int position = 0;
     //While we haven't reached the last node in the grid
     while(position < (gridSize)){
       //Calculate the starting position for this
       //partition on the list
       int startingIndex = position * partitionSize;
        //Reference to the (exclusive) end position for
        //the partition on the list
        int endingIndex;
        //If this is not the last partition
        if(position != (gridSize - 1)){
          //the (exclusive) end position is the next
          //partition's starting index
          endingIndex = (position + 1) * partitionSize;
        }
        //If this is the last partition
        else {
          //the ending index is the size of the list,
          //so the last partition absorbs any remainder
          endingIndex = totalRecords;
        }
        //Create a new list to hold the "splice" (sublist)
        //of the initial unassociated key list
        ArrayList<KeyToServerAssociation> splice 
            = new ArrayList<KeyToServerAssociation>();
        //Now that we have the start and (exclusive) end
        //indexes, iterate through the list, adding those
        //items to the spliced list
        for(int r = startingIndex; r < endingIndex; r++){
           splice.add(associations.get(r));
        }
        //Once we have the spliced list, we need to add a new
        //Grid Job to the Grid Job list.  This is an atomic
        //unit of work for a grid node in GridGain.
       jobs.add(
         //Create a new Job Adapter, essentially a wrapper
         //for the method we want to distribute.
         new GridJobAdapterEx(splice) {
           //Since this is sent over the wire to another
           //server node, it is Serializable
           private static final long 
             serialVersionUID = 1243523452452345L;
           //The method that will be executed by
           //the remote grid node.  Please note that
           //the implementation requires a Serializable
           //object (since it is sent back over the wire).
           //This is why you see me using ArrayList instead
           //of the List Interface (which does not extend
           //Serializable).
           @Override public Serializable execute() {
             //Create a new instance of the same class
             //that we are initially wrapping; this
             //could alternatively be another class,
              //but it's convenient to have only one
              //implementation and treat its instance
              //methods as if they were static, to prevent
              //duplicating the functionality.
             LobUuidAssociationBuilder taskWrapper 
                 = new LobUuidAssociationBuilder();
             //Finally, build the associations.
             //It took me a minute to realize we are
             //calling an inherited template method 
             //which returns the constructor argument 
             //(the key-to-server association list)
             //as its proper type.
             return taskWrapper.build(
                this.<ArrayList<KeyToServerAssociation>>
                    argument(0));
           }
         });
         //Now that the job is added, increment the partition
         //position (to the next node).
         position++;
       }
       //Return the grid job list.
       return jobs;
     }

  /**
   * Reduce the results from all "mapped" job
   * nodes into a single KeyToServerAssociation list
   * result.
   * @param results The result list from all
   * grid nodes
   * @return Returns all of the completed key to 
   * server associations 
   */
   @Override
   @SuppressWarnings("unchecked")
   public List<KeyToServerAssociation> 
                reduce(List<GridJobResult> results)
                     throws GridException {
     //Create a new ArrayList to hold the 
     //merged results
     ArrayList<KeyToServerAssociation> associations 
          = new ArrayList<KeyToServerAssociation>();
     //Iterate over the results
     for (GridJobResult result : results) {
       //Add the resultant data from the grid
       //nodes to the merge array
       associations.addAll(
         (ArrayList<KeyToServerAssociation>)
            result.getData());
     }
     //Return the merged list
     return associations;
   }
 
}
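The index arithmetic in split() is the part that is easiest to get wrong, so here is the same partitioning scheme as a standalone sketch (class and method names are my own): the first gridSize - 1 partitions each hold totalRecords / gridSize items, and the last partition absorbs any remainder.

```java
import java.util.ArrayList;
import java.util.List;

public class PartitionSketch {

    // Split the index range [0, total) into gridSize contiguous
    // partitions, returned as {startInclusive, endExclusive} pairs;
    // the final partition absorbs any remainder.
    public static List<int[]> partitions(int total, int gridSize) {
        int partitionSize = total / gridSize;
        List<int[]> result = new ArrayList<int[]>();
        for (int position = 0; position < gridSize; position++) {
            int start = position * partitionSize;
            int end = (position != gridSize - 1)
                    ? (position + 1) * partitionSize
                    : total;
            result.add(new int[] { start, end });
        }
        return result;
    }

    public static void main(String[] args) {
        // 10 records over 3 nodes: [0,3), [3,6), [6,10)
        for (int[] p : partitions(10, 3)) {
            System.out.println(p[0] + ".." + (p[1] - 1));
        }
    }
}
```

Because each exclusive end index equals the next partition's starting index, every record lands in exactly one job; mixing an inclusive end with `start = position * partitionSize` would copy each boundary record into two jobs.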

In order to Gridify our IBulkAssociationBuilder.build() method, we need to implement the interface. This is the "Lowest Order Bytes" implementation we talked about above. I've abbreviated the associate() method since we've already seen it (see the code above). The one thing worth mentioning is that this class also stores the state that's going to be "Gridify"'d. This is probably the one thing I don't like very much: we have to keep all of the state (every key-to-server association) in the advised class so the task splitter can get at it and split it. I can think of other ways to perform the task, but not many of them seem very clean.

package com.berico.grid;

import java.util.ArrayList;

/**
 * Lowest Order Bytes, Association Builder.
 * Associates the key (a UUID) to the appropriate
 * server.  The correct server is calculated
 * using a modulus operation on the UUID's
 * 3rd and 4th byte by the number of servers
 * in the cluster.
 * @author Richard C. (Berico Technologies)
 */
 public class LobUuidAssociationBuilder 
       implements IBulkAssociationBuilder {
 
   //Number of Servers in the cluster in
   //which we are partitioning keys
   public static int NUMBER_OF_SERVERS = 16;
 
   //Stored Key to Server Associations
   private ArrayList<KeyToServerAssociation> associations;
 
   public LobUuidAssociationBuilder(){}
 
   /**
    * Initialize the task, passing the 
    * unassociated keys
    * @param associations Unassociated Keys
    */
   public LobUuidAssociationBuilder(
           ArrayList<KeyToServerAssociation> associations){
     this.associations = associations;
   }
 
   /**
    * Get the unassociated keys
    * @return Unassociated Keys
    */
   public ArrayList<KeyToServerAssociation> 
             getUnassociatedKeys() {
     return this.associations;
   }
 
   /**
    * Build the Associations
    * @return Key to Server Associations
    */
   @Override
   public ArrayList<KeyToServerAssociation> build(
            ArrayList<KeyToServerAssociation> unassociatedKeys){
     //Iterate over the collection
     for(KeyToServerAssociation association : unassociatedKeys){
     //Associate
       associate(association);
     }
     //Return the Association
     return unassociatedKeys;
   }
 
  /**
   * SEE THE IMPLEMENTATION ABOVE
   * @param association Key to Server Association
   */
   public static void associate(
           KeyToServerAssociation association){
     //We've seen this method's body before
   }
  
}

To be completely transparent, the next class demonstrates how I'm generating and temporarily storing the keys. It's not essential for explaining the framework, but I've included it in case you have questions:

package com.berico.grid;

import java.util.ArrayList;
import java.util.UUID;

/**
 * Generates unassociated keys for our grid
 * demonstration.
 * @author Richard C. (Berico Technologies)
 */
 public class TestDataGenerator {
 
   //Number of keys to generate
   public static int NUMBER_OF_KEYS = 1000000;
   //A list of generated unassociated keys
   protected ArrayList<KeyToServerAssociation> associations;
 
   /**
    * Default Constructor
    */
   public TestDataGenerator(){}
 
  /**
   * Setup the example by creating 
   * a bunch of random keys.
   */
   public void generate(){
     //initialize the list
     this.associations 
         = new ArrayList<KeyToServerAssociation>();
     //generate a bunch of unassociated keys
     for(int i = 0; i < NUMBER_OF_KEYS; i++){
       associations.add(generateUnassociatedKeys());
     }
   }
 
  /**
   * Get the list of key to server associations
    * @return The unassociated keys
   */
   public ArrayList<KeyToServerAssociation> 
              getUnassociationedKeys(){
     //Return the list of associations
     return this.associations;
   }
 
  /**
   * Generate a random key and add it 
   * to an key-to-server association.
   * @return Random Key.
   */
   public static KeyToServerAssociation 
              generateUnassociatedKeys(){
     //Create a new association
     KeyToServerAssociation association 
          = new KeyToServerAssociation();
     //Create the key (UUID)
     association.key = 
           UUID.randomUUID()
               //Convert to string
               .toString()
               //remove the unnecessary dashes
               .replace("-", "");
     //Return the association
     return association;
   }
}
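As an aside on why the distribution in the results below comes out so even: java.util.UUID.randomUUID() produces version 4 (random) UUIDs, so the two hex characters the strategy reads are uniformly random, and each of the 16 buckets receives roughly 1/16th of the keys. A quick empirical check (the class name is my own):

```java
import java.util.UUID;

public class DistributionCheck {

    // Generate `keys` random dashless UUIDs and count how many
    // land in each of the `servers` buckets under the
    // lowest-order-bytes strategy.
    public static int[] bucketCounts(int keys, int servers) {
        int[] counts = new int[servers];
        for (int i = 0; i < keys; i++) {
            String key = UUID.randomUUID().toString().replace("-", "");
            counts[Integer.valueOf(key.substring(6, 8), 16) % servers]++;
        }
        return counts;
    }

    public static void main(String[] args) {
        // With 160,000 keys and 16 servers, each bucket should hold
        // roughly 10,000 keys (within a few percent)
        for (int count : bucketCounts(160000, 16)) {
            System.out.println(count);
        }
    }
}
```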

Finally we have come to the end of the demonstration: the "runner" class that will actually execute our Key-to-Server Association distributed application. At the end of the class, you will find some non-essential functionality that counts the keys assigned to each server. I've done this to demonstrate that the values returned to the main node are correctly set.

package com.berico.grid;

import java.util.List;

import org.gridgain.grid.GridException;
import org.gridgain.grid.gridify.aop.spring.GridifySpringEnhancer;
import org.gridgain.grid.typedef.G;

/**
 * A more advanced example of 'Gridify'ing 
 * an instance method, manually 'partitioning'
 * the work amongst server nodes and 'reducing'
 * the result back into a single list.
 * The class is the runner for the example.
 * In this scenario, we are associating the keys
 * (UUIDs) of hypothetical records with the servers
 * those records would be stored on.  This is a 
 * scenario that happens quite often in distributed
 * databases (we use 8 Redis instances, for example). 
 * @author Richard C. (Berico Technologies)
 */
 public final class KeyAssociationExample {
 
   //Ensure this can't be instantiated
   private KeyAssociationExample(){}
 
  /**
   * Entry-point into our example
   * @param args Command Line Arguments
   * @throws GridException
   */
   public static void main(String[] args) throws GridException{
     //Reference to the key-to-server associations
     List<KeyToServerAssociation> associations;
     //Start a new grid node instance
     G.start();
     //Wrap the distributed call so we can stop
     //the server node whether we are successful
     //or not.
     try {
       //Create a new instance of the
       //Test Data Generator
       TestDataGenerator provider 
            = new TestDataGenerator();
       //Generate some test data, exactly
       //1,000,000 keys
       provider.generate();
       //Create a new instance of the
       //Bulk Association Builder.  We are
       //going to use a Lowest Order Byte(s)
       //strategy for partitioning our keys.
       IBulkAssociationBuilder associationBuilder 
            = new LobUuidAssociationBuilder(
                  provider.getUnassociationedKeys());
       //We are going to use Spring AOP to advise
       //the builder.  If you use AspectJ or 
       //JBoss AOP (to compile-time weave the 
       //advice onto the class), this is
       //unnecessary.
       associationBuilder = GridifySpringEnhancer
            .enhance(associationBuilder);
       //With our newly enhanced Association
       //Builder, let's build the associations...
       //of course, we're going to do this
       //across a grid of computers.
       associations = associationBuilder
            .build(provider.getUnassociationedKeys());
     }
     finally {
       //Stop the node regardless of whether
       //we are successful.
       G.stop(true);
     }
        
     //We'll wrap up by demonstrating that the 
     //distributed operation was successful.
     //To prove it was, we will count the number of
     //keys associated with each server.
        
     //Yes I know this is heinously inefficient!
        
     //Iterate over the number of servers
     for(int i = 0; 
             i < LobUuidAssociationBuilder.NUMBER_OF_SERVERS;
             i++){
       //Set a counter
       int count = 0;
       //Iterate over the associations
       for(KeyToServerAssociation association : associations){
         //if the current association's server is the
         //same as the iteration, increment the
         //counter.
         if(association.server == i){
           count++;
         }
       }
       //Finally, print the results to the console.
       System.out.println(
           String.format(
                "Server %s had %s keys associated with it.", 
                i + 1, count));
     }
   }
}
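As the comment in main() admits, the tallying loop at the end is heinously inefficient: it re-scans the full association list once per server, O(servers × keys). A single pass with a counter array does the same job. This is a sketch with my own class name; it assumes server indexes fall in [0, numberOfServers), which the modulus guarantees:

```java
public class ServerTally {

    // Count how many keys were assigned to each server in a
    // single pass, instead of one full scan per server.
    public static int[] tally(int[] serverIndexes, int numberOfServers) {
        int[] counts = new int[numberOfServers];
        for (int server : serverIndexes) {
            counts[server]++;
        }
        return counts;
    }

    public static void main(String[] args) {
        int[] counts = tally(new int[] { 3, 9, 3, 5, 2, 11, 3 }, 16);
        // Three of the sample keys landed on server index 3
        System.out.println(counts[3]); // prints 3
    }
}
```

In the runner, this would replace the nested loops: one pass incrementing counts[association.server], then a short second loop printing the sixteen totals.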

And here are the results from the node that tasked the grid (run from Eclipse):

[23:29:29]    _____     _     _______      _         ____  ___    
[23:29:29]   / ___/____(_)___/ / ___/___ _(_)___    |_  / / _ \  
[23:29:29]  / (_ // __/ // _  / (_ // _ `/ // _ \  _/_ <_/ // /  
[23:29:29]  \___//_/ /_/ \_,_/\___/ \_,_/_//_//_/ /____(+)___/
[23:29:29] 
[23:29:29]        ---==++ C0MPUTE + DATA + CL0UD ++==---
[23:29:29]                 ver. 3.0.3c-20122010
[23:29:29]    2005-2010 Copyright (C) GridGain Systems, Inc.
[23:29:29] 
[23:29:29] Quiet mode.
[23:29:29]   ^-- To disable add -DGRIDGAIN_QUIET=false or "-v" to ggstart.{sh|bat}
[23:29:29] << Community Edition >>
[23:29:29] Language runtime: Java Platform API Specification ver. 1.6
[23:29:29] GRIDGAIN_HOME=null
[23:29:31] Node JOINED [nodeId8=71f4cdf5, addr=[192.168.2.4], CPUs=2]
[23:29:31] '--ToPoLoGy SnApShOt [nodes=3, CPUs=2, hash=0xD3C8011B]
[23:29:31] Node JOINED [nodeId8=385a77d4, addr=[192.168.2.4], CPUs=2]
[23:29:31] Node JOINED [nodeId8=2709d99f, addr=[192.168.2.4], CPUs=2]
[23:29:31] '--ToPoLoGy SnApShOt [nodes=4, CPUs=2, hash=0x5B332E38]
[23:29:34] JVM: Sun Microsystems Inc., Java(TM) SE Runtime Environment ver. 1.6.0_23-b05
[23:29:34] OS: Windows 7 6.1 amd64, rclayton
[23:29:34] VM name: 21164@deathstar
[23:29:34] Local ports: TCP:47103 UDP:47200 TCP:47303 
[23:29:34] GridGain started OK [grid=default, nodeId8=22aaaa77, CPUs=2, addrs=[192.168.2.4]]
[23:29:34] ZZZzz zz z...
Key associated with server: [Key: 4f22c46355b24050ab09267bf26000fd Server: 3]
Key associated with server: [Key: e733cf098fa14fb6898eee97c653ca2b Server: 9]

Abbreviated...You're missing about 250,000 entries.

Key associated with server: [Key: 8e44a025efb74d5f8b45c23c781c6128 Server: 5]
Key associated with server: [Key: d93e7742bf5945f5b534e6e9e8f9c251 Server: 2]
Key associated with server: [Key: 27ae198b30644d0ab9ba806ed93b55f4 Server: 11]
[20:46:58] GridGain stopped OK [uptime=00:01:47:435]
Server 1 had 62502 keys associated with it.
Server 2 had 62367 keys associated with it.
Server 3 had 62479 keys associated with it.
Server 4 had 62753 keys associated with it.
Server 5 had 62526 keys associated with it.
Server 6 had 62139 keys associated with it.
Server 7 had 62560 keys associated with it.
Server 8 had 62657 keys associated with it.
Server 9 had 62404 keys associated with it.
Server 10 had 62325 keys associated with it.
Server 11 had 62818 keys associated with it.
Server 12 had 62285 keys associated with it.
Server 13 had 62129 keys associated with it.
Server 14 had 62292 keys associated with it.
Server 15 had 63142 keys associated with it.
Server 16 had 62625 keys associated with it.

I took a screen shot to demonstrate what the task looks like on 4 local nodes started on my laptop.


Full View: http://dl.dropbox.com/u/12311372/KeyAssociationExample.Output.PNG

And I'm spent! I hope you found this introduction to GridGain rewarding. I highly recommend the framework to any engineer considering a "scale out" or "scale up" of their system. In a later post, I will introduce GridGain's functional API, which makes the framework feel eerily similar to Microsoft's Task Parallel Library.

Good Luck and Happy Coding.

Richard C.

Resources

Eclipse Project:  http://dl.dropbox.com/u/12311372/GridifyWithGridGain.zip
Image 1:  http://dl.dropbox.com/u/12311372/GridifyExample.Output.PNG
Image 2:  http://dl.dropbox.com/u/12311372/RunnableExample.Output.PNG
Image 3:  http://dl.dropbox.com/u/12311372/KeyAssociationExample.Output.PNG

Links

Terracotta:  http://www.terracotta.org/
Apache Hadoop:  http://hadoop.apache.org/
GridGain:  http://www.gridgain.com
Performance Comparison of Hadoop, GridGain and Hazelcast: http://java.dzone.com/articles/comparison-gridcloud-computing-0