remove:去除冗余。

This commit is contained in:
taynpg 2024-12-14 19:49:44 +08:00
parent 7250279ce0
commit a104868c47
4 changed files with 44 additions and 85 deletions

View File

@ -203,23 +203,24 @@ void CClient::handle_frame(CFrameBuffer* buf)
switch (t) { switch (t) {
case TYPE_GET_LIST: { case TYPE_GET_LIST: {
task_list_.clear(); task_list_.clear();
std::string source(buf->data_); std::string source(buf->data_, buf->len_);
auto vec = COfStr::split(source, "\n"); auto vec = COfStr::split(source, "\n");
int index = -1; int index = -1;
for (const auto& item : vec) { for (const auto& item : vec) {
if (item.empty()) { std::string real = COfStr::trim(item);
if (real.empty()) {
continue; continue;
} }
if (item.find("[") == std::string::npos) { if (real.find("[") == std::string::npos) {
logger_->info("FILE ==> {}", item); logger_->info("FILE ==> {}", real);
task_list_[index]->files.push_back(item); task_list_[index]->files.push_back(real);
} else { } else {
auto a = item.find_first_of("[") + 1; auto a = real.find_first_of("[") + 1;
auto b = item.find_first_of("]"); auto b = real.find_first_of("]");
std::string str_index = item.substr(a, b - a); std::string str_index = real.substr(a, b - a);
index = std::stoi(str_index); index = std::stoi(str_index);
std::string backup = item; std::string backup = real;
backup.erase(0, b + 1); backup.erase(0, b + 1);
auto aa = backup.find_first_of("[") + 1; auto aa = backup.find_first_of("[") + 1;
auto bb = backup.find_first_of("]"); auto bb = backup.find_first_of("]");
@ -231,7 +232,7 @@ void CClient::handle_frame(CFrameBuffer* buf)
} }
logger_->debug("***********************************************"); logger_->debug("***********************************************");
logger_->info("{}", item); logger_->info("{}", real);
} }
} }
break; break;

View File

@ -1,9 +1,17 @@
#include <iostream>
#include "server.h" #include "server.h"
#include <iostream>
// #ifdef _WIN32
// #define _CRTDBG_MAP_ALLOC
// #include <crtdbg.h>
// #endif
std::shared_ptr<spdlog::logger> g_Logger = nullptr; std::shared_ptr<spdlog::logger> g_Logger = nullptr;
int main() int main()
{ {
// #ifdef _WIN32
// _CrtSetDbgFlag(_CRTDBG_ALLOC_MEM_DF | _CRTDBG_LEAK_CHECK_DF);
// #endif
g_Logger = get_logger("server", "server.log"); g_Logger = get_logger("server", "server.log");
asio::io_context io_context; asio::io_context io_context;
CTcpServer server(io_context, g_Logger); CTcpServer server(io_context, g_Logger);
@ -11,5 +19,8 @@ int main()
return -1; return -1;
} }
io_context.run(); io_context.run();
// #ifdef _WIN32
// _CrtDumpMemoryLeaks();
// #endif
return 0; return 0;
} }

View File

