C++并发同步利器-condition_variable:优雅的线程等待与唤醒
在复杂的并发程序中,线程之间常常需要相互等待某个条件成立才能继续执行。例如,生产者线程生产数据后通知消费者线程,或者多个工作线程等待某个共享状态变为特定值。C++标准库通过<condition_variable>头文件提供了std::condition_variable和
std::condition_variable_any,它们是实现这种等待/通知机制的关键同步原语。
1. <condition_variable>库简介
条件变量(Condition Variable)允许一个或多个线程等待(阻塞)直到另一个线程修改了某个共享状态(条件)并通知条件变量。条件变量总是与一个互斥量(通常是std::mutex)配合使用,以保护共享状态的访问并避免竞态条件。
核心工作流程:
- 等待方线程: a. 获取与条件变量关联的互斥锁(通常通过std::unique_lock)。 b. 检查条件是否满足。如果条件已满足,则继续执行,无需等待。 c. 如果条件不满足,则调用条件变量的wait(或其变体wait_for, wait_until)方法。此方法会原子地: i. 释放互斥锁。 ii. 将当前线程置于阻塞状态,等待通知。 d. 当被唤醒时(由其他线程通知或虚假唤醒),wait方法会重新获取互斥锁,然后返回。 e. 等待方线程必须再次检查条件是否真正满足(因为可能是虚假唤醒,或者条件在唤醒后又被其他线程改变了),如果条件仍不满足,则继续循环等待。
- 通知方线程: a. 获取与条件变量关联的同一个互斥锁。 b. 修改共享状态,使得等待条件可能成立。 c. 调用条件变量的notify_one()(唤醒一个等待线程)或notify_all()(唤醒所有等待线程)方法。 d. 释放互斥锁。
<condition_variable>库主要提供两种条件变量:
- std::condition_variable: 只能与std::unique_lock<std::mutex>一起使用。
- std::condition_variable_any: 可以与任何满足基本锁需求的锁类型(Lockable)一起使用,例如std::shared_lock。这提供了更大的灵活性,但通常性能略低于std::condition_variable。
2. <condition_variable>库的特点
- 线程间同步:允许线程基于特定条件进行等待和唤醒,实现复杂的协作逻辑。
- 配合互斥量:必须与互斥量一起使用,以保护共享条件和避免竞态。
- 避免忙等待:线程在等待条件时会阻塞,释放CPU资源,而不是通过轮询消耗CPU(忙等待)。
- 虚假唤醒(Spurious Wakeups):wait操作可能会在没有相应通知的情况下返回。因此,等待方总是在循环中检查条件。
- notify_one vs notify_all:提供精确控制,唤醒单个或所有等待线程。
- 超时等待:wait_for和wait_until允许线程等待一段时间或直到某个时间点。
3. std::condition_variable详解
std::condition_variable不可复制,不可移动。
成员函数:
- condition_variable() noexcept;: 默认构造函数。
- ~condition_variable();: 析构函数。如果在析构时仍有线程在等待此条件变量,行为是未定义的。
- wait(std::unique_lock<std::mutex>& lock);: void 当前线程必须已锁定lock所管理的互斥量。此函数原子地释放lock,并阻塞当前线程,直到被notify_one()或notify_all()唤醒。唤醒后,函数会重新锁定lock然后返回。 必须在循环中调用并检查条件:
std::unique_lock<std::mutex> lk(mtx);
cv.wait(lk, []{ return shared_condition_is_true; }); // 带谓词的版本
// 等价于 (但不完全相同,因为谓词版本内部处理了虚假唤醒的循环):
// while (!shared_condition_is_true) {
// cv.wait(lk);
// }
- wait(std::unique_lock<std::mutex>& lock, Predicate pred);: void Predicate是一个可调用对象,返回bool。此版本等价于:
while (!pred()) {
wait(lock);
}
这是推荐的等待方式,因为它内部处理了虚假唤醒的循环。
- wait_for(std::unique_lock<std::mutex>& lock, const std::chrono::duration<Rep, Period>& rel_time);: std::cv_status 等待指定的相对时间rel_time。如果超时或被唤醒,则返回。 返回std::cv_status::timeout如果超时,std::cv_status::no_timeout如果因通知而返回。
- wait_for(std::unique_lock<std::mutex>& lock, const std::chrono::duration<Rep, Period>& rel_time, Predicate pred);: bool 等待指定的相对时间rel_time,或者直到pred()为true。返回pred()的最后一次评估结果。 等价于:
// C++20 之前的近似实现
// auto end_time = std::chrono::steady_clock::now() + rel_time;
// while (!pred()) {
// if (wait_until(lock, end_time) == std::cv_status::timeout) {
// return pred(); // 可能超时后条件恰好满足
// }
// }
// return true;
// C++20 标准定义了更精确的行为
return wait_until(lock, std::chrono::steady_clock::now() + rel_time, std::move(pred));
- wait_until(std::unique_lock<std::mutex>& lock, const std::chrono::time_point<Clock, Duration>& abs_time);: std::cv_status 等待直到指定绝对时间点abs_time。如果超时或被唤醒,则返回。
- wait_until(std::unique_lock<std::mutex>& lock, const std::chrono::time_point<Clock, Duration>& abs_time, Predicate pred);: bool 等待直到指定绝对时间点abs_time,或者直到pred()为true。返回pred()的最后一次评估结果。
- notify_one() noexcept;: void 如果当前有任何线程在等待此条件变量,则唤醒其中一个。选择哪个线程被唤醒是不确定的。
- notify_all() noexcept;: void 唤醒所有当前正在等待此条件变量的线程。
- native_handle();: native_handle_type 返回底层实现的句柄。
std::cv_status 是一个枚举类型:
enum class cv_status { no_timeout, timeout };
生产者-消费者示例:
#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <queue>
#include <chrono>
std::mutex mtx_queue;
std::condition_variable cv_queue;
std::queue<int> data_queue;
bool finished_producing = false;
const int MAX_QUEUE_SIZE = 5;
void producer(int num_items) {
for (int i = 0; i < num_items; ++i) {
std::this_thread::sleep_for(std::chrono::milliseconds(100)); // 模拟生产耗时
{
std::unique_lock<std::mutex> lock(mtx_queue);
// 等待队列有空间 (如果队列已满)
cv_queue.wait(lock, [] { return data_queue.size() < MAX_QUEUE_SIZE; });
data_queue.push(i);
std::cout << "Producer produced: " << i << " (Queue size: " << data_queue.size() << ")" << std::endl;
}
cv_queue.notify_one(); // 通知一个消费者
}
{
std::unique_lock<std::mutex> lock(mtx_queue);
finished_producing = true;
std::cout << "Producer finished." << std::endl;
}
cv_queue.notify_all(); // 通知所有可能在等待的消费者,生产已结束
}
void consumer(int id) {
while (true) {
int item;
{
std::unique_lock<std::mutex> lock(mtx_queue);
// 等待队列非空,或者生产已结束且队列为空
cv_queue.wait(lock, [] { return !data_queue.empty() || finished_producing; });
if (data_queue.empty() && finished_producing) {
std::cout << "Consumer " << id << " exiting as queue is empty and production finished." << std::endl;
break; // 生产结束且队列已空,退出
}
if (data_queue.empty()) { // 可能被唤醒但队列仍空(例如,其他消费者更快)
std::cout << "Consumer " << id << " woke up but queue empty, re-waiting." << std::endl;
continue;
}
item = data_queue.front();
data_queue.pop();
std::cout << "Consumer " << id << " consumed: " << item << " (Queue size: " << data_queue.size() << ")" << std::endl;
}
cv_queue.notify_one(); // 通知生产者队列有空间了 (如果之前是满的)
std::this_thread::sleep_for(std::chrono::milliseconds(250)); // 模拟消费耗时
}
}
int main() {
std::thread prod_thread(producer, 20);
std::vector<std::thread> cons_threads;
for (int i = 0; i < 3; ++i) {
cons_threads.emplace_back(consumer, i);
}
prod_thread.join();
for (auto& t : cons_threads) {
t.join();
}
std::cout << "All threads finished." << std::endl;
return 0;
}
在这个例子中:
- 生产者在队列满时等待,通过cv_queue.wait(lock, [] { return data_queue.size() < MAX_QUEUE_SIZE; });。
- 消费者在队列空时等待,通过cv_queue.wait(lock, [] { return !data_queue.empty() || finished_producing; });。
- notify_one()用于在生产者放入数据后通知一个消费者,或消费者取出数据后通知生产者(如果队列之前是满的)。
- notify_all()在生产者完成所有生产后使用,以确保所有等待的消费者都能检查到finished_producing状态并最终退出。
4. std::condition_variable_any详解
std::condition_variable_any与std::condition_variable功能类似,但它可以与任何满足C++标准中Lockable要求的锁类型一起工作,而不仅仅是std::unique_lock<std::mutex>。这包括如std::shared_lock<std::shared_mutex>等。
主要区别:
- wait系列函数的第一个参数是泛型锁类型Lock& lock。
- 通常性能可能略逊于std::condition_variable,因为它需要处理更通用的锁类型。
使用场景: 当需要与std::shared_mutex等更复杂的锁机制(例如,允许多个读者或一个写者的场景)结合使用条件变量时,
std::condition_variable_any非常有用。例如,一个线程可能需要等待某个条件,而这个条件由一个持有共享锁的线程来通知。
#include <shared_mutex> // For std::shared_mutex, std::shared_lock
std::shared_mutex s_mtx;
std::condition_variable_any cv_any;
bool ready_for_readers = false;
std::string shared_document_content;
void writer_thread() {
std::unique_lock<std::shared_mutex> lock(s_mtx); // 独占锁
std::cout << "Writer preparing document..." << std::endl;
std::this_thread::sleep_for(std::chrono::seconds(2));
shared_document_content = "This is the final document.";
ready_for_readers = true;
std::cout << "Writer finished. Notifying readers." << std::endl;
lock.unlock(); // 可以在通知前解锁,如果通知本身不需要保护
cv_any.notify_all();
}
void reader_thread(int id) {
std::shared_lock<std::shared_mutex> lock(s_mtx); // 共享锁
std::cout << "Reader " << id << " waiting for document..." << std::endl;
// 等待 ready_for_readers 变为 true
cv_any.wait(lock, []{ return ready_for_readers; });
// lock 被重新获取
std::cout << "Reader " << id << " can now read: " << shared_document_content << std::endl;
}
int main() {
std::thread wt(writer_thread);
std::vector<std::thread> rts;
for (int i = 0; i < 5; ++i) {
rts.emplace_back(reader_thread, i);
}
wt.join();
for (auto& rt : rts) {
rt.join();
}
return 0;
}
在这个例子中,读者线程使用std::shared_lock来等待,而写者线程使用std::unique_lock。
std::condition_variable_any使得这种混合锁类型的同步成为可能。
5. 虚假唤醒 (Spurious Wakeups)
一个非常重要的概念是“虚假唤醒”。即使没有相应的notify_one()或notify_all()调用,等待在条件变量上的线程也可能被唤醒。这是操作系统底层线程调度和实现细节可能导致的一种现象。
因此,标准强烈建议(实际上是强制要求正确性)将wait调用放在一个循环中,并在每次唤醒后重新检查实际的条件。
std::unique_lock<std::mutex> lk(m);
while (!condition_is_met()) { // 必须循环检查
cv.wait(lk);
}
使用带谓词的wait版本(如cv.wait(lk, []{ return condition_is_met(); });)是更简洁和安全的方式,因为它内部已经实现了这个循环检查逻辑。
6. notify_one()vs notify_all()
- notify_one():
- 唤醒一个(任意的)正在等待的线程。
- 效率更高,因为它只涉及一个线程的上下文切换。
- 适用于只有一个线程能够从条件变化中受益的情况,或者所有等待线程执行相同操作且一次一个就足够的情况(例如,从队列中取出一个元素)。
- 如果多个线程都在等待同一个条件,但只有部分线程在条件满足时可以继续(例如,资源有限),使用notify_one后,被唤醒的线程处理完后可能需要再次notify_one以唤醒下一个可能的受益者。
- notify_all():
- 唤醒所有正在等待的线程。
- 开销较大,因为它可能导致多个线程同时被唤醒(“惊群效应”,Thundering Herd),它们都会尝试获取锁,但最终只有一个能成功,其余的会再次阻塞。
- 适用于以下情况:
- 多个线程可能都从条件变化中受益(例如,广播一个状态变化,所有等待者都需要响应)。
- 条件的变化可能使得多个不同的等待条件同时满足。
- 无法确定哪个等待线程应该被唤醒。
- 生产者完成所有工作,通知所有消费者可以退出了。
选择策略:
- 如果不确定,或者唤醒所有线程是安全的并且逻辑上正确,使用notify_all()通常更简单、更不容易出错,尽管可能效率稍低。
- 如果能明确只有一个线程需要被唤醒,并且唤醒一个就足够,使用notify_one()。
- 仔细考虑条件和等待者的逻辑。如果一个notify_one可能导致某个应该被唤醒的线程没有被唤醒(例如,它等待的特定子条件没有被满足,而另一个线程被错误地唤醒),那么notify_all可能是更安全的选择。
7. 应用场景
- 生产者-消费者模型:如前例所示,控制对共享缓冲区的访问。
- 线程池任务队列:工作线程等待任务队列中有新任务。
- 事件通知:一个线程等待某个特定事件发生,由另一个线程通知。
- 屏障(Barrier)同步:等待一组线程都到达某个同步点后再一起继续执行(虽然std::barrier (C++20) 更适合此场景)。
- 资源可用性等待:线程等待某个资源(如数据库连接)变为可用。
8. 注意事项与最佳实践
- 始终与互斥量配合:条件变量本身不提供互斥保护,共享状态的读写必须由互斥量保护。
- 使用std::unique_lock (或兼容锁):std::condition_variable::wait系列函数要求传入std::unique_lock<std::mutex>。std::condition_variable_any则更灵活。
- 在循环中检查条件并等待:由于虚假唤醒,必须使用循环。带谓词的wait是首选。
- 谓词必须无副作用且快速:传递给wait的谓词函数会在持有锁的情况下被调用,应避免耗时操作或修改受保护状态之外的东西。
- 通知时机:通常在修改完共享状态、条件可能已满足之后,再调用notify_one()或notify_all()。可以在持有锁时通知,也可以在释放锁之后通知。在释放锁后通知可以减少被唤醒线程重新获取锁的等待时间,但需要确保通知前的状态修改对被唤醒线程可见(通常通过锁的内存序保证)。标准库的实现通常能很好地处理这两种情况。
- 小心死锁:确保锁的获取顺序一致,避免在持有锁的情况下调用可能导致阻塞的代码(除了wait本身)。
- 选择std::condition_variable或std::condition_variable_any:如果只需要与std::mutex配合,std::condition_variable通常更高效。如果需要与std::shared_mutex等其他锁类型配合,则使用std::condition_variable_any。
9. 总结
std::condition_variable和
std::condition_variable_any是C++并发编程中用于实现复杂线程同步和协作的关键工具。它们提供了一种高效的方式,让线程能够等待特定条件成立而避免忙等待,并通过通知机制在条件满足时唤醒等待的线程。
正确使用条件变量(特别是处理好与互斥量的配合、循环等待和虚假唤醒)对于构建健壮、无死锁且高效的并发应用程序至关重要。理解notify_one与notify_all的区别和适用场景,以及选择合适的条件变量类型,也是开发者需要掌握的核心技能。