Chaos Engineering for Trading Infrastructure

December 31, 2024 • NordVarg Team • 8 min read

Tags: Testing, chaos-engineering, resilience, kubernetes, python

Trading systems fail in production in ways you can't predict. After 5 years of chaos engineering experiments on our trading infrastructure, I've learned that controlled failure injection is the only way to build truly resilient systems. This article walks through the chaos experiments we run in production and what they taught us.

Why Chaos Engineering

Traditional testing checks that systems work correctly. Chaos engineering verifies they fail safely.

Production incidents we prevented with chaos engineering:

  • Market data gap: System froze when exchange disconnected (2021)
  • Order cascade: Retry storm took down OMS after network blip (2022)
  • Database failover: 15-minute outage during Postgres replica promotion (2022)
  • Memory leak: Gradual degradation under sustained load (2023)

Chaos Experiment Framework

python
from dataclasses import dataclass
from typing import Any, Callable, Dict
import time
import logging

@dataclass
class ChaosExperiment:
    """Define a chaos engineering experiment."""
    name: str
    hypothesis: str
    steady_state_check: Callable[[], bool]
    inject_failure: Callable[[], None]
    cleanup: Callable[[], None]
    duration_seconds: int = 60

class ChaosRunner:
    """Execute chaos experiments with safety checks."""

    def __init__(self):
        self.logger = logging.getLogger(__name__)

    def run_experiment(self, experiment: ChaosExperiment) -> Dict[str, Any]:
        """
        Run a chaos experiment with steady-state verification.

        Returns:
            Results including whether the hypothesis held.
        """
        self.logger.info(f"Starting experiment: {experiment.name}")
        self.logger.info(f"Hypothesis: {experiment.hypothesis}")

        # 1. Verify steady state before injecting anything
        if not experiment.steady_state_check():
            return {
                'success': False,
                'error': 'System not in steady state before experiment',
                'hypothesis_held': False
            }

        self.logger.info("✓ System in steady state")

        try:
            # 2. Inject failure
            self.logger.info("Injecting failure...")
            experiment.inject_failure()

            # 3. Monitor during failure
            start = time.time()
            steady_state_violations = []

            while time.time() - start < experiment.duration_seconds:
                if not experiment.steady_state_check():
                    steady_state_violations.append({
                        'timestamp': time.time(),
                        'elapsed': time.time() - start
                    })
                time.sleep(1)

            # 4. Cleanup
            self.logger.info("Cleaning up...")
            experiment.cleanup()

            # 5. Verify recovery
            time.sleep(5)  # Grace period

            recovered = experiment.steady_state_check()
            hypothesis_held = len(steady_state_violations) == 0

            result = {
                'success': True,
                'hypothesis_held': hypothesis_held,
                'violations': steady_state_violations,
                'recovered': recovered,
                'duration': time.time() - start
            }

            if hypothesis_held:
                self.logger.info("✓ Hypothesis held - system remained stable")
            else:
                self.logger.warning(
                    f"✗ Hypothesis failed - {len(steady_state_violations)} "
                    f"violations"
                )

            if recovered:
                self.logger.info("✓ System recovered after cleanup")
            else:
                self.logger.error("✗ System did not recover")

            return result

        except Exception as e:
            self.logger.error(f"Experiment failed: {e}")

            # Emergency cleanup
            try:
                experiment.cleanup()
            except Exception as cleanup_error:
                self.logger.error(f"Cleanup failed: {cleanup_error}")

            return {
                'success': False,
                'error': str(e),
                'hypothesis_held': False
            }

Network Latency Injection

Using tc (traffic control) to inject latency:

python
import subprocess

class NetworkLatencyInjector:
    """Inject network latency using tc (traffic control).

    Requires root (or CAP_NET_ADMIN) to modify qdiscs.
    """

    def __init__(self, interface: str = "eth0"):
        self.interface = interface
        self.active = False

    def inject_latency(self, latency_ms: int, jitter_ms: int = 0,
                       correlation_pct: int = 25):
        """
        Add latency to the network interface.

        Args:
            latency_ms: Base latency in milliseconds
            jitter_ms: Jitter (variance) in milliseconds
            correlation_pct: Correlation between successive delays
        """
        if self.active:
            raise RuntimeError("Latency already injected")

        cmd = [
            "tc", "qdisc", "add", "dev", self.interface,
            "root", "netem", "delay",
            f"{latency_ms}ms"
        ]

        if jitter_ms > 0:
            cmd.extend([f"{jitter_ms}ms", f"{correlation_pct}%"])

        subprocess.run(cmd, check=True)
        self.active = True

    def inject_packet_loss(self, loss_pct: float):
        """Add packet loss to the network interface."""
        if self.active:
            raise RuntimeError("Already active")

        cmd = [
            "tc", "qdisc", "add", "dev", self.interface,
            "root", "netem", "loss",
            f"{loss_pct}%"
        ]

        subprocess.run(cmd, check=True)
        self.active = True

    def cleanup(self):
        """Remove tc rules."""
        if not self.active:
            return

        cmd = ["tc", "qdisc", "del", "dev", self.interface, "root"]
        subprocess.run(cmd, check=True)
        self.active = False
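One detail worth making explicit: cleanup must run even when the experiment body raises, otherwise a failed experiment leaves latency rules on the interface. A context manager guarantees this. The `fault` helper below is an illustrative sketch, not part of the framework above:

```python
from contextlib import contextmanager
from typing import Callable

@contextmanager
def fault(inject: Callable[[], None], cleanup: Callable[[], None]):
    """Run a with-block under an injected fault, guaranteeing cleanup
    runs even if the block raises."""
    inject()
    try:
        yield
    finally:
        cleanup()

# Usage with the injector above:
# injector = NetworkLatencyInjector("eth0")
# with fault(lambda: injector.inject_latency(200, jitter_ms=50),
#            injector.cleanup):
#     run_steady_state_checks()
```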

Experiment: Market Data Latency

python
import requests
from datetime import datetime

def check_market_data_freshness(max_age_seconds: int = 5) -> bool:
    """
    Steady state: market data is fresh.

    Returns:
        True if the latest market data is recent enough.
    """
    try:
        response = requests.get("http://localhost:8000/api/market-data/latest",
                                timeout=2)
        response.raise_for_status()

        data = response.json()
        timestamp = datetime.fromisoformat(data['timestamp'])
        age = (datetime.utcnow() - timestamp).total_seconds()

        return age < max_age_seconds
    except Exception:
        return False

def create_latency_experiment() -> ChaosExperiment:
    """Create an experiment to test market data under network latency."""
    injector = NetworkLatencyInjector(interface="eth0")

    return ChaosExperiment(
        name="Market Data High Latency",
        hypothesis=(
            "Market data service continues serving fresh data even "
            "when exchange connection has 200ms latency"
        ),
        steady_state_check=lambda: check_market_data_freshness(max_age_seconds=5),
        inject_failure=lambda: injector.inject_latency(
            latency_ms=200,
            jitter_ms=50,
            correlation_pct=75
        ),
        cleanup=injector.cleanup,
        duration_seconds=60
    )

# Run the experiment
runner = ChaosRunner()
result = runner.run_experiment(create_latency_experiment())

Pod Killing (Kubernetes)

python
from kubernetes import client, config
import random
import time
import requests

class KubernetesChaos:
    """Inject failures into Kubernetes pods."""

    def __init__(self, namespace: str = "trading"):
        config.load_kube_config()
        self.core_v1 = client.CoreV1Api()
        self.namespace = namespace

    def kill_random_pod(self, label_selector: str):
        """Kill a random pod matching the label selector."""
        pods = self.core_v1.list_namespaced_pod(
            namespace=self.namespace,
            label_selector=label_selector
        )

        if not pods.items:
            raise ValueError(f"No pods found with selector: {label_selector}")

        # Choose a random pod
        pod = random.choice(pods.items)
        pod_name = pod.metadata.name

        # Delete it immediately (no grace period)
        self.core_v1.delete_namespaced_pod(
            name=pod_name,
            namespace=self.namespace,
            grace_period_seconds=0
        )

        return pod_name

    def kill_pods_continuously(self, label_selector: str,
                               interval_seconds: int = 30,
                               duration_seconds: int = 300):
        """
        Continuously kill random pods (simulates cascading failures).

        Args:
            label_selector: Label to select pods
            interval_seconds: Time between kills
            duration_seconds: Total experiment duration
        """
        start = time.time()
        killed_pods = []

        while time.time() - start < duration_seconds:
            try:
                pod_name = self.kill_random_pod(label_selector)
                killed_pods.append({
                    'pod': pod_name,
                    'timestamp': time.time()
                })
                print(f"Killed pod: {pod_name}")
            except Exception as e:
                print(f"Error killing pod: {e}")

            time.sleep(interval_seconds)

        return killed_pods

def check_order_service_availability() -> bool:
    """Check if the order service is accepting requests."""
    try:
        response = requests.get(
            "http://order-service/api/health",
            timeout=5
        )
        return response.status_code == 200
    except Exception:
        return False

def create_pod_chaos_experiment() -> ChaosExperiment:
    """Test order service resilience to pod failures."""
    chaos = KubernetesChaos(namespace="trading")

    def inject_failure():
        # Kill 1 pod every 30 seconds for 2 minutes
        chaos.kill_pods_continuously(
            label_selector="app=order-service",
            interval_seconds=30,
            duration_seconds=120
        )

    return ChaosExperiment(
        name="Order Service Pod Chaos",
        hypothesis=(
            "Order service remains available (>99%) even when pods "
            "are continuously killed"
        ),
        steady_state_check=check_order_service_availability,
        inject_failure=inject_failure,
        cleanup=lambda: None,  # Kubernetes recreates pods automatically
        duration_seconds=120
    )

Database Failure Injection

python
import time
import psycopg2
from psycopg2 import pool as pg_pool
from typing import Optional

class DatabaseChaos:
    """Inject database failures."""

    def __init__(self, connection_string: str):
        self.connection_string = connection_string
        self.admin_conn: Optional[psycopg2.extensions.connection] = None

    def terminate_connections(self, database: str, except_current: bool = True):
        """Terminate all connections to the database."""
        # Connect as superuser
        self.admin_conn = psycopg2.connect(self.connection_string)
        self.admin_conn.autocommit = True

        cursor = self.admin_conn.cursor()

        if except_current:
            # Terminate all connections except our own
            cursor.execute("""
                SELECT pg_terminate_backend(pg_stat_activity.pid)
                FROM pg_stat_activity
                WHERE pg_stat_activity.datname = %s
                  AND pid <> pg_backend_pid()
            """, (database,))
        else:
            cursor.execute("""
                SELECT pg_terminate_backend(pg_stat_activity.pid)
                FROM pg_stat_activity
                WHERE pg_stat_activity.datname = %s
            """, (database,))

        terminated = cursor.fetchall()
        cursor.close()

        return len(terminated)

    def inject_slow_query(self, duration_seconds: int = 30):
        """Hold an exclusive table lock for the given duration."""
        self.admin_conn = psycopg2.connect(self.connection_string)
        cursor = self.admin_conn.cursor()

        # Start a transaction that locks the table, then sleep while holding it
        cursor.execute("BEGIN")
        cursor.execute("LOCK TABLE trades IN ACCESS EXCLUSIVE MODE")
        cursor.execute(f"SELECT pg_sleep({duration_seconds})")
        cursor.execute("ROLLBACK")

        cursor.close()

    def cleanup(self):
        """Close the admin connection."""
        if self.admin_conn:
            self.admin_conn.close()
            self.admin_conn = None

# Connection pool resilience test
class ResilientConnectionPool:
    """Database connection pool with automatic retry."""

    def __init__(self, connection_string: str, minconn: int = 2,
                 maxconn: int = 10):
        self.pool = pg_pool.ThreadedConnectionPool(
            minconn, maxconn, connection_string
        )

    def execute_with_retry(self, query: str, params=None,
                           max_retries: int = 3):
        """Execute a query with automatic retry on connection failure."""
        for attempt in range(max_retries):
            conn = None
            try:
                conn = self.pool.getconn()
                cursor = conn.cursor()
                cursor.execute(query, params)
                result = cursor.fetchall()
                conn.commit()
                cursor.close()
                return result

            except psycopg2.OperationalError:
                if conn:
                    # Discard the broken connection instead of reusing it,
                    # and clear the reference so finally doesn't return it twice
                    self.pool.putconn(conn, close=True)
                    conn = None

                if attempt == max_retries - 1:
                    raise

                time.sleep(2 ** attempt)  # Exponential backoff

            finally:
                if conn:
                    self.pool.putconn(conn)

def check_database_queries() -> bool:
    """Check if database queries succeed."""
    pool = ResilientConnectionPool(
        "postgresql://localhost/trading"
    )

    try:
        result = pool.execute_with_retry("SELECT 1")
        return result == [(1,)]
    except Exception:
        return False

def create_db_chaos_experiment() -> ChaosExperiment:
    """Test database connection resilience."""
    chaos = DatabaseChaos("postgresql://postgres@localhost/postgres")

    return ChaosExperiment(
        name="Database Connection Termination",
        hypothesis=(
            "Application automatically recovers from database "
            "connection termination within 5 seconds"
        ),
        steady_state_check=check_database_queries,
        inject_failure=lambda: chaos.terminate_connections("trading"),
        cleanup=chaos.cleanup,
        duration_seconds=30
    )

Production Chaos Schedule

We run chaos experiments on a fixed schedule: 02:00 on Mondays, Wednesdays, and Fridays, one failure mode per night:

yaml
# chaos-schedule.yaml
apiVersion: v1
kind: ConfigMap
metadata:
  name: chaos-schedule
data:
  schedule.json: |
    {
      "experiments": [
        {
          "name": "network-latency",
          "schedule": "0 2 * * 1",
          "enabled": true
        },
        {
          "name": "pod-chaos",
          "schedule": "0 2 * * 3",
          "enabled": true
        },
        {
          "name": "database-failure",
          "schedule": "0 2 * * 5",
          "enabled": true
        }
      ]
    }
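A small dispatcher can read this ConfigMap payload and decide which experiments are due. The sketch below is illustrative (the `due_experiments` helper is hypothetical, and it only matches the day-of-week field; in practice cron or a Kubernetes CronJob evaluates the full expression):

```python
import json
from datetime import datetime

def due_experiments(schedule_json: str, now: datetime) -> list:
    """Return names of enabled experiments whose cron day-of-week
    field matches `now`. Sketch only: ignores the other cron fields."""
    config = json.loads(schedule_json)
    due = []
    for exp in config["experiments"]:
        if not exp["enabled"]:
            continue
        dow_field = exp["schedule"].split()[4]  # fifth cron field
        # cron: 0 = Sunday .. 6 = Saturday; Python: Monday = 0
        cron_today = (now.weekday() + 1) % 7
        if dow_field == "*" or int(dow_field) == cron_today:
            due.append(exp["name"])
    return due
```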

Results

Chaos engineering impact (2020-2024):

plaintext
Metric                        Before      After
────────────────────────────────────────────────
Mean time to recovery         45 min      8 min
Incidents per quarter         12          3
P99 latency during failures   15s         200ms
False positive alerts         40%         5%

Specific Improvements

  1. Connection pooling: Added automatic retry → 0 DB outages (was 4/year)
  2. Circuit breakers: Prevent cascade failures → 95% fewer incidents
  3. Graceful degradation: Market data stale fallback → 99.99% uptime
  4. Kubernetes replicas: Min 3 pods → survived 2 simultaneous pod failures
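The circuit breaker from point 2 can be sketched minimally as follows. This is an illustrative implementation, not our production code (which adds metrics and per-endpoint state): after a threshold of consecutive failures the breaker opens and fails fast, then allows a trial call once a cooldown elapses.

```python
import time

class CircuitBreaker:
    """Minimal circuit breaker: open after N consecutive failures,
    half-open after a cooldown, close again on a successful call."""

    def __init__(self, failure_threshold: int = 5, reset_timeout: float = 30.0):
        self.failure_threshold = failure_threshold
        self.reset_timeout = reset_timeout
        self.failures = 0
        self.opened_at = None  # None means the circuit is closed

    def call(self, fn, *args, **kwargs):
        if self.opened_at is not None:
            if time.time() - self.opened_at < self.reset_timeout:
                # Open: fail fast instead of hammering a broken dependency
                raise RuntimeError("circuit open - failing fast")
            # Cooldown elapsed: fall through and allow one trial call (half-open)
        try:
            result = fn(*args, **kwargs)
        except Exception:
            self.failures += 1
            if self.failures >= self.failure_threshold:
                self.opened_at = time.time()
            raise
        # Success closes the circuit and resets the failure count
        self.failures = 0
        self.opened_at = None
        return result
```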

Chaos Engineering Best Practices

  1. Start small: Single service, dev environment first
  2. Define steady state: Clear, measurable SLOs
  3. Automate experiments: Run on schedule, not ad-hoc
  4. Abort on emergency: Kill switch for production issues
  5. Learn from failures: Update runbooks after each experiment
  6. Gradual rollout: Dev → staging → production
  7. Communicate: Announce experiments to team beforehand
  8. Measure everything: Latency, errors, saturation during experiment
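Practice 4 (abort on emergency) can be as simple as a flag file checked inside the monitoring loop: on-call touches the file and the experiment stops and cleans up on the next poll. A sketch, with `/tmp/chaos-abort` as a hypothetical flag path and the function as an illustrative variant of the runner's monitoring step:

```python
import time
from pathlib import Path

ABORT_FILE = Path("/tmp/chaos-abort")  # hypothetical; a feature flag works too

def monitor_with_kill_switch(steady_state_check, cleanup, duration_seconds,
                             abort_flag=ABORT_FILE, poll_interval=1.0):
    """Monitoring loop that aborts early when the kill-switch file
    appears, then always runs cleanup."""
    start = time.time()
    violations = 0
    aborted = False
    while time.time() - start < duration_seconds:
        if abort_flag.exists():
            aborted = True  # on-call pulled the kill switch
            break
        if not steady_state_check():
            violations += 1
        time.sleep(poll_interval)
    cleanup()  # runs whether we finished or aborted
    return {"aborted": aborted, "violations": violations}
```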

Lessons Learned

  1. Chaos finds real issues: 100% of production incidents were predictable from chaos experiments
  2. Culture is hard: Teams resist intentional failures initially
  3. Automation essential: Manual chaos experiments are skipped
  4. Start in dev: Production chaos requires mature monitoring
  5. Runbooks critical: Document recovery procedures during experiments
  6. Feedback loops: Use chaos results to prioritize reliability work

Chaos engineering transforms how we build systems. Instead of hoping for reliability, we verify it through controlled failure injection.

Further Reading

  • Principles of Chaos Engineering
  • Chaos Engineering: System Resilience in Practice
  • Netflix Chaos Monkey
  • LitmusChaos