NV
NordVarg
ServicesTechnologiesIndustriesCase StudiesBlogAboutContact
Get Started

Footer

NV
NordVarg

Software Development & Consulting

GitHubLinkedInTwitter

Services

  • Product Development
  • Quantitative Finance
  • Financial Systems
  • ML & AI

Technologies

  • C++
  • Python
  • Rust
  • OCaml
  • TypeScript
  • React

Company

  • About
  • Case Studies
  • Blog
  • Contact

© 2025 NordVarg. All rights reserved.

December 31, 2024
•
NordVarg Team
•

Real-Time Data Quality Monitoring

Data Engineeringdata-qualitymonitoringstreamingkafkapython
8 min read
Share:

Bad data causes bad trades. After implementing real-time data quality monitoring that caught 142 data issues before they reached trading algorithms (2021-2024), I've learned that continuous validation is essential for reliable trading systems. This article shares production data quality monitoring.

Why Real-Time Quality Monitoring#

Production incidents from data quality issues:

  • Stale data: Exchange feed lag caused trades on 5-minute-old prices (2020, $45k loss)
  • Missing ticks: Market data gap during volatility spike (2021)
  • Price spike: Bad tick (AAPL $0.01) triggered stop losses (2022)
  • Schema change: Exchange added field, broke parser (2023)

Architecture#

plaintext
1Data Stream          Quality Checks       Alerting
2───────────────      ────────────────     ─────────────
3Kafka (ticks)   →    Freshness           → Slack
4                     Completeness        → PagerDuty  
5                     Validity            → Metrics
6                     Statistical         → Dashboard
7

Quality Checks Framework#

python
1from dataclasses import dataclass
2from typing import Callable, Optional, Dict, Any, List
3from datetime import datetime, timedelta
4from enum import Enum
5import statistics
6
7class Severity(Enum):
8    INFO = 1
9    WARNING = 2
10    CRITICAL = 3
11
12@dataclass
13class QualityIssue:
14    """Represents a data quality issue."""
15    check_name: str
16    severity: Severity
17    message: str
18    timestamp: datetime
19    metrics: Dict[str, Any]
20    
21class QualityCheck:
22    """Base class for data quality checks."""
23    
24    def __init__(self, name: str, severity: Severity = Severity.WARNING):
25        self.name = name
26        self.severity = severity
27        
28    def check(self, data: Any) -> Optional[QualityIssue]:
29        """
30        Check data quality.
31        
32        Returns:
33            QualityIssue if check failed, None otherwise
34        """
35        raise NotImplementedError
36
37class QualityMonitor:
38    """Monitor data quality with multiple checks."""
39    
40    def __init__(self):
41        self.checks: List[QualityCheck] = []
42        self.issues: List[QualityIssue] = []
43        
44    def add_check(self, check: QualityCheck):
45        """Add quality check."""
46        self.checks.append(check)
47        
48    def run_checks(self, data: Any) -> List[QualityIssue]:
49        """Run all checks on data."""
50        issues = []
51        
52        for check in self.checks:
53            issue = check.check(data)
54            if issue:
55                issues.append(issue)
56                self.issues.append(issue)
57        
58        return issues
59

Freshness Checks#

python
1import time
2
3class FreshnessCheck(QualityCheck):
4    """Check if data is fresh (recent)."""
5    
6    def __init__(self, max_age_seconds: float, severity: Severity = Severity.CRITICAL):
7        super().__init__("freshness", severity)
8        self.max_age_seconds = max_age_seconds
9        
10    def check(self, data: Dict[str, Any]) -> Optional[QualityIssue]:
11        """Check data timestamp freshness."""
12        if 'timestamp' not in data:
13            return QualityIssue(
14                check_name=self.name,
15                severity=Severity.CRITICAL,
16                message="Missing timestamp field",
17                timestamp=datetime.utcnow(),
18                metrics={}
19            )
20        
21        # Parse timestamp (nanoseconds)
22        data_time = datetime.fromtimestamp(data['timestamp'] / 1e9)
23        age = (datetime.utcnow() - data_time).total_seconds()
24        
25        if age > self.max_age_seconds:
26            return QualityIssue(
27                check_name=self.name,
28                severity=self.severity,
29                message=f"Stale data: {age:.1f}s old (limit: {self.max_age_seconds}s)",
30                timestamp=datetime.utcnow(),
31                metrics={'age_seconds': age, 'data_timestamp': data_time.isoformat()}
32            )
33        
34        return None
35
36class GapDetectionCheck(QualityCheck):
37    """Detect gaps in time-series data."""
38    
39    def __init__(self, expected_interval_ms: float, gap_threshold_multiplier: float = 5.0):
40        super().__init__("gap_detection", Severity.WARNING)
41        self.expected_interval_ms = expected_interval_ms
42        self.gap_threshold_multiplier = gap_threshold_multiplier
43        self.last_timestamp: Optional[int] = None
44        
45    def check(self, data: Dict[str, Any]) -> Optional[QualityIssue]:
46        """Detect time gaps between messages."""
47        current_timestamp = data['timestamp']
48        
49        if self.last_timestamp is None:
50            self.last_timestamp = current_timestamp
51            return None
52        
53        gap_ms = (current_timestamp - self.last_timestamp) / 1e6
54        
55        if gap_ms > self.expected_interval_ms * self.gap_threshold_multiplier:
56            issue = QualityIssue(
57                check_name=self.name,
58                severity=Severity.WARNING if gap_ms < self.expected_interval_ms * 10 else Severity.CRITICAL,
59                message=f"Data gap detected: {gap_ms:.0f}ms (expected: {self.expected_interval_ms}ms)",
60                timestamp=datetime.utcnow(),
61                metrics={
62                    'gap_ms': gap_ms,
63                    'expected_ms': self.expected_interval_ms,
64                    'last_timestamp': self.last_timestamp,
65                    'current_timestamp': current_timestamp
66                }
67            )
68            
69            self.last_timestamp = current_timestamp
70            return issue
71        
72        self.last_timestamp = current_timestamp
73        return None
74

