A global logistics company managing 50,000+ daily shipments across 120 countries faced massive inefficiencies. Manual route planning, lack of real-time visibility, and reactive inventory management were costing $200M annually and damaging customer relationships.
NordVarg built a comprehensive supply chain optimization platform combining route optimization algorithms, predictive analytics, and real-time tracking to transform operations and deliver measurable ROI within 6 months.
- Manual route planning taking 4-6 hours per day
- No real-time visibility into shipment locations
- Reactive inventory management leading to stockouts and overstock
- 78% on-time delivery rate (industry average: 85%)
- Paper-based processes for customs and documentation
- $200M annual losses from inefficiency
- Customer churn at 15% annually
- Contract penalties for late deliveries
- Excess inventory worth $500M
- Inability to scale operations
- Integration with 200+ carrier systems
- Real-time tracking across multiple transport modes
- Complex optimization with 1000s of constraints
- Legacy systems from acquisitions
- Data quality issues across sources
from typing import List, Dict
import asyncio
from datetime import datetime
class DataIntegrationPipeline:
    """Pulls shipment updates from every integrated carrier system,
    normalizes them to a common schema, and publishes them to Kafka."""

    def __init__(self):
        # Carrier and warehouse integrations are loaded by helpers
        # defined elsewhere on this class.
        self.carriers = self.load_carrier_integrations()
        self.warehouse_systems = self.load_warehouse_systems()
        self.kafka_producer = KafkaProducer()

    async def sync_shipment_data(self) -> None:
        """Sync data from all carrier systems concurrently.

        A failure in one carrier's sync is logged and skipped so the
        remaining carriers still complete (``return_exceptions=True``).
        """
        tasks = [self.sync_carrier(carrier) for carrier in self.carriers]
        results = await asyncio.gather(*tasks, return_exceptions=True)

        for carrier, result in zip(self.carriers, results):
            if isinstance(result, Exception):
                logger.error(f"Failed to sync {carrier.name}: {result}")
                continue
            # Key by tracking number so all updates for one shipment land
            # on the same partition and preserve their relative order.
            for shipment in result:
                await self.kafka_producer.send(
                    'shipments',
                    key=shipment['tracking_number'],
                    value=shipment
                )

    async def sync_carrier(self, carrier: Carrier) -> List[Dict]:
        """Fetch and normalize shipments updated since the last sync.

        Bug fix: the sync watermark is now captured *before* the fetch.
        The original stamped ``datetime.now()`` after the fetch
        completed, so any shipment updated while the fetch was in flight
        fell between the old watermark and the new one and was never
        picked up. Capturing the timestamp up front means such updates
        are simply re-fetched on the next cycle (at-least-once instead
        of maybe-never).
        """
        token = await carrier.authenticate()

        last_sync = await self.get_last_sync_time(carrier.id)
        # Watermark taken before the fetch — see docstring.
        sync_started_at = datetime.now()
        shipments = await carrier.fetch_shipments(
            since=last_sync,
            token=token
        )

        normalized = [
            self.normalize_shipment(s, carrier)
            for s in shipments
        ]

        await self.update_last_sync_time(carrier.id, sync_started_at)
        return normalized

    def normalize_shipment(self, raw: Dict, carrier: Carrier) -> Dict:
        """Convert a carrier-specific payload to the standard format.

        Each carrier uses different field names; ``carrier.field_mapping``
        translates them. Optional fields fall back to defaults
        (service level ``'STANDARD'``, weight ``0``); required fields
        (tracking number, origin, destination, status, ETA) raise
        ``KeyError`` if absent.
        """
        mapping = carrier.field_mapping
        return {
            'tracking_number': raw[mapping['tracking_number']],
            'origin': self.normalize_location(raw[mapping['origin']]),
            'destination': self.normalize_location(raw[mapping['destination']]),
            'status': self.normalize_status(raw[mapping['status']]),
            'current_location': self.parse_location(raw.get(mapping['location'])),
            'estimated_delivery': self.parse_datetime(raw[mapping['eta']]),
            'carrier': carrier.name,
            'service_level': raw.get(mapping['service'], 'STANDARD'),
            'weight': float(raw.get(mapping['weight'], 0)),
            'dimensions': self.parse_dimensions(raw.get(mapping['dimensions'])),
            'last_updated': datetime.now().isoformat()
        }
