Atomic operations are the foundation of lock-free programming. After building several high-frequency trading systems, I've learned that understanding atomics and memory ordering is crucial for achieving microsecond-level latencies. This article compares C++ and Rust atomic operations with production examples.
Traditional locks carry real overhead: potential syscalls, context switches, and cache-line contention on the lock word.
In our market data feed, replacing mutexes with atomics reduced P99 latency from 42μs to 8μs.
Modern CPUs reorder instructions for performance. Memory ordering controls when changes become visible to other threads.
1. Relaxed: No synchronization, only atomicity
2. Acquire: Prevents reordering of subsequent reads/writes before this load
3. Release: Prevents reordering of previous reads/writes after this store
4. AcqRel: Combination of Acquire + Release
5. SeqCst: Sequentially consistent (strongest, slowest)
C++11 introduced std::atomic with explicit memory ordering.
#include <atomic>
#include <cstdint>
#include <iostream>
#include <thread>
#include <vector>
5
6// Simple atomic counter
// Lock-free event counter.
//
// Every operation uses relaxed ordering: the count is an independent
// statistic that publishes no other data, so atomicity alone is enough.
class AtomicCounter {
private:
    std::atomic<uint64_t> count_{0};

public:
    /// Add one to the counter (no synchronization implied).
    void increment() {
        count_.fetch_add(1, std::memory_order_relaxed);
    }

    /// Snapshot of the current count.
    uint64_t get() const {
        return count_.load(std::memory_order_relaxed);
    }

    /// Increment only while the counter is strictly below `max`.
    /// Returns true on success, false once the cap has been reached.
    bool try_increment_if_below(uint64_t max) {
        for (uint64_t observed = count_.load(std::memory_order_relaxed);
             observed < max;
             /* a failed CAS refreshes `observed` for the next pass */) {
            const bool swapped = count_.compare_exchange_weak(
                observed, observed + 1,
                std::memory_order_relaxed,
                std::memory_order_relaxed);
            if (swapped) {
                return true;
            }
        }
        return false;
    }
};
37
38// Example: Thread-safe statistics
// Thread-safe order statistics, updated lock-free from many threads.
struct Statistics {
    std::atomic<uint64_t> total_orders{0};
    std::atomic<uint64_t> filled_orders{0};
    std::atomic<double> total_volume{0.0};

    // Record one order; `volume` is accumulated only for filled orders.
    void record_order(double volume, bool filled) {
        total_orders.fetch_add(1, std::memory_order_relaxed);
        if (!filled) {
            return;
        }
        filled_orders.fetch_add(1, std::memory_order_relaxed);

        // std::atomic<double> has no fetch_add before C++20, so emulate
        // it with a CAS loop; a failed CAS reloads `expected` for us.
        double expected = total_volume.load(std::memory_order_relaxed);
        while (!total_volume.compare_exchange_weak(
                   expected, expected + volume,
                   std::memory_order_relaxed,
                   std::memory_order_relaxed)) {
        }
    }

    // Fraction of recorded orders that filled (0.0 before any order).
    // The two loads are not one consistent snapshot, so the value is
    // approximate while writers are running.
    double fill_rate() const {
        const uint64_t total = total_orders.load(std::memory_order_relaxed);
        if (total == 0) {
            return 0.0;
        }
        const uint64_t filled = filled_orders.load(std::memory_order_relaxed);
        return static_cast<double>(filled) / static_cast<double>(total);
    }
};
// Treiber-style lock-free stack.
//
// NOTE(review): like all naive Treiber stacks, try_pop reads
// `node->next` after loading the head, which is only safe while no
// other thread can have freed that node (the ABA / reclamation hazard
// this article discusses later). Production code needs hazard pointers
// or epoch-based reclamation.
template<typename T>
class LockFreeStack {
private:
    struct Node {
        T data;
        Node* next;

        Node(T d) : data(std::move(d)), next(nullptr) {}
    };

    std::atomic<Node*> head_{nullptr};  // top of the stack
    std::atomic<size_t> size_{0};       // approximate element count

public:
    // Teardown is single-threaded: walk the list and free every node.
    ~LockFreeStack() {
        Node* node = head_.load();
        while (node != nullptr) {
            Node* next = node->next;
            delete node;
            node = next;
        }
    }

