In high-frequency trading systems, inter-process communication (IPC) can become a significant bottleneck. Traditional IPC mechanisms like TCP sockets, Unix domain sockets, or message queues introduce unnecessary data copies and context switches. In this article, I'll show you how to build zero-copy, lock-free shared memory IPC that achieves sub-microsecond latencies.
Every copy operation has a cost:
In our production systems, eliminating copies reduced message latency from 800ns to under 200ns—a 4x improvement.
Before diving into the implementation, let's review the key concepts:
C++11 provides atomic operations with different memory ordering guarantees:
// Demonstrates the three memory orderings used throughout this article.
// Runs single-threaded so the spin loop terminates; the comments mark
// which side of a producer/consumer pair would normally run each line.
inline void memory_ordering_examples() {
    // Strictest ordering - full memory barrier
    std::atomic<int> counter{0};
    counter.store(42, std::memory_order_seq_cst);

    // Acquire-release semantics - typical for producer-consumer
    std::atomic<bool> ready{false};
    ready.store(true, std::memory_order_release);    // Producer
    while (!ready.load(std::memory_order_acquire));  // Consumer

    // Relaxed ordering - atomicity only, no synchronization
    std::atomic<int64_t> stats{0};
    stats.fetch_add(1, std::memory_order_relaxed);
}
For lock-free queues, we typically use:
A classic lock-free programming pitfall:
// The ABA problem, step by step (illustrative pseudocode — `Node` and
// `head_` stand in for a lock-free stack/queue's head pointer):
//
//   // Thread 1 reads head (A)
//   Node* head = head_.load();
//
//   // Thread 2: dequeue A, dequeue B, enqueue A
//   // Now head points to A again, but its contents differ!
//
//   // Thread 1: CAS compares pointers only, so it succeeds —
//   // and corrupts the queue:
//   head_.compare_exchange_strong(head, head->next);
Solutions:
Single Producer Single Consumer (SPSC) queues are the simplest lock-free structure and perfect for most IPC scenarios.
#include <atomic>
#include <cstddef>
#include <cstring>
#include <new>

// Single-producer / single-consumer lock-free ring buffer.
//
// Thread-safety contract: exactly one thread (or process, when the queue
// lives in shared memory) calls try_push(), and exactly one calls try_pop().
// size() and empty() may be called from either side; their result is
// approximate while the queue is in motion.
//
// Capacity must be a power of two so index wrap-around is a cheap mask.
// One slot is always kept empty, so the queue holds at most Capacity - 1
// elements; this is how "full" is distinguished from "empty".
//
// NOTE(review): for cross-process use, T should be trivially copyable —
// elements are transferred by assignment and destructors never run in the
// peer process. TODO confirm all instantiations satisfy this.
template<typename T, size_t Capacity>
class alignas(64) SPSCQueue {
    static_assert((Capacity & (Capacity - 1)) == 0,
                  "Capacity must be power of 2");

private:
    // Each slot is padded to a full cache line to avoid false sharing
    // between adjacent elements.
    struct alignas(64) Slot {
        T data;
    };

    Slot slots_[Capacity];

    // Producer-only cache line: the producer's private copy of its own
    // index, readable without touching the shared atomics.
    alignas(64) size_t write_pos_ = 0;

    // Consumer-only cache line: same idea for the consumer.
    alignas(64) size_t read_pos_ = 0;

    // Shared atomics - each on its own cache line so the producer's
    // stores to write_idx_ don't invalidate the consumer's read_idx_ line.
    alignas(64) std::atomic<size_t> write_idx_{0};
    alignas(64) std::atomic<size_t> read_idx_{0};

public:
    SPSCQueue() = default;

    // Non-copyable, non-movable: the object may live in shared memory.
    SPSCQueue(const SPSCQueue&) = delete;
    SPSCQueue& operator=(const SPSCQueue&) = delete;

    // Producer side. Returns false (without blocking) when the queue is full.
    bool try_push(const T& item) {
        const size_t current_write = write_pos_;
        const size_t next_write = (current_write + 1) & (Capacity - 1);

        // Full when advancing would collide with the consumer's index.
        // (The one permanently-empty slot distinguishes full from empty.)
        if (next_write == read_idx_.load(std::memory_order_acquire)) {
            return false;
        }

        // Write the payload before publishing it.
        slots_[current_write].data = item;

        // The release store orders the payload write before the index
        // update, pairing with the consumer's acquire load.
        write_idx_.store(next_write, std::memory_order_release);
        write_pos_ = next_write;

        return true;
    }

