Building a Real-Time Risk Engine with Event Sourcing
Designing and implementing a high-performance risk management system using event sourcing patterns, CQRS, and stream processing for real-time trade monitoring.
Risk management is critical in financial trading systems. A risk engine must track positions, calculate exposure, enforce limits, and prevent unauthorized trades—all in real-time. In this article, we'll explore how event sourcing and CQRS patterns enable building a robust, auditable risk management system.
Traditional state-based systems store only the current state of positions and limits. Event sourcing instead stores the complete history of events that led to that state. This is ideal for risk management because:
┌──────────────┐          ┌─────────────────┐          ┌──────────────┐
│   Trading    │──events─>│   Kafka/Event   │──stream─>│ Risk Engine  │
│   System     │          │      Store      │          │ (Processor)  │
└──────────────┘          └──────┬──────────┘          └──────┬───────┘
                                 │                            │
                                 │                            ▼
                                 │                     ┌──────┴───────┐
                                 │                     │ Read Models  │
                                 │                     │  (Redis/PG)  │
                                 │                     └──────┬───────┘
                                 │                            │
                                 ▼                            ▼
                        ┌────────┴────────┐           ┌──────┴───────┐
                        │   Event Store   │           │   Risk UI    │
                        │  (PostgreSQL)   │           │  Dashboard   │
                        └─────────────────┘           └──────────────┘
Define events as immutable facts about what happened:
1use serde::{Deserialize, Serialize};
2use chrono::{DateTime, Utc};
3
4#[derive(Debug, Clone, Serialize, Deserialize)]
5#[serde(tag = "type")]
6enum RiskEvent {
7 TradeExecuted {
8 trade_id: String,
9 account_id: String,
10 symbol: String,
11 side: TradeSide,
12 quantity: f64,
13 price: f64,
14 executed_at: DateTime<Utc>,
15 },
16 PositionUpdated {
17 account_id: String,
18 symbol: String,
19 previous_quantity: f64,
20 new_quantity: f64,
21 updated_at: DateTime<Utc>,
22 },
23 LimitSet {
24 account_id: String,
25 limit_type: LimitType,
26 value: f64,
27 set_by: String,
28 set_at: DateTime<Utc>,
29 },
30 LimitViolationDetected {
31 account_id: String,
32 limit_type: LimitType,
33 current_value: f64,
34 limit_value: f64,
35 detected_at: DateTime<Utc>,
36 },
37 AccountBlocked {
38 account_id: String,
39 reason: String,
40 blocked_by: String,
41 blocked_at: DateTime<Utc>,
42 },
43}
44
45#[derive(Debug, Clone, Serialize, Deserialize)]
46enum TradeSide {
47 Buy,
48 Sell,
49}
50
51#[derive(Debug, Clone, Serialize, Deserialize)]
52enum LimitType {
53 MaxPositionSize,
54 MaxDailyVolume,
55 MaxDrawdown,
56 MaxLeverage,
57}
Wrap events with metadata:
1#[derive(Debug, Clone, Serialize, Deserialize)]
2struct EventEnvelope {
3 event_id: String, // UUID
4 event_type: String, // "TradeExecuted", etc.
5 aggregate_id: String, // account_id
6 aggregate_type: String, // "Account"
7 sequence: i64, // Monotonic sequence per aggregate
8 timestamp: DateTime<Utc>,
9 causation_id: Option<String>, // ID of event that caused this
10 correlation_id: String, // Request/session ID
11 metadata: serde_json::Value,
12 payload: RiskEvent,
13}
CREATE TABLE events (
    event_id       UUID         PRIMARY KEY,
    event_type     VARCHAR(100) NOT NULL,
    aggregate_id   VARCHAR(100) NOT NULL,
    aggregate_type VARCHAR(50)  NOT NULL,
    -- NOTE(review): BIGSERIAL draws from one global sequence, so numbering is
    -- monotonic per aggregate but gapped. Fine for ordering and for optimistic
    -- checks comparing MAX(sequence); do not treat it as an event count.
    sequence       BIGSERIAL    NOT NULL,
    timestamp      TIMESTAMPTZ  NOT NULL DEFAULT NOW(),
    causation_id   UUID,
    correlation_id VARCHAR(100) NOT NULL,
    metadata       JSONB        NOT NULL DEFAULT '{}',
    payload        JSONB        NOT NULL,
    created_at     TIMESTAMPTZ  NOT NULL DEFAULT NOW()
);

