In this post we look at concurrent data structures, why they help with synchronization, and which basic data structures the TPL provides to you as an application developer.

What this post will cover

  • Why one needs concurrent data structures
  • Which data structures TPL offers
  • Concurrent & deferred initialization

What this post will not cover

  • What is mutable shared state
  • How to achieve basic thread safety
  • Tasks and so on

Why one needs concurrent data structures

As I shared in the post about basic thread safety, the most common issues and bugs in multithreaded code arise when several threads access mutable shared state.
We already saw that this can be overcome with techniques like locking or with types like Monitor. But we also saw that this readily becomes tedious and can turn otherwise simple algorithms into very complex ones, which in turn makes them hard to understand and maintain. You also want to use the cheapest and fastest synchronization technique available; otherwise you might pay the performance cost of lock contention.

For this purpose you can use the built-in structures of the TPL, which let you concentrate on your algorithms.
In the next sections we look at the most common ones in a set of producer-consumer-like patterns. But we start off with deferred initialization using the Lazy<T> class, which can make a lot of sense in multithreaded environments.

Lazy<T>

With this type you can defer the initialization of an object instance to the exact point where it is needed in your application. The best thing about Lazy<T> is that it is inherently thread safe and (unless you specify otherwise) guarantees that just one object is created and returned, no matter how many threads try to access the value.

In effect it is a highly efficient solution to handle resources that might or might not be needed during the lifetime of the application.

By default, Lazy<T> creates the instance with the public parameterless constructor of T, which it invokes via reflection when you construct the Lazy<T> without arguments (new Lazy<MyType>()). If the type has no parameterless constructor, you can instead supply a Func<T> that is then used as the initialization method. In my experience this is the most common way to use Lazy<T>.

using System;

public class MyExpensiveResource
{
    // omitted for brevity's sake
}

public class LazyDemo
{
    private readonly Lazy<MyExpensiveResource> _lazyResource;

    public MyExpensiveResource MyExpensiveResource { get; set; }

    public LazyDemo()
    {
        // The factory delegate does not run here, only when Value is first read.
        _lazyResource = new Lazy<MyExpensiveResource>(() => new MyExpensiveResource());
    }

    public void Initialize()
    {
        // The first access creates the instance; later accesses return the same one.
        MyExpensiveResource = _lazyResource.Value;
    }
}

Lazy<T> is not a concurrent data structure in itself but a deferred initialization pattern; because it is inherently thread safe, it is still useful in highly concurrent scenarios.
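To illustrate the single-initialization guarantee, here is a minimal sketch that reads one Lazy<T> from several tasks and counts how often the factory ran. The class and method names (LazyThreadSafetySketch, CountFactoryCalls) are made up for this example, and it relies on the default thread safety mode of Lazy<T>.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class LazyThreadSafetySketch
{
    // Hypothetical helper: reads one Lazy<object> from several tasks and
    // returns how often the factory delegate actually ran.
    public static int CountFactoryCalls(int readerCount)
    {
        int factoryCalls = 0;
        var lazy = new Lazy<object>(() =>
        {
            Interlocked.Increment(ref factoryCalls);
            return new object();
        });

        var readers = new Task<object>[readerCount];
        for (int i = 0; i < readers.Length; i++)
        {
            readers[i] = Task.Run(() => lazy.Value);
        }
        Task.WaitAll(readers);

        // Every reader must have received the very same instance.
        foreach (var reader in readers)
        {
            if (!ReferenceEquals(reader.Result, readers[0].Result))
            {
                throw new InvalidOperationException("Readers saw different instances.");
            }
        }
        return factoryCalls;
    }
}
```

Calling LazyThreadSafetySketch.CountFactoryCalls(8) should return 1: all eight readers compete for the value, but the factory runs only once.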

Next we are going to look at concurrent collections.

Concurrent collections

The need for concurrent collections stems from the fact that the standard .NET collections are not thread safe. Their operations are not atomic, and simply wrapping a call in a lock does not solve every problem either. For instance, you could lock around the Dequeue method of Queue<T>, and the method would still not be safe to call from multiple threads:

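private static readonly object _stateGuard = new object();
private static readonly Queue<object> queue = new Queue<object>();

public static object DoDequeue()
{
    lock (_stateGuard)
    {
        // Still throws InvalidOperationException when the queue is empty.
        return queue.Dequeue();
    }
}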

This is still a problem, because Dequeue throws an exception if the queue is empty, and with several consumers you can never be sure that another thread has not just taken the last item. You could add a check for remaining items inside the lock, yet every such addition makes the algorithm more complex without adding anything to its actual purpose.
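To make that extra work concrete, here is a minimal sketch of a hand-rolled wrapper that performs the empty check and the dequeue under the same lock. LockedQueue<T> is a hypothetical name for this example, not a framework type, and every access to the inner queue has to go through it for the lock to mean anything.

```csharp
using System.Collections.Generic;

// Hypothetical wrapper around Queue<T>; not part of the framework.
public class LockedQueue<T>
{
    private readonly object _stateGuard = new object();
    private readonly Queue<T> _queue = new Queue<T>();

    public void Enqueue(T item)
    {
        lock (_stateGuard)
        {
            _queue.Enqueue(item);
        }
    }

    // Check and removal happen under one lock, so no other thread
    // can take the last item in between.
    public bool TryDequeue(out T item)
    {
        lock (_stateGuard)
        {
            if (_queue.Count == 0)
            {
                item = default(T);
                return false;
            }
            item = _queue.Dequeue();
            return true;
        }
    }
}
```

This works, but it is exactly the kind of plumbing you would have to write, test, and maintain for every collection type you share between threads.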

But again the TPL comes to the rescue: it provides several concurrent collections that counter exactly these issues. We now look at ConcurrentQueue<T> and ConcurrentStack<T>, because FIFO and LIFO structures are quite common in most applications.

ConcurrentQueue<T> and ConcurrentStack<T>

As mentioned before, these collections are inherently thread safe FIFO and LIFO data structures. Let's look at how ConcurrentQueue<T> works in a producer-consumer implementation:

using System;
using System.Collections.Concurrent;
using System.IO;
using System.Threading.Tasks;

public class Program
{
    static void Main(string[] args)
    {
        var queue = new ConcurrentQueue<string>();
        foreach (FileInfo file in new DirectoryInfo(@"C:\Data")
            .GetFiles("*.csv", SearchOption.AllDirectories))
        {
            // simulates synchronous producer
            queue.Enqueue(file.FullName);
        }

        // Four consumers drain the same queue concurrently.
        var consumers = new Task[4];
        for (int i = 0; i < consumers.Length; i++)
        {
            consumers[i] = Task.Run(() => Consumer(queue));
        }
        Task.WaitAll(consumers);
    }

    public static void Consumer(ConcurrentQueue<string> queue)
    {
        string file;
        // TryDequeue returns false once the queue is empty, which ends the loop.
        while (queue.TryDequeue(out file))
        {
            Console.WriteLine("{0}: Processing {1}", Task.CurrentId, file);
        }
    }
}

In this example we load all CSV file names into a ConcurrentQueue<string> and have them printed concurrently by individual Tasks.
Notice the TryDequeue method in place of Dequeue. It checks whether there are still items in the queue and removes one in a single atomic step; it returns true if it got an item and false if the queue was empty. (You will recognize this pattern a lot in .NET, and I personally tend to use it quite often, e.g. if I cannot ensure thread safety at compile time.)
The Enqueue method, on the other hand, exists on ConcurrentQueue<T> as well and works like you would expect.

ConcurrentStack<T> works the same way, except that items come out in reverse order and you call the Push and TryPop methods.
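As a small illustration of the reversed order, the following sketch pushes a few values and pops them all again. It is deliberately single-threaded so that the order is predictable; StackOrderSketch and PushThenPopAll are names invented for this example.

```csharp
using System.Collections.Concurrent;
using System.Collections.Generic;

public static class StackOrderSketch
{
    // Hypothetical helper: pushes all values, then pops until the stack is empty.
    public static List<int> PushThenPopAll(params int[] values)
    {
        var stack = new ConcurrentStack<int>();
        foreach (int value in values)
        {
            stack.Push(value);
        }

        var popped = new List<int>();
        int item;
        // TryPop returns false once the stack is empty instead of throwing.
        while (stack.TryPop(out item))
        {
            popped.Add(item);
        }
        return popped;
    }
}
```

PushThenPopAll(1, 2, 3) returns the items as 3, 2, 1. With several threads pushing and popping at the same time, each pop still returns the most recently pushed item at that moment, but the overall interleaving is no longer predictable.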

ConcurrentDictionary<K,V>

Dictionary<K,V> is also not thread safe. Under concurrent use you will often enough get exceptions like KeyNotFoundException (the key is not registered) or ArgumentException (the key is already registered), because another thread changed the dictionary between your check and your access.

Read-only operations are safe, though. This means that if you build a Dictionary once and never modify it afterwards (for example by exposing it only as IReadOnlyDictionary<K,V>), you do not need the thread safe version. If you want to change the state of the dictionary, you need to resort to ConcurrentDictionary<K,V> to avoid state corruption in a multithreaded environment.

Even though ConcurrentDictionary<K,V> is still a hash map implementation, it uses a slightly different API, as we already saw with ConcurrentQueue<T> and ConcurrentStack<T>.
For example, instead of Add and Remove you have TryAdd and TryRemove.

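It also offers the following methods, which are pretty self-explanatory given the information above:

– TryAdd
– GetOrAdd (if you want to fill a dictionary concurrently)
– AddOrUpdate (different values for either add or update)
– TryUpdate (with comparison value)

All of these operations update the dictionary atomically. Note, however, that a factory delegate passed to GetOrAdd or AddOrUpdate runs outside the internal lock and may be called more than once under contention.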
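The following sketch shows these methods in action. DictionarySketch, CountWords, and DescribeTryMethods are hypothetical names for this example; the ConcurrentDictionary<K,V> calls themselves are the regular framework methods.

```csharp
using System.Collections.Concurrent;
using System.Threading.Tasks;

public static class DictionarySketch
{
    // Counts words in parallel; AddOrUpdate either inserts 1 or increments.
    public static ConcurrentDictionary<string, int> CountWords(string[] words)
    {
        var counts = new ConcurrentDictionary<string, int>();
        Parallel.ForEach(words, word =>
        {
            counts.AddOrUpdate(word, 1, (key, current) => current + 1);
        });
        return counts;
    }

    // Walks through the Try- methods on a single thread and reports the results.
    public static string DescribeTryMethods()
    {
        var cache = new ConcurrentDictionary<string, int>();
        bool added = cache.TryAdd("a", 1);          // key is new
        bool addedAgain = cache.TryAdd("a", 5);     // key already exists
        int existing = cache.GetOrAdd("a", 42);     // returns the stored value
        bool updated = cache.TryUpdate("a", 2, 1);  // succeeds: current value is 1
        bool stale = cache.TryUpdate("a", 3, 1);    // fails: current value is now 2
        int removed;
        cache.TryRemove("a", out removed);
        return string.Format("{0},{1},{2},{3},{4},{5}",
            added, addedAgain, existing, updated, stale, removed);
    }
}
```

DescribeTryMethods() returns "True,False,1,True,False,2". The TryUpdate comparison value is what makes an optimistic "update only if nobody else changed it" loop possible without an explicit lock.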

ConcurrentBag<T>

ConcurrentBag<T> stores items for later consumption, in no particular order. Interestingly enough, List<T> has no thread safe counterpart in the TPL, whereas the bag has an implementation.
ConcurrentBag<T> serves a very specific variant of the producer-consumer pattern.

In general, a List<T> guarded by a simple lock is more efficient than the bag; the bag only pays off in one specific producer-consumer scenario. ConcurrentBag<T> works best when the producer and the consumer run on the same thread (e.g. the UI thread). This is because each thread gets its own linked list of the items it added to the bag and tries to take items from its own list before looking at any other list. To support this, ConcurrentBag<T> offers two key methods:

  • void Add(T item)
  • bool TryTake(out T item)

When a thread finds its own list empty, it “steals” an item from another thread's list. With multiple threads consuming the items of the bag, this stealing creates a kind of load balancing between the threads, because each thread works off its own linked list first. As I said, it is a very special use case, which I have not come across in production (or could not think of in those circumstances).
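A minimal sketch of that usage, assuming workers that both add to and take from the same bag (the scenario the bag is tuned for). BagSketch and ProduceAndConsume are hypothetical names for this example.

```csharp
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

public static class BagSketch
{
    // Each worker adds its own items and then takes whatever it can get.
    // Returns the total number of items taken.
    public static int ProduceAndConsume(int workerCount, int itemsPerWorker)
    {
        var bag = new ConcurrentBag<int>();
        int taken = 0;

        var workers = new Task[workerCount];
        for (int w = 0; w < workers.Length; w++)
        {
            workers[w] = Task.Run(() =>
            {
                for (int i = 0; i < itemsPerWorker; i++)
                {
                    bag.Add(i);
                }

                int item;
                // Served from the worker's own list first, then from other lists.
                while (bag.TryTake(out item))
                {
                    Interlocked.Increment(ref taken);
                }
            });
        }
        Task.WaitAll(workers);

        // Drain anything that is left, so the count is complete in every case.
        int leftover;
        while (bag.TryTake(out leftover))
        {
            taken++;
        }
        return taken;
    }
}
```

ProduceAndConsume(4, 1000) returns 4000. Which worker ends up processing which item is not defined, and that is the point: the bag gives up ordering in exchange for cheap thread-local access.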

Blocking Collections

None of the collections mentioned above blocks the calling thread, which is sometimes the desired outcome. The TryAdd method, for example, simply returns true or false and the thread does not block. So in some producer-consumer situations a thread would have to poll until the operation succeeds, which is inefficient.

With a blocking collection you can apply blocking semantics to the TPL concurrent data structures. BlockingCollection<T> can wrap any of the queue, stack, or bag collections mentioned above (any IProducerConsumerCollection<T>); the default is ConcurrentQueue<T>.

BlockingCollection<T> supplies a method for adding items that is simply called Add. The remove/dequeue/pop method is called Take. Whether it applies FIFO, LIFO, or unordered semantics depends on the underlying data structure you supply.

The calling thread is suspended if Take finds no items in the collection. The Add operation, on the other hand, does not block the calling thread by default.

On initialization you can give the BlockingCollection<T> a capacity:

var collection = new BlockingCollection<string>(new ConcurrentQueue<string>(), 2);

Once the underlying collection reaches this capacity, the Add operation blocks the calling thread as well. If you still want non-blocking Add semantics, you can use the TryAdd method, which works similarly to the Try- methods of the other concurrent data structures.
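Here is a small sketch of the capacity limit, using TryAdd so that nothing blocks. BoundedCollectionSketch and FillBeyondCapacity are names made up for this example.

```csharp
using System.Collections.Concurrent;

public static class BoundedCollectionSketch
{
    // Fills a collection of capacity 2 and reports which TryAdd calls succeeded.
    public static bool[] FillBeyondCapacity()
    {
        using (var collection = new BlockingCollection<string>(new ConcurrentQueue<string>(), 2))
        {
            bool first = collection.TryAdd("a");   // fits
            bool second = collection.TryAdd("b");  // fits, capacity is now reached
            bool third = collection.TryAdd("c");   // rejected immediately, no blocking

            collection.Take();                     // removes "a" (FIFO) and frees one slot
            bool fourth = collection.TryAdd("c");  // fits again

            return new[] { first, second, third, fourth };
        }
    }
}
```

The result is true, true, false, true. Had the third call been Add in place of TryAdd, the thread would have waited until another thread took an item.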

BlockingCollection<T> also gives you the ability to shut down the processing of the underlying collection gracefully.
That is, once the producer signals that no more items will be added (by calling CompleteAdding), suspended consumers are released, and the consumers that are still working can finish all the items that remain in the collection.

Consuming Enumerable

One of the most effective ways to use BlockingCollection<T> is its consuming IEnumerable<T>. The best thing about it is that it removes each item from the collection as it iterates, and it waits for new items when the collection is empty (think of a foreach that calls Take for each iterated item). You get it by calling the GetConsumingEnumerable method on the BlockingCollection<T>.
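A minimal producer-consumer sketch along those lines, assuming the producer calls CompleteAdding when it is done so that the consuming loops can end. ConsumingEnumerableSketch and ProcessAll are hypothetical names, and the capacity of 10 is an arbitrary choice for this example.

```csharp
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

public static class ConsumingEnumerableSketch
{
    // Produces itemCount items, lets consumerCount tasks drain them,
    // and returns how many items were processed in total.
    // Assumes consumerCount is at least 1; otherwise Add would block forever
    // once the capacity is reached.
    public static int ProcessAll(int itemCount, int consumerCount)
    {
        int processed = 0;
        using (var collection = new BlockingCollection<int>(10))
        {
            var consumers = new Task[consumerCount];
            for (int c = 0; c < consumers.Length; c++)
            {
                consumers[c] = Task.Run(() =>
                {
                    // Blocks while the collection is empty and ends
                    // after CompleteAdding once all items are consumed.
                    foreach (int item in collection.GetConsumingEnumerable())
                    {
                        Interlocked.Increment(ref processed);
                    }
                });
            }

            for (int i = 0; i < itemCount; i++)
            {
                collection.Add(i); // blocks while 10 items are waiting
            }
            collection.CompleteAdding(); // graceful shutdown signal

            Task.WaitAll(consumers);
        }
        return processed;
    }
}
```

ProcessAll(100, 4) returns 100. No consumer polls: each one sleeps until an item arrives or the producer declares that it is finished.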

Summary

In this post we looked at why you should at least consider the concurrent data structures that the TPL supplies. The main reason is that your algorithms can otherwise become tedious and complex to manage and maintain.

We looked at thread safe deferred initialization using the Lazy<T> type. We also looked at concurrent collections, which use slightly different semantics and APIs than the standard non-concurrent implementations.

Last but not least, we looked at BlockingCollection<T>, which suspends threads for more efficient implementations of producer-consumer patterns. We also looked at how UI collections can be made fit for synchronization with TPL built-in types.

Categories: TPL