    // Consumer side. Returns false (without blocking) when the queue is empty.
    bool try_pop(T& item) {
        const size_t current_read = read_pos_;

        // Empty when we have caught up with the producer.
        if (current_read == write_idx_.load(std::memory_order_acquire)) {
            return false;
        }

        // Safe to read: the producer's release store ordered the payload
        // before the index we just acquired.
        item = slots_[current_read].data;

        // Publish the freed slot back to the producer.
        const size_t next_read = (current_read + 1) & (Capacity - 1);
        read_idx_.store(next_read, std::memory_order_release);
        read_pos_ = next_read;

        return true;
    }

    // Approximate element count; exact only when both sides are quiescent.
    size_t size() const {
        const size_t write = write_idx_.load(std::memory_order_acquire);
        const size_t read = read_idx_.load(std::memory_order_acquire);
        return (write - read) & (Capacity - 1);
    }

    // True when no published element is pending. Compares the two shared
    // atomics (not the consumer-local read_pos_ cache), so the answer is
    // meaningful from either side of the queue.
    bool empty() const {
        return read_idx_.load(std::memory_order_acquire) ==
               write_idx_.load(std::memory_order_acquire);
    }
};
Now let's create the shared memory segment:
#include <sys/mman.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>
#include <cerrno>
#include <cstring>
#include <new>
#include <stdexcept>
#include <string>

// RAII owner of a POSIX shared-memory segment holding one QueueType.
//
// One process constructs with create == true (creates, sizes, maps and
// placement-news the queue); peers construct with create == false and
// attach to the already-initialized segment. QueueType must therefore be
// safe to share across processes (no pointers to process-local memory —
// the index-based queues in this article qualify).
template<typename QueueType>
class SharedMemoryQueue {
private:
    int shm_fd_ = -1;             // shm_open() descriptor
    QueueType* queue_ = nullptr;  // mapped queue
    size_t size_;                 // mapping length (sizeof(QueueType))
    std::string name_;            // segment name, e.g. "/market_data"

public:
    // Non-copyable: owns a file descriptor and a mapping.
    SharedMemoryQueue(const SharedMemoryQueue&) = delete;
    SharedMemoryQueue& operator=(const SharedMemoryQueue&) = delete;

    // Throws std::runtime_error (with errno detail) on failure. On the
    // creator path a half-created segment is unlinked before throwing so
    // it does not leak into /dev/shm.
    SharedMemoryQueue(const std::string& name, bool create)
        : size_(sizeof(QueueType)), name_(name) {

        if (create) {
            // Create and initialize the shared memory object.
            shm_fd_ = shm_open(name.c_str(),
                               O_CREAT | O_RDWR,
                               S_IRUSR | S_IWUSR);
            if (shm_fd_ == -1) {
                throw std::runtime_error(
                    std::string("shm_open failed: ") + std::strerror(errno));
            }

            // Size the segment to hold exactly one QueueType.
            if (ftruncate(shm_fd_, static_cast<off_t>(size_)) == -1) {
                close(shm_fd_);
                shm_unlink(name.c_str());  // don't leak a half-created segment
                throw std::runtime_error(
                    std::string("ftruncate failed: ") + std::strerror(errno));
            }

            // Map it read-write, shared between processes.
            void* addr = mmap(nullptr, size_,
                              PROT_READ | PROT_WRITE,
                              MAP_SHARED,
                              shm_fd_, 0);
            if (addr == MAP_FAILED) {
                close(shm_fd_);
                shm_unlink(name.c_str());
                throw std::runtime_error(
                    std::string("mmap failed: ") + std::strerror(errno));
            }

            // Construct the queue in place in shared memory.
            queue_ = new (addr) QueueType();

        } else {
            // Attach to an existing, already-constructed segment.
            shm_fd_ = shm_open(name.c_str(), O_RDWR, 0);
            if (shm_fd_ == -1) {
                throw std::runtime_error(
                    std::string("shm_open failed: ") + std::strerror(errno));
            }

            void* addr = mmap(nullptr, size_,
                              PROT_READ | PROT_WRITE,
                              MAP_SHARED,
                              shm_fd_, 0);
            if (addr == MAP_FAILED) {
                close(shm_fd_);
                throw std::runtime_error(
                    std::string("mmap failed: ") + std::strerror(errno));
            }

            queue_ = static_cast<QueueType*>(addr);
        }

        // Lock the pages so they can never be swapped out — a page fault
        // on the hot path would cost far more than a queue operation.
        if (mlock(queue_, size_) != 0) {
            // Non-fatal: everything still works, minus the no-page-fault
            // guarantee. A production system would log a warning here.
        }
    }

