Friday, December 17, 2010

Task Parallel Library: Concurrency made easy in .NET

The .NET 4.0 Framework brought many notable features, but the highlight of the release was definitely the Task Parallel Library (TPL).

Concurrency is an issue many of us have had to tackle at one point or another.  The way concurrency is handled in many frameworks feels very unnatural.  Typically, these frameworks require a class exposing a parameterless function that returns no value.  Execution of this function is handled on another thread, which in a multi-core/processor environment could potentially run on a separate processor.  Synchronizing application state using these special "Runnable" (Java) classes becomes very difficult and, like so many other things in these languages, requires more scaffolding than should really be needed.

Concurrency is such a pervasive issue that entire languages have been designed to address it.  Scala and Erlang are specific examples, designed around an "actor" model in which state is transferred via message-passing.  Clojure takes a different approach, using Software Transactional Memory to enforce an ACID-like model on state updates.  These approaches to concurrency are extremely compelling alternatives to the traditional thread model, but they do us no good when we are building software in highly-adopted languages like Java and C#.

The Task Parallel Library is Microsoft's answer to the concurrency issue.  TPL makes the creation of highly-concurrent applications trivial, leaving the engineer free to focus on modeling the domain and implementing business logic.  To appreciate the importance of the new framework, however, it's important to understand where we were prior to TPL.

.NET's model for concurrency has morphed over time, and we can definitely see that Microsoft learned from the inadequacies of Java's thread model.  As far back as .NET 1.1, you could hand a static function to a new thread by wrapping the method in a ThreadStart delegate and passing it to the Thread constructor.

using System;
using System.Threading;

public class ThreadExample {

    public static void ThreadProc() {
        for (int i = 0; i < 10; i++) {
            Console.WriteLine("ThreadProc: {0}", i);
            Thread.Sleep(0);
        }
    }

    public static void Main() {
        Thread t = new Thread(
           new ThreadStart(ThreadProc));
        t.Start();
    }
}
Excerpt from MSDN (System.Threading.Thread)


While we don't have to implement a special interface to achieve asynchronicity, the code is still pretty ugly.  This demo doesn't even touch on the trouble one encounters trying to pull resulting data out of the operation, and it certainly doesn't show a transfer of state from the main class to the wrapped function.
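To make that pain concrete, here is a minimal sketch (the class and field names are mine, not from the MSDN excerpt) of how results typically came back from a worker thread before TPL: through shared state that the caller can only read safely after an explicit Join().

```csharp
using System;
using System.Threading;

public class ThreadResultExample {

    // Pre-TPL, the worker has nowhere to "return" a value,
    // so it writes into shared state instead.
    private static int _sum;

    public static void SumProc() {
        int sum = 0;
        for (int i = 1; i <= 100; i++) {
            sum += i;
        }
        _sum = sum;
    }

    public static void Main() {
        Thread t = new Thread(new ThreadStart(SumProc));
        t.Start();
        // We must block explicitly before the result is safe to read.
        t.Join();
        Console.WriteLine("Sum: {0}", _sum);
    }
}
```

Note how the "result" is invisible in the method signature; nothing but discipline ties SumProc to the field it fills in.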


TPL answers the issue of concurrency by adding two distinct features to the .NET Framework: Task Parallelism and Data Parallelism.  Task Parallelism addresses the execution of independent tasks in parallel, while Data Parallelism focuses on parallel operations against a collection.  To support parallelism, the .NET Framework also introduces a new set of concurrent collections:

System.Collections.Concurrent.BlockingCollection<T>
System.Collections.Concurrent.ConcurrentBag<T>
System.Collections.Concurrent.ConcurrentDictionary<TKey, TValue>
System.Collections.Concurrent.ConcurrentQueue<T>
System.Collections.Concurrent.ConcurrentStack<T>

Concurrent collections are excellent for working on a collection type from within a parallel loop or set of tasks.  Much of what we want done tends to be some sort of mapping of values to a distilled set, possibly followed by a reduction into a summary variable.  Most of the core .NET collections are not thread-safe.  Even if you know there will be no conflicts in your data (e.g. inserting into a single List<T> from multiple threads where every value is unique), you can still encounter threading issues.

Perhaps the most pervasive issue is the IndexOutOfRangeException that occurs when multiple threads attempt to insert into a non-thread-safe collection.  The real cause is that the List<T> class preallocates space in the array that backs the collection, and concurrent inserts race on that array.  Gabe explains the underlying reason on Stack Overflow:  http://stackoverflow.com/questions/4007248/index-out-of-range-exception-when-using-parallel-for-loop.
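A minimal sketch of the contrast (the class name is mine): the unsafe List<T> version is left commented out because its failure mode is nondeterministic — it may throw, or silently drop items — while the ConcurrentBag<T> version handles the identical workload reliably.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

public class CollectionSafetyDemo {
    public static void Main() {
        // Unsafe: List<T> is not thread-safe. Parallel Adds race
        // on the backing array and can throw (often the
        // IndexOutOfRangeException described above) or lose items.
        // Uncomment at your own risk:
        // List<int> unsafeList = new List<int>();
        // Parallel.For(0, 100000, i => unsafeList.Add(i));

        // Safe: ConcurrentBag<T> supports concurrent Adds.
        ConcurrentBag<int> bag = new ConcurrentBag<int>();
        Parallel.For(0, 100000, i => bag.Add(i));
        Console.WriteLine("Bag holds {0} items.", bag.Count);
    }
}
```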

Let's start by introducing Data Parallelism, which encompasses Parallel For and ForEach loops, as well as PLINQ (Parallel LINQ).

* The Parallel and Task types can be found in the System.Threading.Tasks namespace; PLINQ's extension methods live in System.Linq.

First, let's look at the Parallel.For loop.  In this demonstration, we are going to add a million Guids to a ConcurrentBag in Parallel:

//Our concurrent collection
ConcurrentBag<Guid> bagOfGuids 
   = new ConcurrentBag<Guid>();

//Parallel.For's signature in this example is:
// (int fromInclusive, 
//  int toExclusive, 
//  Action<int> body)
Parallel.For(1, 1000000, 
   (i) => {
      //Every ten-thousandth key, output a
      //status message.
      Debug.WriteLineIf((i % 10000) == 0, 
         string.Format("{0} Keys Created.", i));
      //Add the Guid to the collection
      bagOfGuids.Add(Guid.NewGuid());
   });

And here is the output. Notice how the messages are out of order? This happens because the messages are written by different threads, some finishing faster than others.

10000 Keys Created.
20000 Keys Created.
500000 Keys Created.
510000 Keys Created.
520000 Keys Created.
530000 Keys Created.
30000 Keys Created.
40000 Keys Created.
540000 Keys Created.
50000 Keys Created.
60000 Keys Created.
70000 Keys Created.
550000 Keys Created.
80000 Keys Created.
90000 Keys Created.
100000 Keys Created.
560000 Keys Created.
570000 Keys Created.
120000 Keys Created.
130000 Keys Created.
140000 Keys Created.
580000 Keys Created.
110000 Keys Created.
160000 Keys Created.
150000 Keys Created.
170000 Keys Created.
590000 Keys Created.
600000 Keys Created.
610000 Keys Created.
180000 Keys Created.
620000 Keys Created.
200000 Keys Created.
210000 Keys Created.
630000 Keys Created.
640000 Keys Created.
650000 Keys Created.
220000 Keys Created.
230000 Keys Created.
190000 Keys Created.
240000 Keys Created.
660000 Keys Created.
670000 Keys Created.
680000 Keys Created.
250000 Keys Created.
690000 Keys Created.
700000 Keys Created.
710000 Keys Created.
260000 Keys Created.
720000 Keys Created.
270000 Keys Created.
290000 Keys Created.
730000 Keys Created.
300000 Keys Created.
280000 Keys Created.
320000 Keys Created.
740000 Keys Created.
330000 Keys Created.
340000 Keys Created.
750000 Keys Created.
350000 Keys Created.
360000 Keys Created.
760000 Keys Created.
380000 Keys Created.
370000 Keys Created.
770000 Keys Created.
390000 Keys Created.
400000 Keys Created.
780000 Keys Created.
410000 Keys Created.
420000 Keys Created.
430000 Keys Created.
440000 Keys Created.
790000 Keys Created.
450000 Keys Created.
460000 Keys Created.
470000 Keys Created.
480000 Keys Created.
800000 Keys Created.
490000 Keys Created.
810000 Keys Created.
820000 Keys Created.
830000 Keys Created.
840000 Keys Created.
860000 Keys Created.
850000 Keys Created.
870000 Keys Created.
880000 Keys Created.
890000 Keys Created.
900000 Keys Created.
910000 Keys Created.
920000 Keys Created.
930000 Keys Created.
940000 Keys Created.
950000 Keys Created.
960000 Keys Created.
310000 Keys Created.
970000 Keys Created.
990000 Keys Created.
980000 Keys Created.

Parallel.ForEach loops work much the same way, but obviously require an IEnumerable<T> collection as their iteration source. In the next demo, we will partition the ConcurrentBag of 1,000,000 Guids using a consistent hashing strategy I use with Redis: the first byte of the Guid modulo the desired number of partitions (very effective!).


//Create a concurrent dictionary to store the
//partitions
ConcurrentDictionary<int, ConcurrentBag<Guid>> partitions 
   = new ConcurrentDictionary<int, ConcurrentBag<Guid>>();

//Iterate in parallel over the previous collection
Parallel.ForEach(bagOfGuids,
  (guid) =>
   {
      //This is my key partitioning strategy
      //creating 20 partitions of keys.
      int index = guid.ToByteArray()[0] % 20;
      //Try adding the partition to the dictionary
      //this will fail if the key already exists
      //*Note: Using if(!dictionary.ContainsKey(...)) 
      //does not work very well in a concurrent environment.
      //This is why the concurrent dictionary has the
      //TryAdd function.
      partitions.TryAdd(index, new ConcurrentBag<Guid>());
      //Add the key to the appropriate partition
      partitions[index].Add(guid);
   });

//Here's a little LINQ magic to show you how the keys
//were distributed in the bag.
partitions.ToList().ForEach(
   (kvp) => { 
      Debug.WriteLine("Partition {0} has {1} keys.", 
         kvp.Key, kvp.Value.Count); 
   });

Partition 0 has 50786 keys.
Partition 1 has 50835 keys.
Partition 2 has 50760 keys.
Partition 3 has 50591 keys.
Partition 4 has 50365 keys.
Partition 5 has 51199 keys.
Partition 6 has 51138 keys.
Partition 7 has 50950 keys.
Partition 8 has 50912 keys.
Partition 9 has 50891 keys.
Partition 10 has 50655 keys.
Partition 11 has 50723 keys.
Partition 12 has 50564 keys.
Partition 13 has 50743 keys.
Partition 14 has 50803 keys.
Partition 15 has 50418 keys.
Partition 16 has 46620 keys.
Partition 17 has 47033 keys.
Partition 18 has 46961 keys.
Partition 19 has 47052 keys.
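As an aside, the TryAdd-then-index pattern above works, but it allocates a throwaway bag on every iteration once the partition exists. ConcurrentDictionary also exposes GetOrAdd, which atomically returns the existing value or inserts the supplied one. A sketch of the same partitioning using it (the class name is mine, and I've shrunk the bag to 100,000 Guids to keep the run short):

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

public class GetOrAddDemo {
    public static void Main() {
        //Rebuild a bag of random Guids
        ConcurrentBag<Guid> bagOfGuids = new ConcurrentBag<Guid>();
        Parallel.For(0, 100000, i => bagOfGuids.Add(Guid.NewGuid()));

        ConcurrentDictionary<int, ConcurrentBag<Guid>> partitions
            = new ConcurrentDictionary<int, ConcurrentBag<Guid>>();

        Parallel.ForEach(bagOfGuids, guid => {
            int index = guid.ToByteArray()[0] % 20;
            //GetOrAdd returns the existing bag if the key is
            //already present, otherwise it inserts the new one.
            partitions.GetOrAdd(index, new ConcurrentBag<Guid>())
                .Add(guid);
        });

        int total = 0;
        foreach (var kvp in partitions) total += kvp.Value.Count;
        Console.WriteLine("{0} partitions, {1} keys.",
            partitions.Count, total);
    }
}
```

One allocation is still wasted per call when the key exists; the Func-valued GetOrAdd overload defers even that, at the cost of the factory possibly running more than once under contention.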

In addition to traditional Data Parallelism, TPL also includes parallel extensions to LINQ. Here's a little taste of how parallelism can be applied to LINQ:

IEnumerable<Guid> partitionZero =
   //Note the use of AsParallel() on the
   //source collection
   from guid in bagOfGuids.AsParallel() 
   where ((guid.ToByteArray()[0] % 20) == 0) 
   select guid;

Debug.WriteLine("There are {0} keys in partition zero.", 
   partitionZero.Count());

Here is the output from the code above. You will notice that we get the same count as we did in the previous demo.

There are 50786 keys in partition zero.
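PLINQ isn't limited to filtering, either; the entire partition report from the earlier demo can be expressed as one parallel group-by query. A sketch under the same assumptions as above (class name mine, bag shrunk to 100,000 Guids):

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Tasks;

public class PlinqGroupDemo {
    public static void Main() {
        //Rebuild a bag of random Guids
        ConcurrentBag<Guid> bagOfGuids = new ConcurrentBag<Guid>();
        Parallel.For(0, 100000, i => bagOfGuids.Add(Guid.NewGuid()));

        //One parallel query computes every partition's count,
        //replacing the manual dictionary bookkeeping.
        var counts =
            (from guid in bagOfGuids.AsParallel()
             group guid by guid.ToByteArray()[0] % 20 into g
             orderby g.Key
             select new { Partition = g.Key, Count = g.Count() })
            .ToList();

        foreach (var row in counts)
            Console.WriteLine("Partition {0} has {1} keys.",
                row.Partition, row.Count);

        Console.WriteLine("Total: {0}", counts.Sum(r => r.Count));
    }
}
```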

Now that we have seen how parallelism can be applied to data, let's look at how we can use TPL to address problems traditionally handled by the Thread class. Let's demonstrate the basics of the Task and Task.Factory objects, which form the foundation of Task Parallelism in .NET 4.0:

//Store a collection of concurrent tasks.
//You can just as easily create a task without
//keeping a reference of it, but you will
//learn later that there are benefits to
//remembering these tasks.
Task[] tasks = new Task[]{

  //Create the first parallel task
   Task.Factory.StartNew(()=> { 

      for(int i = 0; i < 1000000; i++){
         Debug.WriteLineIf((i % 50000) == 0, 
            string.Format(
            "Hit {0} iterations on the first task.", i));
      }

   }),

   //Create the second parallel task
   Task.Factory.StartNew(()=> {

      for (int i = 0; i < 1000000; i++){
         Debug.WriteLineIf((i % 50000) == 0, 
            string.Format(
            "Hit {0} iterations on the second task.", i));
      }

   }),

   //Create the third parallel task
   Task.Factory.StartNew(()=> {

      for (int i = 0; i < 1000000; i++){
         Debug.WriteLineIf((i % 50000) == 0, 
            string.Format(
            "Hit {0} iterations on the third task.", i));
      }

   })
};

And the output of the three concurrent tasks:

Hit 0 iterations on the first task.
Hit 0 iterations on the second task.
Hit 50000 iterations on the first task.
Hit 50000 iterations on the second task.
Hit 100000 iterations on the first task.
Hit 100000 iterations on the second task.
Hit 150000 iterations on the first task.
Hit 150000 iterations on the second task.
Hit 200000 iterations on the first task.
Hit 200000 iterations on the second task.
Hit 250000 iterations on the first task.
Hit 250000 iterations on the second task.
Hit 300000 iterations on the first task.
Hit 300000 iterations on the second task.
Hit 350000 iterations on the first task.
Hit 350000 iterations on the second task.
Hit 400000 iterations on the first task.
Hit 400000 iterations on the second task.
Hit 450000 iterations on the first task.
Hit 450000 iterations on the second task.
Hit 500000 iterations on the first task.
Hit 500000 iterations on the second task.
Hit 550000 iterations on the first task.
Hit 550000 iterations on the second task.
Hit 600000 iterations on the first task.
Hit 600000 iterations on the second task.
Hit 650000 iterations on the first task.
Hit 650000 iterations on the second task.
Hit 700000 iterations on the first task.
Hit 700000 iterations on the second task.
Hit 750000 iterations on the first task.
Hit 750000 iterations on the second task.
Hit 800000 iterations on the first task.
Hit 800000 iterations on the second task.
Hit 850000 iterations on the first task.
Hit 850000 iterations on the second task.
Hit 900000 iterations on the first task.
Hit 900000 iterations on the second task.
Hit 950000 iterations on the first task.
Hit 950000 iterations on the second task.
Hit 0 iterations on the third task.
Hit 50000 iterations on the third task.
Hit 100000 iterations on the third task.
Hit 150000 iterations on the third task.
Hit 200000 iterations on the third task.
Hit 250000 iterations on the third task.
Hit 300000 iterations on the third task.
Hit 350000 iterations on the third task.
Hit 400000 iterations on the third task.
Hit 450000 iterations on the third task.
Hit 500000 iterations on the third task.
Hit 550000 iterations on the third task.
Hit 600000 iterations on the third task.
Hit 650000 iterations on the third task.
Hit 700000 iterations on the third task.
Hit 750000 iterations on the third task.
Hit 800000 iterations on the third task.
Hit 850000 iterations on the third task.
Hit 900000 iterations on the third task.
Hit 950000 iterations on the third task.

Sometimes it's necessary to wait for all of the concurrent tasks to complete before regular execution can continue. TPL provides this functionality through the Task.WaitAll(params Task[] tasks) function. You will see a demonstration of WaitAll in the next example; I have staggered the number of iterations so that the first two tasks finish long before the third.

Task[] waitForAllTasks = new Task[]{
   Task.Factory.StartNew(()=> { 
      for(int i = 0; i < 1000; i++){
         Debug.WriteLineIf((i % 100) == 0, 
          string.Format(
           "Hit {0} iterations on the first task.", i));
      }
   }),
   Task.Factory.StartNew(()=> {
      for (int i = 0; i < 10000; i++)
      {
         Debug.WriteLineIf((i % 1000) == 0, 
          string.Format(
           "Hit {0} iterations on the second task.", i));
      }
   }),
   Task.Factory.StartNew(()=> {
      for (int i = 0; i < 100000; i++)
      {
         Debug.WriteLineIf((i % 10000) == 0, 
          string.Format(
           "Hit {0} iterations on the third task.", i));
      }
   })
};

//This will be printed before all tasks are finished
Debug.WriteLine("Waiting for all tasks to finish");

//Block until all concurrent tasks have finished.
Task.WaitAll(waitForAllTasks);

//This should be printed after all tasks have finished
Debug.WriteLine("Finished all tasks");

Here is the output:

Hit 0 iterations on the first task.
Hit 0 iterations on the second task.
Hit 100 iterations on the first task.
Hit 1000 iterations on the second task.
Hit 200 iterations on the first task.
Hit 2000 iterations on the second task.
Hit 300 iterations on the first task.
Hit 3000 iterations on the second task.
Hit 400 iterations on the first task.
Hit 4000 iterations on the second task.
Hit 500 iterations on the first task.
Hit 5000 iterations on the second task.
Hit 600 iterations on the first task.
Hit 6000 iterations on the second task.
Hit 700 iterations on the first task.
Hit 7000 iterations on the second task.
Hit 800 iterations on the first task.
Hit 8000 iterations on the second task.
Hit 900 iterations on the first task.
Hit 9000 iterations on the second task.
Hit 0 iterations on the third task.
Hit 10000 iterations on the third task.
Hit 20000 iterations on the third task.
Waiting for all tasks to finish
Hit 30000 iterations on the third task.
Hit 40000 iterations on the third task.
Hit 50000 iterations on the third task.
Hit 60000 iterations on the third task.
Hit 70000 iterations on the third task.
Hit 80000 iterations on the third task.
Hit 90000 iterations on the third task.
Finished all tasks

Sometimes we don't care whether all tasks have finished before continuing synchronous execution. If we only need one of the tasks to finish, we can use the Task.WaitAny(params Task[] tasks) method, which blocks the calling thread until any one of the given tasks has completed.

Task[] notWaitingForLongRunningTask = new Task[]{
   Task.Factory.StartNew(()=> { 
      for(int i = 0; i < 1000; i++){
         Debug.WriteLineIf((i % 100) == 0, 
          string.Format(
           "Hit {0} iterations on the first task.", i));
      }
   }),
   Task.Factory.StartNew(()=> {
      for (int i = 0; i < 10000000; i++)
      {
         Debug.WriteLineIf((i % 1000000) == 0, 
          string.Format(
           "Hit {0} iterations on the long running task.", i));
      }
   })
};

//Wait only for the first task to finish
Task.WaitAny(notWaitingForLongRunningTask[0]);

Debug.WriteLine("Only waiting for the first task to finish.");

//Wait for all remaining tasks
Task.WaitAll(notWaitingForLongRunningTask);

Debug.WriteLine("All tasks finished.");

And the output...

Hit 0 iterations on the first task.
Hit 0 iterations on the long running task.
Hit 100 iterations on the first task.
Hit 200 iterations on the first task.
Hit 1000000 iterations on the long running task.
Hit 300 iterations on the first task.
Hit 400 iterations on the first task.
Hit 500 iterations on the first task.
Hit 600 iterations on the first task.
Hit 2000000 iterations on the long running task.
Hit 700 iterations on the first task.
Hit 800 iterations on the first task.
Hit 900 iterations on the first task.
Only waiting for the first task to finish.
Hit 3000000 iterations on the long running task.
Hit 4000000 iterations on the long running task.
Hit 5000000 iterations on the long running task.
Hit 6000000 iterations on the long running task.
Hit 7000000 iterations on the long running task.
Hit 8000000 iterations on the long running task.
Hit 9000000 iterations on the long running task.
All tasks finished.

My final demonstration shows how to get data out of parallel tasks. Fortunately, this is a lot simpler than with traditional thread models; the generic Task<TResult> type accomplishes this for us.

//We want Tasks that return a List of Guids
Task<List<Guid>>[] tasksReturningValues 
   = new Task<List<Guid>>[]{

   Task<List<Guid>>.Factory.StartNew(()=>{
      List<Guid> guids = new List<Guid>();
      for(int i = 0; i < 1000; i++){
         guids.Add(Guid.NewGuid());
      }
      //Your delegate must return the declared
      //result type
      return guids;
   }),

   Task<List<Guid>>.Factory.StartNew(()=>{
      List<Guid> guids = new List<Guid>();
      for(int i = 0; i < 1000; i++){
         guids.Add(Guid.NewGuid());
      }
      return guids;
   }),

};

//Let's wait for all tasks to finish
Task.WaitAll(tasksReturningValues);

//Create a list of Guids
List<Guid> returnedGuids = new List<Guid>();

//Add all Guids from the first task
returnedGuids.AddRange(tasksReturningValues[0].Result);

//Add all Guids from the second task
returnedGuids.AddRange(tasksReturningValues[1].Result);

//Display the count (we should have 2000)
Debug.WriteLine("I have {0} Guids in my bag.", 
  returnedGuids.Count);

And Finally...

I have 2000 Guids in my bag.

As you can see, the Task Parallel Library makes it easy for engineers to convert traditional synchronous applications into highly concurrent ones.

Richard