    // Push: build the node privately, then CAS it in as the new head.
    void push(T value) {
        Node* node = new Node(std::move(value));

        Node* expected = head_.load(std::memory_order_relaxed);
        do {
            node->next = expected;
        } while (!head_.compare_exchange_weak(
            expected, node,
            std::memory_order_release,    // success: publish the new node
            std::memory_order_relaxed));  // failure: `expected` refreshed, retry

        size_.fetch_add(1, std::memory_order_relaxed);
    }

    // Pop the most recently pushed element into `value`.
    // Returns false when the stack is empty.
    bool try_pop(T& value) {
        Node* candidate = head_.load(std::memory_order_acquire);

        while (candidate != nullptr) {
            // See the class comment: candidate->next is only safe to
            // read if candidate cannot have been reclaimed.
            if (head_.compare_exchange_weak(
                    candidate, candidate->next,
                    std::memory_order_acquire,
                    std::memory_order_acquire)) {
                value = std::move(candidate->data);
                delete candidate;
                size_.fetch_sub(1, std::memory_order_relaxed);
                return true;
            }
            // CAS failure reloaded `candidate`; the loop re-checks empty.
        }

        return false;
    }

    // Approximate size (racy by nature; fine for monitoring).
    size_t size() const {
        return size_.load(std::memory_order_relaxed);
    }
};
// Single-producer / single-consumer ring buffer.
//
// Exactly one thread may call try_push and exactly one may call
// try_pop. Capacity must be a power of two so index wrap-around is a
// cheap mask; one slot is sacrificed to tell "full" from "empty", so
// at most Capacity - 1 elements are held at once.
template<typename T, size_t Capacity>
class SPSCQueue {
    static_assert((Capacity & (Capacity - 1)) == 0, "Capacity must be power of 2");

private:
    // Each slot on its own cache line so producer and consumer writes
    // to neighbouring slots do not false-share.
    struct alignas(64) Slot {
        T data;
        std::atomic<bool> ready{false};  // true while the slot holds a value
    };

    Slot slots_[Capacity];

    // The two cursors live on separate cache lines as well.
    alignas(64) std::atomic<size_t> write_pos_{0};
    alignas(64) std::atomic<size_t> read_pos_{0};

public:
    // Producer side. Returns false when the queue is full.
    bool try_push(const T& item) {
        const size_t at = write_pos_.load(std::memory_order_relaxed);
        const size_t after = (at + 1) & (Capacity - 1);

        // Advancing onto the reader's slot would make full == empty.
        if (after == read_pos_.load(std::memory_order_acquire)) {
            return false;
        }

        slots_[at].data = item;

        // Release publishes the data write before the flag and cursor.
        slots_[at].ready.store(true, std::memory_order_release);
        write_pos_.store(after, std::memory_order_release);

        return true;
    }

    // Consumer side. Returns false when the queue is empty.
    bool try_pop(T& item) {
        const size_t at = read_pos_.load(std::memory_order_relaxed);
        Slot& slot = slots_[at];

        // Acquire pairs with the producer's release store of `ready`.
        if (!slot.ready.load(std::memory_order_acquire)) {
            return false;
        }

        item = slot.data;

        // Hand the slot back to the producer, then advance the cursor.
        slot.ready.store(false, std::memory_order_release);
        read_pos_.store((at + 1) & (Capacity - 1), std::memory_order_release);

        return true;
    }
};
// Reusable spinning barrier for a fixed set of threads.
//
// Each thread calls wait(); the call returns once all num_threads
// threads of the current round have arrived. The generation counter
// makes the barrier reusable: spinners watch for it to change rather
// than for the count to reset.
class AtomicBarrier {
private:
    const size_t num_threads_;           // arrivals needed per round
    std::atomic<size_t> count_{0};       // arrivals so far this round
    std::atomic<size_t> generation_{0};  // completed rounds

public:
    explicit AtomicBarrier(size_t num_threads)
        : num_threads_(num_threads) {}

