This article describes a 'plain' C++ implementation of producer/consumers queues and shows the inner workings of this synchronization mechanism.

## Background

The problem of producer-consumer processes has been studied from the '70s even before multi-threading became important. That's because these processes play an integral part of any interaction between a computer and the outside world. Even when data was read from punched cards, a program needed to 'consume' a card before the reading process could 'produce' another one. Similarly, a print line needed to be 'consumed' by the printer before the next one could be produced.

While for some time these problems have stayed in the realm of operating systems, once user processes got access to multi-threading facilities, they became a concern for everyday users. For a short background on different implementations, one can check the Wikipedia page.

If you are looking for a C# implementation, you can check Mark Clifton's article on this subject. Here, we are going to stay with the good old C++.

## A Producer/Consumer Example

Our example here is going to be taken directly from Mark's article mentioned above. We will write a program that finds all the prime numbers less than a certain limit. We want however to take advantage of all the cores of the CPU in order to 'speed-up' the process. I put speed-up in quotation marks because here we are not interested in real speed: following Mark's example, we will take the same very inefficient function to determine if a number is prime or not:

bool IsPrime (int n)
{
bool ret = true;
for (int i = 2; i <= n / 2 && ret; ret = n % i++ != 0)
;
return ret;
}

The general strategy is to have one producer thread adding to a queue the numbers to be checked for "prime-ness" while a number of consumer threads will pick up those numbers and place all positive results in another queue. For added fanciness, the output queue will keep both the prime number and an ID of the consumer thread that computed it.

Here are the data structures used for this setup:

struct result
{
int prime;
int worker;
};
...
sync_queue<int> nums;
sync_queue<result> primes;

The `nums`

queue keeps all the numbers to be tested and the `primes`

queue keeps the positive results.

`sync_queue`

is a producer/consumer queue structure that provides orderly access for all threads; we will see it's inner workings in a moment.

### Producer Thread

This is the simplest. It just fills the `nums`

queue with all the numbers up to a certain limit. At the end, it places a number of zeroes as a signal for consumers that the job is done. A consumer will terminate when it extracts a `0`

from the queue:

thread producer ([&nums]()->int {
for (int i = 2; i < 500000; i++)
nums.produce (i);
for (int i = 0; i < NTHREADS; i++)
nums.produce (0);
return 0;
});

### Consumer Threads

The code for a consumer thread is not much more complicated. It extract a number from the `nums`

queue, checks if it is a prime using the `IsPrime`

function and, if it is a prime, posts a new result in the `primes`

queue adding its own consumer number to know who calculated it:

auto checker = [&nums, &primes, thnum]()->int {
int n = 1;
while (n = nums.consume ())
{
if (IsPrime (n))
primes.produce ({ n,thnum });
}
return 0;
};

When the number retrieved is `0`

, the function returns and the thread terminates.

The main application thread creates a number of consumer threads and starts them all before starting the producer thread:

thread* consumers[NTHREADS];
for (int thnum = 0; thnum < NTHREADS; thnum++)
{
auto checker = [&nums, &primes, thnum]()->int {
int n = 1;
while (n = nums.consume ())
{
if (IsPrime (n))
primes.produce ({ n,thnum });
}
return 0;
};
consumers[thnum] = new thread (checker);
consumers[thnum]->start ();
}

### Running the Rodeo

Now that everything is setup, we can start the show:

stopwatch t_prod, t_cons;
t_prod.start ();
t_cons.start ();
producer.start (); producer.wait (); t_prod.stop ();
cout << "sync_queue finished producing" << " in " << fixed
<< setprecision (2) << t_prod.msecEnd ()/1000. << "sec" << endl;
for (int i = 0; i < NTHREADS; i++)
consumers[i]->wait ();
t_cons.stop ();
cout << "finished consuming" << " in " << fixed
<< setprecision (2) << t_cons.msecEnd () / 1000. << "sec" << endl;
cout << "Expecting 41538 primes, found "
<< primes.size () <<endl;
vector<int> found_by(NTHREADS);
while (!primes.empty ())
{
result r = primes.consume ();
found_by[r.worker]++;
}
for (int i = 0; i < NTHREADS; i++)
cout << "Consumer " << i << " found " << found_by[i]
<< " primes." << end;

On my machine (no speed monster this one), I get something like:

sync_queue finished producing in 1.67sec
finished consuming in 4.34sec
Expecting 41538 primes, found 41538
Consumer 0 found 4869 primes.
Consumer 1 found 5530 primes.
Consumer 2 found 5467 primes.
Consumer 3 found 4844 primes.
Consumer 4 found 4863 primes.
Consumer 5 found 5529 primes.
Consumer 6 found 5596 primes.
Consumer 7 found 4840 primes.

