server:结构更改。

This commit is contained in:
taynpg 2024-12-12 23:11:55 +08:00
parent 8298f05a86
commit 3309980938
6 changed files with 123 additions and 136 deletions

View File

@ -1,91 +1,5 @@
#include "net_base.h" #include "net_base.h"
CTcpServer::CTcpServer(asio::io_context& io_context, const std::shared_ptr<spdlog::logger>& logger)
: io_context_(io_context), logger_(logger), acceptor_(io_context)
{
}
CTcpServer::~CTcpServer()
{
}
bool CTcpServer::Start(unsigned short port)
{
asio::ip::tcp::endpoint endpoint(asio::ip::tcp::v4(), port);
try {
acceptor_.open(endpoint.protocol());
acceptor_.set_option(asio::socket_base::reuse_address(true));
acceptor_.bind(endpoint);
acceptor_.listen();
} catch (const asio::system_error& e) {
logger_->error("Failed to bind to {}: {}", endpoint.address().to_string(), e.what());
return false;
}
Accept();
logger_->info("Server started on port {}", port);
return true;
}
void CTcpServer::Stop()
{
acceptor_.close();
std::lock_guard<std::mutex> lock(mutex_);
for (auto& [key, thread] : client_threads_) {
if (thread.joinable()) {
thread.join();
}
}
client_threads_.clear();
}
void CTcpServer::Accept()
{
auto socket = std::make_shared<asio::ip::tcp::socket>(io_context_);
acceptor_.async_accept(*socket, [this, socket](const asio::error_code& error) {
if (!error) {
auto endpoint = socket->remote_endpoint();
std::string client_key = endpoint.address().to_string() + ":" + std::to_string(endpoint.port());
logger_->info("New connection from {}", client_key);
{
std::lock_guard<std::mutex> lock(mutex_);
client_map_[client_key] = 0; // Initial value as 0
}
client_threads_[client_key] = std::thread(&CTcpServer::HandleClient, this, socket, client_key);
}
Accept();
});
}
void CTcpServer::HandleClient(std::shared_ptr<asio::ip::tcp::socket> socket, const std::string& client_key)
{
try {
char data[1024];
while (true) {
asio::error_code error;
size_t length = socket->read_some(asio::buffer(data), error);
if (error == asio::error::eof) {
logger_->info("Connection closed by client: {}", client_key);
break;
} else if (error) {
throw asio::system_error(error);
}
auto relen = socket->send(asio::buffer(data, length));
logger_->info("Received data from {}, len={}, relen={}", client_key, length, relen);
}
} catch (std::exception& e) {
logger_->error("Error with client {}: {}", client_key, e.what());
}
std::lock_guard<std::mutex> lock(mutex_);
client_map_.erase(client_key);
if (client_threads_.find(client_key) != client_threads_.end()) {
client_threads_.at(client_key).detach();
client_threads_.erase(client_key);
}
}
CTcpClient::CTcpClient(asio::io_context& io_context, const std::shared_ptr<spdlog::logger>& logger) CTcpClient::CTcpClient(asio::io_context& io_context, const std::shared_ptr<spdlog::logger>& logger)
: logger_(logger), io_context_(io_context), socket_(io_context_) : logger_(logger), io_context_(io_context), socket_(io_context_)
{ {
@ -142,7 +56,7 @@ void CTcpClient::async_recv()
{ {
auto self(shared_from_this()); auto self(shared_from_this());
socket_.async_read_some(asio::buffer(tmp_buf_), [this, self](std::error_code ec, std::size_t length) { socket_.async_read_some(asio::buffer(tmp_buf_), [this, self](std::error_code ec, std::size_t length) {
logger_->debug("{} {}", __FUNCTION__, ec.message()); // logger_->debug("{} {}", __FUNCTION__, ec.message());
if (!ec) { if (!ec) {
std::lock_guard<std::mutex> lock(mutex_); std::lock_guard<std::mutex> lock(mutex_);
buffer_.push(tmp_buf_.data(), length); buffer_.push(tmp_buf_.data(), length);

View File

@ -2,35 +2,10 @@
#include "util.h" #include "util.h"
#include <asio.hpp> #include <asio.hpp>
#include <functional>
#include <mutex> #include <mutex>
#include <of_util.h> #include <of_util.h>
using namespace ofen; using namespace ofen;
using ExFun_t = std::function<void(CFrameBuffer* buf)>;
class CTcpServer
{
public:
CTcpServer(asio::io_context& io_context, const std::shared_ptr<spdlog::logger>& logger);
~CTcpServer();
public:
bool Start(unsigned short port);
void Stop();
private:
void Accept();
void HandleClient(std::shared_ptr<asio::ip::tcp::socket> socket, const std::string& client_key);
private:
asio::io_context& io_context_;
asio::ip::tcp::acceptor acceptor_;
std::shared_ptr<spdlog::logger> logger_;
std::map<std::string, int> client_map_;
std::map<std::string, std::thread> client_threads_;
std::mutex mutex_;
};
class CTcpClient : public std::enable_shared_from_this<CTcpClient> class CTcpClient : public std::enable_shared_from_this<CTcpClient>
{ {
public: public:

View File

@ -5,7 +5,11 @@ std::shared_ptr<spdlog::logger> g_Logger = nullptr;
int main() int main()
{ {
g_Logger = get_logger("server", "server.log"); g_Logger = get_logger("server", "server.log");
CServer server(g_Logger); asio::io_context io_context;
server.run(); CTcpServer server(io_context, g_Logger);
if (!server.Start(8080)) {
return -1;
}
io_context.run();
return 0; return 0;
} }

View File

@ -1,24 +1,103 @@
#include "server.h" #include "server.h"
CServer::CServer(const std::shared_ptr<spdlog::logger>& logger) CTcpServer::CTcpServer(asio::io_context& io_context, const std::shared_ptr<spdlog::logger>& logger)
: logger_(logger) : io_context_(io_context), logger_(logger), acceptor_(io_context)
{ {
server_ = std::make_shared<CTcpServer>(io_context_, logger);
} }
CTcpServer::~CTcpServer()
CServer::~CServer()
{ {
} }
void CServer::run() bool CTcpServer::Start(unsigned short port)
{ {
if (!server_->Start(8080)) { asio::ip::tcp::endpoint endpoint(asio::ip::tcp::v4(), port);
try {
acceptor_.open(endpoint.protocol());
acceptor_.set_option(asio::socket_base::reuse_address(true));
acceptor_.bind(endpoint);
acceptor_.listen();
} catch (const asio::system_error& e) {
logger_->error("Failed to bind to {}: {}", endpoint.address().to_string(), e.what());
return false;
}
Accept();
logger_->info("Server started on port {}", port);
return true;
}
void CTcpServer::Stop()
{
acceptor_.close();
std::lock_guard<std::mutex> lock(mutex_);
for (auto& [key, thread] : client_threads_) {
if (thread.joinable()) {
thread.join();
}
}
client_threads_.clear();
}
void CTcpServer::handle_data(CFrameBuffer* buf)
{
if (buf == nullptr) {
logger_->error("{} nullptr.", __FUNCTION__);
return; return;
} }
io_context_.run();
} }
bool CServer::get_task_list() void CTcpServer::Accept()
{ {
return false; auto socket = std::make_shared<asio::ip::tcp::socket>(io_context_);
acceptor_.async_accept(*socket, [this, socket](const asio::error_code& error) {
if (!error) {
auto endpoint = socket->remote_endpoint();
std::string client_key = endpoint.address().to_string() + ":" + std::to_string(endpoint.port());
logger_->info("New connection from {}", client_key);
{
std::lock_guard<std::mutex> lock(mutex_);
client_map_[client_key] = std::make_shared<ClientCache>();
}
client_threads_[client_key] = std::thread(&CTcpServer::HandleClient, this, socket, client_key);
}
Accept();
});
}
void CTcpServer::HandleClient(std::shared_ptr<asio::ip::tcp::socket> socket, const std::string& client_key)
{
try {
while (true) {
asio::error_code error;
size_t length = socket->read_some(asio::buffer(client_map_[client_key]->tmp_buf_), error);
if (error == asio::error::eof) {
logger_->info("Connection closed by client: {}", client_key);
break;
} else if (error) {
throw asio::system_error(error);
}
client_map_[client_key]->buffer_.push(client_map_[client_key]->tmp_buf_.data(), length);
auto* frame = CTransProtocal::parse(client_map_[client_key]->buffer_);
frame->id_ = client_key;
if (frame) {
handle_data(frame);
delete frame;
}
// auto relen = socket->send(asio::buffer(data, length));
// logger_->info("Received data from {}, len={}, relen={}", client_key, length, relen);
}
} catch (std::exception& e) {
logger_->error("Error with client {}: {}", client_key, e.what());
}
std::lock_guard<std::mutex> lock(mutex_);
client_map_.erase(client_key);
if (client_threads_.find(client_key) != client_threads_.end()) {
client_threads_.at(client_key).detach();
client_threads_.erase(client_key);
}
} }

View File

@ -1,24 +1,36 @@
#pragma once #pragma once
#include <net_base.h> #include <net_base.h>
#include <string>
#include <util.h> #include <util.h>
#include <vector> #include <vector>
#include <string>
class CServer struct ClientCache {
CMutBuffer buffer_;
std::array<char, 1024> tmp_buf_;
};
class CTcpServer
{ {
public: public:
CServer(const std::shared_ptr<spdlog::logger>& logger); CTcpServer(asio::io_context& io_context, const std::shared_ptr<spdlog::logger>& logger);
~CServer(); ~CTcpServer();
public: public:
void run(); bool Start(unsigned short port);
void Stop();
public:
bool get_task_list();
private: private:
void handle_data(CFrameBuffer* buf);
private:
void Accept();
void HandleClient(std::shared_ptr<asio::ip::tcp::socket> socket, const std::string& client_key);
private:
asio::io_context& io_context_;
asio::ip::tcp::acceptor acceptor_;
std::shared_ptr<spdlog::logger> logger_; std::shared_ptr<spdlog::logger> logger_;
asio::io_context io_context_; std::map<std::string, std::shared_ptr<ClientCache>> client_map_;
std::shared_ptr<CTcpServer> server_; std::map<std::string, std::thread> client_threads_;
std::vector<std::string> supported_; std::mutex mutex_;
}; };

View File

@ -1,6 +1,7 @@
#pragma once #pragma once
#include "of_util.h" #include "of_util.h"
#include <cstdint> #include <cstdint>
#include <functional>
#include <spdlog/sinks/rotating_file_sink.h> #include <spdlog/sinks/rotating_file_sink.h>
#include <spdlog/sinks/stdout_color_sinks.h> #include <spdlog/sinks/stdout_color_sinks.h>
#include <spdlog/spdlog.h> #include <spdlog/spdlog.h>
@ -14,6 +15,8 @@ class CFrameBuffer
public: public:
CFrameBuffer(); CFrameBuffer();
~CFrameBuffer(); ~CFrameBuffer();
public:
std::string id_{};
public: public:
int16_t type_{}; int16_t type_{};
@ -21,7 +24,7 @@ public:
int len_{}; int len_{};
char mark_{}; char mark_{};
}; };
using ExFun_t = std::function<void(CFrameBuffer* buf)>;
/* /*
transm TCP transm TCP
header 2 char: 0xFF 0xFE header 2 char: 0xFF 0xFE