transm/server/server.cpp

358 lines
12 KiB
C++
Raw Normal View History

#include "server.h"
#include <of_str.h>
#include <of_util.h>
2025-02-16 14:58:03 +08:00
#include <version.h>
using namespace ofen;
constexpr int check_idle_percycle = 1000 * 30; // 毫秒
constexpr int remove_after_time = 60; // 秒
CTcpServer::CTcpServer(asio::io_context& io_context, const Log_t& log)
: io_context_(io_context), acceptor_(io_context), log_(log)
2024-12-12 23:11:55 +08:00
{
th_run_ = true;
sleep_.set_timeout(check_idle_percycle);
2024-12-12 23:11:55 +08:00
}
2024-12-12 23:11:55 +08:00
CTcpServer::~CTcpServer()
{
th_run_ = false;
sleep_.contiune();
if (th_monitor_idle_.joinable()) {
th_monitor_idle_.join();
}
}
bool CTcpServer::start(unsigned short port)
{
asio::ip::tcp::resolver resolver(io_context_);
asio::ip::tcp::resolver::query query(asio::ip::host_name(), "");
log_->info("version: {}", VERSION_NUM);
log_->info("opensource: {}", VERSION_URL);
try {
auto it = resolver.resolve(query);
log_->debug("Here are the local IP addresses you may use.");
log_->debug("===========================================");
int i = 1;
2025-02-06 09:37:24 +08:00
while (!it.empty()) {
asio::ip::address addr = it->endpoint().address();
log_->info("({}){}", i, addr.to_string());
++it;
++i;
}
log_->debug("===========================================");
} catch (const std::exception& e) {
log_->warn("{}", e.what());
log_->info("will not show local IP.");
}
2024-12-12 23:11:55 +08:00
asio::ip::tcp::endpoint endpoint(asio::ip::tcp::v4(), port);
try {
acceptor_.open(endpoint.protocol());
acceptor_.set_option(asio::socket_base::reuse_address(true));
acceptor_.bind(endpoint);
acceptor_.listen();
} catch (const asio::system_error& e) {
log_->error("Failed to bind to {}: {}", endpoint.address().to_string(), e.what());
2024-12-12 23:11:55 +08:00
return false;
}
auto bound_endpoint = acceptor_.local_endpoint();
server_ip_ = bound_endpoint.address().to_string() + ":" + std::to_string(bound_endpoint.port());
accept_client();
th_monitor_idle_ = std::thread([this]() { monitor_idle(); });
log_->info("Server started on port {}", port);
2024-12-12 23:11:55 +08:00
return true;
}
void CTcpServer::stop()
{
2024-12-12 23:11:55 +08:00
acceptor_.close();
std::unique_lock<std::shared_mutex> lock(cli_mut_);
2024-12-12 23:11:55 +08:00
for (auto& [key, thread] : client_threads_) {
if (thread.joinable()) {
thread.join();
}
}
client_threads_.clear();
}
std::vector<TaskList> CTcpServer::get_clients()
{
std::vector<TaskList> result;
std::shared_lock<std::shared_mutex> lock(cli_mut_);
for (const auto& item : client_map_) {
TaskList t;
t.id_ = item.first;
t.task_ = item.second->task_;
t.task_time_ = item.second->task_time_;
t.online_time_ = item.second->online_time_;
result.push_back(t);
}
return result;
}
2024-12-14 19:49:44 +08:00
void CTcpServer::get_client_list(CFrameBuffer** buf)
{
2024-12-14 19:49:44 +08:00
CFrameBuffer* tbuf = *buf;
auto vec = get_clients();
std::string msg;
int index = 1;
for (const auto& item : vec) {
msg.append(fmt::format("[{}][{}][{}][{}]", index, item.id_, item.online_time_, item.task_time_));
auto files = COfStr::split(item.task_, "|");
for (const auto& file : files) {
msg.append("\n" + file);
}
msg.append("\n");
++index;
}
2024-12-14 19:49:44 +08:00
tbuf->data_ = new char[msg.size() + 1];
std::memset(tbuf->data_, 0x0, msg.size() + 1);
tbuf->len_ = std::snprintf(tbuf->data_, msg.size() + 1, "%s", msg.data());
}
void CTcpServer::trans_data(CFrameBuffer* buf)
{
std::shared_ptr<ClientCache> fcli = nullptr;
std::shared_ptr<ClientCache> tcli = nullptr;
{
std::shared_lock<std::shared_mutex> lock(cli_mut_);
if (client_map_.count(buf->fid_)) {
fcli = client_map_[buf->fid_];
}
if (client_map_.count(buf->tid_)) {
tcli = client_map_[buf->tid_];
2024-12-13 12:39:34 +08:00
}
}
switch (buf->type_) {
case TYPE_GET_LIST: {
log_->info("[{}] GetList.", buf->fid_);
get_client_list(&buf);
if (fcli && !send_frame(fcli->socket_, buf)) {
log_->error("GetList send failed.");
}
break;
}
case TYPE_UP_LIST: {
std::string file_list = std::string(buf->data_, buf->len_);
if (fcli) {
fcli->task_ = file_list;
fcli->task_time_ = OfUtil::now_time();
}
break;
}
case TYPE_CANCEL_LIST: {
log_->info("[{}] Cancle Task.", buf->fid_);
if (fcli) {
fcli->task_.clear();
fcli->task_time_.clear();
}
break;
}
case TYPE_GET_ID: {
buf->tid_ = buf->fid_;
send_frame(fcli->socket_, buf);
break;
}
case TYPE_JUDGE_ACTIVE: {
if (fcli && tcli) {
break;
}
if (fcli && tcli == nullptr) {
buf->type_ = TYPE_OFFLINE;
std::swap(buf->fid_, buf->tid_);
send_frame(fcli->socket_, buf);
break;
}
break;
}
default:
2024-12-19 15:54:42 +08:00
if (check_double(buf, fcli, tcli) && tcli && !send_frame(tcli->socket_, buf)) {
log_->error("Send from {} to {} failed Or One Offline.", buf->fid_, buf->tid_);
}
break;
}
}
2024-12-19 15:54:42 +08:00
bool CTcpServer::check_double(CFrameBuffer* buf, std::shared_ptr<ClientCache>& fcli,
std::shared_ptr<ClientCache>& tcli)
{
std::shared_lock<std::shared_mutex> lock(cli_mut_);
2024-12-19 15:54:42 +08:00
if (client_map_.count(buf->fid_)) {
fcli = client_map_[buf->fid_];
}
if (client_map_.count(buf->tid_)) {
tcli = client_map_[buf->tid_];
}
if (fcli == nullptr && tcli) {
buf->type_ = TYPE_OFFLINE;
log_->warn("A Notic {} That {} Offline.", buf->tid_, buf->fid_);
send_frame(tcli->socket_, buf);
return false;
}
if (tcli == nullptr && fcli) {
std::swap(buf->fid_, buf->tid_);
buf->type_ = TYPE_OFFLINE;
log_->warn("B Notic {} That {} Offline.", buf->tid_, buf->fid_);
send_frame(fcli->socket_, buf);
return false;
}
if (tcli == nullptr && fcli == nullptr) {
log_->warn("Both Offline.", buf->fid_, buf->tid_);
return false;
}
return true;
}
void CTcpServer::accept_client()
{
2024-12-12 23:11:55 +08:00
auto socket = std::make_shared<asio::ip::tcp::socket>(io_context_);
acceptor_.async_accept(*socket, [this, socket](const asio::error_code& error) {
if (!error) {
auto endpoint = socket->remote_endpoint();
std::string client_key = endpoint.address().to_string() + ":" + std::to_string(endpoint.port());
bool can = false;
2024-12-12 23:11:55 +08:00
{
std::unique_lock<std::shared_mutex> lock(cli_mut_);
if (client_map_.size() >= 100) {
log_->info("Max client connections reached. Closing connection from {}", client_key);
socket->close();
} else {
log_->info("New connection from {}", client_key);
auto cache = std::make_shared<ClientCache>();
cache->socket_ = socket;
cache->online_time_ = OfUtil::now_time();
cache->last_active_time_ = std::chrono::high_resolution_clock::now();
client_map_[client_key] = cache;
can = true;
}
}
2025-02-06 09:37:24 +08:00
if (!can) {
std::this_thread::sleep_for(std::chrono::minutes(1));
} else {
client_threads_[client_key] = std::thread(&CTcpServer::th_client, this, socket, client_key);
2024-12-12 23:11:55 +08:00
}
}
accept_client();
2024-12-12 23:11:55 +08:00
});
}
2024-12-12 23:11:55 +08:00
2025-02-06 09:37:24 +08:00
void CTcpServer::th_client(const std::shared_ptr<asio::ip::tcp::socket>& socket,
const std::string& client_key)
2024-12-12 23:11:55 +08:00
{
std::shared_ptr<int> deleter(new int(0), [&](int* p) {
std::unique_lock<std::shared_mutex> lock(cli_mut_);
delete p;
client_map_.erase(client_key);
if (client_threads_.find(client_key) != client_threads_.end()) {
client_threads_.at(client_key).detach();
client_threads_.erase(client_key);
}
log_->warn("th_client deleter client {} exit.", client_key);
});
2024-12-12 23:11:55 +08:00
try {
std::shared_ptr<ClientCache> cache = nullptr;
{
std::shared_lock<std::shared_mutex> lock(cli_mut_);
if (!client_map_.count(client_key)) {
log_->error("Not Find Client{} in cache.", client_key);
return;
}
cache = client_map_[client_key];
}
2024-12-12 23:11:55 +08:00
while (true) {
asio::error_code error;
size_t length = socket->read_some(asio::buffer(cache->tmp_buf_), error);
2024-12-12 23:11:55 +08:00
if (error == asio::error::eof) {
break;
} else if (error) {
throw asio::system_error(error);
}
2025-02-06 09:37:24 +08:00
cache->buffer_.push(cache->tmp_buf_.data(), static_cast<int>(length));
while (true) {
auto* frame = CTransProtocal::parse(cache->buffer_);
if (frame) {
if (frame->type_ == TYPE_HEARTS) {
std::unique_lock<std::shared_mutex> lock(cli_mut_);
if (client_map_.count(client_key)) {
auto& cli = client_map_[client_key];
cli->last_active_time_ = std::chrono::high_resolution_clock::now();
}
delete frame;
continue;
}
frame->fid_ = client_key;
// 直接转发,不加入缓存。
trans_data(frame);
delete frame;
continue;
}
break;
2024-12-12 23:11:55 +08:00
}
}
} catch (std::exception& e) {
log_->error("Error with client {}: {}", client_key, e.what());
2024-12-12 23:11:55 +08:00
}
}
2025-02-06 09:37:24 +08:00
bool CTcpServer::send_frame(const std::shared_ptr<asio::ip::tcp::socket>& socket, CFrameBuffer* buf)
{
char* out_buf{};
int out_len{};
if (buf->fid_.empty()) {
buf->fid_ = server_ip_;
}
if (!CTransProtocal::pack(buf, &out_buf, out_len)) {
log_->error("{} pack failed.", __FUNCTION__);
return false;
}
try {
if (!socket->send(asio::buffer(out_buf, out_len))) {
log_->error("{} send failed, buf type:{}, fid:{}, tid:{}", __FUNCTION__,
static_cast<int>(buf->type_), buf->fid_, buf->tid_);
delete[] out_buf;
return false;
}
} catch (const std::exception& e) {
log_->error("send failed, type:{}, fid:{}, tid:{}, mark:{}", static_cast<int>(buf->type_), buf->fid_,
buf->tid_, buf->mark_);
}
delete[] out_buf;
return true;
}
void CTcpServer::monitor_idle()
{
while (th_run_) {
sleep_.sleep();
if (!th_run_) {
break;
}
std::vector<std::string> remove_vec;
std::unique_lock<std::shared_mutex> lock(cli_mut_);
for (auto& item : client_map_) {
auto now = std::chrono::high_resolution_clock::now();
auto duration =
std::chrono::duration_cast<std::chrono::seconds>(now - item.second->last_active_time_)
.count();
if (duration >= remove_after_time) {
log_->warn("OnLine Time [{}] sec, Proactively disconnect:{}", duration, item.first);
remove_vec.push_back(item.first);
item.second->socket_->shutdown(asio::ip::tcp::socket::shutdown_both);
item.second->socket_->close();
}
}
for (const auto& item : remove_vec) {
client_map_.erase(item);
}
}
}