📊 Data Models Documentation

Overview

The Iceberg Data Engineering Platform models its data as Apache Iceberg tables built by dbt transformations. The system follows a layered architecture: raw data ingestion, staging transformations, and analytical marts.

Data Architecture

graph TD
    A[Raw Data Sources] --> B[MinIO Bronze Storage]
    B --> C[Spark Processing]
    C --> D[MinIO Prepare Storage]
    D --> E[Delta Tables]
    E --> F[dbt Transformations]
    F --> G[Iceberg Tables in Warehouse]

    subgraph "Data Layers"
        H[Bronze Layer: CSV/JSON/Parquet]
        I[Prepare Layer: Delta Tables]
        J[Analytics Layer: Iceberg Tables]
    end

    subgraph "Storage Buckets"
        K[datalake/bronze: Raw Data]
        L[datalake/prepare: Delta Tables]
        M[warehouse: Iceberg Tables]
    end

    subgraph "Trino Catalogs"
        N[spark_catalog: Preparation Tasks]
        O[flight_radar: Flight Analytics]
        P[ecommerce: E-commerce Analytics]
        Q[asset_property: Property Analytics]
    end

    B --> H
    D --> I
    E --> I
    F --> J
    G --> J

    B --> K
    D --> L
    G --> M

    E --> N
    G --> O
    G --> P
    G --> Q

Storage Architecture

MinIO Bucket Structure

The platform stores each data layer in its own MinIO location: two prefixes in the datalake bucket (bronze and prepare) and a separate warehouse bucket:

1. Bronze Layer (datalake/bronze)

  • Purpose: Raw data storage from external APIs
  • Formats: CSV, JSON, Parquet files
  • Structure: datalake/bronze/{domain}/{entity}/
  • Examples:
      • datalake/bronze/flight_radar/airlines/
      • datalake/bronze/ecommerce/products/
      • datalake/bronze/asset_property/sales/

2. Prepare Layer (datalake/prepare)

  • Purpose: Processed Delta tables from Spark preparation tasks
  • Formats: Delta Lake tables
  • Structure: datalake/prepare/{domain}_prepared/{entity}/
  • Examples:
      • datalake/prepare/flight_radar_prepared/airline/
      • datalake/prepare/ecommerce_prepared/product/
      • datalake/prepare/asset_property_prepared/sale/

3. Warehouse Layer (warehouse)

  • Purpose: Final Iceberg tables generated by dbt transformations
  • Formats: Apache Iceberg tables
  • Structure: warehouse/{domain}/{schema}/{table}/
  • Examples:
      • warehouse/flight_radar/marts/company_with_the_most_active_flight/
      • warehouse/ecommerce/marts/customer_analytics/
      • warehouse/asset_property/marts/property_market_trends/
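
The three path structures above can be expressed as a small helper. This is only a sketch: the function names `layer_path` and `warehouse_path` and the `PATH_TEMPLATES` mapping are hypothetical, and only the path templates themselves come from the layout described here.

```python
# Hypothetical helpers: only the path templates come from the documented layout.
PATH_TEMPLATES = {
    "bronze": "datalake/bronze/{domain}/{entity}/",
    "prepare": "datalake/prepare/{domain}_prepared/{entity}/",
}


def layer_path(layer: str, domain: str, entity: str) -> str:
    """Return the object-store prefix of an entity in the bronze or prepare layer."""
    try:
        template = PATH_TEMPLATES[layer]
    except KeyError:
        raise ValueError(f"unknown layer: {layer!r}") from None
    return template.format(domain=domain, entity=entity)


def warehouse_path(domain: str, schema: str, table: str) -> str:
    """Return the prefix of an Iceberg table in the warehouse bucket."""
    return f"warehouse/{domain}/{schema}/{table}/"


print(layer_path("bronze", "flight_radar", "airlines"))
print(layer_path("prepare", "ecommerce", "product"))
print(warehouse_path("asset_property", "marts", "property_market_trends"))
```

Keeping the templates in one place makes it harder for ingestion jobs and dbt models to drift apart on naming, for example the `_prepared` suffix that only the prepare layer uses.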

Trino Catalog Structure

The platform uses multiple Trino catalogs for different purposes:

1. Spark Catalog (spark_catalog)

  • Purpose: Access to Delta tables from preparation tasks
  • Connector: Delta Lake connector
  • Tables: All prepared Delta tables
  • Usage: Spark preparation operations and initial data access

2. Domain-Specific Catalogs

Each business domain has its own Iceberg catalog:

Flight Radar Catalog (flight_radar)

  • Purpose: Flight analytics and reporting
  • Connector: Iceberg connector
  • Schemas: prepared, intermediate, marts
  • Tables: All flight-related analytical models

E-commerce Catalog (ecommerce)

  • Purpose: E-commerce analytics and reporting
  • Connector: Iceberg connector
  • Schemas: prepared, intermediate, marts
  • Tables: All e-commerce analytical models

Asset Property Catalog (asset_property)

  • Purpose: Real estate analytics and reporting
  • Connector: Iceberg connector
  • Schemas: prepared, intermediate, marts
  • Tables: All property-related analytical models
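
Because every domain catalog exposes the same three schemas, fully qualified table names follow a predictable `catalog.schema.table` pattern. The sketch below illustrates that pattern; `qualified_name` is a hypothetical helper, and only the catalog and schema names are taken from the lists above.

```python
# Catalog and schema names come from the lists above; the helper is illustrative.
ICEBERG_CATALOGS = {"flight_radar", "ecommerce", "asset_property"}
ICEBERG_SCHEMAS = {"prepared", "intermediate", "marts"}


def qualified_name(catalog: str, schema: str, table: str) -> str:
    """Build a catalog.schema.table identifier, rejecting unknown catalogs or schemas."""
    if catalog not in ICEBERG_CATALOGS:
        raise ValueError(f"unknown Iceberg catalog: {catalog!r}")
    if schema not in ICEBERG_SCHEMAS:
        raise ValueError(f"unknown schema: {schema!r}")
    return f"{catalog}.{schema}.{table}"


print(qualified_name("flight_radar", "marts", "company_with_the_most_active_flight"))
```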

Data Domains

1. Flight Radar Domain

Bronze Layer (Raw Data)

Located in datalake/bronze/flight_radar/:

  • airlines/: Raw airline data from FlightRadar24 API (JSON/CSV)
  • airports/: Raw airport data from FlightRadar24 API (JSON/CSV)
  • aircraft/: Raw aircraft data from FlightRadar24 API (JSON/CSV)
  • flights/: Raw flight data from FlightRadar24 API (JSON/CSV)

Prepare Layer (Delta Tables)

Located in datalake/prepare/flight_radar_prepared/:

  • airline: Processed airline information and metadata
  • airport: Processed airport details and location data
  • aircraft: Processed aircraft specifications and models
  • complete_flight: Processed flight records with all details
  • missing_flight: Processed flight records with missing or incomplete data

Staging Layer (Intermediate)

explode_complete_flight
-- Materialized as incremental table
-- Partitioned by: on_ground
-- Unique key: ['id', 'time']
-- Location: s3a://datalake/intermediate/flight_radar/explode_complete_flight

SELECT
    ID,
    ALTITUDE,
    LATITUDE,
    LONGITUDE,
    HEADING,
    ICAO_24BIT,
    VERTICAL_SPEED,
    GROUND_SPEED,
    NUMBER,
    SQUAWK,
    CALLSIGN,
    REGISTRATION,
    ON_GROUND,
    TIME,
    -- Departure airport details
    DEPARTURE.IATA AS DEPARTURE_IATA,
    DEPARTURE.LATITUDE AS DEPARTURE_LATITUDE,
    DEPARTURE.LONGITUDE AS DEPARTURE_LONGITUDE,
    DEPARTURE.NAME AS DEPARTURE_NAME,
    DEPARTURE.COUNTRY_NAME AS DEPARTURE_COUNTRY_NAME,
    -- Arrival airport details
    ARRIVAL.IATA AS ARRIVAL_IATA,
    ARRIVAL.LATITUDE AS ARRIVAL_LATITUDE,
    ARRIVAL.LONGITUDE AS ARRIVAL_LONGITUDE,
    ARRIVAL.NAME AS ARRIVAL_NAME,
    ARRIVAL.COUNTRY_NAME AS ARRIVAL_COUNTRY_NAME,
    -- Airline details
    AIRLINE.IATA AS AIRLINE_IATA,
    AIRLINE.NAME AS AIRLINE_NAME,
    AIRLINE.ICAO AS AIRLINE_ICAO,
    -- Aircraft details
    AIRCRAFT.CODE AS AIRCRAFT_CODE,
    MODEL_MISSING_DATA,
    __TIMESTAMP,
    -- Derived fields
    CASE
        WHEN AIRCRAFT.MODEL IS NOT NULL THEN split_part(AIRCRAFT.MODEL, ' ', 1)
        ELSE NULL
    END AS AIRCRAFT_MANUFACTURER
FROM {{ source('flight_radar_prepared', 'complete_flight') }}

explode_and_enrich_missing_flight
-- Handles flight records with missing data
-- Enriches missing flight information where possible
-- Materialized as incremental table

full_flight_obt
-- One Big Table (OBT) combining complete and missing flights
-- Provides unified view of all flight data
-- Materialized as table
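
To make the flattening in explode_complete_flight concrete, here is a rough Python equivalent applied to a single record. It is a sketch under assumptions: the record is a plain dictionary with lowercase keys, and `flatten_flight` and `aircraft_manufacturer` are hypothetical names that do not exist in the project.

```python
from typing import Any, Optional


def aircraft_manufacturer(model: Optional[str]) -> Optional[str]:
    """First space-separated token of the model name, or None if the model is missing."""
    if model is None:
        return None
    return model.split(" ")[0]


def flatten_flight(record: dict[str, Any]) -> dict[str, Any]:
    """Promote nested departure/arrival/airline/aircraft fields to top-level columns."""
    # Keep scalar columns as they are and drop the nested structures.
    flat = {key: value for key, value in record.items() if not isinstance(value, dict)}
    for prefix in ("departure", "arrival"):
        nested = record.get(prefix) or {}
        for field in ("iata", "latitude", "longitude", "name", "country_name"):
            flat[f"{prefix}_{field}"] = nested.get(field)
    airline = record.get("airline") or {}
    for field in ("iata", "name", "icao"):
        flat[f"airline_{field}"] = airline.get(field)
    aircraft = record.get("aircraft") or {}
    flat["aircraft_code"] = aircraft.get("code")
    flat["aircraft_manufacturer"] = aircraft_manufacturer(aircraft.get("model"))
    return flat


# Made-up sample record, for illustration only.
sample = {
    "id": "abc123",
    "on_ground": False,
    "departure": {"iata": "CDG", "name": "Paris Charles de Gaulle"},
    "arrival": {"iata": "JFK", "name": "New York John F. Kennedy"},
    "airline": {"iata": "AF", "name": "Air France", "icao": "AFR"},
    "aircraft": {"code": "A359", "model": "Airbus A350-941"},
}
print(flatten_flight(sample))
```

As in the SQL, the manufacturer is simply the first word of the model string, so it is only as reliable as the upstream model naming.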

Mart Layer (Analytics)

Located in warehouse/flight_radar/marts/:

company_with_the_most_active_flight
-- Top 10 airlines by flight count
-- Materialized as Iceberg table
-- Location: warehouse/flight_radar/marts/company_with_the_most_active_flight
-- Catalog: flight_radar
-- Schema: marts

{{ config(
    materialized='table',
    alias='company_with_the_most_active_flight',
    tags=['company_with_the_most_active_flight'],
    database='flight_radar',
    schema='marts',
    location_root='s3a://warehouse/flight_radar/marts/company_with_the_most_active_flight'
) }}

SELECT
    AIRLINE_NAME,
    AIRLINE_IATA,
    COUNT(*) AS TOTAL_FLIGHTS
FROM {{ ref('full_flight_obt') }}
WHERE AIRLINE_NAME IS NOT NULL
GROUP BY AIRLINE_NAME, AIRLINE_IATA
ORDER BY TOTAL_FLIGHTS DESC
LIMIT 10

airport_with_the_largest_traffic_imbalance
-- Identifies airports with significant departure/arrival imbalances
-- Useful for capacity planning and route optimization

companies_with_the_most_regional_flight
-- Airlines operating the most regional flights
-- Focuses on short-haul domestic operations

company_builder_with_the_most_active_flight
-- Aircraft manufacturers with most active flights
-- Provides insights into fleet composition

longest_active_flight
-- Identifies the longest currently active flights
-- Real-time flight tracking insights

longest_flight_in_each_continent
-- Geographic analysis of longest flights by continent
-- Route planning and market analysis
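
The aggregation behind company_with_the_most_active_flight (filter out rows without an airline name, group, count, keep the top N) can be checked on a handful of rows with plain Python. This is an illustrative sketch only: `top_airlines` is a hypothetical function, and rows are assumed to be dictionaries with `airline_name` and `airline_iata` keys.

```python
from collections import Counter


def top_airlines(flights, limit=10):
    """Count rows per (airline_name, airline_iata) and return the `limit` largest groups."""
    counts = Counter(
        (flight["airline_name"], flight["airline_iata"])
        for flight in flights
        if flight.get("airline_name") is not None  # mirrors WHERE AIRLINE_NAME IS NOT NULL
    )
    return [(name, iata, total) for (name, iata), total in counts.most_common(limit)]


# Made-up rows, for illustration only.
flights = (
    [{"airline_name": "Air France", "airline_iata": "AF"}] * 3
    + [{"airline_name": "KLM", "airline_iata": "KL"}] * 2
    + [{"airline_name": None, "airline_iata": None}]
)
print(top_airlines(flights, limit=2))
```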

2. E-commerce Domain

Source Tables

Located in spark_catalog.ecommerce_prepared schema:

  • product: Product catalog and specifications
  • customer: Customer information and demographics

Staging Layer

customer_order (Intermediate)
-- Combines customer and order data
-- Provides unified customer transaction view
-- Materialized as table

Mart Layer

Located in warehouse/ecommerce/marts/:

customer_analytics
-- Customer behavior and purchasing patterns
-- Segmentation and lifetime value analysis
-- Materialized as Iceberg table
-- Location: warehouse/ecommerce/marts/customer_analytics
-- Catalog: ecommerce
-- Schema: marts

{{ config(
    materialized='table',
    alias='customer_analytics',
    tags=['customer_analytics'],
    database='ecommerce',
    schema='marts',
    location_root='s3a://warehouse/ecommerce/marts/customer_analytics'
) }}

SELECT
    customer_id,
    customer_name,
    COUNT(*) as total_orders,
    SUM(order_amount) as total_spent,
    AVG(order_amount) as avg_order_value,
    MAX(order_date) as last_order_date
FROM {{ ref('customer_order') }}
GROUP BY customer_id, customer_name
ORDER BY total_spent DESC

3. Asset Property Domain

Source Tables

Located in spark_catalog.asset_property_prepared schema:

  • real_estate_sales: Property sales transactions and details

Staging Layer

stg_real_estate_sales
-- Standardized real estate sales data
-- Cleansed and validated property information
-- Materialized as table

Mart Layer

Located in warehouse/asset_property/marts/:

property_market_trends
-- Market analysis and trend identification
-- Price analysis and forecasting
-- Materialized as Iceberg table
-- Location: warehouse/asset_property/marts/property_market_trends
-- Catalog: asset_property
-- Schema: marts

{{ config(
    materialized='table',
    alias='property_market_trends',
    tags=['property_market_trends'],
    database='asset_property',
    schema='marts',
    location_root='s3a://warehouse/asset_property/marts/property_market_trends'
) }}

SELECT
    property_type,
    location,
    AVG(price) as avg_price,
    COUNT(*) as property_count,
    MIN(price) as min_price,
    MAX(price) as max_price,
    STDDEV(price) as price_volatility
FROM {{ ref('stg_real_estate_sales') }}
GROUP BY property_type, location
ORDER BY avg_price DESC
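
The per-group statistics in property_market_trends can be reproduced on a small sample to validate the model's output. The sketch below assumes rows are dictionaries with `property_type`, `location`, and `price` keys, and that the engine's STDDEV is the sample standard deviation, which is undefined for a single observation; the function name is reused from the model purely for readability.

```python
from collections import defaultdict
from statistics import mean, stdev


def property_market_trends(sales):
    """Aggregate prices per (property_type, location), ordered by average price descending."""
    prices = defaultdict(list)
    for sale in sales:
        prices[(sale["property_type"], sale["location"])].append(sale["price"])
    rows = []
    for (property_type, location), values in prices.items():
        rows.append({
            "property_type": property_type,
            "location": location,
            "avg_price": mean(values),
            "property_count": len(values),
            "min_price": min(values),
            "max_price": max(values),
            # Sample standard deviation needs at least two observations.
            "price_volatility": stdev(values) if len(values) > 1 else None,
        })
    return sorted(rows, key=lambda row: row["avg_price"], reverse=True)


# Made-up rows, for illustration only.
sales = [
    {"property_type": "house", "location": "Paris", "price": 100},
    {"property_type": "house", "location": "Paris", "price": 300},
    {"property_type": "flat", "location": "Lyon", "price": 150},
]
for row in property_market_trends(sales):
    print(row)
```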

Data Model Patterns

1. Incremental Processing

-- Incremental model configuration
{{
  config(
    materialized='incremental',
    unique_key=['id', 'time'],
    partition_by=['on_ground']
  )
}}

-- Incremental logic
{% if is_incremental() %}
WHERE __TIMESTAMP > (SELECT MAX(__TIMESTAMP) FROM {{ this }})
{% endif %}
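
The incremental pattern combines two ideas: only rows newer than the current high-water mark are read, and those rows are upserted on the unique key. The following sketch mimics that behaviour in memory. It is an approximation, not dbt's implementation, and assumes rows are dictionaries with `id`, `time`, and `__timestamp` keys; `incremental_merge` is a hypothetical name.

```python
def incremental_merge(existing, incoming):
    """Upsert rows newer than the target's max __timestamp, keyed on (id, time)."""
    # On the first run the target is empty, so every incoming row is loaded.
    high_water_mark = max((row["__timestamp"] for row in existing), default=None)
    merged = {(row["id"], row["time"]): row for row in existing}
    for row in incoming:
        if high_water_mark is None or row["__timestamp"] > high_water_mark:
            merged[(row["id"], row["time"])] = row  # insert new key or replace old row
    return list(merged.values())


# Made-up rows, for illustration only.
existing = [{"id": "a", "time": 1, "__timestamp": 10, "altitude": 1000}]
incoming = [
    {"id": "a", "time": 1, "__timestamp": 5, "altitude": 900},    # older: skipped
    {"id": "a", "time": 1, "__timestamp": 20, "altitude": 1200},  # newer: replaces
    {"id": "b", "time": 1, "__timestamp": 20, "altitude": 3000},  # new key: inserted
]
print(incremental_merge(existing, incoming))
```

Note that rows arriving late with a `__timestamp` at or below the high-water mark are silently ignored by this filter, which is worth keeping in mind when upstream loads can be delayed.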

2. Partitioning Strategy

-- Partition by business-relevant columns
partition_by=['on_ground']           -- Flight status
partition_by=['sale_date']           -- Time-based partitioning
partition_by=['customer_segment']    -- Business logic partitioning

3. Data Quality Checks

# Built-in dbt tests
# models/schema.yml
models:
  - name: explode_complete_flight
    columns:
      - name: id
        tests:
          - unique
          - not_null
      - name: latitude
        tests:
          - not_null
          - dbt_utils.accepted_range:
              min_value: -90
              max_value: 90

Schema Evolution

Apache Iceberg Schema Evolution

-- Add new column
ALTER TABLE flight_radar.intermediate.explode_complete_flight 
ADD COLUMN new_field STRING;

-- Rename column
ALTER TABLE flight_radar.intermediate.explode_complete_flight 
RENAME COLUMN old_field TO new_field;

-- Drop column
ALTER TABLE flight_radar.intermediate.explode_complete_flight 
DROP COLUMN unused_field;

dbt Schema Evolution

# schema.yml
models:
  - name: explode_complete_flight
    columns:
      - name: id
        description: "Unique flight identifier"
        tests:
          - unique
          - not_null
      - name: new_field
        description: "New field added in schema evolution"
        tests:
          - not_null

Data Lineage

dbt Lineage

graph TD
    A[flight_radar_prepared.complete_flight] --> B[explode_complete_flight]
    A --> C[explode_and_enrich_missing_flight]
    B --> D[full_flight_obt]
    C --> D
    D --> E[company_with_the_most_active_flight]
    D --> F[airport_with_the_largest_traffic_imbalance]
    D --> G[companies_with_the_most_regional_flight]
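
A lineage graph like the one above determines a valid build order: every model must run after all of its upstream models. As a minimal sketch, the standard-library `graphlib` module can derive such an order. The `LINEAGE` mapping is transcribed from the graph, and this is not how dbt or Dagster schedule work internally.

```python
from graphlib import TopologicalSorter

# Each model maps to the set of models it depends on (its upstream nodes).
LINEAGE = {
    "explode_complete_flight": {"flight_radar_prepared.complete_flight"},
    "explode_and_enrich_missing_flight": {"flight_radar_prepared.complete_flight"},
    "full_flight_obt": {"explode_complete_flight", "explode_and_enrich_missing_flight"},
    "company_with_the_most_active_flight": {"full_flight_obt"},
    "airport_with_the_largest_traffic_imbalance": {"full_flight_obt"},
    "companies_with_the_most_regional_flight": {"full_flight_obt"},
}

# static_order() yields every node after all of its dependencies.
build_order = list(TopologicalSorter(LINEAGE).static_order())
for step, model in enumerate(build_order, start=1):
    print(step, model)
```

`TopologicalSorter` raises `graphlib.CycleError` if the mapping contains a cycle, which makes it a cheap sanity check for hand-maintained dependency lists.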

Asset Dependencies

# Dagster asset dependencies
from dagster import asset

@asset(deps=["explode_complete_flight", "explode_and_enrich_missing_flight"])
def full_flight_obt():
    """Combines complete and missing flight data"""
    pass

@asset(deps=["full_flight_obt"])
def company_with_the_most_active_flight():
    """Top airlines by flight count"""
    pass

Data Quality Framework

1. Schema Validation

# Pydantic models for data validation
from pydantic import BaseModel, Field

class FlightModel(BaseModel):
    id: str
    latitude: float = Field(ge=-90, le=90)
    longitude: float = Field(ge=-180, le=180)
    altitude: int = Field(ge=0, le=50000)
    ground_speed: int = Field(ge=0, le=1000)

2. Data Quality Tests

-- dbt tests
-- tests/assert_flight_data_quality.sql
SELECT *
FROM {{ ref('explode_complete_flight') }}
WHERE latitude < -90 OR latitude > 90
   OR longitude < -180 OR longitude > 180
   OR altitude < 0 OR altitude > 50000
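
A singular dbt test like the one above passes when the query returns zero rows, so the query is written to select the violations. The sketch below mirrors that logic in Python, including the SQL behaviour that a NULL value never satisfies a comparison. `out_of_range` and `failing_rows` are hypothetical names, and rows are assumed to be dictionaries.

```python
def out_of_range(value, low, high):
    """True only for a non-null value outside [low, high], like the SQL comparison."""
    return value is not None and (value < low or value > high)


def failing_rows(rows):
    """Return the rows the singular test would select; an empty list means the test passes."""
    return [
        row for row in rows
        if out_of_range(row.get("latitude"), -90, 90)
        or out_of_range(row.get("longitude"), -180, 180)
        or out_of_range(row.get("altitude"), 0, 50000)
    ]


# Made-up rows, for illustration only.
rows = [
    {"id": "ok", "latitude": 48.8, "longitude": 2.3, "altitude": 11000},
    {"id": "bad_lat", "latitude": 95.0, "longitude": 2.3, "altitude": 11000},
    {"id": "null_lat", "latitude": None, "longitude": 2.3, "altitude": 11000},
]
print([row["id"] for row in failing_rows(rows)])
```

Rows with missing coordinates are not reported by this check, which is why the range test is usually paired with a separate not_null test.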

3. Anomaly Detection

-- Statistical anomaly detection
WITH flight_stats AS (
    SELECT 
        AVG(ground_speed) as avg_speed,
        STDDEV(ground_speed) as speed_stddev
    FROM {{ ref('explode_complete_flight') }}
)
SELECT *
FROM {{ ref('explode_complete_flight') }}
WHERE ground_speed > (
    SELECT avg_speed + 3 * speed_stddev 
    FROM flight_stats
)
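
The three-sigma rule used above can be tried on a small sample before running it against the full table. This sketch assumes rows are dictionaries with a `ground_speed` key and uses the sample standard deviation; `speed_outliers` is a hypothetical name.

```python
from statistics import mean, stdev


def speed_outliers(rows, sigmas=3.0):
    """Return rows whose ground_speed exceeds mean + sigmas * sample standard deviation."""
    speeds = [row["ground_speed"] for row in rows if row["ground_speed"] is not None]
    if len(speeds) < 2:
        return []  # standard deviation is undefined for fewer than two values
    threshold = mean(speeds) + sigmas * stdev(speeds)
    return [
        row for row in rows
        if row["ground_speed"] is not None and row["ground_speed"] > threshold
    ]


# Made-up rows: twenty typical speeds and one extreme value.
rows = [{"id": f"f{i}", "ground_speed": 450} for i in range(20)]
rows.append({"id": "fast", "ground_speed": 1000})
print([row["id"] for row in speed_outliers(rows)])
```

Because the outlier itself inflates both the mean and the standard deviation, this rule cannot flag anything in very small samples; robust alternatives such as median-based thresholds avoid that limitation.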

Performance Optimization

1. Partitioning Strategy

-- Time-based partitioning for temporal data
partition_by=['time']

-- Business logic partitioning
partition_by=['on_ground', 'airline_iata']

-- Geographic partitioning
partition_by=['departure_country_name']

2. Clustering

-- Cluster by frequently queried columns
CLUSTER BY airline_iata, departure_iata, arrival_iata

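3. Indexing

-- Iceberg tables do not support secondary indexes created with CREATE INDEX.
-- Sorting data files on commonly filtered columns instead lets the engine
-- skip files using per-file min/max column statistics.
-- Spark SQL with the Iceberg extensions enabled:
ALTER TABLE flight_radar.intermediate.explode_complete_flight
WRITE ORDERED BY time, airline_iata;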

Data Governance

1. Data Classification

# Data sensitivity levels
sensitive_data:
  - customer_personal_info
  - financial_transactions
  - location_data

public_data:
  - flight_schedules
  - airport_information
  - airline_metadata

2. Access Control

-- Ranger policies for data access
-- Grant access to specific users/roles
GRANT SELECT ON flight_radar.marts.company_with_the_most_active_flight 
TO ROLE analytics_users;

-- Restrict access to sensitive data
REVOKE SELECT ON ecommerce.customer 
FROM ROLE public_users;

3. Data Retention

-- Automatic data retention policies
-- Delete data older than 2 years
DELETE FROM flight_radar.intermediate.explode_complete_flight
WHERE time < CURRENT_DATE - INTERVAL '2' YEAR;

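Monitoring and Observability

1. Data Freshness Monitoring

# Dagster freshness checks
# Note: this uses the legacy FreshnessPolicy API; confirm that the installed
# Dagster version still accepts maximum_lag_minutes.
from dagster import FreshnessPolicy, asset

@asset(freshness_policy=FreshnessPolicy(maximum_lag_minutes=60))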
def explode_complete_flight():
    """Flight data should be updated every hour"""
    pass

2. Data Quality Monitoring

-- Monitor data quality metrics
SELECT 
    COUNT(*) as total_records,
    COUNT(DISTINCT id) as unique_flights,
    COUNT(CASE WHEN latitude IS NULL THEN 1 END) as missing_latitude,
    COUNT(CASE WHEN longitude IS NULL THEN 1 END) as missing_longitude
FROM {{ ref('explode_complete_flight') }}
WHERE __TIMESTAMP >= CURRENT_DATE
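
The same metrics can be computed on a batch of rows in Python, for example inside a pipeline step that reports them before loading. This is a sketch under assumptions: rows are dictionaries with `id`, `latitude`, and `longitude` keys, the date filter from the SQL is left out, and `quality_metrics` is a hypothetical name.

```python
def quality_metrics(rows):
    """Compute record, distinct-id, and missing-coordinate counts for a batch of rows."""
    return {
        "total_records": len(rows),
        # COUNT(DISTINCT id) ignores NULLs, so None ids are excluded here as well.
        "unique_flights": len({row["id"] for row in rows if row["id"] is not None}),
        "missing_latitude": sum(1 for row in rows if row["latitude"] is None),
        "missing_longitude": sum(1 for row in rows if row["longitude"] is None),
    }


# Made-up rows, for illustration only.
rows = [
    {"id": "a", "latitude": 48.8, "longitude": 2.3},
    {"id": "a", "latitude": 48.9, "longitude": None},
    {"id": "b", "latitude": None, "longitude": None},
]
print(quality_metrics(rows))
```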

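3. Performance Monitoring

-- Monitor query performance (Trino)
-- system.runtime.queries exposes the SQL text as "query" and the timestamps
-- created and "end"; it has no row-count or bytes-scanned columns.
SELECT
    query_id,
    query,
    state,
    date_diff('millisecond', created, "end") AS execution_time_ms
FROM system.runtime.queries
WHERE query LIKE '%explode_complete_flight%'
  AND state = 'FINISHED'
ORDER BY execution_time_ms DESC
LIMIT 10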

Best Practices

1. Naming Conventions

-- Table naming
-- Raw: {domain}_prepared.{table_name}
-- Staging: {domain}.intermediate.stg_{table_name}
-- Marts: {domain}.marts.{business_concept}

-- Column naming
-- Use snake_case
-- Be descriptive and consistent
-- Include units where applicable

2. Documentation

# schema.yml
models:
  - name: explode_complete_flight
    description: "Flattened flight data with airport and airline details"
    columns:
      - name: id
        description: "Unique flight identifier from FlightRadar24"
        tests:
          - unique
          - not_null
      - name: latitude
        description: "Current latitude position of the aircraft"
        tests:
          - not_null
          - dbt_utils.accepted_range:
              min_value: -90
              max_value: 90

3. Testing Strategy

-- Unit tests for transformations
-- Integration tests for data pipelines
-- Performance tests for query optimization
-- Data quality tests for validation

Future Enhancements

1. Real-time Streaming

-- Kafka integration for real-time data
-- Stream processing with Apache Flink
-- Real-time analytics and alerting

2. Machine Learning Integration

-- Feature engineering for ML models
-- Model training and inference
-- Automated anomaly detection

3. Advanced Analytics

-- Graph analytics for flight networks
-- Time series analysis for trends
-- Spatial analytics for geographic insights

Last update: October 3, 2025
Created: October 3, 2025