fix:修正数据传输部分。

This commit is contained in:
taynpg 2025-04-08 21:13:02 +08:00
parent 334df57b63
commit 29d721ab8a
4 changed files with 41 additions and 29 deletions

View File

@ -15,7 +15,7 @@ namespace fs = boost::filesystem;
namespace fs = std::filesystem;
#endif
CClient::CClient()
CClient::CClient() : msg_info_("")
{
client_ = std::make_shared<CTcpClient>(io_context_);
sleep_.set_timeout(5000);
@ -507,7 +507,7 @@ bool CClient::down_one_file(int remote_id, const std::string& file, const std::s
TLOGW("down one file success, total:[{}/{}]", cur_down_size_, cur_file_size_);
return true;
} else {
TLOGW("down one file {} failed.", down_->cur_file_);
TLOGW("down one file {} failed, size not matched.", down_->cur_file_);
if (!down_->file_.is_open()) {
down_->file_.close();
fs::remove(down_->cur_file_);
@ -1034,12 +1034,17 @@ void CClient::handle_frame(CFrameBuffer* buf)
downloading_ = true;
}
if (will_receive_) {
down_->file_.write(buf->data_, buf->len_);
msg_info_.id = buf->fid_;
if (!deserialize(buf->data_, buf->len_, msg_info_)) {
TLOGE("{} TRANS deserialize failed.", __LINE__);
break;
}
down_->file_.write(msg_info_.data.c_str(), msg_info_.data.size());
if (down_->file_.fail()) {
report_trans_ret(TRANS_FAILED);
TLOGW("no matched write and data. {}", buf->len_);
TLOGW("no matched write and data. {}", msg_info_.data.size());
}
cur_down_size_ += buf->len_;
cur_down_size_ += msg_info_.data.size();
}
break;
}
@ -1242,10 +1247,14 @@ void CClient::handle_frame(CFrameBuffer* buf)
break;
}
case TYPE_FILE_INFO: {
std::string str_size(buf->data_, buf->len_);
auto vec = COfStr::split(str_size, ",");
CMessageInfo msg_info(buf->fid_);
if (!deserialize(buf->data_, buf->len_, msg_info)) {
TLOGE("{} TRANS deserialize failed.", __LINE__);
break;
}
auto vec = COfStr::split(msg_info.str, ",");
if (vec.size() < 3) {
TLOGE("invalid file information:{}", str_size);
TLOGE("invalid file information:{}", msg_info.str);
break;
}
long long size = std::stoll(vec[1]);
@ -1280,8 +1289,8 @@ void CClient::send_file_data_th(const char* keys)
TLOGI("Start Trans File {} To {}", t->cur_file_, str_key);
std::shared_ptr<CFrameBuffer> buf = std::make_shared<CFrameBuffer>();
buf->data_ = new char[g_BuffSize]{};
buf->tid_ = str_key;
CMessageInfo msg_info(own_id_);
// ********************************************************
// TYPE_FILE_INFO格式:平台([0,win], [1,unix]),大小,权限
@ -1307,7 +1316,8 @@ void CClient::send_file_data_th(const char* keys)
std::string info_result = plat + "," + str_size + "," + str_perm;
TLOGI("To {} File Size: {} [{}], permissions:{}", str_key, ofen::OfUtil::get_file_size(size), size,
str_perm);
buf->len_ = std::snprintf(buf->data_, g_BuffSize, "%s", info_result.c_str());
msg_info.str = info_result;
serialize(msg_info, &buf->data_, buf->len_);
if (!send_frame(buf.get())) {
report_trans_ret(TRANS_FAILED, str_key);
TLOGE("Stop Trans {} To {} failed.", t->cur_file_, str_key);
@ -1316,20 +1326,25 @@ void CClient::send_file_data_th(const char* keys)
buf->type_ = TYPE_TRANS_FILE;
buf->mark_ = 1;
msg_info.data.resize(g_BuffSize);
long long send_size = 0;
long long cur_send_size = 0;
while (!t->file_.eof()) {
if (t->trans_state_ == TRANS_BREAK) {
TLOGW("Stop Trans {} To {} failed.", t->cur_file_, str_key);
report_trans_ret(TRANS_FAILED, str_key);
return;
}
t->file_.read(buf->data_, g_BuffSize);
buf->len_ = t->file_.gcount();
t->file_.read(&msg_info.data[0], g_BuffSize);
cur_send_size = t->file_.gcount();
msg_info.data.resize(cur_send_size);
send_size += cur_send_size;
serialize(msg_info, &buf->data_, buf->len_);
if (!send_frame(buf.get())) {
report_trans_ret(TRANS_FAILED, str_key);
TLOGE("Stop Trans {} To {} failed.", t->cur_file_, str_key);
return;
}
// std::this_thread::sleep_for(std::chrono::milliseconds(10));
}
buf->type_ = TYPE_TRANS_DONE;
@ -1337,7 +1352,7 @@ void CClient::send_file_data_th(const char* keys)
TLOGE("send_file_data_th send DONE failed.");
}
report_trans_ret(TRANS_DONE, str_key);
TLOGD("Trans File {} To {} Done !!!", t->cur_file_, str_key);
TLOGD("Trans File {} To {} Done !!!, {}", t->cur_file_, str_key, send_size);
}
void CClient::hearts()

