Trading firms accumulate terabytes of tick data, trade history, and market data. After building a production data lake storing 15TB of financial data over 3 years, I've learned that proper partitioning, columnar formats, and query engines are essential for cost-effective long-term storage. This article walks through that production data lake architecture.
Requirements driving our data lake:
Ingest Pipeline          Storage Layer              Query Layer
────────────────────     ──────────────────         ─────────────────
Kafka Streams        →   S3 (Parquet)          →    Trino
  Market Data              Partitioned by             Ad-hoc queries
  Trade Feed               date/symbol
                                               →    Spark
                         Glue Catalog                 Backtesting
                           Schema registry            Large jobs

                                               →    Athena
                                                      Quick lookups
121import pyarrow as pa
2import pyarrow.parquet as pq
3from dataclasses import dataclass
4from typing import List
5import io
6
# Tick data schema. Repeated string columns (symbol, exchange, side) use
# Arrow dictionary types; prices are fixed-point decimals with 4 decimal places.
tick_schema = pa.schema([
    pa.field('timestamp', pa.timestamp('ns')),
    pa.field('symbol', pa.dictionary(pa.int16(), pa.string())),
    pa.field('exchange', pa.dictionary(pa.int8(), pa.string())),
    pa.field('price', pa.decimal128(10, 4)),
    pa.field('size', pa.int32()),
    pa.field('side', pa.dictionary(pa.int8(), pa.string())),
    pa.field('conditions', pa.list_(pa.string())),
])

# Trade data schema. Same dictionary/decimal conventions as tick_schema,
# plus trade_id, order_id and fee columns.
trade_schema = pa.schema([
    pa.field('timestamp', pa.timestamp('ns')),
    pa.field('trade_id', pa.string()),
    pa.field('symbol', pa.dictionary(pa.int16(), pa.string())),
    pa.field('order_id', pa.string()),
    pa.field('side', pa.dictionary(pa.int8(), pa.string())),
    pa.field('price', pa.decimal128(10, 4)),
    pa.field('size', pa.int32()),
    pa.field('fee', pa.decimal128(10, 4)),
    pa.field('exchange', pa.dictionary(pa.int8(), pa.string())),
])
30
class ParquetWriter:
    """Write lists of row dicts to Parquet with compression and column statistics."""

    def __init__(self, schema: pa.Schema, compression: str = 'snappy',
                 row_group_size: int = 100000):
        """
        Args:
            schema: Arrow schema every written batch is converted with.
            compression: Parquet compression codec passed to write_table.
            row_group_size: Maximum number of rows per Parquet row group.
        """
        self.schema = schema
        self.compression = compression
        self.row_group_size = row_group_size
        # Dictionary-encode exactly the columns the schema declares as
        # dictionary-typed (symbol/exchange/side for the tick and trade
        # schemas) rather than a hard-coded name list that may not match
        # whatever schema this writer was built with.
        self.dictionary_columns = [
            field.name for field in schema if pa.types.is_dictionary(field.type)
        ]

    def write_batch(self, data: List[dict], output_path):
        """
        Write batch of data to Parquet file.

        Args:
            data: List of dictionaries matching schema
            output_path: S3 path, local file path, or a writable binary
                file-like object (e.g. io.BytesIO) -- anything accepted by
                pyarrow.parquet.write_table.
        """
        # Convert to PyArrow table (validates/casts rows against the schema)
        table = pa.Table.from_pylist(data, schema=self.schema)

        pq.write_table(
            table,
            output_path,
            compression=self.compression,
            use_dictionary=self.dictionary_columns,  # Reduce size
            write_statistics=True,  # min/max stats enable predicate pushdown
            row_group_size=self.row_group_size,
        )
581from datetime import datetime, date
2import boto3
3from typing import Optional
4
class S3DataLake:
    """Manage S3 data lake with Hive-style partitioning."""

    def __init__(self, bucket: str, prefix: str = 'data-lake'):
        self.s3 = boto3.client('s3')
        self.bucket = bucket
        self.prefix = prefix

    def get_partition_path(self, table: str, timestamp: datetime,
                           symbol: Optional[str] = None) -> str:
        """
        Generate Hive-style partition path (no trailing slash).

        Month and day are zero-padded; the symbol segment is appended only
        when ``symbol`` is truthy.

        Examples (prefix='data-lake'):
            data-lake/ticks/year=2024/month=01/day=15/symbol=AAPL
            data-lake/trades/year=2024/month=01/day=15
        """
        path_parts = [
            self.prefix,
            table,
            f"year={timestamp.year}",
            f"month={timestamp.month:02d}",
            f"day={timestamp.day:02d}",
        ]

        if symbol:
            path_parts.append(f"symbol={symbol}")

        return '/'.join(path_parts)

    @staticmethod
    def _make_filename(timestamp: datetime) -> str:
        """Return a collision-resistant object name for a new Parquet file."""
        import uuid

        # id() of the payload is only unique among objects alive at the same
        # time; CPython reuses addresses, so two flushes to the same partition
        # (same midnight timestamp) could get the same name and silently
        # overwrite each other. A random UUID avoids that.
        return f"{timestamp.strftime('%Y%m%d_%H%M%S')}_{uuid.uuid4().hex}.parquet"

    def write_partition(self, table: str, timestamp: datetime,
                        data: bytes, symbol: Optional[str] = None) -> str:
        """Write ``data`` as a new object under its partition; return its s3:// URI."""
        partition_path = self.get_partition_path(table, timestamp, symbol)
        key = f"{partition_path}/{self._make_filename(timestamp)}"

        self.s3.put_object(
            Bucket=self.bucket,
            Key=key,
            Body=data,
            StorageClass='INTELLIGENT_TIERING',  # Auto-optimize costs
        )

        return f"s3://{self.bucket}/{key}"

    @staticmethod
    def _parse_partition_values(path: str) -> dict:
        """Extract ``name=value`` segments of a '/'-separated path into a dict."""
        values = {}
        for segment in path.split('/'):
            # partition() splits on the first '=' only, so values that
            # themselves contain '=' do not raise.
            name, sep, value = segment.partition('=')
            if sep:
                values[name] = value
        return values

    def _listing_prefix(self, table: str, start_date: date, end_date: date) -> str:
        """
        Return the narrowest key prefix covering every partition in the range.

        Follows the layout written by get_partition_path (unpadded year,
        zero-padded month/day), so single-year/month/day ranges list only
        that subtree instead of the whole table.
        """
        prefix = f"{self.prefix}/{table}/"
        if start_date.year != end_date.year:
            return prefix
        prefix += f"year={start_date.year}/"
        if start_date.month != end_date.month:
            return prefix
        prefix += f"month={start_date.month:02d}/"
        if start_date.day != end_date.day:
            return prefix
        return prefix + f"day={start_date.day:02d}/"

    def list_partitions(self, table: str, start_date: date, end_date: date,
                        symbol: Optional[str] = None) -> List[str]:
        """
        List all partitions in date range.

        Returns sorted ``s3://bucket/<partition dir>`` URIs for every
        directory holding at least one object whose year/month/day lies in
        [start_date, end_date] and, when ``symbol`` is given, whose
        ``symbol=`` segment matches it.
        """
        paginator = self.s3.get_paginator('list_objects_v2')
        listing_prefix = self._listing_prefix(table, start_date, end_date)

        partitions = set()
        for page in paginator.paginate(Bucket=self.bucket, Prefix=listing_prefix):
            for obj in page.get('Contents', []):
                # Partition values live in the directory part of the key.
                directory = obj['Key'].rpartition('/')[0]
                values = self._parse_partition_values(directory)

                if not all(k in values for k in ('year', 'month', 'day')):
                    continue
                try:
                    partition_date = date(
                        int(values['year']),
                        int(values['month']),
                        int(values['day']),
                    )
                except ValueError:
                    # Foreign/malformed object under the prefix: skip it
                    # rather than abort the whole listing.
                    continue

                if not start_date <= partition_date <= end_date:
                    continue
                if symbol is not None and values.get('symbol') != symbol:
                    continue

                partitions.add(f"s3://{self.bucket}/{directory}")

        return sorted(partitions)
