December 31, 2024 • NordVarg Team

Building a Data Lake for Financial Data

Category: Data Engineering. Tags: data-lake, s3, parquet, trino, datalake, compliance

Trading firms accumulate terabytes of tick data, trade history, and market data. After building and running a production data lake that holds 15TB of financial data collected over 3 years, I've learned that proper partitioning, columnar file formats, and the right query engines are essential for cost-effective long-term storage. This article walks through that production architecture.

Why a Data Lake

Requirements driving our data lake:

  • Compliance: MiFID II record keeping (five years by default, up to seven at the regulator's request) is why we retain tick data for 7 years
  • Backtesting: Strategy development needs historical market data
  • Research: Ad-hoc analysis of order flow, market microstructure
  • Audit: Trade reconstruction for regulatory inquiries
  • Cost: Storing 15TB in a hot database costs about $12k/month vs $350/month in S3
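
The S3 side of that cost comparison can be reproduced with simple arithmetic. The sketch below assumes a flat S3 Standard rate of $0.023 per GB-month and decimal terabytes; both are assumptions for illustration, so check current pricing for your region. `monthly_storage_cost` is a hypothetical helper, not part of any library.

```python
def monthly_storage_cost(size_tb: float, price_per_gb_month: float) -> float:
    """Return the monthly storage cost in dollars for size_tb decimal terabytes."""
    size_gb = size_tb * 1000  # decimal units: 1 TB = 1000 GB
    return round(size_gb * price_per_gb_month, 2)


S3_STANDARD_PRICE = 0.023  # $/GB-month, assumed flat rate

s3_cost = monthly_storage_cost(15, S3_STANDARD_PRICE)
hot_db_cost = 12_000  # $/month, the hot-database figure quoted above

print(f"S3: ${s3_cost:,.0f}/month")
print(f"Hot database costs roughly {hot_db_cost / s3_cost:.0f}x more")
```

Request and data-transfer charges are ignored here, so real bills will be somewhat higher.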

Architecture

plaintext
Ingest Pipeline        Storage Layer          Query Layer
────────────────────   ──────────────────     ─────────────────
Kafka Streams    →     S3 (Parquet)     →     Trino
Market Data            Partitioned by         Ad-hoc queries
Trade Feed             date/symbol
                                        →     Spark
                       Glue Catalog           Backtesting
                       Schema registry        Large jobs

                                        →     Athena
                                              Quick lookups

Parquet Schema Design

python
import io
from typing import List

import pyarrow as pa
import pyarrow.parquet as pq

# Tick data schema
tick_schema = pa.schema([
    ('timestamp', pa.timestamp('ns')),
    ('symbol', pa.dictionary(pa.int16(), pa.string())),
    ('exchange', pa.dictionary(pa.int8(), pa.string())),
    ('price', pa.decimal128(10, 4)),
    ('size', pa.int32()),
    ('side', pa.dictionary(pa.int8(), pa.string())),
    ('conditions', pa.list_(pa.string())),
])

# Trade data schema
trade_schema = pa.schema([
    ('timestamp', pa.timestamp('ns')),
    ('trade_id', pa.string()),
    ('symbol', pa.dictionary(pa.int16(), pa.string())),
    ('order_id', pa.string()),
    ('side', pa.dictionary(pa.int8(), pa.string())),
    ('price', pa.decimal128(10, 4)),
    ('size', pa.int32()),
    ('fee', pa.decimal128(10, 4)),
    ('exchange', pa.dictionary(pa.int8(), pa.string())),
])


class ParquetWriter:
    """Write data to Parquet with compression and statistics."""

    def __init__(self, schema: pa.Schema, compression: str = 'snappy'):
        self.schema = schema
        self.compression = compression

    def write_batch(self, data: List[dict], output_path):
        """
        Write batch of data to Parquet file.

        Args:
            data: List of dictionaries matching schema. Decimal columns
                (price, fee) should hold decimal.Decimal values; pyarrow
                generally does not convert Python floats to decimals.
            output_path: S3 path, local file path, or a file-like
                object such as io.BytesIO
        """
        # Convert to PyArrow table
        table = pa.Table.from_pylist(data, schema=self.schema)

        # Write with compression and statistics
        pq.write_table(
            table,
            output_path,
            compression=self.compression,
            use_dictionary=['symbol', 'exchange', 'side'],  # Reduce size
            write_statistics=True,  # Enable predicate pushdown
            row_group_size=100000,  # Optimize for queries
        )
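
To illustrate why `write_statistics=True` matters, here is a minimal pure-Python sketch of how a reader can use per-row-group min/max values to skip data. `RowGroupStats` and `row_groups_to_read` are hypothetical stand-ins for what a Parquet reader does internally, and the timestamp values are made up.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class RowGroupStats:
    """Min/max of one column within a row group (stand-in for Parquet metadata)."""
    index: int
    min_value: int
    max_value: int


def row_groups_to_read(stats: List[RowGroupStats], lo: int, hi: int) -> List[int]:
    """Return indexes of row groups whose [min, max] range overlaps [lo, hi]."""
    return [s.index for s in stats if s.max_value >= lo and s.min_value <= hi]


# Three row groups covering consecutive timestamp ranges (made-up values).
stats = [
    RowGroupStats(0, 0, 99),
    RowGroupStats(1, 100, 199),
    RowGroupStats(2, 200, 299),
]

# A filter on timestamps 150..180 only needs the middle row group.
selected = row_groups_to_read(stats, 150, 180)
print(selected)
```

The skipping only pays off when rows are written roughly in timestamp order, so that each row group covers a narrow range.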

S3 Partitioning Strategy

python
import uuid
from datetime import datetime, date
from typing import List, Optional

import boto3


class S3DataLake:
    """Manage S3 data lake with Hive-style partitioning."""

    def __init__(self, bucket: str, prefix: str = 'data-lake'):
        self.s3 = boto3.client('s3')
        self.bucket = bucket
        self.prefix = prefix

    def get_partition_path(self, table: str, timestamp: datetime,
                          symbol: Optional[str] = None) -> str:
        """
        Generate Hive-style partition path.

        Examples:
            data-lake/ticks/year=2024/month=01/day=15/symbol=AAPL
            data-lake/trades/year=2024/month=01/day=15
        """
        path_parts = [
            self.prefix,
            table,
            f"year={timestamp.year}",
            f"month={timestamp.month:02d}",
            f"day={timestamp.day:02d}",
        ]

        if symbol:
            path_parts.append(f"symbol={symbol}")

        return '/'.join(path_parts)

    def write_partition(self, table: str, timestamp: datetime,
                       data: bytes, symbol: Optional[str] = None) -> str:
        """Write data to partitioned path and return its S3 URI."""
        partition_path = self.get_partition_path(table, timestamp, symbol)

        # Generate unique filename; a UUID stays unique across processes
        # and restarts, unlike id(data)
        filename = f"{timestamp.strftime('%Y%m%d_%H%M%S')}_{uuid.uuid4().hex}.parquet"
        key = f"{partition_path}/{filename}"

        self.s3.put_object(
            Bucket=self.bucket,
            Key=key,
            Body=data,
            StorageClass='INTELLIGENT_TIERING',  # Auto-optimize costs
        )

        return f"s3://{self.bucket}/{key}"

    def list_partitions(self, table: str, start_date: date, end_date: date,
                       symbol: Optional[str] = None) -> List[str]:
        """List all partitions in date range."""
        # Build prefix for filtering
        prefix = f"{self.prefix}/{table}/"

        # List all objects with pagination
        paginator = self.s3.get_paginator('list_objects_v2')

        partitions = set()
        for page in paginator.paginate(Bucket=self.bucket, Prefix=prefix):
            if 'Contents' not in page:
                continue

            for obj in page['Contents']:
                key = obj['Key']

                # Parse partition from path
                parts = key.split('/')
                partition_dict = {}

                for part in parts:
                    if '=' in part:
                        # Split once so values containing '=' survive
                        k, v = part.split('=', 1)
                        partition_dict[k] = v

                # Filter by date and symbol
                if 'year' in partition_dict and 'month' in partition_dict and 'day' in partition_dict:
                    partition_date = date(
                        int(partition_dict['year']),
                        int(partition_dict['month']),
                        int(partition_dict['day'])
                    )

                    if start_date <= partition_date <= end_date:
                        if symbol is None or partition_dict.get('symbol') == symbol:
                            partitions.add(
                                f"s3://{self.bucket}/{'/'.join(parts[:-1])}"
                            )

        return sorted(partitions)
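
`list_partitions` lists every object under the table prefix and filters on the client. Because the layout is date-first, one alternative is to generate a key prefix per day and list only those. The sketch below shows the prefix generation; `daily_prefixes` is a hypothetical helper, not something from the production code.

```python
from datetime import date, timedelta
from typing import Iterator, Optional


def daily_prefixes(prefix: str, table: str, start: date, end: date,
                   symbol: Optional[str] = None) -> Iterator[str]:
    """Yield one Hive-style key prefix per day in [start, end], inclusive."""
    day = start
    while day <= end:
        parts = [prefix, table, f"year={day.year}",
                 f"month={day.month:02d}", f"day={day.day:02d}"]
        if symbol:
            parts.append(f"symbol={symbol}")
        yield "/".join(parts) + "/"
        day += timedelta(days=1)


prefixes = list(daily_prefixes("data-lake", "ticks",
                               date(2024, 1, 30), date(2024, 2, 1), "AAPL"))
for p in prefixes:
    print(p)
```

Each generated prefix can be passed as `Prefix=` to the `list_objects_v2` paginator, so a one-week query lists seven small prefixes rather than the whole table.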

Kafka to S3 Ingestion

python
import io
import json
import time
from datetime import datetime, timezone

import pyarrow as pa
from kafka import KafkaConsumer

# S3DataLake and ParquetWriter are the classes defined in the sections above.


class KafkaToS3Ingester:
    """
    Ingest data from Kafka to S3 in micro-batches.

    Buffers messages and flushes to S3 every N seconds or M messages.
    Offsets are committed only after a successful flush, so a crash
    replays unflushed messages instead of losing them (at-least-once).
    """

    def __init__(self, kafka_topic: str, kafka_bootstrap: str,
                 data_lake: S3DataLake, table_name: str,
                 schema: pa.Schema,
                 flush_interval_seconds: int = 60,
                 flush_size: int = 100000,
                 group_id: str = 'kafka-to-s3-ingester'):
        self.consumer = KafkaConsumer(
            kafka_topic,
            bootstrap_servers=kafka_bootstrap,
            group_id=group_id,  # a consumer group is required to commit offsets
            value_deserializer=lambda m: json.loads(m.decode('utf-8')),
            auto_offset_reset='latest',
            enable_auto_commit=False,  # commit manually after each flush
        )

        self.data_lake = data_lake
        self.table_name = table_name
        self.schema = schema
        self.flush_interval = flush_interval_seconds
        self.flush_size = flush_size

        self.buffer = []
        self.last_flush = time.time()
        self.writer = ParquetWriter(schema)

        self.running = False

    def should_flush(self) -> bool:
        """Check if buffer should be flushed."""
        return (
            len(self.buffer) >= self.flush_size or
            time.time() - self.last_flush >= self.flush_interval
        )

    def flush_buffer(self):
        """Write buffer to S3, commit offsets, and clear."""
        if not self.buffer:
            return

        print(f"Flushing {len(self.buffer)} messages to S3...")

        # Group by UTC date and symbol for partitioning
        partitions = {}
        for msg in self.buffer:
            # Timestamps are nanoseconds since the epoch; convert in UTC so
            # the partition date does not depend on the host's timezone
            ts = datetime.fromtimestamp(msg['timestamp'] / 1e9, tz=timezone.utc)
            key = (ts.date(), msg.get('symbol'))
            partitions.setdefault(key, []).append(msg)

        # Write each partition
        for (day, symbol), messages in partitions.items():
            # Convert to Parquet in memory, reusing the writer's settings
            sink = io.BytesIO()
            self.writer.write_batch(messages, sink)

            # Upload to S3
            self.data_lake.write_partition(
                table=self.table_name,
                timestamp=datetime.combine(day, datetime.min.time()),
                data=sink.getvalue(),
                symbol=symbol
            )

        # All files are written, so it is safe to advance the offsets
        self.consumer.commit()

        self.buffer = []
        self.last_flush = time.time()

        print(f"Flushed to {len(partitions)} partitions")

    def run(self):
        """Start consuming and ingesting."""
        self.running = True

        try:
            # Note: the time-based flush is only checked when a message arrives
            for message in self.consumer:
                if not self.running:
                    break

                self.buffer.append(message.value)

                if self.should_flush():
                    self.flush_buffer()

        except KeyboardInterrupt:
            print("Shutting down...")
        finally:
            self.flush_buffer()  # Final flush
            self.consumer.close()

    def stop(self):
        """Stop ingestion."""
        self.running = False
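
The flush policy (size threshold or age threshold, whichever comes first) is easier to reason about when separated from Kafka and S3. The following is a minimal sketch under that assumption; `MicroBatcher` is a hypothetical class, and the injectable `clock` exists only so the policy can be exercised without sleeping.

```python
import time
from typing import Callable, List, Optional


class MicroBatcher:
    """Buffer items; hand back a batch when a size or age threshold is reached."""

    def __init__(self, max_items: int, max_age_seconds: float,
                 clock: Callable[[], float] = time.monotonic):
        self.max_items = max_items
        self.max_age_seconds = max_age_seconds
        self.clock = clock  # injectable so tests do not need to sleep
        self.items: List[dict] = []
        self.last_flush = clock()

    def add(self, item: dict) -> Optional[List[dict]]:
        """Add an item; return the batch if a threshold was crossed, else None."""
        self.items.append(item)
        too_many = len(self.items) >= self.max_items
        too_old = self.clock() - self.last_flush >= self.max_age_seconds
        if too_many or too_old:
            return self.drain()
        return None

    def drain(self) -> List[dict]:
        """Return buffered items and reset the buffer and the timer."""
        batch, self.items = self.items, []
        self.last_flush = self.clock()
        return batch


# Fake clock for demonstration: advance it by hand.
now = [0.0]
batcher = MicroBatcher(max_items=3, max_age_seconds=60, clock=lambda: now[0])

first = batcher.add({"n": 1})    # below both thresholds, nothing to flush
now[0] = 61.0
second = batcher.add({"n": 2})   # age threshold crossed, both items returned
```

Like `should_flush` above, this only evaluates the age threshold when an item arrives, so a quiet topic can hold a partial batch longer than the configured interval.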

Glue Catalog Integration

python
from typing import List

import boto3

# Data columns stored in the tick Parquet files. 'symbol' is a partition
# key, and a Hive-style table cannot list the same name as both a data
# column and a partition key, so it is not repeated here.
TICK_COLUMNS = [
    {'Name': 'timestamp', 'Type': 'timestamp'},
    {'Name': 'exchange', 'Type': 'string'},
    {'Name': 'price', 'Type': 'decimal(10,4)'},
    {'Name': 'size', 'Type': 'int'},
    {'Name': 'side', 'Type': 'string'},
]


def parquet_storage_descriptor(location: str) -> dict:
    """Storage descriptor shared by the ticks table and its partitions."""
    return {
        'Columns': TICK_COLUMNS,
        'Location': location,
        'InputFormat': 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat',
        'OutputFormat': 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat',
        'SerdeInfo': {
            'SerializationLibrary': 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
        }
    }


class GlueCatalogManager:
    """Manage AWS Glue Data Catalog for S3 data lake."""

    def __init__(self, database: str = 'trading_data'):
        self.glue = boto3.client('glue')
        self.database = database

    def create_database(self):
        """Create Glue database if not exists."""
        try:
            self.glue.create_database(
                DatabaseInput={
                    'Name': self.database,
                    'Description': 'Trading data lake'
                }
            )
        except self.glue.exceptions.AlreadyExistsException:
            pass

    def create_ticks_table(self, s3_location: str):
        """Create Glue table for tick data."""
        self.glue.create_table(
            DatabaseName=self.database,
            TableInput={
                'Name': 'ticks',
                'StorageDescriptor': parquet_storage_descriptor(s3_location),
                'PartitionKeys': [
                    {'Name': 'year', 'Type': 'int'},
                    {'Name': 'month', 'Type': 'int'},
                    {'Name': 'day', 'Type': 'int'},
                    {'Name': 'symbol', 'Type': 'string'},
                ],
                'TableType': 'EXTERNAL_TABLE'
            }
        )

    def add_partitions(self, table: str, partitions: List[str]):
        """Add tick partitions to table."""
        partition_inputs = []

        for partition_path in partitions:
            # Parse partition values from S3 path
            # Example: s3://bucket/data-lake/ticks/year=2024/month=01/day=15/symbol=AAPL/
            values = {}
            for part in partition_path.split('/'):
                if '=' in part:
                    k, v = part.split('=', 1)
                    values[k] = v

            partition_inputs.append({
                # Order must match the table's PartitionKeys
                'Values': [
                    values.get('year', ''),
                    values.get('month', ''),
                    values.get('day', ''),
                    values.get('symbol', ''),
                ],
                'StorageDescriptor': parquet_storage_descriptor(partition_path),
            })

        # Batch add partitions (max 100 per call)
        for i in range(0, len(partition_inputs), 100):
            batch = partition_inputs[i:i+100]

            self.glue.batch_create_partition(
                DatabaseName=self.database,
                TableName=table,
                PartitionInputList=batch
            )
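
The two pure-logic pieces of `add_partitions`, parsing partition values out of a path and splitting the work into batches of at most 100, can be checked without touching AWS. A small sketch follows; `parse_partition_values` and `chunked` are hypothetical helper names.

```python
from typing import Dict, Iterator, Sequence


def parse_partition_values(path: str) -> Dict[str, str]:
    """Extract key=value segments from a Hive-style path."""
    values: Dict[str, str] = {}
    for segment in path.split("/"):
        if "=" in segment:
            key, value = segment.split("=", 1)  # split once: values may contain '='
            values[key] = value
    return values


def chunked(items: Sequence, size: int) -> Iterator[Sequence]:
    """Yield consecutive slices of at most `size` items."""
    for start in range(0, len(items), size):
        yield items[start:start + size]


values = parse_partition_values(
    "s3://bucket/data-lake/ticks/year=2024/month=01/day=15/symbol=AAPL/")
batch_sizes = [len(b) for b in chunked(list(range(250)), 100)]

print(values)
print(batch_sizes)
```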

Trino Queries

sql
-- Query tick data using partition pruning
SELECT
    symbol,
    COUNT(*) AS tick_count,
    MIN(price) AS low,
    MAX(price) AS high,
    AVG(price) AS avg_price
FROM ticks
WHERE year = 2024
  AND month = 1
  AND day = 15
  AND symbol = 'AAPL'
GROUP BY symbol;
-- 2.3 seconds (scans only 1 partition, ~50MB)

-- Multi-day VWAP calculation
SELECT
    DATE_TRUNC('hour', timestamp) AS hour,
    symbol,
    SUM(price * size) / SUM(size) AS vwap,
    SUM(size) AS volume
FROM ticks
WHERE year = 2024
  AND month = 1
  AND day BETWEEN 1 AND 31
  AND symbol IN ('AAPL', 'MSFT', 'GOOGL')
-- Trino does not accept SELECT aliases in GROUP BY, so repeat the expression
GROUP BY DATE_TRUNC('hour', timestamp), symbol
ORDER BY hour, symbol;
-- 45 seconds (scans 93 partitions, ~4.5GB)

-- Trade reconstruction for compliance:
-- pair each trade with the latest tick at or before its execution time
SELECT
    t.timestamp AS trade_time,
    t.trade_id,
    t.symbol,
    t.side,
    t.price AS execution_price,
    t.size,
    MAX_BY(q.price, q.timestamp) AS quote_price,
    t.price - MAX_BY(q.price, q.timestamp) AS slippage
FROM trades t
LEFT JOIN ticks q
  ON q.symbol = t.symbol
  AND q.timestamp <= t.timestamp
  -- Repeat the partition filter so the ticks side is pruned as well
  AND q.year = 2024
  AND q.month = 1
  AND q.day = 15
WHERE t.year = 2024
  AND t.month = 1
  AND t.day = 15
  AND t.order_id = 'ORD-12345'
GROUP BY t.timestamp, t.trade_id, t.symbol, t.side, t.price, t.size;
-- 0.8 seconds (partition pruning + predicate pushdown)
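
The trade reconstruction query is an as-of join: each trade is paired with the most recent quote at or before its timestamp. The same lookup can be sketched in Python with a binary search, which is handy for spot-checking query results on a small extract. The quotes and the trade below are made-up values, and `quote_as_of` is a hypothetical helper.

```python
from bisect import bisect_right
from decimal import Decimal
from typing import List, Optional, Tuple

# (timestamp_ns, price) pairs, sorted by timestamp.
Quote = Tuple[int, Decimal]


def quote_as_of(quotes: List[Quote], ts: int) -> Optional[Decimal]:
    """Return the price of the latest quote with timestamp <= ts, or None."""
    timestamps = [q[0] for q in quotes]
    i = bisect_right(timestamps, ts)  # first position strictly after ts
    return quotes[i - 1][1] if i > 0 else None


quotes = [
    (100, Decimal("185.10")),
    (200, Decimal("185.12")),
    (300, Decimal("185.09")),
]
trade_ts, trade_price = 250, Decimal("185.15")

quote_price = quote_as_of(quotes, trade_ts)
slippage = trade_price - quote_price
print(quote_price, slippage)
```

Using `Decimal` mirrors the `decimal128(10, 4)` price columns and avoids float rounding in the slippage figure.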

Production Results

Storage Costs (15TB tick data, 7-year retention)

plaintext
Storage Tier              Monthly Cost    Query Latency
──────────────────────────────────────────────────────────────
S3 Standard               $345            2-5s
S3 Intelligent Tiering    $280            2-5s
S3 Glacier Instant        $150            2-5s
S3 Glacier Flexible       $60             minutes-hours

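We use S3 Intelligent Tiering: objects that go 30 consecutive days without access move automatically to the Infrequent Access tier, and after 90 days to the Archive Instant Access tier.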

Query Performance

plaintext
Query Type                    Athena    Trino    Spark
───────────────────────────────────────────────────────────
Point lookup (1 day)          2.3s      1.8s     4.5s
Range scan (1 month)          45s       38s      82s
Complex analytics (1 year)    8m 12s    6m 45s   4m 30s

Trino wins for ad-hoc queries. Spark wins for large batch jobs.

Compression

plaintext
Format       Size      Ratio vs CSV   Query Speed
──────────────────────────────────────────────────────────
CSV (raw)    15.2 TB   1.0x           Slow
JSON         18.6 TB   0.82x          Very slow
Parquet      2.1 TB    7.2x           Fast
ORC          1.8 TB    8.4x           Fast

We chose Parquet: widely supported, good compression, fast queries.

Lessons Learned

  1. Partition by time first: Date partitioning enables partition pruning (100x faster)
  2. High-cardinality last: Symbol partitioning creates too many small files
  3. Parquet > CSV and JSON: about 7x smaller than CSV (about 9x smaller than JSON), 10x faster queries
  4. Dictionary encoding: Reduces repeated strings by 90%
  5. Row group size: 100k rows balances compression and query parallelism
  6. S3 Intelligent Tiering: Auto-saves 40% on rarely-accessed data
  7. Glue Catalog: Makes S3 queryable with SQL (Athena, Trino, Spark)
  8. Batch writes: Micro-batching (60s windows) prevents too many small files
  9. Lifecycle policies: Auto-delete temp data, archive old data to Glacier
  10. Statistics: Write Parquet statistics for predicate pushdown
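
As an illustration of lesson 9, a lifecycle policy can be expressed as a plain dictionary in the shape that boto3's `put_bucket_lifecycle_configuration` accepts. This is a sketch under assumptions: the prefixes, rule IDs, and day counts are illustrative rather than our production values, and `build_lifecycle_config` is a hypothetical helper.

```python
from typing import Any, Dict


def build_lifecycle_config(temp_prefix: str, temp_expire_days: int,
                           archive_prefix: str, archive_after_days: int) -> Dict[str, Any]:
    """Build a lifecycle configuration: expire temp data, archive old data."""
    return {
        "Rules": [
            {
                "ID": "expire-temp-data",
                "Filter": {"Prefix": temp_prefix},
                "Status": "Enabled",
                "Expiration": {"Days": temp_expire_days},
            },
            {
                "ID": "archive-old-data",
                "Filter": {"Prefix": archive_prefix},
                "Status": "Enabled",
                "Transitions": [
                    {"Days": archive_after_days, "StorageClass": "GLACIER"},
                ],
            },
        ]
    }


# Prefixes and day counts are illustrative assumptions.
config = build_lifecycle_config("data-lake/tmp/", 7, "data-lake/ticks/", 365)

# Applying it would look like this (not executed here; needs AWS credentials):
# boto3.client("s3").put_bucket_lifecycle_configuration(
#     Bucket="my-bucket", LifecycleConfiguration=config)
```

Data moved to the `GLACIER` storage class must be restored before it can be queried, so the archive threshold should sit beyond the window that backtests and audits normally reach into.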

Best Practices

Partitioning

python
# Good: Date + symbol
s3://bucket/ticks/year=2024/month=01/day=15/symbol=AAPL/file.parquet

# Bad: Symbol first (too many partitions)
s3://bucket/ticks/symbol=AAPL/year=2024/month=01/day=15/file.parquet

# Bad: Too granular (millions of small files)
s3://bucket/ticks/year=2024/month=01/day=15/hour=09/minute=30/symbol=AAPL/file.parquet
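
A quick count shows why the minute-level layout is a problem. The numbers below are assumptions chosen for illustration (252 trading days per year, 500 symbols, a 390-minute session), not figures from our lake, and `estimate_file_count` is a hypothetical helper.

```python
def estimate_file_count(trading_days: int, symbols: int,
                        files_per_symbol_day: int) -> int:
    """Lower bound on object count: at least one file per leaf partition."""
    return trading_days * symbols * files_per_symbol_day


TRADING_DAYS = 7 * 252     # assumed: 7-year retention, 252 trading days per year
SYMBOLS = 500              # assumed universe size
MINUTES_PER_SESSION = 390  # assumed 6.5-hour trading session

daily_layout = estimate_file_count(TRADING_DAYS, SYMBOLS, 1)
minute_layout = estimate_file_count(TRADING_DAYS, SYMBOLS, MINUTES_PER_SESSION)

print(f"day/symbol partitions:    {daily_layout:,} files")
print(f"minute/symbol partitions: {minute_layout:,} files")
```

Under these assumptions the day-level layout stays under a million objects over the retention period, while the minute-level layout runs into the hundreds of millions, each far below the target file size.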

File Sizes

python
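# Target: 128MB - 1GB per file
# Too small (< 10MB): Query overhead from opening many files
# Too large (> 2GB): Can't parallelize query effectively

# Adjust flush_size and flush_interval to hit target
ingester = KafkaToS3Ingester(
    kafka_topic='ticks',               # placeholder values for the
    kafka_bootstrap='localhost:9092',  # required arguments
    data_lake=S3DataLake(bucket='my-bucket'),
    table_name='ticks',
    schema=tick_schema,
    flush_interval_seconds=300,  # 5 minutes
    flush_size=500000,  # ~128MB for tick data
)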
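
One way to pick `flush_size` is to measure a sample file and scale to the target size. The sketch below assumes a hypothetical sample (100,000 rows producing a 25 MiB file); measure your own data, since compressed bytes per row vary with the schema and symbol mix. `rows_per_file` is a hypothetical helper.

```python
MIB = 1024 * 1024


def rows_per_file(target_file_bytes: int, sample_file_bytes: int,
                  sample_rows: int) -> int:
    """Scale a measured sample to the row count that yields the target file size."""
    # Integer arithmetic avoids float rounding on large byte counts.
    return target_file_bytes * sample_rows // sample_file_bytes


# Hypothetical measurement: 100,000 rows produced a 25 MiB Parquet file.
flush_size = rows_per_file(128 * MIB, 25 * MIB, 100_000)
print(flush_size)
```

Because each flush is split by date and symbol, the buffer as a whole must be larger than this figure if individual files are to reach the target.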

Schema Evolution

python
# Add columns (safe)
new_schema = pa.schema([
    # ... existing columns ...
    ('new_column', pa.float64()),  # Reads as null for old files
])

# Remove columns (safe, query-time projection)
# SQL: SELECT timestamp, price FROM ticks;  -- ignores other columns

# Change column type (UNSAFE - requires rewrite)
# Don't do this - instead add new column and backfill
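
These rules can be turned into a pre-deployment check. The sketch below compares two schemas represented as plain `{column: type}` dictionaries (a simplification assumed here so the example needs no pyarrow) and flags type changes as unsafe. `schema_changes` and `is_safe` are hypothetical helpers, and the `venue` column is invented for the example.

```python
from typing import Dict, List


def schema_changes(old: Dict[str, str], new: Dict[str, str]) -> Dict[str, List[str]]:
    """Classify differences between two {column: type} mappings."""
    return {
        "added": sorted(c for c in new if c not in old),
        "removed": sorted(c for c in old if c not in new),
        "retyped": sorted(c for c in old if c in new and old[c] != new[c]),
    }


def is_safe(changes: Dict[str, List[str]]) -> bool:
    """Added and removed columns are fine; a type change needs a rewrite."""
    return not changes["retyped"]


old = {"timestamp": "timestamp[ns]", "price": "decimal128(10, 4)", "size": "int32"}
new = {"timestamp": "timestamp[ns]", "price": "double", "size": "int32",
       "venue": "string"}

changes = schema_changes(old, new)
print(changes, "safe" if is_safe(changes) else "UNSAFE")
```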
