Migrating a production trading system to event sourcing while maintaining 99.99% uptime was our biggest technical challenge. After completing the 18-month migration (2021-2023), I've learned that incremental cutover, dual-writes, and comprehensive testing are essential. This article shares our journey.
Architecture (2020):
Problems:
Event-Sourced System:
Write events alongside legacy updates:
class OrderService:
    """Order service using the dual-write (shadow) pattern during migration.

    The legacy orders table remains the source of truth; when shadow mode
    is enabled, every create is also appended to the event store and
    published to Kafka. Shadow-side failures are logged but never fail
    the request.
    """

    def __init__(self, db, event_store, kafka_producer):
        self.db = db
        self.event_store = event_store
        self.kafka = kafka_producer
        # Dual-write mode: mirror legacy writes into the event store.
        self.shadow_mode = True

    def create_order(self, order_data):
        """Create an order with the dual-write pattern.

        Args:
            order_data: mapping with keys 'trader_id', 'symbol', 'side',
                'quantity', 'price'.

        Returns:
            The order_id generated by the legacy INSERT.
        """
        # Legacy path first: INSERT into the orders table inside a
        # transaction; this commit is what callers depend on.
        with self.db.transaction():
            order_id = self._legacy_create_order(order_data)

        if self.shadow_mode:
            # Use an aware UTC timestamp: datetime.utcnow() returns a
            # naive datetime and is deprecated since Python 3.12.
            from datetime import timezone

            event = OrderCreatedEvent(
                order_id=order_id,
                timestamp=datetime.now(timezone.utc),
                trader_id=order_data['trader_id'],
                symbol=order_data['symbol'],
                side=order_data['side'],
                quantity=order_data['quantity'],
                price=order_data['price'],
            )

            try:
                self.event_store.append(event)
                self.kafka.send('orders', event.to_json())
            except Exception:
                # Shadow mode is best-effort: log with full traceback
                # (logger.error with just str(e) loses it), but never
                # fail the already-committed legacy write.
                logger.exception("Shadow write failed")

        return order_id

    def _legacy_create_order(self, order_data):
        """Legacy implementation: parameterized INSERT returning order_id."""
        cursor = self.db.cursor()
        cursor.execute("""
            INSERT INTO orders (trader_id, symbol, side, quantity, price, status)
            VALUES (%s, %s, %s, %s, %s, 'OPEN')
            RETURNING order_id
        """, (
            order_data['trader_id'],
            order_data['symbol'],
            order_data['side'],
            order_data['quantity'],
            order_data['price'],
        ))

        return cursor.fetchone()[0]
