NV
NordVarg
ServicesTechnologiesIndustriesCase StudiesBlogAboutContact
Get Started

Footer

NV
NordVarg

Software Development & Consulting

GitHubLinkedInTwitter

Services

  • Product Development
  • Quantitative Finance
  • Financial Systems
  • ML & AI

Technologies

  • C++
  • Python
  • Rust
  • OCaml
  • TypeScript
  • React

Company

  • About
  • Case Studies
  • Blog
  • Contact

© 2025 NordVarg. All rights reserved.

December 10, 2024
•
NordVarg Team
•

Concurrency and Parallelism in C++, Rust, OCaml, Python and TypeScript

Systems Programmingconcurrencyparallelismc++rustocamlpythontypescriptperformance
15 min read
Share:

Concurrency and parallelism are often conflated, but they're fundamentally different concepts. After implementing trading systems in all these languages, I've learned the strengths and pitfalls of each approach. This article compares how C++, Rust, OCaml, Python, and TypeScript handle concurrent and parallel execution.

Concurrency vs Parallelism#

Concurrency: Dealing with multiple things at once (structure) Parallelism: Doing multiple things at once (execution)

A concurrent system may run on a single core (time-slicing), while a parallel system requires multiple cores. Trading systems need both: concurrent handling of orders, market data, and risk checks, plus parallel processing of backtests and analytics.

C++: Full Control, Full Complexity#

C++ offers complete control over threading, synchronization, and memory ordering.

Threads and Synchronization#

cpp
1#include <thread>
2#include <mutex>
3#include <condition_variable>
4#include <queue>
5#include <atomic>
6#include <vector>
7
8// Thread-safe queue for work distribution
9template<typename T>
10class ConcurrentQueue {
11private:
12    std::queue<T> queue_;
13    mutable std::mutex mutex_;
14    std::condition_variable cv_;
15    std::atomic<bool> done_{false};
16
17public:
18    void push(T value) {
19        {
20            std::lock_guard<std::mutex> lock(mutex_);
21            queue_.push(std::move(value));
22        }
23        cv_.notify_one();
24    }
25
26    bool try_pop(T& value) {
27        std::unique_lock<std::mutex> lock(mutex_);
28        cv_.wait(lock, [this] { 
29            return !queue_.empty() || done_.load(); 
30        });
31
32        if (queue_.empty()) {
33            return false;
34        }
35
36        value = std::move(queue_.front());
37        queue_.pop();
38        return true;
39    }
40
41    void finish() {
42        done_.store(true);
43        cv_.notify_all();
44    }
45};
46
47// Worker pool for parallel task execution
48class ThreadPool {
49private:
50    std::vector<std::thread> workers_;
51    ConcurrentQueue<std::function<void()>> tasks_;
52    std::atomic<size_t> active_tasks_{0};
53
54public:
55    explicit ThreadPool(size_t num_threads) {
56        for (size_t i = 0; i < num_threads; ++i) {
57            workers_.emplace_back([this] {
58                std::function<void()> task;
59                while (tasks_.try_pop(task)) {
60                    active_tasks_++;
61                    task();
62                    active_tasks_--;
63                }
64            });
65        }
66    }
67
68    ~ThreadPool() {
69        tasks_.finish();
70        for (auto& worker : workers_) {
71            if (worker.joinable()) {
72                worker.join();
73            }
74        }
75    }
76
77    template<typename F>
78    void submit(F&& task) {
79        tasks_.push(std::forward<F>(task));
80    }
81
82    void wait_idle() {
83        while (active_tasks_.load(std::memory_order_acquire) > 0) {
84            std::this_thread::yield();
85        }
86    }
87};
88
89// Example: Parallel order book processing
90struct Order {
91    uint64_t id;
92    std::string symbol;
93    double price;
94    int quantity;
95};
96
97class OrderProcessor {
98private:
99    ThreadPool pool_;
100    std::unordered_map<std::string, std::vector<Order>> results_;
101    std::mutex results_mutex_;
102
103public:
104    explicit OrderProcessor(size_t num_threads) : pool_(num_threads) {}
105
106    void process_orders(const std::vector<Order>& orders) {
107        // Partition by symbol for parallel processing
108        std::unordered_map<std::string, std::vector<Order>> partitions;
109        
110        for (const auto& order : orders) {
111            partitions[order.symbol].push_back(order);
112        }
113
114        // Submit tasks for each symbol
115        for (const auto& [symbol, symbol_orders] : partitions) {
116            pool_.submit([this, symbol, symbol_orders] {
117                auto processed = process_symbol_orders(symbol_orders);
118                
119                std::lock_guard<std::mutex> lock(results_mutex_);
120                results_[symbol] = std::move(processed);
121            });
122        }
123
124        pool_.wait_idle();
125    }
126
127private:
128    std::vector<Order> process_symbol_orders(
129        const std::vector<Order>& orders) {
130        // Heavy processing per symbol
131        std::vector<Order> result;
132        for (const auto& order : orders) {
133            // Validate, enrich, route...
134            result.push_back(order);
135        }
136        return result;
137    }
138};
139

