Bad data causes bad trades. After implementing real-time data quality monitoring that caught 142 data issues before they reached trading algorithms (2021-2024), I've learned that continuous validation is essential for reliable trading systems. This article shares production data quality monitoring.
The monitoring architecture that sits between the raw data streams and the trading algorithms:
Data Stream        Quality Checks       Alerting
───────────        ──────────────       ────────
Kafka (ticks)  →   Freshness        →   Slack
                   Completeness     →   PagerDuty
                   Validity         →   Metrics
                   Statistical      →   Dashboard
71from dataclasses import dataclass
2from typing import Callable, Optional, Dict, Any, List
3from datetime import datetime, timedelta
4from enum import Enum
5import statistics
6
class Severity(Enum):
    """Escalation level for a data quality issue.

    Ordered by urgency: CRITICAL issues page the on-call engineer,
    WARNING issues post to Slack, and INFO issues are only logged to
    metrics (see StreamQualityMonitor.handle_issue).
    """
    INFO = 1
    WARNING = 2
    CRITICAL = 3
@dataclass
class QualityIssue:
    """Represents a data quality issue.

    A single finding produced by one QualityCheck: what failed, how
    severe it is, when it was detected, and the measurements behind it.
    """
    check_name: str          # name of the QualityCheck that raised this issue
    severity: Severity       # escalation level (INFO / WARNING / CRITICAL)
    message: str             # human-readable description used in alerts/logs
    timestamp: datetime      # detection time (naive UTC, from datetime.utcnow())
    metrics: Dict[str, Any]  # check-specific measurements (e.g. age_seconds, zscore)
class QualityCheck:
    """Abstract base for all data quality checks.

    A check carries a name (used in issue reports and metrics labels) and a
    default severity. Concrete subclasses implement check().
    """

    def __init__(self, name: str, severity: Severity = Severity.WARNING):
        self.severity = severity
        self.name = name

    def check(self, data: Any) -> Optional[QualityIssue]:
        """Validate *data*.

        Returns:
            A QualityIssue describing the failure, or None when the
            data passes this check.
        """
        raise NotImplementedError
class QualityMonitor:
    """Runs a configurable set of QualityChecks against incoming data.

    Keeps two lists: the registered checks, and the full history of every
    issue ever detected (NOTE: the history grows without bound on
    long-running streams).
    """

    def __init__(self):
        self.checks: List[QualityCheck] = []
        self.issues: List[QualityIssue] = []

    def add_check(self, check: QualityCheck):
        """Register an additional quality check."""
        self.checks.append(check)

    def run_checks(self, data: Any) -> List[QualityIssue]:
        """Apply every registered check to *data*.

        Returns:
            The issues raised by this invocation; each one is also
            recorded in self.issues.
        """
        found: List[QualityIssue] = []

        for quality_check in self.checks:
            result = quality_check.check(data)
            if result:
                found.append(result)
                self.issues.append(result)

        return found
591import time
2
class FreshnessCheck(QualityCheck):
    """Check if data is fresh (recent).

    Expects data['timestamp'] in nanoseconds since the Unix epoch (UTC).
    """

    def __init__(self, max_age_seconds: float, severity: Severity = Severity.CRITICAL):
        """
        Args:
            max_age_seconds: maximum tolerated data age before it is stale.
            severity: severity reported for stale (not missing) timestamps.
        """
        super().__init__("freshness", severity)
        self.max_age_seconds = max_age_seconds

    def check(self, data: Dict[str, Any]) -> Optional[QualityIssue]:
        """Return an issue when the timestamp is missing or too old, else None."""
        if 'timestamp' not in data:
            return QualityIssue(
                check_name=self.name,
                severity=Severity.CRITICAL,
                message="Missing timestamp field",
                timestamp=datetime.utcnow(),
                metrics={}
            )

        # Parse the epoch-nanosecond timestamp as UTC.
        # BUG FIX: the original used datetime.fromtimestamp(), which converts
        # to *local* time, and then compared it against the naive-UTC
        # datetime.utcnow() — skewing the computed age by the host's UTC
        # offset on any machine not running in UTC.
        data_time = datetime.utcfromtimestamp(data['timestamp'] / 1e9)
        age = (datetime.utcnow() - data_time).total_seconds()

        if age > self.max_age_seconds:
            return QualityIssue(
                check_name=self.name,
                severity=self.severity,
                message=f"Stale data: {age:.1f}s old (limit: {self.max_age_seconds}s)",
                timestamp=datetime.utcnow(),
                metrics={'age_seconds': age, 'data_timestamp': data_time.isoformat()}
            )

        return None
