In 2020, a prop trading firm faced a regulatory audit investigating suspicious trading activity. The regulator demanded complete reconstruction of all risk decisions for a specific trading day—every order validated, every limit checked, every rejection reason.
The firm's traditional database couldn't provide this. They had current state (today's positions, limits) but no history of how they got there. Risk checks were calculated, decisions made, then overwritten. The audit trail was incomplete.
The firm faced a $50M fine for inadequate record-keeping. Their lawyers argued the data didn't exist. The regulator didn't care—inadequate systems are the firm's problem, not the regulator's.
A competitor in the same audit had event-sourced their risk engine. They reconstructed the entire day's risk decisions in 2 hours, showing every calculation, every decision, every timestamp. The regulator was satisfied. No fine.
This incident catalyzed industry-wide adoption of event sourcing for risk engines. Today, it's standard practice. This article covers why event sourcing is mandatory for financial risk systems, how to implement it, and the operational challenges.
Traditional databases store current state. Event sourcing stores every state change as an immutable event.
Traditional approach:
Position: {symbol: "AAPL", quantity: 1000, avg_price: 150.00}
You know current position but not how you got there.
Event sourcing approach:
Event 1: OrderFilled {symbol: "AAPL", quantity: 500, price: 149.50}
Event 2: OrderFilled {symbol: "AAPL", quantity: 500, price: 150.50}
Current Position = sum(events) = 1000 shares @ $150.00 avg
You have complete history of every trade.
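In code, "current position" is just a fold over the event log. A minimal sketch, using a simplified event shape (symbol omitted for brevity):

```rust
// Rebuilding position and average price from fill events.
// Simplified stand-in for the fuller RiskEvent model below.
struct OrderFilled {
    quantity: f64,
    price: f64,
}

fn rebuild(events: &[OrderFilled]) -> (f64, f64) {
    let qty: f64 = events.iter().map(|e| e.quantity).sum();
    let notional: f64 = events.iter().map(|e| e.quantity * e.price).sum();
    let avg = if qty == 0.0 { 0.0 } else { notional / qty };
    (qty, avg)
}

fn main() {
    let events = [
        OrderFilled { quantity: 500.0, price: 149.50 },
        OrderFilled { quantity: 500.0, price: 150.50 },
    ];
    let (qty, avg) = rebuild(&events);
    println!("{qty} shares @ {avg:.2} avg"); // 1000 shares @ 150.00 avg
}
```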
1. Complete audit trail: Every risk decision is an immutable event. Regulators can reconstruct any point in time.
2. Replayability: Test new risk models against historical data without rebuilding databases.
3. Debugging: When a bug causes bad risk decisions, replay events to reproduce the exact conditions.
4. Schema evolution: Add new fields to events without migrating old data. Old events remain unchanged.
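The usual pattern for additive evolution: every new field is optional with a default, so historical events still replay. A sketch with a hypothetical `venue` field (with serde you would express the same thing via `#[serde(default)]`):

```rust
// Additive schema evolution: `venue` was added after go-live, so it is
// optional and historical events replay with a sensible default.
struct OrderFilledV2 {
    quantity: f64,
    price: f64,
    venue: Option<String>, // absent in historical events
}

fn venue_or_default(e: &OrderFilledV2) -> &str {
    e.venue.as_deref().unwrap_or("UNKNOWN")
}

fn main() {
    let old_event = OrderFilledV2 { quantity: 500.0, price: 149.50, venue: None };
    let new_event = OrderFilledV2 { quantity: 500.0, price: 150.50, venue: Some("XNAS".into()) };
    println!("{} / {}", venue_or_default(&old_event), venue_or_default(&new_event));
}
```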
Command: Intent from upstream (e.g., "Validate this order")
Event: Immutable fact (e.g., "Order validated" or "Risk limit exceeded")
Aggregate: Domain object that applies events to rebuild state
Snapshot: Periodic state dump to avoid replaying millions of events
CQRS: Separate write model (event store) from read model (materialized view)
```rust
use serde::{Serialize, Deserialize};
use std::collections::HashMap;
use uuid::Uuid;

#[derive(Debug, Serialize, Deserialize, Clone)]
pub enum RiskEvent {
    OrderValidated {
        event_id: Uuid,
        order_id: Uuid,
        user_id: Uuid,
        symbol: String,
        quantity: f64,
        price: f64,
        timestamp_ns: i64,
        risk_checks_passed: Vec<String>,
    },

    RiskLimitExceeded {
        event_id: Uuid,
        order_id: Uuid,
        limit_type: String, // "position", "notional", "concentration"
        current_value: f64,
        limit_value: f64,
        timestamp_ns: i64,
    },

    PositionUpdated {
        event_id: Uuid,
        symbol: String,
        quantity_delta: f64,
        new_total_quantity: f64,
        timestamp_ns: i64,
    },
}

// Aggregate: Risk State
// Serialize/Deserialize are required so snapshots (below) can persist this state.
#[derive(Default, Clone, Serialize, Deserialize)]
pub struct RiskState {
    pub positions: HashMap<String, f64>,
    pub notional_exposure: f64,
    pub last_update_ns: i64,
}

impl RiskState {
    pub fn apply(&mut self, event: &RiskEvent) {
        match event {
            RiskEvent::OrderValidated { symbol, quantity, price, timestamp_ns, .. } => {
                let notional_delta = quantity * price;
                *self.positions.entry(symbol.clone()).or_default() += quantity;
                self.notional_exposure += notional_delta;
                self.last_update_ns = *timestamp_ns;
            },

            RiskEvent::PositionUpdated { symbol, quantity_delta, timestamp_ns, .. } => {
                *self.positions.entry(symbol.clone()).or_default() += quantity_delta;
                self.last_update_ns = *timestamp_ns;
            },

            RiskEvent::RiskLimitExceeded { .. } => {
                // Audit event only; no state change
            },
        }
    }

    pub fn rebuild_from_events(events: &[RiskEvent]) -> Self {
        let mut state = RiskState::default();
        for event in events {
            state.apply(event);
        }
        state
    }
}
```

Key design decisions:
Commands can be retried (network failures, timeouts). Handlers must be idempotent—processing the same command twice produces the same result.
```rust
use redis::AsyncCommands;

pub async fn handle_validate_order(
    cmd: ValidateOrderCommand,
    kafka_producer: &KafkaProducer,
    redis_client: &mut redis::aio::Connection,
) -> Result<(), HandlerError> {
    // 1. Generate a deterministic idempotency key
    let event_id = format!("{}-{}", cmd.order_id, cmd.sequence_number);

    // 2. Check if already processed (idempotency)
    let exists: bool = redis_client.exists(&event_id).await?;
    if exists {
        return Ok(()); // Already processed, skip
    }

    // 3. Run risk checks
    let current_state = load_current_state().await?;
    let passes_risk = check_risk_limits(&cmd, &current_state)?;

    // 4. Create event
    let event = if passes_risk {
        RiskEvent::OrderValidated {
            event_id: Uuid::new_v4(),
            order_id: cmd.order_id,
            user_id: cmd.user_id,
            symbol: cmd.symbol,
            quantity: cmd.quantity,
            price: cmd.price,
            timestamp_ns: chrono::Utc::now().timestamp_nanos(),
            risk_checks_passed: vec!["position_limit".to_string(), "notional_limit".to_string()],
        }
    } else {
        RiskEvent::RiskLimitExceeded {
            event_id: Uuid::new_v4(),
            order_id: cmd.order_id,
            limit_type: "position_limit".to_string(),
            current_value: current_state.positions.get(&cmd.symbol).unwrap_or(&0.0) + cmd.quantity,
            limit_value: 10000.0,
            timestamp_ns: chrono::Utc::now().timestamp_nanos(),
        }
    };

    // 5. Append to Kafka (event store)
    kafka_producer.send(&event_id, &event).await?;

    // 6. Mark as processed
    redis_client.set_ex(&event_id, "1", 86400).await?; // 24-hour TTL

    Ok(())
}
```

Idempotency guarantees:
Replaying millions of events is slow. Snapshots provide fast recovery.
Strategy:
```rust
use bincode;
use chrono::Utc;

pub async fn take_snapshot(
    state: &RiskState,
    last_event_offset: u64,
    s3_client: &aws_sdk_s3::Client,
) -> Result<(), SnapshotError> {
    // Serialize state (RiskState must derive Serialize)
    let serialized = bincode::serialize(state)?;

    // Compress
    let compressed = compress_zstd(&serialized)?;

    // Upload to S3, recording the event offset this snapshot covers
    let key = format!("risk-snapshots/{}.snapshot", Utc::now().timestamp());
    s3_client.put_object()
        .bucket("risk-engine-snapshots")
        .key(&key)
        .body(compressed.into())
        .metadata("last_event_offset", last_event_offset.to_string())
        .send()
        .await?;

    Ok(())
}

pub async fn load_latest_snapshot(
    s3_client: &aws_sdk_s3::Client,
) -> Result<(RiskState, u64), SnapshotError> {
    // List snapshots, get latest
    let objects = s3_client.list_objects_v2()
        .bucket("risk-engine-snapshots")
        .prefix("risk-snapshots/")
        .send()
        .await?;

    let latest = objects.contents()
        .and_then(|objs| objs.iter().max_by_key(|obj| obj.last_modified()))
        .ok_or(SnapshotError::NoSnapshotFound)?;

    // Download and decompress
    let obj = s3_client.get_object()
        .bucket("risk-engine-snapshots")
        .key(latest.key().unwrap())
        .send()
        .await?;

    let compressed = obj.body.collect().await?.into_bytes();
    let serialized = decompress_zstd(&compressed)?;
    let state: RiskState = bincode::deserialize(&serialized)?;

    // Read the covered offset back from object metadata
    let offset = obj.metadata()
        .and_then(|m| m.get("last_event_offset"))
        .and_then(|s| s.parse().ok())
        .unwrap_or(0);

    Ok((state, offset))
}
```

Recovery time:
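The recovery sequence itself (load the latest snapshot, then replay only the tail) can be sketched with simplified stand-ins for the `RiskState` and event types above:

```rust
// Recovery = latest snapshot + replay of events after its offset.
#[derive(Default, Clone)]
struct State {
    total_quantity: f64,
}

struct Event {
    offset: u64,
    quantity_delta: f64,
}

fn recover(snapshot: (State, u64), log: &[Event]) -> State {
    let (mut state, snapshot_offset) = snapshot;
    // Only events newer than the snapshot need replaying.
    for e in log.iter().filter(|e| e.offset > snapshot_offset) {
        state.total_quantity += e.quantity_delta;
    }
    state
}

fn main() {
    let snapshot = (State { total_quantity: 10_000.0 }, 2);
    let log = [
        Event { offset: 1, quantity_delta: 5_000.0 }, // covered by snapshot
        Event { offset: 2, quantity_delta: 5_000.0 }, // covered by snapshot
        Event { offset: 3, quantity_delta: 500.0 },   // replayed
    ];
    let state = recover(snapshot, &log);
    println!("{}", state.total_quantity); // 10500
}
```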
Firm: $500M AUM proprietary trading
Strategies: 20+ automated strategies across equities, futures, options
Volume: 50,000 orders/day
Audit trigger: Unusual trading pattern flagged by exchange
Regulator demanded:
Step 1: Query events for target date
```sql
SELECT * FROM risk_events
WHERE timestamp_ns >= '2020-10-15 00:00:00'
  AND timestamp_ns < '2020-10-16 00:00:00'
ORDER BY timestamp_ns;
```

Retrieved 127,000 events (50K orders × ~2.5 events per order)
Step 2: Rebuild state at each decision point For each order, replay events up to that timestamp to show exact risk state when decision was made.
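Point-in-time reconstruction is just a filtered replay. A sketch with simplified types:

```rust
// Replay only events at or before the decision timestamp to recover
// the exact risk state the engine saw when it made the decision.
#[derive(Default)]
struct State {
    position: f64,
}

struct Event {
    timestamp_ns: i64,
    quantity_delta: f64,
}

fn state_at(events: &[Event], as_of_ns: i64) -> State {
    let mut state = State::default();
    for e in events.iter().filter(|e| e.timestamp_ns <= as_of_ns) {
        state.position += e.quantity_delta;
    }
    state
}

fn main() {
    let events = [
        Event { timestamp_ns: 1, quantity_delta: 100.0 },
        Event { timestamp_ns: 2, quantity_delta: 100.0 },
        Event { timestamp_ns: 3, quantity_delta: 100.0 },
    ];
    println!("{}", state_at(&events, 2).position); // 200
}
```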
Step 3: Generate audit report
Order ID: abc-123
Timestamp: 2020-10-15 09:32:15.123456789
Symbol: AAPL
Quantity: 1000
Price: $150.00
Decision: REJECTED
Reason: Position limit exceeded
Current Position: 9,500 shares
Position Limit: 10,000 shares
Proposed Position: 10,500 shares (exceeds limit by 500)
Risk Checks Run: [position_limit, notional_limit, concentration_limit]
Results:
Competitor (traditional database):
Once published, events can never be deleted or modified. Design event schemas carefully—you'll live with them forever.
We use Avro schemas with strict compatibility rules:
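A sketch of what such a schema might look like for the OrderValidated event. The field set mirrors the Rust enum earlier in the article; the nullable `venue` field is a hypothetical later addition, kept backward compatible by its default:

```json
{
  "type": "record",
  "name": "OrderValidated",
  "namespace": "risk.events",
  "fields": [
    {"name": "event_id", "type": "string"},
    {"name": "order_id", "type": "string"},
    {"name": "user_id", "type": "string"},
    {"name": "symbol", "type": "string"},
    {"name": "quantity", "type": "double"},
    {"name": "price", "type": "double"},
    {"name": "timestamp_ns", "type": "long"},
    {"name": "risk_checks_passed", "type": {"type": "array", "items": "string"}},
    {"name": "venue", "type": ["null", "string"], "default": null}
  ]
}
```

Under Avro's schema-resolution rules, any field added later must carry a default so that events written with the old schema still deserialize.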
Kafka is great for event streaming but terrible for queries. We maintain a read model (PostgreSQL) for fast queries.
Write path: Command → Event → Kafka
Read path: Kafka → Projection → PostgreSQL → Query
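The projection step can be sketched in memory, with a HashMap standing in for the PostgreSQL table (the `positions` table name is an assumption for illustration):

```rust
// Projection sketch: a consumer folds events into a read model.
use std::collections::HashMap;

struct PositionUpdated {
    symbol: String,
    quantity_delta: f64,
}

fn project(read_model: &mut HashMap<String, f64>, event: &PositionUpdated) {
    // Equivalent of an UPSERT keyed on symbol in the `positions` table
    *read_model.entry(event.symbol.clone()).or_default() += event.quantity_delta;
}

fn main() {
    let mut read_model = HashMap::new();
    project(&mut read_model, &PositionUpdated { symbol: "AAPL".into(), quantity_delta: 500.0 });
    project(&mut read_model, &PositionUpdated { symbol: "AAPL".into(), quantity_delta: 500.0 });
    println!("{:?}", read_model.get("AAPL")); // Some(1000.0)
}
```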
Event consumers must keep up with producers. If lag grows, the read model becomes stale.
We alert if lag exceeds 1,000 events or 30 seconds.
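That alerting rule is simple to encode; a sketch with the same thresholds:

```rust
// Fire an alert if the consumer is more than 1,000 events behind
// the producer, or more than 30 seconds stale.
fn should_alert(event_lag: u64, lag_seconds: u64) -> bool {
    event_lag > 1_000 || lag_seconds > 30
}

fn main() {
    println!("{}", should_alert(1_500, 5)); // true: event lag too high
    println!("{}", should_alert(200, 45));  // true: read model too stale
    println!("{}", should_alert(200, 5));   // false: healthy
}
```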
Event sourcing transforms risk engines from compliance liabilities into audit superpowers. The $50M difference in outcomes between the two firms in the opening audit shows the stakes.
Implementation requires rethinking data architecture: events instead of state, CQRS instead of CRUD, snapshots for performance. But the benefits—complete audit trails, replayability, debugging—are essential for modern financial systems.
Regulators increasingly expect event-sourced audit trails. Start now, before the audit.
NordVarg Team is a software engineer at NordVarg specializing in high-performance financial systems and type-safe programming.