    void wait() {
        const size_t gen = generation_.load(std::memory_order_acquire);
        const size_t count = count_.fetch_add(1, std::memory_order_acq_rel);

        if (count + 1 == num_threads_) {
            // Last arrival: reset for the next round, then release the
            // spinners by bumping the generation.
            count_.store(0, std::memory_order_release);
            generation_.fetch_add(1, std::memory_order_release);
        } else {
            // Spin until the last arrival bumps the generation.
            // FIX: the original called _mm_pause(), which is x86-only
            // and was used without <immintrin.h>; yield() is portable
            // (use _mm_pause() as a lower-latency x86 alternative).
            while (generation_.load(std::memory_order_acquire) == gen) {
                std::this_thread::yield();
            }
        }
    }
};
28
// Test-and-test-and-set (TTAS) spinlock.
//
// The inner relaxed read spins on the locally cached line and only
// re-attempts the read-modify-write once the lock looks free, keeping
// coherence traffic low under contention.
//
// FIX: the original used std::atomic_flag::test(), which is C++20-only
// (the article otherwise targets C++11), and _mm_pause() without
// <immintrin.h>. std::atomic<bool> gives the same TTAS shape portably.
class AtomicSpinlock {
private:
    std::atomic<bool> locked_{false};

public:
    void lock() {
        for (;;) {
            // Acquisition attempt: pairs with the release in unlock().
            if (!locked_.exchange(true, std::memory_order_acquire)) {
                return;
            }
            // Lock is held: spin read-only until it looks free.
            while (locked_.load(std::memory_order_relaxed)) {
                std::this_thread::yield();  // or _mm_pause() on x86
            }
        }
    }

    void unlock() {
        locked_.store(false, std::memory_order_release);
    }
};
Rust provides safe atomic operations with explicit memory ordering.
1use std::sync::atomic::{AtomicU64, AtomicBool, Ordering};
2use std::sync::Arc;
3use std::thread;
4
/// Lock-free event counter.
///
/// Every operation uses `Relaxed` ordering: the count is a standalone
/// statistic and is never used to publish other data.
struct AtomicCounter {
    count: AtomicU64,
}

impl AtomicCounter {
    fn new() -> Self {
        Self { count: AtomicU64::new(0) }
    }

    /// Add one (no synchronization implied).
    fn increment(&self) {
        self.count.fetch_add(1, Ordering::Relaxed);
    }

    /// Snapshot of the current count.
    fn get(&self) -> u64 {
        self.count.load(Ordering::Relaxed)
    }

    /// Increment only while the counter is strictly below `max`.
    /// Returns `false` once the cap has been reached.
    fn try_increment_if_below(&self, max: u64) -> bool {
        let mut observed = self.count.load(Ordering::Relaxed);
        while observed < max {
            match self.count.compare_exchange_weak(
                observed,
                observed + 1,
                Ordering::Relaxed,
                Ordering::Relaxed,
            ) {
                Ok(_) => return true,
                // Lost a race (or spurious weak failure): retry from
                // the value the CAS reported.
                Err(actual) => observed = actual,
            }
        }
        false
    }
}
46
/// Order statistics shared between threads without locks.
struct Statistics {
    total_orders: AtomicU64,
    filled_orders: AtomicU64,
}

impl Statistics {
    fn new() -> Self {
        Self {
            total_orders: AtomicU64::new(0),
            filled_orders: AtomicU64::new(0),
        }
    }

    /// Count one order; filled orders are also counted separately.
    fn record_order(&self, filled: bool) {
        self.total_orders.fetch_add(1, Ordering::Relaxed);
        if filled {
            self.filled_orders.fetch_add(1, Ordering::Relaxed);
        }
    }

    /// Fraction of orders that filled, or 0.0 before any order.
    /// The two loads are not one consistent snapshot while writers run.
    fn fill_rate(&self) -> f64 {
        let total = self.total_orders.load(Ordering::Relaxed);
        let filled = self.filled_orders.load(Ordering::Relaxed);
        if total == 0 {
            0.0
        } else {
            filled as f64 / total as f64
        }
    }
}
801use std::ptr;
2use std::sync::atomic::{AtomicPtr, Ordering};
3
// One stack cell; `next` points at the node beneath it. The pointer
// is raw and manually managed (Box::into_raw / Box::from_raw in the
// impl below).
struct Node<T> {
    data: T,
    next: *mut Node<T>,
}

