Trading systems fail in production in ways you can't predict. After 5 years of chaos engineering experiments on our trading infrastructure, I've learned that controlled failure injection is the only way to build truly resilient systems. This article shares production chaos experiments.
Traditional testing checks that systems work correctly. Chaos engineering verifies they fail safely.
Chaos engineering has let us catch failure modes before they became production incidents. We start with the core framework that runs every experiment:
1from dataclasses import dataclass
2from typing import Callable, Optional, Dict, Any
3import time
4import logging
5from contextlib import contextmanager
6
@dataclass
class ChaosExperiment:
    """Define a chaos engineering experiment.

    Bundles the hypothesis with the callbacks a runner needs: the runner
    checks steady_state_check before, during, and after the failure window
    bounded by duration_seconds.
    """
    name: str                               # human-readable experiment name
    hypothesis: str                         # statement expected to remain true under failure
    steady_state_check: Callable[[], bool]  # returns True while the system is healthy
    inject_failure: Callable[[], None]      # starts the failure condition
    cleanup: Callable[[], None]             # undoes the failure condition
    duration_seconds: int = 60              # how long to monitor while the failure is active
16
class ChaosRunner:
    """Execute chaos experiments with safety checks.

    The runner verifies the steady state before injecting failure, samples it
    roughly once per second during the failure window, cleans up, waits a
    grace period, and finally verifies the system recovered.
    """

    def __init__(self, recovery_grace_seconds: float = 5.0):
        """
        Args:
            recovery_grace_seconds: Time to wait after cleanup before the
                final recovery check, so the system can settle. Defaults to
                the original hard-coded 5 seconds.
        """
        self.logger = logging.getLogger(__name__)
        self.recovery_grace_seconds = recovery_grace_seconds

    def run_experiment(self, experiment: ChaosExperiment) -> Dict[str, Any]:
        """
        Run chaos experiment with steady state verification.

        Args:
            experiment: Experiment definition (callbacks plus duration).

        Returns:
            Results including whether hypothesis held. Failure dicts carry
            'success': False and 'error'; the success dict also carries
            'violations', 'recovered', and 'duration'.
        """
        # Lazy %-style args avoid formatting cost when the level is disabled.
        self.logger.info("Starting experiment: %s", experiment.name)
        self.logger.info("Hypothesis: %s", experiment.hypothesis)

        # 1. Verify steady state before injecting anything.
        if not experiment.steady_state_check():
            return {
                'success': False,
                'error': 'System not in steady state before experiment',
                'hypothesis_held': False
            }

        self.logger.info("✓ System in steady state")

        try:
            # 2. Inject failure
            self.logger.info("Injecting failure...")
            experiment.inject_failure()

            # 3. Monitor during failure, sampling about once per second.
            start = time.time()
            steady_state_violations = []

            while time.time() - start < experiment.duration_seconds:
                now = time.time()  # single read so timestamp/elapsed agree
                if not experiment.steady_state_check():
                    steady_state_violations.append({
                        'timestamp': now,
                        'elapsed': now - start
                    })
                time.sleep(1)

            # 4. Cleanup
            self.logger.info("Cleaning up...")
            experiment.cleanup()

            # 5. Verify recovery after the grace period.
            if self.recovery_grace_seconds > 0:
                time.sleep(self.recovery_grace_seconds)

            recovered = experiment.steady_state_check()
            hypothesis_held = not steady_state_violations

            result = {
                'success': True,
                'hypothesis_held': hypothesis_held,
                'violations': steady_state_violations,
                'recovered': recovered,
                'duration': time.time() - start
            }

            if hypothesis_held:
                self.logger.info("✓ Hypothesis held - system remained stable")
            else:
                self.logger.warning(
                    "✗ Hypothesis failed - %d violations",
                    len(steady_state_violations)
                )

            if recovered:
                self.logger.info("✓ System recovered after cleanup")
            else:
                self.logger.error("✗ System did not recover")

            return result

        except Exception as e:
            self.logger.error("Experiment failed: %s", e)

            # Emergency cleanup - always attempted so a failed injection
            # never leaves the failure condition active.
            try:
                experiment.cleanup()
            except Exception as cleanup_error:
                self.logger.error("Cleanup failed: %s", cleanup_error)

            return {
                'success': False,
                'error': str(e),
                'hypothesis_held': False
            }
