C++并发同步利器-condition_variable:优雅的线程等待与唤醒

C++并发同步利器-condition_variable:优雅的线程等待与唤醒

编码文章call10242025-08-02 15:25:393A+A-

在复杂的并发程序中,线程之间常常需要相互等待某个条件成立才能继续执行。例如,生产者线程生产数据后通知消费者线程,或者多个工作线程等待某个共享状态变为特定值。C++标准库通过<condition_variable>头文件提供了std::condition_variable
std::condition_variable_any
,它们是实现这种等待/通知机制的关键同步原语。

1. <condition_variable>库简介

条件变量(Condition Variable)允许一个或多个线程等待(阻塞)直到另一个线程修改了某个共享状态(条件)并通知条件变量。条件变量总是与一个互斥量(通常是std::mutex)配合使用,以保护共享状态的访问并避免竞态条件。

核心工作流程

  1. 等待方线程: a. 获取与条件变量关联的互斥锁(通常通过std::unique_lock)。 b. 检查条件是否满足。如果条件已满足,则继续执行,无需等待。 c. 如果条件不满足,则调用条件变量的wait(或其变体wait_for, wait_until)方法。此方法会原子地: i. 释放互斥锁。 ii. 将当前线程置于阻塞状态,等待通知。 d. 当被唤醒时(由其他线程通知或虚假唤醒),wait方法会重新获取互斥锁,然后返回。 e. 等待方线程必须再次检查条件是否真正满足(因为可能是虚假唤醒,或者条件在唤醒后又被其他线程改变了),如果条件仍不满足,则继续循环等待。
  2. 通知方线程: a. 获取与条件变量关联的同一个互斥锁。 b. 修改共享状态,使得等待条件可能成立。 c. 调用条件变量的notify_one()(唤醒一个等待线程)或notify_all()(唤醒所有等待线程)方法。 d. 释放互斥锁。

<condition_variable>库主要提供两种条件变量:

  • std::condition_variable: 只能与std::unique_lock<std::mutex>一起使用。
  • std::condition_variable_any: 可以与任何满足基本锁需求的锁类型(Lockable)一起使用,例如std::shared_lock。这提供了更大的灵活性,但通常性能略低于std::condition_variable

2. <condition_variable>库的特点

  • 线程间同步:允许线程基于特定条件进行等待和唤醒,实现复杂的协作逻辑。
  • 配合互斥量:必须与互斥量一起使用,以保护共享条件和避免竞态。
  • 避免忙等待:线程在等待条件时会阻塞,释放CPU资源,而不是通过轮询消耗CPU(忙等待)。
  • 虚假唤醒(Spurious Wakeups)wait操作可能会在没有相应通知的情况下返回。因此,等待方总是在循环中检查条件。
  • notify_one vs notify_all:提供精确控制,唤醒单个或所有等待线程。
  • 超时等待wait_forwait_until允许线程等待一段时间或直到某个时间点。

3. std::condition_variable详解

std::condition_variable不可复制,不可移动。

成员函数

  • condition_variable() noexcept;: 默认构造函数。
  • ~condition_variable();: 析构函数。如果在析构时仍有线程在等待此条件变量,行为是未定义的。
  • wait(std::unique_lock<std::mutex>& lock);: void 当前线程必须已锁定lock所管理的互斥量。此函数原子地释放lock,并阻塞当前线程,直到被notify_one()notify_all()唤醒。唤醒后,函数会重新锁定lock然后返回。 必须在循环中调用并检查条件
   std::unique_lock<std::mutex> lk(mtx);
   cv.wait(lk, []{ return shared_condition_is_true; }); // 带谓词的版本
   // 等价于 (但不完全相同,因为谓词版本内部处理了虚假唤醒的循环):
   // while (!shared_condition_is_true) {
   //     cv.wait(lk);
   // }
  • wait(std::unique_lock<std::mutex>& lock, Predicate pred);: void Predicate是一个可调用对象,返回bool。此版本等价于:
   while (!pred()) {
       wait(lock);
   }

