transm/server/server.cpp

387 lines
13 KiB
C++

#include "server.h"
#include <of_str.h>
#include <of_util.h>
#include <version.h>
using namespace ofen;
constexpr int check_idle_percycle = 1000 * 30; // 毫秒
constexpr int remove_after_time = 60; // 秒
CTcpServer::CTcpServer(asio::io_context& io_context) : io_context_(io_context), acceptor_(io_context)
{
th_run_ = true;
sleep_.set_timeout(check_idle_percycle);
}
CTcpServer::~CTcpServer()
{
th_run_ = false;
sleep_.contiune();
if (th_monitor_idle_.joinable()) {
th_monitor_idle_.join();
}
}
bool CTcpServer::start(unsigned short port)
{
asio::ip::tcp::resolver resolver(io_context_);
asio::ip::tcp::resolver::query query(asio::ip::host_name(), "");
TLOGI("version: {}", VERSION_NUM);
TLOGI("opensource: {}", VERSION_URL);
try {
auto it = resolver.resolve(query);
TLOGD("Here are the local IP addresses you may use.");
TLOGD("===========================================");
int i = 1;
while (!it.empty()) {
asio::ip::address addr = it->endpoint().address();
TLOGI("({}){}", i, addr.to_string());
++it;
++i;
}
TLOGD("===========================================");
} catch (const std::exception& e) {
TLOGW("{}", e.what());
TLOGI("will not show local IP.");
}
asio::ip::tcp::endpoint endpoint(asio::ip::tcp::v4(), port);
try {
acceptor_.open(endpoint.protocol());
// acceptor_.set_option(asio::socket_base::reuse_address(true));
acceptor_.bind(endpoint);
acceptor_.listen();
} catch (const asio::system_error& e) {
TLOGE("Failed to bind to {}: {}", endpoint.address().to_string(), e.what());
return false;
}
auto bound_endpoint = acceptor_.local_endpoint();
server_ip_ = bound_endpoint.address().to_string() + ":" + std::to_string(bound_endpoint.port());
accept_client();
th_monitor_idle_ = std::thread([this]() { monitor_idle(); });
TLOGI("Server started on port {}", port);
return true;
}
void CTcpServer::stop()
{
acceptor_.close();
std::unique_lock<std::shared_mutex> lock(cli_mut_);
for (auto& [key, thread] : client_threads_) {
if (thread.joinable()) {
thread.join();
}
}
client_threads_.clear();
}
void CTcpServer::get_client_list(CMessageInfo& msg_info)
{
struct TmpInfo {
std::string id;
std::string online_time;
std::string uuid;
std::string task;
uint64_t timestamp;
};
std::vector<TmpInfo> vec;
std::string msg;
{
std::shared_lock<std::shared_mutex> lock(cli_mut_);
for (const auto& item : client_map_) {
TmpInfo tmp;
tmp.id = item.first;
tmp.online_time = item.second->online_time_;
tmp.timestamp = item.second->timestamp;
tmp.uuid = item.second->uuid;
tmp.task = item.second->task_;
vec.push_back(tmp);
}
}
// 排序 vec 根据 client->timestamp
std::sort(vec.begin(), vec.end(),
[](const TmpInfo& a, const TmpInfo& b) { return a.timestamp < b.timestamp; });
int index = 1;
for (const auto& item : vec) {
msg.append(fmt::format("[{}][{}][{}][{}]", index, item.id, item.online_time, item.uuid));
auto files = COfStr::split(item.task, "|");
for (const auto& file : files) {
msg.append("\n" + file);
}
msg.append("\n");
++index;
}
msg_info.str = msg;
}
void CTcpServer::trans_data(CFrameBuffer* buf)
{
std::shared_ptr<ClientCache> fcli = nullptr;
std::shared_ptr<ClientCache> tcli = nullptr;
{
std::shared_lock<std::shared_mutex> lock(cli_mut_);
if (client_map_.count(buf->fid_)) {
fcli = client_map_[buf->fid_];
}
if (client_map_.count(buf->tid_)) {
tcli = client_map_[buf->tid_];
}
}
switch (buf->type_) {
case TYPE_GET_LIST: {
TLOGI("[{}] GetList.", buf->fid_);
CMessageInfo msg_info(server_ip_);
get_client_list(msg_info);
buf->fid_ = server_ip_;
serialize(msg_info, &buf->data_, buf->len_);
if (fcli && !send_frame(fcli->socket_, buf)) {
TLOGE("GetList send failed.");
}
break;
}
case TYPE_UP_LIST: {
CMessageInfo msg_info(buf->fid_);
if (!deserialize(buf->data_, buf->len_, msg_info)) {
TLOGE("{} GetList deserialize failed.", __LINE__);
break;
}
if (fcli) {
fcli->task_ = msg_info.str;
fcli->uuid = msg_info.uuid;
fcli->task_time_ = OfUtil::now_time();
}
break;
}
case TYPE_CANCEL_LIST: {
TLOGI("[{}] Cancle Task.", buf->fid_);
if (fcli) {
fcli->task_.clear();
fcli->task_time_.clear();
}
break;
}
case TYPE_GET_ID: {
buf->tid_ = buf->fid_;
send_frame(fcli->socket_, buf);
break;
}
case TYPE_JUDGE_ACTIVE: {
if (fcli && tcli) {
break;
}
if (fcli && tcli == nullptr) {
buf->type_ = TYPE_OFFLINE;
std::swap(buf->fid_, buf->tid_);
send_frame(fcli->socket_, buf);
break;
}
break;
}
default:
if (check_double(buf, fcli, tcli) && tcli && !send_frame(tcli->socket_, buf)) {
TLOGE("Send from {} to {} failed Or One Offline.", buf->fid_, buf->tid_);
}
break;
}
}
bool CTcpServer::check_double(CFrameBuffer* buf, std::shared_ptr<ClientCache>& fcli,
std::shared_ptr<ClientCache>& tcli)
{
std::shared_lock<std::shared_mutex> lock(cli_mut_);
if (client_map_.count(buf->fid_)) {
fcli = client_map_[buf->fid_];
}
if (client_map_.count(buf->tid_)) {
tcli = client_map_[buf->tid_];
}
if (fcli == nullptr && tcli) {
buf->type_ = TYPE_OFFLINE;
TLOGW("A Notic {} That {} Offline.", buf->tid_, buf->fid_);
send_frame(tcli->socket_, buf);
return false;
}
if (tcli == nullptr && fcli) {
std::swap(buf->fid_, buf->tid_);
buf->type_ = TYPE_OFFLINE;
TLOGW("B Notic {} That {} Offline.", buf->tid_, buf->fid_);
send_frame(fcli->socket_, buf);
return false;
}
if (tcli == nullptr && fcli == nullptr) {
TLOGW("Both Offline.", buf->fid_, buf->tid_);
return false;
}
return true;
}
void CTcpServer::accept_client()
{
auto socket = std::make_shared<asio::ip::tcp::socket>(io_context_);
acceptor_.async_accept(*socket, [this, socket](const asio::error_code& error) {
if (!error) {
auto endpoint = socket->remote_endpoint();
std::string client_key = endpoint.address().to_string() + ":" + std::to_string(endpoint.port());
bool can = false;
{
std::unique_lock<std::shared_mutex> lock(cli_mut_);
if (client_map_.size() >= 100) {
TLOGI("Max client connections reached. Closing connection from {}", client_key);
socket->close();
} else {
TLOGI("New connection from {}", client_key);
auto cache = std::make_shared<ClientCache>();
cache->socket_ = socket;
cache->online_time_ = OfUtil::now_time();
cache->timestamp = OfUtil::get_timestamp_ms();
cache->last_active_time_ = std::chrono::high_resolution_clock::now();
client_map_[client_key] = cache;
can = true;
}
}
if (!can) {
std::this_thread::sleep_for(std::chrono::minutes(1));
} else {
client_threads_[client_key] = std::thread(&CTcpServer::th_client, this, socket, client_key);
}
}
accept_client();
});
}
void CTcpServer::th_client(const std::shared_ptr<asio::ip::tcp::socket>& socket,
const std::string& client_key)
{
std::shared_ptr<int> deleter(new int(0), [&](int* p) {
std::unique_lock<std::shared_mutex> lock(cli_mut_);
delete p;
client_map_.erase(client_key);
if (client_threads_.find(client_key) != client_threads_.end()) {
client_threads_.at(client_key).detach();
client_threads_.erase(client_key);
}
TLOGW("th_client deleter client {} exit.", client_key);
});
try {
std::shared_ptr<ClientCache> cache = nullptr;
{
std::shared_lock<std::shared_mutex> lock(cli_mut_);
if (!client_map_.count(client_key)) {
TLOGE("Not Find Client{} in cache.", client_key);
return;
}
cache = client_map_[client_key];
}
while (true) {
asio::error_code error;
size_t length = socket->read_some(asio::buffer(cache->tmp_buf_), error);
if (error == asio::error::eof) {
break;
} else if (error) {
throw asio::system_error(error);
}
cache->buffer_.push(cache->tmp_buf_.data(), static_cast<int>(length));
while (true) {
auto* frame = CTransProtocal::parse(cache->buffer_);
if (frame) {
if (frame->type_ == TYPE_HEARTS) {
std::unique_lock<std::shared_mutex> lock(cli_mut_);
if (client_map_.count(client_key)) {
auto& cli = client_map_[client_key];
cli->last_active_time_ = std::chrono::high_resolution_clock::now();
}
delete frame;
continue;
}
if (frame->type_ == TYPE_GET_ID) {
CMessageInfo msg_info("");
if (!deserialize(frame->data_, frame->len_, msg_info)) {
TLOGE("{} GetId deserialize failed.", __LINE__);
delete frame;
continue;
}
std::unique_lock<std::shared_mutex> lock(cli_mut_);
if (client_map_.count(client_key)) {
auto& cli = client_map_[client_key];
cli->uuid = msg_info.uuid;
}
}
frame->fid_ = client_key;
// 直接转发,不加入缓存。
trans_data(frame);
delete frame;
continue;
}
break;
}
}
} catch (std::exception& e) {
TLOGE("Error with client {}: {}", client_key, e.what());
}
}
bool CTcpServer::send_frame(const std::shared_ptr<asio::ip::tcp::socket>& socket, CFrameBuffer* buf)
{
char* out_buf{};
int out_len{};
if (buf->fid_.empty()) {
buf->fid_ = server_ip_;
}
if (!CTransProtocal::pack(buf, &out_buf, out_len)) {
TLOGE("{} pack failed.", __FUNCTION__);
return false;
}
try {
if (!socket->send(asio::buffer(out_buf, out_len))) {
TLOGE("{} send failed, buf type:{}, fid:{}, tid:{}", __FUNCTION__, static_cast<int>(buf->type_),
buf->fid_, buf->tid_);
delete[] out_buf;
return false;
}
} catch (const std::exception& e) {
TLOGE("send failed, type:{}, fid:{}, tid:{}, mark:{}", static_cast<int>(buf->type_), buf->fid_,
buf->tid_, buf->mark_);
}
delete[] out_buf;
return true;
}
void CTcpServer::monitor_idle()
{
while (th_run_) {
sleep_.sleep();
if (!th_run_) {
break;
}
std::vector<std::string> remove_vec;
std::unique_lock<std::shared_mutex> lock(cli_mut_);
for (auto& item : client_map_) {
auto now = std::chrono::high_resolution_clock::now();
auto duration =
std::chrono::duration_cast<std::chrono::seconds>(now - item.second->last_active_time_)
.count();
if (duration >= remove_after_time) {
TLOGW("OnLine Time [{}] sec, Proactively disconnect:{}", duration, item.first);
remove_vec.push_back(item.first);
item.second->socket_->shutdown(asio::ip::tcp::socket::shutdown_both);
item.second->socket_->close();
}
}
for (const auto& item : remove_vec) {
client_map_.erase(item);
}
}
}