@ -4,28 +4,20 @@
using namespace ofen; using namespace ofen;
constexpr int g_ParseThreadNum = 1; constexpr int g_ParseThreadNum = 1;
constexpr int g_SendThreadNum = 1;
CTcpServer::CTcpServer(asio::io_context& io_context, const std::shared_ptr<spdlog::logger>& logger) CTcpServer::CTcpServer(asio::io_context& io_context, const std::shared_ptr<spdlog::logger>& logger)
: io_context_(io_context), logger_(logger), acceptor_(io_context) : io_context_(io_context), logger_(logger), acceptor_(io_context)
{ {
th_run_ = true; th_run_ = true;
handle_pool_ = std::make_shared<CThreadPool>(g_ParseThreadNum); handle_pool_ = std::make_shared<CThreadPool>(g_ParseThreadNum);
send_pool_ = std::make_shared<CThreadPool>(g_SendThreadNum);
handle_pool_->init(); handle_pool_->init();
send_pool_->init();
for (int i = 0; i < g_ParseThreadNum; ++i) { for (int i = 0; i < g_ParseThreadNum; ++i) {
handle_pool_->submit([&]() { handle_frame(); }); handle_pool_->submit([&]() { handle_frame(); });
} }
for (int i = 0; i < g_ParseThreadNum; ++i) {
send_pool_->submit([&]() { send_simple_buf(); });
}
} }
CTcpServer::~CTcpServer() CTcpServer::~CTcpServer()
{ {
th_run_ = false; th_run_ = false;
handle_pool_->close_wait_all(); handle_pool_->close_wait_all();
send_pool_->close_wait_all();
} }
bool CTcpServer::start(unsigned short port) bool CTcpServer::start(unsigned short port)
@ -73,11 +65,9 @@ std::vector<TaskList> CTcpServer::get_clients()
return result; return result;
} }
SimpleBuffer* CTcpServer::get_client_list() void CTcpServer::get_client_list(CFrameBuffer** buf)
{ {
CFrameBuffer* buf = new CFrameBuffer(); CFrameBuffer* tbuf = *buf;
buf->type_ = TYPE_GET_LIST;
auto vec = get_clients(); auto vec = get_clients();
std::string msg; std::string msg;
int index = 1; int index = 1;
@ -90,17 +80,9 @@ SimpleBuffer* CTcpServer::get_client_list()
msg.append("\n"); msg.append("\n");
++index; ++index;
} }
buf->data_ = new char[msg.size() + 1]; tbuf->data_ = new char[msg.size() + 1];
buf->len_ = static_cast<int>(msg.size() + 1); std::memset(tbuf->data_, 0x0, msg.size() + 1);
std::snprintf(buf->data_, buf->len_, "%s", msg.data()); tbuf->len_ = std::snprintf(tbuf->data_, msg.size() + 1, "%s", msg.data());
SimpleBuffer* sbuf = new SimpleBuffer();
if (!CTransProtocal::pack(buf, &sbuf->data_, sbuf->len_)) {
logger_->error("{} pack faile.", __FUNCTION__);
delete sbuf;
return nullptr;
}
return sbuf;
} }
bool CTcpServer::push_frame(CFrameBuffer* buf) bool CTcpServer::push_frame(CFrameBuffer* buf)
@ -116,7 +98,7 @@ void CTcpServer::handle_frame()
while (th_run_) { while (th_run_) {
{ {
std::lock_guard<std::mutex> lock(buf_mut_); std::lock_guard<std::mutex> lock(buf_mut_);
if (cache_.size() > 0) { if (!cache_.empty()) {
buf = cache_.front(); buf = cache_.front();
cache_.pop(); cache_.pop();
} }
@ -130,13 +112,14 @@ void CTcpServer::handle_frame()
switch (t) { switch (t) {
case TYPE_GET_LIST: { case TYPE_GET_LIST: {
logger_->info("[{}] GetList.", buf->fid_); logger_->info("[{}] GetList.", buf->fid_);
auto* sbuf = get_client_list(); get_client_list(&buf);
if (sbuf == nullptr) { std::lock_guard<std::mutex> lock(cli_mut_);
break; if (client_map_.count(buf->fid_)) {
auto& cli = client_map_[buf->fid_];
if (!send_frame(cli->socket_, buf)) {
logger_->error("GetList send failed.");
}
} }
sbuf->id_ = buf->fid_;
std::lock_guard<std::mutex> lock(sbuf_mut_);
scache_.push(sbuf);
break; break;
} }
case TYPE_UP_LIST: { case TYPE_UP_LIST: {
@ -197,38 +180,6 @@ void CTcpServer::handle_frame()
} }
} }
void CTcpServer::send_simple_buf()
{
SimpleBuffer* buf = nullptr;
while (th_run_) {
{
std::lock_guard<std::mutex> slock(sbuf_mut_);
if (scache_.size() > 0) {
buf = scache_.front();
scache_.pop();
}
}
if (!buf) {
std::this_thread::sleep_for(std::chrono::milliseconds(100));
continue;
}
std::shared_ptr<asio::ip::tcp::socket> socket = nullptr;
{
std::lock_guard<std::mutex> clock(cli_mut_);
if (!client_map_.count(buf->id_)) {
logger_->warn("{} abandon {}'s data.", __FUNCTION__, buf->id_);
delete buf;
continue;
}
socket = client_map_[buf->id_]->socket_;
}
socket->send(asio::buffer(buf->data_, buf->len_));
delete buf;
buf = nullptr;
}
}
void CTcpServer::accept_client() void CTcpServer::accept_client()
{ {
auto socket = std::make_shared<asio::ip::tcp::socket>(io_context_); auto socket = std::make_shared<asio::ip::tcp::socket>(io_context_);

View File

@ -9,16 +9,16 @@
using namespace ofen; using namespace ofen;
struct ClientCache { struct ClientCache {
std::shared_ptr<asio::ip::tcp::socket> socket_; std::shared_ptr<asio::ip::tcp::socket> socket_;
CMutBuffer buffer_; CMutBuffer buffer_{};
std::array<char, 1024> tmp_buf_; std::array<char, 1024> tmp_buf_{};
std::string task_; std::string task_{};
std::string time_; std::string time_{};
FrameType cur_type_{TYPE_DEFAULT}; FrameType cur_type_{TYPE_DEFAULT};
}; };
struct TaskList { struct TaskList {
std::string id_; std::string id_{};
std::string task_; std::string task_{};
std::string time_; std::string time_{};
}; };
class CTcpServer class CTcpServer
@ -33,12 +33,11 @@ public:
private: private:
std::vector<TaskList> get_clients(); std::vector<TaskList> get_clients();
SimpleBuffer* get_client_list(); void get_client_list(CFrameBuffer** buf);
private: private:
bool push_frame(CFrameBuffer* buf); bool push_frame(CFrameBuffer* buf);
void handle_frame(); void handle_frame();
void send_simple_buf();
private: private:
void accept_client(); void accept_client();
@ -59,10 +58,7 @@ private:
std::map<std::string, std::thread> client_threads_; std::map<std::string, std::thread> client_threads_;
std::mutex cli_mut_; std::mutex cli_mut_;
std::queue<CFrameBuffer*> cache_; std::queue<CFrameBuffer*> cache_;
std::queue<SimpleBuffer*> scache_;
std::mutex buf_mut_; std::mutex buf_mut_;
std::mutex sbuf_mut_;
std::shared_ptr<CThreadPool> handle_pool_; std::shared_ptr<CThreadPool> handle_pool_;
std::shared_ptr<CThreadPool> send_pool_;
std::string server_ip_; std::string server_ip_;
}; };