Condition Variable problem with multiple threads

I am trying to use Boost Conditional variable in my application to synchronize two different threads as following:

The main thread, will create a TCP server and instance of object called MIH-User and register a callback to an event_handler.

Main.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
/**
* Default MIH event handler.
*
* @param msg Received message.
* @param ec Error code.
*/
void event_handler(odtone::mih::message &msg, const boost::system::error_code &ec)
{
if (ec)
{
    log_(0, __FUNCTION__, " error: ", ec.message());
    return;
}

switch (msg.mid())
{
    // Source Server received HO Complete Message
    case odtone::mih::indication::n2n_ho_complete:
    {
        if (ec)
        {
            log_(0, __FUNCTION__, " error: ", ec.message());
            return;
        }

        mih::id             mobile_id;          // Mobile node MIHF ID TLV
        mih::link_tuple_id  source_id;          // Source Link ID TLV
        mih::link_tuple_id  target_id;          // Target Link ID TLV
        mih::ho_result      ho_res;             // Handover Result TLV

        // Deserialize received MIH message "N2N Handover Complete Indication"
        msg >> mih::indication()
            & mih::tlv_mobile_node_mihf_id(mobile_id)
            & mih::tlv_link_identifier(source_id)
            & mih::tlv_new_link_identifier(target_id)
            & mih::tlv_ho_result(ho_res);

        log_(0, "has received a N2N_HO_Complete.Indication with HO-Result=", ho_res.get(), 
            " from ", msg.source().to_string(), ", for Mobile-IP=", mobile_id.to_string());

        // Find the source transaction which corresponds to this Indication
        src_transaction_ptr t;
        tpool->find(msg.source(), mobile_id.to_string(), t);
        {
            boost::lock_guard<boost::mutex> lock(t->mut);
            t->response_received = true;
            t->ho_complete_result = ho_res;
            t->tid = msg.tid();
        }
        t->cond.notify_one();
    }
    break;

}
}

int main(int argc, char **argv)
{
        odtone::setup_crash_handler();
        boost::asio::io_service ios;

    sap::user usr(cfg, ios, boost::bind(&event_handler, _1, _2));
    mMihf = &usr;

    // Register the MIH-Usr with the local MIHF
    register_mih_user(cfg);

    // Pool of pending transactions with peer mihfs
    ho_transaction_pool pool(ios);
    tpool = &pool;

    // The io_service object provides I/O services, such as sockets, 
    // that the server object will use.
    tcp_server server(ios, cfg.get<ushort>(kConf_Server_Port));
}


The TCP server will listen for new incoming connections and upon the reception of a new connection it will create a new thread corresponding to a source transaction machine also it will add it to a common transaction pool as following:

TCP Server
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
void handle_request(std::string arg1,std::string arg2)
{
    src_transaction_ptr t(new src_transaction(arg1, arg2));
    tpool->add(t);
    t->run();
}

void handle_read(const boost::system::error_code &error, size_t bytes_transferred)
{
    if (!error)
    {
        // Split received message defining ";" as a delimiter
        std::vector<std::string> strs;
        boost::split(strs, mMessage, boost::is_any_of(":"));
        log_(0, "Received Message from TCP Client: ", mMessage);

        // The first value is the HO Command Initiation message
        if ((strs.at(0).compare("INIT") == 0) && (strs.size() == 3))
        {
            // The second value is the MIHF ID and the third is the IP address
            // Start Source transaction if we receive "Init-Message"
            boost::thread thrd(&tcp_connection::handle_request, this, strs.at(1), strs.at(2));
        }
        else if ((strs.at(0).compare("TEST") == 0) && (strs.size() == 3))
        {
            int max_iterations = atoi(strs.at(2).c_str());
            for (int i = 1; i <= max_iterations; i++)
            {
                boost::thread thrd(&tcp_connection::handle_request,
                    this, strs.at(1), boost::lexical_cast<std::string>(i));
            }
        }
        else
            log_(0, "Error: Unrecognized message.");

        memset(&mMessage[0], 0, max_length);

        mSocket.async_read_some(boost::asio::buffer(mMessage, max_length),
            boost::bind(&tcp_connection::handle_read, shared_from_this(),
            boost::asio::placeholders::error,
            boost::asio::placeholders::bytes_transferred));

    }
}


The source transaction machine will move between different states and in one of the states it will have to freeze the execution until it receives an indication through the main thread which is the "n2n_ho_complete" at this time, it will set the response_received to ture as following:

Source Transaction Machine
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
/**
* Run Source State Machine transaction.
*/
void src_transaction::run()
{        
// Previuos states.

wait_ho_complete_indication_state:
{
    log_(1, "is in SRC_WAIT_HO_COMPLETE_INDICATION State for Mobile IP=", ip_address);
    mState = SRC_WAIT_HO_COMPLETE_INDICATION;

    boost::unique_lock<boost::mutex> lock(mut);
    while (!response_received)
    {
        cond.wait(lock);
    }

    response_received = false;

    // Do some stuff
}

    // Other states

return;
}


The response_received is a public variable and each instance of the class has its own variable. When an indication is received through the main thread, it will look for the source transaction that matches that indication and sets its response_received to true.

So my problem is: whenever I try to execute the code, the whole program hangs on the wait_ho_complete_indication_state ,and the program doesn't respond to anything. And for example if I request the creation of a 10 threads for a source transaction. The program will create all of them and they start to work concurrently, until one of them reaches the wait_ho_complete_indication_state , then everything freezes. Even the main thread doesn't respond at all, even if it received an indication throught the event_handler.

So is my code correct for using the conditional variable?

Please help with this issue.

Thanks a lot.
Topic archived. No new replies allowed.