Factor investing has become the dominant paradigm in quantitative equity trading. This article shares our complete production implementation of multi-factor models, including data pipeline, signal generation, portfolio construction, and real-time monitoring.
Factor models decompose returns into systematic risk factors:
$$R_{i,t} = \alpha_i + \beta_{i,1} F_{1,t} + \beta_{i,2} F_{2,t} + \cdots + \beta_{i,n} F_{n,t} + \varepsilon_{i,t}$$
Common factors (the ones implemented below): value, momentum, quality, and low volatility.
import logging
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Dict, List, Optional

import numpy as np
import pandas as pd

@dataclass
class FactorData:
    """Factor snapshot for one symbol on one date."""

    date: pd.Timestamp  # date the snapshot refers to
    symbol: str  # security identifier
    factors: Dict[str, float]  # factor name -> factor value
    returns_1m: float  # presumably the trailing 1-month return -- TODO confirm
    returns_12m: float  # presumably the trailing 12-month return -- TODO confirm
    market_cap: float  # market capitalisation; currency/units not shown here

class FactorDataPipeline:
    """Fetch raw data and derive per-symbol factor values.

    ``data_source`` must expose ``execute(query)``. Its result is returned
    unchanged by the ``fetch_*`` methods and is annotated as a DataFrame.
    """

    _FUNDAMENTAL_COLUMNS = [
        'symbol', 'book_value', 'market_cap', 'earnings', 'revenue',
        'net_income', 'total_assets', 'total_liabilities',
        'operating_cash_flow', 'fiscal_quarter_end',
    ]
    _PRICE_COLUMNS = ['date', 'symbol', 'close', 'volume', 'returns']

    def __init__(self, data_source):
        self.data_source = data_source

    @staticmethod
    def _sql_in_list(symbols) -> str:
        """Render symbols as comma-separated, quoted SQL string literals.

        Embedded single quotes are doubled so that a symbol can never close
        the literal it is placed in.
        """
        return ','.join("'" + str(s).replace("'", "''") + "'" for s in symbols)

    @staticmethod
    def _safe_ratio(numerator: pd.Series, denominator: pd.Series) -> pd.Series:
        """Divide element-wise, yielding NaN (not inf) where the denominator is 0."""
        return numerator / denominator.where(denominator != 0)

    def fetch_fundamentals(self, symbols: List[str], date: pd.Timestamp) -> pd.DataFrame:
        """Fetch fundamental data for factor calculation.

        Returns every fiscal quarter on or before ``date`` for the requested
        symbols, newest first within each symbol. An empty ``symbols`` list
        returns an empty frame without querying, because ``IN ()`` is not
        valid SQL.

        NOTE(review): several quarters per symbol can come back, so the
        factor methods below then emit several rows per symbol -- confirm
        whether callers keep only the latest quarter.
        """
        symbols = list(symbols)
        if not symbols:
            return pd.DataFrame(columns=self._FUNDAMENTAL_COLUMNS)

        query = f"""
            SELECT
                symbol,
                book_value,
                market_cap,
                earnings,
                revenue,
                net_income,
                total_assets,
                total_liabilities,
                operating_cash_flow,
                fiscal_quarter_end
            FROM fundamentals
            WHERE symbol IN ({self._sql_in_list(symbols)})
              AND fiscal_quarter_end <= '{date}'
            ORDER BY symbol, fiscal_quarter_end DESC
        """
        return self.data_source.execute(query)

    def fetch_price_data(self, symbols: List[str],
                         start_date: pd.Timestamp,
                         end_date: pd.Timestamp) -> pd.DataFrame:
        """Fetch historical price data between two dates (inclusive).

        An empty ``symbols`` list returns an empty frame without querying.
        """
        symbols = list(symbols)
        if not symbols:
            return pd.DataFrame(columns=self._PRICE_COLUMNS)

        query = f"""
            SELECT
                date,
                symbol,
                close,
                volume,
                returns
            FROM prices
            WHERE symbol IN ({self._sql_in_list(symbols)})
              AND date BETWEEN '{start_date}' AND '{end_date}'
            ORDER BY date, symbol
        """
        return self.data_source.execute(query)

    def calculate_value_factors(self, fundamentals: pd.DataFrame,
                                prices: pd.DataFrame) -> pd.DataFrame:
        """Calculate value-based factors.

        Returns a new frame with ``symbol``, ``book_to_market``,
        ``earnings_yield`` and ``cf_yield``; ``fundamentals`` is not
        modified. Rows with a zero ``market_cap`` get NaN. ``prices`` is
        accepted for interface compatibility and is not used.
        """
        market_cap = fundamentals['market_cap']
        return pd.DataFrame({
            'symbol': fundamentals['symbol'],
            'book_to_market': self._safe_ratio(fundamentals['book_value'], market_cap),
            'earnings_yield': self._safe_ratio(fundamentals['earnings'], market_cap),
            'cf_yield': self._safe_ratio(fundamentals['operating_cash_flow'], market_cap),
        })

    def calculate_momentum_factors(self, prices: pd.DataFrame) -> pd.DataFrame:
        """Calculate momentum factors.

        For each symbol, sums the ``returns`` column over the trailing 252,
        126 and 63 rows, each excluding the most recent 21 rows. A window
        that contains no rows yields NaN rather than 0.0, so short
        histories are not mistaken for flat performance.
        """
        columns = ['symbol', 'momentum_12m', 'momentum_6m', 'momentum_3m']
        rows = []

        for symbol, group in prices.groupby('symbol', sort=False):
            returns = group.sort_values('date')['returns']
            rows.append({
                'symbol': symbol,
                # 12-month momentum (skipping last month)
                'momentum_12m': returns.iloc[-252:-21].sum(min_count=1),
                'momentum_6m': returns.iloc[-126:-21].sum(min_count=1),
                'momentum_3m': returns.iloc[-63:-21].sum(min_count=1),
            })

        return pd.DataFrame(rows, columns=columns)

    def calculate_quality_factors(self, fundamentals: pd.DataFrame) -> pd.DataFrame:
        """Calculate quality factors.

        Returns a new frame with ``symbol``, ``roe``, ``profit_margin``,
        ``asset_turnover`` and ``leverage`` (lower leverage is better);
        ``fundamentals`` is not modified. Zero denominators produce NaN.
        """
        total_assets = fundamentals['total_assets']
        net_income = fundamentals['net_income']
        revenue = fundamentals['revenue']
        return pd.DataFrame({
            'symbol': fundamentals['symbol'],
            'roe': self._safe_ratio(net_income, fundamentals['book_value']),
            'profit_margin': self._safe_ratio(net_income, revenue),
            'asset_turnover': self._safe_ratio(revenue, total_assets),
            'leverage': self._safe_ratio(fundamentals['total_liabilities'], total_assets),
        })

    def calculate_risk_factors(self, prices: pd.DataFrame) -> pd.DataFrame:
        """Calculate risk-based factors.

        Per symbol: annualised volatility (std * sqrt(252)), beta against
        the equal-weighted mean return of all symbols in ``prices``, and the
        maximum drawdown of the compounded return path. Beta defaults to 1.0
        when the market variance is not positive.
        """
        columns = ['symbol', 'volatility', 'beta', 'max_drawdown']

        # The market proxy is the same for every symbol: compute it once.
        market_returns = prices.groupby('date')['returns'].mean()
        market_var = market_returns.var()

        rows = []
        for symbol, group in prices.groupby('symbol', sort=False):
            symbol_prices = group.sort_values('date')
            returns = symbol_prices['returns']

            volatility = returns.std() * np.sqrt(252)

            # Series.cov aligns the two series on their date index.
            aligned_returns = symbol_prices.set_index('date')['returns']
            if market_var > 0:
                beta = aligned_returns.cov(market_returns) / market_var
            else:
                beta = 1.0

            cumulative = (1 + returns).cumprod()
            running_max = cumulative.expanding().max()
            max_drawdown = ((cumulative - running_max) / running_max).min()

            rows.append({
                'symbol': symbol,
                'volatility': volatility,
                'beta': beta,
                'max_drawdown': max_drawdown,
            })

        return pd.DataFrame(rows, columns=columns)
