Hi,
I have a classic project - the producer/consumer problem. I have to do three versions of this: one which is serial, and the other two using openmp sections to parallelize the producer and the consumer. So far, everything compiles and runs without a hitch, but I am not quite sure I'm doing this right.
I made the serial program using two variables representing the read and write index of the queue I'm using to hold data to synchronize the producer and the consumer, so the producer will not insert more data into the queue if it's full and the consumer won't try to read from the queue if it's empty. I carried this over to the openmp implementations and it seems to work as well. However, just to make sure it was working the way I intended, I made the Producer and Consumer do their thing a random number of times in the serial program. The outputs would tell me if the Producer actually filled up the queue or if the Consumer ran into an empty queue. So far so good, everything was working perfectly. However, when I run the OpenMP implementations, in no case do I get the condition where the Consumer runs up against an empty queue, or the Producer runs up against a full queue. The program runs as if it was serial. I have checked my settings (I'm using Visual Studio) and I do indeed have openmp enabled. So I'm wondering why I never run into these error conditions. Does one thread always start (and finish) before the other? Might there be a problem with the way I declared my parallel sections? I mean, the code runs perfectly, maybe a little
too perfectly.
Here's my code. First up, the serial implementation. Focus on the function ProducerConsumerSequential:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142
|
// ProducerConsumer_Sequential_A.cpp : Defines the entry point for the console application.
//
//*********** This version avoids the use of locks **************//
#include "stdafx.h"
#include <iostream>
#include <stdlib.h>
#include <time.h>
using namespace std;
#define MAX_ELEMS 1024
volatile int readIndex, writeIndex, num, readNum;
volatile int queue[MAX_ELEMS];
void ProducerConsumerSequential();
void Producer();
void Consumer();
void PrintConsumerSum();
void PrintElementsInQueue();
int _tmain(int argc, _TCHAR* argv[])
{
ProducerConsumerSequential();
return 0;
}
void ProducerConsumerSequential()
{
cout << "This is a sequential implementation of the Producer Consumer problem.\n";
cout << "The producer will insert numbers into a queue a random number of times, \n";
cout << "then the consumer will extract a random number of times and add the numbers \n";
cout << "from the queue. If the producer cannot insert (because the queue is full) or \n";
cout << "the consumer cannot extract (because the queue is empty), an error will be \ndisplayed.\n";
readIndex = writeIndex = num = readNum = 0;
int randTimes;
char userContinue;
srand(time(NULL));
while(true)
{
randTimes = (rand() % 10) + 1;
for(int i = 0; i < randTimes; i++)
{
Producer();
}
randTimes = (rand() % 10) + 1;
for(int i = 0; i < randTimes; i++)
{
Consumer();
}
PrintConsumerSum();
PrintElementsInQueue();
cout << "\nDo you want to continue? (Y/N)" << endl;
cin >> userContinue;
if (userContinue == 'Y' || userContinue == 'y')
{
}
else
{
break;
}
}
std::system("Pause");
}
void Producer()
{
int nextElement = (writeIndex + 1) % MAX_ELEMS;
if (nextElement != readIndex)
{
queue[writeIndex] = num;
writeIndex = nextElement;
num++;
//return;
}
else
{
cout << "\nPRODUCER: The queue is full; cannot place another number." << endl;
return;
}
}
void Consumer()
{
if (writeIndex == readIndex)
{
cout << "\nCONSUMER: The queue is empty; cannot extract another number." << endl;
return;
}
else
{
int nextElement = (readIndex + 1) % MAX_ELEMS;
readNum += queue[readIndex];
readIndex = nextElement;
//return;
}
}
void PrintConsumerSum()
{
cout << "\nThe sum of the elements extracted by Consumer so far: " << readNum << endl;
}
void PrintElementsInQueue()
{
if (writeIndex == readIndex)
{
cout << "\nQueue is empty." << endl;
}
else if (writeIndex < readIndex)
{
cout << "\nThe elements in the queue are: ";
for(int k = 0; k < MAX_ELEMS - readIndex; k++)
{
cout << queue[readIndex + k] << ", ";
}
for(int k = 0; k < writeIndex; k++)
{
cout << queue[k] << ", ";
}
cout << endl;
}
else
{
cout << "\nThe elements in the queue are: ";
for(int k = 0; k < writeIndex - readIndex; k++)
{
cout << queue[readIndex + k] << ", ";
}
cout << endl;
}
}
|
Here's my OpenMP implementation. This is the simple one, where the Producer and Consumer are doing one data element at a time. The other version is with 128 elements at a time.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152
|
// ProducerConsumer_OpenMP_1.cpp : Defines the entry point for the console application.
//
#include "stdafx.h"
#include <iostream>
#include <stdlib.h>
#include <omp.h>
using namespace std;
#define MAX_ELEMS 1024
#define VALUES_AT_A_TIME 1
volatile int readIndex, writeIndex, num, readNum;
volatile int queue[MAX_ELEMS], history[VALUES_AT_A_TIME];
void ProducerConsumerOpenMP_1();
void Producer();
void Consumer();
void PrintConsumerSum();
void PrintElementsInQueue();
void PrintLatestNumbersInQueue();
int _tmain(int argc, _TCHAR* argv[])
{
ProducerConsumerOpenMP_1();
return 0;
}
void ProducerConsumerOpenMP_1()
{
cout << "This is an OpenMP implementation of the Producer Consumer problem.\n";
cout << "The producer will insert numbers into a queue one by one, \n";
cout << "then the consumer will extract and add the numbers from the queue. \n";
cout << "If the producer cannot insert (because the queue is full) or the \n";
cout << "consumer cannot extract (because the queue is empty), an error will be \ndisplayed.\n";
readIndex = writeIndex = num = readNum = 0;
char userContinue;
while(true)
{
#pragma omp parallel sections
{
#pragma omp section
{
Producer();
}
#pragma omp section
{
Consumer();
}
}
PrintConsumerSum();
PrintElementsInQueue();
PrintLatestNumbersInQueue();
cout << "\nDo you want to continue? (Y/N)" << endl;
cin >> userContinue;
if (userContinue == 'Y' || userContinue == 'y')
{
}
else
{
break;
}
}
std::system("Pause");
}
void Producer()
{
int nextElement = (writeIndex + 1) % MAX_ELEMS;
if (nextElement != readIndex)
{
queue[writeIndex] = history[0] = num;
writeIndex = nextElement;
num++;
//return;
}
else
{
cout << "\nPRODUCER: The queue is full; cannot place another number." << endl;
return;
}
}
void Consumer()
{
if (writeIndex == readIndex)
{
cout << "\nCONSUMER: The queue is empty; cannot extract another number." << endl;
return;
}
else
{
int nextElement = (readIndex + 1) % MAX_ELEMS;
readNum += queue[readIndex];
readIndex = nextElement;
//return;
}
}
void PrintConsumerSum()
{
cout << "\nThe sum of the elements extracted by Consumer so far: " << readNum << endl;
}
void PrintElementsInQueue()
{
if (writeIndex == readIndex)
{
cout << "\nQueue is empty." << endl;
}
else if (writeIndex < readIndex)
{
cout << "\nThe elements in the queue are: ";
for(int k = 0; k < MAX_ELEMS - readIndex; k++)
{
cout << queue[readIndex + k] << ", ";
}
for(int k = 0; k < writeIndex; k++)
{
cout << queue[k] << ", ";
}
cout << endl;
}
else
{
cout << "\nThe elements in the queue are: ";
for(int k = 0; k < writeIndex - readIndex; k++)
{
cout << queue[readIndex + k] << ", ";
}
cout << endl;
}
}
void PrintLatestNumbersInQueue()
{
cout << "\nThe latest numbers added to the queue were: ";
for(int i = 0; i < VALUES_AT_A_TIME; i++)
{
cout << history[i] << ", ";
}
cout << endl;
}
|
Now, given that the code is supposed to run in parallel, I would have expected the Consumer section to at some point randomly run before the Producer section and output its error message, but that never happens - it always reads a number, as if my code were executing serially. Is my conception of OpenMP threads/sections correct, i.e. the sections run in parallel and if the consumer gets to the verification point before the producer has a chance to put a number in the queue, it should output the error message? Is it just that my producer is running faster than my consumer? Why am I never getting the "error message" output that I get if I run them randomly (albeit in the serial version)?