    // Unmaps and closes, but deliberately does NOT run the queue's
    // destructor or unlink the segment: the peer process may still use it.
    ~SharedMemoryQueue() {
        if (queue_) {
            munlock(queue_, size_);
            munmap(queue_, size_);
        }
        if (shm_fd_ != -1) {
            close(shm_fd_);
        }
    }

    QueueType* get() { return queue_; }

    // Remove the name from the filesystem namespace; existing mappings in
    // attached processes remain valid until they unmap.
    void unlink() {
        shm_unlink(name_.c_str());
    }
};
For true zero-copy, we need to avoid copying message data entirely:
1// Message header with metadata
2struct MessageHeader {
3 uint32_t type;
4 uint32_t length;
5 uint64_t timestamp;
6 uint64_t sequence;
7};
8
9// Fixed-size message slot in shared memory
10template<size_t MaxSize>
11struct alignas(64) MessageSlot {
12 MessageHeader header;
13 uint8_t payload[MaxSize];
14
15 template<typename T>
16 T* as() {
17 return reinterpret_cast<T*>(payload);
18 }
19
20 template<typename T>
21 const T* as() const {
22 return reinterpret_cast<const T*>(payload);
23 }
24};
25
26// Zero-copy queue passing indices, not data
27template<size_t NumSlots, size_t SlotSize>
28class ZeroCopyQueue {
29private:
30 using Slot = MessageSlot<SlotSize>;
31
32 Slot slots_[NumSlots];
33 SPSCQueue<uint32_t, NumSlots> indices_;
34 std::atomic<uint32_t> next_write_{0};
35
36public:
37 // Producer gets a slot to write into
38 Slot* acquire_write_slot() {
39 uint32_t idx = next_write_.fetch_add(1, std::memory_order_relaxed);
40 return &slots_[idx % NumSlots];
41 }
42
43 // Producer publishes the slot
44 bool publish(Slot* slot) {
45 uint32_t idx = (slot - slots_);
46 return indices_.try_push(idx);
47 }
48
49 // Consumer receives slot index
50 Slot* acquire_read_slot() {
51 uint32_t idx;
52 if (!indices_.try_pop(idx)) {
53 return nullptr;
54 }
55 return &slots_[idx];
56 }
57
58 // Consumer releases slot (no-op in SPSC)
59 void release(Slot* slot) {
60 // In SPSC, the slot is implicitly released
61 // In MPMC, we'd return it to a free list
62 }
63};
// Market data message: a single top-of-book quote update.
struct MarketDataUpdate {
    uint64_t symbol_id;           // internal instrument identifier
    double   bid_price;
    double   ask_price;
    uint32_t bid_size;
    uint32_t ask_size;
    uint64_t exchange_timestamp;  // exchange-supplied event time
};
10
11// Producer process
12void producer_main() {
13 using Queue = ZeroCopyQueue<1024, 4096>;
14 SharedMemoryQueue<Queue> shm("/market_data", true);
15 Queue* queue = shm.get();
16
17 uint64_t sequence = 0;
18
19 while (running) {
20 // Get slot to write
21 auto* slot = queue->acquire_write_slot();
22
23 // Write directly into shared memory
24 slot->header.type = 1;
25 slot->header.sequence = sequence++;
26 slot->header.timestamp = rdtsc();
27 slot->header.length = sizeof(MarketDataUpdate);
28
29 auto* msg = slot->template as<MarketDataUpdate>();
30 msg->symbol_id = 12345;
31 msg->bid_price = 100.25;
32 msg->ask_price = 100.26;
33 msg->bid_size = 1000;
34 msg->ask_size = 500;
35 msg->exchange_timestamp = get_exchange_time();
36
37 // Publish - no copy!
38 while (!queue->publish(slot)) {
39 // Spin or yield if queue is full
40 _mm_pause();
41 }
42 }
43}
44
45// Consumer process
46void consumer_main() {
47 using Queue = ZeroCopyQueue<1024, 4096>;
48 SharedMemoryQueue<Queue> shm("/market_data", false);
49 Queue* queue = shm.get();
50
51 while (running) {
52 auto* slot = queue->acquire_read_slot();
53 if (!slot) {
54 _mm_pause();
55 continue;
56 }
57
58 // Read directly from shared memory - no copy!
59 uint64_t recv_time = rdtsc();
60 uint64_t latency = recv_time - slot->header.timestamp;
61
62 if (slot->header.type == 1) {
63 auto* msg = slot->template as<MarketDataUpdate>();
64 process_market_data(msg);
65 }
66
67 queue->release(slot);
68
69 // Track latency
70 record_latency(latency);
71 }
72}
Pin processes to specific cores:
#include <pthread.h>
#include <sched.h>
#include <stdexcept>

