#ifndef THREAD_POOL_HEADER #define THREAD_POOL_HEADER #include #include #include #include #include #include #include namespace cppbox { // 线程安全队列 template class CQueueMt { public: CQueueMt() = default; CQueueMt(CQueueMt&& rh) noexcept {} ~CQueueMt() = default; public: // 返回队列是否为空 bool IsEmpty() { std::unique_lock lock(m_mutex); return m_queue.empty(); } // 返回队列数量 size_t Size() { std::unique_lock lock(m_mutex); return m_queue.size(); } // 添加元素 void Push(T& t) { std::unique_lock lock(m_mutex); m_queue.emplace(t); } // 取出元素 bool Pop(T& t) { std::unique_lock lock(m_mutex); if (m_queue.empty()) return false; t = std::move(m_queue.front()); m_queue.pop(); return true; } // 清除 void clear() { std::unique_lock lock(m_mutex); std::queue queue; std::swap(m_queue, queue); } private: std::mutex m_mutex; std::queue m_queue; }; class CBoxThreadPool { private: class CWorker { public: CWorker(CBoxThreadPool* pThreadPool, const size_t nID) : m_nID(nID), m_pThreadPool(pThreadPool) { is_run_ = false; } // 重载操作 void operator()() { // 1.定义基础函数类 func std::function func; // 2.是否成功取出队列中的元素 bool have = false; while (m_pThreadPool->is_start_ || !is_run_ || isContinue()) { // 进入 while 后,标志位置成已进入。 is_run_ = true; { // 2.1 为线程环境加锁,互斥访问线程的休眠与唤醒 std::unique_lock lock(m_pThreadPool->mutex_); // 2.2 如果队列为空则阻塞并等待条件变量的通知 if (m_pThreadPool->m_queue.IsEmpty()) m_pThreadPool->cv_.wait(lock); // 2.3 取出任务队列中的元素 have = m_pThreadPool->m_queue.Pop(func); } if (have) func(); } } // 检查是否需要继续完成未完成的任务 bool isContinue() { if (!m_pThreadPool->is_wait_) return false; if (m_pThreadPool->m_queue.IsEmpty()) return false; return true; } private: size_t m_nID; // 工作ID CBoxThreadPool* m_pThreadPool; // 所属线程池 bool is_run_; // 是否进入工作了 }; public: explicit CBoxThreadPool(const int nThreadNum = 4) : th_cnt_(nThreadNum), is_start_(false), is_wait_(false), threads_(std::vector(nThreadNum)) { } ~CBoxThreadPool() { ShutDownWaitCurrent(); } CBoxThreadPool(const CBoxThreadPool&) = delete; CBoxThreadPool(CBoxThreadPool&&) = delete; CBoxThreadPool& operator=(const CBoxThreadPool&) = delete; CBoxThreadPool& operator=(CBoxThreadPool&&) = delete; public: // 初始化线程池 void Init() { if (!is_start_) { threads_.resize(th_cnt_); for (size_t i = 0; i < threads_.size(); ++i) { threads_.at(i) = std::thread(CWorker(this, i)); } is_start_ = true; is_wait_ = false; return; } } // 等待各线程当前任务完成然后关闭线程池 void ShutDownWaitCurrent() { if (!is_start_) return; is_wait_ = false; is_start_ = false; Close(); m_queue.clear(); } // 等待所有已提交的任务完成然后关闭线程池 void ShutDownWaitAll() { if (!is_start_) return; is_wait_ = true; is_start_ = false; Close(); m_queue.clear(); } // 向池子中提交一个待执行函数 template auto Submit(F&& f, Args&&... args) -> std::future { // 1. 创建一个绑定参数的函数, // 连接函数和参数定义,特殊函数类型,避免左右值错误 std::function func = std::bind(std::forward(f), std::forward(args)...); // 2. 使用智能指针以便调用拷贝构造 auto pTask = std::make_shared>(func); // 3. 包装进一个无参数函数 std::function rf = [pTask]() { (*pTask)(); }; // 4. 入队 m_queue.Push(rf); // 5. 唤醒一个等待中的线程 cv_.notify_one(); // 6. 返回先前注册的任务指针 return pTask->get_future(); } private: void Close() { cv_.notify_all(); for (auto& m_thread : threads_) { if (m_thread.joinable()) m_thread.join(); } threads_.clear(); } private: int th_cnt_; // 线程数 bool is_start_; // 线程池是否启动 bool is_wait_; // 是否等待所有线程结束 std::mutex mutex_; // 线程休眠锁互斥 CQueueMt> m_queue; std::vector threads_; std::condition_variable cv_; // 环境锁,用于休眠或者唤醒 }; } // namespace cppbox #endif