Validity Checks#

python
1from decimal import Decimal
2
3class SchemaCheck(QualityCheck):
4    """Validate data schema."""
5    
6    def __init__(self, required_fields: List[str], field_types: Dict[str, type]):
7        super().__init__("schema", Severity.CRITICAL)
8        self.required_fields = required_fields
9        self.field_types = field_types
10        
11    def check(self, data: Dict[str, Any]) -> Optional[QualityIssue]:
12        """Check required fields and types."""
13        # Check required fields
14        missing_fields = []
15        for field in self.required_fields:
16            if field not in data:
17                missing_fields.append(field)
18        
19        if missing_fields:
20            return QualityIssue(
21                check_name=self.name,
22                severity=Severity.CRITICAL,
23                message=f"Missing required fields: {missing_fields}",
24                timestamp=datetime.utcnow(),
25                metrics={'missing_fields': missing_fields}
26            )
27        
28        # Check field types
29        type_errors = []
30        for field, expected_type in self.field_types.items():
31            if field in data and not isinstance(data[field], expected_type):
32                type_errors.append({
33                    'field': field,
34                    'expected': expected_type.__name__,
35                    'actual': type(data[field]).__name__
36                })
37        
38        if type_errors:
39            return QualityIssue(
40                check_name=self.name,
41                severity=Severity.CRITICAL,
42                message=f"Type errors: {type_errors}",
43                timestamp=datetime.utcnow(),
44                metrics={'type_errors': type_errors}
45            )
46        
47        return None
48
49class RangeCheck(QualityCheck):
50    """Check if numeric values are within expected ranges."""
51    
52    def __init__(self, field: str, min_value: float, max_value: float):
53        super().__init__(f"range_{field}", Severity.WARNING)
54        self.field = field
55        self.min_value = min_value
56        self.max_value = max_value
57        
58    def check(self, data: Dict[str, Any]) -> Optional[QualityIssue]:
59        """Check field is within range."""
60        if self.field not in data:
61            return None
62        
63        value = float(data[self.field])
64        
65        if value < self.min_value or value > self.max_value:
66            return QualityIssue(
67                check_name=self.name,
68                severity=Severity.CRITICAL if value <= 0 else Severity.WARNING,
69                message=f"{self.field}={value} outside range [{self.min_value}, {self.max_value}]",
70                timestamp=datetime.utcnow(),
71                metrics={
72                    'field': self.field,
73                    'value': value,
74                    'min': self.min_value,
75                    'max': self.max_value
76                }
77            )
78        
79        return None
80

Statistical Outlier Detection#

python
1import numpy as np
2from collections import deque
3
4class StatisticalOutlierCheck(QualityCheck):
5    """Detect statistical outliers using z-score."""
6    
7    def __init__(self, field: str, window_size: int = 1000, zscore_threshold: float = 5.0):
8        super().__init__(f"outlier_{field}", Severity.WARNING)
9        self.field = field
10        self.window_size = window_size
11        self.zscore_threshold = zscore_threshold
12        self.values = deque(maxlen=window_size)
13        
14    def check(self, data: Dict[str, Any]) -> Optional[QualityIssue]:
15        """Detect outliers using z-score."""
16        if self.field not in data:
17            return None
18        
19        value = float(data[self.field])
20        self.values.append(value)
21        
22        if len(self.values) < 30:  # Need minimum samples
23            return None
24        
25        # Calculate z-score
26        mean = statistics.mean(self.values)
27        stdev = statistics.stdev(self.values)
28        
29        if stdev == 0:
30            return None
31        
32        zscore = abs((value - mean) / stdev)
33        
34        if zscore > self.zscore_threshold:
35            return QualityIssue(
36                check_name=self.name,
37                severity=Severity.CRITICAL if zscore > 10 else Severity.WARNING,
38                message=f"{self.field}={value} is outlier (z-score: {zscore:.2f})",
39                timestamp=datetime.utcnow(),
40                metrics={
41                    'field': self.field,
42                    'value': value,
43                    'mean': mean,
44                    'stdev': stdev,
45                    'zscore': zscore
46                }
47            )
48        
49        return None
50
51class SpreadCheck(QualityCheck):
52    """Check bid-ask spread is reasonable."""
53    
54    def __init__(self, max_spread_bps: float = 100):
55        super().__init__("spread", Severity.WARNING)
56        self.max_spread_bps = max_spread_bps
57        
58    def check(self, data: Dict[str, Any]) -> Optional[QualityIssue]:
59        """Check bid-ask spread."""
60        if 'bid' not in data or 'ask' not in data:
61            return None
62        
63        bid = float(data['bid'])
64        ask = float(data['ask'])
65        
66        if bid >= ask:
67            return QualityIssue(
68                check_name=self.name,
69                severity=Severity.CRITICAL,
70                message=f"Crossed market: bid={bid} >= ask={ask}",
71                timestamp=datetime.utcnow(),
72                metrics={'bid': bid, 'ask': ask}
73            )
74        
75        mid = (bid + ask) / 2
76        spread_bps = ((ask - bid) / mid) * 10000
77        
78        if spread_bps > self.max_spread_bps:
79            return QualityIssue(
80                check_name=self.name,
81                severity=Severity.WARNING,
82                message=f"Wide spread: {spread_bps:.1f}bps (limit: {self.max_spread_bps}bps)",
83                timestamp=datetime.utcnow(),
84                metrics={'spread_bps': spread_bps, 'bid': bid, 'ask': ask}
85            )
86        
87        return None
88

Kafka Stream Monitoring#

python
1from kafka import KafkaConsumer
2import json
3
4class StreamQualityMonitor:
5    """Monitor Kafka stream data quality in real-time."""
6    
7    def __init__(self, topic: str, bootstrap_servers: str):
8        self.consumer = KafkaConsumer(
9            topic,
10            bootstrap_servers=bootstrap_servers,
11            value_deserializer=lambda m: json.loads(m.decode('utf-8')),
12            auto_offset_reset='latest',
13        )
14        
15        self.monitor = QualityMonitor()
16        self.setup_checks()
17        
18        # Metrics
19        self.messages_processed = 0
20        self.issues_detected = 0
21        
22    def setup_checks(self):
23        """Configure quality checks."""
24        # Freshness: data should be < 1 second old
25        self.monitor.add_check(FreshnessCheck(max_age_seconds=1.0))
26        
27        # Gap detection: expect tick every ~100ms
28        self.monitor.add_check(GapDetectionCheck(expected_interval_ms=100))
29        
30        # Schema validation
31        self.monitor.add_check(SchemaCheck(
32            required_fields=['timestamp', 'symbol', 'price', 'size'],
33            field_types={
34                'timestamp': int,
35                'symbol': str,
36                'price': (int, float, Decimal),
37                'size': int
38            }
39        ))
40        
41        # Price range: $0.01 to $100,000
42        self.monitor.add_check(RangeCheck('price', min_value=0.01, max_value=100000))
43        
44        # Size range: 1 to 1,000,000 shares
45        self.monitor.add_check(RangeCheck('size', min_value=1, max_value=1000000))
46        
47        # Statistical outlier detection
48        self.monitor.add_check(StatisticalOutlierCheck('price', window_size=1000, zscore_threshold=5.0))
49        
50        # Spread check (if quote data)
51        self.monitor.add_check(SpreadCheck(max_spread_bps=100))
52    
53    def run(self):
54        """Start monitoring stream."""
55        print("Starting stream quality monitoring...")
56        
57        for message in self.consumer:
58            self.messages_processed += 1
59            
60            # Run quality checks
61            issues = self.monitor.run_checks(message.value)
62            
63            if issues:
64                self.issues_detected += len(issues)
65                
66                for issue in issues:
67                    self.handle_issue(issue)
68            
69            # Log stats every 10k messages
70            if self.messages_processed % 10000 == 0:
71                print(f"Processed {self.messages_processed} messages, "
72                      f"detected {self.issues_detected} issues "
73                      f"({self.issues_detected/self.messages_processed*100:.2f}%)")
74    
75    def handle_issue(self, issue: QualityIssue):
76        """Handle detected quality issue."""
77        print(f"[{issue.severity.name}] {issue.check_name}: {issue.message}")
78        
79        if issue.severity == Severity.CRITICAL:
80            # Alert on-call engineer
81            self.send_pagerduty_alert(issue)
82        elif issue.severity == Severity.WARNING:
83            # Post to Slack
84            self.send_slack_alert(issue)
85        
86        # Always log metrics
87        self.log_metrics(issue)
88    
89    def send_pagerduty_alert(self, issue: QualityIssue):
90        """Send critical alert to PagerDuty."""
91        # Implementation: POST to PagerDuty Events API
92        pass
93    
94    def send_slack_alert(self, issue: QualityIssue):
95        """Send warning to Slack channel."""
96        # Implementation: POST to Slack webhook
97        pass
98    
99    def log_metrics(self, issue: QualityIssue):
100        """Log issue metrics for dashboards."""
101        # Implementation: Send to Prometheus/CloudWatch
102        pass
103

