Saturday, March 12, 2011

RabbitMQ with Thrift Serialization (Part 1 - Java)

I’m tired of SOAP!  Our Service Oriented Architecture relies heavily on asynchronous SOAP calls to coordinate actions amongst distributed workflow participants.  Since the team started the project, I feel like we have hit every bug a developer could encounter with a technology.  We have had serialization issues between the .NET and Java implementations of SOAP.  We’ve also had to write a special SOAP Handler to append a namespace to a specific XML element so JAXB could parse the message body [1].  Our latest problem with SOAP (though not really a SOAP issue) is a class-loading conflict in JBoss AS 6 between Apache CXF and the Spring Framework [2].

SOAP has consumed too much of our time and we have gained little functionality in return.  This has inspired us to search for alternative communication mechanisms.  The only real requirements we had for our communication protocol and serialization framework(s) were:

  • Interoperable serialization between Java and .NET
  • Reliable communication
  • Fast performance

My boss (CTO of Berico Technologies) suggested we take a look at AMQP.  After a quick review, I came to the conclusion that the AMQP specification would meet these requirements, and that we wouldn’t have to fear vendors walking away from the technology (the protocol was gaining traction).  After a quick survey of the available open source implementations, two platforms stood out in my mind: Apache Qpid and RabbitMQ.  For this tutorial, I’ve arbitrarily chosen to use RabbitMQ.

Now that I had identified a communication mechanism, the next requirement was to find a cross-language serialization framework; the most obvious options were XML and JSON.  While there is great support for constructing and parsing these formats, the cost of transformation tends to be pretty high in terms of performance (especially if you are transmitting a lot of data).  Also, I did not want to run into some of the problems we had previously experienced using JAXB.  There are not too many other alternatives for cross-language serialization (unless you build your own).  I’ve known about both Google’s ProtoBuf and Apache Thrift for a while, and decided to go with Thrift, which seemed to have better .NET support.

Ok, enough babbling, more code.

SCENARIO

We have a requirement to send and receive notifications amongst users (or principals) in our system.  Our notification object will require the following properties:

ID (Integer)
Principals (List<String>)
Subject (String)
Message (String)
Timestamp for Creation (64 bit Integer)

One client will publish a notification message and another will consume it using AMQP.  We will also generate our model object (Notification) from a Thrift definition.

SERIALIZATION

Let's start by creating our model object.  Thrift has its own language for defining cross-platform objects.  The schema is incredibly simple:

model.thrift
struct Notification {
  1: i32 uid,
  2: list<string> principals,
  3: string subject,
  4: string message,
  5: i64 created
}
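
To make the field list concrete before generating anything, here is a rough, hand-written sketch of what serializing those five fields to bytes and back involves. Everything in it is an assumption for illustration: NotificationSketch is a hypothetical class, not the bean Thrift generates, and the byte layout is a simple one built on java.io, not Thrift's wire format.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the generated Notification bean.
final class NotificationSketch {
    final int uid;
    final List<String> principals;
    final String subject;
    final String message;
    final long created;

    NotificationSketch(int uid, List<String> principals,
                       String subject, String message, long created) {
        this.uid = uid;
        this.principals = principals;
        this.subject = subject;
        this.message = message;
        this.created = created;
    }

    // Writes the fields in schema order (illustrative layout only,
    // NOT Thrift's wire format).
    byte[] toBytes() {
        try {
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(buffer);
            out.writeInt(uid);
            out.writeInt(principals.size());
            for (String principal : principals) {
                out.writeUTF(principal);
            }
            out.writeUTF(subject);
            out.writeUTF(message);
            out.writeLong(created);
            out.flush();
            return buffer.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Reads the fields back in the same order they were written.
    static NotificationSketch fromBytes(byte[] bytes) {
        try {
            DataInputStream in =
                new DataInputStream(new ByteArrayInputStream(bytes));
            int uid = in.readInt();
            int count = in.readInt();
            List<String> principals = new ArrayList<>();
            for (int i = 0; i < count; i++) {
                principals.add(in.readUTF());
            }
            String subject = in.readUTF();
            String message = in.readUTF();
            long created = in.readLong();
            return new NotificationSketch(
                uid, principals, subject, message, created);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Writing and maintaining this by hand in two languages is exactly the chore a schema-driven tool like Thrift takes off your plate.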

Now we need to build our Java (and optionally .NET) source files using the Thrift utility. Please note that I've added the Thrift "bin" directory to my console's "PATH":

thrift --gen java:beans model.thrift
thrift --gen csharp model.thrift


Now that we've generated our model object, we need to write some code that uses RabbitMQ. For this demo, you will (obviously) need to install RabbitMQ. The folks at SpringSource provide a great "Windows Bundle" (http://www.rabbitmq.com/server.html#windows_bundle) which has everything you will need for this project. It even includes the Erlang runtime (Erlang is the language RabbitMQ is built on). Install Erlang and unzip the RabbitMQ Server archive in a location of your choice. Finally, add an "ERLANG_HOME" environment variable to your operating system, setting its value to the install path of Erlang. If you have any trouble, please refer to the RabbitMQ website.

Once RabbitMQ is installed, go ahead and start the server (rabbitmq-server.bat) in the "sbin" directory of the RabbitMQ folder.

[Screenshot: RabbitMQ server console output]


Now we can interact with the message queue.  Let's start by writing a message consumer.  Please follow the comments in the source code!

package com.berico.amqp;

import org.apache.thrift.TDeserializer;

import com.berico.model.Notification;

import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.QueueingConsumer;

/**
 * Consumes AMQP Messages from the "com.berico.notifications"
 * queue, deserializing each message (using the Thrift API),
 * and printing the contents to the console.
 * @author Richard C. (Berico Technologies)
 */
public class MessageConsumer {

  //The Queue we will be consuming
  public static final String QUEUE_NAME = "com.berico.notifications";
  //The Host of the RabbitMQ server
  public static final String HOST = "localhost";
 
  /**
   * Most of this code is a replication of the RabbitMQ tutorial:
   * http://www.rabbitmq.com/tutorials/tutorial-one-java.html
   * @param args Not used.
   */
  public static void main(String... args){
 
    try {
  
      //Create a connection factory
      ConnectionFactory factory = new ConnectionFactory();
      //Set the host to the location of the RabbitMQ server
      factory.setHost(HOST);
      //Open a new connection
      Connection connection = factory.newConnection();
      //Channel is the abstraction for interacting with a queue
      Channel channel = connection.createChannel();
      //It's possible the Queue doesn't already exist (a producer
      //may not have been instantiated yet).  Create the queue if
      //it doesn't exist
      channel.queueDeclare(QUEUE_NAME, false, false, false, null);
      //Comfort logging
      System.out.println(
         " [*] Waiting for messages. To exit press CTRL+C");
      //Queueing Consumer is a buffer for messages, 
      //allowing you to handle messages asynchronously 
      //(instead of acting on the message immediately).
      QueueingConsumer consumer = new QueueingConsumer(channel);
      //Register the consumer on the channel
      channel.basicConsume(QUEUE_NAME, true, consumer);

      //Loop until the application is terminated by the user
      while (true) {
        
        //Wait for the next message to arrive in the queue
        QueueingConsumer.Delivery delivery = consumer.nextDelivery();
      
        //Create an empty notification (we will fill this with the
        //contents of the Thrift serialized payload)
        Notification notification = new Notification();
        //Instantiate the Thrift Deserializer
        TDeserializer deserializer = new TDeserializer();
        //Deserialize the body of the AMQP message, setting the state
        //on the empty notification object
        deserializer.deserialize(notification, delivery.getBody());
      
        //Print the notification to the console.
        System.out.println(" [x] Received '" 
                + notification.toString() + "'");
      }
      
    } catch(Exception e){
      //Dump any error to the console
      e.printStackTrace();
    }
  }
}
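
The buffering idea behind the QueueingConsumer used above (deliveries are parked in a queue, and the loop blocks until the next one is available) can be sketched with java.util.concurrent alone. This is only an illustration under my own assumptions: BufferingConsumerSketch and its deliver method are hypothetical names, not part of the RabbitMQ client, and no broker is involved.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical stand-in for a buffering consumer; NOT the RabbitMQ API.
final class BufferingConsumerSketch {

    // Message bodies wait here until the application asks for them.
    private final BlockingQueue<byte[]> buffer = new LinkedBlockingQueue<>();

    // Called by the "broker side" whenever a message body arrives.
    void deliver(byte[] body) {
        buffer.add(body);
    }

    // Blocks until a message is available and returns the oldest one,
    // which is the role nextDelivery() plays in the consumer loop.
    byte[] nextDelivery() {
        try {
            return buffer.take();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting", e);
        }
    }

    // Number of messages received but not yet handled.
    int pending() {
        return buffer.size();
    }
}
```

Because the buffer decouples arrival from handling, a slow loop body does not block whoever is delivering messages; they simply pile up until the loop catches up.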

Now we need an application to publish messages to the queue so the Consumer can pull them off and display the payload:

package com.berico.amqp;

import java.util.Arrays;

import org.apache.thrift.TSerializer;

import com.berico.model.Notification;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;

/**
 * Produces AMQP Messages on the "com.berico.notifications"
 * queue after serializing a message (using the Thrift API).
 * @author Richard C. (Berico Technologies)
 */
public class MessageProducer {

  //The Queue we will be publishing to
  public static final String QUEUE_NAME = "com.berico.notifications";
  //The Host of the RabbitMQ server
  public static final String HOST = "localhost";
 
  public static void main(String... args){
  
    //Create an instance of our Domain object.
    //Keep in mind this is a Thrift generated bean.
    Notification notification = new Notification(
      1, 
      Arrays.asList("richard", "john"), 
      "Test Message", 
      "Hope this finds you well", 
      System.currentTimeMillis());
  
      try {
  
        //Create a connection factory
        ConnectionFactory factory = new ConnectionFactory();
        //Set the host to the location of the RabbitMQ server
        factory.setHost(HOST);
        //Open a new connection
        Connection connection = factory.newConnection();
        //Channel is the abstraction for interacting with a queue
        Channel channel = connection.createChannel();
        //Create the Queue if it does not exist
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
      
        //Create a new instance of the Thrift Serializer
        TSerializer serializer = new TSerializer();
        //Serialize our domain object to a byte array
        byte[] payload = serializer.serialize(notification);
      
        //Do this 10 times
        for(int i = 0; i < 10; i++){
       
          //Publish message on the queue using our domain
          //object as the message (or payload)
          channel.basicPublish("", QUEUE_NAME, null, payload);
       
          //Print a friendly message
          System.out.println(
             " [x] Sent '" 
             + notification.toString() + "'");
        }
      
        //Close the channel
        channel.close();
        //Close the connection
        connection.close();
      
      } catch (Exception e) {
        //Dump any exception to the console
        e.printStackTrace();
      } 
   }
}
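
One detail worth spelling out: the empty string passed as the first argument to basicPublish names AMQP's default exchange, which routes a message to the queue whose name equals the routing key. That is why the producer can use QUEUE_NAME as the routing key. The following is a toy, in-memory sketch of that routing rule; DefaultExchangeSketch and its methods are hypothetical and only loosely mirror the client calls used above.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;

// Hypothetical in-memory model of default-exchange routing;
// NOT the RabbitMQ client or broker.
final class DefaultExchangeSketch {

    private final Map<String, Queue<byte[]>> queues = new HashMap<>();

    // Declaring is idempotent: declaring an existing queue changes
    // nothing, so both the producer and the consumer can safely do it.
    void queueDeclare(String name) {
        queues.computeIfAbsent(name, key -> new ArrayDeque<>());
    }

    // Routes the body to the queue named by the routing key.
    // Returns false when no such queue exists; in this sketch an
    // unroutable message is simply dropped.
    boolean basicPublish(String routingKey, byte[] body) {
        Queue<byte[]> queue = queues.get(routingKey);
        if (queue == null) {
            return false;
        }
        queue.add(body);
        return true;
    }

    // Number of messages currently waiting in the named queue.
    int depth(String name) {
        Queue<byte[]> queue = queues.get(name);
        return queue == null ? 0 : queue.size();
    }
}
```

Declaring the queue in both programs, as the listings above do, means it does not matter whether the producer or the consumer starts first.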

That's it for the code; now let's see it in action. We'll start by launching the Consumer:

[*] Waiting for messages. To exit press CTRL+C

Now let's spawn the Producer:

[x] Sent 'Notification(uid:1, principals:[richard, john], 
subject:Test Message, message:Hope this finds you well, 
created:1299941797329)'

 [x] Sent 'Notification(uid:1, principals:[richard, john], 
subject:Test Message, message:Hope this finds you well, 
created:1299941797329)'

 [x] Sent 'Notification(uid:1, principals:[richard, john], 
subject:Test Message, message:Hope this finds you well, 
created:1299941797329)'

 [x] Sent 'Notification(uid:1, principals:[richard, john], 
subject:Test Message, message:Hope this finds you well, 
created:1299941797329)'

 [x] Sent 'Notification(uid:1, principals:[richard, john], 
subject:Test Message, message:Hope this finds you well, 
created:1299941797329)'

 [x] Sent 'Notification(uid:1, principals:[richard, john], 
subject:Test Message, message:Hope this finds you well, 
created:1299941797329)'

 [x] Sent 'Notification(uid:1, principals:[richard, john], 
subject:Test Message, message:Hope this finds you well, 
created:1299941797329)'

 [x] Sent 'Notification(uid:1, principals:[richard, john], 
subject:Test Message, message:Hope this finds you well, 
created:1299941797329)'

 [x] Sent 'Notification(uid:1, principals:[richard, john], 
subject:Test Message, message:Hope this finds you well, 
created:1299941797329)'

 [x] Sent 'Notification(uid:1, principals:[richard, john], 
subject:Test Message, message:Hope this finds you well, 
created:1299941797329)'

And finally, here are the messages being received by the Consumer:

[x] Received 'Notification(uid:1, principals:[richard, john], 
subject:Test Message, message:Hope this finds you well, 
created:1299941797329)'

 [x] Received 'Notification(uid:1, principals:[richard, john], 
subject:Test Message, message:Hope this finds you well, 
created:1299941797329)'

 [x] Received 'Notification(uid:1, principals:[richard, john], 
subject:Test Message, message:Hope this finds you well, 
created:1299941797329)'

 [x] Received 'Notification(uid:1, principals:[richard, john], 
subject:Test Message, message:Hope this finds you well,
created:1299941797329)'

 [x] Received 'Notification(uid:1, principals:[richard, john], 
subject:Test Message, message:Hope this finds you well, 
created:1299941797329)'

 [x] Received 'Notification(uid:1, principals:[richard, john], 
subject:Test Message, message:Hope this finds you well, 
created:1299941797329)'

 [x] Received 'Notification(uid:1, principals:[richard, john], 
subject:Test Message, message:Hope this finds you well, 
created:1299941797329)'

 [x] Received 'Notification(uid:1, principals:[richard, john], 
subject:Test Message, message:Hope this finds you well, 
created:1299941797329)'

 [x] Received 'Notification(uid:1, principals:[richard, john], 
subject:Test Message, message:Hope this finds you well, 
created:1299941797329)'

 [x] Received 'Notification(uid:1, principals:[richard, john], 
subject:Test Message, message:Hope this finds you well, 
created:1299941797329)'

AMQP and Apache Thrift, used together, offer a great alternative to SOAP over HTTP. For our project, we get the benefits of cross-language serialization and reliable transport, with half the effort!

Footnotes

[1]. This gentleman had the same JAXB problem:
http://stackoverflow.com/questions/1871060/jaxb-unmarshalling-ignoring-namespace-turns-element-attributes-into-null

[2]. From the JBoss Forums:
http://community.jboss.org/thread/160615
http://community.jboss.org/thread/163899?tstart=0
http://community.jboss.org/thread/161545?tstart=0
