#include "client.h" #include #include #include #include #include #include #ifdef USE_BOOST #include namespace fs = boost::filesystem; #else #include namespace fs = std::filesystem; #endif CClient::CClient() { client_ = std::make_shared(io_context_); sleep_.set_timeout(5000); } CClient::~CClient() { th_run_ = false; sleep_.contiune(); if (down_ && down_->file_.is_open()) { down_->file_.close(); } std::lock_guard lock(mutex_); for (const auto& item : up_) { if (item.second->file_.is_open()) { item.second->file_.close(); } } for (auto& item : ths_) { if (item.joinable()) { item.join(); } } if (update_list_th_.joinable()) { update_list_th_.join(); } if (th_down_active_.joinable()) { th_down_active_.join(); } if (hearts_.joinable()) { hearts_.join(); } } void CClient::print_help(bool detail) { TLOGI("version: {}", VERSION_NUM); TLOGI("opensource: {}", VERSION_URL); TLOGW("SupportCmd ==>"); if (!detail) { TLOGW("Get|Who|Where|Ls|Sub|Fetch|Up|Down|UpTask|DownTask"); TLOGI("You can use 'h' to show cmd's detail."); TLOGI("You can use 'end' or 'ctrl-c' to exit."); return; } constexpr char* sp = "=================================================="; TLOGI("{}", sp); TLOGW("#Get#"); TLOGI(" des: Get all clients on server."); TLOGI(" cmd: Get get g G"); TLOGI(" arg: NULL"); TLOGI("{}", sp); TLOGW("#Who#"); TLOGI(" des: Show current client's ID."); TLOGI(" cmd: Who who"); TLOGI(" arg: NULL"); TLOGI("{}", sp); TLOGW("#Where#"); TLOGI(" des: Show current client's work dir."); TLOGI(" cmd: Where where wh"); TLOGI(" arg: NULL"); TLOGI("{}", sp); TLOGW("#Ls#"); TLOGI(" des: List one client's all files and folders."); TLOGI(" cmd: Ls ls"); TLOGI(" arg: @id @remotedir"); TLOGI("{}", sp); TLOGW("#Sub#"); TLOGI(" des: Submit local file's list on the server."); TLOGI(" cmd: Sub sub"); TLOGI(" arg: @files(Separated by | if multi)"); TLOGI("{}", sp); TLOGW("#Fetch#"); TLOGI(" des: Fetch one client's submited files."); TLOGI(" cmd: Fetch fetch"); TLOGI(" arg: @id @savedir"); TLOGI("{}", sp); TLOGW("#Clear#"); TLOGI(" des: clear submited list."); TLOGI(" cmd: Clear clear c C"); TLOGI(" arg: NULL"); TLOGI("{}", sp); TLOGW("#Up#"); TLOGI(" des: Upload local files to one client directly."); TLOGI(" cmd: Up up"); TLOGI(" arg: @id @files(Sep by | if multi) @savedir"); TLOGI("{}", sp); TLOGW("#Down#"); TLOGI(" des: Down one client's files directly."); TLOGI(" cmd: Down down"); TLOGI(" arg: @id @files(Sep by | if multi) @savedir(opt)"); TLOGI("{}", sp); TLOGW("#UpTask#"); TLOGI(" des: Upload local files to one client by taskfile."); TLOGI(" cmd: UpTask uptask ut"); TLOGI(" arg: @id @taskfile(UTF-8)"); TLOGI("{}", sp); TLOGW("#DownTask#"); TLOGI(" des: Down one client's files by taskfile."); TLOGI(" cmd: DownTask downtask dt"); TLOGI(" arg: @id @taskfile(UTF-8)"); TLOGI("{}", sp); } void CClient::run(const std::string& ip, const std::string& port, const std::string& config_dir) { fs::path fp(config_dir); config_path_ = fp.append("history.txt").string(); fs::path fp2(config_dir); uuid_path_ = fp2.append("uuid.txt").string(); save_uuid(); uuid_ = read_uuid(); if (uuid_.empty()) { TLOGE("uuid is empty!"); return; } auto his = load_line_his(); for (const auto& cc : his) { fc_add_his(cc.c_str()); } th_run_ = true; if (!client_->connect(ip, port)) { TLOGI("{} connect err.", __FUNCTION__); return; } client_->register_func([&](CFrameBuffer* buf) { handle_frame(buf); }); client_->async_recv(); hearts_ = std::thread([&]() { hearts(); }); std::thread thread([&]() { io_context_.run(); }); th_down_active_ = std::thread([&]() { judget_down_active(); }); print_help(false); fc_append('|'); get_id(); if (ip == "127.0.0.1") { get_clients(); } while (1) { char* readline = fc_readline(); if (readline == nullptr) { break; } if (!th_run_ || !client_->is_normal()) { TLOGW("The link has been closed and cannot be continued. It will automatically exit."); break; } std::string cmd_input(readline); fc_free(readline); std::cout << "" << std::endl; cmd_input = ofen::COfStr::trim(cmd_input); if (cmd_input == "help" || cmd_input == "h") { print_help(true); continue; } if (cmd_input == "end" || cmd_input == "End") { th_run_ = false; std::this_thread::sleep_for(std::chrono::milliseconds(10)); break; } save_line_his(cmd_input); if (cmd_input == "who" || cmd_input == "Who") { TLOGD("ID => {}", own_id_); continue; } if (cmd_input == "Where" || cmd_input == "where" || cmd_input == "wh") { TLOGD("At => {}", COfPath::to_full(".")); continue; } if (cmd_input == "Get" || cmd_input == "get" || cmd_input == "g" || cmd_input == "G") { get_clients(); continue; } if (cmd_input == "Clear" || cmd_input == "clear" || cmd_input == "c" || cmd_input == "C") { cmd_clear_submited(); continue; } auto vec = COfStr::split(cmd_input, " "); if (vec.size() < 2) { TLOGE("No matched cmd, May be param size incorrect."); continue; } std::string param(cmd_input); std::string scmd = param.substr(0, param.find_first_of(" ")); param.erase(0, param.find_first_of(" ") + 1); if (scmd == "UpTask" || scmd == "uptask" || scmd == "ut") { cmd_sub_task(param, true); continue; } if (scmd == "DownTask" || scmd == "downtask" || scmd == "dt") { cmd_sub_task(param, false); continue; } if (scmd == "Up" || scmd == "up") { cmd_upload_files(param); continue; } if (scmd == "Fetch" || scmd == "fetch") { cmd_fetch_files(param); continue; } if (scmd == "Sub" || scmd == "sub") { cmd_sub_list(param); continue; } if (scmd == "Ls" || scmd == "ls") { cmd_ls(param); continue; } if (scmd == "Down" || scmd == "down") { cmd_down_list(param); continue; } TLOGE("No matched cmd, May be param size incorrect."); } client_->disconnect(); thread.join(); TLOGI("{} exit.", __FUNCTION__); } bool CClient::get_clients() { std::shared_ptr buf = std::make_shared(); buf->type_ = TYPE_GET_LIST; return send_frame(buf.get()); } bool CClient::cmd_fetch_files(const std::string& param) { if (downloading_) { TLOGW("Have Task Downloading, Please wait....."); return false; } std::string relative_path; auto v = COfStr::split(param, " "); if (v.size() < 1) { TLOGE("param size not enough."); return false; } if (v.size() > 1) { relative_path = v[1]; } auto id = std::stoi(v[0]); if (!clients_.count(id)) { TLOGE("No matched id[{}] in task list.", id); return false; } if (clients_[id]->id == own_id_) { TLOGW("You can't down your own file!!!"); return false; } const auto& vec = clients_[id]->files; down_ = std::make_shared(); if (vec.empty()) { TLOGW("No files List, Please Check!"); return false; } // 开始传输文件 for (const auto& item : vec) { if (!down_one_file(id, item, relative_path)) { break; } std::this_thread::sleep_for(std::chrono::milliseconds(20)); } return true; } bool CClient::cmd_sub_list(const std::string& param) { { std::lock_guard lock(mutex_); for (const auto& item : up_) { if (item.second->trans_state_ == TRANS_REDAY || item.second->trans_state_ == TRANS_ING) { TLOGW("Have Task Upping, Please wait!"); return false; } } } std::vector list{}; if (!CFileOpr::get_file_list(param, list)) { TLOGE("abort do up task."); return false; } std::string msg; for (const auto& item : list) { if (!fs::exists(item)) { TLOGE("File {} not exist, please check.", item); return false; } if (!fs::is_regular_file(item)) { TLOGE("Only Support Up File, But directory.", item); return false; } if (msg.empty()) { msg.append(item); } else { msg.append("|" + item); } } if (msg.empty()) { TLOGW("{} msg empty.", __FUNCTION__); return false; } CMessageInfo msg_info; msg_info.uuid = uuid_; msg_info.str = msg; std::shared_ptr buf = std::make_shared(); buf->type_ = TYPE_UP_LIST; serialize(msg_info, &buf->data_, buf->len_); return send_frame(buf.get()); } bool CClient::cmd_clear_submited() { { std::lock_guard lock(mutex_); for (const auto& item : up_) { if (item.second->trans_state_ == TRANS_REDAY || item.second->trans_state_ == TRANS_ING) { TLOGW("Have Task Upping, Please wait!"); return false; } } } std::shared_ptr buf = std::make_shared(); buf->type_ = TYPE_CANCEL_LIST; return send_frame(buf.get()); } bool CClient::cmd_upload_files(const std::string& param) { auto tvec = COfStr::split(param, " "); if (tvec.size() < 3) { TLOGE("{} invalid param format [{}]", __FUNCTION__, param); return false; } int index = std::stoi(tvec[0]); std::string list_file = tvec[1]; std::string pur = tvec[2]; if (downloading_) { TLOGW("Have Task Downloading, Please wait....."); return false; } if (!clients_.count(index)) { TLOGE("{} No Index Found {}.", __LINE__, index); return false; } const auto& sr = clients_[index]; if (sr->id == own_id_) { TLOGW("You can't send file to yourself!!!"); return false; } // 校验格式是否正确 auto vec = COfStr::split(list_file, "|"); bool valid = true; int line = 1; std::unordered_map mre{}; std::string handled_content; for (const auto& item : vec) { std::string hitem = COfStr::trim(item); if (hitem.empty()) { continue; } auto real_path = COfPath::to_full(hitem); if (!fs::exists(real_path)) { TLOGE("file {} not exist.", real_path); valid = false; break; } TLOGI("--->check pass {}:{}", line, real_path); mre[line++] = real_path + "|" + pur; } if (!valid) { TLOGE("Judge File Not Passed."); return false; } auto handel_ret = handle_user_select(mre, true); if (handel_ret.empty()) { TLOGE("handle_user_select not pass, abort action!"); return false; } list_file_ = "auto_list"; std::shared_ptr buf = std::make_shared(); buf->type_ = TYPE_REQUEST_UPDATE_LIST; CMessageInfo msg_info; msg_info.str = handel_ret; serialize(msg_info, &buf->data_, buf->len_); buf->tid_ = clients_[index]->id; if (!send_frame(buf.get())) { TLOGE("Send Failed {}", __LINE__); return false; } return true; } bool CClient::down_one_file(int remote_id, const std::string& file, const std::string& local_dir) { if (clients_.count(remote_id) == 0) { TLOGE("{} No Index Found {}.", __LINE__, remote_id); return false; } auto client = clients_[remote_id]; down_->cur_remote_id_ = client->id; down_->cur_remote_file_ = file; fs::path remote_file(ofen::COfPath::normalize(down_->cur_remote_file_)); if (local_dir.empty()) { down_->cur_file_ = COfPath::to_full(remote_file.filename().string()); } else { down_->cur_file_ = fs::path(local_dir).append(remote_file.filename().string()).string(); } // 这里要先检查羁绊 if (client->uuid == uuid_ && COfPath::is_same_dir(remote_file.string(), down_->cur_file_)) { // 处在同一个机器上的同目录下 TLOGE("You Can't Operate File In Same Dir And In Same Machine.", down_->cur_remote_file_); return false; } TLOGW("Start Down => {} To {}", down_->cur_remote_file_, down_->cur_file_); down_->file_.open(down_->cur_file_, std::ios::out | std::ios::binary); if (!down_->file_.is_open()) { TLOGE("Open {} Failed.", down_->cur_file_); return false; } // 请求下载文件 std::shared_ptr buf = std::make_shared(); buf->type_ = TYPE_OPEN_FILE; buf->tid_ = client->id; CMessageInfo msg_info; msg_info.str = file; serialize(msg_info, &buf->data_, buf->len_); if (!send_frame(buf.get())) { TLOGE("{} request open file [{}] send failed.", __FUNCTION__, down_->cur_remote_file_); down_->cur_remote_id_.clear(); down_->cur_remote_file_.clear(); return false; } will_receive_ = true; down_->trans_state_ = TRANS_REDAY; cur_down_size_ = 0; float percent = 0.0; fc_disable_cur(); while (down_->trans_state_ != TRANS_DONE && down_->trans_state_ != TRANS_FAILED) { std::this_thread::sleep_for(std::chrono::milliseconds(down_check_wait)); if (cur_file_size_ > 0) { percent = (float)cur_down_size_ / cur_file_size_; CTransProtocal::display_progress(percent); } if (!th_run_) { TLOGE("Interrup When Receive File."); report_trans_ret(TRANS_FAILED); return false; } } if (cur_file_size_ > 0) { percent = (float)cur_down_size_ / cur_file_size_; CTransProtocal::display_progress(percent); } fc_enable_cur(); if (cur_down_size_ > 0 && cur_file_size_ == cur_down_size_) { TLOGW("down one file success, total:[{}/{}]", cur_down_size_, cur_file_size_); return true; } else { TLOGW("down one file {} failed.", down_->cur_file_); if (!down_->file_.is_open()) { down_->file_.close(); fs::remove(down_->cur_file_); } return false; } } void CClient::report_trans_ret(TransState state, const std::string& key) { std::shared_ptr t = nullptr; if (key.empty()) { t = down_; downloading_ = false; will_receive_ = false; } else { std::lock_guard lock(mutex_); if (up_.count(key)) { t = up_[key]; } } if (t == nullptr) { return; } t->trans_state_ = state; if (t->file_.is_open()) { t->file_.close(); if (key.empty() && t->trans_state_ == TRANS_FAILED) { fs::remove(t->cur_file_); } } t->cur_remote_file_.clear(); t->cur_remote_id_.clear(); } /* 清单文件,内容格式为: D:/a.txt|/home/zhangsan/ C:/Dijava|/home/zhangsan/dia 功能为,请求某个客户端,更新我所列出的文件,右侧是远端需要存储的目录(必须存在,不存在则不理会) */ bool CClient::cmd_sub_task(const std::string& param, bool is_send) { auto tvec = COfStr::split(param, " "); if (tvec.size() < 2) { TLOGE("{} invalid param format [{}]", __FUNCTION__, param); return false; } int index = std::stoi(tvec[0]); std::string list_file = tvec[1]; if (downloading_) { TLOGW("Have Task Downloading, Please wait....."); return false; } if (!clients_.count(index)) { TLOGE("{} No Index Found {}.", __LINE__, index); return false; } const auto& sr = clients_[index]; // 读取list文件 std::string list_file_full = COfPath::to_full(list_file); if (fs::is_directory(list_file_full)) { TLOGE("{} is a directory, only support file.", list_file_full); return false; } bool local_trans = false; if (sr->uuid == uuid_) { TLOGW("local_trans path handle mode!!!"); local_trans = true; } std::ifstream in(list_file_full); if (!in.is_open()) { TLOGE("Can't Open File:{}", list_file_full); return false; } std::istreambuf_iterator iterf(in); std::istreambuf_iterator iter; std::string content(iterf, iter); in.close(); #if defined(_WIN32) // 从文件中读取的内容还是要手动转码一下。 content = CCodec::u8_to_ansi(content); #endif // 校验格式是否正确 auto vec = COfStr::split(content, "\n"); bool valid = true; int line = 1; std::unordered_map mre{}; std::string handled_content; for (const auto& item : vec) { std::string hitem = COfStr::trim(item); if (hitem.empty()) { continue; } auto v = COfStr::split(hitem, "|"); if (v.size() >= 2) { auto pr = variable_handle(list_file_full, v[0], true); pr = COfPath::standardize(pr); if (is_send && !fs::exists(pr)) { TLOGE("file {} not exist.", pr); valid = false; break; } if (is_send) { TLOGI("--->check pass {}:{}", line, pr); } auto sec = v[1]; sec = variable_handle(list_file_full, sec, local_trans); sec = COfPath::standardize(sec); mre[line++] = pr + "|" + sec; continue; } valid = false; break; } if (!valid) { TLOGE("Judge List File {} Format Or Logic Not Passed.", list_file_full); return false; } auto handel_ret = handle_user_select(mre, is_send); if (handel_ret.empty()) { TLOGE("handle_user_select not pass, abort action!"); return false; } list_file_ = list_file_full; std::shared_ptr buf = std::make_shared(); if (is_send) { buf->type_ = TYPE_REQUEST_UPDATE_LIST; } else { buf->type_ = TYPE_REQUEST_DOWN_UPDATE_LIST; } CMessageInfo msg_info; msg_info.str = handel_ret; serialize(msg_info, &buf->data_, buf->len_); buf->tid_ = clients_[index]->id; if (!send_frame(buf.get())) { TLOGE("Send Failed {}", __LINE__); return false; } return true; } bool CClient::variable_and_parse_files(const std::string& content, std::map& files) { auto vec = COfStr::split(content, "\n"); bool valid = true; for (const auto& item : vec) { if (item.empty()) { continue; } auto vi = COfStr::split(item, "|"); if (vi.size() != 2) { TLOGE("Size not 2 {}", item); valid = false; break; } auto pr = variable_handle("", vi[1], false); if (!fs::exists(pr)) { valid = false; TLOGE("Not exist {}", pr); break; } TLOGI("---> check pass {}", pr); files[vi[0]] = pr; } return valid; } bool CClient::down_update_file(const std::map& files) { std::shared_ptr buf = std::make_shared(); buf->tid_ = list_serve_id_; down_ = std::make_shared(); bool suc = true; int id = -1; for (const auto& item : clients_) { if (item.second->id == list_serve_id_) { id = item.first; break; } } for (const auto& item : files) { if (!down_one_file(id, item.first, item.second)) { suc = false; break; } } if (suc) { buf->type_ = TYPE_DONE_UPDATE_LIST; TLOGI("################## Do Task From Remote {} Done!", buf->tid_); } else { buf->type_ = TYPE_FAILED_UPDATE_LIST; TLOGE("################## Do Task From Remote {} Failed!", buf->tid_); } send_frame(buf.get()); return suc; } bool CClient::get_dir_files(const std::string& dir, std::string& out, std::string& error) { fs::path p(dir); out.clear(); error.clear(); if (!fs::is_directory(p) || !fs::exists(p)) { error = fmt::format("{} not a dir or not exist", p.string()); return false; } // F开头文件,D开头目录,使用前需要去除。 for (const auto& entry : fs::directory_iterator(p)) { if (fs::is_directory(entry)) { out += "D" + COfPath::standardize(entry.path().string()) + "\n"; } else { out += "F" + COfPath::standardize(entry.path().string()) + "\n"; } } return true; } bool CClient::cmd_ls(const std::string& param) { auto tvec = COfStr::split(param, " "); if (tvec.size() < 2) { TLOGE("{} invalid param format [{}]", __FUNCTION__, param); return false; } int index = std::stoi(tvec[0]); std::string path = tvec[1]; if (!clients_.count(index)) { TLOGE("{} No Index Found {}.", __LINE__, index); return false; } const auto& sr = clients_[index]; std::shared_ptr buf = std::make_shared(); buf->type_ = TYPE_GET_DIRFILES; CMessageInfo msg_info{}; msg_info.str = path; serialize(msg_info, &buf->data_, buf->len_); buf->tid_ = sr->id; if (!send_frame(buf.get())) { TLOGE("Send Failed {}", __LINE__); return false; } return true; } bool CClient::cmd_down_list(const std::string& param) { auto tvec = COfStr::split(param, " "); if (tvec.size() < 2) { TLOGE("{} invalid param format [{}]", __FUNCTION__, param); return false; } int index = std::stoi(tvec[0]); std::string lists = tvec[1]; std::string local = tvec.size() > 2 ? COfPath::to_full(tvec[2]) : COfPath::to_full(""); if (!clients_.count(index)) { TLOGE("{} No Index Found {}.", __LINE__, index); return false; } const auto& sr = clients_[index]; if (sr->id == own_id_) { TLOGE("Can't Req Self Task {}.", sr->id); return false; } down_ = std::make_shared(); auto vec = COfStr::split(lists, "|"); for (const auto& item : vec) { auto path = COfStr::trim(item); if (path.empty()) { continue; } if (!down_one_file(index, path, local)) { TLOGE("Down Failed {}", __LINE__); return false; } } return true; } bool CClient::send_frame(CFrameBuffer* buf) { char* out_buf{}; int out_len{}; if (!CTransProtocal::pack(buf, &out_buf, out_len)) { TLOGE("{} pack failed.", __FUNCTION__); return false; } std::lock_guard lock(send_mut_); if (!client_->send(out_buf, out_len)) { delete[] out_buf; return false; } delete[] out_buf; return true; } void CClient::save_line_his(const std::string& input) { if (input.empty()) { return; } auto history = load_line_his(); for (const auto& item : history) { if (input == item) { return; } } history.push_back(input); const size_t max_history = 30; if (history.size() > max_history) { history.erase(history.begin()); } std::ofstream out_file(config_path_, std::ios::out); if (out_file.is_open()) { for (const auto& line : history) { out_file << line << "\n"; } out_file.close(); } } std::vector CClient::load_line_his() { std::vector history; if (!fs::exists(config_path_)) { return history; } std::ifstream in_file(config_path_, std::ios::in); if (in_file.is_open()) { std::string line; while (std::getline(in_file, line)) { if (!line.empty()) { history.push_back(line); } } in_file.close(); } return history; } std::string CClient::variable_and_reverse_files(const std::string& source) { auto vec = COfStr::split(source, "\n"); std::string result; bool valid = true; for (const auto& item : vec) { auto rl = COfStr::trim(item); if (rl.empty()) { continue; } auto vi = COfStr::split(rl, "|"); if (vi.size() != 2) { TLOGE("Size not 2 {}", item); valid = false; break; } auto pr = variable_handle("", vi[1], false); if (!fs::exists(pr)) { valid = false; TLOGE("Not exist {}", pr); break; } TLOGI("---> check pass {}", pr); fs::path ppr(pr); fs::path pvi(vi[0]); auto lp = ppr.append(pvi.filename().string()).string(); auto rp = pvi.parent_path().string(); auto line = lp + "|" + rp; result = result + line + "\n"; } if (!valid) { return ""; } return result; } bool CClient::save_uuid() { fs::path uuid_path(uuid_path_); if (fs::exists(uuid_path)) { return true; } std::ofstream out_file(uuid_path.string(), std::ios::out); if (!out_file.is_open()) { TLOGE("Open File Failed {}", uuid_path.string()); return false; } std::string uuid1 = OfUtil::get_sim_uuid(); std::string uuid2 = OfUtil::get_sim_uuid(); std::string uuid = uuid1 + "-" + uuid2; out_file << uuid; out_file.close(); return true; } std::string CClient::read_uuid() { fs::path uuid_path(uuid_path_); if (!fs::exists(uuid_path)) { return std::string(); } std::ifstream in_file(uuid_path.string(), std::ios::in); if (!in_file.is_open()) { TLOGE("Open File Failed {}", uuid_path.string()); return ""; } std::string uuid; in_file >> uuid; in_file.close(); return uuid; } void CClient::get_id() { auto* bf = new CFrameBuffer(); bf->type_ = TYPE_GET_ID; CMessageInfo msg_info; msg_info.uuid = uuid_; serialize(msg_info, &bf->data_, bf->len_); send_frame(bf); delete bf; } void CClient::handle_frame(CFrameBuffer* buf) { if (buf == nullptr) { TLOGE("{} nullptr.", __FUNCTION__); return; } switch (buf->type_) { case TYPE_GET_ID: { TLOGD("ID => {}", buf->tid_); own_id_ = buf->tid_; break; } case TYPE_GET_LIST: { clients_.clear(); CMessageInfo msg_info; if (!deserialize(buf->data_, buf->len_, msg_info)) { TLOGE("{} GetList deserialize failed.", __LINE__); break; } auto vec = COfStr::split(msg_info.str, "\n"); int index = -1; size_t num = 0; for (const auto& item : vec) { std::string real = COfStr::trim(item); if (real.empty()) { continue; } if (real.find('[') == std::string::npos) { if (num < 20) { TLOGI("FILE ==> {}", real); } clients_[index]->files.push_back(real); ++num; } else { auto a1 = real.find_first_of('[') + 1; auto b1 = real.find_first_of(']'); std::string str_index = real.substr(a1, b1 - a1); index = std::stoi(str_index); std::string backup = real; backup.erase(0, b1 + 1); auto a2 = backup.find_first_of('[') + 1; auto b2 = backup.find_first_of(']'); std::string id = backup.substr(a2, b2 - a2); auto a3 = real.find_last_of('[') + 1; auto b3 = real.find_last_of(']'); std::string uuid = real.substr(a3, b3 - a3); real.erase(a3 - 1, b3 - a3 + 2); if (!clients_.count(index)) { clients_[index] = std::make_shared(); clients_[index]->id = id; clients_[index]->uuid = uuid; } if (num < 20) { TLOGD("============================================"); if (id == own_id_) { TLOGI("@{}", real); } else { TLOGI("{}", real); } } } } if (num >= 20) { TLOGW("Too Many Files [{}], Only Display 20.", num); } break; } // 能接收到 TRANS 一定是客户端(这里不是指Server) case TYPE_TRANS_FILE: { if (!downloading_) { downloading_ = true; } if (will_receive_) { down_->file_.write(buf->data_, buf->len_); if (down_->file_.fail()) { report_trans_ret(TRANS_FAILED); TLOGW("no matched write and data. {}", buf->len_); } cur_down_size_ += buf->len_; } break; } case TYPE_GET_DIRFILES: { CMessageInfo msg_info; if (!deserialize(buf->data_, buf->len_, msg_info)) { TLOGE("{} GetDirFiles deserialize failed.", __LINE__); break; } TLOGI("{} Get Dir Files: {}", buf->fid_, msg_info.str); std::string err; std::string out; if (!get_dir_files(msg_info.str, out, err)) { TLOGE("Get Dir Files Failed. {}", err); buf->type_ = TYPE_GET_DIRFILES_FAILED; delete[] buf->data_; msg_info.str = err; } else { buf->type_ = TYPE_GET_DIRFILES_DONE; delete[] buf->data_; msg_info.str = out; } serialize(msg_info, &buf->data_, buf->len_); std::swap(buf->tid_, buf->fid_); if (!send_frame(buf)) { TLOGE("Send Failed {}.", __LINE__); break; } break; } case TYPE_GET_DIRFILES_FAILED: { CMessageInfo msg_info; if (!deserialize(buf->data_, buf->len_, msg_info)) { TLOGE("{} GetDirFiles deserialize failed.", __LINE__); break; } TLOGE("Get {} Dir Files Failed. {}", buf->fid_, msg_info.str); break; } case TYPE_GET_DIRFILES_DONE: { CMessageInfo msg_info; if (!deserialize(buf->data_, buf->len_, msg_info)) { TLOGE("{} GetDirFiles deserialize failed.", __LINE__); break; } auto vec = COfStr::split(msg_info.str, "\n"); TLOGD("$$$$$$$$$ START $$$$$$$$$"); for (const auto& item : vec) { std::string real = COfStr::trim(item); if (real.empty()) { continue; } if (real.find("D") == 0) { real.erase(0, 1); TLOGI("d=>{}", real); } else if (real.find("F") == 0) { real.erase(0, 1); TLOGI("f=>{}", real); } else { TLOGE("Error Remote List Format {}", real); } } TLOGD("$$$$$$$$$ OVER $$$$$$$$$"); break; } case TYPE_OPEN_FILE: { std::string keys{}; CMessageInfo msg_info; if (!deserialize(buf->data_, buf->len_, msg_info)) { TLOGE("{} OpenFile deserialize failed.", __LINE__); break; } { std::lock_guard lock(mutex_); up_[buf->fid_] = std::make_shared(); up_[buf->fid_]->cur_file_ = msg_info.str; up_[buf->fid_]->file_.open(up_[buf->fid_]->cur_file_, std::ios::in | std::ios::binary); up_[buf->fid_]->trans_state_ = TRANS_REDAY; if (!up_[buf->fid_]->file_.is_open()) { TLOGE("Ready Send File {} Open Failed.", up_[buf->fid_]->cur_file_); buf->type_ = TYPE_OPEN_FAILED; std::swap(buf->tid_, buf->fid_); if (!send_frame(buf)) { TLOGE("Send Failed {}.", __LINE__); break; } break; } keys = buf->fid_; } if (!keys.empty()) { ths_.emplace_back([this, keys]() { send_file_data_th(keys.c_str()); }); } break; } case TYPE_TRANS_DONE: { report_trans_ret(TRANS_DONE); #ifdef _WIN32 #else if (down_ && down_->remote_plat == 1) { TLOGI("recovery permissions {}.", down_->permissions); fs::perms perms = static_cast(down_->permissions); fs::permissions(down_->cur_file_, perms); } #endif break; } case TYPE_OPEN_FAILED: { TLOGE("Remote {} Open File Failed.", buf->fid_); if (down_) { down_->trans_state_ = TRANS_FAILED; } break; } case TYPE_OFFLINE: { if (buf->mark_) { std::lock_guard lock(mutex_); if (!up_.count(buf->fid_)) { TLOGW("Offline no match."); break; } auto t = up_[buf->fid_]; t->trans_state_ = TRANS_BREAK; break; } if (downloading_ && !down_->cur_remote_file_.empty()) { TLOGW("Stop Down {} From {}.", down_->cur_remote_file_, buf->fid_); } report_trans_ret(TRANS_FAILED); break; } case TYPE_REQUEST_DOWN_UPDATE_LIST: { CMessageInfo msg_info; if (!deserialize(buf->data_, buf->len_, msg_info)) { TLOGE("{} GetList deserialize failed.", __LINE__); break; } delete[] buf->data_; msg_info.str = variable_and_reverse_files(msg_info.str); serialize(msg_info, &buf->data_, buf->len_); std::swap(buf->tid_, buf->fid_); buf->type_ = TYPE_REQUEST_UPDATE_LIST; if (!send_frame(buf)) { TLOGE("Send Failed {}.", __LINE__); break; } break; }; case TYPE_REQUEST_UPDATE_LIST: { std::map files; if (down_ && down_->trans_state_ == TRANS_REDAY) { TLOGW("Update Busy......, Ignore {}", buf->fid_); buf->type_ = TYPE_BUSY_UPDATE_LIST; } else { CMessageInfo msg_info; if (!deserialize(buf->data_, buf->len_, msg_info)) { TLOGE("{} GetList deserialize failed.", __LINE__); break; } if (variable_and_parse_files(msg_info.str, files)) { buf->type_ = TYPE_CONFIRM_UPDATE_LIST; } else { buf->type_ = TYPE_UNCONFIRM_UPDATE_LIST; } } std::swap(buf->tid_, buf->fid_); if (!send_frame(buf)) { TLOGE("Send Failed {}.", __LINE__); break; } if (buf->type_ != TYPE_CONFIRM_UPDATE_LIST) { break; } list_serve_id_ = buf->tid_; TLOGD("Do Task From Remote {}.", buf->tid_); if (update_list_th_.joinable()) { update_list_th_.join(); } update_list_th_ = std::thread([this, files]() { down_update_file(files); }); break; } case TYPE_CONFIRM_UPDATE_LIST: { TLOGI("remote {} check {} passed!", buf->fid_, list_file_); break; } case TYPE_UNCONFIRM_UPDATE_LIST: { TLOGE("remote {} check {} not passed!", buf->fid_, list_file_); break; } case TYPE_DONE_UPDATE_LIST: { TLOGI("remote {} do task {} success!", buf->fid_, list_file_); break; } case TYPE_FAILED_UPDATE_LIST: { TLOGI("remote {} do task {} failed!", buf->fid_, list_file_); break; } case TYPE_BUSY_UPDATE_LIST: { TLOGI("remote {} are busy, will not exec task {}", buf->fid_, list_file_); break; } case TYPE_FILE_INFO: { std::string str_size(buf->data_, buf->len_); auto vec = COfStr::split(str_size, ","); if (vec.size() < 3) { TLOGE("invalid file information:{}", str_size); break; } long long size = std::stoll(vec[1]); std::string show_str = OfUtil::get_file_size(size); TLOGI("Ready Down Size: {}, permissions:{}", show_str, vec[2]); if (down_) { down_->permissions = static_cast(std::stoul(vec[2])); down_->remote_plat = static_cast(std::stoul(vec[0])); } cur_file_size_ = size; break; } default: TLOGE("UnSupport Type {}, Current Version v{}", static_cast(buf->type_), VERSION_NUM); break; } } void CClient::send_file_data_th(const char* keys) { std::string str_key(keys); std::shared_ptr t = nullptr; { std::lock_guard lock(mutex_); if (!up_.count(str_key)) { TLOGE("{} no matched key.", __FUNCTION__); return; } t = up_[str_key]; } TLOGI("Start Trans File {} To {}", t->cur_file_, str_key); std::shared_ptr buf = std::make_shared(); buf->data_ = new char[g_BuffSize]{}; buf->tid_ = str_key; // ******************************************************** // TYPE_FILE_INFO格式:平台([0,win], [1,unix]),大小,权限 // ******************************************************** // seekg 用于读,seekp 用于写。 t->file_.seekg(0, std::ios::end); long long size = t->file_.tellg(); t->file_.seekg(0, std::ios::beg); buf->type_ = TYPE_FILE_INFO; std::string str_size = std::to_string(size); // 文件权限 auto perms = fs::status(t->cur_file_).permissions(); std::string str_perm = std::to_string(static_cast(perms)); #if defined(_WIN32) std::string plat("0"); #else std::string plat("1"); #endif std::string info_result = plat + "," + str_size + "," + str_perm; TLOGI("To {} File Size: {} [{}], permissions:{}", str_key, ofen::OfUtil::get_file_size(size), size, str_perm); buf->len_ = std::snprintf(buf->data_, g_BuffSize, "%s", info_result.c_str()); if (!send_frame(buf.get())) { report_trans_ret(TRANS_FAILED, str_key); TLOGE("Stop Trans {} To {} failed.", t->cur_file_, str_key); return; } buf->type_ = TYPE_TRANS_FILE; buf->mark_ = 1; while (!t->file_.eof()) { if (t->trans_state_ == TRANS_BREAK) { TLOGW("Stop Trans {} To {} failed.", t->cur_file_, str_key); report_trans_ret(TRANS_FAILED, str_key); return; } t->file_.read(buf->data_, g_BuffSize); buf->len_ = t->file_.gcount(); if (!send_frame(buf.get())) { report_trans_ret(TRANS_FAILED, str_key); TLOGE("Stop Trans {} To {} failed.", t->cur_file_, str_key); return; } // std::this_thread::sleep_for(std::chrono::milliseconds(10)); } buf->type_ = TYPE_TRANS_DONE; if (!send_frame(buf.get())) { TLOGE("send_file_data_th send DONE failed."); } report_trans_ret(TRANS_DONE, str_key); TLOGD("Trans File {} To {} Done !!!", t->cur_file_, str_key); } void CClient::hearts() { std::shared_ptr buf = std::make_shared(); buf->type_ = TYPE_HEARTS; while (th_run_) { sleep_.sleep(); if (th_run_ && !send_frame(buf.get())) { TLOGE("{} send failed.", __FUNCTION__); th_run_ = false; } } } void CClient::judget_down_active() { std::shared_ptr buf = std::make_shared(); buf->type_ = TYPE_JUDGE_ACTIVE; while (th_run_) { std::this_thread::sleep_for(std::chrono::milliseconds(1000)); if (!downloading_) { continue; } if (down_) { buf->tid_ = down_->cur_remote_id_; send_frame(buf.get()); } } } std::string CClient::variable_handle(const std::string& task_list_path, const std::string& source, bool is_local) { std::string result(source); // 支持的变量如下: // ${HOME} 用户目录(发送端接收端均支持) // ${CURRENT} 任务文件所在目录(该变量仅支持发送端,因为接收端没有任务文件所在路径) if (is_local && source.find("${HOME}") != std::string::npos) { result = COfStr::replace(result, "${HOME}", COfPath::get_home()); } if (is_local && source.find("${CURRENT}") != std::string::npos) { if (task_list_path.empty()) { return result; } fs::path p(task_list_path); std::string list_dir = p.parent_path().string(); result = COfStr::replace(result, "${CURRENT}", list_dir); } return result; } std::string CClient::handle_user_select(const std::unordered_map& source, bool is_send) { std::string handled_content{}; std::string input{}; if (!is_send) { handled_content.clear(); // 清空之前的内容 for (const auto& pair : source) { handled_content.append(pair.second + "\n"); } return handled_content; } while (true) { TLOGI("numbers by space, or '0' use all, 'end' to quit: "); // std::getline(std::cin, input); char* readline = fc_readline(); input = std::string(readline); fc_free(readline); if (input == "end") { handled_content.clear(); break; } if (input == "0") { handled_content.clear(); // 清空之前的内容 for (const auto& pair : source) { handled_content.append(pair.second + "\n"); } break; } else { // 处理多个值的输入 std::stringstream ss(input); std::string num_str; while (ss >> num_str) { // 判断输入的每个值是否为有效的数字 try { int key = std::stoi(num_str); if (source.find(key) != source.end()) { handled_content.append(source.at(key) + "\n"); } else { // 如果mre中没有这个key TLOGE("Invalid input, please enter valid numbers or '0' for all."); break; } } catch (const std::exception& e) { TLOGE("Invalid input, please enter valid numbers or '0' for all."); break; } } if (!handled_content.empty()) { break; } } } return handled_content; } CFileOpr::CFileOpr() = default; CFileOpr::~CFileOpr() = default; bool CFileOpr::get_file_list(const std::string& input, std::vector& out) { out.clear(); auto backup = COfStr::trim(input); if (backup.empty()) { return false; } auto vec = COfStr::split(backup, "|"); for (const auto& item : vec) { std::string ret = COfStr::trim(item); std::string trim_item = ret; #ifdef _WIN32 if (item.find("\"") != std::string::npos) { ret = COfStr::replace(trim_item, "\"", ""); } #else if (item.find(R"(')") != std::string::npos) { ret = COfStr::replace(trim_item, R"(')", ""); } #endif if (ret.find("?") != std::string::npos || ret.find("*") != std::string::npos) { auto fv = COfPath::match_files(ret); for (const auto& v : fv) { TLOGI("match file: {}", v); } std::string cof; while (true) { TLOGI("Detected regex's file (num = {}), please confirm if it is correct? ", fv.size()); TLOGW("support input in [y,Y,end]", fv.size()); std::getline(std::cin, cof); if (cof == "y" || cof == "Y") { for (const auto& v : fv) { out.push_back(v); } TLOGD("OK, Done!"); break; } if (cof == "end") { return false; } } } else { out.push_back(COfPath::to_full(ret)); } } return true; }