TL;DR – Mean reversion strategies profit from temporary price divergences. This guide covers pairs trading with cointegration, Ornstein-Uhlenbeck parameter estimation, Kalman filters for dynamic hedging, and basket construction with production backtesting.
Mean reversion exploits the tendency of prices to return to equilibrium:
Key assumption: Prices deviate temporarily but revert to long-term relationship.
Two non-stationary series and are cointegrated if:
where is stationary (mean-reverting).
Difference from correlation:
1import numpy as np
2import pandas as pd
3from statsmodels.tsa.stattools import adfuller, coint
4import matplotlib.pyplot as plt
5
6def engle_granger_test(y, x, significance=0.05):
7 """
8 Test for cointegration using Engle-Granger method.
9
10 Returns:
11 cointegrated: bool
12 hedge_ratio: float
13 residuals: array
14 """
15 # Step 1: Estimate hedge ratio via OLS
16 from sklearn.linear_model import LinearRegression
17
18 model = LinearRegression()
19 model.fit(x.reshape(-1, 1), y)
20 hedge_ratio = model.coef_[0]
21
22 # Step 2: Test residuals for stationarity
23 residuals = y - hedge_ratio * x
24 adf_result = adfuller(residuals)
25
26 p_value = adf_result[1]
27 cointegrated = p_value < significance
28
29 return {
30 'cointegrated': cointegrated,
31 'p_value': p_value,
32 'hedge_ratio': hedge_ratio,
33 'residuals': residuals,
34 'adf_statistic': adf_result[0]
35 }
36
37# Example: Test two stock prices
38np.random.seed(42)
39n = 252
40
41# Generate cointegrated series
42x = np.cumsum(np.random.normal(0, 1, n)) + 100
43noise = np.random.normal(0, 0.5, n)
44y = 1.5 * x + 10 + noise # Cointegrated with x
45
46result = engle_granger_test(y, x)
47
48print("Cointegrated:", result['cointegrated'])
49print("P-value:", result['p_value'])
50print("Hedge ratio:", result['hedge_ratio'])
51
52# Visualize
53fig, axes = plt.subplots(2, 1, figsize=(14, 8))
54
55axes[0].plot(x, label='Stock X')
56axes[0].plot(y, label='Stock Y')
57axes[0].set_ylabel('Price')
58axes[0].set_title('Price Series')
59axes[0].legend()
60axes[0].grid(True, alpha=0.3)
61
62axes[1].plot(result['residuals'])
63axes[1].axhline(y=0, color='black', linestyle='--', alpha=0.5)
64axes[1].axhline(y=np.mean(result['residuals']), color='red', linestyle='--',
65 label='Mean')
66axes[1].fill_between(range(len(result['residuals'])),
67 np.mean(result['residuals']) - 2*np.std(result['residuals']),
68 np.mean(result['residuals']) + 2*np.std(result['residuals']),
69 alpha=0.2)
70axes[1].set_xlabel('Time')
71axes[1].set_ylabel('Spread')
72axes[1].set_title('Cointegration Residuals (Spread)')
73axes[1].legend()
74axes[1].grid(True, alpha=0.3)
75
76plt.tight_layout()
77plt.show()
78The spread follows an OU process:
Where:
Half-life: Time for spread to revert halfway:
1class OrnsteinUhlenbeck:
2 """Ornstein-Uhlenbeck process for mean reversion."""
3
4 def __init__(self):
5 self.theta = None
6 self.mu = None
7 self.sigma = None
8 self.half_life = None
9
10 def fit(self, spread, dt=1.0):
11 """
12 Estimate OU parameters from spread data.
13
14 Uses discrete-time approximation:
15 X_t = X_{t-1} + theta*(mu - X_{t-1})*dt + sigma*sqrt(dt)*epsilon
16
17 Rearranging: X_t - X_{t-1} = theta*mu*dt - theta*X_{t-1}*dt + noise
18 """
19 spread = np.array(spread)
20
21 # Differences
22 dx = np.diff(spread)
23 x_lag = spread[:-1]
24
25 # OLS regression: dx = a + b*x_lag
26 from sklearn.linear_model import LinearRegression
27
28 model = LinearRegression()
29 model.fit(x_lag.reshape(-1, 1), dx)
30
31 b = model.coef_[0]
32 a = model.intercept_
33
34 # Extract parameters
35 self.theta = -b / dt
36 self.mu = a / (self.theta * dt)
37
38 # Estimate sigma from residuals
39 residuals = dx - (a + b * x_lag)
40 self.sigma = np.std(residuals) / np.sqrt(dt)
41
42 # Half-life
43 if self.theta > 0:
44 self.half_life = np.log(2) / self.theta
45 else:
46 self.half_life = np.inf
47
48 return self
49
50 def expected_value(self, x0, t):
51 """Expected value at time t given current value x0."""
52 return self.mu + (x0 - self.mu) * np.exp(-self.theta * t)
53
54 def std_at_time(self, t):
55 """Standard deviation at time t."""
56 if self.theta > 0:
57 return self.sigma * np.sqrt((1 - np.exp(-2 * self.theta * t)) / (2 * self.theta))
58 else:
59 return self.sigma * np.sqrt(t)
60
61# Fit OU to spread
62ou = OrnsteinUhlenbeck()
63ou.fit(result['residuals'])
64
65print("OU Parameters:")
66print(" Theta (mean reversion speed):", ou.theta)
67print(" Mu (long-term mean):", ou.mu)
68print(" Sigma (volatility):", ou.sigma)
69print(" Half-life (days):", ou.half_life)
70
71# Plot expected reversion
72current_spread = result['residuals'][-1]
73forecast_horizon = 20
74t_forecast = np.arange(0, forecast_horizon)
75expected = [ou.expected_value(current_spread, t) for t in t_forecast]
76std = [ou.std_at_time(t) for t in t_forecast]
77
78plt.figure(figsize=(12, 6))
79plt.plot(t_forecast, expected, 'b-', linewidth=2, label='Expected spread')
80plt.fill_between(t_forecast,
81 np.array(expected) - 2*np.array(std),
82 np.array(expected) + 2*np.array(std),
83 alpha=0.3, label='95% confidence')
84plt.axhline(y=ou.mu, color='red', linestyle='--', label='Long-term mean')
85plt.axhline(y=current_spread, color='green', linestyle='--', label='Current spread')
86plt.xlabel('Days ahead')
87plt.ylabel('Spread')
88plt.title('OU Process: Expected Spread Reversion')
89plt.legend()
90plt.grid(True, alpha=0.3)
91plt.show()
921class PairsTradingStrategy:
2 """Complete pairs trading strategy."""
3
4 def __init__(self, entry_threshold=2.0, exit_threshold=0.5):
5 self.entry_threshold = entry_threshold
6 self.exit_threshold = exit_threshold
7
8 self.hedge_ratio = None
9 self.ou_model = None
10
11 self.positions = []
12 self.pnl = []
13
14 def fit(self, price_x, price_y):
15 """Fit cointegration and OU model."""
16 # Test cointegration
17 coint_result = engle_granger_test(price_y, price_x)
18
19 if not coint_result['cointegrated']:
20 print("Warning: Series not cointegrated!")
21
22 self.hedge_ratio = coint_result['hedge_ratio']
23
24 # Fit OU to spread
25 spread = coint_result['residuals']
26 self.ou_model = OrnsteinUhlenbeck().fit(spread)
27
28 return self
29
30 def generate_signals(self, price_x, price_y):
31 """
32 Generate trading signals.
33
34 Returns:
35 signals: 1 (long spread), -1 (short spread), 0 (neutral)
36 """
37 # Compute spread
38 spread = price_y - self.hedge_ratio * price_x
39
40 # Z-score
41 z_score = (spread - self.ou_model.mu) / self.ou_model.sigma
42
43 # Generate signals
44 signals = np.zeros(len(spread))
45 position = 0
46
47 for i in range(len(spread)):
48 if position == 0:
49 # Entry signals
50 if z_score[i] > self.entry_threshold:
51 position = -1 # Short spread (sell Y, buy X)
52 elif z_score[i] < -self.entry_threshold:
53 position = 1 # Long spread (buy Y, sell X)
54 else:
55 # Exit signals
56 if abs(z_score[i]) < self.exit_threshold:
57 position = 0
58
59 signals[i] = position
60
61 return signals, z_score
62
63 def backtest(self, price_x, price_y, signals):
64 """
65 Backtest strategy.
66
67 Returns:
68 pnl: Cumulative P&L
69 trades: List of trades
70 """
71 spread = price_y - self.hedge_ratio * price_x
72 spread_returns = np.diff(spread)
73
74 # P&L from spread changes
75 pnl = np.zeros(len(signals))
76 trades = []
77
78 for i in range(1, len(signals)):
79 # Position change
80 if signals[i] != signals[i-1]:
81 trades.append({
82 'time': i,
83 'action': 'enter' if signals[i] != 0 else 'exit',
84 'position': signals[i],
85 'spread': spread[i]
86 })
87
88 # P&L from holding position
89 if i < len(spread_returns):
90 pnl[i] = pnl[i-1] + signals[i-1] * spread_returns[i-1]
91
92 return pnl, trades
93
94# Example backtest
95strategy = PairsTradingStrategy(entry_threshold=2.0, exit_threshold=0.5)
96strategy.fit(x[:200], y[:200]) # Fit on training data
97
98# Generate signals on test data
99signals, z_scores = strategy.generate_signals(x[200:], y[200:])
100pnl, trades = strategy.backtest(x[200:], y[200:], signals)
101
102# Plot results
103fig, axes = plt.subplots(3, 1, figsize=(14, 12))
104
105# Prices
106axes[0].plot(x[200:], label='Stock X')
107axes[0].plot(y[200:], label='Stock Y')
108axes[0].set_ylabel('Price')
109axes[0].set_title('Stock Prices')
110axes[0].legend()
111axes[0].grid(True, alpha=0.3)
112
113# Z-score and signals
114axes[1].plot(z_scores, label='Z-score')
115axes[1].axhline(y=strategy.entry_threshold, color='red', linestyle='--', alpha=0.5)
116axes[1].axhline(y=-strategy.entry_threshold, color='red', linestyle='--', alpha=0.5)
117axes[1].axhline(y=strategy.exit_threshold, color='green', linestyle='--', alpha=0.5)
118axes[1].axhline(y=-strategy.exit_threshold, color='green', linestyle='--', alpha=0.5)
119
120# Mark entry/exit points
121for trade in trades:
122 color = 'green' if trade['action'] == 'enter' else 'red'
123 axes[1].axvline(x=trade['time']-200, color=color, alpha=0.3, linestyle=':')
124
125axes[1].set_ylabel('Z-score')
126axes[1].set_title('Spread Z-Score and Trading Signals')
127axes[1].legend()
128axes[1].grid(True, alpha=0.3)
129
130# P&L
131axes[2].plot(pnl)
132axes[2].set_xlabel('Time')
133axes[2].set_ylabel('Cumulative P&L')
134axes[2].set_title('Strategy Performance')
135axes[2].grid(True, alpha=0.3)
136
137plt.tight_layout()
138plt.show()
139
140print("Total P&L:", pnl[-1])
141print("Number of trades:", len(trades))
142print("Sharpe ratio:", np.mean(np.diff(pnl)) / np.std(np.diff(pnl)) * np.sqrt(252))
1431from statsmodels.tsa.vector_ar.vecm import coint_johansen
2
3def johansen_cointegration_test(data, significance=0.05):
4 """
5 Johansen test for multiple cointegrating relationships.
6
7 Args:
8 data: DataFrame with multiple price series
9
10 Returns:
11 n_coint: Number of cointegrating relationships
12 eigenvectors: Cointegrating vectors
13 """
14 # Run Johansen test
15 result = coint_johansen(data, det_order=0, k_ar_diff=1)
16
17 # Trace statistic test
18 trace_stat = result.lr1
19 critical_values = result.cvt[:, 1] # 5% significance
20
21 n_coint = np.sum(trace_stat > critical_values)
22
23 return {
24 'n_cointegrating': n_coint,
25 'trace_statistic': trace_stat,
26 'critical_values': critical_values,
27 'eigenvectors': result.evec,
28 'eigenvalues': result.eig
29 }
30
31# Example: 3 stocks
32np.random.seed(42)
33n = 252
34
35# Generate 3 cointegrated series
36factor = np.cumsum(np.random.normal(0, 1, n))
37stock1 = factor + np.random.normal(0, 0.5, n) + 100
38stock2 = 1.5 * factor + np.random.normal(0, 0.5, n) + 150
39stock3 = 0.8 * factor + np.random.normal(0, 0.5, n) + 80
40
41data = pd.DataFrame({
42 'stock1': stock1,
43 'stock2': stock2,
44 'stock3': stock3
45})
46
47johansen_result = johansen_cointegration_test(data)
48
49print("Number of cointegrating relationships:", johansen_result['n_cointegrating'])
50print("Cointegrating vectors:")
51print(johansen_result['eigenvectors'][:, :johansen_result['n_cointegrating']])
521class BasketTradingStrategy:
2 """Trade baskets of cointegrated securities."""
3
4 def __init__(self, n_components=1):
5 self.n_components = n_components
6 self.weights = None
7 self.ou_model = None
8
9 def fit(self, prices):
10 """
11 Find cointegrating basket using Johansen test.
12
13 Args:
14 prices: DataFrame with multiple price series
15 """
16 # Johansen test
17 result = johansen_cointegration_test(prices, significance=0.05)
18
19 if result['n_cointegrating'] == 0:
20 raise ValueError("No cointegrating relationships found!")
21
22 # Use first cointegrating vector
23 self.weights = result['eigenvectors'][:, 0]
24
25 # Compute basket value
26 basket = (prices.values @ self.weights).flatten()
27
28 # Fit OU to basket
29 self.ou_model = OrnsteinUhlenbeck().fit(basket)
30
31 return self
32
33 def compute_basket(self, prices):
34 """Compute basket value from prices."""
35 return (prices.values @ self.weights).flatten()
36
37 def generate_signals(self, prices, entry_z=2.0, exit_z=0.5):
38 """Generate trading signals for basket."""
39 basket = self.compute_basket(prices)
40
41 # Z-score
42 z_score = (basket - self.ou_model.mu) / self.ou_model.sigma
43
44 # Signals
45 signals = np.zeros(len(basket))
46 position = 0
47
48 for i in range(len(basket)):
49 if position == 0:
50 if z_score[i] > entry_z:
51 position = -1 # Short basket
52 elif z_score[i] < -entry_z:
53 position = 1 # Long basket
54 else:
55 if abs(z_score[i]) < exit_z:
56 position = 0
57
58 signals[i] = position
59
60 return signals, z_score, basket
61
62# Example
63basket_strategy = BasketTradingStrategy()
64basket_strategy.fit(data[:200])
65
66print("Basket weights:", basket_strategy.weights)
67print("OU half-life:", basket_strategy.ou_model.half_life)
68
69signals, z_scores, basket = basket_strategy.generate_signals(data[200:])
70
71# Backtest
72basket_returns = np.diff(basket)
73pnl = np.cumsum(signals[:-1] * basket_returns)
74
75plt.figure(figsize=(14, 8))
76
77plt.subplot(2, 1, 1)
78plt.plot(basket, label='Basket value')
79plt.ylabel('Basket')
80plt.title('Mean-Reverting Basket')
81plt.legend()
82plt.grid(True, alpha=0.3)
83
84plt.subplot(2, 1, 2)
85plt.plot(pnl)
86plt.xlabel('Time')
87plt.ylabel('Cumulative P&L')
88plt.title('Basket Trading P&L')
89plt.grid(True, alpha=0.3)
90
91plt.tight_layout()
92plt.show()
93Mean reversion strategies are foundational in statistical arbitrage. Always test for cointegration, estimate OU parameters carefully, and monitor for regime changes. Use Kalman filters for dynamic hedging and implement robust risk management.
Technical Writer
NordVarg Team is a software engineer at NordVarg specializing in high-performance financial systems and type-safe programming.
Get weekly insights on building high-performance financial systems, latest industry trends, and expert tips delivered straight to your inbox.