base:基本逻辑完成,准备初步测试。

This commit is contained in:
taynpg 2024-12-14 16:20:25 +08:00
parent 97c3e80230
commit edbd3400a8
10 changed files with 281 additions and 86 deletions

View File

@ -23,7 +23,7 @@ add_subdirectory(util)
add_subdirectory(server)
add_subdirectory(client)
add_executable(transm_test1 test1.cpp)
target_link_libraries(transm_test1 PRIVATE trans_net trans_util)
add_executable(transm_test2 test2.cpp)
target_link_libraries(transm_test2 PRIVATE trans_net trans_util)
# add_executable(transm_test1 test1.cpp)
# target_link_libraries(transm_test1 PRIVATE trans_net trans_util)
# add_executable(transm_test2 test2.cpp)
# target_link_libraries(transm_test2 PRIVATE trans_net trans_util)

View File

@ -1,6 +1,6 @@
# transm
以一个网点作为中转传输文件。
以一个简易的使用`Server`作为中转传输文件。
# 简要说明
@ -8,46 +8,6 @@
- `client``server`下载文件,如果本地有重复则覆盖。
- 工作方式为`client A`端提交待传输的文件列表到`server``client B`端从`server`获取有哪些客户机提交的哪些任务,可以从中下载。
## 格式(开发用)
mark == 0 表示,请求下载端的数据。
通讯协议中的`DATA`部分,对于`type``1`的类型来讲,统一以下格式(`command``param`内容中不能含有`|`):
**command|param|data** (传输格式)
**command(空格)param1,param2,param3..** (cmd输入格式)
### 1.命令
`type`:1
**Get:** 获取当前挂载到服务器的任务单。
**DownTask:** 下载指定的任务清单,`param``Get`中列出的名称。
**UpTask:** 上载任务单,`param`为文件或者文件夹路径,可多个,使用`,`分隔。
### 2.数据
`type`:2
`mark``0`时表示数据的最后一包,其他数据表示非最后一包。
`type`: 199,表示询问在线客户端及挂载任务。
`type`: 198,下载清单文件。
`type`: 197,上载清单。
`type`: 196,取消上载任务。
`type`: 195,请求打开文件,返回mark值为1表示OK,为0表示失败。
`type`: 194,可以传输文件。
`type`: 193,文件数据。
`type`: 192,传输结束。
`type`:191,异常中断传输。
`type`:190,没有此清单。
mark == 1 表示,服务客户端数据。

View File

@ -1,11 +1,16 @@
#include "client.h"
#include <filesystem>
#include <iostream>
#include <of_path.h>
#include <of_str.h>
using namespace ofen;
namespace fs = std::filesystem;
constexpr int g_SendPoolNum = 1;
CClient::CClient(const std::shared_ptr<spdlog::logger>& logger) : logger_(logger)
{
client_ = std::make_shared<CTcpClient>(io_context_, logger_);
send_pool_ = std::make_shared<CThreadPool>(g_SendPoolNum);
send_pool_->init();
supported_.push_back("Get");
}
@ -47,12 +52,7 @@ void CClient::run()
continue;
}
if (cmd == "Down") {
int key = param.empty() ? -1 : std::stoi(param);
if (task_list_.count(key)) {
down_task();
} else {
logger_->error("no task number find.");
}
down_task(vec[1]);
continue;
}
if (cmd == "Up") {
@ -77,10 +77,22 @@ bool CClient::get_task_list()
return send_frame(buf.get());
}
bool CClient::down_task()
bool CClient::down_task(const std::string& param)
{
// if (send_frame(198, ))
return false;
int id = std::stoi(param);
if (!task_list_.count(id)) {
logger_->error("No matched id[{}] in task list.", id);
return false;
}
down_ = std::make_shared<TransInfomation>();
const auto& vec = task_list_[id]->files;
// 开始传输文件
for (const auto& item : vec) {
logger_->warn("Start Down => {}", item);
down_one_file(task_list_[id]->id, item);
}
return true;
}
bool CClient::up_task(const std::string& cmd)
@ -100,9 +112,9 @@ bool CClient::up_task(const std::string& cmd)
}
std::shared_ptr<CFrameBuffer> buf = std::make_shared<CFrameBuffer>();
buf->type_ = TYPE_UP_LIST;
buf->data_ = new char[msg.size()];
std::memset(buf->data_, 0x0, msg.size());
buf->len_ = std::snprintf(buf->data_, msg.size(), "%s", msg.data());
buf->data_ = new char[msg.size() + 1];
std::memset(buf->data_, 0x0, msg.size() + 1);
buf->len_ = std::snprintf(buf->data_, msg.size() + 1, "%s", msg.data());
return send_frame(buf.get());
}
@ -113,6 +125,56 @@ bool CClient::cancel_task()
return send_frame(buf.get());
}
bool CClient::down_one_file(const std::string& id, const std::string& file)
{
down_->cur_remote_id_ = id;
down_->cur_remote_file_ = file;
fs::path remote_file(down_->cur_remote_file_);
down_->cur_file_ = COfPath::to_full(remote_file.filename().string());
// 请求下载文件
std::shared_ptr<CFrameBuffer> buf = std::make_shared<CFrameBuffer>();
buf->type_ = TYPE_OPEN_FILE;
buf->tid_ = id;
buf->data_ = new char[file.size() + 1];
buf->len_ = std::snprintf(buf->data_, file.size() + 1, "%s", file.data());
if (!send_frame(buf.get())) {
logger_->error("{} request open file [{}] send failed.", __FUNCTION__);
down_->cur_remote_id_.clear();
down_->cur_remote_file_.clear();
return false;
}
down_->trans_state_ = TRANS_REDAY;
return true;
}
void CClient::cancel_trans_file(const std::string& key)
{
std::shared_ptr<TransInfomation> t = nullptr;
if (key.empty()) {
t = down_;
} else {
if (up_.count(key)) {
t = up_[key];
}
}
if (t == nullptr) {
return;
}
t->trans_state_ = TRANS_FAILE;
if (t->file_) {
fclose(t->file_);
if (key.empty()) {
fs::remove(t->cur_file_);
}
}
t->cur_remote_file_.clear();
t->cur_remote_id_.clear();
}
bool CClient::send_frame(CFrameBuffer* buf)
{
char* out_buf{};
@ -136,27 +198,130 @@ void CClient::handle_frame(CFrameBuffer* buf)
logger_->error("{} nullptr.", __FUNCTION__);
return;
}
// logger_->debug("type: {}", buf->type_);
// logger_->debug("len: {}", buf->len_);
if (buf->type_ == 199) {
auto t = static_cast<FrameType>(buf->type_);
switch (t) {
case TYPE_GET_LIST: {
task_list_.clear();
std::string source(buf->data_);
auto vec = COfStr::split(source, "\n");
int index = -1;
for (const auto& item : vec) {
if (item.empty()) {
continue;
}
if (item.find("[") == std::string::npos) {
logger_->info("FILE ==> {}", item);
task_list_[index]->files.push_back(item);
} else {
auto a = item.find_first_of("[") + 1;
auto b = item.find_first_of("]");
std::string str_index = item.substr(a, b - a);
index = std::stoi(str_index);
std::string backup = item;
backup.erase(0, b + 1);
auto aa = backup.find_first_of("[") + 1;
auto bb = backup.find_first_of("]");
std::string id = backup.substr(aa, bb - aa);
if (!task_list_.count(index)) {
task_list_[index] = std::make_shared<DownClientInfo>();
task_list_[index]->id = id;
}
logger_->debug("***********************************************");
logger_->debug("{}", item);
logger_->info("{}", item);
}
}
// int index = 0;
// auto vec = COfStr::split(source, "|");
// for (const auto& item : vec) {
// task_list_[index] = item;
// ++index;
// logger_->warn("{}:{}", index, item);
// }
break;
}
// (客户服务都不是指Server)如果是客户端啥也不干,服务端则开始发送数据
case TYPE_READY_TRANS: {
if (buf->mark_ != 0) {
break;
}
std::string* key = new std::string();
key->append(buf->fid_);
send_pool_->submit([&]() { send_file_data_th(key); });
break;
}
// 能接收到 TRANS 一定是客户端(这里不是指Server)
case TYPE_TRANS_FILE: {
auto ws = fwrite(buf->data_, 1, buf->len_, down_->file_);
if (static_cast<int>(ws) != buf->len_) {
logger_->warn("no matched write and data.");
}
break;
}
case TYPE_OPEN_FILE: {
std::shared_ptr<TransInfomation> t = nullptr;
std::string id = buf->fid_;
if (buf->mark_ == 0) {
std::lock_guard<std::mutex> lock(mutex_);
up_[buf->fid_] = std::make_shared<TransInfomation>();
t = up_[buf->fid_];
t->cur_file_ = std::string(buf->data_, buf->len_);
t->file_ = fopen(t->cur_file_.c_str(), "rb");
} else {
t = down_;
t->file_ = fopen(t->cur_file_.c_str(), "wb");
}
if (t->file_ == nullptr) {
logger_->error("open file {} failed.", t->cur_file_);
cancel_trans_file(buf->mark_ == 0 ? id : "");
break;
}
std::shared_ptr<CFrameBuffer> tmp = std::make_shared<CFrameBuffer>();
tmp->type_ = TYPE_READY_TRANS;
tmp->mark_ = buf->mark_ == 0 ? 1 : 0;
tmp->tid_ = buf->fid_;
if (!send_frame(tmp.get())) {
logger_->error("TYPE_OPEN_FILE send ready failed.");
cancel_trans_file(buf->mark_ == 0 ? id : "");
}
break;
}
case TYPE_TRANS_DONE: {
fclose(down_->file_);
down_->trans_state_ = TRANS_DONE;
break;
}
default:
break;
}
}
void CClient::send_file_data_th(std::string* key)
{
std::string str_key = *key;
std::shared_ptr<TransInfomation> t = nullptr;
{
std::lock_guard<std::mutex> lock(mutex_);
if (!up_.count(str_key)) {
logger_->error("{} no matched key.", __FUNCTION__);
return;
}
t = up_[str_key];
}
std::shared_ptr<CFrameBuffer> buf = std::make_shared<CFrameBuffer>();
buf->type_ = TYPE_TRANS_FILE;
buf->tid_ = str_key;
buf->data_ = new char[1024]{};
while (!feof(t->file_)) {
buf->len_ = fread(buf->data_, 1, 1024, t->file_);
if (!send_frame(buf.get())) {
logger_->error("send_file_data_th send failed.");
delete key;
return;
}
}
buf->type_ = TYPE_TRANS_DONE;
if (!send_frame(buf.get())) {
logger_->error("send_file_data_th send DONE failed.");
}
delete key;
}

View File

@ -2,8 +2,31 @@
#include <net_base.h>
#include <util.h>
#include <vector>
#include <mutex>
#include <string>
#include "file_oper.h"
#include <of_util.h>
using namespace ofen;
struct DownClientInfo {
std::vector<std::string> files;
std::string id;
};
enum TransState {
TRANS_FAILE,
TRANS_ING,
TRANS_REDAY,
TRANS_DONE
};
struct TransInfomation {
std::string cur_remote_id_;
std::string cur_remote_file_;
std::string cur_file_;
FILE* file_{};
TransState trans_state_{TRANS_FAILE};
};
class CClient
{
@ -16,20 +39,27 @@ public:
public:
bool get_task_list();
bool down_task();
bool down_task(const std::string& param);
bool up_task(const std::string& cmd);
bool cancel_task();
bool down_one_file(const std::string& id, const std::string& file);
void cancel_trans_file(const std::string& key = "");
private:
bool send_frame(CFrameBuffer* buf);
private:
void handle_frame(CFrameBuffer* buf);
void send_file_data_th(std::string* key);
private:
std::shared_ptr<spdlog::logger> logger_;
asio::io_context io_context_;
std::shared_ptr<CTcpClient> client_;
std::vector<std::string> supported_;
std::map<int, std::string> task_list_;
std::map<int, std::shared_ptr<DownClientInfo>> task_list_;
std::shared_ptr<TransInfomation> down_;
std::map<std::string, std::shared_ptr<TransInfomation>> up_;
std::mutex mutex_;
std::shared_ptr<CThreadPool> send_pool_;
};

View File

