C++98
标准中并没有线程库的存在。
C++11
中才提供了多线程的标准库,提供了
thread
、
mutex
、
condition_variable
、
atomic
等相关对象及功能功能。
1. 原子操作原理
-
原子操作:
能够操作最接近机器的指令,这个和硬件相关了,虽然和硬件相关,但我们的C++11还是整合了这一切,让原子操作有了共同的调用接口 -
目的:
使用这个的目的说实话,就是让你更了解机器已及多线程同步的原理和秘密,当然有一些需求较简单的,使用原子操作可能比封装好的更有效率!!用了百遍的mutex可能你现在还不知道他们是怎么互斥的~当然内部还是通过了原子操作来的! -
底层原理:
原子操作只有2种状态,一种是没做,一种是做完了,看不到正在做的状态,这个是在任何线程下都满足这个要求,当两个线程同时访问一块内存的时候,如果有任何一个在写,那肯定会产生竞争,如果两个同时读,没有问题,那如何用原子操作来控制不产生竞争呢?可以这样来想,当两个线程在访问的时候,一定有一个先后顺序,谁先访问,谁后访问,这就是修改顺序,我们要在任何线程可以看到这样的
顺序
!然后就可以通过一定的逻辑来处理并发竞争的情况了!
顺序:
关于原子操作,上述所说顺序
memory_order
定义如下(详情见
www.cplusplus.com
),此处只做简单的介绍。
typedef enum memory_order {
memory_order_relaxed, // relaxed
memory_order_consume, // consume
memory_order_acquire, // acquire
memory_order_release, // release
memory_order_acq_rel, // acquire/release
memory_order_seq_cst // sequentially consistent
} memory_order;
std::memory_order_relaxed
:
仅对此操作要求原子性,松散顺序
通俗将:
在某一线程中,先写入A,再写入B。但是在多核处理器中观测到的顺序可能是先写入B,再写入A。。
std::memory_order_seq_cst
:
顺序一致性,也是默认的选项
通俗将:
每个处理器的执行顺序和代码中的顺序(program order)一样。所有处理器都只能看到一个单一的操作执行顺序。但是它需要对所有线程进行全局同步,比其他的顺序造成更多的消耗。
std::memory_order_acquire
1. 当前线程中读或写不能被重排到
当前操作之前
。
2. 其他
release
同一原子变量的线程的所有写入,能被当前线程所见(
happens-before
)。
std::memory_order_release
1. 当前线程中的读或写不能被重排到
当前操作之后
。
2. 当前线程的所有写入,可见于获得该同一原子变量的其他线程(
happens-before
),并且对该原子变量的带依赖写入变得对于其他消费同一原子对象的线程可见。
std::memory_order_consume
:1. 当前线程中依赖于当前加载的该值的读或写不能被重排到
当前操作之前
。
2. 其他
release
同一原子变量的线程的对数据依赖变量的写入,为当前线程所可见。
std::memory_order_acq_rel
带此内存顺序的读修改写操作既是
release
操作 又是
acquire
操作。
1. 当前线程的读写操作不能被重排到
此操作前或后
。
2. 所有
release
同一原子变量的线程的写入可见于
当前操作之前
,而且修改可见于其他获得同一原子变量的线程。
-
虽然共有 6 个选项,但它们表示的是4种内存模型:
1.
sequential-consistent
:memory_order_seq_cst。
2.
relaxed
:memory_order_relaxed。
3.
acquire-release
:*_release &
_acquire。
4.
release-consume
:
_release & *_consume。 - 不同的内存序模型与 synchronized-with 关系结合时,将产生不同的结果。
2. atomic
std::atomic<>
是一个模板类,使用该模板类实例化的对象,提供了一些保证原子性的成员函数来实现共享数据的常用操作。其原型如下:
template <class T>
struct atomic;
-
简单来说,
std::atomic<>
就是用来定义一个自动加锁解锁的共享变量(“定义”“变量”用词在这里是不准确的,但是更加贴切它的实际功能),供多个线程访问而不发生冲突。
2.1 构造函数
-
atomic
的构造函数如下所示:
atomic() noexcept = default; // 1. default,调用atomic_init 进行初始化
constexpr atomic(T val) noexcept; // 2. initialization
atomic(const atomic&) = delete; // 3. copy [deleted]
测试代码:
std::atomic<int> num_1; // 1. 默认初始化,全局变量为 0初始化
std::atomic<int> num_2(999); // 2. 赋值
int main(){
std::atomic<int> num_loc; // 3. 默认初始化, 局部变量为 int 随机值
std::cout << "num_loc: " << num_loc << std::endl;
std::cout << "num_1: " << num_1 << std::endl;
std::cout << "num_2: " << num_2 << std::endl;
return 0;
}
num_loc: 4197984
num_1: 0
num_2: 999
3. 成员函数
-
atomic
成员函数如下所示:
-
此外,还有一些对应于具体化类型的成员函数
func-specializations
。如下图(左侧为具体化类型,右侧为函数名称):
-
当多个线程访问一个原子对象时,所有的原子操作都会产生
良好行为
:在任何其他原子操作可以访问它之前,每个原子操作都会在该对象上完全执行(即完全执行完当前的操作)。这保证了这些对象上没有数据竞争,而这正是定义原子性的特性。
3.1 laod、store、exchange
-
<1>.
load()
功能:原子地加载并返回原子变量的
当前值
。按照
order
的值影响内存。 -
order
必须是
memory_order_relaxed
memory_order_consume
memory_order_acquire
memory_order_seq_cst
之一。否则行为未定义。
T load(memory_order sync = memory_order_seq_cst) const volatile noexcept;
T load(memory_order sync = memory_order_seq_cst) const noexcept;
-
<2>.
store
原子地以参数
val
替换当前值。按照
order
的值影响内存。 -
order
必须是
memory_order_relaxed
memory_order_release
memory_order_seq_cst
之一。否则行为未定义。
void store(T val, memory_order sync = memory_order_seq_cst) volatile noexcept;
void store(T val, memory_order sync = memory_order_seq_cst) noexcept;
测试代码:
#include <iostream> // std::cout
#include <atomic> // std::atomic, std::memory_order_relaxed
#include <thread> // std::thread
std::atomic<int> foo(0);
void set_foo(int x) {
foo.store(x,std::memory_order_relaxed); // store(), 设置atomic
}
void load_foo() {
int x;
do {
x = foo.load(std::memory_order_relaxed); // load(), 读取atomic
} while (x==0);
std::cout << "foo: " << x << '\n';
}
int main ()
{
std::thread first(load_foo);
std::thread second(set_foo,10);
first.join();
second.join();
return 0;
}
foo: 10
-
<3>.
exchange()
原子地以 参数
val
替换底层值,并返回其之前的值。操作为
读-修改-写
操作。根据
order
的值影响内存。 -
order
必须是
memory_order_relaxed
memory_order_consume
memory_order_acquire
memory_order_release
memory_order_acq_rel
memory_order_seq_cst
之一。否则行为未定义。
T exchange(T val, memory_order sync = memory_order_seq_cst) volatile noexcept;
T exchange(T val, memory_order sync = memory_order_seq_cst) noexcept;
测试代码:
#include <iostream> // std::cout
#include <atomic> // std::atomic, std::memory_order_relaxed
#include <thread> // std::thread
std::atomic<bool> ready(false);
std::atomic<bool> winner(false);
void count1m (int id) {
while (!ready) {} // 死循环至 ready=true
for (int i=0; i<1000000; ++i) {} // go!, count to 1 million
if (!winner.exchange(true))
std::cout << "thread #" << id << " won!\n";
};
int main ()
{
std::vector<std::thread> threads;
std::cout << "spawning 10 threads that count to 1 million...\n";
for (int i=1; i<=10; ++i)
threads.push_back(std::thread(count1m,i));
ready = true;
for (auto& th : threads)
th.join();
return 0;
}
spawning 10 threads that count to 1 million…
thread #1 won!
-
第一个线程通过
exchange()
函数修改
winner
值后,可以打印
"thread #" << id << " won!\n";
-
后续线程再调用
exchange()
函数,返回之前值
true
,因此只有一个线程成功打印。
3.2 compare_exchange_*
-
首先介绍一下关键字
volatile
,这篇博文写的不错:
volatile介绍
。 -
C/C++中的 volatile 关键字 和 const对应,用来修饰变量,
volatile
关键字是一种类型修饰符,用它声明的类型变量表示可以被某些编译器未知的因素更改,比如:操作系统,硬件或者其他线程等。 -
遇到这个关键字声明的变量,编译器对访问该变量的代码就不再进行优化,从而可以提供对特殊地址的稳定访问。声明时语法:
int volatile i;
当要求使用
volatile
声明的变量的值的时候,系统总是重新从它所在的内存读取数据,即使它前面的指令刚刚从该处读取过数据。而且读取的数据立刻被保存。
// 1. volatile
volatile int i = 10;
int a = i;
int b = i;
// 2. 优化做法
int i = 10;
int a = i;
int b = i;
-
volatile
指出 i 是随时可能发生变化的,每次使用它的时候必须从 i的地址中读取,因而编译器生成的汇编代码会重新从i的地址读取数据放在 b 中。 -
优化做法是
,由于编译器发现两次从 i读数据的代码之间的代码没有对 i 进行过操作,它会自动把上次读的数据放在 b 中。而不是重新从 i 里面读。这样以来,如果 i是一个寄存器变量或者表示一个端口数据就容易出错,所以说 volatile 可以保证对特殊地址的稳定访问。
多线程下的volatile:
有些变量是用volatile关键字声明的。当两个线程都要用到某一个变量且该变量的值会被改变时,应该用volatile声明,该关键字的作用是防止优化编译器把变量从内存装入CPU寄存器中。
如果变量被装入寄存器,那么两个线程有可能一个使用内存中的变量,一个使用寄存器中的变量,这会造成程序的错误执行。
-
功能函数:
std::atomic::compare_exchange_weak
bool compare_exchange_weak(T& expected, T val,
memory_order sync = memory_order_seq_cst) volatile noexcept; // 1.
bool compare_exchange_weak(T& expected, T val,
memory_order sync = memory_order_seq_cst) noexcept;
bool compare_exchange_weak(T& expected, T val,
memory_order success, memory_order failure) volatile noexcept; // 2.
bool compare_exchange_weak(T& expected, T val,
memory_order success, memory_order failure) noexcept;
-
该函数比较
expected
与 当前对象的值:
1. 如果为
true
,使用
val
替换当前对象的值(
store
)。
2. 如果为
false
,使用当前对象的值 替换
expected
的值。
3. 返回值为比较结果。函数中操作都是原子的:读取值和替换值时,其他线程不能修改该值。 -
对于
版本2
,使用的内存顺序取决于比较的结果:如果为true,则使用
success
;如果为false,则使用
failure
。 -
该算法是弱比较,允许虚假失败,即:此函数直接将包含值的物理内容与预期值的内容进行比较;这可能导致使用运算符
==
(如果基础类型具有填充位、陷阱值或相同值的替代表示形式)比较相等的值失败,尽管这种比较会在一个保持预期的循环中迅速收敛。 -
与
compare_exchange_strong
相比,即使
expected
的值等于当前对象值,也允许此弱版本函数通过返回
false
而错误地失败,对于某些循环算法,这可能是可以接受的行为,并且在某些平台上可能会导致显著更好的性能。在这些虚假故障中,函数返回false,而不修改
expected
值。 - 对于非循环,最好使用强版本。
#include <iostream> // std::cout
#include <atomic> // std::atomic
#include <thread> // std::thread
#include <vector> // std::vector
struct Node { int value; Node* next; };
std::atomic<Node*> list_head (nullptr);
void append (int val) { // append an element to the list
Node* oldHead = list_head;
Node* newNode = new Node {val,oldHead};
// 函数返回:
// true, head=new, 线程结束
// false,old的元素替换为 head的元素,进入循环
while (!list_head.compare_exchange_weak(oldHead,newNode))
newNode->next = oldHead;
}
int main () {
// 创建一个链表,存在1~10 十个元素
std::vector<std::thread> threads;
for (int i=0; i<10; ++i) threads.push_back(std::thread(append,i));
for (auto& th : threads) th.join();
// 遍历链表
std::cout << std::endl;
for (Node* it=list_head; it!=nullptr; it=it->next)
std::cout << it->value << " ";
std::cout << '\n';
// 释放链表内存
Node* it; while (it=list_head) {list_head=it->next; delete it;}
return 0;
}
9 8 7 6 5 4 3 0 2 1
-
上述代码使用弱版本函数
compare_exchange_weak
,因此即便是相等情况,函数也会返回
false
。 -
当函数返回
true
,
head=new
,未进入循环,head 的元素替换为
val
。 -
当函数返回
false
,使用 head 替换 old_node(两者本来就一样),进入循环,直至函数返回
true
。
使用
compare_exchange_weak
很好的解决了元素的多线程插入。
那么考虑一下,如果不使用
compare_exchange_weak
,直接
store
会产生怎样的效果。
struct Node { int value; Node* next; };
std::atomic<Node*> list_head (nullptr);
void append (int val) {
auto new_node = new Node {val,list_head};
if (new_node->next) // 1. 打印当前 head 结点元素
cout << new_node->next->value << " ";
list_head.store(new_node); // 使用store() 函数,链表中插入新的元素
std::cout << val << " \n"; // 2. 打印当前插入的元素
}
int main () {
// 创建一个链表,存在1~10 十个元素
std::vector<std::thread> threads;
for (int i=0; i<10; ++i) threads.push_back(std::thread(append,i));
for (auto& th : threads) th.join();
// 遍历链表
std::cout << std::endl;
for (Node* it=list_head; it!=nullptr; it=it->next)
std::cout << it->value << " ";
std::cout << '\n';
// 释放链表内存
Node* it; while (it=list_head) {list_head=it->next; delete it;}
return 0;
}
0
0 2
0 1
1 3
3 4
4 5
5 6
6 7
7 8
8 99 8 7 6 5 4 3 1 0
-
可以看到,十个线程都成功完成 store() 函数,但是最终链表中却少了很多元素。这是因为存在一下情景:
1.两个线程
A(val=1)
、
B(val=2)
同时构造自己线程的
new_node
。此时
list_head
对应的结点(
val=0
)。
2. 接下来线程A完成store(),
list_head
指针指向的结点替换为
A_new_node
,新的
list_head
结点元素为
val=1
,
list_head->next
对应元素为
val=0
。
3. 此时线程B中的
new_node->next
仍然指向
val=0
的结点,线程B调用store()后,新的
list_head
结点元素为
val=2
,
list_head->next
对应元素为
val=0
,此时元素
1
虽然插入了,但是被覆盖了。 -
插入顺序可以看出,
head
结点元素为
0
时,两个线程分别插入
2
、
1
,最终结果可以看出
2
被覆盖。
-
功能函数:
std::atomic::compare_exchange_strong
bool compare_exchange_strong(T& expected, T val,
memory_order sync = memory_order_seq_cst) volatile noexcept; // 1.
bool compare_exchange_strong(T& expected, T val,
memory_order sync = memory_order_seq_cst) noexcept;
bool compare_exchange_strong(T& expected, T val,
memory_order success, memory_order failure) volatile noexcept; // 2.
bool compare_exchange_strong(T& expected, T val,
memory_order success, memory_order failure) noexcept;
-
与
compare_exchange_weak
区别: 当
expected
与当前对象进行比较时,此强版本不允许虚假失败。
参考文章: