#include "server.h"
// NOTE(review): the target of the second include was not legible in the
// reviewed copy; <of_str.h> is assumed because COfStr is used below. Confirm.
#include <of_str.h>

#include <chrono>
#include <cstring>
#include <memory>
#include <mutex>
#include <string>
#include <thread>
#include <utility>
#include <vector>

using namespace ofen;

/// Number of worker tasks that drain the parsed-frame queue.
constexpr int g_ParseThreadNum = 1;

/// @brief Builds the server and starts the frame-handling workers.
/// @param io_context asio context used by the acceptor and by accepted sockets.
/// @param logger     Logger used for all diagnostics.
///
/// Only the workers are started here; call start() to begin listening.
CTcpServer::CTcpServer(asio::io_context& io_context, const std::shared_ptr<spdlog::logger>& logger)
    : io_context_(io_context), logger_(logger), acceptor_(io_context)
{
    th_run_ = true;

    // The pool type is taken from the member declaration rather than repeated here.
    using Pool = decltype(handle_pool_)::element_type;
    handle_pool_ = std::make_shared<Pool>(g_ParseThreadNum);
    handle_pool_->init();
    for (int i = 0; i < g_ParseThreadNum; ++i) {
        handle_pool_->submit([this]() { handle_frame(); });
    }
}

/// @brief Stops accepting, joins client threads, stops the workers and frees
///        any frames that were queued but never handled.
CTcpServer::~CTcpServer()
{
    // Destroying a joinable std::thread calls std::terminate, so make sure the
    // client threads are joined even if the owner never called stop().
    stop();

    th_run_ = false;
    handle_pool_->close_wait_all();

    // Workers are done; anything still queued would otherwise leak.
    std::lock_guard lock(buf_mut_);
    while (!cache_.empty()) {
        delete cache_.front();
        cache_.pop_front();
    }
}

/// @brief Opens the acceptor on IPv4 `port` and starts accepting clients.
/// @return true on success; false if the acceptor could not be set up (the
///         failure is logged and the acceptor is left closed so that start()
///         can be retried).
bool CTcpServer::start(unsigned short port)
{
    const asio::ip::tcp::endpoint endpoint(asio::ip::tcp::v4(), port);
    try {
        acceptor_.open(endpoint.protocol());
        acceptor_.set_option(asio::socket_base::reuse_address(true));
        acceptor_.bind(endpoint);
        acceptor_.listen();

        // local_endpoint() can throw as well, so it belongs inside the try.
        const auto bound_endpoint = acceptor_.local_endpoint();
        server_ip_ = bound_endpoint.address().to_string() + ":" + std::to_string(bound_endpoint.port());
    } catch (const asio::system_error& e) {
        logger_->error("Failed to bind to {}: {}", endpoint.address().to_string(), e.what());
        asio::error_code ignored;
        acceptor_.close(ignored);
        return false;
    }

    accept_client();
    logger_->info("Server started on port {}", port);
    return true;
}

/// @brief Stops accepting connections, disconnects every client and waits for
///        the per-client reader threads to finish.
///
/// Safe to call more than once.
void CTcpServer::stop()
{
    asio::error_code ignored;
    acceptor_.close(ignored);

    // Take ownership of the thread handles under the lock, but join them
    // outside it: each reader thread locks cli_mut_ in its exit hook, so
    // joining while holding the mutex would deadlock.
    decltype(client_threads_) pending;
    {
        std::lock_guard lock(cli_mut_);
        for (auto& [key, client] : client_map_) {
            if (client && client->socket_) {
                // Wakes the reader thread that is blocked in read_some().
                client->socket_->shutdown(asio::ip::tcp::socket::shutdown_both, ignored);
            }
        }
        pending.swap(client_threads_);
    }

    for (auto& [key, thread] : pending) {
        if (thread.joinable()) {
            thread.join();
        }
    }
}

/// @brief Returns a snapshot (id, task, time) of every connected client.
std::vector<TaskList> CTcpServer::get_clients()
{
    std::vector<TaskList> result;
    std::lock_guard lock(cli_mut_);
    result.reserve(client_map_.size());
    for (const auto& [id, client] : client_map_) {
        TaskList entry;
        entry.id_ = id;
        entry.task_ = client->task_;
        entry.time_ = client->time_;
        result.push_back(std::move(entry));
    }
    return result;
}

