#include "server.h"

#include <of_str.h>
#include <of_util.h>
#include <version.h>

using namespace ofen;
constexpr int check_idle_percycle = 1000 * 30;   // 毫秒
constexpr int remove_after_time = 60;            // 秒
CTcpServer::CTcpServer(asio::io_context& io_context) : io_context_(io_context), acceptor_(io_context)
{
    th_run_ = true;
    sleep_.set_timeout(check_idle_percycle);
}

CTcpServer::~CTcpServer()
{
    th_run_ = false;
    sleep_.contiune();
    if (th_monitor_idle_.joinable()) {
        th_monitor_idle_.join();
    }
}

bool CTcpServer::start(unsigned short port)
{
    asio::ip::tcp::resolver resolver(io_context_);
    asio::ip::tcp::resolver::query query(asio::ip::host_name(), "");
    TLOGI("version: {}", VERSION_NUM);
    TLOGI("opensource: {}", VERSION_URL);
    try {
        auto it = resolver.resolve(query);
        TLOGD("Here are the local IP addresses you may use.");
        TLOGD("===========================================");
        int i = 1;
        while (!it.empty()) {
            asio::ip::address addr = it->endpoint().address();
            TLOGI("({}){}", i, addr.to_string());
            ++it;
            ++i;
        }
        TLOGD("===========================================");
    } catch (const std::exception& e) {
        TLOGW("{}", e.what());
        TLOGI("will not show local IP.");
    }

    asio::ip::tcp::endpoint endpoint(asio::ip::tcp::v4(), port);
    try {
        acceptor_.open(endpoint.protocol());
        // acceptor_.set_option(asio::socket_base::reuse_address(true));
        acceptor_.bind(endpoint);
        acceptor_.listen();
    } catch (const asio::system_error& e) {
        TLOGE("Failed to bind to {}: {}", endpoint.address().to_string(), e.what());
        return false;
    }
    auto bound_endpoint = acceptor_.local_endpoint();
    server_ip_ = bound_endpoint.address().to_string() + ":" + std::to_string(bound_endpoint.port());
    accept_client();
    th_monitor_idle_ = std::thread([this]() { monitor_idle(); });
    TLOGI("Server started on port {}", port);
    return true;
}

void CTcpServer::stop()
{
    acceptor_.close();
    std::unique_lock<std::shared_mutex> lock(cli_mut_);
    for (auto& [key, thread] : client_threads_) {
        if (thread.joinable()) {
            thread.join();
        }
    }
    client_threads_.clear();
}

void CTcpServer::get_client_list(CMessageInfo& msg_info)
{
    struct TmpInfo {
        std::string id;
        std::string online_time;
        std::string uuid;
        std::string task;
        uint64_t timestamp;
    };
    std::vector<TmpInfo> vec;
    std::string msg;

    {
        std::shared_lock<std::shared_mutex> lock(cli_mut_);
        for (const auto& item : client_map_) {
            TmpInfo tmp;
            tmp.id = item.first;
            tmp.online_time = item.second->online_time_;
            tmp.timestamp = item.second->timestamp;
            tmp.uuid = item.second->uuid;
            tmp.task = item.second->task_;
            vec.push_back(tmp);
        }
    }

    // 排序 vec 根据 client->timestamp
    std::sort(vec.begin(), vec.end(),
              [](const TmpInfo& a, const TmpInfo& b) { return a.timestamp < b.timestamp; });

    int index = 1;
    for (const auto& item : vec) {
        msg.append(fmt::format("[{}][{}][{}][{}]", index, item.id, item.online_time, item.uuid));
        auto files = COfStr::split(item.task, "|");
        for (const auto& file : files) {
            msg.append("\n" + file);
        }
        msg.append("\n");
        ++index;
    }
    msg_info.str = msg;
}