class FactorSignalGenerator:
    """Turn raw factor columns into normalised group scores and one combined score."""

    def __init__(self):
        # Weight of each factor group in the combined score.
        self.factor_weights = {
            'value': 0.25,
            'momentum': 0.25,
            'quality': 0.25,
            'low_vol': 0.25
        }

    def normalize_factor(self, factor_values: pd.Series) -> pd.Series:
        """Z-score normalization, winsorized at 3 standard deviations.

        When the input has no dispersion (constant values, fewer than two
        observations, or all missing), a z-score is undefined; every
        non-missing entry is then scored 0.0 instead of dividing by zero.
        Missing inputs stay missing.
        """
        mean = factor_values.mean()
        std = factor_values.std()

        # `not std > 0` is also true when std is NaN.
        if not std > 0:
            return (factor_values - mean) * 0.0

        winsorized = factor_values.clip(mean - 3 * std, mean + 3 * std)
        return (winsorized - winsorized.mean()) / winsorized.std()

    def combine_factor_group(self, factors: pd.DataFrame,
                             factor_names: List[str],
                             weights: Optional[List[float]] = None) -> pd.Series:
        """Combine multiple factors into a single weighted score.

        Each named column is normalised with ``normalize_factor`` and added
        with its weight; names absent from ``factors`` are skipped (their
        weight is not redistributed). ``weights`` defaults to equal weights.

        Raises:
            ValueError: if ``weights`` and ``factor_names`` differ in length.
        """
        if weights is None:
            weights = [1.0 / len(factor_names)] * len(factor_names)
        elif len(weights) != len(factor_names):
            raise ValueError(
                f"Expected {len(factor_names)} weights, got {len(weights)}"
            )

        combined = pd.Series(0.0, index=factors.index)
        for factor_name, weight in zip(factor_names, weights):
            if factor_name in factors.columns:
                combined += weight * self.normalize_factor(factors[factor_name])

        return combined

    def _normalized_or_zero(self, factors: pd.DataFrame, column: str) -> pd.Series:
        """Return the normalised column, or all zeros when the column is absent."""
        if column in factors.columns:
            return self.normalize_factor(factors[column])
        return pd.Series(0.0, index=factors.index)

    def generate_signals(self, all_factors: pd.DataFrame) -> pd.DataFrame:
        """Generate combined factor signals.

        Returns a frame indexed like ``all_factors`` with ``symbol``,
        ``value_score``, ``momentum_score``, ``quality_score``,
        ``low_vol_score`` and ``combined_score``. Any missing factor column
        contributes zero.

        NOTE(review): leverage is subtracted at full weight while ROE,
        profit margin and asset turnover each carry 1/3, so leverage
        dominates the quality score -- confirm this is intended.
        """
        signals = pd.DataFrame(index=all_factors.index)
        signals['symbol'] = all_factors['symbol']

        signals['value_score'] = self.combine_factor_group(
            all_factors, ['book_to_market', 'earnings_yield', 'cf_yield']
        )

        signals['momentum_score'] = self.combine_factor_group(
            all_factors,
            ['momentum_12m', 'momentum_6m', 'momentum_3m'],
            [0.5, 0.3, 0.2],
        )

        quality = self.combine_factor_group(
            all_factors, ['roe', 'profit_margin', 'asset_turnover']
        )
        # Leverage is bad, so it enters with a negative sign.
        signals['quality_score'] = quality - self._normalized_or_zero(
            all_factors, 'leverage'
        )

        # Low volatility score (inverse of volatility)
        signals['low_vol_score'] = -self._normalized_or_zero(
            all_factors, 'volatility'
        )

        signals['combined_score'] = (
            self.factor_weights['value'] * signals['value_score'] +
            self.factor_weights['momentum'] * signals['momentum_score'] +
            self.factor_weights['quality'] * signals['quality_score'] +
            self.factor_weights['low_vol'] * signals['low_vol_score']
        )

        return signals
