Bounded Buffer - Revisited

In this problem, we shall redo the bounded buffer problem using a monitor and condition variables. The desired monitor has the buffer and the two pointers as its local variables and provides an initialization function BufferInit(), a function for retrieving data GET(), and a function for depositing data PUT(). The function prototypes are shown below.

Click here to download a copy of this file.

/* Public interface of the bounded-buffer monitor.                    */
/* BufferInit() must be called once before any GET() or PUT().        */
void  BufferInit(void);       /* initializes the monitor  */
/* Removes and returns one item; waits while the buffer is empty.     */
int   GET(void);              /* get an item from buffer  */
/* Deposits value; waits while the buffer is full; returns value.     */
int   PUT(int  value);        /* add an item to buffer    */

The following is the implementation file, buf-m-2.c. Click here to download a copy of this program.

#include  <thread.h>
#include  <synch.h>

#include  "buf-m-2.h"

/* Private state of the monitor: a circular buffer plus the lock and  */
/* the two condition variables that GET() and PUT() wait on.          */
#define   BUF_SIZE   5                  /* buffer size              */

static int   Buf[BUF_SIZE];             /* the actual buffer        */
static int   in;                        /* next empty location      */
static int   out;                       /* next filled location     */
static int   count;                     /* no. of items in buffer   */

static mutex_t  MonitorLock;            /* monitor lock             */

static cond_t   UntilNotFull;           /* PUT() waits here while the buffer is full  */
static cond_t   UntilNotEmpty;          /* GET() waits here while the buffer is empty */

/* Prepare the monitor for use: mark the buffer as empty and          */
/* initialize the monitor lock and both condition variables.          */
void  BufferInit(void)
{
     /* empty buffer: no items, both positions at slot 0 */
     count = 0;
     out   = 0;
     in    = 0;

     /* synchronization objects, all of type USYNC_THREAD */
     cond_init(&UntilNotEmpty, USYNC_THREAD, NULL);
     cond_init(&UntilNotFull, USYNC_THREAD, NULL);
     mutex_init(&MonitorLock, USYNC_THREAD, NULL);
}

/* Remove the oldest item from the buffer and return it.              */
/* The caller waits inside the monitor for as long as the buffer is   */
/* empty.                                                             */
int  GET(void)
{
     int  item;

     mutex_lock(&MonitorLock);          /* enter the monitor        */

     /* Re-test the condition after every wakeup: the buffer may have */
     /* been emptied again before this thread got the lock back.      */
     while (count == 0) {
          cond_wait(&UntilNotEmpty, &MonitorLock);
     }

     item = Buf[out];                   /* take the oldest item     */
     out  = (out + 1) % BUF_SIZE;       /* wrap around at the end   */
     count -= 1;                        /* one item fewer           */

     cond_signal(&UntilNotFull);        /* a slot is free: wake one */
                                        /* thread waiting in PUT()  */
     mutex_unlock(&MonitorLock);        /* leave the monitor        */

     return item;                       /* the retrieved value      */
}

/* Deposit value into the buffer and return the same value.           */
/* The caller waits inside the monitor for as long as the buffer is   */
/* full.                                                              */
int  PUT(int  value)
{
     mutex_lock(&MonitorLock);          /* enter the monitor        */

     /* Re-test the condition after every wakeup: the buffer may have */
     /* been filled again before this thread got the lock back.       */
     while (count == BUF_SIZE) {
          cond_wait(&UntilNotFull, &MonitorLock);
     }

     Buf[in] = value;                   /* store at next empty slot */
     in      = (in + 1) % BUF_SIZE;     /* wrap around at the end   */
     count  += 1;                       /* one item more            */

     cond_signal(&UntilNotEmpty);       /* an item exists: wake one */
                                        /* thread waiting in GET()  */
     mutex_unlock(&MonitorLock);        /* leave the monitor        */

     return value;                      /* echo the deposited value */
}

The main program is similar to a previous one. We have an equal number of producer threads and consumer threads. Each producer thread deposits some number of positive integers into the buffer followed by an end-of-data mark EOD. Each consumer keeps retrieving data from the buffer until an EOD is read. Since all relevant programming details have been moved to a monitor, the main program and the producer and consumer threads look simpler.

Click here to download a copy of this file.

#include  <stdio.h>
#include  <stdlib.h>
#include  <thread.h>

#include  "buf-m-2.h"

/* NOTE(review): MAX_ITEMS is not referenced in the code shown here.  */
#define   MAX_ITEMS    20
#define   MAX_THREADS   5                /* upper limit on the no. of */
                                         /* producers (and consumers) */
#define   EOD          -1                /* end-of-data mark          */

mutex_t   ScreenLock;                    /* held around each printf() */
                                         /* in the threads            */

int       Max_Run;                       /* no. of data items each    */
                                         /* producer deposits before  */
                                         /* its EOD                   */

/* Build an indentation string of 2*n blanks in x and terminate it.   */
/* x must have room for 2*n + 1 characters; when n is zero or         */
/* negative the result is the empty string.                           */
void  Fill(char x[], int n)
{
     int  width = n*2;                   /* number of blanks to write */
     int  pos   = 0;

     while (pos < width)
          x[pos++] = ' ';
     x[pos] = '\0';                      /* terminate after the blanks*/
}