Async with C++20 Coroutines#

cpp
1#include <coroutine>
2#include <optional>
3#include <iostream>
4
5// Simple async task
6template<typename T>
7class Task {
8public:
9    struct promise_type {
10        T value_;
11        std::exception_ptr exception_;
12
13        Task get_return_object() {
14            return Task{
15                std::coroutine_handle<promise_type>::from_promise(*this)
16            };
17        }
18
19        std::suspend_never initial_suspend() { return {}; }
20        std::suspend_always final_suspend() noexcept { return {}; }
21
22        void return_value(T value) {
23            value_ = std::move(value);
24        }
25
26        void unhandled_exception() {
27            exception_ = std::current_exception();
28        }
29    };
30
31    std::coroutine_handle<promise_type> handle_;
32
33    explicit Task(std::coroutine_handle<promise_type> h) : handle_(h) {}
34
35    ~Task() {
36        if (handle_) handle_.destroy();
37    }
38
39    T get() {
40        if (handle_.promise().exception_) {
41            std::rethrow_exception(handle_.promise().exception_);
42        }
43        return handle_.promise().value_;
44    }
45};
46
47// Example: Async market data fetcher
48Task<double> fetch_price_async(const std::string& symbol) {
49    // Simulate async I/O
50    co_await std::suspend_always{};
51    
52    // In reality, would fetch from exchange API
53    double price = 100.0 + (symbol.length() % 10);
54    
55    co_return price;
56}
57
58Task<double> calculate_portfolio_value() {
59    double total = 0.0;
60    
61    total += co_await fetch_price_async("AAPL");
62    total += co_await fetch_price_async("GOOGL");
63    total += co_await fetch_price_async("MSFT");
64    
65    co_return total;
66}
67

Rust: Safety Without Compromise#

Rust's ownership system prevents data races at compile time.

Fearless Concurrency#

rust
1use std::sync::{Arc, Mutex};
2use std::thread;
3use std::sync::mpsc;
4use rayon::prelude::*;
5
6// Thread-safe order book
7#[derive(Debug, Clone)]
8struct Order {
9    id: u64,
10    symbol: String,
11    price: f64,
12    quantity: i32,
13}
14
15struct OrderBook {
16    orders: Arc<Mutex<Vec<Order>>>,
17}
18
19impl OrderBook {
20    fn new() -> Self {
21        OrderBook {
22            orders: Arc::new(Mutex::new(Vec::new())),
23        }
24    }
25
26    fn add_order(&self, order: Order) {
27        let mut orders = self.orders.lock().unwrap();
28        orders.push(order);
29    }
30
31    fn get_orders(&self) -> Vec<Order> {
32        let orders = self.orders.lock().unwrap();
33        orders.clone()
34    }
35}
36
37// Example: Multi-threaded order processing
38fn process_orders_concurrent(orders: Vec<Order>) -> Vec<Order> {
39    let order_book = OrderBook::new();
40    let mut handles = vec![];
41
42    // Chunk orders for parallel processing
43    let chunk_size = orders.len() / 4;
44    for chunk in orders.chunks(chunk_size) {
45        let book = order_book.clone();
46        let chunk = chunk.to_vec();
47        
48        let handle = thread::spawn(move || {
49            for order in chunk {
50                // Process order
51                book.add_order(order);
52            }
53        });
54        
55        handles.push(handle);
56    }
57
58    // Wait for all threads
59    for handle in handles {
60        handle.join().unwrap();
61    }
62
63    order_book.get_orders()
64}
65
66// Channel-based pipeline
67fn pipeline_example() {
68    let (tx1, rx1) = mpsc::channel();
69    let (tx2, rx2) = mpsc::channel();
70
71    // Stage 1: Data ingestion
72    thread::spawn(move || {
73        for i in 0..1000 {
74            tx1.send(Order {
75                id: i,
76                symbol: "AAPL".to_string(),
77                price: 150.0 + (i as f64 * 0.01),
78                quantity: 100,
79            }).unwrap();
80        }
81    });
82
83    // Stage 2: Validation
84    thread::spawn(move || {
85        while let Ok(order) = rx1.recv() {
86            if order.price > 0.0 && order.quantity > 0 {
87                tx2.send(order).unwrap();
88            }
89        }
90    });
91
92    // Stage 3: Execution
93    thread::spawn(move || {
94        while let Ok(order) = rx2.recv() {
95            println!("Executing order: {:?}", order.id);
96        }
97    });
98}
99