这是推荐的等待方式,因为它内部处理了虚假唤醒的循环。

  • wait_for(std::unique_lock<std::mutex>& lock, const std::chrono::duration<Rep, Period>& rel_time);: std::cv_status 等待指定的相对时间rel_time。如果超时或被唤醒,则返回。 返回std::cv_status::timeout如果超时,std::cv_status::no_timeout如果因通知而返回。
  • wait_for(std::unique_lock<std::mutex>& lock, const std::chrono::duration<Rep, Period>& rel_time, Predicate pred);: bool 等待指定的相对时间rel_time,或者直到pred()true。返回pred()的最后一次评估结果。 等价于:
   // C++20 之前的近似实现
   // auto end_time = std::chrono::steady_clock::now() + rel_time;
   // while (!pred()) {
   //     if (wait_until(lock, end_time) == std::cv_status::timeout) {
   //         return pred(); // 可能超时后条件恰好满足
   //     }
   // }
   // return true;
   // C++20 标准定义了更精确的行为
   return wait_until(lock, std::chrono::steady_clock::now() + rel_time, std::move(pred));
  • wait_until(std::unique_lock<std::mutex>& lock, const std::chrono::time_point<Clock, Duration>& abs_time);: std::cv_status 等待直到指定绝对时间点abs_time。如果超时或被唤醒,则返回。
  • wait_until(std::unique_lock<std::mutex>& lock, const std::chrono::time_point<Clock, Duration>& abs_time, Predicate pred);: bool 等待直到指定绝对时间点abs_time,或者直到pred()true。返回pred()的最后一次评估结果。
  • notify_one() noexcept;: void 如果当前有任何线程在等待此条件变量,则唤醒其中一个。选择哪个线程被唤醒是不确定的。
  • notify_all() noexcept;: void 唤醒所有当前正在等待此条件变量的线程。
  • native_handle();: native_handle_type 返回底层实现的句柄。

std::cv_status 是一个枚举类型:

 enum class cv_status { no_timeout, timeout };

生产者-消费者示例

 #include <iostream>
 #include <thread>
 #include <mutex>
 #include <condition_variable>
 #include <queue>
 #include <chrono>
 
 std::mutex mtx_queue;
 std::condition_variable cv_queue;
 std::queue<int> data_queue;
 bool finished_producing = false;
 const int MAX_QUEUE_SIZE = 5;
 
 void producer(int num_items) {
     for (int i = 0; i < num_items; ++i) {
         std::this_thread::sleep_for(std::chrono::milliseconds(100)); // 模拟生产耗时
         {
             std::unique_lock<std::mutex> lock(mtx_queue);
             // 等待队列有空间 (如果队列已满)
             cv_queue.wait(lock, [] { return data_queue.size() < MAX_QUEUE_SIZE; });
             
             data_queue.push(i);
             std::cout << "Producer produced: " << i << " (Queue size: " << data_queue.size() << ")" << std::endl;
         }
         cv_queue.notify_one(); // 通知一个消费者
     }
 
     {
         std::unique_lock<std::mutex> lock(mtx_queue);
         finished_producing = true;
         std::cout << "Producer finished." << std::endl;
     }
     cv_queue.notify_all(); // 通知所有可能在等待的消费者,生产已结束
 }
 
 void consumer(int id) {
     while (true) {
         int item;
         {
             std::unique_lock<std::mutex> lock(mtx_queue);
             // 等待队列非空,或者生产已结束且队列为空
             cv_queue.wait(lock, [] { return !data_queue.empty() || finished_producing; });
 
             if (data_queue.empty() && finished_producing) {
                 std::cout << "Consumer " << id << " exiting as queue is empty and production finished." << std::endl;
                 break; // 生产结束且队列已空,退出
             }
             
             if (data_queue.empty()) { // 可能被唤醒但队列仍空(例如,其他消费者更快)
                 std::cout << "Consumer " << id << " woke up but queue empty, re-waiting." << std::endl;
                 continue;
             }
 
             item = data_queue.front();
             data_queue.pop();
             std::cout << "Consumer " << id << " consumed: " << item << " (Queue size: " << data_queue.size() << ")" << std::endl;
         }
         cv_queue.notify_one(); // 通知生产者队列有空间了 (如果之前是满的)
         std::this_thread::sleep_for(std::chrono::milliseconds(250)); // 模拟消费耗时
     }
 }
 
 int main() {
     std::thread prod_thread(producer, 20);
     std::vector<std::thread> cons_threads;
     for (int i = 0; i < 3; ++i) {
         cons_threads.emplace_back(consumer, i);
     }
 
     prod_thread.join();
     for (auto& t : cons_threads) {
         t.join();
     }
 
     std::cout << "All threads finished." << std::endl;
     return 0;
 }

在这个例子中:

  • 生产者在队列满时等待,通过cv_queue.wait(lock, [] { return data_queue.size() < MAX_QUEUE_SIZE; });
  • 消费者在队列空时等待,通过cv_queue.wait(lock, [] { return !data_queue.empty() || finished_producing; });
  • notify_one()用于在生产者放入数据后通知一个消费者,或消费者取出数据后通知生产者(如果队列之前是满的)。
  • notify_all()在生产者完成所有生产后使用,以确保所有等待的消费者都能检查到finished_producing状态并最终退出。

4. std::condition_variable_any详解


std::condition_variable_any
std::condition_variable功能类似,但它可以与任何满足C++标准中Lockable要求的锁类型一起工作,而不仅仅是std::unique_lock<std::mutex>。这包括如std::shared_lock<std::shared_mutex>等。

主要区别

  • wait系列函数的第一个参数是泛型锁类型Lock& lock
  • 通常性能可能略逊于std::condition_variable,因为它需要处理更通用的锁类型。

使用场景: 当需要与std::shared_mutex等更复杂的锁机制(例如,允许多个读者或一个写者的场景)结合使用条件变量时,
std::condition_variable_any
非常有用。例如,一个线程可能需要等待某个条件,而这个条件由一个持有共享锁的线程来通知。

 #include <shared_mutex> // For std::shared_mutex, std::shared_lock
 
 std::shared_mutex s_mtx;
 std::condition_variable_any cv_any;
 bool ready_for_readers = false;
 std::string shared_document_content;
 
 void writer_thread() {
     std::unique_lock<std::shared_mutex> lock(s_mtx); // 独占锁
     std::cout << "Writer preparing document..." << std::endl;
     std::this_thread::sleep_for(std::chrono::seconds(2));
     shared_document_content = "This is the final document.";
     ready_for_readers = true;
     std::cout << "Writer finished. Notifying readers." << std::endl;
     lock.unlock(); // 可以在通知前解锁,如果通知本身不需要保护
     cv_any.notify_all();
 }
 
 void reader_thread(int id) {
     std::shared_lock<std::shared_mutex> lock(s_mtx); // 共享锁
     std::cout << "Reader " << id << " waiting for document..." << std::endl;
     // 等待 ready_for_readers 变为 true
     cv_any.wait(lock, []{ return ready_for_readers; }); 
     // lock 被重新获取
     std::cout << "Reader " << id << " can now read: " << shared_document_content << std::endl;
 }
 
 int main() {
     std::thread wt(writer_thread);
     std::vector<std::thread> rts;
     for (int i = 0; i < 5; ++i) {
         rts.emplace_back(reader_thread, i);
     }
 
     wt.join();
     for (auto& rt : rts) {
         rt.join();
     }
     return 0;
 }

在这个例子中,读者线程使用std::shared_lock来等待,而写者线程使用std::unique_lock
std::condition_variable_any
使得这种混合锁类型的同步成为可能。

5. 虚假唤醒 (Spurious Wakeups)

一个非常重要的概念是“虚假唤醒”。即使没有相应的notify_one()notify_all()调用,等待在条件变量上的线程也可能被唤醒。这是操作系统底层线程调度和实现细节可能导致的一种现象。

因此,标准强烈建议(实际上是强制要求正确性)将wait调用放在一个循环中,并在每次唤醒后重新检查实际的条件。

 std::unique_lock<std::mutex> lk(m);
 while (!condition_is_met()) { // 必须循环检查
     cv.wait(lk);
 }

使用带谓词的wait版本(如cv.wait(lk, []{ return condition_is_met(); });)是更简洁和安全的方式,因为它内部已经实现了这个循环检查逻辑。

