My Thrift server suddenly stopped working

I have a C++-based Thrift server coupled with a Python client (v0.16.0): https://github.com/apache/thrift

Recently I dusted off the project and started updating some of its C++ dependencies, making sure everything was compiling and running.

I am now trying to test the endpoints, and I get right up to the point where my server should be returning the object(s) of interest, but something goes wrong and I am not sure why...

    if (batch_size == 1) {
      std::cout << "[hndlr-" << this->handler_num << "] request_matching_symbols() Queries list is less than # of clients\n";
      std::future<std::vector<twsapithrift::IBContractDescription>> single_future_result;
      SymbolsDataFetcher single_fetcher = SymbolsDataFetcher(*this->tws_conn_pool);
      single_future_result = std::async(std::launch::async, &SymbolsDataFetcher::process_data, single_fetcher, std::ref(queries_list), 0, queries_list.size());

      std::string concetenated_query;
      for (unsigned int i = 0; i < queries_list.size(); i++) {
        concetenated_query += queries_list[i] + ", ";
      }

      MatchingSymbolResponse response_obj;
      response_obj.request_query = concetenated_query;
      response_obj.matches;
      
      std::vector<twsapithrift::IBContractDescription> result_obj = single_future_result.get();
      for (unsigned int j = 0; j < result_obj.size(); j++) {
        std::cout << "[hndlr-" << this->handler_num << "] request_matching_symbols() PUSHING RESULT INTO RESPONSE\n";
        response_obj.matches.push_back(result_obj[j]);
      }
      _return.push_back(response_obj);
      std::cout << "[hndlr-" << this->handler_num << "] request_matching_symbols() Setting response_obj\n";
      return;
    }


Did this code ever work? Yes, I thought it did, and looking at it now with fresh eyes I still don't see anything wrong here.

Here's what the output looks like

[hndlr-1] request_matching_symbols() PUSHING RESULT INTO RESPONSE
[hndlr-1] request_matching_symbols() PUSHING RESULT INTO RESPONSE
[hndlr-1] request_matching_symbols() Setting response_obj
Thrift: Tue Aug 16 08:21:08 2022 TSocket::write_partial() send() <Host: 127.0.0.1 Port: 42112>: Broken pipe
Thrift: Tue Aug 16 08:21:08 2022 TConnectedClient died: write() send(): Broken pipe
Thrift: Tue Aug 16 08:21:08 2022 TConnectedClient output close failed: Called write on non-open socket


On the other side, the response is empty instead of containing what I expect.
Two things:

1. Do you really need a std::future? It looks like you use it like a normal function.

2. If queries_list is modified within the async task, it is problematic to use it in the loop on line 8 (the for loop that builds concetenated_query) without protection.

According to the log, it looks like the connection is closed before all necessary processing has finished. This might be a thread problem.
Thank you @coder777

I ended up discovering I had hardcoded the number of connections in the connection pool... at 32! Way too high for a MacBook Air from 2013 (2 cores, 4 threads).

But even when I turned the number of threads down to 4 and ran it on the MBA, it still crashes.

I had, out of laziness, kept the futures in use. Maybe when batch_size == 1 I should just avoid futures, but I am a bit worried about why it won't properly halt and wait for the future's `.get()` then?

I will look into your #2 suggestion as well.. thank you

EDIT: bit of background but the queries list is only read.. never modified.. don't know if that helps ease your mind on its usage here.. but I will bump the code building the concatenation to before the async, thanks!
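
On the two worries above, a minimal sketch may help. It assumes nothing about the real project: `count_characters` and `join_while_task_runs` are hypothetical stand-ins for `process_data` and the handler. It shows that `std::future::get()` does block until the task has returned, and that reading the same list from two threads is fine as long as neither thread modifies it.

```cpp
#include <chrono>
#include <cstddef>
#include <functional>
#include <future>
#include <string>
#include <thread>
#include <vector>

// Hypothetical stand-in for SymbolsDataFetcher::process_data: it only reads the list.
std::size_t count_characters(const std::vector<std::string>& queries) {
    std::this_thread::sleep_for(std::chrono::milliseconds(50)); // simulate slow I/O
    std::size_t total = 0;
    for (const auto& q : queries) total += q.size();
    return total;
}

// Joins the queries on the calling thread while the async task reads the same list.
std::string join_while_task_runs(const std::vector<std::string>& queries,
                                 std::size_t& total_out) {
    std::future<std::size_t> pending =
        std::async(std::launch::async, count_characters, std::cref(queries));

    std::string joined;
    for (const auto& q : queries) joined += q + ", "; // concurrent read-only access

    total_out = pending.get(); // blocks until count_characters has returned
    return joined;
}
```

If the real handler returns before the data is ready, the cause is therefore unlikely to be `get()` itself; it is more likely that the function behind the future returns early.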
I ended up discovering I had hardcoded the number of connections in the connection pool... at 32! Way too high for a MacBook Air from 2013 (2 cores, 4 threads)
No, I don't think so. Hardware threads are different from software threads. So 32 connections (even more) shouldn't be a problem.
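
As a rough illustration of that point (a sketch only; `run_workers` and `hardware_threads` are made-up helper names), the operating system schedules software threads onto whatever hardware threads exist, so starting 32 of them works on a machine that reports far fewer:

```cpp
#include <atomic>
#include <thread>
#include <vector>

// Starts `count` software threads and returns how many of them ran to completion.
int run_workers(int count) {
    std::atomic<int> finished{0};
    std::vector<std::thread> workers;
    workers.reserve(count);
    for (int i = 0; i < count; ++i) {
        workers.emplace_back([&finished] { finished.fetch_add(1); });
    }
    for (auto& w : workers) w.join(); // wait for every thread before reading the counter
    return finished.load();
}

// Number of hardware threads reported by the platform (0 if it cannot be determined).
unsigned hardware_threads() {
    return std::thread::hardware_concurrency();
}
```

Calling `run_workers(32)` returns 32 regardless of what `hardware_threads()` reports. Many threads may run slower than a few, but that alone should not cause a crash.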

Using threads carelessly might very well cause all kinds of misbehavior.

bit of background but the queries list is only read.. never modified.. don't know if that helps ease your mind on its usage here.. but I will bump the code building the concatenation to before the async
You actually do not provide enough code to tell what is going wrong. But calling process_data of single_fetcher directly is certainly a good idea.

Line 5 (the std::async call) might be wrong because single_fetcher might be passed incorrectly (shouldn't it be a pointer?).
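
For reference, here is a small sketch of the ways an object can be handed to std::async together with a member function pointer. `Fetcher` and `compare_passing_styles` are hypothetical names, not part of the project. Passing the object by value does compile, but std::async copies its arguments, so the task runs on a copy; a pointer or std::ref makes it run on the original.

```cpp
#include <functional>
#include <future>

// Hypothetical fetcher that counts how often process() ran on this particular object.
struct Fetcher {
    int calls = 0;
    int process(int x) { ++calls; return x * 2; }
};

struct CallCounts {
    int by_value;
    int by_pointer;
    int by_ref;
};

CallCounts compare_passing_styles() {
    Fetcher a, b, c;
    // By value: std::async copies `a`, so process() runs on the copy.
    std::async(std::launch::async, &Fetcher::process, a, 1).get();
    // By pointer: process() runs on `b` itself.
    std::async(std::launch::async, &Fetcher::process, &b, 1).get();
    // By reference wrapper: process() runs on `c` itself.
    std::async(std::launch::async, &Fetcher::process, std::ref(c), 1).get();
    return CallCounts{a.calls, b.calls, c.calls};
}
```

For a fetcher that only holds a pointer to the pool, running on a copy is probably harmless, but it matters as soon as the object carries state that the caller expects to see updated.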
So I took your advice and removed the future (since when batch_size == 1 we don't need concurrency), and lo and behold, the bug STILL remains, leading me to believe the problem is likely inside the data fetcher or the connection pool code.

I may have to throw out the connection pool logic and start from scratch or think of another way to connect 32 concurrent clients


#pragma once

#ifndef DataFetchers_H
#define DataFetchers_H

#include <connection-pool/pool.h>
#include "gen-cpp/ibrokers.h"
#include "tws-client/TestCppClient.h"

struct SymbolsDataFetcher {

  SymbolsDataFetcher(cpool::ConnectionPool& tws_conn_pool): tws_conn_pool(&tws_conn_pool){}

  std::vector<twsapithrift::IBContractDescription> process_data(const std::vector<std::string> & query_vector, const int first_element, const int last_element){

    if (last_element < first_element) {
      throw std::runtime_error("Cannot agree with first and last element inputs");
    }

    auto proxy_conn = this->tws_conn_pool->get_connection();
    auto x = dynamic_cast<TWSConnection*>(proxy_conn.operator->());
    
    if (! x->is_healthy()){
      std::cout << "THE CONNECTION IS NOT HEALTHY" << std::endl;
    } else {
      std::cout << "THE CONNECTION IS HEALTHY" << std::endl;
    }

    std::vector<twsapithrift::IBContractDescription> empty_contract_vect;
    std::cout << "THE starting query" << std::endl;
    x->tws_client.query_matching_tickers(empty_contract_vect, query_vector, first_element, last_element); 
    std::cout << "THE query is started..." << std::endl;
    this->tws_conn_pool->release_connection(std::move(proxy_conn));
    
    return empty_contract_vect;
  }

  private:
    cpool::ConnectionPool * const tws_conn_pool;
};


But thank you for the help... I think my next goal is to learn to use gdb so I can see exactly what is happening.
The modified call, with the futures removed, is as follows:

  void request_matching_symbols(std::vector<MatchingSymbolResponse> & _return, const std::vector<std::string> & queries_list) {
    std::cout << "[hndlr-" << this->handler_num << "] request_matching_symbols() queries_list.size() == " << queries_list.size() << std::endl;

    int batch_size = calculate_batch_size(queries_list.size(), this->tws_conn_pool->size());
    int remainder  = calculate_remainder(queries_list.size(), this->tws_conn_pool->size());

    if (batch_size == 1) {
      std::string concetenated_query;
      for (unsigned int i = 0; i < queries_list.size(); i++) {
        concetenated_query += queries_list[i] + ", ";
      }

      std::cout << "[hndlr-" << this->handler_num << "] request_matching_symbols() Queries list is less than # of clients\n";
      
      SymbolsDataFetcher single_fetcher = SymbolsDataFetcher(*this->tws_conn_pool);
      std::vector<twsapithrift::IBContractDescription> result_obj = single_fetcher.process_data(queries_list, 0, queries_list.size());
      
      MatchingSymbolResponse response_obj;
      response_obj.request_query = concetenated_query;
      response_obj.matches;


      for (unsigned int j = 0; j < result_obj.size(); j++) {
        std::cout << "[hndlr-" << this->handler_num << "] request_matching_symbols() PUSHING RESULT INTO RESPONSE\n";
        response_obj.matches.push_back(result_obj[j]);
      }
      _return.push_back(response_obj);
      std::cout << "[hndlr-" << this->handler_num << "] request_matching_symbols() Setting response_obj\n";
      return;
    }
I think my next goal is to learn to use gdb so I can see exactly what is happening
For using a debugger I recommend using an IDE such as CodeBlocks.

Unfortunately, debuggers don't work well for thread issues. Instead, you might want to increase the amount of output. For determining the order of execution, simple output like std::cout << "1\n"; would suffice.
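
One way to make such output easier to read when several threads are involved is sketched below. The `Trace` class and `trace_two_threads` are hypothetical helpers: each line is tagged with the thread id and written under a mutex, so lines from different threads cannot interleave.

```cpp
#include <cstddef>
#include <iostream>
#include <mutex>
#include <sstream>
#include <string>
#include <thread>
#include <vector>

// Hypothetical helper: serialises trace lines and remembers them in order.
class Trace {
public:
    void log(const std::string& step) {
        std::ostringstream line;
        line << "[thread " << std::this_thread::get_id() << "] " << step << '\n';
        std::lock_guard<std::mutex> lock(mutex_);
        lines_.push_back(line.str());
        std::cerr << line.str(); // std::cerr flushes after every write
    }

    std::size_t size() const {
        std::lock_guard<std::mutex> lock(mutex_);
        return lines_.size();
    }

private:
    mutable std::mutex mutex_;
    std::vector<std::string> lines_;
};

// Logs from two threads and returns the number of recorded lines.
std::size_t trace_two_threads() {
    Trace trace;
    std::thread worker([&trace] {
        trace.log("worker: query started");
        trace.log("worker: query finished");
    });
    trace.log("main: waiting for worker");
    worker.join();
    trace.log("main: worker joined");
    return trace.size();
}
```

The order of the worker and main lines can differ between runs, which is exactly the information this kind of tracing is meant to reveal.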

It is quite ambitious to use a framework with next to no documentation like this...

What are the types here:
    auto proxy_conn = this->tws_conn_pool->get_connection();
    auto x = dynamic_cast<TWSConnection*>(proxy_conn.operator->());
Particularly this proxy_conn.operator->() looks kind of weird...

What does get_connection() do? Does it get the first free connection, or something else?
When pointers are involved, you should check whether they are valid. At least with an assert(x);.
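
A minimal sketch of such a check follows. The class definitions here are simplified stand-ins, not the real `TWSConnection` or the pool's connection type, and `as_tws` is a hypothetical helper. A `dynamic_cast` on a pointer yields `nullptr` when the object is not of the requested type, so the result has to be tested before it is dereferenced.

```cpp
#include <stdexcept>

// Simplified stand-ins for the real connection types (assumptions for illustration).
struct Connection {
    virtual ~Connection() = default;
};
struct TWSConnection : Connection {
    bool is_healthy() const { return true; }
};
struct OtherConnection : Connection {};

// Returns the derived object or throws; a failed cast is never dereferenced.
TWSConnection& as_tws(Connection* base) {
    auto* tws = dynamic_cast<TWSConnection*>(base);
    if (tws == nullptr) {
        throw std::runtime_error("connection is not a TWSConnection");
    }
    return *tws;
}
```

Unlike `assert`, which is compiled out when `NDEBUG` is defined, the explicit check also protects release builds.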

This looks like a possible problem:
    x->tws_client.query_matching_tickers(empty_contract_vect, query_vector, first_element, last_element); 
    std::cout << "THE query is started..." << std::endl;
    this->tws_conn_pool->release_connection(std::move(proxy_conn));
When query_matching_tickers(...) uses a thread then release_connection(...) might end the process prematurely.
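
To illustrate the ordering concern, here is a sketch built entirely on assumptions: `FakeClient`, `FakePool`, and `query_then_release` are invented for this example, and nothing is known about how the real `query_matching_tickers` works internally. If the query did run on a background thread, the caller would have to wait for it to finish before releasing the connection and returning the results.

```cpp
#include <future>
#include <string>
#include <vector>

// Hypothetical client whose query runs on a background thread.
struct FakeClient {
    std::future<void> start_query(std::vector<std::string>& out) {
        return std::async(std::launch::async, [&out] { out.push_back("AAPL"); });
    }
};

// Hypothetical pool that records the order of events.
struct FakePool {
    std::vector<std::string> events;
    void release() { events.push_back("released"); }
};

std::vector<std::string> query_then_release(FakeClient& client, FakePool& pool) {
    std::vector<std::string> results;
    std::future<void> done = client.start_query(results);
    done.get(); // wait for the background work before touching the results
    pool.events.push_back("query finished");
    pool.release(); // only now hand the connection back
    return results;
}
```

Without the `done.get()` line, the function could release the connection and return an empty vector while the background work is still writing to it.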

By the way: You usually (outside a template) don't need this->. The compiler does it for you.

and lo and behold, the bug STILL remains.
But at least you removed one bug....