781import cvxpy as cp
2from scipy.optimize import minimize
3
class FactorPortfolioOptimizer:
    """Convert combined factor scores into portfolio weights with cvxpy."""

    _logger = logging.getLogger(__name__)

    def __init__(self,
                 max_position_size: float = 0.05,
                 max_turnover: float = 0.30,
                 target_num_positions: int = 50,
                 solver: str = "ECOS"):
        """
        Args:
            max_position_size: Upper bound on the absolute weight of one name.
            max_turnover: Upper bound on the L1 distance between new and
                current weights (long-only optimisation only).
            target_num_positions: Number of names held by the equal-weight
                fallback.
            solver: Name of the cvxpy solver passed to ``Problem.solve``.
        """
        self.max_position_size = max_position_size
        self.max_turnover = max_turnover
        self.target_num_positions = target_num_positions
        self.solver = solver

    @staticmethod
    def _finite_signals(signals: pd.DataFrame) -> pd.DataFrame:
        """Keep only rows whose combined score is a finite number.

        NaN or infinite scores cannot be used in the objective.
        """
        scores = signals['combined_score'].to_numpy(dtype=float)
        return signals.loc[np.isfinite(scores)]

    def optimize_long_only(self,
                           signals: pd.DataFrame,
                           current_positions: Optional[Dict[str, float]] = None
                           ) -> Dict[str, float]:
        """
        Optimize portfolio weights using convex optimization.

        Maximises the score-weighted sum of weights subject to: long only,
        fully invested, per-name cap, and -- when the portfolio already
        holds names from the universe -- an L1 turnover cap. Rows with
        non-finite scores are ignored. Weights of 0.1% or less are dropped
        from the result, so the returned weights may sum to slightly less
        than 1.

        If the solver fails or the problem is infeasible (e.g. fewer names
        than ``1 / max_position_size``), the failure is logged and the
        result of ``equal_weight_top_n`` is returned instead.

        Returns:
            Mapping symbol -> weight; empty if there are no usable signals.
        """
        usable = self._finite_signals(signals)
        if usable.empty:
            return {}

        n = len(usable)
        symbols = usable['symbol'].values
        scores = usable['combined_score'].to_numpy(dtype=float)

        weights = cp.Variable(n)

        if current_positions is None:
            w_current = np.zeros(n)
        else:
            w_current = np.array([
                current_positions.get(sym, 0.0) for sym in symbols
            ])

        objective = cp.Maximize(scores @ weights)

        constraints = [
            weights >= 0,                        # Long only
            cp.sum(weights) == 1,                # Fully invested
            weights <= self.max_position_size,   # Max position size
        ]

        # Starting from cash, getting fully invested needs turnover of 1.0,
        # so the cap only makes sense when there is something to turn over.
        if np.any(w_current != 0):
            turnover = cp.norm(weights - w_current, 1)
            constraints.append(turnover <= self.max_turnover)

        problem = cp.Problem(objective, constraints)
        try:
            problem.solve(solver=self.solver)

            if weights.value is None:
                raise ValueError("Optimization failed")

            # Return as dictionary, filtering small positions (0.1% minimum)
            return {
                sym: float(w)
                for sym, w in zip(symbols, weights.value)
                if w > 0.001
            }

        except Exception as e:
            # Deliberate best-effort: any failure degrades to equal weights.
            self._logger.warning(
                "Optimization error: %s; falling back to equal weights", e
            )
            return self.equal_weight_top_n(usable)

    def equal_weight_top_n(self, signals: pd.DataFrame) -> Dict[str, float]:
        """Fallback: equal weight the top ``target_num_positions`` names by score.

        Returns an empty mapping when ``signals`` has no rows.

        NOTE(review): the equal weight is not capped by
        ``max_position_size``; with few names it can exceed that limit.
        """
        if signals.empty:
            return {}

        top_stocks = signals.nlargest(self.target_num_positions, 'combined_score')
        if top_stocks.empty:
            return {}

        weight = 1.0 / len(top_stocks)
        return {symbol: weight for symbol in top_stocks['symbol']}

    def optimize_long_short(self,
                            signals: pd.DataFrame,
                            target_gross_exposure: float = 1.6,
                            target_net_exposure: float = 0.0) -> Dict[str, float]:
        """
        Long-short portfolio optimization.

        Maximises the score-weighted sum of weights subject to a gross
        exposure cap (L1 norm), an exact net exposure, and symmetric
        per-name limits. Rows with non-finite scores are ignored. Weights
        with absolute value of 0.1% or less are dropped.

        Raises:
            ValueError: if the solver returns no solution.
        """
        usable = self._finite_signals(signals)
        if usable.empty:
            return {}

        n = len(usable)
        symbols = usable['symbol'].values
        scores = usable['combined_score'].to_numpy(dtype=float)

        weights = cp.Variable(n)
        objective = cp.Maximize(scores @ weights)

        constraints = [
            # Gross exposure (sum of absolute values)
            cp.norm(weights, 1) <= target_gross_exposure,
            # Net exposure
            cp.sum(weights) == target_net_exposure,
            # Position limits
            weights >= -self.max_position_size,
            weights <= self.max_position_size,
        ]

        problem = cp.Problem(objective, constraints)
        problem.solve(solver=self.solver)

        if weights.value is None:
            raise ValueError(
                f"Long-short optimization failed (status: {problem.status})"
            )

        return {
            sym: float(w)
            for sym, w in zip(symbols, weights.value)
            if abs(w) > 0.001
        }
