Tuesday, March 22, 2011

Dynamic Application Logic: Hosting JavaScript within .NET

Having one platform transfer and execute code on another is certainly not a new concept. As you read this post, JavaScript sent by the web server is performing numerous tasks (loading and rendering content, running animations, handling UI events). Despite the ubiquity of executing application logic from external sources, there is a stigma towards developing desktop and server applications using the same pattern. Fortunately, old beliefs are giving way to a new generation of technologies, many of them "cloud"-based, founded on the concept of distributing logic amongst nodes at runtime.


Hosted logic offers an alternative to (or can supplement) declarative (XML) workflow technologies where system tasks are composed of a graph of pre-generated operations.  In the example above, the 'cloud' represents the portion of the architecture hosting the dynamic application logic.  The host sandboxes that application logic, preventing it from executing code outside the privileges granted to the process. Simply hosting an application isn't enough though.  For this to be a truly compelling solution, the hosted application needs to be able to work with the services and repositories within the architecture.  We do this by injecting the appropriate service and repository clients into the hosted application so it may perform tasks on those components.  Finally, the component references (services and repositories) could be further extracted or adapted for use by the hosted application by attaching the runtime to the bus (at some level).  More interestingly, imagine transferring application logic amongst services and repositories much like we do data today.

To demonstrate the possibilities of hosting external code and wiring it into your application at runtime, I've built a demonstration application in C# using the extremely excellent JavaScript Interpretter for .NET (Jint) [http://jint.codeplex.com/].  For the purposes of this demo, I'm going to keep things simple by only using one model class and one component to manage that model class (a repository).  Since the notion of a user task is quite simple for everyone to understand, I will start by defining our demo's notion of this construct:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;

namespace com.berico
{
  /// <summary>
  /// A simple model definition representing user tasks
  /// </summary>
  public class Task
  {
    /// <summary>
    /// Default Constructor
    /// </summary>
    public Task() { }

    /// <summary>
    /// ID of the Task
    /// </summary>
    public long ID { get; set; }

    /// <summary>
    /// User the Task is Assigned To
    /// </summary>
    public string Owner { get; set; }

    /// <summary>
    /// A Summary of what needs to be done
    /// </summary>
    public string Summary { get; set; }

    /// <summary>
    /// Description of the task
    /// </summary>
    public string Description { get; set; }

    /// <summary>
    /// The drop-dead date for task completion
    /// </summary>
    public DateTime Expiration { get; set; }

    /// <summary>
    /// An optional URL to a form that should be used to 
    /// complete the task
    /// </summary>
    public string FormUrl { get; set; }

    /// <summary>
    /// Returns the internal state of the object, formatted
    /// </summary>
    /// <returns>formatted string of the object's state</returns>
    public override string ToString()
    {
      StringBuilder sb = new StringBuilder();
      sb.AppendLine("com.berico.Task {");
      sb.AppendLine(
        string.Format("  ID: {0}", this.ID));
      sb.AppendLine(
        string.Format("  Owner: {0}", this.Owner));
      sb.AppendLine(
        string.Format("  Summary: {0}", this.Summary));
      sb.AppendLine(
        string.Format("  Description: {0}", this.Description));
      sb.AppendLine(
        string.Format("  Expiration: {0}", this.Expiration));
      sb.AppendLine(
        string.Format("  Form URL: {0}", this.FormUrl));
      sb.AppendLine("}");
      return sb.ToString();
    }

  }
}

Here is the contract for our repository. I'm going to use some of C#'s more advanced functional features to reduce the amount of code necessary for interacting with the Repository. More importantly, you will find that we can actually use anonymous JavaScript functions to fulfill the runtime requirements of these delegates/predicates used by the Repo.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;

namespace com.berico
{
  public interface ITaskRepository
  {
    /// <summary>
    /// Create a new task
    /// </summary>
    /// <param name="owner">Person the task 
    ///    is assigned to</param>
    /// <param name="summary">Summary of the Task</param>
    /// <param name="description">Verbose 
    ///    description of the task</param>
    /// <param name="expiration">Expiration time of the task</param>
    /// <param name="formUrl">Location of a form 
    ///    for completing the task</param>
    /// <returns>A new Task</returns>
    Task Create(string owner, string summary, string description, 
              DateTime expiration, string formUrl);

    /// <summary>
    /// Get a Task by ID
    /// </summary>
    /// <param name="id">ID of the task</param>
    /// <returns>A Task with the submitted ID or Null</returns>
    Task Get(long id);

    /// <summary>
    /// Get all tasks that match the provided filter
    /// </summary>
    /// <param name="filter">Filter method</param>
    /// <returns>List of tasks that the filter applies to</returns>
    List<Task> Get(Func<Task, bool> filter);

    /// <summary>
    /// Update a task in the repository (Task must exist in the
    /// repository)
    /// </summary>
    /// <param name="task">Updated Task</param>
    void Update(Task task);

    /// <summary>
    /// Update many tasks using an update predicate
    /// </summary>
    /// <param name="updatePredicate">Predicate that will update
    /// tasks (completely dependent on your own logic.</param>
    void Update(Action<Task> updatePredicate);

    /// <summary>
    /// Delete a task having the supplied id
    /// </summary>
    /// <param name="id">ID of the task</param>
    void Delete(long id);

    /// <summary>
    /// Delete all tasks that match the supplied filter
    /// </summary>
    /// <param name="filter">Predicate</param>
    void Delete(Func<Task, bool> filter);

