c++多线程(二)线程间共享数据

  • Post author:
  • Post category:其他



来源:微信公众号「编程学习基地」



1. 共享数据带来的问题

​ 当涉及到共享数据时,问题很可能是因为共享数据修改所导致。如果共享数据是只读的,那么只读操作不会影响到数据,更不会涉及对数据的修改,所以所有线程都会获得同样的数据。但是,当一个或多个线程要修改共享数据时,就会产生很多麻烦。

​ 最简单的办法就是对数据结构采用某种保护机制,确保只有进行修改的线程才能看到不变量被破坏时的中间状态。从其他访问线程的角度来看,修改不是已经完成了,就是还没开始。C++标准库提供很多类似的机制,下面会逐一介绍。

​ 另一个选择是对数据结构和不变量的设计进行修改,修改完的结构必须能完成一系列不可分割的变化,也就是保证每个不变量保持稳定的状态,这就是所谓的无锁编程(

lock-free programming

)。不过,这种方式很难得到正确的结果。

保护共享数据结构的最基本的方式,是使用C++标准库提供的互斥量(

mutex

)。



2.使用互斥量保护共享数据

当访问共享数据前,使用互斥量将相关数据锁住,再当访问结束后,再将数据解锁。



2.1C++中使用互斥量
#include<iostream>
#include<thread>
#include<list>
#include<mutex>
#include <algorithm>
using namespace std;
class DateList{
    std::list<int> data_list;
    std::mutex data_mutex;

public:
    int addVal(int val){
        std::lock_guard<std::mutex> guard(data_mutex);
        data_list.push_back(val);
    }
    int findVal(int val){
        std::lock_guard<std::mutex> guard(data_mutex);
        std::list<int>::iterator ite = find(data_list.begin(),data_list.end(),val);
        return ite!=data_list.end()?*ite:-1;
    }
};

int main(){
    DateList clist;
    clist.addVal(42);
    std::cout<<"contains(1)="<<clist.findVal(1)<<", contains(42)="<<clist.findVal(42)<<std::endl;
    return 0;
}

在大多数情况下,互斥量通常会与保护的数据放在同一个类中;当所有成员函数都会在调用时对数据上锁,结束时对数据解锁,那么就保证了数据访问时不变量不被破坏。



2.2接口内在的条件竞争

因为使用了互斥量或其他机制保护了共享数据,就不必再为条件竞争所担忧吗?并不是,例如双链表中每个节点都有一个指针指向列表中下一个节点,还有一个指针指向前一个节点。为了从列表中删除一个节点,其两边节点的指针都需要更新。为了能让线程安全地删除一个节点,需要确保防止对这三个节点(待删除的节点及其前后相邻的节点)的并发访问。如果只对指向每个节点的指针进行访问保护,那就和没有使用互斥量一样,条件竞争仍会发生。整个数据结构和整个删除操作需要保护。

以构建一个类似于

std::stack

结构的栈为例,除了构造函数和swap()以外,需要对

std::stack

提供五个操作:push()一个新元素进栈,pop()一个元素出栈,top()查看栈顶元素,empty()判断栈是否是空栈,size()了解栈中有多少个元素。

template<typename T,typename Container=std::deque<T> >
class stack
{
public:
  explicit stack(const Container&);
  explicit stack(Container&& = Container());
  bool empty() const;
  size_t size() const;
  T& top();
  T const& top() const;
  void push(T const&);
  void push(T&&);
  void pop();
  void swap(stack&&);
};

虽然empty()和size()可能在被调用并返回时是正确的,但其的结果是不可靠的;当它们返回后,其他线程就可以自由地访问栈,并且可能push()多个新元素到栈中,也可能pop()一些已在栈中的元素。这样的话,之前从empty()和size()得到的结果就有问题了。

stack<int> s;
if (! s.empty()){    // 1
  int const value = s.top();    // 2
  s.pop();    // 3
  do_something(value);
}

在单线程情况下以上代码是安全的,但是对于共享的栈对象,这样的调用顺序就不再安全了,因为在调用empty()①和调用top()②之间,可能有来自另一个线程的pop()调用并删除了最后一个元素。这是一个经典的条件竞争,使用互斥量对栈内部数据进行保护,但依旧不能阻止条件竞争的发生,这就是接口固有的问题。

表2.2-1 一种可能执行顺序

Thread A Thread B
if (!s.empty);
if(!s.empty);
int const value = s.top();
int const value = s.top();
s.pop();
do_something(value); s.pop();
do_something(value);

解决的方法也就是改变接口设计。


选项1: 传入一个引用


选项2:无异常抛出的拷贝构造函数或移动构造函数


选项3:返回指向弹出值的指针


选项4:“选项1 + 选项2”或 “选项1 + 选项3”


定义线程安全的堆栈

