diff --git a/.vscode/settings.json b/.vscode/settings.json index b7cf94a..f73175e 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -121,6 +121,8 @@ "source_location": "cpp", "span": "cpp", "stop_token": "cpp", - "*.ipp": "cpp" + "*.ipp": "cpp", + "queue": "cpp", + "resumable": "cpp" } } \ No newline at end of file diff --git a/README.md b/README.md index e3115aa..6e37fe0 100644 --- a/README.md +++ b/README.md @@ -6,4 +6,36 @@ - `client`和`server`均为命令行端程序,无GUI。 - `client`从`server`下载文件,如果本地有重复则覆盖。 -- 工作方式为`client A`端提交待传输的文件列表到`server`,`client B`端从`server`获取有哪些客户机提交的哪些任务,可以从中下载。 \ No newline at end of file +- 工作方式为`client A`端提交待传输的文件列表到`server`,`client B`端从`server`获取有哪些客户机提交的哪些任务,可以从中下载。 + +## 格式(开发用) + +通讯协议中的`DATA`部分,对于`type`为`1`的类型来讲,统一以下格式(`command`和`param`内容中不能含有`|`): + +**command|param|data** (传输格式) + +**command(空格)param1,param2,param3..** (cmd输入格式) + +### 1.命令 + +`type`:1 + +**GetTaskList:** 获取当前挂载到服务器的任务单。 + +**DownTask:** 下载指定的任务清单,`param`为`GetTaskList`中列出的名称。 + +**UpTask:** 上载任务单,`param`为文件或者文件夹路径,可多个,使用`,`分隔。 + +### 2.数据 + +`type`:2 + +当`mark`为`0`时表示数据的最后一包,其他数据表示非最后一包。 + +`type`: 199 + +特殊标记,表示询问在线客户端。 + +`type`: 198 + +特殊标记,表示询问挂载任务。 \ No newline at end of file diff --git a/client/CMakeLists.txt b/client/CMakeLists.txt index eb83093..e771b63 100644 --- a/client/CMakeLists.txt +++ b/client/CMakeLists.txt @@ -1,6 +1,7 @@ cmake_minimum_required(VERSION 3.16) project(transmc LANGUAGES CXX) +set(CMAKE_CXX_STANDARD 17) if (MSVC) add_definitions(-D_WIN32_WINNT=0x0601) diff --git a/client/client.cpp b/client/client.cpp index b7a0df8..1b65936 100644 --- a/client/client.cpp +++ b/client/client.cpp @@ -19,7 +19,7 @@ void CClient::run() logger_->info("{} connect err.", __FUNCTION__); return; } - client_->register_func([&](CFrameBuffer* buf) { handle_data(buf); }); + client_->register_func([&](CFrameBuffer* buf) { handle_frame(buf); }); client_->async_recv(); std::thread thread([&]() { io_context_.run(); }); char line[512]{}; @@ -53,7 +53,7 @@ bool CClient::get_task_list() buf->data_ = new char[512]{}; auto flen = std::snprintf(buf->data_, 512, "%s", gGetTaskList); buf->len_ = flen; - buf->type_ = 1; + buf->type_ = 199; if (!CTransProtocal::pack(buf.get(), &send, len)) { logger_->error("{} pack failed.", __FUNCTION__); return false; @@ -65,7 +65,12 @@ bool CClient::get_task_list() return true; } -void CClient::handle_data(CFrameBuffer* buf) +bool CClient::get_clients() +{ + return false; +} + +void CClient::handle_frame(CFrameBuffer* buf) { if (buf == nullptr) { logger_->error("{} nullptr.", __FUNCTION__); @@ -73,4 +78,8 @@ void CClient::handle_data(CFrameBuffer* buf) } logger_->debug("type: {}", buf->type_); logger_->debug("len: {}", buf->len_); + + if (buf->type_ == 199) { + logger_->debug("data: {}", buf->data_); + } } diff --git a/client/client.h b/client/client.h index afb9600..621cdb6 100644 --- a/client/client.h +++ b/client/client.h @@ -15,9 +15,10 @@ public: public: bool get_task_list(); + bool get_clients(); private: - void handle_data(CFrameBuffer* buf); + void handle_frame(CFrameBuffer* buf); private: std::shared_ptr logger_; diff --git a/net/net_base.cpp b/net/net_base.cpp index d6e691a..95e530f 100644 --- a/net/net_base.cpp +++ b/net/net_base.cpp @@ -57,7 +57,13 @@ void CTcpClient::async_recv() auto self(shared_from_this()); socket_.async_read_some(asio::buffer(tmp_buf_), [this, self](std::error_code ec, std::size_t length) { // logger_->debug("{} {}", __FUNCTION__, ec.message()); - if (!ec) { + if (ec) { + if (ec == asio::error::eof) { + logger_->error("Remote Server Closed."); + return; + } + async_recv(); + } else { std::lock_guard lock(mutex_); buffer_.push(tmp_buf_.data(), length); auto* frame = CTransProtocal::parse(buffer_); diff --git a/ofen b/ofen index 1427bf4..44fb416 160000 --- a/ofen +++ b/ofen @@ -1 +1 @@ -Subproject commit 1427bf4bca2c0f52429fe751c4c205acfd78b326 +Subproject commit 44fb416ca166aa9c4ce270ff71bea6b3df703d6e diff --git a/server/CMakeLists.txt b/server/CMakeLists.txt index 315e6f0..b3689b3 100644 --- a/server/CMakeLists.txt +++ b/server/CMakeLists.txt @@ -1,6 +1,7 @@ cmake_minimum_required(VERSION 3.16) project(transms LANGUAGES CXX) +set(CMAKE_CXX_STANDARD 17) if (MSVC) add_compile_options(/source-charset:utf-8) diff --git a/server/main.cpp b/server/main.cpp index fa63176..3076384 100644 --- a/server/main.cpp +++ b/server/main.cpp @@ -7,7 +7,7 @@ int main() g_Logger = get_logger("server", "server.log"); asio::io_context io_context; CTcpServer server(io_context, g_Logger); - if (!server.Start(8080)) { + if (!server.start(8080)) { return -1; } io_context.run(); diff --git a/server/server.cpp b/server/server.cpp index 22155ff..343cd28 100644 --- a/server/server.cpp +++ b/server/server.cpp @@ -3,12 +3,22 @@ CTcpServer::CTcpServer(asio::io_context& io_context, const std::shared_ptr& logger) : io_context_(io_context), logger_(logger), acceptor_(io_context) { + th_run_ = true; + handle_pool_ = std::make_shared(1); + send_pool_ = std::make_shared(1); + handle_pool_->init(); + send_pool_->init(); + handle_pool_->submit([&]() { handle_frame(); }); + send_pool_->submit([&]() { send_simple_buf(); }); } CTcpServer::~CTcpServer() { + th_run_ = false; + handle_pool_->close_wait_all(); + send_pool_->close_wait_all(); } -bool CTcpServer::Start(unsigned short port) +bool CTcpServer::start(unsigned short port) { asio::ip::tcp::endpoint endpoint(asio::ip::tcp::v4(), port); try { @@ -20,16 +30,15 @@ bool CTcpServer::Start(unsigned short port) logger_->error("Failed to bind to {}: {}", endpoint.address().to_string(), e.what()); return false; } - - Accept(); + accept_client(); logger_->info("Server started on port {}", port); return true; } -void CTcpServer::Stop() +void CTcpServer::stop() { acceptor_.close(); - std::lock_guard lock(mutex_); + std::lock_guard lock(cli_mut_); for (auto& [key, thread] : client_threads_) { if (thread.joinable()) { thread.join(); @@ -38,15 +47,110 @@ void CTcpServer::Stop() client_threads_.clear(); } -void CTcpServer::handle_data(CFrameBuffer* buf) +std::vector CTcpServer::get_clients() { - if (buf == nullptr) { - logger_->error("{} nullptr.", __FUNCTION__); - return; + std::vector result; + std::lock_guard lock(cli_mut_); + for (const auto& item : client_map_) { + result.push_back(item.first); + } + return result; +} + +SimpleBuffer* CTcpServer::get_client_list() +{ + CFrameBuffer* buf = new CFrameBuffer(); + buf->type_ = 199; + + auto vec = get_clients(); + std::string msg; + for (const auto& item : vec) { + if (msg.empty()) { + msg.append(item); + } else { + msg.append("|" + item); + } + } + buf->data_ = new char[msg.size() + 1]; + buf->len_ = static_cast(msg.size() + 1); + std::snprintf(buf->data_, buf->len_, "%s", msg.data()); + + SimpleBuffer* sbuf = new SimpleBuffer(); + if (!CTransProtocal::pack(buf, &sbuf->data_, sbuf->len_)) { + logger_->error("{} pack faile.", __FUNCTION__); + delete sbuf; + return nullptr; + } + return sbuf; +} + +bool CTcpServer::push_frame(CFrameBuffer* buf) +{ + std::lock_guard lock(buf_mut_); + cache_.push(buf); + return true; +} + +void CTcpServer::handle_frame() +{ + CFrameBuffer* buf = nullptr; + while (th_run_) { + { + std::lock_guard lock(buf_mut_); + if (cache_.size() > 0) { + buf = cache_.front(); + cache_.pop(); + } else { + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + continue; + } + } + + // 拿到该包后,要看转发给谁或者处理 + if (buf->type_ == 199) { // 询问在线客户端 + auto* sbuf = get_client_list(); + if (sbuf == nullptr) { + continue; + } + sbuf->id_ = buf->id_; + std::lock_guard lock(sbuf_mut_); + scache_.push(sbuf); + continue; + } + delete buf; } } -void CTcpServer::Accept() +void CTcpServer::send_simple_buf() +{ + SimpleBuffer* buf = nullptr; + while (th_run_) { + { + std::lock_guard slock(sbuf_mut_); + if (scache_.size() > 0) { + buf = scache_.front(); + scache_.pop(); + } else { + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + continue; + } + } + std::shared_ptr socket = nullptr; + { + std::lock_guard clock(cli_mut_); + if (!client_map_.count(buf->id_)) { + logger_->warn("{} abandon {}'s data.", __FUNCTION__, buf->id_); + delete buf; + continue; + } + socket = client_map_[buf->id_]->socket_; + } + socket->send(asio::buffer(buf->data_, buf->len_)); + delete buf; + } +} + +void CTcpServer::accept_client() { auto socket = std::make_shared(io_context_); acceptor_.async_accept(*socket, [this, socket](const asio::error_code& error) { @@ -56,22 +160,46 @@ void CTcpServer::Accept() logger_->info("New connection from {}", client_key); { - std::lock_guard lock(mutex_); - client_map_[client_key] = std::make_shared(); + std::lock_guard lock(cli_mut_); + auto cache = std::make_shared(); + cache->socket_ = socket; + client_map_[client_key] = cache; } - client_threads_[client_key] = std::thread(&CTcpServer::HandleClient, this, socket, client_key); + client_threads_[client_key] = std::thread(&CTcpServer::th_client, this, socket, client_key); } - Accept(); + accept_client(); }); } -void CTcpServer::HandleClient(std::shared_ptr socket, const std::string& client_key) +void CTcpServer::th_client(std::shared_ptr socket, const std::string& client_key) { + std::shared_ptr deleter(new int(0), [&](int* p) { + std::lock_guard lock(cli_mut_); + delete p; + client_map_.erase(client_key); + if (client_threads_.find(client_key) != client_threads_.end()) { + client_threads_.at(client_key).detach(); + client_threads_.erase(client_key); + } + logger_->warn("{} client {} exit.", __FUNCTION__, client_key); + }); + try { + std::shared_ptr cache = nullptr; + + { + std::lock_guard lock(cli_mut_); + if (!client_map_.count(client_key)) { + logger_->error("Not Find Client{} in cache.", client_key); + return; + } + cache = client_map_[client_key]; + } + while (true) { asio::error_code error; - size_t length = socket->read_some(asio::buffer(client_map_[client_key]->tmp_buf_), error); + size_t length = socket->read_some(asio::buffer(cache->tmp_buf_), error); if (error == asio::error::eof) { logger_->info("Connection closed by client: {}", client_key); break; @@ -79,25 +207,14 @@ void CTcpServer::HandleClient(std::shared_ptr socket, con throw asio::system_error(error); } - client_map_[client_key]->buffer_.push(client_map_[client_key]->tmp_buf_.data(), length); - auto* frame = CTransProtocal::parse(client_map_[client_key]->buffer_); - frame->id_ = client_key; + cache->buffer_.push(cache->tmp_buf_.data(), length); + auto* frame = CTransProtocal::parse(cache->buffer_); if (frame) { - handle_data(frame); - delete frame; + frame->id_ = client_key; + push_frame(frame); } - - // auto relen = socket->send(asio::buffer(data, length)); - // logger_->info("Received data from {}, len={}, relen={}", client_key, length, relen); } } catch (std::exception& e) { logger_->error("Error with client {}: {}", client_key, e.what()); } - - std::lock_guard lock(mutex_); - client_map_.erase(client_key); - if (client_threads_.find(client_key) != client_threads_.end()) { - client_threads_.at(client_key).detach(); - client_threads_.erase(client_key); - } } \ No newline at end of file diff --git a/server/server.h b/server/server.h index cc4c492..70b0a7f 100644 --- a/server/server.h +++ b/server/server.h @@ -1,10 +1,14 @@ #pragma once #include +#include +#include #include #include #include +using namespace ofen; struct ClientCache { + std::shared_ptr socket_; CMutBuffer buffer_; std::array tmp_buf_; }; @@ -16,21 +20,34 @@ public: ~CTcpServer(); public: - bool Start(unsigned short port); - void Stop(); + bool start(unsigned short port); + void stop(); private: - void handle_data(CFrameBuffer* buf); + std::vector get_clients(); + SimpleBuffer* get_client_list(); private: - void Accept(); - void HandleClient(std::shared_ptr socket, const std::string& client_key); + bool push_frame(CFrameBuffer* buf); + void handle_frame(); + void send_simple_buf(); private: + void accept_client(); + void th_client(std::shared_ptr socket, const std::string& client_key); + +private: + bool th_run_{false}; asio::io_context& io_context_; asio::ip::tcp::acceptor acceptor_; std::shared_ptr logger_; std::map> client_map_; std::map client_threads_; - std::mutex mutex_; + std::mutex cli_mut_; + std::queue cache_; + std::queue scache_; + std::mutex buf_mut_; + std::mutex sbuf_mut_; + std::shared_ptr handle_pool_; + std::shared_ptr send_pool_; }; \ No newline at end of file diff --git a/util/util.cpp b/util/util.cpp index 92d1d49..079c751 100644 --- a/util/util.cpp +++ b/util/util.cpp @@ -58,6 +58,8 @@ CFrameBuffer* CTransProtocal::parse(CMutBuffer& buffer) result = new CFrameBuffer(); result->data_ = new char[len]; result->len_ = len; + result->mark_ = mark; + result->type_ = type; std::memset(result->data_, 0x0, len); std::memcpy(result->data_, buffer.get_data() + find + 2 + 2 + 1 + 4, len); buffer.remove_of(0, tail_index + 2); @@ -90,4 +92,10 @@ CFrameBuffer::~CFrameBuffer() { delete[] data_; len_ = 0; -} \ No newline at end of file +} + +SimpleBuffer::~SimpleBuffer() +{ + delete[] data_; + len_ = 0; +} diff --git a/util/util.h b/util/util.h index 9f2e79c..0506697 100644 --- a/util/util.h +++ b/util/util.h @@ -15,6 +15,7 @@ class CFrameBuffer public: CFrameBuffer(); ~CFrameBuffer(); + public: std::string id_{}; @@ -24,6 +25,17 @@ public: int len_{}; char mark_{}; }; +class SimpleBuffer +{ +public: + SimpleBuffer() = default; + ~SimpleBuffer(); + +public: + std::string id_; + char* data_{}; + int len_{}; +}; using ExFun_t = std::function; /* 【 transm TCP 数据协议 】