class GapDetectionCheck(QualityCheck):
    """Detect gaps in time-series data.

    Stateful: remembers the previous message's timestamp between calls.
    """

    def __init__(self, expected_interval_ms: float, gap_threshold_multiplier: float = 5.0):
        """
        Args:
            expected_interval_ms: nominal inter-message interval.
            gap_threshold_multiplier: a gap is flagged when it exceeds
                expected_interval_ms * this multiplier.
        """
        super().__init__("gap_detection", Severity.WARNING)
        self.expected_interval_ms = expected_interval_ms
        self.gap_threshold_multiplier = gap_threshold_multiplier
        self.last_timestamp: Optional[int] = None  # epoch ns of the previous message

    def check(self, data: Dict[str, Any]) -> Optional[QualityIssue]:
        """Detect time gaps between messages."""
        # BUG FIX: the original indexed data['timestamp'] unconditionally and
        # crashed with KeyError on malformed messages — a monitor must survive
        # the bad data it exists to detect. FreshnessCheck already reports the
        # missing field as CRITICAL, so skip silently here.
        if 'timestamp' not in data:
            return None

        current_timestamp = data['timestamp']

        if self.last_timestamp is None:
            self.last_timestamp = current_timestamp
            return None

        gap_ms = (current_timestamp - self.last_timestamp) / 1e6  # ns -> ms
        previous_timestamp = self.last_timestamp
        # Single state-update point (the original duplicated this assignment
        # in both the issue and no-issue branches).
        self.last_timestamp = current_timestamp

        if gap_ms > self.expected_interval_ms * self.gap_threshold_multiplier:
            return QualityIssue(
                check_name=self.name,
                # Escalate to CRITICAL once the gap reaches 10x the expected interval.
                severity=Severity.WARNING if gap_ms < self.expected_interval_ms * 10 else Severity.CRITICAL,
                message=f"Data gap detected: {gap_ms:.0f}ms (expected: {self.expected_interval_ms}ms)",
                timestamp=datetime.utcnow(),
                metrics={
                    'gap_ms': gap_ms,
                    'expected_ms': self.expected_interval_ms,
                    'last_timestamp': previous_timestamp,
                    'current_timestamp': current_timestamp
                }
            )

        return None
741from decimal import Decimal
2
class SchemaCheck(QualityCheck):
    """Validate data schema: required fields present and correctly typed."""

    def __init__(self, required_fields: List[str], field_types: Dict[str, Any]):
        """
        Args:
            required_fields: fields that must be present in every message.
            field_types: maps field name to an expected type or a tuple of
                types (anything isinstance() accepts). The original
                annotation of Dict[str, type] was wrong: callers in this
                file pass tuples such as (int, float, Decimal).
        """
        super().__init__("schema", Severity.CRITICAL)
        self.required_fields = required_fields
        self.field_types = field_types

    def check(self, data: Dict[str, Any]) -> Optional[QualityIssue]:
        """Return a CRITICAL issue for missing fields or type mismatches."""
        # Missing-field check runs first; type-checking absent fields is meaningless.
        missing_fields = [field for field in self.required_fields if field not in data]

        if missing_fields:
            return QualityIssue(
                check_name=self.name,
                severity=Severity.CRITICAL,
                message=f"Missing required fields: {missing_fields}",
                timestamp=datetime.utcnow(),
                metrics={'missing_fields': missing_fields}
            )

        # Check field types.
        type_errors = []
        for field, expected_type in self.field_types.items():
            if field in data and not isinstance(data[field], expected_type):
                # BUG FIX: the original used expected_type.__name__, which
                # raises AttributeError when expected_type is a tuple of
                # types — exactly what this file passes for 'price'.
                if isinstance(expected_type, type):
                    expected_name = expected_type.__name__
                else:
                    expected_name = '|'.join(t.__name__ for t in expected_type)
                type_errors.append({
                    'field': field,
                    'expected': expected_name,
                    'actual': type(data[field]).__name__
                })

        if type_errors:
            return QualityIssue(
                check_name=self.name,
                severity=Severity.CRITICAL,
                message=f"Type errors: {type_errors}",
                timestamp=datetime.utcnow(),
                metrics={'type_errors': type_errors}
            )

        return None