    /// <summary>
    /// Find the Aggregate value of something against tasks in the
    /// Repo
    /// </summary>
    /// <typeparam name="T">Return Type</typeparam>
    /// <param name="map">Function for selecting 
    ///   or calculating</param>
    /// <param name="reduce">Function for 
    ///   reducing selected values</param>
    /// <returns>Aggregate Value</returns>
    T Aggregate<T>(Func<Task, T> map, Func<IList<T>, T> reduce);
  }
}

My naive implementation of the Task Repository backed by a Dictionary (obviously nothing is going to persist after the program terminates).

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;

namespace com.berico
{
  /// <summary>
  /// A little in-memory Repository of Tasks backed by a Dictionary
  /// </summary>
  public class VolatileTaskRepository : ITaskRepository
  {
    //Auto incrementing strategy.
    private static long id = 0;

    //Our task repository is backed by a dictionary
    private Dictionary<long, Task> 
      taskstore = new Dictionary<long, Task>();

    /// <summary>
    /// Create a new task
    /// </summary>
    /// <param name="owner">Person the task is 
    ///   assigned to</param>
    /// <param name="summary">Summary of the Task</param>
    /// <param name="description">Verbose description 
    ///   of the task</param>
    /// <param name="expiration">Expiration time of 
    ///   the task</param>
    /// <param name="formUrl">Location of a form for 
    ///   completing the task</param>
    /// <returns>A new Task</returns>
    public Task Create(string owner, string summary, 
                       string description, DateTime expiration, 
                       string formUrl)
    {
      Task task = new Task()
      {
        ID = ++id,
        Owner = owner,
        Summary = summary,
        Description = description,
        Expiration = expiration,
        FormUrl = formUrl
      };

      taskstore.Add(task.ID, task);

      return task;
    }

    /// <summary>
    /// Get a Task by ID
    /// </summary>
    /// <param name="id">ID of the task</param>
    /// <returns>A Task with the submitted ID or Null</returns>
    public Task Get(long id)
    {
      return taskstore[id];
    }

    /// <summary>
    /// Get all tasks that match the provided filter
    /// </summary>
    /// <param name="filter">Filter method</param>
    /// <returns>List of tasks that the filter applies to</returns>
    public List<Task> Get(Func<Task, bool> meetsFilter)
    {
      List<Task> filteredTasks = new List<Task>();

      foreach(Task task in taskstore.Values)
      {
        if (meetsFilter(task))
        {
          filteredTasks.Add(task);
        }
      }
    
      return filteredTasks;
    }

    /// <summary>
    /// Update a task in the repository (Task must exist in the
    /// repository)
    /// </summary>
    /// <param name="task">Updated Task</param>
    public void Update(Task task)
    {
      if (taskstore.ContainsKey(task.ID))
      {
        taskstore[task.ID] = task;
      }
    }

    /// <summary>
    /// Update many tasks using an update predicate
    /// </summary>
    /// <param name="updatePredicate">Predicate that will update
    /// tasks (completely dependent on your own logic.</param>
    public void Update(Action<Task> updatePredicate)
    {
      foreach (Task task in taskstore.Values)
      {
        updatePredicate(task);
      }
    }

    /// <summary>
    /// Delete a task having the supplied id
    /// </summary>
    /// <param name="id">ID of the task</param>
    public void Delete(long id)
    {
      if (taskstore.ContainsKey(id))
      {
        taskstore.Remove(id);
      }
    }

    /// <summary>
    /// Delete all tasks that match the supplied filter
    /// </summary>
    /// <param name="filter">Predicate</param>
    public void Delete(Func<Task, bool> meetsFilter)
    {
      List<long> idsToDelete = new List<long>();
      foreach (Task task in taskstore.Values)
      {
        if (meetsFilter(task))
        {
          idsToDelete.Add(task.ID);
        }
      }

      foreach (long id in idsToDelete)
      {
        taskstore.Remove(id);
      }

    }

    /// <summary>
    /// Find the Aggregate value of "whatever" against 
    /// tasks stored in the Repo
    /// </summary>
    /// <typeparam name="T">Return Type</typeparam>
    /// <param name="map">Function for selecting 
    ///   or calculating</param>
    /// <param name="reduce">Function for reducing 
    ///   selected values</param>
    /// <returns>Aggregate Value</returns>
    public T Aggregate<T>(Func<Task, T> map, 
                Func<IList<T>, T> reduce)
    {
      IList<T> items = new List<T>();

      foreach (Task task in taskstore.Values)
      {
        items.Add(map(task));
      }

      return reduce(items);
    }

  }
}

Now let's setup the runtime environment. The Jint API is extremely simple, to run a script, you only need to construct the object and call the "Run" method: new JintEngine().Run(script);. By default, the interpreter will not expose any objects in your environment to the script, and will ensure the hosted application is executed at the Lowest .NET Trust Level. You can even isolate the script host from using the .NET Framework entirely, but this would prevent us from exposing our services to the script. In our case, we not only want to expose the Task Repository, but also the Console.WriteLine and String.Format functions to the script so we can see the script's output.

using System;
using System.Collections.Generic;
using System.Linq;
using System.IO;
using System.Text;
using Jint;

namespace com.berico
{
  public class JintTestDrive
  {
    private JintEngine jsEngine;