Instead of having some central control distribute the work among consumers, the `sync_queue`

allowed each consumer to pick its work unit and generate results. Some of the threads got a bit more, some a bit less but, all in all, the work was distributed fairly.

## The Inner Workings of a Producer/Consumer Queue

`sync_queue`

is a template class derived from `std::queue`

. It provides two main methods: `produce`

and `consume`

. To synchronize access, it uses a semaphore as well as a critical section object to keep everything consistent. The `produce`

method is very simple:

template <class M, class C=std::deque<M>>
class sync_queue : protected std::queue<M, C>
{
public:
...
virtual void produce (const M& obj)
{
lock l (update); this->push (obj); con_sema.signal (); }
...
protected:
semaphore con_sema;

A `lock`

object acquires the critical section to prevent simultaneous access. The object to be produced is pushed in the queue and the semaphore is signaled. When the function the lock object goes out of scope and the critical section is released.

Consuming is slightly more complicated:

virtual M consume ()
{
M result;
update.enter ();
while (std::queue<M, C>::empty ())
{
update.leave ();
con_sema.wait (); update.enter ();
}
result = this->front (); this->pop ();
update.leave ();
return result;
}

Again, we enter the critical section and check if the queue is empty. If so, we leave the critical section and start waiting for the consumers' semaphore to be signaled by a producer. When awoken by a signal, again we enter the critical section and loop again.

At this point, two things might have happened:

- No one else got the object and we find that queue is not empty. In this case, we exit the
`while`

loop, pick up the object and leave the critical section. - Another hungry consumer got the object and we find the queue empty. In this case, we leave the critical section and wait for another signal at the consumers' semaphore.

In addition to these main methods, there is another method to check if the queue is empty and another one to return the size of the queue. Note that both of them are only indicative because the result might change before the caller has a chance to check it.

## Bounded Producer/Consumer Queue

A sharp-eyed reader might have noticed that `sync_queue::produce`

method has no error checking. It blissfully calls `std::queue::push`

and assumes there is enough memory for the new object. This can be seen also from the run times of the producer and consumer threads in the example above: it took producer only 1.7 seconds to fill the queue of numbers to be checked and took consumers 4.4 seconds to empty it.

The `bounded_queue`

class allows you to limit the number of objects that can be queued. If a producer finds the bounded queue full, it has to wait until a consumer removes some of the objects. To do that, we need one more semaphore, `pro_sema`

that is initialized with the maximum size of the queue. The `produce`

method becomes:

template< class M, class C = std::deque<M> >
class bounded_queue : public sync_queue<M, C>
{
public:
bounded_queue (size_t limit_) : limit (limit_)
{
pro_sema.signal ((int)limit);
}
void produce (const M& obj)
{
this->update.enter ();
while (std::queue<M, C>::size () > limit)
{
this->update.leave ();
pro_sema.wait ();
this->update.enter ();
}
this->push (obj);
this->con_sema.signal ();
this->update.leave ();
}
...
protected:
size_t limit;
semaphore pro_sema;

You can see that it is much more similar to the `consume`

method shown before. It enters the critical section and, if the queue is full, repeatedly tries to find space for the new object by waiting on `pro_sema`

.

If we change the prime numbers example to use the `bounded_queue`

structure with 20 entries, the results look like this:

bounded_queue finished producing in 4.32sec
finished consuming in 4.32sec
Expecting 41538 primes, found 41538
Consumer 0 found 5103 primes.
Consumer 1 found 5156 primes.
Consumer 2 found 5192 primes.
Consumer 3 found 5240 primes.
Consumer 4 found 5227 primes.
Consumer 5 found 5091 primes.
Consumer 6 found 5267 primes.
Consumer 7 found 5262 primes.

The time required by the producer thread is the same as the time required by the consumers. That is because the producer is held up by the limited queue size.

## Conclusion

The producer/consumer queues presented in this article provide an easy to use mechanism for inter-thread communication. The threading primitives shown here (`thread`

, `critical_section`

, `semaphore`

, etc.) are part of the MLIB project. You can download the complete project from the GitHub project page.

## History

- 4
^{th} October, 2020: Initial version

Mircea is an OOP (old, opinionated programmer) with more years of experience than he likes to admit. Always opened to new things, he is however too bruised to follow any passing fad.

Lately he hangs around here hoping that some of the things he learned can be useful to others.