//chat_message.h #ifndef CHAT_MESSAGE_HPP #define CHAT_MESSAGE_HPP #include "Header.h" #include
#include
#include
#include
#include
#include "stdafx.h" class chat_message { public: enum { header_length = sizeof(Header) }; enum { max_body_length = 512 }; chat_message(){} const char *data() const { return data_; } char *data() { return data_; } std::size_t length() const { return header_length + m_header.bodySize; } const char *body() const { return data_ + header_length; } char *body() { return data_ + header_length; } int type() const { return m_header.type; } std::size_t body_length() const { return m_header.bodySize; } void setMessage(int messageType, const void *buffer, size_t bufferSize) { assert(bufferSize <= max_body_length); m_header.bodySize = bufferSize; m_header.type = messageType; std::memcpy(body(), buffer, bufferSize); std::memcpy(data(), &m_header, sizeof(m_header)); } void setMessage(int messageType, const std::string& buffer) { setMessage(messageType, buffer.data(), buffer.size()); } bool decode_header() { std::memcpy(&m_header, data(), header_length); if (m_header.bodySize > max_body_length) { std::cout << "body size " << m_header.bodySize << " " << m_header.type << std::endl; return false; } return true; } private: char data_[header_length + max_body_length]; Header m_header; }; #endif
//Header.h #ifndef FND_STRUCT_HEADER_H #define FND_STRUCT_HEADER_H #include
struct Header{ int bodySize; int type; }; enum MessageType { MT_BIND_NAME = 1, MT_CHAT_INFO = 2, MT_ROOM_INFO = 3, }; struct BindName{ char name[32]; int nameLen; }; struct ChatInformation { char information[256]; int infoLen; }; struct RoomInformation { BindName name; ChatInformation chat; }; bool parseMessage(const std::string &input, int *type, std::string &outbuffer); #endif //Header.cpp #include "stdafx.h" #include "Header.h" #include
#include
#include
bool parseMessage(const std::string &input, int *type, std::string &outbuffer) { auto pos = input.find_first_of(" "); if (pos == std::string::npos) return false; if (pos == 0) return false; auto command = input.substr(0, pos); if (command == "BindName") { std::string name = input.substr(pos + 1); if (name.size() > 32) return false; if (type) *type = MT_BIND_NAME; BindName bindInfo; bindInfo.nameLen = name.size(); std::memcpy(&(bindInfo.name), name.data(), name.size()); auto buffer = reinterpret_cast
(&bindInfo); outbuffer.assign(buffer, buffer + sizeof(bindInfo)); return true; } else if (command == "Chat") { std::string chat = input.substr(pos + 1); if (chat.size() > 256) return false; ChatInformation info; info.infoLen = chat.size(); std::memcpy(&(info.information), chat.data(), chat.size()); auto buffer = reinterpret_cast
(&info); outbuffer.assign(buffer, buffer + sizeof(info)); if (type) *type = MT_CHAT_INFO; return true; } return false; }
//chat.cpp #include "chat_message.h" #include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include "stdafx.h" using boost::asio::ip::tcp; typedef std::deque
chat_message_queue; std::chrono::system_clock::time_point base; class chat_session; typedef std::shared_ptr
chat_session_ptr; class chat_room { public: chat_room(boost::asio::io_service& io_service) : m_strand(io_service) {} public: void join(chat_session_ptr); void leave(chat_session_ptr); void deliver(const chat_message&); private: boost::asio::io_service::strand m_strand;//strand提供串行执行, 能够保证线程安全, 同时被post或dispatch的方法, 不会被并发的执行. std::set
participants_; enum { max_recent_msgs = 100 }; chat_message_queue recent_msgs_; }; class chat_session : public std::enable_shared_from_this
{ public: chat_session(tcp::socket socket, chat_room &room) : socket_(std::move(socket)),room_(room), m_strand(socket_.get_io_service()){} void start() { room_.join(shared_from_this()); //加入房间 do_read_header();//读取数据包头 } void deliver(const chat_message &msg){ m_strand.post([this,msg]{ bool write_in_progress = !write_msgs_.empty(); write_msgs_.push_back(msg); if (!write_in_progress) { do_write(); } }); } private: void do_read_header(){ auto self(shared_from_this()); boost::asio::async_read( socket_, boost::asio::buffer(read_msg_.data(),chat_message::header_length), //读取头数据8个字节 m_strand.wrap( [this,self](boost::system::error_code ec, std::size_t){ if ( !ec && read_msg_.decode_header()){ do_read_body();//合法的头部的话,进行内容的读取。 } else { std::cout << "player leave the room\n"; room_.leave(shared_from_this()); } })); } void do_read_body() { auto self(shared_from_this()); boost::asio::async_read( socket_, boost::asio::buffer(read_msg_.body(), read_msg_.body_length()),//内容的读取,async_read相当于读取缓存数据,按长度一次读取大小多少,直到读取完成。 m_strand.wrap( [this,self](boost::system::error_code ec, std::size_t) { if (!ec) { handleMessage(); do_read_header(); } else { room_.leave(shared_from_this()); } })); } void handleMessage() { auto n = std::chrono::system_clock::now() - base; std::cout << "i'm in " << std::this_thread::get_id() << " time " << std::chrono::duration_cast
(n).count() << std::endl; std::this_thread::sleep_for(std::chrono::milliseconds(3)); if (read_msg_.type() == MT_BIND_NAME) {//设定昵称 const BindName* bind = reinterpret_cast
(read_msg_.body()); m_name.assign(bind->name, bind->name + bind->nameLen); } else if (read_msg_.type() == MT_CHAT_INFO) { const ChatInformation* chat = reinterpret_cast
(read_msg_.body()); m_chatInformation.assign(chat->information, chat->information + chat->infoLen); auto rinfo = buildRoomInfo(); chat_message msg; msg.setMessage(MT_ROOM_INFO,&rinfo, sizeof(rinfo)); room_.deliver(msg); //对当前房间发送消息 } else { } } void do_write() { //发送数据 auto self(shared_from_this()); boost::asio::async_write( socket_, boost::asio::buffer(write_msgs_.front().data(), write_msgs_.front().length()), m_strand.wrap( [this, self](boost::system::error_code ec, std::size_t) { if (!ec) { write_msgs_.pop_front(); if (!write_msgs_.empty()) { do_write(); } } else { room_.leave(shared_from_this()); } })); } tcp::socket socket_; chat_room &room_; chat_message read_msg_; chat_message_queue write_msgs_; std::string m_name; std::string m_chatInformation; boost::asio::io_service::strand m_strand; RoomInformation buildRoomInfo() const { RoomInformation info; info.name.nameLen = m_name.size(); std::memcpy(info.name.name, m_name.data(), m_name.size()); info.chat.infoLen = m_chatInformation.size(); std::memcpy(info.chat.information, m_chatInformation.data(), m_chatInformation.size()); return info; } }; //加入房间 void chat_room::join(chat_session_ptr participant) { m_strand.post([this, participant]{ participants_.insert(participant); for (const auto& msg : recent_msgs_) participant->deliver(msg); //对所有端广播 }); } //离开房间 void chat_room::leave(chat_session_ptr participant) { m_strand.post([this,participant]{ participants_.erase(participant);}); } void chat_room::deliver(const chat_message &msg) { m_strand.post([this, msg]{ recent_msgs_.push_back(msg); while (recent_msgs_.size() > max_recent_msgs) recent_msgs_.pop_front(); for (auto& participant : participants_) participant->deliver(msg); }); } class chat_server { public: chat_server(boost::asio::io_service &io_service, const tcp::endpoint &endpoint) : acceptor_(io_service, endpoint), socket_(io_service), room_(io_service) { do_accept(); } private: void do_accept() { acceptor_.async_accept(socket_, [this](boost::system::error_code ec) { if (!ec) { auto session = std::make_shared
(std::move(socket_), room_); session->start();//新客户端开始接收数据包 } do_accept(); //创建新连接进行等待 }); } tcp::acceptor acceptor_; tcp::socket socket_; chat_room room_; }; int _tmain(int argc, char *argv[]) { try { base = std::chrono::system_clock::now(); boost::asio::io_service io_service; std::list
servers; tcp::endpoint endpoint(tcp::v4(), 9000); servers.emplace_back(io_service, endpoint); std::vector
threadGroup; for(int i = 0; i < 5; ++i) { threadGroup.emplace_back([&io_service, i]{ std::cout << i << " name is " << std::this_thread::get_id() << std::endl; io_service.run();}); } io_service.run(); for(auto& v : threadGroup) v.join(); } catch (std::exception &e) { std::cout << "Exception: " << e.what(); } return 0; }
客户端:
//client.cpp #include "chat_message.h" #include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include "stdafx.h" using boost::asio::ip::tcp; typedef std::deque
chat_message_queue; class chat_client { public: chat_client(boost::asio::io_service &io_service, tcp::resolver::iterator endpoint_iterator) : io_service_(io_service), socket_(io_service) { do_connect(endpoint_iterator); } void write(const chat_message &msg) { io_service_.post([this, msg]() { bool write_in_progress = !write_msgs_.empty(); write_msgs_.push_back(msg); if (!write_in_progress) { do_write(); } }); } void close() { io_service_.post([this]() { socket_.close(); }); } private: void do_connect(tcp::resolver::iterator endpoint_iterator) { boost::asio::async_connect( socket_, endpoint_iterator, [this](boost::system::error_code ec, tcp::resolver::iterator) { if (!ec) { do_read_header(); } }); } void do_read_header() { boost::asio::async_read( socket_, boost::asio::buffer(read_msg_.data(), chat_message::header_length), [this](boost::system::error_code ec, std::size_t /*length*/) { if (!ec && read_msg_.decode_header()) { do_read_body(); } else { socket_.close(); } }); } void do_read_body() { boost::asio::async_read( socket_, boost::asio::buffer(read_msg_.body(), read_msg_.body_length()), [this](boost::system::error_code ec, std::size_t /*length*/) { if (!ec) { if (read_msg_.body_length() == sizeof(RoomInformation) && read_msg_.type() == MT_ROOM_INFO) { const RoomInformation *info = reinterpret_cast
(read_msg_.body()); std::cout << "client: '"; assert(info->name.nameLen <= sizeof(info->name.name)); std::cout.write(info->name.name, info->name.nameLen); std::cout << "' says '"; assert(info->chat.infoLen <= sizeof(info->chat.information)); std::cout.write(info->chat.information, info->chat.infoLen); std::cout << "'\n"; } do_read_header(); } else { socket_.close(); } }); } void do_write() { boost::asio::async_write( socket_, boost::asio::buffer(write_msgs_.front().data(), write_msgs_.front().length()), [this](boost::system::error_code ec, std::size_t /*length*/) { if (!ec) { write_msgs_.pop_front(); if (!write_msgs_.empty()) { do_write(); } } else { socket_.close(); } }); } private: boost::asio::io_service &io_service_; tcp::socket socket_; chat_message read_msg_; chat_message_queue write_msgs_; }; int _tmain(int argc, char* argv[]) { try { boost::asio::io_service io_service; tcp::resolver resolver(io_service); boost::asio::ip::tcp::resolver::query query("localhost", "9000"); auto endpoint_iterator = resolver.resolve(query); chat_client c(io_service, endpoint_iterator); std::thread t([&io_service]() { io_service.run(); }); char line[chat_message::max_body_length + 1]; // ctrl-d while (std::cin.getline(line, chat_message::max_body_length + 1)) { chat_message msg; auto type = 0; std::string input(line, line + std::strlen(line)); std::string output; if(parseMessage(input, &type, output)) { msg.setMessage(type, output.data(), output.size()); c.write(msg); std::cout << "write message for server " << output.size() << std::endl; } } c.close(); t.join(); } catch (std::exception &e) { std::cerr << "Exception: " << e.what() << "\n"; } return 0; }
运行结果:

图片440.png
发布者:全栈程序员-站长,转载请注明出处:https://javaforall.net/213825.html原文链接:https://javaforall.net
