Losing memory, where does it go?

Hello,

I have made a class to send UDP messages to a specific IP address and port. Every time I send a message, I should receive a message back. This works as intended, but I notice that the memory consumed by the executable keeps increasing.
I have reduced the program to the point where the main function creates a UdpInterfacer object and calls sendMessage 999999 times in a for-loop with a 10 ms delay, so I am fairly certain that the memory is lost in the UdpInterfacer class.
I have also checked whether the memory is released after the loop has finished. It is not, so it does not seem to be the result of a backlog.

Still, I can't find a leak and I can't find a way to keep the memory use of this class stable. What am I doing wrong?

Kind regards, Nico

UdpInterfacer.h
#ifndef UdpInterfacer_H
#define UdpInterfacer_H

#ifndef WIN32_LEAN_AND_MEAN
    #define WIN32_LEAN_AND_MEAN // prevent problems with multiple includes of winsock2.h and their order
#endif // WIN32_LEAN_AND_MEAN

#include <winsock2.h>                           // socket communication
#include <iostream>                             // std::cout
#include <string>                               // std::string
#include <thread>                               // std::thread
#include <mutex>                                // std::mutex, std::unique_lock<std::mutex>

/** \brief This class will send messages to the receiver over UDP.
 * Remember to link to libws2_32.a or #pragma comment(lib, "ws2_32.lib")
 */
class UdpInterfacer
{
    public:
        UdpInterfacer(std::string ipAddress, unsigned int port);
        virtual ~UdpInterfacer();
        void connect();
        void disconnect();
        bool connected();
        std::string sendMessage(std::string message);
        std::string sendMessage(std::string message, int bufferSize);

    private:
        WSADATA                                             wsaData;
        SOCKET                                              sending_socket;
        SOCKET                                              receiving_socket;
        SOCKADDR_IN                                         receiverAddr;
        SOCKADDR_IN                                         server;
        bool                                                keepListening;
        std::string                                         ip;
        std::string                                         replyMessageString;
        unsigned int                                        portNumber;
        std::mutex                                          rpcSenderMutex;
        std::mutex                                          rpcReceiverMutex;
        std::thread                                         rpcReceiverThread;
        void receiveMessages();
        void processMessages(std::string message);
};
#endif

UdpInterfacer.cpp
#include "UdpInterfacer.h"

UdpInterfacer::UdpInterfacer(std::string ipAddress, unsigned int port)
{
    keepListening = true;
    ip = ipAddress;
    {
        std::unique_lock<std::mutex> locker(rpcReceiverMutex);
        replyMessageString = "";
    }
    portNumber = port;
    connect();
}

void UdpInterfacer::connect()
{
    try
    {
        std::unique_lock<std::mutex> locker(rpcSenderMutex);
        int ret = WSAStartup(0x101,&wsaData);
        if(ret != 0)
        {
            WSACleanup();
            return;
        }
        sending_socket = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
        if(sending_socket == INVALID_SOCKET)
        {
            closesocket(sending_socket);
            WSACleanup();
            return;
        }
        receiving_socket = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
        if(receiving_socket == INVALID_SOCKET)
        {
            closesocket(sending_socket);
            closesocket(receiving_socket);
            WSACleanup();
            return;
        }
        receiverAddr.sin_family = AF_INET;
        receiverAddr.sin_port = htons(portNumber);                // The port where we listen for messages
        receiverAddr.sin_addr.s_addr = htonl(INADDR_ANY);         // From any address (0.0.0.0)
        if (bind(receiving_socket, (SOCKADDR *)&receiverAddr, sizeof(receiverAddr)) == SOCKET_ERROR)
        {
            closesocket(sending_socket);
            closesocket(receiving_socket);
            WSACleanup();
            return;
        }
        server.sin_addr.s_addr = inet_addr(ip.c_str());
        server.sin_family = AF_INET;
        server.sin_port = htons(portNumber + 1);
        rpcReceiverThread = std::thread(&UdpInterfacer::receiveMessages, this);
    }
    catch (std::system_error const& e)
    {
        disconnect();
        std::cerr << "RpcSender::connect std::system_error=" << std::to_string(e.code().value()) << " message=" << std::string(e.what()) << " while connecting to socket.\n";
    }
    catch(std::exception ex)
    {
        disconnect();
        std::cerr << "RpcSender::connect exception while connecting to socket. \"" << ex.what() << "\".\n";
    }
    catch (...)
    {
        disconnect();
        std::cerr << "RpcSender::connect unknown error has occurred while connecting to socket.\n";
    }
}

void UdpInterfacer::disconnect()
{
    keepListening = false;
    rpcReceiverThread.join();
    closesocket(sending_socket);
    closesocket(receiving_socket);
    WSACleanup();
}

UdpInterfacer::~UdpInterfacer()
{
    disconnect();
}

bool UdpInterfacer::connected()
{
    return true;
}

std::string UdpInterfacer::sendMessage(std::string message)
{
    return UdpInterfacer::sendMessage(message, 65536);
}

std::string UdpInterfacer::sendMessage(std::string message, int bufferSize)
{
    try
    {
        std::cout << "Sending: " << message << "\n";
        {
            std::unique_lock<std::mutex> locker(rpcReceiverMutex);
            replyMessageString = ""; // prepare to receive a response
            sendto(sending_socket, message.c_str(), message.size(), 0, (SOCKADDR *)&server, sizeof(server));
        }
        // Wait for the response
        while (replyMessageString.size() == 0) std::this_thread::sleep_for(std::chrono::milliseconds(5));
        // return the response
        return replyMessageString;
    }
    catch (std::system_error const& e)
    {
        disconnect();
        std::cerr << "RpcSender::sendMessage  std::system_error=" << std::to_string(e.code().value()) << " message=" << std::string(e.what()) << " while sending \"" << message << "\".\n";
    }
    catch(std::exception ex)
    {
        disconnect();
        std::cerr << "RpcSender::sendMessage  exception while sending \"" << message << "\". \"" << ex.what() << "\"\n";
    }
    catch (...)
    {
        disconnect();
        std::cerr << "RpcSender::sendMessage  unknown error.\n";
    }
    return "";
}

void UdpInterfacer::receiveMessages()
{
    SOCKADDR            senderAddr;
    int                 senderAddrSize = sizeof(senderAddr);

    while (keepListening)
    {
        char receive_buff[65537];
        int byteCount1 = recvfrom(receiving_socket, receive_buff, sizeof(receive_buff), 0, &senderAddr, &senderAddrSize); // call will block until something is received (which happens rather frequently)
        if (byteCount1 != SOCKET_ERROR)
        {
                byteCount1 = std::min(byteCount1, (int) sizeof(receive_buff)-1);
                receive_buff[byteCount1] = '\0';
                std::cout << "Received: " << receive_buff << "\n";
                std::string message(receive_buff);
                std::thread t = std::thread(&UdpInterfacer::processMessages, this, message);
                t.detach();
        }
        else /// communication error occurred, invalid message received
        {
            std::cout << "Socket error while listening for incoming messages " << std::to_string(WSAGetLastError());
            disconnect();
        }
    }
}

void UdpInterfacer::processMessages(std::string message)
{
    if ( message.find("message")!= std::string::npos )
    {
        std::unique_lock<std::mutex> locker(rpcReceiverMutex);
        replyMessageString = message;
    }
    std::this_thread::sleep_for(std::chrono::milliseconds(5));
}


main.cpp
#include <thread>
#include <chrono>

#include "src/network/UdpInterfacer.h"

int main()
{
    UdpInterfacer udpInterface("127.0.0.1",11220);
    for (int i=0; i<999999; i++)
    {
        udpInterface.sendMessage("New value is "+std::to_string(i));
        std::this_thread::sleep_for(std::chrono::milliseconds(10));
    }
    return 0;
}

I don't know if it has anything to do with the issue you describe, but I noticed that some of the variables can be read and modified simultaneously by different threads without the protection of a mutex.

The while loop in sendMessage needs to lock rpcReceiverMutex while reading the size of replyMessageString.

 
while (replyMessageString.size() == 0) std::this_thread::sleep_for(std::chrono::milliseconds(5));
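
One way to do that is to keep the string and its mutex together so that nothing can touch the string without the lock. The following is only a minimal sketch: ReplySlot, waitForReply and demoReplySlot are made-up names that are not part of the posted class, and the second thread merely stands in for processMessages.

```cpp
#include <chrono>
#include <mutex>
#include <string>
#include <thread>

// Hypothetical helper (not in the posted code): the reply string and the
// mutex that guards it live together, so every access goes through the lock.
class ReplySlot
{
    public:
        void clear()
        {
            std::lock_guard<std::mutex> locker(mutex_);
            reply_.clear();
        }
        void set(const std::string& message)
        {
            std::lock_guard<std::mutex> locker(mutex_);
            reply_ = message;
        }
        // Polls like the original loop, but only reads the string while holding the mutex.
        std::string waitForReply()
        {
            for (;;)
            {
                {
                    std::lock_guard<std::mutex> locker(mutex_);
                    if (!reply_.empty()) return reply_;
                }
                // Sleep without the lock so the writer can get in.
                std::this_thread::sleep_for(std::chrono::milliseconds(5));
            }
        }
    private:
        std::mutex  mutex_;
        std::string reply_;
};

// Small demonstration: the writer thread plays the role of processMessages().
inline std::string demoReplySlot()
{
    ReplySlot slot;
    slot.clear();
    std::thread writer([&slot] {
        std::this_thread::sleep_for(std::chrono::milliseconds(20));
        slot.set("message 1");
    });
    std::string reply = slot.waitForReply();
    writer.join();
    return reply;
}
```

Note that the copy returned by waitForReply is made while the lock is held, which also covers the unprotected "return replyMessageString;" in the posted sendMessage.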

The keepListening variable also needs some protection.

 
keepListening = true;
 
while (keepListening)
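
A mutex would work here, but for a single flag std::atomic<bool> is probably the simplest option. Below is a rough sketch using a made-up Listener class in place of the receiver part of UdpInterfacer. The sleep stands in for the receive call, and demoListener is only there to show the start/stop sequence.

```cpp
#include <atomic>
#include <chrono>
#include <thread>

// Hypothetical stand-in for the receiving side of the posted class.
class Listener
{
    public:
        Listener() : keepListening(true), iterations(0) {}
        ~Listener() { stop(); }

        void start() { worker = std::thread(&Listener::run, this); }

        void stop()
        {
            keepListening = false;                 // atomic store, visible to the worker thread
            if (worker.joinable()) worker.join();  // safe to call more than once
        }

        long count() const { return iterations.load(); }

    private:
        void run()
        {
            while (keepListening)                  // atomic load on every pass
            {
                ++iterations;
                // Placeholder for the real work (recvfrom in the posted code).
                std::this_thread::sleep_for(std::chrono::milliseconds(1));
            }
        }

        std::atomic<bool> keepListening;
        std::atomic<long> iterations;
        std::thread       worker;
};

// Starts the listener, waits until it has looped at least once, then stops it.
inline long demoListener()
{
    Listener listener;
    listener.start();
    while (listener.count() == 0) std::this_thread::yield();
    listener.stop();
    return listener.count();
}
```

Keep in mind that in the posted code recvfrom blocks, so the flag is only looked at again after a datagram has arrived. The atomic makes the flag itself safe, but it does not wake up a thread that is waiting inside recvfrom.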


Also note that UDP is unreliable. Packets can be lost, duplicated, or arrive out of order, so it is not a good idea to just send a message and then wait for a response. It might work flawlessly if you're sending and receiving on the same computer, or even on the same LAN, but over the Internet you'll definitely be in trouble.
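
A common way to cope with lost packets is to wait for the reply with a timeout and resend if nothing arrives. The sketch below shows the idea with a condition variable instead of a sleep loop. Requester, onReply, request and demoRetry are all hypothetical names, and the send callback is only a stand-in for sendto so that the example runs without a network.

```cpp
#include <chrono>
#include <condition_variable>
#include <functional>
#include <mutex>
#include <string>
#include <utility>

// Hypothetical request/reply helper; 'send' stands in for the sendto() call.
class Requester
{
    public:
        explicit Requester(std::function<void(const std::string&)> send) : send_(std::move(send)) {}

        // To be called by the receiving side when a datagram arrives.
        void onReply(const std::string& reply)
        {
            {
                std::lock_guard<std::mutex> locker(mutex_);
                reply_ = reply;
            }
            replyArrived_.notify_one();
        }

        // Sends the message up to maxAttempts times; returns "" if no reply arrives in time.
        std::string request(const std::string& message, int maxAttempts, std::chrono::milliseconds timeout)
        {
            std::unique_lock<std::mutex> locker(mutex_);
            reply_.clear();
            for (int attempt = 0; attempt < maxAttempts; ++attempt)
            {
                locker.unlock();
                send_(message);   // may trigger onReply(), so the lock is not held here
                locker.lock();
                if (replyArrived_.wait_for(locker, timeout, [this] { return !reply_.empty(); }))
                    return reply_;
            }
            return "";
        }

    private:
        std::function<void(const std::string&)> send_;
        std::mutex                              mutex_;
        std::condition_variable                 replyArrived_;
        std::string                             reply_;
};

// Simulated lossy link: the first 'dropFirst' sends are lost, later ones are answered.
inline std::string demoRetry(int dropFirst, int maxAttempts)
{
    int sent = 0;
    Requester* self = nullptr;
    Requester requester([&](const std::string& message) {
        if (++sent > dropFirst) self->onReply("message: " + message);
    });
    self = &requester;
    return requester.request("ping", maxAttempts, std::chrono::milliseconds(10));
}
```

This only deals with loss. To handle duplicated or reordered packets as well, each request would need something like a sequence number that the reply echoes back, so that a late answer to an earlier request is not mistaken for the current one.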