1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254
|
#include <boost/lexical_cast.hpp> // lexical_cast
#include <cerrno> // errno
#include <cstdio> // perror
#include <cstdlib> // atoi, exit,
#include <iomanip> // setw
#include <iostream>
#include <pthread.h>
#include <semaphore.h>
#include <unistd.h> // usleep
using namespace std;
// make the Buffer hold integers
typedef int bufferItem;
/******************************************************************************
*
******************************************************************************/
template <class T>
class CircularQueue
{
public:
CircularQueue(T* buffer);
T front() const { return this->buffer[head]; }
T back() const { return this->buffer[tail]; }
int size() const { return numItems; }
int getMaxSize() const { return maxSize; }
bool full() const { return numItems == maxSize; }
bool empty() const { return numItems == 0; }
bool enqueue(T item); // add an item to the back of the queue
bool dequeue(T* item = NULL); // remove from the front of the queue
void display() const;
private:
int maxSize;
int numItems;
int head;
int tail;
T* buffer;
};
// shared memory ---- GLOBAL variables
const int BUFFER_SIZE = 5;
bufferItem buffer[BUFFER_SIZE];
CircularQueue<bufferItem> cq(buffer);
pthread_mutex_t mutex;
sem_t empty;
sem_t full;
// functions
void checkArgs(int argc, char** argv);
void initialize();
void* producer(void* threadID);
void* consumer(void* threadID);
void exitError(string msg, int errCode = 0);
/******************************************************************************
* main()
* - check and get the command line arguments
* - initialize the buffer, mutex lock and semaphores
* - create the producer threads
* - create the consumer threads
* - sleep
* - cancel the threads
* - exit
******************************************************************************/
int main(int argc, char** argv)
{
// check and assign the args
checkArgs(argc, argv);
int sleepTime = atoi(argv[1]);
int numProducer = atoi(argv[2]);
int numConsumer = atoi(argv[3]);
// initialize the buffer, mutex lock, and semaphores
initialize();
pthread_t producers[numProducer]; // keep track of the threads
pthread_t consumers[numConsumer]; // keep track of the threads
int r = 0; // used for checking the return codes
int* threadID = NULL;
cout << "Produced by P# Consumed by C#" << endl
<< "======== ===== ======== =====" << endl;
// create the producer threads
for (int i = 0; i < numProducer; i++)
{
threadID = new int(i + 1);
r = pthread_create(&producers[i], NULL, producer, (void*) threadID);
if (r)
exitError("Failed to create producer thread", r);
}
// create the consumer threads
for (int i = 0; i < numConsumer; i++)
{
threadID = new int(i + 1);
r = pthread_create(&consumers[i], NULL, consumer, (void*) threadID);
if (r)
exitError("Failed to create consumer thread", r);
}
// ZZzZZZzzZZZzz
sleep(sleepTime);
// cancel the threads
for (int i = 0; i < numProducer; i++)
{
r = pthread_cancel(producers[i]);
if (r)
exitError("Failed to cancel producer thread", r);
}
for (int i = 0; i < numConsumer; i++)
{
r = pthread_cancel(consumers[i]);
if (r)
exitError("Failed to cancel consumer thread", r);
}
// destroy the mutex lock and the semaphores
r = pthread_mutex_destroy(&mutex);
if (r)
exitError("Failed to destroy the mutex lock", r);
r = sem_destroy(&empty);
if (r)
exitError("Failed to destroy the 'empty' semaphore", r);
r = sem_destroy(&full);
if (r)
exitError("Failed to destroy the 'full' semaphore", r);
return 0;
}
/******************************************************************************
* initialize()
* - initializes the mutex lock, semaphores and the buffer
******************************************************************************/
void initialize()
{
int r; // stores the resulting return value
// init the mutex lock w/ default attribs.
r = pthread_mutex_init(&mutex, NULL);
if (r)
exitError("Failed to initialize the mutex lock", r);
// sem_init(ptr_to_sem, sharing_lvl, semaphore_initial_value)
// init the EMPTY SEMAPHORE
r = sem_init(&empty, 0, BUFFER_SIZE);
if (r)
exitError("Failed to initilize the 'empty' semaphore", r);
// init the FULL SEMAPHORE
r = sem_init(&full, 0, 0);
if (r)
exitError("Failed to initilize the 'full' semaphore", r);
// init the Buffer
for (int i = 0; i < BUFFER_SIZE; i++)
buffer[i] = 0;
}
/******************************************************************************
* producer()
* - generates pseudorandom numbers and pushes them to the queue
******************************************************************************/
void* producer(void* threadID)
{
int id = *(int*)threadID;
delete (int*) threadID;
threadID = NULL;
bufferItem itemProduced = -1;
int r1;
int r2;
while (true)
{
usleep(rand() % 150000); // sleep a random amount of time
itemProduced = (rand() % 1000); // use 3-digit numbers 0-999
// acquire the semaphore and the mutex lock
r1 = sem_wait(&empty);
r2 = pthread_mutex_lock(&mutex);
if (r1)
exitError("Failed to acquire the `empty` semaphore", r1);
if (r2)
exitError("Failed to acquire the mutex lock", r2);
// insert item into shared global buffer and print what was done
if (cq.enqueue(itemProduced))
{
cout << setw(5) << right << itemProduced
<< setw(6) << 'P' << left << id << endl;
}
else
exitError("Failed to produce a new item");
r1 = pthread_mutex_unlock(&mutex); // release the krakens!
r2 = sem_post(&full); // increment the 'full' semaphore
if (r1)
exitError("Failed to release the mutex lock", r1);
if (r2)
exitError("Failed to increment the number of full slots", r2);
}
return NULL;
}
/******************************************************************************
* consumer()
* - consumes numbers from the queue
******************************************************************************/
void* consumer(void* threadID)
{
int id = *(int*)threadID;
delete (int*) threadID;
threadID = NULL;
bufferItem itemConsumed = -1;
int r1;
int r2;
while (true)
{
usleep(rand() % 150000); // sleep a random amount of time
r1 = sem_wait(&full);
r2 = pthread_mutex_lock(&mutex);
if (r1)
exitError("Failed to acquire the `full` semaphore", r1);
if (r2)
exitError("Failed to acquire the mutex lock", r2);
// consume item from shared global buffer and print what was done
if (cq.dequeue(&itemConsumed))
{
cout << setw(20) << right << itemConsumed
<< setw(7) << 'C' << left << id << endl;
}
else
exitError("Failed to consume an item");
r1 = pthread_mutex_unlock(&mutex); // release the krakens!
r2 = sem_post(&empty);
if (r1)
exitError("Failed to release the mutex lock", r1);
if (r2)
exitError("Failed to increment the number of empty slots", r2);
}
return NULL;
}
|