无锁队列通过原子操作和CAS实现多线程并发访问,使用std::atomic和内存序优化性能,需解决ABA问题并谨慎处理内存回收。

实现一个无锁队列(lock-free queue)是高性能并发编程中的常见需求。相比使用互斥量(mutex)保护的队列,无锁队列通过原子操作和CAS(Compare-And-Swap)机制避免线程阻塞,提升多线程环境下的吞吐量。C++ 提供了 std::atomic 和底层原子指令支持,使得实现无锁数据结构成为可能。
无锁队列的基本原理
无锁队列的核心思想是:多个线程可以同时访问共享数据结构,但通过原子操作保证操作的完整性,而不是依赖锁来串行化访问。关键在于使用 CAS(Compare-And-Swap) 操作来更新指针或状态变量。
CAS 的逻辑是:如果当前值等于预期值,则将其更新为新值,否则不做修改。这个过程是原子的,由硬件层面保障。在 C++ 中,可以通过 std::atomic
基于链表的无锁队列实现
一个常见的无锁队列实现是使用单向链表,维护 head 和 tail 两个指针。每个节点包含数据和指向下一个节点的指针。入队操作从 tail 插入,出队操作从 head 移除。
立即学习“C++免费学习笔记(深入)”;
以下是简化版的无锁队列实现:
#include#include template
class LockFreeQueue { private: struct Node { T data; std::atomic next; Node(const T& d) : data(d), next(nullptr) {} }; std::atomiczuojiankuohaophpcnNode*youjiankuohaophpcn head; std::atomiczuojiankuohaophpcnNode*youjiankuohaophpcn tail;public: LockFreeQueue() { Node* dummy = new Node(T{}); head.store(dummy, std::memory_order_relaxed); tail.store(dummy, std::memory_order_relaxed); }
void enqueue(const T& data) { Node* new_node = new Node(data); Node* expected_tail; Node* null_next = nullptr; while (true) { expected_tail = tail.load(std::memory_order_acquire); // 尝试将 tail 的 next 设置为新节点 if (expected_tail-youjiankuohaophpcnnext.compare_exchange_weak( null_next, new_node, std::memory_order_release)) { // 更新 tail 指针 tail.compare_exchange_weak(expected_tail, new_node, std::memory_order_acq_rel); break; } else { // tail 已被其他线程更新,尝试更新本地副本 tail.compare_exchange_weak(expected_tail, expected_tail-youjiankuohaophpcnnext.load(), std::memory_order_acq_rel); } } } bool dequeue(T& result) { Node* old_head = head.load(std::memory_order_acquire); Node* old_tail = tail.load(std::memory_order_acquire); Node* next_head = old_head-youjiankuohaophpcnnext.load(std::memory_order_acquire); if (old_head == old_tail) { if (next_head == nullptr) { return false; // 队列为空 } // tail 落后,尝试推进 tail.compare_exchange_weak(old_tail, next_head, std::memory_order_acq_rel); return false; } if (head.compare_exchange_weak(old_head, next_head, std::memory_order_acq_rel)) { result = next_head-youjiankuohaophpcndata; delete old_head; // 注意:存在 ABA 问题风险 return true; } return false; // 失败,重试 } ~LockFreeQueue() { while (dequeue(T{})); delete head.load(); }};
内存序与性能优化
上述代码中使用了不同的内存序(memory order),这是无锁编程的关键部分:
- std::memory_order_relaxed:只保证原子性,不提供同步或顺序约束。
- std::memory_order_acquire:用于读操作,确保之后的读写不会被重排序到该操作之前。
- std::memory_order_release:用于写操作,确保之前的读写不会被重排序到该操作之后。
- std::memory_order_acq_rel:同时具备 acquire 和 release 语义,用于读-修改-写操作。
合理选择内存序可以在保证正确性的前提下减少内存屏障开销,提升性能。
ABA 问题与解决方案
在无锁队列中,一个经典问题是 ABA 问题:线程 A 读取了 head 指针为 A,期间另一个线程 B 将 A 出队,并重新分配一个新节点放在同一地址,再入队。此时 A 看到 head 还是 A,误以为没有变化,继续操作,可能导致数据错误或崩溃。
解决方法包括:
- 使用带标记的指针(tagged pointer):将版本号与指针组合存储,例如用 64 位整数的高 16 位作为版本号,低 48 位作为指针(需平台支持)。
- 使用 Hazard Pointer 或 RCU(Read-Copy-Update) 机制延迟内存回收。
C++ 标准库暂未提供内置的 hazard pointer 支持,但可手动实现。
基本上就这些。无锁队列虽然性能优越,但实现复杂,容易出错。建议在真正需要极致性能的场景下使用,并充分测试边界条件和内存模型行为。











