E-commerce Domain
This document describes the E-commerce data domain, including data sources, processing pipelines, and analytical models for customer behavior and product analytics.
Overview
The E-commerce domain focuses on customer and product data, providing insights into customer behavior, product performance, order analytics, and business metrics. This domain demonstrates how to handle transactional e-commerce data in a modern data platform.
Data Sources
Mock E-commerce Data
- Primary Source: Generated mock data
- Data Format: CSV, JSON
- Update Frequency: Daily
- Data Volume: ~5K orders per day
- Schema: Customer details, product information, order data
Data Schema
Each raw record is one order line, flattened together with its customer, order, and product attributes:
```json
{
  "customer_id": "string",
  "customer_name": "string",
  "email": "string",
  "phone": "string",
  "address": "string",
  "city": "string",
  "state": "string",
  "zip_code": "string",
  "customer_segment": "string",
  "registration_date": "date",
  "order_id": "string",
  "order_date": "timestamp",
  "product_id": "string",
  "product_name": "string",
  "category": "string",
  "subcategory": "string",
  "brand": "string",
  "price": "decimal",
  "quantity": "integer",
  "discount": "decimal",
  "total_amount": "decimal",
  "payment_method": "string",
  "shipping_method": "string",
  "order_status": "string"
}
```
Data Pipeline
Bronze Layer (Raw Data)
Storage: MinIO datalake/bronze/ecommerce/
Format: CSV, JSON
Partitioning: By date (year=YYYY/month=MM/day=DD)
```python
from dagster import asset  # assumes Dagster, whose @asset decorator this code uses


@asset
def ecommerce_bronze():
    """Ingest raw e-commerce data from mock sources."""
    # Mock data generation
    # Schema validation
    # Data quality checks
    pass
```
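The bronze date partitioning can be sketched as a small path builder. This is a minimal illustration that assumes the prefix is the bucket path listed above. The function name bronze_partition_path is hypothetical and is not part of the pipeline code.

```python
from datetime import date


def bronze_partition_path(run_date: date, base: str = "datalake/bronze/ecommerce") -> str:
    """Build the Hive-style date partition prefix for one day of raw files."""
    # Zero-pad month and day so prefixes sort lexically in date order
    return (
        f"{base}/year={run_date.year:04d}"
        f"/month={run_date.month:02d}/day={run_date.day:02d}/"
    )
```

For example, bronze_partition_path(date(2025, 10, 3)) yields datalake/bronze/ecommerce/year=2025/month=10/day=03/.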
Silver Layer (Prepared Data)
Storage: MinIO datalake/silver/ecommerce/
Format: Delta Lake
Processing: Apache Spark
```python
@asset(deps=[ecommerce_bronze])
def ecommerce_silver():
    """Clean and prepare e-commerce data."""
    # Data cleaning
    # Schema standardization
    # Duplicate removal
    # Data type conversion
    # Customer segmentation
    pass
```
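The type-conversion and duplicate-removal steps can be sketched in plain Python. The layer itself runs on Spark, so this only illustrates the logic. The sketch makes three assumptions: raw CSV values arrive as strings, (order_id, product_id) identifies one order line, and a later duplicate should replace an earlier one. The name clean_rows is hypothetical.

```python
from datetime import datetime
from decimal import Decimal


def clean_rows(raw_rows: list[dict[str, str]]) -> list[dict]:
    """Type-convert raw CSV rows and drop duplicate order lines (later row wins)."""
    latest: dict[tuple[str, str], dict] = {}
    for raw in raw_rows:
        row: dict = {k: v.strip() for k, v in raw.items()}  # trim stray whitespace
        row["price"] = Decimal(row["price"])
        row["discount"] = Decimal(row.get("discount") or "0")  # empty discount -> 0
        row["quantity"] = int(row["quantity"])
        row["order_date"] = datetime.fromisoformat(row["order_date"])
        # Assumed natural key for an order line; a later duplicate overwrites the earlier one
        latest[(row["order_id"], row["product_id"])] = row
    return list(latest.values())
```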
Gold Layer (Analytics Data)
Storage: MinIO warehouse/ecommerce/
Format: Apache Iceberg
Processing: dbt transformations
```sql
-- Customers table
CREATE TABLE customers (
    customer_id STRING,
    customer_name STRING,
    email STRING,
    phone STRING,
    address STRING,
    city STRING,
    state STRING,
    zip_code STRING,
    customer_segment STRING,
    registration_date DATE,
    created_at TIMESTAMP,
    updated_at TIMESTAMP
) USING ICEBERG;

-- Products table
CREATE TABLE products (
    product_id STRING,
    product_name STRING,
    category STRING,
    subcategory STRING,
    brand STRING,
    price DECIMAL(10,2),
    created_at TIMESTAMP,
    updated_at TIMESTAMP
) USING ICEBERG;

-- Orders table, partitioned by calendar day of order_date
CREATE TABLE orders (
    order_id STRING,
    customer_id STRING,
    order_date TIMESTAMP,
    total_amount DECIMAL(10,2),
    payment_method STRING,
    shipping_method STRING,
    order_status STRING,
    created_at TIMESTAMP,
    updated_at TIMESTAMP
) USING ICEBERG
PARTITIONED BY (days(order_date));

-- Order items table
CREATE TABLE order_items (
    order_id STRING,
    product_id STRING,
    quantity INT,
    price DECIMAL(10,2),
    discount DECIMAL(10,2),
    total_amount DECIMAL(10,2),
    created_at TIMESTAMP,
    updated_at TIMESTAMP
) USING ICEBERG;
```
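The split from the flat raw record into the four gold tables can be sketched as follows. The sketch assumes three things: input rows are already typed (as after the silver step), an order's total_amount is the sum of its line totals, and the latest row wins for customer and product attributes. Only a few columns per table are copied to keep it short. The name split_into_tables is hypothetical.

```python
from decimal import Decimal


def split_into_tables(rows: list[dict]) -> dict[str, list[dict]]:
    """Normalize flat order-line rows into customers, products, orders, and order_items."""
    customers: dict[str, dict] = {}
    products: dict[str, dict] = {}
    orders: dict[str, dict] = {}
    order_items: list[dict] = []
    for r in rows:
        # Dimension rows: one per key; a later row for the same key overwrites earlier ones
        customers[r["customer_id"]] = {k: r[k] for k in ("customer_id", "customer_name", "customer_segment")}
        products[r["product_id"]] = {k: r[k] for k in ("product_id", "product_name", "category", "price")}
        # Order header: created on first sight; its total accumulates line totals (assumption)
        order = orders.setdefault(r["order_id"], {
            "order_id": r["order_id"],
            "customer_id": r["customer_id"],
            "order_date": r["order_date"],
            "total_amount": Decimal("0"),
        })
        order["total_amount"] += r["total_amount"]
        # Line grain: one order_items row per input row
        order_items.append({k: r[k] for k in ("order_id", "product_id", "quantity", "price", "discount", "total_amount")})
    return {
        "customers": list(customers.values()),
        "products": list(products.values()),
        "orders": list(orders.values()),
        "order_items": order_items,
    }
```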
Analytical Models
Customer Analytics Model
Purpose: Customer behavior and segmentation
Key Metrics:
- Customer lifetime value
- Purchase frequency
- Customer segments
- Retention rates
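Some of these metrics can be sketched with simple assumed definitions:

- historical lifetime value is total spend per customer
- purchase frequency is the order count
- retention rate is the share of customers active in one period who order again in the next

The function names are hypothetical, and the model's actual definitions may differ (predictive CLV models, for instance, forecast future spend).

```python
from collections import defaultdict
from decimal import Decimal


def customer_summary(orders: list[tuple[str, str, Decimal]]) -> dict[str, dict]:
    """Per-customer order count and total spent from (customer_id, order_id, total_amount) tuples."""
    stats: dict[str, dict] = defaultdict(lambda: {"orders": 0, "spent": Decimal("0")})
    for customer_id, _order_id, amount in orders:
        stats[customer_id]["orders"] += 1  # one tuple per order
        stats[customer_id]["spent"] += amount
    return dict(stats)


def retention_rate(previous: set[str], current: set[str]) -> float:
    """Fraction of customers active in `previous` who also ordered in `current`."""
    return len(previous & current) / len(previous) if previous else 0.0
```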
Product Performance Model
Purpose: Product sales and performance analysis
Key Metrics:
- Sales volume
- Revenue contribution
- Product categories
- Brand performance
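Revenue contribution can be sketched as each group's share of total revenue. The sketch assumes order items have already been joined with product attributes such as brand or category. The name revenue_contribution is hypothetical.

```python
from collections import defaultdict
from decimal import Decimal


def revenue_contribution(order_items: list[dict], key: str = "brand") -> dict[str, Decimal]:
    """Share of total revenue per value of `key` (e.g. brand or category)."""
    revenue: dict[str, Decimal] = defaultdict(Decimal)  # Decimal() == Decimal("0")
    for item in order_items:
        revenue[item[key]] += item["total_amount"]
    total = sum(revenue.values(), Decimal("0"))
    # No revenue means no meaningful shares
    return {k: v / total for k, v in revenue.items()} if total else {}
```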
Order Analytics Model
Purpose: Order processing and fulfillment
Key Metrics:
- Order volume
- Average order value
- Payment methods
- Shipping performance
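The following sketch covers order volume, average order value (revenue divided by order count), and payment-method mix. It assumes one dict per order with total_amount and payment_method keys. The name order_metrics is hypothetical.

```python
from collections import Counter
from decimal import Decimal


def order_metrics(orders: list[dict]) -> dict:
    """Order volume, average order value, and payment-method share for a batch of orders."""
    volume = len(orders)
    revenue = sum((o["total_amount"] for o in orders), Decimal("0"))
    methods = Counter(o["payment_method"] for o in orders)
    return {
        "order_volume": volume,
        "average_order_value": revenue / volume if volume else Decimal("0"),
        "payment_method_share": {m: n / volume for m, n in methods.items()},
    }
```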
Business Metrics Model
Purpose: Overall business performance
Key Metrics:
- Revenue trends
- Growth rates
- Market share
- Operational efficiency
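Growth rates are commonly computed period over period as (current - previous) / previous. The minimal sketch below assumes revenue has already been aggregated into consecutive, equally sized periods. The name growth_rates is hypothetical.

```python
from decimal import Decimal
from typing import Optional


def growth_rates(revenue_by_period: list[Decimal]) -> list[Optional[Decimal]]:
    """Period-over-period growth rate for each consecutive pair of periods."""
    rates: list[Optional[Decimal]] = []
    for previous, current in zip(revenue_by_period, revenue_by_period[1:]):
        # Growth is undefined when the previous period had no revenue
        rates.append((current - previous) / previous if previous else None)
    return rates
```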
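Data Quality
Validation Rules
```python
import numpy as np
import pandas as pd
from pandas.api import types as ptypes


def validate_ecommerce_data(df: pd.DataFrame) -> bool:
    """Validate e-commerce data quality; raise ValueError on the first failed check."""
    # Check required fields (explicit raises, since asserts are stripped under python -O)
    required_fields = ['customer_id', 'order_id', 'product_id', 'order_date']
    for field in required_fields:
        if df[field].isnull().any():
            raise ValueError(f"Missing values in {field}")

    # Check data types; pandas has no 'timestamp' or 'decimal' dtype names.
    # Assumes prices are floats: Decimal values arrive as object dtype and need converting first.
    if not ptypes.is_datetime64_any_dtype(df['order_date']):
        raise ValueError("Invalid order date format")
    if not ptypes.is_numeric_dtype(df['price']):
        raise ValueError("Invalid price format")
    if not ptypes.is_integer_dtype(df['quantity']):
        raise ValueError("Invalid quantity format")

    # Check value ranges
    if (df['price'] <= 0).any():
        raise ValueError("Invalid price range")
    if (df['quantity'] <= 0).any():
        raise ValueError("Invalid quantity range")
    if (df['total_amount'] < 0).any():
        raise ValueError("Invalid total amount range")

    # Check logical constraints with a tolerance, since float arithmetic is inexact
    expected = df['price'] * df['quantity'] - df['discount']
    if not np.isclose(df['total_amount'], expected).all():
        raise ValueError("Invalid total calculation")
    return True
```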
Data Quality Metrics
- Completeness: Percentage of non-null values
- Accuracy: Data validation against source
- Consistency: Cross-field validation
- Timeliness: Data freshness tracking
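Completeness, as defined above, can be sketched as the percentage of non-null values per field. The sketch assumes records are dicts and treats a missing key as null. The function name is hypothetical.

```python
from typing import Any, Iterable, Mapping


def completeness(records: list[Mapping[str, Any]], fields: Iterable[str]) -> dict[str, float]:
    """Percentage of records with a non-null value for each field."""
    total = len(records)
    result: dict[str, float] = {}
    for field in fields:
        # A missing key counts as null, like an absent column value
        non_null = sum(1 for record in records if record.get(field) is not None)
        result[field] = 100.0 * non_null / total if total else 0.0  # empty batch reports 0.0
    return result
```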
Query Examples
Basic Queries
```sql
-- Top 10 customers by total spending over the last 30 days
SELECT
    c.customer_id,
    c.customer_name,
    c.customer_segment,
    COUNT(o.order_id) as order_count,
    SUM(o.total_amount) as total_spent
FROM customers c
JOIN orders o ON c.customer_id = o.customer_id
WHERE o.order_date >= CURRENT_DATE - INTERVAL '30' DAY
GROUP BY c.customer_id, c.customer_name, c.customer_segment
ORDER BY total_spent DESC
LIMIT 10;

-- Product performance by category over the last 30 days
SELECT
    p.category,
    p.subcategory,
    COUNT(DISTINCT oi.product_id) as product_count,
    SUM(oi.quantity) as total_quantity,
    SUM(oi.total_amount) as total_revenue,
    AVG(oi.price) as avg_price  -- unweighted mean of line-item prices
FROM products p
JOIN order_items oi ON p.product_id = oi.product_id
JOIN orders o ON oi.order_id = o.order_id
WHERE o.order_date >= CURRENT_DATE - INTERVAL '30' DAY
GROUP BY p.category, p.subcategory
ORDER BY total_revenue DESC;
```
Advanced Analytics
```sql
-- Customer lifetime value analysis by segment
WITH customer_metrics AS (
    SELECT
        c.customer_id,
        c.customer_name,
        c.customer_segment,
        c.registration_date,
        COUNT(DISTINCT o.order_id) as total_orders,
        SUM(o.total_amount) as total_spent,
        MIN(o.order_date) as first_order_date,
        MAX(o.order_date) as last_order_date,
        DATEDIFF(MAX(o.order_date), MIN(o.order_date)) as customer_lifespan_days
    FROM customers c
    JOIN orders o ON c.customer_id = o.customer_id
    GROUP BY c.customer_id, c.customer_name, c.customer_segment, c.registration_date
),
customer_segments AS (
    SELECT
        customer_segment,
        COUNT(*) as customer_count,
        AVG(total_orders) as avg_orders,
        AVG(total_spent) as avg_spent,
        AVG(customer_lifespan_days) as avg_lifespan_days,
        -- NULLIF avoids dividing by zero for customers whose orders all fall on one day;
        -- AVG skips the resulting NULLs
        AVG(total_spent / NULLIF(customer_lifespan_days, 0)) as daily_value
    FROM customer_metrics
    GROUP BY customer_segment
)
SELECT
    customer_segment,
    customer_count,
    ROUND(avg_orders, 2) as avg_orders,
    ROUND(avg_spent, 2) as avg_spent,
    ROUND(avg_lifespan_days, 2) as avg_lifespan_days,
    ROUND(daily_value, 2) as daily_value
FROM customer_segments
ORDER BY avg_spent DESC;
```
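Performance Optimization
Partitioning Strategy
Iceberg uses hidden partitioning: a transform such as days() groups rows by the calendar day of a timestamp without a separate date column. Partitioning a timestamp column by identity would instead create one partition per distinct value.
```sql
-- Partition orders by day of order_date for time-based queries
PARTITIONED BY (days(order_date))
-- For tables that also carry customer_segment (the orders table above does not),
-- add it as a second partition field for segment-filtered queries
PARTITIONED BY (days(order_date), customer_segment)
```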
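Indexing
Iceberg tables have no secondary indexes, so CREATE INDEX is not available. Comparable pruning comes from a table sort order: sorted data files get tight min/max column statistics, which let the query engine skip files for selective filters. With the Iceberg Spark SQL extensions enabled:
```sql
-- Sort rows within partitions on common filter columns.
-- Applies to subsequent writes; existing files keep their layout until rewritten.
ALTER TABLE orders WRITE ORDERED BY customer_id;
ALTER TABLE order_items WRITE ORDERED BY product_id;
ALTER TABLE customers WRITE ORDERED BY customer_segment;
```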
Caching
- Query Result Caching: Cache frequent query results
- Data Caching: Cache frequently accessed data
- Metadata Caching: Cache table metadata
Monitoring
Key Metrics
- Data Volume: Orders processed per day
- Data Quality: Quality score trends
- Processing Time: Pipeline execution duration
- Query Performance: Average query response time
Alerts
- Data Quality: Quality score below threshold
- Processing Delays: Pipeline execution time exceeded
- Data Volume: Unusual data volume changes
- Query Performance: Slow query detection
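One simple way to flag unusual data volume changes is a z-score against recent daily counts. This is a sketch under stated assumptions: the history window, the 3-standard-deviation threshold, and the function name are illustrative, not configured values of this platform.

```python
from statistics import mean, pstdev


def is_unusual_volume(history: list[int], today: int, threshold: float = 3.0) -> bool:
    """Flag today's order count if it deviates from the recent mean by more than `threshold` std devs."""
    if len(history) < 2:
        return False  # not enough history to judge
    mu, sigma = mean(history), pstdev(history)
    if sigma == 0:
        return today != mu  # flat history: any change is unusual
    return abs(today - mu) / sigma > threshold
```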
Business Use Cases
E-commerce Analytics
- Customer Analysis: Behavior patterns and segmentation
- Product Performance: Sales and inventory analysis
- Order Management: Processing and fulfillment
- Revenue Analysis: Financial performance tracking
Customer Insights
- Purchase Patterns: Buying behavior analysis
- Customer Segmentation: Demographic and behavioral segments
- Retention Analysis: Customer loyalty metrics
- Personalization: Recommendation engine data
Data Governance
Access Control
- Role-based Access: Different access levels for different users
- Data Classification: Sensitive data identification
- Audit Logging: All data access logged
- Compliance: GDPR and other privacy regulations
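Data classification and role-based access can be combined by masking personal fields for non-privileged roles. The sketch assumes that the columns in PII_COLUMNS are classified as personal data and that a hypothetical data_steward role may see raw values. It uses a keyed hash (HMAC), so the same input always yields the same masked value and masked columns stay joinable without being trivially reversible. This is pseudonymization, and pseudonymized data is still personal data under GDPR. The hard-coded key is a placeholder for a managed secret.

```python
import hashlib
import hmac

# Hypothetical classification of raw-schema columns as personal data
PII_COLUMNS = frozenset({"customer_name", "email", "phone", "address", "zip_code"})
# Hypothetical role allowed to read unmasked values
PRIVILEGED_ROLES = frozenset({"data_steward"})
# Placeholder only: load the key from a secrets manager in practice
MASKING_KEY = b"replace-with-managed-secret"


def mask_value(value: object) -> str:
    """Keyed SHA-256 digest of the value, truncated to 16 hex characters."""
    return hmac.new(MASKING_KEY, str(value).encode("utf-8"), hashlib.sha256).hexdigest()[:16]


def mask_record(record: dict, role: str) -> dict:
    """Return a copy of record with PII fields masked unless the role is privileged."""
    if role in PRIVILEGED_ROLES:
        return dict(record)
    return {
        key: mask_value(value) if key in PII_COLUMNS and value is not None else value
        for key, value in record.items()
    }
```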
Data Lineage
- Source Tracking: Data origin documentation
- Transformation Logging: All data transformations logged
- Quality Tracking: Data quality metrics over time
- Impact Analysis: Change impact assessment
Future Enhancements
Planned Features
- Real-time Analytics: Live customer behavior tracking
- ML Models: Recommendation and prediction models
- Personalization: Dynamic content and offers
- A/B Testing: Experimentation framework
Integration Opportunities
- External Data: Market data, competitor analysis
- ML Pipeline: Machine learning model integration
- API Development: Real-time data access APIs
- Dashboard Enhancement: Advanced visualization features
Created: October 3, 2025