Clearing out pipes

Feb 17, 2011 at 6:02am
Hey guys, I'm writing a program that reads input from a file and then searches for a number in that file. The searching has to be done in child processes, and the data must be sent back and forth through pipes.

I've restarted the program from scratch because I got pretty far into it and couldn't figure out why it wouldn't work. I've now pinpointed the problem: whatever I write to the first pipe seems to get stuck in there for subsequent reads.

Suppose num.txt has 100 numbers in it and the first number is 22. The way I have to program it, the first number sent over the pipe should be the number of inputs the child process should expect to receive. So for 100 inputs divided among 10 processes, the first number sent would be 10. After reading that, the child should go into a loop to read the rest of the numbers. But a sample output shows that it keeps reading 10 (the first number sent to it) on every read after the initial one.

If I add a usleep(100) call in the write for loop it works fine, but I'm assuming this won't work all the time because of different processor speeds. Basically, I'm looking for a way to check whether the child process has read from the pipe before writing the next number to it.

Any suggestions?

Here is my code:
#include <iostream>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/types.h>
#include <string.h>
#include <fstream>
#include <sys/wait.h>
using namespace std;

#define BUFFER_SIZE 25
#define READ_END	0
#define WRITE_END	1

int main(void)
{
	char write_msg[BUFFER_SIZE];
	char read_msg[BUFFER_SIZE];
	pid_t pid;
	int first_pipe[2];
	int second_pipe[2];
	int i;
	int numToSearch;
	int numProcesses = 10;
	int numInputs = -1;
	string line;
	ifstream inFile ("num.txt");		// Input file containing numbers

	if (pipe(first_pipe) == -1) {
		fprintf(stderr,"First Pipe failed");
		return 1;
	}

	if (pipe(second_pipe) == -1) {
		fprintf(stderr,"Second Pipe failed");
		return 1;
	}

	// Prompt the user for the number they want to search for
	cout << "Please enter the number you want to search for: "; 
	cin >> numToSearch;

	// Get the number of numbers in the input file
	if (inFile.is_open())
	{
		while (inFile.good())
		{
			getline (inFile,line);
			numInputs++;
		}
		inFile.close();
	}
	else
	{
		cout << "Error opening the input file" << endl;
	}

	int inputArray[numInputs]; // Define an array for storing the numbers in the input file
	int numExpected = numInputs / numProcesses;
	inFile.open("num.txt");	// Reopen the file

	// Store the values in an array
	for (i=0; i < numInputs; i++)
	{
		inFile >> inputArray[i];
	}

	pid = fork();

	if (pid < 0) {
		fprintf(stderr, "Fork failed");
		return 1;
	}

	if (pid > 0) {  /* parent process */
		/* close the unused ends of each pipe */
		close(first_pipe[READ_END]);
		close(second_pipe[WRITE_END]);

		/* write the number expected to the pipe first */
		sprintf(write_msg, "%d", numExpected); // Added!!!
		write(first_pipe[WRITE_END], write_msg, strlen(write_msg)+1);
		usleep(100)  ;// ******This is where I need help  *******//

		for (i = 0; i < numExpected; i++)
		{
			sprintf(write_msg, "%d", inputArray[i]); // Added!!!
			write(first_pipe[WRITE_END], write_msg, strlen(write_msg)+1);
			usleep(100); // ******This is where I need help *******//
		}

		/* now close the write end of the pipe */
		close(first_pipe[WRITE_END]);

		wait(NULL);
		/* read the result from the second pipe */
		read(second_pipe[READ_END], read_msg, BUFFER_SIZE);
		printf("parent read >%s<\n",read_msg);

		close(second_pipe[READ_END]);
	}
	else { /* child process */
		/* close the unused ends of the pipes */
		close(first_pipe[WRITE_END]);
		close(second_pipe[READ_END]);

		int childArray[numExpected];
		// Read the number expected first
		read(first_pipe[READ_END], read_msg, BUFFER_SIZE);
		int numComing = atoi(read_msg);		
		cout << "Created child array of " << numComing << " size" << endl;

		/* read from the pipe */
		for (i = 0; i < numExpected; i++)
		{
			read(first_pipe[READ_END], read_msg, BUFFER_SIZE);
			childArray[i] = atoi(read_msg);		
			cout << "childArray [" << i << "] = " << childArray[i] << endl;		
		}
		
		//printf("child read >%s<\n",read_msg);

		/* write to the pipe */
		sprintf(write_msg, "%d", numComing); // Added!!!
		write(second_pipe[WRITE_END], write_msg, strlen(write_msg)+1);
		
		/* close the write end of the pipe */
		close(first_pipe[READ_END]);
		close(second_pipe[WRITE_END]);
	}

	return 0;
}


This is the num.txt file I am using for inputs:
22
5
8
64
21
5
78
6
4
98
1
13
12
12
45
210
5
8
64
21
5
78
6
4
98
-1
13
12
12
45
210
5
8
-64
21
5
78
6
-4
98
1
13
12
12
45
0
5
-8
-64
121
10
5
8
64
21
5
78
6
4
98
1
13
12
12
45
210
5
8
64
21
5
78
6
4
98
-1
13
12
12
45
210
5
8
-64
21
5
78
6
-4
98
1
13
12
12
45
0
5
-8
-64
121

Last edited on Feb 17, 2011 at 6:49am

Feb 17, 2011 at 1:57pm
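Pipes are stream-oriented in nature. That means that if the writer writes "ABCD" in a single write, the reader will not necessarily read "ABCD" in a single read. It might read "A" in the first read and "BCD" in the second, and so on. The reverse also holds: several small writes can be returned by a single read, so a read of up to BUFFER_SIZE bytes can swallow more than one of your messages at once.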
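
One common way to cope with this, sketched here under some assumptions, is to give every message a fixed size and loop on read() until that many bytes have arrived, so the reader never depends on where one write ended. The helper names (read_full, write_full, send_int, recv_int, sum_in_child) are made up for illustration and are not from the posted program. The sketch also sends each number as a raw int rather than as text, which only works because both ends are processes on the same machine.

```cpp
#include <cerrno>
#include <cstddef>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>
#include <vector>

// Keep calling read() until exactly `len` bytes have arrived.
// Returns false on error or if the writer closes the pipe early.
bool read_full(int fd, void* buf, std::size_t len) {
    char* p = static_cast<char*>(buf);
    while (len > 0) {
        ssize_t n = read(fd, p, len);
        if (n == 0) return false;              // end of stream
        if (n < 0) {
            if (errno == EINTR) continue;      // interrupted, try again
            return false;
        }
        p += n;
        len -= static_cast<std::size_t>(n);
    }
    return true;
}

// Keep calling write() until all `len` bytes have been accepted.
bool write_full(int fd, const void* buf, std::size_t len) {
    const char* p = static_cast<const char*>(buf);
    while (len > 0) {
        ssize_t n = write(fd, p, len);
        if (n < 0) {
            if (errno == EINTR) continue;
            return false;
        }
        p += n;
        len -= static_cast<std::size_t>(n);
    }
    return true;
}

// Every message is exactly sizeof(int) bytes (an assumption of this sketch).
bool send_int(int fd, int v)  { return write_full(fd, &v, sizeof v); }
bool recv_int(int fd, int& v) { return read_full(fd, &v, sizeof v); }

// Hypothetical demo: the parent sends a count followed by the values, with no
// sleeps in between; the child sums them and reports back on a second pipe.
bool sum_in_child(const std::vector<int>& values, int& sum) {
    int to_child[2], to_parent[2];
    if (pipe(to_child) == -1 || pipe(to_parent) == -1) return false;

    pid_t pid = fork();
    if (pid < 0) return false;

    if (pid == 0) {                            // child process
        close(to_child[1]);
        close(to_parent[0]);
        int count = 0, total = 0;
        if (recv_int(to_child[0], count)) {
            for (int i = 0; i < count; ++i) {
                int v = 0;
                if (!recv_int(to_child[0], v)) break;
                total += v;
            }
        }
        send_int(to_parent[1], total);
        _exit(0);
    }

    close(to_child[0]);                        // parent process
    close(to_parent[1]);
    bool ok = send_int(to_child[1], static_cast<int>(values.size()));
    for (int v : values) ok = ok && send_int(to_child[1], v);
    close(to_child[1]);
    ok = recv_int(to_parent[0], sum) && ok;
    close(to_parent[0]);
    waitpid(pid, nullptr, 0);
    return ok;
}
```

Calling sum_in_child({22, 5, 8, 64, 21}, s) should leave 120 in s however the kernel happens to split or merge the individual writes. If the numbers have to stay as text, the same idea works with a delimiter such as '\n' and a reader that scans for it.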

Feb 17, 2011 at 8:56pm
So is there a way to make sure the read has occurred before writing the next value to the pipe?

Feb 18, 2011 at 12:11am
Not with just the pipe; you'd need another synchronization mechanism on top of it, such as a semaphore.
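
As a rough sketch of what such a handshake could look like: the example below does not use a semaphore. It swaps in a second pipe that carries one-byte acknowledgements, which plays the same role and needs nothing beyond pipe() and fork(). The function name send_in_lockstep is hypothetical, and the child checks the values against its own forked copy of the vector purely so that the demo can report success or failure.

```cpp
#include <cstdio>
#include <cstdlib>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>
#include <vector>

// Hypothetical demo: send each value as text, then block until the child
// acknowledges it with one byte on a second pipe. Returns true if the child
// saw every value, in order.
bool send_in_lockstep(const std::vector<int>& values) {
    int data[2], ack[2];
    if (pipe(data) == -1 || pipe(ack) == -1) return false;

    pid_t pid = fork();
    if (pid < 0) return false;

    if (pid == 0) {                        // child: read one message, then ack
        close(data[1]);
        close(ack[0]);
        char buf[25];
        bool ok = true;
        for (int expected : values) {      // the child has its own copy after fork
            ssize_t n = read(data[0], buf, sizeof buf - 1);
            if (n <= 0) { ok = false; break; }
            buf[n] = '\0';
            if (std::atoi(buf) != expected) ok = false;
            char token = 'k';              // "I have read it, send the next one"
            if (write(ack[1], &token, 1) != 1) { ok = false; break; }
        }
        _exit(ok ? 0 : 1);
    }

    close(data[0]);                        // parent: write, then wait for the ack
    close(ack[1]);
    bool ok = true;
    char msg[25];
    for (int v : values) {
        int len = std::snprintf(msg, sizeof msg, "%d", v);
        if (write(data[1], msg, len + 1) != len + 1) { ok = false; break; }
        char token;
        if (read(ack[0], &token, 1) != 1) { ok = false; break; }  // blocks here
    }
    close(data[1]);
    close(ack[0]);

    int status = 0;
    waitpid(pid, &status, 0);
    return ok && WIFEXITED(status) && WEXITSTATUS(status) == 0;
}
```

Because only one short message is ever in the pipe at a time, each read() in the child sees exactly one number and no usleep() is needed. The price is a full round trip per value, so this is noticeably slower than writing everything at once and letting the reader find the message boundaries itself.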