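// Relay-style TCP server built on standalone Asio.
// Clients are keyed by "ip:port"; the server routes frames between them,
// answers task-list and id queries, tracks liveness via heartbeat frames,
// and proactively drops connections that stay idle too long.
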
#include "server.h"
|
|
|
|
#include <of_str.h>
|
|
#include <of_util.h>
|
|
#include <version.h>
|
|
|
|
using namespace ofen;
|
|
constexpr int check_idle_percycle = 1000 * 30; // 毫秒
|
|
constexpr int remove_after_time = 60; // 秒
|
|
CTcpServer::CTcpServer(asio::io_context& io_context) : io_context_(io_context), acceptor_(io_context)
|
|
{
|
|
th_run_ = true;
|
|
sleep_.set_timeout(check_idle_percycle);
|
|
}
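
// Shut down the idle-monitor thread: clear the run flag, wake the timed sleep,
// and join. ("contiune" is the wake-up method as spelled by the ofen sleep helper.)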
CTcpServer::~CTcpServer()
{
    th_run_ = false;
    sleep_.contiune();
    if (th_monitor_idle_.joinable()) {
        th_monitor_idle_.join();
    }
}
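
// Bind and listen on the given port, log the host's local addresses as a hint
// for clients, kick off the async accept loop, and spawn the idle monitor.
// Returns false if the port cannot be bound.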
bool CTcpServer::start(unsigned short port)
{
    asio::ip::tcp::resolver resolver(io_context_);
    asio::ip::tcp::resolver::query query(asio::ip::host_name(), "");
    TLOGI("version: {}", VERSION_NUM);
    TLOGI("opensource: {}", VERSION_URL);
    try {
        auto it = resolver.resolve(query);
        TLOGD("Here are the local IP addresses you may use.");
        TLOGD("===========================================");
        int i = 1;
        while (!it.empty()) {
            asio::ip::address addr = it->endpoint().address();
            TLOGI("({}){}", i, addr.to_string());
            ++it;
            ++i;
        }
        TLOGD("===========================================");
    } catch (const std::exception& e) {
        TLOGW("{}", e.what());
        TLOGI("will not show local IP.");
    }

    asio::ip::tcp::endpoint endpoint(asio::ip::tcp::v4(), port);
    try {
        acceptor_.open(endpoint.protocol());
        // acceptor_.set_option(asio::socket_base::reuse_address(true));
        acceptor_.bind(endpoint);
        acceptor_.listen();
    } catch (const asio::system_error& e) {
        TLOGE("Failed to bind to {}: {}", endpoint.address().to_string(), e.what());
        return false;
    }
    auto bound_endpoint = acceptor_.local_endpoint();
    server_ip_ = bound_endpoint.address().to_string() + ":" + std::to_string(bound_endpoint.port());
    accept_client();
    th_monitor_idle_ = std::thread([this]() { monitor_idle(); });
    TLOGI("Server started on port {}", port);
    return true;
}
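
// Close the acceptor, unblock every per-client reader by closing its socket,
// then join the reader threads.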
void CTcpServer::stop()
{
    acceptor_.close();
    decltype(client_threads_) threads;
    {
        // Close client sockets so reader threads unblock, and move the thread
        // handles out; joining under cli_mut_ would deadlock with th_client's
        // cleanup, which also takes this lock.
        std::unique_lock<std::shared_mutex> lock(cli_mut_);
        for (auto& [key, cache] : client_map_) {
            asio::error_code ec;
            cache->socket_->shutdown(asio::ip::tcp::socket::shutdown_both, ec);
            cache->socket_->close(ec);
        }
        threads = std::move(client_threads_);
        client_threads_.clear();
    }
    for (auto& [key, thread] : threads) {
        if (thread.joinable()) {
            thread.join();
        }
    }
}
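
// Snapshot the connected clients (id, task, task time, online time) under a shared lock.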
std::vector<TaskList> CTcpServer::get_clients()
{
    std::vector<TaskList> result;
    std::shared_lock<std::shared_mutex> lock(cli_mut_);
    for (const auto& item : client_map_) {
        TaskList t;
        t.id_ = item.first;
        t.task_ = item.second->task_;
        t.task_time_ = item.second->task_time_;
        t.online_time_ = item.second->online_time_;
        result.push_back(t);
    }
    return result;
}
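
// Render the client snapshot as a human-readable list: one "[index][id][online time]"
// header per client, followed by its task entries (the task string is "|"-separated).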
void CTcpServer::get_client_list(CMessageInfo& msg_info)
{
    auto vec = get_clients();
    std::string msg;
    int index = 1;
    for (const auto& item : vec) {
        msg.append(fmt::format("[{}][{}][{}]", index, item.id_, item.online_time_));
        auto files = COfStr::split(item.task_, "|");
        for (const auto& file : files) {
            msg.append("\n" + file);
        }
        msg.append("\n");
        ++index;
    }
    msg_info.str = msg;
}
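
// Route one parsed frame. The sender (fid) and receiver (tid) are looked up once
// under a shared lock; list/id frames are answered here, TYPE_JUDGE_ACTIVE replies
// with TYPE_OFFLINE only when the queried peer is gone, and everything else is
// relayed to the receiver via check_double/send_frame.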
void CTcpServer::trans_data(CFrameBuffer* buf)
{
    std::shared_ptr<ClientCache> fcli = nullptr;
    std::shared_ptr<ClientCache> tcli = nullptr;

    {
        // find() keeps the lookup read-only while the shared lock is held.
        std::shared_lock<std::shared_mutex> lock(cli_mut_);
        auto fit = client_map_.find(buf->fid_);
        if (fit != client_map_.end()) {
            fcli = fit->second;
        }
        auto tit = client_map_.find(buf->tid_);
        if (tit != client_map_.end()) {
            tcli = tit->second;
        }
    }

    switch (buf->type_) {
    case TYPE_GET_LIST: {
        TLOGI("[{}] GetList.", buf->fid_);
        CMessageInfo msg_info;
        get_client_list(msg_info);
        if (fcli) {
            msg_info.uuid = fcli->uuid;
        }
        serialize(msg_info, &buf->data_, buf->len_);
        if (fcli && !send_frame(fcli->socket_, buf)) {
            TLOGE("GetList send failed.");
        }
        break;
    }
    case TYPE_UP_LIST: {
        CMessageInfo msg_info;
        if (!deserialize(buf->data_, buf->len_, msg_info)) {
            TLOGE("{} UpList deserialize failed.", __LINE__);
            break;
        }
        if (fcli) {
            fcli->task_ = msg_info.str;
            fcli->uuid = msg_info.uuid;
            fcli->task_time_ = OfUtil::now_time();
        }
        break;
    }
    case TYPE_CANCEL_LIST: {
        TLOGI("[{}] Cancel Task.", buf->fid_);
        if (fcli) {
            fcli->task_.clear();
            fcli->task_time_.clear();
        }
        break;
    }
    case TYPE_GET_ID: {
        buf->tid_ = buf->fid_;
        if (fcli) { // guard: the sender may already have been removed
            send_frame(fcli->socket_, buf);
        }
        break;
    }
    case TYPE_JUDGE_ACTIVE: {
        if (fcli && tcli) {
            break;
        }
        if (fcli && tcli == nullptr) {
            buf->type_ = TYPE_OFFLINE;
            std::swap(buf->fid_, buf->tid_);
            send_frame(fcli->socket_, buf);
            break;
        }
        break;
    }
    default:
        if (check_double(buf, fcli, tcli) && tcli && !send_frame(tcli->socket_, buf)) {
            TLOGE("Send from {} to {} failed, or one side went offline.", buf->fid_, buf->tid_);
        }
        break;
    }
}
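
// Verify both endpoints of a relayed frame are still connected. If one side is
// gone, notify the survivor with TYPE_OFFLINE and return false; returns true
// only when both sender and receiver are online.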
bool CTcpServer::check_double(CFrameBuffer* buf, std::shared_ptr<ClientCache>& fcli,
                              std::shared_ptr<ClientCache>& tcli)
{
    std::shared_lock<std::shared_mutex> lock(cli_mut_);
    auto fit = client_map_.find(buf->fid_);
    if (fit != client_map_.end()) {
        fcli = fit->second;
    }
    auto tit = client_map_.find(buf->tid_);
    if (tit != client_map_.end()) {
        tcli = tit->second;
    }
    if (fcli == nullptr && tcli) {
        buf->type_ = TYPE_OFFLINE;
        TLOGW("A: Notify {} that {} is offline.", buf->tid_, buf->fid_);
        send_frame(tcli->socket_, buf);
        return false;
    }
    if (tcli == nullptr && fcli) {
        std::swap(buf->fid_, buf->tid_);
        buf->type_ = TYPE_OFFLINE;
        TLOGW("B: Notify {} that {} is offline.", buf->tid_, buf->fid_);
        send_frame(fcli->socket_, buf);
        return false;
    }
    if (tcli == nullptr && fcli == nullptr) {
        TLOGW("Both {} and {} are offline.", buf->fid_, buf->tid_);
        return false;
    }
    return true;
}
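
// Asynchronous accept loop: each completed accept registers the client (keyed
// by "ip:port"), spawns a reader thread, and re-arms itself for the next client.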
void CTcpServer::accept_client()
{
    auto socket = std::make_shared<asio::ip::tcp::socket>(io_context_);
    acceptor_.async_accept(*socket, [this, socket](const asio::error_code& error) {
        if (!error) {
            auto endpoint = socket->remote_endpoint();
            std::string client_key = endpoint.address().to_string() + ":" + std::to_string(endpoint.port());

            bool can = false;
            {
                std::unique_lock<std::shared_mutex> lock(cli_mut_);
                if (client_map_.size() >= 100) {
                    TLOGI("Max client connections reached. Closing connection from {}", client_key);
                    socket->close();
                } else {
                    TLOGI("New connection from {}", client_key);
                    auto cache = std::make_shared<ClientCache>();
                    cache->socket_ = socket;
                    cache->online_time_ = OfUtil::now_time();
                    cache->last_active_time_ = std::chrono::high_resolution_clock::now();
                    client_map_[client_key] = cache;
                    can = true;
                }
            }
            if (!can) {
                // Crude back-pressure: this sleep runs on the io_context thread,
                // so all async processing (including further accepts) stalls for a minute.
                std::this_thread::sleep_for(std::chrono::minutes(1));
            } else {
                // client_threads_ is also touched by th_client's cleanup, so guard the insert.
                std::unique_lock<std::shared_mutex> lock(cli_mut_);
                client_threads_[client_key] = std::thread(&CTcpServer::th_client, this, socket, client_key);
            }
        }
        accept_client();
    });
}
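
// Per-client reader thread: pulls bytes off the socket, feeds them to the frame
// parser, handles heartbeat/id frames inline, and forwards the rest to trans_data.
// Cleanup is tied to a scope guard so the client is unregistered on any exit path.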
void CTcpServer::th_client(const std::shared_ptr<asio::ip::tcp::socket>& socket,
                           const std::string& client_key)
{
    // Scope guard: the shared_ptr deleter fires when this function returns,
    // removing the client from both maps under the exclusive lock.
    std::shared_ptr<int> deleter(new int(0), [&](int* p) {
        std::unique_lock<std::shared_mutex> lock(cli_mut_);
        delete p;
        client_map_.erase(client_key);
        if (client_threads_.find(client_key) != client_threads_.end()) {
            client_threads_.at(client_key).detach();
            client_threads_.erase(client_key);
        }
        TLOGW("th_client cleanup: client {} exited.", client_key);
    });

    try {
        std::shared_ptr<ClientCache> cache = nullptr;

        {
            std::shared_lock<std::shared_mutex> lock(cli_mut_);
            auto it = client_map_.find(client_key);
            if (it == client_map_.end()) {
                TLOGE("Client {} not found in cache.", client_key);
                return;
            }
            cache = it->second;
        }

        while (true) {
            asio::error_code error;
            size_t length = socket->read_some(asio::buffer(cache->tmp_buf_), error);
            if (error == asio::error::eof) {
                break;
            } else if (error) {
                throw asio::system_error(error);
            }

            cache->buffer_.push(cache->tmp_buf_.data(), static_cast<int>(length));
            while (true) {
                auto* frame = CTransProtocal::parse(cache->buffer_);
                if (frame) {
                    if (frame->type_ == TYPE_HEARTS) {
                        std::unique_lock<std::shared_mutex> lock(cli_mut_);
                        auto it = client_map_.find(client_key);
                        if (it != client_map_.end()) {
                            it->second->last_active_time_ = std::chrono::high_resolution_clock::now();
                        }
                        delete frame;
                        continue;
                    }
                    if (frame->type_ == TYPE_GET_ID) {
                        CMessageInfo msg_info;
                        if (!deserialize(frame->data_, frame->len_, msg_info)) {
                            TLOGE("{} GetId deserialize failed.", __LINE__);
                            delete frame;
                            continue;
                        }
                        std::unique_lock<std::shared_mutex> lock(cli_mut_);
                        auto it = client_map_.find(client_key);
                        if (it != client_map_.end()) {
                            it->second->uuid = msg_info.uuid;
                        }
                    }
                    frame->fid_ = client_key;
                    // Forward directly; the frame is not cached.
                    trans_data(frame);
                    delete frame;
                    continue;
                }
                break;
            }
        }
    } catch (std::exception& e) {
        TLOGE("Error with client {}: {}", client_key, e.what());
    }
}
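
// Pack a frame and send it synchronously on the given socket. Frames without a
// sender id are stamped with the server's own address. Returns false on pack or
// send failure.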
bool CTcpServer::send_frame(const std::shared_ptr<asio::ip::tcp::socket>& socket, CFrameBuffer* buf)
{
    char* out_buf{};
    int out_len{};
    if (buf->fid_.empty()) {
        buf->fid_ = server_ip_;
    }

    if (!CTransProtocal::pack(buf, &out_buf, out_len)) {
        TLOGE("{} pack failed.", __FUNCTION__);
        return false;
    }
    try {
        if (!socket->send(asio::buffer(out_buf, out_len))) {
            TLOGE("{} send failed, buf type:{}, fid:{}, tid:{}", __FUNCTION__, static_cast<int>(buf->type_),
                  buf->fid_, buf->tid_);
            delete[] out_buf;
            return false;
        }
    } catch (const std::exception& e) {
        TLOGE("send failed, type:{}, fid:{}, tid:{}, mark:{}, err:{}", static_cast<int>(buf->type_),
              buf->fid_, buf->tid_, buf->mark_, e.what());
        // A throwing send previously fell through and reported success.
        delete[] out_buf;
        return false;
    }

    delete[] out_buf;
    return true;
}
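
// Background watchdog: wakes every check_idle_percycle milliseconds and
// disconnects clients whose last heartbeat is older than remove_after_time seconds.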
void CTcpServer::monitor_idle()
{
    while (th_run_) {
        sleep_.sleep();
        if (!th_run_) {
            break;
        }
        std::vector<std::string> remove_vec;
        std::unique_lock<std::shared_mutex> lock(cli_mut_);
        for (auto& item : client_map_) {
            auto now = std::chrono::high_resolution_clock::now();
            auto duration =
                std::chrono::duration_cast<std::chrono::seconds>(now - item.second->last_active_time_).count();
            if (duration >= remove_after_time) {
                TLOGW("Idle [{}] sec, proactively disconnecting: {}", duration, item.first);
                remove_vec.push_back(item.first);
                // Use the error_code overloads so a dead socket cannot throw here.
                asio::error_code ec;
                item.second->socket_->shutdown(asio::ip::tcp::socket::shutdown_both, ec);
                item.second->socket_->close(ec);
            }
        }
        for (const auto& item : remove_vec) {
            client_map_.erase(item);
        }
    }
}