// Treiber stack: a single atomic head pointer is the entire state.
pub struct LockFreeStack<T> {
    head: AtomicPtr<Node<T>>,
}
12
13impl<T> LockFreeStack<T> {
14 pub fn new() -> Self {
15 LockFreeStack {
16 head: AtomicPtr::new(ptr::null_mut()),
17 }
18 }
19
20 pub fn push(&self, value: T) {
21 let new_node = Box::into_raw(Box::new(Node {
22 data: value,
23 next: ptr::null_mut(),
24 }));
25
26 unsafe {
27 let mut old_head = self.head.load(Ordering::Relaxed);
28
29 loop {
30 (*new_node).next = old_head;
31
32 match self.head.compare_exchange_weak(
33 old_head,
34 new_node,
35 Ordering::Release, // Success: publish node
36 Ordering::Relaxed, // Failure: retry
37 ) {
38 Ok(_) => break,
39 Err(actual) => old_head = actual,
40 }
41 }
42 }
43 }
44
45 pub fn try_pop(&self) -> Option<T> {
46 unsafe {
47 let mut old_head = self.head.load(Ordering::Acquire);
48
49 loop {
50 if old_head.is_null() {
51 return None;
52 }
53
54 let next = (*old_head).next;
55
56 match self.head.compare_exchange_weak(
57 old_head,
58 next,
59 Ordering::Acquire,
60 Ordering::Acquire,
61 ) {
62 Ok(_) => {
63 let node = Box::from_raw(old_head);
64 return Some(node.data);
65 }
66 Err(actual) => old_head = actual,
67 }
68 }
69 }
70 }
71}
72
73impl<T> Drop for LockFreeStack<T> {
74 fn drop(&mut self) {
75 unsafe {
76 let mut current = self.head.load(Ordering::Relaxed);
77 while !current.is_null() {
78 let node = Box::from_raw(current);
79 current = node.next;
80 }
81 }
82 }
83}
84
// Safety: the stack moves `T` values between threads and hands out no
// references to shared interior state, so it is Send/Sync whenever T
// itself can be sent across threads.
unsafe impl<T: Send> Send for LockFreeStack<T> {}
unsafe impl<T: Send> Sync for LockFreeStack<T> {}
881use std::sync::atomic::{AtomicUsize, AtomicBool, Ordering};
2use std::cell::UnsafeCell;
3
// Single-producer / single-consumer ring buffer of N slots (usable
// capacity N - 1: one slot distinguishes full from empty).
// `UnsafeCell` lets producer and consumer write slots through a shared
// reference; the cursor protocol in the impl keeps their slot sets
// disjoint.
pub struct SPSCQueue<T, const N: usize> {
    buffer: [UnsafeCell<Option<T>>; N],
    write_pos: AtomicUsize,
    read_pos: AtomicUsize,
}

// Safety: intended for exactly one producer and one consumer thread;
// values of T are moved across threads, hence the T: Send bound.
unsafe impl<T: Send, const N: usize> Send for SPSCQueue<T, N> {}
unsafe impl<T: Send, const N: usize> Sync for SPSCQueue<T, N> {}
12
13impl<T, const N: usize> SPSCQueue<T, N> {
14 pub fn new() -> Self {
15 assert!(N.is_power_of_two(), "Capacity must be power of 2");
16
17 // Initialize array with None
18 let buffer = unsafe {
19 let mut arr: [UnsafeCell<Option<T>>; N] =
20 std::mem::MaybeUninit::uninit().assume_init();
21
22 for slot in &mut arr[..] {
23 std::ptr::write(slot, UnsafeCell::new(None));
24 }
25
26 arr
27 };
28
29 SPSCQueue {
30 buffer,
31 write_pos: AtomicUsize::new(0),
32 read_pos: AtomicUsize::new(0),
33 }
34 }
35
36 pub fn try_push(&self, value: T) -> Result<(), T> {
37 let write_idx = self.write_pos.load(Ordering::Relaxed);
38 let next_write = (write_idx + 1) & (N - 1);
39
40 // Check if full
41 if next_write == self.read_pos.load(Ordering::Acquire) {
42 return Err(value);
43 }
44
45 // Write data
46 unsafe {
47 *self.buffer[write_idx].get() = Some(value);
48 }
49
50 // Publish write
51 self.write_pos.store(next_write, Ordering::Release);
52
53 Ok(())
54 }
55
56 pub fn try_pop(&self) -> Option<T> {
57 let read_idx = self.read_pos.load(Ordering::Relaxed);
58
59 // Check if empty
60 if read_idx == self.write_pos.load(Ordering::Acquire) {
61 return None;
62 }
63
64 // Read data
65 let value = unsafe {
66 (*self.buffer[read_idx].get()).take()
67 };
68
69 // Publish read
70 let next_read = (read_idx + 1) & (N - 1);
71 self.read_pos.store(next_read, Ordering::Release);
72
73 value
74 }
75}
761use std::sync::atomic::{AtomicU8, Ordering};
2
// Order lifecycle states. repr(u8) pins the discriminant values so
// they round-trip losslessly through the AtomicU8 in `Order`.
#[repr(u8)]
#[derive(Debug, Clone, Copy, PartialEq)]
enum OrderState {
    New = 0,
    Validated = 1,
    Sent = 2,
    Filled = 3,
    Cancelled = 4,
}