941from kafka import KafkaConsumer
2import json
3from datetime import datetime, timedelta
4import threading
5import time
6
class KafkaToS3Ingester:
    """
    Ingest data from Kafka to S3 in micro-batches.

    Buffers messages and flushes to S3 every N seconds or M messages.

    When a consumer ``group_id`` is given, offsets are committed only after
    a flush has been written to S3, so a crash re-delivers the unflushed
    buffer instead of dropping it (at-least-once). Without a ``group_id``
    kafka-python cannot commit offsets, and every start begins at
    ``auto_offset_reset`` ('latest').
    """

    def __init__(self, kafka_topic: str, kafka_bootstrap: str,
                 data_lake: S3DataLake, table_name: str,
                 schema: pa.Schema,
                 flush_interval_seconds: int = 60,
                 flush_size: int = 100000,
                 group_id: Optional[str] = None,
                 poll_timeout_ms: int = 1000):
        """
        Args:
            kafka_topic: Topic to consume; values are decoded as UTF-8 JSON.
            kafka_bootstrap: Passed to KafkaConsumer as bootstrap_servers.
            data_lake: Destination that stores each Parquet file.
            table_name: Lake table to write into.
            schema: Arrow schema each decoded message must match.
            flush_interval_seconds: Flush once this many seconds have passed
                since the last flush.
            flush_size: Flush once at least this many messages are buffered
                (checked after each poll, so a flush may slightly exceed it).
            group_id: Kafka consumer group; required for offset commits.
            poll_timeout_ms: How long one poll() waits for records; bounds how
                late a time-based flush or stop() takes effect on an idle topic.
        """
        self.consumer = KafkaConsumer(
            kafka_topic,
            bootstrap_servers=kafka_bootstrap,
            group_id=group_id,
            value_deserializer=lambda m: json.loads(m.decode('utf-8')),
            auto_offset_reset='latest',
            # Auto-commit would mark messages consumed while they still exist
            # only in self.buffer; commit manually after each S3 flush instead.
            enable_auto_commit=False,
        )

        self.data_lake = data_lake
        self.table_name = table_name
        self.schema = schema
        self.flush_interval = flush_interval_seconds
        self.flush_size = flush_size
        self.group_id = group_id
        self.poll_timeout_ms = poll_timeout_ms

        self.buffer = []
        self.last_flush = time.time()
        self.writer = ParquetWriter(schema)

        self.running = False

    def should_flush(self) -> bool:
        """Return True when the buffer is full or the flush interval has elapsed."""
        return (
            len(self.buffer) >= self.flush_size or
            time.time() - self.last_flush >= self.flush_interval
        )

    @staticmethod
    def _partition_key(msg: dict):
        """Return the ``(UTC date, symbol)`` partition a message belongs to."""
        from datetime import timezone

        # msg['timestamp'] is treated as nanoseconds since the Unix epoch.
        # Interpreting it in UTC keeps partition dates independent of the
        # ingesting host's local time zone.
        seconds = msg['timestamp'] // 1_000_000_000
        partition_date = datetime.fromtimestamp(seconds, tz=timezone.utc).date()
        return partition_date, msg.get('symbol')

    def flush_buffer(self):
        """Write buffer to S3 (one file per date/symbol), commit offsets, and clear."""
        if not self.buffer:
            return

        print(f"Flushing {len(self.buffer)} messages to S3...")

        # Group by date and symbol for partitioning
        partitions = {}
        for msg in self.buffer:
            partitions.setdefault(self._partition_key(msg), []).append(msg)

        for (partition_date, symbol), messages in partitions.items():
            # Convert to Parquet in memory with the shared writer so ingested
            # files get the same encoding/statistics settings as ParquetWriter.
            sink = io.BytesIO()
            self.writer.write_batch(messages, sink)

            self.data_lake.write_partition(
                table=self.table_name,
                timestamp=datetime.combine(partition_date, datetime.min.time()),
                data=sink.getvalue(),
                symbol=symbol,
            )

        # Everything buffered is now durable in S3.
        self.buffer = []
        self.last_flush = time.time()

        # Commit only after the upload; committing earlier would lose the
        # buffer on a crash.
        if self.group_id is not None:
            self.consumer.commit()

        print(f"Flushed to {len(partitions)} partitions")

    def run(self):
        """Consume and ingest until stop() is called or KeyboardInterrupt."""
        self.running = True

        try:
            while self.running:
                # poll() returns after poll_timeout_ms even without traffic, so
                # the time-based flush and stop() work on an idle topic (a
                # blocking `for message in consumer` loop would wait forever).
                batches = self.consumer.poll(timeout_ms=self.poll_timeout_ms)
                for records in batches.values():
                    self.buffer.extend(record.value for record in records)

                if self.should_flush():
                    self.flush_buffer()

        except KeyboardInterrupt:
            print("Shutting down...")
        finally:
            try:
                self.flush_buffer()  # Final flush
            finally:
                # Release the consumer even if the final flush fails.
                self.consumer.close()

    def stop(self):
        """Ask run() to exit after its current poll."""
        self.running = False
