Monday, March 14, 2011

RabbitMQ with Thrift Serialization (Part 2 - C#)

I wanted to quickly follow up on the last post on RabbitMQ with Thrift Serialization and add some examples in C#. I promise this post will be a lot more code and less chatter.

Let me start by mentioning that all of the following code was written, successfully compiled, and run using Mono (developed in MonoDevelop 2.6). So yes, this code is portable!

In the previous post, we generated the C# Notification class from the Thrift template using the following command:
thrift --gen csharp model.thrift

I won't expose the generated code to you: it needs to be cleaned up quite a bit, and it is not really necessary for the purpose of this demo. If you would like, I can upload the source to GitHub on request.

Change: 3/18/2011

In the examples below, I'm performing the Thrift serialization with my own Generics-enabled marshaller class. The following is that marshaller class.

using System;
using System.IO;
using System.Collections.Generic;
using Thrift;
using Thrift.Protocol;
using Thrift.Transport;

namespace RabbitMQClient
{

  public static class ThriftMarshaller
  {    
    public static T deserialize<T>(byte[] payload) 
      where T : TBase, new() {
      
      MemoryStream stream = new MemoryStream(payload);
      TProtocol protocol = new TBinaryProtocol(
        new TStreamTransport(stream, stream));
      T t = new T();
      t.Read(protocol);
      return t;
    }
    
    public static byte[] serialize<T>(T objectToSerialize) 
      where T : TBase {
      
      MemoryStream stream = new MemoryStream();
      TProtocol protocol = new TBinaryProtocol(
        new TStreamTransport(stream, stream));
      objectToSerialize.Write(protocol);
      return stream.ToArray();
    }
  }
}
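
A quick way to convince yourself the marshaller works is to serialize an object and read it straight back. The following is only a sketch: the MarshallerCheck class and its RoundTrip method are names made up for illustration, and it assumes the ThriftMarshaller class above is compiled into the same project.

```csharp
using System;
using Thrift.Protocol;

namespace RabbitMQClient
{
  // Hypothetical helper, not part of Thrift or of the original demo
  public static class MarshallerCheck
  {
    // Serializes a Thrift object to bytes, then rebuilds
    // a fresh instance of the same type from those bytes
    public static T RoundTrip<T>(T original)
      where T : TBase, new() {

      byte[] payload = ThriftMarshaller.serialize<T>(original);
      Console.WriteLine("Serialized {0} bytes", payload.Length);
      return ThriftMarshaller.deserialize<T>(payload);
    }
  }
}
```

Calling MarshallerCheck.RoundTrip(notification) on the Notification built in the producer below should hand back a copy whose Uid, Subject, and Message match the original.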

Let's start by examining the Message Producer code (essentially an almost line-for-line port of the Java demo):
using System;
using System.IO;
using System.Collections.Generic;
using Thrift;
using Thrift.Protocol;
using Thrift.Transport;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using RabbitMQ.Util;

namespace RabbitMQClient
{
  public class MessageProducer
  {
  
    public static string QUEUE_NAME 
             = "com.berico.notifications";
  
    public static string HOST = "localhost";
  
    public static void Main(string[] args){
   
      try {
        //Create a connection factory
        ConnectionFactory factory = new ConnectionFactory();
        //Set the host to the location of the RabbitMQ server
        factory.HostName = HOST;
        //Open a new connection
        IConnection connection = factory.CreateConnection();
        //IModel is the abstraction for interacting with a queue
        IModel channel = connection.CreateModel();
        //It's possible the Queue doesn't already exist (a consumer
        //may not have declared it yet).  Create the queue if
        //it doesn't exist
        channel.QueueDeclare(QUEUE_NAME, false, false, false, null);
        //Create an instance of our Domain object.
        //Keep in mind this is a Thrift generated bean.
        Notification notification = new Notification
        {
          Uid = 1, 
          Principals = new List<string>(
             new string[] { "richard", "john" }),
          Subject = "Test Message",
          Message = "Hope this finds you well",
          Created = DateTime.Now.Ticks
         };

        /* Thank you Raa (http://chrisraa.blogspot.com/) 
           for finding the bug in the original post.
           (I had added a initialization constructor to
           the Notification class and had not posted the
           change.)
         */
    
        //Comfort logging
        Console.WriteLine(
          " [*] Publishing 10 messages to '{0}'", QUEUE_NAME);
    
        //Serialize the Notification object into a byte array
        byte[] payload = 
          ThriftMarshaller.serialize<Notification>(notification);
    
        //Do this 10 times
        for(int i = 0; i < 10; i++){
        
          //Publish message on the queue using our domain
          //object as the message (or payload)
          channel.BasicPublish("", QUEUE_NAME, null, payload);
        
          //Print a friendly message
          Console.WriteLine(" [x] Sent '{0}'", notification);
        }
      
        //Close the channel
        channel.Close();
        //Close the connection
        connection.Close();
       
       } catch(Exception e){
         //Dump any error to the console
         Console.WriteLine(e.Message);
       }   
     } 
  }
}
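
One detail in the producer is worth a second look: Created is filled from DateTime.Now.Ticks, which is local time expressed in a .NET-specific unit (100-nanosecond intervals since 0001-01-01). If the receiving side needs to turn that long back into a DateTime, something along these lines might do. This is a sketch under assumptions: the CreatedTimestamp class and both method names are hypothetical, and the choice of UTC is mine rather than something the original demo does.

```csharp
using System;

// Hypothetical helper, not part of the original demo
public static class CreatedTimestamp
{
  // Capture a timestamp as UTC ticks, so the value means
  // the same thing regardless of the sender's time zone
  public static long ToTicks(DateTime when)
  {
    return when.ToUniversalTime().Ticks;
  }

  // Rebuild a UTC DateTime from a tick count
  public static DateTime FromTicks(long ticks)
  {
    return new DateTime(ticks, DateTimeKind.Utc);
  }
}
```

With this in place the producer would set Created = CreatedTimestamp.ToTicks(DateTime.Now), and a .NET consumer would call CreatedTimestamp.FromTicks(notification.Created). A non-.NET consumer would still have to convert from ticks to its own epoch.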

The Message Consumer is a little different. In this next demo, I'm only going to consume one message from the queue and then terminate. The .NET client API for RabbitMQ differs a bit from the Java API in that it makes use of .NET events/delegates. Rather than making you write all of that code yourself, the RabbitMQ client ships a "pattern" class you can extend (shown further down), which handles the annoying scaffolding needed for these events.
using System;
using System.IO;
using Thrift;
using Thrift.Protocol;
using Thrift.Transport;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using RabbitMQ.Util;

namespace RabbitMQClient
{
  public class MessageConsumer
  {
    
    public static string QUEUE_NAME 
        = "com.berico.notifications";
    
    public static string HOST = "localhost";
    
    public static void Main (string[] args)
    {
      try {
        //Create a connection factory
        ConnectionFactory factory = new ConnectionFactory();
        //Set the host to the location of the RabbitMQ server
        factory.HostName = HOST;
        //Open a new connection
        IConnection connection = factory.CreateConnection();
        //IModel is the abstraction for interacting with a queue
        IModel channel = connection.CreateModel();
        //It's possible the Queue doesn't already exist (a producer
        //may not have been instantiated yet).  Create the queue if
        //it doesn't exist
        channel.QueueDeclare(QUEUE_NAME, false, false, false, null);
        //Comfort logging
        Console.WriteLine(
          " [*] Waiting for messages. To exit press CTRL+C");
        //Queueing Consumer is a buffer for messages, 
        //allowing you to handle messages asynchronously 
        //(instead of acting on the message immediately).
        QueueingBasicConsumer consumer = 
          new QueueingBasicConsumer(channel);
        //Register the consumer on the channel. The second
        //argument is noAck; pass false so that the explicit
        //BasicAck below is valid
        channel.BasicConsume(QUEUE_NAME, false, consumer);
        //Wait for the next message to arrive in the queue
        BasicDeliverEventArgs e = 
          (BasicDeliverEventArgs)consumer.Queue.Dequeue();
        //Acknowledge the Delivery of the message
        channel.BasicAck(e.DeliveryTag, false);
        //Deserialize the Notification object from 
        //the byte array (body) of the message
        Notification notification = 
          ThriftMarshaller.deserialize<Notification>(e.Body);
        //Print the notification to the console.
        Console.WriteLine(
          " [x] Received '{0}'", notification);
        
      } catch(Exception e){
        //Dump any error to the console
        Console.WriteLine(e.Message);
      }
    }
  }
}
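
For comparison, the single-message consumer above could be turned into a hand-written loop. The following is a rough sketch rather than tested production code: the ConsumeLoop class and its Run method are hypothetical names, it assumes the same client version used above (where Queue.Dequeue() blocks until a message arrives and throws EndOfStreamException once the channel is closed), and it relies on the ThriftMarshaller and Notification classes from this post.

```csharp
using System;
using System.IO;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

namespace RabbitMQClient
{
  // Hypothetical helper, not part of the RabbitMQ client
  public static class ConsumeLoop
  {
    public static void Run(IModel channel, string queueName)
    {
      QueueingBasicConsumer consumer =
        new QueueingBasicConsumer(channel);
      //noAck = false: every message is acknowledged by hand
      channel.BasicConsume(queueName, false, consumer);

      while (true) {
        BasicDeliverEventArgs e;
        try {
          //Blocks until the next message arrives
          e = (BasicDeliverEventArgs)consumer.Queue.Dequeue();
        } catch (EndOfStreamException) {
          //The channel or connection was closed
          break;
        }
        Notification notification =
          ThriftMarshaller.deserialize<Notification>(e.Body);
        Console.WriteLine(" [x] Received '{0}'", notification);
        //Acknowledge only after the message was handled
        channel.BasicAck(e.DeliveryTag, false);
      }
    }
  }
}
```

The loop works, but it is exactly the sort of scaffolding you end up repeating in every consumer.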

Extending the SimpleRpcServer class in the RabbitMQ.Client.MessagePatterns namespace makes consuming an AMQP queue much easier. Consider the following code, which is not limited to acting on only one message in the queue:
using System;
using System.IO;
using System.Collections.Generic;
using Thrift;
using Thrift.Protocol;
using Thrift.Transport;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using RabbitMQ.Util;
using RabbitMQ.Client.MessagePatterns;

namespace RabbitMQClient
{
  public class AmqpRpcServer : SimpleRpcServer
  {
    
    public static void Main (string[] args)
    {
      //Create a connection factory
      ConnectionFactory factory = new ConnectionFactory();
      //Set the host to the location of the RabbitMQ server
      factory.HostName = HOST;
      //Create a new connection to the RabbitMQ Server
      using (IConnection conn = factory.CreateConnection()) {
        //Create a new channel with the server
        using (IModel ch = conn.CreateModel ()) {
          //Subscribe to our queue on this channel
          Subscription sub = new Subscription (ch, QUEUE_NAME);
          //Instantiate our RPC Server that
          //watches the queue for new messages
          //and reacts to those messages
          new AmqpRpcServer(sub).MainLoop ();
        }
      }
    }
    
    public static string QUEUE_NAME 
        = "com.berico.notifications";
    
    public static string HOST = "localhost";
    
    //Initialize the Server using the provided
    //channel/queue subscription
    public AmqpRpcServer(Subscription sub) 
        : base(sub) {}
    
    //Process an incoming message
    public override void ProcessRequest (BasicDeliverEventArgs evt)
    {
      //Deserialize our notification using the
      //message body (byte array)
      Notification notification = 
        ThriftMarshaller.deserialize<Notification>(evt.Body);
      
      //Print the notification to the console.
      Console.WriteLine(
        " [x] Received '{0}'", notification);
      
      //Allow the base class to send acknowledgement, etc.
      base.ProcessRequest (evt);
    }
  }
}

As you can see, using both Apache Thrift and SpringSource RabbitMQ in .NET is a piece of cake.

Richard

2 comments:

  1. Great Post Richard! Everything worked except the 5 argument constructor for the Notification was rejected (.NET 4.0 with thrift 0.6.1). The following properties constructor worked fine.

    Notification notification = new Notification
    {
    Uid = 1,
    Principals = new List<string>(new string[] { "richard", "john" }),
    Subject = "Test Message",
    Message = "Hope this finds you well",
    Created = DateTime.Now.Ticks
    };

  2. Raa,

    Sorry about that. I added the 'initialization constructor' to the Thrift-generated Notification class. You are absolutely right, without that constructor, you should use the Property-Constructor.

    I will make the changes to the code and credit you for this!

    Thanks,

    Richard