class RangeCheck(QualityCheck):
    """Check if numeric values are within expected ranges."""

    def __init__(self, field: str, min_value: float, max_value: float):
        """
        Args:
            field: name of the numeric field to validate.
            min_value: inclusive lower bound.
            max_value: inclusive upper bound.
        """
        super().__init__(f"range_{field}", Severity.WARNING)
        self.field = field
        self.min_value = min_value
        self.max_value = max_value

    def check(self, data: Dict[str, Any]) -> Optional[QualityIssue]:
        """Return an issue when the field is non-numeric, NaN, or out of range."""
        if self.field not in data:
            return None

        # BUG FIX: the original crashed with ValueError/TypeError when the
        # field held a non-numeric value (e.g. "N/A" or None); a quality
        # check must report bad data, not die on it.
        try:
            value = float(data[self.field])
        except (TypeError, ValueError):
            return QualityIssue(
                check_name=self.name,
                severity=Severity.CRITICAL,
                message=f"{self.field}={data[self.field]!r} is not numeric",
                timestamp=datetime.utcnow(),
                metrics={'field': self.field, 'raw_value': repr(data[self.field])}
            )

        # BUG FIX: NaN compares False to both bounds and silently passed
        # the original range test; it compares unequal to itself.
        if value != value:
            return QualityIssue(
                check_name=self.name,
                severity=Severity.CRITICAL,
                message=f"{self.field} is NaN",
                timestamp=datetime.utcnow(),
                metrics={'field': self.field}
            )

        if value < self.min_value or value > self.max_value:
            return QualityIssue(
                check_name=self.name,
                # Non-positive prices/sizes are more alarming than merely wide ones.
                severity=Severity.CRITICAL if value <= 0 else Severity.WARNING,
                message=f"{self.field}={value} outside range [{self.min_value}, {self.max_value}]",
                timestamp=datetime.utcnow(),
                metrics={
                    'field': self.field,
                    'value': value,
                    'min': self.min_value,
                    'max': self.max_value
                }
            )

        return None
801import numpy as np
2from collections import deque
3
class StatisticalOutlierCheck(QualityCheck):
    """Detect statistical outliers using z-score over a rolling window."""

    def __init__(self, field: str, window_size: int = 1000, zscore_threshold: float = 5.0):
        """
        Args:
            field: numeric field to monitor.
            window_size: number of recent values kept as the baseline.
            zscore_threshold: |z| above which a value is flagged.
        """
        super().__init__(f"outlier_{field}", Severity.WARNING)
        self.field = field
        self.window_size = window_size
        self.zscore_threshold = zscore_threshold
        self.values = deque(maxlen=window_size)  # rolling history of recent values

    def check(self, data: Dict[str, Any]) -> Optional[QualityIssue]:
        """Score the incoming value against the prior window's z-score."""
        if self.field not in data:
            return None

        value = float(data[self.field])

        # BUG FIX: the original appended the candidate value *before*
        # computing the mean/stdev, so an outlier inflated its own baseline
        # and deflated its z-score (masking). Score against the prior
        # window first, then admit the value into the history (outliers are
        # still retained, matching the original's window contents).
        if len(self.values) < 30:  # need a minimum baseline before scoring
            self.values.append(value)
            return None

        mean = statistics.mean(self.values)
        stdev = statistics.stdev(self.values)
        self.values.append(value)

        if stdev == 0:
            return None

        zscore = abs((value - mean) / stdev)

        if zscore > self.zscore_threshold:
            return QualityIssue(
                check_name=self.name,
                severity=Severity.CRITICAL if zscore > 10 else Severity.WARNING,
                message=f"{self.field}={value} is outlier (z-score: {zscore:.2f})",
                timestamp=datetime.utcnow(),
                metrics={
                    'field': self.field,
                    'value': value,
                    'mean': mean,
                    'stdev': stdev,
                    'zscore': zscore
                }
            )

        return None
class SpreadCheck(QualityCheck):
    """Check bid-ask spread is reasonable (uncrossed and not too wide)."""

    def __init__(self, max_spread_bps: float = 100):
        """
        Args:
            max_spread_bps: maximum tolerated spread in basis points.
        """
        super().__init__("spread", Severity.WARNING)
        self.max_spread_bps = max_spread_bps

    def check(self, data: Dict[str, Any]) -> Optional[QualityIssue]:
        """Flag crossed markets (CRITICAL) and wide spreads (WARNING)."""
        if 'bid' not in data or 'ask' not in data:
            return None  # trade ticks carry no quote; nothing to check

        bid = float(data['bid'])
        ask = float(data['ask'])

        if bid >= ask:
            return QualityIssue(
                check_name=self.name,
                severity=Severity.CRITICAL,
                message=f"Crossed market: bid={bid} >= ask={ask}",
                timestamp=datetime.utcnow(),
                metrics={'bid': bid, 'ask': ask}
            )

        mid = (bid + ask) / 2

        # BUG FIX: guard the midpoint before dividing. With corrupt
        # non-positive quotes (e.g. bid=-1, ask=1) the original raised
        # ZeroDivisionError or produced a nonsense negative spread;
        # report the corrupt quote instead of crashing.
        if mid <= 0:
            return QualityIssue(
                check_name=self.name,
                severity=Severity.CRITICAL,
                message=f"Invalid quote levels: bid={bid}, ask={ask}",
                timestamp=datetime.utcnow(),
                metrics={'bid': bid, 'ask': ask}
            )

        spread_bps = ((ask - bid) / mid) * 10000

        if spread_bps > self.max_spread_bps:
            return QualityIssue(
                check_name=self.name,
                severity=Severity.WARNING,
                message=f"Wide spread: {spread_bps:.1f}bps (limit: {self.max_spread_bps}bps)",
                timestamp=datetime.utcnow(),
                metrics={'spread_bps': spread_bps, 'bid': bid, 'ask': ask}
            )

        return None
881from kafka import KafkaConsumer
2import json
3
class StreamQualityMonitor:
    """Monitor Kafka stream data quality in real-time.

    Consumes JSON tick/quote messages from a Kafka topic, runs every
    registered QualityCheck on each message, and routes detected issues
    to PagerDuty (CRITICAL), Slack (WARNING), and the metrics pipeline.

    NOTE(review): run() loops forever, and QualityMonitor keeps every
    issue in memory, so the issue history grows without bound on long runs.
    """

    def __init__(self, topic: str, bootstrap_servers: str):
        # 'latest' offset: a monitor restarted after downtime should watch
        # live data rather than replay the backlog.
        self.consumer = KafkaConsumer(
            topic,
            bootstrap_servers=bootstrap_servers,
            value_deserializer=lambda m: json.loads(m.decode('utf-8')),
            auto_offset_reset='latest',
        )

        self.monitor = QualityMonitor()
        self.setup_checks()

        # Metrics
        self.messages_processed = 0  # total messages consumed
        self.issues_detected = 0     # total issues across all checks

    def setup_checks(self):
        """Configure quality checks."""
        # Freshness: data should be < 1 second old
        self.monitor.add_check(FreshnessCheck(max_age_seconds=1.0))

        # Gap detection: expect tick every ~100ms
        self.monitor.add_check(GapDetectionCheck(expected_interval_ms=100))

        # Schema validation (price accepts any of int/float/Decimal)
        self.monitor.add_check(SchemaCheck(
            required_fields=['timestamp', 'symbol', 'price', 'size'],
            field_types={
                'timestamp': int,
                'symbol': str,
                'price': (int, float, Decimal),
                'size': int
            }
        ))

        # Price range: $0.01 to $100,000
        self.monitor.add_check(RangeCheck('price', min_value=0.01, max_value=100000))

        # Size range: 1 to 1,000,000 shares
        self.monitor.add_check(RangeCheck('size', min_value=1, max_value=1000000))

        # Statistical outlier detection
        self.monitor.add_check(StatisticalOutlierCheck('price', window_size=1000, zscore_threshold=5.0))

        # Spread check (if quote data)
        self.monitor.add_check(SpreadCheck(max_spread_bps=100))

    def run(self):
        """Start monitoring stream.

        Blocks forever, consuming messages and dispatching any quality
        issues to handle_issue().
        """
        print("Starting stream quality monitoring...")

        for message in self.consumer:
            self.messages_processed += 1

            # Run quality checks
            issues = self.monitor.run_checks(message.value)

            if issues:
                self.issues_detected += len(issues)

                for issue in issues:
                    self.handle_issue(issue)

            # Log stats every 10k messages
            if self.messages_processed % 10000 == 0:
                print(f"Processed {self.messages_processed} messages, "
                      f"detected {self.issues_detected} issues "
                      f"({self.issues_detected/self.messages_processed*100:.2f}%)")

    def handle_issue(self, issue: QualityIssue):
        """Route a detected quality issue by severity.

        CRITICAL -> PagerDuty, WARNING -> Slack; every issue (including
        INFO) is also recorded in metrics.
        """
        print(f"[{issue.severity.name}] {issue.check_name}: {issue.message}")

        if issue.severity == Severity.CRITICAL:
            # Alert on-call engineer
            self.send_pagerduty_alert(issue)
        elif issue.severity == Severity.WARNING:
            # Post to Slack
            self.send_slack_alert(issue)

        # Always log metrics
        self.log_metrics(issue)

    def send_pagerduty_alert(self, issue: QualityIssue):
        """Send critical alert to PagerDuty (stub in this listing)."""
        # Implementation: POST to PagerDuty Events API
        pass

    def send_slack_alert(self, issue: QualityIssue):
        """Send warning to Slack channel (stub in this listing)."""
        # Implementation: POST to Slack webhook
        pass

    def log_metrics(self, issue: QualityIssue):
        """Log issue metrics for dashboards (stub in this listing)."""
        # Implementation: Send to Prometheus/CloudWatch
        pass