// An order whose state machine is advanced lock-free via CAS.
struct Order {
    state: AtomicU8,
}
16
17impl Order {
18 fn new() -> Self {
19 Order {
20 state: AtomicU8::new(OrderState::New as u8),
21 }
22 }
23
24 fn get_state(&self) -> OrderState {
25 match self.state.load(Ordering::Acquire) {
26 0 => OrderState::New,
27 1 => OrderState::Validated,
28 2 => OrderState::Sent,
29 3 => OrderState::Filled,
30 4 => OrderState::Cancelled,
31 _ => unreachable!(),
32 }
33 }
34
35 fn try_transition(&self, from: OrderState, to: OrderState) -> bool {
36 self.state.compare_exchange(
37 from as u8,
38 to as u8,
39 Ordering::AcqRel,
40 Ordering::Acquire,
41 ).is_ok()
42 }
43
44 fn validate(&self) -> bool {
45 self.try_transition(OrderState::New, OrderState::Validated)
46 }
47
48 fn send(&self) -> bool {
49 self.try_transition(OrderState::Validated, OrderState::Sent)
50 }
51
52 fn fill(&self) -> bool {
53 self.try_transition(OrderState::Sent, OrderState::Filled)
54 }
55
56 fn cancel(&self) -> bool {
57 let current = self.get_state();
58 match current {
59 OrderState::New | OrderState::Validated => {
60 self.try_transition(current, OrderState::Cancelled)
61 }
62 _ => false,
63 }
64 }
65}
661use crossbeam::queue::ArrayQueue;
2use crossbeam::channel;
3use crossbeam::epoch::{self, Atomic, Owned};
4use std::sync::Arc;
5
6// Using Crossbeam's lock-free queue
7fn crossbeam_queue_example() {
8 let queue = Arc::new(ArrayQueue::new(1024));
9 let queue_clone = queue.clone();
10
11 // Producer
12 let producer = thread::spawn(move || {
13 for i in 0..1000 {
14 while queue_clone.push(i).is_err() {
15 thread::yield_now();
16 }
17 }
18 });
19
20 // Consumer
21 let consumer = thread::spawn(move || {
22 let mut sum = 0;
23 for _ in 0..1000 {
24 loop {
25 if let Some(val) = queue.pop() {
26 sum += val;
27 break;
28 }
29 thread::yield_now();
30 }
31 }
32 sum
33 });
34
35 producer.join().unwrap();
36 let sum = consumer.join().unwrap();
37 println!("Sum: {}", sum);
38}
39
40// Crossbeam channels (MPMC)
41fn crossbeam_channel_example() {
42 let (tx, rx) = channel::bounded(100);
43
44 // Multiple producers
45 for i in 0..4 {
46 let tx = tx.clone();
47 thread::spawn(move || {
48 for j in 0..100 {
49 tx.send(i * 100 + j).unwrap();
50 }
51 });
52 }
53 drop(tx); // Close channel when done
54
55 // Multiple consumers
56 for _ in 0..2 {
57 let rx = rx.clone();
58 thread::spawn(move || {
59 while let Ok(val) = rx.recv() {
60 println!("Received: {}", val);
61 }
62 });
63 }
64}
// Relaxed: independent counters/stats — atomicity only, no ordering.
std::atomic<uint64_t> packet_count{0};
packet_count.fetch_add(1, std::memory_order_relaxed);

// Acquire-Release: producer-consumer handoff.
// Producer: write the payload, then release-store the flag.
data = new_value;
ready.store(true, std::memory_order_release);

// Consumer: acquire-load the flag; the payload write is then visible.
while (!ready.load(std::memory_order_acquire));
use(data); // Guaranteed to see new_value