from ortools.constraint_solver import routing_enums_pb2
from ortools.constraint_solver import pywrapcp
import numpy as np
class RouteOptimizer:
    """Solves daily delivery routing as a capacitated vehicle-routing
    problem (CVRP) with Google OR-Tools, using predicted travel times
    as arc costs."""

    def __init__(self):
        # External services: geocoding and travel-time prediction.
        self.geocoder = Geocoder()
        self.traffic_predictor = TrafficPredictor()

    async def optimize_routes(
        self,
        shipments: List[Shipment],
        vehicles: List[Vehicle],
        constraints: RouteConstraints
    ) -> List[Route]:
        """Optimize delivery routes using constraint programming.

        Returns one Route per vehicle extracted from the solver's
        solution; raises OptimizationError when no feasible solution is
        found within the 30-second limit.

        NOTE(review): ``constraints`` is accepted but never read in this
        body — confirm whether time windows and other restrictions
        should be derived from it rather than from ``shipments`` alone.
        """
        # Arc costs are predicted travel times, not raw distances.
        distance_matrix = await self.build_distance_matrix(
            shipments,
            vehicles
        )
        # Index manager maps solver indices <-> matrix rows; each
        # vehicle starts at its own depot node.
        manager = pywrapcp.RoutingIndexManager(
            len(distance_matrix),
            len(vehicles),
            [v.depot_index for v in vehicles]
        )
        routing = pywrapcp.RoutingModel(manager)

        # Cost of traversing an arc = matrix entry between its nodes.
        def distance_callback(from_index, to_index):
            from_node = manager.IndexToNode(from_index)
            to_node = manager.IndexToNode(to_index)
            return distance_matrix[from_node][to_node]

        transit_callback_index = routing.RegisterTransitCallback(
            distance_callback
        )
        routing.SetArcCostEvaluatorOfAllVehicles(transit_callback_index)

        # Capacity: each visited node consumes one shipment's weight.
        # NOTE(review): matrix nodes are *locations* (pickups,
        # deliveries, depots — see build_distance_matrix), but this
        # indexes the `shipments` list directly. The two index spaces do
        # not line up, and `from_node` can exceed len(shipments) when
        # there are more unique locations than shipments. A node->demand
        # mapping is likely needed — confirm against the deployed code.
        def demand_callback(from_index):
            from_node = manager.IndexToNode(from_index)
            return shipments[from_node].weight

        demand_callback_index = routing.RegisterUnaryTransitCallback(
            demand_callback
        )
        routing.AddDimensionWithVehicleCapacity(
            demand_callback_index,
            0,  # null capacity slack
            [v.capacity for v in vehicles],
            True,  # start cumul to zero
            'Capacity'
        )
        # Delivery time windows (helper defined elsewhere on this class).
        self.add_time_windows(routing, manager, shipments)

        # Cheapest-arc construction followed by guided local search,
        # capped at 30s so daily re-optimization stays interactive.
        search_parameters = pywrapcp.DefaultRoutingSearchParameters()
        search_parameters.first_solution_strategy = (
            routing_enums_pb2.FirstSolutionStrategy.PATH_CHEAPEST_ARC
        )
        search_parameters.local_search_metaheuristic = (
            routing_enums_pb2.LocalSearchMetaheuristic.GUIDED_LOCAL_SEARCH
        )
        search_parameters.time_limit.seconds = 30

        solution = routing.SolveWithParameters(search_parameters)
        if solution:
            return self.extract_routes(routing, manager, solution, vehicles)
        else:
            raise OptimizationError("No solution found")

    async def build_distance_matrix(
        self,
        shipments: List[Shipment],
        vehicles: List[Vehicle]
    ) -> np.ndarray:
        """Build distance/time matrix with traffic predictions.

        Entry [i][j] holds the predicted travel time from location i to
        location j at the current departure time; the diagonal is 0.

        NOTE(review): locations are collected in a set, so row/column
        order is not deterministic across calls — callers must map
        solver nodes back to locations using this same ordering. Also,
        the n*(n-1) predictor calls are awaited serially; for large n
        consider batching them with asyncio.gather.
        """
        # Every unique pickup, delivery, and depot becomes one node.
        locations = set()
        for shipment in shipments:
            locations.add(shipment.pickup_location)
            locations.add(shipment.delivery_location)
        for vehicle in vehicles:
            locations.add(vehicle.depot_location)
        locations = list(locations)
        n = len(locations)

        matrix = np.zeros((n, n))
        for i, origin in enumerate(locations):
            for j, destination in enumerate(locations):
                if i == j:
                    continue
                # Travel time predicted for a departure of "now".
                travel_time = await self.traffic_predictor.predict_travel_time(
                    origin,
                    destination,
                    departure_time=datetime.now()
                )
                matrix[i][j] = travel_time
        return matrix