Async/Await with Tokio#

rust
1use tokio::time::{sleep, Duration};
2use tokio::task;
3use std::sync::Arc;
4use tokio::sync::Mutex;
5
6// Async order book
7struct AsyncOrderBook {
8    orders: Arc<Mutex<Vec<Order>>>,
9}
10
11impl AsyncOrderBook {
12    fn new() -> Self {
13        AsyncOrderBook {
14            orders: Arc::new(Mutex::new(Vec::new())),
15        }
16    }
17
18    async fn add_order(&self, order: Order) {
19        let mut orders = self.orders.lock().await;
20        orders.push(order);
21    }
22
23    async fn get_count(&self) -> usize {
24        let orders = self.orders.lock().await;
25        orders.len()
26    }
27}
28
29// Async market data fetcher
30async fn fetch_market_data(symbol: &str) -> f64 {
31    // Simulate network delay
32    sleep(Duration::from_millis(100)).await;
33    
34    // Return mock price
35    100.0 + (symbol.len() as f64)
36}
37
38// Concurrent async tasks
39async fn process_multiple_symbols() {
40    let symbols = vec!["AAPL", "GOOGL", "MSFT", "AMZN"];
41    
42    // Launch all fetches concurrently
43    let mut handles = vec![];
44    
45    for symbol in symbols {
46        let handle = task::spawn(async move {
47            let price = fetch_market_data(symbol).await;
48            println!("{}: ${}", symbol, price);
49            price
50        });
51        handles.push(handle);
52    }
53
54    // Wait for all to complete
55    let mut total = 0.0;
56    for handle in handles {
57        total += handle.await.unwrap();
58    }
59    
60    println!("Total portfolio value: ${}", total);
61}
62
63// Parallel data processing with Rayon
64fn parallel_backtest(prices: Vec<f64>) -> Vec<f64> {
65    prices
66        .par_iter()
67        .map(|&price| {
68            // Simulate heavy computation
69            (0..1000).fold(price, |acc, _| acc * 1.0001)
70        })
71        .collect()
72}
73

Lock-Free Structures#

rust
1use std::sync::atomic::{AtomicU64, Ordering};
2use crossbeam::queue::ArrayQueue;
3
4// Lock-free counter for high-frequency updates
5struct LockFreeCounter {
6    count: AtomicU64,
7}
8
9impl LockFreeCounter {
10    fn new() -> Self {
11        LockFreeCounter {
12            count: AtomicU64::new(0),
13        }
14    }
15
16    fn increment(&self) -> u64 {
17        self.count.fetch_add(1, Ordering::Relaxed)
18    }
19
20    fn get(&self) -> u64 {
21        self.count.load(Ordering::Relaxed)
22    }
23}
24
25// Lock-free queue for order flow
26fn lockfree_order_queue_example() {
27    let queue = Arc::new(ArrayQueue::new(1024));
28    let queue_clone = queue.clone();
29
30    // Producer thread
31    let producer = thread::spawn(move || {
32        for i in 0..1000 {
33            let order = Order {
34                id: i,
35                symbol: "AAPL".to_string(),
36                price: 150.0,
37                quantity: 100,
38            };
39            
40            while queue_clone.push(order.clone()).is_err() {
41                thread::yield_now();
42            }
43        }
44    });
45
46    // Consumer thread
47    let consumer = thread::spawn(move || {
48        let mut count = 0;
49        while count < 1000 {
50            if let Some(order) = queue.pop() {
51                println!("Processed order {}", order.id);
52                count += 1;
53            } else {
54                thread::yield_now();
55            }
56        }
57    });
58
59    producer.join().unwrap();
60    consumer.join().unwrap();
61}
62

