November 25, 2025 • NordVarg Team

Mean Reversion Strategies: From Pairs Trading to Baskets

Tags: General, algorithmic-trading, mean-reversion, pairs-trading, cointegration, kalman-filter, statistical-arbitrage, python
18 min read

TL;DR

Mean reversion is one of the most reliable sources of alpha in quantitative trading. This article covers production-grade implementation of mean reversion strategies:

  • Pairs trading: Cointegration testing, spread construction, and signal generation
  • Ornstein-Uhlenbeck process: Modeling mean reversion dynamics and optimal entry/exit
  • Kalman filtering: Dynamic hedge ratio estimation for pairs trading
  • Basket strategies: Extending pairs to multi-asset mean reversion
  • Production considerations: Transaction costs, slippage, and risk management

Key Insight: Mean reversion works because of fundamental forces (arbitrage, inventory management, market making) that push prices back to equilibrium.


Why Mean Reversion Works

The Economic Foundation

Mean reversion isn't just a statistical artifact—it's driven by real economic forces:

  1. Arbitrage: When two similar assets diverge, arbitrageurs step in
  2. Inventory Management: Market makers adjust their quotes to work off unwanted inventory, which pulls prices back toward earlier levels
  3. Fundamental Value: Prices oscillate around intrinsic value
  4. Market Microstructure: Bid-ask bounce creates short-term reversals

Example: If Coca-Cola and Pepsi stocks diverge (after controlling for market factors), arbitrageurs will buy the cheap one and sell the expensive one, pushing them back together.

When Mean Reversion Fails

⚠️ Warning: Mean reversion can fail catastrophically when:

  • Structural breaks: Companies merge, split, or fundamentally change
  • Regime changes: Market conditions shift (e.g., 2008 crisis)
  • Liquidity crises: No arbitrageurs left to enforce mean reversion
  • Trending markets: Strong trends can persist longer than you can stay solvent

Risk Management: Always use stop-losses and position limits!


Pairs Trading: The Foundation

Step 1: Finding Cointegrated Pairs

Two price series $P_1(t)$ and $P_2(t)$ are cointegrated if there exists a linear combination that is stationary:

$$S(t) = P_1(t) - \beta P_2(t)$$

where $S(t)$ is stationary (mean-reverting).

Intuition: Even though both prices are non-stationary (random walks), their spread reverts to a mean.
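
To make this intuition concrete, here is a minimal sketch that uses only NumPy. It assumes a synthetic pair driven by one shared random walk and a hedge ratio of 2 that is known in advance; the helper `ar1_coefficient` is a name introduced here for illustration. A lag-1 coefficient close to 1 is what a random walk produces, while a value well below 1 points to mean reversion. This is an informal diagnostic, not a replacement for a formal cointegration test.

```python
import numpy as np

def ar1_coefficient(x: np.ndarray) -> float:
    """Slope b from the regression x[t] = a + b * x[t-1] + noise."""
    x_lag, x_curr = x[:-1], x[1:]
    slope, _intercept = np.polyfit(x_lag, x_curr, deg=1)
    return float(slope)

rng = np.random.default_rng(0)
n = 2000
common = np.cumsum(rng.standard_normal(n))        # shared random-walk factor
p1 = 100 + 2.0 * common + rng.standard_normal(n)  # asset 1 loads 2x on the factor
p2 = 50 + 1.0 * common + rng.standard_normal(n)   # asset 2 loads 1x on the factor

# beta = 2 cancels the shared factor (assumed known for this illustration)
spread = p1 - 2.0 * p2

print(f"AR(1) coefficient of P1:     {ar1_coefficient(p1):.3f}")
print(f"AR(1) coefficient of spread: {ar1_coefficient(spread):.3f}")
```

In this construction the spread is pure noise around zero, so its coefficient should sit near 0, while each price series on its own should sit near 1.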

python
import numpy as np
import pandas as pd
from statsmodels.tsa.stattools import coint, adfuller
from statsmodels.regression.linear_model import OLS
import matplotlib.pyplot as plt
from typing import Tuple, List, Optional

class PairsFinder:
    """
    Find cointegrated pairs from a universe of stocks
    
    Uses Engle-Granger two-step method:
    1. Regress P1 on P2 (with intercept) to find beta
    2. Test if residuals are stationary (ADF test with
       Engle-Granger critical values, via statsmodels' coint)
    """
    
    def __init__(self, 
                 significance_level: float = 0.05,
                 lookback_days: int = 252):
        """
        Args:
            significance_level: P-value threshold for cointegration
            lookback_days: Historical window for testing
        """
        self.significance_level = significance_level
        self.lookback_days = lookback_days
    
    def find_pairs(self, 
                   prices: pd.DataFrame,
                   min_correlation: float = 0.5) -> pd.DataFrame:
        """
        Find all cointegrated pairs in a universe
        
        Args:
            prices: DataFrame with assets as columns, dates as index
            min_correlation: Minimum correlation to consider (pre-filter)
            
        Returns:
            DataFrame with cointegrated pairs and statistics
        """
        assets = prices.columns.tolist()
        n_assets = len(assets)
        
        results = []
        
        # Test all pairs
        for i in range(n_assets):
            for j in range(i + 1, n_assets):
                asset1 = assets[i]
                asset2 = assets[j]
                
                p1 = prices[asset1].tail(self.lookback_days)
                p2 = prices[asset2].tail(self.lookback_days)
                
                # Pre-filter: check correlation
                corr = p1.corr(p2)
                if abs(corr) < min_correlation:
                    continue
                
                # Cointegration test
                score, pvalue, beta = self._test_cointegration(p1, p2)
                
                if pvalue < self.significance_level:
                    # Calculate additional statistics
                    spread = p1 - beta * p2
                    half_life = self._calculate_half_life(spread)
                    
                    results.append({
                        'asset1': asset1,
                        'asset2': asset2,
                        'beta': beta,
                        'pvalue': pvalue,
                        'correlation': corr,
                        'half_life_days': half_life,
                        'spread_mean': spread.mean(),
                        'spread_std': spread.std()
                    })
        
        if not results:
            return pd.DataFrame()
        
        df_results = pd.DataFrame(results)
        
        # Sort by p-value (most significant first)
        df_results = df_results.sort_values('pvalue')
        
        return df_results
    
    def _test_cointegration(self, 
                           p1: pd.Series, 
                           p2: pd.Series) -> Tuple[float, float, float]:
        """
        Test cointegration using Engle-Granger method
        
        Returns:
            Tuple of (test_statistic, p_value, beta)
        """
        # Step 1: Regress P1 on a constant and P2 to estimate the hedge ratio.
        # Without the constant, any level difference between the two series
        # would bias beta.
        exog = pd.DataFrame({'const': 1.0, 'p2': p2})
        model = OLS(p1, exog).fit()
        beta = model.params['p2']
        
        # Step 2: Test if residuals are stationary.
        # coint() runs the residual ADF test with critical values that account
        # for beta being estimated; plain adfuller() p-values on the residuals
        # would be too optimistic.
        test_stat, pvalue, _ = coint(p1, p2, trend='c', maxlag=1)
        
        return test_stat, pvalue, beta
    
    def _calculate_half_life(self, spread: pd.Series) -> float:
        """
        Calculate half-life of mean reversion using AR(1) model
        
        Spread follows: dS = θ(μ - S)dt + σdW
        Half-life = ln(2) / θ
        """
        # Fit AR(1): S(t) = a + b*S(t-1) + ε
        spread_lag = spread.shift(1).iloc[1:]
        spread_curr = spread.iloc[1:]
        
        # Include the intercept a; the spread generally has a non-zero mean
        exog = pd.DataFrame({'const': 1.0, 'lag': spread_lag})
        model = OLS(spread_curr, exog).fit()
        b = model.params['lag']
        
        # With daily steps, b = exp(-θ), so θ = -ln(b)
        if b >= 1 or b <= 0:
            return np.inf  # Not mean-reverting
        
        theta = -np.log(b)
        half_life = np.log(2) / theta
        
        return half_life


# Example: Find pairs in a universe
if __name__ == "__main__":
    # Generate sample price data
    np.random.seed(42)
    dates = pd.date_range(end='2025-11-25', periods=500, freq='D')
    
    # Create cointegrated pair: Stock A and Stock B
    common_factor = np.cumsum(np.random.randn(500)) * 0.5
    stock_a = 100 + common_factor + np.random.randn(500) * 2
    stock_b = 50 + 0.5 * common_factor + np.random.randn(500) * 1
    
    # Add non-cointegrated stock
    stock_c = 75 + np.cumsum(np.random.randn(500)) * 0.3
    
    prices = pd.DataFrame({
        'STOCK_A': stock_a,
        'STOCK_B': stock_b,
        'STOCK_C': stock_c
    }, index=dates)
    
    # Find pairs
    finder = PairsFinder(significance_level=0.05)
    pairs = finder.find_pairs(prices, min_correlation=0.3)
    
    print("Cointegrated Pairs Found:")
    print("=" * 80)
    print(pairs.to_string(index=False))

Ornstein-Uhlenbeck Process for Mean Reversion

The Model

The spread follows an Ornstein-Uhlenbeck (OU) process:

$$dS_t = \theta(\mu - S_t)\,dt + \sigma\,dW_t$$

Where:

  • $\theta$ = speed of mean reversion
  • $\mu$ = long-term mean
  • $\sigma$ = volatility
  • $W_t$ = Wiener process

Key Properties:

  • Half-life: $t_{1/2} = \frac{\ln(2)}{\theta}$
  • Equilibrium distribution: $S_t \sim \mathcal{N}\left(\mu, \frac{\sigma^2}{2\theta}\right)$
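
As a quick numerical check of these two properties, the following sketch evaluates them for illustrative values (θ = 0.1 per day, σ = 1). The function names are introduced here for illustration only. It also shows the AR(1) coefficient implied by sampling the OU process exactly at interval Δt, which is the link between θ and a daily autoregression.

```python
import math

def ou_half_life(theta: float) -> float:
    """Half-life ln(2)/theta, in the same time units as 1/theta."""
    if theta <= 0:
        return math.inf  # no mean reversion
    return math.log(2) / theta

def ou_equilibrium_std(theta: float, sigma: float) -> float:
    """Stationary standard deviation sigma / sqrt(2 * theta)."""
    if theta <= 0:
        return math.inf  # variance grows without bound
    return sigma / math.sqrt(2 * theta)

def ou_ar1_coefficient(theta: float, dt: float = 1.0) -> float:
    """AR(1) coefficient exp(-theta * dt) of the exactly sampled OU process."""
    return math.exp(-theta * dt)

theta, sigma = 0.1, 1.0  # illustrative values, per day
print(f"half-life:         {ou_half_life(theta):.2f} days")
print(f"equilibrium std:   {ou_equilibrium_std(theta, sigma):.3f}")
print(f"AR(1) coefficient: {ou_ar1_coefficient(theta):.4f}")
```

With these values the half-life is about 6.93 days and the equilibrium standard deviation about 2.24, so a two-standard-deviation entry band sits roughly 4.47 away from μ.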

Parameter Estimation

python
import numpy as np
import pandas as pd
from statsmodels.regression.linear_model import OLS

class OrnsteinUhlenbeckModel:
    """
    Ornstein-Uhlenbeck process for mean reversion modeling
    
    Estimates parameters and generates trading signals
    """
    
    def __init__(self):
        self.theta = None  # Mean reversion speed
        self.mu = None     # Long-term mean
        self.sigma = None  # Volatility
        self.half_life = None
    
    def fit(self, spread: pd.Series) -> dict:
        """
        Estimate OU parameters by OLS on the Euler-discretized process
        (equivalent to conditional maximum likelihood under Gaussian noise)
        
        Args:
            spread: Time series of spread values
            
        Returns:
            Dict with estimated parameters
        """
        # Discrete-time OU: S(t+1) = S(t) + θ(μ - S(t))Δt + σ√Δt * ε
        # Rearrange: ΔS = θμΔt - θS(t)Δt + σ√Δt * ε
        
        delta_s = spread.diff().dropna()
        s_lag = spread.shift(1).dropna()
        
        # Align series
        delta_s = delta_s[s_lag.index]
        
        # Assume Δt = 1 (daily data)
        dt = 1.0
        
        # OLS regression: ΔS = a + b*S(t-1) + ε
        # where a = θμΔt, b = -θΔt
        model = OLS(delta_s, pd.DataFrame({'const': 1.0, 's_lag': s_lag})).fit()
        
        a = model.params['const']
        b = model.params['s_lag']
        
        # Estimate parameters (Euler approximation; accurate when θΔt is small)
        self.theta = -b / dt
        self.mu = a / (self.theta * dt) if self.theta != 0 else spread.mean()
        
        # Estimate sigma from residuals
        residuals = model.resid
        self.sigma = residuals.std() / np.sqrt(dt)
        
        # Calculate half-life
        if self.theta > 0:
            self.half_life = np.log(2) / self.theta
        else:
            self.half_life = np.inf
        
        # Equilibrium std dev
        eq_std = self.sigma / np.sqrt(2 * self.theta) if self.theta > 0 else np.inf
        
        return {
            'theta': self.theta,
            'mu': self.mu,
            'sigma': self.sigma,
            'half_life': self.half_life,
            'equilibrium_std': eq_std,
            'r_squared': model.rsquared
        }
    
    def generate_signals(self,
                        spread: pd.Series,
                        entry_threshold: float = 2.0,
                        exit_threshold: float = 0.5) -> pd.DataFrame:
        """
        Generate trading signals based on OU model
        
        Args:
            spread: Current spread values
            entry_threshold: Number of std devs for entry (e.g., 2.0)
            exit_threshold: Number of std devs for exit (e.g., 0.5)
            
        Returns:
            DataFrame with signals (-1: short spread, 0: flat, 1: long spread)
        """
        if self.theta is None:
            raise ValueError("Model not fitted. Call fit() first.")
        if self.theta <= 0:
            raise ValueError("Spread is not mean-reverting (theta <= 0).")
        
        # Calculate z-score: (S - μ) / σ_eq
        eq_std = self.sigma / np.sqrt(2 * self.theta)
        z_score = (spread - self.mu) / eq_std
        
        # Start from NaN ("no new decision") so that explicit exits (0)
        # are not mistaken for "hold" when forward-filling below
        signals = pd.Series(np.nan, index=spread.index)
        
        # Long spread when z < -entry_threshold (spread too low)
        signals[z_score < -entry_threshold] = 1
        
        # Short spread when z > entry_threshold (spread too high)
        signals[z_score > entry_threshold] = -1
        
        # Exit when |z| < exit_threshold
        signals[z_score.abs() < exit_threshold] = 0
        
        # Forward-fill to hold positions between entry and exit; flat before the first signal
        signals = signals.ffill().fillna(0).astype(int)
        
        return pd.DataFrame({
            'spread': spread,
            'z_score': z_score,
            'signal': signals,
            'mu': self.mu,
            'upper_band': self.mu + entry_threshold * eq_std,
            'lower_band': self.mu - entry_threshold * eq_std
        })
    
    def expected_return_time(self, 
                            current_spread: float,
                            target_spread: float) -> float:
        """
        Time for the expected (noise-free) path to decay from current to target
        
        E[S_t] - μ = (S_current - μ) * exp(-θt), so
        t = (1/θ) * ln|(S_current - μ)/(S_target - μ)|
        
        Note: this is not the expected first-passage time of the noisy process.
        """
        if self.theta is None or self.theta <= 0:
            return np.inf
        
        numerator = abs(current_spread - self.mu)
        denominator = abs(target_spread - self.mu)
        
        if numerator == 0:
            return 0.0  # Already at the mean
        if denominator == 0:
            return np.inf  # The expected path approaches μ but never reaches it exactly
        
        expected_time = (1 / self.theta) * np.log(numerator / denominator)
        
        return max(0.0, expected_time)


# Example: OU model for pairs trading
if __name__ == "__main__":
    # Generate OU process
    np.random.seed(42)
    n_days = 500
    dt = 1.0
    
    theta_true = 0.1
    mu_true = 0.0
    sigma_true = 1.0
    
    spread = np.zeros(n_days)
    spread[0] = mu_true
    
    for t in range(1, n_days):
        dW = np.random.randn() * np.sqrt(dt)
        spread[t] = spread[t-1] + theta_true * (mu_true - spread[t-1]) * dt + sigma_true * dW
    
    spread_series = pd.Series(spread, index=pd.date_range(end='2025-11-25', periods=n_days, freq='D'))
    
    # Fit OU model
    ou_model = OrnsteinUhlenbeckModel()
    params = ou_model.fit(spread_series)
    
    print("OU Parameters:")
    print(f"θ (speed): {params['theta']:.4f} (true: {theta_true})")
    print(f"μ (mean): {params['mu']:.4f} (true: {mu_true})")
    print(f"σ (vol): {params['sigma']:.4f} (true: {sigma_true})")
    print(f"Half-life: {params['half_life']:.2f} days")
    
    # Generate signals
    signals_df = ou_model.generate_signals(spread_series, entry_threshold=2.0)
    
    print("\nSignal distribution:")
    print(signals_df['signal'].value_counts())
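
The threshold rule that `generate_signals` describes (enter beyond the entry band, exit inside the exit band, otherwise hold) is a hysteresis rule, and it is easy to get wrong in vectorized form. As a cross-check, here is a minimal sketch of the same rule written as an explicit loop in plain Python. The function name `hysteresis_signals` and the sample z-scores are assumptions for illustration.

```python
from typing import Iterable, List

def hysteresis_signals(z_scores: Iterable[float],
                       entry_threshold: float = 2.0,
                       exit_threshold: float = 0.5) -> List[int]:
    """Return the position (-1, 0, 1) held after observing each z-score."""
    position = 0
    signals = []
    for z in z_scores:
        if z < -entry_threshold:
            position = 1       # spread too low: long spread
        elif z > entry_threshold:
            position = -1      # spread too high: short spread
        elif abs(z) < exit_threshold:
            position = 0       # back near the mean: exit
        # otherwise (including NaN, for which all comparisons are False): hold
        signals.append(position)
    return signals

z = [0.0, -2.5, -1.0, -0.3, 0.0, 2.2, 1.0, 0.4]
print(hysteresis_signals(z))
```

The position opened at z = -2.5 is held through z = -1.0 and closed only once z re-enters the exit band at -0.3. Comparing this loop against a vectorized version on the same input is a cheap way to confirm that exits actually occur.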

Kalman Filter for Dynamic Hedge Ratios

The Problem with Static Beta

In classic pairs trading, we use a fixed hedge ratio $\beta$:

$$\text{Spread} = P_1 - \beta P_2$$

But $\beta$ changes over time! Companies' fundamentals evolve, correlations shift.

Solution: Use a Kalman filter to estimate $\beta$ dynamically.

Kalman Filter Formulation

State equation (beta evolves as random walk):

$$\beta_t = \beta_{t-1} + w_t, \quad w_t \sim \mathcal{N}(0, Q)$$

Observation equation:

$$P_{1,t} = \beta_t P_{2,t} + v_t, \quad v_t \sim \mathcal{N}(0, R)$$

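
Because the state here is a single number, the filter recursion for these two equations can be written out by hand. The sketch below is a dependency-light illustration using only NumPy; the function name `kalman_hedge_ratio` and the arguments `beta0` and `var0` (initial guess and its variance) are assumptions introduced here, while `q` and `r` stand for $Q$ and $R$.

```python
import numpy as np

def kalman_hedge_ratio(p1, p2, q=1e-4, r=1e-3, beta0=0.0, var0=1.0):
    """
    Scalar Kalman filter for P1[t] = beta[t] * P2[t] + v[t], with beta a random walk.

    Returns (betas, innovations): the updated beta at each step and the
    one-step-ahead forecast error computed before P1[t] is used.
    """
    p1 = np.asarray(p1, dtype=float)
    p2 = np.asarray(p2, dtype=float)
    beta, var = beta0, var0
    betas = np.empty(len(p1))
    innovations = np.empty(len(p1))
    for t in range(len(p1)):
        # Predict: a random-walk state keeps its mean; its variance grows by Q
        var = var + q
        # Innovation: observation minus the prediction beta * P2[t]
        h = p2[t]
        innovation = p1[t] - beta * h
        s = h * var * h + r      # innovation variance
        k = var * h / s          # Kalman gain
        # Update the state estimate and its variance
        beta = beta + k * innovation
        var = (1.0 - k * h) * var
        betas[t] = beta
        innovations[t] = innovation
    return betas, innovations

# Usage on synthetic data with a constant true beta of 0.5
rng = np.random.default_rng(1)
p2 = 100 + np.cumsum(rng.standard_normal(300) * 0.5)
p1 = 0.5 * p2 + rng.standard_normal(300) * 0.1
betas, innovations = kalman_hedge_ratio(p1, p2)
print(f"final beta estimate: {betas[-1]:.3f}")
```

The ratio of `q` to `r` controls how quickly the estimate follows new data: a larger `q` lets beta move faster at the price of a noisier estimate.
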
python
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from filterpy.kalman import KalmanFilter

class KalmanPairsTrading:
    """
    Pairs trading with Kalman filter for dynamic hedge ratio
    
    Estimates time-varying beta using Kalman filter
    """
    
    def __init__(self,
                 delta: float = 1e-4,
                 vega: float = 1e-3):
        """
        Args:
            delta: Process noise (how much beta can change per period)
            vega: Observation noise (measurement error)
        """
        self.delta = delta  # Q (process noise)
        self.vega = vega    # R (observation noise)
        self.kf = None
        self.betas = []
        self.spreads = []
    
    def fit(self, 
            p1: pd.Series, 
            p2: pd.Series) -> pd.DataFrame:
        """
        Estimate dynamic hedge ratio using Kalman filter
        
        Args:
            p1: Price series for asset 1
            p2: Price series for asset 2
            
        Returns:
            DataFrame with betas, spreads, and z-scores
        """
        # Initialize Kalman filter
        # State: [beta]
        # Observation: P1 = beta * P2 + noise
        
        self.kf = KalmanFilter(dim_x=1, dim_z=1)
        
        # Initial state: use OLS beta (no intercept).
        # Note: this uses the full sample, which leaks future information
        # into a backtest; seed it from a prior window in practice.
        initial_beta = (p1 * p2).sum() / (p2 * p2).sum()
        self.kf.x = np.array([[initial_beta]])
        
        # Initial covariance
        self.kf.P = np.array([[1.0]])
        
        # Process noise
        self.kf.Q = np.array([[self.delta]])
        
        # Observation noise
        self.kf.R = np.array([[self.vega]])
        
        # State transition: beta(t) = beta(t-1)
        self.kf.F = np.array([[1.0]])
        
        # Observation model: P1 = beta * P2
        # H will be updated each step (depends on P2)
        
        betas = []
        spreads = []
        spread_stds = []
        
        for i in range(len(p1)):
            # Predict
            self.kf.predict()
            beta_prior = self.kf.x[0, 0]
            prior_var = self.kf.P[0, 0]
            
            # Calculate spread as the one-step-ahead forecast error, using
            # the beta predicted before P1(t) is observed. Using the updated
            # beta instead would let P1(t) explain itself and shrink the
            # spread toward zero.
            spread = p1.iloc[i] - beta_prior * p2.iloc[i]
            spreads.append(spread)
            
            # Spread uncertainty: forecast-error std sqrt(P2^2 * P_prior + R)
            spread_std = np.sqrt(prior_var * p2.iloc[i]**2 + self.vega)
            spread_stds.append(spread_std)
            
            # Update observation matrix H = [P2(t)]
            self.kf.H = np.array([[p2.iloc[i]]])
            
            # Update with observation P1(t)
            self.kf.update(np.array([[p1.iloc[i]]]))
            
            # Extract updated beta
            betas.append(self.kf.x[0, 0])
        
        self.betas = betas
        self.spreads = spreads
        
        results = pd.DataFrame({
            'p1': p1.values,
            'p2': p2.values,
            'beta': betas,
            'spread': spreads,
            'spread_std': spread_stds
        }, index=p1.index)
        
        # Calculate z-score (NaN for the first 19 rows of the rolling window)
        spread_series = pd.Series(spreads, index=p1.index)
        rolling_mean = spread_series.rolling(window=20).mean()
        rolling_std = spread_series.rolling(window=20).std()
        
        results['z_score'] = (spread_series - rolling_mean) / rolling_std
        
        return results
    
    def generate_signals(self,
                        results: pd.DataFrame,
                        entry_threshold: float = 2.0,
                        exit_threshold: float = 0.5) -> pd.DataFrame:
        """Generate trading signals from Kalman filter results"""
        
        results = results.copy()  # Do not mutate the caller's DataFrame
        z_score = results['z_score']
        
        # Start from NaN ("no new decision") so explicit exits survive the forward-fill
        signals = pd.Series(np.nan, index=results.index)
        
        # Long spread when z < -entry_threshold
        signals[z_score < -entry_threshold] = 1
        
        # Short spread when z > entry_threshold
        signals[z_score > entry_threshold] = -1
        
        # Exit when |z| < exit_threshold
        signals[z_score.abs() < exit_threshold] = 0
        
        # Forward-fill to hold positions; flat before the first signal
        signals = signals.ffill().fillna(0).astype(int)
        
        results['signal'] = signals
        
        return results


# Example: Kalman filter pairs trading
if __name__ == "__main__":
    # Generate pair with time-varying beta
    np.random.seed(42)
    n_days = 500
    
    # Beta changes over time
    true_beta = 0.5 + 0.1 * np.sin(np.linspace(0, 4*np.pi, n_days))
    
    p2 = 100 + np.cumsum(np.random.randn(n_days) * 0.5)
    p1 = true_beta * p2 + np.random.randn(n_days) * 2
    
    dates = pd.date_range(end='2025-11-25', periods=n_days, freq='D')
    p1_series = pd.Series(p1, index=dates)
    p2_series = pd.Series(p2, index=dates)
    
    # Fit Kalman filter
    kf_pairs = KalmanPairsTrading(delta=1e-4, vega=1e-2)
    results = kf_pairs.fit(p1_series, p2_series)
    
    # Generate signals
    results = kf_pairs.generate_signals(results, entry_threshold=2.0)
    
    print("Kalman Filter Pairs Trading Results:")
    print(f"Average beta: {results['beta'].mean():.4f}")
    print(f"Beta std dev: {results['beta'].std():.4f}")
    print(f"Number of trades: {results['signal'].diff().abs().sum() / 2:.0f}")
    
    # Plot
    fig, axes = plt.subplots(3, 1, figsize=(12, 10))
    
    # Beta evolution
    axes[0].plot(results.index, true_beta, label='True Beta', alpha=0.7)
    axes[0].plot(results.index, results['beta'], label='Estimated Beta', alpha=0.7)
    axes[0].set_title('Dynamic Hedge Ratio (Beta)')
    axes[0].legend()
    axes[0].grid(True)
    
    # Spread
    axes[1].plot(results.index, results['spread'], label='Spread', alpha=0.7)
    axes[1].axhline(y=0, color='black', linestyle='--', alpha=0.3)
    axes[1].set_title('Spread')
    axes[1].legend()
    axes[1].grid(True)
    
    # Z-score and signals
    axes[2].plot(results.index, results['z_score'], label='Z-Score', alpha=0.7)
    axes[2].axhline(y=2, color='red', linestyle='--', alpha=0.5, label='Entry Threshold')
    axes[2].axhline(y=-2, color='red', linestyle='--', alpha=0.5)
    axes[2].axhline(y=0, color='black', linestyle='-', alpha=0.3)
    axes[2].fill_between(results.index, 0, results['signal'], alpha=0.3, label='Position')
    axes[2].set_title('Z-Score and Signals')
    axes[2].legend()
    axes[2].grid(True)
    
    plt.tight_layout()
    plt.savefig('/tmp/kalman_pairs_trading.png', dpi=150)
    print("\nPlot saved to /tmp/kalman_pairs_trading.png")

Basket Mean Reversion

Extending to Multiple Assets

Instead of pairs, trade a basket of assets against a benchmark:

$$\text{Spread} = \sum_{i=1}^N w_i P_i - \beta \cdot \text{Benchmark}$$

Advantages:

  • More diversification
  • Reduced idiosyncratic risk
  • More stable spreads
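
The reduction in idiosyncratic risk can be quantified under a simplifying assumption: if each asset carries an independent idiosyncratic shock with the same standard deviation σ, an equal-weight basket of N assets has idiosyncratic standard deviation σ/√N. The sketch below evaluates that formula; the function name and the value σ = 2 are illustrative, and real assets with correlated residuals will diversify less than this.

```python
import math

def basket_idiosyncratic_std(sigma: float, n_assets: int) -> float:
    """Idiosyncratic std of an equal-weight basket of n independent assets."""
    if n_assets < 1:
        raise ValueError("n_assets must be at least 1")
    return sigma / math.sqrt(n_assets)

for n in (1, 4, 16):
    print(f"N = {n:2d}: idiosyncratic std = {basket_idiosyncratic_std(2.0, n):.2f}")
```

Going from a single asset to 4 halves the idiosyncratic noise, and going to 16 halves it again, which is why basket spreads tend to be smoother than pair spreads.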

Example: Trade a basket of tech stocks vs. QQQ

python
import numpy as np
import pandas as pd
from sklearn.linear_model import Ridge
from statsmodels.tsa.stattools import adfuller
from statsmodels.regression.linear_model import OLS

class BasketMeanReversion:
    """
    Mean reversion strategy for baskets of assets
    
    Constructs a basket that mean-reverts against a benchmark
    """
    
    def __init__(self,
                 lookback_days: int = 60,
                 rebalance_days: int = 20,
                 regularization: float = 0.1):
        """
        Args:
            lookback_days: Window for cointegration testing
            rebalance_days: How often to recompute weights
            regularization: Ridge regression alpha (prevents overfitting)
        """
        self.lookback_days = lookback_days
        self.rebalance_days = rebalance_days
        self.regularization = regularization
    
    def find_basket(self,
                   asset_prices: pd.DataFrame,
                   benchmark_price: pd.Series) -> dict:
        """
        Find optimal basket weights that cointegrate with benchmark
        
        Args:
            asset_prices: DataFrame with asset prices
            benchmark_price: Benchmark price series
            
        Returns:
            Dict with weights and statistics
        """
        # Use recent data
        asset_prices_recent = asset_prices.tail(self.lookback_days)
        benchmark_recent = benchmark_price.tail(self.lookback_days)
        
        # Regress benchmark on assets using Ridge regression
        # This finds weights w such that: Benchmark ≈ w^T * Assets
        model = Ridge(alpha=self.regularization)
        model.fit(asset_prices_recent, benchmark_recent)
        
        weights = model.coef_
        intercept = model.intercept_
        
        # Calculate spread. Sign convention in this class:
        # spread = benchmark - basket, so "long spread" means
        # long benchmark, short basket. The intercept is left in
        # the spread and shows up in spread_mean.
        basket_value = asset_prices_recent @ weights
        spread = benchmark_recent - basket_value
        
        # Test stationarity. The weights were fitted on the same window,
        # so this p-value is optimistic; treat it as a screen, not proof.
        adf_result = adfuller(spread, maxlag=1)
        is_stationary = adf_result[1] < 0.05
        
        # Calculate half-life from AR(1) with intercept: S(t) = a + b*S(t-1) + ε
        spread_lag = spread.shift(1).iloc[1:]
        spread_curr = spread.iloc[1:]
        exog = pd.DataFrame({'const': 1.0, 'lag': spread_lag})
        ar_model = OLS(spread_curr, exog).fit()
        b = ar_model.params['lag']
        
        if 0 < b < 1:
            half_life = np.log(2) / (-np.log(b))
        else:
            half_life = np.inf
        
        return {
            'weights': pd.Series(weights, index=asset_prices.columns),
            'intercept': intercept,
            'r_squared': model.score(asset_prices_recent, benchmark_recent),
            'is_stationary': is_stationary,
            'adf_pvalue': adf_result[1],
            'half_life': half_life,
            'spread_mean': spread.mean(),
            'spread_std': spread.std()
        }
    
    def backtest(self,
                asset_prices: pd.DataFrame,
                benchmark_price: pd.Series,
                entry_z: float = 2.0,
                exit_z: float = 0.5,
                transaction_cost: float = 0.001) -> pd.DataFrame:
        """
        Backtest basket mean reversion strategy
        
        Args:
            asset_prices: Asset prices
            benchmark_price: Benchmark price
            entry_z: Z-score threshold for entry
            exit_z: Z-score threshold for exit
            transaction_cost: Transaction cost (fraction of traded notional)
            
        Returns:
            DataFrame with positions, P&L, and statistics
        """
        results = []
        current_weights = None
        position = 0  # 0: flat, 1: long spread, -1: short spread
        
        for i in range(self.lookback_days, len(asset_prices), self.rebalance_days):
            # Recompute basket weights
            window_start = max(0, i - self.lookback_days)
            window_end = i
            
            basket_result = self.find_basket(
                asset_prices.iloc[window_start:window_end],
                benchmark_price.iloc[window_start:window_end]
            )
            
            current_weights = basket_result['weights']
            spread_mean = basket_result['spread_mean']
            spread_std = basket_result['spread_std']
            
            # Calculate spread for next rebalance_days
            for j in range(i, min(i + self.rebalance_days, len(asset_prices))):
                basket_value = asset_prices.iloc[j] @ current_weights
                spread = benchmark_price.iloc[j] - basket_value
                z_score = (spread - spread_mean) / spread_std if spread_std > 0 else 0
                
                # Generate signal
                new_position = position
                
                if position == 0:
                    if z_score > entry_z:
                        new_position = -1  # Short spread (short benchmark, long basket)
                    elif z_score < -entry_z:
                        new_position = 1   # Long spread (long benchmark, short basket)
                else:
                    if abs(z_score) < exit_z:
                        new_position = 0  # Exit
                
                # Calculate P&L from the position held coming into day j.
                # The previous day's spread is recomputed with the current
                # weights so that a weight change at a rebalance does not
                # appear as spurious P&L. (j >= lookback_days >= 1, so j-1 is valid.)
                prev_spread = (benchmark_price.iloc[j - 1]
                               - asset_prices.iloc[j - 1] @ current_weights)
                pnl = position * (spread - prev_spread)
                
                # Transaction costs, charged on the gross notional of one
                # unit of spread (benchmark leg plus all basket legs)
                tc = 0.0
                if new_position != position:
                    gross_notional = (abs(benchmark_price.iloc[j])
                                      + (current_weights.abs() * asset_prices.iloc[j]).sum())
                    tc = transaction_cost * abs(new_position - position) * gross_notional
                    pnl -= tc
                
                position = new_position
                
                results.append({
                    'date': asset_prices.index[j],
                    'spread': spread,
                    'z_score': z_score,
                    'position': position,
                    'pnl': pnl,
                    'transaction_cost': tc
                })
        
        df_results = pd.DataFrame(results)
        df_results['cumulative_pnl'] = df_results['pnl'].cumsum()
        
        return df_results


# Example: Basket mean reversion
if __name__ == "__main__":
    # Generate sample data: 5 assets + benchmark
    np.random.seed(42)
    n_days = 500
    n_assets = 5
    
    # Common factor (market)
    market_factor = np.cumsum(np.random.randn(n_days)) * 0.5
    
    # Assets follow market + idiosyncratic noise
    asset_prices = pd.DataFrame({
        f'Asset_{i}': 100 + 0.8 * market_factor + np.cumsum(np.random.randn(n_days) * 0.2)
        for i in range(n_assets)
    }, index=pd.date_range(end='2025-11-25', periods=n_days, freq='D'))
    
    # Benchmark is pure market factor
    benchmark_price = pd.Series(
        100 + market_factor + np.random.randn(n_days) * 0.1,
        index=asset_prices.index
    )
    
    # Find basket
    basket_strategy = BasketMeanReversion(lookback_days=60, rebalance_days=20)
    basket_result = basket_strategy.find_basket(asset_prices, benchmark_price)
    
    print("Basket Weights:")
    print(basket_result['weights'])
    print(f"\nR-squared: {basket_result['r_squared']:.4f}")
    print(f"Is stationary: {basket_result['is_stationary']}")
    print(f"Half-life: {basket_result['half_life']:.2f} days")
    
    # Backtest
    backtest_results = basket_strategy.backtest(
        asset_prices, benchmark_price,
        entry_z=2.0, exit_z=0.5,
        transaction_cost=0.001
    )
    
    # Annualized Sharpe ratio of daily P&L (undefined if P&L never varies)
    pnl_std = backtest_results['pnl'].std()
    sharpe = backtest_results['pnl'].mean() / pnl_std * np.sqrt(252) if pnl_std > 0 else float('nan')
    
    print("\nBacktest Results:")
    print(f"Total P&L: ${backtest_results['cumulative_pnl'].iloc[-1]:.2f}")
    print(f"Sharpe Ratio: {sharpe:.2f}")
    print(f"Number of trades: {backtest_results['position'].diff().abs().sum() / 2:.0f}")

Production Considerations

Transaction Costs and Slippage

Mean reversion strategies trade frequently—transaction costs matter!

python
class TransactionCostModel:
    """
    Realistic transaction cost model

    Includes:
    - Bid-ask spread
    - Market impact
    - Commissions
    """

    def __init__(self,
                 bid_ask_spread_bps: float = 5.0,
                 market_impact_coef: float = 0.1,
                 commission_bps: float = 1.0):
        """
        Args:
            bid_ask_spread_bps: Bid-ask spread in basis points
            market_impact_coef: Linear impact coefficient; impact as a
                fraction of trade value = coefficient * participation rate
            commission_bps: Commission in basis points
        """
        self.bid_ask_spread_bps = bid_ask_spread_bps
        self.market_impact_coef = market_impact_coef
        self.commission_bps = commission_bps

    def calculate_cost(self,
                       trade_value: float,
                       adv: float = 10_000_000) -> float:
        """
        Calculate total transaction cost

        Args:
            trade_value: Dollar value of trade
            adv: Average daily dollar volume (for market impact)

        Returns:
            Total cost in dollars
        """
        if adv <= 0:
            raise ValueError("adv must be positive")

        # Bid-ask spread cost (the full spread is charged on every trade,
        # which is conservative: crossing costs roughly half the spread)
        spread_cost = abs(trade_value) * self.bid_ask_spread_bps / 10000

        # Market impact (proportional to trade size / ADV)
        participation_rate = abs(trade_value) / adv
        impact_bps = self.market_impact_coef * participation_rate * 10000
        impact_cost = abs(trade_value) * impact_bps / 10000

        # Commission
        commission = abs(trade_value) * self.commission_bps / 10000

        total_cost = spread_cost + impact_cost + commission

        return total_cost
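As a rough worked example of the arithmetic in this cost model, the sketch below reproduces the three cost terms in a standalone function, using the same default parameters. `cost_breakdown` is a hypothetical helper introduced for illustration, and the trade sizes are made up; the point is only to show how the linear impact term makes dollar cost grow with the square of trade size.

```python
def cost_breakdown(trade_value: float,
                   adv: float = 10_000_000,
                   spread_bps: float = 5.0,
                   impact_coef: float = 0.1,
                   commission_bps: float = 1.0) -> dict:
    """Hypothetical helper mirroring the cost formula term by term."""
    notional = abs(trade_value)
    spread = notional * spread_bps / 10_000
    # Impact as a fraction of notional is impact_coef * participation rate,
    # so the dollar impact scales with notional squared.
    impact = notional * impact_coef * (notional / adv)
    commission = notional * commission_bps / 10_000
    total = spread + impact + commission
    total_bps = total / notional * 10_000 if notional else 0.0
    return {"spread": spread, "impact": impact,
            "commission": commission, "total": total,
            "total_bps": total_bps}


small = cost_breakdown(100_000)      # 1% of the assumed ADV
large = cost_breakdown(1_000_000)    # 10% of the assumed ADV
for name, c in (("small", small), ("large", large)):
    print(f"{name}: total=${c['total']:,.2f} ({c['total_bps']:.1f} bps)")
```

With these defaults, a trade at 1% of ADV costs about 16 bps, while a trade at 10% of ADV costs about 106 bps, almost all of it market impact. Splitting large orders over time is one way to keep the participation rate, and therefore the impact term, down.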

Risk Management

python
import pandas as pd


class PairsRiskManager:
    """
    Risk management for pairs trading

    Implements:
    - Position limits
    - Stop-loss
    - Correlation monitoring
    - Drawdown limits
    """

    def __init__(self,
                 max_position_size: float = 1_000_000,
                 stop_loss_z: float = 4.0,
                 max_drawdown_pct: float = 0.10,
                 min_correlation: float = 0.5):
        self.max_position_size = max_position_size
        self.stop_loss_z = stop_loss_z
        self.max_drawdown_pct = max_drawdown_pct
        self.min_correlation = min_correlation
        self.peak_equity = 0.0

    def check_position_limit(self, position_value: float) -> bool:
        """Return True if the position is within the limit"""
        return abs(position_value) <= self.max_position_size

    def check_stop_loss(self, z_score: float, position: int) -> bool:
        """
        Check if stop-loss is triggered

        Returns True if position should be closed
        """
        # If long spread and z-score goes even more negative (wrong direction)
        if position > 0 and z_score < -self.stop_loss_z:
            return True

        # If short spread and z-score goes even more positive
        if position < 0 and z_score > self.stop_loss_z:
            return True

        return False

    def check_correlation(self, p1: pd.Series, p2: pd.Series) -> bool:
        """Check if correlation is still sufficient"""
        # Correlation over the last 20 observations (price levels, not returns)
        recent_corr = p1.tail(20).corr(p2.tail(20))
        return abs(recent_corr) >= self.min_correlation

    def check_drawdown(self, current_equity: float) -> bool:
        """
        Check if drawdown limit is exceeded

        Returns True if trading should stop
        """
        self.peak_equity = max(self.peak_equity, current_equity)

        if self.peak_equity == 0:
            return False

        drawdown = (self.peak_equity - current_equity) / self.peak_equity

        return drawdown >= self.max_drawdown_pct
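To see how the stop-loss and drawdown rules behave on actual numbers, the following sketch walks a toy equity curve and z-score path through the same two checks. The functions `stop_loss_hit` and `first_drawdown_breach` are hypothetical standalone versions written so the example runs on its own, and all data is made up for illustration.

```python
from typing import Optional, Sequence


def stop_loss_hit(z_score: float, position: int, stop_z: float = 4.0) -> bool:
    """True when the spread has moved further against the position than stop_z."""
    if position > 0:            # long spread: hurt by an ever more negative z
        return z_score < -stop_z
    if position < 0:            # short spread: hurt by an ever more positive z
        return z_score > stop_z
    return False


def first_drawdown_breach(equity: Sequence[float],
                          max_drawdown_pct: float = 0.10) -> Optional[int]:
    """Index of the first point where drawdown from the running peak hits the limit."""
    peak = 0.0
    for i, value in enumerate(equity):
        peak = max(peak, value)
        if peak > 0 and (peak - value) / peak >= max_drawdown_pct:
            return i
    return None


# Toy data (assumed, not from a real backtest)
equity = [100_000, 104_000, 101_000, 93_000, 97_000]
z_path = [-2.1, -2.8, -3.5, -4.2]   # long-spread trade drifting the wrong way

breach_idx = first_drawdown_breach(equity)
stop_idx = next(
    (i for i, z in enumerate(z_path) if stop_loss_hit(z, position=1)),
    None,
)
print(f"drawdown breach at index {breach_idx}, stop-loss at index {stop_idx}")
```

Note that the drawdown is measured from the running peak (104,000 here), not from starting equity, so the limit can trigger even while the account is only modestly below where it began.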

Production Deployment Checklist

Data Quality

  • Handle corporate actions (splits, dividends, mergers)
  • Adjust for survivorship bias
  • Validate price data (check for errors, gaps)
  • Use adjusted prices (split-adjusted, dividend-adjusted)
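A basic price validation pass for the items above might look like the sketch below. `find_price_issues` is a hypothetical helper, and the 25% jump threshold is an arbitrary assumption that would need tuning per asset class; a large one-step move is only a hint of an unadjusted split or a bad tick, not proof.

```python
import math
from typing import List, Optional, Sequence


def find_price_issues(prices: Sequence[Optional[float]],
                      max_abs_return: float = 0.25) -> List[str]:
    """Flag missing values, non-positive prices, and suspiciously large moves."""
    issues = []
    prev = None  # last valid price seen
    for i, p in enumerate(prices):
        if p is None or math.isnan(p):
            issues.append(f"{i}: missing value")
            continue
        if p <= 0:
            issues.append(f"{i}: non-positive price {p}")
            continue
        if prev is not None and abs(p / prev - 1) > max_abs_return:
            issues.append(
                f"{i}: jump of {p / prev - 1:+.1%} "
                "(possible unadjusted split or bad tick)"
            )
        prev = p
    return issues


# Toy series with a gap and what looks like an unadjusted 2-for-1 split
series = [100.0, 101.5, float("nan"), 102.0, 51.2, 51.0]
issues = find_price_issues(series)
for issue in issues:
    print(issue)
```

Returns are computed against the last valid price, so a single missing observation does not hide a jump that follows it.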

Strategy Validation

  • Backtest over multiple time periods (including crises)
  • Test on out-of-sample data
  • Validate cointegration stability
  • Check half-life consistency
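One simple way to check half-life consistency is to estimate it separately on sub-periods and compare. The sketch below assumes an AR(1) spread and uses the discrete-time formula half-life = -ln(2) / ln(1 + b), where b is the slope from regressing the spread change on the lagged spread; this may differ slightly from the half-life calculation used elsewhere in this article, and many implementations use the approximation -ln(2) / b instead. The function name and the noiseless toy series are assumptions chosen so the result is easy to verify.

```python
import math
from typing import Sequence


def half_life(spread: Sequence[float]) -> float:
    """
    Estimate mean-reversion half-life from the AR(1) regression
        s[t] - s[t-1] = a + b * s[t-1]
    Returns math.inf when the window shows no mean reversion.
    """
    x = list(spread[:-1])
    y = [spread[t + 1] - spread[t] for t in range(len(spread) - 1)]
    n = len(x)
    mean_x = sum(x) / n
    mean_y = sum(y) / n
    sxx = sum((xi - mean_x) ** 2 for xi in x)
    sxy = sum((xi - mean_x) * (yi - mean_y) for xi, yi in zip(x, y))
    b = sxy / sxx  # OLS slope
    if not -1 < b < 0:
        return math.inf
    return -math.log(2) / math.log(1 + b)


# Noiseless toy spread decaying by 10% per day (an assumption for illustration)
spread = [10 * 0.9 ** t for t in range(60)]
first_half = half_life(spread[:30])
second_half = half_life(spread[30:])
print(f"first half: {first_half:.2f} days, second half: {second_half:.2f} days")
```

On real data the two estimates will never match exactly; a large or growing gap between sub-period half-lives is a sign that the relationship may be drifting and the strategy parameters need review.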

Risk Management

  • Implement position limits
  • Set stop-loss levels
  • Monitor correlation breakdown
  • Track maximum drawdown

Execution

  • Model transaction costs realistically
  • Account for slippage
  • Implement smart order routing
  • Monitor execution quality
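Execution quality is often tracked as slippage against the price at the time the order was decided (the arrival price). The sketch below is a minimal version of that measurement; `slippage_bps` and the sample fills are hypothetical, and a production system would also weight by fill size and separate out fees.

```python
def slippage_bps(side: str, arrival_price: float, fill_price: float) -> float:
    """
    Signed slippage versus the arrival price, in basis points.
    Positive means the fill was worse than the arrival price.
    """
    if side not in ("buy", "sell"):
        raise ValueError("side must be 'buy' or 'sell'")
    direction = 1 if side == "buy" else -1
    return direction * (fill_price - arrival_price) / arrival_price * 10_000


# (side, arrival price, fill price): made-up fills for illustration
fills = [
    ("buy", 50.00, 50.03),
    ("sell", 120.00, 119.94),
    ("buy", 75.00, 74.99),
]
per_fill = [slippage_bps(*f) for f in fills]
average = sum(per_fill) / len(per_fill)
for f, s in zip(fills, per_fill):
    print(f"{f[0]:>4} arrival={f[1]:.2f} fill={f[2]:.2f} slippage={s:+.2f} bps")
print(f"average slippage: {average:+.2f} bps")
```

Comparing the measured average against the slippage assumed in the backtest is a direct way to tell whether the cost model is too optimistic.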

Monitoring

  • Track spread z-score in real-time
  • Alert on correlation breakdown
  • Monitor P&L attribution
  • Track half-life changes
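For real-time z-score tracking, a fixed-size rolling window is usually enough. The sketch below uses only the standard library; `ZScoreMonitor`, the window length, and the alert threshold are illustrative assumptions. It scores each new observation against the window that preceded it, which may differ from a z-score definition that includes the current point in the mean and standard deviation.

```python
from collections import deque
from statistics import mean, stdev
from typing import Optional


class ZScoreMonitor:
    """Rolling z-score of a spread with a simple threshold alert."""

    def __init__(self, window: int = 60, alert_z: float = 3.0):
        self.values = deque(maxlen=window)
        self.alert_z = alert_z

    def update(self, spread: float) -> Optional[float]:
        """Add an observation; return its z-score, or None during warm-up."""
        z = None
        if len(self.values) >= 2:
            sd = stdev(self.values)
            if sd > 0:
                z = (spread - mean(self.values)) / sd
        self.values.append(spread)
        return z

    def is_alert(self, z: Optional[float]) -> bool:
        return z is not None and abs(z) >= self.alert_z


# Toy spread ticks: quiet, then a sudden dislocation
monitor = ZScoreMonitor(window=5, alert_z=3.0)
ticks = [0.1, -0.2, 0.0, 0.2, -0.1, 1.5]
alerts = []
for i, tick in enumerate(ticks):
    z = monitor.update(tick)
    if monitor.is_alert(z):
        alerts.append(i)
        print(f"ALERT at tick {i}: z = {z:.2f}")
```

Because the `deque` has a fixed `maxlen`, old observations fall out automatically and memory use stays constant however long the monitor runs.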

Conclusion

Mean reversion strategies are powerful but require careful implementation:

  1. Cointegration is key: Always test for cointegration, not just correlation
  2. Dynamic hedge ratios: Use Kalman filters for time-varying relationships
  3. Half-life matters: Spreads with a half-life under 20 days tend to work best
  4. Transaction costs kill: Model costs realistically and trade less frequently
  5. Risk management is critical: Use stop-losses and correlation monitoring

Next Steps:

  • Implement multi-asset basket strategies
  • Add machine learning for regime detection
  • Build real-time monitoring dashboard
  • Integrate with execution management system

References

  1. Vidyamurthy, G. (2004). Pairs Trading: Quantitative Methods and Analysis. Wiley.
  2. Pole, A. (2007). Statistical Arbitrage: Algorithmic Trading Insights and Techniques. Wiley.
  3. Engle, R. F., & Granger, C. W. J. (1987). "Co-integration and error correction: Representation, estimation, and testing". Econometrica, 55(2), 251-276.
  4. Kalman, R. E. (1960). "A new approach to linear filtering and prediction problems". Journal of Basic Engineering, 82(1), 35-45.
  5. Avellaneda, M., & Lee, J. H. (2010). "Statistical arbitrage in the US equities market". Quantitative Finance, 10(7), 761-782.

About the Author: This article is part of NordVarg's series on production-grade algorithmic trading. For related content, see our articles on statistical arbitrage, market making, and portfolio optimization.

