Sunday, July 3, 2011

Introducing Microsoft's Reactive Extensions (Rx) Framework

In case you are wondering what all of the "hoopla" is about, Microsoft's newest framework, Reactive Extensions (Rx), is a set of libraries that simplifies the construction of event-based applications in an asynchronous environment.  Rx quite literally unifies the .NET event model with a Complex Event Processor (CEP), where the query and filter language is LINQ!

Released a few months ago (though it's been around for a while in an "experimental" state), Reactive Extensions has been ridiculously under-promoted.  I think the problem is that many of us software engineers have been unclear as to what it is, or how the framework fits into our architectures.  Perhaps because of the confusion, the Rx team produced a series of tutorials on Channel 9 explaining the "how and why" of the framework: http://channel9.msdn.com/Series/Rx-Workshop.

I took the time this weekend to review the 2+ hours' worth of video content and distill a couple of high-level examples for your viewing pleasure.
Before we get started, please ensure you have downloaded the Reactive Extensions framework (directly or via NuGet) and added a reference to the assembly in your project.

Let's begin with the "Hello World" equivalent of Rx:
//Any IEnumerable can be converted into an IObservable stream.
//For instance, consider the string "Hello World".
IObservable<char> obs = "Hello World".ToObservable();

//We will subscribe onto the observable a lambda expression that will
//print each character (as it is observed) to the console.
obs.Subscribe((ob) => { Console.WriteLine("Observing '{0}'", ob); });
Console Output
Observing 'H'
Observing 'e'
Observing 'l'
Observing 'l'
Observing 'o'
Observing ' '
Observing 'W'
Observing 'o'
Observing 'r'
Observing 'l'
Observing 'd'
In the previous example, we demonstrated how an IEnumerable can be converted into a stream of observable objects. On the next line of code, we subscribe to this stream using a lambda (Action<char>) that simply prints each character to the console.
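Since the query language for these streams is LINQ, the same "Hello World" stream can be filtered and projected before anyone subscribes. The following is a minimal sketch, assuming the System.Reactive.Linq namespace is referenced; the class name LinqOverObservable and the helper UpperAsLower are made up for illustration.

```csharp
using System;
using System.Reactive.Linq;

public static class LinqOverObservable
{
    // Hypothetical helper: keeps only the upper-case characters of the
    // stream and projects each one to lower case.
    public static IObservable<char> UpperAsLower(string text)
    {
        return from c in text.ToObservable()
               where char.IsUpper(c)
               select char.ToLower(c);
    }

    public static void Main()
    {
        // Only 'H' and 'W' pass the filter, so this prints
        // "Observing 'h'" followed by "Observing 'w'".
        UpperAsLower("Hello World")
            .Subscribe(c => Console.WriteLine("Observing '{0}'", c));
    }
}
```

Nothing runs until Subscribe is called; the query expression only describes what should happen to each character as it is observed.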

Reactive Extensions, in one way, is a formalization by Microsoft of the familiar Pub/Sub (Eventing) model. But to describe the framework as some fancy "Pub/Sub" system would be insulting. In the next example, we will demonstrate how Rx also provides an abstraction to asynchronous programming.

//Invoke an action asynchronously
var o = Observable.Start(
  //Here is our Action (it returns void; Rx reports its completion as a Unit)
  () => {
    Console.WriteLine("Starting");
    Thread.Sleep(3000);
    Console.WriteLine("Done");
  }
);

//Block until the asynchronous action
//is done executing (or more appropriately,
//block until the first element of the IObservable<Unit>
//is returned).
o.First();
Console Output
Starting
Done
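Blocking with First() is handy for a demo, but the asynchronous result can also be consumed with a subscription. Here is a rough sketch, assuming the Func-returning overload of Observable.Start; the class name StartWithResult, the helper ComputeAnswer, and the 500 ms sleep are placeholders of mine, not part of the workshop material.

```csharp
using System;
using System.Reactive.Linq;
using System.Threading;

public static class StartWithResult
{
    // Hypothetical helper: runs a slow computation asynchronously.
    // The return value becomes the single element of the IObservable<int>.
    public static IObservable<int> ComputeAnswer()
    {
        return Observable.Start(() =>
        {
            Thread.Sleep(500); // stand-in for slow work
            return 6 * 7;
        });
    }

    public static void Main()
    {
        using (var finished = new ManualResetEventSlim())
        {
            ComputeAnswer().Subscribe(
                value => Console.WriteLine("Result: {0}", value),
                ex => { Console.WriteLine("Failed: {0}", ex.Message); finished.Set(); },
                () => finished.Set());

            // Subscribe returns immediately, so this thread is free to
            // do other work while the computation runs.
            Console.WriteLine("Subscribed; the calling thread is still free.");

            // Keep the console process alive until the sequence ends.
            finished.Wait();
        }
    }
}
```

The wait at the end exists only so the console application does not exit early; in a UI application the subscription callbacks would simply fire whenever the work finishes.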
In the next example, we'll demonstrate how to use Pub/Sub in Rx without relying on an IEnumerable. Rx provides an interface, ISubject, and an implementation, Subject, that is both an IObservable and an IObserver. This means that Subject objects can both produce events and be subscribed to. We will use a subject to register handlers and then feed objects into the Subject's data stream.

//Subjects are observable sequences as well as observers.
//You can subscribe lambdas onto the subject and add data
//to the stream.
ISubject<int> subject = new Subject<int>();

//Subscribe onto our subject with three lambdas:
// OnNext, OnError, and OnCompleted.
subject.Subscribe<int>(
    //On Next
    (item) => {
        Console.WriteLine("Next item: {0}", item);
    }, 
    //On Error
    (ex) => {
        Console.WriteLine("An exception occurred: {0}", ex.Message);
    }, 
    //On Completed
    () => {
        Console.WriteLine("Completed.");
    });

//Now let's manually add data to the subject stream
for (int i = 0; i < 10; i++)
{
    //Add data by calling OnNext
    subject.OnNext(i);
}
//Signal completion by calling OnCompleted
subject.OnCompleted();
Console Output
Next item: 0
Next item: 1
Next item: 2
Next item: 3
Next item: 4
Next item: 5
Next item: 6
Next item: 7
Next item: 8
Next item: 9
Completed.
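The Subject example above only exercises OnNext and OnCompleted. As a small sketch of the other two moving parts, the code below shows unsubscribing (Subscribe returns an IDisposable) and the OnError path. The class name SubjectLifetime, the Run method, and the "A"/"B" labels are invented for this illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Subjects;

public static class SubjectLifetime
{
    // Hypothetical demo method: returns the lines it logs so the
    // sequence of notifications is easy to inspect.
    public static List<string> Run()
    {
        var log = new List<string>();
        var subject = new Subject<int>();

        // Subscribe returns an IDisposable; disposing it unsubscribes.
        IDisposable first = subject.Subscribe(i => log.Add("A: " + i));

        // The second subscriber also handles errors.
        subject.Subscribe(
            i => log.Add("B: " + i),
            ex => log.Add("B error: " + ex.Message));

        subject.OnNext(1);   // reaches A and B
        first.Dispose();     // A stops listening
        subject.OnNext(2);   // reaches B only
        subject.OnError(new InvalidOperationException("boom"));
        subject.OnNext(3);   // ignored: the stream has already terminated

        return log;
    }

    public static void Main()
    {
        // Prints: A: 1, B: 1, B: 2, B error: boom (one per line)
        foreach (string line in Run())
        {
            Console.WriteLine(line);
        }
    }
}
```

Once a subject has signalled OnError or OnCompleted, the sequence is over, which is why the final OnNext(3) never reaches a subscriber.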
So far the demos have been quite trivial. We'll conclude with a much more practical demo using Rx and WPF.

In this example, we are going to capture the total CPU utilization % every second and plot that utilization on a line chart (powered by Visiblox).

The XAML is pretty standard:
<Window x:Class="ReactiveUI.MainWindow"
  xmlns=
     "http://schemas.microsoft.com/winfx/2006/xaml/presentation"
  xmlns:x=
     "http://schemas.microsoft.com/winfx/2006/xaml"
  xmlns:charts=
     "clr-namespace:Visiblox.Charts;assembly=Visiblox.Charts"
  Title="Processor Utilization" Height="300" Width="600">
    <Grid>
  <charts:Chart Name="chart" Width="550" 
                   Title="Total CPU Utilization" 
          Background="Transparent" Margin="0" 
          LegendVisibility="Collapsed" >
      <!-- Add zooming and a trackball -->
      <charts:Chart.Behaviour>
    <charts:BehaviourManager x:Name="behaviourManager" 
                             AllowMultipleEnabled="True">
        <charts:TrackballBehaviour x:Name="track" />
        <charts:ZoomBehaviour />
    </charts:BehaviourManager>
      </charts:Chart.Behaviour>
      <!-- Define x and y axes. -->
      <charts:Chart.XAxis>
    <charts:DateTimeAxis ShowMinorTicks="False" 
                            ShowGridlines="False">
        <charts:DateTimeAxis.Range>
      <charts:DateTimeRange />
        </charts:DateTimeAxis.Range>
    </charts:DateTimeAxis>
      </charts:Chart.XAxis>
      <charts:Chart.YAxis>
    <charts:LinearAxis LabelFormatString="0'%" 
                          ShowMinorTicks="True" 
           ShowGridlines="True" Title="Utilization">
        <charts:LinearAxis.Range>
      <charts:DoubleRange Minimum="0" Maximum="110"/>
        </charts:LinearAxis.Range>
    </charts:LinearAxis>
      </charts:Chart.YAxis>
  </charts:Chart>
    </Grid>
</Window>
Here is the "code-behind". Follow the inline comments for a description of what's happening.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Windows;
using System.Windows.Controls;
using System.Windows.Data;
using System.Windows.Documents;
using System.Windows.Input;
using System.Windows.Media;
using System.Windows.Media.Imaging;
using System.Windows.Navigation;
using System.Windows.Shapes;
using System.Diagnostics;
using System.Reactive.Linq;
using System.Reactive.Concurrency;
using Visiblox.Charts;

namespace ReactiveUI
{
  /// <summary>
  /// Interaction logic for MainWindow.xaml
  /// </summary>
  public partial class MainWindow : Window
  {
    /// <summary>
    /// The CPU Counter we will be using to monitor CPU utilization.
    /// </summary>
    private PerformanceCounter cpuCounter 
      = new PerformanceCounter(
              "Processor", "% Processor Time", "_Total");

    /// <summary>
    /// Default constructor for the Window
    /// </summary>
    public MainWindow()
    {
      //Initialize the GUI components
      InitializeComponent();

      //Set the min and max range for the chart.
      //Unfortunately, if we don't do this, the Visiblox
      //chart won't autoscale nicely (we need an interval
      //of at minimum a second!).
      IRange range2 = chart.XAxis.CreateRange();
      range2.Minimum = DateTime.Now;
      range2.Maximum = DateTime.Now.AddSeconds(60);
      chart.XAxis.Range = range2;

      //Now we are going to set up the observable.
      //We will keep a reference to the 
      //IObservable<CpuUtilizationInstant>
      //as "performanceObserver".
      var performanceObserver = 
        //Collect the "long" tick count (0, 1, 2, ...)
        //produced by the interval in the variable
        //"counter"
        from counter 
        //The "Interval" method will create
        //an observable sequence of events,
        //with each event occurring at the defined
        //'interval'.
        in Observable.Interval(
          //We will define the interval
          //to be one second
          TimeSpan.FromSeconds(1), 
          //And since we want our subscribers
          //to be able to modify the state of the
          //UI, we need to be able to synchronize
          //between the UI thread and the event thread.
          //The Rx extension for WPF includes the
          //DispatcherScheduler for this exact
          //situation.
          new DispatcherScheduler(this.Dispatcher))
        //Finally, we want to create and return a new
        //reading of the CPU utilization (storing it
        //in our custom-made structure).
        select new CpuUtilizationInstant(
          DateTime.Now, cpuCounter.NextValue());

      /*
       // Without comments, it's simply:
       
       var performanceObserver = 
        from counter 
        in Observable.Interval(
          TimeSpan.FromSeconds(1), 
          new DispatcherScheduler(this.Dispatcher))
        select new CpuUtilizationInstant(
          DateTime.Now, cpuCounter.NextValue());
       */

      //Create the dataset for a new series 
      //to display on the chart
      DataSeries<DateTime, float> dataForSeries 
        = new DataSeries<DateTime, float>("Total CPU Utilization");

      //Create a line series
      LineSeries lineSeries = new LineSeries();
      //Bind the dataset to the line series
      lineSeries.DataSeries = dataForSeries;
      //Set the thickness of the line
      lineSeries.LineStrokeThickness = 1.5;
      //Add the line to the chart
      chart.Series.Add(lineSeries);

      //Now we will subscribe to the interval event
      //through our observable, dubbed "performanceObserver"
      performanceObserver.Subscribe(
        //Handle the event using a lambda expression
        (cpuUtilizationInstant) => { 

          //Let's add a new data point on the chart
          dataForSeries.Add(
            new DataPoint<DateTime, float>(
              cpuUtilizationInstant.Instant, 
              cpuUtilizationInstant.PercentageUtilized));

          //And for debug purposes, send the instant
          //to the Debug output
          Debug.WriteLine(cpuUtilizationInstant);
      });

      
    }


    /// <summary>
    /// A simple container to hold the instantaneous reading of 
    /// CPU utilization.
    /// </summary>
    public struct CpuUtilizationInstant
    {
      /// <summary>
      /// The moment the CPU Utilization was recorded
      /// </summary>
      private DateTime instant;

      /// <summary>
      /// The utilization percentage at this recorded moment
      /// </summary>
      private float cpuUtilizationPercentage;

      /// <summary>
      /// Instantiate the struct with the time the instance was recorded
      /// and the percentage of CPU utilization at that moment.
      /// </summary>
      /// <param name="timeTaken">Time the utilization was recorded</param>
      /// <param name="cpuUtilizationPercentage">CPU Utilization (%)</param>
      public CpuUtilizationInstant(
                 DateTime timeTaken, float cpuUtilizationPercentage)
      {
        this.instant = timeTaken;
        this.cpuUtilizationPercentage = cpuUtilizationPercentage;

      }

      /// <summary>
      /// Get the instant the utilization was recorded
      /// </summary>
      public DateTime Instant { 
        get { return this.instant; } 
      }


      /// <summary>
      /// Get the percentage the CPU was utilized at
      /// this moment.
      /// </summary>
      public float PercentageUtilized { 
        get { return this.cpuUtilizationPercentage; } 
      }

      /// <summary>
      /// String representation of the current state of this
      /// struct instance.
      /// Ex Output: "At 11:06:04 AM, the Cpu was 5.086277% utilized." 
      /// </summary>
      /// <returns>String representation of the struct</returns>
      public override string ToString()
      {
        return string.Format("At {0}, the Cpu was {1}% utilized.", 
          this.instant.ToLongTimeString(), this.cpuUtilizationPercentage);
      }
    }
  }
}
The following is a video of the CPU Utilization demo in action!

Sample of the Debug Output
At 10:28:48 PM, the Cpu was 0% utilized.
At 10:28:49 PM, the Cpu was 23.15563% utilized.
At 10:28:50 PM, the Cpu was 22.47902% utilized.
At 10:28:51 PM, the Cpu was 33.51285% utilized.
At 10:28:52 PM, the Cpu was 42.08793% utilized.
At 10:28:53 PM, the Cpu was 44.70722% utilized.
At 10:28:54 PM, the Cpu was 33.75841% utilized.
At 10:28:55 PM, the Cpu was 30.23645% utilized.
At 10:28:56 PM, the Cpu was 31.96174% utilized.
At 10:28:57 PM, the Cpu was 25.83793% utilized.
At 10:28:58 PM, the Cpu was 29.00658% utilized.
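If you want to try the interval query without WPF, Visiblox, or a performance counter, the same pattern can be sketched in a console application. This is only an approximation under my own assumptions: the class name IntervalSampler, the Sample helper, the 200 ms period, and the use of KeyValuePair in place of CpuUtilizationInstant are all stand-ins for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Threading;

public static class IntervalSampler
{
    // Hypothetical stand-in for the CPU query: pairs each tick with the
    // time it was observed and stops after 'count' readings.
    public static IObservable<KeyValuePair<long, DateTime>> Sample(
        TimeSpan period, int count)
    {
        return (from tick in Observable.Interval(period)
                select new KeyValuePair<long, DateTime>(tick, DateTime.Now))
               .Take(count);
    }

    public static void Main()
    {
        using (var finished = new ManualResetEventSlim())
        {
            // Keep the IDisposable so the timer can be stopped later.
            IDisposable subscription = Sample(TimeSpan.FromMilliseconds(200), 3)
                .Subscribe(
                    r => Console.WriteLine("Tick {0} at {1:T}", r.Key, r.Value),
                    () => finished.Set());

            // Wait for the three readings, then release the subscription.
            finished.Wait();
            subscription.Dispose();
        }
    }
}
```

The WPF demo never disposes the value returned by Subscribe, so its interval keeps firing for the life of the window. Holding on to that IDisposable, as the sketch does, gives you a way to stop sampling, for example when the window closes.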
Well, that's it! Reactive Extensions is an extremely powerful framework, and certainly a welcome feature on the .NET platform. In future posts, we'll discuss some of Rx's more powerful features, ending in a look at the new Reactive Extensions for JavaScript, a port of Rx to the browser.

Happy Coding!
Richard