/// @brief Fills `(*buf)->data_` / `len_` with a textual listing of all clients.
/// @param buf Address of the frame pointer to fill; ignored if null.
///
/// Each client contributes a "[index][id][time]" header followed by one line
/// per entry of its '|'-separated task string. The buffer is allocated with
/// new[] and is NUL-terminated; `len_` excludes the terminator.
void CTcpServer::get_client_list(CFrameBuffer** buf)
{
    if (buf == nullptr || *buf == nullptr) {
        return;
    }
    CFrameBuffer* tbuf = *buf;

    std::string msg;
    int index = 1;
    for (const auto& item : get_clients()) {
        msg.append(fmt::format("[{}][{}][{}]", index, item.id_, item.time_));
        const auto files = COfStr::split(item.task_, "|");
        for (const auto& file : files) {
            msg.push_back('\n');
            msg.append(file);
        }
        msg.push_back('\n');
        ++index;
    }

    // NOTE(review): a buffer already attached to data_ is not released here;
    // confirm that list requests never arrive with a payload.
    tbuf->data_ = new char[msg.size() + 1]();
    std::memcpy(tbuf->data_, msg.data(), msg.size());
    tbuf->len_ = static_cast<int>(msg.size());
}

/// @brief Worker loop: pops parsed frames from the queue and dispatches them
///        by type until th_run_ becomes false.
///
/// The queue is polled with a 1 ms sleep when empty. Every frame is destroyed
/// after it has been handled.
void CTcpServer::handle_frame()
{
    // Forwards `frame` to client `id` when that client is connected.
    // The caller must hold cli_mut_. Returns false only if the send failed.
    const auto relay = [this](const auto& id, CFrameBuffer* frame) {
        const auto it = client_map_.find(id);
        if (it == client_map_.end()) {
            return true;
        }
        return send_frame(it->second->socket_, frame);
    };

    while (th_run_) {
        std::unique_ptr<CFrameBuffer> frame;
        {
            std::lock_guard lock(buf_mut_);
            if (!cache_.empty()) {
                frame.reset(cache_.front());
                cache_.pop_front();
            }
        }
        if (!frame) {
            std::this_thread::sleep_for(std::chrono::milliseconds(1));
            continue;
        }

        CFrameBuffer* buf = frame.get();
        switch (buf->type_) {
        case TYPE_GET_LIST: {
            logger_->info("[{}] GetList.", buf->fid_);
            // Built before taking cli_mut_: get_clients() locks it itself.
            get_client_list(&buf);
            std::lock_guard lock(cli_mut_);
            if (!relay(buf->fid_, buf)) {
                logger_->error("GetList send failed.");
            }
            break;
        }
        case TYPE_UP_LIST: {
            const std::string task = buf->data_ ? std::string(buf->data_, buf->len_) : std::string();
            logger_->info("[{}] UpList. {}", buf->fid_, task);
            std::lock_guard lock(cli_mut_);
            const auto it = client_map_.find(buf->fid_);
            if (it != client_map_.end()) {
                it->second->task_ = task;
                it->second->time_ = OfUtil::now_time();
            }
            break;
        }
        case TYPE_CANCEL_LIST: {
            logger_->info("[{}] Cancle Task.", buf->fid_);
            std::lock_guard lock(cli_mut_);
            const auto it = client_map_.find(buf->fid_);
            if (it != client_map_.end()) {
                it->second->task_.clear();
            }
            break;
        }
        // Send OPEN to both sides.
        case TYPE_OPEN_FILE: {
            // Keep the original ids: fid_ is overwritten below and the log
            // messages must still name the real sender and target.
            const auto from = buf->fid_;
            const auto to = buf->tid_;
            std::lock_guard lock(cli_mut_);
            if (!relay(to, buf)) {
                logger_->error("[{}] turn tid_ failed to {}", from, to);
            }
            const auto requester = client_map_.find(from);
            if (requester != client_map_.end()) {
                buf->mark_ = 1;
                buf->fid_ = to;
                if (!send_frame(requester->second->socket_, buf)) {
                    logger_->error("[{}] turn fid_ failed to {}", from, to);
                }
            }
            break;
        }
        case TYPE_TRANS_DONE:
        case TYPE_READY_TRANS:
        case TYPE_TRANS_FILE: {
            std::lock_guard lock(cli_mut_);
            if (!relay(buf->tid_, buf)) {
                logger_->error("[{}] turn failed to {}", buf->fid_, buf->tid_);
            }
            break;
        }
        default:
            break;
        }
    }
}