// Pin the calling thread to a single CPU core.
// Throws std::runtime_error if the affinity cannot be set (for example
// when core_id is outside the process's allowed CPU set).
void pin_to_core(int core_id) {
    cpu_set_t cpuset;
    CPU_ZERO(&cpuset);
    CPU_SET(core_id, &cpuset);

    pthread_t thread = pthread_self();
    int ret = pthread_setaffinity_np(thread, sizeof(cpu_set_t), &cpuset);
    if (ret != 0) {
        throw std::runtime_error("Failed to set CPU affinity");
    }
}

// Pin producer to core 0, consumer to core 1.
// Choose cores on the same physical CPU so they share the L2/L3 cache.
Reduce TLB misses:
// Allocate anonymous memory backed by huge pages to reduce TLB misses,
// falling back to normal pages when huge pages are unavailable.
// For the MAP_HUGETLB attempt to succeed, `size` should be a multiple of
// the huge-page size (2 MiB on x86-64) and the kernel huge-page pool must
// be configured.
// Returns nullptr if both attempts fail; release with munmap(addr, size).
void* allocate_huge_pages(size_t size) {
    void* addr = mmap(nullptr, size,
                      PROT_READ | PROT_WRITE,
                      MAP_PRIVATE | MAP_ANONYMOUS | MAP_HUGETLB,
                      -1, 0);
    if (addr == MAP_FAILED) {
        // Fallback to normal pages (no huge-page pool, or unaligned size).
        addr = mmap(nullptr, size,
                    PROT_READ | PROT_WRITE,
                    MAP_PRIVATE | MAP_ANONYMOUS,
                    -1, 0);
        if (addr == MAP_FAILED) {
            return nullptr;  // normalize failure — never hand MAP_FAILED to callers
        }
    }
    return addr;
}
Amortize atomic operations:
1template<typename T, size_t Capacity>
2class BatchingSPSCQueue {
3private:
4 SPSCQueue<T, Capacity> queue_;
5 T batch_buffer_[32];
6 size_t batch_size_ = 0;
7
8public:
9 void push(const T& item) {
10 batch_buffer_[batch_size_++] = item;
11
12 if (batch_size_ >= 32) {
13 flush();
14 }
15 }
16
17 void flush() {
18 for (size_t i = 0; i < batch_size_; ++i) {
19 while (!queue_.try_push(batch_buffer_[i])) {
20 _mm_pause();
21 }
22 }
23 batch_size_ = 0;
24 }
25};
Here are latency measurements from our production system:
// Latency/throughput summary for one benchmark run.
struct BenchmarkResults {
    uint64_t min_ns;
    uint64_t p50_ns;
    uint64_t p99_ns;
    uint64_t p999_ns;
    uint64_t max_ns;
    double throughput_mps; // messages per second
};