    public JintTestDrive()
    {
      //Instantiate the Engine
      this.jsEngine = new JintEngine();

      //Register the print and format functions, 
      //and the task repository
      this.jsEngine.SetFunction("print", 
        new Action<object>(Console.WriteLine));

      this.jsEngine.SetFunction("format", 
        new Func<string, object, string>(String.Format));

      this.jsEngine.SetParameter("taskRepo", 
        new VolatileTaskRepository());
    }

    static void Main(string[] args)
    {

      ///////////////////////////////////////////////////////
      //   Repository Usage                //
      ///////////////////////////////////////////////////////

      Console.WriteLine("Simple Repository Usage.\n\n");

      string simpleRepoUsageJavaScript = 
        File.ReadAllText("SimpleRepoUsage.js", Encoding.UTF8);

      new JintTestDrive().Run(simpleRepoUsageJavaScript);

      Console.WriteLine("Press Any Key to Continue...");
      Console.ReadKey();

      ///////////////////////////////////////////////////////
      //   Aggregate Example                 //
      ///////////////////////////////////////////////////////

      Console.WriteLine("Aggregate Usage.\n\n");

      string aggregateExampleJavaScript = 
        File.ReadAllText("AggregateExample.js", Encoding.UTF8);

      new JintTestDrive().Run(aggregateExampleJavaScript);

      Console.WriteLine("Press Any Key to Continue...");
      Console.ReadKey();
    }

    public void Run(string logic)
    {
      var returnData = this.jsEngine.Run(logic);
      Console.WriteLine(returnData);
    }
  }
}

You will notice that there are two different script demonstrations executed in the Main method. The first demonstrates the use of standard repository functions, though it does show you how to pass an anonymous function to the repository's Get(), Update() and Delete() methods. The final demonstration is a little more advanced, showing how a little functional programming can result in powerful capabilities for your script (in this case, I show you how to implement MapReduce using a blend of JavaScript and C# [inspired by CouchDB]).

I have to credit my wife who magically solved a coding issue of mine. The funny thing is that she doesn't even know why it worked!

EXAMPLE 1: SimpleRepoUsage.js

/**
 * Dump all tasks in the List to the Console.
 * @param tasks Task objects that should be dumped to
 *  the console.
 */
function printTasks(tasks) {
  for (var task in tasks) {
    print(task);
  }
}

/**
 * Get all tasks from the Task Repo using
 * the a predicate that selects every item
 * @return a list of all the tasks in the Repo
 */
function getAllTasks() {
  return taskRepo.Get(
    function (taskItem) {
      return true;
    });
}

print("\n\nCREATING TASK");

// Create a new Task using the Task Repository 
// injected into the Session
var task1 = taskRepo.Create(
      "Richard", "Do this",
      "This needs to get done now",
      System.DateTime.Now.AddHours(4), 
      "http://www.bericotechnologies.com/form?name=dothis");

// The "format" function is a binding to the 
// .NET String.Format method.
print(
  format(
    "\n\nWE SHOULD HAVE A TASK ID NOW: {0}", task1.ID));

print("\n\nUPDATING TASK");

// Set the owner of the task to "Bob"
task1.Owner = "Bob";

// Update the Task in the Repository
taskRepo.Update(task1);

print("\n\nGETTING THE FIRST TASK AFTER AN UPDATE");

// Get the updated task from the Repo (we want to ensure both
// the repo's Update and Get methods worked correctly)
var updatedTask = taskRepo.Get(task1.ID);

print(updatedTask);

// Create another task
var task2 = taskRepo.Create(
      "Richard", "Do this other thing",
      "You can have this done within 2 days",
      System.DateTime.Now.AddDays(2),
      "http://www.bericotechnologies.com/form?name=dothis2");

// Create a third task
var task3 = taskRepo.Create(
      "Richard", "Do you ever get anything done?",
      "Hurry up on this one!",
      System.DateTime.Now.AddMinutes(1),
      "http://www.bericotechnologies.com/form?name=dothisNOW");

print("\n\nGETTING ALL TASKS");

// Show all the tasks in the repo (verify the
// new tasks are there)
printTasks(getAllTasks());

print("\n\nGETTING ALL OF RICHARD'S TASKS");

// Do a predicate search against the repository
// looking for all tasks assigned to Richard
var richardsTasks = taskRepo.Get(
    function (taskItem) {
      if (taskItem.Owner == "Richard") {
        return true;
      }
      return false;
    });

// Dump Richard's Tasks to the Console
printTasks(richardsTasks);

print("\n\nDELETING THE FIRST TASK");

// Delete the first task (assigned to Bob)
// from the repo.
taskRepo.Delete(task1.ID);

print("\n\nGETTING ALL TASKS AFTER A DELETE");

// Verify the task was deleted
printTasks(getAllTasks());

// Use a predicate to perform a conditional update on Tasks
// in the repository
taskRepo.Update(
  function (task) {
    if (task.Owner == "Richard") {
      task.Summary = "Postpone a month!";
      task.Description = "I'll get to it.";
      task.Expiration = System.DateTime.Now.AddMonths(1);
    }
  });

print("\n\nGETTING ALL TASKS AFTER AN UPDATE");

// Verify the task was deleted
printTasks(getAllTasks());

// Let's delete tasks using a predicate to
// select those that should be deleted
taskRepo.Delete(
  function (task) {
    return task.Owner == "Richard";
  });

print("\n\nGETTING ALL TASKS AFTER A FILTERED DELETE");

