#include "server.h" #include #include #include using namespace ofen; constexpr int check_idle_percycle = 1000 * 30; // 毫秒 constexpr int remove_after_time = 60; // 秒 CTcpServer::CTcpServer(asio::io_context& io_context) : io_context_(io_context), acceptor_(io_context) { th_run_ = true; sleep_.set_timeout(check_idle_percycle); } CTcpServer::~CTcpServer() { th_run_ = false; sleep_.contiune(); if (th_monitor_idle_.joinable()) { th_monitor_idle_.join(); } } bool CTcpServer::start(unsigned short port) { asio::ip::tcp::resolver resolver(io_context_); asio::ip::tcp::resolver::query query(asio::ip::host_name(), ""); mpinfo("version: {}", VERSION_NUM); mpinfo("opensource: {}", VERSION_URL); try { auto it = resolver.resolve(query); mpdebug("Here are the local IP addresses you may use."); mpdebug("==========================================="); int i = 1; while (!it.empty()) { asio::ip::address addr = it->endpoint().address(); mpinfo("({}){}", i, addr.to_string()); ++it; ++i; } mpdebug("==========================================="); } catch (const std::exception& e) { mpwarn("{}", e.what()); mpinfo("will not show local IP."); } asio::ip::tcp::endpoint endpoint(asio::ip::tcp::v4(), port); try { acceptor_.open(endpoint.protocol()); acceptor_.set_option(asio::socket_base::reuse_address(true)); acceptor_.bind(endpoint); acceptor_.listen(); } catch (const asio::system_error& e) { mperror("Failed to bind to {}: {}", endpoint.address().to_string(), e.what()); return false; } auto bound_endpoint = acceptor_.local_endpoint(); server_ip_ = bound_endpoint.address().to_string() + ":" + std::to_string(bound_endpoint.port()); accept_client(); th_monitor_idle_ = std::thread([this]() { monitor_idle(); }); mpinfo("Server started on port {}", port); return true; } void CTcpServer::stop() { acceptor_.close(); std::unique_lock lock(cli_mut_); for (auto& [key, thread] : client_threads_) { if (thread.joinable()) { thread.join(); } } client_threads_.clear(); } std::vector CTcpServer::get_clients() { std::vector result; std::shared_lock lock(cli_mut_); for (const auto& item : client_map_) { TaskList t; t.id_ = item.first; t.task_ = item.second->task_; t.task_time_ = item.second->task_time_; t.online_time_ = item.second->online_time_; result.push_back(t); } return result; } void CTcpServer::get_client_list(CFrameBuffer** buf) { CFrameBuffer* tbuf = *buf; auto vec = get_clients(); std::string msg; int index = 1; for (const auto& item : vec) { msg.append(fmt::format("[{}][{}][{}][{}]", index, item.id_, item.online_time_, item.task_time_)); auto files = COfStr::split(item.task_, "|"); for (const auto& file : files) { msg.append("\n" + file); } msg.append("\n"); ++index; } tbuf->data_ = new char[msg.size() + 1]; std::memset(tbuf->data_, 0x0, msg.size() + 1); tbuf->len_ = std::snprintf(tbuf->data_, msg.size() + 1, "%s", msg.data()); } void CTcpServer::trans_data(CFrameBuffer* buf) { std::shared_ptr fcli = nullptr; std::shared_ptr tcli = nullptr; { std::shared_lock lock(cli_mut_); if (client_map_.count(buf->fid_)) { fcli = client_map_[buf->fid_]; } if (client_map_.count(buf->tid_)) { tcli = client_map_[buf->tid_]; } } switch (buf->type_) { case TYPE_GET_LIST: { mpinfo("[{}] GetList.", buf->fid_); get_client_list(&buf); if (fcli && !send_frame(fcli->socket_, buf)) { mperror("GetList send failed."); } break; } case TYPE_UP_LIST: { std::string file_list = std::string(buf->data_, buf->len_); if (fcli) { fcli->task_ = file_list; fcli->task_time_ = OfUtil::now_time(); } break; } case TYPE_CANCEL_LIST: { mpinfo("[{}] Cancle Task.", buf->fid_); if (fcli) { fcli->task_.clear(); fcli->task_time_.clear(); } break; } case TYPE_GET_ID: { buf->tid_ = buf->fid_; send_frame(fcli->socket_, buf); break; } case TYPE_JUDGE_ACTIVE: { if (fcli && tcli) { break; } if (fcli && tcli == nullptr) { buf->type_ = TYPE_OFFLINE; std::swap(buf->fid_, buf->tid_); send_frame(fcli->socket_, buf); break; } break; } default: if (check_double(buf, fcli, tcli) && tcli && !send_frame(tcli->socket_, buf)) { mperror("Send from {} to {} failed Or One Offline.", buf->fid_, buf->tid_); } break; } } bool CTcpServer::check_double(CFrameBuffer* buf, std::shared_ptr& fcli, std::shared_ptr& tcli) { std::shared_lock lock(cli_mut_); if (client_map_.count(buf->fid_)) { fcli = client_map_[buf->fid_]; } if (client_map_.count(buf->tid_)) { tcli = client_map_[buf->tid_]; } if (fcli == nullptr && tcli) { buf->type_ = TYPE_OFFLINE; mpwarn("A Notic {} That {} Offline.", buf->tid_, buf->fid_); send_frame(tcli->socket_, buf); return false; } if (tcli == nullptr && fcli) { std::swap(buf->fid_, buf->tid_); buf->type_ = TYPE_OFFLINE; mpwarn("B Notic {} That {} Offline.", buf->tid_, buf->fid_); send_frame(fcli->socket_, buf); return false; } if (tcli == nullptr && fcli == nullptr) { mpwarn("Both Offline.", buf->fid_, buf->tid_); return false; } return true; } void CTcpServer::accept_client() { auto socket = std::make_shared(io_context_); acceptor_.async_accept(*socket, [this, socket](const asio::error_code& error) { if (!error) { auto endpoint = socket->remote_endpoint(); std::string client_key = endpoint.address().to_string() + ":" + std::to_string(endpoint.port()); bool can = false; { std::unique_lock lock(cli_mut_); if (client_map_.size() >= 100) { mpinfo("Max client connections reached. Closing connection from {}", client_key); socket->close(); } else { mpinfo("New connection from {}", client_key); auto cache = std::make_shared(); cache->socket_ = socket; cache->online_time_ = OfUtil::now_time(); cache->last_active_time_ = std::chrono::high_resolution_clock::now(); client_map_[client_key] = cache; can = true; } } if (!can) { std::this_thread::sleep_for(std::chrono::minutes(1)); } else { client_threads_[client_key] = std::thread(&CTcpServer::th_client, this, socket, client_key); } } accept_client(); }); } void CTcpServer::th_client(const std::shared_ptr& socket, const std::string& client_key) { std::shared_ptr deleter(new int(0), [&](int* p) { std::unique_lock lock(cli_mut_); delete p; client_map_.erase(client_key); if (client_threads_.find(client_key) != client_threads_.end()) { client_threads_.at(client_key).detach(); client_threads_.erase(client_key); } mpwarn("th_client deleter client {} exit.", client_key); }); try { std::shared_ptr cache = nullptr; { std::shared_lock lock(cli_mut_); if (!client_map_.count(client_key)) { mperror("Not Find Client{} in cache.", client_key); return; } cache = client_map_[client_key]; } while (true) { asio::error_code error; size_t length = socket->read_some(asio::buffer(cache->tmp_buf_), error); if (error == asio::error::eof) { break; } else if (error) { throw asio::system_error(error); } cache->buffer_.push(cache->tmp_buf_.data(), static_cast(length)); while (true) { auto* frame = CTransProtocal::parse(cache->buffer_); if (frame) { if (frame->type_ == TYPE_HEARTS) { std::unique_lock lock(cli_mut_); if (client_map_.count(client_key)) { auto& cli = client_map_[client_key]; cli->last_active_time_ = std::chrono::high_resolution_clock::now(); } delete frame; continue; } frame->fid_ = client_key; // 直接转发,不加入缓存。 trans_data(frame); delete frame; continue; } break; } } } catch (std::exception& e) { mperror("Error with client {}: {}", client_key, e.what()); } } bool CTcpServer::send_frame(const std::shared_ptr& socket, CFrameBuffer* buf) { char* out_buf{}; int out_len{}; if (buf->fid_.empty()) { buf->fid_ = server_ip_; } if (!CTransProtocal::pack(buf, &out_buf, out_len)) { mperror("{} pack failed.", __FUNCTION__); return false; } try { if (!socket->send(asio::buffer(out_buf, out_len))) { mperror("{} send failed, buf type:{}, fid:{}, tid:{}", __FUNCTION__, static_cast(buf->type_), buf->fid_, buf->tid_); delete[] out_buf; return false; } } catch (const std::exception& e) { mperror("send failed, type:{}, fid:{}, tid:{}, mark:{}", static_cast(buf->type_), buf->fid_, buf->tid_, buf->mark_); } delete[] out_buf; return true; } void CTcpServer::monitor_idle() { while (th_run_) { sleep_.sleep(); if (!th_run_) { break; } std::vector remove_vec; std::unique_lock lock(cli_mut_); for (auto& item : client_map_) { auto now = std::chrono::high_resolution_clock::now(); auto duration = std::chrono::duration_cast(now - item.second->last_active_time_) .count(); if (duration >= remove_after_time) { mpwarn("OnLine Time [{}] sec, Proactively disconnect:{}", duration, item.first); remove_vec.push_back(item.first); item.second->socket_->shutdown(asio::ip::tcp::socket::shutdown_both); item.second->socket_->close(); } } for (const auto& item : remove_vec) { client_map_.erase(item); } } }