#if 0   //
#include<iostream>
#include<thread>
#include<list>
#include<mutex>
#include <algorithm>
using namespace std;
class DateList{
    std::list<int> data_list;
    std::mutex data_mutex;

public:
    int addVal(int val){
        std::lock_guard<std::mutex> guard(data_mutex);
        data_list.push_back(val);
    }
    int findVal(int val){
        std::lock_guard<std::mutex> guard(data_mutex);
        std::list<int>::iterator ite = find(data_list.begin(),data_list.end(),val);
        return ite!=data_list.end()?*ite:-1;
    }
};

int main(){
    DateList clist;
    clist.addVal(42);
    std::cout<<"contains(1)="<<clist.findVal(1)<<", contains(42)="<<clist.findVal(42)<<std::endl;
    return 0;
}
#endif

#include <exception>
#include <memory>
#include <stack>
#include <mutex>
#include <iostream>
using namespace std;

struct empty_stack: std::exception
{
    const char* what() const throw()
    {
        return "empty stack";
    }
    
};

template<typename T>
class threadsafe_stack
{
private:
    std::stack<T> data;
    mutable std::mutex m;
public:
    threadsafe_stack(){}
    threadsafe_stack(const threadsafe_stack& other){
        std::lock_guard<std::mutex> guard(other.m);
        data = other.data;
    }
    threadsafe_stack& operator=(const threadsafe_stack&) = delete;

    void push(T new_value){
        std::lock_guard<std::mutex> guard(m);
        data.push(new_value);
    }
    std::shared_ptr<T> pop(){
        std::lock_guard<std::mutex> guard(m);
        if(data.empty()) throw empty_stack();
        std::shared_ptr<T> const ptr = std::make_shared<T>(data.top());
        data.pop();
        return ptr;
    }
    void pop(T& value){
        std::lock_guard<std::mutex> guard(m);
        if (data.empty)
        {
            throw empty_stack();
        }
        value = data.top();
        data.pop();
    }
    bool empty() const{
        std::lock_guard<std::mutex> guard(m);
        return data.empty();
    }
};

int main()
{
    threadsafe_stack<int> my_stack;
    try{
        my_stack.push(10);
        shared_ptr<int> ptr = my_stack.pop();
        cout << *(ptr.get()) << endl;
    }catch(const std::exception &e){
        printf("catch error.. ##%s\n",e.what());
    }
}


2.3死锁及解决方案

线程中的一个或者多个又或者全部都在等待某个资源被释放,造成线程无限期的阻塞,导致程序不能正常终止的情况称为死锁。

线程有对锁的竞争:一对线程需要对他们所有的互斥量做一些操作,其中每个线程都有一个互斥量,且等待另一个解锁。这样没有线程能工作,因为他们都在等待对方释放互斥量。这种情况就是死锁,它的最大问题就是由两个或两个以上的互斥量来锁定一个操作。

避免死锁的一般建议,就是让两个互斥量总以相同的顺序上锁:总在互斥量B之前锁住互斥量A,就永远不会死锁。某些情况下是可以这样用,因为不同的互斥量用于不同的地方。

解决办法:

std::lock

——可以一次性锁住多个(两个以上)的互斥量,并且没有副作用(死锁风险)。

#include <mutex>
#include <iostream>
using namespace std;
class some_big_object
{};

void swap(some_big_object& lhs,some_big_object& rhs)
{
    cout<<" swap ???"<<endl;
}

class X
{
private:
    some_big_object some_detail;
    mutable std::mutex m;
public:
    X(some_big_object const& sd):some_detail(sd){}

    friend void swap(X& lhs, X& rhs)
    {
        cout<<"friend swap"<<endl;
        if(&lhs==&rhs)
            return;
        // lhs.m.lock();
        // rhs.m.lock();
        std::lock(lhs.m,rhs.m);  //可以一次锁住两个或者两个以上的互斥量(最少锁两个)
        //表示std::lock_guard对象已经上锁外,还表示现成的锁,而非尝试创建新的锁,只是拥有锁的所有权,不会重新上锁
        std::lock_guard<std::mutex> lock_a(lhs.m,std::adopt_lock);  
        std::lock_guard<std::mutex> lock_b(rhs.m,std::adopt_lock);  //只是拥有锁的所有权,不会重新上锁
        swap(lhs.some_detail,rhs.some_detail);
        // lhs.m.unlock();
        // rhs.m.unlock();
    }
};

int main()
{
    some_big_object o,b;
    X x(o);
    X y(b);
    swap(x,y);

    cout << "swap over" <<endl;
}


2.4std::unique_lock


std::unqiue_lock

使用更为自由的不变量,这样

std::unique_lock

实例不会总与互斥量的数据类型相关,使用起来要比

std:lock_guard

更加灵活。

#include <mutex>
#include <iostream>
using namespace std;
class some_big_object
{};

void swap(some_big_object& lhs,some_big_object& rhs)
{
    cout<<" swap ???"<<endl;
}

class X
{
private:
    some_big_object some_detail;
    mutable std::mutex m;
public:
    X(some_big_object const& sd):some_detail(sd){}