-- Primary access path: replay one aggregate's stream in order.
CREATE UNIQUE INDEX idx_events_aggregate_sequence
  ON events(aggregate_id, sequence);

-- Projections that scan by event kind, newest first.
CREATE INDEX idx_events_type_timestamp
  ON events(event_type, timestamp DESC);

-- Trace every event belonging to one request/session.
CREATE INDEX idx_events_correlation
  ON events(correlation_id);

-- GIN index for ad-hoc queries against the metadata document.
CREATE INDEX idx_events_metadata ON events USING gin(metadata);
271use sqlx::PgPool;
2use uuid::Uuid;
3
4pub struct EventStore {
5 pool: PgPool,
6}
7
8impl EventStore {
9 pub async fn append(
10 &self,
11 aggregate_id: &str,
12 events: Vec<RiskEvent>,
13 expected_version: Option<i64>,
14 ) -> Result<Vec<EventEnvelope>, StoreError> {
15 let mut tx = self.pool.begin().await?;
16
17 // Optimistic concurrency check
18 if let Some(expected) = expected_version {
19 let current = sqlx::query_scalar::<_, i64>(
20 "SELECT COALESCE(MAX(sequence), 0) FROM events
21 WHERE aggregate_id = $1"
22 )
23 .bind(aggregate_id)
24 .fetch_one(&mut *tx)
25 .await?;
26
27 if current != expected {
28 return Err(StoreError::ConcurrencyConflict {
29 expected,
30 actual: current,
31 });
32 }
33 }
34
35 // Append events
36 let mut envelopes = Vec::new();
37
38 for event in events {
39 let envelope = EventEnvelope {
40 event_id: Uuid::new_v4().to_string(),
41 event_type: event.event_type(),
42 aggregate_id: aggregate_id.to_string(),
43 aggregate_type: "Account".to_string(),
44 sequence: 0, // Will be set by database
45 timestamp: Utc::now(),
46 causation_id: None,
47 correlation_id: Uuid::new_v4().to_string(),
48 metadata: serde_json::json!({}),
49 payload: event,
50 };
51
52 let sequence = sqlx::query_scalar::<_, i64>(
53 "INSERT INTO events (
54 event_id, event_type, aggregate_id, aggregate_type,
55 timestamp, causation_id, correlation_id, metadata, payload
56 ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
57 RETURNING sequence"
58 )
59 .bind(&envelope.event_id)
60 .bind(&envelope.event_type)
61 .bind(&envelope.aggregate_id)
62 .bind(&envelope.aggregate_type)
63 .bind(envelope.timestamp)
64 .bind(&envelope.causation_id)
65 .bind(&envelope.correlation_id)
66 .bind(&envelope.metadata)
67 .bind(serde_json::to_value(&envelope.payload)?)
68 .fetch_one(&mut *tx)
69 .await?;
70
71 let mut final_envelope = envelope;
72 final_envelope.sequence = sequence;
73 envelopes.push(final_envelope);
74 }
75
76 tx.commit().await?;
77 Ok(envelopes)
78 }
79
80 pub async fn load_events(
81 &self,
82 aggregate_id: &str,
83 from_sequence: i64,
84 ) -> Result<Vec<EventEnvelope>, StoreError> {
85 let rows = sqlx::query_as::<_, EventRow>(
86 "SELECT * FROM events
87 WHERE aggregate_id = $1 AND sequence >= $2
88 ORDER BY sequence ASC"
89 )
90 .bind(aggregate_id)
91 .bind(from_sequence)
92 .fetch_all(&self.pool)
93 .await?;
94
95 rows.into_iter()
96 .map(|row| row.try_into())
97 .collect()
98 }
99}
Maintain current positions as a read model:
1use std::collections::HashMap;
2
3#[derive(Debug, Clone)]
4struct Position {
5 account_id: String,
6 symbol: String,
7 quantity: f64,
8 average_price: f64,
9 market_value: f64,
10 unrealized_pnl: f64,
11 last_updated: DateTime<Utc>,
12}
13
14struct PositionProjection {
15 positions: HashMap<(String, String), Position>,
16}
17
18impl PositionProjection {
19 fn apply(&mut self, event: &RiskEvent) {
20 match event {
21 RiskEvent::TradeExecuted {
22 account_id,
23 symbol,
24 side,
25 quantity,
26 price,
27 executed_at,
28 } => {
29 let key = (account_id.clone(), symbol.clone());
30 let position = self.positions.entry(key.clone())
31 .or_insert_with(|| Position {
32 account_id: account_id.clone(),
33 symbol: symbol.clone(),
34 quantity: 0.0,
35 average_price: 0.0,
36 market_value: 0.0,
37 unrealized_pnl: 0.0,
38 last_updated: *executed_at,
39 });
40
41 // Update position based on trade
42 let signed_quantity = match side {
43 TradeSide::Buy => *quantity,
44 TradeSide::Sell => -quantity,
45 };
46
47 // Calculate new average price
48 let old_value = position.quantity * position.average_price;
49 let trade_value = signed_quantity * price;
50 let new_quantity = position.quantity + signed_quantity;
51
52 if new_quantity != 0.0 {
53 position.average_price = (old_value + trade_value) / new_quantity;
54 } else {
55 position.average_price = 0.0;
56 }
57
58 position.quantity = new_quantity;
59 position.last_updated = *executed_at;
60 }
61 RiskEvent::PositionUpdated { .. } => {
62 // Handle position adjustments (corporate actions, etc.)
63 }
64 _ => {}
65 }
66 }
67
68 fn get_position(&self, account_id: &str, symbol: &str) -> Option<&Position> {
69 self.positions.get(&(account_id.to_string(), symbol.to_string()))
70 }
71}
For complex queries, maintain materialized views:
CREATE MATERIALIZED VIEW account_positions AS
SELECT
    account_id,
    symbol,
    -- Signed net quantity: buys add, sells subtract.
    SUM(CASE
        WHEN (payload->>'side')::text = 'Buy' THEN (payload->>'quantity')::numeric
        ELSE -(payload->>'quantity')::numeric
    END) AS net_quantity,
    -- Volume-weighted average fill price. A plain AVG(price) (as originally
    -- written) weights a 1-share fill the same as a 10,000-share fill.
    SUM((payload->>'price')::numeric * (payload->>'quantity')::numeric)
        / NULLIF(SUM((payload->>'quantity')::numeric), 0) AS avg_price,
    COUNT(*) AS trade_count,
    MAX(timestamp) AS last_trade_at