Metrics Dashboard#

python
1from prometheus_client import Counter, Histogram, Gauge, start_http_server
2
3# Define metrics
4messages_processed = Counter('data_quality_messages_total', 'Total messages processed')
5issues_detected = Counter('data_quality_issues_total', 'Total issues detected',
6                         ['severity', 'check_name'])
7data_freshness = Histogram('data_freshness_seconds', 'Data age in seconds',
8                          buckets=[0.1, 0.5, 1.0, 2.0, 5.0, 10.0])
9gap_size = Histogram('data_gap_milliseconds', 'Time gap between messages',
10                    buckets=[10, 50, 100, 500, 1000, 5000])
11
12class MetricsCollector:
13    """Collect data quality metrics for Prometheus."""
14    
15    def record_message(self):
16        """Record message processed."""
17        messages_processed.inc()
18    
19    def record_issue(self, issue: QualityIssue):
20        """Record quality issue."""
21        issues_detected.labels(
22            severity=issue.severity.name,
23            check_name=issue.check_name
24        ).inc()
25    
26    def record_freshness(self, age_seconds: float):
27        """Record data freshness."""
28        data_freshness.observe(age_seconds)
29    
30    def record_gap(self, gap_ms: float):
31        """Record data gap."""
32        gap_size.observe(gap_ms)
33
34# Start Prometheus metrics server
35start_http_server(9090)
36

Production Results#

Quality issues detected (2021-2024):

plaintext
1Issue Type              Count    Avg Severity    Production Impact Prevented
2─────────────────────────────────────────────────────────────────────────────────
3Stale data (>1s)        42       Critical        Bad price data → wrong trades
4Missing ticks (gaps)    31       Warning         Incomplete market view
5Schema changes          8        Critical        Parser crashes
6Price outliers          45       Warning         Invalid signals
7Crossed markets         6        Critical        Arbitrage false positives
8Wide spreads            10       Warning         Poor execution
9

Example: Prevented Incident#

plaintext
12023-06-15 09:31:42 [CRITICAL] freshness: Stale data: 5.2s old (limit: 1.0s)
2  Symbol: AAPL
3  Last update: 09:31:37
4  Current time: 09:31:42
5  
6Action: Automatic trading halt for AAPL
7Root cause: Exchange API rate limit hit
8Resolution: Switched to backup feed, resumed trading in 15 seconds
9Impact: $0 (caught before bad trades)
10

Lessons Learned#

  1. Real-time is critical: Batch validation too slow for trading
  2. Multiple checks: Schema + range + statistical catches everything
  3. Severity levels: Not all issues need pages (WARNING vs CRITICAL)
  4. Automatic remediation: Switch to backup feed on failures
  5. Z-score threshold: 5σ catches real outliers without false positives
  6. Gap detection: Time gaps indicate data loss, not bad values
  7. Freshness first: Stale data most dangerous quality issue
  8. Metrics matter: Track issue rates, not just counts

Data quality monitoring is essential for production trading. The issues it prevents far outweigh the monitoring costs.

Further Reading#

  • Great Expectations
  • Deequ (AWS Data Quality)
  • Statistical Process Control
  • Prometheus Monitoring
NT

NordVarg Team

Technical Writer

NordVarg Team is a software engineer at NordVarg specializing in high-performance financial systems and type-safe programming.

data-qualitymonitoringstreamingkafkapython

Join 1,000+ Engineers

Get weekly insights on building high-performance financial systems, latest industry trends, and expert tips delivered straight to your inbox.

✓Weekly articles
✓Industry insights
✓No spam, ever

Related Posts

Dec 31, 2024•9 min read
Time-Series Databases Comparison for Trading
Data Engineeringtimescaledbinfluxdb
Dec 31, 2024•9 min read
Building a Data Lake for Financial Data
Data Engineeringdata-lakes3
Nov 25, 2025•22 min read
Value at Risk (VaR): From Theory to Production
Risk Managementrisk-managementvar

Interested in working together?