
Multithreaded Buffer Pool in .NET

This article presents a way to implement pipeline thread cooperation using a buffer pool.

The multicore era – everyone has heard about it: Parallel Extensions in Microsoft .NET, Intel TBB, even the new C++ standard includes basic support for threading. Our small desktop (and laptop) computers and small servers now possess multiple cores – software should exploit this advantage.

Responsibilities of Presented Solution

  • Manage a pool of data items (the order of items may or may not be preserved)
  • Manage a pool of empty items – reusing items (useful when the items are large buffers)

Features

  • A bounded-buffer solution for the multiple producers – multiple consumers pattern. It can also be classified as shared-memory asynchronous message passing
  • Easy to use – all locking machinery is hidden inside
  • Versions:
    • unordered – the order in which items are taken does not matter – one or multiple consumers
    • ordered – the order in which items are taken must be preserved – exactly one consumer (one or multiple producers)
  • Puts constraints on memory usage
  • Avoids frequent memory allocation and deallocation – more predictable running time (memory allocation time is not predictable)
  • With Parallel Extensions, the number of running threads can easily and automatically fit the hardware (number of cores)

Data and task parallelism – pipeline processing. Data is divided into blocks and processed in multiple stages in parallel. Each stage can be executed by multiple running threads. The concept is similar to a pipelined, superscalar processor architecture (Fig. 1). An arbitrary number of tasks can exist at each stage of processing (Fig. 2).

prod-cons_1.png

Fig. 1. Sample configuration of processing tasks

prod-cons_2.png

Fig. 2. Another configuration of processing tasks

The solution is usable when tasks are time consuming and can be easily parallelized (the bag-of-tasks pattern). It is also usable when data arrives from outside (a file, a database) and reading is not a processor-intensive operation. In such cases, additional threads can process data blocks that have already been read while the next chunk is being read.

Idea

Every item has data and an id assigned. Ids must be unique and sequential (for the OrderedItemsPool class). This is a kind of ticket algorithm. The code below presents a schematic solution for 3-stage processing (one producer, multiple processing tasks, one consumer).

C#
static void Producer(ItemsPool<T1> bufout)
{
    try
    {
        int i = 0;
        while (i < maxNumberOfItems)
        {
            ItemData<T1> v;
            v.Value = bufout.DequeueEmpty();
            v.Value = …
            v.Id = i;
            bufout.EnqueueData(v);
            ++i;
        }
        bufout.EnqueueEndOfData(i);
    }
    catch (Exception e)
    {
        bufout.SetError("Producer problem", e);
    }
}

static void Processing(ItemsPoolsInOut<T1, T2> bufsInOut)
{
    try
    {
        T2 outData;
        ItemData<T1> inData;

        while (bufsInOut.DequeueEmptyAndData(out inData, out outData))
        {
            outData = performSomeProcessing(inData.Value);
            bufsInOut.EnqueueDataAndEmpty(outData, inData);
        }
    }
    catch (Exception e)
    {
        bufsInOut.SetError("Processing problem", e);
    }
}

static void Consumer(ItemsPool<T2> bufin)
{
    try
    {
        ItemData<T2> data;

        while (bufin.DequeueData(out data))
        {
            //process data.Value

            ...

            bufin.EnqueueEmpty(data.Value);
        }
    }
    catch (Exception e)
    {
        bufin.SetError("Consumer problem", e);
    }
}

static void Main(string[] args)
{
    ItemsPool<T1> inputBuffers = new OrderedItemsPool<T1>(inBuffersNumber);
    ItemsPool<T2> outputBuffers = new OrderedItemsPool<T2>(outBuffersNumber);

    Thread first = new Thread(() => Producer(inputBuffers));
    first.Start();

    for (int i = 0; i < calcTaskNumber; ++i)
        Task.Create((o) =>
            { Processing(new ItemsPoolsInOut<T1, T2>(inputBuffers, outputBuffers)); });

    Thread consumer = new Thread(() => Consumer(outputBuffers));
    consumer.Start();

    consumer.Join();

    if (outputBuffers.IsError)
    {
        Console.WriteLine(outputBuffers.Error);
    }
}

How It Works

Generally, heavy locking is used. :) The solution is based on monitors. There are two monitors here – the first for operations on the stack of empty items, the second for operations on the queue of data items.

When a producer thread requires a new empty item (the DequeueEmpty method), the item can be returned from the pool of empty items or it can be created, but there is a limit on the maximum number of items that can be created (memory constraints). The thread has to wait on the empty monitor's condition variable when there are no free empty buffers and no more items can be created. When an item is no longer needed, it must be returned to the empty pool (the EnqueueEmpty method) or released explicitly (the DropEmpty method). Returning or releasing is done by the consumer thread, whereas dequeuing an empty item is done by the producer thread.
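
A minimal sketch of this empty-item side follows, written as a fragment of an illustrative OrderedItemsPool<T>-like class. The member names, the Func<T> factory and the counting scheme are assumptions made for explanation; this is not the implementation shipped with the article.

C#
// Illustrative fragment (uses System.Collections.Generic and System.Threading).
private readonly object emptyLock = new object();
private readonly Stack<T> emptyItems = new Stack<T>();
private readonly Func<T> createItem;    // how buffers are built is an assumption here
private readonly int maxItems;
private int createdCount;

public T DequeueEmpty()
{
    lock (emptyLock)
    {
        // Block while no buffer has been returned and the creation limit is reached.
        while (emptyItems.Count == 0 && createdCount >= maxItems)
            Monitor.Wait(emptyLock);

        if (emptyItems.Count > 0)
            return emptyItems.Pop();

        ++createdCount;                 // create a new buffer, bounded by maxItems
        return createItem();
    }
}

public void EnqueueEmpty(T item)
{
    lock (emptyLock)
    {
        emptyItems.Push(item);
        Monitor.Pulse(emptyLock);       // wake one thread waiting for an empty buffer
    }
}

public void DropEmpty()
{
    lock (emptyLock)
    {
        --createdCount;                 // a replacement buffer may be created later
        Monitor.Pulse(emptyLock);
    }
}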

When the producer has filled an empty item with data, the item is put into the data queue (the EnqueueData method). On the other end, the consumer thread obtains the item using the DequeueData method. Reading can block when no data exists, or when reading is ordered (the OrderedItemsPool class) and the proper item has not arrived yet. In such cases, the thread waits on the full monitor's condition variable.

In the OrderedItemsPool class, items are stored in a priority queue (the SortedList class is used); in the UnorderedItemsPool class, an ordinary Queue is used.
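
The ordered data side can be sketched roughly as below, continuing the same illustrative fragment (again, member names are assumptions, not the shipped code). The single consumer always waits for the item carrying the next expected id, even if items with larger ids are already queued; the end-of-data marker is explained in the Finalizing section below.

C#
// Illustrative fragment: the ordered data-item side.
private readonly object fullLock = new object();
private readonly SortedList<int, ItemData<T>> dataItems = new SortedList<int, ItemData<T>>();
private int nextIdToTake;                   // id the single consumer expects next
private int endOfDataId = int.MaxValue;     // set by EnqueueEndOfData (see Finalizing)

public void EnqueueData(ItemData<T> item)
{
    lock (fullLock)
    {
        dataItems.Add(item.Id, item);
        Monitor.PulseAll(fullLock);         // the consumer re-checks whether its id arrived
    }
}

public bool DequeueData(out ItemData<T> item)
{
    lock (fullLock)
    {
        // Block until the expected id is present or the end of data has been reached.
        while (!dataItems.ContainsKey(nextIdToTake) && nextIdToTake < endOfDataId)
            Monitor.Wait(fullLock);

        if (nextIdToTake >= endOfDataId)
        {
            item = default(ItemData<T>);
            return false;                   // no more data will ever arrive
        }

        item = dataItems[nextIdToTake];
        dataItems.Remove(nextIdToTake);
        ++nextIdToTake;
        return true;
    }
}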

Finalizing

When all items have been produced, the producing thread must mark the end of data. This is done using the EnqueueEndOfData method. The information is propagated to all tasks so they can stop processing – the DequeueData method returns false when no more items exist to be processed.

EndOfData is a special item with the largest id number. After EndOfData has been stored, items with larger ids cannot be stored (an EndOfDataException is thrown).
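
Continuing the illustrative fragment above, the marker could be handled roughly like this (a sketch, not the shipped implementation):

C#
// Illustrative continuation: marking and enforcing the end of data.
public void EnqueueEndOfData(int id)
{
    lock (fullLock)
    {
        endOfDataId = id;               // first id that will never carry data
        Monitor.PulseAll(fullLock);     // wake waiting consumers so they observe the end
    }
}

// EnqueueData, in turn, would reject anything stored past the marker:
//     if (item.Id >= endOfDataId)
//         throw new EndOfDataException();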

Errors

Work done in multiple threads suffers from the error propagation problem. When an exception is thrown in one thread that is part of the processing chain, the other threads (tasks) must be informed; otherwise a deadlock can occur. Therefore the exception should be caught and passed into the communication chain using the SetError method. After that, an ErrorInProcessingException appears in all tasks processing data as a signal of the problem. The original exception is stored as the inner exception.
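
One possible shape of this mechanism, continuing the illustrative fragment (the changelog says the real SetError uses the double-check locking pattern; everything else here is an assumption):

C#
// Illustrative continuation: error propagation.
private volatile Exception error;
private readonly object errorLock = new object();

public bool IsError { get { return error != null; } }
public Exception Error { get { return error; } }

public void SetError(string message, Exception cause)
{
    if (error == null)                          // first check without the lock
    {
        lock (errorLock)
        {
            if (error == null)                  // second check under the lock
                error = new Exception(message, cause);
        }
    }

    // Wake every thread blocked on either monitor; after waking, each
    // Enqueue/Dequeue call checks the flag and throws
    // ErrorInProcessingException with the original exception as InnerException.
    lock (emptyLock) { Monitor.PulseAll(emptyLock); }
    lock (fullLock)  { Monitor.PulseAll(fullLock); }
}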

The main thread can be informed of errors using the IsError and Error properties of a pool. The values of the properties should be read from the pool used by the thread that the main thread is waiting for (see the code below):

C#
consumer.Join();
if (outputBuffers.IsError)
{
   Console.WriteLine(outputBuffers.Error);
}

Deadlocks

When a task lives in the middle of the processing chain, it is important to obtain the output item first and then the input item, in order to avoid deadlock. Deadlock can occur when no output item is available (and the order must be preserved in the processing chain – the OrderedItemsPool class). The situation is easy to observe when the input pool has more items than the output pool. The helper class ItemsPoolsInOut was added as an example solution for such a case (the DequeueEmptyAndData method).
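
A sketch of how such a helper might look, expressed in terms of the schematic API from the listing above (this is not the shipped implementation; the key point is the acquisition order):

C#
// Hedged sketch of an ItemsPoolsInOut-style helper: the output (empty) buffer
// is reserved before the input (data) item is taken, never the other way around.
class ItemsPoolsInOut<T1, T2>
{
    private readonly ItemsPool<T1> input;
    private readonly ItemsPool<T2> output;

    public ItemsPoolsInOut(ItemsPool<T1> input, ItemsPool<T2> output)
    {
        this.input = input;
        this.output = output;
    }

    public bool DequeueEmptyAndData(out ItemData<T1> inData, out T2 outData)
    {
        outData = output.DequeueEmpty();        // 1. reserve the output buffer first
        if (!input.DequeueData(out inData))     // 2. then take the next input item
        {
            output.EnqueueEmpty(outData);       // input ended: give the buffer back
            return false;
        }
        return true;
    }

    public void EnqueueDataAndEmpty(T2 outData, ItemData<T1> inData)
    {
        ItemData<T2> outItem;
        outItem.Value = outData;
        outItem.Id = inData.Id;                 // keep the ticket so ordering is preserved
        output.EnqueueData(outItem);
        input.EnqueueEmpty(inData.Value);       // recycle the consumed input buffer
    }
}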

Moreover, an ordered items pool (the OrderedItemsPool class) must never appear after an unordered items pool (the UnorderedItemsPool class), otherwise a deadlock can happen.

Problems

  • An item not returned to the pool (or not dropped explicitly) can lead to deadlock. The problem could be solved by giving up the empty pool (and, unfortunately, the memory constraints)
  • Heavy locking – .NET monitors are lighter than interprocess mutexes or semaphores, but there is still a lot of synchronization – is a more lock-free solution available?
  • Problems with background (detached) threads when an exception occurs
  • Stopping the processing is not implemented

Example

An example usage of the presented solution is processing of a large file, e.g. encryption and compression. Encryption is realized with the RijndaelManaged class, whereas compression uses the DeflateStream class. Data is read in chunks of 512 KB, processed, and written to the output file. The second program performs decompression and decryption of the data.

Work is divided into 4 groups: a reader (one thread), encryptors (several tasks, several threads), compressors (several tasks, several threads), and a writer (one thread). The second program (decompressor, decryptor) is structured analogously.

Note that each group of tasks uses a different TaskManager object, otherwise processing can end in deadlock. Additionally, the reader and writer are separate threads.

Of course, it might be wiser to put compression (decompression) and encryption (decryption) into one task (no additional buffers and synchronization), but this is just an example to show the possibilities.

Moreover, compression using Deflate is crap – I know that – it is just an example; if real compression is needed, replace the algorithm. :)

The code uses the Microsoft Parallel Extensions Jun08 CTP, therefore it is necessary to download and install this library.

An additional implementation note – the DeflateStream constructor has a leaveOpen argument, which is absent from the CryptoStream class, therefore the streams cannot be reused. This seems quite inconsistent in design – maybe some Microsoft architect can explain why?
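
For illustration, processing one chunk with freshly created streams could look like the sketch below. It combines encryption and compression into a single step (unlike the sample project, which keeps them in separate stages), and the helper name and key handling are assumptions.

C#
using System.IO;
using System.IO.Compression;
using System.Security.Cryptography;

// Hedged sketch: encrypt and then compress a single chunk, creating new
// CryptoStream/DeflateStream instances per chunk because CryptoStream
// cannot leave its underlying stream open for reuse.
static byte[] EncryptAndCompressChunk(byte[] chunk, int count, SymmetricAlgorithm aes)
{
    using (var output = new MemoryStream())
    {
        using (ICryptoTransform encryptor = aes.CreateEncryptor())
        // leaveOpen: true keeps 'output' usable after DeflateStream is disposed
        using (var deflate = new DeflateStream(output, CompressionMode.Compress, true))
        using (var crypto = new CryptoStream(deflate, encryptor, CryptoStreamMode.Write))
        {
            crypto.Write(chunk, 0, count);
        }   // disposing flushes the final encrypted block and the deflate buffers
        return output.ToArray();
    }
}

A RijndaelManaged instance with its Key and IV set would be passed in as aes; the second program would perform the reverse steps (decompress, then decrypt) in the same per-chunk fashion.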

Changelog

  • 2009-03-19
    • SetError method changed - race condition removed (double-check locking pattern used)
  • 2009-03-16
    • Initial revision

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)



Comments and Discussions

 
very interesting work – Cheeso, 22-Mar-09 9:38

Re: very interesting work – Cheeso, 22-Mar-09 10:33
ps: Regarding the problems you identified... particularly heavy locking, deadlocking, etc.

I used a slightly different twist on the design. I do not use an expandable stack for work items. Instead I use a fixed number of workitems, proportional to the number of threads used. Each workitem stores or holds its original input, and the outputs from various stages in the pipeline, as well as its state in the pipeline.

The workitem states are a linear progression. In the parallel compress/encrypt example, the jobs are: read, compress, encrypt, write. There are corresponding "-ing" and "-ed" states for each job: Reading, Read, Compressing, Compressed, Encrypting, Encrypted, Writing, Written. There is also a "None" state, for a workitem that has not yet been read. Each worker thread can do only one kind of work: move a workitem from a particular state to the next one. For example, the reader thread can move a workitem from "None" to "Read", with a brief stop in "Reading". The compressor thread can move a workitem from "Read" to "Compressed" with a brief stopover in "Compressing".

Each worker thread does this: lock a workitem, and then examine it. If the workitem has a state that the worker thread can operate on, then the worker operates on the workitem, updates its status, and drops the lock. It then moves to the next workitem in the list. If the workitem is not in a state that the worker can operate on, the worker has two options: wait, or move to the next item. How it decides between these two options is based on the workitem state, relative to the state the worker can deal with.

If the state of a workitem lies before the state that a worker thread can operate on, then the worker thread calls Monitor.Wait() on the workitem. If the state of a workitem lies after the worker's operating state, then the worker thread just unlocks the workitem and goes to the next one.

Example:
do
{
    workitem = _pool[_nextToCompress % _pool.Count];
    lock(workitem)
    {
        // If the status is less than what we want, then wait on workitem.
        // Numerically "less" means the buffer is waiting to be filled, or
        // is filling.
        // If the status is numerically higher, then we go to the next buffer.
        // The thread will always look for a good candidate to process.  If
        // either of the writer thread or reader threads is blocked, then
        // the compressor threads can cycle around, which can be inefficient.
        // But it prevents deadlock.

        if (workitem.status < (int)WorkItem.Status.Read)
            Monitor.Wait(workitem);
        else if (workitem.status == (int)WorkItem.Status.Read)
        {
            // use the workitem
            DeflateOneSegment(workitem);

            // increment the pointer using Interlocked.Increment because
            // there is more than one compressor thread.
            Interlocked.Increment(ref _nextToCompress);

            workitem.status = (int)WorkItem.Status.Compressed;
            Monitor.Pulse(workitem);
        }
    }

    if (_doneReading && _nextToCompress == _nextToFill)
        break;

} while (true);


The reader worker is special in that there is only one, and it operates on the first stage in the pipeline; for that reason it never moves to the next workitem, it always waits.

do
{
    workitem = _pool[_nextToFill % _pool.Count];
    lock(workitem)
    {
        // If the status is what we want, then use the workitem.
        // Otherwise, wait.

        if (workitem.status == (int)WorkItem.Status.None ||
            workitem.status == (int)WorkItem.Status.Done)
        {
            workitem.status = (int)WorkItem.Status.Reading;
            int bytesRead = _inStream.Read(workitem.buffer, 0, workitem.buffer.Length);
            if (bytesRead == 0)
            {
                // the stream is EOF, no more to read
                workitem.status = (int)WorkItem.Status.Done;
                _doneReading = true;
            }
            else
            {
                workitem.inputBytesAvailable = bytesRead;
                workitem.status = (int)WorkItem.Status.Read;
                // no need for Interlocked.Increment: there is only one reader thread.
                _nextToFill++;
            }
            Monitor.Pulse(workitem);
        }
        else
            Monitor.Wait(workitem);
    }
}
while (! _doneReading);


The locking is not heavy at all. My tests using the DEFLATE algorithm, which is CPU-bound, show a nearly linear performance improvement with the number of CPUs. This indicates the locking is very low overhead.
Re: very interesting work – Polanek, 22-Mar-09 22:44