OCaml: Structured Concurrency#

OCaml 5.0 introduced multicore support with effect handlers and domains.

Domains for Parallelism#

ocaml
1(* Parallel map using domains *)
2let parallel_map f list num_domains =
3  let module D = Domain in
4  let len = List.length list in
5  let chunk_size = (len + num_domains - 1) / num_domains in
6  
7  let rec chunks acc lst n =
8    if n <= 0 || lst = [] then List.rev acc
9    else
10      let chunk, rest = List.split_n lst chunk_size in
11      chunks (chunk :: acc) rest (n - 1)
12  in
13  
14  let list_chunks = chunks [] list num_domains in
15  
16  let process_chunk chunk =
17    D.spawn (fun () -> List.map f chunk)
18  in
19  
20  let domains = List.map process_chunk list_chunks in
21  let results = List.map D.join domains in
22  List.concat results
23
24(* Example: Parallel order processing *)
25type order = {
26  id : int;
27  symbol : string;
28  price : float;
29  quantity : int;
30}
31
32let process_order order =
33  (* Heavy processing *)
34  { order with price = order.price *. 1.01 }
35
36let process_orders_parallel orders =
37  parallel_map process_order orders 4
38
39(* Concurrent order book *)
40module OrderBook = struct
41  type t = {
42    mutable orders : order list;
43    mutex : Mutex.t;
44  }
45
46  let create () = {
47    orders = [];
48    mutex = Mutex.create ();
49  }
50
51  let add_order book order =
52    Mutex.lock book.mutex;
53    book.orders <- order :: book.orders;
54    Mutex.unlock book.mutex
55
56  let get_orders book =
57    Mutex.lock book.mutex;
58    let orders = book.orders in
59    Mutex.unlock book.mutex;
60    orders
61end
62
63(* Effect handlers for async operations *)
64type _ Effect.t +=
65  | Fetch_price : string -> float Effect.t
66  | Sleep : float -> unit Effect.t
67
68let fetch_portfolio_value symbols =
69  let open Effect.Deep in
70  let rec loop acc = function
71    | [] -> acc
72    | symbol :: rest ->
73        let price = perform (Fetch_price symbol) in
74        loop (acc +. price) rest
75  in
76  loop 0.0 symbols
77
78(* Effect handler implementation *)
79let run_with_effects f =
80  let open Effect.Deep in
81  match_with f ()
82    { retc = (fun x -> x);
83      exnc = raise;
84      effc = (fun (type a) (eff : a Effect.t) ->
85        match eff with
86        | Fetch_price symbol ->
87            Some (fun (k : (a, _) continuation) ->
88              (* Simulate price fetch *)
89              let price = 100.0 +. float_of_int (String.length symbol) in
90              continue k price)
91        | Sleep duration ->
92            Some (fun (k : (a, _) continuation) ->
93              Unix.sleepf duration;
94              continue k ())
95        | _ -> None)
96    }
97

Lwt for Cooperative Concurrency#

ocaml
1open Lwt.Syntax
2
3(* Async market data fetcher *)
4let fetch_price symbol =
5  let* () = Lwt_unix.sleep 0.1 in
6  Lwt.return (100.0 +. float_of_int (String.length symbol))
7
8(* Concurrent operations *)
9let fetch_multiple_prices symbols =
10  let tasks = List.map fetch_price symbols in
11  Lwt.all tasks
12
13(* Example usage *)
14let calculate_portfolio_value () =
15  let* prices = fetch_multiple_prices ["AAPL"; "GOOGL"; "MSFT"] in
16  let total = List.fold_left (+.) 0.0 prices in
17  Lwt_io.printf "Total value: %.2f\n" total
18
19(* Lwt promise pipeline *)
20let order_pipeline () =
21  let* raw_order = fetch_order_from_exchange () in
22  let* validated_order = validate_order raw_order in
23  let* enriched_order = enrich_order validated_order in
24  let* () = send_to_execution enriched_order in
25  Lwt.return ()
26

Python: GIL Challenges#

Python's Global Interpreter Lock (GIL) limits true parallelism for CPU-bound tasks.

Threading (I/O Bound)#

python
1import threading
2import queue
3import time
4from typing import List, Dict
5from dataclasses import dataclass
6
7@dataclass
8class Order:
9    id: int
10    symbol: str
11    price: float
12    quantity: int
13
14class ThreadSafeOrderBook:
15    def __init__(self):
16        self.orders: List[Order] = []
17        self.lock = threading.Lock()
18    
19    def add_order(self, order: Order):
20        with self.lock:
21            self.orders.append(order)
22    
23    def get_orders(self) -> List[Order]:
24        with self.lock:
25            return self.orders.copy()
26
27# Thread pool for I/O operations
28class OrderProcessor:
29    def __init__(self, num_threads: int = 4):
30        self.num_threads = num_threads
31        self.task_queue = queue.Queue()
32        self.result_queue = queue.Queue()
33        self.workers = []
34    
35    def worker(self):
36        while True:
37            task = self.task_queue.get()
38            if task is None:
39                break
40            
41            # Process task (e.g., fetch from API)
42            result = self.process_order(task)
43            self.result_queue.put(result)
44            self.task_queue.task_done()
45    
46    def process_order(self, order: Order) -> Order:
47        # Simulate I/O operation
48        time.sleep(0.01)
49        return order
50    
51    def start(self):
52        for _ in range(self.num_threads):
53            t = threading.Thread(target=self.worker)
54            t.start()
55            self.workers.append(t)
56    
57    def submit(self, order: Order):
58        self.task_queue.put(order)
59    
60    def stop(self):
61        for _ in range(self.num_threads):
62            self.task_queue.put(None)
63        for worker in self.workers:
64            worker.join()
65

Multiprocessing (CPU Bound)#

python
1import multiprocessing as mp
2from multiprocessing import Pool, Manager
3import numpy as np
4
5def calculate_indicators(prices: np.ndarray) -> Dict[str, float]:
6    """CPU-intensive calculation"""
7    return {
8        'sma_20': np.mean(prices[-20:]),
9        'sma_50': np.mean(prices[-50:]),
10        'volatility': np.std(prices[-20:]),
11        'momentum': prices[-1] - prices[-20]
12    }
13
14def parallel_backtest(symbols: List[str], num_processes: int = 4):
15    """Parallel backtesting across symbols"""
16    with Pool(num_processes) as pool:
17        # Generate mock data for each symbol
18        data = [(symbol, np.random.randn(1000) * 10 + 100) 
19                for symbol in symbols]
20        
21        # Map calculation across processes
22        results = pool.starmap(process_symbol, data)
23        
24        return dict(zip(symbols, results))
25
26def process_symbol(symbol: str, prices: np.ndarray) -> Dict:
27    indicators = calculate_indicators(prices)
28    return {
29        'symbol': symbol,
30        'indicators': indicators
31    }
32
33# Shared memory for inter-process communication
34def shared_memory_example():
35    manager = Manager()
36    shared_dict = manager.dict()
37    shared_list = manager.list()
38    
39    def worker(worker_id, shared_dict, shared_list):
40        shared_dict[worker_id] = f"Worker {worker_id} data"
41        shared_list.append(worker_id)
42    
43    processes = []
44    for i in range(4):
45        p = mp.Process(target=worker, args=(i, shared_dict, shared_list))
46        p.start()
47        processes.append(p)
48    
49    for p in processes:
50        p.join()
51    
52    print(f"Shared dict: {dict(shared_dict)}")
53    print(f"Shared list: {list(shared_list)}")
54

AsyncIO (Concurrent I/O)#