108Using tc (traffic control) to inject latency:
1import subprocess
2from typing import Optional
3
class NetworkLatencyInjector:
    """Inject network latency using tc (traffic control)."""

    def __init__(self, interface: str = "eth0"):
        self.interface = interface
        self.active = False

    def _add_netem_qdisc(self, netem_args):
        """Install a root netem qdisc with the given args and mark active."""
        base = ["tc", "qdisc", "add", "dev", self.interface, "root", "netem"]
        subprocess.run(base + netem_args, check=True)
        self.active = True

    def inject_latency(self, latency_ms: int, jitter_ms: int = 0,
                       correlation_pct: int = 25):
        """
        Add latency to network interface.

        Args:
            latency_ms: Base latency in milliseconds
            jitter_ms: Jitter (variance) in milliseconds
            correlation_pct: Correlation between successive delays

        Raises:
            RuntimeError: If a rule is already installed.
        """
        if self.active:
            raise RuntimeError("Latency already injected")

        netem = ["delay", f"{latency_ms}ms"]
        if jitter_ms > 0:
            # Jitter and its correlation are only meaningful together.
            netem += [f"{jitter_ms}ms", f"{correlation_pct}%"]

        self._add_netem_qdisc(netem)

    def inject_packet_loss(self, loss_pct: float):
        """Add packet loss to network interface."""
        if self.active:
            raise RuntimeError("Already active")

        self._add_netem_qdisc(["loss", f"{loss_pct}%"])

    def cleanup(self):
        """Remove tc rules."""
        if not self.active:
            return

        subprocess.run(
            ["tc", "qdisc", "del", "dev", self.interface, "root"],
            check=True
        )
        self.active = False
from datetime import datetime, timedelta, timezone

import requests
3
def check_market_data_freshness(max_age_seconds: int = 5) -> bool:
    """
    Steady state: Market data is fresh.

    Args:
        max_age_seconds: Maximum allowed age of the latest tick.

    Returns:
        True if latest market data is recent enough.
    """
    try:
        response = requests.get("http://localhost:8000/api/market-data/latest",
                                timeout=2)
        response.raise_for_status()

        data = response.json()
        timestamp = datetime.fromisoformat(data['timestamp'])
        # Normalize to aware UTC before subtracting.  The original compared
        # against naive utcnow(), which raises TypeError for tz-aware
        # timestamps and silently skews for non-UTC naive ones.
        if timestamp.tzinfo is None:
            # Assumes naive service timestamps are UTC — TODO confirm.
            timestamp = timestamp.replace(tzinfo=timezone.utc)
        age = (datetime.now(timezone.utc) - timestamp).total_seconds()

        return age < max_age_seconds
    except Exception:
        # Any failure (network, HTTP error, bad payload) counts as stale.
        return False
23
def create_latency_experiment() -> ChaosExperiment:
    """Build the experiment that tests market data under network latency."""
    injector = NetworkLatencyInjector(interface="eth0")

    def add_latency():
        # 200ms base delay with 50ms jitter, strongly correlated.
        injector.inject_latency(
            latency_ms=200,
            jitter_ms=50,
            correlation_pct=75
        )

    return ChaosExperiment(
        name="Market Data High Latency",
        hypothesis=(
            "Market data service continues serving fresh data even "
            "when exchange connection has 200ms latency"
        ),
        steady_state_check=lambda: check_market_data_freshness(max_age_seconds=5),
        inject_failure=add_latency,
        cleanup=injector.cleanup,
        duration_seconds=60,
    )
