#include "server.h"

// NOTE(review): three further #include directives followed "server.h" in the
// original, but their header names are missing from this copy of the file.
// Restore them before building (presumably the headers that provide
// COfStr / OfUtil and VERSION_NUM / VERSION_URL -- confirm against the repo).

#include <algorithm>
#include <chrono>
#include <cstddef>
#include <exception>
#include <memory>
#include <mutex>
#include <shared_mutex>
#include <string>
#include <thread>
#include <vector>

using namespace ofen;

constexpr int check_idle_percycle = 1000 * 30;   // milliseconds between idle scans
constexpr int remove_after_time = 60;            // seconds without heartbeat before a client is dropped
constexpr std::size_t max_client_count = 100;    // upper bound on simultaneously connected clients

/// @brief Construct the server on top of an existing io_context.
/// @param io_context Context that will run the asynchronous accept handler.
TransmServer::TransmServer(asio::io_context& io_context) : io_context_(io_context), acceptor_(io_context)
{
    th_run_ = true;
    sleep_.set_timeout(check_idle_percycle);
}

/// @brief Stop the idle monitor thread and wait for it to finish.
TransmServer::~TransmServer()
{
    th_run_ = false;
    // Wake the monitor thread so it observes th_run_ == false immediately.
    sleep_.contiune();
    if (th_monitor_idle_.joinable()) {
        th_monitor_idle_.join();
    }
}

/// @brief Bind to @p port on all IPv4 interfaces, start accepting clients and
///        launch the idle monitor thread.
/// @param port TCP port to listen on.
/// @return true on success, false if the acceptor could not be opened/bound.
bool TransmServer::start(unsigned short port)
{
    TLOGI("version: {}", VERSION_NUM);
    TLOGI("opensource: {}", VERSION_URL);

    // Listing local addresses is purely informational, so every failure in
    // here (including host_name()) is logged and otherwise ignored.
    try {
        asio::ip::tcp::resolver resolver(io_context_);
        asio::ip::tcp::resolver::query query(asio::ip::host_name(), "");
        const auto results = resolver.resolve(query);
        TLOGD("Here are the local IP addresses you may use.");
        TLOGD("===========================================");
        int i = 1;
        for (const auto& entry : results) {
            const asio::ip::address addr = entry.endpoint().address();
            TLOGI("({}){}", i, addr.to_string());
            ++i;
        }
        TLOGD("===========================================");
    } catch (const std::exception& e) {
        TLOGW("{}", e.what());
        TLOGI("will not show local IP.");
    }

    asio::ip::tcp::endpoint endpoint(asio::ip::tcp::v4(), port);
    try {
        acceptor_.open(endpoint.protocol());
        // acceptor_.set_option(asio::socket_base::reuse_address(true));
        acceptor_.bind(endpoint);
        acceptor_.listen();
    } catch (const asio::system_error& e) {
        TLOGE("Failed to bind to {}: {}", endpoint.address().to_string(), e.what());
        // Leave the acceptor closed so a later start() can open it again.
        asio::error_code ignored;
        acceptor_.close(ignored);
        return false;
    }

    const auto bound_endpoint = acceptor_.local_endpoint();
    server_ip_ = bound_endpoint.address().to_string() + ":" + std::to_string(bound_endpoint.port());
    accept_client();
    th_monitor_idle_ = std::thread([this]() { monitor_idle(); });
    TLOGI("Server started on port {}", port);
    return true;
}

/// @brief Stop accepting, disconnect every client and join the client threads.
///
/// The threads are moved out of client_threads_ and joined *after* cli_mut_ is
/// released: each client thread takes cli_mut_ in its exit handler, so joining
/// while holding the lock would deadlock. Sockets are shut down first so that
/// threads blocked in read_some() return.
void TransmServer::stop()
{
    asio::error_code ec;
    acceptor_.close(ec);

    decltype(client_threads_) workers;
    {
        std::unique_lock lock(cli_mut_);
        for (auto& [key, client] : client_map_) {
            if (client && client->socket_) {
                client->socket_->shutdown(asio::ip::tcp::socket::shutdown_both, ec);
                client->socket_->close(ec);
            }
        }
        // Threads no longer present in client_threads_ will not detach
        // themselves on exit, so they stay joinable for the loop below.
        workers.swap(client_threads_);
    }

    for (auto& [key, worker] : workers) {
        if (worker.joinable()) {
            worker.join();
        }
    }
}