python
1import asyncio
2import aiohttp
3from typing import List
4
5class AsyncMarketDataFetcher:
6    def __init__(self):
7        self.session = None
8    
9    async def __aenter__(self):
10        self.session = aiohttp.ClientSession()
11        return self
12    
13    async def __aexit__(self, exc_type, exc_val, exc_tb):
14        await self.session.close()
15    
16    async def fetch_price(self, symbol: str) -> float:
17        # Simulate API call
18        await asyncio.sleep(0.1)
19        return 100.0 + len(symbol)
20    
21    async def fetch_multiple_prices(self, symbols: List[str]) -> Dict[str, float]:
22        tasks = [self.fetch_price(symbol) for symbol in symbols]
23        prices = await asyncio.gather(*tasks)
24        return dict(zip(symbols, prices))
25
26# Async order processing pipeline
27async def order_pipeline():
28    queue = asyncio.Queue()
29    
30    async def producer():
31        for i in range(100):
32            order = Order(i, "AAPL", 150.0, 100)
33            await queue.put(order)
34        await queue.put(None)  # Sentinel
35    
36    async def consumer(consumer_id):
37        while True:
38            order = await queue.get()
39            if order is None:
40                queue.task_done()
41                break
42            
43            # Process order
44            await asyncio.sleep(0.01)
45            print(f"Consumer {consumer_id} processed order {order.id}")
46            queue.task_done()
47    
48    # Start pipeline
49    prod_task = asyncio.create_task(producer())
50    consumer_tasks = [asyncio.create_task(consumer(i)) for i in range(4)]
51    
52    await prod_task
53    await queue.join()
54    
55    # Signal consumers to stop
56    for _ in range(4):
57        await queue.put(None)
58    
59    await asyncio.gather(*consumer_tasks)
60
61# Run async code
62async def main():
63    async with AsyncMarketDataFetcher() as fetcher:
64        prices = await fetcher.fetch_multiple_prices(
65            ["AAPL", "GOOGL", "MSFT", "AMZN"]
66        )
67        print(f"Prices: {prices}")
68
69# asyncio.run(main())
70

TypeScript: Event Loop Concurrency#

TypeScript (Node.js) uses single-threaded event loop with async I/O.

Promises and Async/Await#

typescript
1interface Order {
2    id: number;
3    symbol: string;
4    price: number;
5    quantity: number;
6}
7
8class MarketDataFetcher {
9    async fetchPrice(symbol: string): Promise<number> {
10        // Simulate API call
11        await new Promise(resolve => setTimeout(resolve, 100));
12        return 100 + symbol.length;
13    }
14
15    async fetchMultiplePrices(symbols: string[]): Promise<Map<string, number>> {
16        const pricePromises = symbols.map(async symbol => {
17            const price = await this.fetchPrice(symbol);
18            return [symbol, price] as [string, number];
19        });
20
21        const results = await Promise.all(pricePromises);
22        return new Map(results);
23    }
24
25    // Parallel with concurrency limit
26    async fetchWithLimit(
27        symbols: string[], 
28        limit: number
29    ): Promise<Map<string, number>> {
30        const results = new Map<string, number>();
31        
32        for (let i = 0; i < symbols.length; i += limit) {
33            const batch = symbols.slice(i, i + limit);
34            const batchResults = await this.fetchMultiplePrices(batch);
35            
36            for (const [symbol, price] of batchResults) {
37                results.set(symbol, price);
38            }
39        }
40        
41        return results;
42    }
43}
44
45// Event-driven order processing
46class OrderProcessor {
47    private queue: Order[] = [];
48    private processing = false;
49
50    async submitOrder(order: Order): Promise<void> {
51        this.queue.push(order);
52        
53        if (!this.processing) {
54            await this.processQueue();
55        }
56    }
57
58    private async processQueue(): Promise<void> {
59        this.processing = true;
60
61        while (this.queue.length > 0) {
62            const order = this.queue.shift()!;
63            await this.processOrder(order);
64        }
65
66        this.processing = false;
67    }
68
69    private async processOrder(order: Order): Promise<void> {
70        // Simulate async processing
71        await new Promise(resolve => setTimeout(resolve, 10));
72        console.log(`Processed order ${order.id}`);
73    }
74}
75

Worker Threads for CPU-Bound Tasks#

