I've simplified the code and removed the function that connected to the database.
Instead, an unordered map is updated within each of four callable objects. Each callable object uses a unique key, 1 to 4, to update the map, so there should be no conflicts.
I am performing an explicit shutdown by calling "stop_service", which performs a join_all and then explicitly stops the io_service object (ios).
I expected the map to have four string entries, but the output varies from one execution to another.
$ ./thread_pool_test2
more_work( int )
work()
In trystruct work
1:work done
2:worker done
4:do_try done
3:more_work done
$ ./thread_pool_test2
more_work( int )struct work
work()
1:work done
2:worker done
3:more_work done
So for the first execution the unordered map shows all four entries, but not for the second.
Can anybody explain this behaviour, given that a join_all is performed?
Because ios.stop(); is required to stop io_service::run. Otherwise the service simply keeps running.
What you can do is introduce another variable like threads_running which is incremented at the beginning of wrap_job(). In stop_service() you sleep for a while (say 100 ms) in a loop until the required number of threads are running. You can make threads_running atomic so that it doesn't need a lock.
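Something along these lines, perhaps (just a rough sketch, with the members written as free functions; the counter name, the job count of 4 and how you finish stop_service are up to you):

#include <boost/atomic.hpp>
#include <boost/chrono.hpp>
#include <boost/function.hpp>
#include <boost/thread.hpp>

boost::atomic<int> threads_running(0);     // how many posted jobs have started

void wrap_job(boost::function<void()> job)
{
    ++threads_running;                     // count this job as started
    job();
}

void stop_service()
{
    // Poll until the expected number of jobs (4 in the example) have started,
    // then carry on with the existing shutdown sequence.
    while (threads_running.load() < 4)
        boost::this_thread::sleep_for(boost::chrono::milliseconds(100));

    // ... existing thread_grp.join_all() / ios.stop() as before ...
}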
Thanks, I've included a boost::this_thread::sleep_for(boost::chrono::seconds{5});
in stop_service, and the hash table entries appear as expected on each execution.
Whilst including the delay is not foolproof, at least it proves that the updating can occur.
There is currently no member function in boost::thread_group that indicates whether threads are still running.
I plan to use the thread pool class to insert and update 2 different databases on separate servers.
FWIW this edit to the original example works for me:
void stop_service() {
    work_ios.reset();       // allow the worker threads to exit when done
    thread_grp.join_all();  // wait for the worker threads to finish work
}
where work_ios was changed to a std::unique_ptr<boost::asio::io_service::work> rather than a direct member, and of course lock_guards were added around all the ht[x] = assignments.
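For illustration, the relevant pieces look roughly like this (a sketch only; ht and ht_mutex are placeholder names rather than the original members):

#include <boost/asio.hpp>
#include <boost/thread.hpp>
#include <memory>
#include <string>
#include <unordered_map>

boost::asio::io_service ios;
std::unique_ptr<boost::asio::io_service::work> work_ios(
    new boost::asio::io_service::work(ios));        // replaces the direct work member
boost::mutex ht_mutex;                               // guards the shared map
std::unordered_map<int, std::string> ht;

void update(int key, const std::string& value)
{
    boost::lock_guard<boost::mutex> lock(ht_mutex);  // lock_guard around each ht[x] = ...
    ht[key] = value;
}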
thanks for the tip on resetting work object. Really helpful.
With regard to the hash table updates:
The documentation states:
Different elements in the same container can be modified concurrently by different threads, except for the elements of std::vector<bool> (for example, a vector of std::future objects can be receiving values from multiple threads).
So, for the above case, if the key is guaranteed to be unique, then the hash table is an ideal candidate as a container and is thread safe (based on key usage) without using a mutex.
if the key is guaranteed to be unique, then the hash table is an ideal candidate as a container and is thread safe (based on key usage) without using a mutex.
A hash table is not an array: unordered_map::operator[] can rebuild the table and invalidate every iterator. Actually, even without a rehash, concurrent inserts would be a data race on a collision, as you'd be modifying the same linked list.
In other words, you're not modifying elements, you are inserting them.
If the key was unique, would you in fact end up with collisions ?
If not, presumably there would be no linked list to accommodate contentious "buckets", only an associative key and bucket.
In addition, reserve ensures that the buckets are pre-allocated. Also, buckets are moved around, not copied if a resize does ever occur.
Lastly, I always use the key for access, so no iteration is ever performed over a hash table.
I may change my mind however; I'll be monitoring collisions based on bucket_count and bucket_size.
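For instance, bucket occupancy can be inspected along these lines (purely illustrative, reusing the four keys from the example above):

#include <iostream>
#include <string>
#include <unordered_map>

int main()
{
    std::unordered_map<int, std::string> ht;
    ht.reserve(4);                              // pre-allocate buckets for four entries
    for (int key = 1; key <= 4; ++key)
        ht[key] = "done";

    std::cout << "bucket_count: " << ht.bucket_count() << '\n';
    for (int key = 1; key <= 4; ++key)          // any bucket_size > 1 is a collision
        std::cout << "key " << key << " -> bucket " << ht.bucket(key)
                  << " (size " << ht.bucket_size(ht.bucket(key)) << ")\n";
}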
If the key was unique, would you in fact end up with collisions?
Yes. Guaranteed if the number of buckets is less than the number of distinct keys; possible, with decreasing probability, as the number of buckets grows.
Also, buckets are moved around, not copied if a resize does ever occur
Nodes are what's moved. Attempting to access the map concurrently with that is a data race - there's no telling what your operator[] or .at() or iterator dereference would possibly see! You could modify elements concurrent with rehash if you preserved pointers/references (not iterators) to those elements before the rehash began, though.
I always use the key for access, so no iteration is ever performed over a hash table.
Using a key for access is a data race against unordered_map insert. Once you built the map (and synchronized that last insert with the first access by key), then sure, read and modify your existing elements, by key or by iteration or however you want - the cppreference quote would apply then.
I only listed the first few reasons that came to mind why this is invalid. The proper approach is to start with code that does not violate container thread safety guarantees and then see if there are optimization opportunities with alternatives.
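For example, building the map before the threads start and then only touching existing elements by key stays within the quoted guarantee. A minimal sketch (not your code; at() is used because, for unordered associative containers, operator[] is not among the functions treated as const for data-race purposes):

#include <boost/thread.hpp>
#include <iostream>
#include <string>
#include <unordered_map>

int main()
{
    std::unordered_map<int, std::string> ht;
    for (int key = 1; key <= 4; ++key)      // all inserts happen here, single-threaded,
        ht[key];                            // before any worker thread exists

    boost::thread_group grp;
    for (int key = 1; key <= 4; ++key)
        grp.create_thread([&ht, key] {
            ht.at(key) = "done";            // each thread modifies a different, existing
        });                                 // element: no insert, no rehash
    grp.join_all();                         // synchronizes the writes with the reads below

    for (const auto& entry : ht)
        std::cout << entry.first << ':' << entry.second << '\n';
}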
Unfortunately the execution occasionally results in undefined behaviour.
So mostly it executes as expected, but occasionally an error that looks like undefined behaviour occurs, for example:
I've removed the increment/decrement from wrap_job. My previous post did not include it. I've also removed the atomic.
You mentioned "Why do you use io_service at all? Seems like you are doing nothing with it."
A thread is launched with: thread_grp.create_thread( boost::bind( &boost::asio::io_service::run, &ios ) );
A thread generally runs until it completes or is terminated.
To allow a thread to run multiple callable objects while it stays alive, it runs a function that reads jobs from a queue, hence io_service::run(). io_service::run() however returns when there is no more work, so io_service::work is used to prevent that. The Boost documentation explains this in more detail.
Callable objects are placed onto the queue either by dispatch or post.
Refer to this note, https://stackoverflow.com/questions/2326588/boost-asio-io-service-dispatch-vs-post, for the difference between the two.
So ios.post( boost::bind( &thread_pool::wrap_job, this, boost::function< void() >(job) )); places the callable onto the queue, and the member function thread_pool::wrap_job runs it.
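Put together, the mechanism amounts to something like this stripped-down sketch (illustrative only, not the full class from my original post; the destructor here uses the work-reset idea mentioned earlier):

#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/function.hpp>
#include <boost/thread.hpp>
#include <iostream>
#include <memory>

class thread_pool
{
    boost::asio::io_service ios;
    std::unique_ptr<boost::asio::io_service::work> work_ios;   // keeps run() from returning early
    boost::thread_group thread_grp;

    void wrap_job(boost::function<void()> job) { job(); }       // runs one queued callable

public:
    explicit thread_pool(std::size_t n)
        : work_ios(new boost::asio::io_service::work(ios))
    {
        for (std::size_t i = 0; i < n; ++i)
            thread_grp.create_thread([this] { ios.run(); });    // each worker pulls jobs off the queue
    }

    void run_job(boost::function<void()> job)                   // queue a callable for the pool
    {
        ios.post(boost::bind(&thread_pool::wrap_job, this, job));
    }

    ~thread_pool()
    {
        work_ios.reset();        // let run() return once the queue is drained
        thread_grp.join_all();   // wait for the workers
    }
};

int main()
{
    thread_pool pool(2);
    pool.run_job([] { std::cout << "job ran\n"; });
}   // the destructor waits for the posted job before tearing the pool down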
The reason I am using an unordered_map is that it is an associative container; an array would not be suitable for the application I have in mind.
Cubbi wrote:
FWIW this edit to the original example works for me:
Well, destructing the service object will stop the processing. So isn't it actually the same as calling ios.stop(); directly?
The way I read the original post, the goal was to wait for all posted jobs to complete, and then shut down the thread pool. If it's okay to abort the posted jobs, then yes, ios.stop or the destructor is all that's needed.
thanks for the code. The reason I did not use a vector of threads is because the boost thread group offers simpler management.
In addition, if I were to use a vector of threads, then I would need to create and manage a thread safe queue, to distribute jobs amongst threads.
Using the boost thread group along with io_service offers the above in a much simpler way.
You mentioned "the goal was to wait for all posted jobs to complete". That is indeed the goal, hence I appreciated coder777's suggestion of using work_ios.reset();, which I've included in the stop_service method (as per the previous code post) but had forgotten to include in the destructor, which I have now done.
Regarding the shared_lock, the boost documentation states:
"shared ownership as well as exclusive ownership. This is the standard multiple-reader / single-write model: at most one thread can have exclusive ownership, and if any thread does have exclusive ownership, no other threads can have shared or exclusive ownership. Alternatively, many threads may have shared ownership."
So could one not write under exclusive ownership, but read in a shared way using a shared_lock, provided one uses a shared_mutex?
I am unsure, but it seems safe to do it this way.
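By "this way" I mean something like the following (a minimal sketch with illustrative names, not my actual code): an exclusive (unique) lock for writes and a shared_lock for reads.

#include <boost/thread.hpp>
#include <boost/thread/shared_mutex.hpp>
#include <string>
#include <unordered_map>

std::unordered_map<int, std::string> ht;
boost::shared_mutex ht_mutex;

void write_entry(int key, const std::string& value)
{
    boost::unique_lock<boost::shared_mutex> lock(ht_mutex);  // exclusive: one writer at a time
    ht[key] = value;
}

std::string read_entry(int key)
{
    boost::shared_lock<boost::shared_mutex> lock(ht_mutex);  // shared: many concurrent readers
    auto it = ht.find(key);
    return it != ht.end() ? it->second : std::string();
}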