无锁技术

自旋锁

自旋锁是最基本的一种无锁技术,采用基于 CAS(Compare-and-Swap)的操作,尝试获取锁,失败时采用轮询的方式等待资源,避免线程被阻塞,不需要系统调用,从而减少上下文切换和性能损耗。
原子操作使用 CPU 提供的原子性指令,属于硬件层面的,实现数据的原子性访问和修改,避免使用锁导致的线程阻塞和竞争,同时也避免了死锁的发生。
加锁区执行速度快的情况下可以使用该锁。
atomic提供的常见方法:

  • store:原子写操作。
  • load:原子读操作。
  • compare_exchange_strong:传入[期望原值]和[设定值],当前值与期望值相等时,修改当前值为设定值,返回true;当前值与期望值不等时,将期望值修改为当前值,返回false。
  • compare_exchange_weak:同上,但允许偶然出乎意料的返回(比如在字段值和期待值一样的时候却返回了false),比strong性能更好些,但需要循环来保证逻辑的正确性。
  • exchange:交换2个数值,并保证整个过程是原子的。

C++11标准中提供了 std::atomic 模板类,用于实现原子操作,可用于实现 CAS 操作,从而实现无锁编程。
在非并发条件下,实现一个栈的Push操作,通常有如下操作:

  • 步骤1:新建一个节点,node* const new_node = new node(data);
  • 步骤2:将该节点的next指针指向现有栈顶,new_node->next = head;
  • 步骤3:更新栈顶,head = new_node

以下是使用 std::atomic 实现 CAS 的示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
template<typename T>
class lock_free_stack
{
private:
struct node
{
T data;
node* next;

node(T const& data_):
data(data_)
{}
};

std::atomic<node*> head;

public:
void push(T const& data)
{
node* const new_node=new node(data);
new_node->next=head.load();
//如果head!=new_node->next,会执行new_node->next = head.load()操作并返回false;while循环后再次判断,如果此时head=new_node->next,则执行head.store(new_node)并返回true结束循环
while(!head.compare_exchange_strong(new_node->next,new_node));
}
};

内存序

Memory Order是用来用来约束同一个线程内的内存访问排序方式的,虽然同一个线程内的代码顺序重排不会影响本线程的执行结果(如果结果都不一致,那么重排就没有意义了),但是在多线程环境下,重排造成的数据访问顺序变化会影响其它线程的访问结果。

C++11中引入了六种内存约束符用以解决多线程下的内存一致性问题(在头文件中),其定义如下:

1
2
3
4
5
6
7
8
typedef enum memory_order {
memory_order_relaxed,
memory_order_consume,
memory_order_acquire,
memory_order_release,
memory_order_acq_rel,
memory_order_seq_cst
} memory_order;

Relax模型

memory_order_relaxed对于内存序的限制最小,也就是说这种方式只能保证当前的数据访问是原子操作(不会被其他线程的操作打断),但是对内存访问顺序没有任何约束,也就是说对不同的数据的读写可能会被重新排序。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
#include <atomic>
#include <thread>
#include <iostream>

std::atomic<bool> x{false};
int a = 0;

void fun1() { // 线程1
a = 1; // L9
x.store(true, std::memory_order_relaxed); // L10
}
void func2() { // 线程2
while(!x.load(std::memory_order_relaxed)); // L13
if(a) { // L14
std::cout << "a = 1" << std::endl;
}
}
int main() {
std::thread t1(fun1);
std::thread t2(fun2);
t1.join();
t2.join();
return 0;
}

上述代码中,线程1有两个代码语句,语句L9是一个简单的赋值操作,语句L10是一个带有memory_order_relaxed标记的原子写操作,基于reorder原则,这两句的顺序没有确定下即不能保证哪个在前,哪个在后。而对于线程2,也有两个代码句,分别是带有memory_order_relaxed标记的原子读操作L13和简单的判断输出语句L14。需要注意的是语句L13和语句L14的顺序是确定的,即语句L13 happens-before 语句L14,这是由while循环代码语义保证的。换句话说,while语句优先于后面的语句执行,这是编译器或者CPU的重排规则。

对于上述示例,我们第一印象会输出a = 1 这句。但实际上,也有可能不会输出。这是因为在线程1中,因为指令的乱序重排,有可能导致L10先执行,然后再执行语句L9。如果结合了线程2一起来分析,就是这4个代码句的执行顺序有可能是#L10–>L13–>L14–>L9,这样就不能得到我们想要的结果了。

