ThreadMentor: The Producer/Consumer (or Bounded-Buffer) Problem

Problem

Suppose we have a circular buffer with two pointers, in and out: in indicates the next available position for depositing data, and out indicates the position that contains the next data item to be retrieved. See the diagram below. There are two groups of threads, producers and consumers. Each producer deposits a data item into position in and advances the pointer in, and each consumer retrieves the data item in position out and advances the pointer out.
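The pointer arithmetic described above can be sketched in a few lines of single-threaded C++. This is only an illustration: the names RingBuffer, kBufferSize, deposit and retrieve are hypothetical and are not part of ThreadMentor, and no synchronization is shown here.

```cpp
#include <cassert>
#include <cstddef>

// Hypothetical names for illustration; not part of ThreadMentor.
constexpr std::size_t kBufferSize = 4;

struct RingBuffer {
    int data[kBufferSize] = {};
    std::size_t in = 0;     // next free slot for a deposit
    std::size_t out = 0;    // slot holding the next item to retrieve
    std::size_t count = 0;  // number of filled slots

    bool full() const  { return count == kBufferSize; }
    bool empty() const { return count == 0; }

    // Deposit one item; the caller must ensure the buffer is not full.
    void deposit(int item) {
        assert(!full());
        data[in] = item;
        in = (in + 1) % kBufferSize;   // wrap around at the end of the array
        ++count;
    }

    // Retrieve the oldest item; the caller must ensure the buffer is not empty.
    int retrieve() {
        assert(!empty());
        int item = data[out];
        out = (out + 1) % kBufferSize; // wrap around at the end of the array
        --count;
        return item;
    }
};
```

Note that in and out are equal both when the buffer is empty and when it is full, so the two pointers alone cannot tell these states apart. A separate count of filled slots, as in this sketch, is one way to distinguish them.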

Write a program, using a monitor, to correctly coordinate the producers and consumers and their depositing and retrieving activities.

Analysis

We have seen this problem before using three semaphores. Now we will redo it with a monitor. Because the boundary of a monitor provides mutual exclusion, the semaphore that protected the buffer and the in and out pointers becomes unnecessary. The other two semaphores were used for blocking: one blocked producers if the buffer was full, and the other blocked consumers if the buffer was empty. The initial value of the former was the size of the buffer, so the semaphore itself counted the free slots. Condition variable signals, however, are not counted, so our program has to do the counting itself. As a result, we still need a condition variable for blocking producers if the buffer is full and a second condition variable for blocking consumers if the buffer is empty, and we also need a counter that records the number of filled slots.

Program

The following is the monitor interface. Condition variables NotFull and NotEmpty block producers and consumers, respectively. Private variable Buffer[BUFFER_SIZE] is the buffer, In and Out are the in and out pointers, and NumberOfItems counts the number of data items in the buffer. Note that BUFFER_SIZE is defined in header file ProducerConsumer-Thrd.h.

#include "ThreadClass.h"
#include "ProducerConsumer-Thrd.h"

class BufferMonitor : public Monitor 
{
     public:
          BufferMonitor(char* Name);  
          void Put(Item_t item);   // add an item into the buffer
          void Get(Item_t *item);  // get an item from the buffer

     private:
          Condition   NotFull,      // wait until buffer is not full
                     NotEmpty;      // wait until buffer is not empty
          Item_t Buffer[BUFFER_SIZE];  // the buffer (same type as Put()/Get() items)
          int In;                   // next empty slot in the buffer
          int Out;                  // next available data slot
          int NumberOfItems;        // number of items in the buffer
};
File: ProducerConsumer-mon.h

The implementation of this monitor class is shown below. The constructor initializes In, Out and NumberOfItems to zero as usual. Note that this is a Hoare monitor.

Method Put() first tests if the buffer is full. If it is, the calling thread (i.e., a producer) waits on condition variable NotFull. If the buffer is not full or a producer is released by a consumer, the data is stored in the next available slot, the pointer is advanced and the number of items is increased by one. Finally, because the buffer has at least one item, consumers are notified by the execution of NotEmpty.Signal().

Method Get(), on the other hand, first tests if the buffer is empty. If it is empty, the calling thread (i.e., a consumer) waits on condition variable NotEmpty. If the buffer is not empty or a consumer is released from condition variable NotEmpty, the next available data item is retrieved from the buffer, the pointer is advanced, and the number of items is reduced by one. Finally, because the buffer now has at least one free slot, a producer is notified with NotFull.Signal().

#include <iostream>
#include "ThreadClass.h"
#include "ProducerConsumer-mon.h"

BufferMonitor::BufferMonitor(char* Name)
              :Monitor(Name, HOARE), NotFull("Notfull"),
               NotEmpty("NotEmpty") 
{
     NumberOfItems = 0;                 // no data items yet
     In = Out = 0 ;                 
}

void BufferMonitor::Put(Item_t item)
{
     MonitorBegin();                       
     if (NumberOfItems == BUFFER_SIZE)  // buffer is full
          NotFull.Wait();
     Buffer[In] = item;                 // add item into the buffer       
     In = (In + 1) % BUFFER_SIZE;       // advance input pointer
     NumberOfItems++;                   // have one more item
     NotEmpty.Signal();                 // release a waiting consumer
     MonitorEnd();                      
}

void BufferMonitor::Get(Item_t *item)
{
     MonitorBegin();                      
     if (NumberOfItems == 0)            // buffer is empty
          NotEmpty.Wait();
     *item = Buffer[Out];               // retrieve data
     Out = (Out + 1) % BUFFER_SIZE;     // advance out pointer
     NumberOfItems--;                   // have one less item
     NotFull.Signal();                  // release a waiting producer 
     MonitorEnd();                          
}
File: ProducerConsumer-mon.cpp

With methods Get() and Put() and a central control of the buffer, all buffer-related operations are in the monitor. As a result, the thread part becomes very clean. Because the thread part and the main program are almost identical to the semaphore version, we will not reproduce the thread class and the main program on this page. The corresponding files are:

Thread interface ProducerConsumer-Thrd.h
Thread implementation ProducerConsumer-Thrd.cpp
The Main Program ProducerConsumer-main.cpp

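If this monitor is changed to the Mesa type, the if statements in both methods must be changed to while statements. Under Mesa semantics a signaled thread does not resume immediately: the signaler keeps running, and other threads may enter the monitor before the released thread does. By the time the released thread runs again, the buffer may once more be full (for a producer) or empty (for a consumer), so the condition must be re-tested after every wakeup.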
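As a rough illustration of the while-loop form, here is a minimal sketch that uses standard C++ instead of ThreadMentor. It assumes that std::mutex and std::condition_variable are an acceptable stand-in for a Mesa-style monitor (a notified thread must reacquire the lock and may also wake spuriously). The names MesaBuffer, kSize and RunDemo are hypothetical, and the buffer holds plain int values.

```cpp
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <thread>
#include <vector>

// Sketch only: std::mutex and std::condition_variable stand in for
// ThreadMentor's Monitor and Condition. All names here are hypothetical.
constexpr std::size_t kSize = 5;

class MesaBuffer {
public:
    void Put(int item) {
        std::unique_lock<std::mutex> lock(mutex_);  // enter the monitor
        while (count_ == kSize)                     // re-test after every wakeup
            not_full_.wait(lock);
        buffer_[in_] = item;
        in_ = (in_ + 1) % kSize;
        ++count_;
        not_empty_.notify_one();                    // release a waiting consumer
    }                                               // lock released: leave the monitor

    int Get() {
        std::unique_lock<std::mutex> lock(mutex_);
        while (count_ == 0)                         // re-test after every wakeup
            not_empty_.wait(lock);
        int item = buffer_[out_];
        out_ = (out_ + 1) % kSize;
        --count_;
        not_full_.notify_one();                     // release a waiting producer
        return item;
    }

private:
    std::mutex mutex_;
    std::condition_variable not_full_, not_empty_;
    int buffer_[kSize] = {};
    std::size_t in_ = 0, out_ = 0, count_ = 0;
};

// Starts `pairs` producers and `pairs` consumers. Each producer deposits
// 1..items_per_thread and each consumer retrieves items_per_thread items.
// Returns the sum of everything retrieved.
long RunDemo(int pairs, int items_per_thread) {
    MesaBuffer buffer;
    std::vector<long> sums(pairs, 0);  // one slot per consumer, so no sharing
    std::vector<std::thread> threads;
    for (int p = 0; p < pairs; ++p) {
        threads.emplace_back([&buffer, items_per_thread] {
            for (int i = 1; i <= items_per_thread; ++i) buffer.Put(i);
        });
        threads.emplace_back([&buffer, &sums, p, items_per_thread] {
            for (int i = 0; i < items_per_thread; ++i) sums[p] += buffer.Get();
        });
    }
    for (auto& t : threads) t.join();
    long total = 0;
    for (long s : sums) total += s;
    return total;
}
```

If every deposited item is retrieved exactly once, RunDemo(pairs, n) returns pairs * n * (n + 1) / 2. Replacing either while with if would make the sketch incorrect, because a woken thread could find the buffer full or empty again.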