TL;DR – Kalman filters optimally estimate hidden states from noisy observations. This guide covers applications to pairs trading, dynamic beta estimation, yield curve filtering, and volatility forecasting with production-ready implementations.
Kalman filters excel at:
Key advantage: Recursive updates with minimal memory (no need to store all history).
State equation (hidden process):
Observation equation (measurements):
Where:
Prediction step:
Update step:
1import numpy as np
2import matplotlib.pyplot as plt
3
4class KalmanFilter:
5 """Basic Kalman filter for linear state-space models."""
6
7 def __init__(self, F, H, Q, R, x0, P0):
8 """
9 Initialize Kalman filter.
10
11 Args:
12 F: State transition matrix
13 H: Observation matrix
14 Q: Process noise covariance
15 R: Measurement noise covariance
16 x0: Initial state estimate
17 P0: Initial state covariance
18 """
19 self.F = F
20 self.H = H
21 self.Q = Q
22 self.R = R
23
24 self.x = x0
25 self.P = P0
26
27 self.n_states = len(x0)
28 self.n_obs = H.shape[0]
29
30 def predict(self, u=None):
31 """Prediction step."""
32 # State prediction
33 self.x = self.F @ self.x
34 if u is not None:
35 self.x += u
36
37 # Covariance prediction
38 self.P = self.F @ self.P @ self.F.T + self.Q
39
40 return self.x, self.P
41
42 def update(self, y):
43 """Update step with new observation."""
44 # Innovation
45 y_pred = self.H @ self.x
46 innovation = y - y_pred
47
48 # Innovation covariance
49 S = self.H @ self.P @ self.H.T + self.R
50
51 # Kalman gain
52 K = self.P @ self.H.T @ np.linalg.inv(S)
53
54 # State update
55 self.x = self.x + K @ innovation
56
57 # Covariance update
58 self.P = (np.eye(self.n_states) - K @ self.H) @ self.P
59
60 return self.x, self.P
61
62 def filter(self, observations):
63 """
64 Run filter on sequence of observations.
65
66 Returns:
67 filtered_states, filtered_covariances
68 """
69 n_steps = len(observations)
70 filtered_states = np.zeros((n_steps, self.n_states))
71 filtered_covs = np.zeros((n_steps, self.n_states, self.n_states))
72
73 for t in range(n_steps):
74 self.predict()
75 self.update(observations[t])
76
77 filtered_states[t] = self.x
78 filtered_covs[t] = self.P
79
80 return filtered_states, filtered_covs
81
82# Example: Tracking position with noisy GPS
83np.random.seed(42)
84n_steps = 100
85
86# True trajectory (sine wave)
87t = np.linspace(0, 10, n_steps)
88true_position = np.sin(t)
89true_velocity = np.cos(t)
90
91# Noisy observations
92observations = true_position + np.random.normal(0, 0.5, n_steps)
93
94# Kalman filter setup
95dt = t[1] - t[0]
96F = np.array([[1, dt], [0, 1]]) # Position-velocity model
97H = np.array([[1, 0]]) # Observe position only
98Q = np.eye(2) * 0.01 # Small process noise
99R = np.array([[0.25]]) # Measurement noise
100
101kf = KalmanFilter(F, H, Q, R, x0=np.array([0, 0]), P0=np.eye(2))
102filtered_states, _ = kf.filter(observations.reshape(-1, 1))
103
104# Plot results
105plt.figure(figsize=(14, 5))
106plt.plot(t, true_position, 'g-', label='True position', linewidth=2)
107plt.plot(t, observations, 'r.', label='Noisy observations', alpha=0.5)
108plt.plot(t, filtered_states[:, 0], 'b-', label='Kalman filtered', linewidth=2)
109plt.xlabel('Time')
110plt.ylabel('Position')
111plt.title('Kalman Filter: Position Tracking')
112plt.legend()
113plt.grid(True, alpha=0.3)
114plt.show()
115Model the spread between two assets:
where is the time-varying hedge ratio.
State-space formulation:
1class PairsTradingKalman:
2 """Kalman filter for pairs trading with dynamic hedge ratio."""
3
4 def __init__(self, delta=1e-4, Ve=1e-3):
5 """
6 Initialize pairs trading filter.
7
8 Args:
9 delta: Process noise (how fast beta changes)
10 Ve: Measurement noise (spread volatility)
11 """
12 self.delta = delta
13 self.Ve = Ve
14
15 # Initial state
16 self.beta = 0
17 self.P = 1
18
19 self.betas = []
20 self.spreads = []
21
22 def update(self, x, y):
23 """
24 Update with new price observations.
25
26 Args:
27 x: Price of stock 1
28 y: Price of stock 2
29
30 Returns:
31 beta: Current hedge ratio
32 spread: Current spread
33 """
34 # Prediction
35 beta_pred = self.beta
36 P_pred = self.P + self.delta
37
38 # Update
39 # Innovation
40 y_pred = beta_pred * x
41 e = y - y_pred
42
43 # Innovation variance
44 Q = x**2 * P_pred + self.Ve
45
46 # Kalman gain
47 K = P_pred * x / Q
48
49 # State update
50 self.beta = beta_pred + K * e
51 self.P = P_pred * (1 - K * x)
52
53 # Compute spread
54 spread = y - self.beta * x
55
56 self.betas.append(self.beta)
57 self.spreads.append(spread)
58
59 return self.beta, spread
60
61 def generate_signals(self, threshold=2.0):
62 """
63 Generate trading signals based on spread.
64
65 Args:
66 threshold: Number of standard deviations
67
68 Returns:
69 signals: 1 (long spread), -1 (short spread), 0 (neutral)
70 """
71 spreads = np.array(self.spreads)
72 mean_spread = np.mean(spreads)
73 std_spread = np.std(spreads)
74
75 z_scores = (spreads - mean_spread) / std_spread
76
77 signals = np.zeros(len(z_scores))
78 signals[z_scores > threshold] = -1 # Short spread (sell Y, buy X)
79 signals[z_scores < -threshold] = 1 # Long spread (buy Y, sell X)
80
81 return signals
82
83# Example: Pairs trading simulation
84np.random.seed(42)
85n_days = 252
86
87# Generate correlated stock prices
88returns1 = np.random.normal(0.0005, 0.02, n_days)
89returns2 = 0.8 * returns1 + np.random.normal(0, 0.01, n_days)
90
91prices1 = 100 * np.exp(np.cumsum(returns1))
92prices2 = 100 * np.exp(np.cumsum(returns2))
93
94# Apply Kalman filter
95pairs_kf = PairsTradingKalman(delta=1e-4, Ve=1e-2)
96
97for i in range(n_days):
98 beta, spread = pairs_kf.update(prices1[i], prices2[i])
99
100# Generate signals
101signals = pairs_kf.generate_signals(threshold=2.0)
102
103# Plot results
104fig, axes = plt.subplots(3, 1, figsize=(14, 10))
105
106# Prices
107axes[0].plot(prices1, label='Stock 1')
108axes[0].plot(prices2, label='Stock 2')
109axes[0].set_ylabel('Price')
110axes[0].set_title('Stock Prices')
111axes[0].legend()
112axes[0].grid(True, alpha=0.3)
113
114# Dynamic beta
115axes[1].plot(pairs_kf.betas)
116axes[1].set_ylabel('Hedge Ratio (Beta)')
117axes[1].set_title('Dynamic Hedge Ratio')
118axes[1].grid(True, alpha=0.3)
119
120# Spread and signals
121axes[2].plot(pairs_kf.spreads, label='Spread')
122axes[2].axhline(y=np.mean(pairs_kf.spreads), color='black', linestyle='--', alpha=0.5)
123axes[2].fill_between(range(n_days),
124 np.mean(pairs_kf.spreads) - 2*np.std(pairs_kf.spreads),
125 np.mean(pairs_kf.spreads) + 2*np.std(pairs_kf.spreads),
126 alpha=0.2)
127
128# Mark trading signals
129long_signals = np.where(signals == 1)[0]
130short_signals = np.where(signals == -1)[0]
131axes[2].scatter(long_signals, np.array(pairs_kf.spreads)[long_signals],
132 color='green', marker='^', s=100, label='Long')
133axes[2].scatter(short_signals, np.array(pairs_kf.spreads)[short_signals],
134 color='red', marker='v', s=100, label='Short')
135
136axes[2].set_xlabel('Days')
137axes[2].set_ylabel('Spread')
138axes[2].set_title('Spread and Trading Signals')
139axes[2].legend()
140axes[2].grid(True, alpha=0.3)
141
142plt.tight_layout()
143plt.show()
144Track time-varying market exposure:
1class DynamicBetaKalman:
2 """Estimate time-varying beta using Kalman filter."""
3
4 def __init__(self, delta=1e-5):
5 self.delta = delta
6 self.beta = 1.0
7 self.alpha = 0.0
8 self.P = np.eye(2)
9
10 self.betas = []
11 self.alphas = []
12
13 def update(self, r_market, r_stock):
14 """
15 Update with new returns.
16
17 Model: r_stock = alpha + beta * r_market + epsilon
18 """
19 # State: [alpha, beta]
20 # Observation: r_stock
21 # Predictor: [1, r_market]
22
23 # Prediction
24 state_pred = np.array([self.alpha, self.beta])
25 P_pred = self.P + self.delta * np.eye(2)
26
27 # Observation
28 H = np.array([1, r_market])
29 y = r_stock
30 y_pred = H @ state_pred
31
32 # Innovation
33 innovation = y - y_pred
34 S = H @ P_pred @ H.T + 0.0001 # Measurement noise
35
36 # Kalman gain
37 K = P_pred @ H / S
38
39 # Update
40 state = state_pred + K * innovation
41 self.alpha, self.beta = state
42 self.P = (np.eye(2) - np.outer(K, H)) @ P_pred
43
44 self.alphas.append(self.alpha)
45 self.betas.append(self.beta)
46
47 return self.alpha, self.beta
48
49# Example
50np.random.seed(42)
51n_days = 500
52
53# Market returns
54r_market = np.random.normal(0.001, 0.02, n_days)
55
56# Stock returns with time-varying beta
57true_beta = 1.0 + 0.5 * np.sin(np.linspace(0, 4*np.pi, n_days))
58r_stock = 0.0005 + true_beta * r_market + np.random.normal(0, 0.01, n_days)
59
60# Estimate dynamic beta
61beta_kf = DynamicBetaKalman(delta=1e-4)
62
63for i in range(n_days):
64 alpha, beta = beta_kf.update(r_market[i], r_stock[i])
65
66# Plot
67plt.figure(figsize=(14, 6))
68plt.plot(true_beta, label='True beta', linewidth=2)
69plt.plot(beta_kf.betas, label='Estimated beta', linewidth=2, alpha=0.7)
70plt.xlabel('Days')
71plt.ylabel('Beta')
72plt.title('Dynamic Beta Estimation with Kalman Filter')
73plt.legend()
74plt.grid(True, alpha=0.3)
75plt.show()
76For non-linear state-space models, use EKF with linearization.
1class ExtendedKalmanFilter:
2 """Extended Kalman filter for non-linear models."""
3
4 def __init__(self, f, h, F_jacobian, H_jacobian, Q, R, x0, P0):
5 """
6 Args:
7 f: Non-linear state transition function
8 h: Non-linear observation function
9 F_jacobian: Jacobian of f
10 H_jacobian: Jacobian of h
11 """
12 self.f = f
13 self.h = h
14 self.F_jacobian = F_jacobian
15 self.H_jacobian = H_jacobian
16 self.Q = Q
17 self.R = R
18
19 self.x = x0
20 self.P = P0
21
22 def predict(self):
23 """Prediction with non-linear dynamics."""
24 # Non-linear state prediction
25 self.x = self.f(self.x)
26
27 # Linearize
28 F = self.F_jacobian(self.x)
29
30 # Covariance prediction
31 self.P = F @ self.P @ F.T + self.Q
32
33 return self.x
34
35 def update(self, y):
36 """Update with non-linear observation."""
37 # Predicted observation
38 y_pred = self.h(self.x)
39
40 # Linearize
41 H = self.H_jacobian(self.x)
42
43 # Innovation
44 innovation = y - y_pred
45 S = H @ self.P @ H.T + self.R
46
47 # Kalman gain
48 K = self.P @ H.T @ np.linalg.inv(S)
49
50 # Update
51 self.x = self.x + K @ innovation
52 self.P = (np.eye(len(self.x)) - K @ H) @ self.P
53
54 return self.x
55
56# Example: Volatility estimation (non-linear)
57def f_vol(x):
58 """State transition: log-volatility random walk."""
59 return x + np.random.normal(0, 0.01)
60
61def h_vol(x):
62 """Observation: squared returns ~ volatility."""
63 return np.exp(x)
64
65def F_jacobian_vol(x):
66 return np.array([[1.0]])
67
68def H_jacobian_vol(x):
69 return np.array([[np.exp(x)]])
70
71# Simulate
72n_days = 200
73true_vol = 0.2 * np.exp(0.5 * np.sin(np.linspace(0, 4*np.pi, n_days)))
74returns = np.random.normal(0, true_vol)
75squared_returns = returns**2
76
77# EKF for volatility
78ekf = ExtendedKalmanFilter(
79 f=f_vol,
80 h=h_vol,
81 F_jacobian=F_jacobian_vol,
82 H_jacobian=H_jacobian_vol,
83 Q=np.array([[0.0001]]),
84 R=np.array([[0.01]]),
85 x0=np.array([np.log(0.2)]),
86 P0=np.array([[0.1]])
87)
88
89estimated_log_vols = []
90for sr in squared_returns:
91 ekf.predict()
92 ekf.update(np.array([sr]))
93 estimated_log_vols.append(ekf.x[0])
94
95estimated_vols = np.exp(estimated_log_vols)
96
97plt.figure(figsize=(14, 6))
98plt.plot(true_vol, label='True volatility', linewidth=2)
99plt.plot(estimated_vols, label='EKF estimate', linewidth=2, alpha=0.7)
100plt.xlabel('Days')
101plt.ylabel('Volatility')
102plt.title('Volatility Estimation with Extended Kalman Filter')
103plt.legend()
104plt.grid(True, alpha=0.3)
105plt.show()
106Kalman filters are essential for dynamic estimation in finance. Use basic Kalman for linear models, EKF for non-linear, and always validate with out-of-sample data. Perfect for pairs trading and dynamic risk management.
Technical Writer
NordVarg Team is a software engineer at NordVarg specializing in high-performance financial systems and type-safe programming.
Get weekly insights on building high-performance financial systems, latest industry trends, and expert tips delivered straight to your inbox.