// Ensure there are no tasks left and that the previous delete
// operation was correct
printTasks(getAllTasks());

The output for the standard repository operations.

CREATING TASK

WE SHOULD HAVE A TASK ID NOW: 1

UPDATING TASK

GETTING THE FIRST TASK AFTER AN UPDATE
com.berico.Task {
  ID: 1
  Owner: Bob
  Summary: Do this
  Description: This needs to get done now
  Expiration: 3/23/2011 1:55:29 AM
  Form URL: http://www.bericotechnologies.com/form?name=dothis
}

GETTING ALL TASKS
com.berico.Task {
  ID: 1
  Owner: Bob
  Summary: Do this
  Description: This needs to get done now
  Expiration: 3/23/2011 1:55:29 AM
  Form URL: http://www.bericotechnologies.com/form?name=dothis
}

com.berico.Task {
  ID: 2
  Owner: Richard
  Summary: Do this other thing
  Description: You can have this done within 2 days
  Expiration: 3/24/2011 9:55:29 PM
  Form URL: http://www.bericotechnologies.com/form?name=dothis2
}

com.berico.Task {
  ID: 3
  Owner: Richard
  Summary: Do you ever get anything done?
  Description: Hurry up on this one!
  Expiration: 3/22/2011 9:56:29 PM
  Form URL: http://www.bericotechnologies.com/form?name=dothisNOW
}

GETTING ALL OF RICHARD'S TASKS
com.berico.Task {
  ID: 2
  Owner: Richard
  Summary: Do this other thing
  Description: You can have this done within 2 days
  Expiration: 3/24/2011 9:55:29 PM
  Form URL: http://www.bericotechnologies.com/form?name=dothis2
}

com.berico.Task {
  ID: 3
  Owner: Richard
  Summary: Do you ever get anything done?
  Description: Hurry up on this one!
  Expiration: 3/22/2011 9:56:29 PM
  Form URL: http://www.bericotechnologies.com/form?name=dothisNOW
}

DELETING THE FIRST TASK

GETTING ALL TASKS AFTER A DELETE
com.berico.Task {
  ID: 2
  Owner: Richard
  Summary: Do this other thing
  Description: You can have this done within 2 days
  Expiration: 3/24/2011 9:55:29 PM
  Form URL: http://www.bericotechnologies.com/form?name=dothis2
}

com.berico.Task {
  ID: 3
  Owner: Richard
  Summary: Do you ever get anything done?
  Description: Hurry up on this one!
  Expiration: 3/22/2011 9:56:29 PM
  Form URL: http://www.bericotechnologies.com/form?name=dothisNOW
}

GETTING ALL TASKS AFTER AN UPDATE
com.berico.Task {
  ID: 2
  Owner: Richard
  Summary: Postpone a month!
  Description: I'll get to it.
  Expiration: 4/22/2011 9:55:29 PM
  Form URL: http://www.bericotechnologies.com/form?name=dothis2
}

com.berico.Task {
  ID: 3
  Owner: Richard
  Summary: Postpone a month!
  Description: I'll get to it.
  Expiration: 4/22/2011 9:55:29 PM
  Form URL: http://www.bericotechnologies.com/form?name=dothisNOW
}

GETTING ALL TASKS AFTER A FILTERED DELETE


The next example demonstrates two different MapReduce operations, exposed by the Aggregate() function of the Task Repository. You will probably notice the funky brackets in between the function name and the braces "()". Jint forces you to declare the value of the Generic type, and does this through a nonstandard (ECMAScript standard) use of brackets "{}" with the specified fulfilling type. For instance, taskRepo.Aggregate{com.berico.Task} will return a Task object back at the end of the Aggregate operation.

EXAMPLE 2: AggregateExample.js

// Create some tasks
print("CREATING SOME TASKS.\n\n");

taskRepo.Create(
      "Richard", "Schedule Vacation",
      "You need one, and soon.",
      System.DateTime.Now.AddHours(8),
      "http://www.bericotechnologies.com/form?name=dothis2");

taskRepo.Create(
      "Richard", "Where did all the love go?",
      "Buy flowers and candy for the wife.",
      System.DateTime.Now.AddMonths(5),
      "http://www.bericotechnologies.com/form?name=dothisNOW");

taskRepo.Create(
      "Bob", "Need this now.",
      "TPS Coversheets are mandatory Bob!",
      System.DateTime.Now.AddMinutes(5),
      "http://www.bericotechnologies.com/form?name=dothisNOW");

taskRepo.Create(
      "Richard", "Immediate Tasker",
      "Figure out cloud strategy.  Do we really need SOA?",
      System.DateTime.Now.AddMinutes(1),
      "http://www.bericotechnologies.com/form?name=dothisNOW");

taskRepo.Create(
      "Bob", "Buy Plane Tickets",
      "Richard needs a vacation, get him tickets to Australia.",
      System.DateTime.Now.AddDays(2),
      "http://www.bericotechnologies.com/form?name=vacation");

// Let's count all tasks for Richard in the Repository
var numOfTasksForRichard =
  taskRepo.Aggregate{System.Int32}(

    function (task) {
      return (task.Owner == "Richard") ? 1 : 0;
    },

    function (markers) {
      var total = 0;
      for (var mark in markers) {
        if (mark == 1) {
          total++;
        }
      }
      return total;
    });

//Display the number of tasks for Rich
print(format("Number of tasks for Richard = {0}\n\n", 
    numOfTasksForRichard));