class FactorPortfolioExecutor:
    """Translate target weights into share trades and route them to a broker.

    The broker object must provide ``get_price(symbol)``,
    ``submit_twap_order(symbol=, shares=, duration_minutes=)`` and
    ``wait_for_fill(order_id, timeout=)``.
    """

    _logger = logging.getLogger(__name__)

    def __init__(self, broker_interface):
        self.broker = broker_interface
        self.execution_cost_bps = 5  # 5 bps execution cost

    def calculate_trades(self,
                         target_weights: Dict[str, float],
                         current_positions: Dict[str, float],
                         portfolio_value: float) -> List[dict]:
        """
        Calculate trades needed to reach target portfolio.

        Both mappings are symbol -> portfolio weight. A trade is generated
        when the weights differ by more than 0.1% and the difference rounds
        (toward zero) to at least one share. Symbols are processed in sorted
        order so the output is deterministic. Symbols whose price is
        missing or not positive are skipped with a warning.

        Returns:
            List of dicts with ``symbol``, signed ``shares`` (negative =
            sell), ``price`` and signed ``value``.
        """
        trades = []

        for symbol in sorted(set(target_weights) | set(current_positions)):
            target_weight = target_weights.get(symbol, 0.0)
            current_weight = current_positions.get(symbol, 0.0)

            if abs(target_weight - current_weight) <= 0.001:
                continue

            trade_value = (
                target_weight * portfolio_value - current_weight * portfolio_value
            )

            price = self.broker.get_price(symbol)
            # `not price > 0` also rejects NaN.
            if price is None or not price > 0:
                self._logger.warning(
                    "Skipping %s: unusable price %r", symbol, price
                )
                continue

            shares = int(trade_value / price)
            if shares != 0:
                trades.append({
                    'symbol': symbol,
                    'shares': shares,
                    'price': price,
                    'value': shares * price
                })

        return trades

    def execute_trades(self, trades: List[dict],
                       duration_minutes: int = 30,
                       fill_timeout: float = 60) -> List[dict]:
        """
        Execute trades as TWAP orders, smallest absolute value first.

        Args:
            trades: Output of ``calculate_trades``.
            duration_minutes: TWAP duration passed to the broker.
            fill_timeout: Timeout passed to ``wait_for_fill``.
                NOTE(review): units are defined by the broker; if seconds,
                the default is far shorter than the TWAP duration -- confirm.

        Returns:
            One record per successfully filled trade. Trades that raise are
            logged and omitted, so compare against ``trades`` to find
            failures.
        """
        executions = []

        for trade in sorted(trades, key=lambda t: abs(t['value'])):
            try:
                order_id = self.broker.submit_twap_order(
                    symbol=trade['symbol'],
                    shares=trade['shares'],
                    duration_minutes=duration_minutes
                )
                fill = self.broker.wait_for_fill(order_id, timeout=fill_timeout)

                executions.append({
                    'symbol': trade['symbol'],
                    'shares': fill['shares'],
                    'avg_price': fill['avg_price'],
                    'execution_cost': self.calculate_execution_cost(trade, fill)
                })
            except Exception as e:
                # Best effort: one failed order must not stop the rest.
                self._logger.exception(
                    "Execution error for %s: %s", trade['symbol'], e
                )

        return executions

    def calculate_execution_cost(self, trade: dict, fill: dict) -> float:
        """Calculate execution cost versus the price recorded on the trade.

        Direction comes from the signed ``trade['shares']`` rather than from
        the fill, so the result is correct whether or not the broker reports
        sell fills with a negative share count. Positive = cost, negative =
        price improvement.
        """
        reference_price = trade['price']
        avg_fill_price = fill['avg_price']

        if trade['shares'] > 0:  # Buy: paying more than reference is a cost
            slippage = avg_fill_price - reference_price
        else:  # Sell: receiving less than reference is a cost
            slippage = reference_price - avg_fill_price

        return slippage * abs(fill['shares'])
