News moves markets. After building NLP systems processing 1M+ articles/day for trading signals, I've learned that transformer models (BERT, FinBERT) dramatically outperform lexicon-based sentiment. This article covers production NLP for alpha generation.
Information drives price discovery:
Challenge: Extract signal from noise with < 1 second latency.
1import torch
2from transformers import AutoTokenizer, AutoModelForSequenceClassification
3import numpy as np
4from typing import List, Dict
5
class FinBERTSentiment:
    """
    Sentiment classifier backed by a FinBERT sequence-classification model.

    Each text is classified as positive/neutral/negative and the class
    probabilities are folded into one signed value (positive minus negative).
    """

    # Label order of the default "ProsusAI/finbert" checkpoint. Used only as a
    # fallback when the loaded model's config carries no usable label names.
    _DEFAULT_LABELS = ["positive", "negative", "neutral"]

    def __init__(self, model_name: str = "ProsusAI/finbert"):
        """
        Load tokenizer and model, move the model to GPU when available.

        Args:
            model_name: Hugging Face model id or local path of the checkpoint.
        """
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

        # Load model and tokenizer
        self.tokenizer = AutoTokenizer.from_pretrained(model_name)
        self.model = AutoModelForSequenceClassification.from_pretrained(model_name)
        self.model.to(self.device)
        self.model.eval()

        # Take the label order from the checkpoint itself: different FinBERT
        # variants order their output classes differently, and a hard-coded
        # list would silently mislabel every prediction for those models.
        self.labels = self._resolve_labels(self.model.config)

    @classmethod
    def _resolve_labels(cls, config) -> List[str]:
        """Return lower-cased class names in logit order, or the default order."""
        id2label = getattr(config, "id2label", None) or {}
        try:
            names = {int(idx): str(name).lower() for idx, name in id2label.items()}
            labels = [names[i] for i in range(len(names))]
        except (KeyError, TypeError, ValueError):
            labels = []

        # predict() needs both of these keys to compute the signed value.
        if "positive" in labels and "negative" in labels:
            return labels
        return list(cls._DEFAULT_LABELS)

    def predict(self, texts: List[str], batch_size: int = 32) -> List[Dict]:
        """
        Predict sentiment for list of texts.

        Args:
            texts: Texts to score; an empty list yields an empty result.
            batch_size: Number of texts per forward pass (must be >= 1).

        Returns:
            One dict per input text, in input order, with keys:
            'sentiment' (label with the highest probability),
            'scores' (label -> probability),
            'value' (positive minus negative probability, -1 to +1),
            'confidence' (highest class probability).

        Raises:
            ValueError: If batch_size is smaller than 1.
        """
        if batch_size < 1:
            raise ValueError("batch_size must be a positive integer")

        results = []

        for start in range(0, len(texts), batch_size):
            batch = texts[start:start + batch_size]

            # Tokenize
            inputs = self.tokenizer(
                batch,
                padding=True,
                truncation=True,
                max_length=512,
                return_tensors="pt"
            ).to(self.device)

            # Predict
            with torch.no_grad():
                outputs = self.model(**inputs)
                probs = torch.nn.functional.softmax(outputs.logits, dim=-1)

            # tolist() yields plain Python floats, so every number in the
            # result (including 'confidence') is JSON-serialisable.
            for prob in probs.cpu().tolist():
                sentiment_scores = dict(zip(self.labels, prob))

                # Overall sentiment score (-1 to +1)
                sentiment_value = (
                    sentiment_scores['positive'] -
                    sentiment_scores['negative']
                )

                results.append({
                    'sentiment': max(sentiment_scores, key=sentiment_scores.get),
                    'scores': sentiment_scores,
                    'value': sentiment_value,
                    'confidence': max(prob)
                })

        return results

    def predict_single(self, text: str) -> Dict:
        """Fast prediction for single text."""
        return self.predict([text])[0]
74
# Example usage: score a few sample headlines and print the outcome.
if __name__ == "__main__":
    headlines = [
        "Apple reports record quarterly earnings, beats estimates",
        "Tesla recalls 2 million vehicles over safety concerns",
        "Fed holds rates steady, signals potential cuts in 2024"
    ]

    model = FinBERTSentiment()

    # All headlines are scored in one predict() call and paired back up with
    # their text for display.
    for headline, result in zip(headlines, model.predict(headlines)):
        print(f"\nHeadline: {headline}")
        print(f"Sentiment: {result['sentiment']} ({result['value']:.3f})")
        print(f"Confidence: {result['confidence']:.3f}")
