1. 硬件内存模型
理解 C++ 原子操作的前提是理解底层硬件的内存模型。不同的 CPU 架构提供了不同强度的内存顺序保证。
1.1 x86 TSO (Total Store Order)
x86 是最强顺序的架构之一,采用 TSO 模型:
x86 Store 路径(强顺序)
- ✓ 写后读保序:Store → Load 有序(即使不同核心)
- ✓ Store Load 有序:所有 Store 在后续 Load 之前完成
- ✓ 隐式内存屏障:大多数操作自带隐式 MFENCE
- ✗ Store Buffer 带来有限重排(先写入 buffer,后刷入 cache)
1.2 ARM/POWER 弱顺序模型
ARM 和 POWER 采用弱顺序模型,允许更多重排以换取性能:
ARM/POWER 允许所有操作重排
| 特性 | x86 | ARM64 | POWER |
|---|---|---|---|
| Load-Load 重排 | 禁止 | 允许 | 允许 |
| Store-Store 重排 | 禁止 | 允许 | 允许 |
| Load-Store 重排 | 禁止 | 允许 | 允许 |
| Store-Load 重排 | 禁止 | 允许 | 允许 |
| 典型 Barrier 开销 | ~25 cycles | ~50-100 cycles | ~100+ cycles |
1.3 内存模型的直观理解
// 代码执行顺序 vs 实际内存顺序
// Thread 1:
a = 1; // Store a
store(b, 1); // Store b
// Thread 2:
if (b == 1) { // Load b
assert(a == 1); // Load a
}
在弱顺序模型(如 ARM)上,右边的断言可能失败——CPU 可以先完成 Store b,再完成 Store a,甚至在不同的 core 上以不同顺序可见。
⚠️ 跨平台编发的陷阱
在 x86 上运行正常的多线程代码,在 ARM 服务器上可能崩溃。这就是为什么理解 memory_order 对高性能跨平台代码至关重要。
2. memory_order 语义
2.1 六种语义概览
| 语义 | 保证强度 | Load 影响 | Store 影响 |
|---|---|---|---|
memory_order_relaxed |
最弱 | 无 | 无 |
memory_order_acquire |
中等 | 后续读写不能重排到此 Load 之前 | 无 |
memory_order_release |
中等 | 无 | 之前的读写不能重排到此 Store 之后 |
memory_order_acq_rel |
中等+ | 同 acquire | 同 release |
memory_order_consume |
弱于 acquire | 依赖链不能重排 | 无 |
memory_order_seq_cst |
最强 | 全局有序 | 全局有序 |
2.2 每种语义的详细语义
2.2.1 memory_order_relaxed
唯一保证:原子性。没有顺序保证,也没有可见性保证。
// relaxed 的使用场景:简单计数器
std::atomic<int> counter{0};
// 多个线程可以并发 increment
counter.fetch_add(1, std::memory_order_relaxed);
// 注意:relaxed 不能用于同步任何其他操作
// 例如不能用 relaxed 实现 flag = true; 来通知其他线程
2.2.2 memory_order_acquire
常用于读取端,构建与 release 的同步关系:
// Acquire 用于读取共享状态
bool wait(std::atomic<bool>& flag) {
// 当 flag 变为 true 时才返回
while (!flag.load(std::memory_order_acquire)) {
// 自旋等待
std::this_thread::yield();
}
// 此时 flag 之前的操作(store side)已经对当前线程可见
}
2.2.3 memory_order_release
常用于写入端,向 acquire 线程发布数据:
// Release 用于发布共享数据
std::vector<Packet> pending_packets;
// 生产者线程
pending_packets.push_back(packet);
flag.store(true, std::memory_order_release);
// ↑ Release 确保 pending_packets.push_back 在 flag.store 之前完成
// 对等待 acquire 的消费者线程可见
2.2.4 memory_order_acq_rel
同时获取和释放,用于 RMW(Read-Modify-Write)操作:
// compare_exchange 使用 acq_rel
// 成功时:acquire + release(同时)
// 失败时:acquire
std::atomic<int> value{0};
int old = value.load(std::memory_order_acquire);
do {
// CAS 成功时,同时 release,确保之前的所有操作对其他线程可见
} while (!value.compare_exchange_strong(old, old + 1,
std::memory_order_acq_rel,
std::memory_order_acquire));
2.2.5 memory_order_seq_cst
序列一致,是最强也是最慢的语义:
// seq_cst 确保全局顺序一致
// 所有线程看到的操作顺序完全相同
// 代价:在 x86 上需要 MFENCE,在 ARM 上需要 ISB + 大量重排
std::atomic<int> x{0}, y{0};
// Thread 1:
x.store(1, std::memory_order_seq_cst);
y.store(1, std::memory_order_seq_cst);
// Thread 2:
// 在 seq_cst 下,永远不会观察到 y=1 而 x=0 的状态
💡 seq_cst 的性能影响
在 x86 上,seq_cst 相比 release/acquire 有约 10-20% 的额外开销;在 ARM64 上,这个差距可能扩大到 2-5 倍。除非必要,应优先使用 release/acquire。
3. acq_rel 性能
3.1 何时使用 acq_rel
acq_rel 适用于需要同时同步读写操作的场景,典型用例是锁的实现和复杂的状态转换:
// 典型用例:实现自旋锁
class SpinLock {
std::atomic<bool> locked_{false};
public:
void lock() {
// test_and_set 使用 acq_rel:
// - 获取当前值(acquire 语义)
// - 设置为 true(release 语义)
// - 如果之前是 false,获取锁成功
while (locked_.test_and_set(std::memory_order_acq_rel)) {
std::this_thread::yield();
}
}
void unlock() {
// release 语义:释放锁,并确保之前的操作对等待者可见
locked_.store(false, std::memory_order_release);
}
};
3.2 性能对比
| 操作 | x86 seq_cst | x86 acq_rel | ARM64 seq_cst | ARM64 acq_rel |
|---|---|---|---|---|
| Atomic Increment | ~25 cycles | ~20 cycles | ~80 cycles | ~35 cycles |
| CAS (成功) | ~40 cycles | ~30 cycles | ~120 cycles | ~50 cycles |
| Store + Barrier | ~30 cycles | ~15 cycles | ~100 cycles | ~30 cycles |
3.3 选择原则
使用 seq_cst 当:
- ✓ 需要严格全局顺序
- ✓ 不确定使用哪种 memory order
- ✓ 调试阶段可以先用 seq_cst
使用 acq_rel 当:
- ✓ 锁的实现
- ✓ RMW 操作
- ✓ 需要同时同步读和写
使用 release 当:
- ✓ 仅发布数据给其他线程
- ✓ 单向同步(生产者)
使用 acquire 当:
- ✓ 仅读取其他线程发布的数据
- ✓ 单向同步(消费者)
4. 无锁队列
4.1 Michael-Scott 队列原理
无锁队列是最经典的无锁数据结构之一,广泛应用于高性能消息队列。其核心思想是使用 CAS 保证原子性:
// 简化的 Michael-Scott 无锁队列
template<typename T>
struct Node {
std::atomic<Node*> next;
std::atomic<bool> has_value;
T value;
};
template<typename T>
class LockFreeQueue {
std::atomic<Node*> head_;
std::atomic<Node*> tail_;
public:
void enqueue(T value) {
Node* new_node = new Node();
new_node->has_value.store(true, std::memory_order_relaxed);
new_node->value = value;
new_node->next.store(nullptr, std::memory_order_relaxed);
while (true) {
Node* tail = tail_.load(std::memory_order_acquire);
Node* next = tail->next.load(std::memory_order_acquire);
// 检查 tail 是否落后
if (next != nullptr) {
// tail 落后,尝试推进
tail_.compare_exchange_weak(tail, next,
std::memory_order_acq_rel, std::memory_order_acquire);
continue;
}
// 尝试将新节点接到 tail 的 next
if (tail->next.compare_exchange_weak(next, new_node,
std::memory_order_acq_rel, std::memory_order_acquire)) {
// 成功,尝试推进 tail
tail_.compare_exchange_weak(tail, new_node,
std::memory_order_acq_rel, std::memory_order_acquire);
return;
}
}
}
std::optional<T> dequeue() {
// 类似逻辑,省略...
}
};
4.2 无锁算法的 ABA 问题
⚠️ ABA 问题
无锁 CAS 的经典陷阱:线程 A 读取指针 P 指向节点 A,线程 B 删除了节点 A 并添加节点 C(恰好复用同一地址),线程 A 的 CAS 仍会成功,但这不是它想要的节点!
// 解决方案:Tagged Pointers / Hazard Pointers / RCU
// 方案1: Tagged Pointer(增加版本号)
struct TaggedPtr {
Node* ptr;
uint64_t tag; // 每释放一次就递增
};
// 方案2: Hazard Pointers(Google 的方案)
// 每个线程声明自己"关注"的指针,其他线程不能删除
// 方案3: RCU(Read-Copy-Update)
// 读取不阻塞,删除推迟到所有读者离开
5. 实战陷阱
5.1 最常见的错误:使用 relaxed 实现同步
// ❌ 错误:relaxed 不能用于线程间同步
std::atomic<bool> ready{false};
std::atomic<int> value{0};
// Thread 1:
value.store(42, std::memory_order_relaxed); // 错误!
ready.store(true, std::memory_order_relaxed);
// Thread 2:
if (ready.load(std::memory_order_relaxed)) {
// value.load() 可能看到 0,因为 relaxed 没有顺序保证
std::cout << value.load(std::memory_order_relaxed);
}
// ✅ 正确:使用 release/acquire
ready.store(true, std::memory_order_release);
// ... Thread 2 ...
if (ready.load(std::memory_order_acquire)) {
// 此时 value.store(42) 一定已经完成
}
5.2 错误:假设 seq_cst 是唯一安全的选择
// ❌ 过度保守:所有操作都用 seq_cst
std::atomic<int> counter{0};
counter.fetch_add(1, std::memory_order_seq_cst); // x86 上需要隐式屏障
// ✅ 正确:只需要原子性时用 relaxed
counter.fetch_add(1, std::memory_order_relaxed); // 快得多
5.3 错误:内存顺序不匹配
// ❌ 生产者用 release,消费者用 relaxed
// 生产者:
data.store(42, std::memory_order_release); // 同步点
ready.store(true, std::memory_order_release);
// 消费者:
if (ready.load(std::memory_order_relaxed)) { // 错误!
// data 可能还没写入
}
// ✅ 正确:消费者必须用 acquire 或更强的语义
if (ready.load(std::memory_order_acquire)) {
// 此时 release 之前的所有操作可见
}
5.4 迁移检查清单
# 代码审查:memory_order 正确性检查
□ 检查所有 atomic 变量的 memory_order 设置
□ 确保 release 总是与 acquire/consume 配对
□ 检查是否真的需要 seq_cst(性能考虑)
□ 检查 relaxed 使用场景(仅计数器等简单场景)
□ 验证 CAS 的失败顺序是否正确
□ 检查是否可能出现 ABA 问题
✅ 最佳实践总结
- ✓ 默认使用 seq_cst,确安全全后再优化
- ✓ 生产者/消费者模式使用 release/acquire
- ✓ 锁实现使用 test_and_set + acq_rel
- ✓ 简单计数器使用 relaxed
- ✓ 无锁数据结构使用 tagged pointer 防止 ABA
- ✓ 跨平台代码务必测试 ARM/Power
总结
C++ 原子的 memory_order 是连接硬件模型和高级语言的桥梁。理解 release/acquire 的同步语义,是编写正确高效多线程代码的基础。在 x86 上可以享受"免费"的强顺序,但在 ARM/POWER 上必须显式使用正确的 memory order。
无锁数据结构是高性能系统的基石,但 ABA 问题和复杂的内存顺序逻辑使其难以正确实现。在生产环境中,优先使用经过验证的库(如 Boost.Lockfree),只有在性能分析确认瓶颈时,才值得自己实现。