class FactorPerformanceAttribution:
    """Split portfolio returns into per-factor contributions plus alpha."""

    _FACTOR_COLUMNS = (
        'value_score', 'momentum_score', 'quality_score', 'low_vol_score'
    )

    def __init__(self):
        self.factor_returns = []

    def calculate_factor_returns(self,
                                 positions: Dict[str, float],
                                 factor_exposures: pd.DataFrame,
                                 returns: pd.Series) -> Dict[str, float]:
        """
        Decompose one period's portfolio return into factor contributions.

        This is a simplified allocation, not a regression: the portfolio
        return is split across factors in proportion to each factor's
        weighted exposure divided by the sum of absolute exposures, and
        ``alpha`` is whatever remains.

        Args:
            positions: symbol -> weight.
            factor_exposures: Frame with a ``symbol`` column and one column
                per factor score; the first row per symbol is used.
            returns: symbol -> return for the period; missing symbols count
                as 0.0.

        Returns:
            Mapping with one entry per factor score plus ``alpha``. All
            factor entries are 0.0 when the portfolio has no exposure.
        """
        portfolio_return = sum(
            weight * returns.get(symbol, 0.0)
            for symbol, weight in positions.items()
        )

        # Index once by symbol instead of masking the frame per lookup.
        by_symbol = factor_exposures.drop_duplicates('symbol').set_index('symbol')
        held = [
            (symbol, weight)
            for symbol, weight in positions.items()
            if symbol in by_symbol.index
        ]

        portfolio_exposures = {
            factor: sum(weight * by_symbol.at[symbol, factor]
                        for symbol, weight in held)
            for factor in self._FACTOR_COLUMNS
        }

        total_exposure = sum(abs(e) for e in portfolio_exposures.values())

        if total_exposure > 0:
            factor_contributions = {
                factor: (exposure / total_exposure) * portfolio_return
                for factor, exposure in portfolio_exposures.items()
            }
        else:
            # Keep the key set stable so report columns never go missing.
            factor_contributions = {
                factor: 0.0 for factor in self._FACTOR_COLUMNS
            }

        # Alpha is unexplained return
        factor_contributions['alpha'] = (
            portfolio_return - sum(factor_contributions.values())
        )

        return factor_contributions

    def generate_attribution_report(self,
                                    positions_history: List[Dict],
                                    returns_history: pd.DataFrame) -> pd.DataFrame:
        """
        Generate an attribution report with one row per period.

        Args:
            positions_history: One dict per period with ``date``,
                ``weights`` and ``factor_exposures``.
            returns_history: Frame with one row per period, in the same
                order as ``positions_history``, and one column per symbol.
                A plain sequence of per-period Series/dicts is also
                accepted.

        Returns:
            Frame with the factor contributions, ``alpha``, ``date`` and
            ``total_return`` for each period.
        """
        # Iterating a DataFrame directly yields column labels, not rows.
        if isinstance(returns_history, pd.DataFrame):
            period_returns = (row for _, row in returns_history.iterrows())
        else:
            period_returns = returns_history

        attributions = []
        for positions, returns in zip(positions_history, period_returns):
            attr = self.calculate_factor_returns(
                positions['weights'],
                positions['factor_exposures'],
                returns
            )
            attr['date'] = positions['date']
            attr['total_return'] = sum(
                w * returns.get(s, 0.0)
                for s, w in positions['weights'].items()
            )
            attributions.append(attr)

        return pd.DataFrame(attributions)