// SeqCst: when unsure (safer, slower — adds a single total order).
flag.store(true, std::memory_order_seq_cst);
/// Double-checked, race-safe lazy initialization: the first thread to
/// CAS its pointer into `cache` wins; losers free their own copy and
/// use the winner's value.
fn get_or_init<T>(
    cache: &AtomicPtr<T>,
    init: impl FnOnce() -> T,
) -> &T {
    // Fast path: a value was already published (Acquire pairs with
    // the Release CAS below).
    let existing = cache.load(Ordering::Acquire);
    if !existing.is_null() {
        return unsafe { &*existing };
    }

    // Slow path: build a candidate and try to publish it.
    let candidate = Box::into_raw(Box::new(init()));

    match cache.compare_exchange(
        ptr::null_mut(),
        candidate,
        Ordering::Release, // publish our initialized value
        Ordering::Acquire, // on failure, observe the winner's value
    ) {
        Ok(_) => unsafe { &*candidate },
        Err(winner) => unsafe {
            // Lost the race: reclaim our candidate, use the winner's.
            drop(Box::from_raw(candidate));
            &*winner
        },
    }
}
31
// Reference counting: a simplified Arc showing the orderings the real
// std::sync::Arc uses.
struct Arc<T> {
    ptr: *mut ArcInner<T>, // heap block shared by all handles
}

struct ArcInner<T> {
    strong: AtomicUsize, // number of live Arc handles
    data: T,
}
41
impl<T> Clone for Arc<T> {
    fn clone(&self) -> Self {
        unsafe {
            // Relaxed suffices: creating a new handle needs no ordering
            // with respect to `data`, only an atomic increment.
            // NOTE(review): the real Arc additionally aborts on refcount
            // overflow; this teaching version omits that guard.
            (*self.ptr).strong.fetch_add(1, Ordering::Relaxed);
            Arc { ptr: self.ptr }
        }
    }
}
50
impl<T> Drop for Arc<T> {
    fn drop(&mut self) {
        unsafe {
            // Release makes this handle's writes to `data` visible to
            // whichever thread performs the final decrement.
            if (*self.ptr).strong.fetch_sub(1, Ordering::Release) == 1 {
                // Last reference: the Acquire fence pairs with every
                // earlier Release decrement, so all writes are visible
                // before the allocation is freed.
                atomic::fence(Ordering::Acquire);
                Box::from_raw(self.ptr);
            }
        }
    }
}
From our trading system (benchmarked over 10 million operations):
Throughput (operations per second):

Operation            C++ (std)   Rust (std)   Crossbeam
────────────────────────────────────────────────────────
Atomic increment     180M        175M         N/A
CAS (contended)      42M         41M          N/A
SPSC queue           68M         65M          72M
MPMC queue           N/A         N/A          48M
Latency (nanoseconds per operation):

Operation            C++ (std)   Rust (std)   Crossbeam
────────────────────────────────────────────────────────
Atomic load          3           3            N/A
Atomic store         4           4            N/A
CAS (success)        12          12           N/A
SPSC push            28          26           22
SPSC pop             24          23           21
// WRONG: ABA problem — the CAS only compares the pointer's *value*.
let old = ptr.load(Ordering::Acquire);
// ... do work ...
// Meanwhile: ptr changes to B, then back to A (possibly a fresh
// allocation that happens to reuse A's address).
ptr.compare_exchange(old, new, ...); // Succeeds incorrectly!

// RIGHT: Use tagged pointers or epoch-based reclamation
use crossbeam::epoch;
// WRONG: relaxed ordering creates no happens-before edge, so the
// reader can observe flag == true while still seeing a stale `data`.
data = 42;
flag.store(true, std::memory_order_relaxed); // BUG!

// Other thread
if (flag.load(std::memory_order_relaxed)) { // BUG!
 use(data); // May see old value!
}

// RIGHT: release on the store pairs with acquire on the load,
// ordering the data write before the flag for the reader.
data = 42;
flag.store(true, std::memory_order_release);

// Other thread
if (flag.load(std::memory_order_acquire)) {
 use(data); // Guaranteed to see 42
}
// WRONG: False sharing — both counters sit on one 64-byte cache line,
// so writes from different cores ping-pong that line between them.
struct Counters {
 std::atomic<uint64_t> counter1; // Same cache line!
 std::atomic<uint64_t> counter2; // Bouncing between cores
};

// RIGHT: Pad to separate cache lines
// (std::hardware_destructive_interference_size is the portable
// spelling of the 64 used here).
struct Counters {
 alignas(64) std::atomic<uint64_t> counter1;
 alignas(64) std::atomic<uint64_t> counter2;
};
After years of lock-free programming in production, a few lessons stand out:
Atomics enable lock-free programming, but they're not a silver bullet. Use them when latency truly matters and you understand the tradeoffs.
Master atomics and memory ordering—they're essential tools for building high-performance concurrent systems.
Technical Writer
The NordVarg Team writes about high-performance financial systems and type-safe programming.
Get weekly insights on building high-performance financial systems, latest industry trends, and expert tips delivered straight to your inbox.