View File

@ -90,6 +90,7 @@ private:
bool will_receive_{false};
bool downloading_{false};
asio::io_context io_context_;
CMessageInfo msg_info_;
std::shared_ptr<CTcpClient> client_;
std::map<int, std::shared_ptr<DownClientInfo>> clients_;
std::shared_ptr<TransInfomation> down_;

View File

@ -133,16 +133,15 @@ CFrameBuffer::~CFrameBuffer()
len_ = 0;
}
void serialize(const CMessageInfo& msg_info, char** out_buf, int& len)
void serialize(CMessageInfo& msg_info, char** out_buf, int& len)
{
CMessageInfo info(msg_info);
auto& info = msg_info;
info.id = localtou8(info.id);
info.uuid = localtou8(info.uuid);
info.str = localtou8(info.str);
info.o = localtou8(info.o);
// 计算总长度
len = sizeof(int) * 4 + info.id.size() + info.uuid.size() + info.str.size() + info.o.size() + kz;
len = sizeof(int) * 4 + info.id.size() + info.uuid.size() + info.str.size() + info.data.size() + kz;
*out_buf = new char[len]; // 分配内存(调用方负责释放)
char* ptr = *out_buf + kz;
@ -169,10 +168,10 @@ void serialize(const CMessageInfo& msg_info, char** out_buf, int& len)
ptr += str_size;
// 序列化 o
int o_size = static_cast<int>(info.o.size());
int o_size = static_cast<int>(info.data.size());
memcpy(ptr, &o_size, sizeof(int));
ptr += sizeof(int);
memcpy(ptr, info.o.data(), o_size);
memcpy(ptr, info.data.data(), o_size);
uint8_t ik[32]{};
hash(msg_info.id.c_str(), ik);
@ -193,7 +192,7 @@ bool deserialize(char* data, int len, CMessageInfo& msg_info)
return false;
}
CMessageInfo info(msg_info.id);
auto& info = msg_info;
char* ptr = data + kz;
int remaining = len;
@ -259,14 +258,11 @@ bool deserialize(char* data, int len, CMessageInfo& msg_info)
if (remaining < o_size) {
return false;
}
info.o.assign(ptr, o_size);
info.data.assign(ptr, o_size);
info.id = u8tolocal(info.id);
info.uuid = u8tolocal(info.uuid);
info.str = u8tolocal(info.str);
info.o = u8tolocal(info.o);
msg_info = info;
return true;
}
@ -349,7 +345,7 @@ CMessageInfo::CMessageInfo(const CMessageInfo& info)
id = info.id;
uuid = info.uuid;
str = info.str;
o = info.o;
data.assign(info.data.begin(), info.data.end());
}
CMessageInfo& CMessageInfo::operator=(const CMessageInfo& info)
@ -360,6 +356,6 @@ CMessageInfo& CMessageInfo::operator=(const CMessageInfo& info)
id = info.id;
uuid = info.uuid;
str = info.str;
o = info.o;
data.assign(info.data.begin(), info.data.end());
return *this;
}

View File

@ -50,10 +50,10 @@ struct CMessageInfo {
std::string id;
std::string uuid;
std::string str;
std::string o;
std::string data;
};
void serialize(const CMessageInfo& msg_info, char** out_buf, int& len);
void serialize(CMessageInfo& msg_info, char** out_buf, int& len);
bool deserialize(char* data, int len, CMessageInfo& msg_info);
std::string u8tolocal(const std::string& str);
std::string localtou8(const std::string& str);