Threadpool problem

Hi, I am trying to implement a thread pool with a shared queue.

This is the code.


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
#include "Filename_with_id.h"

#pragma once
// Pool of worker tasks that search files for a goal string.
//
// The main thread queues file names with pusht(); each worker runs pop(),
// which takes a file name from `request`, checks the file with
// File_contain_string() and stores the outcome in `response`. The main thread
// collects results with get_responses() and ends the pool with close().
class ThreadPool
{

	// In-class initializers give a default-constructed pool a defined state
	// (open, zero workers) instead of indeterminate values.
	bool closed = false;       // set by close(); pusht() rejects requests once it is true
	int thread_number = 0;     // number of workers launched by start()
	std::queue<std::string> request;         // file names waiting to be processed
	std::queue<Filename_with_id> response;   // results produced by the workers
	std::string goalstring;                  // text searched for by File_contain_string()

	std::mutex m_request;      // guards `request`
	std::mutex m_response;     // guards `response`

	std::condition_variable cv;  // workers wait on it, together with m_request, for new requests

public:
	ThreadPool(void);
	ThreadPool(int thread_number,std::string goal_string);
	~ThreadPool(void);
	void start();  // creates the threads: all of them try to read from the queue, only one
					// succeeds and does some work, the others wait until main inserts
					// something; the thread that got away does its job, writes to the
					// response queue and goes back in line.

	int pusht(std::string r);                 // for main: insert a new task
	bool pop();                 // for the threads: read a new task


	bool get_responses(Filename_with_id& result);// for main: read the results at the end
	void close();                // for main: close the pool

   bool  put_responses(Filename_with_id& result);

	int File_contain_string(std::string filename);


};



THREADPOOL.CPP
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125



#include "stdafx.h"
#include "ThreadPool.h"
 

// Builds an open pool that will run `thread_number` workers searching for
// `goal_string`. Members are listed in their declaration order, which is the
// order in which they are initialized anyway.
ThreadPool::ThreadPool(int thread_number,std::string goal_string)
	: closed(false)
	, thread_number(thread_number)
	, goalstring(goal_string)
{
}

// Default constructor: an open pool with no workers and an empty goal string.
// `closed` and `thread_number` are set explicitly so that start(), pop() and
// pusht() never read indeterminate values.
ThreadPool::ThreadPool(void)
	: closed(false)
	, thread_number(0)
{
}


// Destructor. It performs no shutdown of its own: it does not call close().
ThreadPool::~ThreadPool()
{
}

// Issues `thread_number` std::async calls, each running pop() on this pool.
//
// NOTE(review): the std::future returned by std::async is discarded at once.
// When the task was launched asynchronously, that future's destructor blocks
// until the task finishes, so the first call may never return control here
// while pop() is still looping. Presumably the futures should be kept in a
// member — confirm against the target standard library.
void ThreadPool::start(){
	int launched=0;
	while(launched<thread_number){
		std::async(&ThreadPool::pop,this);
		++launched;
	}
}


//*************************** 
// Internal   tasks
//*************************** 

// Appends `result` to the response queue while holding m_response.
// Always returns true.
bool ThreadPool::put_responses(Filename_with_id& result){
	{
		std::lock_guard<std::mutex> guard(m_response);
		response.push(result);
	}
	return true;
}

bool ThreadPool::pop(){

	//std::cout<<"hello i'am born now "<< std::this_thread::get_id()<<std::endl;

	while(!closed)  {
	std::unique_lock<std::mutex> ul_request(m_request);

	while(request.size()==0){
	cv.wait(ul_request);
	
	}
	 

	std::string afn=request.front();

	std::cout<<"hello i'am wake up for serving "<< afn<<std::endl;

	request.pop();
	cv.notify_all();
	 
	 
    put_responses(Filename_with_id(afn,File_contain_string(afn)));
	 
}
	 return true;

}

//*************************** 
// Main tasks
//*************************** 
int ThreadPool::pusht(std::string r){
	std::unique_lock<std::mutex> ul(m_request);
	 if(closed)return 0;
	 request.push(r);
	 return 1;
}





// Removes the oldest entry of the response queue and copies it into `result`.
// Returns false, leaving `result` untouched, when the queue is empty.
bool ThreadPool::get_responses(Filename_with_id& result){
	std::lock_guard<std::mutex> guard(m_response);
	if(response.empty()){
		return false;
	}
	result=response.front();
	response.pop();
	return true;
}

void ThreadPool::close(){
	closed=true;

}

//*************************** 
//  extern function 
//*************************** 