import tensorflow as tf
from tensorflow import keras
import pandas as pd
class DemandForecaster:
    """Forecasts per-product, per-location daily demand with a stacked
    LSTM, rolling the model forward one predicted day at a time."""

    def __init__(self):
        self.model = self.build_model()
        self.feature_engineer = FeatureEngineer()

    def build_model(self) -> keras.Model:
        """Build LSTM model for demand forecasting.

        Input shape is (30 timesteps, 50 features); output is a single
        next-day demand value. Compiled with MSE loss plus MAE/MAPE
        metrics for interpretability.
        """
        model = keras.Sequential([
            keras.layers.LSTM(128, return_sequences=True, input_shape=(30, 50)),
            keras.layers.Dropout(0.2),
            keras.layers.LSTM(64, return_sequences=True),
            keras.layers.Dropout(0.2),
            keras.layers.LSTM(32),
            keras.layers.Dense(64, activation='relu'),
            keras.layers.Dense(1)
        ])
        model.compile(
            optimizer='adam',
            loss='mse',
            metrics=['mae', 'mape']
        )
        return model

    async def forecast_demand(
        self,
        product_id: str,
        location_id: str,
        horizon_days: int = 30
    ) -> pd.DataFrame:
        """Forecast demand for product at location.

        Recursive multi-step forecast: each predicted day is fed back
        into the input window to predict the next, out to
        ``horizon_days``.
        """
        # One year of daily history feeds the feature pipeline.
        history = await self.get_historical_demand(
            product_id,
            location_id,
            days=365
        )
        features = self.feature_engineer.create_features(history)
        # Sliding 30-day windows; only the most recent window seeds the
        # recursive forecast below.
        X = self.create_sequences(features, lookback=30)

        predictions = []
        current_sequence = X[-1:]
        for day in range(horizon_days):
            pred = self.model.predict(current_sequence, verbose=0)
            predictions.append(pred[0, 0])
            # NOTE(review): this feedback step looks broken. np.append
            # flattens the (29, n_features) slice plus one scalar into a
            # 1-D array of 29*n_features + 1 values, which cannot be
            # reshaped to (1, 30, n_features) — e.g. 29*50 + 1 = 1451 is
            # not divisible by 30, so reshape raises ValueError. A full
            # feature row for the predicted day must be constructed
            # instead — confirm against the deployed code.
            new_row = np.append(current_sequence[0, 1:], pred[0])
            current_sequence = new_row.reshape(1, 30, -1)

        # Forecast dates continue from the day after the last observed
        # history row.
        forecast_dates = pd.date_range(
            start=history.index[-1] + pd.Timedelta(days=1),
            periods=horizon_days
        )
        # NOTE(review): the ±20% band is a fixed heuristic, not a
        # model-derived confidence interval.
        forecast = pd.DataFrame({
            'date': forecast_dates,
            'predicted_demand': predictions,
            'confidence_lower': [p * 0.8 for p in predictions],
            'confidence_upper': [p * 1.2 for p in predictions]
        })
        return forecast

    def create_features(self, data: pd.DataFrame) -> pd.DataFrame:
        """Create time series features.

        Expects a DataFrame with a DatetimeIndex and a 'demand' column.
        Rows made incomplete by the lag/rolling windows are dropped, so
        roughly the first 30 input rows never reach the model.
        """
        data = data.copy()
        # Calendar features.
        data['day_of_week'] = data.index.dayofweek
        data['day_of_month'] = data.index.day
        data['month'] = data.index.month
        data['quarter'] = data.index.quarter
        data['is_weekend'] = data['day_of_week'].isin([5, 6]).astype(int)
        # Lagged demand at daily, weekly, bi-weekly, monthly offsets.
        for lag in [1, 7, 14, 30]:
            data[f'demand_lag_{lag}'] = data['demand'].shift(lag)
        # Rolling mean/std capture short- and medium-term trend and
        # volatility.
        for window in [7, 14, 30]:
            data[f'demand_rolling_mean_{window}'] = (
                data['demand'].rolling(window).mean()
            )
            data[f'demand_rolling_std_{window}'] = (
                data['demand'].rolling(window).std()
            )
        # Holiday flag (holiday calendar provided elsewhere on this
        # class).
        data['is_holiday'] = data.index.isin(self.get_holidays())
        return data.dropna()