那么既然memory_order_relaxed不能保证执行顺序,它们的使用场景又是什么呢?这就需要用到其特性即只保证当前的数据访问是原子操作,通常用于一些统计计数的需求场景,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
#include <cassert>
#include <vector>
#include <iostream>
#include <thread>
#include <atomic>
std::atomic<int> cnt = {0};
void fun1() {
for (int n = 0; n < 100; ++n) {
cnt.fetch_add(1, std::memory_order_relaxed);
}
}

void fun2() {
for (int n = 0; n < 900; ++n) {
cnt.fetch_add(1, std::memory_order_relaxed);
}
}

int main() {
std::thread t1(fun1);
std::thread t2(fun2);
t1.join();
t2.join();

return 0;
}

Acquire-Release模型

Acquire-Release模型的控制力度介于Relax模型和Sequential consistency模型之间。其定义如下:

  • Acquire:如果一个操作X带有acquire语义,那么在操作X后的所有读写指令都不会被重排序到操作X之前
  • Relase:如果一个操作X带有release语义,那么在操作X前的所有读写指令操作都不会被重排序到操作X之后

结合上面的定义,重新解释下该模型:假设有一个原子变量A,对A的写操作(Release)和读操作(Acquire)之间进行同步,并建立排序约束关系,即对于写操作(release)X,在写操作X之前的所有读写指令都不能放到写操作X之后;对于读操作(acquire)Y,在读操作Y之后的所有读写指令都不能放到读操作Y之前。

Acquire-Release模型对应六种约束关系中的memory_order_consume、memory_order_acquire、memory_order_release和memory_order_acq_rel。这些约束关系,有的只能用于读操作(memory_order_consume、memory_order_acquire),有的适用于写操作(memory_order_release),有的既能用于读操作也能用于写操作(memory_order_acq_rel)。这些约束符互相配合,可以实现相对严格一点的内存访问顺序控制。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
#include <thread>
#include <atomic>
#include <cassert>
#include <string>

std::atomic<std::string*> ptr;
int data;

void producer() {
std::string* p = new std::string("Hello"); // L10
data = 42; // L11
ptr.store(p, std::memory_order_release); // L12
}

void consumer() {
std::string* p2;
while (!(p2 = ptr.load(std::memory_order_acquire))); // L17
assert(*p2 == "Hello"); // L18
assert(data == 42); // L19
}

int main() {
std::thread t1(producer);
std::thread t2(consumer);
t1.join();
t2.join();

return 0;
}

一个load操作使用了memory_order_consume约束符:在当前线程中,load操作之后的依赖于此原子变量的读和写操作都不能被重排到当前指令前。(与memory_order_acquire相比,一个是有依赖关系的不重排,一个是全部不重排),如果有其他线程使用memory_order_release内存模型对此原子变量进行store操作,在当前线程中是可见的。

如下,与memory_order_acquire一节中示例相比较,producer()没有变化,consumer()函数中将load操作的标记符从memory_order_acquire变成了memory_order_consume。而这个变动会引起如下变化:producer()中,ptr与p有依赖 关系,则p不会重排到store()操作L12之后,而data因为与ptr没有依赖关系,则可能重排到L12之后,所以可能导致L19的assert()失败。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
#include <thread>
#include <atomic>
#include <cassert>
#include <string>

std::atomic<std::string*> ptr;
int data;

void producer() {
std::string* p = new std::string("Hello"); // L10
data = 42; // L11
ptr.store(p, std::memory_order_release); // L12
}

void consumer() {
std::string* p2;
while (!(p2 = ptr.load(std::memory_order_consume))); // L17
assert(*p2 == "Hello"); // L18
assert(data == 42); // L19
}

int main() {
std::thread t1(producer);
std::thread t2(consumer);
t1.join();
t2.join();

return 0;
}

memory_order_acq_rel,这个标记相当于对读操作使用了memory_order_acquire约束符,对写操作使用了memory_order_release约束符。当前线程中这个操作之前的内存读写不能被重排到这个操作之后,这个操作之后的内存读写也不能被重排到这个操作之前。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// Thread-1:
a = y.load(memory_order_acq_rel); // A
x.store(a, memory_order_acq_rel); // B

// Thread-2:
b = x.load(memory_order_acq_rel); // C
y.store(1, memory_order_acq_rel); // D

// 两个示例,效果完全一样,都可以保证A先于B执行,C先于D执行。

// Thread-1:
a = y.load(memory_order_acquire); // A
x.store(a, memory_order_release); // B

// Thread-2:
b = x.load(memory_order_acquire); // C
y.store(1, memory_order_release); // D

nephen wechat
欢迎您扫一扫上面的微信公众号,订阅我的博客!
坚持原创技术分享,您的支持将鼓励我继续创作!