Question about multi-threading

Hello C++ fellows,

I have experience in C++, but I have never had to work with multi-threading before, so I thought I would consult you and ask for help.

My application receives a lot of data (about 250 MB/min) and needs to process it in two steps. Because there is so much data, I need to optimize everything and would like to use multiple cores (if present).

First, I am a bit confused about cores and threads. Sometimes I read that threads are automatically assigned to multiple cores, and sometimes I read that even if multiple cores are present, the threads just run on one. Can you tell me which is actually the case?

Now to my specific problem. Here is all the code relevant to it:
class MyChannel
{
    private:
        unsigned int                id;
        std::vector<MyRect>         record;
        unsigned int                recordPos;
        std::vector<MyRect>         recordProcessBuffer;
        std::vector<ProcessedRect>  recordFurtherProcessingBuffer;
    public:
        void AddRecord(MyRect &newRect);
        void ProcessData();
        void FurtherProcessData();
};

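void MyChannel::AddRecord(MyRect &newRect)
{
    // Store the new record first, so neither the last slot nor the
    // incoming record is lost when the buffer fills up.
    record[recordPos]=newRect;
    recordPos++;

    if(recordPos == maxRecords)
    {
        // Buffer full: copy it for processing and start over at position 0.
        std::copy(record.begin(), record.begin()+maxRecords, recordProcessBuffer.begin());
        recordPos=0;
        //ProcessData();
    }
}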

void MyChannel::ProcessData()
{
    //Do something with recordProcessBuffer and fill recordFurtherProcessingBuffer
}

void MyChannel::FurtherProcessData()
{
    //Do something with recordFurtherProcessingBuffer
}


class MySystem
{
    private:
        std::vector<MyChannel>  channel;
    public:
        static void Callback( void* myClass,  MyRect* records, int NumRecords, int ChannelID );
};

void MySystem::Callback( void* myClass,  MyRect* record, int NumRecords, int ChannelID )
{
    MySystem *thisClass = static_cast<MySystem*>(myClass);
    for (int i=0; i < NumRecords; i++)
    {
        thisClass->channel[ChannelID].AddRecord(record[i]);
    }
}


MySystem can create up to 128 channels. My Callback function receives records for a particular channel and adds them to that channel, which stores them in "record". Once a certain number of records (maxRecords) has been received, I want to copy the data into a buffer to process it. Instead of polling for the number of records received so far, I thought it would be easiest to call ProcessData() from within AddRecord() when maxRecords is reached. But then I wouldn't receive any new records on that channel until ProcessData() returns, and that can take time. This is why I need multi-threading.

Now I am wondering what the most efficient way to use threads would be here, because with this much data I need to optimize wherever I can. I thought about two options:

The first is to create a thread in AddRecord() and let that thread run ProcessData(). However, I would then end up with up to 128 threads, because maxRecords is reached at almost the same time on all channels. I could imagine that this would produce so much overhead that performance would decrease instead of improve (but I don't know, because, as I said, I have no experience with threads).

The other option I thought about is to have 3 threads: one for receiving data, one for processing data and one for further processing. And all channels would use them. However, as far as I understand, a thread is destroyed once the function it executes returns, but I want to keep the threads alive so they can run another function a few milliseconds later... and I don't think that is possible.

So I don't really know what to do now. The questions that bother me most are:

*How do I gain the optimal performance with threads for my problem on a multi-core processor?
*Which library gives me the best performance for doing this?
*And how exactly do I implement the threading (3 threads in total, or 3 threads per channel)?

Regarding my platform: because of the API that sends me the records, I have to use the Visual C++ Express compiler. Normally, though, I prefer to stay cross-platform, and I have no experience with the WinAPI.

I would really appreciate comments and suggestions.

Have a nice day
Mantrid
sometimes I read that threads are automatically assigned to multiple cores and sometimes I read that even if multiple cores are present, the threads just run on one.

They will execute on whatever cores are available unless you take special measures to assign your threads to particular cores or groups of cores.
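If you want to know how many cores the OS reports, C++11 has a query for it (Boost.Thread offers boost::thread::hardware_concurrency() as well). The value is only a hint and may be 0 when it cannot be determined; the scheduler still decides where each thread runs. A minimal sketch, where `usableCores` is a hypothetical helper:

```cpp
#include <cassert>
#include <thread>

// Returns the number of hardware threads the implementation reports,
// falling back to 1 when the value is not computable (reported as 0).
unsigned usableCores()
{
    unsigned n = std::thread::hardware_concurrency();
    return n == 0 ? 1 : n;
}
```

This number is a reasonable starting point when deciding how many worker threads are worth creating.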

I end up with up to 128 threads [...] this would produce so much overhead

Threads on their own don't produce much overhead (not in those amounts, anyway), but setup/teardown of a thread is an expensive operation. A workable, simple solution could be to set up a pool of 128 worker threads at the beginning and, in Callback(), append the nth channel's buffer to the nth worker thread's input queue when the buffer is full.
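A minimal sketch of such a pool, assuming C++11 std::thread (Boost.Thread has equivalent thread, mutex and condition_variable types). Each worker is created once at startup and sleeps on its own queue until a buffer arrives; `ChannelWorkerPool`, `submit()` and the `std::vector<int>` buffer type are hypothetical stand-ins for the channel buffers and ProcessData():

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <functional>
#include <memory>
#include <mutex>
#include <thread>
#include <utility>
#include <vector>

// One long-lived worker per channel, each with its own input queue.
class ChannelWorkerPool
{
public:
    typedef std::vector<int> Buffer;   // hypothetical stand-in for a full record buffer
    typedef std::function<void(std::size_t, const Buffer&)> Processor;

    ChannelWorkerPool(std::size_t channels, Processor process) : process_(process)
    {
        for (std::size_t i = 0; i < channels; ++i)
            workers_.push_back(std::unique_ptr<Worker>(new Worker));
        // Start threads only after all Worker objects exist.
        for (std::size_t i = 0; i < channels; ++i)
            workers_[i]->thread = std::thread(&ChannelWorkerPool::run, this, i);
    }

    ~ChannelWorkerPool()
    {
        for (auto& w : workers_) {
            { std::lock_guard<std::mutex> lock(w->m); w->stop = true; }
            w->cv.notify_one();
        }
        for (auto& w : workers_) w->thread.join();   // workers drain their queues first
    }

    // Called from the acquisition callback when channel `ch` has a full buffer.
    void submit(std::size_t ch, Buffer buf)
    {
        assert(ch < workers_.size());
        Worker& w = *workers_[ch];
        { std::lock_guard<std::mutex> lock(w.m); w.queue.push_back(std::move(buf)); }
        w.cv.notify_one();   // wake the sleeping worker
    }

private:
    struct Worker {
        std::mutex m;
        std::condition_variable cv;
        std::deque<Buffer> queue;
        bool stop = false;
        std::thread thread;
    };

    void run(std::size_t ch)
    {
        Worker& w = *workers_[ch];
        for (;;) {
            Buffer buf;
            {
                std::unique_lock<std::mutex> lock(w.m);
                w.cv.wait(lock, [&w] { return w.stop || !w.queue.empty(); });
                if (w.queue.empty()) return;   // stop requested and nothing left
                buf = std::move(w.queue.front());
                w.queue.pop_front();
            }
            process_(ch, buf);   // the heavy work runs without holding the lock
        }
    }

    Processor process_;
    std::vector<std::unique_ptr<Worker>> workers_;
};
```

Callback() would then call submit(ChannelID, ...) with the full buffer instead of creating a thread. Because each channel has its own worker, one channel's buffers are processed in order; with far fewer cores than channels, the OS simply time-slices the workers.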

3 threads: One for receiving data, one for processing data and one for further processing. And all channels would use them

That makes the logic a bit more complicated (each thread has to receive data from 128 queues: it should be fair and not start with channel[0] at all times), but if each core is fast enough to keep up with the I/O under these conditions, then sure, try that too. I am kinda used to 64-128 core systems, so three threads feel like a waste of CPU bandwidth :)
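For the three-thread variant, the fairness rule can be as simple as resuming the scan just after the channel that was served last. A sketch of that rule only (locking omitted), with `nextChannel` and the queue type as hypothetical names:

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <vector>

// Picks the next non-empty channel queue, starting just after the channel
// served last time, so channel 0 does not always get priority.
// Returns -1 if every queue is empty.
int nextChannel(const std::vector<std::deque<int> >& queues, std::size_t& cursor)
{
    const std::size_t n = queues.size();
    for (std::size_t step = 0; step < n; ++step) {
        std::size_t ch = (cursor + step) % n;
        if (!queues[ch].empty()) {
            cursor = (ch + 1) % n;   // resume after this channel next time
            return static_cast<int>(ch);
        }
    }
    return -1;
}
```

The processing thread would call this in a loop, pop one item from the returned queue, and sleep on a condition variable when it returns -1.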

How do I gain the optimal performance with threads for my problem on a multi-core processor

Measure the rate at which one core processes data vs. the rate of input, then put together a few prototypes and measure their performance.
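A rough way to do the first measurement, assuming C++11 `<chrono>` (Boost.Chrono is an alternative for older compilers): time a processing function on one core and convert the result to MB/min for comparison with the ~250 MB/min input. `megabytesPerMinute` and its parameters are hypothetical:

```cpp
#include <cassert>
#include <chrono>
#include <cstddef>
#include <functional>

// Runs `process` `calls` times on the current thread and returns the
// throughput in megabytes per minute, given the bytes handled per call.
double megabytesPerMinute(const std::function<void()>& process,
                          std::size_t bytesPerCall, int calls)
{
    typedef std::chrono::steady_clock Clock;
    Clock::time_point start = Clock::now();
    for (int i = 0; i < calls; ++i)
        process();
    std::chrono::duration<double> elapsed = Clock::now() - start;   // seconds
    double megabytes = static_cast<double>(bytesPerCall) * calls / (1024.0 * 1024.0);
    return elapsed.count() > 0 ? megabytes / elapsed.count() * 60.0 : 0.0;
}
```

If a single core already processes data several times faster than it arrives, a few worker threads will likely suffice; if not, the ratio gives a lower bound on how many cores you need.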

The questions you should be asking are: what's the appropriate way to communicate data between threads? Do you use a concurrent queue, and if so, from which library, or do you just use a container-level lock and a condition variable to wait while the queue is empty()? How far do you let the queues grow if the worker threads are not making progress (because something else is using up your CPU) while the input keeps coming? And so on.
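The second option mentioned above (an ordinary lock plus a condition variable) combined with a size limit might look like this sketch. `BoundedQueue` and `try_push` are hypothetical names, and rejecting items when the queue is full is only one possible policy; blocking the producer is another, but that would stall the acquisition callback:

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <mutex>
#include <utility>

// A queue with a hard size limit. If the consumer falls behind, try_push
// refuses new items instead of letting memory grow without bound; the
// caller decides what to do (drop, count overruns, warn the user).
template <typename T>
class BoundedQueue
{
public:
    explicit BoundedQueue(std::size_t capacity) : capacity_(capacity) { assert(capacity > 0); }

    bool try_push(T item)
    {
        {
            std::lock_guard<std::mutex> lock(m_);
            if (q_.size() >= capacity_) return false;   // full: reject
            q_.push_back(std::move(item));
        }
        cv_.notify_one();
        return true;
    }

    // Blocks until an item is available.
    T pop()
    {
        std::unique_lock<std::mutex> lock(m_);
        cv_.wait(lock, [this] { return !q_.empty(); });
        T item = std::move(q_.front());
        q_.pop_front();
        return item;
    }

private:
    std::size_t capacity_;
    std::mutex m_;
    std::condition_variable cv_;
    std::deque<T> q_;
};
```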
Thank you for answering.

They will execute on whatever cores are available unless you take special measures to assign your threads to particular cores or groups of cores.


So if the load of one core is 100%, new threads will be processed on the other core? That's nice. I don't need to assign threads to particular cores; I just want to make sure that if one core is completely busy, other threads will be processed by other cores, so I don't need to do this by hand.

but setup/teardown of a thread is an expensive operation. A workable simple solution could be to set up a pool of 128 worker threads from the beginning


This sounds good. But I thought that when a worker thread completes the function it was passed, it is torn down automatically? Or did I get this wrong? Because in Callback(), I would need to pass the AddRecord() function to the specific worker thread, but it completes before the next record for this channel arrives, so the thread would already be torn down before the next AddRecord() is passed to it. But I'm starting to think I got something very wrong.

The questions you should be asking are what's the appropriate way to communicate data between threads


I didn't think this would be an issue. The situation is this: I am writing an online evaluation program for electrophysiological data. The data acquisition center constantly sends the recordings to my program. I save all the recordings from the different channels in my channel objects, in a vector called record, which can store exactly 10 seconds of data per channel (or rather, the number of records that accumulate in exactly 10 s). I don't want to use insert or push_back because I don't want to spend time on memory reallocation, which is why the size is fixed.

After 10 s worth of records has been stored, I copy the records into another buffer (recordProcessBuffer), and the records coming in after that are stored in record again, starting from the first position (recordPos=0). So the main thread writes to record continuously and to recordProcessBuffer every 10 s. One worker thread would only read from recordProcessBuffer and write to recordFurtherProcessingBuffer, and another worker thread would only read recordFurtherProcessingBuffer and write somewhere else. So there would be no variable that is read and written by two different threads.

The only problem would be a worker thread reading recordProcessBuffer while the main thread copies new data into it, but this cannot happen, because the main thread starts ProcessData() only after the data is copied. And if ProcessData() takes longer than 10 s to complete (the only case in which recordProcessBuffer is read while another thread writes it), then I am screwed anyway, because I can't analyse the data live anymore if processing takes longer than the data takes to come in. That's why I haven't looked into thread safety so far. I hope this is understandable. Maybe this is stupid or wishful thinking, but I thought it would be OK.
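If this design is kept, the "ProcessData() takes longer than 10 s" case can at least be detected instead of silently corrupting recordProcessBuffer. A minimal sketch using std::atomic (C++11; Boost.Atomic or a mutex-protected flag would also work); `HandOffGuard`, `tryHandOff()` and `done()` are hypothetical names. It only answers "is the worker finished?"; waking the worker still needs its own synchronization, such as a condition variable:

```cpp
#include <atomic>
#include <cassert>

// The acquisition thread calls tryHandOff() when a buffer is full;
// the worker calls done() after ProcessData() has finished with it.
class HandOffGuard
{
public:
    HandOffGuard() : busy_(false), overruns_(0) {}

    // Returns false (and counts an overrun) if the worker is still using
    // the previous buffer, in which case the caller must not overwrite it.
    bool tryHandOff()
    {
        bool expected = false;
        if (busy_.compare_exchange_strong(expected, true, std::memory_order_acquire))
            return true;
        ++overruns_;
        return false;
    }

    // The release store makes the worker's accesses to the buffer visible
    // before the acquisition thread's successful tryHandOff() reuses it.
    void done() { busy_.store(false, std::memory_order_release); }

    unsigned overruns() const { return overruns_.load(); }

private:
    std::atomic<bool> busy_;
    std::atomic<unsigned> overruns_;
};
```

A non-zero overrun count would tell you at run time that processing cannot keep up with the 10 s cycle.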

which library


Yes, that's the question: which library is best for gaining optimal performance? I thought about using boost::thread, but I'm not sure it is the best choice.
if the load of one core is 100%, new threads will be processed on the other core

The concept of "load" is an average over long periods of time. At any moment, each core is either idle or executing some thread. When a new thread becomes runnable (created or awakened), the OS scheduler either places it on an idle core so it begins executing, or stops another running thread, removes it from its core and places the new thread there, or does nothing.

Because in Callback(), I would need to pass the AddRecord() function to the specific worker thread, but it completes before the next record for this channel arrives, so the thread would already be torn down before the next AddRecord() is passed to it.

You're thinking of creating a new thread that is executing AddRecord(). I'm thinking of notifying an existing thread that there's data ready to be processed. That existing thread would then call AddRecord()/ProcessData(), and return to waiting once ProcessData() is done.

Speaking of which, copying the filled buffer and just overwriting the old values is rather wasteful; you should move (or swap, if your compiler doesn't have moves).
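A sketch of the swap approach, assuming both vectors were sized to maxRecords up front; `handOffFullBuffer` is a hypothetical helper. std::vector::swap exchanges the internal buffers in constant time without copying any elements, and it leaves `record` holding an allocated buffer of the right size to overwrite, whereas after a move `record` would typically be left empty and need reallocating:

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Hands the filled buffer to the processing side without copying it.
// `record` receives the old processing buffer (same size, already
// allocated) and is refilled from position 0.
template <typename Rect>
void handOffFullBuffer(std::vector<Rect>& record, std::vector<Rect>& processBuffer,
                       std::size_t& recordPos)
{
    record.swap(processBuffer);   // O(1): only internal pointers are exchanged
    recordPos = 0;
}
```

The swap must only happen once the worker has finished with the previous processing buffer.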

The library doesn't really matter at your scale. I'd go with Boost, since it's what I know best and it's trivial to convert to standard C++ threads later on (unless your Visual Studio is VC11, which already supports C++11 threads).
Thanks again, you are very helpful!

You're thinking of creating a new thread that is executing AddRecord(). I'm thinking of notifying an existing thread that there's data ready to be processed. That existing thread would then call AddRecord()/ProcessData(), and return to waiting once ProcessData() is done.


How exactly do I do this? For example, I want one thread to process all incoming data, and I did it the following way:

In the MySystem class:
    private:
        boost::thread               receiverThread;


And in the Callback function:
 
thisClass->receiverThread = boost::thread(&MyChannel::AddRecord, &thisClass->channel[ChannelID], record[i]);


However, if I do it this way, multiple threads are started. I already checked the documentation, but found nothing about letting a single thread execute several functions one after another.

I also tried
 
thisClass->receiverThread(&MyChannel::AddRecord, &thisClass->channel[ChannelID], record[i]);

But this generates an error: Expression does not result in a function with 3 parameters.
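For reference, a thread object (std::thread or boost::thread) runs exactly one function for its whole lifetime. It has no call operator, which is why the second attempt doesn't compile, and constructing a new thread object in every callback (the first attempt) starts a new thread per record. The usual shape is a thread started once that loops over a queue. A minimal sketch with C++11 std::thread (boost::thread, boost::mutex and boost::condition_variable provide largely the same interface); `ReceiverThread` and `post()` are hypothetical names, and the handler stands in for AddRecord():

```cpp
#include <cassert>
#include <condition_variable>
#include <deque>
#include <functional>
#include <mutex>
#include <thread>
#include <utility>

// One receiver thread, started once in the constructor and joined in the
// destructor. post() only enqueues; the thread's loop keeps calling the
// handler until shutdown.
template <typename Record>
class ReceiverThread
{
public:
    explicit ReceiverThread(std::function<void(const Record&)> handler)
        : handler_(std::move(handler)), stop_(false),
          thread_(&ReceiverThread::loop, this)   // started exactly once
    {}

    ~ReceiverThread()
    {
        { std::lock_guard<std::mutex> lock(m_); stop_ = true; }
        cv_.notify_one();
        thread_.join();
    }

    // Safe to call from the acquisition callback's thread.
    void post(Record r)
    {
        { std::lock_guard<std::mutex> lock(m_); queue_.push_back(std::move(r)); }
        cv_.notify_one();
    }

private:
    void loop()
    {
        std::unique_lock<std::mutex> lock(m_);
        for (;;) {
            cv_.wait(lock, [this] { return stop_ || !queue_.empty(); });
            if (queue_.empty()) return;   // stop requested, queue drained
            Record r = std::move(queue_.front());
            queue_.pop_front();
            lock.unlock();
            handler_(r);                  // e.g. channel.AddRecord(r)
            lock.lock();
        }
    }

    std::function<void(const Record&)> handler_;
    bool stop_;
    std::mutex m_;
    std::condition_variable cv_;
    std::deque<Record> queue_;
    std::thread thread_;   // declared last so all members exist when it starts
};
```

In MySystem, the constructor could create one such object whose handler calls channel[ChannelID].AddRecord(...), and Callback() would only call post(); the queued record type would then need to carry the channel ID.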