Real-time systems must respond to events within strict deadlines. After building several trading systems requiring microsecond-level determinism, I've learned that Linux can be a real-time OS—with the right configuration. This article covers PREEMPT_RT, CPU isolation, priority scheduling, and implementation in both C++ and Rust.
Real-time doesn't mean "fast"—it means deterministic. Key metrics:
Hard real-time: missing a deadline = system failure. Soft real-time: missing a deadline = degraded performance.
Trading systems are typically soft real-time: missing a quote update by 10μs doesn't break the system, but consistent delays hurt profitability.
The PREEMPT_RT patch makes Linux fully preemptible:
# Install the PREEMPT_RT (fully preemptible) kernel
sudo apt-get install linux-image-rt-amd64

# Verify the running kernel is the RT variant
uname -a
# Should show "PREEMPT RT"

# Check preemption model
cat /sys/kernel/realtime
# Should return 1
# /etc/sysctl.conf optimizations

# Disable RT throttling so SCHED_FIFO/SCHED_RR tasks may use 100% CPU
# (by default the kernel reserves 5% for non-RT tasks)
kernel.sched_rt_runtime_us = -1

# Reduce swapping
vm.swappiness = 0

# Increase the max number of memory map areas per process
# (needed for large mlock'ed address spaces)
vm.max_map_count = 262144

# Disable watchdogs (removes their periodic timer interrupts/jitter)
kernel.watchdog = 0
kernel.nmi_watchdog = 0
# /etc/default/grub
# isolcpus:  remove cores 2-5 from the general scheduler
# nohz_full: stop the periodic scheduler tick on those cores when possible
# rcu_nocbs: offload RCU callbacks away from the isolated cores
GRUB_CMDLINE_LINUX="isolcpus=2,3,4,5 nohz_full=2,3,4,5 rcu_nocbs=2,3,4,5"

# Update grub
sudo update-grub
sudo reboot

# Verify isolation
cat /sys/devices/system/cpu/isolated
# Should show: 2-5
#include <pthread.h>
#include <sched.h>
#include <sys/mman.h>

#include <atomic>     // std::atomic (used by RTMemoryPool)
#include <cstdint>    // uint64_t
#include <cstring>
#include <iostream>
#include <stdexcept>

// Wrapper that launches a thread pinned to one core, running under
// SCHED_FIFO at a fixed real-time priority with all memory locked.
// Requires CAP_SYS_NICE (or an appropriate rtprio rlimit) to start.
class RealTimeThread {
private:
    pthread_t thread_;
    int priority_;   // SCHED_FIFO priority, 1 (lowest) .. 99 (highest)
    int cpu_core_;   // core the thread is pinned to (use an isolated core)

public:
    // Throws std::invalid_argument if priority is outside 1-99.
    RealTimeThread(int priority, int cpu_core)
        : priority_(priority), cpu_core_(cpu_core) {

        if (priority < 1 || priority > 99) {
            throw std::invalid_argument("Priority must be 1-99");
        }
    }

    template<typename Func>
    void start(Func&& func) {
        // Lock all current and future pages so the RT thread never takes
        // a page fault (a fault can block for milliseconds).
        if (mlockall(MCL_CURRENT | MCL_FUTURE) != 0) {
            throw std::runtime_error("mlockall failed");
        }

        pthread_attr_t attr;
        pthread_attr_init(&attr);

        // BUG FIX: without PTHREAD_EXPLICIT_SCHED, pthread_create IGNORES
        // the policy/priority set on the attribute and inherits the
        // creator's scheduling instead (the POSIX default).
        pthread_attr_setinheritsched(&attr, PTHREAD_EXPLICIT_SCHED);

        // FIFO: runs until it blocks, yields, or is preempted by a
        // higher-priority RT task — no time slicing.
        pthread_attr_setschedpolicy(&attr, SCHED_FIFO);

        struct sched_param param;
        param.sched_priority = priority_;
        pthread_attr_setschedparam(&attr, &param);

        // Pin to the configured core so the thread never migrates.
        cpu_set_t cpuset;
        CPU_ZERO(&cpuset);
        CPU_SET(cpu_core_, &cpuset);
        pthread_attr_setaffinity_np(&attr, sizeof(cpu_set_t), &cpuset);

        // Heap-allocate the functor so it outlives this stack frame;
        // the trampoline below deletes it after running.
        auto thread_func = new Func(std::forward<Func>(func));

        int rc = pthread_create(&thread_, &attr,
            [](void* arg) -> void* {
                auto* f = static_cast<Func*>(arg);
                (*f)();
                delete f;
                return nullptr;
            },
            thread_func);

        // BUG FIX: destroy the attr on every path (it leaked on failure).
        pthread_attr_destroy(&attr);

        if (rc != 0) {
            delete thread_func;
            throw std::runtime_error("pthread_create failed");
        }
    }