void CTcpServer::trans_data(CFrameBuffer* buf)
{
    std::shared_ptr<ClientCache> fcli = nullptr;
    std::shared_ptr<ClientCache> tcli = nullptr;

    {
        std::shared_lock<std::shared_mutex> lock(cli_mut_);
        if (client_map_.count(buf->fid_)) {
            fcli = client_map_[buf->fid_];
        }
        if (client_map_.count(buf->tid_)) {
            tcli = client_map_[buf->tid_];
        }
    }

    switch (buf->type_) {
    case TYPE_GET_LIST: {
        TLOGI("[{}] GetList.", buf->fid_);
        CMessageInfo msg_info;
        get_client_list(msg_info);
        serialize(msg_info, &buf->data_, buf->len_);
        if (fcli && !send_frame(fcli->socket_, buf)) {
            TLOGE("GetList send failed.");
        }
        break;
    }
    case TYPE_UP_LIST: {
        CMessageInfo msg_info;
        if (!deserialize(buf->data_, buf->len_, msg_info)) {
            TLOGE("{} GetList deserialize failed.", __LINE__);
            break;
        }
        if (fcli) {
            fcli->task_ = msg_info.str;
            fcli->uuid = msg_info.uuid;
            fcli->task_time_ = OfUtil::now_time();
        }
        break;
    }
    case TYPE_CANCEL_LIST: {
        TLOGI("[{}] Cancle Task.", buf->fid_);
        if (fcli) {
            fcli->task_.clear();
            fcli->task_time_.clear();
        }
        break;
    }
    case TYPE_GET_ID: {
        buf->tid_ = buf->fid_;
        send_frame(fcli->socket_, buf);
        break;
    }
    case TYPE_JUDGE_ACTIVE: {
        if (fcli && tcli) {
            break;
        }
        if (fcli && tcli == nullptr) {
            buf->type_ = TYPE_OFFLINE;
            std::swap(buf->fid_, buf->tid_);
            send_frame(fcli->socket_, buf);
            break;
        }
        break;
    }
    default:
        if (check_double(buf, fcli, tcli) && tcli && !send_frame(tcli->socket_, buf)) {
            TLOGE("Send from {} to {} failed Or One Offline.", buf->fid_, buf->tid_);
        }
        break;
    }
}

bool CTcpServer::check_double(CFrameBuffer* buf, std::shared_ptr<ClientCache>& fcli,
                              std::shared_ptr<ClientCache>& tcli)
{
    std::shared_lock<std::shared_mutex> lock(cli_mut_);
    if (client_map_.count(buf->fid_)) {
        fcli = client_map_[buf->fid_];
    }
    if (client_map_.count(buf->tid_)) {
        tcli = client_map_[buf->tid_];
    }
    if (fcli == nullptr && tcli) {
        buf->type_ = TYPE_OFFLINE;
        TLOGW("A Notic {} That {} Offline.", buf->tid_, buf->fid_);
        send_frame(tcli->socket_, buf);
        return false;
    }
    if (tcli == nullptr && fcli) {
        std::swap(buf->fid_, buf->tid_);
        buf->type_ = TYPE_OFFLINE;
        TLOGW("B Notic {} That {} Offline.", buf->tid_, buf->fid_);
        send_frame(fcli->socket_, buf);
        return false;
    }
    if (tcli == nullptr && fcli == nullptr) {
        TLOGW("Both Offline.", buf->fid_, buf->tid_);
        return false;
    }
    return true;
}

void CTcpServer::accept_client()
{
    auto socket = std::make_shared<asio::ip::tcp::socket>(io_context_);
    acceptor_.async_accept(*socket, [this, socket](const asio::error_code& error) {
        if (!error) {
            auto endpoint = socket->remote_endpoint();
            std::string client_key = endpoint.address().to_string() + ":" + std::to_string(endpoint.port());

            bool can = false;
            {
                std::unique_lock<std::shared_mutex> lock(cli_mut_);
                if (client_map_.size() >= 100) {
                    TLOGI("Max client connections reached. Closing connection from {}", client_key);
                    socket->close();
                } else {
                    TLOGI("New connection from {}", client_key);
                    auto cache = std::make_shared<ClientCache>();
                    cache->socket_ = socket;
                    cache->online_time_ = OfUtil::now_time();
                    cache->timestamp = OfUtil::get_timestamp_ms();
                    cache->last_active_time_ = std::chrono::high_resolution_clock::now();
                    client_map_[client_key] = cache;
                    can = true;
                }
            }
            if (!can) {
                std::this_thread::sleep_for(std::chrono::minutes(1));
            } else {
                client_threads_[client_key] = std::thread(&CTcpServer::th_client, this, socket, client_key);
            }
        }
        accept_client();
    });
}

