Clarification on OpenMP sections needed

Hi,

I have a classic project: the producer/consumer problem. I have to write three versions of it: one serial, and two that use OpenMP sections to parallelize the producer and the consumer. So far everything compiles and runs without a hitch, but I am not sure I'm doing this right.

In the serial program, two variables hold the read and write indices of the queue. I use them to synchronize the producer and the consumer, so the producer will not insert more data if the queue is full and the consumer won't try to read if it is empty. I carried this over to the OpenMP implementations and it seems to work there as well.

To make sure it was working the way I intended, I made the Producer and Consumer each run a random number of times in the serial program. The output tells me whether the Producer actually filled up the queue or the Consumer ran into an empty queue. So far so good; everything was working perfectly.

However, when I run the OpenMP implementations, I never get the condition where the Consumer runs up against an empty queue or the Producer runs up against a full queue. The program runs as if it were serial. I have checked my settings (I'm using Visual Studio) and I do have OpenMP enabled. So I'm wondering why I never run into these error conditions. Does one thread always start (and finish) before the other? Might there be a problem with the way I declared my parallel sections? I mean, the code runs perfectly, maybe a little too perfectly.
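To make the full and empty checks described above concrete, here is a minimal sketch of the same read/write index scheme with a capacity of 4 instead of 1024, so both conditions are easy to reach. The names TinyQueue, Init, TryPush, TryPop and Size are hypothetical and only for illustration; they are not part of the program in this post.

```cpp
#include <cassert>

// Hypothetical miniature of the queue in the post: same index arithmetic,
// but a capacity of 4 so the full and empty cases are easy to trigger.
const int kCapacity = 4;

struct TinyQueue {
    int data[kCapacity];
    int readIndex;
    int writeIndex;
};

void Init(TinyQueue& q)
{
    q.readIndex = 0;
    q.writeIndex = 0;
}

// Returns false when the queue is full. One slot always stays unused so
// that "full" (next write == read) can be told apart from "empty"
// (write == read).
bool TryPush(TinyQueue& q, int value)
{
    int next = (q.writeIndex + 1) % kCapacity;
    if (next == q.readIndex)
        return false;
    q.data[q.writeIndex] = value;
    q.writeIndex = next;
    return true;
}

// Returns false when the queue is empty; otherwise stores the oldest
// element in 'value' and advances the read index.
bool TryPop(TinyQueue& q, int& value)
{
    if (q.writeIndex == q.readIndex)
        return false;
    value = q.data[q.readIndex];
    q.readIndex = (q.readIndex + 1) % kCapacity;
    return true;
}

// Number of stored elements, valid whether or not the indices have wrapped.
int Size(const TinyQueue& q)
{
    assert(q.readIndex >= 0 && q.readIndex < kCapacity);
    assert(q.writeIndex >= 0 && q.writeIndex < kCapacity);
    return (q.writeIndex - q.readIndex + kCapacity) % kCapacity;
}
```

With this scheme a queue of capacity N holds at most N - 1 elements, so the 1024-slot queue in the post reports "full" once 1023 numbers are waiting.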

Here's my code. First up, the serial implementation. Focus on the function ProducerConsumerSequential:
// ProducerConsumer_Sequential_A.cpp : Defines the entry point for the console application.
//

//*********** This version avoids the use of locks **************//

#include "stdafx.h"
#include <iostream>
#include <stdlib.h>
#include <time.h>

using namespace std;

#define MAX_ELEMS 1024

volatile int readIndex, writeIndex, num, readNum;
volatile int queue[MAX_ELEMS];

void ProducerConsumerSequential();
void Producer();
void Consumer();
void PrintConsumerSum();
void PrintElementsInQueue();

int _tmain(int argc, _TCHAR* argv[])
{
	ProducerConsumerSequential();
	return 0;
}

void ProducerConsumerSequential()
{
	cout << "This is a sequential implementation of the Producer Consumer problem.\n";
	cout << "The producer will insert numbers into a queue a random number of times, \n";
	cout << "then the consumer will extract a random number of times and add the numbers \n";
	cout << "from the queue. If the producer cannot insert (because the queue is full) or \n";
	cout << "the consumer cannot extract (because the queue is empty), an error will be \ndisplayed.\n";

	readIndex = writeIndex = num = readNum = 0;
	int randTimes;
	char userContinue;
	srand(static_cast<unsigned int>(time(NULL)));	// seed the generator; the cast avoids a narrowing warning

	while(true)
	{
		randTimes = (rand() % 10) + 1;
		for(int i = 0; i < randTimes; i++)
		{
			Producer();
		}

		randTimes = (rand() % 10) + 1;
		for(int i = 0; i < randTimes; i++)
		{
			Consumer();
		}

		PrintConsumerSum();
		PrintElementsInQueue();

		cout << "\nDo you want to continue? (Y/N)" << endl;
		cin >> userContinue;

		if (userContinue != 'Y' && userContinue != 'y')
		{
			break;
		}
	}

	std::system("Pause");
}

void Producer()
{
	int nextElement = (writeIndex + 1) % MAX_ELEMS;

	if (nextElement != readIndex)
	{
		queue[writeIndex] = num;
		writeIndex = nextElement;
		num++;
		//return;
	}
	else
	{
		cout << "\nPRODUCER: The queue is full; cannot place another number." << endl;
		return;
	}	
}

void Consumer()
{
	if (writeIndex == readIndex)
	{
		cout << "\nCONSUMER: The queue is empty; cannot extract another number." << endl;
		return;
	}
	else
	{
		int nextElement = (readIndex + 1) % MAX_ELEMS;
		readNum += queue[readIndex];
		readIndex = nextElement;
		//return;
	}
}

void PrintConsumerSum()
{
	cout << "\nThe sum of the elements extracted by Consumer so far: " << readNum << endl;
}

void PrintElementsInQueue()
{
	if (writeIndex == readIndex)
	{
		cout << "\nQueue is empty." << endl;
	}
	else if (writeIndex < readIndex)
	{
		cout << "\nThe elements in the queue are: ";
		for(int k = 0; k < MAX_ELEMS - readIndex; k++)
		{
			cout << queue[readIndex + k] << ", ";
		}
		for(int k = 0; k < writeIndex; k++)
		{
			cout << queue[k] << ", ";
		}
		cout << endl;
	}
	else
	{
		cout << "\nThe elements in the queue are: ";
		for(int k = 0; k < writeIndex - readIndex; k++)
		{
			cout << queue[readIndex + k] << ", ";
		}
		cout << endl;
	}
}


Here's my OpenMP implementation. This is the simple one, where the Producer and Consumer are doing one data element at a time. The other version is with 128 elements at a time.
// ProducerConsumer_OpenMP_1.cpp : Defines the entry point for the console application.
//

#include "stdafx.h"
#include <iostream>
#include <stdlib.h>
#include <omp.h>

using namespace std;

#define MAX_ELEMS 1024
#define VALUES_AT_A_TIME 1

volatile int readIndex, writeIndex, num, readNum;
volatile int queue[MAX_ELEMS], history[VALUES_AT_A_TIME];

void ProducerConsumerOpenMP_1();
void Producer();
void Consumer();
void PrintConsumerSum();
void PrintElementsInQueue();
void PrintLatestNumbersInQueue();

int _tmain(int argc, _TCHAR* argv[])
{
	ProducerConsumerOpenMP_1();
	return 0;
}

void ProducerConsumerOpenMP_1()
{
	cout << "This is an OpenMP implementation of the Producer Consumer problem.\n";
	cout << "The producer will insert numbers into a queue one by one, \n";
	cout << "then the consumer will extract and add the numbers from the queue. \n";
	cout << "If the producer cannot insert (because the queue is full) or the \n";
	cout << "consumer cannot extract (because the queue is empty), an error will be \ndisplayed.\n";

	readIndex = writeIndex = num = readNum = 0;
	char userContinue;

	while(true)
	{
#pragma omp parallel sections
		{
#pragma omp section
			{
				Producer();
			}

#pragma omp section
			{
				Consumer();
			}
		}

		PrintConsumerSum();
		PrintElementsInQueue();
		PrintLatestNumbersInQueue();

		cout << "\nDo you want to continue? (Y/N)" << endl;
		cin >> userContinue;

		if (userContinue != 'Y' && userContinue != 'y')
		{
			break;
		}
	}

	std::system("Pause");
}

void Producer()
{
	int nextElement = (writeIndex + 1) % MAX_ELEMS;

	if (nextElement != readIndex)
	{
		queue[writeIndex] = history[0] = num;
		writeIndex = nextElement;
		num++;
		//return;
	}
	else
	{
		cout << "\nPRODUCER: The queue is full; cannot place another number." << endl;
		return;
	}	
}

void Consumer()
{
	if (writeIndex == readIndex)
	{
		cout << "\nCONSUMER: The queue is empty; cannot extract another number." << endl;
		return;
	}
	else
	{
		int nextElement = (readIndex + 1) % MAX_ELEMS;
		readNum += queue[readIndex];
		readIndex = nextElement;
		//return;
	}
}

void PrintConsumerSum()
{
	cout << "\nThe sum of the elements extracted by Consumer so far: " << readNum << endl;
}

void PrintElementsInQueue()
{
	if (writeIndex == readIndex)
	{
		cout << "\nQueue is empty." << endl;
	}
	else if (writeIndex < readIndex)
	{
		cout << "\nThe elements in the queue are: ";
		for(int k = 0; k < MAX_ELEMS - readIndex; k++)
		{
			cout << queue[readIndex + k] << ", ";
		}
		for(int k = 0; k < writeIndex; k++)
		{
			cout << queue[k] << ", ";
		}
		cout << endl;
	}
	else
	{
		cout << "\nThe elements in the queue are: ";
		for(int k = 0; k < writeIndex - readIndex; k++)
		{
			cout << queue[readIndex + k] << ", ";
		}
		cout << endl;
	}
}

void PrintLatestNumbersInQueue()
{
	cout << "\nThe latest numbers added to the queue were: ";
	for(int i = 0; i < VALUES_AT_A_TIME; i++)
	{
		cout << history[i] << ", ";
	}
	cout << endl;
}


Now, given that the code is supposed to run in parallel, I would have expected the Consumer section to at some point randomly run before the Producer section and output its error message, but that never happens - it always reads a number, as if my code were executing serially. Is my conception of OpenMP threads/sections correct, i.e. the sections run in parallel and if the consumer gets to the verification point before the producer has a chance to put a number in the queue, it should output the error message? Is it just that my producer is running faster than my consumer? Why am I never getting the "error message" output that I get if I run them randomly (albeit in the serial version)?
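One way to check the expectation above is a small diagnostic that records which thread runs each section and lets each section make many attempts inside a single parallel region, counting how often the queue is found full or empty. This is only a sketch under assumptions: SectionStats, CurrentThread and RunSections are made-up names, the queue is shrunk to 8 slots so "full" is reachable, and std::atomic replaces the volatile indices (mixing C++ atomics with OpenMP works with common compilers, but treat that as an assumption too).

```cpp
#include <atomic>
#include <cassert>
#ifdef _OPENMP
#include <omp.h>
#endif

// Hypothetical diagnostic, not the original program: all names are made up.
struct SectionStats {
    int producerThread;   // thread that ran the producer section
    int consumerThread;   // thread that ran the consumer section
    long long produced;   // successful inserts
    long long consumed;   // successful extractions
    long long fullHits;   // producer attempts that found the queue full
    long long emptyHits;  // consumer attempts that found the queue empty
    long long sum;        // sum of the extracted values
};

static int CurrentThread()
{
#ifdef _OPENMP
    return omp_get_thread_num();
#else
    return 0;  // built without OpenMP: everything runs on one thread
#endif
}

// Each section makes 'attempts' tries and never waits, so the function
// terminates even if both sections end up on the same thread.
SectionStats RunSections(long long attempts)
{
    assert(attempts >= 0);
    const int kCapacity = 8;  // small on purpose so "full" is reachable
    int buffer[kCapacity];
    std::atomic<int> readIndex(0), writeIndex(0);
    SectionStats s = {-1, -1, 0, 0, 0, 0, 0};

#pragma omp parallel sections num_threads(2)
    {
#pragma omp section
        {
            s.producerThread = CurrentThread();
            long long produced = 0, full = 0;
            for (long long i = 0; i < attempts; ++i) {
                int w = writeIndex.load();
                int next = (w + 1) % kCapacity;
                if (next == readIndex.load()) { ++full; continue; }
                buffer[w] = static_cast<int>(produced);
                writeIndex.store(next);  // publish only after the slot is written
                ++produced;
            }
            s.produced = produced;
            s.fullHits = full;
        }
#pragma omp section
        {
            s.consumerThread = CurrentThread();
            long long consumed = 0, empty = 0, sum = 0;
            for (long long i = 0; i < attempts; ++i) {
                int r = readIndex.load();
                if (r == writeIndex.load()) { ++empty; continue; }
                sum += buffer[r];
                readIndex.store((r + 1) % kCapacity);  // free the slot after reading it
                ++consumed;
            }
            s.consumed = consumed;
            s.emptyHits = empty;
            s.sum = sum;
        }
    }
    return s;
}
```

If producerThread and consumerThread differ, the sections did land on separate threads. Nonzero emptyHits together with a large consumed count would suggest the two loops really overlapped. With a single Producer() and Consumer() call per parallel region, as in the program above, the window in which the consumer could check before the producer writes is very short, which may be why the error message never shows up; that is a guess, not a measurement.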