// Find the Earliest Task in the Repository
var earliestTask = 
    taskRepo.Aggregate{com.berico.Task}(
    
    function (task){
      return task;
    },

    function (tasks){
      var earliest = tasks[0];
      for(var task in tasks){
        if(task.Expiration < earliest.Expiration){
          earliest = task;
        }
      }
      return earliest;
    });


//Dump the task
print(earliestTask);
The output from our aggregate operations.
CREATING SOME TASKS.

Number of tasks for Richard = 3

com.berico.Task {
  ID: 7
  Owner: Richard
  Summary: Immediate Tasker
  Description: Figure out cloud strategy.  Do we really need SOA?
  Expiration: 3/22/2011 9:56:33 PM
  Form URL: http://www.bericotechnologies.com/form?name=dothisNOW
}
I hope you found this post interesting. I really do believe this sort of hosted application interpretation and executing is the future of distributed computing, but also have their place in API's that have a need for scripting or easy to implement extension points. I also want to thank the creators of Jint. I evaluated a number of JavaScript interpreters for the purpose of this post, and Jint was the only one mature enough to support the functionality I need; in fact, I was shocked at some of the features it did have (allowing me to pass anonymous functions in JavaScript as .NET delegates was a major surprise!).
Happy coding,
Richard

Monday, March 14, 2011