    friend void swap(X& lhs, X& rhs)
    {
        cout<<"friend swap"<<endl;
        if(&lhs==&rhs)
            return;
        #if 0
        std::lock(lhs.m,rhs.m);  //可以一次锁住两个或者两个以上的互斥量(最少锁两个)
        std::lock_guard<std::mutex> lock_a(lhs.m,std::adopt_lock);  
        std::lock_guard<std::mutex> lock_b(rhs.m,std::adopt_lock);  //假设调用方线程已拥有互斥的所有权
        #elif 0
        std::unique_lock<std::mutex> lock_a(lhs.m, std::defer_lock);    //defer_lock 延迟锁,需要调用lock
        std::unique_lock<std::mutex> lock_b(rhs.m, std::defer_lock);    //不获取互斥的所有权
        std::lock(lock_a,lock_b);   //注意这里锁的是unique_lock
        #else
        std::lock(lhs.m,rhs.m);   //注意这里锁的是std::mutex 
        std::unique_lock<std::mutex> lock_a(lhs.m, std::adopt_lock);    //adopt_lock 修饰的对象不能调用lock
        std::unique_lock<std::mutex> lock_b(rhs.m, std::adopt_lock);    //不获取互斥的所有权
        #endif
        swap(lhs.some_detail,rhs.some_detail);
    }
    void lockX(){
        m.lock();
        m.unlock();
    }
};

int main()
{
    some_big_object o,b;
    X x(o);
    X y(b);
    try{
    swap(x,y);
    }
    catch(const std::exception &e){
        cout << "exception info:" << e.what() <<endl;
    }
    
    // cout << "try to lock" <<endl;
    // x.lockX();
    cout << "swap over" <<endl;
}

比较

类型 效果

defer_lock_t
不获得互斥的所有权

try_to_lock_t
尝试获得互斥的所有权而不阻塞

adopt_lock_t
假设调用方线程已拥有互斥的所有权


2.5不同域中互斥量所有权的传递


std::unique_lock

实例没有与自身相关的互斥量,一个互斥量的所有权可以通过移动操作,在不同的实例中进行传递。

一种使用可能是允许一个函数去锁住一个互斥量,并且将所有权移到调用者上,所以调用者可以在这个锁保护的范围内执行额外的动作

下面的程序片段展示了:函数get_lock()锁住了互斥量,然后准备数据,返回锁的调用函数:

#include <mutex>
#include <iostream>
using namespace std;

std::mutex some_mutex;
std::unique_lock<std::mutex> get_lock()
{
    some_mutex.lock();
    //不指定第二个参数,unique_lock同guard_lock,在构造时加锁,析构时解锁
    std::unique_lock<std::mutex> lk(some_mutex,std::adopt_lock);  
    return lk; // 1
}
void process_data()
{
    std::unique_lock<std::mutex> lk(get_lock());  // 2    将锁的所有权进行转移
    // some_mutex.lock(); //这里不需要加锁,依旧在锁的保护范围内,重复加锁会造成死锁
}

int main(){
    process_data();
    some_mutex.lock();  //此局域网内不在锁的范围,需要加锁才能保证数据唯一性
    // do something
    some_mutex.unlock();
    return 0;
}


2.6锁的粒度

问题:你说说在进行加锁的时候需要注意什么。

答:需要注意锁的粒度,尽量缩小锁的范围,执行必要的操作时,尽可能将持有锁的时间缩减到最小。这也就意味有一些浪费时间的操作,例如对文件的输入/输出操作进行上锁



3.保护共享数据的替代设施

互斥量是最通用的机制,但其并非保护共享数据的唯一方式。这里有很多替代方式可以在特定情况下,提供更加合适的保护。



3.1保护共享数据的初始化过程

以单例模式为例

#include <mutex>
#include <iostream>
#include <memory>
using namespace std;

class Base{
    static Base* g_instance;
    static std::mutex g_mutex;

public:
    static Base* Instance(){
        if(Base::g_instance==NULL){
            std::lock_guard<std::mutex> guard(Base::g_mutex);
            if (Base::g_instance == NULL)
            {
                cout << "new base.." << endl;
                Base::g_instance = new Base();
            }
        }
        return Base::g_instance;
    }
};

Base* Base::g_instance = NULL;
std::mutex Base::g_mutex;

/**
 * @brief 使用std::call_once作为类成员的延迟初始化(线程安全)
 */
std::once_flag g_resource_flag;
class Test{
    static Test* g_instance;
    Test(){}
    Test(Test&){}
public:
    static Test* Instance(){
        std::call_once(
            g_resource_flag,
            [=](){
                cout << "new Test.." << endl;
                g_instance = new Test();
            });
        return g_instance;
    }
};

Test* Test::g_instance = NULL;

int main(){
    // Base::Instance();
    // Base::Instance();
    Test::Instance();
    Test::Instance();
    return 0;
}

使用

std::call_once

作为类成员的延迟初始化(线程安全)



3.2保护很少更新的数据结构

读写锁,c++未提供相关标准,boost库有对其的补充



版权声明:本文为qq_44519484原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。