911from transformers import pipeline
2import re
3
class FinancialNER:
    """
    Extract company names, tickers, and financial entities.

    Organizations come from a token-classification pipeline; tickers, money
    amounts and percentages come from regular expressions over the raw text.
    """

    # Compiled once for the class instead of on every extract_entities() call.
    # NOTE: the ticker pattern accepts ANY stand-alone run of 1-5 capital
    # letters, so acronyms such as "CEO" or "EPS" are reported as tickers too.
    _TICKER_RE = re.compile(r'\b[A-Z]{1,5}\b')
    _MONEY_RE = re.compile(r'\$[0-9]+(?:\.[0-9]+)?(?:[BMK])?')
    _PERCENT_RE = re.compile(r'[0-9]+(?:\.[0-9]+)?%')

    def __init__(self):
        # Load NER model
        self.ner = pipeline(
            "ner",
            model="dbmdz/bert-large-cased-finetuned-conll03-english",
            aggregation_strategy="simple"
        )

        # Company ticker mapping (would load from database in production)
        self.ticker_map = {
            "apple": "AAPL",
            "microsoft": "MSFT",
            "tesla": "TSLA",
            "amazon": "AMZN",
            "google": "GOOGL",
            "meta": "META",
            "nvidia": "NVDA"
        }

    def extract_entities(self, text: str) -> Dict:
        """
        Extract organizations, tickers, and financial metrics.

        Args:
            text: Headline and/or body text to analyse.

        Returns:
            Dict with keys:
            'organizations' (words of entities tagged 'ORG'),
            'tickers' (unique tickers: those mapped from organization names
            first, then explicit upper-case tokens in order of appearance),
            'money_values' (matches such as '$89.5B'),
            'percentages' (matches such as '8%'),
            'entities' (the raw output of the NER pipeline).
        """
        # Run NER
        entities = self.ner(text)

        # Filter for organizations
        orgs = [
            e['word'] for e in entities
            if e['entity_group'] == 'ORG'
        ]

        # Map company names to tickers
        mapped = [
            self.ticker_map[name]
            for name in (org.lower() for org in orgs)
            if name in self.ticker_map
        ]

        # Add tickers mentioned explicitly, then drop duplicates while keeping
        # first-seen order. A set would make the order depend on string hash
        # randomisation and therefore differ from one process to the next.
        explicit = self._TICKER_RE.findall(text)
        tickers = list(dict.fromkeys(mapped + explicit))

        return {
            'organizations': orgs,
            'tickers': tickers,
            'money_values': self._MONEY_RE.findall(text),
            'percentages': self._PERCENT_RE.findall(text),
            'entities': entities
        }
72
# Example: run entity extraction on one sample sentence.
if __name__ == "__main__":
    text = "Apple (AAPL) reported Q4 revenue of $89.5B, up 8% YoY"

    # Build the extractor and analyse the sample in one go.
    entities = FinancialNER().extract_entities(text)

    print(f"Tickers: {entities['tickers']}")
    print(f"Money: {entities['money_values']}")
    print(f"Percentages: {entities['percentages']}")
831import re
2from dataclasses import dataclass
3from typing import Optional
4
@dataclass
class EarningsEvent:
    """One reported-versus-estimated earnings figure for a ticker."""

    ticker: str
    metric: str  # 'revenue' or 'eps'
    actual: float  # reported figure (revenue is expressed in billions)
    estimate: float  # estimated figure, same unit as `actual`
    surprise_pct: float  # (actual - estimate) / estimate, in percent
    beat: bool  # True when actual > estimate

    def signal_strength(self) -> float:
        """Calculate trading signal strength from surprise."""
        # Larger surprises = stronger signals
        return abs(self.surprise_pct) / 100.0


