Boost acceptor on existing live sockets


I am new to network programming. I've started by creating a class to accept connections using Boost asio. I'd like to know how to detect if a connection is already established and therefore not create a new socket.
Currently the acceptor creates a new socket.

This is the code for the acceptor:
header:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
#ifndef TCP_ACCEPTOR_H
#define TCP_ACCEPTOR_H
#include <boost/asio.hpp>
#include <atomic>
#include <memory>
#include "tcp_service.h"

using namespace boost;

class tcp_acceptor {
public:
  tcp_acceptor(asio::io_service& ios, unsigned short port);
  void start();
  void stop();

private:
  void init_accept();
  void on_accept(const boost::system::error_code& ec
                ,std::shared_ptr<asio::ip::tcp::socket> sock);
private:
  asio::io_service& m_ios;
  asio::ip::tcp::acceptor acc;
  std::atomic<bool> inactive;
};

#endif // TCP_ACCEPTOR_H 

definitions:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
#include "tcp_acceptor.h"
#include "logger.h"

extern logger exception_log;

tcp_acceptor::tcp_acceptor(asio::io_service& ios, unsigned short port) :
                   m_ios(ios)
                  ,acc(m_ios
                      ,asio::ip::tcp::endpoint(asio::ip::address_v4::any()
                                              ,port)
                       )
                  ,inactive(false)  { }

void tcp_acceptor::start() {
  acc.listen();
  init_accept();
}

void tcp_acceptor::stop() {
  inactive.store(true);
}

void tcp_acceptor::init_accept() {
    std::shared_ptr<asio::ip::tcp::socket> sock(new asio::ip::tcp::socket(m_ios));
    acc.async_accept(*sock.get(),
                    [this, sock](const boost::system::error_code& ec)
                                   { on_accept(ec, sock); }
                     );
}

void tcp_acceptor::on_accept(const boost::system::error_code& ec
                            ,std::shared_ptr<asio::ip::tcp::socket> sock) {
  if (ec == 0)
    (new tcp_service(sock))->start_handling();
  else
      excep_log("Error code = " + std::to_string(ec.value()) + ": "
                                + ec.message());
  if (!inactive.load())
    init_accept();
  else
    acc.close();
}


So the following std::shared_ptr<asio::ip::tcp::socket> sock(new asio::ip::tcp::socket(m_ios)); creates a new socket, and then following that, a new tcp_service is created in the on_accept function.

I would like tcp_service to act like a session between client and server, so the client can send multiple messages as and when, and the server responds upon each client message sent. However, the acceptor creates a new socket, so the question is how can the acceptor detect if a connection has a live socket?

Maybe I am misunderstanding how networking is supposed to work, since all the examples I have looked at, the acceptors create new sockets.

Currently I cannot send more than a single message from the client without creating a new connection.
Maybe I am misunderstanding how networking is supposed to work,
Kinda.

TCP is a stream protocol. Partitioning of data over it is application defined, not protocol defined. So a TCP server side handler is concerned with accepting a new connection, reading/writing over it, then closing it. It doesn't care that the payload may be HTTP or that the server returns HTML ...

thanks for the reminder, I guess the question is somewhat broader. Basically, I'd like my client to do something like this:

pseudo code
1
2
3
4
5
6
7
8
client.Connect()
client.Perform_Some_Action("Message 1");

// wait for response 

client.Perform_Some_Action("Message 2");
// wait for another response 
client.close();

The code I have currently closes the connection after the first message, similar to the echo server, one of the Boost example.

I've noticed that not many Boost questions get answered on this forum, I think it might be that the samples are too big and the same applies to my code.
Thought I'd post a cut-down version of the async server, just in case somebody can possibly help out?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
#include <boost/asio.hpp>

#include <thread>
#include <atomic>
#include <memory>
#include <iostream>

using namespace boost;

class tcp_service {
private:
	std::shared_ptr<asio::ip::tcp::socket> sk;
	std::string response;
	asio::streambuf buf;
	void on_request_received(const boost::system::error_code& ec, std::size_t bytes_transferred) {
		if (ec != 0) {
			std::cout << "Error code = " << ec.value()	<< " : " << ec.message();
			terminate();
			return;
		}
		// Process the request.
		response = process_request(buf);
		// Initiate asynchronous write operation.
		asio::async_write(*sk.get(), asio::buffer(response),
      [this] (const boost::system::error_code& ec, std::size_t bytes_transferred)
            { on_response_sent(ec, bytes_transferred); });
	}

	std::string process_request(asio::streambuf & request) {
		// Do some work.....
		std::this_thread::sleep_for(std::chrono::milliseconds(300));

    boost::asio::streambuf::const_buffers_type bufs = request.data();
    std::string str(boost::asio::buffers_begin(bufs), boost::asio::buffers_begin(bufs) + request.size());
    
		std::string ret { "Response message " + str + "\n"};
		return ret;
	}

	void on_response_sent(const boost::system::error_code& ec, std::size_t bytes_transferred) {
		if (ec != 0) {
			std::cout << "Error code = " << ec.value()	<< " : " << ec.message();
		}
		// terminate();
	}

	void terminate() { delete this; } // Cleanup

public:
	tcp_service(std::shared_ptr<asio::ip::tcp::socket> sock) : sk(sock)	{ }
	void start_handling() {
		asio::async_read_until(*sk.get(), buf, '\n',
                   [this]( const boost::system::error_code& ec, std::size_t bytes_transferred)
                              { on_request_received(ec, bytes_transferred); }
                          );
	}
};