FROM events
WHERE event_type = 'TradeExecuted'
GROUP BY account_id, symbol
HAVING SUM(CASE
    WHEN (payload->>'side')::text = 'Buy' THEN (payload->>'quantity')::numeric
    ELSE -(payload->>'quantity')::numeric
END) != 0;

-- REFRESH ... CONCURRENTLY requires a unique index on the view.
CREATE UNIQUE INDEX ON account_positions(account_id, symbol);

-- Refresh periodically or on demand; CONCURRENTLY avoids blocking readers.
REFRESH MATERIALIZED VIEW CONCURRENTLY account_positions;
241use rdkafka::consumer::{Consumer, StreamConsumer};
2use rdkafka::Message;
3
4pub struct RiskProcessor {
5 consumer: StreamConsumer,
6 position_projection: PositionProjection,
7 limits: HashMap<String, AccountLimits>,
8}
9
10#[derive(Debug, Clone)]
11struct AccountLimits {
12 max_position_size: f64,
13 max_daily_volume: f64,
14 max_leverage: f64,
15}
16
17impl RiskProcessor {
18 pub async fn process_events(&mut self) -> Result<(), ProcessorError> {
19 loop {
20 match self.consumer.recv().await {
21 Ok(message) => {
22 let payload = message.payload()
23 .ok_or(ProcessorError::EmptyMessage)?;
24
25 let envelope: EventEnvelope = serde_json::from_slice(payload)?;
26
27 // Apply to projection
28 self.position_projection.apply(&envelope.payload);
29
30 // Check risk limits
31 if let Some(violations) = self.check_limits(&envelope) {
32 self.handle_violations(violations).await?;
33 }
34
35 // Commit offset
36 self.consumer.commit_message(&message, CommitMode::Async)?;
37 }
38 Err(e) => {
39 eprintln!("Error receiving message: {}", e);
40 }
41 }
42 }
43 }
44
45 fn check_limits(&self, envelope: &EventEnvelope) -> Option<Vec<LimitViolation>> {
46 match &envelope.payload {
47 RiskEvent::TradeExecuted { account_id, symbol, .. } => {
48 let mut violations = Vec::new();
49
50 // Check position size limit
51 if let Some(position) = self.position_projection
52 .get_position(account_id, symbol)
53 {
54 if let Some(limits) = self.limits.get(account_id) {
55 if position.quantity.abs() > limits.max_position_size {
56 violations.push(LimitViolation {
57 account_id: account_id.clone(),
58 limit_type: LimitType::MaxPositionSize,
59 current_value: position.quantity.abs(),
60 limit_value: limits.max_position_size,
61 });
62 }
63 }
64 }
65
66 if violations.is_empty() {
67 None
68 } else {
69 Some(violations)
70 }
71 }
72 _ => None,
73 }
74 }
75
76 async fn handle_violations(
77 &self,
78 violations: Vec<LimitViolation>,
79 ) -> Result<(), ProcessorError> {
80 for violation in violations {
81 // Emit violation event
82 let event = RiskEvent::LimitViolationDetected {
83 account_id: violation.account_id.clone(),
84 limit_type: violation.limit_type,
85 current_value: violation.current_value,
86 limit_value: violation.limit_value,
87 detected_at: Utc::now(),
88 };
89
90 // Publish to alert topic
91 self.publish_alert(&violation).await?;
92
93 // Potentially block account
94 if violation.is_critical() {
95 self.block_account(&violation.account_id).await?;
96 }
97 }
98
99 Ok(())
100 }
101}
One of event sourcing's superpowers is time travel:
1impl EventStore {
2 pub async fn rebuild_state_at(
3 &self,
4 aggregate_id: &str,
5 point_in_time: DateTime<Utc>,
6 ) -> Result<PositionProjection, StoreError> {
7 // Load events up to the point in time
8 let events = sqlx::query_as::<_, EventRow>(
9 "SELECT * FROM events
10 WHERE aggregate_id = $1 AND timestamp <= $2
11 ORDER BY sequence ASC"
12 )
13 .bind(aggregate_id)
14 .bind(point_in_time)
15 .fetch_all(&self.pool)
16 .await?;
17
18 // Rebuild projection
19 let mut projection = PositionProjection::new();
20 for row in events {
21 let envelope: EventEnvelope = row.try_into()?;
22 projection.apply(&envelope.payload);
23 }
24
25 Ok(projection)
26 }
27}
This enables powerful queries:
For long event streams, create periodic snapshots:
-- Periodic snapshots of the position projection, so rebuilds replay only the
-- events recorded after the snapshot instead of the whole stream.
CREATE TABLE position_snapshots (
    snapshot_id UUID         PRIMARY KEY,
    account_id  VARCHAR(100) NOT NULL,
    sequence    BIGINT       NOT NULL,  -- last event sequence folded into `state`
    snapshot_at TIMESTAMPTZ  NOT NULL,
    state       JSONB        NOT NULL,  -- serialized projection state
    created_at  TIMESTAMPTZ  NOT NULL DEFAULT NOW()
);

