精通
英语
和
开源
,
擅长
开发
与
培训
,
胸怀四海
第一信赖
锐英源精品原创,禁止全文或局部转载,禁止任何形式的非法使用,侵权必究。点名“简易百科”和闲暇巴盗用锐英源原创内容
C++11 支持的另一个同步基本类是条件变量(condition variable),该变量允许阻止一个或多个线程,直到从另一个线程收到通知,或者发生超时或虚假唤醒。<condition_variable>头文件中提供了两种条件变量的实现: condition_variable:任何想要等待的线程,它首先获取 std::unique_lock。 condition_variable_any: 是一种更通用的实现,适用于满足基本锁条件的任何类型的操作(提供 lock()和unlock()方法的实现)。这可能使用起来更昂贵(就性能和操作系统资源而言),因此只有在需要它提供的额外灵活性时,才应首选它。 下面介绍了条件变量的工作原理: 必须至少有一个线程正在等待条件变为 true。等待线程必须首先获取unique_lock, 此锁将传递给方法wait(),该方法释放互斥锁并挂起线程,直到条件变量收到信号。发生这种情况时,线程将被唤醒并重新获取锁。 必须至少有一个线程发出信号,进而条件为true。可以使用 notify_one() 取消阻塞一个正在等待条件信号的线程 (任何) 或使用 notify_all() 取消阻塞所有正在等待条件的线程来完成信号。 由于在多处理器系统上使条件唤醒完全可预测存在一些复杂性,因此可能会发生虚假唤醒。这意味着即使没有人向 condition 变量发出信号,线程也会被唤醒。因此,有必要检查线程唤醒后条件是否仍然成立。由于虚假唤醒可能会发生多次,因此必须在循环中完成该检查。 下面的代码显示了使用条件变量同步线程的示例:多个 “worker” 线程可能会在工作期间产生错误,并且它们会将错误代码放入队列中。“logger” 线程通过从队列中获取这些错误代码并打印它们来处理这些错误代码。worker 在发生错误时向 Logger 发出信号。记录器正在等待 condition 变量发出信号。为避免虚假唤醒,等待发生在检查布尔条件的循环中。
#include <thread> #include <mutex> #include <condition_variable> #include <iostream> #include <queue> #include <random> std::mutex g_lockprint; std::mutex g_lockqueue; std::condition_variable g_queuecheck; std::queue<int> g_codes; bool g_done; bool g_notified; void workerfunc(int id, std::mt19937& generator) { // print a starting message { std::unique_lock<std::mutex> locker(g_lockprint); std::cout << "[worker " << id << "]\trunning..." << std::endl; } // simulate work std::this_thread::sleep_for(std::chrono::seconds(1 + generator() % 5)); // simulate error int errorcode = id*100+1; { std::unique_lock<std::mutex> locker(g_lockprint); std::cout << "[worker " << id << "]\tan error occurred: " << errorcode << std::endl; } // notify error to be logged { std::unique_lock<std::mutex> locker(g_lockqueue); g_codes.push(errorcode); g_notified = true; g_queuecheck.notify_one(); } } void loggerfunc() { // print a starting message { std::unique_lock<std::mutex> locker(g_lockprint); std::cout << "[logger]\trunning..." << std::endl; } // loop until end is signaled while(!g_done) { std::unique_lock<std::mutex> locker(g_lockqueue); while(!g_notified) // used to avoid spurious wakeups { g_queuecheck.wait(locker); } // if there are error codes in the queue process them while(!g_codes.empty()) { std::unique_lock<std::mutex> locker(g_lockprint); std::cout << "[logger]\tprocessing error: " << g_codes.front() << std::endl; g_codes.pop(); } g_notified = false; } } int main() { // initialize a random generator std::mt19937 generator((unsigned int) std::chrono::system_clock::now().time_since_epoch().count()); // start the logger std::thread loggerthread(loggerfunc); // start the working threads std::vector<std::thread> threads; for(int i = 0; i < 5; ++i) { threads.push_back(std::thread(workerfunc, i+1, std::ref(generator))); } // work for the workers to finish for(auto& t : threads) t.join(); // notify the logger to finish and wait for it g_done = true; loggerthread.join(); return 0; }
运行此代码会生成如下所示的输出(请注意,每次运行时此输出都不同,因为每个工作线程都会在随机间隔内工作,即休眠):
[logger] running... [worker 1] running... [worker 2] running... [worker 3] running... [worker 4] running... [worker 5] running... [worker 1] an error occurred: 101 [worker 2] an error occurred: 201 [logger] processing error: 101 [logger] processing error: 201 [worker 5] an error occurred: 501 [logger] processing error: 501 [worker 3] an error occurred: 301 [worker 4] an error occurred: 401 [logger] processing error: 301 [logger] processing error: 401
上面看到的方法有两个重载:wait() 一个只需要 unique_lock;这个释放锁,阻塞线程并将其添加到正在等待此 condition 变量的线程队列中;当 condition 变量发出信号或发生虚假唤醒时,线程将唤醒。当其中任何一种情况发生时,将重新获取锁并返回函数。 一个除了 unique_lock 之外,还采用一个用于循环直到返回false;此重载可用于避免虚假唤醒。它基本上等价于:
while(!predicate()) wait(lock);
注:意思是全局布尔变量和锁及条件变量一起工作,可以避免虚假唤醒。这点用起来真麻烦,用系统的互斥和信号更直接。
因此,通过使用wait修饰,可以通过上面的示例中使用的boolean标志g_notified来避免,避免和修饰结合预测和验证队列状态(空或非空):
void workerfunc(int id, std::mt19937& generator) { // print a starting message { std::unique_lock<std::mutex> locker(g_lockprint); std::cout << "[worker " << id << "]\trunning..." << std::endl; } // simulate work std::this_thread::sleep_for(std::chrono::seconds(1 + generator() % 5)); // simulate error int errorcode = id*100+1; { std::unique_lock<std::mutex> locker(g_lockprint); std::cout << "[worker " << id << "]\tan error occurred: " << errorcode << std::endl; } // notify error to be logged { std::unique_lock<std::mutex> locker(g_lockqueue); g_codes.push(errorcode); g_queuecheck.notify_one(); } } void loggerfunc() { // print a starting message { std::unique_lock<std::mutex> locker(g_lockprint); std::cout << "[logger]\trunning..." << std::endl; } // loop until end is signaled while(!g_done) { std::unique_lock<std::mutex> locker(g_lockqueue); g_queuecheck.wait(locker, [&](){return !g_codes.empty();}); // if there are error codes in the queue process them while(!g_codes.empty()) { std::unique_lock<std::mutex> locker(g_lockprint); std::cout << "[logger]\tprocessing error: " << g_codes.front() << std::endl; g_codes.pop(); } } }
除了这个wait()重载方法之外,还有另外两个等待方法,这两个方法都具有类似的重载,它们都使用修饰来避免虚假唤醒: wait_for:阻塞线程,直到向 condition 变量发出信号或发生指定的超时 wait_until:阻塞线程,直到向 condition 变量发出信号或达到指定的时刻 不带修饰直接用这两个重载函数将返回一个cv_status,该指示表明唤醒 condition 变量的原因是由于发生超时、发出信号或由于虚假唤醒。 该标准还提供了一个名为 notify_all_at_thread_exit 的函数,该函数实现了一种机制,用于通知其他线程给定线程已完成,包括销毁所有thread_local对象。 引入此功能是因为,在使用 thread_locals 时,等待具有其他join()机制的线程可能会导致不正确的致命行为,因为即使在等待线程恢复并且可能也完成后,也可能调用它们的析构函数(有关详细信息,请参阅 N3070 和 N2880)。通常,对 this 函数的调用必须在线程存在之前发生。 下面是一个如何与notify_all_at_thread_exit和condition_variable一起使用来同步两个线程的示例:
std::mutex g_lockprint; std::mutex g_lock; std::condition_variable g_signal; bool g_done; void workerfunc(std::mt19937& generator) { { std::unique_lock<std::mutex> locker(g_lockprint); std::cout << "worker running..." << std::endl; } std::this_thread::sleep_for(std::chrono::seconds(1 + generator() % 5)); { std::unique_lock<std::mutex> locker(g_lockprint); std::cout << "worker finished..." << std::endl; } std::unique_lock<std::mutex> lock(g_lock); g_done = true; std::notify_all_at_thread_exit(g_signal, std::move(lock)); } int main() { // initialize a random generator std::mt19937 generator((unsigned int) std::chrono::system_clock::now().time_since_epoch().count()); std::cout << "main running..." << std::endl; std::thread worker(workerfunc, std::ref(generator)); worker.detach(); std::cout << "main crunching..." << std::endl; std::this_thread::sleep_for(std::chrono::seconds(1 + generator() % 5)); { std::unique_lock<std::mutex> locker(g_lockprint); std::cout << "main waiting for worker..." << std::endl; } std::unique_lock<std::mutex> lock(g_lock); while(!g_done) // avoid spurious wake-ups g_signal.wait(lock); std::cout << "main finished..." << std::endl; return 0; }
如果 worker 在线程之前完成工作,则输出将为:main
main running... worker running... main crunching... worker finished... main waiting for worker... main finished...
如果线程在工作线程之前完成,则输出将为:main
main running... worker running... main crunching... main waiting for worker... worker finished... main finished...