RabbitMQ with Thrift Serialization (Part 2 - C#)

I wanted to quickly follow up the last post on RabbitMQ with Thrift Serialization and add some examples in C#. I promise this post will be a lot more code and less chatter.

Let first start by mentioning that all of the following code was written, successfully compiled, and ran using Mono  (developed in MonoDevelop 2.6).  So yes, this code is portable!

In the previous post, we generated the C# Notification class from the Thrift template using the following command:
thrift --gen csharp model.thrift

I won't expose the generated code to you, it needs to be cleaned up quite a bit, and it is not really necessary for the purpose of this demo.  If you would like, I could upload the source to GitHub on request.

Change: 3/18/2011

In the examples below, I'm performing the Thrift serialization with my own Generics-enabled marshaller class. The following is that marshaller class.

using System;
using System.IO;
using System.Collections.Generic;
using Thrift;
using Thrift.Protocol;
using Thrift.Transport;

namespace RabbitMQClient
{

  public static class ThriftMarshaller
  {    
    public static T deserialize<T>(byte[] payload) 
      where T : TBase, new() {
      
      MemoryStream stream = new MemoryStream(payload);
      TProtocol protocol = new TBinaryProtocol(
        new TStreamTransport(stream, stream));
      T t = new T();
      t.Read(protocol);
      return t;
    }
    
    public static byte[] serialize<T>(T objectToSerialize) 
      where T : TBase {
      
      MemoryStream stream = new MemoryStream();
      TProtocol protocol = new TBinaryProtocol(
        new TStreamTransport(stream, stream));
      objectToSerialize.Write(protocol);
      return stream.ToArray();
    }
  }
}

Let's start by examining the Message Producer code (essentially an almost line for line port of the Java demo):
using System;
using System.IO;
using System.Collections.Generic;
using Thrift;
using Thrift.Protocol;
using Thrift.Transport;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using RabbitMQ.Util;

namespace RabbitMQClient
{
  public class MessageProducer
  {
  
    public static string QUEUE_NAME 
             = "com.berico.notifications";
  
    public static string HOST = "localhost";
  
    public static void Main(string[] args){
   
      try {
        //Create a connection factory
        ConnectionFactory factory = new ConnectionFactory();
        //Set the host to the location of the RabbitMQ server
        factory.HostName = HOST;
        //Open a new connection
        IConnection connection = factory.CreateConnection();
        //IModel is the abstraction for interacting with a queue
        IModel channel = connection.CreateModel();
        //It's possible the Queue doesn't already exist (a producer
        //may not have been instantiated yet).  Create the queue if
        //it doesn't exist
        channel.QueueDeclare(QUEUE_NAME, false, false, false, null);
        //Create an instance of our Domain object.
        //Keep in mind this is a Thrift generated bean.
        Notification notification = new Notification
        {
          Uid = 1, 
          Principals = new List(
             new string[] { "richard", "john" }),
          Subject = "Test Message",
          Message = "Hope this finds you well",
          Created = DateTime.Now.Ticks
         };

        /* Thank you Raa (http://chrisraa.blogspot.com/) 
           for finding the bug in the original post.
           (I had added a initialization constructor to
           the Notification class and had not posted the
           change.)
         */
    
        //Comfort logging
        Console.WriteLine(
          " [*] Waiting for messages. To exit press CTRL+C");
    
        //Serialize the Notification object into a byte array
        byte[] payload = 
          ThriftMarshaller.serialize<Notification>(notification);
    
        //Do this 10 times
        for(int i = 0; i < 10; i++){
        
          //Publish message on the queue using our domain
          //object as the message (or payload)
          channel.BasicPublish("", QUEUE_NAME, null, payload);
        
          //Print a friendly message
          Console.WriteLine(" [x] Sent '{0}'", notification);
        }
      
        //Close the channel
        channel.Close();
        //Close the connection
        connection.Close();
       
       } catch(Exception e){
         //Dump any error to the console
         Console.WriteLine(e.Message);
       }   
     } 
  }
}

The Message Consumer is a little different. In this next demo code, I'm only going to consume one message from the Queue and then terminate. The .NET client API for RabbitMQ is a little different than the Java API, making use of .NET events/delegates. Instead of writing all this code, RabbitMQ has a "pattern" you can extend which will handle all of the annoying scaffolding code necessary for handling these events.
using System;
using System.IO;
using Thrift;
using Thrift.Protocol;
using Thrift.Transport;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using RabbitMQ.Util;

namespace RabbitMQClient
{
  public class MessageConsumer
  {
    
    public static string QUEUE_NAME 
        = "com.berico.notifications";
    
    public static string HOST = "localhost";
    
    public static void Main (string[] args)
    {
      try {
        //Create a connection factory
        ConnectionFactory factory = new ConnectionFactory();
        //Set the host to the location of the RabbitMQ server
        factory.HostName = HOST;
        //Open a new connection
        IConnection connection = factory.CreateConnection();
        //IModel is the abstraction for interacting with a queue
        IModel channel = connection.CreateModel();
        //It's possible the Queue doesn't already exist (a producer
        //may not have been instantiated yet).  Create the queue if
        //it doesn't exist
        channel.QueueDeclare(QUEUE_NAME, false, false, false, null);
        //Comfort logging
        Console.WriteLine(
          " [*] Waiting for messages. To exit press CTRL+C");
        //Queueing Consumer is a buffer for messages, 
        //allowing you handle messages asynchronously 
        //(instead of acting on the message immediately).
        QueueingBasicConsumer consumer = 
          new QueueingBasicConsumer(channel);
        //Register the consumer on the channel
        channel.BasicConsume(QUEUE_NAME, true, consumer);
        //Wait for the next message to arrive in the queue
        BasicDeliverEventArgs e = 
          (BasicDeliverEventArgs)consumer.Queue.Dequeue();
        //Acknowledge the Delivery of the message
        channel.BasicAck(e.DeliveryTag, false);
        //Deserialize the Notification object from 
        //the byte array (body) of the message
        Notification notification = 
          ThriftMarshaller.deserialize<Notification>(e.Body);
        //Print the notification to the console.
        Console.WriteLine(
          " [x] Received '{0}'", notification);
        
      } catch(Exception e){
        //Dump any error to the console
        Console.WriteLine(e.Message);
      }
    }
  }
}

Extending the SimpleRpcServer class in the RabbitMQ.Client.MessagePatterns makes consuming an AMQP queue much easier.  Consider the following code, which is not limited to acting on only one message in the queue:
using System;
using System.IO;
using System.Collections.Generic;
using Thrift;
using Thrift.Protocol;
using Thrift.Transport;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using RabbitMQ.Util;
using RabbitMQ.Client.MessagePatterns;

namespace RabbitMQClient
{
  public class AmqpRpcServer : SimpleRpcServer
  {
    
    public static void Main (string[] args)
    {
      //Create a connection factory
      ConnectionFactory factory = new ConnectionFactory();
      //Set the host to the location of the RabbitMQ server
        factory.HostName = HOST;
      //Create a new connection to the RabbitMQ Server
      using (IConnection conn = factory.CreateConnection()) {
        //Create a new channel with the server
        using (IModel ch = conn.CreateModel ()) {
          //Subscribe to our queue on this channel
          Subscription sub = new Subscription (ch, QUEUE_NAME);
          //Instantiate our RPC Server that
          //watches the queue for new messages
          //and reacts to those messages
          new AmqpRpcServer(sub).MainLoop ();
        }
      }
    }
    
    public static string QUEUE_NAME 
        = "com.berico.notifications";
    
    public static string HOST = "localhost";
    
    //Intialize the Server using the provided
    //channel/queue subscription
    public AmqpRpcServer(Subscription sub) 
        : base(sub) {}
    
    //Process an incoming message
    public override void ProcessRequest (BasicDeliverEventArgs evt)
    {
      //Deserialize our notification using the
      //message body (byte array)
      Notification notification = 
        ThriftMarshaller.deserialize<Notification>(evt.Body);
      
      //Print the notification to the console.
      Console.WriteLine(
        " [x] Received '{0}'", notification);
      
      //Allow the base class send acknowledgement, etc.
      base.ProcessRequest (evt);
    }
  }
}

As you can see, using both Apache Thrift and SpringSource RabbitMQ in .NET is a piece of cake.

Richard

Saturday, March 12, 2011

RabbitMQ with Thrift Serialization (Part 1 - Java)

I’m tired of SOAP!  Our Service Oriented Architecture relies heavily on asynchronous SOAP calls to coordinate actions amongst distributed workflow participants.  Since the team has started the project, I feel like we have encountered every bug a developer could encounter with a technology.  We have had serialization issues between .NET and Java’s implementation of SOAP.  We’ve also had to write a special SOAP Handler to append a namespace on a specific XML element so JAXB could parse the message body [1].  Our latest problem with SOAP (though not really a SOAP issue) is a class-loading conflict in JBoss AS 6, between Apache CXF and the Spring Framework [2].

SOAP has consumed too much of our time and we have gained little functionality in return.  This has inspired us to search for alternative communication mechanisms.  The only real requirements we had for our communication protocol and serialization framework(s) were:

  • Interoperable serialization between Java and .NET
  • Reliable communication
  • Needs to be fast

My boss (CTO of Berico Technologies) suggested we take a look at AMQP.  After a quick review, I came to the conclusion that the AMQP specification would meet these requirements and we wouldn’t have any fear of vendors walking away from the technology (the protocol was gaining traction).  After a quick survey of the available open source implementations, two platforms stuck out in my mind: Apache QPid and RabbitMQ.  For this tutorial, I’ve arbitrarily chosen to use RabbitMQ.

Now that I had identified a communication mechanism, the next requirement was to find a cross-language serialization framework; the most obvious options were XML and JSON.  While there is great support for constructing and parsing these formats, the cost of transformation tends to be pretty high in terms of performance (especially if you are transmitting a lot of data).  Also, I did not want to run into some of the problems we had previously experienced using JAXB.  There are not too many other alternatives for cross-language serialization (unless you build your own).  I’ve known about both Google’s ProtoBuf and Apache Thrift for a while, and decided to go with Thrift, which seemed to have better .NET support.

Ok, enough babbling, more code.

SCENARIO

We have a requirement to send and receive notifications amongst users (or principals) in our system.  Our notification object will require the following properties:

ID (Integer)
Principals (List<String>)
Subject (String)
Message (String)
Timestamp for Creation (64 bit Integer)

One client will publish a notification message and another will consume it using AMQP.  We will also generate our model object (Notification) from a Thrift definition.

SERIALIZATION

Let's start by creating our model object.  Thrift has it's own language for defining cross-platform objects.  The schema is incredibly simple:

model.thrift
struct Notification {
  1: i32 uid,
  2: list<string> principals,
  3: string subject,
  4: string message,
  5: i64 created
}

Now we need to build our Java (and optionally .NET) source files using the Thrift utility. Please note that I've added the Thrift "bin" directory to my console's "PATH":

thrift --gen java:beans model.thrift
thrift --gen csharp model.thrift


OK we've generated our model object, we now need to write some code to utilize RabbitMQ. For the purpose of this demo, you will (obviously) need to install RabbitMQ. The folks at SpringSource provide a great "Windows Bundle" (http://www.rabbitmq.com/server.html#windows_bundle) which has everything you will need for this project. It even includes the Erlang Runtime, the language RabbitMQ is built upon. Install Erlang and unzip the RabbitMQ Server archive in a location of your choice. Finally, add the "ERLANG_HOME" environment variable to your Operating System, where the value of "ERLANG_HOME" is the install path of Erlang. If you have any trouble, please refer to the RabbitMQ website.

Once RabbitMQ is installed, go ahead and start the server (rabbitmq-server.bat) in the "sbin" directory of the RabbitMQ folder. Here's the output from my console:


Now we can interact with the message queue.  Let's start by writing a message consumer.  Please follow the comment in the source code!

package com.berico.amqp;

import org.apache.thrift.TDeserializer;

import com.berico.model.Notification;

import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.QueueingConsumer;

/**
 * Consumes AMQP Messages from the "com.berico.notifications"
 * channel, deserializing the message (using the Thrift API),
 * and printing the contents to the console.
 * @author Richard C. (Berico Technologies)
 */
public class MessageConsumer {

  //The Queue we will be consuming
  public static String QUEUE_NAME = "com.berico.notifications";
  //The Host of the RabbitMQ
  public static String HOST = "localhost";
 
  /**
   * Most of this code is a replication of the RabbitMQ tutorial:
   * http://www.rabbitmq.com/tutorials/tutorial-one-java.html
   * @param args Not used.
   */
  public static void main(String... args){
 
    try {
  
      //Create a connection factory
      ConnectionFactory factory = new ConnectionFactory();
      //Set the host to the location of the RabbitMQ server
      factory.setHost(HOST);
      //Open a new connection
      Connection connection = factory.newConnection();
      //Channel is the abstraction for interacting with a queue
      Channel channel = connection.createChannel();
      //It's possible the Queue doesn't already exist (a producer
      //may not have been instantiated yet).  Create the queue if
      //it doesn't exist
      channel.queueDeclare(QUEUE_NAME, false, false, false, null);
      //Comfort logging
      System.out.println(
         " [*] Waiting for messages. To exit press CTRL+C");
      //Queueing Consumer is a buffer for messages, 
      //allowing you handle messages asynchronously 
      //(instead of acting on the message immediately).
      QueueingConsumer consumer = new QueueingConsumer(channel);
      //Register the consumer on the channel
      channel.basicConsume(QUEUE_NAME, true, consumer);

      //Loop until the application is terminated by the user
      while (true) {
        
        //Wait for the next message to arrive in the queue
        QueueingConsumer.Delivery delivery = consumer.nextDelivery();
      
        //Create a empty notification (we will fill this with the
        //contents of the Thrift serialized payload)
        Notification notification = new Notification();
        //Instantiate the Thrift Deserializer
        TDeserializer deserializer = new TDeserializer();
        //Deserialize the body of the AMQP message, setting the state
        //on the empty notification object
        deserializer.deserialize(notification, delivery.getBody());
      
        //Print the notification to the console.
        System.out.println(" [x] Received '" 
                + notification.toString() + "'");
      }
      
    } catch(Exception e){
      //Dump any error to the console
      e.printStackTrace();
    }
  }
}

Now we need an application to produce messages on the Channel so the Consumer and pull them from the queue and display the payload:

package com.berico.amqp;

import java.util.Arrays;

import org.apache.thrift.TSerializer;

import com.berico.model.Notification;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;

/**
 * Produces AMQP Messages on the "com.berico.notifications"
 * channel after serializing a message (using the Thrift API).
 * @author Richard C. (Berico Technologies)
 */
public class MessageProducer {

  //The Queue we will be consuming
  public static String QUEUE_NAME = "com.berico.notifications";
  //The Host of the RabbitMQ
  public static String HOST = "localhost";
 
  public static void main(String... args){
  
    //Create an instance of our Domain object.
    //Keep in mind this is a Thrift generated bean.
    Notification notification = new Notification(
      1, 
      Arrays.asList(new String[]{ "richard", "john" }), 
      "Test Message", 
      "Hope this finds you well", 
      System.currentTimeMillis());
  
      try {
  
        //Create a connection factory
        ConnectionFactory factory = new ConnectionFactory();
        //Set the host to the location of the RabbitMQ server
        factory.setHost(HOST);
        //Open a new connection
        Connection connection = factory.newConnection();
        //Channel is the abstraction for interacting with a queue
        Channel channel = connection.createChannel();
        //Create the Queue if it does not exist
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
      
        //Create a new instance of the Thrift Serializer
        TSerializer serializer = new TSerializer();
        //Serialize our domain object to a byte array
        byte[] payload = serializer.serialize(notification);
      
        //Do this 10 times
        for(int i = 0; i < 10; i++){
       
          //Publish message on the queue using our domain
          //object as the message (or payload)
          channel.basicPublish("", QUEUE_NAME, null, payload);
       
          //Print a friendly message
          System.out.println(
             " [x] Sent '" 
             + notification.toString() + "'");
        }
      
        //Close the channel
        channel.close();
        //Close the connection
        connection.close();
      
      } catch (Exception e) {
        //Dump any exception to the console
        e.printStackTrace();
      } 
   }
}

That's it for the code, now let's see it in action. We'll start by initiating the Consumer:

[*] Waiting for messages. To exit press CTRL+C

Now let's spawn the Producer:

[x] Sent 'Notification(uid:1, principals:[richard, john], 
subject:Test Message, message:Hope this finds you well, 
created:1299941797329)'

 [x] Sent 'Notification(uid:1, principals:[richard, john], 