-- Fast lookup of the newest snapshot per account.
CREATE INDEX idx_snapshots_account ON position_snapshots(account_id, sequence DESC);
111impl EventStore {
2 pub async fn load_from_snapshot(
3 &self,
4 account_id: &str,
5 ) -> Result<(PositionProjection, i64), StoreError> {
6 // Load latest snapshot
7 let snapshot = sqlx::query_as::<_, SnapshotRow>(
8 "SELECT * FROM position_snapshots
9 WHERE account_id = $1
10 ORDER BY sequence DESC
11 LIMIT 1"
12 )
13 .bind(account_id)
14 .fetch_optional(&self.pool)
15 .await?;
16
17 let (mut projection, from_sequence) = if let Some(snap) = snapshot {
18 (
19 serde_json::from_value(snap.state)?,
20 snap.sequence + 1,
21 )
22 } else {
23 (PositionProjection::new(), 0)
24 };
25
26 // Replay events since snapshot
27 let events = self.load_events(account_id, from_sequence).await?;
28 for event in events {
29 projection.apply(&event.payload);
30 }
31
32 Ok((projection, from_sequence))
33 }
34}
Our event-sourced risk engine handles:
Event sourcing and CQRS provide an excellent foundation for building risk management systems:
Benefits:
Challenges:
For financial systems where auditability and correctness are paramount, these tradeoffs are well worth it.
Technical Writer
The NordVarg Engineering Team builds high-performance financial systems and specializes in type-safe programming.
Get weekly insights on building high-performance financial systems, latest industry trends, and expert tips delivered straight to your inbox.