class FactorMonitor:
    """Produce human-readable warnings about factor decay, turnover and concentration."""

    _FACTOR_COLUMNS = (
        'value_score', 'momentum_score', 'quality_score', 'low_vol_score'
    )

    def __init__(self):
        self.alerts = []

    def check_factor_decay(self,
                           recent_attributions: pd.DataFrame,
                           lookback_days: int = 30) -> List[str]:
        """
        Detect if factor performance is deteriorating.

        Looks at the last ``lookback_days`` rows and, for every factor
        column present, warns when more than 70% of the rows are negative,
        or when the mean of the last 5 rows is more than two standard
        deviations below the mean of the whole window. An empty window
        yields no alerts.
        """
        alerts = []

        recent = recent_attributions.tail(lookback_days)
        window = len(recent)
        if window == 0:
            return alerts

        for factor in self._FACTOR_COLUMNS:
            if factor not in recent.columns:
                continue

            factor_returns = recent[factor]

            # Check if factor has been consistently negative
            negative_days = (factor_returns < 0).sum()
            if negative_days / window > 0.7:  # 70% negative days
                alerts.append(
                    f"WARNING: {factor} has been negative {negative_days}/{len(recent)} days"
                )

            # The baseline is the same window the last 5 rows belong to.
            window_mean = factor_returns.mean()
            latest_mean = factor_returns.tail(5).mean()
            if latest_mean < window_mean - 2 * factor_returns.std():
                alerts.append(
                    f"WARNING: {factor} recent performance significantly below average"
                )

        return alerts

    def check_turnover(self,
                       trades: List[dict],
                       portfolio_value: float,
                       threshold: float = 0.5) -> List[str]:
        """
        Alert on excessive turnover.

        Turnover is the sum of absolute trade ``value`` divided by
        ``portfolio_value``. A non-positive portfolio value cannot give a
        meaningful ratio and is reported as an alert instead of raising.
        """
        if not portfolio_value > 0:
            return [
                f"WARNING: Cannot compute turnover for portfolio value {portfolio_value}"
            ]

        alerts = []
        turnover = sum(abs(t['value']) for t in trades) / portfolio_value

        if turnover > threshold:
            alerts.append(
                f"WARNING: Turnover {turnover:.1%} exceeds threshold {threshold:.1%}"
            )

        return alerts

    def check_concentration(self,
                            positions: Dict[str, float],
                            max_position: float = 0.10) -> List[str]:
        """
        Alert on position concentration.

        Positions are compared by absolute weight, so a large short is
        flagged just like a large long. A second check warns when the five
        largest absolute weights together exceed 40%.
        """
        alerts = []

        for symbol, weight in positions.items():
            if abs(weight) > max_position:
                alerts.append(
                    f"WARNING: {symbol} position {weight:.1%} exceeds limit {max_position:.1%}"
                )

        # Top-5 concentration (a simplified stand-in for a sector check)
        top_5_weight = sum(
            sorted((abs(w) for w in positions.values()), reverse=True)[:5]
        )
        if top_5_weight > 0.40:  # Top 5 > 40%
            alerts.append(
                f"WARNING: Top 5 positions account for {top_5_weight:.1%} of portfolio"
            )

        return alerts
Our factor model performance (2020–2024):
| Metric | Value |
|---|---|
| Annualized Return | 14.2% |
| Sharpe Ratio | 1.18 |
| Max Drawdown | -22.3% |
| Win Rate (monthly) | 58% |
| Information Ratio | 0.85 |
| Average Turnover | 32%/month |
| Avg Execution Cost | 4.2 bps |
Factor contribution:
| Factor | Annual Return | t-stat |
|---|---|---|
| Value | 2.8% | 1.9 |
| Momentum | 4.1% | 2.8 |
| Quality | 3.9% | 2.6 |
| Low Vol | 2.2% | 1.5 |
| Alpha | 1.2% | 0.8 |
Factor investing works, but it requires discipline. The temptation to deviate from the model during drawdowns is strong—don't do it.
Systematic factor investing has democratized alpha generation. With proper implementation, it's one of the most reliable sources of returns in quantitative finance.
Technical Writer
The NordVarg Team is a group of software engineers at NordVarg specializing in high-performance financial systems and type-safe programming.
Get weekly insights on building high-performance financial systems, latest industry trends, and expert tips delivered straight to your inbox.