#include "server.h"

CTcpServer::CTcpServer(asio::io_context& io_context, const std::shared_ptr<spdlog::logger>& logger)
    : io_context_(io_context), logger_(logger), acceptor_(io_context)
{
    th_run_ = true;
    handle_pool_ = std::make_shared<CThreadPool>(1);
    send_pool_ = std::make_shared<CThreadPool>(1);
    handle_pool_->init();
    send_pool_->init();
    handle_pool_->submit([&]() { handle_frame(); });
    send_pool_->submit([&]() { send_simple_buf(); });
}
CTcpServer::~CTcpServer()
{
    th_run_ = false;
    handle_pool_->close_wait_all();
    send_pool_->close_wait_all();
}

bool CTcpServer::start(unsigned short port)
{
    asio::ip::tcp::endpoint endpoint(asio::ip::tcp::v4(), port);
    try {
        acceptor_.open(endpoint.protocol());
        acceptor_.set_option(asio::socket_base::reuse_address(true));
        acceptor_.bind(endpoint);
        acceptor_.listen();
    } catch (const asio::system_error& e) {
        logger_->error("Failed to bind to {}: {}", endpoint.address().to_string(), e.what());
        return false;
    }
    accept_client();
    logger_->info("Server started on port {}", port);
    return true;
}

void CTcpServer::stop()
{
    acceptor_.close();
    std::lock_guard<std::mutex> lock(cli_mut_);
    for (auto& [key, thread] : client_threads_) {
        if (thread.joinable()) {
            thread.join();
        }
    }
    client_threads_.clear();
}

std::vector<std::string> CTcpServer::get_clients()
{
    std::vector<std::string> result;
    std::lock_guard<std::mutex> lock(cli_mut_);
    for (const auto& item : client_map_) {
        result.push_back(item.first);
    }
    return result;
}

SimpleBuffer* CTcpServer::get_client_list()
{
    CFrameBuffer* buf = new CFrameBuffer();
    buf->type_ = 199;

    auto vec = get_clients();
    std::string msg;
    for (const auto& item : vec) {
        if (msg.empty()) {
            msg.append(item);
        } else {
            msg.append("|" + item);
        }
    }
    buf->data_ = new char[msg.size() + 1];
    buf->len_ = static_cast<int>(msg.size() + 1);
    std::snprintf(buf->data_, buf->len_, "%s", msg.data());

    SimpleBuffer* sbuf = new SimpleBuffer();
    if (!CTransProtocal::pack(buf, &sbuf->data_, sbuf->len_)) {
        logger_->error("{} pack faile.", __FUNCTION__);
        delete sbuf;
        return nullptr;
    }
    return sbuf;
}

bool CTcpServer::push_frame(CFrameBuffer* buf)
{
    std::lock_guard<std::mutex> lock(buf_mut_);
    cache_.push(buf);
    return true;
}

void CTcpServer::handle_frame()
{
    CFrameBuffer* buf = nullptr;
    while (th_run_) {
        {
            std::lock_guard<std::mutex> lock(buf_mut_);
            if (cache_.size() > 0) {
                buf = cache_.front();
                cache_.pop();
            }
        }
        if (!buf) {
            std::this_thread::sleep_for(std::chrono::milliseconds(100));
            continue;
        }

        // 拿到该包后,要看转发给谁或者处理
        if (buf->type_ == 199) {   // 询问在线客户端
            auto* sbuf = get_client_list();
            if (sbuf == nullptr) {
                continue;
            }
            sbuf->id_ = buf->id_;
            std::lock_guard<std::mutex> lock(sbuf_mut_);
            scache_.push(sbuf);
            continue;
        }
        delete buf;
    }
}

void CTcpServer::send_simple_buf()
{
    SimpleBuffer* buf = nullptr;
    while (th_run_) {
        {
            std::lock_guard<std::mutex> slock(sbuf_mut_);
            if (scache_.size() > 0) {
                buf = scache_.front();
                scache_.pop();
            }
        }
        if (!buf) {
            std::this_thread::sleep_for(std::chrono::milliseconds(100));
            continue;
        }

        std::shared_ptr<asio::ip::tcp::socket> socket = nullptr;
        {
            std::lock_guard<std::mutex> clock(cli_mut_);
            if (!client_map_.count(buf->id_)) {
                logger_->warn("{} abandon {}'s data.", __FUNCTION__, buf->id_);
                delete buf;
                continue;
            }
            socket = client_map_[buf->id_]->socket_;
        }
        socket->send(asio::buffer(buf->data_, buf->len_));
        delete buf;
    }
}

void CTcpServer::accept_client()
{
    auto socket = std::make_shared<asio::ip::tcp::socket>(io_context_);
    acceptor_.async_accept(*socket, [this, socket](const asio::error_code& error) {
        if (!error) {
            auto endpoint = socket->remote_endpoint();
            std::string client_key = endpoint.address().to_string() + ":" + std::to_string(endpoint.port());
            logger_->info("New connection from {}", client_key);

            {
                std::lock_guard<std::mutex> lock(cli_mut_);
                auto cache = std::make_shared<ClientCache>();
                cache->socket_ = socket;
                client_map_[client_key] = cache;
            }

            client_threads_[client_key] = std::thread(&CTcpServer::th_client, this, socket, client_key);
        }
        accept_client();
    });
}

void CTcpServer::th_client(std::shared_ptr<asio::ip::tcp::socket> socket, const std::string& client_key)
{
    std::shared_ptr<int> deleter(new int(0), [&](int* p) {
        std::lock_guard<std::mutex> lock(cli_mut_);
        delete p;
        client_map_.erase(client_key);
        if (client_threads_.find(client_key) != client_threads_.end()) {
            client_threads_.at(client_key).detach();
            client_threads_.erase(client_key);
        }
        logger_->warn("{} client {} exit.", __FUNCTION__, client_key);
    });

    try {
        std::shared_ptr<ClientCache> cache = nullptr;

        {
            std::lock_guard<std::mutex> lock(cli_mut_);
            if (!client_map_.count(client_key)) {
                logger_->error("Not Find Client{} in cache.", client_key);
                return;
            }
            cache = client_map_[client_key];
        }

        while (true) {
            asio::error_code error;
            size_t length = socket->read_some(asio::buffer(cache->tmp_buf_), error);
            if (error == asio::error::eof) {
                logger_->info("Connection closed by client: {}", client_key);
                break;
            } else if (error) {
                throw asio::system_error(error);
            }

            cache->buffer_.push(cache->tmp_buf_.data(), length);
            auto* frame = CTransProtocal::parse(cache->buffer_);
            if (frame) {
                frame->id_ = client_key;
                push_frame(frame);
            }
        }
    } catch (std::exception& e) {
        logger_->error("Error with client {}: {}", client_key, e.what());
    }
}