@ -39,7 +39,7 @@ bool CTcpClient::send(const char* data, int len)
{
try {
auto send_size = asio::write(socket_, asio::buffer(data, len));
//logger_->info("Need Send len: {} Real Send len: {}", len, send_size);
// logger_->info("Need Send len: {} Real Send len: {}", len, send_size);
return static_cast<int>(send_size) == len;
} catch (const std::exception& ex) {
logger_->error("Send failed: {}", ex.what());

View File

@ -27,4 +27,5 @@ private:
CMutBuffer buffer_;
std::array<char, 1024> tmp_buf_;
ExFun_t fun_;
std::string remote_key_;
};

View File

@ -40,6 +40,8 @@ bool CTcpServer::start(unsigned short port)
logger_->error("Failed to bind to {}: {}", endpoint.address().to_string(), e.what());
return false;
}
auto bound_endpoint = acceptor_.local_endpoint();
server_ip_ = bound_endpoint.address().to_string() + ":" + std::to_string(bound_endpoint.port());
accept_client();
logger_->info("Server started on port {}", port);
return true;
@ -85,6 +87,8 @@ SimpleBuffer* CTcpServer::get_client_list()
for (const auto& file : files) {
msg.append("\n" + file);
}
msg.append("\n");
++index;
}
buf->data_ = new char[msg.size() + 1];
buf->len_ = static_cast<int>(msg.size() + 1);
@ -125,7 +129,7 @@ void CTcpServer::handle_frame()
FrameType t = static_cast<FrameType>(buf->type_);
switch (t) {
case TYPE_GET_LIST: {
logger_->info("GetList.");
logger_->info("[{}] GetList.", buf->fid_);
auto* sbuf = get_client_list();
if (sbuf == nullptr) {
break;
@ -136,7 +140,7 @@ void CTcpServer::handle_frame()
break;
}
case TYPE_UP_LIST: {
logger_->info("UpList. {}", std::string(buf->data_, buf->len_));
logger_->info("[{}] UpList. {}", buf->fid_, std::string(buf->data_, buf->len_));
std::lock_guard<std::mutex> lock(cli_mut_);
if (client_map_.count(buf->fid_)) {
auto& cli = client_map_[buf->fid_];
@ -146,7 +150,7 @@ void CTcpServer::handle_frame()
break;
}
case TYPE_CANCEL_LIST: {
logger_->info("Cancle Task.");
logger_->info("[{}] Cancle Task.", buf->fid_);
std::lock_guard<std::mutex> lock(cli_mut_);
if (client_map_.count(buf->fid_)) {
auto& cli = client_map_[buf->fid_];
@ -154,14 +158,30 @@ void CTcpServer::handle_frame()
}
break;
}
case TYPE_OPEN_FILE:
// 两边发送OPEN
case TYPE_OPEN_FILE: {
std::lock_guard<std::mutex> lock(cli_mut_);
if (client_map_.count(buf->tid_)) {
auto& cli = client_map_[buf->tid_];
if (!send_frame(cli->socket_, buf)) {
logger_->error("[{}] turn tid_ ailed to {}", buf->fid_, buf->tid_);
}
}
if (client_map_.count(buf->fid_)) {
auto& cli = client_map_[buf->fid_];
if (!send_frame(cli->socket_, buf)) {
logger_->error("[{}] turn fid_ failed to {}", buf->fid_, buf->tid_);
}
}
break;
};
case TYPE_READY_TRANS:
case TYPE_TRANS_FILE: {
std::lock_guard<std::mutex> lock(cli_mut_);
if (client_map_.count(buf->tid_)) {
auto& cli = client_map_[buf->tid_];
if (!send_frame(cli->socket_, buf)) {
logger_->error("turn failed to {}", buf->tid_);
logger_->error("[{}] turn failed to {}", buf->fid_, buf->tid_);
}
}
break;
@ -279,6 +299,10 @@ bool CTcpServer::send_frame(std::shared_ptr<asio::ip::tcp::socket> socket, CFram
{
char* out_buf{};
int out_len{};
if (buf->fid_.empty()) {
buf->fid_ = server_ip_;
}
if (!CTransProtocal::pack(buf, &out_buf, out_len)) {
logger_->error("{} pack failed.", __FUNCTION__);
return false;

View File

@ -64,4 +64,5 @@ private:
std::mutex sbuf_mut_;
std::shared_ptr<CThreadPool> handle_pool_;
std::shared_ptr<CThreadPool> send_pool_;
std::string server_ip_;
};

View File

@ -29,6 +29,8 @@ CTransProtocal::~CTransProtocal()
header 2 char: 0xFF 0xFE
type 2 char:
mark 1 char:
from 32 char:
to 32 char:
len 4 char:
data xxxxx:
tail 2 char: 0xFF 0xFF
@ -45,8 +47,8 @@ CFrameBuffer* CTransProtocal::parse(CMutBuffer& buffer)
}
int16_t type = *(reinterpret_cast<const int16_t*>(buffer.get_data() + find + 2));
char mark = *(buffer.get_data() + find + 2 + 2);
int32_t len = *(reinterpret_cast<const int32_t*>(buffer.get_data() + find + 2 + 2 + 1));
int32_t tail_index = find + 2 + 2 + 1 + 4 + len;
int32_t len = *(reinterpret_cast<const int32_t*>(buffer.get_data() + find + 2 + 2 + 1 + 32 + 32));
int32_t tail_index = find + 2 + 2 + 1 + 32 + 32 + 4 + len;
if (buffer.get_len() - 2 < tail_index || len < 0) {
return result;
}
@ -58,10 +60,12 @@ CFrameBuffer* CTransProtocal::parse(CMutBuffer& buffer)
result = new CFrameBuffer();
result->data_ = new char[len];
result->len_ = len;
result->fid_ = std::string(buffer.get_data() + find + 2 + 2 + 1);
result->tid_ = std::string(buffer.get_data() + find + 2 + 2 + 1 + 32);
result->mark_ = mark;
result->type_ = static_cast<FrameType>(type);
std::memset(result->data_, 0x0, len);
std::memcpy(result->data_, buffer.get_data() + find + 2 + 2 + 1 + 4, len);
std::memcpy(result->data_, buffer.get_data() + find + 2 + 2 + 1 + 4 + 32 + 32, len);
buffer.remove_of(0, tail_index + 2);
return result;
}
@ -76,14 +80,21 @@ bool CTransProtocal::pack(CFrameBuffer* buf, char** out_buf, int& len)
}
unsigned char header[] = {0xFF, 0xFE};
unsigned char tail[] = {0xFF, 0xFF};
len = buf->len_ + 11;
*out_buf = new char[len];
len = buf->len_ + 75;
*out_buf = new char[len]{};
std::memset(*out_buf, 0x0, len);
std::memcpy(*out_buf, header, 2);
std::memcpy(*out_buf + 2, &buf->type_, 2);
std::memcpy(*out_buf + 2 + 2, &buf->mark_, 1);
std::memcpy(*out_buf + 2 + 2 + 1, &buf->len_, 4);
if (!buf->fid_.empty()) {
std::memcpy(*out_buf + 2 + 2 + 1, buf->fid_.data(), buf->fid_.size());
}
if (!buf->tid_.empty()) {
std::memcpy(*out_buf + 2 + 2 + 1 + 32, buf->tid_.data(), buf->tid_.size());
}
std::memcpy(*out_buf + 2 + 2 + 1 + 32 + 32, &buf->len_, 4);
if (buf->data_ != nullptr) {
std::memcpy(*out_buf + 2 + 2 + 1 + 4, buf->data_, buf->len_);
std::memcpy(*out_buf + 2 + 2 + 1 + 32 + 32 + 4, buf->data_, buf->len_);
}
std::memcpy(*out_buf + len - 2, tail, 2);
return true;

View File

@ -14,6 +14,7 @@ enum FrameType : int16_t {
TYPE_CANCEL_LIST,
TYPE_OPEN_FILE,
TYPE_TRANS_FILE,
TYPE_TRANS_DONE,
TYPE_READY_TRANS,
TYPE_INTERRUPT,
TYPE_NO_HIT_TASK,
@ -55,6 +56,8 @@ using ExFun_t = std::function<void(CFrameBuffer* buf)>;
header 2 char: 0xFF 0xFE
type 2 char:
mark 1 char:
from 32 char:
to 32 char:
len 4 char:
data xxxxx:
tail 2 char: 0xFF 0xFF