Results:
Reconstruct events from legacy database:
class HistoryBackfill:
    """Backfill historical events from the legacy database.

    The legacy schema stores only current state, so each order's event
    sequence is reconstructed from the audit log (modifications) and the
    trades table (fills), then appended to the event store in timestamp
    order.
    """

    def __init__(self, legacy_db, event_store):
        self.legacy_db = legacy_db
        self.event_store = event_store

    def backfill_orders(self, start_date, end_date, batch_size=10000):
        """Backfill order events for orders created in [start_date, end_date].

        Args:
            start_date: inclusive lower bound on orders.created_at.
            end_date: inclusive upper bound on orders.created_at.
            batch_size: maximum number of orders processed per call.

        Returns:
            Number of orders processed in this batch (callers page by
            advancing the date range when this equals batch_size).
        """
        cursor = self.legacy_db.cursor()

        # One row per order, with its modification history and its fills
        # aggregated as JSON arrays via LATERAL subqueries.
        cursor.execute("""
            SELECT
                o.order_id,
                o.trader_id,
                o.symbol,
                o.side,
                o.quantity,
                o.price,
                o.created_at,
                o.status,
                o.filled_quantity,
                o.avg_fill_price,
                COALESCE(al.modifications, '[]') AS modifications,
                COALESCE(t.trades, '[]') AS trades
            FROM orders o
            LEFT JOIN LATERAL (
                SELECT json_agg(
                    json_build_object(
                        'timestamp', timestamp,
                        'field', field_changed,
                        'old_value', old_value,
                        'new_value', new_value
                    )
                ) AS modifications
                FROM audit_log
                WHERE entity_id = o.order_id
            ) al ON true
            LEFT JOIN LATERAL (
                SELECT json_agg(
                    json_build_object(
                        'timestamp', executed_at,
                        'quantity', quantity,
                        'price', price
                    )
                ) AS trades
                FROM trades
                WHERE order_id = o.order_id
            ) t ON true
            WHERE o.created_at BETWEEN %s AND %s
            ORDER BY o.created_at
            LIMIT %s
        """, (start_date, end_date, batch_size))

        orders = cursor.fetchall()

        for order in orders:
            # Reconstruct the per-order event sequence and append it;
            # the event store becomes the new source of truth for history.
            for event in self._reconstruct_events(order):
                self.event_store.append(event)

        return len(orders)

    def _reconstruct_events(self, order_row):
        """Reconstruct the event sequence for one legacy order row."""
        events = []

        # 1. OrderCreated — always present.
        events.append(OrderCreatedEvent(
            order_id=order_row['order_id'],
            timestamp=order_row['created_at'],
            trader_id=order_row['trader_id'],
            symbol=order_row['symbol'],
            side=order_row['side'],
            quantity=order_row['quantity'],
            price=order_row['price'],
        ))

        # 2. OrderModified — one event per audit-log entry.
        for mod in json.loads(order_row['modifications']):
            events.append(OrderModifiedEvent(
                order_id=order_row['order_id'],
                timestamp=datetime.fromisoformat(mod['timestamp']),
                field=mod['field'],
                old_value=mod['old_value'],
                new_value=mod['new_value'],
            ))

        # 3. Fills — cumulative filled quantity decides full vs partial.
        filled_qty = 0

        for trade in json.loads(order_row['trades']):
            trade_qty = trade['quantity']
            filled_qty += trade_qty

            # Use >= rather than ==: dirty legacy data can overshoot the
            # order quantity, and an exact-equality check would then emit
            # a "partial" fill with a NEGATIVE remaining quantity.
            if filled_qty >= order_row['quantity']:
                # Final (full) fill.
                events.append(OrderFilledEvent(
                    order_id=order_row['order_id'],
                    timestamp=datetime.fromisoformat(trade['timestamp']),
                    quantity=trade_qty,
                    price=trade['price'],
                ))
            else:
                # Partial fill.
                events.append(OrderPartiallyFilledEvent(
                    order_id=order_row['order_id'],
                    timestamp=datetime.fromisoformat(trade['timestamp']),
                    filled_quantity=trade_qty,
                    remaining_quantity=order_row['quantity'] - filled_qty,
                    price=trade['price'],
                ))

        # 4. OrderCanceled — the legacy DB records no cancel timestamp,
        # so approximate it as created_at + 1h.
        # NOTE(review): an approximated timestamp can sort before real
        # fill events in the sorted sequence below — confirm this is
        # acceptable for compliance replay.
        if order_row['status'] == 'CANCELED':
            cancel_time = order_row['created_at'] + timedelta(hours=1)
            events.append(OrderCanceledEvent(
                order_id=order_row['order_id'],
                timestamp=cancel_time,
                reason='TRADER_CANCEL',
            ))

        return sorted(events, key=lambda e: e.timestamp)
Results:
Build materialized views from events:
-- Materialized view: current state of every order, derived from the
-- event projection by keeping only the newest row per order_id.
CREATE MATERIALIZED VIEW current_orders AS
SELECT
    order_id,
    trader_id,
    symbol,
    side,
    quantity,
    price,
    status,
    filled_quantity,
    avg_fill_price,
    created_at,
    updated_at
FROM (
    SELECT
        *,
        ROW_NUMBER() OVER (PARTITION BY order_id ORDER BY timestamp DESC) AS rn
    FROM orders_projection
) sub
WHERE rn = 1;

-- The UNIQUE index is mandatory for REFRESH ... CONCURRENTLY below.
CREATE UNIQUE INDEX ON current_orders(order_id);
CREATE INDEX ON current_orders(trader_id, created_at DESC);
CREATE INDEX ON current_orders(symbol, created_at DESC);

-- Incremental refresh: fold events newer than the view's high-water mark
-- into the projection, then refresh the view without blocking readers.
-- (Variable moved into a proper top-level DECLARE section instead of the
-- original nested BEGIN/DECLARE/BEGIN block.)
CREATE OR REPLACE FUNCTION refresh_current_orders()
RETURNS void AS $$
DECLARE
    last_timestamp TIMESTAMP;
BEGIN
    -- High-water mark: newest event already reflected in the view.
    SELECT MAX(updated_at) INTO last_timestamp FROM current_orders;

    -- Apply only events that arrived since the last refresh.
    -- NOTE(review): assumes event_store rows expose these columns
    -- directly (flattened payload) — verify against the event schema.
    -- The original ORDER BY inside INSERT ... SELECT was dropped: row
    -- order in an INSERT has no effect.
    INSERT INTO orders_projection
    SELECT
        order_id,
        trader_id,
        symbol,
        side,
        quantity,
        price,
        status,
        filled_quantity,
        avg_fill_price,
        created_at,
        timestamp AS updated_at
    FROM event_store
    WHERE event_type IN ('OrderCreated', 'OrderModified', 'OrderFilled', 'OrderCanceled')
      AND timestamp > COALESCE(last_timestamp, '1970-01-01');

    -- CONCURRENTLY avoids locking out readers during the refresh.
    REFRESH MATERIALIZED VIEW CONCURRENTLY current_orders;