/* Producer thread body.                                              */
/* voidPTR points to an int holding this producer's ID.  The thread   */
/* deposits Max_Run data items into the buffer, then one EOD mark,    */
/* and terminates with thr_exit().  Every message is printed while    */
/* holding ScreenLock and is indented by 2*ID blanks.                 */
void *Producer(void *voidPTR)
{
     int   ID = *((int *) voidPTR);      /* this producer's number    */
     char  filler[100];                  /* indentation for messages  */
     int   round;                        /* iteration counter         */
     int   item;                         /* value being deposited     */

     Fill(filler, ID);

     mutex_lock(&ScreenLock);
     printf("%sProducer %d started ...\n", filler, ID);
     mutex_unlock(&ScreenLock);

     round = 0;
     while (round < Max_Run) {
          thr_yield();                   /* let other threads run     */

          /* the ID is encoded in the ten-thousands, the iteration    */
          /* number in the low digits                                 */
          item = ID*10000 + round;
          PUT(item);

          mutex_lock(&ScreenLock);
          printf("%sProducer %d has added %d to the buffer\n",
                  filler, ID, item);
          mutex_unlock(&ScreenLock);

          round++;
     }

     thr_yield();                        /* let other threads run     */
     PUT(EOD);                           /* tell one consumer to stop */

     mutex_lock(&ScreenLock);
     printf("%sProducer %d adds EOD and exits\n", filler, ID);
     mutex_unlock(&ScreenLock);

     thr_exit(NULL);
}

/* Consumer thread body.                                              */
/* voidPTR points to an int holding this consumer's ID.  The thread   */
/* keeps taking items from the buffer until it reads an EOD mark,     */
/* then terminates with thr_exit().  Every message is printed while   */
/* holding ScreenLock and is indented by 2*(ID+10) blanks.            */
void *Consumer(void *voidPTR)
{
     int   *intPTR = (int *) voidPTR;
     int   ID      = *intPTR;
     int   data;
     char  filler[100];

     Fill(filler, ID+10);
     mutex_lock(&ScreenLock);
          printf("%sConsumer %d started ...\n", filler, ID);
     mutex_unlock(&ScreenLock);

     do {                                /* iterate until EOD        */
          thr_yield();                   /* let other threads run    */
          mutex_lock(&ScreenLock);       /* lock the screen          */
               printf("%sConsumer %d is waiting for an item\n",
                       filler, ID);      /* display a message        */
          mutex_unlock(&ScreenLock);     /* release the screen       */

          data = GET();                  /* waits while buffer empty */
          if (data != EOD) {
               mutex_lock(&ScreenLock);  /* lock the screen          */
                    printf("%sConsumer %d has taken %d from the buffer\n",
                           filler, ID, data);      /* display data   */
               mutex_unlock(&ScreenLock);/* release the screen       */
          }
     } while (data != EOD);              /* do until EOD is seen     */

     mutex_lock(&ScreenLock);
          printf("%sConsumer %d receives EOD and exits\n", filler, ID);
     mutex_unlock(&ScreenLock);
     thr_exit(0);
}

/* Program entry point.                                               */
/* Usage: prog #-of-iterations #-of-producers/consumers               */
/* argv[1] is the number of items each producer deposits and argv[2]  */
/* the number of producer threads (the same number of consumers is    */
/* started); negative values are made positive and the thread count   */
/* is capped at MAX_THREADS.  Returns 0 after all threads have been   */
/* joined; exits with status 1 if a thread cannot be created.         */
int  main(int argc, char *argv[])
{
     thread_t   pID[MAX_THREADS];        /* producer ID              */
     thread_t   cID[MAX_THREADS];        /* consumer ID              */
     void      *pStatus[MAX_THREADS];    /* producer status          */
     void      *cStatus[MAX_THREADS];    /* consumer status          */
     int        pArg[MAX_THREADS];       /* producer argument        */
     int        cArg[MAX_THREADS];       /* consumer argument        */
     int        Threads;                 /* # of producers/consumers */
     int        i;

     if (argc != 3)  {
          printf("Use %s #-of-iterations #-of-producers/consumers\n", argv[0]);
          exit(0);
     }
     Max_Run = abs(atoi(argv[1]));
     Threads = abs(atoi(argv[2]));
     if (Threads > MAX_THREADS) {
          printf("The no. of producers/consumers is too large.  Reset to %d\n",
                 MAX_THREADS);
          Threads = MAX_THREADS;
     }

     printf("Parent started ...\n");

     mutex_init(&ScreenLock, USYNC_THREAD, (void *) NULL);
     BufferInit();

     /* Each thread receives a pointer to its own slot in cArg/pArg,  */
     /* so the slot must stay untouched while the thread is running.  */
     for (i = 0; i < Threads; i++) {     /* start consumers first    */
          cArg[i] = i + 1;
          if (thr_create(NULL, 0, Consumer, (void *) &(cArg[i]),
                         0, &(cID[i])) != 0) {
               /* without a valid thread ID the join below would be   */
               /* meaningless, so give up right away                  */
               fprintf(stderr, "Cannot create consumer %d\n", cArg[i]);
               exit(1);
          }
     }
     for (i = 0; i < Threads; i++) {     /* followed by producers    */
          pArg[i] = i + 1;
          if (thr_create(NULL, 0, Producer, (void *) &(pArg[i]),
                         0, &(pID[i])) != 0) {
               fprintf(stderr, "Cannot create producer %d\n", pArg[i]);
               exit(1);
          }
     }

     for (i = 0; i < Threads; i++)       /* wait producers/consumers */
          thr_join(cID[i], NULL, &(cStatus[i]));
     for (i = 0; i < Threads; i++)
          thr_join(pID[i], NULL, &(pStatus[i]));

     printf("Parent exits ...\n");
     return 0;
}