/// @brief Build a textual list of all connected clients, ordered by connect time.
/// @param msg_info Receives the formatted list in its `str` member.
void TransmServer::get_client_list(CMessageInfo& msg_info)
{
    struct TmpInfo {
        std::string id;
        std::string online_time;
        std::string uuid;
        std::string task;
        uint64_t timestamp{};
    };

    // Snapshot under the lock, format outside of it.
    std::vector<TmpInfo> vec;
    {
        std::shared_lock lock(cli_mut_);
        vec.reserve(client_map_.size());
        for (const auto& item : client_map_) {
            TmpInfo tmp;
            tmp.id = item.first;
            tmp.online_time = item.second->online_time_;
            tmp.timestamp = item.second->timestamp;
            tmp.uuid = item.second->uuid;
            tmp.task = item.second->task_;
            vec.push_back(tmp);
        }
    }

    // Sort by the client's connect timestamp (oldest first).
    std::sort(vec.begin(), vec.end(),
              [](const TmpInfo& a, const TmpInfo& b) { return a.timestamp < b.timestamp; });

    std::string msg;
    int index = 1;
    for (const auto& item : vec) {
        msg.append(fmt::format("[{}][{}][{}][{}]", index, item.id, item.online_time, item.uuid));
        // task is a '|'-separated list; print one entry per line.
        const auto files = COfStr::split(item.task, "|");
        for (const auto& file : files) {
            msg.append("\n" + file);
        }
        msg.append("\n");
        ++index;
    }
    msg_info.str = msg;
}

/// @brief Handle one frame received from a client: answer server-side requests
///        or forward the frame to its target client.
/// @param buf Frame to process; fid_ is the sender key, tid_ the target key.
///            The frame may be modified in place. Ownership stays with the caller.
void TransmServer::trans_data(CFrameBuffer* buf)
{
    // Spelled via the map's mapped_type so this code does not depend on the
    // name of the client cache struct.
    using ClientPtr = decltype(client_map_)::mapped_type;

    ClientPtr fcli;
    ClientPtr tcli;
    {
        std::shared_lock lock(cli_mut_);
        if (auto it = client_map_.find(buf->fid_); it != client_map_.end()) {
            fcli = it->second;
        }
        if (auto it = client_map_.find(buf->tid_); it != client_map_.end()) {
            tcli = it->second;
        }
    }

    switch (buf->type_) {
    case TYPE_GET_LIST: {
        TLOGI("[{}] GetList.", buf->fid_);
        CMessageInfo msg_info(server_ip_);
        get_client_list(msg_info);
        buf->fid_ = server_ip_;
        serialize(msg_info, &buf->data_, buf->len_);
        if (fcli && !send_frame(fcli->socket_, buf)) {
            TLOGE("GetList send failed.");
        }
        break;
    }
    case TYPE_UP_LIST: {
        CMessageInfo msg_info(buf->fid_);
        if (!deserialize(buf->data_, buf->len_, msg_info)) {
            TLOGE("{} UpList deserialize failed.", __LINE__);
            break;
        }
        if (fcli) {
            // These fields are read by get_client_list() under cli_mut_.
            std::unique_lock lock(cli_mut_);
            fcli->task_ = msg_info.str;
            fcli->uuid = msg_info.uuid;
            fcli->task_time_ = OfUtil::now_time();
        }
        break;
    }
    case TYPE_CANCEL_LIST: {
        TLOGI("[{}] Cancle Task.", buf->fid_);
        if (fcli) {
            std::unique_lock lock(cli_mut_);
            fcli->task_.clear();
            fcli->task_time_.clear();
        }
        break;
    }
    case TYPE_GET_ID: {
        buf->tid_ = buf->fid_;
        // The sender may already have been removed (e.g. by the idle monitor).
        if (fcli) {
            send_frame(fcli->socket_, buf);
        }
        break;
    }
    case TYPE_JUDGE_ACTIVE: {
        // Only reply when the target is gone; an online target needs no answer.
        if (fcli && tcli == nullptr) {
            buf->type_ = TYPE_OFFLINE;
            std::swap(buf->fid_, buf->tid_);
            send_frame(fcli->socket_, buf);
        }
        break;
    }
    default:
        if (check_double(buf, fcli, tcli) && tcli && !send_frame(tcli->socket_, buf)) {
            TLOGE("Send from {} to {} failed Or One Offline.", buf->fid_, buf->tid_);
        }
        break;
    }
}