// Searches the file `filename` line by line for `goalstring`.
// Returns  1 if some line contains the goal string,
//          0 if the file was read to the end without a match,
//         -1 if the file could not be opened (a message is printed).
int ThreadPool::File_contain_string(std::string filename){

	std::ifstream f(filename);
	if(!f) {
		std::cout<<"-->Il file" <<filename <<" non esiste!"<<std::endl;
		return -1;
	}

	// Testing the result of getline itself means only lines that were really
	// read are examined; the stream closes itself when `f` goes out of scope.
	std::string s;
	while(std::getline(f, s)){
		if(s.find(goalstring)!=std::string::npos) { return 1;}
	}

	return 0;
}



I don't understand why, but it seems that each thread sees a different response queue... Could someone help me?
Thanks in advance.
You have a couple design problems here. Though I'm not sure what is necessarily causing the issue you're seeing.

Problem #1: No more than 1 thread will ever do a job at a time. Since the m_request mutex is locked for the duration of the job, this ensures that only 1 thread can be doing a job at a time. This essentially makes this threadpool completely worthless.

Problem #2: You are not guarding access to the 'closed' member variable. Every thread polls it, but polling it during their run loop is not guarded which means there is possibility for race conditions. Remember that all shared variable accesses must be guarded. So you need to guard this not only in your thread loop, but also in your destructor where you're setting it to true. Either that, or make this variable atomic.

Problem #3: You do not wait for all threads to close before you destroy the ThreadPool object. This means destructors for all your locks/mutexes/etc can occur while threads are still running, practically ensuring you get strange/broken behavior when the object is destroyed. This might also lead to deadlocks or crashes.

Not a problem, but something very weird: Why do you have a separate 'start' function? Why not just do that in the constructor? This means the class will be broken / in a bad state if you forget to call start.
Last edited on
Problem #1: No more than 1 thread will ever do a job at a time. Since the m_request mutex is locked for the duration of the job, this ensures that only 1 thread can be doing a job at a time. This essentially makes this threadpool completely worthless.

You are right! I have solved it this way:

std::unique_lock<std::mutex> ul_request(m_request);

while((request.size()==0) && (closed==false)){
cv.wait(ul_request);

}

1
2
3
4
5
6
7
8
9
10
11
12
	
if(closed==true && request.size()==0)  return true; 
 // se la coda è stata chiusa, e la lista è vuota muoio 

	if(closed==false   && request.size()>0){
	////////////////////////////////////////////////
	//  inizio Lavoro sulla coda
	///////////////////////////////////////////////
	std::string afn=request.front();
	std::cout<<"hello i'am wake up for serving "<< afn<<std::endl;
	request.pop();
	ul_request.unlock();
	cv.notify_all(); 
	///////////////////////////////////////////////
	// fine Lavoro sulla coda
	///////////////////////////////////////////////
	put_responses(Filename_with_id(afn,File_contain_string(afn)));



Problem #2: You are not guarding access to the 'closed' member variable. Every thread polls it, but polling it during their run loop is not guarded which means there is possibility for race conditions. Remember that all shared variable accesses must be guarded. So you need to guard this not only in your thread loop, but also in your destructor where you're setting it to true. Either that, or make this variable atomic.

OK, so I have to protect it with a new mutex, right?

Problem #3: You do not wait for all threads to close before you destroy the ThreadPool object. This means destructors for all your locks/mutexes/etc can occur while threads are still running, practically ensuring you get strange/broken behavior when the object is destroyed. This might also lead to deadlocks or crashes.

I have introduced a wait method that blocks until all the child threads have finished.
Can I also call it from the destructor?

Not a problem, but something very weird: Why do you have a separate 'start' function? Why not just do that in the constructor? This means the class will be broken / in a bad state if you forget to call start.

I don't know why :) This is my first multithreaded program... I'm not so sure of what I am doing :)




Thanks a lot. it is very useful for me!










This is the final code, in case it is useful to someone, even though I am sure it still has plenty of bugs.
(It is an object for an fgrep-like function.)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
#include "stdafx.h"
#include "ThreadPool.h"
 

// Builds an open pool that will run `thread_number` workers searching for
// `goal_string`. Members are listed in their declaration order, which is the
// order in which they are initialized anyway.
ThreadPool::ThreadPool(int thread_number,std::string goal_string)
	: closed(false)
	, thread_number(thread_number)
	, goalstring(goal_string)
{
}

// Default constructor: an open pool with no workers and an empty goal string.
// `closed` and `thread_number` are set explicitly so that start(), wait_die(),
// pop() and pusht() never read indeterminate values.
ThreadPool::ThreadPool(void)
	: closed(false)
	, thread_number(0)
{
}


// Blocks until every worker launched by start() has returned from pop().
//
// Futures that hold no shared state (start() not called yet, or already
// collected by an earlier wait_die()) are skipped, so calling this more than
// once is harmless.
void ThreadPool::wait_die(void)
{
	for (int i=0;i<thread_number;i++){
		if(f[i].valid()){
			f[i].get();
		}
	}
}

// Destructor. It performs no shutdown of its own: it calls neither close()
// nor wait_die().
// NOTE(review): the workers launched by start() are given `this`; presumably
// callers must call close() and wait_die() before destroying the pool — confirm.
ThreadPool::~ThreadPool()
{
}


// Launches `thread_number` workers, each running pop() on this pool, and keeps
// their futures in `f` so that wait_die() can wait for them.
void ThreadPool::start(){

	for (int i=0;i<thread_number;i++){
		// std::launch::async is requested explicitly: with the default policy
		// the implementation may defer the call, and pop() would then only run
		// inside get() instead of on a thread of its own.
		f[i]=std::async(std::launch::async,&ThreadPool::pop,this);
	}

}


//*************************** 
// Internal   tasks
//*************************** 

// Appends `result` to the response queue while holding m_response.
// Always returns true.
bool ThreadPool::put_responses(Filename_with_id& result){
	{
		std::lock_guard<std::mutex> guard(m_response);
		response.push(result);
	}
	return true;
}

bool ThreadPool::pop(){

	//std::cout<<"hello i'am born now "<< std::this_thread::get_id()<<std::endl;
	int flag;


	while(!closed)  {
	std::unique_lock<std::mutex> ul_request(m_request);

	while((request.size()==0)  && (closed==false)){
	cv.wait(ul_request);
	
	} 

	if(closed==true && request.size()==0)  return true;  // se la coda è stata chiusa, e la lista è vuota muoio 

	if(closed==false   && request.size()>0){
	////////////////////////////////////////////////
	//  inizio Lavoro sulla coda
	///////////////////////////////////////////////
	std::string afn=request.front();
	std::cout<<"hello i'am wake up for serving "<< afn<<std::endl;
	request.pop();
	ul_request.unlock();
	cv.notify_all(); 
	///////////////////////////////////////////////
	// fine Lavoro sulla coda
	///////////////////////////////////////////////
	put_responses(Filename_with_id(afn,File_contain_string(afn)));
	
	
	if(closed==true && request.size()==0) return true;
	 
	}}

	 return true;

}

//*************************** 
// Main tasks
//*************************** 
// Queues the file name `r` and notifies the workers waiting on `cv`.
// Returns 0 without queuing anything if the pool is already closed, 1 otherwise.
int ThreadPool::pusht(std::string r){
	std::lock_guard<std::mutex> guard(m_request);
	if(closed){
		return 0;
	}
	request.push(r);
	cv.notify_all();
	return 1;
}





// Removes the oldest entry of the response queue and copies it into `result`.
// Returns false, leaving `result` untouched, when the queue is empty.
bool ThreadPool::get_responses(Filename_with_id& result){
	std::lock_guard<std::mutex> guard(m_response);
	if(response.empty()){
		return false;
	}
	result=response.front();
	response.pop();
	return true;
}

void ThreadPool::close(){
	closed=true;  // you have to protect closed,it's a shared variable 
 
	for(int i=0;i<thread_number;i++) cv.notify_all();

}

//*************************** 
//  extern function 
//*************************** 


// Searches the file `filename` line by line for `goalstring`.
// Returns  1 if some line contains the goal string,
//          0 if the file was read to the end without a match,
//         -1 if the file could not be opened (a message is printed).
int ThreadPool::File_contain_string(std::string filename){

	std::ifstream f(filename);
	if(!f) {
		std::cout<<"-->Il file" <<filename <<" non esiste!"<<std::endl;
		return -1;
	}

	// Testing the result of getline itself means only lines that were really
	// read are examined; the stream closes itself when `f` goes out of scope.
	std::string s;
	while(std::getline(f, s)){
		if(s.find(goalstring)!=std::string::npos) { return 1;}
	}

	return 0;
}

Perhaps it would be better if:
a. the thread pool were made generic, so that it can be given any task to execute;
b. finding a string in a file were just one of those tasks.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <vector>
#include <queue>
#include <atomic>
#include <functional>
#include <string>
#include <fstream>
#include <chrono>

/// Fixed-size pool of worker threads that run arbitrary queued tasks.
///
/// Tasks are handed over with push() and run in FIFO order by whichever worker
/// is free. The destructor lets the workers finish every task that is still
/// queued and then joins them. The pool is neither copyable nor assignable.
struct thread_pool
{
    /// Starts `nthreads` worker threads (default 4).
    explicit thread_pool( std::size_t nthreads = 4 ) // nthreads is number of threads in the pool
    {
        threads.reserve( nthreads ) ;
        while( threads.size() <  nthreads ) threads.emplace_back( thread_fun(*this) ) ;
    }