import { ApolloServer, gql } from 'apollo-server';
import DataLoader from 'dataloader';
// GraphQL schema for the logistics API: shipment lookup/listing, route
// optimization, demand forecasting, and live shipment/route
// subscriptions. The SDL string is kept verbatim — adding descriptions
// inside it would alter the schema's introspection output.
const typeDefs = gql`
type Shipment {
id: ID!
trackingNumber: String!
origin: Location!
destination: Location!
currentLocation: Location
status: ShipmentStatus!
estimatedDelivery: DateTime!
actualDelivery: DateTime
carrier: Carrier!
route: Route
events: [TrackingEvent!]!
}
type Route {
id: ID!
stops: [Stop!]!
totalDistance: Float!
estimatedDuration: Int!
optimizationScore: Float!
}
type Location {
latitude: Float!
longitude: Float!
address: String!
city: String!
country: String!
}
enum ShipmentStatus {
PENDING
IN_TRANSIT
OUT_FOR_DELIVERY
DELIVERED
DELAYED
EXCEPTION
}
type Query {
shipment(trackingNumber: String!): Shipment
shipments(
status: ShipmentStatus
carrier: String
limit: Int
offset: Int
): [Shipment!]!
routeOptimization(
shipmentIds: [ID!]!
vehicleIds: [ID!]!
): [Route!]!
demandForecast(
productId: ID!
locationId: ID!
days: Int!
): [DemandPrediction!]!
}
type Subscription {
shipmentUpdated(trackingNumber: String!): Shipment!
routeOptimized(routeId: ID!): Route!
}
`;
// Resolver map wiring the schema above to backing services supplied via
// the request context (dataSources, pubsub, eventLoader).
const resolvers = {
  Query: {
    // Single-shipment lookup by carrier tracking number.
    shipment: async (_, { trackingNumber }, { dataSources }) => {
      return dataSources.shipmentAPI.getByTrackingNumber(trackingNumber);
    },
    // Filtered, paginated listing (status / carrier / limit / offset
    // are passed through unchanged).
    shipments: async (_, args, { dataSources }) => {
      return dataSources.shipmentAPI.getShipments(args);
    },
    // NOTE(review): constructs a fresh optimizer per request and
    // ignores the injected `dataSources` — confirm whether this should
    // delegate to a shared, context-managed service instead.
    routeOptimization: async (_, args, { dataSources }) => {
      const optimizer = new RouteOptimizer();
      return optimizer.optimize(args);
    },
    // NOTE(review): same per-request instantiation concern as above.
    demandForecast: async (_, args, { dataSources }) => {
      const forecaster = new DemandForecaster();
      return forecaster.forecast(args);
    }
  },
  Subscription: {
    shipmentUpdated: {
      // One pubsub channel per tracking number keeps fan-out narrow.
      subscribe: (_, { trackingNumber }, { pubsub }) => {
        return pubsub.asyncIterator(`SHIPMENT_${trackingNumber}`);
      }
    }
  },
  Shipment: {
    // eventLoader comes from context — presumably a DataLoader (the
    // import above) batching event fetches to avoid N+1 queries; verify
    // against the server setup.
    events: async (shipment, _, { eventLoader }) => {
      return eventLoader.load(shipment.id);
    }
  }
};
| Category | Annual Before | Annual After | Savings |
|---|---|---|---|
| Transportation | $200M | $140M | $60M |
| Inventory Carrying | $80M | $48M | $32M |
| Warehousing | $50M | $38M | $12M |
| Labor | $70M | $56M | $14M |
| Penalties | $20M | $2M | $18M |
| Total | $420M | $284M | $136M |
- On-time delivery: 78% → 95%
- Route planning time: 4-6 hours → 15 minutes
- Fuel efficiency: 20% improvement
- Vehicle utilization: 65% → 87%
- Inventory turnover: 8x → 12x per year
- Customer satisfaction: +45 points (NPS)
- Customer churn: 15% → 4%
- New customer acquisition: +35%
- Contract renewals: 92% → 98%
- Revenue growth: +28% year-over-year
┌─────────────────────────────────────┐
│ Data Sources │
│ - Carrier APIs (200+) │
│ - IoT sensors │
│ - Weather data │
│ - Traffic data │
└──────────────┬──────────────────────┘
↓
┌─────────────────────────────────────┐
│ Data Pipeline (Apache Kafka) │
│ - Real-time ingestion │
│ - Stream processing │
│ - Event sourcing │
└──────────────┬──────────────────────┘
↓
┌─────────────────────────────────────┐
│ Analytics Engine │
│ - Route optimization │
│ - Demand forecasting │
│ - Anomaly detection │
└──────────────┬──────────────────────┘
↓
┌─────────────────────────────────────┐
│ API Layer (GraphQL) │
│ - Real-time queries │
│ - Subscriptions │
│ - Batch operations │
└──────────────┬──────────────────────┘
↓
┌─────────────────────────────────────┐
│ Frontend (React) │
│ - Control center dashboard │
│ - Mobile apps │
│ - Customer portal │
└─────────────────────────────────────┘
Problem: 200+ different carrier APIs with varying formats
Solution: Adapter pattern with automatic schema detection
Result: 95% of integrations automated
Problem: Optimizing 50K+ shipments daily in real-time
Solution: Micro-batching, caching, incremental updates
Result: Less than 30 second optimization time
Problem: 15-year-old warehouse management systems
Solution: Event-driven architecture with message queues
Result: Zero-downtime migration
"This platform has revolutionized our operations. We're saving over $100M annually while delivering better service than ever. The real-time visibility and optimization capabilities give us a competitive edge we never had before. NordVarg delivered on time and exceeded expectations."
— Chief Supply Chain Officer, Global Logistics Company
- Integration is critical - Supply chain success depends on data integration
- Real-time matters - Delays in information = delays in delivery
- Optimization at scale - Smart algorithms can save millions
- Predictive > Reactive - Forecasting prevents problems before they occur
- User experience - Complex systems need intuitive interfaces
- Incremental deployment - Phase rollout reduces risk
Need to optimize your supply chain operations? Get in touch to discuss how we can help reduce costs and improve efficiency.
Project Duration: 6 months
Team Size: 10 engineers
Technologies: Python, Go, PostgreSQL, Kafka, React
Industry: Logistics & Supply Chain
Location: Global