// Bounded-buffer producer/consumer exercise built on POSIX threads, a pthread
// mutex, and a counting semaphore.
#include <pthread.h>
#include <semaphore.h>
#include <unistd.h>

#include <atomic>
#include <chrono>
#include <cstdlib>
#include <ctime>
#include <iostream>
#include <mutex>
#include <sstream>
#include <string>
#include <thread>
#include <vector>
// Number of slots in the shared ring buffer.
#define BUFFER_SIZE 5
// Type of the values stored in the ring buffer.
typedef int buffer_item;
// Shutdown flag: set by main, polled by every worker thread. Atomic so the
// cross-thread write/read is not a data race.
std::atomic<bool> quit{false};
// Guards buffer_array and both ring positions. Statically initialized so it
// is valid before any thread is created.
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
// Counting semaphore; main initializes it with sem_init to the slot count.
sem_t buf;
// Scratch target for sem_getvalue() results.
int sem_value;
// Next slot to write (producer) and next slot to read (consumer).
int producer_position = 0, consumer_position = 0;
// Initialize buffer: every slot starts at the placeholder value -1.
int buffer_array[BUFFER_SIZE] = {-1,-1,-1,-1,-1};
//std::cout << sem_value << std::endl; //prints number of semaphores left
/**
 * Writes `item` into the ring buffer slot at producer_position, logs the
 * value and slot index to stdout, then advances producer_position with
 * wrap-around at BUFFER_SIZE.
 *
 * Performs no locking and no fullness check itself.
 *
 * @param item value to store.
 * @return always true (the original fell off the end of a bool function,
 *         which is undefined behavior).
 */
bool buffer_insert_item( buffer_item item )
{
    buffer_array[producer_position % BUFFER_SIZE] = item;
    std::cout << item << " at position " << producer_position << std::endl;
    producer_position = (producer_position + 1) % BUFFER_SIZE;
    return true;
}
// Returns true when `value` is a prime number (trial division; values below 2
// are never prime).
static bool buffer_value_is_prime(int value)
{
    if (value < 2)
        return false;
    // d <= value / d is the overflow-free form of d * d <= value.
    for (int d = 2; d <= value / d; ++d)
    {
        if (value % d == 0)
            return false;
    }
    return true;
}

/**
 * Reports the value in the ring buffer slot at consumer_position on stdout,
 * flagging it when it is prime, then advances consumer_position with
 * wrap-around at BUFFER_SIZE. The slot contents are left untouched.
 *
 * Performs no locking and no emptiness check itself.
 *
 * @return always true (the original fell off the end of a bool function,
 *         which is undefined behavior).
 */
bool buffer_remove_item()
{
    const int value = buffer_array[consumer_position % BUFFER_SIZE];
    // The original tested only for oddness, so 1, 9, 15... were announced as
    // prime and 2 was not; use a real primality test for the message.
    if (buffer_value_is_prime(value))
    {
        std::cout << value << " is a prime number at position " << consumer_position << std::endl;
    }
    else
    {
        std::cout << value << " at position " << consumer_position << std::endl;
    }
    consumer_position = (consumer_position + 1) % BUFFER_SIZE;
    return true;
}
void* producer(void *argv)
{
int holder = atoi((char *)argv);
std::cout << "producer start" << std::endl;
unsigned int seed;
seed = time(NULL);
//this is for each thread outputting both random and unique numbers
std::stringstream X;
X << std::this_thread::get_id();
int id = std::stoull(X.str());
std::cout << id << std::endl;
while(quit != true)
{
//seeding random
//adding the id to the number generated makes it random even to parallel threads
buffer_item random_num = std::abs(((rand_r(&seed) + id) % 100)+1);
holder = (rand_r(&seed) % holder) + 1;
std::this_thread::sleep_for (std::chrono::milliseconds(holder));
sem_getvalue(&buf, &sem_value); //sets sem_value to how many semaphores are left
if (sem_value > 0 && sem_value <= 5)
{
//we lock the mutext first so we dont have a semaphore waiting to lock
/* aquire the mutex lock */
pthread_mutex_lock( &mutex );
//hold semaphore
sem_wait( &buf );
/*** CRITICAL SECTION ***/
std::cout << "inside the producer thread" << std::endl;
std::cout << sem_value << " this is the number of semaphore locks remaining" << std::endl;
//to have each number be random
while (random_num == std::abs(((rand_r(&seed) + id) % 100)+1))
{
random_num = std::abs(((rand_r(&seed) + id) % 100)+1);
}
//std::cout<< random_num << std::endl;
buffer_insert_item(random_num);
//release the semaphore
//sem_post( &buf );
/* release the mutex lock */
pthread_mutex_unlock( &mutex );
}
//else if(sem_value == 0)
else if (producer_position == consumer_position)
{
std::cout << "buffer full" << std:: endl;
//print out buffer full
std::this_thread::sleep_for (std::chrono::milliseconds(holder));
}
}
}
void* consumer(void *argv)
{
int holder = atoi((char *)argv);
std::cout << "consumer start" << std::endl;
unsigned int seed;
seed = time(NULL);
//this is for each thread outputting both random and unique numbers
std::stringstream X;
X << std::this_thread::get_id();
int id = std::stoull(X.str());
std::cout << id << std::endl;
while(quit != true)
{
holder = (rand_r(&seed) % holder) + 1;
std::this_thread::sleep_for (std::chrono::milliseconds(holder));
sem_getvalue(&buf, &sem_value); //sets sem_value to how many semaphores are left
if (sem_value >= 0 && sem_value < 5)
{
//we lock the mutext first so we dont have a semaphore waiting to lock
/* aquire the mutex lock */
pthread_mutex_lock( &mutex );
//hold semaphore
//sem_wait( &buf );
/*** CRITICAL SECTION ***/
std::cout << "inside the consumer thread" << std::endl;
std::cout << sem_value << " this is the number of semaphore locks remaining" << std::endl;
buffer_remove_item();
//release the semaphore
sem_post( &buf );
/* release the mutex lock */
pthread_mutex_unlock( &mutex );
}
//else if(sem_value == 5)
else if (consumer_position == producer_position)
{
std::cout << "buffer empty" << std::endl;
//print out buffer empty
std::this_thread::sleep_for (std::chrono::milliseconds(holder));
}
}
}
int main( int argc, char *argv[] )
{
pthread_t thread ;
sem_init( &buf, 0, 5 );
// Get command line arguments
int time_term = atoi(argv[1]);
int time_sleep = atoi(argv[2]);
int num_pro = atoi(argv[3]);
int num_con = atoi(argv[4]);
std::cout << time_term << std::endl <<
time_sleep << std::endl << num_pro <<
std::endl << num_con << std::endl;
// Create producer thread(s)
for(int i = 0; i < num_pro; i++)
{
pthread_create(&thread,NULL,producer,(void *) argv[2]);
}
// Create consumer thread(s)
for(int i = 0; i < num_con; i++)
{
pthread_create(&thread,NULL,consumer,(void *) argv[2]);
}
// Sleep
std::cout << "start sleep" << std::endl;
std::this_thread::sleep_for (std::chrono::milliseconds(time_term));
std::cout << "end sleep" << std::endl;
// Join Threads
quit = true; //setting quit value to exit threads
std::cout << "waiting for threads to quit" << std::endl;
pthread_join(thread,NULL);
// Display Statistics
// Exit
}
// End of the producer/consumer program.