Boost.Asio C++ 网络编程:TCP回显客户端/服务端
扫描二维码
随时随地手机看文章
回显就是服务端将接收到的任何内容回发给客户端显示,然后关闭客户端的连接。这个服务端可以处理任何数量的客户端。每个客户端连接之后发送一个消息,服务端接收到消息后把它发送回去。在那之后,服务端关闭连接。具体流程如下图所示。
对于TCP而言,我们需要一个额外的保证:每一个消息以换行符结束(‘n’)。编写一个同步回显服务端/客户端非常简单。下面我们分别实现同步客户端,同步服务端,异步客户端和异步服务端。
一.TCP同步客户端
#ifdef WIN32 #define _WIN32_WINNT 0x0501 #include#endif #include#include#include#include#includeusing namespace boost::asio; using boost::system::error_code; io_service service; size_t read_complete(char * buf, const error_code & err, size_t bytes) { if (err) return 0; bool found = std::find(buf, buf + bytes, 'n') < buf + bytes; // 一个一个字符的读取,直到回车, 不缓存 return found ? 0 : 1; } ip::tcp::endpoint ep(ip::address::from_string("127.0.0.1"), 8001); void sync_echo(std::string msg) { msg += "n"; ip::tcp::socket sock(service); sock.connect(ep); sock.write_some(buffer(msg)); char buf[1024]; int bytes = read(sock, buffer(buf), boost::bind(read_complete, buf, _1, _2)); std::string copy(buf, bytes - 1); msg = msg.substr(0, msg.size() - 1); std::cout << "server echoed our " << msg << ": " << (copy == msg ? "OK" : "FAIL") << std::endl; sock.close(); } int main(int argc, char* argv[]) { // 连接多个客户端 char* messages[] = { "Can", "ge", "ge", "blog!", 0 }; boost::thread_group threads; for (char ** message = messages; *message; ++message) { threads.create_thread(boost::bind(sync_echo, *message)); boost::this_thread::sleep(boost::posix_time::millisec(100)); } threads.join_all(); system("pause"); }
你会发现,在读取时,我使用了自由函数(不属于socket类,属于命名空间asio)read(),因为我想要读‘n’之前的所有内容。sock.read_some()方法满足不了这个要求,因为它只会读可用的,不一定是整个的消息。
read(stream, buffer [, completion])这个方法同步地从一个流中读取数据。你可以选择指定一个完成处理方法。完成处理方法会在每次read操作调用成功之后调用,然后告诉read操作是否完成(如果没有完成,它会继续读取)。它的格式是:size_t completion(const boost::system::error_code& err, size_t bytes_transfered) 。当这个完成处理方法返回0时,我们认为read操作完成;如果它返回一个非0值,它表示了下一次sock.read_some操作需要从流中读取的字节数。
read_complete一个个的读取字符,直到回车,这是通过std::find方法控制的,std::find的行为大概如下。
templateInputIterator find (InputIterator first, InputIterator last, const T& val) { while (first!=last) { if (*first==val) return first; ++first; } return last; }
结合到上面客户端代码就是,如果没有找到回车'n',std::find始终返回buf+bytes,否则返回'n'的地址,也就是buf+bytes-1,此时'n'是已读取内容的最后一个字符。
注意:因为我们是同步的,所以不需要调用service.run()。
二.TCP同步服务端
#ifdef WIN32 #define _WIN32_WINNT 0x0501 #include#endif #include#include#include#includeusing namespace boost::asio; using namespace boost::posix_time; using boost::system::error_code; io_service service; size_t read_complete(char * buff, const error_code & err, size_t bytes) { if ( err) return 0; bool found = std::find(buff, buff + bytes, 'n') < buff + bytes; // we read one-by-one until we get to enter, no buffering return found ? 0 : 1; } void handle_connections() { ip::tcp::acceptor acceptor(service, ip::tcp::endpoint(ip::tcp::v4(),8001)); char buff[1024]; while ( true) { ip::tcp::socket sock(service); acceptor.accept(sock); int bytes = read(sock, buffer(buff), boost::bind(read_complete,buff,_1,_2)); std::string msg(buff, bytes); sock.write_some(buffer(msg)); sock.close(); } } int main(int argc, char* argv[]) { handle_connections(); }
服务端的逻辑主要在handle_connections()。因为是单线程,它接受一个客户端请求,读取客户端发送的消息,然后回发给客户端,接着等待下一个连接。可以确定,当两个客户端同时连接时,第二个客户端需要等待服务端处理完第一个客户端的请求。
还是要注意因为我们是同步的,所以不需要调用service.run()。
下面是客户端回显的结果,当然要先启动服务端。
三.TCP异步客户端
#ifdef WIN32 #define _WIN32_WINNT 0x0501 #include#endif #include#include#include#include#includeusing namespace boost::asio; io_service service; #define MEM_FN(x) boost::bind(&self_type::x, shared_from_this()) #define MEM_FN1(x,y) boost::bind(&self_type::x, shared_from_this(),y) #define MEM_FN2(x,y,z) boost::bind(&self_type::x, shared_from_this(),y,z) class talk_to_svr : public boost::enable_shared_from_this , boost::noncopyable { typedef talk_to_svr self_type; talk_to_svr(const std::string & message) : sock_(service), started_(true), message_(message) {} void start(ip::tcp::endpoint ep) { sock_.async_connect(ep, MEM_FN1(on_connect, _1)); } public: typedef boost::system::error_code error_code; typedef boost::shared_ptrptr; static ptr start(ip::tcp::endpoint ep, const std::string & message) { ptr new_(new talk_to_svr(message)); new_->start(ep); return new_; } void stop() { if (!started_) return; started_ = false; sock_.close(); } bool started() { return started_; } private: void on_connect(const error_code & err) { if (!err) do_write(message_ + "n"); else stop(); } void on_read(const error_code & err, size_t bytes) { if (!err) { std::string copy(read_buffer_, bytes - 1); std::cout << "server echoed our " << message_ << ": " << (copy == message_ ? "OK" : "FAIL") << std::endl; } stop(); } void on_write(const error_code & err, size_t bytes) { do_read(); } void do_read() { async_read(sock_, buffer(read_buffer_), MEM_FN2(read_complete, _1, _2), MEM_FN2(on_read, _1, _2)); } void do_write(const std::string & msg) { if (!started()) return; std::copy(msg.begin(), msg.end(), write_buffer_); sock_.async_write_some(buffer(write_buffer_, msg.size()), MEM_FN2(on_write, _1, _2)); } size_t read_complete(const boost::system::error_code & err, size_t bytes) { if (err) return 0; bool found = std::find(read_buffer_, read_buffer_ + bytes, 'n') < read_buffer_ + bytes; return found ? 0 : 1; } private: ip::tcp::socket sock_; enum { max_msg = 1024 }; char read_buffer_[max_msg]; char write_buffer_[max_msg]; bool started_; std::string message_; }; int main(int argc, char* argv[]) { ip::tcp::endpoint ep(ip::address::from_string("127.0.0.1"), 8001); char* messages[] = { "Can", "ge", "ge", "blog", 0 }; for (char ** message = messages; *message; ++message) { talk_to_svr::start(ep, *message); boost::this_thread::sleep(boost::posix_time::millisec(100)); } service.run(); system("pause"); }
四.TCP异步服务端
#ifdef WIN32 #define _WIN32_WINNT 0x0501 #include#endif #include#include#include#includeusing namespace boost::asio; using namespace boost::posix_time; io_service service; #define MEM_FN(x) boost::bind(&self_type::x, shared_from_this()) #define MEM_FN1(x,y) boost::bind(&self_type::x, shared_from_this(),y) #define MEM_FN2(x,y,z) boost::bind(&self_type::x, shared_from_this(),y,z) class talk_to_client : public boost::enable_shared_from_this, boost::noncopyable { typedef talk_to_client self_type; talk_to_client() : sock_(service), started_(false) {} public: typedef boost::system::error_code error_code; typedef boost::shared_ptrptr; void start() { started_ = true; do_read(); } static ptr new_() { ptr new_(new talk_to_client); return new_; } void stop() { if (!started_) return; started_ = false; sock_.close(); } ip::tcp::socket & sock() { return sock_; } private: void on_read(const error_code & err, size_t bytes) { if (!err) { std::string msg(read_buffer_, bytes); // echo message back, and then stop do_write(msg + "n"); } stop(); } void on_write(const error_code & err, size_t bytes) { do_read(); } void do_read() { async_read(sock_, buffer(read_buffer_), MEM_FN2(read_complete, _1, _2), MEM_FN2(on_read, _1, _2)); } void do_write(const std::string & msg) { std::copy(msg.begin(), msg.end(), write_buffer_); sock_.async_write_some(buffer(write_buffer_, msg.size()), MEM_FN2(on_write, _1, _2)); } size_t read_complete(const boost::system::error_code & err, size_t bytes) { if (err) return 0; bool found = std::find(read_buffer_, read_buffer_ + bytes, 'n') < read_buffer_ + bytes; // we read one-by-one until we get to enter, no buffering return found ? 0 : 1; } private: ip::tcp::socket sock_; enum { max_msg = 1024 }; char read_buffer_[max_msg]; char write_buffer_[max_msg]; bool started_; }; ip::tcp::acceptor acceptor(service, ip::tcp::endpoint(ip::tcp::v4(), 8001)); void handle_accept(talk_to_client::ptr client, const boost::system::error_code & err) { client->start(); talk_to_client::ptr new_client = talk_to_client::new_(); acceptor.async_accept(new_client->sock(), boost::bind(handle_accept, new_client, _1)); } int main(int argc, char* argv[]) { talk_to_client::ptr client = talk_to_client::new_(); acceptor.async_accept(client->sock(), boost::bind(handle_accept, client, _1)); service.run(); }
TCP异步客户端和异步服务端的关键是enable_shared_from_this模板类的使用,关于enable_shared_from_this详见:C++11新特性之十一:enable_shared_from_this,C++11和boost的enable_shared_from_this功能和原理一样。
客户端回显结果和同步时的一样,如下: