89 CPP条件变量-wait()探索

  • Post author:
  • Post category:其他


下面是生产者/消费者模型代码

#include <iostream>
#include <mutex>
#include <queue>
#include <thread>
#include <chrono>
#include <condition_variable>
using namespace std;
//
class AA
{
    mutex m_mutex;             //互斥锁
    condition_variable m_cond; //条件变量
    queue<string> m_q;         //缓存队列
public:
    void incache(int num) //生产数据,num指定数据的个数
    {
        lock_guard<mutex> lock(m_mutex);
        for (int i = 0; i < num; i++)
        {
            static int id = 1;
            string msg = to_string(id++) + "号选手";
            m_q.push(msg); //把生产出来的数据入队
        }
        // m_cond.notify_one(); //唤醒一个被当前条件变量阻塞的线程
        m_cond.notify_all(); //唤醒所有被当前条件变量阻塞的线程
    }

    void outcache() //消费者线程任务函数
    {
        string msg;
        while (true)
        {
            //加一个作用域,出作用域  unique_lock自动释放锁. 不加作用域也行,可以手动释放锁
            {
                //把互斥锁转换成unique_lock<mutex>,并申请加锁
                unique_lock<mutex> lock(m_mutex);
                //如果队列空,进入循环,否则字节处理数据,必须用循环,不能用if
                while (m_q.empty())
                {
                    cout << "进入休眠" << endl;
                    m_cond.wait(lock);
                }

                //数据出队
                msg = m_q.front();
                m_q.pop();
                cout << "线程:" << this_thread::get_id() << "处理数据," << msg << endl;
            }

            //处理出队的数据
            this_thread::sleep_for(chrono::milliseconds(1)); //假设处理数据需要消耗1毫秒
        }
    }
};
void test()
{
    AA aa;
    //给线程传递类的普通成员函数
    thread t1(&AA::outcache, &aa); //创建消费者线程t1
    thread t2(&AA::outcache, &aa); //创建消费者线程t2
    thread t3(&AA::outcache, &aa); //创建消费者线程t3

    this_thread::sleep_for(chrono::seconds(2));
    aa.incache(3);

    this_thread::sleep_for(chrono::seconds(3));
    aa.incache(5);

    t1.join();
    t2.join();
    t3.join();
}
int main()
{
    test();
    return 0;
}

其中

unique_lockmutex> lock(m_mutex);

while (m_q.empty())

{

cout “进入休眠” endl;

m_cond.wait(lock);

}

这几行代码很奇怪,逻辑上好像走不通,

消费者进入循环后,第一步是申请说,只有加锁成功的情况下,才有机会折射在条件变量的wait()函数中,如果多个消费者线程同时运行,那么只有一个线程申请锁成功,其它线程都会阻塞在申请互斥锁这里。上面的代码也就是说只有一个线程阻塞在wait函数中,其它线程都会阻塞在申请加锁那里。

来做一个测试,看看到底是不是上面分析的

#include <iostream>
#include <mutex>
#include <queue>
#include <thread>
#include <chrono>
#include <condition_variable>
using namespace std;
//
class AA
{
    mutex m_mutex;             //互斥锁
    condition_variable m_cond; //条件变量
    queue<string> m_q;         //缓存队列
public:
    void incache(int num) //生产数据,num指定数据的个数
    {
        lock_guard<mutex> lock(m_mutex);
        for (int i = 0; i < num; i++)
        {
            static int id = 1;
            string msg = to_string(id++) + "号选手";
            m_q.push(msg); //把生产出来的数据入队
        }
        // m_cond.notify_one(); //唤醒一个被当前条件变量阻塞的线程
        m_cond.notify_all(); //唤醒所有被当前条件变量阻塞的线程
    }

    void outcache() //消费者线程任务函数
    {
        string msg;
        while (true)
        {
            //加一个作用域,出作用域  unique_lock自动释放锁. 不加作用域也行,可以手动释放锁
            {
                //把互斥锁转换成unique_lock<mutex>,并申请加锁
                cout << "线程:" << this_thread::get_id() << ",申请加锁..." << endl;
                unique_lock<mutex> lock(m_mutex);
                cout << "线程:" << this_thread::get_id() << ",加锁成功" << endl;
                //如果队列空,进入循环,否则字节处理数据,必须用循环,不能用if
                while (m_q.empty())
                {
                    cout << "进入休眠" << endl;
                    m_cond.wait(lock);
                }

                //数据出队
                msg = m_q.front();
                m_q.pop();
                cout << "线程:" << this_thread::get_id() << "处理数据," << msg << endl;
            }

            //处理出队的数据
            this_thread::sleep_for(chrono::milliseconds(1)); //假设处理数据需要消耗1毫秒
        }
    }
};
void test()
{
    AA aa;
    //给线程传递类的普通成员函数
    thread t1(&AA::outcache, &aa); //创建消费者线程t1
    thread t2(&AA::outcache, &aa); //创建消费者线程t2
    thread t3(&AA::outcache, &aa); //创建消费者线程t3

    t1.join();
    t2.join();
    t3.join();
}
int main()
{
    test();
    return 0;
}
线程:4,申请加锁...
线程:4,加锁成功
进入休眠
线程:3,申请加锁...
线程:3,加锁成功
进入休眠
线程:2,申请加锁...
线程:2,加锁成功
进入休眠

可以看出 3个消费者都加锁成功了,所以上面的分析有问题。

再做个测试,在内层循环前休眠比较长的时间

#include <iostream>
#include <mutex>
#include <queue>
#include <thread>
#include <chrono>
#include <condition_variable>
using namespace std;
//
class AA
{
    mutex m_mutex;             //互斥锁
    condition_variable m_cond; //条件变量
    queue<string> m_q;         //缓存队列
public:
    void incache(int num) //生产数据,num指定数据的个数
    {
        lock_guard<mutex> lock(m_mutex);
        for (int i = 0; i < num; i++)
        {
            static int id = 1;
            string msg = to_string(id++) + "号选手";
            m_q.push(msg); //把生产出来的数据入队
        }
        // m_cond.notify_one(); //唤醒一个被当前条件变量阻塞的线程
        m_cond.notify_all(); //唤醒所有被当前条件变量阻塞的线程
    }

    void outcache() //消费者线程任务函数
    {
        string msg;
        while (true)
        {
            //加一个作用域,出作用域  unique_lock自动释放锁. 不加作用域也行,可以手动释放锁
            {
                //把互斥锁转换成unique_lock<mutex>,并申请加锁
                cout << "线程:" << this_thread::get_id() << ",申请加锁..." << endl;
                unique_lock<mutex> lock(m_mutex);
                cout << "线程:" << this_thread::get_id() << ",加锁成功" << endl;
                this_thread::sleep_for(chrono::hours(1));
                while (m_q.empty())
                {
                    cout << "进入休眠" << endl;
                    m_cond.wait(lock);
                }

                //数据出队
                msg = m_q.front();
                m_q.pop();
                cout << "线程:" << this_thread::get_id() << "处理数据," << msg << endl;
            }

            //处理出队的数据
            this_thread::sleep_for(chrono::milliseconds(1)); //假设处理数据需要消耗1毫秒
        }
    }
};
void test()
{
    AA aa;
    //给线程传递类的普通成员函数
    thread t1(&AA::outcache, &aa); //创建消费者线程t1
    thread t2(&AA::outcache, &aa); //创建消费者线程t2
    thread t3(&AA::outcache, &aa); //创建消费者线程t3

    t1.join();
    t2.join();
    t3.join();
}
int main()
{
    test();
    return 0;
}

/*线程:2,申请加锁...
线程:2,加锁成功
线程:4,申请加锁...
线程:3,申请加锁... */

从输出结果来看,只有一个线程加锁成功。另外两个线程被阻塞在申请加锁那里。

通过测试的结果可以看出,条件变量的wait()函数不只是等待生产者信号那么简单,还干了其它一些事情。

它干了3件事情

1 把互斥锁解锁

2 阻塞等待被唤醒

3 给互斥锁加锁

现在可以知道,因为条件变量的wait()函数把互斥锁解开了,所以这3个线程先后都能加锁成功,开始那几行难以理解的代码就解释的通了。

启动后,如果生产者没生产数据,3个消费者线程都被阻塞在条件变量的wait()函数中,这时候互斥锁,没有被人和线程持有,如果生产者要往队列中放数据,可以加锁成功,生产者往队列中放完数据之后,会发出条件信号,那么这3个线程的wait()函数会接受到条件信号,wait()函数接受到信号后,不一定立即返回,它还需要申请加锁,加锁成功了才会返回。如果wait()函数返回了,那么久一定申请到了锁。

申请到锁之后,就可以让队列中的数据出队,出队后再解锁。

普通的互斥锁为什么要转换成unique_lock之后才能用于条件变量呢?

template class unique_lock是模板类,模板参数为互斥锁类型。

