December 19, 2024 • NordVarg Team

Real-Time Systems Implementation on Linux with C++ and Rust


Real-time systems must respond to events within strict deadlines. After building several trading systems requiring microsecond-level determinism, I've learned that Linux can be a real-time OS—with the right configuration. This article covers PREEMPT_RT, CPU isolation, priority scheduling, and implementation in both C++ and Rust.

Real-Time Requirements

Real-time doesn't mean "fast"—it means deterministic. Key metrics:

  • Worst-case latency: Maximum time to respond
  • Jitter: Variance in response time
  • Deadline miss rate: Percentage of missed deadlines
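To make the three metrics concrete, here is a minimal sketch of how they could be computed offline from recorded latency samples. The names `RTSummary` and `summarize` are illustrative, and using the population standard deviation as the jitter figure is an assumption; other definitions (peak-to-peak, for example) are also common.

```cpp
#include <algorithm>
#include <cassert>
#include <cmath>
#include <cstddef>
#include <cstdint>
#include <vector>

// Summary of the three metrics for one measurement run.
struct RTSummary {
    uint64_t worst_case_ns;  // Maximum observed latency
    double jitter_ns;        // Population standard deviation of latency
    double miss_rate;        // Fraction of samples above the deadline
};

// Offline analysis helper (hypothetical name); it allocates nothing itself,
// but it is meant for post-processing, not for the RT path.
RTSummary summarize(const std::vector<uint64_t>& samples_ns, uint64_t deadline_ns) {
    assert(!samples_ns.empty());
    const double n = static_cast<double>(samples_ns.size());

    uint64_t worst = 0;
    double sum = 0.0;
    std::size_t misses = 0;
    for (uint64_t s : samples_ns) {
        worst = std::max(worst, s);
        sum += static_cast<double>(s);
        if (s > deadline_ns) ++misses;
    }
    const double mean = sum / n;

    // Second pass: squared deviations from the mean
    double squared = 0.0;
    for (uint64_t s : samples_ns) {
        const double d = static_cast<double>(s) - mean;
        squared += d * d;
    }
    return RTSummary{worst, std::sqrt(squared / n), static_cast<double>(misses) / n};
}
```

Calling `summarize(samples, 10000)` on a vector of nanosecond latencies gives the worst case, the jitter, and the share of samples that exceeded a 10μs deadline.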

  • Hard real-time: Missing a deadline = system failure
  • Soft real-time: Missing a deadline = degraded performance

Trading systems are typically soft real-time: missing a quote update by 10μs doesn't break the system, but consistent delays hurt profitability.

Linux Real-Time Patches

PREEMPT_RT Kernel

The PREEMPT_RT patch makes Linux fully preemptible:

bash
# Install PREEMPT_RT kernel (Debian package name)
sudo apt-get install linux-image-rt-amd64

# Verify
uname -a
# Should show "PREEMPT RT" or "PREEMPT_RT", depending on the kernel version

# Check preemption model
cat /sys/kernel/realtime
# Should return 1
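The same check can be done at program startup, so a service refuses to run (or logs a warning) on a non-RT kernel. This is a small sketch; the function names are illustrative, and it assumes the `/sys/kernel/realtime` file shown above, which is absent on kernels without PREEMPT_RT.

```cpp
#include <cassert>
#include <fstream>
#include <string>

// The file contains a single line; "1" means a PREEMPT_RT kernel.
bool is_realtime_flag(const std::string& first_line) {
    return first_line == "1";
}

// Returns false when the file cannot be opened, which is the normal
// situation on a kernel built without PREEMPT_RT.
bool is_preempt_rt_kernel(const std::string& path = "/sys/kernel/realtime") {
    std::ifstream file(path);
    std::string line;
    return file.is_open() && std::getline(file, line) && is_realtime_flag(line);
}
```

A typical use is a single `if (!is_preempt_rt_kernel()) { /* warn or exit */ }` before any RT threads are created.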

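Kernel Configuration

bash
# /etc/sysctl.conf optimizations

# Disable RT throttling so SCHED_FIFO/SCHED_RR tasks may use 100% of a CPU
# (a runaway RT task can then starve everything else on that core)
kernel.sched_rt_runtime_us = -1

# Reduce swapping
vm.swappiness = 0

# Raise the per-process limit on memory map areas
# (the locked-memory limit is separate: `ulimit -l` or the memlock
# entry in /etc/security/limits.conf)
vm.max_map_count = 262144

# Disable watchdog
kernel.watchdog = 0
kernel.nmi_watchdog = 0

# Note: CPU frequency scaling is not a sysctl; set the cpufreq governor
# (for example "performance") separately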
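The locked-memory limit is a per-process resource limit (`RLIMIT_MEMLOCK`) rather than a sysctl, so it is worth checking before calling `mlockall`. The sketch below queries it with `getrlimit`; the struct and function names are illustrative. Processes with `CAP_IPC_LOCK` are not bound by this limit, which the sketch does not try to detect.

```cpp
#include <sys/resource.h>
#include <cassert>
#include <cstdint>

struct MemlockLimit {
    bool ok;              // getrlimit succeeded
    bool unlimited;       // Soft limit is RLIM_INFINITY
    uint64_t soft_bytes;  // Soft limit in bytes (0 when unlimited)
};

// Reads the soft RLIMIT_MEMLOCK of the calling process.
MemlockLimit query_memlock_limit() {
    struct rlimit lim{};
    if (getrlimit(RLIMIT_MEMLOCK, &lim) != 0) {
        return MemlockLimit{false, false, 0};
    }
    const bool unlimited = (lim.rlim_cur == RLIM_INFINITY);
    return MemlockLimit{true, unlimited,
                        unlimited ? 0 : static_cast<uint64_t>(lim.rlim_cur)};
}

// True when an unprivileged process could lock `required_bytes` of memory.
bool memlock_limit_allows(const MemlockLimit& lim, uint64_t required_bytes) {
    return lim.ok && (lim.unlimited || lim.soft_bytes >= required_bytes);
}
```

If `memlock_limit_allows(query_memlock_limit(), expected_resident_bytes)` is false, `mlockall` is likely to fail with `ENOMEM` or `EPERM` unless the process is privileged.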

CPU Isolation

bash
# /etc/default/grub
GRUB_CMDLINE_LINUX="isolcpus=2,3,4,5 nohz_full=2,3,4,5 rcu_nocbs=2,3,4,5"

# Update grub
sudo update-grub
sudo reboot

# Verify isolation
cat /sys/devices/system/cpu/isolated
# Should show: 2-5
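An application can read that same file to decide which cores to pin its RT threads to. The kernel prints CPU lists in a compact range form such as `2-5` or `0,2-3,8`; the sketch below expands one into individual CPU ids. The function name is illustrative, and malformed tokens are skipped rather than reported, which is a simplification.

```cpp
#include <cassert>
#include <cstddef>
#include <exception>
#include <sstream>
#include <string>
#include <vector>

// Expands a kernel CPU list such as "2-5" or "0,2-3,8" into CPU ids.
// An empty list (no isolated CPUs) yields an empty vector.
std::vector<int> parse_cpu_list(const std::string& list) {
    std::vector<int> cpus;
    std::istringstream tokens(list);
    std::string token;
    while (std::getline(tokens, token, ',')) {
        if (token.empty()) continue;
        try {
            const std::size_t dash = token.find('-');
            if (dash == std::string::npos) {
                cpus.push_back(std::stoi(token));
            } else {
                const int first = std::stoi(token.substr(0, dash));
                const int last = std::stoi(token.substr(dash + 1));
                for (int cpu = first; cpu <= last; ++cpu) cpus.push_back(cpu);
            }
        } catch (const std::exception&) {
            // Not a number (for example a lone newline): ignore this token
        }
    }
    return cpus;
}
```

Feeding it the contents of `/sys/devices/system/cpu/isolated` from the configuration above would produce the ids 2, 3, 4 and 5.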

C++ Real-Time Implementation

Thread Priority and Affinity

cpp
#include <pthread.h>
#include <sched.h>
#include <sys/mman.h>
#include <atomic>
#include <cstddef>
#include <cstdint>
#include <stdexcept>
#include <type_traits>
#include <utility>

class RealTimeThread {
private:
    pthread_t thread_{};
    int priority_;
    int cpu_core_;

public:
    RealTimeThread(int priority, int cpu_core)
        : priority_(priority), cpu_core_(cpu_core) {

        if (priority < 1 || priority > 99) {
            throw std::invalid_argument("Priority must be 1-99");
        }
    }

    template<typename Func>
    void start(Func&& func) {
        // Decayed type so that lvalue callables are copied, not referenced
        using Fn = std::decay_t<Func>;

        // Lock all current and future memory pages
        if (mlockall(MCL_CURRENT | MCL_FUTURE) != 0) {
            throw std::runtime_error("mlockall failed");
        }

        // Create thread with attributes
        pthread_attr_t attr;
        pthread_attr_init(&attr);

        // Without PTHREAD_EXPLICIT_SCHED the new thread inherits the creator's
        // scheduling settings and the policy/priority below are ignored
        pthread_attr_setinheritsched(&attr, PTHREAD_EXPLICIT_SCHED);

        // Set scheduling policy to FIFO
        pthread_attr_setschedpolicy(&attr, SCHED_FIFO);

        // Set priority
        struct sched_param param{};
        param.sched_priority = priority_;
        pthread_attr_setschedparam(&attr, &param);

        // Set CPU affinity
        cpu_set_t cpuset;
        CPU_ZERO(&cpuset);
        CPU_SET(cpu_core_, &cpuset);
        pthread_attr_setaffinity_np(&attr, sizeof(cpu_set_t), &cpuset);

        // Heap-allocate the callable; the new thread takes ownership
        auto* thread_func = new Fn(std::forward<Func>(func));

        int rc = pthread_create(&thread_, &attr,
                                [](void* arg) -> void* {
                                    auto* f = static_cast<Fn*>(arg);
                                    (*f)();
                                    delete f;
                                    return nullptr;
                                },
                                thread_func);

        // Destroy the attributes on both the success and the failure path
        pthread_attr_destroy(&attr);

        if (rc != 0) {
            delete thread_func;
            throw std::runtime_error("pthread_create failed");
        }
    }

    void join() {
        pthread_join(thread_, nullptr);
    }

    // Set thread name for debugging (at most 15 characters plus the NUL)
    void set_name(const char* name) {
        pthread_setname_np(thread_, name);
    }

    // Get current priority
    static int get_current_priority() {
        struct sched_param param{};
        int policy;
        pthread_getschedparam(pthread_self(), &policy, &param);
        return param.sched_priority;
    }
};

// Pre-allocated memory pool to avoid malloc in RT path
template<typename T, size_t N>
class RTMemoryPool {
    static_assert(N >= 1 && N <= 64, "A 64-bit free mask tracks at most 64 slots");

private:
    alignas(64) T pool_[N];
    // Bit i set means slot i is free. Shifting right avoids the undefined
    // `1ULL << 64` when N == 64.
    std::atomic<uint64_t> free_mask_{~0ULL >> (64 - N)};

public:
    T* allocate() {
        uint64_t mask = free_mask_.load(std::memory_order_relaxed);

        while (mask != 0) {
            int idx = __builtin_ctzll(mask);  // Find first set bit
            uint64_t new_mask = mask & ~(1ULL << idx);

            // On failure, `mask` is reloaded with the current value
            if (free_mask_.compare_exchange_weak(
                mask, new_mask,
                std::memory_order_acquire,
                std::memory_order_relaxed)) {
                return &pool_[idx];
            }
        }

        return nullptr;  // Pool exhausted
    }

    void deallocate(T* ptr) {
        size_t idx = static_cast<size_t>(ptr - pool_);
        if (idx >= N) return;  // Not a slot of this pool

        // A single atomic OR marks the slot free again
        free_mask_.fetch_or(1ULL << idx, std::memory_order_release);
    }
};
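Attribute setters such as `pthread_attr_setaffinity_np` report failures only through return codes, so it helps to read the settings back from inside the thread. The following sketch pins the calling thread to the first CPU it is allowed to use and verifies the result; the function names are illustrative. It deliberately avoids `SCHED_FIFO`, so it runs without privileges.

```cpp
#include <pthread.h>
#include <sched.h>
#include <cassert>

// Pins the calling thread to the first CPU in its current affinity mask.
// Returns that CPU id, or -1 on failure.
int pin_to_first_allowed_cpu() {
    cpu_set_t allowed;
    CPU_ZERO(&allowed);
    if (pthread_getaffinity_np(pthread_self(), sizeof(allowed), &allowed) != 0) {
        return -1;
    }

    for (int cpu = 0; cpu < CPU_SETSIZE; ++cpu) {
        if (!CPU_ISSET(cpu, &allowed)) continue;

        cpu_set_t target;
        CPU_ZERO(&target);
        CPU_SET(cpu, &target);
        if (pthread_setaffinity_np(pthread_self(), sizeof(target), &target) != 0) {
            return -1;
        }
        return cpu;
    }
    return -1;  // Empty mask; should not happen for a running thread
}

// Reads the affinity mask back and checks that it contains exactly `cpu`.
bool is_pinned_to(int cpu) {
    cpu_set_t current;
    CPU_ZERO(&current);
    if (pthread_getaffinity_np(pthread_self(), sizeof(current), &current) != 0) {
        return false;
    }
    return CPU_COUNT(&current) == 1 && CPU_ISSET(cpu, &current);
}
```

Inside an RT thread the same read-back idea applies to the scheduling policy: `pthread_getschedparam` should report `SCHED_FIFO` and the requested priority, otherwise the attributes were not applied.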

Real-Time Event Loop

cpp
#include <sys/timerfd.h>
#include <unistd.h>
#include <algorithm>
#include <atomic>
#include <cerrno>
#include <cstdint>
#include <ctime>
#include <stdexcept>

class RTEventLoop {
public:
    // Latency tracking
    struct LatencyStats {
        uint64_t min_ns = UINT64_MAX;
        uint64_t max_ns = 0;
        uint64_t total_ns = 0;
        uint64_t count = 0;

        void record(uint64_t latency_ns) {
            min_ns = std::min(min_ns, latency_ns);
            max_ns = std::max(max_ns, latency_ns);
            total_ns += latency_ns;
            count++;
        }

        double avg_ns() const {
            return count > 0 ? static_cast<double>(total_ns) / count : 0.0;
        }
    };

private:
    int timer_fd_;
    uint64_t period_ns_;
    uint64_t next_deadline_ns_;  // Absolute CLOCK_MONOTONIC time of the next tick
    std::atomic<bool> running_{true};
    uint64_t missed_deadlines_{0};
    LatencyStats stats_;

public:
    explicit RTEventLoop(uint64_t period_ns) : period_ns_(period_ns) {
        if (period_ns == 0) {
            // A zero interval would disarm the timer
            throw std::invalid_argument("period must be non-zero");
        }

        // Blocking timer fd: read() sleeps until the next expiration
        timer_fd_ = timerfd_create(CLOCK_MONOTONIC, 0);
        if (timer_fd_ < 0) {
            throw std::runtime_error("timerfd_create failed");
        }

        // Arm with an absolute first deadline so that lateness can be
        // measured against a known point in time
        next_deadline_ns_ = get_time_ns() + period_ns_;

        struct itimerspec ts{};
        ts.it_value.tv_sec = next_deadline_ns_ / 1000000000ULL;
        ts.it_value.tv_nsec = next_deadline_ns_ % 1000000000ULL;
        ts.it_interval.tv_sec = period_ns_ / 1000000000ULL;
        ts.it_interval.tv_nsec = period_ns_ % 1000000000ULL;

        if (timerfd_settime(timer_fd_, TFD_TIMER_ABSTIME, &ts, nullptr) < 0) {
            close(timer_fd_);
            throw std::runtime_error("timerfd_settime failed");
        }
    }

    ~RTEventLoop() {
        close(timer_fd_);
    }

    template<typename Func>
    void run(Func&& process_tick) {
        while (running_.load(std::memory_order_acquire)) {
            // Wait for timer
            uint64_t expirations = 0;
            ssize_t ret = read(timer_fd_, &expirations, sizeof(expirations));

            if (ret < 0) {
                if (errno == EINTR) {
                    // Interrupted by a signal; retry
                    continue;
                }
                break;
            }

            uint64_t now = get_time_ns();

            // More than one expiration means whole periods were skipped
            if (expirations > 1) {
                missed_deadlines_ += (expirations - 1);
            }

            // Lateness relative to the most recent scheduled expiration
            uint64_t deadline = next_deadline_ns_ + (expirations - 1) * period_ns_;
            uint64_t latency = now > deadline ? now - deadline : 0;
            next_deadline_ns_ = deadline + period_ns_;

            stats_.record(latency);

            // Process tick
            process_tick(now, latency);
        }
    }

    // Takes effect at the next tick, when the blocking read() returns
    void stop() {
        running_.store(false, std::memory_order_release);
    }

    const LatencyStats& get_stats() const { return stats_; }
    uint64_t missed_deadlines() const { return missed_deadlines_; }

private:
    static uint64_t get_time_ns() {
        struct timespec ts;
        clock_gettime(CLOCK_MONOTONIC, &ts);
        return ts.tv_sec * 1000000000ULL + ts.tv_nsec;
    }
};

// Example: Real-time market data processor
// (uses RTMemoryPool from the Thread Priority and Affinity listing)
class RTMarketDataProcessor {
private:
    // Declared before the pool member that stores it
    struct MarketUpdate {
        uint64_t timestamp;
        uint32_t symbol_id;
        double price;
        uint32_t size;
    };

    static constexpr uint64_t kDeadlineNs = 10000;  // 10μs deadline

    RTEventLoop event_loop_;
    RTMemoryPool<MarketUpdate, 64> update_pool_;  // Bitmask pool: at most 64 slots
    std::atomic<uint64_t> deadline_misses_{0};

public:
    explicit RTMarketDataProcessor(uint64_t period_ns)
        : event_loop_(period_ns) {}

    void run() {
        event_loop_.run([this](uint64_t now, uint64_t latency) {
            // Count overruns here and report them from a non-RT thread:
            // stream I/O can block and allocate
            if (latency > kDeadlineNs) {
                deadline_misses_.fetch_add(1, std::memory_order_relaxed);
            }

            // Process market data
            process_updates(now);
        });
    }

    void stop() {
        event_loop_.stop();
    }

    uint64_t deadline_misses() const {
        return deadline_misses_.load(std::memory_order_relaxed);
    }

private:
    void process_updates(uint64_t /*now*/) {
        // Real-time processing (no allocations, no blocking)
        // Update order books, trigger strategies, etc.
    }
};
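A common alternative to a timer fd is sleeping until an absolute deadline with `clock_nanosleep`, as tools like cyclictest do. The sketch below shows that pattern under the assumption that a plain periodic loop is enough; `advance` and `run_periodic` are illustrative names. Because each deadline is derived from the previous one rather than from "now", a late wake-up does not shift later ticks.

```cpp
#include <cassert>
#include <cerrno>
#include <cstdint>
#include <ctime>

constexpr long kNsPerSec = 1000000000L;

// Advances an absolute timespec by `ns` nanoseconds, keeping tv_nsec normalized.
void advance(timespec& t, uint64_t ns) {
    t.tv_sec += static_cast<time_t>(ns / kNsPerSec);
    t.tv_nsec += static_cast<long>(ns % kNsPerSec);
    if (t.tv_nsec >= kNsPerSec) {
        t.tv_nsec -= kNsPerSec;
        ++t.tv_sec;
    }
}

// Calls `tick(i)` once per period for `iterations` periods and returns the
// largest observed wake-up lateness in nanoseconds.
template <typename Tick>
uint64_t run_periodic(uint64_t period_ns, int iterations, Tick&& tick) {
    timespec next{};
    clock_gettime(CLOCK_MONOTONIC, &next);
    uint64_t max_late_ns = 0;

    for (int i = 0; i < iterations; ++i) {
        advance(next, period_ns);

        // Absolute sleep; if a signal interrupts it, sleep again to the same deadline
        while (clock_nanosleep(CLOCK_MONOTONIC, TIMER_ABSTIME, &next, nullptr) == EINTR) {
        }

        timespec now{};
        clock_gettime(CLOCK_MONOTONIC, &now);
        const int64_t late = static_cast<int64_t>(now.tv_sec - next.tv_sec) * kNsPerSec +
                             (now.tv_nsec - next.tv_nsec);
        if (late > 0 && static_cast<uint64_t>(late) > max_late_ns) {
            max_late_ns = static_cast<uint64_t>(late);
        }

        tick(i);
    }
    return max_late_ns;
}
```

For example, `run_periodic(1000000, 1000, [](int) { /* work */ })` runs a 1 ms loop for one second and returns the worst wake-up lateness it saw.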

Deterministic Lock-Free Communication

cpp
#include <atomic>
#include <cstddef>
#include <cstdint>

// SPSC queue with bounded latency
template<typename T, size_t N>
class RTQueue {
    static_assert((N & (N - 1)) == 0, "N must be power of 2");

private:
    struct alignas(64) Slot {
        std::atomic<uint32_t> sequence{0};
        T data;
    };

    alignas(64) Slot buffer_[N];
    alignas(64) std::atomic<uint32_t> write_pos_{0};
    alignas(64) std::atomic<uint32_t> read_pos_{0};

public:
    RTQueue() {
        // Slot i starts with sequence i, meaning "free for the write at
        // position i". Leaving every sequence at 0 would make the second
        // push report a full queue.
        for (size_t i = 0; i < N; ++i) {
            buffer_[i].sequence.store(static_cast<uint32_t>(i),
                                      std::memory_order_relaxed);
        }
    }

    bool try_push(const T& item) {
        uint32_t pos = write_pos_.load(std::memory_order_relaxed);
        Slot& slot = buffer_[pos & (N - 1)];
        uint32_t seq = slot.sequence.load(std::memory_order_acquire);

        if (seq != pos) {
            return false;  // Full
        }

        slot.data = item;
        // sequence == pos + 1 publishes the slot to the consumer
        slot.sequence.store(pos + 1, std::memory_order_release);
        write_pos_.store(pos + 1, std::memory_order_release);

        return true;
    }

    bool try_pop(T& item) {
        uint32_t pos = read_pos_.load(std::memory_order_relaxed);
        Slot& slot = buffer_[pos & (N - 1)];
        uint32_t seq = slot.sequence.load(std::memory_order_acquire);

        if (seq != pos + 1) {
            return false;  // Empty
        }

        item = slot.data;
        // sequence == pos + N frees the slot for the write one lap later
        slot.sequence.store(pos + N, std::memory_order_release);
        read_pos_.store(pos + 1, std::memory_order_release);

        return true;
    }
};
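To show the single-producer/single-consumer contract in action, here is a self-contained sketch that moves integers between two threads. Note that it swaps in a simpler head/tail index ring (`SpscRing`, an illustrative name) rather than the per-slot sequence design above, and that the threads yield when the ring is full or empty; on isolated cores a real RT path would more likely spin with a bound.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <thread>

// Minimal single-producer/single-consumer ring using head/tail indices.
template <typename T, std::size_t N>
class SpscRing {
    static_assert((N & (N - 1)) == 0, "N must be a power of 2");

    T buffer_[N];
    alignas(64) std::atomic<std::size_t> head_{0};  // Next slot to read
    alignas(64) std::atomic<std::size_t> tail_{0};  // Next slot to write

public:
    // Producer thread only
    bool try_push(const T& item) {
        const std::size_t tail = tail_.load(std::memory_order_relaxed);
        if (tail - head_.load(std::memory_order_acquire) == N) return false;  // Full
        buffer_[tail & (N - 1)] = item;
        tail_.store(tail + 1, std::memory_order_release);  // Publish the item
        return true;
    }

    // Consumer thread only
    bool try_pop(T& item) {
        const std::size_t head = head_.load(std::memory_order_relaxed);
        if (head == tail_.load(std::memory_order_acquire)) return false;  // Empty
        item = buffer_[head & (N - 1)];
        head_.store(head + 1, std::memory_order_release);  // Free the slot
        return true;
    }
};

// Sends 0..count-1 through the ring and returns the sum seen by the
// consumer, or -1 if any item arrived out of order.
int64_t transfer_and_sum(uint32_t count) {
    SpscRing<uint32_t, 1024> ring;

    std::thread producer([&ring, count] {
        for (uint32_t i = 0; i < count; ++i) {
            while (!ring.try_push(i)) std::this_thread::yield();
        }
    });

    int64_t sum = 0;
    bool in_order = true;
    uint32_t expected = 0;
    while (expected < count) {
        uint32_t value = 0;
        if (ring.try_pop(value)) {
            in_order = in_order && (value == expected);
            sum += value;
            ++expected;
        } else {
            std::this_thread::yield();
        }
    }

    producer.join();
    return in_order ? sum : -1;
}
```

Both operations complete in a fixed number of steps whether they succeed or fail, which is the property that keeps worst-case latency bounded.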

Rust Real-Time Implementation

Thread Priority and Affinity

rust
use std::io;
use std::sync::atomic::{AtomicU64, Ordering};
use std::thread;
use libc::cpu_set_t;

struct RealTimeThread {
    priority: i32,
    cpu_core: usize,
}

impl RealTimeThread {
    fn new(priority: i32, cpu_core: usize) -> Self {
        assert!((1..=99).contains(&priority), "Priority must be 1-99");
        RealTimeThread { priority, cpu_core }
    }

    // The thread returns Err if any RT setup call fails, so the caller can
    // detect a missing privilege (CAP_SYS_NICE, memlock limit) on join
    fn spawn<F>(self, f: F) -> io::Result<thread::JoinHandle<io::Result<()>>>
    where
        F: FnOnce() + Send + 'static,
    {
        thread::Builder::new()
            .name(format!("rt-{}", self.cpu_core))
            .spawn(move || {
                // Lock memory
                if unsafe { libc::mlockall(libc::MCL_CURRENT | libc::MCL_FUTURE) } != 0 {
                    return Err(io::Error::last_os_error());
                }

                // Set CPU affinity (pid 0 means the calling thread)
                let rc = unsafe {
                    let mut cpuset: cpu_set_t = std::mem::zeroed();
                    libc::CPU_ZERO(&mut cpuset);
                    libc::CPU_SET(self.cpu_core, &mut cpuset);

                    libc::sched_setaffinity(
                        0,
                        std::mem::size_of::<cpu_set_t>(),
                        &cpuset,
                    )
                };
                if rc != 0 {
                    return Err(io::Error::last_os_error());
                }

                // Set realtime priority
                let param = libc::sched_param {
                    sched_priority: self.priority,
                };
                if unsafe { libc::sched_setscheduler(0, libc::SCHED_FIFO, &param) } != 0 {
                    return Err(io::Error::last_os_error());
                }

                // Run user function
                f();
                Ok(())
            })
    }
}

// Pre-allocated memory pool: hands out slot indices tracked in a bitmask
struct RTMemoryPool<T, const N: usize> {
    pool: [Option<T>; N],
    free_mask: AtomicU64,
}

impl<T, const N: usize> RTMemoryPool<T, N> {
    fn new() -> Self {
        assert!(N >= 1 && N <= 64, "Pool size limited to 64 for bitmask");

        RTMemoryPool {
            pool: [(); N].map(|_| None),
            // `1u64 << 64` overflows, so build the mask by shifting right
            free_mask: AtomicU64::new(u64::MAX >> (64 - N)),
        }
    }

    fn allocate(&self) -> Option<usize> {
        let mut mask = self.free_mask.load(Ordering::Relaxed);

        while mask != 0 {
            let idx = mask.trailing_zeros() as usize;
            let new_mask = mask & !(1u64 << idx);

            match self.free_mask.compare_exchange_weak(
                mask,
                new_mask,
                Ordering::Acquire,
                Ordering::Relaxed,
            ) {
                Ok(_) => return Some(idx),
                Err(actual) => mask = actual,
            }
        }

        None
    }

    fn deallocate(&self, idx: usize) {
        assert!(idx < N, "index out of range");
        // A single atomic OR marks the slot free again
        self.free_mask.fetch_or(1u64 << idx, Ordering::Release);
    }
}

Real-Time Event Loop

rust
use std::io;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use libc::c_void;

struct RTEventLoop {
    timer_fd: i32,
    period_ns: u64,
    first_deadline_ns: u64, // Absolute CLOCK_MONOTONIC time of the first tick
    running: AtomicBool,
    missed_deadlines: AtomicU64,
    stats: LatencyStats,
}

struct LatencyStats {
    min_ns: AtomicU64,
    max_ns: AtomicU64,
    total_ns: AtomicU64,
    count: AtomicU64,
}

impl LatencyStats {
    fn new() -> Self {
        LatencyStats {
            min_ns: AtomicU64::new(u64::MAX),
            max_ns: AtomicU64::new(0),
            total_ns: AtomicU64::new(0),
            count: AtomicU64::new(0),
        }
    }

    fn record(&self, latency_ns: u64) {
        // fetch_min/fetch_max replace the hand-written CAS loops
        self.min_ns.fetch_min(latency_ns, Ordering::Relaxed);
        self.max_ns.fetch_max(latency_ns, Ordering::Relaxed);
        self.total_ns.fetch_add(latency_ns, Ordering::Relaxed);
        self.count.fetch_add(1, Ordering::Relaxed);
    }

    fn avg_ns(&self) -> f64 {
        let total = self.total_ns.load(Ordering::Relaxed);
        let count = self.count.load(Ordering::Relaxed);
        if count > 0 {
            total as f64 / count as f64
        } else {
            0.0
        }
    }
}

fn to_timespec(ns: u64) -> libc::timespec {
    libc::timespec {
        tv_sec: (ns / 1_000_000_000) as libc::time_t,
        tv_nsec: (ns % 1_000_000_000) as libc::c_long,
    }
}

impl RTEventLoop {
    fn new(period_ns: u64) -> io::Result<Self> {
        if period_ns == 0 {
            // A zero interval would disarm the timer
            return Err(io::Error::new(
                io::ErrorKind::InvalidInput,
                "period must be non-zero",
            ));
        }

        // Blocking timer fd: read() sleeps until the next expiration
        let timer_fd = unsafe { libc::timerfd_create(libc::CLOCK_MONOTONIC, 0) };
        if timer_fd < 0 {
            return Err(io::Error::last_os_error());
        }

        // Arm with an absolute first deadline so lateness can be measured
        let first_deadline_ns = get_time_ns() + period_ns;
        let ts = libc::itimerspec {
            it_value: to_timespec(first_deadline_ns),
            it_interval: to_timespec(period_ns),
        };

        let rc = unsafe {
            libc::timerfd_settime(
                timer_fd,
                libc::TFD_TIMER_ABSTIME,
                &ts,
                std::ptr::null_mut(),
            )
        };
        if rc < 0 {
            // Capture errno before close() can overwrite it
            let err = io::Error::last_os_error();
            unsafe { libc::close(timer_fd) };
            return Err(err);
        }

        Ok(RTEventLoop {
            timer_fd,
            period_ns,
            first_deadline_ns,
            running: AtomicBool::new(true),
            missed_deadlines: AtomicU64::new(0),
            stats: LatencyStats::new(),
        })
    }

    fn run<F>(&self, mut process_tick: F)
    where
        F: FnMut(u64, u64),
    {
        let mut next_deadline = self.first_deadline_ns;

        while self.running.load(Ordering::Acquire) {
            let mut expirations: u64 = 0;

            let ret = unsafe {
                libc::read(
                    self.timer_fd,
                    &mut expirations as *mut u64 as *mut c_void,
                    std::mem::size_of::<u64>(),
                )
            };

            if ret < 0 {
                // Retry if a signal interrupted the read; stop on anything else
                if io::Error::last_os_error().kind() == io::ErrorKind::Interrupted {
                    continue;
                }
                break;
            }

            let now = get_time_ns();

            // More than one expiration means whole periods were skipped
            if expirations > 1 {
                self.missed_deadlines.fetch_add(
                    expirations - 1,
                    Ordering::Relaxed,
                );
            }

            // Lateness relative to the most recent scheduled expiration
            let deadline = next_deadline + (expirations - 1) * self.period_ns;
            let latency = now.saturating_sub(deadline);
            next_deadline = deadline + self.period_ns;

            self.stats.record(latency);

            // Process tick
            process_tick(now, latency);
        }
    }

    // Takes effect at the next tick, when the blocking read() returns
    fn stop(&self) {
        self.running.store(false, Ordering::Release);
    }
}

impl Drop for RTEventLoop {
    fn drop(&mut self) {
        unsafe {
            libc::close(self.timer_fd);
        }
    }
}

fn get_time_ns() -> u64 {
    unsafe {
        let mut ts = libc::timespec {
            tv_sec: 0,
            tv_nsec: 0,
        };
        libc::clock_gettime(libc::CLOCK_MONOTONIC, &mut ts);
        (ts.tv_sec as u64) * 1_000_000_000 + (ts.tv_nsec as u64)
    }
}

Performance Results

From our production trading system:

Latency Distribution (1μs tick rate)

plaintext
Metric               Standard Linux    PREEMPT_RT
─────────────────────────────────────────────────
Min latency          0.8 μs            0.6 μs
P50 latency          2.1 μs            1.2 μs
P99 latency          45.3 μs           3.8 μs
P99.9 latency        328 μs            12.4 μs
Max latency          2.1 ms            42 μs
Jitter (std dev)     18.2 μs           1.9 μs
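Percentiles like these are usually derived from a histogram rather than from stored samples, since a fixed array of counters can be updated from the RT path without allocating. The sketch below is one way to do that, not the tooling behind the numbers above; the class name, the bucket layout, and the choice to report each percentile as its bucket's upper bound are all assumptions.

```cpp
#include <array>
#include <cassert>
#include <cstddef>
#include <cstdint>

// Fixed-size latency histogram: record() is O(1) and allocation-free.
// Buckets cover [0, Buckets * BucketWidthNs); anything larger is counted
// in one extra overflow bucket.
template <std::size_t Buckets, uint64_t BucketWidthNs>
class LatencyHistogram {
    std::array<uint64_t, Buckets + 1> counts_{};  // Last entry is the overflow bucket
    uint64_t total_ = 0;

public:
    void record(uint64_t latency_ns) {
        std::size_t idx = static_cast<std::size_t>(latency_ns / BucketWidthNs);
        if (idx >= Buckets) idx = Buckets;
        ++counts_[idx];
        ++total_;
    }

    // Upper bound (ns) of the bucket holding the num/den quantile, using the
    // nearest-rank rule; P99.9 is (999, 1000). Returns UINT64_MAX when the
    // quantile falls in the overflow bucket and 0 when nothing was recorded.
    uint64_t quantile_upper_bound_ns(uint64_t num, uint64_t den) const {
        assert(den > 0 && num <= den);
        if (total_ == 0) return 0;

        uint64_t rank = (total_ * num + den - 1) / den;  // ceil(total * num / den)
        if (rank == 0) rank = 1;

        uint64_t seen = 0;
        for (std::size_t i = 0; i < Buckets; ++i) {
            seen += counts_[i];
            if (seen >= rank) return (i + 1) * BucketWidthNs;
        }
        return UINT64_MAX;
    }

    uint64_t total() const { return total_; }
};
```

With `LatencyHistogram<100, 1000>` (100 buckets of 1μs), `quantile_upper_bound_ns(99, 100)` gives P99 to the nearest microsecond; the integer numerator and denominator avoid floating-point rounding in the rank calculation.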

CPU Isolation Impact

plaintext
Configuration        P99 Latency    Max Latency
───────────────────────────────────────────────
No isolation         18.3 μs        1.2 ms
isolcpus only        8.7 μs         124 μs
Full RT config       3.8 μs         42 μs

Best Practices

1. Avoid in Real-Time Path

cpp
// NEVER in RT path:
//   malloc() / free()           Non-deterministic
//   std::cout / printf()        I/O, locks
//   std::vector::push_back()    May allocate
//   std::mutex                  May block
//   sleep() / usleep()          Unpredictable
//
// USE instead:
//   Pre-allocated pools
//   Lock-free structures
//   Atomic operations
//   Bounded spinning
//   Fixed-size buffers
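"Bounded spinning" means a wait loop with a fixed upper limit on attempts, so the worst-case time spent waiting is known in advance. A minimal sketch, with `BoundedSpinLock` as an illustrative name and the spin budget left to the caller:

```cpp
#include <atomic>
#include <cassert>
#include <cstdint>

// Spinlock whose acquire gives up after a fixed number of attempts, so the
// caller's worst-case wait is bounded instead of open-ended.
class BoundedSpinLock {
    std::atomic<bool> locked_{false};

public:
    // Returns true if the lock was taken within `max_spins` attempts.
    bool try_lock_bounded(uint32_t max_spins) {
        for (uint32_t i = 0; i < max_spins; ++i) {
            // Check with a plain load first to avoid hammering the cache
            // line with writes while another thread holds the lock
            if (!locked_.load(std::memory_order_relaxed) &&
                !locked_.exchange(true, std::memory_order_acquire)) {
                return true;
            }
        }
        return false;  // Caller decides: skip the work, reuse stale data, count a miss
    }

    void unlock() {
        locked_.store(false, std::memory_order_release);
    }
};
```

The important part is the failure path: when `try_lock_bounded` returns false, the RT thread carries on with a fallback instead of blocking, which keeps its tick time predictable.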

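2. Memory Management

rust
// Placeholder order type; `[value; N]` initialization requires `Copy`
#[derive(Clone, Copy, Default)]
struct Order { id: u64, price: f64, quantity: u32 }

const POOL_SIZE: usize = 1024;

fn main() {
    // Pre-allocate everything
    let _order_pool: [Order; POOL_SIZE] = [Order::default(); POOL_SIZE];

    // Use stack allocation
    let _buffer = [0u8; 4096];

    // Lock all memory and check the result: it fails without
    // CAP_IPC_LOCK or a sufficient memlock limit
    if unsafe { libc::mlockall(libc::MCL_CURRENT | libc::MCL_FUTURE) } != 0 {
        eprintln!("mlockall failed: {}", std::io::Error::last_os_error());
    }
}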
12

3. Monitoring

cpp
#include <atomic>
#include <cstdint>

// Track deadline misses
struct RTMetrics {
    std::atomic<uint64_t> ticks{0};
    std::atomic<uint64_t> deadline_misses{0};
    std::atomic<uint64_t> max_latency_ns{0};

    // Fraction of ticks that missed their deadline (0.0 to 1.0)
    double miss_rate() const {
        uint64_t total = ticks.load();
        uint64_t misses = deadline_misses.load();
        return total > 0 ? static_cast<double>(misses) / total : 0.0;
    }
};
13

Lessons Learned

After years of real-time Linux development:

  1. PREEMPT_RT is essential: A standard kernel can't bound worst-case latency (2.1 ms max versus 42 μs in our measurements)
  2. CPU isolation matters: Keep housekeeping work on separate cores from the RT threads
  3. Memory pre-allocation: Allocate everything at startup
  4. Measure, don't guess: Use cyclictest, histograms
  5. Lock-free or bust: Mutexes kill determinism
  6. Disable power management: C-states, frequency scaling
  7. Monitor continuously: Track P99, P99.9, max latency
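For the C-state part of item 6, one option that does not require changing boot parameters is the kernel's PM QoS interface: a process writes a maximum tolerable wake-up latency (a 32-bit value in microseconds) to `/dev/cpu_dma_latency` and keeps the file open for as long as the request should hold. The sketch below wraps that in an RAII class; the class name is illustrative, and opening the device usually needs root, so the caller is expected to check `active()`.

```cpp
#include <fcntl.h>
#include <unistd.h>
#include <cassert>
#include <cstdint>

// Holds a PM QoS CPU latency request for as long as the object lives.
class CpuDmaLatencyRequest {
    int fd_ = -1;

public:
    // `path` is a parameter only so the failure path can be exercised in tests.
    explicit CpuDmaLatencyRequest(int32_t max_latency_us,
                                  const char* path = "/dev/cpu_dma_latency") {
        fd_ = open(path, O_WRONLY);
        if (fd_ < 0) return;  // Typically a permissions problem

        const ssize_t written = write(fd_, &max_latency_us, sizeof(max_latency_us));
        if (written != static_cast<ssize_t>(sizeof(max_latency_us))) {
            close(fd_);
            fd_ = -1;
        }
    }

    // Closing the file descriptor withdraws the request
    ~CpuDmaLatencyRequest() {
        if (fd_ >= 0) close(fd_);
    }

    CpuDmaLatencyRequest(const CpuDmaLatencyRequest&) = delete;
    CpuDmaLatencyRequest& operator=(const CpuDmaLatencyRequest&) = delete;

    bool active() const { return fd_ >= 0; }
};
```

Creating `CpuDmaLatencyRequest request(0);` at startup asks the kernel to avoid idle states with any exit latency; if `request.active()` is false, the process should log that and fall back to boot-time settings.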

Real-time Linux works, but requires discipline. One malloc() in the hot path can ruin your latency distribution.


Master real-time Linux—it's the foundation for building deterministic low-latency systems.
