Case Study: Event Sourcing Migration

December 31, 2024 • NordVarg Team • 7 min read

Case Studies: event-sourcing, migration, case-study, architecture, postgresql

Migrating a production trading system to event sourcing while maintaining 99.99% uptime was our biggest technical challenge. After completing the 18-month migration (2021-2023), we learned that incremental cutover, dual writes, and comprehensive testing are essential. This article shares our journey.

The Legacy System

Architecture (2020):

  • Monolithic Python application
  • PostgreSQL with mutable state
  • Order updates via UPDATE statements
  • Limited audit trail (only current state)
  • 50k orders/day, 15 traders

Problems:

  • No audit trail: Regulators required complete order history
  • Race conditions: Concurrent updates caused data corruption (12 incidents/year)
  • Can't reconstruct: Unable to replay events for debugging
  • Slow queries: Historical analysis required expensive JOINs

Target Architecture

Event-Sourced System:

  • Immutable event log (PostgreSQL + Kafka)
  • Materialized views for current state
  • Event replay for debugging
  • Complete audit trail
  • Horizontal scalability
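The core primitive behind this architecture is an append-only log plus a replay function that folds events into current state. A minimal in-memory sketch of that idea (illustrative only, not our production store; the event names mirror those used in the migration code below, and the fold logic is deliberately simplified):

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Event:
    order_id: int
    event_type: str
    payload: dict


class InMemoryEventStore:
    """Append-only log; state is always derived by replay, never mutated."""

    def __init__(self):
        self._log: list[Event] = []

    def append(self, event: Event) -> None:
        self._log.append(event)  # events are immutable once written

    def replay(self, order_id: int) -> dict:
        """Fold all events for one order into its current state."""
        state: dict = {}
        for e in self._log:
            if e.order_id != order_id:
                continue
            if e.event_type == "OrderCreated":
                state = {**e.payload, "status": "OPEN", "filled_quantity": 0}
            elif e.event_type == "OrderFilled":
                state["filled_quantity"] += e.payload["quantity"]
                state["status"] = "FILLED"
            elif e.event_type == "OrderCanceled":
                state["status"] = "CANCELED"
        return state
```

Because state is derived rather than stored, the audit trail is the system of record: replaying the same log always yields the same state, which is exactly the property the regulators asked for.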

Migration Strategy

Phase 1: Shadow Write (Months 1-3)

Write events alongside legacy updates:

```python
import logging
from datetime import datetime

logger = logging.getLogger(__name__)


class OrderService:
    def __init__(self, db, event_store, kafka_producer):
        self.db = db
        self.event_store = event_store
        self.kafka = kafka_producer
        self.shadow_mode = True  # Dual-write mode

    def create_order(self, order_data):
        """Create order with dual-write pattern."""
        # Legacy: INSERT into orders table
        with self.db.transaction():
            order_id = self._legacy_create_order(order_data)

            if self.shadow_mode:
                # New: Append event to event store
                event = OrderCreatedEvent(
                    order_id=order_id,
                    timestamp=datetime.utcnow(),
                    trader_id=order_data['trader_id'],
                    symbol=order_data['symbol'],
                    side=order_data['side'],
                    quantity=order_data['quantity'],
                    price=order_data['price']
                )

                try:
                    self.event_store.append(event)
                    self.kafka.send('orders', event.to_json())
                except Exception as e:
                    # Log but don't fail (shadow mode): the legacy write
                    # remains the source of truth until cutover
                    logger.error(f"Shadow write failed: {e}")

        return order_id

    def _legacy_create_order(self, order_data):
        """Legacy implementation."""
        cursor = self.db.cursor()
        cursor.execute("""
            INSERT INTO orders (trader_id, symbol, side, quantity, price, status)
            VALUES (%s, %s, %s, %s, %s, 'OPEN')
            RETURNING order_id
        """, (
            order_data['trader_id'],
            order_data['symbol'],
            order_data['side'],
            order_data['quantity'],
            order_data['price']
        ))
        return cursor.fetchone()[0]
```

Results:

  • Event store received 100% of new orders
  • 0 production failures (shadow mode isolated errors)
  • Validated event schema and serialization

Phase 2: Backfill History (Months 4-6)

Reconstruct events from legacy database:

```python
import json
from datetime import datetime, timedelta


class HistoryBackfill:
    """Backfill historical events from legacy database."""

    def __init__(self, legacy_db, event_store):
        self.legacy_db = legacy_db
        self.event_store = event_store

    def backfill_orders(self, start_date, end_date, batch_size=10000):
        """
        Backfill order events from legacy data.

        Challenge: the legacy DB only has current state; we need to
        reconstruct the event sequence from audit logs and trade history.
        """
        # Assumes a dict-style cursor (e.g. psycopg2.extras.DictCursor)
        cursor = self.legacy_db.cursor()

        # Get orders in date range, with modifications and trades
        # aggregated as JSON arrays
        cursor.execute("""
            SELECT
                o.order_id,
                o.trader_id,
                o.symbol,
                o.side,
                o.quantity,
                o.price,
                o.created_at,
                o.status,
                o.filled_quantity,
                o.avg_fill_price,
                COALESCE(al.modifications, '[]') AS modifications,
                COALESCE(t.trades, '[]') AS trades
            FROM orders o
            LEFT JOIN LATERAL (
                SELECT json_agg(
                    json_build_object(
                        'timestamp', timestamp,
                        'field', field_changed,
                        'old_value', old_value,
                        'new_value', new_value
                    )
                ) AS modifications
                FROM audit_log
                WHERE entity_id = o.order_id
            ) al ON true
            LEFT JOIN LATERAL (
                SELECT json_agg(
                    json_build_object(
                        'timestamp', executed_at,
                        'quantity', quantity,
                        'price', price
                    )
                ) AS trades
                FROM trades
                WHERE order_id = o.order_id
            ) t ON true
            WHERE o.created_at BETWEEN %s AND %s
            ORDER BY o.created_at
            LIMIT %s
        """, (start_date, end_date, batch_size))

        orders = cursor.fetchall()

        for order in orders:
            # Reconstruct event sequence
            events = self._reconstruct_events(order)

            # Append to event store
            for event in events:
                self.event_store.append(event)

        return len(orders)

    def _reconstruct_events(self, order_row):
        """Reconstruct event sequence from legacy data."""
        events = []

        # 1. OrderCreated
        events.append(OrderCreatedEvent(
            order_id=order_row['order_id'],
            timestamp=order_row['created_at'],
            trader_id=order_row['trader_id'],
            symbol=order_row['symbol'],
            side=order_row['side'],
            quantity=order_row['quantity'],
            price=order_row['price']
        ))

        # 2. OrderModified (from audit log)
        modifications = json.loads(order_row['modifications'])
        for mod in modifications:
            events.append(OrderModifiedEvent(
                order_id=order_row['order_id'],
                timestamp=datetime.fromisoformat(mod['timestamp']),
                field=mod['field'],
                old_value=mod['old_value'],
                new_value=mod['new_value']
            ))

        # 3. OrderFilled / OrderPartiallyFilled (from trades)
        trades = json.loads(order_row['trades'])
        filled_qty = 0

        for trade in trades:
            trade_qty = trade['quantity']
            filled_qty += trade_qty

            if filled_qty == order_row['quantity']:
                # Full fill: cumulative fills reach the order quantity
                events.append(OrderFilledEvent(
                    order_id=order_row['order_id'],
                    timestamp=datetime.fromisoformat(trade['timestamp']),
                    quantity=trade_qty,
                    price=trade['price']
                ))
            else:
                # Partial fill
                events.append(OrderPartiallyFilledEvent(
                    order_id=order_row['order_id'],
                    timestamp=datetime.fromisoformat(trade['timestamp']),
                    filled_quantity=trade_qty,
                    remaining_quantity=order_row['quantity'] - filled_qty,
                    price=trade['price']
                ))

        # 4. OrderCanceled (if applicable)
        if order_row['status'] == 'CANCELED':
            # Approximate timestamp (no exact data in the legacy system)
            cancel_time = order_row['created_at'] + timedelta(hours=1)
            events.append(OrderCanceledEvent(
                order_id=order_row['order_id'],
                timestamp=cancel_time,
                reason='TRADER_CANCEL'
            ))

        return sorted(events, key=lambda e: e.timestamp)
```

Results:

  • Backfilled 18 months of history (9.2M orders, 47M events)
  • Took 12 days to process (parallelized by date ranges)
  • Found 142 data inconsistencies in legacy system
  • Validated event reconstruction accuracy: 99.7%
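The 99.7% accuracy figure came from replaying each reconstructed event sequence and diffing the derived state against the legacy row. A hedged sketch of that validation (the fold logic and field names here are simplified assumptions, not our production checker):

```python
def fold_events(events):
    """Derive final order state from a reconstructed event sequence.
    Events are dicts with a 'type' and 'timestamp' key (simplified)."""
    state = {"status": "OPEN", "filled_quantity": 0}
    for e in sorted(events, key=lambda e: e["timestamp"]):
        t = e["type"]
        if t == "OrderCreated":
            state.update(quantity=e["quantity"], price=e["price"])
        elif t == "OrderPartiallyFilled":
            state["filled_quantity"] += e["filled_quantity"]
            state["status"] = "PARTIALLY_FILLED"
        elif t == "OrderFilled":
            state["filled_quantity"] += e["quantity"]
            state["status"] = "FILLED"
        elif t == "OrderCanceled":
            state["status"] = "CANCELED"
    return state


def validate_against_legacy(events, legacy_row):
    """Return the list of fields where replayed state disagrees
    with the legacy snapshot (empty list == consistent)."""
    derived = fold_events(events)
    return [
        field
        for field in ("status", "filled_quantity")
        if derived.get(field) != legacy_row.get(field)
    ]
```

Running this over every backfilled order gave a per-field mismatch report, which is how the 142 legacy inconsistencies surfaced.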

Phase 3: Read-Side Projection (Months 7-10)

Build materialized views from events:

```sql
-- Materialized view: Current order state
CREATE MATERIALIZED VIEW current_orders AS
SELECT
    order_id,
    trader_id,
    symbol,
    side,
    quantity,
    price,
    status,
    filled_quantity,
    avg_fill_price,
    created_at,
    updated_at
FROM (
    SELECT
        *,
        ROW_NUMBER() OVER (PARTITION BY order_id ORDER BY timestamp DESC) AS rn
    FROM orders_projection
) sub
WHERE rn = 1;

CREATE UNIQUE INDEX ON current_orders(order_id);
CREATE INDEX ON current_orders(trader_id, created_at DESC);
CREATE INDEX ON current_orders(symbol, created_at DESC);

-- Refresh policy (incremental)
CREATE OR REPLACE FUNCTION refresh_current_orders()
RETURNS void AS $$
DECLARE
    last_timestamp TIMESTAMP;
BEGIN
    -- Get latest event timestamp already reflected in the view
    SELECT MAX(updated_at) INTO last_timestamp FROM current_orders;

    -- Apply new events since last refresh
    INSERT INTO orders_projection
    SELECT
        order_id,
        trader_id,
        symbol,
        side,
        quantity,
        price,
        status,
        filled_quantity,
        avg_fill_price,
        created_at,
        timestamp AS updated_at
    FROM event_store
    WHERE event_type IN ('OrderCreated', 'OrderModified', 'OrderFilled', 'OrderCanceled')
      AND timestamp > COALESCE(last_timestamp, '1970-01-01')
    ORDER BY timestamp;

    -- Refresh materialized view without locking readers
    REFRESH MATERIALIZED VIEW CONCURRENTLY current_orders;
END;
$$ LANGUAGE plpgsql;

-- Schedule refresh every minute (pg_cron)
SELECT cron.schedule('refresh-orders', '* * * * *', 'SELECT refresh_current_orders()');
```

Results:

  • Read queries 40% faster than legacy (indexed materialized view)
  • Incremental refresh took 2-5 seconds (vs full rebuild 45 seconds)
  • Query latency P99: 15ms (was 45ms on legacy)
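The `ROW_NUMBER()` dedup in the view implements "latest event per order wins". The same logic, sketched in Python for clarity (assumes events arrive as dicts carrying `order_id` and `timestamp`; illustrative only, not the production projector):

```python
def project_current_orders(events):
    """Build a current-state projection: keep only the latest event
    per order_id, mirroring the
    ROW_NUMBER() OVER (PARTITION BY order_id ORDER BY timestamp DESC)
    dedup in the materialized view."""
    latest = {}
    # Process in timestamp order so a later event overwrites an earlier one
    for e in sorted(events, key=lambda e: e["timestamp"]):
        latest[e["order_id"]] = e
    return latest
```

The SQL version does this set-at-a-time with a window function; the Python version makes the "last write wins per partition key" semantics explicit.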

Phase 4: Cutover (Month 11)

Zero-downtime migration:

```python
import time
import logging

logger = logging.getLogger(__name__)


class MigrationCoordinator:
    """Coordinate cutover from legacy to event-sourced system."""

    def __init__(self, db):
        self.db = db
        self.cutover_flag = False
        self.validation_errors = []

    def validate_consistency(self):
        """Validate legacy and event-sourced views match."""
        # Assumes psycopg2.extras.DictCursor (rows support both
        # index and key access)
        cursor = self.db.cursor()

        # Compare row counts
        cursor.execute("SELECT COUNT(*) FROM orders")
        legacy_count = cursor.fetchone()[0]

        cursor.execute("SELECT COUNT(*) FROM current_orders")
        event_count = cursor.fetchone()[0]

        if legacy_count != event_count:
            self.validation_errors.append(
                f"Row count mismatch: legacy={legacy_count}, events={event_count}"
            )

        # Sample up to 10k random orders and compare values
        cursor.execute("""
            SELECT l.order_id,
                   l.status AS legacy_status,
                   e.status AS event_status,
                   l.filled_quantity AS legacy_filled,
                   e.filled_quantity AS event_filled
            FROM orders l
            JOIN current_orders e USING (order_id)
            WHERE random() < 0.01  -- 1% sample
            LIMIT 10000
        """)

        for row in cursor:
            if row['legacy_status'] != row['event_status']:
                self.validation_errors.append(
                    f"Order {row['order_id']}: status mismatch"
                )

            if abs(row['legacy_filled'] - row['event_filled']) > 0.01:
                self.validation_errors.append(
                    f"Order {row['order_id']}: filled quantity mismatch"
                )

        return len(self.validation_errors) == 0

    def perform_cutover(self):
        """
        Cutover to event-sourced system.

        Steps:
        1. Enable read-only mode on legacy
        2. Final validation
        3. Switch reads to event-sourced views
        4. Switch writes to event-first
        5. Re-enable writes
        """
        # 1. Read-only mode (5-minute maintenance window)
        self._set_legacy_readonly(True)

        # 2. Final sync and validation
        time.sleep(10)  # Wait for in-flight writes
        self.refresh_materialized_views()

        if not self.validate_consistency():
            # Rollback
            self._set_legacy_readonly(False)
            raise Exception(f"Validation failed: {self.validation_errors}")

        # 3. Switch reads (update app config)
        self._update_config('read_source', 'event_sourced')

        # 4. Switch writes (event-first mode)
        self._update_config('write_mode', 'event_first')

        # 5. Re-enable writes
        self._set_legacy_readonly(False)

        self.cutover_flag = True

        logger.info("Cutover completed successfully!")
```

Cutover Results:

  • Downtime: 4 minutes 37 seconds (planned: 5 minutes)
  • 0 orders lost during cutover
  • Rollback plan tested 3 times in staging (worked perfectly)
  • First production order processed 8 seconds after cutover

Production Results

Performance (Before/After):

```text
Metric                      Legacy      Event-Sourced    Improvement
─────────────────────────────────────────────────────────────────────
Order creation latency      25ms        12ms             52% faster
Query latency (P99)         45ms        15ms             67% faster
Audit trail completeness    65%         100%             Full coverage
Historical queries          Impossible  2-3 seconds      New capability
Data corruption incidents   12/year     0                100% reduction
```

Regulatory Compliance:

  • Passed MiFID II audit (first attempt)
  • Complete order lifecycle reconstruction
  • 7-year audit trail retention
  • Timestamp accuracy: nanosecond precision

Lessons Learned

  1. Shadow write essential: Validated new system before cutover
  2. Backfill challenging: Legacy data incomplete, required heuristics
  3. Incremental refresh: Full materialized view refresh too slow
  4. Cutover practice: Rehearsed cutover 5 times in staging
  5. Monitoring critical: Dashboards showed divergence immediately
  6. Validation thorough: Sampled 10k orders every hour during migration
  7. Rollback plan: Had script to revert in under 2 minutes
  8. Team alignment: Weekly sync across engineering, trading, compliance
  9. Documentation: Runbooks saved 40+ hours during troubleshooting
  10. Post-migration: Kept legacy system read-only for 3 months (safety net)

Key Takeaways

  • Duration: 18 months (planning 3 months, execution 15 months)
  • Team: 4 engineers full-time
  • Cost: $800k (labor + infrastructure)
  • ROI: Regulatory compliance + 0 data corruption incidents
  • Would do again: Yes, event sourcing solved core business problems

The event sourcing migration was a success. The immutable audit trail and the ability to replay events transformed our debugging and compliance capabilities.

Further Reading

  • Event Sourcing Pattern
  • Turning the Database Inside Out
  • Building Event-Driven Microservices
  • CQRS Journey