/// @brief Verify that both ends of a frame are still connected.
///
/// Re-reads both clients from client_map_. If exactly one side is missing the
/// other side is sent a TYPE_OFFLINE frame (buf is rewritten for that purpose).
/// @param buf  Frame being forwarded; may be modified.
/// @param fcli Receives the sender's cache entry if it is found.
/// @param tcli Receives the target's cache entry if it is found.
/// @return true only when both sender and target are present.
bool TransmServer::check_double(CFrameBuffer* buf, decltype(client_map_)::mapped_type& fcli,
                                decltype(client_map_)::mapped_type& tcli)
{
    std::shared_lock lock(cli_mut_);
    if (auto it = client_map_.find(buf->fid_); it != client_map_.end()) {
        fcli = it->second;
    }
    if (auto it = client_map_.find(buf->tid_); it != client_map_.end()) {
        tcli = it->second;
    }

    if (fcli == nullptr && tcli) {
        buf->type_ = TYPE_OFFLINE;
        TLOGW("A Notic {} That {} Offline.", buf->tid_, buf->fid_);
        send_frame(tcli->socket_, buf);
        return false;
    }
    if (tcli == nullptr && fcli) {
        // Swap so the notification is addressed back to the sender.
        std::swap(buf->fid_, buf->tid_);
        buf->type_ = TYPE_OFFLINE;
        TLOGW("B Notic {} That {} Offline.", buf->tid_, buf->fid_);
        send_frame(fcli->socket_, buf);
        return false;
    }
    if (tcli == nullptr && fcli == nullptr) {
        TLOGW("Both Offline: {} and {}.", buf->fid_, buf->tid_);
        return false;
    }
    return true;
}

/// @brief Arm one asynchronous accept; the handler registers the client,
///        starts its worker thread and re-arms itself.
void TransmServer::accept_client()
{
    using Client = decltype(client_map_)::mapped_type::element_type;

    auto socket = std::make_shared<asio::ip::tcp::socket>(io_context_);
    acceptor_.async_accept(*socket, [this, socket](const asio::error_code& error) {
        if (error) {
            // A closed acceptor (stop()) must end the accept chain; re-arming
            // here would fail immediately and loop forever.
            if (error == asio::error::operation_aborted || !acceptor_.is_open()) {
                return;
            }
            TLOGW("accept failed: {}", error.message());
            accept_client();
            return;
        }

        asio::error_code ec;
        const auto endpoint = socket->remote_endpoint(ec);
        if (ec) {
            // Peer vanished between accept and now; nothing to register.
            TLOGW("remote_endpoint failed: {}", ec.message());
            socket->close(ec);
            accept_client();
            return;
        }
        const std::string client_key = endpoint.address().to_string() + ":" + std::to_string(endpoint.port());

        bool can = false;
        {
            std::unique_lock lock(cli_mut_);
            if (client_map_.size() >= max_client_count) {
                TLOGI("Max client connections reached. Closing connection from {}", client_key);
                socket->close(ec);
            } else {
                TLOGI("New connection from {}", client_key);
                auto cache = std::make_shared<Client>();
                cache->socket_ = socket;
                cache->online_time_ = OfUtil::now_time();
                cache->timestamp = OfUtil::get_timestamp_ms();
                cache->last_active_time_ = std::chrono::high_resolution_clock::now();
                client_map_[client_key] = cache;
                // Register the thread while still holding the lock: the new
                // thread's first action is to take cli_mut_, so it cannot run
                // its exit handler before this entry exists.
                client_threads_[client_key] = std::thread(&TransmServer::th_client, this, socket, client_key);
                can = true;
            }
        }

        if (!can) {
            // NOTE(review): this blocks the io_context thread for a full
            // minute while the server is full -- looks like deliberate
            // throttling; confirm it is intended.
            std::this_thread::sleep_for(std::chrono::minutes(1));
        }
        accept_client();
    });
}