    void join() {
        pthread_join(thread_, nullptr);
    }

    // Set thread name (max 15 chars + NUL) for htop/perf/gdb.
    void set_name(const char* name) {
        pthread_setname_np(thread_, name);
    }

    // Priority of the calling thread (not the managed thread).
    static int get_current_priority() {
        struct sched_param param;
        int policy;
        pthread_getschedparam(pthread_self(), &policy, &param);
        return param.sched_priority;
    }
};
83
// Pre-allocated memory pool to avoid malloc in the RT path.
// Lock-free: a single 64-bit atomic bitmask tracks free slots
// (bit set = slot free), which caps the pool at 64 elements.
template<typename T, size_t N>
class RTMemoryPool {
    // BUG FIX: previously N > 64 silently produced a corrupt mask and
    // N == 64 made `1ULL << N` undefined behavior.
    static_assert(N >= 1 && N <= 64, "N must be 1-64 (one mask bit per slot)");

private:
    alignas(64) T pool_[N];   // cache-line aligned backing storage
    // N == 64 needs the all-ones special case: (1ULL << 64) is UB.
    std::atomic<uint64_t> free_mask_{N == 64 ? ~0ULL : (1ULL << N) - 1};

public:
    // Returns a free slot, or nullptr when the pool is exhausted.
    // Lock-free and allocation-free: safe to call from the RT path.
    T* allocate() {
        uint64_t mask = free_mask_.load(std::memory_order_relaxed);

        while (mask != 0) {
            int idx = __builtin_ctzll(mask); // lowest-numbered free slot
            uint64_t new_mask = mask & ~(1ULL << idx);

            // On failure compare_exchange_weak reloads `mask` for us.
            if (free_mask_.compare_exchange_weak(
                    mask, new_mask,
                    std::memory_order_acquire,
                    std::memory_order_relaxed)) {
                return &pool_[idx];
            }
        }

        return nullptr; // Pool exhausted
    }

    // Returns a slot to the pool; pointers not from this pool are ignored.
    void deallocate(T* ptr) {
        size_t idx = ptr - pool_;
        if (idx >= N) return;

        uint64_t mask = free_mask_.load(std::memory_order_relaxed);
        uint64_t new_mask = mask | (1ULL << idx);

        while (!free_mask_.compare_exchange_weak(
                   mask, new_mask,
                   std::memory_order_release,
                   std::memory_order_relaxed)) {
            new_mask = mask | (1ULL << idx);
        }
    }
};
1251#include <sys/timerfd.h>
2#include <unistd.h>
3#include <atomic>
4
// Periodic event loop driven by a non-blocking timerfd on the monotonic
// clock. Tracks per-tick wakeup latency and missed deadlines.
class RTEventLoop {
private:
    int timer_fd_;
    // BUG FIX: the original kept a `struct itimerspec ts` member that the
    // constructor SHADOWED with a local of the same name, so run() advanced
    // expected_time from an uninitialized member. Store the period instead.
    uint64_t period_ns_;
    std::atomic<bool> running_{true};
    uint64_t missed_deadlines_{0};   // only touched by the loop thread

    // Latency tracking (single writer: the loop thread)
    struct LatencyStats {
        uint64_t min_ns = UINT64_MAX;
        uint64_t max_ns = 0;
        uint64_t total_ns = 0;
        uint64_t count = 0;

        void record(uint64_t latency_ns) {
            min_ns = std::min(min_ns, latency_ns);
            max_ns = std::max(max_ns, latency_ns);
            total_ns += latency_ns;
            count++;
        }

        double avg_ns() const {
            return count > 0 ? static_cast<double>(total_ns) / count : 0.0;
        }
    };

    LatencyStats stats_;

public:
    // period_ns: tick period. Throws on timerfd creation/arming failure.
    explicit RTEventLoop(uint64_t period_ns) : period_ns_(period_ns) {
        // Non-blocking so run() can busy-poll instead of sleeping.
        timer_fd_ = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK);
        if (timer_fd_ < 0) {
            throw std::runtime_error("timerfd_create failed");
        }

        // Arm a periodic timer: first expiry and interval both = period.
        struct itimerspec ts;
        ts.it_value.tv_sec = period_ns / 1000000000;
        ts.it_value.tv_nsec = period_ns % 1000000000;
        ts.it_interval = ts.it_value;

        if (timerfd_settime(timer_fd_, 0, &ts, nullptr) < 0) {
            close(timer_fd_);
            throw std::runtime_error("timerfd_settime failed");
        }
    }

    ~RTEventLoop() {
        close(timer_fd_);
    }

    // Runs until stop(); invokes process_tick(now_ns, latency_ns) once per
    // expiration batch. latency_ns is the overshoot past the expected
    // wakeup time (0 if we woke early).
    template<typename Func>
    void run(Func&& process_tick) {
        uint64_t expected_time = get_time_ns() + period_ns_;

        while (running_.load(std::memory_order_relaxed)) {
            uint64_t expirations;
            ssize_t ret = read(timer_fd_, &expirations, sizeof(expirations));

            if (ret < 0) {
                if (errno == EAGAIN) {
                    // Timer not expired yet (non-blocking fd): busy-poll.
                    continue;
                }
                break;
            }

            uint64_t now = get_time_ns();
            // Guard against unsigned underflow when we wake early.
            uint64_t latency = now > expected_time ? now - expected_time : 0;

            // >1 expiration per read means we slept through deadlines.
            if (expirations > 1) {
                missed_deadlines_ += (expirations - 1);
            }

            stats_.record(latency);

            // Process tick
            process_tick(now, latency);

            // Next deadline is one period from this wakeup.
            expected_time = now + period_ns_;
        }
    }

    void stop() {
        running_.store(false, std::memory_order_release);
    }

    const LatencyStats& get_stats() const { return stats_; }
    uint64_t missed_deadlines() const { return missed_deadlines_; }

private:
    static uint64_t get_time_ns() {
        struct timespec ts;
        clock_gettime(CLOCK_MONOTONIC, &ts);
        return ts.tv_sec * 1000000000ULL + ts.tv_nsec;
    }
};
108
109// Example: Real-time market data processor
110class RTMarketDataProcessor {
111private:
112 RTEventLoop event_loop_;
113 RTMemoryPool<MarketUpdate, 1024> update_pool_;
114
115 struct MarketUpdate {
116 uint64_t timestamp;
117 uint32_t symbol_id;
118 double price;
119 uint32_t size;
120 };
121
122public:
123 RTMarketDataProcessor(uint64_t period_ns)
124 : event_loop_(period_ns) {}
125
126 void run() {
127 event_loop_.run([this](uint64_t now, uint64_t latency) {
128 // Check if we exceeded deadline
129 if (latency > 10000) { // 10μs deadline
130 std::cerr << "Deadline miss: " << latency << "ns\n";
131 }
132
133 // Process market data
134 process_updates(now);
135 });
136 }
137
138 void stop() {
139 event_loop_.stop();
140 }
141
142private:
143 void process_updates(uint64_t now) {
144 // Real-time processing (no allocations, no blocking)
145 // Update order books, trigger strategies, etc.
146 }
147};
1481// SPSC queue with bounded latency
2template<typename T, size_t N>
3class RTQueue {
4 static_assert((N & (N - 1)) == 0, "N must be power of 2");
5
6private:
7 struct alignas(64) Slot {
8 std::atomic<uint32_t> sequence{0};
9 T data;
10 };
11
12 alignas(64) Slot buffer_[N];
13 alignas(64) std::atomic<uint32_t> write_pos_{0};
14 alignas(64) std::atomic<uint32_t> read_pos_{0};
15
16public:
17 bool try_push(const T& item) {
18 uint32_t pos = write_pos_.load(std::memory_order_relaxed);
19 Slot& slot = buffer_[pos & (N - 1)];
20 uint32_t seq = slot.sequence.load(std::memory_order_acquire);
21
22 if (seq != pos) {
23 return false; // Full
24 }
25
26 slot.data = item;
27 slot.sequence.store(pos + 1, std::memory_order_release);
28 write_pos_.store(pos + 1, std::memory_order_release);
29
30 return true;
31 }
32
33 bool try_pop(T& item) {
34 uint32_t pos = read_pos_.load(std::memory_order_relaxed);
35 Slot& slot = buffer_[pos & (N - 1)];
36 uint32_t seq = slot.sequence.load(std::memory_order_acquire);
37
38 if (seq != pos + 1) {
39 return false; // Empty
40 }
41
42 item = slot.data;
43 slot.sequence.store(pos + N, std::memory_order_release);
44 read_pos_.store(pos + 1, std::memory_order_release);
45
46 return true;
47 }
48};
491use std::thread;
2use std::time::Duration;
3use libc::{self, c_int, c_void, cpu_set_t};
4
5struct RealTimeThread {
6 priority: i32,
7 cpu_core: usize,
8}
9
10impl RealTimeThread {
11 fn new(priority: i32, cpu_core: usize) -> Self {
12 assert!(priority >= 1 && priority <= 99, "Priority must be 1-99");
13 RealTimeThread { priority, cpu_core }
14 }
15
16 fn spawn<F>(self, f: F) -> std::io::Result<thread::JoinHandle<()>>
17 where
18 F: FnOnce() + Send + 'static,
19 {
20 thread::Builder::new()
21 .name(format!("rt-{}", self.cpu_core))
22 .spawn(move || {
23 // Lock memory
24 unsafe {
25 libc::mlockall(libc::MCL_CURRENT | libc::MCL_FUTURE);
26 }
27
28 // Set CPU affinity
29 unsafe {
30 let mut cpuset: cpu_set_t = std::mem::zeroed();
31 libc::CPU_ZERO(&mut cpuset);
32 libc::CPU_SET(self.cpu_core, &mut cpuset);
33
34 libc::sched_setaffinity(
35 0,
36 std::mem::size_of::<cpu_set_t>(),
37 &cpuset,
38 );
39 }
40
41 // Set realtime priority
42 unsafe {
43 let param = libc::sched_param {
44 sched_priority: self.priority,
45 };
46
47 libc::sched_setscheduler(
48 0,
49 libc::SCHED_FIFO,
50 ¶m,
51 );
52 }
53
54 // Run user function
55 f();
56 })
57 }
58}
59
// Pre-allocated memory pool: a 64-bit atomic bitmask tracks free slot
// indices (bit set = free), so capacity is limited to 64.
struct RTMemoryPool<T, const N: usize> {
    pool: [Option<T>; N],
    free_mask: std::sync::atomic::AtomicU64,
}

impl<T: Default + Clone, const N: usize> RTMemoryPool<T, N> {
    fn new() -> Self {
        assert!(N >= 1 && N <= 64, "Pool size limited to 1..=64 for bitmask");

        // BUG FIX: `(1u64 << N) - 1` overflows when N == 64 (which the
        // assert allows); shifting the all-ones mask down is safe for
        // every N in 1..=64.
        let initial_mask = u64::MAX >> (64 - N);

        RTMemoryPool {
            pool: [(); N].map(|_| None),
            free_mask: std::sync::atomic::AtomicU64::new(initial_mask),
        }
    }

    /// Claims the lowest free slot index, or None when exhausted.
    /// Lock-free and allocation-free.
    fn allocate(&self) -> Option<usize> {
        let mut mask = self.free_mask.load(std::sync::atomic::Ordering::Relaxed);

        while mask != 0 {
            let idx = mask.trailing_zeros() as usize;
            let new_mask = mask & !(1u64 << idx);

            match self.free_mask.compare_exchange_weak(
                mask,
                new_mask,
                std::sync::atomic::Ordering::Acquire,
                std::sync::atomic::Ordering::Relaxed,
            ) {
                Ok(_) => return Some(idx),
                Err(actual) => mask = actual,
            }
        }

        None
    }

    /// Returns slot `idx` to the pool. Double-frees are not detected.
    fn deallocate(&self, idx: usize) {
        let mut mask = self.free_mask.load(std::sync::atomic::Ordering::Relaxed);
        let new_bit = 1u64 << idx;

        loop {
            let new_mask = mask | new_bit;
            match self.free_mask.compare_exchange_weak(
                mask,
                new_mask,
                std::sync::atomic::Ordering::Release,
                std::sync::atomic::Ordering::Relaxed,
            ) {
                Ok(_) => break,
                Err(actual) => mask = actual,
            }
        }
    }
}
1151use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
2use std::os::unix::io::AsRawFd;
3use std::time::{Duration, Instant};
4
5struct RTEventLoop {
6 timer_fd: i32,
7 running: AtomicBool,
8 missed_deadlines: AtomicU64,
9 stats: LatencyStats,
10}
11
/// Running latency statistics, updated lock-free from the loop thread.
struct LatencyStats {
    min_ns: AtomicU64,
    max_ns: AtomicU64,
    total_ns: AtomicU64,
    count: AtomicU64,
}

impl LatencyStats {
    fn new() -> Self {
        LatencyStats {
            min_ns: AtomicU64::new(u64::MAX), // so the first sample wins
            max_ns: AtomicU64::new(0),
            total_ns: AtomicU64::new(0),
            count: AtomicU64::new(0),
        }
    }

    /// Records one latency sample.
    fn record(&self, latency_ns: u64) {
        // IDIOM FIX: AtomicU64::fetch_min / fetch_max (stable since Rust
        // 1.45) replace the hand-rolled compare-exchange loops and are
        // semantically identical.
        self.min_ns.fetch_min(latency_ns, Ordering::Relaxed);
        self.max_ns.fetch_max(latency_ns, Ordering::Relaxed);

        self.total_ns.fetch_add(latency_ns, Ordering::Relaxed);
        self.count.fetch_add(1, Ordering::Relaxed);
    }

    /// Mean latency in nanoseconds; 0.0 before the first sample.
    fn avg_ns(&self) -> f64 {
        let total = self.total_ns.load(Ordering::Relaxed);
        let count = self.count.load(Ordering::Relaxed);
        if count > 0 {
            total as f64 / count as f64
        } else {
            0.0
        }
    }
}
72
73impl RTEventLoop {
74 fn new(period_ns: u64) -> std::io::Result<Self> {
75 unsafe {
76 let timer_fd = libc::timerfd_create(
77 libc::CLOCK_MONOTONIC,
78 libc::TFD_NONBLOCK,
79 );
80
81 if timer_fd < 0 {
82 return Err(std::io::Error::last_os_error());
83 }
84
85 let mut ts = libc::itimerspec {
86 it_value: libc::timespec {
87 tv_sec: (period_ns / 1_000_000_000) as i64,
88 tv_nsec: (period_ns % 1_000_000_000) as i64,
89 },
90 it_interval: libc::timespec {
91 tv_sec: (period_ns / 1_000_000_000) as i64,
92 tv_nsec: (period_ns % 1_000_000_000) as i64,
93 },
94 };
95
96 if libc::timerfd_settime(timer_fd, 0, &ts, std::ptr::null_mut()) < 0 {
97 libc::close(timer_fd);
98 return Err(std::io::Error::last_os_error());
99 }
100
101 Ok(RTEventLoop {
102 timer_fd,
103 running: AtomicBool::new(true),
104 missed_deadlines: AtomicU64::new(0),
105 stats: LatencyStats::new(),
106 })
107 }
108 }
109
110 fn run<F>(&self, mut process_tick: F)
111 where
112 F: FnMut(u64, u64),
113 {
114 let mut expected_time = get_time_ns();
115
116 while self.running.load(Ordering::Relaxed) {
117 let mut expirations: u64 = 0;
118
119 unsafe {
120 let ret = libc::read(
121 self.timer_fd,
122 &mut expirations as *mut u64 as *mut c_void,
123 std::mem::size_of::<u64>(),
124 );
125
126 if ret < 0 {
127 let err = std::io::Error::last_os_error();
128 if err.kind() == std::io::ErrorKind::WouldBlock {
129 continue;
130 }
131 break;
132 }
133 }
134
135 let now = get_time_ns();
136 let latency = now - expected_time;
137
138 if expirations > 1 {
139 self.missed_deadlines.fetch_add(
140 expirations - 1,
141 Ordering::Relaxed,
142 );
143 }
144
145 self.stats.record(latency);
146
147 // Process tick
148 process_tick(now, latency);
149
150 // Update expected time
151 expected_time = now; // Simplified
152 }
153 }
154
155 fn stop(&self) {
156 self.running.store(false, Ordering::Release);
157 }
158}
159
160impl Drop for RTEventLoop {
161 fn drop(&mut self) {
162 unsafe {
163 libc::close(self.timer_fd);
164 }
165 }
166}
167
168fn get_time_ns() -> u64 {
169 unsafe {
170 let mut ts = libc::timespec {
171 tv_sec: 0,
172 tv_nsec: 0,
173 };
174 libc::clock_gettime(libc::CLOCK_MONOTONIC, &mut ts);
175 (ts.tv_sec as u64) * 1_000_000_000 + (ts.tv_nsec as u64)
176 }
177}
From our production trading system:
Metric              Standard Linux    PREEMPT_RT
────────────────────────────────────────────────────
Min latency         0.8 μs            0.6 μs
P50 latency         2.1 μs            1.2 μs
P99 latency         45.3 μs           3.8 μs
P99.9 latency       328 μs            12.4 μs
Max latency         2.1 ms            42 μs
Jitter (std dev)    18.2 μs           1.9 μs
Configuration     P99 Latency    Max Latency
─────────────────────────────────────────────
No isolation      18.3 μs        1.2 ms
isolcpus only     8.7 μs         124 μs
Full RT config    3.8 μs         42 μs
// NEVER in RT path:
malloc() / free()           // Non-deterministic
std::cout / printf()        // I/O, locks
std::vector::push_back()    // May allocate
std::mutex                  // May block
sleep() / usleep()          // Unpredictable

// USE instead:
Pre-allocated pools
Lock-free structures
Atomic operations
Bounded spinning
Fixed-size buffers
141// Pre-allocate everything
2const POOL_SIZE: usize = 1024;
3let mut order_pool: [Order; POOL_SIZE] = [Order::default(); POOL_SIZE];
4
5// Use stack allocation
6let buffer = [0u8; 4096];
7
8// Lock all memory
9unsafe {
10 libc::mlockall(libc::MCL_CURRENT | libc::MCL_FUTURE);
11}
// Track deadline misses
struct RTMetrics {
    std::atomic<uint64_t> ticks{0};            // total timer ticks observed
    std::atomic<uint64_t> deadline_misses{0};  // ticks past their deadline
    std::atomic<uint64_t> max_latency_ns{0};   // worst latency seen

    // Fraction of ticks that missed their deadline (0.0 before any ticks).
    double miss_rate() const {
        uint64_t total = ticks.load();
        uint64_t misses = deadline_misses.load();
        // static_cast instead of the original C-style cast.
        return total > 0 ? static_cast<double>(misses) / total : 0.0;
    }
};
After years of real-time Linux development:
Real-time Linux works, but requires discipline. One malloc() in the hot path can ruin your latency distribution.
Master real-time Linux—it's the foundation for building deterministic low-latency systems.
Technical Writer
The NordVarg Team specializes in high-performance financial systems and type-safe programming.
Get weekly insights on building high-performance financial systems, latest industry trends, and expert tips delivered straight to your inbox.