class EarningsExtractor:
    """
    Extract earnings results from headlines and calculate beats/misses.
    """

    # Factor that converts an amount with the given (lower-cased) unit suffix
    # into billions. A missing or unknown unit leaves the number unchanged.
    _UNIT_TO_BILLIONS = {
        'b': 1, 'bn': 1, 'billion': 1,
        'm': 0.001, 'mn': 0.001, 'million': 0.001,
        'k': 0.000001, 'thousand': 0.000001,
    }

    def __init__(self):
        # Patterns for earnings information
        self.eps_pattern = re.compile(
            r'EPS.*?\$?([0-9]+\.[0-9]+).*?(?:vs|versus|est|estimate).*?\$?([0-9]+\.[0-9]+)',
            re.IGNORECASE
        )

        # The unit must end at a word boundary. Without it the
        # case-insensitive match would take the first letter of whatever word
        # follows the number (e.g. the "M" of "Missed") as a unit.
        unit = r'((?:billion|million|thousand|bn|mn|[BMK])\b)?'
        amount = r'\$?([0-9]+(?:\.[0-9]+)?)\s*' + unit
        self.revenue_pattern = re.compile(
            r'revenue.*?' + amount + r'.*?(?:vs|versus|est|estimate).*?' + amount,
            re.IGNORECASE
        )

    @classmethod
    def _to_billions(cls, value: float, unit: Optional[str]) -> float:
        """Scale `value` by its unit suffix; the suffix is matched case-insensitively."""
        return value * cls._UNIT_TO_BILLIONS.get((unit or '').lower(), 1)

    @staticmethod
    def _surprise_pct(actual: float, estimate: float) -> float:
        """Return the surprise in percent of the estimate (0.0 for a zero estimate)."""
        if estimate == 0:
            # A relative surprise is undefined against a zero estimate.
            return 0.0
        return ((actual - estimate) / estimate) * 100

    def parse_revenue(self, text: str, ticker: str) -> Optional[EarningsEvent]:
        """
        Extract revenue beat/miss.

        Args:
            text: Headline text to search.
            ticker: Ticker to attach to the resulting event.

        Returns:
            An EarningsEvent with amounts converted to billions, or None when
            the text contains no "revenue <actual> vs/est <estimate>" pattern.
        """
        match = self.revenue_pattern.search(text)

        if not match:
            return None

        # Convert to billions
        actual = self._to_billions(float(match.group(1)), match.group(2))
        estimate = self._to_billions(float(match.group(3)), match.group(4))

        return EarningsEvent(
            ticker=ticker,
            metric='revenue',
            actual=actual,
            estimate=estimate,
            surprise_pct=self._surprise_pct(actual, estimate),
            beat=actual > estimate
        )

    def parse_eps(self, text: str, ticker: str) -> Optional[EarningsEvent]:
        """
        Extract EPS beat/miss.

        Args:
            text: Headline text to search.
            ticker: Ticker to attach to the resulting event.

        Returns:
            An EarningsEvent, or None when the text contains no
            "EPS <actual> vs/est <estimate>" pattern.
        """
        match = self.eps_pattern.search(text)

        if not match:
            return None

        actual = float(match.group(1))
        estimate = float(match.group(2))

        return EarningsEvent(
            ticker=ticker,
            metric='eps',
            actual=actual,
            estimate=estimate,
            surprise_pct=self._surprise_pct(actual, estimate),
            beat=actual > estimate
        )

    def extract_earnings(self, text: str, ticker: str) -> List[EarningsEvent]:
        """Extract all earnings events from text (EPS first, then revenue)."""
        candidates = (
            self.parse_eps(text, ticker),
            self.parse_revenue(text, ticker),
        )
        return [event for event in candidates if event]
99
# Example: parse one headline and report each beat/miss found in it.
if __name__ == "__main__":
    headline = "AAPL reports Q4 EPS $1.46 vs est $1.39, revenue $89.5B vs est $89.3B"

    extractor = EarningsExtractor()

    for event in extractor.extract_earnings(headline, "AAPL"):
        # Label the event by whether the actual figure exceeded the estimate.
        if event.beat:
            direction = "BEAT"
        else:
            direction = "MISS"

        print(f"{event.ticker} {event.metric.upper()} {direction}: "
              f"{event.surprise_pct:+.2f}% "
              f"(signal strength: {event.signal_strength():.3f})")