6. notify_one()vs notify_all()

  • notify_one()
    • 唤醒一个(任意的)正在等待的线程。
    • 效率更高,因为它只涉及一个线程的上下文切换。
    • 适用于只有一个线程能够从条件变化中受益的情况,或者所有等待线程执行相同操作且一次一个就足够的情况(例如,从队列中取出一个元素)。
    • 如果多个线程都在等待同一个条件,但只有部分线程在条件满足时可以继续(例如,资源有限),使用notify_one后,被唤醒的线程处理完后可能需要再次notify_one以唤醒下一个可能的受益者。
  • notify_all()
    • 唤醒所有正在等待的线程。
    • 开销较大,因为它可能导致多个线程同时被唤醒(“惊群效应”,Thundering Herd),它们都会尝试获取锁,但最终只有一个能成功,其余的会再次阻塞。
    • 适用于以下情况:
      • 多个线程可能都从条件变化中受益(例如,广播一个状态变化,所有等待者都需要响应)。
      • 条件的变化可能使得多个不同的等待条件同时满足。
      • 无法确定哪个等待线程应该被唤醒。
      • 生产者完成所有工作,通知所有消费者可以退出了。

选择策略

  • 如果不确定,或者唤醒所有线程是安全的并且逻辑上正确,使用notify_all()通常更简单、更不容易出错,尽管可能效率稍低。
  • 如果能明确只有一个线程需要被唤醒,并且唤醒一个就足够,使用notify_one()
  • 仔细考虑条件和等待者的逻辑。如果一个notify_one可能导致某个应该被唤醒的线程没有被唤醒(例如,它等待的特定子条件没有被满足,而另一个线程被错误地唤醒),那么notify_all可能是更安全的选择。

7. 应用场景

  • 生产者-消费者模型:如前例所示,控制对共享缓冲区的访问。
  • 线程池任务队列:工作线程等待任务队列中有新任务。
  • 事件通知:一个线程等待某个特定事件发生,由另一个线程通知。
  • 屏障(Barrier)同步:等待一组线程都到达某个同步点后再一起继续执行(虽然std::barrier (C++20) 更适合此场景)。
  • 资源可用性等待:线程等待某个资源(如数据库连接)变为可用。

8. 注意事项与最佳实践

  • 始终与互斥量配合:条件变量本身不提供互斥保护,共享状态的读写必须由互斥量保护。
  • 使用std::unique_lock (或兼容锁)std::condition_variable::wait系列函数要求传入std::unique_lock<std::mutex>std::condition_variable_any则更灵活。
  • 在循环中检查条件并等待:由于虚假唤醒,必须使用循环。带谓词的wait是首选。
  • 谓词必须无副作用且快速:传递给wait的谓词函数会在持有锁的情况下被调用,应避免耗时操作或修改受保护状态之外的东西。
  • 通知时机:通常在修改完共享状态、条件可能已满足之后,再调用notify_one()notify_all()。可以在持有锁时通知,也可以在释放锁之后通知。在释放锁后通知可以减少被唤醒线程重新获取锁的等待时间,但需要确保通知前的状态修改对被唤醒线程可见(通常通过锁的内存序保证)。标准库的实现通常能很好地处理这两种情况。
  • 小心死锁:确保锁的获取顺序一致,避免在持有锁的情况下调用可能导致阻塞的代码(除了wait本身)。
  • 选择std::condition_variablestd::condition_variable_any:如果只需要与std::mutex配合,std::condition_variable通常更高效。如果需要与std::shared_mutex等其他锁类型配合,则使用std::condition_variable_any

9. 总结

std::condition_variable
std::condition_variable_any
是C++并发编程中用于实现复杂线程同步和协作的关键工具。它们提供了一种高效的方式,让线程能够等待特定条件成立而避免忙等待,并通过通知机制在条件满足时唤醒等待的线程。

正确使用条件变量(特别是处理好与互斥量的配合、循环等待和虚假唤醒)对于构建健壮、无死锁且高效的并发应用程序至关重要。理解notify_onenotify_all的区别和适用场景,以及选择合适的条件变量类型,也是开发者需要掌握的核心技能。

点击这里复制本文地址 以上内容由文彬编程网整理呈现,请务必在转载分享时注明本文地址!如对内容有疑问,请联系我们,谢谢!
qrcode

文彬编程网 © All Rights Reserved.  蜀ICP备2024111239号-4