typescript
1import { Worker } from 'worker_threads';
2
3interface WorkerMessage {
4    type: 'calculate';
5    data: number[];
6}
7
8class ParallelProcessor {
9    private workers: Worker[] = [];
10
11    constructor(numWorkers: number = 4) {
12        for (let i = 0; i < numWorkers; i++) {
13            const worker = new Worker('./worker.js');
14            this.workers.push(worker);
15        }
16    }
17
18    async processInParallel(data: number[][]): Promise<number[]> {
19        const chunkSize = Math.ceil(data.length / this.workers.length);
20        const promises: Promise<number>[] = [];
21
22        for (let i = 0; i < this.workers.length; i++) {
23            const chunk = data.slice(i * chunkSize, (i + 1) * chunkSize);
24            
25            if (chunk.length > 0) {
26                const promise = new Promise<number>((resolve, reject) => {
27                    const worker = this.workers[i];
28                    
29                    worker.once('message', (result) => {
30                        resolve(result);
31                    });
32                    
33                    worker.once('error', reject);
34                    
35                    worker.postMessage({
36                        type: 'calculate',
37                        data: chunk
38                    } as WorkerMessage);
39                });
40                
41                promises.push(promise);
42            }
43        }
44
45        return Promise.all(promises);
46    }
47
48    terminate() {
49        this.workers.forEach(worker => worker.terminate());
50    }
51}
52
53// worker.js content:
54/*
55const { parentPort } = require('worker_threads');
56
57parentPort.on('message', (message) => {
58    if (message.type === 'calculate') {
59        const result = heavyCalculation(message.data);
60        parentPort.postMessage(result);
61    }
62});
63
64function heavyCalculation(data) {
65    return data.reduce((sum, val) => sum + val, 0);
66}
67*/
68

Performance Comparison#

From our production trading systems:

Throughput (messages/second)#

plaintext
1Language     Threading    Async/Await   Parallelism
2────────────────────────────────────────────────────
3C++          2.1M         N/A           8.4M (4 cores)
4Rust         2.0M         1.8M          8.1M (4 cores)
5OCaml        1.2M         1.5M          4.8M (4 cores)
6Python       0.3M         0.8M          1.2M (4 cores)
7TypeScript   N/A          1.1M          0.9M (4 cores)
8

Latency (P99 microseconds)#

plaintext
1Language     Threading    Async/Await   
2──────────────────────────────────────
3C++          12           N/A          
4Rust         14           18           
5OCaml        28           22           
6Python       85           45           
7TypeScript   N/A          38           
8

Choosing the Right Model#

Use C++ when:

  • Maximum performance required
  • Full control over threading needed
  • Trading in ultra-low latency space

Use Rust when:

  • Safety + performance required
  • Building reliable concurrent systems
  • Modern codebase with strict requirements

Use OCaml when:

  • Structured concurrency preferred
  • Functional programming paradigm fits
  • Complex business logic with concurrency

Use Python when:

  • I/O bound workloads (use asyncio)
  • CPU bound (use multiprocessing)
  • Rapid prototyping with libraries

Use TypeScript when:

  • Event-driven architecture
  • Web services and APIs
  • High concurrency I/O operations

Lessons Learned#

After years of concurrent programming in trading:

  1. Async doesn't mean parallel: Async handles concurrency, not CPU parallelism
  2. Measure first: Profile before choosing threading/async/multiprocessing
  3. Lock-free is hard: Only use when absolutely necessary
  4. Python GIL is real: Use multiprocessing for CPU-bound Python
  5. Rust prevents races: Compile-time safety is worth the learning curve
  6. Event loops excel at I/O: Node.js handles 10K+ connections easily
  7. C++ gives control: But with great power comes segfaults

The best choice depends on your workload, team expertise, and performance requirements.

Further Reading#

  • C++ Concurrency in Action by Anthony Williams
  • The Rust Programming Language - Concurrency chapter
  • Real World OCaml - Concurrent Programming
  • Python Asyncio Documentation
  • Node.js Event Loop

Master concurrency patterns in your language of choice, but understand the fundamentals work across all languages.

NT

NordVarg Team

Technical Writer

NordVarg Team is a software engineer at NordVarg specializing in high-performance financial systems and type-safe programming.

concurrencyparallelismc++rustocaml

Join 1,000+ Engineers

Get weekly insights on building high-performance financial systems, latest industry trends, and expert tips delivered straight to your inbox.

✓Weekly articles
✓Industry insights
✓No spam, ever

Related Posts

Dec 13, 2024•12 min read
Atomic Operations in C++ and Rust: Building Lock-Free Data Structures
Systems Programmingatomicslock-free
Jan 21, 2025•15 min read
SIMD Optimization for Financial Calculations: AVX-512 in Production
Systems Programmingsimdavx-512
Jan 21, 2025•14 min read
Custom Memory Allocators for Trading Systems
Systems Programmingmemory-managementallocators

Interested in working together?