/// @brief Per-client worker: read from the socket, parse frames and dispatch them.
/// @param socket     Connected client socket.
/// @param client_key "ip:port" key identifying the client in client_map_.
void TransmServer::th_client(const std::shared_ptr<asio::ip::tcp::socket>& socket, const std::string& client_key)
{
    // Scope-exit handler: shared_ptr invokes a custom deleter even when it
    // holds nullptr, so this runs on every way out of the function.
    std::shared_ptr<void> exit_guard(nullptr, [this, &client_key](void*) {
        std::unique_lock lock(cli_mut_);
        client_map_.erase(client_key);
        if (auto it = client_threads_.find(client_key); it != client_threads_.end()) {
            // This is the current thread's own handle; detach so destroying
            // the std::thread object does not terminate the process.
            it->second.detach();
            client_threads_.erase(it);
        }
        TLOGW("th_client deleter client {} exit.", client_key);
    });

    try {
        decltype(client_map_)::mapped_type cache;
        {
            std::shared_lock lock(cli_mut_);
            const auto it = client_map_.find(client_key);
            if (it == client_map_.end()) {
                TLOGE("Not Find Client{} in cache.", client_key);
                return;
            }
            cache = it->second;
        }

        while (true) {
            asio::error_code error;
            const size_t length = socket->read_some(asio::buffer(cache->tmp_buf_), error);
            if (error == asio::error::eof) {
                break;
            }
            if (error) {
                throw asio::system_error(error);
            }

            cache->buffer_.push(cache->tmp_buf_.data(), static_cast<int>(length));

            // Drain every complete frame currently in the buffer.
            while (true) {
                std::unique_ptr<CFrameBuffer> frame(CTransProtocal::parse(cache->buffer_));
                if (!frame) {
                    break;
                }

                if (frame->type_ == TYPE_HEARTS) {
                    std::unique_lock lock(cli_mut_);
                    if (auto it = client_map_.find(client_key); it != client_map_.end()) {
                        it->second->last_active_time_ = std::chrono::high_resolution_clock::now();
                    }
                    continue;
                }

                if (frame->type_ == TYPE_GET_ID) {
                    CMessageInfo msg_info("");
                    if (!deserialize(frame->data_, frame->len_, msg_info)) {
                        TLOGE("{} GetId deserialize failed.", __LINE__);
                        continue;
                    }
                    std::unique_lock lock(cli_mut_);
                    if (auto it = client_map_.find(client_key); it != client_map_.end()) {
                        it->second->uuid = msg_info.uuid;
                    }
                }

                frame->fid_ = client_key;
                // Forward immediately; frames are not queued.
                trans_data(frame.get());
            }
        }
    } catch (const std::exception& e) {
        TLOGE("Error with client {}: {}", client_key, e.what());
    }
}

/// @brief Serialize a frame and send it on @p socket.
/// @param socket Destination socket.
/// @param buf    Frame to send; an empty fid_ is replaced by the server address.
/// @return true if the frame was packed and handed to the socket, false on any failure.
bool TransmServer::send_frame(const std::shared_ptr<asio::ip::tcp::socket>& socket, CFrameBuffer* buf)
{
    char* out_buf{};
    int out_len{};
    if (buf->fid_.empty()) {
        buf->fid_ = server_ip_;
    }

    if (!CTransProtocal::pack(buf, &out_buf, out_len)) {
        TLOGE("{} pack failed.", __FUNCTION__);
        return false;
    }
    // Take ownership of the packed buffer so every exit path releases it.
    const std::unique_ptr<char[]> packed(out_buf);

    try {
        if (!socket->send(asio::buffer(packed.get(), out_len))) {
            TLOGE("{} send failed, buf type:{}, fid:{}, tid:{}", __FUNCTION__, static_cast<int>(buf->type_),
                  buf->fid_, buf->tid_);
            return false;
        }
    } catch (const std::exception& e) {
        TLOGE("send failed, type:{}, fid:{}, tid:{}, mark:{}", static_cast<int>(buf->type_), buf->fid_, buf->tid_,
              buf->mark_);
        // A throwing send is a failed send; report it as such.
        return false;
    }
    return true;
}

/// @brief Background loop: every check_idle_percycle ms, disconnect and remove
///        clients whose last heartbeat is older than remove_after_time seconds.
void TransmServer::monitor_idle()
{
    while (th_run_) {
        sleep_.sleep();
        if (!th_run_) {
            break;
        }

        std::vector<std::string> remove_vec;
        std::unique_lock lock(cli_mut_);
        for (auto& item : client_map_) {
            const auto now = std::chrono::high_resolution_clock::now();
            const auto duration =
                std::chrono::duration_cast<std::chrono::seconds>(now - item.second->last_active_time_).count();
            if (duration >= remove_after_time) {
                TLOGW("OnLine Time [{}] sec, Proactively disconnect:{}", duration, item.first);
                remove_vec.push_back(item.first);
                // Non-throwing overloads: the peer may already be gone, and an
                // exception escaping this thread would terminate the process.
                asio::error_code ec;
                item.second->socket_->shutdown(asio::ip::tcp::socket::shutdown_both, ec);
                item.second->socket_->close(ec);
            }
        }
        for (const auto& item : remove_vec) {
            client_map_.erase(item);
        }
    }
}