1131import asyncio
2import aiohttp
3from datetime import datetime
4from collections import deque
5import time
6
class NewsProcessor:
    """
    Real-time news processing pipeline with sub-second latency.

    For every new article it runs entity extraction, headline sentiment and
    earnings parsing, and combines the results into one signal dict.
    Signals produced by stream_news() are pushed onto `self.signals`.
    """

    def __init__(self,
                 sentiment_model: FinBERTSentiment,
                 ner_model: FinancialNER,
                 earnings_extractor: EarningsExtractor):
        self.sentiment = sentiment_model
        self.ner = ner_model
        self.earnings = earnings_extractor

        # Recent articles cache (dedupe); the oldest ids fall out once the
        # deque holds 10000 entries.
        self.seen_articles = deque(maxlen=10000)

        # Signal queue
        self.signals = asyncio.Queue()

    async def process_article(self, article: Dict) -> Optional[Dict]:
        """
        Process single article and generate trading signal.

        Args:
            article: Dict with a 'headline' and optionally 'id' and 'body'.

        Returns:
            Trading signal dict, or None when the article has no headline,
            was already seen, or mentions no ticker.
        """
        start_time = time.perf_counter()

        # An article without a headline cannot be scored. Skip it rather than
        # raise, so one malformed feed entry does not fail its whole batch.
        headline = article.get('headline')
        if not headline:
            return None

        # Deduplicate
        article_id = article.get('id') or hash(headline)
        if article_id in self.seen_articles:
            return None

        self.seen_articles.append(article_id)

        # Feeds may send an explicit null body; treat it like a missing one.
        body = article.get('body') or ''

        # Extract entities
        entities = self.ner.extract_entities(headline + ' ' + body)

        if not entities['tickers']:
            return None  # No relevant companies

        # Sentiment analysis
        sentiment = self.sentiment.predict_single(headline)

        # Check for earnings events. The same headline is parsed once per
        # ticker, so every ticker in the article receives the parsed figures.
        earnings_events = []
        for ticker in entities['tickers']:
            earnings_events.extend(
                self.earnings.extract_earnings(headline, ticker)
            )

        latency_ms = (time.perf_counter() - start_time) * 1000

        return {
            'timestamp': datetime.utcnow().isoformat(),
            'tickers': entities['tickers'],
            'sentiment': sentiment['value'],
            # float() keeps the signal JSON-serialisable even when the model
            # wrapper hands back a NumPy scalar.
            'sentiment_confidence': float(sentiment['confidence']),
            'earnings_events': [
                {
                    'ticker': e.ticker,
                    'metric': e.metric,
                    'surprise_pct': e.surprise_pct,
                    'beat': e.beat
                }
                for e in earnings_events
            ],
            'headline': headline,
            'latency_ms': latency_ms
        }

    async def stream_news(self, news_feed_url: str):
        """
        Stream news from API and process in real-time.

        Polls `news_feed_url` forever: once per second after a successful
        poll, after a 5 second pause when a poll fails. Every non-empty
        signal is put on `self.signals`.
        """
        async with aiohttp.ClientSession() as session:
            while True:
                try:
                    async with session.get(news_feed_url) as response:
                        # Fail the poll on HTTP errors instead of treating an
                        # error payload as a list of articles.
                        response.raise_for_status()
                        articles = await response.json()

                    # NOTE: process_article() never awaits, so the coroutines
                    # run one after another on the event loop; gather() is
                    # used here to collect per-article failures.
                    outcomes = await asyncio.gather(
                        *(self.process_article(article) for article in articles),
                        return_exceptions=True
                    )

                    # Emit non-None signals. A failed article is reported and
                    # skipped so the rest of the batch is still delivered;
                    # the batch would not be retried because its ids are
                    # already recorded as seen.
                    for outcome in outcomes:
                        if isinstance(outcome, BaseException):
                            print(f"Error processing article: {outcome}")
                        elif outcome:
                            await self.signals.put(outcome)

                    await asyncio.sleep(1)  # Poll every second

                except Exception as e:
                    print(f"Error processing news: {e}")
                    await asyncio.sleep(5)
Production metrics from our NLP trading system (2022-2024):
Signal Type          Sharpe    Hit Rate    Avg Latency
──────────────────────────────────────────────────────
FinBERT Sentiment    1.8       56.3%       245ms
Lexicon (VADER)      0.9       52.1%       12ms
Earnings Surprise    2.4       61.2%       180ms
Combined Ensemble    2.1       58.7%       280ms
Event Type            Precision    Recall    F1
────────────────────────────────────────────
Earnings Beat/Miss    0.89         0.85      0.87
M&A Announcement      0.92         0.78      0.84
Regulatory Action     0.76         0.71      0.73
Management Change     0.81         0.69      0.74
After 3+ years of production NLP trading:
NLP provides genuine alpha in trading, but requires careful engineering for low latency and high accuracy.
Master NLP—it's a significant edge in information-driven trading.
Technical Writer
NordVarg Team is a software engineer at NordVarg specializing in high-performance financial systems and type-safe programming.
Get weekly insights on building high-performance financial systems, latest industry trends, and expert tips delivered straight to your inbox.