209 lines
5.7 KiB
C++
209 lines
5.7 KiB
C++
#ifndef THREAD_POOL_HEADER
|
|
#define THREAD_POOL_HEADER
|
|
|
|
#include <condition_variable>
|
|
#include <functional>
|
|
#include <future>
|
|
#include <mutex>
|
|
#include <queue>
|
|
#include <thread>
|
|
#include <vector>
|
|
|
|
namespace cppbox {
|
|
|
|
// 线程安全队列
|
|
template <typename T> class CQueueMt
|
|
{
|
|
public:
|
|
CQueueMt() = default;
|
|
CQueueMt(CQueueMt&& rh) noexcept {}
|
|
~CQueueMt() = default;
|
|
|
|
public:
|
|
// 返回队列是否为空
|
|
bool IsEmpty()
|
|
{
|
|
std::unique_lock<std::mutex> lock(m_mutex);
|
|
return m_queue.empty();
|
|
}
|
|
// 返回队列数量
|
|
size_t Size()
|
|
{
|
|
std::unique_lock<std::mutex> lock(m_mutex);
|
|
return m_queue.size();
|
|
}
|
|
// 添加元素
|
|
void Push(T& t)
|
|
{
|
|
std::unique_lock<std::mutex> lock(m_mutex);
|
|
m_queue.emplace(t);
|
|
}
|
|
// 取出元素
|
|
bool Pop(T& t)
|
|
{
|
|
std::unique_lock<std::mutex> lock(m_mutex);
|
|
if (m_queue.empty())
|
|
return false;
|
|
t = std::move(m_queue.front());
|
|
m_queue.pop();
|
|
return true;
|
|
}
|
|
// 清除
|
|
void clear()
|
|
{
|
|
std::unique_lock<std::mutex> lock(m_mutex);
|
|
std::queue<T> queue;
|
|
std::swap(m_queue, queue);
|
|
}
|
|
|
|
private:
|
|
std::mutex m_mutex;
|
|
std::queue<T> m_queue;
|
|
};
|
|
|
|
class CBoxThreadPool
|
|
{
|
|
private:
|
|
class CWorker
|
|
{
|
|
public:
|
|
CWorker(CBoxThreadPool* pThreadPool, const size_t nID)
|
|
: m_nID(nID), m_pThreadPool(pThreadPool)
|
|
{
|
|
is_run_ = false;
|
|
}
|
|
// 重载操作
|
|
void operator()()
|
|
{
|
|
// 1.定义基础函数类 func
|
|
std::function<void()> func;
|
|
// 2.是否成功取出队列中的元素
|
|
bool have = false;
|
|
while (m_pThreadPool->is_start_ || !is_run_ || isContinue()) {
|
|
// 进入 while 后,标志位置成已进入。
|
|
is_run_ = true;
|
|
{
|
|
// 2.1 为线程环境加锁,互斥访问线程的休眠与唤醒
|
|
std::unique_lock<std::mutex> lock(m_pThreadPool->mutex_);
|
|
// 2.2 如果队列为空则阻塞并等待条件变量的通知
|
|
if (m_pThreadPool->m_queue.IsEmpty())
|
|
m_pThreadPool->cv_.wait(lock);
|
|
// 2.3 取出任务队列中的元素
|
|
have = m_pThreadPool->m_queue.Pop(func);
|
|
}
|
|
|
|
if (have)
|
|
func();
|
|
}
|
|
}
|
|
// 检查是否需要继续完成未完成的任务
|
|
bool isContinue()
|
|
{
|
|
if (!m_pThreadPool->is_wait_)
|
|
return false;
|
|
if (m_pThreadPool->m_queue.IsEmpty())
|
|
return false;
|
|
return true;
|
|
}
|
|
|
|
private:
|
|
size_t m_nID; // 工作ID
|
|
CBoxThreadPool* m_pThreadPool; // 所属线程池
|
|
bool is_run_; // 是否进入工作了
|
|
};
|
|
|
|
public:
|
|
explicit CBoxThreadPool(const int nThreadNum = 4)
|
|
: th_cnt_(nThreadNum),
|
|
is_start_(false),
|
|
is_wait_(false),
|
|
threads_(std::vector<std::thread>(nThreadNum))
|
|
{
|
|
}
|
|
~CBoxThreadPool() { ShutDownWaitCurrent(); }
|
|
CBoxThreadPool(const CBoxThreadPool&) = delete;
|
|
CBoxThreadPool(CBoxThreadPool&&) = delete;
|
|
CBoxThreadPool& operator=(const CBoxThreadPool&) = delete;
|
|
CBoxThreadPool& operator=(CBoxThreadPool&&) = delete;
|
|
|
|
public:
|
|
// 初始化线程池
|
|
void Init()
|
|
{
|
|
if (!is_start_) {
|
|
threads_.resize(th_cnt_);
|
|
for (size_t i = 0; i < threads_.size(); ++i) {
|
|
threads_.at(i) = std::thread(CWorker(this, i));
|
|
}
|
|
is_start_ = true;
|
|
is_wait_ = false;
|
|
return;
|
|
}
|
|
}
|
|
// 等待各线程当前任务完成然后关闭线程池
|
|
void ShutDownWaitCurrent()
|
|
{
|
|
if (!is_start_)
|
|
return;
|
|
is_wait_ = false;
|
|
is_start_ = false;
|
|
Close();
|
|
m_queue.clear();
|
|
}
|
|
|
|
// 等待所有已提交的任务完成然后关闭线程池
|
|
void ShutDownWaitAll()
|
|
{
|
|
if (!is_start_)
|
|
return;
|
|
is_wait_ = true;
|
|
is_start_ = false;
|
|
Close();
|
|
m_queue.clear();
|
|
}
|
|
// 向池子中提交一个待执行函数
|
|
template <typename F, typename... Args>
|
|
auto Submit(F&& f, Args&&... args) -> std::future<decltype(f(args...))>
|
|
{
|
|
// 1. 创建一个绑定参数的函数,
|
|
// 连接函数和参数定义,特殊函数类型,避免左右值错误
|
|
std::function<decltype(f(args...))()> func =
|
|
std::bind(std::forward<F>(f), std::forward<Args>(args)...);
|
|
// 2. 使用智能指针以便调用拷贝构造
|
|
auto pTask =
|
|
std::make_shared<std::packaged_task<decltype(f(args...))()>>(func);
|
|
// 3. 包装进一个无参数函数
|
|
std::function<void()> rf = [pTask]() { (*pTask)(); };
|
|
// 4. 入队
|
|
m_queue.Push(rf);
|
|
// 5. 唤醒一个等待中的线程
|
|
cv_.notify_one();
|
|
// 6. 返回先前注册的任务指针
|
|
return pTask->get_future();
|
|
}
|
|
|
|
private:
|
|
void Close()
|
|
{
|
|
cv_.notify_all();
|
|
|
|
for (auto& m_thread : threads_) {
|
|
if (m_thread.joinable())
|
|
m_thread.join();
|
|
}
|
|
threads_.clear();
|
|
}
|
|
|
|
private:
|
|
int th_cnt_; // 线程数
|
|
bool is_start_; // 线程池是否启动
|
|
bool is_wait_; // 是否等待所有线程结束
|
|
std::mutex mutex_; // 线程休眠锁互斥
|
|
|
|
CQueueMt<std::function<void()>> m_queue;
|
|
std::vector<std::thread> threads_;
|
|
std::condition_variable cv_; // 环境锁,用于休眠或者唤醒
|
|
};
|
|
|
|
} // namespace cppbox
|
|
#endif |