class tcp_acceptor {
public:
	tcp_acceptor(asio::io_service& ios, short port) :
          m_ios(ios),
          acc(m_ios,	asio::ip::tcp::endpoint( asio::ip::address_v4::any(), port)),
          inactive(false)	{ }

	// Start accepting connection requests.
	void start() {
		acc.listen();
		init_accept();
	}

	// Stop accepting connection requests.
	void stop() {
		inactive.store(true);
	}

private:
	void init_accept() {
		std::shared_ptr<asio::ip::tcp::socket> sk(new asio::ip::tcp::socket(m_ios));
		acc.async_accept(*sk.get(),
			[this, sk](	const boost::system::error_code& ec) {	on_accept(ec, sk); });
	}

	void on_accept(const boost::system::error_code& ec, std::shared_ptr<asio::ip::tcp::socket> sk)	{
		if (ec == 0)
			(new tcp_service(sk))->start_handling();
		else
			std::cout << "Error code = "	<< ec.value()	<< " : " << ec.message();

		if (!inactive.load())
			init_accept(); // Initialise accept
		else
			acc.close(); // Stop accepting connections and free resources.
	}

private:
	asio::io_service& m_ios;
	asio::ip::tcp::acceptor acc;
	std::atomic<bool> inactive;
};

class tcp_server {
public:
	tcp_server() { work.reset(new asio::io_service::work(ios) ); }

	// Start the server.
	void start(short port, short thread_pool_size) {
		// Create and start tcp_acceptor.
		acc.reset(new tcp_acceptor(ios, port));
		acc->start();

		// Create threads and add to the pool.
		for (short i{0}; i < thread_pool_size; i++) {
			std::unique_ptr<std::thread> th(new std::thread([this]() { ios.run(); }));
			thread_pool.push_back(std::move(th));
		}
	}
	
	void stop() { // Stop server.
		acc->stop();
		ios.stop();

		for (auto& th : thread_pool)
			th->join();
	}

private:
	asio::io_service ios;
	std::unique_ptr<asio::io_service::work> work;
	std::unique_ptr<tcp_acceptor> acc;
	std::vector<std::unique_ptr<std::thread>> thread_pool;
};

int main() {
	short port = 3333;
	try {
		tcp_server srv;

		short thread_pool_size {2};
		srv.start(port, thread_pool_size);

                std::string input_str;
                while(input_str != "q" && input_str != "Q")
                  std::cin >> input_str;
		srv.stop();
	}
	catch (system::system_error &e) {
		std::cout << "Error code = "	<< e.code() << ". Message: " << e.what();
	}
	return 0;
}

and the client code:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
#include <boost/asio.hpp>
#include <iostream>

using namespace boost;

class tcp_client {
public:
	tcp_client(const std::string& ip_addr,	short port) :	m_ep(asio::ip::address::from_string(ip_addr),	port)
                                                         ,m_sock(m_ios) {
    m_sock.open(m_ep.protocol());
  }

	void connect() {
		m_sock.connect(m_ep);
	}

	void close() {
		m_sock.shutdown(boost::asio::ip::tcp::socket::shutdown_both);
		m_sock.close();
	}

	std::string do_some_work(int x) {
		std::string request = "Request#: " + std::to_string(x) + "\n";
		send_request(request);
		return receive_response();
	};

private:
	void send_request(const std::string& request) {
		asio::write(m_sock, asio::buffer(request), boost::asio::transfer_all());
	}

	std::string receive_response() {
		asio::streambuf buf;
		asio::read_until(m_sock, buf, '\n');

		std::istream input(&buf);
		std::string response;
		std::getline(input, response);

		return response;
	}

private:
	asio::io_service m_ios;
	asio::ip::tcp::endpoint m_ep;
	asio::ip::tcp::socket m_sock;
};

int main() {
	const std::string ip_addr = "127.0.0.1";
	const short port = 3333;

	try {
		tcp_client client(ip_addr, port);

		client.connect(); // Synchronous connect.

                for (int i{0}; i<10; i++) {
                  std::cout << "Sending request to the server... " << std::endl;
                  std::string response = client.do_some_work(i);
                  std::cout << "Response received: " << response << std::endl;
                }
		client.close();
	}
	catch (system::system_error &e) {
		std::cout << "Error code = " << e.code()	<< " : " << e.what();
		return e.code().value();
	}
	return 0;
}
Last edited on
I'd like my client to do something like this:
That's quite reasonable, and it is the way browsers work, they cache the connection and reuse it for different requests.

But that shouldn't be conflated with asynchronous i/o, I think that's where your difficulty lies. Seperate the two issues. Essentially, you have a connection handler in the server that must do:
1
2
3
4
5
while (client is connected)
    read client_request
    process client_request and produce client_reply
    send client_reply
endwhile
thanks, helpful.
Thought I'd post one last update, since I found a very helpful description of how to keep a client connected, once a connection has been established:

https://stackoverflow.com/questions/7039057/boost-asio-for-sync-server-keeping-tcp-session-open-with-google-proto-buffers

It is an async server acceptor issue I was facing and the description in the post describes how to handle such a scenario.
Thanks.
Topic archived. No new replies allowed.