diff --git a/net/net_base.cpp b/net/net_base.cpp index f36a91f..d6e691a 100644 --- a/net/net_base.cpp +++ b/net/net_base.cpp @@ -1,91 +1,5 @@ #include "net_base.h" -CTcpServer::CTcpServer(asio::io_context& io_context, const std::shared_ptr& logger) - : io_context_(io_context), logger_(logger), acceptor_(io_context) -{ -} -CTcpServer::~CTcpServer() -{ -} - -bool CTcpServer::Start(unsigned short port) -{ - asio::ip::tcp::endpoint endpoint(asio::ip::tcp::v4(), port); - try { - acceptor_.open(endpoint.protocol()); - acceptor_.set_option(asio::socket_base::reuse_address(true)); - acceptor_.bind(endpoint); - acceptor_.listen(); - } catch (const asio::system_error& e) { - logger_->error("Failed to bind to {}: {}", endpoint.address().to_string(), e.what()); - return false; - } - - Accept(); - logger_->info("Server started on port {}", port); - return true; -} - -void CTcpServer::Stop() -{ - acceptor_.close(); - std::lock_guard lock(mutex_); - for (auto& [key, thread] : client_threads_) { - if (thread.joinable()) { - thread.join(); - } - } - client_threads_.clear(); -} - -void CTcpServer::Accept() -{ - auto socket = std::make_shared(io_context_); - acceptor_.async_accept(*socket, [this, socket](const asio::error_code& error) { - if (!error) { - auto endpoint = socket->remote_endpoint(); - std::string client_key = endpoint.address().to_string() + ":" + std::to_string(endpoint.port()); - logger_->info("New connection from {}", client_key); - - { - std::lock_guard lock(mutex_); - client_map_[client_key] = 0; // Initial value as 0 - } - - client_threads_[client_key] = std::thread(&CTcpServer::HandleClient, this, socket, client_key); - } - Accept(); - }); -} - -void CTcpServer::HandleClient(std::shared_ptr socket, const std::string& client_key) -{ - try { - char data[1024]; - while (true) { - asio::error_code error; - size_t length = socket->read_some(asio::buffer(data), error); - if (error == asio::error::eof) { - logger_->info("Connection closed by client: {}", client_key); - break; - } else if (error) { - throw asio::system_error(error); - } - auto relen = socket->send(asio::buffer(data, length)); - logger_->info("Received data from {}, len={}, relen={}", client_key, length, relen); - } - } catch (std::exception& e) { - logger_->error("Error with client {}: {}", client_key, e.what()); - } - - std::lock_guard lock(mutex_); - client_map_.erase(client_key); - if (client_threads_.find(client_key) != client_threads_.end()) { - client_threads_.at(client_key).detach(); - client_threads_.erase(client_key); - } -} - CTcpClient::CTcpClient(asio::io_context& io_context, const std::shared_ptr& logger) : logger_(logger), io_context_(io_context), socket_(io_context_) { @@ -142,7 +56,7 @@ void CTcpClient::async_recv() { auto self(shared_from_this()); socket_.async_read_some(asio::buffer(tmp_buf_), [this, self](std::error_code ec, std::size_t length) { - logger_->debug("{} {}", __FUNCTION__, ec.message()); + // logger_->debug("{} {}", __FUNCTION__, ec.message()); if (!ec) { std::lock_guard lock(mutex_); buffer_.push(tmp_buf_.data(), length); diff --git a/net/net_base.h b/net/net_base.h index fef0cff..a46d045 100644 --- a/net/net_base.h +++ b/net/net_base.h @@ -2,35 +2,10 @@ #include "util.h" #include -#include #include #include using namespace ofen; -using ExFun_t = std::function; -class CTcpServer -{ -public: - CTcpServer(asio::io_context& io_context, const std::shared_ptr& logger); - ~CTcpServer(); - -public: - bool Start(unsigned short port); - void Stop(); - -private: - void Accept(); - void HandleClient(std::shared_ptr socket, const std::string& client_key); - -private: - asio::io_context& io_context_; - asio::ip::tcp::acceptor acceptor_; - std::shared_ptr logger_; - std::map client_map_; - std::map client_threads_; - std::mutex mutex_; -}; - class CTcpClient : public std::enable_shared_from_this { public: diff --git a/server/main.cpp b/server/main.cpp index 7cb0076..fa63176 100644 --- a/server/main.cpp +++ b/server/main.cpp @@ -5,7 +5,11 @@ std::shared_ptr g_Logger = nullptr; int main() { g_Logger = get_logger("server", "server.log"); - CServer server(g_Logger); - server.run(); + asio::io_context io_context; + CTcpServer server(io_context, g_Logger); + if (!server.Start(8080)) { + return -1; + } + io_context.run(); return 0; } \ No newline at end of file diff --git a/server/server.cpp b/server/server.cpp index 1599a46..22155ff 100644 --- a/server/server.cpp +++ b/server/server.cpp @@ -1,24 +1,103 @@ #include "server.h" -CServer::CServer(const std::shared_ptr& logger) - : logger_(logger) +CTcpServer::CTcpServer(asio::io_context& io_context, const std::shared_ptr& logger) + : io_context_(io_context), logger_(logger), acceptor_(io_context) { - server_ = std::make_shared(io_context_, logger); } - -CServer::~CServer() +CTcpServer::~CTcpServer() { } -void CServer::run() +bool CTcpServer::Start(unsigned short port) { - if (!server_->Start(8080)) { + asio::ip::tcp::endpoint endpoint(asio::ip::tcp::v4(), port); + try { + acceptor_.open(endpoint.protocol()); + acceptor_.set_option(asio::socket_base::reuse_address(true)); + acceptor_.bind(endpoint); + acceptor_.listen(); + } catch (const asio::system_error& e) { + logger_->error("Failed to bind to {}: {}", endpoint.address().to_string(), e.what()); + return false; + } + + Accept(); + logger_->info("Server started on port {}", port); + return true; +} + +void CTcpServer::Stop() +{ + acceptor_.close(); + std::lock_guard lock(mutex_); + for (auto& [key, thread] : client_threads_) { + if (thread.joinable()) { + thread.join(); + } + } + client_threads_.clear(); +} + +void CTcpServer::handle_data(CFrameBuffer* buf) +{ + if (buf == nullptr) { + logger_->error("{} nullptr.", __FUNCTION__); return; } - io_context_.run(); } -bool CServer::get_task_list() +void CTcpServer::Accept() { - return false; + auto socket = std::make_shared(io_context_); + acceptor_.async_accept(*socket, [this, socket](const asio::error_code& error) { + if (!error) { + auto endpoint = socket->remote_endpoint(); + std::string client_key = endpoint.address().to_string() + ":" + std::to_string(endpoint.port()); + logger_->info("New connection from {}", client_key); + + { + std::lock_guard lock(mutex_); + client_map_[client_key] = std::make_shared(); + } + + client_threads_[client_key] = std::thread(&CTcpServer::HandleClient, this, socket, client_key); + } + Accept(); + }); } + +void CTcpServer::HandleClient(std::shared_ptr socket, const std::string& client_key) +{ + try { + while (true) { + asio::error_code error; + size_t length = socket->read_some(asio::buffer(client_map_[client_key]->tmp_buf_), error); + if (error == asio::error::eof) { + logger_->info("Connection closed by client: {}", client_key); + break; + } else if (error) { + throw asio::system_error(error); + } + + client_map_[client_key]->buffer_.push(client_map_[client_key]->tmp_buf_.data(), length); + auto* frame = CTransProtocal::parse(client_map_[client_key]->buffer_); + frame->id_ = client_key; + if (frame) { + handle_data(frame); + delete frame; + } + + // auto relen = socket->send(asio::buffer(data, length)); + // logger_->info("Received data from {}, len={}, relen={}", client_key, length, relen); + } + } catch (std::exception& e) { + logger_->error("Error with client {}: {}", client_key, e.what()); + } + + std::lock_guard lock(mutex_); + client_map_.erase(client_key); + if (client_threads_.find(client_key) != client_threads_.end()) { + client_threads_.at(client_key).detach(); + client_threads_.erase(client_key); + } +} \ No newline at end of file diff --git a/server/server.h b/server/server.h index 363603c..cc4c492 100644 --- a/server/server.h +++ b/server/server.h @@ -1,24 +1,36 @@ #pragma once #include +#include #include #include -#include -class CServer +struct ClientCache { + CMutBuffer buffer_; + std::array tmp_buf_; +}; + +class CTcpServer { public: - CServer(const std::shared_ptr& logger); - ~CServer(); + CTcpServer(asio::io_context& io_context, const std::shared_ptr& logger); + ~CTcpServer(); public: - void run(); - -public: - bool get_task_list(); + bool Start(unsigned short port); + void Stop(); private: + void handle_data(CFrameBuffer* buf); + +private: + void Accept(); + void HandleClient(std::shared_ptr socket, const std::string& client_key); + +private: + asio::io_context& io_context_; + asio::ip::tcp::acceptor acceptor_; std::shared_ptr logger_; - asio::io_context io_context_; - std::shared_ptr server_; - std::vector supported_; + std::map> client_map_; + std::map client_threads_; + std::mutex mutex_; }; \ No newline at end of file diff --git a/util/util.h b/util/util.h index d34b4b1..9f2e79c 100644 --- a/util/util.h +++ b/util/util.h @@ -1,6 +1,7 @@ #pragma once #include "of_util.h" #include +#include #include #include #include @@ -14,6 +15,8 @@ class CFrameBuffer public: CFrameBuffer(); ~CFrameBuffer(); +public: + std::string id_{}; public: int16_t type_{}; @@ -21,7 +24,7 @@ public: int len_{}; char mark_{}; }; - +using ExFun_t = std::function; /* 【 transm TCP 数据协议 】 header 2 char: 0xFF 0xFE