    /// Requests shutdown, waits for the queued tasks to be run and joins the workers.
    ~thread_pool()
    {
        {
            // The flag is set under the mutex the workers hold while testing
            // it, so a worker cannot check the flag, miss the notification
            // below and then sleep forever.
            std::lock_guard<std::mutex> lock(mutex) ;
            stopping = true ;
        }
        cv.notify_all() ;
        for( auto& t : threads ) t.join() ; // wait for threads to finish
    }

    /// Queues the call `fn(args...)` and wakes one worker.
    /// `fn` and the arguments are stored by value inside the queued task.
    template < typename FN, typename... ARGS >
    void push( FN fn, ARGS&&... args ) // could be any task
    {
        {
            std::lock_guard<std::mutex> lock(mutex) ;
            tasks.emplace( std::bind( std::move(fn), std::forward<ARGS>(args)... ) ) ;
        }
        cv.notify_one() ;
    }

private:
    thread_pool( const thread_pool& ) = delete ;
    thread_pool& operator= ( const thread_pool& ) = delete ;

    std::vector< std::thread > threads ;
    std::queue< std::function< void() > > tasks ; // guarded by `mutex`

    std::mutex mutex ;
    std::condition_variable cv ;  // signalled on push() and on shutdown
    std::atomic<bool> stopping { false } ;

    /// Callable run by each worker thread.
    struct thread_fun
    {
        explicit thread_fun( thread_pool& p ) : pool(p) {}

        thread_pool& pool ;

        void operator() () const
        {
            for(;;)
            {
                std::function< void() > task ;

                {
                    std::unique_lock<std::mutex> lock( pool.mutex ) ;
                    while( !pool.stopping && pool.tasks.empty() ) pool.cv.wait(lock) ;

                    // An empty queue here means shutdown was requested and
                    // everything has been drained.
                    if( pool.tasks.empty() ) return ;

                    task = std::move( pool.tasks.front() ) ;
                    pool.tasks.pop() ;
                }

                task() ; // run outside the lock so other workers can dequeue meanwhile
            }
        }
    };
};

/// Task for a thread pool: records a file's name in the shared `result` list
/// if any of its lines contains the goal string.
struct find_in_file // one of the tasks that could be handed over to a thread pool
{
    static std::vector<std::string> result ; // names of the files in which the string was found
    static std::mutex mutex ;                // guards `result`

    /// Announces the search, then reads `file_name` line by line; at the first
    /// line containing `goal_string` it appends `file_name` to `result` and stops.
    void operator() ( const std::string& file_name, const std::string& goal_string )
    {
        std::cout << "task: look for '" << goal_string << "' in file " << file_name + '\n' << std::flush ;

        std::ifstream file( file_name ) ;
        for( std::string line ; std::getline(file,line) ; )
        {
            if( line.find(goal_string) == std::string::npos ) continue ;

            std::lock_guard<std::mutex> lock(mutex) ;
            result.push_back(file_name) ;
            return ;
        }
    }
};

std::vector<std::string> find_in_file::result ;
std::mutex find_in_file::mutex ;

// Demo: runs two greeting tasks and three file searches on a three-thread
// pool, then prints the files in which the goal string was found.
int main()
{
    const std::string goal_string = "arbitrary precision" ;

    {
        thread_pool pool(3) ; // three worker threads

        pool.push( [] { std::cout << "from task: hello world!\n" << std::flush ; } ) ;

        const char* files[] = { __FILE__, "/usr/share/doc/bc/README", "/usr/share/doc/bison/README" } ;
        for( const char* path : files )
        {
            pool.push( find_in_file(), path, goal_string ) ;
        }

        pool.push( [] { std::cout << "from task: hello again!\n" << std::flush ; } ) ;

        // give the workers some time to pick up the tasks before the pool goes away
        const std::chrono::milliseconds grace_period(200) ;
        std::this_thread::sleep_for( grace_period ) ;
    } // thread pool destroyed here

    std::cout << "\nresult:\n-----------\n" ;
    const std::vector<std::string>& found = find_in_file::result ;
    for( std::size_t i = 0 ; i < found.size() ; ++i )
    {
        std::cout << '\'' << goal_string << "' was found in file " << found[i] << '\n' ;
    }
}

http://coliru.stacked-crooked.com/a/0c5818234bcab1bf
Yes sure :) Thanks for the code..i will study it.
Topic archived. No new replies allowed.