How to approach multithreading asynchronous functions

Hi and thanks for reading,

I am practicing C++ to get better at writing multi-threaded programs.. specifically, I am now attempting to solve the problem of using a 3rd party API's set of classes and functions...

Here is the code

helloworld.cpp

  void requestSomeVectorOfData(std::vector<DataPayload> & _return, const std::vector<DataType> & requested_datas, const std::string& request_type) {

    APICOMMAND command_type = ST_DATAREQUEST;

    if(single_test_client.isConnected()) {
      std::cout << "Requesting... multiple datas\n";
      single_test_client.processMessages(_return, command_type, requested_datas); 
    }
  }


This vector of requests gets pushed down into a layer where it's understood.. we want to retrieve hundreds or even hundreds of thousands of these `DataPayload`s.. and the API seems to use an async function call with a callback approach.

My Question is... what is perhaps a good simple way to approach using multi-threading to tackle these calls in the quickest thread-safe manner?

So far my pseudo code approach sounds like this

TestCppClient.cpp implements the 3rd party functionality, but you write it yourself, so to speak...

/*
  call this function once connection is established
*/
void TestCppClient::processMessages(std::vector<DataPayload> & _return, APICommand request_type, const std::vector<DataType> & payload)
{
	time_t now = time(NULL);

	printf("\tTestCppClient: processMessages(): M_STATE: %d , request_type: %d !\n" , m_state, request_type);
	switch (m_state) {
		case ST_CONNECT:
			printf("\tTestCppClient: processMessages(): ST_CONNECT!\n");
			break;
		case ST_DATAREQUEST:
			printf("\tTestCppClient: processMessages(): ST_DATAREQUEST!\n");
			getalldatas(_return, payload);
			break;
		case ST_DATAREQUEST_ACK:
			printf("\tTestCppClient: processMessages(): ST_DATAREQUEST_ACK\n");
			break;
	}

	m_osSignal.waitForSignal();
	errno = 0;
	m_pReader->processMsgs();
}



void TestCppClient::getalldatas(std::vector<DataPayload> & _return, const std::vector<DataType> & req_load)
{
    // use multi-threading to instantiate a threadpool which has local queues.. or work-stealing queues

    // feed each thread I have (I have 12 in total on my machine right now to work with) an equal amount of "work"

    // in each thread.. execute this request via the 3rd party client:
    //     m_pClient->reqFundamentalData(request_id, some_single_request_load, report_type_str);

    // finish only when all commands are through?

    //m_state = ST_DATAREQUEST;
}
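
To make the pseudo code a bit more concrete, here's roughly the naive split I have in mind.. just a sketch with a stand-in sendRequest() instead of the real m_pClient call (no work stealing yet, and it assumes the client is safe to call from several threads, which I still need to verify):

#include <algorithm>
#include <atomic>
#include <cstddef>
#include <cstdio>
#include <thread>
#include <vector>

// stand-in for the real API call (m_pClient->reqFundamentalData(...) in my case)
void sendRequest(std::size_t request_id)
{
	std::printf("sending request %zu\n", request_id);
}

int main()
{
	const std::size_t total_requests = 300;
	const unsigned num_threads = std::max(1u, std::thread::hardware_concurrency());

	std::atomic<std::size_t> next{0};   // shared counter instead of per-thread queues for now
	std::vector<std::thread> workers;

	for (unsigned t = 0; t < num_threads; ++t) {
		workers.emplace_back([&] {
			// each thread grabs the next free request until they are all sent
			for (std::size_t i = next++; i < total_requests; i = next++)
				sendRequest(i);
		});
	}
	for (auto& w : workers)
		w.join();                   // "finish only when all commands are through"
}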


the callback where my requested data will "come back"

void TestCppClient::yourReturnedData(RequestId reqId, const std::string& data) {
	printf("\tTestCppClient: yourReturnedData!!!! ReqId: %ld, strlen() = %zu\n", reqId, strlen(data.c_str()));
}

// this data somehow needs to be appended into _return back at the top function call 



my problem is.. how would I detect at a high level if the entire API is blocking my requests and I need to throttle myself?

Perhaps a better approach is to use futures?? and bind the returned future.. to when I see the callback with the proper request_id.. and if so, append that to my _return at the requestSomeVectorOfData() level?

There must be a simpler way, but.. I am quite a newb here and still grappling with the basics.. I can write some basic threadpools that can split up work and steal work from each other.. what I am seeing is, if I maybe write my job in the form of a function that returns a future... and wrap the async callback.. I can build some elegant/clean/understandable solutions here... but I would love to hear others' thoughts on how they approach async code when wanting multi-threaded performance.
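
To be clearer about what I mean by "bind the returned future to the callback's request_id", here is a sketch of the idea.. RequestId, the class name and the map shape are all just my own placeholders, nothing from the vendor API:

#include <future>
#include <map>
#include <mutex>
#include <string>

using RequestId = long;   // placeholder for whatever id type the vendor uses

// one promise per outstanding request; the callback fulfils it by request id
class PendingRequests {
public:
	// call right before sending request `id`; keep the returned future
	std::future<std::string> expect(RequestId id)
	{
		std::lock_guard<std::mutex> lock(m_mutex);
		return m_pending[id].get_future();
	}

	// call from the vendor callback (yourReturnedData above)
	void fulfil(RequestId id, const std::string& data)
	{
		std::lock_guard<std::mutex> lock(m_mutex);
		auto it = m_pending.find(id);
		if (it != m_pending.end()) {
			it->second.set_value(data);
			m_pending.erase(it);
		}
	}

private:
	std::mutex m_mutex;
	std::map<RequestId, std::promise<std::string>> m_pending;
};

Then requestSomeVectorOfData() could fire off all the requests, hold on to the futures, and get() each one (or wait_for() with a timeout if I want a retry pass), appending the results into _return.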

Is it too much of a bother to use the async package but still get multithreaded performance that's reliable/tunable?

edit: it seems this article gives me a good starting point.. implement the use of futures.. but still have a threadpool with stealable work queues.. and perhaps put a retry mechanism in the future-returning function?

source: https://www.spiria.com/en/blog/desktop-software/solving-the-problems-with-the-futures/
what is perhaps a good simple way to approach using multi-threading to tackle these calls in the quickest thread-safe manner?
Why do you think you need multi-threading? That third party API already seems to have an async mechanism.


how would I detect at a high level if the entire API is blocking my requests and I need to throttle myself?
Whether you can use that API with multiple threads depends on whether it is thread safe or not. Maybe it already parallelizes the requests so you don't need to do it. That it supports async callbacks hints at this. It should be in the documentation.


Why do you think you need multi-threading? That third party API already seems to have an async mechanism.


I am a bit naive.. but I am feeling that without threading I am getting paused on each request due to it going over an HTTP/socket call under the covers.. and thought I need to introduce a mechanism to not pause on this.

So maybe at first I simply use this single client in a very simple way. The vendor states you can open multiple client instances (up to a certain number).. so perhaps the "pooling" occurs at that level in order to parallelize fully...

Maybe for now I just try the simple route: execute all requests in serial... then await their response ids in that callback? Or somehow push the callback's data back into helloworld.cpp? Perhaps this is where I am lacking: how to get from the callback... back into my helloworld caller function's stack.



Whether you can use that API with multiple threads depends on whether it is thread safe or not. Maybe it already parallelizes the requests so you don't need to do it. That it supports async callbacks hints at this. It should be in the documentation.


As per my response above.. I will test it in a single thread with serial execution.. and maybe experiment with adding a work-stealing threadpool.. the reason being that this is a web-based API and of course there are lots of possibilities for IO to pause... so I can get a baseline... and THEN go back up and pool these TestCppClient instances..? Give each worker one of these client instances?

Thanks for your insights!
there are non blocking and blocking socket operations. Before you thread, you need to understand how to use those.
the 2 cent version is that nonblocking reads will just give you nothing if nothing was there this time, then next iteration when you read, you can try again rather than wait on a reply.
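
roughly like this, if you were doing the sockets yourself (Linux-flavoured sketch using MSG_DONTWAIT; the blocking version is the same call without the flag, it just sits and waits):

#include <cerrno>
#include <sys/socket.h>

// poll one socket without blocking: either we get bytes, or we get told
// "nothing yet" and can go do something else instead of waiting
void poll_socket(int sock)
{
	char buf[4096];
	ssize_t n = recv(sock, buf, sizeof buf, MSG_DONTWAIT);
	if (n > 0) {
		// got n bytes, hand them to whatever parses the replies
	} else if (n < 0 && (errno == EWOULDBLOCK || errno == EAGAIN)) {
		// nothing there this time round; try again on the next iteration
	} else {
		// 0 means the peer closed the connection, anything else is a real error
	}
}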


If using those clears up the problem, then you may or may not need threads, or may be able to reduce it to a 'read socket' background thread. You may want to track by IP address so your I/O is aware of who it is talking to and where each remote machine is in the conversation? A vector of active connections and their state?
but I am feeling that without threading I am getting paused on each request due to it going over an HTTP/socket call under the covers.. and thought I need to introduce a mechanism to not pause on this.
Yes, you can send only one request at a time per socket.

The vendor states you can open multiple client instances (up to a certain number).. so perhaps the "pooling" occurs at that level in order to parallelize fully...
To speed up the transmission you can have multiple clients, each in their own thread. So one thread per client.

I would suggest that each thread shares one queue (it must be protected with a mutex). So when a thread is idle it takes a request from the queue (if there is one) and sends it via the client.
You may feed the requests to that queue from the main thread.
A vector can be used as well if you have the data beforehand.

You can use the callbacks to determine whether there was an error, but basically the transmission is done when all threads are done.
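
Roughly like this (just a sketch.. Request and the printf stand in for your real request type and for sending via the client owned by that thread):

#include <cstdio>
#include <mutex>
#include <queue>
#include <string>
#include <thread>
#include <vector>

// all worker threads pop from one mutex-protected queue;
// each worker would own its own client instance
struct Request { int id; std::string payload; };

std::queue<Request> g_queue;
std::mutex g_queue_mutex;

bool pop_request(Request& out)
{
	std::lock_guard<std::mutex> lock(g_queue_mutex);
	if (g_queue.empty()) return false;
	out = g_queue.front();
	g_queue.pop();
	return true;
}

void worker(int client_no)                 // one of these per client/thread
{
	Request req;
	while (pop_request(req))               // an idle thread takes the next request
		std::printf("client %d sends request %d\n", client_no, req.id);
}

int main()
{
	for (int i = 0; i < 300; ++i)          // main thread feeds the queue up front
		g_queue.push({i, "some payload"});

	std::vector<std::thread> clients;
	for (int c = 0; c < 4; ++c)            // e.g. 4 clients, one thread each
		clients.emplace_back(worker, c);
	for (auto& t : clients)
		t.join();                          // transmission is done when all threads are done
}
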
@jonnin

there are non blocking and blocking socket operations. Before you thread, you need to understand how to use those.


Understanding how my 3rd party wrote their socket code, in this case... correct? Since I am simply calling functions which, under the covers... perform the socket connect(s).. I merely configure IP.. port.. from a small set of possible ports (it's basically hardcoded by the server I talk to.. which is all proprietary)

What I can do is.. try a naive threading approach.. see if it chokes or speeds up.. just experiment.. and see how I fare vs serial execution.. and if the callback error function is called I will push those errors somewhere the main thread can see them, to tell it to... pause, perhaps/somehow

@coder777

To speed up the transmission you can have multiple clients, each in their own thread. So one thread per client.

...


Thank you again. I'll start with the naive serial execution on 1 client and 1 thread to test, but it sounds like I can move in this direction soon after with multiple clients.. 1 per thread

just going to research callbacks and getting responses back to the main thread properly without polling

Thank you both!

You can use the callbacks to determine whether there was an error, but basically the transmission is done when all threads are done.


it seems the control flow in the example is one in which there is a STATE variable.. and whenever the callback is called.. it sets this STATE.. and then later processMessages has the job of doing a switch statement and processing that resulting response...

This seems inefficient to me, no? Or prone to needing multiple passes if the STATE == ACKNOWLEDGE CALLBACK ... I cannot find anywhere in the sample code where they, via the callback... call processMessages themselves..? Shall I not just, on every callback... add to a sort of "callback response vector" with all the request_ids.. and once I've sent the complete transmission of requests over the wire... merely spend some arbitrary time waiting for all the known request ids to appear in it... or fill it with error messages to either be retried or ignored?
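
i.e. something along these lines.. just a sketch with made-up names (nothing from the vendor API), where the callback marks a request id as done and I wait some bounded time for the set of outstanding ids to drain:

#include <chrono>
#include <condition_variable>
#include <mutex>
#include <set>

// track which request ids have not come back yet;
// the callback calls mark_done(reqId), the sender calls wait_all(...)
class Outstanding {
public:
	void expect(long id)
	{
		std::lock_guard<std::mutex> lock(m_mutex);
		m_ids.insert(id);
	}

	void mark_done(long id)                     // called from the callback thread
	{
		std::lock_guard<std::mutex> lock(m_mutex);
		m_ids.erase(id);
		m_cv.notify_all();
	}

	// returns the ids that never answered within the timeout
	std::set<long> wait_all(std::chrono::seconds timeout)
	{
		std::unique_lock<std::mutex> lock(m_mutex);
		m_cv.wait_for(lock, timeout, [this] { return m_ids.empty(); });
		return m_ids;
	}

private:
	std::mutex m_mutex;
	std::condition_variable m_cv;
	std::set<long> m_ids;
};

Whatever is left in the returned set after the timeout would be the retry-or-ignore candidates.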

Sorry if these are cryptic or strange questions.. I've just never really had to think at this level, coming from what I work with day to day usually..

Not sure without the whole design. Some systems require an ack, and if they do not get it, they spam their message until they do. Others use the idea in various ways. If it wants an ack, you should reply before it decides you did not get the message, or that could cause who knows what to happen.
big thanks @jonnin and @coder777

For reference.. I am attempting to learn C++ to push/pull data from a service called Interactive Brokers.. their implementation is so bad that many have rewritten it.. as a way to learn C++ myself, I have taken up this challenge, so thank you for helping me understand a bit..

I can see others have rewritten vast amounts of the code samples in new ways.. so I paste one example below in case others get curious

https://github.com/JanBoonen/TwsApiCpp

Marking as solved. I've already confirmed that the _ACK signal is really only there for the sample code.. so far it doesn't actually do anything required.. so I will focus on the control flow ...

I have been using Python, and downloading all ticker symbols was taking around 2.5 days.. but now, thanks to switching to C++, I got 300 tickers out in a couple of seconds.. by my back-of-the-napkin estimate I will be able to scale up this client and have that same 2.5-day data download go down to roughly 8 minutes! Wow
I did a side-by-side of a simple program and the Python was 10 times slower, at only 1/2 a page of code. It's terrible, partly due to how it handles integers as some sort of container of digits or bytes or whatever it does there, and partly for other similar bad ideas. There are alternate Pythons that are better, eg (if named right, I put Python down a few months ago) cython, scithon, but you have to write for those to get the speedups... and it's still slower than C++ even then, but some of the worst of it is cleaned up in those.