// transm/server/server.cpp (357 lines, 12 KiB, C++)

#include "server.h"
#include <of_str.h>
#include <of_util.h>
using namespace ofen;
// 50 * 1024 * 1024 (52,428,800). Presumably a maximum cache length in bytes;
// it is not referenced by any code visible in this part of the file.
constexpr int g_MaxCacheLen = 1024 * 1024 * 50;
// Timeout handed to sleep_.set_timeout() in the constructor; monitor_idle()
// waits on sleep_ once per scan of the client table.
constexpr int check_idle_percycle = 1000 * 30; // milliseconds
// A client whose last_active_time_ is at least this old is disconnected by
// monitor_idle().
constexpr int remove_after_time = 60; // seconds
// Constructs the server around an existing io_context and logger.
// Nothing is opened here: the acceptor is only created on the io_context,
// the idle-monitor wait is configured with check_idle_percycle, and the
// monitor's run flag is armed. Listening begins in start().
CTcpServer::CTcpServer(asio::io_context& io_context, const std::shared_ptr<spdlog::logger>& logger)
    : io_context_(io_context), acceptor_(io_context), logger_(logger)
{
    sleep_.set_timeout(check_idle_percycle);
    th_run_ = true;
}
// Shuts down the idle-monitor thread: clears the run flag, signals sleep_
// (presumably to cut the current wait short -- confirm against the sleep
// helper), then joins the thread if it was ever started.
CTcpServer::~CTcpServer()
{
    th_run_ = false;
    sleep_.contiune();

    if (!th_monitor_idle_.joinable()) {
        return;
    }
    th_monitor_idle_.join();
}
bool CTcpServer::start(unsigned short port)
{
asio::ip::tcp::resolver resolver(io_context_);
asio::ip::tcp::resolver::query query(asio::ip::host_name(), "");
asio::ip::tcp::resolver::iterator it = resolver.resolve(query);
logger_->debug("Here are the local IP addresses you may use.");
logger_->debug("===========================================");
int i = 1;
while (it != asio::ip::tcp::resolver::iterator()) {
asio::ip::address addr = it->endpoint().address();
logger_->info("({}){}", i, addr.to_string());
++it;
++i;
}
logger_->debug("===========================================");
asio::ip::tcp::endpoint endpoint(asio::ip::tcp::v4(), port);
try {
acceptor_.open(endpoint.protocol());
acceptor_.set_option(asio::socket_base::reuse_address(true));
acceptor_.bind(endpoint);
acceptor_.listen();
} catch (const asio::system_error& e) {
logger_->error("Failed to bind to {}: {}", endpoint.address().to_string(), e.what());
return false;
}
auto bound_endpoint = acceptor_.local_endpoint();
server_ip_ = bound_endpoint.address().to_string() + ":" + std::to_string(bound_endpoint.port());
accept_client();
th_monitor_idle_ = std::thread([this]() { monitor_idle(); });
logger_->info("Server started on port {}", port);
return true;
}
void CTcpServer::stop()
{
acceptor_.close();
std::unique_lock<std::shared_mutex> lock(cli_mut_);
for (auto& [key, thread] : client_threads_) {
if (thread.joinable()) {
thread.join();
}
}
client_threads_.clear();
}
std::vector<TaskList> CTcpServer::get_clients()
{
std::vector<TaskList> result;
std::shared_lock<std::shared_mutex> lock(cli_mut_);
for (const auto& item : client_map_) {
TaskList t;
t.id_ = item.first;
t.task_ = item.second->task_;
t.task_time_ = item.second->task_time_;
t.online_time_ = item.second->online_time_;
result.push_back(t);
}
return result;
}
// Renders the client table as text and stores it in the frame `*buf`.
//
// Each client contributes a header "[index][id][online_time][task_time]"
// followed by one line per '|'-separated entry of its task string. The text
// is written into a freshly allocated, zero-filled buffer assigned to data_,
// and len_ receives the value returned by snprintf.
//
// @param buf  Address of the frame pointer to fill; the pointer itself is not
//             reassigned.
void CTcpServer::get_client_list(CFrameBuffer** buf)
{
    CFrameBuffer* frame = *buf;

    std::string listing;
    int ordinal = 0;
    for (const auto& client : get_clients()) {
        ++ordinal;
        listing.append(
            fmt::format("[{}][{}][{}][{}]", ordinal, client.id_, client.online_time_, client.task_time_));
        for (const auto& file : COfStr::split(client.task_, "|")) {
            listing.append("\n" + file);
        }
        listing.append("\n");
    }

    // Value-initialised allocation: every byte starts at zero.
    const std::size_t capacity = listing.size() + 1;
    frame->data_ = new char[capacity]();
    frame->len_ = std::snprintf(frame->data_, capacity, "%s", listing.data());
}
// Handles one parsed frame from a client.
//
// Server-side requests (client list, task upload/cancel, id query, liveness
// probe) are answered here; every other frame type is forwarded to the client
// named by buf->tid_ once check_double() confirms both ends are registered.
//
// @param buf  Frame to process. fid_ is expected to be set by the caller;
//             the frame may be modified in place (type_, fid_/tid_, data_).
void CTcpServer::trans_data(CFrameBuffer* buf)
{
    std::shared_ptr<ClientCache> fcli;
    std::shared_ptr<ClientCache> tcli;
    {
        // find() instead of operator[]: the latter is a non-const operation
        // and must not be used while only a shared lock is held.
        std::shared_lock<std::shared_mutex> lock(cli_mut_);
        if (auto it = client_map_.find(buf->fid_); it != client_map_.end()) {
            fcli = it->second;
        }
        if (auto it = client_map_.find(buf->tid_); it != client_map_.end()) {
            tcli = it->second;
        }
    }

    switch (buf->type_) {
    case TYPE_GET_LIST: {
        logger_->info("[{}] GetList.", buf->fid_);
        get_client_list(&buf);
        if (fcli && !send_frame(fcli->socket_, buf)) {
            logger_->error("GetList send failed.");
        }
        break;
    }
    case TYPE_UP_LIST: {
        std::string files_path = std::string(buf->data_, buf->len_);
#ifdef _WIN32
        std::string turn_files_path = CCodec::u8_to_ansi(files_path);
#else
        std::string turn_files_path(files_path);
#endif
        logger_->info("[{}] UpList. {}", buf->fid_, turn_files_path);
        if (fcli) {
            auto stamp = OfUtil::now_time();
            // get_clients() reads these fields under cli_mut_, so writes must
            // take it exclusively.
            std::unique_lock<std::shared_mutex> lock(cli_mut_);
            fcli->task_ = files_path;
            fcli->task_time_ = stamp;
        }
        break;
    }
    case TYPE_CANCEL_LIST: {
        logger_->info("[{}] Cancle Task.", buf->fid_);
        if (fcli) {
            std::unique_lock<std::shared_mutex> lock(cli_mut_);
            fcli->task_.clear();
            fcli->task_time_.clear();
        }
        break;
    }
    case TYPE_GET_ID: {
        buf->tid_ = buf->fid_;
        // The sender may already have been removed by monitor_idle().
        if (fcli) {
            send_frame(fcli->socket_, buf);
        }
        break;
    }
    case TYPE_JUDGE_ACTIVE: {
        // Only the "target is gone" case needs a reply; when both ends are
        // registered, or the sender itself is unknown, nothing is sent.
        if (fcli && !tcli) {
            buf->type_ = TYPE_OFFLINE;
            std::swap(buf->fid_, buf->tid_);
            send_frame(fcli->socket_, buf);
        }
        break;
    }
    default:
        if (check_double(buf, fcli, tcli) && tcli && !send_frame(tcli->socket_, buf)) {
            logger_->error("Send from {} to {} failed Or One Offline.", buf->fid_, buf->tid_);
        }
        break;
    }
}
// Verifies that both ends of a frame are registered before it is forwarded.
//
// Looks both ids up again; when exactly one side is missing, the remaining
// side is sent a TYPE_OFFLINE frame (for a missing target, fid_/tid_ are
// swapped first so the notice is addressed to the sender).
//
// @param buf   Frame being routed; type_ and fid_/tid_ may be rewritten.
// @param fcli  In/out: cache of the sender, refreshed when found.
// @param tcli  In/out: cache of the target, refreshed when found.
// @return true only when both clients are registered.
bool CTcpServer::check_double(CFrameBuffer* buf, std::shared_ptr<ClientCache>& fcli,
                              std::shared_ptr<ClientCache>& tcli)
{
    {
        // The lock covers the lookup only: send_frame() performs a blocking
        // socket write and should not stall writers waiting on cli_mut_.
        // find() is used because operator[] is a non-const operation.
        std::shared_lock<std::shared_mutex> lock(cli_mut_);
        if (auto it = client_map_.find(buf->fid_); it != client_map_.end()) {
            fcli = it->second;
        }
        if (auto it = client_map_.find(buf->tid_); it != client_map_.end()) {
            tcli = it->second;
        }
    }

    if (fcli == nullptr && tcli == nullptr) {
        // The original message had no placeholders, so the ids were dropped.
        logger_->warn("Both Offline. fid:{}, tid:{}", buf->fid_, buf->tid_);
        return false;
    }
    if (fcli == nullptr) {
        buf->type_ = TYPE_OFFLINE;
        logger_->warn("A Notic {} That {} Offline.", buf->tid_, buf->fid_);
        send_frame(tcli->socket_, buf);
        return false;
    }
    if (tcli == nullptr) {
        std::swap(buf->fid_, buf->tid_);
        buf->type_ = TYPE_OFFLINE;
        logger_->warn("B Notic {} That {} Offline.", buf->tid_, buf->fid_);
        send_frame(fcli->socket_, buf);
        return false;
    }
    return true;
}
// Queues one asynchronous accept and re-arms itself from the completion
// handler.
//
// A new connection is keyed by "ip:port" of the remote endpoint, registered
// in client_map_ and served by its own th_client() thread. When 100 clients
// are already registered the connection is closed instead.
void CTcpServer::accept_client()
{
    auto socket = std::make_shared<asio::ip::tcp::socket>(io_context_);
    acceptor_.async_accept(*socket, [this, socket](const asio::error_code& error) {
        // The acceptor was closed (see stop()): re-arming would only produce
        // an endless chain of failed accepts.
        if (error == asio::error::operation_aborted || !acceptor_.is_open()) {
            return;
        }
        if (error) {
            logger_->warn("Accept failed: {}", error.message());
            accept_client();
            return;
        }

        // Non-throwing overload: the peer may already have disconnected, and
        // an exception here would escape into io_context::run().
        asio::error_code endpoint_error;
        const auto endpoint = socket->remote_endpoint(endpoint_error);
        if (endpoint_error) {
            logger_->warn("Dropping connection, remote endpoint unavailable: {}", endpoint_error.message());
            accept_client();
            return;
        }
        const std::string client_key = endpoint.address().to_string() + ":" + std::to_string(endpoint.port());

        bool accepted = false;
        {
            std::unique_lock<std::shared_mutex> lock(cli_mut_);
            if (client_map_.size() >= 100) {
                logger_->info("Max client connections reached. Closing connection from {}", client_key);
                asio::error_code close_error;
                socket->close(close_error);
            } else {
                logger_->info("New connection from {}", client_key);
                auto cache = std::make_shared<ClientCache>();
                cache->socket_ = socket;
                cache->online_time_ = OfUtil::now_time();
                cache->last_active_time_ = std::chrono::high_resolution_clock::now();
                client_map_[client_key] = cache;
                // Register the worker while still holding the lock. The
                // worker's exit guard modifies client_threads_ under this
                // same mutex, so it cannot run before the entry exists.
                client_threads_[client_key] = std::thread(&CTcpServer::th_client, this, socket, client_key);
                accepted = true;
            }
        }

        if (!accepted) {
            // NOTE(review): this back-off runs inside the completion handler
            // and therefore blocks the thread running the io_context for a
            // full minute -- kept as in the original; confirm it is intended.
            std::this_thread::sleep_for(std::chrono::minutes(1));
        }
        accept_client();
    });
}
// Per-client worker: reads from the socket, parses complete frames and
// dispatches them until the peer closes the connection or an error occurs.
//
// @param socket      Connected client socket.
// @param client_key  "ip:port" key under which the client is registered.
void CTcpServer::th_client(std::shared_ptr<asio::ip::tcp::socket> socket, const std::string& client_key)
{
    // Exit guard: however the function is left, drop the client from both
    // tables. The thread detaches itself because nobody joins it after it has
    // been removed from client_threads_.
    std::shared_ptr<int> deleter(new int(0), [&](int* p) {
        std::unique_lock<std::shared_mutex> lock(cli_mut_);
        delete p;
        client_map_.erase(client_key);
        if (client_threads_.find(client_key) != client_threads_.end()) {
            client_threads_.at(client_key).detach();
            client_threads_.erase(client_key);
        }
        logger_->warn("{} client {} exit.", __FUNCTION__, client_key);
    });

    try {
        std::shared_ptr<ClientCache> cache = nullptr;
        {
            std::shared_lock<std::shared_mutex> lock(cli_mut_);
            auto found = client_map_.find(client_key);
            if (found == client_map_.end()) {
                logger_->error("Not Find Client{} in cache.", client_key);
                return;
            }
            cache = found->second;
        }

        while (true) {
            asio::error_code error;
            size_t length = socket->read_some(asio::buffer(cache->tmp_buf_), error);
            if (error == asio::error::eof) {
                break;
            } else if (error) {
                throw asio::system_error(error);
            }
            cache->buffer_.push(cache->tmp_buf_.data(), length);

            // Drain every complete frame currently in the buffer.
            while (true) {
                // unique_ptr owns the frame so it is released on every path,
                // including an exception thrown while dispatching it.
                std::unique_ptr<CFrameBuffer> frame(CTransProtocal::parse(cache->buffer_));
                if (!frame) {
                    break;
                }
                if (frame->type_ == TYPE_HEARTS) {
                    // Heartbeat: refresh the activity stamp that
                    // monitor_idle() checks; nothing is forwarded.
                    std::unique_lock<std::shared_mutex> lock(cli_mut_);
                    auto found = client_map_.find(client_key);
                    if (found != client_map_.end()) {
                        found->second->last_active_time_ = std::chrono::high_resolution_clock::now();
                    }
                    continue;
                }
                frame->fid_ = client_key;
                // Forward directly; the frame is not queued in the cache.
                trans_data(frame.get());
            }
        }
    } catch (const std::exception& e) {
        logger_->error("Error with client {}: {}", client_key, e.what());
    }
}
// Serialises `buf` and writes it to `socket`.
//
// An empty fid_ is replaced by the server's own "ip:port" before packing.
//
// @param socket  Destination socket.
// @param buf     Frame to send; fid_ may be filled in as described above.
// @return true when the send call reported at least one byte written; false
//         when packing failed, nothing was written, or the send threw.
bool CTcpServer::send_frame(std::shared_ptr<asio::ip::tcp::socket> socket, CFrameBuffer* buf)
{
    char* out_buf{};
    int out_len{};
    if (buf->fid_.empty()) {
        buf->fid_ = server_ip_;
    }
    if (!CTransProtocal::pack(buf, &out_buf, out_len)) {
        logger_->error("{} pack failed.", __FUNCTION__);
        return false;
    }
    // Owns the packed bytes from here on, so every exit path frees them.
    const std::unique_ptr<char[]> packed(out_buf);

    try {
        // NOTE(review): send() may report fewer bytes than requested; a short
        // write is not retried here, as in the original.
        if (!socket->send(asio::buffer(packed.get(), out_len))) {
            logger_->error("{} send failed, buf type:{}, fid:{}, tid:{}", __FUNCTION__,
                           static_cast<int>(buf->type_), buf->fid_, buf->tid_);
            return false;
        }
    } catch (const std::exception& e) {
        logger_->error("send failed, type:{}, fid:{}, tid:{}, mark:{}, error:{}", static_cast<int>(buf->type_),
                       buf->fid_, buf->tid_, buf->mark_, e.what());
        // The original fell through and returned true after a failed send.
        return false;
    }
    return true;
}
void CTcpServer::monitor_idle()
{
while (th_run_) {
sleep_.sleep();
if (!th_run_) {
break;
}
std::vector<std::string> remove_vec;
std::unique_lock<std::shared_mutex> lock(cli_mut_);
for (auto& item : client_map_) {
auto now = std::chrono::high_resolution_clock::now();
auto duration =
std::chrono::duration_cast<std::chrono::seconds>(now - item.second->last_active_time_)
.count();
if (duration >= remove_after_time) {
logger_->warn("OnLine Time [{}] sec, Proactively disconnect:{}", duration, item.first);
remove_vec.push_back(item.first);
item.second->socket_->shutdown(asio::ip::tcp::socket::shutdown_both);
item.second->socket_->close();
}
}
for (const auto& item : remove_vec) {
client_map_.erase(item);
}
}
}