add:添加线程池。

This commit is contained in:
taynpg 2024-12-13 12:29:47 +08:00
parent 1427bf4bca
commit 44fb416ca1

View File

@ -1,12 +1,17 @@
#pragma once
#include "of_def.hpp"
#include <algorithm>
#include <cassert>
#include <condition_variable>
#include <cstring>
#include <functional>
#include <future>
#include <iostream>
#include <memory>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>
#include <algorithm>
#include <cstring>
namespace ofen {
template <typename T> class OfSingleton
@ -37,6 +42,7 @@ class CMutBuffer
{
public:
CMutBuffer() = default;
public:
void push(const char* data, int len);
int index_of(const char* data, int len, int start_pos = 0);
@ -44,9 +50,177 @@ public:
int get_len() const;
void remove_of(int start_pos, int len);
void clear();
private:
std::vector<char> buffer_;
std::mutex mutex_;
};
template <typename T> class CQueueMt
{
public:
CQueueMt() = default;
CQueueMt(CQueueMt&& rh) noexcept
{
}
~CQueueMt() = default;
public:
bool empty()
{
std::unique_lock<std::mutex> lock(mutex_);
return queue_.empty();
}
size_t size()
{
std::unique_lock<std::mutex> lock(mutex_);
return queue_.size();
}
void push(T& t)
{
std::unique_lock<std::mutex> lock(mutex_);
queue_.emplace(t);
}
bool pop(T& t)
{
std::unique_lock<std::mutex> lock(mutex_);
if (queue_.empty())
return false;
t = std::move(queue_.front());
queue_.pop();
return true;
}
void clear()
{
std::unique_lock<std::mutex> lock(mutex_);
std::queue<T> queue;
std::swap(queue_, queue);
}
private:
std::mutex mutex_;
std::queue<T> queue_;
};
class CThreadPool
{
private:
class CWorker
{
public:
CWorker(CThreadPool* pool, const size_t id) : id_(id), pool_(pool)
{
is_run_ = false;
}
void operator()()
{
std::function<void()> func;
bool have = false;
while (pool_->is_start_ || !is_run_ || is_continue()) {
is_run_ = true;
{
std::unique_lock<std::mutex> lock(pool_->mutex_);
if (pool_->queue_.empty())
pool_->cv_.wait(lock);
have = pool_->queue_.pop(func);
}
if (have)
func();
}
}
bool is_continue()
{
if (!pool_->is_wait_)
return false;
if (pool_->queue_.empty())
return false;
return true;
}
private:
size_t id_;
CThreadPool* pool_;
bool is_run_;
};
public:
explicit CThreadPool(const int num = 4)
: th_cnt_(num), is_start_(false), is_wait_(false), threads_(std::vector<std::thread>(num))
{
}
~CThreadPool()
{
close_wait_current();
}
CThreadPool(const CThreadPool&) = delete;
CThreadPool(CThreadPool&&) = delete;
CThreadPool& operator=(const CThreadPool&) = delete;
CThreadPool& operator=(CThreadPool&&) = delete;
public:
// 初始化线程池
void init()
{
if (!is_start_) {
threads_.resize(th_cnt_);
for (size_t i = 0; i < threads_.size(); ++i) {
threads_.at(i) = std::thread(CWorker(this, i));
}
is_start_ = true;
is_wait_ = false;
return;
}
}
void close_wait_current()
{
if (!is_start_)
return;
is_wait_ = false;
is_start_ = false;
close();
queue_.clear();
}
void close_wait_all()
{
if (!is_start_)
return;
is_wait_ = true;
is_start_ = false;
close();
queue_.clear();
}
template <typename F, typename... Args> auto submit(F&& f, Args&&... args) -> std::future<decltype(f(args...))>
{
std::function<decltype(f(args...))()> func = std::bind(std::forward<F>(f), std::forward<Args>(args)...);
auto pTask = std::make_shared<std::packaged_task<decltype(f(args...))()>>(func);
std::function<void()> rf = [pTask]() { (*pTask)(); };
queue_.push(rf);
cv_.notify_one();
return pTask->get_future();
}
private:
void close()
{
cv_.notify_all();
for (auto& m_thread : threads_) {
if (m_thread.joinable())
m_thread.join();
}
threads_.clear();
}
private:
int th_cnt_;
bool is_start_;
bool is_wait_;
std::mutex mutex_;
CQueueMt<std::function<void()>> queue_;
std::vector<std::thread> threads_;
std::condition_variable cv_;
};
} // namespace ofen