Building a High-Frequency Market Data Feed: Architecture and Optimization
Designing and implementing ultra-low latency market data feeds that process millions of messages per second with microsecond precision
Market data feeds are the lifeblood of any trading system. In high-frequency trading, the speed at which you process market data directly impacts profitability. A 10-microsecond advantage can mean the difference between capturing alpha and missing opportunities.
We've built market data systems processing 5+ million messages per second with sub-microsecond jitter. This post shares the architecture, optimization techniques, and hard-learned lessons from building production HFT data feeds.
Modern exchanges produce enormous data volumes:
| Exchange | Messages/Second | Peak Rate | Data Rate |
|---|---|---|---|
| NASDAQ | 400,000 | 2M+ | 800 MB/s |
| NYSE | 300,000 | 1.5M+ | 600 MB/s |
| CME Futures | 500,000 | 3M+ | 1 GB/s |
| Crypto Exchanges | 1M+ | 5M+ | 2+ GB/s |
Processing this data with minimal latency requires careful architecture and optimization at every layer.
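A back-of-envelope calculation makes the budget concrete: at a sustained rate of R messages per second, a single core has 10^9/R nanoseconds to spend on each message (the rates below come from the table above):

```cpp
#include <cassert>

// Per-message processing budget in nanoseconds at a sustained rate:
// one core has 1e9 / rate nanoseconds to spend on each message.
constexpr double budget_ns(double msgs_per_sec) {
    return 1e9 / msgs_per_sec;
}

// NASDAQ steady state, 400k msg/s: 2500 ns per message.
// Crypto peak, 5M msg/s: 200 ns per message - about the cost of two
// uncached DRAM accesses, so a single stray cache miss blows the budget.
static_assert(budget_ns(400'000) == 2500.0, "2.5 us per message");
static_assert(budget_ns(5'000'000) == 200.0, "200 ns per message");
```

This is why the rest of this post obsesses over cache misses, copies, and syscalls: at peak rates there is no room for any of them.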
```
┌─────────────────────────────────────────────────────────┐
│  Network Interface (NIC)                                │
│  Kernel Bypass (DPDK/Solarflare)                        │
└────────────────┬────────────────────────────────────────┘
                 │ Zero-copy packet capture
┌────────────────▼────────────────────────────────────────┐
│  Packet Parser (Lock-Free)                              │
│  - Protocol decoding (UDP, TCP, Binary, FIX)            │
│  - Checksum validation                                  │
│  - Sequence number gap detection                        │
└────────────────┬────────────────────────────────────────┘
                 │ Ring buffer (SPSC)
┌────────────────▼────────────────────────────────────────┐
│  Message Normalizer (Per-Exchange)                      │
│  - Convert to unified format                            │
│  - Symbol mapping                                       │
│  - Time synchronization                                 │
└────────────────┬────────────────────────────────────────┘
                 │ Lock-free queue
┌────────────────▼────────────────────────────────────────┐
│  Order Book Builder (Lock-Free)                         │
│  - Maintain L2/L3 order book                            │
│  - Calculate derived metrics                            │
│  - Detect market events                                 │
└────────────────┬────────────────────────────────────────┘
                 │ Shared memory
┌────────────────▼────────────────────────────────────────┐
│  Strategy Consumers                                     │
│  - Trading strategies                                   │
│  - Risk management                                      │
│  - Analytics                                            │
└─────────────────────────────────────────────────────────┘
```
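The parser stage's sequence-number gap detection deserves a note: feed transports such as MoldUDP64 (used for ITCH) number every message, so a jump in the sequence means dropped packets that must be recovered from a retransmission or snapshot channel. A minimal sketch of the idea (the `GapDetector` class and `on_gap` callback are illustrative, not part of any exchange API):

```cpp
#include <cassert>
#include <cstdint>
#include <functional>
#include <utility>

// Tracks the next expected sequence number on a feed and reports gaps.
// A real feed handler would recover the missing range from a
// retransmission service; here the callback just receives the range.
class GapDetector {
public:
    using GapHandler = std::function<void(uint64_t first_missing, uint64_t count)>;

    explicit GapDetector(GapHandler on_gap) : on_gap_(std::move(on_gap)) {}

    // seq: sequence number of the first message in the packet
    // msg_count: number of messages the packet carries
    // Returns true if the packet arrived in order.
    bool on_packet(uint64_t seq, uint64_t msg_count) {
        if (next_expected_ != 0 && seq > next_expected_) {
            on_gap_(next_expected_, seq - next_expected_);  // dropped messages
        }
        const bool in_order = (next_expected_ == 0) || (seq <= next_expected_);
        if (seq + msg_count > next_expected_) {
            next_expected_ = seq + msg_count;  // never rewind on duplicates
        }
        return in_order;
    }

private:
    uint64_t next_expected_ = 0;  // 0 = nothing seen yet
    GapHandler on_gap_;
};
```

Feeding it packets covering messages 1-5 and 6-8, then a packet starting at 12, reports messages 9-11 as the gap.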
Standard socket I/O is too slow: every packet costs a syscall, a kernel-to-user copy, and often an interrupt. Use kernel bypass for direct hardware access:
```cpp
// dpdk_receiver.hpp - DPDK-based packet receiver
#pragma once

#include <rte_eal.h>
#include <rte_ethdev.h>
#include <rte_mbuf.h>
#include <array>
#include <atomic>
#include <cstdint>
#include <stdexcept>

class DPDKReceiver {
public:
    static constexpr uint16_t RX_RING_SIZE = 2048;
    static constexpr uint16_t NUM_MBUFS = 8192;
    static constexpr uint16_t MBUF_CACHE_SIZE = 250;
    static constexpr uint16_t BURST_SIZE = 32;

    explicit DPDKReceiver(uint16_t port_id) : port_id_(port_id) {
        initialize_dpdk();
        configure_port();
    }

    // Receive a burst of packets and hand each one to the handler (zero-copy)
    template<typename Handler>
    inline uint16_t receive_burst(Handler&& handler) {
        std::array<rte_mbuf*, BURST_SIZE> bufs;

        // Receive burst from NIC queue 0
        const uint16_t nb_rx = rte_eth_rx_burst(port_id_, 0, bufs.data(), BURST_SIZE);

        for (uint16_t i = 0; i < nb_rx; i++) {
            auto* mbuf = bufs[i];

            // Pointer directly into the mbuf - no copy is made
            uint8_t* data = rte_pktmbuf_mtod(mbuf, uint8_t*);
            uint16_t len = rte_pktmbuf_data_len(mbuf);

            handler(data, len);

            // Return the mbuf to the pool once the handler is done with it
            rte_pktmbuf_free(mbuf);
        }

        packets_received_.fetch_add(nb_rx, std::memory_order_relaxed);
        return nb_rx;
    }

    uint64_t get_packet_count() const {
        return packets_received_.load(std::memory_order_relaxed);
    }

private:
    void initialize_dpdk() {
        // Initialize the DPDK Environment Abstraction Layer once per process
        static bool initialized = false;
        if (!initialized) {
            const char* argv[] = {"app", "-l", "0-3", "-n", "4"};
            int argc = sizeof(argv) / sizeof(argv[0]);

            if (rte_eal_init(argc, const_cast<char**>(argv)) < 0) {
                throw std::runtime_error("DPDK EAL initialization failed");
            }
            initialized = true;
        }

        // Create the packet-buffer memory pool on the local NUMA node
        mbuf_pool_ = rte_pktmbuf_pool_create(
            "MBUF_POOL",
            NUM_MBUFS,
            MBUF_CACHE_SIZE,
            0,
            RTE_MBUF_DEFAULT_BUF_SIZE,
            rte_socket_id()
        );

        if (!mbuf_pool_) {
            throw std::runtime_error("Failed to create mbuf pool");
        }
    }

    void configure_port() {
        rte_eth_conf port_conf = {};

        // One RX queue, no TX queues
        if (rte_eth_dev_configure(port_id_, 1, 0, &port_conf) < 0) {
            throw std::runtime_error("Failed to configure port");
        }

        // Setup RX queue
        if (rte_eth_rx_queue_setup(
                port_id_,
                0, // queue_id
                RX_RING_SIZE,
                rte_eth_dev_socket_id(port_id_),
                nullptr,
                mbuf_pool_
            ) < 0) {
            throw std::runtime_error("Failed to setup RX queue");
        }

        // Start device
        if (rte_eth_dev_start(port_id_) < 0) {
            throw std::runtime_error("Failed to start device");
        }

        // Receive all traffic on the wire, not just frames for our MAC
        rte_eth_promiscuous_enable(port_id_);
    }

    uint16_t port_id_;
    rte_mempool* mbuf_pool_ = nullptr;
    std::atomic<uint64_t> packets_received_{0};
};
```

Exchange protocols are typically binary for efficiency. Parse messages in place, without copying:
```cpp
// nasdaq_itch_parser.hpp - NASDAQ ITCH 5.0 protocol parser
#pragma once

#include <cstdint>
#include <cstring>

#pragma pack(push, 1)

// Message types
enum class ITCHMessageType : char {
    SYSTEM_EVENT = 'S',
    STOCK_DIRECTORY = 'R',
    STOCK_TRADING_ACTION = 'H',
    REG_SHO = 'Y',
    MARKET_PARTICIPANT_POSITION = 'L',
    ADD_ORDER = 'A',
    ADD_ORDER_MPID = 'F',
    ORDER_EXECUTED = 'E',
    ORDER_EXECUTED_WITH_PRICE = 'C',
    ORDER_CANCEL = 'X',
    ORDER_DELETE = 'D',
    ORDER_REPLACE = 'U',
    TRADE = 'P',
    CROSS_TRADE = 'Q',
    BROKEN_TRADE = 'B',
    NOII = 'I'
};

// ITCH timestamps are 48-bit big-endian nanoseconds since midnight
inline uint64_t read_be48(const uint8_t* p) {
    uint64_t v = 0;
    for (int i = 0; i < 6; ++i) v = (v << 8) | p[i];
    return v;
}

// Add Order message (Type A) - all fields are big-endian on the wire
struct AddOrderMessage {
    uint16_t stock_locate;
    uint16_t tracking_number;
    uint8_t  timestamp[6];            // Nanoseconds since midnight (48-bit)
    uint64_t order_reference_number;
    char     buy_sell_indicator;
    uint32_t shares;
    char     stock[8];                // Right-padded with spaces
    uint32_t price;                   // Fixed point (4 decimal places)

    // Helper to convert price to double
    double get_price() const {
        return __builtin_bswap32(price) / 10000.0;
    }

    // Helper to get timestamp in nanoseconds
    uint64_t get_timestamp_ns() const {
        return read_be48(timestamp);
    }

    // Helper to get order ID
    uint64_t get_order_id() const {
        return __builtin_bswap64(order_reference_number);
    }

    // Helper to get shares
    uint32_t get_shares() const {
        return __builtin_bswap32(shares);
    }

    bool is_buy() const {
        return buy_sell_indicator == 'B';
    }
};

// Order Executed message (Type E)
struct OrderExecutedMessage {
    uint16_t stock_locate;
    uint16_t tracking_number;
    uint8_t  timestamp[6];
    uint64_t order_reference_number;
    uint32_t executed_shares;
    uint64_t match_number;

    uint64_t get_timestamp_ns() const {
        return read_be48(timestamp);
    }

    uint64_t get_order_id() const {
        return __builtin_bswap64(order_reference_number);
    }

    uint32_t get_shares() const {
        return __builtin_bswap32(executed_shares);
    }
};

// Order Cancel message (Type X)
struct OrderCancelMessage {
    uint16_t stock_locate;
    uint16_t tracking_number;
    uint8_t  timestamp[6];
    uint64_t order_reference_number;
    uint32_t cancelled_shares;

    uint64_t get_timestamp_ns() const {
        return read_be48(timestamp);
    }

    uint64_t get_order_id() const {
        return __builtin_bswap64(order_reference_number);
    }

    uint32_t get_shares() const {
        return __builtin_bswap32(cancelled_shares);
    }
};

#pragma pack(pop)

class ITCHParser {
public:
    template<typename Handler>
    static inline void parse_message(const uint8_t* data, uint16_t len, Handler&& handler) {
        if (len < 3) return;  // need length prefix (2 bytes) + type byte

        // First two bytes: big-endian message length, which includes the
        // type byte. memcpy avoids an unaligned-load on strict platforms.
        uint16_t msg_len;
        std::memcpy(&msg_len, data, sizeof(msg_len));
        msg_len = __builtin_bswap16(msg_len);
        ITCHMessageType msg_type = static_cast<ITCHMessageType>(data[2]);

        // Validate that the full message is present
        if (len < msg_len + 2) return;

        // Payload starts after the length prefix and the type byte
        const uint8_t* msg_data = data + 3;

        switch (msg_type) {
            case ITCHMessageType::ADD_ORDER:
                if (msg_len >= sizeof(AddOrderMessage) + 1) {  // +1: type byte
                    handler(*reinterpret_cast<const AddOrderMessage*>(msg_data));
                }
                break;

            case ITCHMessageType::ORDER_EXECUTED:
                if (msg_len >= sizeof(OrderExecutedMessage) + 1) {
                    handler(*reinterpret_cast<const OrderExecutedMessage*>(msg_data));
                }
                break;

            case ITCHMessageType::ORDER_CANCEL:
                if (msg_len >= sizeof(OrderCancelMessage) + 1) {
                    handler(*reinterpret_cast<const OrderCancelMessage*>(msg_data));
                }
                break;

            // Handle other message types...
            default:
                break;
        }
    }
};
```
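A note on the `__builtin_bswap*` helpers: they are GCC/Clang-specific and assume a little-endian host. ITCH prices in particular are big-endian unsigned 32-bit values with four implied decimal places, and an endian-portable alternative is to assemble the bytes explicitly (compilers reduce this to a single `bswap` on x86). The helper name here is illustrative:

```cpp
#include <cassert>
#include <cmath>
#include <cstdint>

// ITCH Price(4): big-endian uint32 with 4 implied decimal places.
// Assembling byte-by-byte works on any host endianness.
inline double itch_price_to_double(const uint8_t* p) {
    uint32_t raw = (uint32_t(p[0]) << 24) | (uint32_t(p[1]) << 16) |
                   (uint32_t(p[2]) << 8)  |  uint32_t(p[3]);
    return raw / 10000.0;
}
```

For example, the wire bytes `00 12 D6 44` are the integer 1,234,500, i.e. a price of 123.45.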
Next, maintain order book state. The book itself is updated by a single writer thread; the lock-free structures are the queues that feed it:
```cpp
// order_book.hpp - Order book updated by a single writer thread
#pragma once

#include <algorithm>
#include <array>
#include <cstdint>
#include <unordered_map>

// Fixed-size order book with compile-time depth
template<size_t MaxDepth = 10>
class OrderBook {
public:
    struct Level {
        double price = 0.0;
        uint64_t size = 0;
        uint32_t num_orders = 0;

        bool is_valid() const { return price > 0.0; }
    };

    struct Snapshot {
        std::array<Level, MaxDepth> bids;
        std::array<Level, MaxDepth> asks;
        uint64_t timestamp_ns;
        uint32_t bid_count;
        uint32_t ask_count;

        double mid_price() const {
            if (bid_count > 0 && ask_count > 0) {
                return (bids[0].price + asks[0].price) / 2.0;
            }
            return 0.0;
        }

        double spread() const {
            if (bid_count > 0 && ask_count > 0) {
                return asks[0].price - bids[0].price;
            }
            return 0.0;
        }

        double spread_bps() const {
            double mid = mid_price();
            if (mid > 0.0) {
                return (spread() / mid) * 10000.0;
            }
            return 0.0;
        }
    };

    OrderBook() {
        bids_.fill({});
        asks_.fill({});
    }

    // Add order to book. Prices beyond the top MaxDepth levels are dropped
    // from the book, but the order is still tracked so that later
    // executions and cancels stay consistent.
    void add_order(uint64_t order_id, bool is_buy, double price, uint64_t size) {
        auto& levels = is_buy ? bids_ : asks_;

        // Find price level
        for (size_t i = 0; i < MaxDepth; i++) {
            if (!levels[i].is_valid()) {
                // New level
                levels[i].price = price;
                levels[i].size = size;
                levels[i].num_orders = 1;
                sort_levels(is_buy);
                break;
            } else if (levels[i].price == price) {
                // Add to existing level
                levels[i].size += size;
                levels[i].num_orders++;
                break;
            }
        }

        // Track order for later updates
        orders_[order_id] = {price, size, is_buy};
    }

    // Execute order (full or partial)
    void execute_order(uint64_t order_id, uint64_t executed_size) {
        auto it = orders_.find(order_id);
        if (it == orders_.end()) return;

        auto& order = it->second;
        auto& levels = order.is_buy ? bids_ : asks_;

        // Find and update the level (clamp to guard against underflow
        // on an out-of-sync feed)
        for (size_t i = 0; i < MaxDepth; i++) {
            if (levels[i].price == order.price) {
                levels[i].size -= std::min<uint64_t>(executed_size, levels[i].size);

                if (levels[i].size == 0 || executed_size >= order.size) {
                    levels[i].num_orders--;

                    // Remove level if no orders remain
                    if (levels[i].num_orders == 0) {
                        levels[i] = {};
                        sort_levels(order.is_buy);
                    }
                }
                break;
            }
        }

        // Update or remove order tracking
        order.size -= std::min<uint64_t>(executed_size, order.size);
        if (order.size == 0) {
            orders_.erase(it);
        }
    }

    // Cancel order
    void cancel_order(uint64_t order_id, uint64_t cancelled_size) {
        execute_order(order_id, cancelled_size);
    }

    // Get current snapshot
    Snapshot get_snapshot(uint64_t timestamp_ns) const {
        Snapshot snap;
        snap.timestamp_ns = timestamp_ns;
        snap.bids = bids_;
        snap.asks = asks_;

        // Count valid levels
        snap.bid_count = static_cast<uint32_t>(std::count_if(
            bids_.begin(), bids_.end(),
            [](const Level& l) { return l.is_valid(); }
        ));
        snap.ask_count = static_cast<uint32_t>(std::count_if(
            asks_.begin(), asks_.end(),
            [](const Level& l) { return l.is_valid(); }
        ));

        return snap;
    }

    // Get best bid/ask
    double best_bid() const {
        return bids_[0].is_valid() ? bids_[0].price : 0.0;
    }

    double best_ask() const {
        return asks_[0].is_valid() ? asks_[0].price : 0.0;
    }

private:
    struct OrderInfo {
        double price;
        uint64_t size;
        bool is_buy;
    };

    void sort_levels(bool is_buy) {
        auto& levels = is_buy ? bids_ : asks_;

        // Sort valid levels
        std::sort(levels.begin(), levels.end(),
            [is_buy](const Level& a, const Level& b) {
                // Invalid levels go to the end
                if (!a.is_valid()) return false;
                if (!b.is_valid()) return true;

                // Bids: highest first, asks: lowest first
                return is_buy ? (a.price > b.price) : (a.price < b.price);
            }
        );
    }

    std::array<Level, MaxDepth> bids_;
    std::array<Level, MaxDepth> asks_;
    std::unordered_map<uint64_t, OrderInfo> orders_;
};
```
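As a quick sanity check on the snapshot math above: with a best bid of 100.00 and a best ask of 100.02, the mid is 100.01 and the 2-cent spread works out to roughly 2 basis points. The free functions below just mirror the `Snapshot` helpers for illustration:

```cpp
#include <cassert>
#include <cmath>

// Mirrors Snapshot::mid_price / spread / spread_bps from the order book
double mid_price(double bid, double ask)  { return (bid + ask) / 2.0; }
double spread(double bid, double ask)     { return ask - bid; }
double spread_bps(double bid, double ask) {
    return (spread(bid, ask) / mid_price(bid, ask)) * 10000.0;
}
```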
```cpp
// spsc_queue.hpp - Single Producer Single Consumer lock-free queue
#pragma once

#include <array>
#include <atomic>
#include <cstddef>

template<typename T, size_t Size>
class SPSCQueue {
    static_assert((Size & (Size - 1)) == 0, "Size must be a power of 2");

public:
    SPSCQueue() : head_(0), tail_(0) {}

    // Producer: try to push (returns false if full)
    bool try_push(const T& item) {
        const size_t head = head_.load(std::memory_order_relaxed);
        const size_t next_head = (head + 1) & (Size - 1);

        if (next_head == tail_.load(std::memory_order_acquire)) {
            return false; // Queue full
        }

        buffer_[head] = item;
        head_.store(next_head, std::memory_order_release);
        return true;
    }

    // Consumer: try to pop (returns false if empty)
    bool try_pop(T& item) {
        const size_t tail = tail_.load(std::memory_order_relaxed);

        if (tail == head_.load(std::memory_order_acquire)) {
            return false; // Queue empty
        }

        item = buffer_[tail];
        tail_.store((tail + 1) & (Size - 1), std::memory_order_release);
        return true;
    }

    bool empty() const {
        return tail_.load(std::memory_order_acquire) ==
               head_.load(std::memory_order_acquire);
    }

    size_t size() const {
        const size_t head = head_.load(std::memory_order_acquire);
        const size_t tail = tail_.load(std::memory_order_acquire);
        return (head - tail) & (Size - 1);
    }

private:
    alignas(64) std::atomic<size_t> head_; // Each index on its own cache
    alignas(64) std::atomic<size_t> tail_; // line to avoid false sharing
    std::array<T, Size> buffer_;
};
```
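To show the queue's intended use: exactly one producer thread pushes and exactly one consumer thread pops, which is what makes the relaxed/acquire/release ordering sufficient. The sketch below re-declares a trimmed copy of the same queue so it compiles standalone (`run_pipeline` is an illustrative harness, not part of the feed):

```cpp
#include <array>
#include <atomic>
#include <cassert>
#include <cstdint>
#include <thread>

template <typename T, size_t Size>
class Queue {  // trimmed copy of the SPSCQueue above
    static_assert((Size & (Size - 1)) == 0, "power of 2");
public:
    bool try_push(const T& v) {
        const size_t h = head_.load(std::memory_order_relaxed);
        const size_t n = (h + 1) & (Size - 1);
        if (n == tail_.load(std::memory_order_acquire)) return false;
        buf_[h] = v;
        head_.store(n, std::memory_order_release);
        return true;
    }
    bool try_pop(T& v) {
        const size_t t = tail_.load(std::memory_order_relaxed);
        if (t == head_.load(std::memory_order_acquire)) return false;
        v = buf_[t];
        tail_.store((t + 1) & (Size - 1), std::memory_order_release);
        return true;
    }
private:
    alignas(64) std::atomic<size_t> head_{0};
    alignas(64) std::atomic<size_t> tail_{0};
    std::array<T, Size> buf_;
};

// Push 1..n through the queue from one thread while another drains it.
// Returns the consumer's sum: n*(n+1)/2 iff nothing was lost.
uint64_t run_pipeline(uint64_t n) {
    Queue<uint64_t, 1024> q;
    uint64_t sum = 0;

    std::thread consumer([&] {
        uint64_t v, received = 0;
        while (received < n) {
            if (q.try_pop(v)) { sum += v; ++received; }
        }
    });

    for (uint64_t i = 1; i <= n; ++i) {
        while (!q.try_push(i)) { /* spin: queue full, consumer will drain */ }
    }
    consumer.join();  // sum is safely visible after join
    return sum;
}
```

Note that both sides spin rather than block: in a latency-critical path, a futex wakeup costs far more than a few wasted loop iterations.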
```cpp
// market_data_feed.hpp - Complete market data feed
#pragma once

#include "dpdk_receiver.hpp"
#include "nasdaq_itch_parser.hpp"
#include "order_book.hpp"
#include "spsc_queue.hpp"

#include <pthread.h>  // pthread_setaffinity_np (Linux-specific)
#include <algorithm>
#include <array>
#include <atomic>
#include <chrono>
#include <cstdint>
#include <string>
#include <thread>

class MarketDataFeed {
public:
    using OrderBookSnapshot = OrderBook<10>::Snapshot;

    MarketDataFeed(uint16_t port_id, const std::string& symbol)
        : receiver_(port_id)
        , symbol_(symbol)
        , running_(false)
    {
        // Worker threads pin themselves to dedicated cores in start()
    }

    void start() {
        running_.store(true, std::memory_order_release);

        // Receiver on core 1, processor on core 2 (isolate these cores
        // from the scheduler with isolcpus/cset in production)
        receiver_thread_ = std::thread([this] {
            pin_to_core(1);
            receiver_loop();
        });

        processor_thread_ = std::thread([this] {
            pin_to_core(2);
            processing_loop();
        });
    }

    void stop() {
        running_.store(false, std::memory_order_release);

        if (receiver_thread_.joinable()) {
            receiver_thread_.join();
        }
        if (processor_thread_.joinable()) {
            processor_thread_.join();
        }
    }

    // Get latest order book snapshot.
    // Note: not synchronized with the processing thread; production code
    // would publish snapshots through shared memory or a seqlock.
    OrderBookSnapshot get_snapshot() const {
        return order_book_.get_snapshot(
            std::chrono::duration_cast<std::chrono::nanoseconds>(
                std::chrono::system_clock::now().time_since_epoch()
            ).count()
        );
    }

    // Statistics
    struct Stats {
        uint64_t packets_received;
        uint64_t messages_processed;
        uint64_t order_book_updates;
        double avg_latency_ns;
        double p99_latency_ns;
    };

    Stats get_stats() const {
        return {
            .packets_received = receiver_.get_packet_count(),
            .messages_processed = messages_processed_.load(),
            .order_book_updates = book_updates_.load(),
            .avg_latency_ns = calculate_avg_latency(),
            .p99_latency_ns = calculate_p99_latency()
        };
    }

private:
    struct Message {
        enum Type {
            ADD_ORDER,
            EXECUTE_ORDER,
            CANCEL_ORDER
        } type;

        uint64_t order_id;
        uint64_t timestamp_ns;
        double price;
        uint64_t size;
        bool is_buy;
    };

    static void pin_to_core(int core) {
        cpu_set_t cpuset;
        CPU_ZERO(&cpuset);
        CPU_SET(core, &cpuset);
        pthread_setaffinity_np(pthread_self(), sizeof(cpu_set_t), &cpuset);
    }

    void receiver_loop() {
        while (running_.load(std::memory_order_acquire)) {
            // Receive a burst of packets and parse ITCH messages in place
            receiver_.receive_burst([this](const uint8_t* data, uint16_t len) {
                ITCHParser::parse_message(data, len, [this](const auto& msg) {
                    handle_itch_message(msg);
                });
            });
        }
    }

    void processing_loop() {
        Message msg;

        while (running_.load(std::memory_order_acquire)) {
            // Drain the queue
            while (message_queue_.try_pop(msg)) {
                process_message(msg);
            }

            // Brief pause if queue empty (busy-spinning with _mm_pause()
            // is the lower-latency alternative)
            if (message_queue_.empty()) {
                std::this_thread::yield();
            }
        }
    }

    void handle_itch_message(const AddOrderMessage& msg) {
        Message m;
        m.type = Message::ADD_ORDER;
        m.order_id = msg.get_order_id();
        m.timestamp_ns = msg.get_timestamp_ns();
        m.price = msg.get_price();
        m.size = msg.get_shares();
        m.is_buy = msg.is_buy();

        // Dropped silently if the queue is full; production code should
        // count drops and trigger recovery
        message_queue_.try_push(m);
    }

    void handle_itch_message(const OrderExecutedMessage& msg) {
        Message m;
        m.type = Message::EXECUTE_ORDER;
        m.order_id = msg.get_order_id();
        m.timestamp_ns = msg.get_timestamp_ns();
        m.size = msg.get_shares();

        message_queue_.try_push(m);
    }

    void handle_itch_message(const OrderCancelMessage& msg) {
        Message m;
        m.type = Message::CANCEL_ORDER;
        m.order_id = msg.get_order_id();
        m.timestamp_ns = msg.get_timestamp_ns();
        m.size = msg.get_shares();

        message_queue_.try_push(m);
    }

    void process_message(const Message& msg) {
        auto start = rdtsc(); // Read CPU timestamp counter

        switch (msg.type) {
            case Message::ADD_ORDER:
                order_book_.add_order(msg.order_id, msg.is_buy, msg.price, msg.size);
                book_updates_.fetch_add(1, std::memory_order_relaxed);
                break;

            case Message::EXECUTE_ORDER:
                order_book_.execute_order(msg.order_id, msg.size);
                book_updates_.fetch_add(1, std::memory_order_relaxed);
                break;

            case Message::CANCEL_ORDER:
                order_book_.cancel_order(msg.order_id, msg.size);
                book_updates_.fetch_add(1, std::memory_order_relaxed);
                break;
        }

        auto end = rdtsc();
        record_latency(end - start);

        messages_processed_.fetch_add(1, std::memory_order_relaxed);
    }

    // Read CPU timestamp counter for precise timing (x86 only)
    static inline uint64_t rdtsc() {
        uint32_t lo, hi;
        __asm__ __volatile__ ("rdtsc" : "=a" (lo), "=d" (hi));
        return ((uint64_t)hi << 32) | lo;
    }

    void record_latency(uint64_t cycles) {
        // Record in circular buffer for statistics
        size_t idx = latency_idx_.fetch_add(1, std::memory_order_relaxed) % latency_buffer_.size();
        latency_buffer_[idx] = cycles;
    }

    double calculate_avg_latency() const {
        // Convert cycles to nanoseconds (assumes a 3 GHz CPU; calibrate
        // against CLOCK_MONOTONIC in practice). Unfilled slots read as
        // zero and skew the average until the buffer wraps once.
        constexpr double cycles_per_ns = 3.0;
        uint64_t sum = 0;
        for (auto cycles : latency_buffer_) {
            sum += cycles;
        }
        return (static_cast<double>(sum) / latency_buffer_.size()) / cycles_per_ns;
    }

    double calculate_p99_latency() const {
        // Copy, sort, and take the 99th percentile - expensive, so call
        // this off the hot path
        auto sorted = latency_buffer_;
        std::sort(sorted.begin(), sorted.end());

        constexpr double cycles_per_ns = 3.0;
        size_t p99_idx = sorted.size() * 99 / 100;
        return sorted[p99_idx] / cycles_per_ns;
    }

    DPDKReceiver receiver_;
    std::string symbol_;
    OrderBook<10> order_book_;
    SPSCQueue<Message, 65536> message_queue_;

    std::thread receiver_thread_;
    std::thread processor_thread_;
    std::atomic<bool> running_;

    std::atomic<uint64_t> messages_processed_{0};
    std::atomic<uint64_t> book_updates_{0};

    // Latency tracking
    std::array<uint64_t, 10000> latency_buffer_{};
    std::atomic<size_t> latency_idx_{0};
};
```
```cpp
// Align hot data to cache lines to avoid false sharing
struct alignas(64) HotData {
    double bid_price;
    double ask_price;
    uint64_t bid_size;
    uint64_t ask_size;
    // alignas(64) already pads the struct to 64 bytes; the explicit
    // padding just makes the layout obvious
    char padding[64 - 4 * 8];
};

// Prefetch data into cache before it is needed
__builtin_prefetch(&order_book_data, 0, 3);  // read, high temporal locality
```
```cpp
#include <immintrin.h>

// Multiply 4 (price, size) pairs at once with AVX (a __m256d holds
// 4 doubles; _mm256_load_pd requires 32-byte-aligned arrays - use
// _mm256_loadu_pd for unaligned data)
__m256d prices = _mm256_load_pd(price_array);
__m256d sizes = _mm256_load_pd(size_array);
__m256d weighted = _mm256_mul_pd(prices, sizes);
// Continue SIMD operations...
```
```cpp
// Likely/unlikely branch prediction hints
if (__builtin_expect(is_market_order, 0)) {
    // Unlikely path
    handle_market_order();
} else {
    // Likely path (optimized)
    handle_limit_order();
}

// Force inlining of hot functions
__attribute__((always_inline))
inline void update_best_bid(double price) {
    // ...
}
```
```cpp
// metrics.hpp - Performance monitoring
class PerformanceMonitor {
public:
    struct Metrics {
        uint64_t messages_per_second;
        double avg_latency_us;
        double p50_latency_us;
        double p95_latency_us;
        double p99_latency_us;
        double p999_latency_us;
        uint64_t queue_depth;
        double cpu_utilization;
    };

    Metrics collect_metrics() {
        auto now = std::chrono::steady_clock::now();
        auto elapsed = std::chrono::duration_cast<std::chrono::seconds>(
            now - last_sample_
        ).count();

        if (elapsed == 0) elapsed = 1;
        last_sample_ = now;  // reset the sampling window before returning

        uint64_t msg_count = message_count_.exchange(0);

        return {
            .messages_per_second = msg_count / static_cast<uint64_t>(elapsed),
            .avg_latency_us = calculate_average_latency(),
            .p50_latency_us = calculate_percentile(0.50),
            .p95_latency_us = calculate_percentile(0.95),
            .p99_latency_us = calculate_percentile(0.99),
            .p999_latency_us = calculate_percentile(0.999),
            .queue_depth = get_queue_depth(),
            .cpu_utilization = get_cpu_utilization()
        };
    }

private:
    double calculate_average_latency() const;
    double calculate_percentile(double p) const;
    uint64_t get_queue_depth() const;
    double get_cpu_utilization() const;

    std::atomic<uint64_t> message_count_{0};
    std::chrono::steady_clock::time_point last_sample_;
    // ... latency histogram, CPU stats, etc.
};
```
Building a high-frequency market data feed requires optimization at every layer: hardware, OS, network stack, and application code. The difference between a good feed and a great one is measured in microseconds, but those microseconds translate directly into trading performance.
Success comes from understanding the full stack and optimizing holistically, not just focusing on one layer.
Need help building ultra-low latency market data systems? Contact us to discuss your requirements.