在2014年的C++大会上,Herb Sutter做了C++无锁(译注:lock-free,意为不使用锁来保持代码的同步)编程的演讲,在演讲中他解释了无锁编程的基本概念,并用三种算法展示了无锁技术。本文是他演讲重点的概要。 Herb首先指出了无锁代码的优势:
但是无锁编程不是万能药,因为无锁算法实现起来更复杂,它也有潜在问题,比如竞争(contention),这会极大地影响性能。从这一点出发,Herb引出了他的第一条强烈建议:
基本概念以下是考虑无锁算法的时候需要理解的一些基本概念:
为达到这些目的所用到的基本技术有:
在C++ 11中,你可以使用
关于
例子:两次检查的加锁(double-checked locking) 核心思想是:用一个锁来保护原子写操作,但是让原子读操作处于无锁状态。这样只有在各个线程竞争初始化这个单例(singleton)的时候才可能发生阻塞。算法取这个名字的原因是,实例的指针共检查了两次,加锁前和加锁后: atomic Widget::pInstance{ nullptr };
Widget* Widget::Instance() {
if (pInstance == nullptr) {
lock_guard lock { mutW };
if (pInstance == nullptr) {
pInstance = new Widget();
}
}
}
例子:生产者——消费者
使用无锁技术对这个算法做第一种改进,通过 然后,我们考虑完全无锁的实现。在这种情况下,算法的思想是,生产者要去填满一定数量的“槽(slot)”。当生产者有一个新任务要处理,它会去检查是否有空闲的槽,并把要处理的任务放进去。在下面的代码中,slot是一个 curr = 0;
// 第一阶段:生成任务
while (ThereAreMoreTasks()) {
task = AllocateAndBuildNewTask();
while (slot[curr] != null)
curr = (curr+1)%K;
slot[curr] = task;
sem[curr].signal();
}
// 第二阶段: 把邮箱置为done状态
numNotified = 0;
while (numNotified < K) {
while (slot[curr] != null)
curr = (curr+1)%K;
slot[curr] = done;
sem[curr].signal();
++numNotified;
}
对于消费者来说,代码更简单: myTask = null;
while (myTask != done) {
while (myTask = slot[mySlot]) == null)
sem[mySlot].wait();
if (myTask != done) {
slot[mySlot] = null;
DoWork(myTask);
}
}
消费者等待信号量,直到槽里面有任务为止。任务来了,消费者把它拿出来,并清空槽,然后开始处理任务。这就是把任务处理放到关键区 (critical section)之外的思想。但是如果消费者处理得比生产者慢,那么在任务处理完之后再释放锁比较合理,这样,当消费者忙的时候,生产者就不会再去填充同 一个槽,而是去找另外一个空闲的槽。这表明了,你的设计决策会对业务吞吐量/可扩展性(throughput/scalability)和负载平衡 (load balancing)之间的取舍产生微妙的影响。 例子:单向链表: Herb提出的无锁实现使用了 template
void slist<T>::push_front(T t) {
auto p = new Node;
p->t = t;
p->next = head;
head = p;
}
这个代码有问题,因为它在设置新的 template
void slist<T>::push_front(T t) {
auto p = new Node;
p->t = t;
p->next = head;
while (!head.compare_exchange_weak(p->next, p))
{}
}
这里,我们不停尝试交换 而当我们要实现一个移除节点(pop)的操作时,更多的问题就来了。移除操作会删掉链表中的第一个节点,在这种情况下,导致复杂性的一个主要原因是,返回的对象指针有可能很快被另外一个线程释放掉。这个问题有个广为人知的名字叫ABA问题,Herb随后深入细节,讲述了它在特定的环境下是如何发生的。 C++ 11为这个问题给出了优雅的解决方案,不再使用原始的指针,而是用 template
struct Node { T t; shared_ptr<Node> next; };
atomic<shared_ptr<Node>> head;
public:
slist() =default;
~slist() =default;
class reference {
shared_ptr p;
public:
reference(shared_ptr<Node> p_) : p{_p} {}
T& operator*() { return p->t; }
T* operator->() { return &p->t; }
};
auto find(T t) const {
auto p = head.load();
while (p && p->t != t)
p = p->next;
return reference{move(p)};
void push_front(T t) {
auto p = make_shared<Node>();
p->t = t;
p->next = head;
while (head.compare_exchange_weak(p->next, p))
{}
}
void pop_front() {
auto p = head.load();
while (p && !head.compare_exchange_weak(p, p->next))
{}
}
};
这里用到的技巧是,返回的指针被指定为一个 这种实现展示了一种很好的属性叫线性化,它可以使一组互相交错的操作看起来像是顺序执行的一样。 演讲的最后部分讨论了一个例子,这个例子展示了如何来衡量一个程序的行为,以及它可以从无锁的实现中获得什么样的好处。 查看英文原文:Lock-free Programming in C++ with Herb Sutter |