subject:Test Message, message:Hope this finds you well, 
created:1299941797329)'

 [x] Sent 'Notification(uid:1, principals:[richard, john], 
subject:Test Message, message:Hope this finds you well, 
created:1299941797329)'

 [x] Sent 'Notification(uid:1, principals:[richard, john], 
subject:Test Message, message:Hope this finds you well, 
created:1299941797329)'

 [x] Sent 'Notification(uid:1, principals:[richard, john], 
subject:Test Message, message:Hope this finds you well, 
created:1299941797329)'

 [x] Sent 'Notification(uid:1, principals:[richard, john], 
subject:Test Message, message:Hope this finds you well, 
created:1299941797329)'

 [x] Sent 'Notification(uid:1, principals:[richard, john], 
subject:Test Message, message:Hope this finds you well, 
created:1299941797329)'

 [x] Sent 'Notification(uid:1, principals:[richard, john], 
subject:Test Message, message:Hope this finds you well, 
created:1299941797329)'

 [x] Sent 'Notification(uid:1, principals:[richard, john], 
subject:Test Message, message:Hope this finds you well, 
created:1299941797329)'

 [x] Sent 'Notification(uid:1, principals:[richard, john], 
subject:Test Message, message:Hope this finds you well, 
created:1299941797329)'

And Finally, here are the messages being received by the Consumer:

[x] Received 'Notification(uid:1, principals:[richard, john], 
subject:Test Message, message:Hope this finds you well, 
created:1299941797329)'

 [x] Received 'Notification(uid:1, principals:[richard, john], 
subject:Test Message, message:Hope this finds you well, 
created:1299941797329)'

 [x] Received 'Notification(uid:1, principals:[richard, john], 
subject:Test Message, message:Hope this finds you well, 
created:1299941797329)'

 [x] Received 'Notification(uid:1, principals:[richard, john], 
subject:Test Message, message:Hope this finds you well,
created:1299941797329)'

 [x] Received 'Notification(uid:1, principals:[richard, john], 
subject:Test Message, message:Hope this finds you well, 
created:1299941797329)'

 [x] Received 'Notification(uid:1, principals:[richard, john], 
subject:Test Message, message:Hope this finds you well, 
created:1299941797329)'

 [x] Received 'Notification(uid:1, principals:[richard, john], 
subject:Test Message, message:Hope this finds you well, 
created:1299941797329)'

 [x] Received 'Notification(uid:1, principals:[richard, john], 
subject:Test Message, message:Hope this finds you well, 
created:1299941797329)'

 [x] Received 'Notification(uid:1, principals:[richard, john], 
subject:Test Message, message:Hope this finds you well, 
created:1299941797329)'

 [x] Received 'Notification(uid:1, principals:[richard, john], 
subject:Test Message, message:Hope this finds you well, 
created:1299941797329)'

AMQP and Apache Thrift, used together, offer a great alternative to SOAP over HTTP. For our project, we get the benefits of cross-language serialization and reliable transport, with half the effort!

Footnotes

[1]. This gentlemen had the same JAXB problem:
http://stackoverflow.com/questions/1871060/jaxb-unmarshalling-ignoring-namespace-turns-element-attributes-into-null

[2]. From the JBoss Forums:
http://community.jboss.org/thread/160615
http://community.jboss.org/thread/163899?tstart=0
http://community.jboss.org/thread/161545?tstart=0