1031from prometheus_client import Counter, Histogram, Gauge, start_http_server
2
# Define metrics (module-level so every collector instance shares them;
# prometheus_client registers each metric once in the default registry).
messages_processed = Counter('data_quality_messages_total', 'Total messages processed')
issues_detected = Counter('data_quality_issues_total', 'Total issues detected',
                          ['severity', 'check_name'])  # labelled per severity and check
data_freshness = Histogram('data_freshness_seconds', 'Data age in seconds',
                           buckets=[0.1, 0.5, 1.0, 2.0, 5.0, 10.0])
gap_size = Histogram('data_gap_milliseconds', 'Time gap between messages',
                     buckets=[10, 50, 100, 500, 1000, 5000])
11
class MetricsCollector:
    """Collect data quality metrics for Prometheus.

    Thin facade over the module-level Prometheus metrics; one method per
    metric so callers never touch the prometheus_client API directly.
    """

    def record_message(self):
        """Record one processed message."""
        messages_processed.inc()

    def record_issue(self, issue: QualityIssue):
        """Record a quality issue, labelled by severity and check name."""
        # Labels let dashboards break issue counts down per check/severity.
        issues_detected.labels(
            severity=issue.severity.name,
            check_name=issue.check_name
        ).inc()

    def record_freshness(self, age_seconds: float):
        """Record observed data age in seconds."""
        data_freshness.observe(age_seconds)

    def record_gap(self, gap_ms: float):
        """Record an inter-message time gap in milliseconds."""
        gap_size.observe(gap_ms)
33
# Start Prometheus metrics server.
# NOTE(review): this runs at import time as a module-level side effect, and
# 9090 is also Prometheus's own default port — confirm the port is free and
# consider moving this under an `if __name__ == "__main__":` guard.
start_http_server(9090)
Quality issues detected (2021-2024):
Issue Type             Count   Severity    Production Impact Prevented
──────────────────────────────────────────────────────────────────────
Stale data (>1s)       42      Critical    Bad price data → wrong trades
Missing ticks (gaps)   31      Warning     Incomplete market view
Schema changes         8       Critical    Parser crashes
Price outliers         45      Warning     Invalid signals
Crossed markets        6       Critical    Arbitrage false positives
Wide spreads           10      Warning     Poor execution
912023-06-15 09:31:42 [CRITICAL] freshness: Stale data: 5.2s old (limit: 1.0s)
2 Symbol: AAPL
3 Last update: 09:31:37
4 Current time: 09:31:42
5
6Action: Automatic trading halt for AAPL
7Root cause: Exchange API rate limit hit
8Resolution: Switched to backup feed, resumed trading in 15 seconds
9Impact: $0 (caught before bad trades)
Data quality monitoring is essential for production trading. The issues it prevents far outweigh the monitoring costs.
Technical Writer
The NordVarg Team consists of software engineers at NordVarg specializing in high-performance financial systems and type-safe programming.
Get weekly insights on building high-performance financial systems, latest industry trends, and expert tips delivered straight to your inbox.