Thread Pool Issues

I'm working on a thread pool library for personal use. It creates a fixed number of threads and then accepts tasks: nullary function objects that may return any type except void (support for void will be added in the future). The method that accepts a task returns a future, called ReturnValue, which can retrieve that task's return value.

The real defining feature of the library is that it does not include any threading headers outside its one source file. The interfaces are templated to remain type-safe, but the implementation deals in void*. The back end currently uses boost::thread, whose headers seriously hurt compile times even when placed in a precompiled header.
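The following is a minimal sketch of how a templated front end can sit on top of a type-erased back end. It assumes C++11 and uses std::shared_ptr<void> (as threadFunc below already does) rather than a raw void*. The names submitErased, ErasedFn and submit, and the exact shape of ReturnValue, are hypothetical. To keep the sketch self-contained, submitErased runs the task immediately instead of queuing it for a worker.

```cpp
#include <functional>
#include <future>
#include <memory>
#include <type_traits>
#include <utility>

// ---- back end (would live in the single .cpp file): no templates ----
using ErasedResult = std::shared_ptr<void>;       // owns a result of unknown type
using ErasedFn = std::function<ErasedResult()>;   // hypothetical erased task type

// Hypothetical stand-in for pushing onto the pool's queue. In this sketch the
// task runs immediately; a worker's threadFunc would do the same two steps.
std::shared_future<ErasedResult> submitErased(ErasedFn fn)
{
    std::promise<ErasedResult> promise;
    std::shared_future<ErasedResult> result = promise.get_future().share();
    promise.set_value(fn());
    return result;
}

// ---- front end (header): templated and type-safe ----
template <class T>
class ReturnValue   // hypothetical shape of the library's future type
{
public:
    explicit ReturnValue(std::shared_future<ErasedResult> f) : fut(std::move(f)) {}

    // Waits for the task, then casts the erased pointer back to T.
    const T& get() const { return *std::static_pointer_cast<T>(fut.get()); }

private:
    std::shared_future<ErasedResult> fut;
};

template <class F>
auto submit(F f) -> ReturnValue<typename std::decay<decltype(f())>::type>
{
    using T = typename std::decay<decltype(f())>::type;
    static_assert(!std::is_void<T>::value, "void-returning tasks are not supported yet");
    // make_shared<T> stores a deleter for T, so shared_ptr<void> destroys it correctly.
    return ReturnValue<T>(submitErased([f]() mutable -> ErasedResult {
        return std::make_shared<T>(f());
    }));
}
```

Because std::make_shared<T> records how to destroy a T, the back end can hold and release results through shared_ptr<void> without knowing T. Only the header-side templates ever name the real type. For example, submit([] { return 6 * 7; }).get() yields 42.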

I'm stuck on implementing one of the thread pool's methods: void finish(). It should block the main thread until all the tasks have finished executing. This is all I can come up with:

void ThreadPool::finish()
{
    while(!impl->queue.empty())
        boost::this_thread::sleep(boost::posix_time::milliseconds(10));
}


This mostly works, but I recently discovered that it can return before the last few tasks have finished. It waits for the queue of tasks to be empty, but does not wait for the last few tasks to actually finish.
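The gap can be reproduced deterministically. The sketch below is a hypothetical demo, not the library's code. It uses std:: types and two std::promise handshakes to freeze the moment a polling finish() would look at the queue: after a worker has popped the last task, but before that task has finished.

```cpp
#include <functional>
#include <future>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>

// What a polling finish() would see at the worst moment.
struct Snapshot
{
    bool queueEmpty;
    bool taskFinished;
};

// Hypothetical demo: one worker pops the only task, and we inspect the queue
// while that task is deliberately still running.
Snapshot observeAfterPop()
{
    std::mutex m;
    std::queue<std::function<void()>> tasks;
    bool taskFinished = false;               // guarded by m

    std::promise<void> popped;               // worker -> main: task was taken
    std::future<void> poppedF = popped.get_future();
    std::promise<void> release;              // main -> worker: task may finish
    std::future<void> releaseF = release.get_future();

    tasks.push([&releaseF] { releaseF.wait(); });  // a "slow" task

    std::thread worker([&] {
        std::function<void()> task;
        {
            std::lock_guard<std::mutex> lock(m);
            task = std::move(tasks.front());
            tasks.pop();
        }
        popped.set_value();
        task();                              // still running while main looks
        std::lock_guard<std::mutex> lock(m);
        taskFinished = true;
    });

    poppedF.wait();
    Snapshot s{false, false};
    {
        std::lock_guard<std::mutex> lock(m);
        s.queueEmpty = tasks.empty();        // true: the queue has drained...
        s.taskFinished = taskFinished;       // ...but false: the task has not
    }
    release.set_value();
    worker.join();
    return s;
}
```

observeAfterPop() reports queueEmpty == true and taskFinished == false. That is exactly the state in which the polling loop returns too early.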

The threads run this function:

void threadFunc(TaskQueue* queue)
{
    while(true)
    {
        TaskQueue::task_t task = queue->pop();
        std::shared_ptr <void> p(task.first()); //call the function from the queue and save the return value
        task.second->set_value(p); //set the promise with the return value
    }
}


Worker threads that have run out of tasks block in queue->pop():

auto TaskQueue::pop() -> task_t
{
    boost::unique_lock <boost::mutex> lock(m);

    while(queue.empty())
        c.wait(lock); //here (c is a condition variable, notify_one() called when something is pushed)

    task_t ret(queue.front());
    queue.pop();
    return ret;
}


Based on this, ThreadPool::finish() should return only when the queue is empty and all the worker threads are blocked in c.wait(lock). How can I detect this condition? The implementation should be reasonably close to zero-overhead (that is, little to no overhead except while finish() is being called).
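One way to detect exactly this condition is sketched below, under a few assumptions. std:: types stand in for the boost ones. The class name IdleTrackingQueue and its stop() method are hypothetical. Exactly workerCount threads are assumed to call pop(). The queue counts the workers parked in pop() under the mutex it already holds, and it signals a second condition variable when the queue is empty and every worker is parked.

```cpp
#include <condition_variable>
#include <cstddef>
#include <functional>
#include <mutex>
#include <queue>
#include <utility>

// Hypothetical variant of TaskQueue that also counts the workers parked in
// pop(). A worker is only in pop() when it is not running a task, so
// "queue empty && every worker parked" means all work is done.
class IdleTrackingQueue
{
public:
    explicit IdleTrackingQueue(std::size_t workers) : workerCount(workers) {}

    void push(std::function<void()> task)
    {
        std::lock_guard<std::mutex> lock(m);
        queue.push(std::move(task));
        c.notify_one();
    }

    // Worker side. Returns false once stop() has been called (sketch-only addition).
    bool pop(std::function<void()>& out)
    {
        std::unique_lock<std::mutex> lock(m);
        ++idle;                                   // previous task (if any) is done
        if (idle == workerCount && queue.empty())
            drained.notify_all();                 // wake finish(), if it is waiting
        c.wait(lock, [this] { return stopping || !queue.empty(); });
        --idle;
        if (stopping)
            return false;
        out = std::move(queue.front());
        queue.pop();
        return true;
    }

    // Blocks until the queue is empty and every worker is parked in pop().
    void finish()
    {
        std::unique_lock<std::mutex> lock(m);
        drained.wait(lock, [this] { return queue.empty() && idle == workerCount; });
    }

    // Lets workers exit so they can be joined; still-queued tasks are discarded.
    void stop()
    {
        std::lock_guard<std::mutex> lock(m);
        stopping = true;
        c.notify_all();
    }

private:
    std::mutex m;
    std::condition_variable c;        // "a task was pushed" (as in the question)
    std::condition_variable drained;  // "pool went idle"; only finish() waits on it
    std::queue<std::function<void()>> queue;
    const std::size_t workerCount;
    std::size_t idle = 0;
    bool stopping = false;
};
```

On the normal path, the extra cost is one increment, one decrement and one comparison per pop(), all under a lock that is already held. The second condition variable is notified only when the pool drains. Each worker would loop on pop() and run whatever task it receives, and the main thread calls finish() after pushing its tasks.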
Polling and sleeping is usually a bad way to go. What you want sounds like a condition variable, and Boost implements them in its thread library.

A condition variable is essentially a synchronization mechanism that lets a thread sleep until a specific event occurs (in your case, until all tasks are complete).

When each task completes, the thread that ran it would have to update some shared state (such as a count of pending tasks) and then notify the main thread's condition variable. This temporarily wakes the main thread so it can check its condition. If the condition is still false, it goes back to sleep; otherwise it resumes.


Something like this would work:

#include <boost/thread.hpp>

// owned by main thread
int number_of_tasks = 0;  // set to however many tasks are pending
boost::mutex task_count_mutex;
boost::condition_variable task_cv;


// owned by main thread, called by individual task threads when they complete
void TaskComplete()
{
    boost::lock_guard<boost::mutex> lock(task_count_mutex);
    --number_of_tasks;  // decrease number of pending tasks
    task_cv.notify_one();  // poke the main thread to wake it up
}

// main thread, waiting for tasks to complete
void Waiting()
{
    boost::unique_lock<boost::mutex> lock(task_count_mutex);
    task_cv.wait( lock, [&] () { return (number_of_tasks <= 0); } );
}
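Here is a runnable variant of the same idea. It assumes C++11 and uses std::mutex and std::condition_variable in place of their boost counterparts. TaskStarted() is a hypothetical addition. The counter has to be incremented when a task is pushed, before any worker can run it; otherwise Waiting() could observe zero while a task is still queued. As a design choice, TaskComplete() notifies only when the count reaches zero rather than after every task.

```cpp
#include <condition_variable>
#include <mutex>

// Same protocol, with the standard-library counterparts of the boost types.
int number_of_tasks = 0;            // guarded by task_count_mutex
std::mutex task_count_mutex;
std::condition_variable task_cv;

// Hypothetical addition: call when a task is pushed, before any worker can run it.
void TaskStarted()
{
    std::lock_guard<std::mutex> lock(task_count_mutex);
    ++number_of_tasks;
}

// Called by a worker after its task (and set_value) has finished.
void TaskComplete()
{
    std::lock_guard<std::mutex> lock(task_count_mutex);
    if (--number_of_tasks == 0)
        task_cv.notify_all();       // only wake waiters when nothing is pending
}

// Main thread: sleeps until the pending count drops to zero.
void Waiting()
{
    std::unique_lock<std::mutex> lock(task_count_mutex);
    task_cv.wait(lock, [] { return number_of_tasks <= 0; });
}
```

In the pool, TaskStarted() would sit in the push path and TaskComplete() right after task.second->set_value(p) in threadFunc. finish() would then just call Waiting().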



EDIT:

Wait, you're already using a condition variable. So you already know about them? Why couldn't you use one for the main thread, then? Am I misunderstanding?


Also...
 
c.wait(lock); //here (c is a condition variable, notify_one() called when something is pushed) 


A condition variable wait can have "spurious" wakeups, where it returns even though it hasn't been notified. Therefore you should pretty much always re-check the condition after waking, either by passing a predicate function to wait() or by looping on the condition as your pop() does.
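The while loop in TaskQueue::pop() already re-checks queue.empty() after every wakeup, so it is safe as written. The predicate overload of wait() is specified to behave like while (!pred()) wait(lock);, which makes it a more compact spelling of that same loop. Below is a minimal sketch with std:: types. BlockingQueue is a hypothetical name, and boost::condition_variable provides the same overload.

```cpp
#include <condition_variable>
#include <mutex>
#include <queue>
#include <utility>

// Hypothetical minimal queue whose pop() uses the predicate overload of wait().
template <class T>
class BlockingQueue
{
public:
    void push(T value)
    {
        {
            std::lock_guard<std::mutex> lock(m);
            queue.push(std::move(value));
        }
        c.notify_one();
    }

    T pop()
    {
        std::unique_lock<std::mutex> lock(m);
        // Equivalent to: while (queue.empty()) c.wait(lock);
        // so a spurious wakeup just re-checks the predicate and waits again.
        c.wait(lock, [this] { return !queue.empty(); });
        T ret(std::move(queue.front()));
        queue.pop();
        return ret;
    }

private:
    std::mutex m;
    std::condition_variable c;
    std::queue<T> queue;
};
```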
It never occurred to me to count the number of active tasks. That could actually work really well.

I know that a condition variable would be better than this polling wait. I'm just trying to get the code working at this point, and the polling implementation didn't require changes to any other functions.