END;
$$ LANGUAGE plpgsql;

-- Schedule the incremental refresh every minute via pg_cron.
SELECT cron.schedule('refresh-orders', '* * * * *', 'SELECT refresh_current_orders()');
Results:
Zero-downtime migration:
class MigrationCoordinator:
    """Coordinate the cutover from the legacy to the event-sourced system."""

    def __init__(self, db=None):
        # Bug fix: validate_consistency() reads self.db, but the original
        # __init__ never set it (AttributeError on first use). Accept the
        # connection here; the default keeps the old no-arg call working.
        self.db = db
        self.cutover_flag = False
        self.validation_errors = []

    def validate_consistency(self):
        """Validate that the legacy and event-sourced views agree.

        Returns:
            True when no discrepancies were found. Details of every
            mismatch are accumulated in self.validation_errors.
        """
        cursor = self.db.cursor()

        # 1) Row counts must match exactly.
        cursor.execute("SELECT COUNT(*) FROM orders")
        legacy_count = cursor.fetchone()[0]

        cursor.execute("SELECT COUNT(*) FROM current_orders")
        event_count = cursor.fetchone()[0]

        if legacy_count != event_count:
            self.validation_errors.append(
                f"Row count mismatch: legacy={legacy_count}, events={event_count}"
            )

        # 2) Spot-check ~1% of orders (capped at 10k) field by field.
        cursor.execute("""
            SELECT l.order_id,
                   l.status AS legacy_status,
                   e.status AS event_status,
                   l.filled_quantity AS legacy_filled,
                   e.filled_quantity AS event_filled
            FROM orders l
            JOIN current_orders e USING (order_id)
            WHERE random() < 0.01 -- 1% sample
            LIMIT 10000
        """)

        # NOTE(review): rows are accessed by column name, which requires a
        # dict-style cursor (e.g. psycopg2 RealDictCursor) — confirm the
        # connection is configured that way.
        for row in cursor:
            if row['legacy_status'] != row['event_status']:
                self.validation_errors.append(
                    f"Order {row['order_id']}: status mismatch"
                )

            # Tolerance of 0.01 absorbs float rounding in filled qty.
            if abs(row['legacy_filled'] - row['event_filled']) > 0.01:
                self.validation_errors.append(
                    f"Order {row['order_id']}: filled quantity mismatch"
                )

        return len(self.validation_errors) == 0

    def perform_cutover(self):
        """Cut over to the event-sourced system.

        Steps:
        1. Enable read-only mode on legacy
        2. Final validation
        3. Switch reads to event-sourced views
        4. Switch writes to event-first
        5. Re-enable writes

        Raises:
            RuntimeError: if the final consistency validation fails
                (legacy writes are re-enabled before raising).
        """
        # Helper methods (_set_legacy_readonly, refresh_materialized_views,
        # _update_config) are defined elsewhere in this class/module.

        # 1. Read-only mode (5-minute maintenance window).
        self._set_legacy_readonly(True)

        # 2. Final sync and validation.
        time.sleep(10)  # Wait for in-flight writes to drain.
        self.refresh_materialized_views()

        if not self.validate_consistency():
            # Rollback: restore writes before surfacing the failure.
            self._set_legacy_readonly(False)
            # RuntimeError is still caught by callers using `except Exception`.
            raise RuntimeError(f"Validation failed: {self.validation_errors}")

        # 3. Switch reads (update app config).
        self._update_config('read_source', 'event_sourced')

        # 4. Switch writes (event-first mode).
        self._update_config('write_mode', 'event_first')

        # 5. Re-enable writes.
        self._set_legacy_readonly(False)

        self.cutover_flag = True

        logger.info("Cutover completed successfully!")
Cutover Results:
Performance (Before/After):
| Metric | Legacy | Event-Sourced | Improvement |
| --- | --- | --- | --- |
| Order creation latency | 25ms | 12ms | 52% faster |
| Query latency (P99) | 45ms | 15ms | 67% faster |
| Audit trail completeness | 65% | 100% | Full coverage |
| Historical queries | Impossible | 2–3 seconds | New capability |
| Data corruption incidents | 12/year | 0 | 100% reduction |
Regulatory Compliance:
Event sourcing migration was successful. The immutable audit trail and ability to replay events transformed our debugging and compliance capabilities.
Technical Writer
The NordVarg Team is a group of software engineers at NordVarg specializing in high-performance financial systems and type-safe programming.
Get weekly insights on building high-performance financial systems, latest industry trends, and expert tips delivered straight to your inbox.