Multithreaded pattern

I came across a pretty neat multithreaded locking pattern that I'd like to memorize (so I'll write about it here). Suppose, like me, you don't write a lot of multithreaded code, but you've hit a problem that a simple lock on a resource won't solve. Say you want multiple threads reading from a collection and multiple threads writing to it. That alone can be done with a simple lock on the collection. But now let's say you also want the reading threads to wait if there is nothing for them to read (the collection is empty), or if the data is present but not yet ready to be read (it's still being processed by someone), and you want a reading thread to be notified immediately once the data is present and ready, so that it can jump in and use it.

This scenario comes up quite naturally when you have some kind of work item queue. Some threads add new items to be processed, in response to a user action or a web service call from an external system, and multiple data-processing threads take items from this queue after (and only after) they are enqueued and process them. When the queue is empty, the processing threads should lie low and wait, but they should jump in as soon as there is work for them. This scenario can be handled in many ways, not necessarily multithreaded (I can imagine a nice asynchronous implementation), but let's assume you want a multithreaded implementation, because you have many processors available and a high throughput demand (work items should be processed in parallel if possible).

Now, the simplest implementation would just lock on the queue, as below, making sure that only one thread can add or remove an item at a time:



    using System.Collections.Generic;

    class WorkitemQueue
    {
        private readonly Queue<Workitem> _items = new Queue<Workitem>();

        public void Enqueue(Workitem item)
        {
            // only one thread at a time may touch the queue
            lock (_items)
            {
                _items.Enqueue(item);
            }
        }

        public Workitem Dequeue()
        {
            // note: throws InvalidOperationException when the queue is empty
            lock (_items)
            {
                return _items.Dequeue();
            }
        }
    }


 

The problem with this is: how will a reading thread know there is an item it can process? Well, it won't. It will have to poll the queue, and to process new items promptly, it will have to poll often! Each poll means waiting to obtain the lock, which can actually become a problem when you have a lot of reading threads (and remember, you wanted a lot of them!). Moreover, since they will be fighting over the same lock that is needed to add a new work item, they might slow down (or potentially even starve) new item inserts, the very thing they are waiting for. Oh, the irony 😉 A polling consumer could look like the sketch below.
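To make the problem concrete, here is a rough sketch of such a polling consumer. The running flag, the Process method and the 50 ms poll interval are purely illustrative assumptions, and the sketch assumes direct access to the shared queue:

    // naive polling consumer: it wakes up on a timer, not when work arrives
    while (running)
    {
        Workitem item = null;
        lock (queue) // competes for the very same lock Enqueue needs
        {
            if (queue.Count > 0)
                item = queue.Dequeue();
        }

        if (item != null)
            Process(item);    // hypothetical processing method
        else
            Thread.Sleep(50); // poll interval: a trade-off between latency and lock contention
    }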

So what is the solution? Well, have a look at this class:




    using System.Collections.Generic;
    using System.Threading;

    class WorkitemQueue
    {
        private readonly Queue<Workitem> _items = new Queue<Workitem>();

        public void Enqueue(Workitem item)
        {
            lock (_items)
            {
                _items.Enqueue(item);
                // wake up all threads waiting on _items inside Dequeue
                Monitor.PulseAll(_items);
            }
        }

        public Workitem Dequeue()
        {
            lock (_items)
            {
                // re-check the condition after every wake-up: another thread
                // may have taken the item before we reacquired the lock
                while (_items.Count == 0)
                    Monitor.Wait(_items); // releases the lock and blocks until pulsed

                return _items.Dequeue();
            }
        }
    }


 

All I really added was two calls to static methods of the Monitor class: Wait and PulseAll.

But they make a huge difference. Now, every time a reading thread obtains the lock but the queue is empty, it calls Monitor.Wait. What this method does is release the lock and block the current thread until it is notified that the state (the queue) has changed, at which point it gets a chance to try again. Once the thread is notified (I'll talk about that in a moment), it resumes at the very point where it stopped: the call to Monitor.Wait. But because the call is in a while loop, it checks the blocking condition again to see whether there are still any items to process. After all, some other thread could have gotten there first, leaving the queue empty again! If it isn't empty, the item is dequeued and the lock is released the usual way.

What happens when we add a new item to the queue? Well, the only difference is the call to Monitor.PulseAll which, as you might expect, notifies all threads waiting on this particular lock object that they can now try to acquire it. Note that PulseAll itself does not release the lock; the awakened threads actually compete for it once the enqueueing thread leaves its lock block.

This solves two important problems. First, we no longer have the reading threads constantly fighting for the lock on our queue, which could make it unresponsive to new item insertions. Second, you don't have to poll your queue so often, as Monitor will notify the waiting threads once there is something for them. In some cases you don't need polling at all, if you only read items in response to some external event, and you still have a guarantee that the empty-queue case is handled.
Again, in this particular scenario, where you have a bunch of threads observing a queue, you probably also want some mechanism that guarantees there is always at least one thread waiting to dequeue work items, but that is a different subject and maybe I'll write about it in the future.
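For completeness, here is a rough sketch of how the queue could be wired up with a few consumer threads. The thread count, the ProcessItem method and the use of background threads are just illustrative assumptions:

    var queue = new WorkitemQueue();

    // start a few consumers; Dequeue blocks inside Monitor.Wait while the queue is empty
    for (int i = 0; i < 4; i++)
    {
        var consumer = new Thread(() =>
        {
            while (true)
            {
                Workitem item = queue.Dequeue(); // wakes up as soon as Enqueue pulses
                ProcessItem(item);               // hypothetical processing method
            }
        });
        consumer.IsBackground = true;
        consumer.Start();
    }

    // a producer just enqueues; PulseAll inside Enqueue wakes the waiting consumers
    queue.Enqueue(new Workitem());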
