One of the most common task structures in concurrent systems is illustrated by the producer-consumer problem. In this problem, threads or processes are divided into two relative types: a producer thread performs an initial task that ends with creating some result, and a consumer thread takes that result and uses it for some later task. Between the threads, there is a shared array or queue that stores the results being passed. One key feature of this problem is that the consumer removes the data from the queue and consumes it by using it for some later purpose. There is no way for the consumer thread or threads to repeatedly access data in the queue.
As an example, consider a researcher who discovers a previously unknown play and believes it may have been written by a famous author. As part of their work, this researcher wants to know if the newly discovered text uses common words with the same frequency that the author used in other works. This work could be done with a pipe-and-filter application that reads in the content of the texts, builds a search index of all words based on their frequency, and performs some computation that compares the search indexes of all of the works. One thread is assigned the task of reading in the contents and producing the list of words, placing one word at a time in a queue. A second thread consumes the words by removing them from the queue and adding them to the data used to build the search index. This thread then becomes a producer because it provides the index to yet another thread that will be performing the index comparisons.
As a first variant on this problem, consider two threads that share an unbounded
queue. This variant can be implemented using a linked list for the
queue. Specifically, we can assume that there is a queue_t structure that
contains pointers to the front and back of the queue, where the nodes in the
queue are of type queue_node_t. The nodes all contain pointers to some sort
of data_t field. Code Listing 8.10 shows the framework for
enqueueing and dequeueing data.
/* Code Listing 8.10:
   Enqueue and dequeue operations for a linked list implementation of a queue
 */
void
enqueue_unsafe (queue_t *queue, data_t *data)
{
  /* Create a new node and make it the new back of the queue */
  queue->back->next = calloc (1, sizeof (queue_node_t));
  assert (queue->back->next != NULL);
  queue->back = queue->back->next;
  queue->back->data = data;
}
data_t *
dequeue_unsafe (queue_t *queue)
{
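  /* If back = front, then the queue is empty */
  if (queue->back == queue->front)
    return NULL;
  /* The front node is a placeholder, so the oldest item is stored in the
     node after it; that node becomes the new placeholder */
  queue_node_t * next = queue->front->next;
  data_t * data = next->data;
  free (queue->front);
  queue->front = next;
  return data;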
}
This implementation, which would be acceptable for a single-threaded
application, has a race condition when used in a concurrent setting.
Specifically, if one thread begins to enqueue() some data, another thread
that tries to dequeue() the data at the same time may get a NULL pointer
because the first thread has not yet reached the line that advances the back
pointer.
The solution here would be to refactor these functions to become a monitor as
shown in Code Listing 8.11. Each function would be passed a
reference to a shared mutex that would be locked on entry and released just
before the function returns. This solution eliminates the race condition
regarding the timing of access to the queue’s back field. Additionally, this
solution is generalizable regardless of the number of producers and consumers.
If there are multiple producers trying to enqueue() data, the mutex ensures
that they will not try to manipulate the queue at the same time. For brevity,
Code Listing 8.11 calls the functions from Code Listing 8.10.
/* Code Listing 8.11:
   A synchronized version of linked list enqueueing and dequeueing
 */
void
enqueue (queue_t *queue, data_t *data, pthread_mutex_t *lock)
{
  pthread_mutex_lock (lock);
  enqueue_unsafe (queue, data);
  pthread_mutex_unlock (lock);
}
data_t *
dequeue (queue_t *queue, pthread_mutex_t *lock)
{
  pthread_mutex_lock (lock);
  data_t * data = dequeue_unsafe (queue); 
  pthread_mutex_unlock (lock);
  return data;
}
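To make the monitor version concrete, the following is a minimal sketch of one producer and one consumer sharing the linked list queue. Several names are assumptions added for illustration and do not come from the listings: the `data_t` payload is taken to be an `int`, the type layouts are guesses at what `queue_t` and `queue_node_t` might contain, and `ITEM_COUNT`, `producer()`, `consumer()`, and `run_demo()` are hypothetical. The sketch also assumes the queue starts with one placeholder node, so that `front == back` means empty and the oldest item is stored in the node after `front`.

```c
#include <assert.h>
#include <pthread.h>
#include <sched.h>
#include <stdlib.h>

#define ITEM_COUNT 1000 /* hypothetical workload size */

typedef int data_t; /* hypothetical payload type */
typedef struct queue_node {
  data_t *data;
  struct queue_node *next;
} queue_node_t;
typedef struct { queue_node_t *front; queue_node_t *back; } queue_t;

static queue_t queue;
static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
static data_t values[ITEM_COUNT];

static void
enqueue (queue_t *q, data_t *data, pthread_mutex_t *m)
{
  pthread_mutex_lock (m);
  q->back->next = calloc (1, sizeof (queue_node_t));
  assert (q->back->next != NULL);
  q->back = q->back->next;
  q->back->data = data;
  pthread_mutex_unlock (m);
}

static data_t *
dequeue (queue_t *q, pthread_mutex_t *m)
{
  data_t *data = NULL;
  pthread_mutex_lock (m);
  if (q->back != q->front)
    {
      /* front is a placeholder; the oldest item is in front->next */
      queue_node_t *next = q->front->next;
      data = next->data;
      free (q->front);
      q->front = next;
    }
  pthread_mutex_unlock (m);
  return data;
}

static void *
producer (void *arg)
{
  (void) arg;
  for (int i = 0; i < ITEM_COUNT; i++)
    {
      values[i] = i + 1;
      enqueue (&queue, &values[i], &lock);
    }
  return NULL;
}

static void *
consumer (void *arg)
{
  long *sum = arg;
  int received = 0;
  while (received < ITEM_COUNT)
    {
      data_t *data = dequeue (&queue, &lock);
      if (data == NULL)
        {
          sched_yield (); /* queue is empty right now: poll again */
          continue;
        }
      *sum += *data;
      received++;
    }
  return NULL;
}

/* Runs both threads to completion and returns the sum of consumed values */
long
run_demo (void)
{
  long sum = 0;
  pthread_t prod, cons;
  queue.front = queue.back = calloc (1, sizeof (queue_node_t));
  assert (queue.front != NULL);
  pthread_create (&prod, NULL, producer, NULL);
  pthread_create (&cons, NULL, consumer, &sum);
  pthread_join (prod, NULL);
  pthread_join (cons, NULL);
  free (queue.front); /* only the placeholder node remains */
  return sum;
}
```

Note that the consumer in this sketch has to poll whenever dequeue() returns NULL, since the mutex alone gives it no way to wait for an item to arrive.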
In many cases, the unbounded queue of the previous section is neither feasible nor desirable. For instance, this approach makes it very easy to launch a denial-of-service attack against the application, particularly if this code is used in a server. Code Listing 8.12 shows a two-line loop that quickly exhausts the program’s dynamic memory resources, leading to a system crash.
/* Code Listing 8.12:
   A trivial denial-of-service attack against the code in 8.11
 */
while (1)
  enqueue (queue, data, lock);
Given this weakness, systems software typically imposes constraints on the
number of items in the queue. One approach would be to use semaphores with the
linked list queue above. Another approach is to use a finite circular array as
the basis. For this approach, we will still assume the use of a queue_t data
type to represent the queue; however, the internal implementation uses an array
instead of linked nodes. Figure 8.3.1 illustrates the
structure of the queue.
Code Listing 8.13 shows the unsafe version of the enqueue() and
dequeue() operations for an array implementation of a queue. Within the
queue_t data type, contents is an array of pointers to the enqueued data,
while front and back are indexes into this array. When either of the
indexes is incremented, the operations must recalculate the value modulo the
array size to ensure that the values stay within the bounds of the array. This
approach creates a circular structure, where the spaces in the array can be
reused. Once the back index reaches the end of the array, it would return to the
first position and continue from there.
/* Code Listing 8.13:
   Enqueue and dequeue operations for an array queue implementation
 */
void
enqueue_unsafe (queue_t *queue, data_t *data)
{
  /* Store the data in the array and advance the index */
  queue->contents[queue->back++] = data;
  queue->back %= queue->capacity;
}
data_t *
dequeue_unsafe (queue_t *queue)
{
  data_t * data = queue->contents[queue->front++];
  queue->front %= queue->capacity;
  return data;
}
The implementation in Code Listing 8.13 has a fundamental flaw in both operations, as neither enforces the limited capacity: enqueue_unsafe() does not check whether the queue is full, and dequeue_unsafe() does not check whether it is empty. This design choice was intentional, as this code was meant to encapsulate only the enqueueing and dequeueing behavior. That is, we can now reason through the synchronization issues for the producer-consumer problem without regard for the specific queue implementation.
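The missing capacity check can be seen in a short single-threaded sketch. The layout of `queue_t` here is an assumption (a fixed-size array of pointers plus `front`, `back`, and `capacity` fields), as are the `int` payload, the tiny `CAPACITY` value, and the hypothetical `overflow_demo()` helper. The two queue operations repeat the array-based listing so that the block is self-contained.

```c
#include <assert.h>
#include <stddef.h>

#define CAPACITY 3 /* hypothetical; kept tiny so wraparound is easy to see */

typedef int data_t; /* hypothetical payload type */
typedef struct {
  data_t *contents[CAPACITY];
  size_t front;    /* index of the next item to dequeue */
  size_t back;     /* index of the next slot to fill */
  size_t capacity;
} queue_t;

void
enqueue_unsafe (queue_t *queue, data_t *data)
{
  queue->contents[queue->back++] = data;
  queue->back %= queue->capacity;
}

data_t *
dequeue_unsafe (queue_t *queue)
{
  data_t *data = queue->contents[queue->front++];
  queue->front %= queue->capacity;
  return data;
}

/* Enqueue CAPACITY + 1 items with no dequeue in between, then return the
   value that the first dequeue hands back. */
int
overflow_demo (void)
{
  static data_t values[CAPACITY + 1] = { 10, 20, 30, 40 };
  queue_t queue = { .front = 0, .back = 0, .capacity = CAPACITY };
  for (size_t i = 0; i < CAPACITY + 1; i++)
    enqueue_unsafe (&queue, &values[i]);
  /* back wrapped around to index 1, and slot 0 now holds 40, not 10 */
  assert (queue.back == 1);
  return *dequeue_unsafe (&queue);
}
```

In this sketch the fourth enqueue silently overwrites the oldest item, so the first dequeue returns 40 and the value 10 is lost.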
To build the rationale for the solution to the producer-consumer problem,
consider Code Listing 8.14. This implementation attempts to keep
track of the number of items (queue->counter) in the queue, comparing it with
values that indicate whether the queue is full or empty.
/* Code Listing 8.14:
   An unsuccessful attempt at solving producer-consumer
 */
void
enqueue_failure (queue_t *queue, data_t *data)
{
  if (queue->counter == queue->capacity) return;
  enqueue_unsafe (queue, data);
  queue->counter++;
}
data_t *
dequeue_failure (queue_t * queue)
{
  if (queue->counter == 0)
    return NULL;
  data_t * data = dequeue_unsafe (queue); 
  queue->counter--;
  return data;
}
The attempted solution in Code Listing 8.14 fails because it has
race conditions on the queue’s counter variable. If the producer is
attempting to enqueue an item at the same moment the consumer is dequeueing one,
the outcome depends on whether the producer’s check for available space happens
before or after the consumer decrements the counter. A similar race
condition arises if the queue is empty and both functions are called. This race
condition could be fixed by wrapping the accesses to the counter with a
mutex.
Code Listing 8.14 has another flaw from the producer’s perspective:
there is no indication that there was a failure to enqueue the item.
Addressing this failure would require changing the function’s interface to
return a status to indicate success or failure. This approach, however, imposes
an undesirable burden on the user. That is, the programmer who is building the
producer and consumer threads or processes must build in a fail-safe mechanism
to respond accordingly if the enqueue() or dequeue() operations fail.
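The burden described above can be sketched as follows. This is only an illustration of the status-returning design under stated assumptions: the names `try_enqueue()`, `try_dequeue()`, `enqueue_with_retry()`, and `queue_init()` are hypothetical, the mutex is stored inside an assumed `queue_t` layout, and the payload is taken to be an `int`.

```c
#include <assert.h>
#include <pthread.h>
#include <sched.h>
#include <stdbool.h>
#include <stddef.h>

#define CAPACITY 2 /* hypothetical queue size */

typedef int data_t; /* hypothetical payload type */
typedef struct {
  data_t *contents[CAPACITY];
  size_t front, back, capacity, counter;
  pthread_mutex_t lock; /* protects every field above */
} queue_t;

void
queue_init (queue_t *queue)
{
  queue->front = queue->back = queue->counter = 0;
  queue->capacity = CAPACITY;
  pthread_mutex_init (&queue->lock, NULL);
}

/* Returns false, leaving the queue unchanged, if the queue is full */
bool
try_enqueue (queue_t *queue, data_t *data)
{
  bool ok = false;
  pthread_mutex_lock (&queue->lock);
  if (queue->counter < queue->capacity)
    {
      queue->contents[queue->back] = data;
      queue->back = (queue->back + 1) % queue->capacity;
      queue->counter++;
      ok = true;
    }
  pthread_mutex_unlock (&queue->lock);
  return ok;
}

/* Returns false, leaving *out untouched, if the queue is empty */
bool
try_dequeue (queue_t *queue, data_t **out)
{
  bool ok = false;
  pthread_mutex_lock (&queue->lock);
  if (queue->counter > 0)
    {
      *out = queue->contents[queue->front];
      queue->front = (queue->front + 1) % queue->capacity;
      queue->counter--;
      ok = true;
    }
  pthread_mutex_unlock (&queue->lock);
  return ok;
}

/* The caller's burden: each call site needs its own retry policy */
void
enqueue_with_retry (queue_t *queue, data_t *data)
{
  while (!try_enqueue (queue, data))
    sched_yield (); /* give a consumer a chance to free a slot */
}
```

The mutex removes the race on the counter, but every producer and consumer now has to decide what to do on failure; the retry loop here simply spins until space appears.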
A better, more user-friendly solution to the producer-consumer problem is to use semaphores. Note that semaphores incorporate the key functionality that we need: atomic incrementing and decrementing of a counter variable. The previous approach was trying to re-invent this already-solved problem. Code Listing 8.15 shows the framework for solving the producer-consumer problem for a single producer and single consumer.
/* Code Listing 8.15:
   Solution for a single producer and single consumer
 */
void
enqueue (queue_t *queue, data_t *data, sem_t *space, sem_t *items)
{
  sem_wait (space);
  enqueue_unsafe (queue, data);
  sem_post (items);
}
data_t *
dequeue (queue_t * queue, sem_t *space, sem_t *items)
{
  sem_wait (items);
  data_t * data = dequeue_unsafe (queue); 
  sem_post (space);
  return data;
}
The structure of this approach is to use signaling with two semaphores. The key
insight here is that there are actually two bounds that need to be enforced: a
maximum and a minimum. If the queue is full, then the producer needs to wait
until there is a space available. That is, the producer must call
sem_wait(space) and wait if necessary. The space semaphore is
initialized to the capacity of the queue, so it would only be 0 if that many
items are already in the queue. Once the consumer has removed an item from the
queue, it performs the counterpart call sem_post(space) to alert the
producer that a space is available. In short, the space semaphore enforces
the maximum capacity of the queue.
At the same time, the consumer must not attempt to remove an item from the queue
if none have been enqueued. Depending on the internal state of the queue, this
attempt to dequeue an item might return old data that has previously been removed
or an unexpected NULL pointer. The items semaphore, which is initialized
to 0 and represents the number of enqueued items, enforces this constraint. If
the queue is empty, this semaphore would have an internal value of 0, causing
the consumer to block when it calls sem_wait(items). Once the producer puts
an item into the queue, it would perform the corresponding sem_post(items)
that would increment the semaphore and unblock the consumer.
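The initialization described above can be sketched with unnamed POSIX semaphores. This is a minimal example under several assumptions: the `queue_t` layout, the `int` payload, the constants `CAPACITY` and `ITEM_COUNT`, and the helpers `producer()`, `consumer()`, and `run_demo()` are all hypothetical, and `sem_init()` is assumed to be supported (it is on Linux, but not on every platform).

```c
#include <assert.h>
#include <pthread.h>
#include <semaphore.h>
#include <stddef.h>

#define CAPACITY 4      /* hypothetical queue size */
#define ITEM_COUNT 1000 /* hypothetical workload size */

typedef int data_t; /* hypothetical payload type */
typedef struct {
  data_t *contents[CAPACITY];
  size_t front, back, capacity;
} queue_t;

static queue_t queue;
static sem_t space; /* counts free slots */
static sem_t items; /* counts enqueued items */
static data_t values[ITEM_COUNT];

static void
enqueue (queue_t *q, data_t *data, sem_t *sp, sem_t *it)
{
  sem_wait (sp); /* blocks while the queue is full */
  q->contents[q->back++] = data;
  q->back %= q->capacity;
  sem_post (it);
}

static data_t *
dequeue (queue_t *q, sem_t *sp, sem_t *it)
{
  sem_wait (it); /* blocks while the queue is empty */
  data_t *data = q->contents[q->front++];
  q->front %= q->capacity;
  sem_post (sp);
  return data;
}

static void *
producer (void *arg)
{
  (void) arg;
  for (int i = 0; i < ITEM_COUNT; i++)
    {
      values[i] = i + 1;
      enqueue (&queue, &values[i], &space, &items);
    }
  return NULL;
}

static void *
consumer (void *arg)
{
  long *sum = arg;
  for (int i = 0; i < ITEM_COUNT; i++)
    *sum += *dequeue (&queue, &space, &items);
  return NULL;
}

/* Runs one producer and one consumer; returns the sum of consumed values */
long
run_demo (void)
{
  long sum = 0;
  pthread_t prod, cons;
  queue = (queue_t) { .capacity = CAPACITY };
  int rc = sem_init (&space, 0, CAPACITY); /* every slot starts out free */
  assert (rc == 0);
  rc = sem_init (&items, 0, 0);            /* no items yet */
  assert (rc == 0);
  (void) rc;
  pthread_create (&prod, NULL, producer, NULL);
  pthread_create (&cons, NULL, consumer, &sum);
  pthread_join (prod, NULL);
  pthread_join (cons, NULL);
  sem_destroy (&space);
  sem_destroy (&items);
  return sum;
}
```

Neither thread polls or checks a return status here; a thread that cannot make progress simply sleeps inside sem_wait() until its counterpart posts.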
The solution in the previous section works successfully if there is only a single producer. However, if there are multiple producers, the previous solution will not work and should not be used. The problem is the line highlighted in Code Listing 8.16. Recall that post-increments in C are not atomic operations. That is, the internal execution at the machine language level involves a three-step process: load the value into a register, increment the value in the register, and store the result back into memory. If two threads try to perform this increment at the same time, the increments could interfere with each other.
/* Code Listing 8.16:
   Non-atomic increments are always race conditions
 */
queue->contents[queue->back++] = data;
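One possible bad interleaving can be replayed deterministically in a single thread by writing out the load, increment, and store steps by hand. This is only a simulation under stated assumptions: the variables `reg_a` and `reg_b` stand in for each producer's register, and `lost_update_demo()`, the reduced `queue_t` layout, and the `int` payload are hypothetical. A real race would depend on the scheduler and would not be reproducible like this.

```c
#include <assert.h>
#include <stddef.h>

typedef int data_t; /* hypothetical payload type */
typedef struct {
  data_t *contents[4];
  size_t back;
} queue_t;

/* Replays two producers executing contents[back++] = data with their
   machine-level steps interleaved. Stores the final occupant of slot 0
   in *slot0 and returns the final value of back. */
size_t
lost_update_demo (data_t **slot0)
{
  static data_t a = 1, b = 2;
  queue_t queue = { .back = 0 };

  /* Load: both producers read back == 0 before either writes it */
  size_t reg_a = queue.back;
  size_t reg_b = queue.back;

  /* Both use the index they loaded, so both write to slot 0 */
  queue.contents[reg_a] = &a;
  queue.contents[reg_b] = &b; /* overwrites producer A's item */

  /* Increment: each register becomes 1 */
  reg_a++;
  reg_b++;

  /* Store: both write 1, so one of the two increments is lost */
  queue.back = reg_a;
  queue.back = reg_b;

  *slot0 = queue.contents[0];
  return queue.back;
}
```

After two enqueue operations, back is 1 rather than 2 and slot 0 holds only the second producer's item.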
The solution in this case would be to add a lock as shown in Code Listing 8.17. Acquiring and releasing the lock as shown here minimizes the size
of the critical section. That is, the operations on the semaphores do not
require protection. Including the calls to sem_wait() and sem_post()
within the critical section for the lock would unnecessarily prevent the other
producers from manipulating the semaphores, partially eliminating the benefit of
using a semaphore.
/* Code Listing 8.17:
   Solution for multiple producers and a single consumer
 */
void
enqueue (queue_t *queue, data_t *data, sem_t *space, sem_t *items,
         pthread_mutex_t *lock)
{
  sem_wait (space);
  pthread_mutex_lock (lock);
  enqueue_unsafe (queue, data);
  pthread_mutex_unlock (lock);
  sem_post (items);
}
Note that this approach does not require any modification to the dequeue()
operation used by the consumer. Since there is only a single consumer, there is
no race condition on incrementing the queue->front variable. This solution
could be extended to multiple consumers by introducing a second lock for
consumers used in dequeue() as shown in Code Listing 8.18. It
is important, in this case, to use separate locks rather than simply reusing the
same lock for both producers and consumers. When both the space and
items semaphores have positive values, then the front and back of the queue
are guaranteed to be in different places. That is, there is at least one item
and at least one free slot in the queue, so the producer and consumer within
the critical sections would not
be interfering with each other. Consequently, there is no reason that the
producer needs to lock out the consumer, or vice versa.
/* Code Listing 8.18:
   Solution for multiple producers and consumers
 */
void
enqueue (queue_t *queue, data_t *data, sem_t *space, sem_t *items,
         pthread_mutex_t *producer_lock)
{
  sem_wait (space);
  pthread_mutex_lock (producer_lock);
  enqueue_unsafe (queue, data);
  pthread_mutex_unlock (producer_lock);
  sem_post (items);
}
data_t *
dequeue (queue_t * queue, sem_t *space, sem_t *items,
         pthread_mutex_t *consumer_lock)
{
  sem_wait (items);
  pthread_mutex_lock (consumer_lock);
  data_t * data = dequeue_unsafe (queue); 
  pthread_mutex_unlock (consumer_lock);
  sem_post (space);
  return data;
}
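As a usage sketch, the two-lock solution can be exercised with several producers and consumers at once. The following is an illustration under stated assumptions, not a definitive implementation: the queue, semaphores, and locks are globals rather than parameters to keep the block short, and `THREADS`, `PER_THREAD`, `CAPACITY`, the `queue_t` layout, the `int` payload, and `run_demo()` are all hypothetical. It also assumes a platform where `sem_init()` is supported, such as Linux.

```c
#include <assert.h>
#include <pthread.h>
#include <semaphore.h>
#include <stddef.h>

#define CAPACITY 8     /* hypothetical queue size */
#define THREADS 3      /* hypothetical: number of producers and of consumers */
#define PER_THREAD 500 /* items each producer enqueues and each consumer dequeues */

typedef int data_t; /* hypothetical payload type */
typedef struct {
  data_t *contents[CAPACITY];
  size_t front, back, capacity;
} queue_t;

static queue_t queue;
static sem_t space, items;
static pthread_mutex_t producer_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_mutex_t consumer_lock = PTHREAD_MUTEX_INITIALIZER;
static data_t values[THREADS][PER_THREAD];
static long sums[THREADS];
static int ids[THREADS];

static void
enqueue (data_t *data)
{
  sem_wait (&space);
  pthread_mutex_lock (&producer_lock); /* only producers touch back */
  queue.contents[queue.back] = data;
  queue.back = (queue.back + 1) % queue.capacity;
  pthread_mutex_unlock (&producer_lock);
  sem_post (&items);
}

static data_t *
dequeue (void)
{
  sem_wait (&items);
  pthread_mutex_lock (&consumer_lock); /* only consumers touch front */
  data_t *data = queue.contents[queue.front];
  queue.front = (queue.front + 1) % queue.capacity;
  pthread_mutex_unlock (&consumer_lock);
  sem_post (&space);
  return data;
}

static void *
producer (void *arg)
{
  int id = *(int *) arg;
  for (int i = 0; i < PER_THREAD; i++)
    {
      values[id][i] = i + 1;
      enqueue (&values[id][i]);
    }
  return NULL;
}

static void *
consumer (void *arg)
{
  int id = *(int *) arg;
  for (int i = 0; i < PER_THREAD; i++)
    sums[id] += *dequeue ();
  return NULL;
}

/* Runs all threads to completion; returns the total of all consumed values */
long
run_demo (void)
{
  pthread_t prod[THREADS], cons[THREADS];
  long total = 0;
  queue = (queue_t) { .capacity = CAPACITY };
  int rc = sem_init (&space, 0, CAPACITY);
  assert (rc == 0);
  rc = sem_init (&items, 0, 0);
  assert (rc == 0);
  (void) rc;
  for (int i = 0; i < THREADS; i++)
    {
      ids[i] = i;
      sums[i] = 0;
      pthread_create (&prod[i], NULL, producer, &ids[i]);
      pthread_create (&cons[i], NULL, consumer, &ids[i]);
    }
  for (int i = 0; i < THREADS; i++)
    {
      pthread_join (prod[i], NULL);
      pthread_join (cons[i], NULL);
    }
  for (int i = 0; i < THREADS; i++)
    total += sums[i];
  sem_destroy (&space);
  sem_destroy (&items);
  return total;
}
```

Each consumer keeps a private running sum, so the only shared state is the queue itself, and the total is the same no matter which consumer receives which item.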