Building a Real-Time Risk Engine with Event Sourcing

October 15, 2024 • NordVarg Engineering Team • 9 min read

Designing and implementing a high-performance risk management system using event sourcing patterns, CQRS, and stream processing for real-time trade monitoring.

System Design • Event Sourcing • CQRS • Risk Management • Kafka • Real-time Systems

Risk management is critical in financial trading systems. A risk engine must track positions, calculate exposure, enforce limits, and prevent unauthorized trades—all in real time. In this article, we'll explore how the event sourcing and CQRS patterns enable building a robust, auditable risk management system.

Why Event Sourcing for Risk Management?

Traditional state-based systems store only the current state of positions and limits. Event sourcing instead stores the complete history of events that led to that state. This is ideal for risk management because:

  1. Complete Audit Trail: Every trade, position change, and limit adjustment is recorded
  2. Temporal Queries: "What was this account's position at 2:30 PM yesterday?"
  3. Replay Capability: Reconstruct any historical state by replaying events
  4. Debugging: Understand exactly how a position or limit violation occurred
  5. Regulatory Compliance: Immutable event log satisfies audit requirements
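Point 3 is worth making concrete: reconstructing state is just a left fold over the event log. A minimal sketch with a toy event type (not the article's full schema), tracking a single signed position:

```rust
// Toy event type: signed quantity changes to one account's position.
#[derive(Debug, Clone)]
pub enum Event {
    Bought(f64),
    Sold(f64),
}

/// Replay an event log into the current position: a left fold over events.
pub fn replay(events: &[Event]) -> f64 {
    events.iter().fold(0.0, |position, e| match e {
        Event::Bought(q) => position + q,
        Event::Sold(q) => position - q,
    })
}
```

Replaying only a prefix of the log yields the state at any historical point, which is what the temporal queries later in the article rely on.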

System Architecture

plaintext
┌──────────────┐          ┌─────────────────┐          ┌──────────────┐
│   Trading    │──events─>│  Kafka/Event    │──stream─>│ Risk Engine  │
│   System     │          │     Store       │          │  (Processor) │
└──────────────┘          └──────┬──────────┘          └──────┬───────┘
                                 │                            │
                                 │                            ▼
                                 │                     ┌──────┴───────┐
                                 │                     │  Read Models │
                                 │                     │  (Redis/PG)  │
                                 │                     └──────┬───────┘
                                 │                            │
                                 ▼                            ▼
                          ┌──────┴──────────┐          ┌──────┴───────┐
                          │  Event Store    │          │   Risk UI    │
                          │  (PostgreSQL)   │          │  Dashboard   │
                          └─────────────────┘          └──────────────┘

Event Design

Event Schema

Define events as immutable facts about what happened:

rust
use serde::{Deserialize, Serialize};
use chrono::{DateTime, Utc};

#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type")]
enum RiskEvent {
    TradeExecuted {
        trade_id: String,
        account_id: String,
        symbol: String,
        side: TradeSide,
        quantity: f64,
        price: f64,
        executed_at: DateTime<Utc>,
    },
    PositionUpdated {
        account_id: String,
        symbol: String,
        previous_quantity: f64,
        new_quantity: f64,
        updated_at: DateTime<Utc>,
    },
    LimitSet {
        account_id: String,
        limit_type: LimitType,
        value: f64,
        set_by: String,
        set_at: DateTime<Utc>,
    },
    LimitViolationDetected {
        account_id: String,
        limit_type: LimitType,
        current_value: f64,
        limit_value: f64,
        detected_at: DateTime<Utc>,
    },
    AccountBlocked {
        account_id: String,
        reason: String,
        blocked_by: String,
        blocked_at: DateTime<Utc>,
    },
}

#[derive(Debug, Clone, Serialize, Deserialize)]
enum TradeSide {
    Buy,
    Sell,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
enum LimitType {
    MaxPositionSize,
    MaxDailyVolume,
    MaxDrawdown,
    MaxLeverage,
}

Event Envelope

Wrap events with metadata:

rust
#[derive(Debug, Clone, Serialize, Deserialize)]
struct EventEnvelope {
    event_id: String,         // UUID
    event_type: String,       // "TradeExecuted", etc.
    aggregate_id: String,     // account_id
    aggregate_type: String,   // "Account"
    sequence: i64,            // Monotonic sequence per aggregate
    timestamp: DateTime<Utc>,
    causation_id: Option<String>,  // ID of event that caused this
    correlation_id: String,   // Request/session ID
    metadata: serde_json::Value,
    payload: RiskEvent,
}
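The append logic in the next section calls an `event_type()` helper on `RiskEvent` to fill the envelope's `event_type` field; the article never shows it. One plausible shape, sketched here against a simplified mirror of the enum (fields elided for brevity):

```rust
// Simplified mirror of the article's RiskEvent (payload fields elided)
// just to illustrate the helper; the real enum carries data in each variant.
pub enum RiskEvent {
    TradeExecuted,
    PositionUpdated,
    LimitSet,
    LimitViolationDetected,
    AccountBlocked,
}

impl RiskEvent {
    /// Tag string matching the enum's #[serde(tag = "type")] representation.
    pub fn event_type(&self) -> String {
        match self {
            RiskEvent::TradeExecuted => "TradeExecuted",
            RiskEvent::PositionUpdated => "PositionUpdated",
            RiskEvent::LimitSet => "LimitSet",
            RiskEvent::LimitViolationDetected => "LimitViolationDetected",
            RiskEvent::AccountBlocked => "AccountBlocked",
        }
        .to_string()
    }
}
```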

Event Store Implementation

PostgreSQL Schema

sql
CREATE TABLE events (
    event_id UUID PRIMARY KEY,
    event_type VARCHAR(100) NOT NULL,
    aggregate_id VARCHAR(100) NOT NULL,
    aggregate_type VARCHAR(50) NOT NULL,
    sequence BIGSERIAL NOT NULL,
    timestamp TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    causation_id UUID,
    correlation_id VARCHAR(100) NOT NULL,
    metadata JSONB NOT NULL DEFAULT '{}',
    payload JSONB NOT NULL,
    created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

-- Indexes for efficient querying
CREATE UNIQUE INDEX idx_events_aggregate_sequence 
    ON events(aggregate_id, sequence);

CREATE INDEX idx_events_type_timestamp 
    ON events(event_type, timestamp DESC);

CREATE INDEX idx_events_correlation 
    ON events(correlation_id);

-- GIN index for metadata queries
CREATE INDEX idx_events_metadata ON events USING gin(metadata);

Append-Only Event Store

rust
use sqlx::PgPool;
use uuid::Uuid;
use chrono::Utc;

pub struct EventStore {
    pool: PgPool,
}

impl EventStore {
    pub async fn append(
        &self,
        aggregate_id: &str,
        events: Vec<RiskEvent>,
        expected_version: Option<i64>,
    ) -> Result<Vec<EventEnvelope>, StoreError> {
        let mut tx = self.pool.begin().await?;
        
        // Optimistic concurrency check
        if let Some(expected) = expected_version {
            let current = sqlx::query_scalar::<_, i64>(
                "SELECT COALESCE(MAX(sequence), 0) FROM events 
                 WHERE aggregate_id = $1"
            )
            .bind(aggregate_id)
            .fetch_one(&mut *tx)
            .await?;
            
            if current != expected {
                return Err(StoreError::ConcurrencyConflict {
                    expected,
                    actual: current,
                });
            }
        }
        
        // Append events
        let mut envelopes = Vec::new();
        
        for event in events {
            let envelope = EventEnvelope {
                event_id: Uuid::new_v4().to_string(),
                event_type: event.event_type(),
                aggregate_id: aggregate_id.to_string(),
                aggregate_type: "Account".to_string(),
                sequence: 0, // Will be assigned by the database
                timestamp: Utc::now(),
                causation_id: None,
                correlation_id: Uuid::new_v4().to_string(),
                metadata: serde_json::json!({}),
                payload: event,
            };
            
            let sequence = sqlx::query_scalar::<_, i64>(
                "INSERT INTO events (
                    event_id, event_type, aggregate_id, aggregate_type,
                    timestamp, causation_id, correlation_id, metadata, payload
                ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
                RETURNING sequence"
            )
            .bind(&envelope.event_id)
            .bind(&envelope.event_type)
            .bind(&envelope.aggregate_id)
            .bind(&envelope.aggregate_type)
            .bind(envelope.timestamp)
            .bind(&envelope.causation_id)
            .bind(&envelope.correlation_id)
            .bind(&envelope.metadata)
            .bind(serde_json::to_value(&envelope.payload)?)
            .fetch_one(&mut *tx)
            .await?;
            
            let mut final_envelope = envelope;
            final_envelope.sequence = sequence;
            envelopes.push(final_envelope);
        }
        
        tx.commit().await?;
        Ok(envelopes)
    }
    
    pub async fn load_events(
        &self,
        aggregate_id: &str,
        from_sequence: i64,
    ) -> Result<Vec<EventEnvelope>, StoreError> {
        let rows = sqlx::query_as::<_, EventRow>(
            "SELECT * FROM events 
             WHERE aggregate_id = $1 AND sequence >= $2
             ORDER BY sequence ASC"
        )
        .bind(aggregate_id)
        .bind(from_sequence)
        .fetch_all(&self.pool)
        .await?;
        
        rows.into_iter()
            .map(|row| row.try_into())
            .collect()
    }
}
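The expected-version check is the heart of the append path. Stripped of SQL, the same logic over an in-memory map — a sketch, not the production store, with `String` standing in for real events:

```rust
use std::collections::HashMap;

#[derive(Debug, PartialEq)]
pub enum StoreError {
    ConcurrencyConflict { expected: i64, actual: i64 },
}

/// In-memory stand-in for the append path: per-aggregate event lists
/// keyed by aggregate_id, with the same expected-version check.
#[derive(Default)]
pub struct InMemoryStore {
    streams: HashMap<String, Vec<String>>,
}

impl InMemoryStore {
    /// Append events, failing if the stream's current version differs
    /// from what the caller last observed. Returns the new version.
    pub fn append(
        &mut self,
        aggregate_id: &str,
        events: Vec<String>,
        expected_version: Option<i64>,
    ) -> Result<i64, StoreError> {
        let stream = self.streams.entry(aggregate_id.to_string()).or_default();
        let current = stream.len() as i64;
        if let Some(expected) = expected_version {
            if current != expected {
                return Err(StoreError::ConcurrencyConflict { expected, actual: current });
            }
        }
        stream.extend(events);
        Ok(stream.len() as i64)
    }
}
```

A writer that loses the race gets `ConcurrencyConflict` and must reload the stream and retry, which is how two risk checks against the same account are prevented from silently overwriting each other.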

CQRS: Projections for Queries

Position Projection

Maintain current positions as a read model:

rust
use std::collections::HashMap;
use chrono::{DateTime, Utc};

#[derive(Debug, Clone)]
struct Position {
    account_id: String,
    symbol: String,
    quantity: f64,
    average_price: f64,
    market_value: f64,
    unrealized_pnl: f64,
    last_updated: DateTime<Utc>,
}

struct PositionProjection {
    positions: HashMap<(String, String), Position>,
}

impl PositionProjection {
    fn apply(&mut self, event: &RiskEvent) {
        match event {
            RiskEvent::TradeExecuted {
                account_id,
                symbol,
                side,
                quantity,
                price,
                executed_at,
            } => {
                let key = (account_id.clone(), symbol.clone());
                let position = self.positions.entry(key.clone())
                    .or_insert_with(|| Position {
                        account_id: account_id.clone(),
                        symbol: symbol.clone(),
                        quantity: 0.0,
                        average_price: 0.0,
                        market_value: 0.0,
                        unrealized_pnl: 0.0,
                        last_updated: *executed_at,
                    });
                
                // Update position based on trade
                let signed_quantity = match side {
                    TradeSide::Buy => *quantity,
                    TradeSide::Sell => -quantity,
                };
                
                // Calculate new average price (simplified: sells also shift
                // the average; a real book treats reducing fills separately)
                let old_value = position.quantity * position.average_price;
                let trade_value = signed_quantity * price;
                let new_quantity = position.quantity + signed_quantity;
                
                if new_quantity != 0.0 {
                    position.average_price = (old_value + trade_value) / new_quantity;
                } else {
                    position.average_price = 0.0;
                }
                
                position.quantity = new_quantity;
                position.last_updated = *executed_at;
            }
            RiskEvent::PositionUpdated { .. } => {
                // Handle position adjustments (corporate actions, etc.)
            }
            _ => {}
        }
    }
    
    fn get_position(&self, account_id: &str, symbol: &str) -> Option<&Position> {
        self.positions.get(&(account_id.to_string(), symbol.to_string()))
    }
}
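The average-price arithmetic is easier to verify in isolation. A standalone restatement of the same update (positive quantities are long, signed fills as in the projection):

```rust
/// Volume-weighted average price after applying a signed fill.
/// Returns (new_quantity, new_average_price).
pub fn apply_fill(
    quantity: f64,
    average_price: f64,
    fill_quantity: f64, // signed: buys positive, sells negative
    fill_price: f64,
) -> (f64, f64) {
    let new_quantity = quantity + fill_quantity;
    let new_average = if new_quantity != 0.0 {
        (quantity * average_price + fill_quantity * fill_price) / new_quantity
    } else {
        0.0 // flat position: no meaningful average price
    };
    (new_quantity, new_average)
}
```

As in the projection, this simplified formula lets reducing fills move the average too; a production book would typically keep the average fixed on sells and realize P&L instead.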

Materialized View in PostgreSQL

For complex queries, maintain materialized views:

sql
CREATE MATERIALIZED VIEW account_positions AS
SELECT 
    aggregate_id AS account_id,
    payload->>'symbol' AS symbol,
    SUM(CASE 
        WHEN payload->>'side' = 'Buy' THEN (payload->>'quantity')::numeric
        ELSE -(payload->>'quantity')::numeric
    END) AS net_quantity,
    AVG((payload->>'price')::numeric) AS avg_price,
    COUNT(*) AS trade_count,
    MAX(timestamp) AS last_trade_at
FROM events
WHERE event_type = 'TradeExecuted'
GROUP BY aggregate_id, payload->>'symbol'
HAVING SUM(CASE 
    WHEN payload->>'side' = 'Buy' THEN (payload->>'quantity')::numeric
    ELSE -(payload->>'quantity')::numeric
END) != 0;

CREATE UNIQUE INDEX ON account_positions(account_id, symbol);

-- Refresh periodically or on demand
REFRESH MATERIALIZED VIEW CONCURRENTLY account_positions;

Real-Time Risk Checking

Stream Processing with Kafka

rust
use std::collections::HashMap;
use chrono::Utc;
use rdkafka::consumer::{CommitMode, Consumer, StreamConsumer};
use rdkafka::Message;

pub struct RiskProcessor {
    consumer: StreamConsumer,
    position_projection: PositionProjection,
    limits: HashMap<String, AccountLimits>,
}

#[derive(Debug, Clone)]
struct AccountLimits {
    max_position_size: f64,
    max_daily_volume: f64,
    max_leverage: f64,
}

impl RiskProcessor {
    pub async fn process_events(&mut self) -> Result<(), ProcessorError> {
        loop {
            match self.consumer.recv().await {
                Ok(message) => {
                    let payload = message.payload()
                        .ok_or(ProcessorError::EmptyMessage)?;
                    
                    let envelope: EventEnvelope = serde_json::from_slice(payload)?;
                    
                    // Apply to projection
                    self.position_projection.apply(&envelope.payload);
                    
                    // Check risk limits
                    if let Some(violations) = self.check_limits(&envelope) {
                        self.handle_violations(violations).await?;
                    }
                    
                    // Commit offset
                    self.consumer.commit_message(&message, CommitMode::Async)?;
                }
                Err(e) => {
                    eprintln!("Error receiving message: {}", e);
                }
            }
        }
    }
    
    fn check_limits(&self, envelope: &EventEnvelope) -> Option<Vec<LimitViolation>> {
        match &envelope.payload {
            RiskEvent::TradeExecuted { account_id, symbol, .. } => {
                let mut violations = Vec::new();
                
                // Check position size limit
                if let Some(position) = self.position_projection
                    .get_position(account_id, symbol) 
                {
                    if let Some(limits) = self.limits.get(account_id) {
                        if position.quantity.abs() > limits.max_position_size {
                            violations.push(LimitViolation {
                                account_id: account_id.clone(),
                                limit_type: LimitType::MaxPositionSize,
                                current_value: position.quantity.abs(),
                                limit_value: limits.max_position_size,
                            });
                        }
                    }
                }
                
                if violations.is_empty() {
                    None
                } else {
                    Some(violations)
                }
            }
            _ => None,
        }
    }
    
    async fn handle_violations(
        &self,
        violations: Vec<LimitViolation>,
    ) -> Result<(), ProcessorError> {
        for violation in violations {
            // Record the violation as an event (append to the store elided here)
            let _event = RiskEvent::LimitViolationDetected {
                account_id: violation.account_id.clone(),
                limit_type: violation.limit_type.clone(),
                current_value: violation.current_value,
                limit_value: violation.limit_value,
                detected_at: Utc::now(),
            };
            
            // Publish to alert topic
            self.publish_alert(&violation).await?;
            
            // Potentially block account
            if violation.is_critical() {
                self.block_account(&violation.account_id).await?;
            }
        }
        
        Ok(())
    }
}
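`LimitViolation` and its `is_critical()` method are used above but never defined in the article. A hypothetical shape consistent with that usage, including one possible criticality policy (the 10% threshold is an assumption, not from the original):

```rust
#[derive(Debug, Clone, PartialEq)]
pub enum LimitType {
    MaxPositionSize,
    MaxDailyVolume,
    MaxDrawdown,
    MaxLeverage,
}

#[derive(Debug, Clone)]
pub struct LimitViolation {
    pub account_id: String,
    pub limit_type: LimitType,
    pub current_value: f64,
    pub limit_value: f64,
}

impl LimitViolation {
    /// How far over the limit the account is, as a fraction of the limit.
    pub fn overage_ratio(&self) -> f64 {
        (self.current_value - self.limit_value) / self.limit_value
    }

    /// Hypothetical policy: drawdown breaches and >10% overages are critical.
    pub fn is_critical(&self) -> bool {
        self.limit_type == LimitType::MaxDrawdown || self.overage_ratio() > 0.10
    }
}
```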

Temporal Queries

One of event sourcing's superpowers is time travel:

rust
impl EventStore {
    pub async fn rebuild_state_at(
        &self,
        aggregate_id: &str,
        point_in_time: DateTime<Utc>,
    ) -> Result<PositionProjection, StoreError> {
        // Load events up to the point in time
        let events = sqlx::query_as::<_, EventRow>(
            "SELECT * FROM events 
             WHERE aggregate_id = $1 AND timestamp <= $2
             ORDER BY sequence ASC"
        )
        .bind(aggregate_id)
        .bind(point_in_time)
        .fetch_all(&self.pool)
        .await?;
        
        // Rebuild projection
        let mut projection = PositionProjection::new();
        for row in events {
            let envelope: EventEnvelope = row.try_into()?;
            projection.apply(&envelope.payload);
        }
        
        Ok(projection)
    }
}

This enables powerful queries:

  • "Show me all accounts that had limit violations yesterday"
  • "What was the total risk exposure at market close?"
  • "Replay this trade sequence to debug the violation"
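Outside the database, the same point-in-time idea is just a filtered replay. A dependency-free sketch (plain `u64` timestamps stand in for `DateTime<Utc>`, and a signed delta stands in for the full event):

```rust
#[derive(Debug, Clone)]
pub struct TimedEvent {
    pub timestamp: u64, // e.g. epoch millis
    pub delta: f64,     // signed position change
}

/// Position as of `point_in_time` (inclusive): replay only the events
/// that had happened by then.
pub fn position_at(events: &[TimedEvent], point_in_time: u64) -> f64 {
    events
        .iter()
        .filter(|e| e.timestamp <= point_in_time)
        .map(|e| e.delta)
        .sum()
}
```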

Performance Optimizations

Snapshots

For long event streams, create periodic snapshots:

sql
CREATE TABLE position_snapshots (
    snapshot_id UUID PRIMARY KEY,
    account_id VARCHAR(100) NOT NULL,
    sequence BIGINT NOT NULL,
    snapshot_at TIMESTAMPTZ NOT NULL,
    state JSONB NOT NULL,
    created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

CREATE INDEX idx_snapshots_account ON position_snapshots(account_id, sequence DESC);
rust
impl EventStore {
    pub async fn load_from_snapshot(
        &self,
        account_id: &str,
    ) -> Result<(PositionProjection, i64), StoreError> {
        // Load latest snapshot
        let snapshot = sqlx::query_as::<_, SnapshotRow>(
            "SELECT * FROM position_snapshots 
             WHERE account_id = $1 
             ORDER BY sequence DESC 
             LIMIT 1"
        )
        .bind(account_id)
        .fetch_optional(&self.pool)
        .await?;
        
        let (mut projection, from_sequence) = if let Some(snap) = snapshot {
            (
                serde_json::from_value(snap.state)?,
                snap.sequence + 1,
            )
        } else {
            (PositionProjection::new(), 0)
        };
        
        // Replay events since the snapshot, tracking the last applied sequence
        // so callers get the current version, not the snapshot's
        let mut version = from_sequence - 1;
        let events = self.load_events(account_id, from_sequence).await?;
        for event in events {
            version = event.sequence;
            projection.apply(&event.payload);
        }
        
        Ok((projection, version))
    }
}
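The invariant worth testing around snapshots: restoring a snapshot and replaying the tail must equal replaying the whole log. A dependency-free sketch with a running total standing in for the projection:

```rust
/// Fold a slice of deltas into a total, starting from `seed`.
pub fn fold_from(seed: f64, deltas: &[f64]) -> f64 {
    deltas.iter().fold(seed, |acc, d| acc + d)
}

/// Rebuild state from a "snapshot" taken after the first `snapshot_len`
/// events, then replay the remaining tail on top of it.
pub fn rebuild_with_snapshot(deltas: &[f64], snapshot_len: usize) -> f64 {
    let snapshot = fold_from(0.0, &deltas[..snapshot_len]); // persisted state
    fold_from(snapshot, &deltas[snapshot_len..])            // replay the tail
}
```

Because the fold is associative in this sense, the snapshot is purely an optimization: any snapshot point yields the same final state as a full replay.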

Production Metrics

Our event-sourced risk engine handles:

  • 150K events/second sustained throughput
  • < 50ms p99 latency from trade to risk check
  • 100TB+ event store (3 years of data)
  • Zero data loss with Kafka replication
  • Complete audit trail for regulatory compliance

Conclusion

Event sourcing and CQRS provide an excellent foundation for building risk management systems:

Benefits:

  • Complete audit trail for compliance
  • Temporal queries for analysis
  • Easy to add new projections/views
  • Natural fit for event-driven architectures
  • Debugging becomes easier with full history

Challenges:

  • Event schema evolution requires care
  • Eventual consistency in read models
  • Storage grows over time (mitigated with snapshots)
  • Need to handle idempotency carefully
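The idempotency point deserves a concrete pattern: deduplicate on `event_id` before applying, so a redelivered Kafka message never double-counts. A minimal sketch (the real system would persist the seen-set alongside the read model):

```rust
use std::collections::HashSet;

/// Projection that applies each event at most once, keyed by event_id.
#[derive(Default)]
pub struct DedupProjection {
    seen: HashSet<String>, // processed event_ids
    pub position: f64,
}

impl DedupProjection {
    /// Apply an (event_id, delta) pair; returns false for redeliveries,
    /// which are ignored.
    pub fn apply(&mut self, event_id: &str, delta: f64) -> bool {
        if !self.seen.insert(event_id.to_string()) {
            return false; // already processed: ignore the redelivery
        }
        self.position += delta;
        true
    }
}
```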

For financial systems where auditability and correctness are paramount, these tradeoffs are well worth it.

Further Reading

  • Event Sourcing by Martin Fowler
  • CQRS Journey by Microsoft
  • Kafka Streams Documentation
  • Building Event-Driven Microservices