"Boost.Asio C++ Network Programming." Chapter 4: Client and Server

Hello!
I am continuing my translation of John Torjo's book Boost.Asio C++ Network Programming.


In this chapter, we delve deeper into creating non-trivial client/server applications using Boost.Asio. You can run and test them, and once you understand them, you can use them as a basis for your own applications.


The following applications share these rules:
  • The client logs in to the server with a username (no password)
  • All connections are initiated by the client: the client sends a request and the server responds to it
  • Every request and every response ends with the character '\n'
  • The server disconnects any client that has not pinged it for 5 seconds
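
As a minimal sketch of the newline-terminated framing described in the list above, using only the standard library: the helper names frame and is_complete are illustrative assumptions and do not appear in the book.

```cpp
#include <cassert>
#include <string>

// Hypothetical helper: terminate a request or response with '\n'.
// The body itself must not contain '\n', or the peer would see two messages.
std::string frame(const std::string& body) {
    assert(body.find('\n') == std::string::npos);
    return body + "\n";
}

// Hypothetical helper: received data forms a complete message
// once it contains the terminating '\n'.
bool is_complete(const std::string& received) {
    return received.find('\n') != std::string::npos;
}
```

For example, frame("ping") produces the request "ping\n", and is_complete("login Jo") is false until the rest of the line arrives.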

The client can make the following requests:
  • Get a list of all connected clients
  • Ping the server; the server answers with either ping ok or ping client_list_changed (in the latter case, the client re-requests the list of connected clients)

To make things more interesting, we add a few twists:
  • Each client application logs in six users: John, James, Lucy, Tracy, Frank, and Abby
  • Each client pings the server at random intervals (up to 7 seconds apart, so from time to time the 5-second limit is exceeded and the server disconnects the client)
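
The random ping interval could be sketched as follows. Note that this swaps in the standard <random> facilities instead of rand(); the function name random_delay_ms and its parameters are assumptions made for illustration.

```cpp
#include <cassert>
#include <random>

// Hypothetical helper: pick a delay, in milliseconds, uniformly
// from the closed range [min_ms, max_ms].
int random_delay_ms(std::mt19937& rng, int min_ms, int max_ms) {
    assert(min_ms <= max_ms);
    std::uniform_int_distribution<int> dist(min_ms, max_ms);
    return dist(rng);
}
```

Calling random_delay_ms(rng, 0, 6999) covers the same range as the rand() % 7000 expression used in the client code of this chapter. Any delay above 5000 ms exceeds the server's limit.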


Synchronous Server / Client


First, we implement a synchronous application. You will see that the code is straightforward and easy to follow. However, the network part must run in its own thread, since all network calls are blocking.

Synchronous client

The synchronous client, as you would expect, does everything sequentially: it connects to the server, logs in, and then runs a communication loop, namely sleep, make a request, read the server's response, sleep again, and so on.



Since we are writing a synchronous version, some things become simpler. First, connecting to the server and running the loop looks like this:

ip::tcp::endpoint ep( ip::address::from_string("127.0.0.1"), 8001);
void run_client(const std::string & client_name)
{
	talk_to_svr client(client_name);
	try
	{
		client.connect(ep);
		client.loop();
	}
	catch(boost::system::system_error & err)
	{
		std::cout << "client terminated " << std::endl;
	}
}

The following snippet shows the talk_to_svr class:

struct talk_to_svr
{
	talk_to_svr(const std::string & username): sock_(service), started_(true), username_(username) {}
	void connect(ip::tcp::endpoint ep)
	{
		sock_.connect(ep);
	}
	void loop()
	{
		write("login " + username_ + "\n");
		read_answer();
		while ( started_)
		{
			write_request();
			read_answer();
			boost::this_thread::sleep(millisec(rand() % 7000));
		}
	}
	std::string username() const { return username_; }
	...
private:
	ip::tcp::socket sock_;
	enum { max_msg = 1024 };
	int already_read_;
	char buff_[max_msg];
	bool started_;
	std::string username_;
};

In the loop, we just send a ping request, read the response from the server, and sleep. We sleep for a random time (sometimes more than 5 seconds), so at some point the server will disconnect us:

void write_request()
{
	write("ping\n");
}
void read_answer()
{
	already_read_ = 0;
	read(sock_, buffer(buff_),
		boost::bind(&talk_to_svr::read_complete, this, _1, _2));
	process_msg();
}
void process_msg()
{
	std::string msg(buff_, already_read_);
	if ( msg.find("login ") == 0) on_login();
	else if ( msg.find("ping") == 0) on_ping(msg);
	else if ( msg.find("clients ") == 0) on_clients(msg);
	else std::cerr << "invalid msg " << msg << std::endl;
}

To read the answer, we use read_complete (discussed at length in the previous chapter) to make sure that we read up to the '\n' character. The logic is in process_msg(), where we read the server's response and dispatch it to the correct function:

void on_login() { do_ask_clients(); }
void on_ping(const std::string & msg)
{
	std::istringstream in(msg);
	std::string answer;
	in >> answer >> answer;
	if ( answer == "client_list_changed")
		do_ask_clients();
}
void on_clients(const std::string & msg)
{
	std::string clients = msg.substr(8);
	std::cout << username_ << ", new client list:" << clients;
}
void do_ask_clients()
{
	write("ask_clients\n");
	read_answer();
}
void write(const std::string & msg) { sock_.write_some(buffer(msg)); }
size_t read_complete(const boost::system::error_code & err, size_t bytes)
{
	// ... same as before
}

When we read the server's response to our ping and it is client_list_changed, we request the list of clients again.
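
The body of read_complete is elided in the listing above. As a rough sketch of how such a completion condition behaves (0 means "stop reading", 1 means "read one more byte"), here is a standard-library-only simulation. The names read_complete_sketch and read_line_sketch are hypothetical, and the string argument merely stands in for bytes arriving on a socket.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <string>

// Completion condition: returns 0 once '\n' is among the bytes read so far,
// otherwise 1 to ask for exactly one more byte.
std::size_t read_complete_sketch(const char* buf, std::size_t bytes) {
    const char* end = buf + bytes;
    return std::find(buf, end, '\n') != end ? 0 : 1;
}

// Simulates a blocking read that consumes `incoming` one byte at a time
// until the completion condition reports a full line. Returns bytes read.
std::size_t read_line_sketch(const std::string& incoming, char* buf,
                             std::size_t capacity) {
    assert(buf != nullptr);
    std::size_t n = 0;
    while (n < capacity && n < incoming.size()) {
        buf[n] = incoming[n];  // "receive" one more byte
        ++n;
        if (read_complete_sketch(buf, n) == 0) break;  // full line received
    }
    return n;
}
```

Because the condition asks for only one byte at a time, the read stops exactly at the '\n' and never consumes bytes that belong to the next message.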

Synchronous server

The synchronous server is also quite simple. It needs two threads: one to listen for new clients, the other to handle existing ones. It cannot use a single thread, because waiting for a new client is a blocking operation, so we need an additional thread to process existing clients.



As expected, writing a server is a bit more complicated than writing a client. For one thing, it must manage all connected clients. Since we are writing a synchronous version of the server, we need at least two threads: one accepts new clients (because accept() is a blocking call), and the other takes care of the existing ones:

void accept_thread() 
{
	ip::tcp::acceptor acceptor(service, ip::tcp::endpoint(ip::tcp::v4(), 8001));
	while ( true) 
	{
		client_ptr new_( new talk_to_client);
		acceptor.accept(new_->sock());
		boost::recursive_mutex::scoped_lock lk(cs);
		clients.push_back(new_);
	}
}
void handle_clients_thread() 
{
	while ( true) 
	{
		boost::this_thread::sleep( millisec(1));
		boost::recursive_mutex::scoped_lock lk(cs);
		for(array::iterator b = clients.begin(),e = clients.end(); b != e; ++b) 
			(*b)->answer_to_client();
		// erase clients that timed out
		clients.erase(std::remove_if(clients.begin(), clients.end(), 
			boost::bind(&talk_to_client::timed_out,_1)), clients.end());
	}
}
int main(int argc, char* argv[]) 
{
	boost::thread_group threads;
	threads.create_thread(accept_thread);
	threads.create_thread(handle_clients_thread);
	threads.join_all();
}

We need a list of clients in order to process their incoming requests.
Each talk_to_client instance has a socket. A socket is not copyable, so if you want to store it in a std::vector, you need a shared pointer to it. There are two ways to do this: either give talk_to_client a shared pointer to its socket and keep an array of talk_to_client instances, or let talk_to_client hold the socket by value and keep an array of shared pointers to talk_to_client. I chose the latter, but you can go the other way:

typedef boost::shared_ptr<talk_to_client> client_ptr;
typedef std::vector<client_ptr> array;
array clients;
boost::recursive_mutex cs; // thread-safe access to clients array
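
To illustrate why an array of shared pointers works for a non-copyable type, here is a small sketch. It assumes C++11 and uses std::shared_ptr in place of boost::shared_ptr; connection_sketch is a hypothetical stand-in for talk_to_client, and add_connection is an illustrative helper.

```cpp
#include <cassert>
#include <memory>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for talk_to_client: it cannot be copied,
// just like a class that holds a socket by value.
struct connection_sketch {
    explicit connection_sketch(std::string n) : name(std::move(n)) {}
    connection_sketch(const connection_sketch&) = delete;
    connection_sketch& operator=(const connection_sketch&) = delete;
    std::string name;
};
typedef std::shared_ptr<connection_sketch> connection_ptr;

// The vector copies the pointer, never the connection object itself.
void add_connection(std::vector<connection_ptr>& list, const connection_ptr& c) {
    assert(c);
    list.push_back(c);
}
```

After add_connection(list, c), both c and list.back() refer to the same object, which is destroyed only when the last pointer to it goes away.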

The core of talk_to_client looks as follows:

struct talk_to_client : boost::enable_shared_from_this<talk_to_client> 
{
	talk_to_client() { ... } 
	std::string username() const { return username_; }
	void answer_to_client() 
	{
		try 
		{
			read_request();
			process_request();
		} 
		catch ( boost::system::system_error&) 
		{
			stop();
		}
		if ( timed_out()) 
			stop();
	}
	void set_clients_changed() { clients_changed_ = true; }
	ip::tcp::socket & sock() { return sock_; }
	bool timed_out() const 
	{
		ptime now = microsec_clock::local_time();
		long long ms = (now - last_ping).total_milliseconds();
		return ms > 5000 ;
	}
	void stop() 
	{
		boost::system::error_code err; sock_.close(err);
	}
	void read_request() 
	{
		if ( sock_.available())
			already_read_ += sock_.read_some(
				buffer(buff_ + already_read_, max_msg - already_read_));
	}
	...
private:
	// ... same as in Synchronous Client
	bool clients_changed_;
	ptime last_ping;
};

The code above is fairly self-explanatory. The most important function is read_request(). It reads only if data is available, so the server never blocks:

void process_request() 
{
	bool found_enter = std::find(buff_, buff_ + already_read_, '\n') < buff_ + already_read_;
	if ( !found_enter)
		return; // message is not full
	// process the msg
	last_ping = microsec_clock::local_time();
	size_t pos = std::find(buff_, buff_ + already_read_, '\n') - buff_;
	std::string msg(buff_, pos);
	std::copy(buff_ + pos + 1, buff_ + already_read_, buff_); // move the bytes after '\n' to the front
	already_read_ -= pos + 1;
	if ( msg.find("login ") == 0) on_login(msg);
	else if ( msg.find("ping") == 0) on_ping();
	else if ( msg.find("ask_clients") == 0) on_clients();
	else std::cerr << "invalid msg " << msg << std::endl;
}
void on_login(const std::string & msg) 
{
	std::istringstream in(msg);
	in >> username_ >> username_;
	write("login ok\n");
	update_clients_changed();
}
void on_ping() 
{
	write(clients_changed_ ? "ping client_list_changed\n" : "ping ok\n");
	clients_changed_ = false;
}
void on_clients() 
{
	std::string msg;
	{ 
		boost::recursive_mutex::scoped_lock lk(cs);
		for( array::const_iterator b = clients.begin(), e = clients.end() ;b != e; ++b)
			msg += (*b)->username() + " "; 
	}
	write("clients " + msg + "\n");
}
void write(const std::string & msg) { sock_.write_some(buffer(msg)); }

Take a look at process_request(). After reading whatever data was available, we check whether we have a complete message (in which case found_enter is true). If so, we guard against having read more than one message (anything after the '\n' character is kept in the buffer), and then we interpret the complete message. The rest of the code is pretty simple.
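
The buffer handling described above can be sketched in isolation, with no sockets involved. The function name extract_message and its signature are assumptions made for this illustration; it removes the first complete message from a buffer and shifts the leftover bytes to the front.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <string>

// Removes the first complete message from buf and stores it (without '\n')
// in msg. `used` is the number of valid bytes in buf and is updated.
// Returns false, leaving everything untouched, if no '\n' is buffered yet.
bool extract_message(char* buf, std::size_t& used, std::string& msg) {
    assert(buf != nullptr);
    char* end = buf + used;
    char* nl = std::find(buf, end, '\n');
    if (nl == end) return false;          // message is not complete yet
    msg.assign(buf, nl);                  // bytes before '\n'
    std::copy(nl + 1, end, buf);          // shift leftover bytes to the front
    used -= static_cast<std::size_t>(nl - buf) + 1;
    return true;
}
```

With "ping\nask_cl" in the buffer (11 bytes), one call yields the message "ping" and leaves the 6 bytes "ask_cl" buffered for the next read.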

Asynchronous Server / Client


And now for the most interesting (and most difficult) part: let's go asynchronous.

Asynchronous client

Things now get a little more complicated, but they are certainly manageable. In return, we get an application that does not block.



You should already understand the following code:

#define MEM_FN(x) boost::bind(&self_type::x, shared_from_this())
#define MEM_FN1(x,y) boost::bind(&self_type::x, shared_from_this(),y)
#define MEM_FN2(x,y,z) boost::bind(&self_type::x, shared_from_this(),y,z)
class talk_to_svr : public boost::enable_shared_from_this<talk_to_svr>, boost::noncopyable 
{
	typedef talk_to_svr self_type;
	talk_to_svr(const std::string & username) : sock_(service), started_(true), username_(username), timer_(service) {}
	void start(ip::tcp::endpoint ep) 
	{
		sock_.async_connect(ep, MEM_FN1(on_connect,_1));
	}
public:
	typedef boost::system::error_code error_code;
	typedef boost::shared_ptr<talk_to_svr> ptr;
	static ptr start(ip::tcp::endpoint ep, const std::string & username) 
	{
		ptr new_(new talk_to_svr(username));
		new_->start(ep);
		return new_;
	}
	void stop() 
	{
		if ( !started_) return;
		started_ = false;
		sock_.close();
	}
	bool started() { return started_; }
	...
private:
	size_t read_complete(const boost::system::error_code & err, size_t bytes) 
	{
		if ( err) return 0;
		bool found = std::find(read_buffer_, read_buffer_ + bytes, '\n') < read_buffer_ + bytes;
		return found ? 0 : 1;
	}
private:
	ip::tcp::socket sock_;
	enum { max_msg = 1024 };
	char read_buffer_[max_msg];
	char write_buffer_[max_msg];
	bool started_;
	std::string username_;
	deadline_timer timer_;
};

You will notice an additional deadline_timer member, timer_, used for pinging the server; again, we ping the server at random intervals.
Now let's see what the main logic of the class looks like:

void on_connect(const error_code & err) 
{
	if ( !err) do_write("login " + username_ + "\n");
	else stop();
}
void on_read(const error_code & err, size_t bytes) 
{
	if ( err) stop();
	if ( !started() ) return;
	// process the msg
	std::string msg(read_buffer_, bytes);
	if ( msg.find("login ") == 0) on_login();
	else if ( msg.find("ping") == 0) on_ping(msg);
	else if ( msg.find("clients ") == 0) on_clients(msg);
}
void on_login() 
{
	do_ask_clients();
}
void on_ping(const std::string & msg) 
{
	std::istringstream in(msg);
	std::string answer;
	in >> answer >> answer;
	if ( answer == "client_list_changed") do_ask_clients();
	else postpone_ping();
}
void on_clients(const std::string & msg) 
{
	std::string clients = msg.substr(8);
	std::cout << username_ << ", new client list:" << clients ;
	postpone_ping();
}

The first two lines of on_read() are worth a closer look. On the first line, we check for an error and stop if there is one. On the second line, we check whether we have been stopped (earlier or just now) and return if so. Otherwise, all is well and we process the incoming message.
Finally, the do_* functions look as follows:

void do_ping() { do_write("ping\n"); }
void postpone_ping() 
{
	timer_.expires_from_now(boost::posix_time::millisec(rand() % 7000));
	timer_.async_wait( MEM_FN(do_ping));
}
void do_ask_clients() { do_write("ask_clients\n"); }
void on_write(const error_code & err, size_t bytes) { do_read(); }
void do_read() 
{
	async_read(sock_, buffer(read_buffer_), MEM_FN2(read_complete,_1,_2), MEM_FN2(on_read,_1,_2));
}
void do_write(const std::string & msg) 
{
	if ( !started() ) return;
	std::copy(msg.begin(), msg.end(), write_buffer_);
	sock_.async_write_some( buffer(write_buffer_, msg.size()), MEM_FN2(on_write,_1,_2));
}

Note that every read operation eventually triggers a ping:
  • When a read operation completes, on_read() is called
  • on_read() dispatches to on_login(), on_ping(), or on_clients()
  • Each of these functions either postpones the ping or asks for the client list
  • If we asked for the client list, then the read operation that receives it postpones the ping
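
The chain in the list above can be condensed into a single decision function. This is only a sketch that mirrors the dispatch in on_read(), on_ping() and on_clients(); the next_step enum and the function after_read are hypothetical names introduced here.

```cpp
#include <cassert>
#include <sstream>
#include <string>

// What the client does after a read completes (illustrative names).
enum class next_step { ask_clients, postpone_ping, none };

next_step after_read(const std::string& msg) {
    // on_read() only runs once a complete, '\n'-terminated line has arrived.
    assert(!msg.empty() && msg.back() == '\n');
    if (msg.find("login ") == 0) return next_step::ask_clients;
    if (msg.find("ping") == 0) {
        std::istringstream in(msg);
        std::string command, answer;
        in >> command >> answer;  // e.g. "ping" and "ok"
        return answer == "client_list_changed" ? next_step::ask_clients
                                               : next_step::postpone_ping;
    }
    if (msg.find("clients ") == 0) return next_step::postpone_ping;
    return next_step::none;  // unknown message
}
```

Every path that asks for the client list leads to a "clients ..." response, which in turn postpones the ping, so the ping is always rescheduled in the end.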


Asynchronous server

The diagram is fairly complex: four arrows go from Boost.Asio to on_accept, on_read, on_write, and on_check_ping. This basically means that you never know which of these asynchronous calls will complete next, but you know for sure that it will be one of them.



Since we work asynchronously, we can run in a single thread. Accepting clients is the easiest part, as shown in the following code fragment:

ip::tcp::acceptor acceptor(service, ip::tcp::endpoint(ip::tcp::v4(), 8001));
void handle_accept(talk_to_client::ptr client, const error_code & err) 
{
	client->start();
	talk_to_client::ptr new_client = talk_to_client::new_();
	acceptor.async_accept(new_client->sock(), 
		boost::bind(handle_accept,new_client,_1));
}
int main(int argc, char* argv[]) 
{
	talk_to_client::ptr client = talk_to_client::new_();
	acceptor.async_accept(client->sock(), boost::bind(handle_accept,client,_1));
	service.run();
}

The code above will always wait for new clients asynchronously (each new client connection will cause another asynchronous wait).
We must monitor the "client list changed" event (a new client connected or an existing client disconnected) and notify the other clients when it happens. That is why we keep an array of clients; if you did not need to know all connected clients at a given time, this array would be unnecessary:

class talk_to_client; typedef boost::shared_ptr<talk_to_client> client_ptr;
typedef std::vector<client_ptr> array;
array clients;

The skeleton of the connection class looks as follows:

class talk_to_client : public boost::enable_shared_from_this<talk_to_client>, boost::noncopyable 
{
	talk_to_client() { ... }
public:
	typedef boost::system::error_code error_code;
	typedef boost::shared_ptr<talk_to_client> ptr;
	void start() 
	{
		started_ = true;
		clients.push_back( shared_from_this());
		last_ping = boost::posix_time::microsec_clock::local_time();
		do_read(); // first, we wait for client to login
	}
	static ptr new_() { ptr new_(new talk_to_client); return new_; }
	void stop() 
	{
		if ( !started_) return;
		started_ = false;
		sock_.close();
		ptr self = shared_from_this();
		array::iterator it = std::find(clients.begin(), clients.end(), self);
		clients.erase(it);
		update_clients_changed();
	}
	bool started() const { return started_; }
	ip::tcp::socket & sock() { return sock_;}
	std::string username() const { return username_; }
	void set_clients_changed() { clients_changed_ = true; }
	...
private:
	ip::tcp::socket sock_;
	enum { max_msg = 1024 };
	char read_buffer_[max_msg];
	char write_buffer_[max_msg];
	bool started_;
	std::string username_;
	deadline_timer timer_;
	boost::posix_time::ptime last_ping;
	bool clients_changed_;
};

I refer to the talk_to_client or talk_to_svr class as the connection class, to make it clear what I am talking about.
The preceding code should look familiar by now; it is similar to what we used for the client application. In addition, stop() here removes the client connection from the array of clients.
The server continuously waits for asynchronous read operations:

void on_read(const error_code & err, size_t bytes) 
{
	if ( err) stop();
	if ( !started() ) return;
	std::string msg(read_buffer_, bytes);
	if ( msg.find("login ") == 0) on_login(msg);
	else if ( msg.find("ping") == 0) on_ping();
	else if ( msg.find("ask_clients") == 0) on_clients();
}
void on_login(const std::string & msg) 
{
	std::istringstream in(msg);
	in >> username_ >> username_;
	do_write("login ok\n");
	update_clients_changed();
}
void on_ping() 
{
	do_write(clients_changed_ ? "ping client_list_changed\n" : "ping ok\n");
	clients_changed_ = false;
}
void on_clients() 
{
	std::string msg;
	for(array::const_iterator b =clients.begin(),e =clients.end(); b != e; ++b)
		msg += (*b)->username() + " ";
	do_write("clients " + msg + "\n");
}

The code is pretty simple. One thing to note is that when a new client logs in, we call update_clients_changed(), which sets clients_changed_ to true for all clients.
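
The body of update_clients_changed() is not shown in this chapter's listings. One plausible shape, sketched here under assumptions, is a loop over the clients array. The sketch uses std::shared_ptr (C++11) instead of boost::shared_ptr, and client_sketch and mark_all_changed are hypothetical names.

```cpp
#include <cassert>
#include <memory>
#include <vector>

// Hypothetical stand-in for talk_to_client, reduced to the changed flag.
struct client_sketch {
    bool clients_changed = false;
    void set_clients_changed() { clients_changed = true; }
};
typedef std::shared_ptr<client_sketch> client_sketch_ptr;

// Flags every connected client, so that each one's next ping is answered
// with "ping client_list_changed".
void mark_all_changed(const std::vector<client_sketch_ptr>& all) {
    for (const client_sketch_ptr& c : all) {
        assert(c);
        c->set_clients_changed();
    }
}
```

Each client's flag is then cleared individually in on_ping(), once that client has been told about the change.
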
As soon as the server receives a request, it answers it immediately, as shown in the following code fragment:

void do_ping() { do_write("ping\n"); }
void do_ask_clients() { do_write("ask_clients\n"); }
void on_write(const error_code & err, size_t bytes) { do_read(); }
void do_read() 
{
	async_read(sock_, buffer(read_buffer_), MEM_FN2(read_complete,_1,_2), MEM_FN2(on_read,_1,_2));
	post_check_ping();
}
void do_write(const std::string & msg) 
{
	if ( !started() ) return;
	std::copy(msg.begin(), msg.end(), write_buffer_);
	sock_.async_write_some( buffer(write_buffer_, msg.size()), MEM_FN2(on_write,_1,_2));
}
size_t read_complete(const boost::system::error_code & err, size_t bytes) 
{
	// ... as before
}

At the end of each write operation, on_write() is called, which starts another asynchronous read. This cycle of waiting for a request and answering it continues until the client disconnects or times out.
Each read also starts an asynchronous 5-second wait, after which we check whether the client has timed out. If it has, we close the connection:

void on_check_ping() 
{
	ptime now = microsec_clock::local_time();
	if ( (now - last_ping).total_milliseconds() > 5000) stop();
	last_ping = boost::posix_time::microsec_clock::local_time();
}
void post_check_ping() 
{
	timer_.expires_from_now(boost::posix_time::millisec(5000));
	timer_.async_wait( MEM_FN(on_check_ping));
}
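
The 5-second rule itself can be sketched without Boost. The version below swaps boost::posix_time for std::chrono (C++11); the free function timed_out and its default limit parameter are assumptions made for illustration, not the book's code.

```cpp
#include <cassert>
#include <chrono>

typedef std::chrono::steady_clock::time_point time_point;

// Returns true if more than `limit` has passed between the last ping and now.
bool timed_out(time_point last_ping, time_point now,
               std::chrono::milliseconds limit = std::chrono::milliseconds(5000)) {
    assert(now >= last_ping);
    return now - last_ping > limit;
}
```

Taking the current time as a parameter instead of reading the clock inside the function keeps the check easy to test: exactly 5000 ms of silence is still accepted, while 5001 ms is not.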

That's the whole server. You can start it and start working with it!

Summary


In this chapter, we looked at how to write some basic client/server applications. We avoided pitfalls such as memory leaks and deadlocks. All the programs can serve as a basis for your future applications; they can be extended and adapted.
In the next chapter, we will take a deeper look at the differences between synchronous and asynchronous programming with Boost.Asio, and see how you can plug in your own asynchronous operations.

Resources for this article: link
Thank you all for your attention, see you soon!