1091import boto3
2
class GlueCatalogManager:
    """Manage AWS Glue Data Catalog for S3 data lake."""

    def __init__(self, database: str = 'trading_data'):
        self.glue = boto3.client('glue')
        self.database = database

    def create_database(self):
        """Create Glue database if not exists."""
        try:
            self.glue.create_database(
                DatabaseInput={
                    'Name': self.database,
                    'Description': 'Trading data lake'
                }
            )
        except self.glue.exceptions.AlreadyExistsException:
            pass

    def create_ticks_table(self, s3_location: str):
        """
        Create Glue table for tick data.

        Partitioned by year/month/day/symbol. ``symbol`` is declared only as a
        partition key: listing it in Columns as well makes Hive-compatible
        engines (Athena, Trino) reject the table for a duplicate column.
        """
        self.glue.create_table(
            DatabaseName=self.database,
            TableInput={
                'Name': 'ticks',
                'StorageDescriptor': {
                    'Columns': [
                        {'Name': 'timestamp', 'Type': 'timestamp'},
                        {'Name': 'exchange', 'Type': 'string'},
                        {'Name': 'price', 'Type': 'decimal(10,4)'},
                        {'Name': 'size', 'Type': 'int'},
                        {'Name': 'side', 'Type': 'string'},
                        # Matches the list<string> 'conditions' field of tick_schema.
                        {'Name': 'conditions', 'Type': 'array<string>'},
                    ],
                    'Location': s3_location,
                    'InputFormat': 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat',
                    'OutputFormat': 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat',
                    'SerdeInfo': {
                        'SerializationLibrary': 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
                    }
                },
                'PartitionKeys': [
                    {'Name': 'year', 'Type': 'int'},
                    {'Name': 'month', 'Type': 'int'},
                    {'Name': 'day', 'Type': 'int'},
                    {'Name': 'symbol', 'Type': 'string'},
                ],
                'TableType': 'EXTERNAL_TABLE'
            }
        )

    @staticmethod
    def _parse_partition_values(partition_path: str) -> dict:
        """Extract ``name=value`` segments of an S3 path into a dict."""
        values = {}
        for segment in partition_path.split('/'):
            # Split on the first '=' only so values containing '=' don't raise.
            name, sep, value = segment.partition('=')
            if sep:
                values[name] = value
        return values

    @classmethod
    def _build_partition_input(cls, partition_path: str, table_descriptor: dict,
                               partition_key_names: List[str]) -> dict:
        """Build one PartitionInput from the table's storage descriptor."""
        values = cls._parse_partition_values(partition_path)
        # Shallow copy: only Location differs per partition, so columns,
        # formats and SerDe stay identical to the table definition.
        storage = dict(table_descriptor)
        storage['Location'] = partition_path
        return {
            'Values': [values.get(name, '') for name in partition_key_names],
            'StorageDescriptor': storage,
        }

    def add_partitions(self, table: str, partitions: List[str]) -> List[dict]:
        """
        Add partitions to table.

        Partition values are parsed from each S3 path
        (e.g. s3://bucket/data-lake/ticks/year=2024/month=01/day=15/symbol=AAPL/)
        in the order of the table's own PartitionKeys, and each partition reuses
        the table's StorageDescriptor, so this works for any table, not just ticks.

        Returns:
            The ``Errors`` entries reported by batch_create_partition (for
            example partitions that already exist); empty when all succeeded.
        """
        if not partitions:
            return []

        table_def = self.glue.get_table(DatabaseName=self.database, Name=table)['Table']
        key_names = [key['Name'] for key in table_def.get('PartitionKeys', [])]
        descriptor = table_def['StorageDescriptor']

        partition_inputs = [
            self._build_partition_input(path, descriptor, key_names)
            for path in partitions
        ]

        # Batch add partitions (max 100 per call)
        errors = []
        for start in range(0, len(partition_inputs), 100):
            response = self.glue.batch_create_partition(
                DatabaseName=self.database,
                TableName=table,
                PartitionInputList=partition_inputs[start:start + 100],
            )
            # The API reports per-partition failures here instead of raising.
            errors.extend(response.get('Errors', []))

        return errors
-- Query tick data using partition pruning
SELECT
    symbol,
    COUNT(*) AS tick_count,
    MIN(price) AS low,
    MAX(price) AS high,
    AVG(price) AS avg_price
FROM ticks
WHERE year = 2024
  AND month = 1
  AND day = 15
  AND symbol = 'AAPL'
GROUP BY symbol;
-- 2.3 seconds (scans only 1 partition, ~50MB)

-- Multi-day VWAP calculation
-- Trino/Athena resolve GROUP BY names against table columns, not SELECT
-- aliases, so group by ordinal position (1 = hour, 2 = symbol).
SELECT
    DATE_TRUNC('hour', timestamp) AS hour,
    symbol,
    SUM(price * size) / SUM(size) AS vwap,
    SUM(size) AS volume
FROM ticks
WHERE year = 2024
  AND month = 1
  AND day BETWEEN 1 AND 31
  AND symbol IN ('AAPL', 'MSFT', 'GOOGL')
