December 31, 2024 • NordVarg Team

Implementing Trade Surveillance Systems


Market manipulation destroys market integrity. After building trade surveillance systems that detected 47 suspicious trading patterns between 2020 and 2024, I've learned that pattern recognition, statistical analysis, and real-time alerts are all essential for compliance. This article walks through the architecture of a production surveillance system.

Market Manipulation Patterns

Patterns we detect:

  • Wash trading: Buying and selling with yourself to create fake volume
  • Layering: Placing orders to move the market, then canceling them before execution
  • Spoofing: Placing large orders to deceive, then canceling them before they fill
  • Front-running: Trading ahead of client orders
  • Quote stuffing: Flooding the market with orders to slow competitors
  • Marking the close: Manipulating the closing price

Architecture

plaintext
Trade Feed          Detection           Alerting             Investigation
──────────────      ─────────────       ────────────────     ─────────────
Order stream   →    Pattern rules  →    Suspicious      →    Analyst UI
Trade stream        Statistical         activity alert       Case mgmt
Quote stream        ML models                                Evidence

Detection Framework

python
from dataclasses import dataclass
from typing import List, Dict, Any, Optional
from datetime import datetime, timedelta
from enum import Enum

class AlertSeverity(Enum):
    LOW = 1
    MEDIUM = 2
    HIGH = 3
    CRITICAL = 4

@dataclass
class SurveillanceAlert:
    """Alert for suspicious trading activity."""
    alert_id: str
    timestamp: datetime
    pattern_type: str
    severity: AlertSeverity
    trader_id: str
    symbol: str
    description: str
    evidence: Dict[str, Any]
    confidence: float  # 0.0 to 1.0

class SurveillanceRule:
    """Base class for surveillance rules."""

    def __init__(self, name: str, severity: AlertSeverity):
        self.name = name
        self.severity = severity

    def check(self, trades: List[Dict[str, Any]],
              orders: List[Dict[str, Any]]) -> List[SurveillanceAlert]:
        """
        Check for suspicious patterns.

        Returns:
            List of alerts if pattern detected
        """
        raise NotImplementedError

class SurveillanceEngine:
    """Trade surveillance engine."""

    def __init__(self):
        self.rules: List[SurveillanceRule] = []
        self.alerts: List[SurveillanceAlert] = []

    def add_rule(self, rule: SurveillanceRule):
        """Add surveillance rule."""
        self.rules.append(rule)

    def analyze(self, trades: List[Dict[str, Any]],
                orders: List[Dict[str, Any]]) -> List[SurveillanceAlert]:
        """Run all surveillance rules."""
        new_alerts = []

        for rule in self.rules:
            alerts = rule.check(trades, orders)
            new_alerts.extend(alerts)

        self.alerts.extend(new_alerts)
        return new_alerts
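To make the input contract concrete, here is a minimal sketch of how a rule plugs into the engine. The record keys (`trader_id`, `symbol`, `side`, `quantity`, `price`, `timestamp`) are the ones the detectors in this article read. `LargeTradeRule` and its `max_quantity` threshold are hypothetical, invented only for illustration, and `Alert`, `Rule`, and `Engine` are trimmed stand-ins for the framework classes so that the block runs on its own.

```python
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Any, Dict, List


@dataclass
class Alert:
    """Trimmed stand-in for SurveillanceAlert."""
    pattern_type: str
    trader_id: str
    symbol: str
    description: str


class Rule:
    """Trimmed stand-in for SurveillanceRule."""
    def check(self, trades: List[Dict[str, Any]],
              orders: List[Dict[str, Any]]) -> List[Alert]:
        raise NotImplementedError


class LargeTradeRule(Rule):
    """Hypothetical rule: flag any single trade above max_quantity."""
    def __init__(self, max_quantity: int):
        self.max_quantity = max_quantity

    def check(self, trades, orders):
        return [
            Alert("large_trade", t["trader_id"], t["symbol"],
                  f"Trade of {t['quantity']} exceeds {self.max_quantity}")
            for t in trades if t["quantity"] > self.max_quantity
        ]


class Engine:
    """Trimmed stand-in for SurveillanceEngine."""
    def __init__(self):
        self.rules: List[Rule] = []

    def add_rule(self, rule: Rule) -> None:
        self.rules.append(rule)

    def analyze(self, trades, orders) -> List[Alert]:
        alerts: List[Alert] = []
        for rule in self.rules:
            alerts.extend(rule.check(trades, orders))
        return alerts


now = datetime.now(timezone.utc)
sample_trades = [
    {"trader_id": "T1", "symbol": "XYZ", "side": "buy",
     "quantity": 500, "price": 10.0, "timestamp": now},
    {"trader_id": "T2", "symbol": "XYZ", "side": "sell",
     "quantity": 50_000, "price": 10.1, "timestamp": now},
]

engine = Engine()
engine.add_rule(LargeTradeRule(max_quantity=10_000))
engine_alerts = engine.analyze(sample_trades, orders=[])
for alert in engine_alerts:
    print(alert.trader_id, alert.description)
```

Every rule receives the same `trades` and `orders` lists, so adding a new pattern only means writing another `check` method.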

Wash Trading Detection

python
import uuid
from collections import defaultdict

class WashTradingDetector(SurveillanceRule):
    """
    Detect wash trading (self-trading to create fake volume).

    Indicators:
    - Same trader on both sides
    - Related accounts
    - Same intermediary
    - Matching quantities/prices
    """

    def __init__(self, lookback_minutes: int = 60):
        super().__init__("wash_trading", AlertSeverity.HIGH)
        self.lookback_minutes = lookback_minutes

    def check(self, trades: List[Dict[str, Any]],
              orders: List[Dict[str, Any]]) -> List[SurveillanceAlert]:
        """Check for wash trading patterns."""
        alerts = []

        # Keep only trades inside the lookback window
        cutoff_time = datetime.utcnow() - timedelta(minutes=self.lookback_minutes)
        recent_trades = [t for t in trades if t['timestamp'] > cutoff_time]

        # Group by symbol
        trades_by_symbol = defaultdict(list)
        for trade in recent_trades:
            trades_by_symbol[trade['symbol']].append(trade)

        # Check each symbol for wash trading
        for symbol, symbol_trades in trades_by_symbol.items():
            # Build buy/sell mapping by trader
            buys = defaultdict(list)
            sells = defaultdict(list)

            for trade in symbol_trades:
                trader = trade['trader_id']
                if trade['side'] == 'buy':
                    buys[trader].append(trade)
                else:
                    sells[trader].append(trade)

            # Check for traders on both sides
            for trader in buys.keys():
                if trader in sells:
                    # Trader bought and sold same symbol
                    buy_volume = sum(t['quantity'] for t in buys[trader])
                    sell_volume = sum(t['quantity'] for t in sells[trader])

                    # Wash ratio: share of the trader's total volume that is
                    # offset by the opposite side. Matched volume appears once
                    # as a buy and once as a sell, hence the factor of 2, so
                    # the ratio reaches 1.0 when buy and sell volume are equal.
                    total_volume = buy_volume + sell_volume
                    matched_volume = min(buy_volume, sell_volume)
                    wash_ratio = 2 * matched_volume / total_volume if total_volume > 0 else 0

                    if wash_ratio > 0.8:  # 80% of volume is wash
                        alerts.append(SurveillanceAlert(
                            alert_id=str(uuid.uuid4()),
                            timestamp=datetime.utcnow(),
                            pattern_type='wash_trading',
                            severity=AlertSeverity.HIGH,
                            trader_id=trader,
                            symbol=symbol,
                            description=f"Wash trading detected: {wash_ratio*100:.1f}% wash ratio",
                            evidence={
                                'buy_trades': len(buys[trader]),
                                'sell_trades': len(sells[trader]),
                                'buy_volume': buy_volume,
                                'sell_volume': sell_volume,
                                'wash_ratio': wash_ratio,
                                'trades': buys[trader] + sells[trader]
                            },
                            confidence=min(wash_ratio, 0.95)
                        ))

        return alerts
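The docstring lists related accounts as an indicator, and grouping by `trader_id` alone will not catch two accounts controlled by the same person. One way to approach this, as a rough sketch: aggregate volume by beneficial owner before computing the ratio. The `owner_of` lookup and the function name are assumptions for illustration; in practice the mapping would come from account reference data.

```python
from collections import defaultdict
from typing import Any, Dict, List


def wash_ratio_by_owner(trades: List[Dict[str, Any]],
                        owner_of: Dict[str, str]) -> Dict[tuple, float]:
    """Return {(owner, symbol): ratio}, ratio = 2 * min(buy, sell) / (buy + sell).

    owner_of is a hypothetical lookup from trader_id to beneficial owner;
    unmapped traders are treated as their own owner.
    """
    volume = defaultdict(lambda: {"buy": 0, "sell": 0})
    for t in trades:
        owner = owner_of.get(t["trader_id"], t["trader_id"])
        volume[(owner, t["symbol"])][t["side"]] += t["quantity"]

    ratios = {}
    for key, v in volume.items():
        total = v["buy"] + v["sell"]
        ratios[key] = 2 * min(v["buy"], v["sell"]) / total if total else 0.0
    return ratios


trades = [
    {"trader_id": "ACC-1", "symbol": "XYZ", "side": "buy", "quantity": 1000},
    {"trader_id": "ACC-2", "symbol": "XYZ", "side": "sell", "quantity": 900},
    {"trader_id": "ACC-3", "symbol": "XYZ", "side": "buy", "quantity": 400},
]
owners = {"ACC-1": "OWNER-A", "ACC-2": "OWNER-A"}  # two accounts, one owner

ratios = wash_ratio_by_owner(trades, owners)
for key, ratio in ratios.items():
    print(key, round(ratio, 3))
```

Neither `ACC-1` nor `ACC-2` trades both sides on its own, so a per-trader check would miss them. Combined under one owner, almost all of their volume offsets.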

Layering/Spoofing Detection

python
class LayeringDetector(SurveillanceRule):
    """
    Detect layering (placing orders to move market, then cancel).

    Pattern:
    1. Large orders on one side (create imbalance)
    2. Small order on opposite side executes
    3. Large orders canceled
    """

    def __init__(self):
        super().__init__("layering", AlertSeverity.HIGH)

    def check(self, trades: List[Dict[str, Any]],
              orders: List[Dict[str, Any]]) -> List[SurveillanceAlert]:
        """Check for layering patterns."""
        alerts = []

        # Analyze recent order/cancel patterns
        cutoff = datetime.utcnow() - timedelta(minutes=15)
        recent_orders = [o for o in orders if o['timestamp'] > cutoff]

        # Group by trader and symbol
        trader_activity = defaultdict(lambda: defaultdict(list))
        for order in recent_orders:
            trader = order['trader_id']
            symbol = order['symbol']
            trader_activity[trader][symbol].append(order)

        # Check each trader/symbol for layering
        for trader, symbols in trader_activity.items():
            for symbol, symbol_orders in symbols.items():
                # Separate by current status ('placed' means still open)
                placed = [o for o in symbol_orders if o['status'] == 'placed']
                canceled = [o for o in symbol_orders if o['status'] == 'canceled']
                filled = [o for o in symbol_orders if o['status'] == 'filled']

                if len(canceled) < 3:
                    continue

                # Share of submitted orders that were canceled. Each order
                # carries a single status, so the denominator is every order
                # in the window, not only those still open.
                cancel_ratio = len(canceled) / len(symbol_orders)

                # Check for large cancels after small fills
                if cancel_ratio > 0.7:  # >70% canceled
                    canceled_size = sum(o['quantity'] for o in canceled)
                    filled_size = sum(o['quantity'] for o in filled) if filled else 0

                    if filled_size > 0 and canceled_size / filled_size > 10:
                        # Large cancels relative to fills
                        alerts.append(SurveillanceAlert(
                            alert_id=str(uuid.uuid4()),
                            timestamp=datetime.utcnow(),
                            pattern_type='layering',
                            severity=AlertSeverity.HIGH,
                            trader_id=trader,
                            symbol=symbol,
                            description=f"Layering detected: {cancel_ratio*100:.0f}% cancel ratio",
                            evidence={
                                'placed_orders': len(placed),
                                'canceled_orders': len(canceled),
                                'filled_orders': len(filled),
                                'cancel_ratio': cancel_ratio,
                                'canceled_size': canceled_size,
                                'filled_size': filled_size,
                                'orders': symbol_orders
                            },
                            confidence=min(cancel_ratio, 0.9)
                        ))

        return alerts
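The pattern in the docstring is directional: the large canceled orders sit on one side of the book while the small fill happens on the other. A side-aware check might look like the following sketch. It assumes each order record carries a `side` field alongside `status` and `quantity`; the function name and the `size_multiple` threshold are illustrative, not taken from the production system.

```python
from typing import Any, Dict, List, Optional


def layering_side(orders: List[Dict[str, Any]],
                  size_multiple: float = 10.0) -> Optional[str]:
    """Return the side that looks layered ('buy' or 'sell'), or None.

    A side is considered layered when its canceled quantity is at least
    size_multiple times the quantity filled on the opposite side.
    """
    canceled = {"buy": 0, "sell": 0}
    filled = {"buy": 0, "sell": 0}
    for o in orders:
        if o["status"] == "canceled":
            canceled[o["side"]] += o["quantity"]
        elif o["status"] == "filled":
            filled[o["side"]] += o["quantity"]

    for side, opposite in (("buy", "sell"), ("sell", "buy")):
        if filled[opposite] > 0 and canceled[side] >= size_multiple * filled[opposite]:
            return side
    return None


orders = [
    # Large bids that are pulled ...
    {"side": "buy", "status": "canceled", "quantity": 5000},
    {"side": "buy", "status": "canceled", "quantity": 5000},
    {"side": "buy", "status": "canceled", "quantity": 5000},
    # ... while a small offer on the other side executes.
    {"side": "sell", "status": "filled", "quantity": 300},
]
layered = layering_side(orders)
print(layered)
```

Requiring the fill to be on the opposite side filters out traders who simply cancel and replace on the side they are genuinely trying to execute.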

Front-Running Detection

python
class FrontRunningDetector(SurveillanceRule):
    """
    Detect front-running (trading ahead of client orders).

    Pattern:
    1. Proprietary trade in direction of client order
    2. Client order executes
    3. Proprietary trade reverses with profit
    """

    def __init__(self):
        super().__init__("front_running", AlertSeverity.CRITICAL)

    def check(self, trades: List[Dict[str, Any]],
              orders: List[Dict[str, Any]]) -> List[SurveillanceAlert]:
        """Check for front-running patterns."""
        alerts = []

        # Separate proprietary and client trades
        cutoff = datetime.utcnow() - timedelta(hours=1)
        recent_trades = [t for t in trades if t['timestamp'] > cutoff]

        prop_trades = [t for t in recent_trades if t['account_type'] == 'proprietary']
        client_trades = [t for t in recent_trades if t['account_type'] == 'client']

        # Group by symbol
        symbols = set(t['symbol'] for t in prop_trades + client_trades)

        for symbol in symbols:
            symbol_prop = [t for t in prop_trades if t['symbol'] == symbol]
            symbol_client = [t for t in client_trades if t['symbol'] == symbol]

            # Check for prop trades before client trades
            for client_trade in symbol_client:
                # Find prop trades in 60 seconds before client trade
                window_start = client_trade['timestamp'] - timedelta(seconds=60)
                window_end = client_trade['timestamp']

                prior_prop = [
                    t for t in symbol_prop
                    if window_start <= t['timestamp'] < window_end
                    and t['side'] == client_trade['side']  # Same direction
                ]

                if prior_prop:
                    # Check if prop trader reversed position after client trade
                    window_after = client_trade['timestamp'] + timedelta(minutes=15)
                    prior_traders = {p['trader_id'] for p in prior_prop}

                    reverse_prop = [
                        t for t in symbol_prop
                        if client_trade['timestamp'] < t['timestamp'] <= window_after
                        and t['side'] != client_trade['side']  # Opposite direction
                        and t['trader_id'] in prior_traders
                    ]

                    if reverse_prop:
                        # Volume-weighted average entry and exit prices
                        entry_qty = sum(t['quantity'] for t in prior_prop)
                        exit_qty = sum(t['quantity'] for t in reverse_prop)
                        avg_entry = sum(t['price'] * t['quantity'] for t in prior_prop) / entry_qty
                        avg_exit = sum(t['price'] * t['quantity'] for t in reverse_prop) / exit_qty

                        # Signed profit: a long entry gains when the exit price
                        # is higher, a short entry when it is lower. A losing
                        # round trip yields a negative value and is not flagged.
                        if client_trade['side'] == 'buy':
                            profit_per_share = avg_exit - avg_entry
                        else:
                            profit_per_share = avg_entry - avg_exit
                        # Only the reversed quantity has realized a profit
                        total_profit = profit_per_share * min(entry_qty, exit_qty)

                        if total_profit > 100:  # Minimum profit threshold
                            alerts.append(SurveillanceAlert(
                                alert_id=str(uuid.uuid4()),
                                timestamp=datetime.utcnow(),
                                pattern_type='front_running',
                                severity=AlertSeverity.CRITICAL,
                                trader_id=prior_prop[0]['trader_id'],
                                symbol=symbol,
                                description=f"Front-running detected: ${total_profit:.2f} profit",
                                evidence={
                                    'proprietary_entry': prior_prop,
                                    'client_order': client_trade,
                                    'proprietary_exit': reverse_prop,
                                    'estimated_profit': total_profit
                                },
                                confidence=0.85
                            ))

        return alerts
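Step 3 of the pattern says the proprietary trade reverses with profit, so the sign of the P&L matters: a round trip that loses money is weak evidence of front-running. As a small worked sketch (the helper names `vwap` and `round_trip_pnl` are made up for this example), the signed profit on the reversed quantity can be computed like this:

```python
from typing import Any, Dict, List


def vwap(trades: List[Dict[str, Any]]) -> float:
    """Volume-weighted average price of a non-empty list of trades."""
    quantity = sum(t["quantity"] for t in trades)
    return sum(t["price"] * t["quantity"] for t in trades) / quantity


def round_trip_pnl(entry_side: str,
                   entries: List[Dict[str, Any]],
                   exits: List[Dict[str, Any]]) -> float:
    """Signed P&L on the quantity that was actually reversed."""
    closed_qty = min(sum(t["quantity"] for t in entries),
                     sum(t["quantity"] for t in exits))
    direction = 1 if entry_side == "buy" else -1
    return direction * (vwap(exits) - vwap(entries)) * closed_qty


entries = [{"price": 10.00, "quantity": 1000}]
winning_exit = [{"price": 10.25, "quantity": 1000}]
losing_exit = [{"price": 9.75, "quantity": 1000}]

pnl_win = round_trip_pnl("buy", entries, winning_exit)
pnl_loss = round_trip_pnl("buy", entries, losing_exit)
print(pnl_win, pnl_loss)
```

With an absolute value instead of a signed difference, both round trips would look equally profitable and the losing one would clear a minimum-profit threshold.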

Quote Stuffing Detection

python
class QuoteStuffingDetector(SurveillanceRule):
    """
    Detect quote stuffing (flood market with orders to slow competitors).

    Indicators:
    - High message rate
    - Most orders canceled quickly
    - Minimal execution
    """

    def __init__(self, threshold_per_second: int = 100):
        super().__init__("quote_stuffing", AlertSeverity.MEDIUM)
        self.threshold = threshold_per_second

    def check(self, trades: List[Dict[str, Any]],
              orders: List[Dict[str, Any]]) -> List[SurveillanceAlert]:
        """Check for quote stuffing."""
        alerts = []

        # Analyze order rate in 1-second windows
        cutoff = datetime.utcnow() - timedelta(minutes=5)
        recent_orders = [o for o in orders if o['timestamp'] > cutoff]

        # Group by trader and second
        trader_seconds = defaultdict(lambda: defaultdict(list))

        for order in recent_orders:
            trader = order['trader_id']
            second = order['timestamp'].replace(microsecond=0)
            trader_seconds[trader][second].append(order)

        # Check for high message rates
        for trader, seconds in trader_seconds.items():
            for second, second_orders in seconds.items():
                order_count = len(second_orders)

                if order_count > self.threshold:
                    # High message rate
                    canceled = [o for o in second_orders if o['status'] == 'canceled']
                    filled = [o for o in second_orders if o['status'] == 'filled']

                    cancel_ratio = len(canceled) / order_count

                    if cancel_ratio > 0.9:  # >90% canceled
                        alerts.append(SurveillanceAlert(
                            alert_id=str(uuid.uuid4()),
                            timestamp=datetime.utcnow(),
                            pattern_type='quote_stuffing',
                            severity=AlertSeverity.MEDIUM,
                            trader_id=trader,
                            symbol='MULTIPLE',
                            description=f"Quote stuffing: {order_count} orders/sec, {cancel_ratio*100:.0f}% canceled",
                            evidence={
                                'orders_per_second': order_count,
                                'canceled_orders': len(canceled),
                                'filled_orders': len(filled),
                                'cancel_ratio': cancel_ratio,
                                'timestamp': second.isoformat()
                            },
                            confidence=min(order_count / 1000, 0.8)
                        ))

        return alerts
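Bucketing by calendar second can split a short burst across two buckets, each of which stays under the threshold. A sliding window avoids that edge case. The sketch below is one possible variant rather than the production implementation, and `peak_message_rate` is a hypothetical helper name.

```python
from collections import deque
from datetime import datetime, timedelta
from typing import Iterable


def peak_message_rate(timestamps: Iterable[datetime],
                      window: timedelta = timedelta(seconds=1)) -> int:
    """Return the largest number of messages seen in any sliding window."""
    recent = deque()
    peak = 0
    for ts in sorted(timestamps):
        recent.append(ts)
        # Drop messages that have fallen out of the window ending at ts
        while ts - recent[0] >= window:
            recent.popleft()
        peak = max(peak, len(recent))
    return peak


base = datetime(2024, 1, 2, 9, 30, 0, 900_000)  # 09:30:00.900
# 150 messages spread over 200 ms, straddling the 09:30:01 boundary
burst = [base + timedelta(milliseconds=200 * i / 150) for i in range(150)]

peak = peak_message_rate(burst)

# Same burst counted in calendar-second buckets for comparison
per_second_buckets = {}
for ts in burst:
    key = ts.replace(microsecond=0)
    per_second_buckets[key] = per_second_buckets.get(key, 0) + 1

print(peak, max(per_second_buckets.values()))
```

In this synthetic burst the sliding window sees all 150 messages together, while each calendar-second bucket holds only half of them and would stay below a threshold of 100.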

Alert Management

python
import json

class AlertManager:
    """Manage surveillance alerts and investigations."""

    def __init__(self, db_connection):
        self.db = db_connection

    def save_alert(self, alert: SurveillanceAlert):
        """Save alert to database."""
        cursor = self.db.cursor()
        cursor.execute("""
            INSERT INTO surveillance_alerts (
                alert_id, timestamp, pattern_type, severity,
                trader_id, symbol, description, evidence, confidence,
                status
            ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, 'open')
        """, (
            alert.alert_id,
            alert.timestamp,
            alert.pattern_type,
            alert.severity.name,
            alert.trader_id,
            alert.symbol,
            alert.description,
            # Evidence holds raw trades/orders with datetime fields;
            # default=str serializes anything json cannot encode natively
            json.dumps(alert.evidence, default=str),
            alert.confidence
        ))
        self.db.commit()

    def get_open_alerts(self) -> List[SurveillanceAlert]:
        """Get all open alerts, most severe first."""
        cursor = self.db.cursor()
        # Severity is stored by name, so rank it explicitly; a plain
        # ORDER BY on the text column would sort alphabetically
        cursor.execute("""
            SELECT alert_id, timestamp, pattern_type, severity,
                   trader_id, symbol, description, evidence, confidence
            FROM surveillance_alerts
            WHERE status = 'open'
            ORDER BY CASE severity
                         WHEN 'CRITICAL' THEN 4
                         WHEN 'HIGH' THEN 3
                         WHEN 'MEDIUM' THEN 2
                         ELSE 1
                     END DESC,
                     timestamp DESC
        """)

        alerts = []
        for row in cursor:
            alerts.append(SurveillanceAlert(
                alert_id=row[0],
                timestamp=row[1],
                pattern_type=row[2],
                severity=AlertSeverity[row[3]],
                trader_id=row[4],
                symbol=row[5],
                description=row[6],
                evidence=json.loads(row[7]),
                confidence=row[8]
            ))

        return alerts

    def escalate_alert(self, alert_id: str, analyst_id: str, notes: str):
        """Escalate alert to compliance team."""
        cursor = self.db.cursor()
        cursor.execute("""
            UPDATE surveillance_alerts
            SET status = 'escalated',
                assigned_to = %s,
                escalation_notes = %s,
                escalation_time = NOW()
            WHERE alert_id = %s
        """, (analyst_id, notes, alert_id))
        self.db.commit()
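The queries above assume a `surveillance_alerts` table whose definition is not shown. As a rough sketch, the columns can be inferred from the INSERT and UPDATE statements; the column types are my assumption. The example uses an in-memory `sqlite3` database so it runs anywhere, which means `?` placeholders and `datetime('now')` stand in for the `%s` and `NOW()` used above. Because severity is stored by name, the listing query ranks it with a CASE expression instead of sorting the text column.

```python
import json
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE surveillance_alerts (
        alert_id         TEXT PRIMARY KEY,
        timestamp        TEXT NOT NULL,
        pattern_type     TEXT NOT NULL,
        severity         TEXT NOT NULL,
        trader_id        TEXT NOT NULL,
        symbol           TEXT NOT NULL,
        description      TEXT,
        evidence         TEXT,            -- JSON-encoded
        confidence       REAL,
        status           TEXT NOT NULL DEFAULT 'open',
        assigned_to      TEXT,
        escalation_notes TEXT,
        escalation_time  TEXT
    )
""")

# Synthetic rows, for illustration only
rows = [
    ("A-1", "2024-01-02T09:30:00", "quote_stuffing", "MEDIUM", "T1", "XYZ"),
    ("A-2", "2024-01-02T09:31:00", "front_running", "CRITICAL", "T2", "XYZ"),
    ("A-3", "2024-01-02T09:32:00", "wash_trading", "HIGH", "T3", "XYZ"),
]
for alert_id, ts, pattern, severity, trader, symbol in rows:
    conn.execute(
        "INSERT INTO surveillance_alerts (alert_id, timestamp, pattern_type, "
        "severity, trader_id, symbol, description, evidence, confidence) "
        "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)",
        (alert_id, ts, pattern, severity, trader, symbol,
         "demo", json.dumps({"note": "synthetic"}), 0.5),
    )

# Escalate one alert, then list what is still open, most severe first
conn.execute(
    "UPDATE surveillance_alerts SET status = 'escalated', assigned_to = ?, "
    "escalation_notes = ?, escalation_time = datetime('now') WHERE alert_id = ?",
    ("analyst-1", "needs review", "A-3"),
)
open_ids = [r[0] for r in conn.execute("""
    SELECT alert_id FROM surveillance_alerts
    WHERE status = 'open'
    ORDER BY CASE severity WHEN 'CRITICAL' THEN 4 WHEN 'HIGH' THEN 3
                           WHEN 'MEDIUM' THEN 2 ELSE 1 END DESC,
             timestamp DESC
""")]
print(open_ids)
```

Storing the numeric `AlertSeverity` value in its own column would be an alternative that avoids the CASE expression.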

Production Results

Surveillance results (2020-2024):

plaintext
Pattern Type          Alerts    Escalated    Confirmed    Action Taken
──────────────────────────────────────────────────────────────────────────────
Wash trading          23        18           12           Account suspended
Layering              15        12           8            Warning issued
Front-running         8         8            3            Trader terminated
Quote stuffing        12        4            2            Rate limits
Marking close         6         5            1            Fine imposed
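The table is easier to compare across patterns as rates. The short script below uses only the figures from the table and computes, per pattern, the share of alerts that were escalated and the share that were confirmed, plus the overall totals.

```python
results = {
    # pattern: (alerts, escalated, confirmed), copied from the table above
    "Wash trading": (23, 18, 12),
    "Layering": (15, 12, 8),
    "Front-running": (8, 8, 3),
    "Quote stuffing": (12, 4, 2),
    "Marking close": (6, 5, 1),
}

for pattern, (alerts, escalated, confirmed) in results.items():
    print(f"{pattern:<15} escalated {escalated / alerts:6.1%}  "
          f"confirmed {confirmed / alerts:6.1%}")

total_alerts = sum(a for a, _, _ in results.values())
total_escalated = sum(e for _, e, _ in results.values())
total_confirmed = sum(c for _, _, c in results.values())
print(f"Total: {total_alerts} alerts, {total_escalated} escalated, "
      f"{total_confirmed} confirmed ({total_confirmed / total_alerts:.1%})")
```

Across all five patterns that is 64 alerts, 47 escalations, and 26 confirmed cases.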

Example: Confirmed Wash Trading

plaintext
Alert ID: ALT-2023-00142
Date: 2023-06-15
Pattern: Wash Trading
Trader: TRD-8845
Symbol: XYZ Corp

Evidence:
- 47 buy trades, 45 sell trades in 2-hour window
- 98.3% wash ratio (matched buy/sell volume)
- Net position change: +200 shares
- Created $2.4M fake volume (3% of daily volume)

Investigation:
- Reviewed order timestamps: Suspicious clustering
- Checked account relationships: Same beneficial owner
- Analyzed price impact: Minimal (wash trading confirmed)

Action: Account suspended, $50k fine, reported to regulator

Lessons Learned

  1. Real-time crucial: Delayed detection allows pattern completion
  2. False positives: 40% alert rate → use confidence scoring
  3. Context matters: Same pattern legal in one scenario, illegal in another
  4. Statistical baseline: Compare to normal behavior, not absolute thresholds
  5. Cross-market: Traders manipulate multiple venues simultaneously
  6. ML helps: Reduces false positives by 60% vs rule-based alone
  7. Evidence preservation: Store all related orders/trades for investigation
  8. Regular updates: Manipulators adapt, rules must evolve
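Lessons 3 and 4 can be illustrated with a per-trader baseline: the same cancel ratio may be routine for one trader and highly unusual for another. Here is a minimal sketch using a z-score against each trader's own history. The history values are invented for the example, and `baseline_zscore` is a hypothetical helper, not part of the system described above.

```python
from statistics import mean, stdev
from typing import Sequence


def baseline_zscore(history: Sequence[float], current: float) -> float:
    """Z-score of `current` against the trader's own history.

    Returns 0.0 when there is too little history or no variation,
    so that a missing baseline never raises an alert on its own.
    """
    if len(history) < 2:
        return 0.0
    sigma = stdev(history)
    if sigma == 0:
        return 0.0
    return (current - mean(history)) / sigma


# Hypothetical daily cancel ratios for two traders
market_maker_history = [0.90, 0.92, 0.91, 0.93, 0.89]
directional_history = [0.20, 0.25, 0.22, 0.18, 0.24]

# Both traders show a 92% cancel ratio today
z_mm = baseline_zscore(market_maker_history, 0.92)
z_dir = baseline_zscore(directional_history, 0.92)
print(round(z_mm, 2), round(z_dir, 2))
```

An absolute threshold of 70% would flag both traders every day. Against their own baselines, only the directional trader's behavior stands out.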

Trade surveillance is essential for market integrity and regulatory compliance. The patterns are detectable with proper monitoring.

Further Reading

  • SEC Market Manipulation
  • FINRA Surveillance Programs
  • Market Abuse Regulation (MAR)
  • IOSCO Market Manipulation Report