ofen/include/of_util.h

280 lines
6.2 KiB
C++

#pragma once
#include "of_def.hpp"
#include <algorithm>
#include <cassert>
#include <condition_variable>
#include <cstring>
#include <functional>
#include <future>
#include <iostream>
#include <memory>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>
namespace ofen {
template <typename T> class OfSingleton
{
public:
OfSingleton(const OfSingleton&) = delete;
OfSingleton& operator=(const OfSingleton&) = delete;
static std::shared_ptr<T> getInstance()
{
std::call_once(init_flag, []() { instance.reset(new T()); });
return instance;
}
protected:
OfSingleton() = default;
virtual ~OfSingleton() = default;
private:
static std::shared_ptr<T> instance;
static std::once_flag init_flag;
};
template <typename T> std::shared_ptr<T> OfSingleton<T>::instance = nullptr;
template <typename T> std::once_flag OfSingleton<T>::init_flag;
class OfUtil
{
public:
OfUtil();
~OfUtil();
public:
static ofString now_time();
static ofString get_file_size(long long bytes);
static ofString get_sim_uuid();
static uint64_t get_timestamp_ms();
static uint64_t get_timestamp_s();
};
class CMutBuffer
{
public:
CMutBuffer() = default;
public:
void push(const char* data, int len);
int index_of(const char* data, int len, int start_pos = 0);
const char* get_data() const;
int get_len() const;
void remove_of(int start_pos, int len);
void clear();
private:
std::vector<char> buffer_;
std::mutex mutex_;
};
template <typename T> class CQueueMt
{
public:
CQueueMt() = default;
CQueueMt(CQueueMt&& rh) noexcept
{
}
~CQueueMt() = default;
public:
bool empty()
{
std::unique_lock<std::mutex> lock(mutex_);
return queue_.empty();
}
size_t size()
{
std::unique_lock<std::mutex> lock(mutex_);
return queue_.size();
}
void push(T& t)
{
std::unique_lock<std::mutex> lock(mutex_);
queue_.emplace(t);
}
bool pop(T& t)
{
std::unique_lock<std::mutex> lock(mutex_);
if (queue_.empty())
return false;
t = std::move(queue_.front());
queue_.pop();
return true;
}
void clear()
{
std::unique_lock<std::mutex> lock(mutex_);
std::queue<T> queue;
std::swap(queue_, queue);
}
private:
std::mutex mutex_;
std::queue<T> queue_;
};
class CThreadPool
{
private:
class CWorker
{
public:
CWorker(CThreadPool* pool, const size_t id) : id_(id), pool_(pool)
{
is_run_ = false;
}
void operator()()
{
std::function<void()> func;
bool have = false;
while (pool_->is_start_ || !is_run_ || is_continue()) {
is_run_ = true;
{
std::unique_lock<std::mutex> lock(pool_->mutex_);
if (pool_->queue_.empty())
pool_->cv_.wait(lock);
have = pool_->queue_.pop(func);
}
if (have)
func();
}
}
bool is_continue()
{
if (!pool_->is_wait_)
return false;
if (pool_->queue_.empty())
return false;
return true;
}
private:
size_t id_;
CThreadPool* pool_;
bool is_run_;
};
public:
explicit CThreadPool(const int num = 4)
: th_cnt_(num), is_start_(false), is_wait_(false), threads_(std::vector<std::thread>(num))
{
}
~CThreadPool()
{
close_wait_current();
}
CThreadPool(const CThreadPool&) = delete;
CThreadPool(CThreadPool&&) = delete;
CThreadPool& operator=(const CThreadPool&) = delete;
CThreadPool& operator=(CThreadPool&&) = delete;
public:
// 初始化线程池
void init()
{
if (!is_start_) {
threads_.resize(th_cnt_);
for (size_t i = 0; i < threads_.size(); ++i) {
threads_.at(i) = std::thread(CWorker(this, i));
}
is_start_ = true;
is_wait_ = false;
return;
}
}
void close_wait_current()
{
if (!is_start_)
return;
is_wait_ = false;
is_start_ = false;
close();
queue_.clear();
}
void close_wait_all()
{
if (!is_start_)
return;
is_wait_ = true;
is_start_ = false;
close();
queue_.clear();
}
template <typename F, typename... Args> auto submit(F&& f, Args&&... args) -> std::future<decltype(f(args...))>
{
std::function<decltype(f(args...))()> func = std::bind(std::forward<F>(f), std::forward<Args>(args)...);
auto pTask = std::make_shared<std::packaged_task<decltype(f(args...))()>>(func);
std::function<void()> rf = [pTask]() { (*pTask)(); };
queue_.push(rf);
cv_.notify_one();
return pTask->get_future();
}
private:
void close()
{
cv_.notify_all();
for (auto& m_thread : threads_) {
if (m_thread.joinable())
m_thread.join();
}
threads_.clear();
}
private:
int th_cnt_;
bool is_start_;
bool is_wait_;
std::mutex mutex_;
CQueueMt<std::function<void()>> queue_;
std::vector<std::thread> threads_;
std::condition_variable cv_;
};
class CCodec
{
public:
CCodec() = default;
~CCodec() = default;
public:
#ifdef _WIN32
static std::string u8_to_ansi(const std::string& str);
static std::string ansi_to_u8(const std::string& str);
#endif
/// @brief 删除,段落中的空白字符,如[你好 啊,在 哪里 ?] => [你好啊,在哪里?]
/// 仅处理非 ASCII 码附近的内容。
/// @param str
/// @return
static ofString rbs(const ofString& str);
};
typedef class CThreadSleep
{
public:
CThreadSleep();
~CThreadSleep() = default;
public:
void sleep(int milsec = 0);
void contiune();
void set_timeout(int milsec);
private:
bool get_status() const;
private:
std::condition_variable cv_;
int timeout_;
const int default_timeout_ = 10;
std::mutex mutex_;
bool is_stop_sleep_;
} ThreadSleep;
} // namespace ofen