GROUP BY 1, 2
ORDER BY hour, symbol;
-- 45 seconds (scans 93 partitions, ~4.5GB)

-- Trade reconstruction for compliance
-- For each trade, take the price of the last tick at or before execution.
-- MAX_BY replaces a correlated MAX(timestamp) subquery in the ON clause:
-- that subquery had no partition predicates (it could scan the entire ticks
-- table), and correlated subqueries in join criteria are not supported by
-- Trino/Athena. The ticks filters sit in the ON clause so the LEFT JOIN still
-- keeps trades without a preceding same-day tick (their quote is NULL).
SELECT
    t.timestamp AS trade_time,
    t.trade_id,
    t.symbol,
    t.side,
    t.price AS execution_price,
    t.size,
    MAX_BY(q.price, q.timestamp) AS quote_price,
    t.price - MAX_BY(q.price, q.timestamp) AS slippage
FROM trades t
LEFT JOIN ticks q
    ON q.symbol = t.symbol
   AND q.timestamp <= t.timestamp
   AND q.year = 2024
   AND q.month = 1
   AND q.day = 15
WHERE t.year = 2024
  AND t.month = 1
  AND t.day = 15
  AND t.order_id = 'ORD-12345'
GROUP BY 1, 2, 3, 4, 5, 6;
-- Partition pruning on both tables + predicate pushdown on order_id
Storage Tier              Monthly Cost    Query Latency
──────────────────────────────────────────────────────────────
S3 Standard               $345            2-5s
S3 Intelligent Tiering    $280            2-5s
S3 Glacier Instant        $150            2-5s
S3 Glacier Flexible       $60             minutes-hours

We use S3 Intelligent Tiering: it automatically moves objects to the Infrequent Access tier after 30 consecutive days without access, and to the Archive Instant Access tier after 90 days.
Query Type                    Athena     Trino      Spark
───────────────────────────────────────────────────────────
Point lookup (1 day)          2.3s       1.8s       4.5s
Range scan (1 month)          45s        38s        82s
Complex analytics (1 year)    8m 12s     6m 45s     4m 30s

Trino wins for ad-hoc queries. Spark wins for large batch jobs.
Format       Size       Compression    Query Speed
──────────────────────────────────────────────────────────
CSV (raw)    15.2 TB    1.0x           Slow
JSON         18.6 TB    0.82x          Very slow
Parquet      2.1 TB     7.2x           Fast
ORC          1.8 TB     8.4x           Fast

We chose Parquet: widely supported, good compression, fast queries.
# Good: Date + symbol
s3://bucket/ticks/year=2024/month=01/day=15/symbol=AAPL/file.parquet

# Bad: Symbol first (same number of partitions, but a date range is no longer
# a single S3 prefix, so date-range listings and date-based lifecycle or
# retention rules have to touch every symbol's subtree)
s3://bucket/ticks/symbol=AAPL/year=2024/month=01/day=15/file.parquet

# Bad: Too granular (millions of small files)
s3://bucket/ticks/year=2024/month=01/day=15/hour=09/minute=30/symbol=AAPL/file.parquet
# Target: 128MB - 1GB per file
# Too small (< 10MB): Query overhead from opening many files
# Too large (> 2GB): Can't parallelize query effectively

# Adjust flush_size and flush_interval to hit target.
# Note: each flush is split by (date, symbol) before writing, so a single
# Parquet file receives only one symbol's share of the buffered messages --
# size the buffer with per-symbol volume in mind.
ingester = KafkaToS3Ingester(
    kafka_topic='ticks',                          # example value
    kafka_bootstrap='localhost:9092',             # example value
    data_lake=S3DataLake(bucket='my-data-lake'),  # example value
    table_name='ticks',
    schema=tick_schema,
    flush_interval_seconds=300,  # 5 minutes
    flush_size=500000,           # ~128MB for tick data (before the per-symbol split)
)
# Add columns (safe)
new_schema = pa.schema([
    # ... existing columns ...
    ('new_column', pa.float64()),  # Defaults to null for old data
])

# Remove columns (safe, query-time projection): stop selecting them.
# A query that names only the columns it needs ignores the others:
#   SELECT timestamp, price FROM ticks;

# Change column type (UNSAFE - requires rewrite)
# Don't do this - instead add new column and backfill
Technical Writer
The NordVarg Team are software engineers at NordVarg specializing in high-performance financial systems and type-safe programming.
Get weekly insights on building high-performance financial systems, latest industry trends, and expert tips delivered straight to your inbox.