精通
英语
和
开源
,
擅长
开发
与
培训
,
胸怀四海
第一信赖
锐英源精品原创,禁止全文或局部转载,禁止任何形式的非法使用,侵权必究。点名“简易百科”和闲暇巴盗用锐英源原创内容
近期使用httplib,研究了httplib的源代码,里面的线程池TreadPool用C++17标准实现,短小精悍实用性强,把std::mutex、condition_variable、function、thread和lambda函数体有机结合使用,用极少的代码实现了安全可靠的线程池ThreadPool,推荐给大家共同提高。
线程池代码:
//基类,线程常用任务
class TaskQueue {
public:
TaskQueue() = default;
virtual ~TaskQueue() = default;virtual bool enqueue(std::function<void()> fn) = 0;
virtual void shutdown() = 0;virtual void on_idle() {}
};class ThreadPool final : public TaskQueue {
public:
explicit ThreadPool(size_t n, size_t mqr = 0)
: shutdown_(false), max_queued_requests_(mqr) {
while (n) {
threads_.emplace_back(worker(*this));//初始化线程,worker结构体对象的默认函数是线程函数
n--;
}
}ThreadPool(const ThreadPool &) = delete;
~ThreadPool() override = default;bool enqueue(std::function<void()> fn) override {
{
std::unique_lock<std::mutex> lock(mutex_);
if (max_queued_requests_ > 0 && jobs_.size() >= max_queued_requests_) {
return false;
}
jobs_.push_back(std::move(fn));//添加队列
}cond_.notify_one();//通知消息
return true;
}void shutdown() override {
// Stop all worker threads...
{
std::unique_lock<std::mutex> lock(mutex_);
shutdown_ = true;
}cond_.notify_all();//关闭时通知所有
// Join...
for (auto &t : threads_) {
t.join();
}
}private:
struct worker {
explicit worker(ThreadPool &pool) : pool_(pool) {}void operator()() {
for (;;) {
std::function<void()> fn;
{
std::unique_lock<std::mutex> lock(pool_.mutex_);//保证线程互斥,资源安全
//等待信号
pool_.cond_.wait(
lock, [&] { return !pool_.jobs_.empty() || pool_.shutdown_; });
//不满足条件退出
if (pool_.shutdown_ && pool_.jobs_.empty()) { break; }fn = pool_.jobs_.front();
pool_.jobs_.pop_front();
}assert(true == static_cast<bool>(fn));
fn();
}#if defined(CPPHTTPLIB_OPENSSL_SUPPORT) && !defined(OPENSSL_IS_BORINGSSL)
OPENSSL_thread_stop();
#endif
}ThreadPool &pool_;
};
friend struct worker;std::vector<std::thread> threads_;
std::list<std::function<void()>> jobs_;bool shutdown_;
size_t max_queued_requests_ = 0;std::condition_variable cond_;
std::mutex mutex_;
};
调用代码:
//this,sock是引导区给lambda函数体的参数
if (!task_queue->enqueue(
[this, sock]() { process_and_close_socket(sock); })) {
detail::shutdown_socket(sock);
detail::close_socket(sock);
}