43
# Run the latency experiment end to end; `result` is the runner's outcome
# dict (success, hypothesis_held, violations, recovered, duration).
runner = ChaosRunner()
result = runner.run_experiment(create_latency_experiment())
471from kubernetes import client, config
2import random
3import time
4
class KubernetesChaos:
    """Inject failures into Kubernetes pods.

    Uses the local kubeconfig, so it runs with the caller's credentials.
    """

    def __init__(self, namespace: str = "trading"):
        """
        Args:
            namespace: Kubernetes namespace to target.
        """
        config.load_kube_config()
        self.core_v1 = client.CoreV1Api()
        self.namespace = namespace
        # Use the module logger for consistency with the rest of the file
        # (the original printed to stdout).
        self.logger = logging.getLogger(__name__)

    def kill_random_pod(self, label_selector: str) -> str:
        """Kill a random pod matching label selector.

        Args:
            label_selector: Kubernetes label selector, e.g. "app=order-service".

        Returns:
            The name of the deleted pod.

        Raises:
            ValueError: If no pods match the selector.
        """
        pods = self.core_v1.list_namespaced_pod(
            namespace=self.namespace,
            label_selector=label_selector
        )

        if not pods.items:
            raise ValueError(f"No pods found with selector: {label_selector}")

        pod = random.choice(pods.items)
        pod_name = pod.metadata.name

        # grace_period_seconds=0 deletes immediately, simulating an abrupt
        # crash rather than a graceful shutdown.
        self.core_v1.delete_namespaced_pod(
            name=pod_name,
            namespace=self.namespace,
            grace_period_seconds=0
        )

        return pod_name

    def kill_pods_continuously(self, label_selector: str,
                               interval_seconds: int = 30,
                               duration_seconds: int = 300):
        """
        Continuously kill random pods (simulates cascading failures).

        Args:
            label_selector: Label to select pods
            interval_seconds: Time between kills
            duration_seconds: Total experiment duration

        Returns:
            List of {'pod', 'timestamp'} dicts for each kill.
        """
        start = time.time()
        killed_pods = []

        while time.time() - start < duration_seconds:
            try:
                pod_name = self.kill_random_pod(label_selector)
                killed_pods.append({
                    'pod': pod_name,
                    'timestamp': time.time()
                })
                self.logger.info("Killed pod: %s", pod_name)
            except Exception as e:
                # Best-effort: a failed kill shouldn't end the experiment.
                self.logger.error("Error killing pod: %s", e)

            # Never sleep past the experiment deadline (the original always
            # slept a full interval after the final kill).
            remaining = duration_seconds - (time.time() - start)
            if remaining <= 0:
                break
            time.sleep(min(interval_seconds, remaining))

        return killed_pods
64
def check_order_service_availability() -> bool:
    """Check if order service is accepting orders."""
    try:
        resp = requests.post(
            "http://order-service/api/health",
            timeout=5
        )
    except Exception:
        # Unreachable service (or any client error) means unavailable.
        return False
    return resp.status_code == 200
75
def create_pod_chaos_experiment() -> ChaosExperiment:
    """Test order service resilience to pod failures."""
    chaos = KubernetesChaos(namespace="trading")

    return ChaosExperiment(
        name="Order Service Pod Chaos",
        hypothesis=(
            "Order service remains available (>99%) even when pods "
            "are continuously killed"
        ),
        steady_state_check=check_order_service_availability,
        # Kill 1 pod every 30 seconds for 2 minutes.
        inject_failure=lambda: chaos.kill_pods_continuously(
            label_selector="app=order-service",
            interval_seconds=30,
            duration_seconds=120
        ),
        cleanup=lambda: None,  # Kubernetes recreates pods automatically
        duration_seconds=120
    )
991import psycopg2
2from psycopg2 import pool as pg_pool
3from typing import Optional
4
class DatabaseChaos:
    """Inject database failures.

    Holds a single superuser "admin" connection; re-injecting closes any
    previous admin connection first (the original leaked it).
    """

    def __init__(self, connection_string: str):
        """
        Args:
            connection_string: DSN with privileges to terminate backends
                and lock tables.
        """
        self.connection_string = connection_string
        self.admin_conn: Optional[psycopg2.extensions.connection] = None

    def _open_admin_conn(self):
        """(Re)open the admin connection, closing any previous one first."""
        self.cleanup()
        self.admin_conn = psycopg2.connect(self.connection_string)
        return self.admin_conn

    def terminate_connections(self, database: str, except_current: bool = True) -> int:
        """Terminate all connections to database.

        Args:
            database: Target database name.
            except_current: Spare the admin connection's own backend.

        Returns:
            Number of backends terminated.
        """
        conn = self._open_admin_conn()
        conn.autocommit = True

        query = """
            SELECT pg_terminate_backend(pg_stat_activity.pid)
            FROM pg_stat_activity
            WHERE pg_stat_activity.datname = %s
        """
        if except_current:
            query += " AND pid <> pg_backend_pid()"

        cursor = conn.cursor()
        try:
            cursor.execute(query, (database,))
            terminated = cursor.fetchall()
        finally:
            # Close the cursor even if the query fails (original leaked it).
            cursor.close()

        return len(terminated)

    def inject_slow_query(self, duration_seconds: int = 30):
        """Execute a query that locks the trades table for duration_seconds."""
        conn = self._open_admin_conn()
        cursor = conn.cursor()
        try:
            cursor.execute("BEGIN")
            cursor.execute("LOCK TABLE trades IN ACCESS EXCLUSIVE MODE")
            # Parameterized instead of f-string interpolation into SQL.
            cursor.execute("SELECT pg_sleep(%s)", (duration_seconds,))
        finally:
            # Always release the lock, even if the sleep is interrupted.
            cursor.close()
            conn.rollback()

    def cleanup(self):
        """Close admin connection."""
        if self.admin_conn:
            self.admin_conn.close()
            self.admin_conn = None
