Asset Property Domain¶
This document describes the Asset Property data domain, including data sources, processing pipelines, and analytical models.
Overview¶
The Asset Property domain focuses on real estate data, providing insights into property sales, market trends, and property characteristics. This domain demonstrates how to handle structured real estate data in a modern data platform.
Data Sources¶
Real Estate APIs¶
Primary Source: Real estate sales data APIs - Data Format: JSON/CSV - Update Frequency: Daily - Data Volume: ~10K records per day - Schema: Property details, sales information, location data
Data Schema¶
{
"property_id": "string",
"address": "string",
"city": "string",
"state": "string",
"zip_code": "string",
"property_type": "string",
"bedrooms": "integer",
"bathrooms": "decimal",
"square_feet": "integer",
"lot_size": "integer",
"year_built": "integer",
"sale_price": "decimal",
"sale_date": "date",
"listing_price": "decimal",
"days_on_market": "integer",
"latitude": "decimal",
"longitude": "decimal"
}
Data Pipeline¶
Bronze Layer (Raw Data)¶
Storage: MinIO datalake/bronze/asset_property/
Format: CSV, JSON
Partitioning: By date (year=YYYY/month=MM/day=DD)
@asset
def asset_property_bronze():
"""Ingest raw real estate data from APIs"""
# API data collection logic
# Schema validation
# Data quality checks
pass
Silver Layer (Prepared Data)¶
Storage: MinIO datalake/silver/asset_property/
Format: Delta Lake
Processing: Apache Spark
@asset(deps=[asset_property_bronze])
def asset_property_silver():
"""Clean and prepare real estate data"""
# Data cleaning
# Schema standardization
# Duplicate removal
# Data type conversion
pass
Gold Layer (Analytics Data)¶
Storage: MinIO warehouse/asset_property/
Format: Apache Iceberg
Processing: dbt transformations
-- Property sales table
CREATE TABLE asset_property_sales (
property_id STRING,
address STRING,
city STRING,
state STRING,
zip_code STRING,
property_type STRING,
bedrooms INT,
bathrooms DECIMAL(3,1),
square_feet INT,
lot_size INT,
year_built INT,
sale_price DECIMAL(12,2),
sale_date DATE,
listing_price DECIMAL(12,2),
days_on_market INT,
latitude DECIMAL(10,8),
longitude DECIMAL(11,8),
created_at TIMESTAMP,
updated_at TIMESTAMP
) USING ICEBERG
PARTITIONED BY (sale_date)
Analytical Models¶
Property Sales Model¶
Purpose: Core property transaction data Key Metrics: - Sale price trends - Days on market - Price per square foot - Market velocity
Market Trends Model¶
Purpose: Aggregated market analytics Key Metrics: - Average sale price by area - Price appreciation rates - Market activity levels - Seasonal trends
Property Features Model¶
Purpose: Property characteristics analysis Key Metrics: - Property type distribution - Size trends - Age analysis - Location-based insights
Data Quality¶
Validation Rules¶
# Data quality checks
def validate_property_data(df):
"""Validate property data quality"""
# Check required fields
required_fields = ['property_id', 'address', 'sale_price', 'sale_date']
for field in required_fields:
assert df[field].isnull().sum() == 0, f"Missing values in {field}"
# Check data types
assert df['sale_price'].dtype == 'decimal', "Invalid sale price format"
assert df['sale_date'].dtype == 'date', "Invalid sale date format"
# Check value ranges
assert df['sale_price'].min() > 0, "Invalid sale price range"
assert df['bedrooms'].min() >= 0, "Invalid bedroom count"
return True
Data Quality Metrics¶
- Completeness: Percentage of non-null values
- Accuracy: Data validation against source
- Consistency: Cross-field validation
- Timeliness: Data freshness tracking
Query Examples¶
Basic Queries¶
-- Top 10 most expensive properties
SELECT
address,
city,
state,
sale_price,
square_feet,
sale_price / square_feet as price_per_sqft
FROM asset_property_sales
ORDER BY sale_price DESC
LIMIT 10;
-- Average sale price by city
SELECT
city,
state,
COUNT(*) as property_count,
AVG(sale_price) as avg_sale_price,
AVG(sale_price / square_feet) as avg_price_per_sqft
FROM asset_property_sales
WHERE sale_date >= CURRENT_DATE - INTERVAL '30' DAY
GROUP BY city, state
ORDER BY avg_sale_price DESC;
Advanced Analytics¶
-- Market trend analysis
WITH monthly_trends AS (
SELECT
DATE_TRUNC('month', sale_date) as month,
city,
COUNT(*) as sales_count,
AVG(sale_price) as avg_price,
AVG(days_on_market) as avg_days_on_market
FROM asset_property_sales
WHERE sale_date >= CURRENT_DATE - INTERVAL '12' MONTH
GROUP BY DATE_TRUNC('month', sale_date), city
)
SELECT
month,
city,
sales_count,
avg_price,
avg_days_on_market,
LAG(avg_price) OVER (PARTITION BY city ORDER BY month) as prev_month_price,
(avg_price - LAG(avg_price) OVER (PARTITION BY city ORDER BY month)) /
LAG(avg_price) OVER (PARTITION BY city ORDER BY month) * 100 as price_change_pct
FROM monthly_trends
ORDER BY city, month;
Performance Optimization¶
Partitioning Strategy¶
-- Partition by sale date for time-based queries
PARTITIONED BY (sale_date)
-- Additional partitioning for geographic queries
PARTITIONED BY (sale_date, state, city)
Indexing¶
-- Create indexes for common query patterns
CREATE INDEX idx_property_sales_city ON asset_property_sales (city);
CREATE INDEX idx_property_sales_price ON asset_property_sales (sale_price);
CREATE INDEX idx_property_sales_date ON asset_property_sales (sale_date);
Caching¶
- Query Result Caching: Cache frequent query results
- Data Caching: Cache frequently accessed data
- Metadata Caching: Cache table metadata
Monitoring¶
Key Metrics¶
- Data Volume: Records ingested per day
- Data Quality: Quality score trends
- Processing Time: Pipeline execution duration
- Query Performance: Average query response time
Alerts¶
- Data Quality: Quality score below threshold
- Processing Delays: Pipeline execution time exceeded
- Data Volume: Unusual data volume changes
- Query Performance: Slow query detection
Business Use Cases¶
Real Estate Analytics¶
- Market Analysis: Price trends and market conditions
- Property Valuation: Comparative market analysis
- Investment Analysis: ROI and appreciation rates
- Market Forecasting: Predictive analytics
Customer Insights¶
- Buyer Behavior: Purchase patterns and preferences
- Market Segmentation: Geographic and demographic analysis
- Price Sensitivity: Market response to price changes
- Seasonal Trends: Time-based market patterns
Data Governance¶
Access Control¶
- Role-based Access: Different access levels for different users
- Data Classification: Sensitive data identification
- Audit Logging: All data access logged
- Compliance: GDPR and privacy regulations
Data Lineage¶
- Source Tracking: Data origin documentation
- Transformation Logging: All data transformations logged
- Quality Tracking: Data quality metrics over time
- Impact Analysis: Change impact assessment
Future Enhancements¶
Planned Features¶
- Real-time Data: Streaming data ingestion
- ML Models: Property valuation models
- Geographic Analysis: Spatial data analysis
- Market Predictions: Predictive analytics
Integration Opportunities¶
- External Data: Economic indicators, demographic data
- ML Pipeline: Machine learning model integration
- API Development: Real-time data access APIs
- Dashboard Enhancement: Advanced visualization features
Created: October 3, 2025