void CTcpServer::th_client(const std::shared_ptr<asio::ip::tcp::socket>& socket,
                           const std::string& client_key)
{
    std::shared_ptr<int> deleter(new int(0), [&](int* p) {
        std::unique_lock<std::shared_mutex> lock(cli_mut_);
        delete p;
        client_map_.erase(client_key);
        if (client_threads_.find(client_key) != client_threads_.end()) {
            client_threads_.at(client_key).detach();
            client_threads_.erase(client_key);
        }
        TLOGW("th_client deleter client {} exit.", client_key);
    });

    try {
        std::shared_ptr<ClientCache> cache = nullptr;

        {
            std::shared_lock<std::shared_mutex> lock(cli_mut_);
            if (!client_map_.count(client_key)) {
                TLOGE("Not Find Client{} in cache.", client_key);
                return;
            }
            cache = client_map_[client_key];
        }

        while (true) {
            asio::error_code error;
            size_t length = socket->read_some(asio::buffer(cache->tmp_buf_), error);
            if (error == asio::error::eof) {
                break;
            } else if (error) {
                throw asio::system_error(error);
            }

            cache->buffer_.push(cache->tmp_buf_.data(), static_cast<int>(length));
            while (true) {
                auto* frame = CTransProtocal::parse(cache->buffer_);
                if (frame) {
                    if (frame->type_ == TYPE_HEARTS) {
                        std::unique_lock<std::shared_mutex> lock(cli_mut_);
                        if (client_map_.count(client_key)) {
                            auto& cli = client_map_[client_key];
                            cli->last_active_time_ = std::chrono::high_resolution_clock::now();
                        }
                        delete frame;
                        continue;
                    }
                    if (frame->type_ == TYPE_GET_ID) {
                        CMessageInfo msg_info;
                        if (!deserialize(frame->data_, frame->len_, msg_info)) {
                            TLOGE("{} GetId deserialize failed.", __LINE__);
                            delete frame;
                            continue;
                        }
                        std::unique_lock<std::shared_mutex> lock(cli_mut_);
                        if (client_map_.count(client_key)) {
                            auto& cli = client_map_[client_key];
                            cli->uuid = msg_info.uuid;
                        }
                    }
                    frame->fid_ = client_key;
                    // 直接转发,不加入缓存。
                    trans_data(frame);
                    delete frame;
                    continue;
                }
                break;
            }
        }
    } catch (std::exception& e) {
        TLOGE("Error with client {}: {}", client_key, e.what());
    }
}

bool CTcpServer::send_frame(const std::shared_ptr<asio::ip::tcp::socket>& socket, CFrameBuffer* buf)
{
    char* out_buf{};
    int out_len{};
    if (buf->fid_.empty()) {
        buf->fid_ = server_ip_;
    }

    if (!CTransProtocal::pack(buf, &out_buf, out_len)) {
        TLOGE("{} pack failed.", __FUNCTION__);
        return false;
    }
    try {
        if (!socket->send(asio::buffer(out_buf, out_len))) {
            TLOGE("{} send failed, buf type:{}, fid:{}, tid:{}", __FUNCTION__, static_cast<int>(buf->type_),
                  buf->fid_, buf->tid_);
            delete[] out_buf;
            return false;
        }
    } catch (const std::exception& e) {
        TLOGE("send failed, type:{}, fid:{}, tid:{}, mark:{}", static_cast<int>(buf->type_), buf->fid_,
              buf->tid_, buf->mark_);
    }

    delete[] out_buf;
    return true;
}

void CTcpServer::monitor_idle()
{
    while (th_run_) {
        sleep_.sleep();
        if (!th_run_) {
            break;
        }
        std::vector<std::string> remove_vec;
        std::unique_lock<std::shared_mutex> lock(cli_mut_);
        for (auto& item : client_map_) {
            auto now = std::chrono::high_resolution_clock::now();
            auto duration =
                std::chrono::duration_cast<std::chrono::seconds>(now - item.second->last_active_time_)
                    .count();
            if (duration >= remove_after_time) {
                TLOGW("OnLine Time [{}] sec, Proactively disconnect:{}", duration, item.first);
                remove_vec.push_back(item.first);
                item.second->socket_->shutdown(asio::ip::tcp::socket::shutdown_both);
                item.second->socket_->close();
            }
        }
        for (const auto& item : remove_vec) {
            client_map_.erase(item);
        }
    }
}