// Results for 1M messages, 1KB payload.
// Plain aggregate initialization in declaration order: designated
// initializers are a C++20 feature and this code targets C++11/17.
BenchmarkResults results = {
    42,     // min_ns  - best case
    156,    // p50_ns  - median
    312,    // p99_ns  - 99th percentile
    1240,   // p999_ns - 99.9th percentile
    4500,   // max_ns  - worst case (cache miss)
    6.4e6   // throughput_mps - 6.4M messages/sec
};
Compare with other IPC mechanisms (median latency):
// Detects a crashed peer via a heartbeat counter in shared memory and
// resets the queue so the surviving process can continue.
// The peer is expected to increment *heartbeat continuously; if the
// counter does not advance across one probe interval, it is declared dead.
// (Templated on the queue type and given a constructor: the original
// sketch referenced an undeclared `queue_` member and left `heartbeat_`
// uninitialized.)
template<typename QueueType>
class RobustSharedMemory {
private:
    std::atomic<uint32_t>* heartbeat_;         // peer-incremented counter in shared memory
    QueueType* queue_;                         // queue to reset on peer death
    std::chrono::milliseconds probe_interval_; // how long to watch the heartbeat

public:
    RobustSharedMemory(std::atomic<uint32_t>* heartbeat,
                       QueueType* queue,
                       std::chrono::milliseconds probe_interval =
                           std::chrono::milliseconds(100))
        : heartbeat_(heartbeat),
          queue_(queue),
          probe_interval_(probe_interval) {}

    // Samples the heartbeat twice, one probe interval apart; blocks the
    // caller for that interval. Returns true iff the counter advanced.
    bool check_peer_alive() {
        uint32_t last_heartbeat = heartbeat_->load(std::memory_order_relaxed);
        std::this_thread::sleep_for(probe_interval_);
        uint32_t current_heartbeat = heartbeat_->load(std::memory_order_relaxed);
        return current_heartbeat != last_heartbeat;
    }

    // Resets the queue when the peer appears dead.
    // NOTE(review): racy if the peer was merely stalled — it may resume and
    // touch the queue mid-reset. A production version needs a robust mutex
    // or generation counter; confirm requirements before relying on this.
    void reset_queue_if_peer_dead() {
        if (!check_peer_alive()) {
            // Peer process crashed (or stalled past the probe): reset queue.
            queue_->reset();
        }
    }
};
// Lock-free counters for monitoring queue health. All updates use relaxed
// ordering: these are statistics, not synchronization, so only atomicity
// of each individual counter matters.
struct QueueStatistics {
    std::atomic<uint64_t> messages_sent{0};
    std::atomic<uint64_t> messages_received{0};
    std::atomic<uint64_t> queue_full_count{0};  // producer back-pressure events
    std::atomic<uint64_t> total_latency_ns{0};  // running sum, for the mean

    void record_send() {
        messages_sent.fetch_add(1, std::memory_order_relaxed);
    }

    void record_receive(uint64_t latency_ns) {
        messages_received.fetch_add(1, std::memory_order_relaxed);
        total_latency_ns.fetch_add(latency_ns, std::memory_order_relaxed);
    }

    void record_full() {
        queue_full_count.fetch_add(1, std::memory_order_relaxed);
    }

    // Mean latency over all received messages; 0.0 before the first receive.
    // The two loads are not a consistent snapshot, so the result is
    // approximate while traffic is flowing.
    double average_latency_ns() const {
        uint64_t total = total_latency_ns.load(std::memory_order_relaxed);
        uint64_t count = messages_received.load(std::memory_order_relaxed);
        return count > 0 ? static_cast<double>(total) / count : 0.0;
    }
};
After running lock-free shared memory IPC in production for several years:
The complexity of lock-free programming is worth it when latency matters. In our market data distribution system, switching to zero-copy shared memory reduced P99 latency by 85% and increased throughput by 8x.
Zero-copy, lock-free IPC is a powerful tool in the low-latency programmer's toolkit. When implemented correctly, it can achieve sub-microsecond message passing with minimal CPU overhead.
Technical Writer
The NordVarg Team consists of software engineers at NordVarg specializing in high-performance financial systems and type-safe programming.
Get weekly insights on building high-performance financial systems, latest industry trends, and expert tips delivered straight to your inbox.