unique_lock和lock_guard都是管理锁的辅助类,都是RAII风格(在构造时获得锁,在析构时释放锁)。

它们的区别在于:为了配合condition_variable,unique_lock还有lock()和unlock()成员函数。

另外还有一点需要注意,条件变量存在虚假唤醒的情况,消费者线程被唤醒,缓存队列中没有数据。为什么存在虚假唤醒的情况呢?

生产的数据比消费线程数少。代码如下

#include <iostream>
#include <mutex>
#include <queue>
#include <thread>
#include <chrono>
#include <condition_variable>
using namespace std;
//
class AA
{
    mutex m_mutex;             //互斥锁
    condition_variable m_cond; //条件变量
    queue<string> m_q;         //缓存队列
public:
    void incache(int num) //生产数据,num指定数据的个数
    {
        lock_guard<mutex> lock(m_mutex);
        for (int i = 0; i < num; i++)
        {
            static int id = 1;
            string msg = to_string(id++) + "号选手";
            m_q.push(msg); //把生产出来的数据入队
        }
        m_cond.notify_all(); //唤醒所有被当前条件变量阻塞的线程
    }

    void outcache() //消费者线程任务函数
    {
        string msg;
        while (true)
        {
            {
                unique_lock<mutex> lock(m_mutex);
                while (m_q.empty())
                {
                    m_cond.wait(lock);
                    cout << "线程:" << this_thread::get_id() << "被唤醒了" << endl;
                }

                //数据出队
                msg = m_q.front();
                m_q.pop();
                cout << "线程:" << this_thread::get_id() << "处理数据," << msg << endl;
            }
            this_thread::sleep_for(chrono::seconds(5));
        }
    }
};
void test()
{
    AA aa;
    //给线程传递类的普通成员函数
    thread t1(&AA::outcache, &aa); //创建消费者线程t1
    thread t2(&AA::outcache, &aa); //创建消费者线程t2
    thread t3(&AA::outcache, &aa); //创建消费者线程t3

    this_thread::sleep_for(chrono::seconds(2));
    aa.incache(2); //生成的数量比消费者线程数3少

    t1.join();
    t2.join();
    t3.join();
}
int main()
{
    test();
    return 0;
}
/* 线程:2被唤醒了
线程:2处理数据,1号选手
线程:4被唤醒了
线程:4处理数据,2号选手
线程:3被唤醒了 */

如果不用while(m_q.empty())也可以用wait的另外一个重载版本

    template<typename _Predicate>
      void
      wait(unique_lock<mutex>& __lock, _Predicate __p)
      {
  while (!__p())
    wait(__lock);
      }
#include <iostream>
#include <mutex>
#include <queue>
#include <thread>
#include <chrono>
#include <condition_variable>
using namespace std;
//
class AA
{
    mutex m_mutex;             //互斥锁
    condition_variable m_cond; //条件变量
    queue<string> m_q;         //缓存队列
public:
    void incache(int num) //生产数据,num指定数据的个数
    {
        lock_guard<mutex> lock(m_mutex);
        for (int i = 0; i < num; i++)
        {
            static int id = 1;
            string msg = to_string(id++) + "号选手";
            m_q.push(msg); //把生产出来的数据入队
        }
        m_cond.notify_all(); //唤醒所有被当前条件变量阻塞的线程
    }

    void outcache() //消费者线程任务函数
    {
        string msg;
        while (true)
        {
            {
                unique_lock<mutex> lock(m_mutex);
                // while (m_q.empty())
                // {
                //     m_cond.wait(lock);
                //     cout << "线程:" << this_thread::get_id() << "被唤醒了" << endl;
                // }

                m_cond.wait(lock, [this]()
                            { return !m_q.empty(); }); //效果同上面while,这个重载的wait函数里面也有个while循环

                //数据出队
                msg = m_q.front();
                m_q.pop();
                cout << "线程:" << this_thread::get_id() << "处理数据," << msg << endl;
            }
            this_thread::sleep_for(chrono::seconds(5));
        }
    }
};
void test()
{
    AA aa;
    //给线程传递类的普通成员函数
    thread t1(&AA::outcache, &aa); //创建消费者线程t1
    thread t2(&AA::outcache, &aa); //创建消费者线程t2
    thread t3(&AA::outcache, &aa); //创建消费者线程t3

    this_thread::sleep_for(chrono::seconds(2));
    aa.incache(3); //生成的数量比消费者线程数3少

    t1.join();
    t2.join();
    t3.join();
}
int main()
{
    test();
    return 0;
}



版权声明:本文为wade1010原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。