Consumer Producer
[Back]

Basic Problem
see textbook :-)
[top]
The
Consumer/Producer using Semaphores
- typedef struct {
- char buf[BSIZE]; // if empty==BSIZE buffer is empty.
if empty==0, then it's full
- sema_t occupied, empty;
- int nextin, nextout;
- sema_t pmutex, cmutex;
- } BUFFER;
initialization section
- BUFFER *bufferp; // allocate a pointer to a BUFFER
- bufferp = malloc( sizeof(BUFFER)
);
// create a BUFFER type
- sema_init(&buffer->occupied, 0, USYNC_THREAD,
0); // force consumer to always wait first
- sema_init(&buffer->empty, BSIZE, USYNC_THREAD, 0); // allow for
BSIZE accesses, i.e, buf is empty
- sema_init(&buffer->pmutex, 1, USYNC_THREAD,
0); // producer mutex
- sema_init(&buffer->cmutex, 1, USYNC_THREAD,
0); // consumer mutex
- bufferp->nextin = 0; // initialize ring buffer
- cond_init( &count_nonzero, USYNC_THREAD,
0); // intraprocesss
usage
- void producer (BUFFER *b, chare item) {
- sema_wait( &b->empty );
- sema_wait( &b->pmutex
);
// protect against other producers accessing this buffer
- b->buf[ b->nextin ] =
item;
// insert datum
- b->nextin = (b->nextin + 1 ) % BSIZE;
// ring buffer
- sema_post( &b->pmutex
);
// allow other producers in
- sema_post( &b->occupied
);
// let any consumers know a datum is available
- }
-
- char consumer(BUFFER *b) {
- char item;
- sema_wait( &b->occupied
);
// wait if another consumer or producer is here
- sema_wait( &b->cmutex
);
// protect buffer against other consumers
- item = b->buf[ b->nectout
];
// get next datum from buffer
- b->nextout = (b->nextout + 1) BSIZE;
// ring buffer
- sema_post( &b->cmutex
);
// allow other consumers into this buffer
- sema_post( &b->empty
);
// let producer know of buffer's status
- return ( item );
- }
[top]
-
- #include <thread.h>
// or <synch.h>
- mutex_t count_lock;
// mutex variable
- cond_t count_nonzero;
// conditional variable
- int
count;
// variable to be protected
-
- void dec()
- { mutex_lock( &count_lock);
// lock to ensure atomic access to count
- while ( count==0)
- cond_wait( &count_nonzero,
&count_lock); // release lock and wait for
signal
- count--;
- mutex_unlock( &count_lock);
//release lock
- }
-
- void inc() {
- mutex_lock( &count_lock);
- if ( count==0)
- cond_signal(
&count_nonzero); // signal any object waiting on the count_nonzero queue
- count++;
- mutex_unlock( &count_lock);
- }
-
-
- typedef struct {
- char buf[BSIZE];
// actual storage
- int occupied;
- int nextin, nextout;
// pointers
- mutex_t mutex;
// lock for exclusive access
- cont_t less, more;
// conditional variables
- } BUFFER;
-
- void producer(BUFFER *b, char item) {
- mutex_lock( &b->mutex
);
// capture buffer
- while ( b->occupied >= BSIZE)
// no room in buffer
- cond_wait( &b->less, &b->mutex
); // block and release mutex
- b->buf[ b->nextin] = item;
- b->nextin = ( b->nextin + 1) % BSIZE; // ring buffer
- b->occupied++;
// count of buffer
- cond_signal( &b->more
);
// signal consumers
- mutex_unlock( &b->mutex
);
// release buffer
- }
-
- char consumer(BUFFER *b) {
- char item;
- mutex_lock( &b->mutex
);
// capture buffer
- while ( b->occupied <= 0
)
// wait if buffer is empty
- cond_wait( &b->more, &b->mutex);
// block and relase mutex
- item = b->buf[ b->nextout ];
- b->nextout = (b->nextout + 1) % BSIZE;
// ring buffer
- b->occupied--;
// actual count of buffer
- cond_signal(
&b->less);
// signal producers
- mutex_unlock( &b->mutex
);
// release buffer
- return (item);
- }
- BUFFER *b1; // This fragment initializes the conditional variables
- b1 = malloc( sizeof( BUFFER) ); // and creates buffers and threads.
- cond_init( &b->less, USYNC_THREAD, 0);
- cond_init( &b->more, USYNC_THREAD, 0);
- b->occupied = b->nextout = b->nextin = 0;
- for (i=0; i<N; i++) {
- thr_create( NULL, NULL, foo, (void *) b1, THR_BOUND, NULL);
- thr_create( NULL, NULL, boo, (void *) b1, THR_BOUND, NULL);
- }
-
-
- // Sample worker threads that use BUFFERs.
- void foo(void *b) { void boo(void *b) {
- BUFFER *bp=b; BUFFER *bp=b; DATUM datum;
- while (true) { while (true) {
- datum = get_data(); datum = consumer(bp);
- producer(bp,datum); process_data(datum);
- } }
- } }