
|
#include <boost/lexical_cast.hpp> // lexical_cast
#include <cerrno> // errno
#include <cstdio> // perror
#include <cstdlib> // atoi, exit,
#include <iomanip> // setw
#include <iostream>
#include <pthread.h>
#include <semaphore.h>
#include <unistd.h> // usleep
using namespace std;
// make the Buffer hold integers
typedef int bufferItem;
/******************************************************************************
*
******************************************************************************/
template <class T>
class CircularQueue
{
public:
CircularQueue(T* buffer);
T front() const { return this->buffer[head]; }
T back() const { return this->buffer[tail]; }
int size() const { return numItems; }
int getMaxSize() const { return maxSize; }
bool full() const { return numItems == maxSize; }
bool empty() const { return numItems == 0; }
bool enqueue(T item); // add an item to the back of the queue
bool dequeue(T* item = NULL); // remove from the front of the queue
void display() const;
private:
int maxSize;
int numItems;
int head;
int tail;
T* buffer;
};
// shared memory ---- GLOBAL variables
const int BUFFER_SIZE = 5;
bufferItem buffer[BUFFER_SIZE];
CircularQueue<bufferItem> cq(buffer);
pthread_mutex_t mutex;
sem_t empty;
sem_t full;
// functions
void checkArgs(int argc, char** argv);
void initialize();
void* producer(void* threadID);
void* consumer(void* threadID);
void exitError(string msg, int errCode = 0);
/******************************************************************************
* main()
* - check and get the command line arguments
* - initialize the buffer, mutex lock and semaphores
* - create the producer threads
* - create the consumer threads
* - sleep
* - cancel the threads
* - exit
******************************************************************************/
int main(int argc, char** argv)
{
// check and assign the args
checkArgs(argc, argv);
int sleepTime = atoi(argv[1]);
int numProducer = atoi(argv[2]);
int numConsumer = atoi(argv[3]);
// initialize the buffer, mutex lock, and semaphores
initialize();
pthread_t producers[numProducer]; // keep track of the threads
pthread_t consumers[numConsumer]; // keep track of the threads
int r = 0; // used for checking the return codes
int* threadID = NULL;
cout << "Produced by P# Consumed by C#" << endl
<< "======== ===== ======== =====" << endl;
// create the producer threads
for (int i = 0; i < numProducer; i++)
{
threadID = new int(i + 1);
r = pthread_create(&producers[i], NULL, producer, (void*) threadID);
if (r)
exitError("Failed to create producer thread", r);
}
// create the consumer threads
for (int i = 0; i < numConsumer; i++)
{
threadID = new int(i + 1);
r = pthread_create(&consumers[i], NULL, consumer, (void*) threadID);
if (r)
exitError("Failed to create consumer thread", r);
}
// ZZzZZZzzZZZzz
sleep(sleepTime);
// cancel the threads
for (int i = 0; i < numProducer; i++)
{
r = pthread_cancel(producers[i]);
if (r)
exitError("Failed to cancel producer thread", r);
}
for (int i = 0; i < numConsumer; i++)
{
r = pthread_cancel(consumers[i]);
if (r)
exitError("Failed to cancel consumer thread", r);
}
// destroy the mutex lock and the semaphores
r = pthread_mutex_destroy(&mutex);
if (r)
exitError("Failed to destroy the mutex lock", r);
r = sem_destroy(&empty);
if (r)
exitError("Failed to destroy the 'empty' semaphore", r);
r = sem_destroy(&full);
if (r)
exitError("Failed to destroy the 'full' semaphore", r);
return 0;
}
/******************************************************************************
* initialize()
* - initializes the mutex lock, semaphores and the buffer
******************************************************************************/
void initialize()
{
int r; // stores the resulting return value
// init the mutex lock w/ default attribs.
r = pthread_mutex_init(&mutex, NULL);
if (r)
exitError("Failed to initialize the mutex lock", r);
// sem_init(ptr_to_sem, sharing_lvl, semaphore_initial_value)
// init the EMPTY SEMAPHORE
r = sem_init(&empty, 0, BUFFER_SIZE);
if (r)
exitError("Failed to initilize the 'empty' semaphore", r);
// init the FULL SEMAPHORE
r = sem_init(&full, 0, 0);
if (r)
exitError("Failed to initilize the 'full' semaphore", r);
// init the Buffer
for (int i = 0; i < BUFFER_SIZE; i++)
buffer[i] = 0;
}
/******************************************************************************
* producer()
* - generates pseudorandom numbers and pushes them to the queue
******************************************************************************/
void* producer(void* threadID)
{
int id = *(int*)threadID;
delete (int*) threadID;
threadID = NULL;
bufferItem itemProduced = -1;
int r1;
int r2;
while (true)
{
usleep(rand() % 150000); // sleep a random amount of time
itemProduced = (rand() % 1000); // use 3-digit numbers 0-999
// acquire the semaphore and the mutex lock
r1 = sem_wait(&empty);
r2 = pthread_mutex_lock(&mutex);
if (r1)
exitError("Failed to acquire the `empty` semaphore", r1);
if (r2)
exitError("Failed to acquire the mutex lock", r2);
// insert item into shared global buffer and print what was done
if (cq.enqueue(itemProduced))
{
cout << setw(5) << right << itemProduced
<< setw(6) << 'P' << left << id << endl;
}
else
exitError("Failed to produce a new item");
r1 = pthread_mutex_unlock(&mutex); // release the krakens!
r2 = sem_post(&full); // increment the 'full' semaphore
if (r1)
exitError("Failed to release the mutex lock", r1);
if (r2)
exitError("Failed to increment the number of full slots", r2);
}
return NULL;
}
/******************************************************************************
* consumer()
* - consumes numbers from the queue
******************************************************************************/
void* consumer(void* threadID)
{
int id = *(int*)threadID;
delete (int*) threadID;
threadID = NULL;
bufferItem itemConsumed = -1;
int r1;
int r2;
while (true)
{
usleep(rand() % 150000); // sleep a random amount of time
r1 = sem_wait(&full);
r2 = pthread_mutex_lock(&mutex);
if (r1)
exitError("Failed to acquire the `full` semaphore", r1);
if (r2)
exitError("Failed to acquire the mutex lock", r2);
// consume item from shared global buffer and print what was done
if (cq.dequeue(&itemConsumed))
{
cout << setw(20) << right << itemConsumed
<< setw(7) << 'C' << left << id << endl;
}
else
exitError("Failed to consume an item");
r1 = pthread_mutex_unlock(&mutex); // release the krakens!
r2 = sem_post(&empty);
if (r1)
exitError("Failed to release the mutex lock", r1);
if (r2)
exitError("Failed to increment the number of empty slots", r2);
}
return NULL;
}
|