Recommended way to share data with threads

Hello,

I have a queue of zero or more pointers. Pointers are added to this queue automatically. Each pointer leads to a message.
There is a thread that processes the messages in the correct order and removes each pointer from the queue after its message has been processed.

Now I would like to dynamically add an unknown number of threads (depending on the number of client connections) to process the messages.
The problem is that each thread has to process each of the messages (in the correct order), so a message cannot be removed from the queue until all threads have processed it. The threads can reach a given message in any order, so I cannot be sure that a specific thread will be the last one to process it.

I would like to ask for some advice about how to implement this in an efficient, structured and robust way.

My first thought was to:
1. add a variable to the containing class that indicates the number of threads
2. add a counter to every message object that is set to the number of threads to expect as soon as its pointer is added to the queue
3. reduce the counter in the message object by one every time the message object is retrieved by a thread
4. remove from the queue any pointer whose message counter has reached zero

The problem with this approach is that one thread might process the same message twice (not a problem) and as a result another thread would not get to process it (this would be a problem).

Does anyone know of a better way to do this?

Thanks in advance, Nico
Normally, each message processor should pop the queue and process that message independently of the others.

It seems you are committed to sequential processing (from your business rules) and so the asynchronous mechanism won't work for you.

I'll state an example to demonstrate one solution.

A Banking application processes account transactions. The transactions can be processed in parallel, but single account transactions must be processed serially.

For example, these transactions occur on an account, starting with $15 in the account.
1. Transfer $20 in from A.
2. Transfer $22 out to B.

Clearly the order is important because if the transfer out runs first, it'll fail.

A solution would be to have multiple processors, but route all messages for one account to the same processor. You might use (accno % nprocessors) to determine the processor for an account. Each processor has its own queue, and account messages are taken off the main queue and sent to the right processor queue. Non-account messages are sent to the next free processor.
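The routing rule above can be sketched roughly as follows. This is only an illustration, not a complete design: the Message type, chooseProcessor and dispatch are made-up names, a negative account number is assumed to mean "not tied to an account", and plain round-robin stands in for "next free processor". It is single-threaded; real processor queues would need locking.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <vector>

// Hypothetical message type: accountNumber < 0 means "not tied to an account".
struct Message
{
    long accountNumber;
    int  payload;
};

// Picks the processor for a message. All messages for one account map to the
// same processor, so their relative order is preserved. Other messages are
// spread round-robin (a stand-in for "next free processor").
std::size_t chooseProcessor(const Message& m, std::size_t processorCount, std::size_t& nextFree)
{
    assert(processorCount > 0);
    if (m.accountNumber >= 0)
        return static_cast<std::size_t>(m.accountNumber) % processorCount;
    const std::size_t chosen = nextFree;
    nextFree = (nextFree + 1) % processorCount;
    return chosen;
}

// Moves everything from the main queue into the per-processor queues.
void dispatch(std::deque<Message>& mainQueue, std::vector<std::deque<Message>>& processorQueues)
{
    std::size_t nextFree = 0;
    while (!mainQueue.empty())
    {
        const Message m = mainQueue.front();
        mainQueue.pop_front();
        processorQueues[chooseProcessor(m, processorQueues.size(), nextFree)].push_back(m);
    }
}
```

With three processors, both transfers for account 7 end up in queue 1 (7 % 3) in their original order, while messages for other accounts can be handled in parallel elsewhere.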
Thank you for your example. I think my intention was not clear enough.

As an alternative example consider a class of students with a single teacher:
- the teacher puts this week's time table in the queue
- every student (message processing thread) retrieves the time table and starts to use it
<< I would like to remove the time table after everybody knows it
- the teacher puts a change notification in the queue
- every student retrieves the notification and updates his agenda
<< I would like to remove the notification after everybody knows it
- a new student is added to the class and gets the most current time table
- the teacher becomes ill and puts a class cancellation notification in the queue
- every student including the new one retrieves the notification and enjoys a free day
<< I would like to remove the notification after everybody (including the new student) knows it
- a student moves and leaves the class
- the teacher puts a change notification in the queue
- every student (message processing thread) retrieves the notification and updates his agenda
<< I would like to remove the notification after everybody (except the student that left) knows it
etc.


I was thinking about a single queue that is accessible to every thread. That makes it easy to add message pointers, but difficult to determine when to delete a pointer.

If every processing thread (student) has its own queue, the problem changes and might indeed become easier. In that case the challenge becomes how to add a message to the queue of a dynamically created, already running std::thread.

Is it possible to put std::threads in a vector? And if so, is there a way to interact with a running thread (set or get a thread-local variable)?
If both are possible, I could check whether a thread has finished/disconnected and add messages to the queue of each active thread.
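For what it's worth, std::thread is movable, so it can be stored in a std::vector. Another thread cannot reach into a thread_local variable by name, so the usual route is an object that both sides share and guard with a mutex. Below is a rough sketch of the "own queue per thread" idea under those assumptions. Subscriber, post, close and broadcast are invented names, and messages are held by std::shared_ptr, so a message is freed once the last thread has dropped its copy and nobody has to decide who deletes it.

```cpp
#include <cassert>
#include <condition_variable>
#include <deque>
#include <memory>
#include <mutex>
#include <string>
#include <thread>
#include <utility>
#include <vector>

using MessagePtr = std::shared_ptr<const std::string>;

// One inbox plus one worker thread per "student" (hypothetical class).
class Subscriber
{
public:
    Subscriber() : worker_(&Subscriber::run, this) {}
    ~Subscriber() { close(); }

    // Called by the producer: append a message to this thread's own queue.
    void post(MessagePtr m)
    {
        assert(m);
        {
            std::lock_guard<std::mutex> lock(mutex_);
            inbox_.push_back(std::move(m));
        }
        ready_.notify_one();
    }

    // Ask the worker to finish what is queued, then wait for it to stop.
    void close()
    {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            closed_ = true;
        }
        ready_.notify_one();
        if (worker_.joinable()) worker_.join();
    }

    // Only safe to call after close(), when the worker no longer runs.
    const std::vector<std::string>& received() const { return received_; }

private:
    void run()
    {
        for (;;)
        {
            MessagePtr m;
            {
                std::unique_lock<std::mutex> lock(mutex_);
                ready_.wait(lock, [this] { return closed_ || !inbox_.empty(); });
                if (inbox_.empty()) return; // closed and fully drained
                m = std::move(inbox_.front());
                inbox_.pop_front();
            }
            received_.push_back(*m); // stand-in for real processing
        }
    }

    std::mutex mutex_;
    std::condition_variable ready_;
    std::deque<MessagePtr> inbox_;
    bool closed_ = false;
    std::vector<std::string> received_;
    std::thread worker_; // declared last so the other members exist before it starts
};

// Hands the same message to every currently registered subscriber.
void broadcast(std::vector<std::unique_ptr<Subscriber>>& subscribers, const std::string& text)
{
    const MessagePtr m = std::make_shared<const std::string>(text);
    for (auto& s : subscribers) s->post(m);
}
```

A student that joins later is simply pushed onto the vector and only receives what is broadcast afterwards. A student that leaves is closed and erased from the vector. The vector itself is assumed to be touched by the producer thread only.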

Kind regards, Nico
Well that's more than message queuing and processing.

You're really dealing with a single persistent state that can be modified by messages. You also imply a pub/sub mechanism for clients that can accept amendments, and a full refresh mechanism for new clients.
I am not sure. I can already accept a new connection and create a new thread for it (including the "full refresh" that you mention (this is a simple initialization upon thread creation)).

Also the "state" is maintained inside the thread and updating in multiple threads is no different from doing it in a single thread.

So the problem that I am struggling with and where I would like to get some advice is actually how to get the messages to each of the threads in a proper way.
- a new student is added to the class and gets the most current time table
This implies a new connection receiving historic state.

Where will it get that full state from?
Sometimes code is the best description. In the sample below I used a single integer as an extremely short "queue" to keep the sample small. I intend to use a std::deque, so that messages can always be added to the queue without disrupting the processing of its content.
The problem is that none of the threads can determine that it is the one that can reset the variable (or, in the case of the std::deque, the one that should call pop_front).

#include <atomic>
#include <chrono>
#include <iostream>
#include <mutex>
#include <thread>
#include <vector>

// Both variables are shared between threads, so they are atomic to avoid data races.
std::atomic<int> someVariable{0};
std::atomic<bool> keepGoing{true};
std::mutex coutMutex;


void processingUpdates(int index, int initialValue)
{
    // Never becomes true: no thread can tell that it is the last one to see the change.
    bool thisIsTheLastThreadToProcessTheChange = false;
    int lastValue = initialValue;
    while (keepGoing)
    {
        const int current = someVariable;
        if (lastValue != current) // check if the front of the queue should be processed
        {
            lastValue = current;
            std::lock_guard<std::mutex> locker(coutMutex);
            std::cout << "Thread " << index << " noticed that the value changed to " << lastValue << "\n";
            if (thisIsTheLastThreadToProcessTheChange) someVariable = 0; // or pop_front of the queue
        }
        else std::this_thread::sleep_for(std::chrono::milliseconds(100));
    }
}


int main()
{
    std::vector<std::thread> threads;
    for (int i = 0; i < 5; i++) // start 5 processing threads
    {
        threads.emplace_back(&processingUpdates, i, someVariable.load());
    }
    while (keepGoing)
    {
        if (someVariable == 0)
        {
            int input = -1;
            {
                std::lock_guard<std::mutex> locker(coutMutex);
                std::cout << "enter the new value or a negative value to stop\n";
            }
            if (!(std::cin >> input)) input = -1; // treat EOF or invalid input as "stop"
            someVariable = input;
            keepGoing = (input > -1);
        }
        else std::this_thread::sleep_for(std::chrono::milliseconds(100));
    }
    for (auto& t : threads) t.join(); // wait for the workers instead of detaching them
    return 0;
}
I found a way to do it, by adding two instead of one additional property to my message objects. One keeps track of the number of threads that have processed the message and the other prevents threads from processing the same message twice.
However, I still think there should be a better way to do this.

#include <atomic>   // std::atomic
#include <chrono>   // std::chrono::system_clock
#include <ctime>    // std::localtime
#include <iomanip>  // std::put_time, std::setw, std::setfill
#include <iostream> // std::cout
#include <mutex>    // std::mutex
#include <sstream>  // std::stringstream
#include <string>
#include <thread>   // std::thread
#include <vector>

struct Data
{
    int value;
    int counter;           // number of threads that have processed this message
    std::string timestamp; // lets a thread recognise a message it has already processed
};


Data someVariable;
int numberOfThreads = 0;
std::atomic<bool> keepGoing{true};
std::mutex dataMutex; // guards someVariable, numberOfThreads, std::cout and getTimestamp()

// Call only while holding dataMutex: std::localtime is not thread-safe.
std::string getTimestamp()
{
    std::chrono::time_point<std::chrono::system_clock> now = std::chrono::system_clock::now();
    auto duration = now.time_since_epoch();
    auto seconds = std::chrono::duration_cast<std::chrono::seconds>(duration);
    duration -= seconds;
    // probably the clock won't have enough resolution for nanoseconds on any system, but this will give us the maximum resolution
    auto nanoseconds = std::chrono::duration_cast<std::chrono::nanoseconds>(duration);
    auto in_time_t = std::chrono::system_clock::to_time_t(now);

    std::stringstream ss;
    // Zero-pad the fraction to 9 digits so that comparing timestamps as strings gives time order.
    ss << std::put_time(std::localtime(&in_time_t), "%Y%m%d %H%M%S.")
       << std::setw(9) << std::setfill('0') << nanoseconds.count();
    return ss.str();
}

void processingUpdates(int index)
{
    std::string lastTimestamp;
    {
        std::lock_guard<std::mutex> locker(dataMutex);
        lastTimestamp = getTimestamp();
    }
    while (keepGoing)
    {
        bool processed = false;
        {
            std::lock_guard<std::mutex> locker(dataMutex);
            // process messages that we have not processed yet
            if (lastTimestamp.compare(someVariable.timestamp) < 0)
            {
                lastTimestamp = someVariable.timestamp;
                someVariable.counter = someVariable.counter + 1;
                std::cout << "Thread " << index << " noticed value " << someVariable.value << " with timestamp " << someVariable.timestamp << ". Counter is now " << someVariable.counter << "\n";
                processed = true;
            }
            // clean up messages that have been processed by every thread
            if (someVariable.counter == numberOfThreads)
            {
                std::cout << "Thread " << index << " would delete message with value " << someVariable.value << " and timestamp " << someVariable.timestamp << "\n";
                someVariable.counter = someVariable.counter + 1; // so that only one thread reports the deletion
            }
        }
        if (!processed) std::this_thread::sleep_for(std::chrono::milliseconds(100));
    }
}

int main()
{
    {
        std::lock_guard<std::mutex> locker(dataMutex);
        someVariable.counter = 0;
        someVariable.timestamp = getTimestamp();
        someVariable.value = 0;
    }

    std::vector<std::thread> threads;
    for (int i = 0; i < 5; i++) // start 5 processing threads
    {
        {
            std::lock_guard<std::mutex> locker(dataMutex);
            numberOfThreads++;
        }
        threads.emplace_back(&processingUpdates, i);
    }
    while (keepGoing)
    {
        int temp = -1;
        {
            std::lock_guard<std::mutex> locker(dataMutex);
            std::cout << "Enter the new value or a negative value to stop\n";
        }
        if (!(std::cin >> temp)) temp = -1; // treat EOF or invalid input as "stop"
        keepGoing = (temp > -1);
        {
            std::lock_guard<std::mutex> locker(dataMutex);
            someVariable.value = temp;
            someVariable.counter = 0;
            someVariable.timestamp = getTimestamp();
        }
        std::this_thread::sleep_for(std::chrono::milliseconds(500));
    }
    for (auto& t : threads) t.join(); // wait for the workers instead of detaching them
    return 0;
}
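One possible alternative to the counter plus timestamp pair, sketched under my own assumptions: keep the single shared queue, but give every thread its own read position in it. A thread can then never see the same message twice, and a message can be dropped as soon as the slowest registered thread has moved past it. BroadcastLog and its member functions are invented names for illustration, and readers poll with tryRead instead of blocking.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <deque>
#include <map>
#include <mutex>
#include <string>
#include <utility>

// Single shared queue with one read position per registered thread (hypothetical class).
class BroadcastLog
{
public:
    // Registers a reader. It only sees messages published from now on.
    int addReader()
    {
        std::lock_guard<std::mutex> lock(mutex_);
        cursors_[nextId_] = base_ + messages_.size();
        return nextId_++;
    }

    // A reader that leaves no longer holds back the clean-up.
    void removeReader(int id)
    {
        std::lock_guard<std::mutex> lock(mutex_);
        cursors_.erase(id);
        trim();
    }

    void publish(std::string text)
    {
        std::lock_guard<std::mutex> lock(mutex_);
        messages_.push_back(std::move(text));
        trim(); // discards the message at once if nobody is registered
    }

    // Copies the reader's next unread message into out. Returns false if there is none.
    bool tryRead(int id, std::string& out)
    {
        std::lock_guard<std::mutex> lock(mutex_);
        const auto it = cursors_.find(id);
        if (it == cursors_.end() || it->second == base_ + messages_.size()) return false;
        assert(it->second >= base_);
        out = messages_[it->second - base_];
        ++it->second;
        trim();
        return true;
    }

    // Number of messages still waiting for at least one reader.
    std::size_t pending() const
    {
        std::lock_guard<std::mutex> lock(mutex_);
        return messages_.size();
    }

private:
    // Drops every message that all registered readers have passed. Caller holds mutex_.
    void trim()
    {
        std::size_t lowest = base_ + messages_.size();
        for (const auto& c : cursors_) lowest = std::min(lowest, c.second);
        while (base_ < lowest)
        {
            messages_.pop_front();
            ++base_;
        }
    }

    mutable std::mutex mutex_;
    std::deque<std::string> messages_;   // messages_[0] has sequence number base_
    std::map<int, std::size_t> cursors_; // reader id -> sequence number of its next message
    std::size_t base_ = 0;
    int nextId_ = 0;
};
```

Each thread would call addReader() once when it starts, tryRead() in its loop, and removeReader() when its client disconnects. No thread needs to know whether it is the last one, because the clean-up only looks at the lowest read position.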
If you think that works for you, fine. But it doesn't seem to meet the requirements you laid out above.