58
59# Connection pool resilience test
class ResilientConnectionPool:
    """Database connection pool with automatic retry.

    Fixes a double-release bug in the original: a connection discarded via
    putconn(close=True) in the retry path was then returned a second time
    by the finally block, corrupting pool state.
    """

    def __init__(self, connection_string: str, minconn: int = 2,
                 maxconn: int = 10):
        """
        Args:
            connection_string: libpq DSN for the pool.
            minconn: Minimum pooled connections.
            maxconn: Maximum pooled connections.
        """
        self.pool = pg_pool.ThreadedConnectionPool(
            minconn, maxconn, connection_string
        )

    def execute_with_retry(self, query: str, params=None,
                           max_retries: int = 3):
        """Execute query with automatic retry on connection failure.

        Args:
            query: SQL statement to execute.
            params: Optional query parameters.
            max_retries: Total attempts before the last error is re-raised.

        Returns:
            All rows returned by the query.

        Raises:
            psycopg2.OperationalError: If every attempt fails.
        """
        for attempt in range(max_retries):
            conn = None
            try:
                conn = self.pool.getconn()
                cursor = conn.cursor()
                try:
                    cursor.execute(query, params)
                    result = cursor.fetchall()
                    conn.commit()
                finally:
                    # Close cursor even if execute/fetch raises.
                    cursor.close()
                return result

            except psycopg2.OperationalError:
                if conn is not None:
                    # Discard the broken connection and forget it so the
                    # finally block doesn't return it a second time.
                    self.pool.putconn(conn, close=True)
                    conn = None

                if attempt == max_retries - 1:
                    raise

                time.sleep(2 ** attempt)  # Exponential backoff

            finally:
                if conn is not None:
                    self.pool.putconn(conn)
95
def check_database_queries() -> bool:
    """
    Steady state: a trivial database query succeeds.

    Returns:
        True if "SELECT 1" round-trips through the connection pool.
    """
    pool = None
    try:
        # Pool construction itself connects and can raise, so it belongs
        # inside the try: a down database means "not steady", not a crash.
        pool = ResilientConnectionPool(
            "postgresql://localhost/trading"
        )
        result = pool.execute_with_retry("SELECT 1")
        return result == [(1,)]
    except Exception:
        return False
    finally:
        # Close the pool's connections; the original leaked a fresh pool
        # on every steady-state check.
        if pool is not None:
            try:
                pool.pool.closeall()
            except Exception:
                pass
107
def create_db_chaos_experiment() -> ChaosExperiment:
    """Test database connection resilience."""
    chaos = DatabaseChaos("postgresql://postgres@localhost/postgres")

    def drop_connections():
        # Kill every client connection to the trading database.
        return chaos.terminate_connections("trading")

    return ChaosExperiment(
        name="Database Connection Termination",
        hypothesis=(
            "Application automatically recovers from database "
            "connection termination within 5 seconds"
        ),
        steady_state_check=check_database_queries,
        inject_failure=drop_connections,
        cleanup=chaos.cleanup,
        duration_seconds=30,
    )
123We run chaos experiments on a schedule:
1# chaos-schedule.yaml
2apiVersion: v1
3kind: ConfigMap
4metadata:
5 name: chaos-schedule
6data:
7 schedule.json: |
8 {
9 "experiments": [
10 {
11 "name": "network-latency",
12 "schedule": "0 2 * * 1",
13 "enabled": true
14 },
15 {
16 "name": "pod-chaos",
17 "schedule": "0 2 * * 3",
18 "enabled": true
19 },
20 {
21 "name": "database-failure",
22 "schedule": "0 2 * * 5",
23 "enabled": true
24 }
25 ]
26 }
27Chaos engineering impact (2020-2024):
1Metric Before After
2────────────────────────────────────────────────────────
3Mean time to recovery 45 min 8 min
4Incidents per quarter 12 3
5P99 latency during failures 15s 200ms
6False positive alerts 40% 5%
7Chaos engineering transforms how we build systems. Instead of hoping for reliability, we verify it through controlled failure injection.
Technical Writer
The NordVarg team builds software at NordVarg, specializing in high-performance financial systems and type-safe programming.
Get weekly insights on building high-performance financial systems, latest industry trends, and expert tips delivered straight to your inbox.