Mean reversion is one of the most reliable sources of alpha in quantitative trading. This article covers production-grade implementation of mean reversion strategies:
Key Insight: Mean reversion works because of fundamental forces (arbitrage, inventory management, market making) that push prices back to equilibrium.
Mean reversion isn't just a statistical artifact—it's driven by real economic forces:
Example: If Coca-Cola and Pepsi stocks diverge (after controlling for market factors), arbitrageurs will buy the cheap one and sell the expensive one, pushing them back together.
⚠️ Warning: Mean reversion can fail catastrophically when:
Risk Management: Always use stop-losses and position limits!
Two price series $P_1(t)$ and $P_2(t)$ are cointegrated if there exists a linear combination of them that is stationary: $P_1(t) - \beta P_2(t) = \epsilon_t$
Where $\epsilon_t$ is stationary (mean-reverting).
Intuition: Even though both prices are non-stationary (random walks), their spread reverts to a mean.
1import numpy as np
2import pandas as pd
3from statsmodels.tsa.stattools import coint, adfuller
4from statsmodels.regression.linear_model import OLS
5import matplotlib.pyplot as plt
6from typing import Tuple, List, Optional
7
class PairsFinder:
    """
    Find cointegrated pairs from a universe of stocks

    Uses the Engle-Granger two-step method:
    1. Regress P1 on P2 (with an intercept) to find the hedge ratio beta
    2. Test whether the regression residuals are stationary, using
       critical values that account for beta having been estimated
    """

    # Column layout of the frame returned by find_pairs (kept even when empty)
    _RESULT_COLUMNS = [
        'asset1', 'asset2', 'beta', 'pvalue',
        'correlation', 'half_life_days', 'spread_mean', 'spread_std',
    ]

    def __init__(self,
                 significance_level: float = 0.05,
                 lookback_days: int = 252):
        """
        Args:
            significance_level: P-value threshold for cointegration
            lookback_days: Historical window for testing
        """
        self.significance_level = significance_level
        self.lookback_days = lookback_days

    def find_pairs(self,
                   prices: pd.DataFrame,
                   min_correlation: float = 0.5) -> pd.DataFrame:
        """
        Find all cointegrated pairs in a universe

        Args:
            prices: DataFrame with assets as columns, dates as index
            min_correlation: Minimum absolute correlation to consider
                (cheap pre-filter applied before the cointegration test)

        Returns:
            DataFrame with one row per cointegrated pair, sorted by p-value
            (most significant first). When no pair qualifies the frame is
            empty but still carries the result columns.
        """
        assets = prices.columns.tolist()
        recent = prices.tail(self.lookback_days)

        results = []

        # Test every unordered pair exactly once
        for i, asset1 in enumerate(assets):
            p1 = recent[asset1]
            for asset2 in assets[i + 1:]:
                p2 = recent[asset2]

                # Pre-filter: `not >=` also rejects a NaN correlation
                # (e.g. a constant series), which `<` would let through.
                corr = p1.corr(p2)
                if not abs(corr) >= min_correlation:
                    continue

                _, pvalue, beta = self._test_cointegration(p1, p2)
                if pvalue >= self.significance_level:
                    continue

                # The regression has an intercept, so this spread has a
                # non-zero mean; it is reported as spread_mean.
                spread = p1 - beta * p2
                results.append({
                    'asset1': asset1,
                    'asset2': asset2,
                    'beta': beta,
                    'pvalue': pvalue,
                    'correlation': corr,
                    'half_life_days': self._calculate_half_life(spread),
                    'spread_mean': spread.mean(),
                    'spread_std': spread.std()
                })

        df_results = pd.DataFrame(results, columns=self._RESULT_COLUMNS)

        # Sort by p-value (most significant first)
        return df_results.sort_values('pvalue')

    def _test_cointegration(self,
                            p1: pd.Series,
                            p2: pd.Series) -> Tuple[float, float, float]:
        """
        Test cointegration using the Engle-Granger method

        Returns:
            Tuple of (test_statistic, p_value, beta)
        """
        # Step 1: hedge ratio from regressing P1 on P2 with an intercept,
        # so a level difference between the two prices does not distort beta.
        design = pd.DataFrame({'const': 1.0, 'p2': p2})
        beta = OLS(p1, design).fit().params['p2']

        # Step 2: residual-based unit-root test. `coint` applies the
        # cointegration critical values; a plain ADF p-value on estimated
        # residuals is too liberal and over-reports cointegration.
        test_stat, pvalue, _ = coint(p1, p2, trend='c', maxlag=1)

        return test_stat, pvalue, beta

    def _calculate_half_life(self, spread: pd.Series) -> float:
        """
        Calculate half-life of mean reversion using an AR(1) model

        Spread follows: dS = -λ(S - μ)dt + σdW
        Half-life = ln(2) / λ

        Returns:
            Half-life in periods, or np.inf when the fitted AR(1)
            coefficient does not indicate mean reversion (or there is too
            little data to fit).
        """
        values = np.asarray(spread.dropna(), dtype=float)
        lagged, current = values[:-1], values[1:]

        # Need a few points and some variation in the regressor to fit a line
        if lagged.size < 3 or lagged.max() == lagged.min():
            return np.inf

        # Fit AR(1) with intercept: S(t) = a + b*S(t-1) + ε.
        # Omitting `a` biases b toward 1 whenever the spread mean is non-zero.
        b, _ = np.polyfit(lagged, current, 1)

        # λ = -ln(b) is only defined and positive for 0 < b < 1
        if not 0 < b < 1:
            return np.inf  # Not mean-reverting

        return np.log(2) / -np.log(b)
# Demo: scan a small synthetic universe for cointegrated pairs
if __name__ == "__main__":
    n_obs = 500
    np.random.seed(42)
    calendar = pd.date_range(end='2025-11-25', periods=n_obs, freq='D')

    # STOCK_A and STOCK_B both load on one shared random walk, so a linear
    # combination of them is stationary.
    shared_walk = np.cumsum(np.random.randn(n_obs)) * 0.5
    series_a = 100 + shared_walk + np.random.randn(n_obs) * 2
    series_b = 50 + 0.5 * shared_walk + np.random.randn(n_obs)

    # STOCK_C is an independent random walk (not cointegrated with the others)
    series_c = 75 + np.cumsum(np.random.randn(n_obs)) * 0.3

    universe = pd.DataFrame(
        {'STOCK_A': series_a, 'STOCK_B': series_b, 'STOCK_C': series_c},
        index=calendar,
    )

    # Run the pair search with a loose correlation pre-filter
    pairs = PairsFinder(significance_level=0.05).find_pairs(
        universe, min_correlation=0.3
    )

    for line in ("Cointegrated Pairs Found:", "=" * 80, pairs.to_string(index=False)):
        print(line)
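The spread follows an Ornstein-Uhlenbeck (OU) process: $dS_t = \theta(\mu - S_t)\,dt + \sigma\,dW_t$
Where: $\theta$ is the speed of mean reversion, $\mu$ is the long-term mean, $\sigma$ is the volatility, and $W_t$ is a Wiener process.
Key Properties: the half-life of a deviation from the mean is $\ln(2)/\theta$, and the equilibrium (stationary) standard deviation is $\sigma/\sqrt{2\theta}$.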
1from scipy.optimize import minimize
2
class OrnsteinUhlenbeckModel:
    """
    Ornstein-Uhlenbeck process for mean reversion modeling

    Estimates parameters and generates trading signals
    """

    def __init__(self):
        self.theta = None  # Mean reversion speed
        self.mu = None  # Long-term mean
        self.sigma = None  # Volatility
        self.half_life = None

    def _equilibrium_std(self) -> float:
        """Stationary std dev σ/√(2θ); np.inf when θ is not positive."""
        if self.theta is None or not self.theta > 0:
            return np.inf
        return self.sigma / np.sqrt(2 * self.theta)

    def fit(self, spread: pd.Series) -> dict:
        """
        Estimate OU parameters by least squares on the Euler discretization

        Args:
            spread: Time series of spread values

        Returns:
            Dict with keys theta, mu, sigma, half_life, equilibrium_std
            and r_squared. The first four are also stored on the instance.
        """
        # Discrete-time OU: S(t+1) = S(t) + θ(μ - S(t))Δt + σ√Δt * ε
        # Rearrange: ΔS = θμΔt - θS(t)Δt + σ√Δt * ε

        delta_s = spread.diff().dropna()
        s_lag = spread.shift(1).dropna()

        # Align series
        delta_s = delta_s[s_lag.index]

        # Assume Δt = 1 (daily data)
        dt = 1.0

        # OLS regression: ΔS = a + b*S(t-1) + ε
        # where a = θμΔt, b = -θΔt
        model = OLS(delta_s, pd.DataFrame({'const': 1, 's_lag': s_lag})).fit()

        a = model.params['const']
        b = model.params['s_lag']

        # Estimate parameters
        self.theta = -b / dt
        self.mu = a / (self.theta * dt) if self.theta != 0 else spread.mean()

        # Estimate sigma from residuals
        self.sigma = model.resid.std() / np.sqrt(dt)

        # Half-life only exists for a mean-reverting (θ > 0) fit
        self.half_life = np.log(2) / self.theta if self.theta > 0 else np.inf

        return {
            'theta': self.theta,
            'mu': self.mu,
            'sigma': self.sigma,
            'half_life': self.half_life,
            'equilibrium_std': self._equilibrium_std(),
            'r_squared': model.rsquared
        }

    def generate_signals(self,
                         spread: pd.Series,
                         entry_threshold: float = 2.0,
                         exit_threshold: float = 0.5) -> pd.DataFrame:
        """
        Generate trading signals based on OU model

        Args:
            spread: Current spread values
            entry_threshold: Number of std devs for entry (e.g., 2.0)
            exit_threshold: Number of std devs for exit (e.g., 0.5)

        Returns:
            DataFrame with columns spread, z_score, signal, mu, upper_band
            and lower_band. signal is -1 (short spread), 0 (flat) or
            1 (long spread).

        Raises:
            ValueError: If the model has not been fitted.
        """
        if self.theta is None:
            raise ValueError("Model not fitted. Call fit() first.")

        # Calculate z-score: (S - μ) / σ_eq. With θ <= 0 there is no
        # equilibrium distribution; σ_eq is then inf, every z-score is 0
        # and no position is ever opened.
        eq_std = self._equilibrium_std()
        z_score = (spread - self.mu) / eq_std

        # NaN means "no new instruction, keep the previous position".
        # Starting from 0 and later replacing 0 with NaN would also erase
        # the explicit exits, so positions would never be closed.
        signals = pd.Series(np.nan, index=spread.index)

        # Long spread when z < -entry_threshold (spread too low)
        signals[z_score < -entry_threshold] = 1.0

        # Short spread when z > entry_threshold (spread too high)
        signals[z_score > entry_threshold] = -1.0

        # Exit when |z| < exit_threshold
        signals[z_score.abs() < exit_threshold] = 0.0

        # Carry the last instruction forward; flat before the first one
        signals = signals.ffill().fillna(0.0)

        return pd.DataFrame({
            'spread': spread,
            'z_score': z_score,
            'signal': signals,
            'mu': self.mu,
            'upper_band': self.mu + entry_threshold * eq_std,
            'lower_band': self.mu - entry_threshold * eq_std
        })

    def expected_return_time(self,
                             current_spread: float,
                             target_spread: float) -> float:
        """
        Expected time for spread to reach target

        E[T] = (1/θ) * ln|(S_current - μ)/(S_target - μ)|

        Returns:
            Time in periods, floored at 0. np.inf if the model is unfitted
            or not mean-reverting; 0 if the target is the mean itself or
            the spread already sits at the mean.
        """
        if self.theta is None or self.theta <= 0:
            return np.inf

        numerator = abs(current_spread - self.mu)
        denominator = abs(target_spread - self.mu)

        # Guard both the division by zero and log(0)
        if denominator == 0 or numerator == 0:
            return 0.0

        expected_time = (1 / self.theta) * np.log(numerator / denominator)

        return max(0.0, expected_time)
# Demo: simulate an OU path, recover its parameters, then derive signals
if __name__ == "__main__":
    np.random.seed(42)

    # Ground-truth parameters of the simulated process
    theta_true, mu_true, sigma_true = 0.1, 0.0, 1.0
    step = 1.0
    n_obs = 500

    # Euler scheme, one normal draw per step
    path = np.zeros(n_obs)
    path[0] = mu_true
    for k in range(1, n_obs):
        shock = np.random.randn() * np.sqrt(step)
        path[k] = path[k-1] + theta_true * (mu_true - path[k-1]) * step + sigma_true * shock

    calendar = pd.date_range(end='2025-11-25', periods=n_obs, freq='D')
    simulated_spread = pd.Series(path, index=calendar)

    # Estimate the parameters back from the simulated path
    estimator = OrnsteinUhlenbeckModel()
    params = estimator.fit(simulated_spread)

    print("OU Parameters:")
    print(f"θ (speed): {params['theta']:.4f} (true: {theta_true})")
    print(f"μ (mean): {params['mu']:.4f} (true: {mu_true})")
    print(f"σ (vol): {params['sigma']:.4f} (true: {sigma_true})")
    print(f"Half-life: {params['half_life']:.2f} days")

    # Translate the fitted model into positions
    signals_df = estimator.generate_signals(simulated_spread, entry_threshold=2.0)

    print(f"\nSignal distribution:")
    print(signals_df['signal'].value_counts())
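In classic pairs trading, we use a fixed hedge ratio $\beta$: $\text{spread}_t = P_1(t) - \beta P_2(t)$
But $\beta$ changes over time! Companies' fundamentals evolve, correlations shift.
Solution: Use a Kalman filter to estimate $\beta_t$ dynamically.
State equation (beta evolves as random walk): $\beta_t = \beta_{t-1} + w_t$, with $w_t \sim N(0, Q)$
Observation equation: $P_1(t) = \beta_t P_2(t) + v_t$, with $v_t \sim N(0, R)$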
1from filterpy.kalman import KalmanFilter
2
class KalmanPairsTrading:
    """
    Pairs trading with Kalman filter for dynamic hedge ratio

    Estimates time-varying beta using Kalman filter
    """

    def __init__(self,
                 delta: float = 1e-4,
                 vega: float = 1e-3):
        """
        Args:
            delta: Process noise (how much beta can change per period)
            vega: Observation noise (measurement error)
        """
        self.delta = delta  # Q (process noise)
        self.vega = vega  # R (observation noise)
        self.kf = None
        self.betas = []  # beta path from the most recent fit()
        self.spreads = []  # spread path from the most recent fit()

    def fit(self,
            p1: pd.Series,
            p2: pd.Series,
            zscore_window: int = 20) -> pd.DataFrame:
        """
        Estimate dynamic hedge ratio using Kalman filter

        Args:
            p1: Price series for asset 1
            p2: Price series for asset 2
            zscore_window: Rolling window (in periods) used to standardize
                the spread into a z-score

        Returns:
            DataFrame indexed like p1 with columns p1, p2, beta, spread,
            spread_std and z_score. The z-score is NaN until a full
            rolling window is available.

        Raises:
            ValueError: If p1 and p2 differ in length.
        """
        if len(p1) != len(p2):
            raise ValueError("p1 and p2 must have the same length")

        # State: [beta]
        # Observation: P1 = beta * P2 + noise
        self.kf = KalmanFilter(dim_x=1, dim_z=1)

        # Initial state: no-intercept OLS beta.
        # NOTE(review): this uses the whole sample, so the first estimates
        # see future data; the wide initial covariance below lets the
        # filter move away from it quickly.
        initial_beta = (p1 * p2).sum() / (p2 * p2).sum()
        self.kf.x = np.array([[initial_beta]])

        # Initial covariance
        self.kf.P = np.array([[1.0]])

        # Process noise
        self.kf.Q = np.array([[self.delta]])

        # Observation noise
        self.kf.R = np.array([[self.vega]])

        # State transition: beta(t) = beta(t-1)
        self.kf.F = np.array([[1.0]])

        betas = []
        spreads = []
        spread_stds = []

        for y, x in zip(p1.to_numpy(), p2.to_numpy()):
            # Predict
            self.kf.predict()

            # The observation matrix depends on today's P2: H = [P2(t)]
            self.kf.H = np.array([[x]])

            # Update with observation P1(t)
            self.kf.update(np.array([[y]]))

            # Extract beta
            beta = self.kf.x[0, 0]
            betas.append(beta)

            # Spread measured with the updated beta
            spreads.append(y - beta * x)

            # Spread uncertainty
            spread_stds.append(np.sqrt(self.kf.P[0, 0] * x**2 + self.vega))

        # Keep the latest paths on the instance, as __init__ advertises
        self.betas = betas
        self.spreads = spreads

        results = pd.DataFrame({
            'p1': p1.values,
            'p2': p2.values,
            'beta': betas,
            'spread': spreads,
            'spread_std': spread_stds
        }, index=p1.index)

        # Rolling z-score of the spread
        spread_series = pd.Series(spreads, index=p1.index)
        rolling = spread_series.rolling(window=zscore_window)
        results['z_score'] = (spread_series - rolling.mean()) / rolling.std()

        return results

    def generate_signals(self,
                         results: pd.DataFrame,
                         entry_threshold: float = 2.0,
                         exit_threshold: float = 0.5) -> pd.DataFrame:
        """
        Generate trading signals from Kalman filter results

        Args:
            results: Frame with a 'z_score' column (as returned by fit)
            entry_threshold: |z| above which a position is opened
            exit_threshold: |z| below which a position is closed

        Returns:
            The same frame object, with a 'signal' column added in place
            (-1: short spread, 0: flat, 1: long spread).
        """
        z_score = results['z_score']

        # NaN means "keep the previous position". Replacing 0 with NaN
        # after the fact would also erase the exits, so start from NaN.
        signals = pd.Series(np.nan, index=results.index)

        # Long spread when z < -entry_threshold
        signals[z_score < -entry_threshold] = 1.0

        # Short spread when z > entry_threshold
        signals[z_score > entry_threshold] = -1.0

        # Exit when |z| < exit_threshold
        signals[z_score.abs() < exit_threshold] = 0.0

        # Carry positions forward; flat before the first instruction
        results['signal'] = signals.ffill().fillna(0.0)

        return results
# Demo: track a slowly oscillating hedge ratio with the Kalman filter
if __name__ == "__main__":
    np.random.seed(42)
    n_days = 500

    # The true hedge ratio follows two full sine cycles around 0.5
    true_beta = 0.5 + 0.1 * np.sin(np.linspace(0, 4*np.pi, n_days))

    # Asset 2 is a random walk; asset 1 is beta * asset 2 plus noise
    p2 = 100 + np.cumsum(np.random.randn(n_days) * 0.5)
    p1 = true_beta * p2 + np.random.randn(n_days) * 2

    calendar = pd.date_range(end='2025-11-25', periods=n_days, freq='D')

    # Filter the pair, then turn the z-scores into positions
    trader = KalmanPairsTrading(delta=1e-4, vega=1e-2)
    results = trader.fit(pd.Series(p1, index=calendar), pd.Series(p2, index=calendar))
    results = trader.generate_signals(results, entry_threshold=2.0)

    print("Kalman Filter Pairs Trading Results:")
    print(f"Average beta: {results['beta'].mean():.4f}")
    print(f"Beta std dev: {results['beta'].std():.4f}")
    print(f"Number of trades: {results['signal'].diff().abs().sum() / 2:.0f}")

    # Three stacked panels: hedge ratio, spread, z-score with positions
    fig, axes = plt.subplots(3, 1, figsize=(12, 10))
    beta_ax, spread_ax, signal_ax = axes
    x_axis = results.index

    beta_ax.plot(x_axis, true_beta, label='True Beta', alpha=0.7)
    beta_ax.plot(x_axis, results['beta'], label='Estimated Beta', alpha=0.7)
    beta_ax.set_title('Dynamic Hedge Ratio (Beta)')

    spread_ax.plot(x_axis, results['spread'], label='Spread', alpha=0.7)
    spread_ax.axhline(y=0, color='black', linestyle='--', alpha=0.3)
    spread_ax.set_title('Spread')

    signal_ax.plot(x_axis, results['z_score'], label='Z-Score', alpha=0.7)
    signal_ax.axhline(y=2, color='red', linestyle='--', alpha=0.5, label='Entry Threshold')
    signal_ax.axhline(y=-2, color='red', linestyle='--', alpha=0.5)
    signal_ax.axhline(y=0, color='black', linestyle='-', alpha=0.3)
    signal_ax.fill_between(x_axis, 0, results['signal'], alpha=0.3, label='Position')
    signal_ax.set_title('Z-Score and Signals')

    # Legends are built from the artists already drawn on each panel
    for panel in axes:
        panel.legend()
        panel.grid(True)

    plt.tight_layout()
    plt.savefig('/tmp/kalman_pairs_trading.png', dpi=150)
    print("\nPlot saved to /tmp/kalman_pairs_trading.png")
Instead of pairs, trade a basket of assets against a benchmark:
Advantages:
Example: Trade a basket of tech stocks vs. QQQ
1from sklearn.linear_model import Ridge
2
class BasketMeanReversion:
    """
    Mean reversion strategy for baskets of assets

    Constructs a basket that mean-reverts against a benchmark
    """

    # Column layout of the backtest output (kept even when no bar is traded)
    _BACKTEST_COLUMNS = [
        'date', 'spread', 'z_score', 'position', 'pnl', 'transaction_cost',
    ]

    def __init__(self,
                 lookback_days: int = 60,
                 rebalance_days: int = 20,
                 regularization: float = 0.1,
                 significance_level: float = 0.05):
        """
        Args:
            lookback_days: Window for cointegration testing
            rebalance_days: How often to recompute weights
            regularization: Ridge regression alpha (prevents overfitting)
            significance_level: ADF p-value below which the spread is
                flagged as stationary
        """
        self.lookback_days = lookback_days
        self.rebalance_days = rebalance_days
        self.regularization = regularization
        self.significance_level = significance_level

    @staticmethod
    def _half_life(spread: pd.Series) -> float:
        """Half-life of an AR(1) fit with intercept; np.inf if not mean-reverting."""
        values = np.asarray(spread.dropna(), dtype=float)
        lagged, current = values[:-1], values[1:]

        # Need a few points and some variation in the regressor to fit a line
        if lagged.size < 3 or lagged.max() == lagged.min():
            return np.inf

        # S(t) = a + b*S(t-1) + ε. The spread mean is non-zero (it absorbs
        # the ridge intercept), so the intercept must be part of the fit.
        b, _ = np.polyfit(lagged, current, 1)

        if not 0 < b < 1:
            return np.inf
        return np.log(2) / -np.log(b)

    def find_basket(self,
                    asset_prices: pd.DataFrame,
                    benchmark_price: pd.Series) -> dict:
        """
        Find optimal basket weights that cointegrate with benchmark

        Args:
            asset_prices: DataFrame with asset prices
            benchmark_price: Benchmark price series

        Returns:
            Dict with keys weights (Series indexed by asset), intercept,
            r_squared, is_stationary, adf_pvalue, half_life, spread_mean
            and spread_std, all computed on the last lookback_days rows.
        """
        # Use recent data
        asset_prices_recent = asset_prices.tail(self.lookback_days)
        benchmark_recent = benchmark_price.tail(self.lookback_days)

        # Regress benchmark on assets using Ridge regression
        # This finds weights w such that: Benchmark ≈ w^T * Assets
        model = Ridge(alpha=self.regularization)
        model.fit(asset_prices_recent, benchmark_recent)

        weights = model.coef_

        # Spread excludes the intercept, so its mean is not zero; the mean
        # is returned separately as spread_mean.
        spread = benchmark_recent - asset_prices_recent @ weights

        # Test stationarity.
        # NOTE(review): the weights were fitted on this same window, so a
        # plain ADF p-value is optimistic; treat is_stationary as a screen.
        adf_pvalue = adfuller(spread, maxlag=1)[1]

        return {
            'weights': pd.Series(weights, index=asset_prices.columns),
            'intercept': model.intercept_,
            'r_squared': model.score(asset_prices_recent, benchmark_recent),
            'is_stationary': adf_pvalue < self.significance_level,
            'adf_pvalue': adf_pvalue,
            'half_life': self._half_life(spread),
            'spread_mean': spread.mean(),
            'spread_std': spread.std()
        }

    def backtest(self,
                 asset_prices: pd.DataFrame,
                 benchmark_price: pd.Series,
                 entry_z: float = 2.0,
                 exit_z: float = 0.5,
                 transaction_cost: float = 0.001) -> pd.DataFrame:
        """
        Backtest basket mean reversion strategy

        Weights are re-estimated every rebalance_days on the preceding
        lookback_days rows (the current row is never in the fitting window).

        Args:
            asset_prices: Asset prices
            benchmark_price: Benchmark price
            entry_z: Z-score threshold for entry
            exit_z: Z-score threshold for exit
            transaction_cost: Cost charged per unit of position change,
                in the same units as the spread

        Returns:
            DataFrame with one row per traded bar and columns date, spread,
            z_score, position, pnl, transaction_cost and cumulative_pnl.
            Empty (but with these columns) if the history is not longer
            than lookback_days.
        """
        records = []
        position = 0  # 0: flat, 1: long spread, -1: short spread
        prev_weights = None  # weights in force on the previous traded bar
        n_obs = len(asset_prices)

        for start in range(self.lookback_days, n_obs, self.rebalance_days):
            # Recompute basket weights on the window ending just before `start`
            window = slice(max(0, start - self.lookback_days), start)
            basket = self.find_basket(
                asset_prices.iloc[window],
                benchmark_price.iloc[window]
            )

            weights = basket['weights']
            spread_mean = basket['spread_mean']
            spread_std = basket['spread_std']

            # Trade with these weights until the next rebalance
            for j in range(start, min(start + self.rebalance_days, n_obs)):
                spread = benchmark_price.iloc[j] - asset_prices.iloc[j] @ weights
                z_score = (spread - spread_mean) / spread_std if spread_std > 0 else 0

                # Generate signal
                new_position = position
                if position == 0:
                    if z_score > entry_z:
                        new_position = -1  # Short spread (short benchmark, long basket)
                    elif z_score < -entry_z:
                        new_position = 1  # Long spread (long benchmark, short basket)
                elif abs(z_score) < exit_z:
                    new_position = 0  # Exit

                # Mark the position held since the previous bar, valuing
                # both bars with the weights that were held. Differencing
                # spreads built from different weights would book a
                # spurious jump on every rebalance. The first traded bar
                # has no previous bar and therefore no P&L.
                pnl = 0.0
                if prev_weights is not None:
                    held_now = benchmark_price.iloc[j] - asset_prices.iloc[j] @ prev_weights
                    held_prev = benchmark_price.iloc[j - 1] - asset_prices.iloc[j - 1] @ prev_weights
                    pnl = position * (held_now - held_prev)

                # Transaction costs
                tc = 0.0
                if new_position != position:
                    tc = transaction_cost * abs(new_position - position)
                    pnl -= tc

                position = new_position
                prev_weights = weights

                records.append({
                    'date': asset_prices.index[j],
                    'spread': spread,
                    'z_score': z_score,
                    'position': position,
                    'pnl': pnl,
                    'transaction_cost': tc
                })

        df_results = pd.DataFrame(records, columns=self._BACKTEST_COLUMNS)
        df_results['cumulative_pnl'] = df_results['pnl'].cumsum()

        return df_results
# Demo: build a five-asset basket against a benchmark and backtest it
if __name__ == "__main__":
    np.random.seed(42)
    n_days = 500
    n_assets = 5
    calendar = pd.date_range(end='2025-11-25', periods=n_days, freq='D')

    # One shared random walk stands in for the market
    market_factor = np.cumsum(np.random.randn(n_days)) * 0.5

    # Every asset tracks 80% of the market plus its own slow random walk
    asset_columns = {}
    for i in range(n_assets):
        own_walk = np.cumsum(np.random.randn(n_days) * 0.2)
        asset_columns[f'Asset_{i}'] = 100 + 0.8 * market_factor + own_walk
    asset_prices = pd.DataFrame(asset_columns, index=calendar)

    # The benchmark is the market factor with a little observation noise
    benchmark_price = pd.Series(
        100 + market_factor + np.random.randn(n_days) * 0.1,
        index=asset_prices.index,
    )

    # Fit basket weights on the most recent window
    basket_strategy = BasketMeanReversion(lookback_days=60, rebalance_days=20)
    basket_result = basket_strategy.find_basket(asset_prices, benchmark_price)

    print("Basket Weights:")
    print(basket_result['weights'])
    print(f"\nR-squared: {basket_result['r_squared']:.4f}")
    print(f"Is stationary: {basket_result['is_stationary']}")
    print(f"Half-life: {basket_result['half_life']:.2f} days")

    # Walk-forward backtest over the whole history
    backtest_results = basket_strategy.backtest(
        asset_prices,
        benchmark_price,
        entry_z=2.0,
        exit_z=0.5,
        transaction_cost=0.001,
    )

    print(f"\nBacktest Results:")
    print(f"Total P&L: ${backtest_results['cumulative_pnl'].iloc[-1]:.2f}")
    print(f"Sharpe Ratio: {backtest_results['pnl'].mean() / backtest_results['pnl'].std() * np.sqrt(252):.2f}")
    print(f"Number of trades: {backtest_results['position'].diff().abs().sum() / 2:.0f}")
Mean reversion strategies trade frequently—transaction costs matter!
class TransactionCostModel:
    """
    Realistic transaction cost model

    Includes:
    - Bid-ask spread
    - Market impact
    - Commissions
    """

    def __init__(self,
                 bid_ask_spread_bps: float = 5.0,
                 market_impact_coef: float = 0.1,
                 commission_bps: float = 1.0):
        """
        Args:
            bid_ask_spread_bps: Bid-ask spread in basis points
            market_impact_coef: Market impact as a fraction of trade value
                per unit of ADV participation (0.1 means 10 bps of impact
                when the trade is 1% of ADV)
            commission_bps: Commission in basis points
        """
        self.bid_ask_spread_bps = bid_ask_spread_bps
        self.market_impact_coef = market_impact_coef
        self.commission_bps = commission_bps

    def calculate_cost(self,
                       trade_value: float,
                       adv: float = 10_000_000) -> float:
        """
        Calculate total transaction cost

        Args:
            trade_value: Dollar value of trade (sign is ignored)
            adv: Average daily volume in dollars (for market impact)

        Returns:
            Total cost in dollars

        Raises:
            ValueError: If adv is not positive. A zero ADV cannot be
                divided by, and a negative one would yield a negative
                impact cost that silently lowers the total.
        """
        if adv <= 0:
            raise ValueError("adv must be positive")

        notional = abs(trade_value)

        # Bid-ask spread cost
        spread_cost = notional * self.bid_ask_spread_bps / 10000

        # Market impact grows linearly with participation (trade size / ADV),
        # so the impact cost is quadratic in trade size.
        participation_rate = notional / adv
        impact_bps = self.market_impact_coef * participation_rate * 10000
        impact_cost = notional * impact_bps / 10000

        # Commission
        commission = notional * self.commission_bps / 10000

        return spread_cost + impact_cost + commission
class PairsRiskManager:
    """
    Risk controls for a pairs-trading book

    Provides four independent checks:
    - position size limit
    - z-score stop-loss
    - recent-correlation floor
    - drawdown limit measured from the running equity peak
    """

    def __init__(self,
                 max_position_size: float = 1_000_000,
                 stop_loss_z: float = 4.0,
                 max_drawdown_pct: float = 0.10,
                 min_correlation: float = 0.5):
        # Running high-water mark; starts at 0 and only ever rises
        self.peak_equity = 0
        self.min_correlation = min_correlation
        self.max_drawdown_pct = max_drawdown_pct
        self.stop_loss_z = stop_loss_z
        self.max_position_size = max_position_size

    def check_position_limit(self, position_value: float) -> bool:
        """Return True when the absolute position value is within the limit"""
        limit = self.max_position_size
        return -limit <= position_value <= limit

    def check_stop_loss(self, z_score: float, position: int) -> bool:
        """
        Decide whether the stop-loss has been hit

        Returns True if position should be closed
        """
        # A long spread is stopped out when z falls below -stop_loss_z;
        # a short spread when z rises above +stop_loss_z.
        long_stopped = position > 0 and z_score < -self.stop_loss_z
        short_stopped = position < 0 and z_score > self.stop_loss_z
        return bool(long_stopped or short_stopped)

    def check_correlation(self, p1: pd.Series, p2: pd.Series) -> bool:
        """Return True while the last 20 observations remain correlated enough"""
        window = 20
        recent_corr = p1.iloc[-window:].corr(p2.iloc[-window:])
        return abs(recent_corr) >= self.min_correlation

    def check_drawdown(self, current_equity: float) -> bool:
        """
        Update the equity peak and test the drawdown limit

        Returns True if trading should stop
        """
        if current_equity > self.peak_equity:
            self.peak_equity = current_equity

        peak = self.peak_equity
        # No positive peak recorded yet, so a relative drawdown is undefined
        if peak == 0:
            return False

        return (peak - current_equity) / peak >= self.max_drawdown_pct
Mean reversion strategies are powerful but require careful implementation:
Next Steps:
About the Author: This article is part of NordVarg's series on production-grade algorithmic trading. For related content, see our articles on statistical arbitrage, market making, and portfolio optimization.
Technical Writer
NordVarg Team is a software engineer at NordVarg specializing in high-performance financial systems and type-safe programming.
Get weekly insights on building high-performance financial systems, latest industry trends, and expert tips delivered straight to your inbox.