/// @brief Arms one asynchronous accept; the handler registers the new client,
///        starts its reader thread and re-arms itself.
///
/// The chain ends once the acceptor has been closed.
void CTcpServer::accept_client()
{
    auto socket = std::make_shared<asio::ip::tcp::socket>(io_context_);
    acceptor_.async_accept(*socket, [this, socket](const asio::error_code& error) {
        // After close() every further accept fails immediately; re-arming
        // would spin forever, so stop here.
        if (error == asio::error::operation_aborted || !acceptor_.is_open()) {
            return;
        }

        if (error) {
            logger_->error("Accept failed: {}", error.message());
        } else {
            // The peer may already be gone; use the non-throwing overload so
            // no exception escapes from the asio handler.
            asio::error_code ep_error;
            const auto endpoint = socket->remote_endpoint(ep_error);
            if (ep_error) {
                logger_->error("Failed to query remote endpoint: {}", ep_error.message());
            } else {
                const std::string client_key =
                    endpoint.address().to_string() + ":" + std::to_string(endpoint.port());
                logger_->info("New connection from {}", client_key);

                using CachePtr = decltype(client_map_)::mapped_type;
                auto cache = std::make_shared<CachePtr::element_type>();
                cache->socket_ = socket;

                // Both maps are updated under one lock; the new thread blocks
                // on cli_mut_ until its own entry has been stored.
                std::lock_guard lock(cli_mut_);
                client_map_[client_key] = std::move(cache);
                client_threads_[client_key] = std::thread(&CTcpServer::th_client, this, socket, client_key);
            }
        }
        accept_client();
    });
}

/// @brief Reader thread for one client: reads bytes, parses frames and queues
///        them for handle_frame().
/// @param socket     Connected client socket.
/// @param client_key "ip:port" key of the client in client_map_.
///
/// On exit (normal or by exception) the client is removed from client_map_
/// and, if its thread handle is still registered, the handle is detached and
/// removed as well.
void CTcpServer::th_client(std::shared_ptr<asio::ip::tcp::socket> socket, const std::string& client_key)
{
    auto on_exit = [this, &client_key](void*) {
        std::lock_guard lock(cli_mut_);
        client_map_.erase(client_key);
        // Not found when stop() has already taken the handle in order to join it.
        const auto self = client_threads_.find(client_key);
        if (self != client_threads_.end()) {
            self->second.detach();
            client_threads_.erase(self);
        }
        logger_->warn("{} client {} exit.", __FUNCTION__, client_key);
    };
    // `this` is only a non-null token so that the deleter always runs.
    const std::unique_ptr<void, decltype(on_exit)> exit_guard(this, on_exit);

    try {
        decltype(client_map_)::mapped_type cache;
        {
            std::lock_guard lock(cli_mut_);
            const auto it = client_map_.find(client_key);
            if (it == client_map_.end()) {
                logger_->error("Not Find Client{} in cache.", client_key);
                return;
            }
            cache = it->second;
        }

        while (true) {
            asio::error_code error;
            const size_t length = socket->read_some(asio::buffer(cache->tmp_buf_), error);
            if (error == asio::error::eof) {
                logger_->info("Connection closed by client: {}", client_key);
                break;
            }
            if (error) {
                throw asio::system_error(error);
            }

            cache->buffer_.push(cache->tmp_buf_.data(), length);

            // One read can carry several frames; drain them all, otherwise the
            // extra frames wait until the client happens to send more data.
            while (auto* frame = CTransProtocal::parse(cache->buffer_)) {
                frame->fid_ = client_key;
                std::lock_guard lock(buf_mut_);
                cache_.push_back(frame);
            }
        }
    } catch (const std::exception& e) {
        logger_->error("Error with client {}: {}", client_key, e.what());
    }
}

/// @brief Packs `buf` and writes the whole packet to `socket`.
/// @param socket Destination socket.
/// @param buf    Frame to send; an empty `fid_` is replaced by the server id.
/// @return true if the complete packet was written; false on a null argument,
///         a pack failure or a socket error. Never throws on socket errors.
bool CTcpServer::send_frame(std::shared_ptr<asio::ip::tcp::socket> socket, CFrameBuffer* buf)
{
    if (!socket || buf == nullptr) {
        logger_->error("{} send failed.", __FUNCTION__);
        return false;
    }
    if (buf->fid_.empty()) {
        buf->fid_ = server_ip_;
    }

    char* out_buf{};
    int out_len{};
    if (!CTransProtocal::pack(buf, &out_buf, out_len)) {
        logger_->error("{} pack failed.", __FUNCTION__);
        return false;
    }
    const std::unique_ptr<char[]> packed(out_buf);

    // asio::write loops until everything is written (socket::send may stop
    // early), and the error_code overload keeps failures from throwing into
    // the worker thread.
    asio::error_code error;
    const auto sent = asio::write(*socket, asio::buffer(packed.get(), static_cast<size_t>(out_len)), error);
    if (error || sent == 0) {
        logger_->error("{} send failed.", __FUNCTION__);
        return false;
    }
    return true;
}