Data Models Documentation
Overview
The Iceberg Data Engineering Platform models data in layers: raw files land in MinIO, Spark prepares them as Delta tables, and dbt transformations build the intermediate and mart tables in Apache Iceberg format.
Data Architecture
graph TD
A[Raw Data Sources] --> B[MinIO Bronze Storage]
B --> C[Spark Processing]
C --> D[MinIO Prepare Storage]
D --> E[Delta Tables]
E --> F[dbt Transformations]
F --> G[Iceberg Tables in Warehouse]
subgraph "Data Layers"
H[Bronze Layer: CSV/JSON/Parquet]
I[Prepare Layer: Delta Tables]
J[Analytics Layer: Iceberg Tables]
end
subgraph "Storage Buckets"
K[datalake/bronze: Raw Data]
L[datalake/prepare: Delta Tables]
M[warehouse: Iceberg Tables]
end
subgraph "Trino Catalogs"
N[spark_catalog: Preparation Tasks]
O[flight_radar: Flight Analytics]
P[ecommerce: E-commerce Analytics]
Q[asset_property: Property Analytics]
end
B --> H
D --> I
E --> I
F --> J
G --> J
B --> K
D --> L
G --> M
E --> N
G --> O
G --> P
G --> Q
Storage Architecture
MinIO Bucket Structure
The platform keeps each data layer under its own MinIO path: two prefixes in the datalake bucket and a separate warehouse bucket.
1. Bronze Layer (datalake/bronze)
- Purpose: Raw data storage from external APIs
- Formats: CSV, JSON, Parquet files
- Structure: datalake/bronze/{domain}/{entity}/
- Examples:
  - datalake/bronze/flight_radar/airlines/
  - datalake/bronze/ecommerce/products/
  - datalake/bronze/asset_property/sales/
2. Prepare Layer (datalake/prepare)
- Purpose: Processed Delta tables from Spark preparation tasks
- Formats: Delta Lake tables
- Structure: datalake/prepare/{domain}_prepared/{entity}/
- Examples:
  - datalake/prepare/flight_radar_prepared/airline/
  - datalake/prepare/ecommerce_prepared/product/
  - datalake/prepare/asset_property_prepared/sale/
3. Warehouse Layer (warehouse)
- Purpose: Final Iceberg tables generated by dbt transformations
- Formats: Apache Iceberg tables
- Structure: warehouse/{domain}/{schema}/{table}/
- Examples:
  - warehouse/flight_radar/marts/company_with_the_most_active_flight/
  - warehouse/ecommerce/marts/customer_analytics/
  - warehouse/asset_property/marts/property_market_trends/
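The three path structures above can be expressed as small string templates. The following is a minimal sketch, not part of the platform code: the helper names `layer_path` and `warehouse_path` are hypothetical, and only the path patterns themselves come from this section.

```python
# Path templates copied from the layer descriptions above.
# The helper names are hypothetical and exist only for this illustration.
LAYER_TEMPLATES = {
    "bronze": "datalake/bronze/{domain}/{entity}/",
    "prepare": "datalake/prepare/{domain}_prepared/{entity}/",
}


def layer_path(layer: str, domain: str, entity: str) -> str:
    """Build the MinIO prefix for an entity in the bronze or prepare layer."""
    return LAYER_TEMPLATES[layer].format(domain=domain, entity=entity)


def warehouse_path(domain: str, schema: str, table: str) -> str:
    """Build the MinIO prefix for an Iceberg table in the warehouse layer."""
    return f"warehouse/{domain}/{schema}/{table}/"


print(layer_path("bronze", "flight_radar", "airlines"))
print(layer_path("prepare", "ecommerce", "product"))
print(warehouse_path("asset_property", "marts", "property_market_trends"))
```

Keeping the templates in one place makes it easier to spot entity names that differ between layers, such as the plural bronze folders and the singular prepared tables.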
Trino Catalog Structure
The platform uses multiple Trino catalogs for different purposes:
1. Spark Catalog (spark_catalog)
- Purpose: Access to Delta tables from preparation tasks
- Connector: Delta Lake connector
- Tables: All prepared Delta tables
- Usage: Spark preparation operations and initial data access
2. Domain-Specific Catalogs
Each business domain has its own Iceberg catalog:
Flight Radar Catalog (flight_radar)
- Purpose: Flight analytics and reporting
- Connector: Iceberg connector
- Schemas: prepared, intermediate, marts
- Tables: All flight-related analytical models
E-commerce Catalog (ecommerce)
- Purpose: E-commerce analytics and reporting
- Connector: Iceberg connector
- Schemas: prepared, intermediate, marts
- Tables: All e-commerce analytical models
Asset Property Catalog (asset_property)
- Purpose: Real estate analytics and reporting
- Connector: Iceberg connector
- Schemas: prepared, intermediate, marts
- Tables: All property-related analytical models
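Trino addresses a table as catalog.schema.table, so the catalog layout above determines every fully qualified name. The sketch below assumes the catalogs and schemas listed in this section; the function `qualified_name` is hypothetical, while `delta_lake` and `iceberg` are the `connector.name` values Trino uses for these two connectors.

```python
# Catalog-to-connector mapping taken from the catalog descriptions above.
CATALOG_CONNECTORS = {
    "spark_catalog": "delta_lake",
    "flight_radar": "iceberg",
    "ecommerce": "iceberg",
    "asset_property": "iceberg",
}

# Schemas listed for every domain-specific Iceberg catalog.
DOMAIN_SCHEMAS = {"prepared", "intermediate", "marts"}


def qualified_name(catalog: str, schema: str, table: str) -> str:
    """Return catalog.schema.table after checking it against the documented layout."""
    if catalog not in CATALOG_CONNECTORS:
        raise ValueError(f"unknown catalog: {catalog}")
    if CATALOG_CONNECTORS[catalog] == "iceberg" and schema not in DOMAIN_SCHEMAS:
        raise ValueError(f"schema {schema!r} is not documented for catalog {catalog!r}")
    return f"{catalog}.{schema}.{table}"


print(qualified_name("flight_radar", "marts", "company_with_the_most_active_flight"))
print(qualified_name("spark_catalog", "ecommerce_prepared", "product"))
```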
Data Domains
1. Flight Radar Domain
Bronze Layer (Raw Data)
Located in datalake/bronze/flight_radar/:
- airlines/: Raw airline data from FlightRadar24 API (JSON/CSV)
- airports/: Raw airport data from FlightRadar24 API (JSON/CSV)
- aircraft/: Raw aircraft data from FlightRadar24 API (JSON/CSV)
- flights/: Raw flight data from FlightRadar24 API (JSON/CSV)
Prepare Layer (Delta Tables)
Located in datalake/prepare/flight_radar_prepared/:
- airline: Processed airline information and metadata
- airport: Processed airport details and location data
- aircraft: Processed aircraft specifications and models
- complete_flight: Processed flight records with all details
- missing_flight: Processed flight records with missing or incomplete data
Staging Layer (Intermediate)
explode_complete_flight
-- Materialized as incremental table
-- Partitioned by: on_ground
-- Unique key: ['id', 'time']
-- Location: s3a://datalake/intermediate/flight_radar/explode_complete_flight
SELECT
ID,
ALTITUDE,
LATITUDE,
LONGITUDE,
HEADING,
ICAO_24BIT,
VERTICAL_SPEED,
GROUND_SPEED,
NUMBER,
SQUAWK,
CALLSIGN,
REGISTRATION,
ON_GROUND,
TIME,
-- Departure airport details
DEPARTURE.IATA AS DEPARTURE_IATA,
DEPARTURE.LATITUDE AS DEPARTURE_LATITUDE,
DEPARTURE.LONGITUDE AS DEPARTURE_LONGITUDE,
DEPARTURE.NAME AS DEPARTURE_NAME,
DEPARTURE.COUNTRY_NAME AS DEPARTURE_COUNTRY_NAME,
-- Arrival airport details
ARRIVAL.IATA AS ARRIVAL_IATA,
ARRIVAL.LATITUDE AS ARRIVAL_LATITUDE,
ARRIVAL.LONGITUDE AS ARRIVAL_LONGITUDE,
ARRIVAL.NAME AS ARRIVAL_NAME,
ARRIVAL.COUNTRY_NAME AS ARRIVAL_COUNTRY_NAME,
-- Airline details
AIRLINE.IATA AS AIRLINE_IATA,
AIRLINE.NAME AS AIRLINE_NAME,
AIRLINE.ICAO AS AIRLINE_ICAO,
-- Aircraft details
AIRCRAFT.CODE AS AIRCRAFT_CODE,
MODEL_MISSING_DATA,
__TIMESTAMP,
-- Derived fields
CASE
WHEN AIRCRAFT.MODEL IS NOT NULL THEN split_part(AIRCRAFT.MODEL, ' ', 1)
ELSE NULL
END AS AIRCRAFT_MANUFACTURER
FROM {{ source('flight_radar_prepared', 'complete_flight') }}
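The derived AIRCRAFT_MANUFACTURER field takes the first space-separated token of the aircraft model. A rough Python equivalent, assuming the model is a plain string or missing, shows what that rule produces; the function name and the sample model strings are made up for illustration.

```python
from typing import Optional


def aircraft_manufacturer(model: Optional[str]) -> Optional[str]:
    """Mirror split_part(model, ' ', 1): the first space-separated token, or None."""
    if model is None:
        return None
    return model.split(" ")[0]


# Illustrative inputs only; these are not values taken from the platform's data.
print(aircraft_manufacturer("Boeing 737-800"))
print(aircraft_manufacturer("De Havilland Canada DHC-8"))
print(aircraft_manufacturer(None))
```

The second call shows the limit of this rule: a manufacturer whose name contains a space is cut down to its first word, so downstream models that group by manufacturer may need a lookup table instead.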
explode_and_enrich_missing_flight
-- Handles flight records with missing data
-- Enriches missing flight information where possible
-- Materialized as incremental table
full_flight_obt
-- One Big Table (OBT) combining complete and missing flights
-- Provides unified view of all flight data
-- Materialized as table
Mart Layer (Analytics)
Located in warehouse/flight_radar/marts/:
company_with_the_most_active_flight
-- Top 10 airlines by flight count
-- Materialized as Iceberg table
-- Location: warehouse/flight_radar/marts/company_with_the_most_active_flight
-- Catalog: flight_radar
-- Schema: marts
{{ config(
materialized='table',
alias='company_with_the_most_active_flight',
tags=['company_with_the_most_active_flight'],
database='flight_radar',
schema='marts',
location_root='s3a://warehouse/flight_radar/marts/company_with_the_most_active_flight'
) }}
SELECT
AIRLINE_NAME,
AIRLINE_IATA,
COUNT(*) AS TOTAL_FLIGHTS
FROM {{ ref('full_flight_obt') }}
WHERE AIRLINE_NAME IS NOT NULL
GROUP BY AIRLINE_NAME, AIRLINE_IATA
ORDER BY TOTAL_FLIGHTS DESC
LIMIT 10
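The mart query above is a filter, a group-by count, and a top-N cut. The same logic can be sketched in Python on in-memory rows; the function name and the sample rows are hypothetical, and the column names follow the query.

```python
from collections import Counter


def top_airlines(flights, limit=10):
    """Count flights per (airline_name, airline_iata), skipping rows without a name."""
    counts = Counter(
        (f["airline_name"], f["airline_iata"])
        for f in flights
        if f["airline_name"] is not None
    )
    # most_common sorts by count descending, like ORDER BY TOTAL_FLIGHTS DESC.
    return [(name, iata, total) for (name, iata), total in counts.most_common(limit)]


# Made-up sample rows for illustration.
sample = [
    {"airline_name": "Airline A", "airline_iata": "AA"},
    {"airline_name": "Airline B", "airline_iata": "BB"},
    {"airline_name": "Airline A", "airline_iata": "AA"},
    {"airline_name": None, "airline_iata": None},
]
print(top_airlines(sample))
```

Because the source is the one-big-table of position records, the count reflects rows per airline; whether that equals distinct flights depends on how many rows each flight contributes.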
airport_with_the_largest_traffic_imbalance
-- Identifies airports with significant departure/arrival imbalances
-- Useful for capacity planning and route optimization
companies_with_the_most_regional_flight
company_builder_with_the_most_active_flight
longest_active_flight
longest_flight_in_each_continent
2. E-commerce Domain
Source Tables
Located in spark_catalog.ecommerce_prepared schema:
- product: Product catalog and specifications
- customer: Customer information and demographics
Staging Layer
customer_order (Intermediate)
-- Combines customer and order data
-- Provides unified customer transaction view
-- Materialized as table
Mart Layer
Located in warehouse/ecommerce/marts/:
customer_analytics
-- Customer behavior and purchasing patterns
-- Segmentation and lifetime value analysis
-- Materialized as Iceberg table
-- Location: warehouse/ecommerce/marts/customer_analytics
-- Catalog: ecommerce
-- Schema: marts
{{ config(
materialized='table',
alias='customer_analytics',
tags=['customer_analytics'],
database='ecommerce',
schema='marts',
location_root='s3a://warehouse/ecommerce/marts/customer_analytics'
) }}
SELECT
customer_id,
customer_name,
COUNT(*) as total_orders,
SUM(order_amount) as total_spent,
AVG(order_amount) as avg_order_value,
MAX(order_date) as last_order_date
FROM {{ ref('customer_order') }}
GROUP BY customer_id, customer_name
ORDER BY total_spent DESC
3. Asset Property Domain
Source Tables
Located in spark_catalog.asset_property_prepared schema:
- real_estate_sales: Property sales transactions and details
Staging Layer
stg_real_estate_sales
-- Standardized real estate sales data
-- Cleansed and validated property information
-- Materialized as table
Mart Layer
Located in warehouse/asset_property/marts/:
property_market_trends
-- Market analysis and trend identification
-- Price analysis and forecasting
-- Materialized as Iceberg table
-- Location: warehouse/asset_property/marts/property_market_trends
-- Catalog: asset_property
-- Schema: marts
{{ config(
materialized='table',
alias='property_market_trends',
tags=['property_market_trends'],
database='asset_property',
schema='marts',
location_root='s3a://warehouse/asset_property/marts/property_market_trends'
) }}
SELECT
property_type,
location,
AVG(price) as avg_price,
COUNT(*) as property_count,
MIN(price) as min_price,
MAX(price) as max_price,
STDDEV(price) as price_volatility
FROM {{ ref('stg_real_estate_sales') }}
GROUP BY property_type, location
ORDER BY avg_price DESC
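The price_volatility column relies on STDDEV, which in Trino is the sample standard deviation and returns NULL for a group with a single row. The sketch below reproduces the aggregation in Python under that assumption; the function name and sample rows are hypothetical, and the column names follow the query.

```python
from collections import defaultdict
from statistics import mean, stdev


def market_trends(sales):
    """Aggregate prices per (property_type, location), highest average first."""
    groups = defaultdict(list)
    for sale in sales:
        groups[(sale["property_type"], sale["location"])].append(sale["price"])
    trends = [
        {
            "property_type": property_type,
            "location": location,
            "avg_price": mean(prices),
            "property_count": len(prices),
            "min_price": min(prices),
            "max_price": max(prices),
            # Sample standard deviation is undefined for a single observation.
            "price_volatility": stdev(prices) if len(prices) > 1 else None,
        }
        for (property_type, location), prices in groups.items()
    ]
    return sorted(trends, key=lambda t: t["avg_price"], reverse=True)


# Made-up sample rows for illustration.
sample_sales = [
    {"property_type": "condo", "location": "A", "price": 100},
    {"property_type": "condo", "location": "A", "price": 200},
    {"property_type": "condo", "location": "A", "price": 300},
    {"property_type": "house", "location": "B", "price": 500},
]
for row in market_trends(sample_sales):
    print(row)
```

Groups with very few sales produce unstable or missing volatility values, so a minimum property_count filter may be worth considering before the mart is used for trend analysis.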
Data Model Patterns
1. Incremental Processing
-- Incremental model configuration
{{
config(
materialized='incremental',
unique_key=['id', 'time'],
partition_by=['on_ground']
)
}}
-- Incremental logic
{% if is_incremental() %}
WHERE __TIMESTAMP > (SELECT MAX(__TIMESTAMP) FROM {{ this }})
{% endif %}
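The incremental pattern above has two parts: a watermark filter on __TIMESTAMP and an upsert on the unique key. The following sketch imitates both on in-memory rows; it is an illustration of the idea under simplified assumptions, not how dbt or the adapter performs the merge, and the function name is hypothetical.

```python
def incremental_merge(existing, incoming, key=("id", "time"), watermark="__timestamp"):
    """Keep only incoming rows newer than the current watermark, then upsert by key."""
    high_water = max((row[watermark] for row in existing), default=None)
    # First run (empty target): every incoming row is loaded.
    new_rows = [
        row for row in incoming
        if high_water is None or row[watermark] > high_water
    ]
    merged = {tuple(row[k] for k in key): row for row in existing}
    for row in new_rows:
        merged[tuple(row[k] for k in key)] = row  # replaces a row with the same key
    return list(merged.values())


# Made-up sample rows for illustration.
existing = [
    {"id": "a", "time": 1, "__timestamp": 10},
    {"id": "b", "time": 1, "__timestamp": 20},
]
incoming = [
    {"id": "c", "time": 1, "__timestamp": 20},  # not newer than the watermark: skipped
    {"id": "a", "time": 1, "__timestamp": 30},  # same key as an existing row: replaced
    {"id": "d", "time": 2, "__timestamp": 25},  # new key: appended
]
print(incremental_merge(existing, incoming))
```

The strict greater-than comparison means a late row that carries exactly the current maximum timestamp is never picked up; a small lookback window is a common way to trade some reprocessing for that safety.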
2. Partitioning Strategy
-- Partition by business-relevant columns
partition_by=['on_ground'] -- Flight status
partition_by=['sale_date'] -- Time-based partitioning
partition_by=['customer_segment'] -- Business logic partitioning
3. Data Quality Checks
# Built-in dbt tests
# models/schema.yml
models:
  - name: explode_complete_flight
    columns:
      - name: id
        tests:
          - unique
          - not_null
      - name: latitude
        tests:
          - not_null
          - dbt_utils.accepted_range:
              min_value: -90
              max_value: 90
Schema Evolution
Apache Iceberg Schema Evolution
-- Add new column (STRING is the Spark SQL type name; Trino uses VARCHAR)
ALTER TABLE flight_radar.intermediate.explode_complete_flight
ADD COLUMN new_field STRING;
-- Rename column
ALTER TABLE flight_radar.intermediate.explode_complete_flight
RENAME COLUMN old_field TO new_field;
-- Drop column
ALTER TABLE flight_radar.intermediate.explode_complete_flight
DROP COLUMN unused_field;
dbt Schema Evolution
# schema.yml
models:
  - name: explode_complete_flight
    columns:
      - name: id
        description: "Unique flight identifier"
        tests:
          - unique
          - not_null
      - name: new_field
        description: "New field added in schema evolution"
        tests:
          - not_null
Data Lineage
dbt Lineage
graph TD
A[flight_radar_prepared.complete_flight] --> B[explode_complete_flight]
A --> C[explode_and_enrich_missing_flight]
B --> D[full_flight_obt]
C --> D
D --> E[company_with_the_most_active_flight]
D --> F[airport_with_the_largest_traffic_imbalance]
D --> G[companies_with_the_most_regional_flight]
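The lineage graph above fixes the order in which models can be built: a model runs only after everything upstream of it. A topological sort makes that order explicit. This sketch uses Python's standard `graphlib` module and copies the edges from the diagram as drawn; it is an illustration, not the scheduler that dbt or Dagster actually uses.

```python
from graphlib import TopologicalSorter

# Each model maps to the set of nodes it depends on, as drawn in the lineage graph.
dependencies = {
    "explode_complete_flight": {"flight_radar_prepared.complete_flight"},
    "explode_and_enrich_missing_flight": {"flight_radar_prepared.complete_flight"},
    "full_flight_obt": {
        "explode_complete_flight",
        "explode_and_enrich_missing_flight",
    },
    "company_with_the_most_active_flight": {"full_flight_obt"},
    "airport_with_the_largest_traffic_imbalance": {"full_flight_obt"},
    "companies_with_the_most_regional_flight": {"full_flight_obt"},
}

# static_order yields every node after all of its dependencies.
order = list(TopologicalSorter(dependencies).static_order())
for step, node in enumerate(order, start=1):
    print(step, node)
```

Models at the same depth, such as the three marts, have no edges between them and could run in parallel.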
Asset Dependencies
# Dagster asset dependencies
from dagster import asset


@asset(deps=["explode_complete_flight", "explode_and_enrich_missing_flight"])
def full_flight_obt():
    """Combines complete and missing flight data"""
    pass


@asset(deps=["full_flight_obt"])
def company_with_the_most_active_flight():
    """Top airlines by flight count"""
    pass
Data Quality Framework
1. Schema Validation
# Pydantic models for data validation
from pydantic import BaseModel, Field


class FlightModel(BaseModel):
    id: str
    latitude: float = Field(ge=-90, le=90)
    longitude: float = Field(ge=-180, le=180)
    altitude: int = Field(ge=0, le=50000)
    ground_speed: int = Field(ge=0, le=1000)
2. Data Quality Tests
-- dbt tests
-- tests/assert_flight_data_quality.sql
SELECT *
FROM {{ ref('explode_complete_flight') }}
WHERE latitude < -90 OR latitude > 90
OR longitude < -180 OR longitude > 180
OR altitude < 0 OR altitude > 50000
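A dbt singular test such as the one above passes when the query returns no rows, so the query selects the violations rather than the valid records. The sketch below applies the same bounds in Python to show which rows would be returned; the function names and sample rows are hypothetical. As in SQL, a missing value never satisfies a range comparison, so NULL coordinates are not reported here and need a separate not_null test.

```python
def violates(value, low, high):
    """True when value lies outside [low, high]; None never violates, as in SQL."""
    return value is not None and (value < low or value > high)


def failing_rows(rows):
    """Return the rows the singular test would select, i.e. the test failures."""
    return [
        row for row in rows
        if violates(row["latitude"], -90, 90)
        or violates(row["longitude"], -180, 180)
        or violates(row["altitude"], 0, 50000)
    ]


# Made-up sample rows for illustration.
sample_rows = [
    {"id": "ok", "latitude": 48.1, "longitude": 11.5, "altitude": 35000},
    {"id": "bad_lat", "latitude": 95.0, "longitude": 11.5, "altitude": 35000},
    {"id": "bad_alt", "latitude": 48.1, "longitude": 11.5, "altitude": -10},
    {"id": "null_lat", "latitude": None, "longitude": 11.5, "altitude": 1000},
]
print([row["id"] for row in failing_rows(sample_rows)])
```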
3. Anomaly Detection
-- Statistical anomaly detection
WITH flight_stats AS (
SELECT
AVG(ground_speed) as avg_speed,
STDDEV(ground_speed) as speed_stddev
FROM {{ ref('explode_complete_flight') }}
)
SELECT *
FROM {{ ref('explode_complete_flight') }}
WHERE ground_speed > (
SELECT avg_speed + 3 * speed_stddev
FROM flight_stats
)
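The query above flags rows whose ground speed exceeds the mean by more than three standard deviations. A Python version of the same rule, assuming the sample standard deviation that STDDEV computes in Trino, is sketched below; the function name and the sample data are made up.

```python
from statistics import mean, stdev


def speed_outliers(rows, sigmas=3.0):
    """Return rows whose ground_speed is above mean + sigmas * sample stddev."""
    speeds = [r["ground_speed"] for r in rows if r["ground_speed"] is not None]
    if len(speeds) < 2:
        return []  # the sample standard deviation needs at least two values
    threshold = mean(speeds) + sigmas * stdev(speeds)
    return [
        r for r in rows
        if r["ground_speed"] is not None and r["ground_speed"] > threshold
    ]


# Made-up sample: nineteen ordinary readings and one extreme value.
flights = [{"id": f"f{i}", "ground_speed": 450} for i in range(19)]
flights.append({"id": "fast", "ground_speed": 2000})
print([r["id"] for r in speed_outliers(flights)])
```

The rule only looks at the upper tail, and the outliers themselves inflate both the mean and the standard deviation, so on small batches an extreme value can hide itself. A median-based threshold is a more robust alternative if that becomes a problem.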
Performance Optimization
1. Partitioning Strategy
-- Time-based partitioning for temporal data
partition_by=['time']
-- Business logic partitioning
partition_by=['on_ground', 'airline_iata']
-- Geographic partitioning
partition_by=['departure_country_name']
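2. Clustering
3. Indexing
-- Iceberg tables have no secondary indexes, so CREATE INDEX is not available here.
-- Filters on time or airline_iata are served by partition pruning and per-file
-- column statistics; compacting small files keeps those effective (Trino Iceberg connector):
ALTER TABLE flight_radar.intermediate.explode_complete_flight EXECUTE optimize;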
Data Governance
1. Data Classification
# Data sensitivity levels
sensitive_data:
  - customer_personal_info
  - financial_transactions
  - location_data
public_data:
  - flight_schedules
  - airport_information
  - airline_metadata
2. Access Control
-- Ranger policies for data access
-- Grant access to specific users/roles
GRANT SELECT ON flight_radar.marts.company_with_the_most_active_flight
TO ROLE analytics_users;
-- Restrict access to sensitive data
REVOKE SELECT ON ecommerce.customer
FROM ROLE public_users;
3. Data Retention
-- Automatic data retention policies
-- Delete data older than 2 years
DELETE FROM flight_radar.intermediate.explode_complete_flight
WHERE time < CURRENT_DATE - INTERVAL '2' YEAR;
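The retention rule depends on a cutoff date two years before today. When the cutoff is computed outside SQL, for example in an orchestration job that passes it as a parameter, leap days need explicit handling. This is a minimal sketch with a hypothetical function name; it clamps 29 February to 28 February when the target year is not a leap year.

```python
from datetime import date


def retention_cutoff(today: date, years: int = 2) -> date:
    """Return the date `years` before `today`, clamping 29 February when needed."""
    try:
        return today.replace(year=today.year - years)
    except ValueError:
        # 29 February does not exist in the target year; fall back to 28 February.
        return today.replace(year=today.year - years, day=28)


print(retention_cutoff(date(2025, 10, 3)))
print(retention_cutoff(date(2024, 2, 29)))
```

Rows with a time value earlier than the returned date would fall outside the two-year window.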
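Monitoring and Observability
1. Data Freshness Monitoring
# Dagster freshness checks
# Note: maximum_lag_minutes is the older FreshnessPolicy signature;
# verify it against the installed Dagster version.
from dagster import FreshnessPolicy, asset


@asset(freshness_policy=FreshnessPolicy(maximum_lag_minutes=60))
def explode_complete_flight():
    """Flight data should be updated every hour"""
    pass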
2. Data Quality Monitoring
-- Monitor data quality metrics
SELECT
COUNT(*) as total_records,
COUNT(DISTINCT id) as unique_flights,
COUNT(CASE WHEN latitude IS NULL THEN 1 END) as missing_latitude,
COUNT(CASE WHEN longitude IS NULL THEN 1 END) as missing_longitude
FROM {{ ref('explode_complete_flight') }}
WHERE __TIMESTAMP >= CURRENT_DATE
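The monitoring query above reduces a batch of rows to four counters. The sketch below computes the same counters in Python, which can be handy for checking a sample before it is loaded; the function name and sample rows are hypothetical, and the date filter on __TIMESTAMP is left out for brevity.

```python
def quality_metrics(rows):
    """Compute the four counters from the monitoring query for a list of rows."""
    return {
        "total_records": len(rows),
        # COUNT(DISTINCT id) ignores NULLs, so None is excluded here as well.
        "unique_flights": len({r["id"] for r in rows if r["id"] is not None}),
        "missing_latitude": sum(1 for r in rows if r["latitude"] is None),
        "missing_longitude": sum(1 for r in rows if r["longitude"] is None),
    }


# Made-up sample rows for illustration.
batch = [
    {"id": "a", "latitude": 48.1, "longitude": 11.5},
    {"id": "a", "latitude": 48.2, "longitude": None},
    {"id": "b", "latitude": None, "longitude": None},
]
print(quality_metrics(batch))
```

A large gap between total_records and unique_flights is expected for this model, since each flight contributes one row per recorded time.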
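3. Performance Monitoring
-- Monitor query performance
-- Trino's system.runtime.queries stores the SQL text in the "query" column and
-- exposes start and end timestamps rather than execution time, rows, or bytes scanned
SELECT
query_id,
query,
state,
date_diff('millisecond', started, "end") AS execution_time_ms
FROM system.runtime.queries
WHERE query LIKE '%explode_complete_flight%'
ORDER BY execution_time_ms DESC
LIMIT 10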
Best Practices
1. Naming Conventions
-- Table naming
-- Raw: {domain}_prepared.{table_name}
-- Staging: {domain}.intermediate.stg_{table_name}
-- Marts: {domain}.marts.{business_concept}
-- Column naming
-- Use snake_case
-- Be descriptive and consistent
-- Include units where applicable
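The naming rules above are easy to check mechanically. The sketch below validates snake_case identifiers and builds mart names from the documented pattern; the regular expression and both function names are assumptions made for illustration, not an existing project convention checker.

```python
import re

# Lowercase words made of letters and digits, separated by single underscores.
SNAKE_CASE = re.compile(r"^[a-z][a-z0-9]*(_[a-z0-9]+)*$")


def is_snake_case(name: str) -> bool:
    """Return True when the identifier follows the snake_case rule above."""
    return bool(SNAKE_CASE.match(name))


def mart_name(domain: str, business_concept: str) -> str:
    """Build a mart table name using the {domain}.marts.{business_concept} pattern."""
    if not is_snake_case(business_concept):
        raise ValueError(f"not snake_case: {business_concept}")
    return f"{domain}.marts.{business_concept}"


print(is_snake_case("ground_speed"))
print(is_snake_case("GroundSpeed"))
print(mart_name("flight_radar", "longest_active_flight"))
```

A check like this can run in continuous integration against the model and column names declared in schema.yml.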
2. Documentation
# schema.yml
models:
  - name: explode_complete_flight
    description: "Flattened flight data with airport and airline details"
    columns:
      - name: id
        description: "Unique flight identifier from FlightRadar24"
        tests:
          - unique
          - not_null
      - name: latitude
        description: "Current latitude position of the aircraft"
        tests:
          - not_null
          - dbt_utils.accepted_range:
              min_value: -90
              max_value: 90
3. Testing Strategy
-- Unit tests for transformations
-- Integration tests for data pipelines
-- Performance tests for query optimization
-- Data quality tests for validation
Future Enhancements
1. Real-time Streaming
-- Kafka integration for real-time data
-- Stream processing with Apache Flink
-- Real-time analytics and alerting
2. Machine Learning Integration
3. Advanced Analytics
-- Graph analytics for flight networks
-- Time series analysis for trends
-- Spatial analytics for geographic insights
Created: October 3, 2025