E-commerce Domain

This document describes the E-commerce data domain, including data sources, processing pipelines, and analytical models for customer behavior and product analytics.

Overview

The E-commerce domain focuses on customer and product data, providing insights into customer behavior, product performance, order analytics, and business metrics. This domain demonstrates how to handle transactional e-commerce data in a modern data platform.

Data Sources

Mock E-commerce Data

  • Primary Source: Generated mock data
  • Data Format: CSV, JSON
  • Update Frequency: Daily
  • Data Volume: ~5K orders per day
  • Schema: Customer details, product information, order data

Data Schema

{
  "customer_id": "string",
  "customer_name": "string",
  "email": "string",
  "phone": "string",
  "address": "string",
  "city": "string",
  "state": "string",
  "zip_code": "string",
  "customer_segment": "string",
  "registration_date": "date",
  "order_id": "string",
  "order_date": "timestamp",
  "product_id": "string",
  "product_name": "string",
  "category": "string",
  "subcategory": "string",
  "brand": "string",
  "price": "decimal",
  "quantity": "integer",
  "discount": "decimal",
  "total_amount": "decimal",
  "payment_method": "string",
  "shipping_method": "string",
  "order_status": "string"
}
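
To make the flat schema concrete, the following is a minimal sketch of how one mock record might be generated. The function name `make_mock_order`, the ID formats, the fixed date, and the lookup lists are all hypothetical; the actual generator is not described in this document. Only a subset of the schema's fields is populated.

```python
import random
from datetime import datetime, timedelta
from decimal import Decimal

# Hypothetical lookup values; the real generator's vocabularies are not documented here.
SEGMENTS = ["Consumer", "Corporate", "Home Office"]
PAYMENT_METHODS = ["credit_card", "paypal", "bank_transfer"]


def make_mock_order(rng: random.Random, order_number: int) -> dict:
    """Build one flat order record using a subset of the schema's fields."""
    price = Decimal(rng.randint(100, 50000)) / 100  # 1.00 to 500.00
    quantity = rng.randint(1, 5)
    discount_pct = Decimal(rng.choice([0, 5, 10]))
    discount = (price * quantity * discount_pct / 100).quantize(Decimal("0.01"))
    return {
        "customer_id": f"CUST-{rng.randint(1, 1000):05d}",
        "customer_segment": rng.choice(SEGMENTS),
        "order_id": f"ORD-{order_number:08d}",
        "order_date": datetime(2025, 1, 1) + timedelta(minutes=rng.randint(0, 1439)),
        "product_id": f"PROD-{rng.randint(1, 200):04d}",
        "price": price,
        "quantity": quantity,
        "discount": discount,
        # Same relationship the validation rules check: price * quantity - discount
        "total_amount": price * quantity - discount,
        "payment_method": rng.choice(PAYMENT_METHODS),
    }


rng = random.Random(42)  # fixed seed so repeated runs produce the same records
orders = [make_mock_order(rng, n) for n in range(1, 11)]
```

Using `Decimal` rather than `float` keeps monetary values exact, which matches the `decimal` type declared in the schema.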

Data Pipeline

Bronze Layer (Raw Data)

  • Storage: MinIO datalake/bronze/ecommerce/
  • Format: CSV, JSON
  • Partitioning: By date (year=YYYY/month=MM/day=DD)

from dagster import asset  # assumes Dagster is the orchestrator, as the @asset decorator suggests


@asset
def ecommerce_bronze():
    """Ingest raw e-commerce data from mock sources"""
    # Mock data generation
    # Schema validation
    # Data quality checks
    pass
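
The date partitioning scheme above can be illustrated with a small helper that builds the object prefix for one ingestion day. This is a sketch only: the function name `bronze_partition_path` is hypothetical, and the base prefix is taken from the storage location listed for this layer.

```python
from datetime import date


def bronze_partition_path(day: date, base: str = "datalake/bronze/ecommerce") -> str:
    """Return the Hive-style partition prefix for one ingestion date."""
    return f"{base}/year={day.year:04d}/month={day.month:02d}/day={day.day:02d}"


path = bronze_partition_path(date(2025, 10, 3))
# path == "datalake/bronze/ecommerce/year=2025/month=10/day=03"
```

Zero-padding the month and day keeps prefixes in chronological order when they are listed lexicographically.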

Silver Layer (Prepared Data)

  • Storage: MinIO datalake/silver/ecommerce/
  • Format: Delta Lake
  • Processing: Apache Spark

@asset(deps=[ecommerce_bronze])
def ecommerce_silver():
    """Clean and prepare e-commerce data"""
    # Data cleaning
    # Schema standardization
    # Duplicate removal
    # Data type conversion
    # Customer segmentation
    pass
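
The cleaning steps listed in the asset can be sketched in plain Python to show what each one does to a record. This is not the Spark job itself. It assumes that raw values arrive as strings and that a duplicate is a repeated (`order_id`, `product_id`) pair; both are assumptions, and the name `clean_orders` is hypothetical.

```python
from datetime import datetime
from decimal import Decimal


def clean_orders(rows: list[dict]) -> list[dict]:
    """Standardize types and drop duplicate (order_id, product_id) rows."""
    seen = set()
    cleaned = []
    for row in rows:
        key = (row["order_id"], row["product_id"])
        if key in seen:  # duplicate removal: keep the first occurrence
            continue
        seen.add(key)
        cleaned.append({
            **row,
            "email": row["email"].strip().lower(),                    # standardization
            "order_date": datetime.fromisoformat(row["order_date"]),  # type conversion
            "price": Decimal(row["price"]),
            "quantity": int(row["quantity"]),
        })
    return cleaned


raw = [
    {"order_id": "O1", "product_id": "P1", "email": " Ann@Example.com ",
     "order_date": "2025-10-03T09:15:00", "price": "19.99", "quantity": "2"},
    {"order_id": "O1", "product_id": "P1", "email": " Ann@Example.com ",
     "order_date": "2025-10-03T09:15:00", "price": "19.99", "quantity": "2"},
    {"order_id": "O1", "product_id": "P2", "email": "ann@example.com",
     "order_date": "2025-10-03T09:15:00", "price": "5.00", "quantity": "1"},
]
silver_rows = clean_orders(raw)  # the second row is dropped as a duplicate
```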

Gold Layer (Analytics Data)

  • Storage: MinIO warehouse/ecommerce/
  • Format: Apache Iceberg
  • Processing: dbt transformations

-- Customers table
CREATE TABLE customers (
    customer_id STRING,
    customer_name STRING,
    email STRING,
    phone STRING,
    address STRING,
    city STRING,
    state STRING,
    zip_code STRING,
    customer_segment STRING,
    registration_date DATE,
    created_at TIMESTAMP,
    updated_at TIMESTAMP
) USING ICEBERG;

-- Products table
CREATE TABLE products (
    product_id STRING,
    product_name STRING,
    category STRING,
    subcategory STRING,
    brand STRING,
    price DECIMAL(10,2),
    created_at TIMESTAMP,
    updated_at TIMESTAMP
) USING ICEBERG;

-- Orders table
CREATE TABLE orders (
    order_id STRING,
    customer_id STRING,
    order_date TIMESTAMP,
    total_amount DECIMAL(10,2),
    payment_method STRING,
    shipping_method STRING,
    order_status STRING,
    created_at TIMESTAMP,
    updated_at TIMESTAMP
) USING ICEBERG
PARTITIONED BY (days(order_date));

-- Order items table
CREATE TABLE order_items (
    order_id STRING,
    product_id STRING,
    quantity INT,
    price DECIMAL(10,2),
    discount DECIMAL(10,2),
    total_amount DECIMAL(10,2),
    created_at TIMESTAMP,
    updated_at TIMESTAMP
) USING ICEBERG;
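
The four tables above normalize the flat source schema. The sketch below shows one way that split could work, written in plain Python rather than dbt. It assumes each flat record represents one order line and that an order's `total_amount` is the sum of its line totals; the document states neither, and the name `split_into_tables` is hypothetical. Only a few columns are carried over to keep the example short.

```python
from decimal import Decimal


def split_into_tables(rows):
    """Split flat order-line records into customers, products, orders, and order_items."""
    customers, products, orders, order_items = {}, {}, {}, []
    for r in rows:
        # Dimension rows: the last record seen for a key wins
        customers[r["customer_id"]] = {
            "customer_id": r["customer_id"], "customer_name": r["customer_name"]}
        products[r["product_id"]] = {
            "product_id": r["product_id"], "price": r["price"]}
        # Order header: created once, total accumulated from its lines (assumption)
        order = orders.setdefault(r["order_id"], {
            "order_id": r["order_id"], "customer_id": r["customer_id"],
            "order_date": r["order_date"], "total_amount": Decimal("0")})
        order["total_amount"] += r["total_amount"]
        order_items.append({
            "order_id": r["order_id"], "product_id": r["product_id"],
            "quantity": r["quantity"], "price": r["price"],
            "discount": r["discount"], "total_amount": r["total_amount"]})
    return {
        "customers": list(customers.values()),
        "products": list(products.values()),
        "orders": list(orders.values()),
        "order_items": order_items,
    }


flat_rows = [
    {"customer_id": "C1", "customer_name": "Ann", "order_id": "O1",
     "order_date": "2025-10-03", "product_id": "P1", "price": Decimal("10.00"),
     "quantity": 2, "discount": Decimal("0.00"), "total_amount": Decimal("20.00")},
    {"customer_id": "C1", "customer_name": "Ann", "order_id": "O1",
     "order_date": "2025-10-03", "product_id": "P2", "price": Decimal("6.00"),
     "quantity": 1, "discount": Decimal("0.50"), "total_amount": Decimal("5.50")},
]
tables = split_into_tables(flat_rows)
```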

Analytical Models

Customer Analytics Model

Purpose: Customer behavior and segmentation

Key Metrics:

  • Customer lifetime value
  • Purchase frequency
  • Customer segments
  • Retention rates
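
As a rough illustration of two of these metrics, the sketch below computes per-customer order counts and spend, plus a repeat-purchase rate as a simple stand-in for retention. The metric definitions and function names are assumptions made for the example; the model's actual definitions are not given in this document.

```python
from collections import defaultdict
from decimal import Decimal


def customer_metrics(orders):
    """Aggregate order count and total spend per customer."""
    totals = defaultdict(lambda: {"orders": 0, "spent": Decimal("0")})
    for o in orders:
        m = totals[o["customer_id"]]
        m["orders"] += 1
        m["spent"] += o["total_amount"]
    return dict(totals)


def repeat_purchase_rate(metrics):
    """Share of customers with more than one order (a simple retention proxy)."""
    if not metrics:
        return 0.0
    repeaters = sum(1 for m in metrics.values() if m["orders"] > 1)
    return repeaters / len(metrics)


sample_orders = [
    {"customer_id": "C1", "total_amount": Decimal("10.00")},
    {"customer_id": "C1", "total_amount": Decimal("20.00")},
    {"customer_id": "C2", "total_amount": Decimal("5.00")},
]
metrics = customer_metrics(sample_orders)
rate = repeat_purchase_rate(metrics)  # one of two customers ordered more than once
```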

Product Performance Model

Purpose: Product sales and performance analysis

Key Metrics:

  • Sales volume
  • Revenue contribution
  • Product categories
  • Brand performance

Order Analytics Model

Purpose: Order processing and fulfillment

Key Metrics:

  • Order volume
  • Average order value
  • Payment methods
  • Shipping performance

Business Metrics Model

Purpose: Overall business performance

Key Metrics:

  • Revenue trends
  • Growth rates
  • Market share
  • Operational efficiency

Data Quality

Validation Rules

# Data quality checks (assumes df is a pandas DataFrame)
import pandas as pd
from pandas.api import types as ptypes


def validate_ecommerce_data(df: pd.DataFrame) -> bool:
    """Validate e-commerce data quality; raises AssertionError on the first failed check"""

    # Check required fields
    required_fields = ['customer_id', 'order_id', 'product_id', 'order_date']
    for field in required_fields:
        assert df[field].isnull().sum() == 0, f"Missing values in {field}"

    # Check data types (pandas has no 'timestamp', 'decimal', or 'integer' dtype names)
    assert ptypes.is_datetime64_any_dtype(df['order_date']), "Invalid order date format"
    assert ptypes.is_numeric_dtype(df['price']), "Invalid price format"
    assert ptypes.is_integer_dtype(df['quantity']), "Invalid quantity format"

    # Check value ranges
    assert df['price'].min() > 0, "Invalid price range"
    assert df['quantity'].min() > 0, "Invalid quantity range"
    assert df['total_amount'].min() >= 0, "Invalid total amount range"

    # Check logical constraints; compare within half a cent to tolerate float rounding
    expected_total = df['price'] * df['quantity'] - df['discount']
    assert ((df['total_amount'] - expected_total).abs() < 0.005).all(), "Invalid total calculation"

    return True

Data Quality Metrics

  • Completeness: Percentage of non-null values
  • Accuracy: Data validation against source
  • Consistency: Cross-field validation
  • Timeliness: Data freshness tracking
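
Completeness, as defined above, is simple to compute. The sketch below assumes records are dictionaries and treats both `None` and the empty string as missing; that treatment and the function name `completeness` are assumptions for the example.

```python
def completeness(rows, field):
    """Percentage of records where `field` is present and non-empty."""
    if not rows:
        return 0.0
    non_null = sum(1 for r in rows if r.get(field) not in (None, ""))
    return 100.0 * non_null / len(rows)


records = [
    {"email": "ann@example.com"},
    {"email": None},
    {"email": "bob@example.com"},
    {"email": "cy@example.com"},
]
email_completeness = completeness(records, "email")  # 3 of 4 records have an email
```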

Query Examples

Basic Queries

-- Top 10 customers by total spending over the last 30 days
SELECT 
    c.customer_id,
    c.customer_name,
    c.customer_segment,
    COUNT(o.order_id) as order_count,
    SUM(o.total_amount) as total_spent
FROM customers c
JOIN orders o ON c.customer_id = o.customer_id
WHERE o.order_date >= CURRENT_DATE - INTERVAL '30' DAY
GROUP BY c.customer_id, c.customer_name, c.customer_segment
ORDER BY total_spent DESC
LIMIT 10;

-- Product performance by category over the last 30 days
SELECT 
    p.category,
    p.subcategory,
    COUNT(DISTINCT oi.product_id) as product_count,
    SUM(oi.quantity) as total_quantity,
    SUM(oi.total_amount) as total_revenue,
    AVG(oi.price) as avg_price
FROM products p
JOIN order_items oi ON p.product_id = oi.product_id
JOIN orders o ON oi.order_id = o.order_id
WHERE o.order_date >= CURRENT_DATE - INTERVAL '30' DAY
GROUP BY p.category, p.subcategory
ORDER BY total_revenue DESC;

Advanced Analytics

-- Customer lifetime value analysis
WITH customer_metrics AS (
    SELECT 
        c.customer_id,
        c.customer_name,
        c.customer_segment,
        c.registration_date,
        COUNT(DISTINCT o.order_id) as total_orders,
        SUM(o.total_amount) as total_spent,
        MIN(o.order_date) as first_order_date,
        MAX(o.order_date) as last_order_date,
        DATEDIFF(MAX(o.order_date), MIN(o.order_date)) as customer_lifespan_days
    FROM customers c
    JOIN orders o ON c.customer_id = o.customer_id
    GROUP BY c.customer_id, c.customer_name, c.customer_segment, c.registration_date
),
customer_segments AS (
    SELECT 
        customer_segment,
        COUNT(*) as customer_count,
        AVG(total_orders) as avg_orders,
        AVG(total_spent) as avg_spent,
        AVG(customer_lifespan_days) as avg_lifespan_days,
        -- NULLIF avoids division by zero for customers whose orders all fall on one day
        AVG(total_spent / NULLIF(customer_lifespan_days, 0)) as daily_value
    FROM customer_metrics
    GROUP BY customer_segment
)
SELECT 
    customer_segment,
    customer_count,
    ROUND(avg_orders, 2) as avg_orders,
    ROUND(avg_spent, 2) as avg_spent,
    ROUND(avg_lifespan_days, 2) as avg_lifespan_days,
    ROUND(daily_value, 2) as daily_value
FROM customer_segments
ORDER BY avg_spent DESC;

Performance Optimization

Partitioning Strategy

-- Partition by order day for time-based queries
PARTITIONED BY (days(order_date))

-- Additional partitioning for customer queries
-- (customer_segment must exist as a column on the table being partitioned)
PARTITIONED BY (days(order_date), customer_segment)

Indexing

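-- Create indexes for common query patterns
-- Note: these statements apply only to engines that support secondary indexes.
-- Iceberg tables in Spark do not accept CREATE INDEX; a table sort order
-- (ALTER TABLE ... WRITE ORDERED BY) serves a similar purpose there.
CREATE INDEX idx_orders_customer ON orders (customer_id);
CREATE INDEX idx_orders_date ON orders (order_date);
CREATE INDEX idx_order_items_product ON order_items (product_id);
CREATE INDEX idx_customers_segment ON customers (customer_segment);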

Caching

  • Query Result Caching: Cache frequent query results
  • Data Caching: Cache frequently accessed data
  • Metadata Caching: Cache table metadata
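
Query result caching can be sketched as a small time-to-live cache keyed by the SQL text. This is an illustration of the idea only, not the platform's caching layer. The class `QueryResultCache` and the `run_query` callable are hypothetical, and a real deployment would more likely rely on the query engine's own cache.

```python
import time


class QueryResultCache:
    """Cache query results by SQL text for a fixed time-to-live."""

    def __init__(self, ttl_seconds, clock=time.monotonic):
        self._ttl = ttl_seconds
        self._clock = clock  # injectable so expiry can be tested without sleeping
        self._entries = {}   # sql -> (stored_at, result)

    def get_or_compute(self, sql, run_query):
        now = self._clock()
        hit = self._entries.get(sql)
        if hit is not None and now - hit[0] < self._ttl:
            return hit[1]  # fresh entry: skip the query
        result = run_query(sql)
        self._entries[sql] = (now, result)
        return result


calls = []


def fake_run_query(sql):
    """Stand-in for a real query executor; records each execution."""
    calls.append(sql)
    return [("row",)]


cache = QueryResultCache(ttl_seconds=60)
cache.get_or_compute("SELECT 1", fake_run_query)
cache.get_or_compute("SELECT 1", fake_run_query)  # served from the cache
```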

Monitoring

Key Metrics

  • Data Volume: Orders processed per day
  • Data Quality: Quality score trends
  • Processing Time: Pipeline execution duration
  • Query Performance: Average query response time

Alerts

  • Data Quality: Quality score below threshold
  • Processing Delays: Pipeline execution time exceeded
  • Data Volume: Unusual data volume changes
  • Query Performance: Slow query detection
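
The first three alert conditions can be expressed as threshold checks on a pipeline run summary. The sketch below is illustrative: the `PipelineRun` fields, the function name, and every threshold value are hypothetical and would need to be tuned to the real pipeline.

```python
from dataclasses import dataclass


@dataclass
class PipelineRun:
    quality_score: float      # 0.0 to 1.0
    duration_seconds: float
    order_count: int


def evaluate_alerts(run, baseline_orders, min_quality=0.95,
                    max_duration_seconds=1800, max_volume_deviation=0.5):
    """Return the names of the alerts triggered by one pipeline run."""
    alerts = []
    if run.quality_score < min_quality:
        alerts.append("data_quality")
    if run.duration_seconds > max_duration_seconds:
        alerts.append("processing_delay")
    # Relative change against a baseline such as a trailing daily average
    if baseline_orders > 0:
        deviation = abs(run.order_count - baseline_orders) / baseline_orders
        if deviation > max_volume_deviation:
            alerts.append("data_volume")
    return alerts


triggered = evaluate_alerts(PipelineRun(0.90, 100.0, 5000), baseline_orders=5000)
```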

Business Use Cases

E-commerce Analytics

  • Customer Analysis: Behavior patterns and segmentation
  • Product Performance: Sales and inventory analysis
  • Order Management: Processing and fulfillment
  • Revenue Analysis: Financial performance tracking

Customer Insights

  • Purchase Patterns: Buying behavior analysis
  • Customer Segmentation: Demographic and behavioral segments
  • Retention Analysis: Customer loyalty metrics
  • Personalization: Recommendation engine data

Data Governance

Access Control

  • Role-based Access: Different access levels for different users
  • Data Classification: Sensitive data identification
  • Audit Logging: All data access logged
  • Compliance: GDPR and privacy regulations

Data Lineage

  • Source Tracking: Data origin documentation
  • Transformation Logging: All data transformations logged
  • Quality Tracking: Data quality metrics over time
  • Impact Analysis: Change impact assessment

Future Enhancements

Planned Features

  • Real-time Analytics: Live customer behavior tracking
  • ML Models: Recommendation and prediction models
  • Personalization: Dynamic content and offers
  • A/B Testing: Experimentation framework

Integration Opportunities

  • External Data: Market data, competitor analysis
  • ML Pipeline: Machine learning model integration
  • API Development: Real-time data access APIs
  • Dashboard Enhancement: Advanced visualization features

Last update: October 3, 2025
Created: October 3, 2025