🛠️ Development Guide
Development Environment Setup
Prerequisites
Required Software
- Python: 3.12+ (recommended: 3.12.0)
- Docker: 20.10+
- Docker Compose: 2.0+
- Git: Latest version
- UV: Python package manager (recommended)
- Node.js: 18+ (for dbt development)
Optional Tools
- VS Code: With Python, Docker, and SQL extensions
- PyCharm: Professional or Community edition
- DBeaver: Database management tool
- Postman: API testing
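Local Development Setup
1. Clone Repository
# Clone the repository (<repository-url> is a placeholder for the project's Git URL)
git clone <repository-url>
cd iceberg_data_engineering
2. Python Environment Setup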
# Install UV (if not already installed)
curl -LsSf https://astral.sh/uv/install.sh | sh
# Create virtual environment
uv venv
# Activate virtual environment
source .venv/bin/activate # Linux/macOS
# or
.venv\Scripts\activate # Windows
# Install dependencies
uv pip install -e .
uv pip install -e ".[dev]"
3. Docker Environment
# Start development services
docker-compose up -d
# Check service status
docker-compose ps
# View logs
docker-compose logs -f
4. Development Tools Setup
# Install pre-commit hooks
pre-commit install
# Run linting
pylint orchestration/ingestion/src/
# Run tests
pytest orchestration/ingestion/tests/
Project Structure
iceberg_data_engineering/
├── docs/ # Documentation
│ ├── ARCHITECTURE.md
│ ├── DEPLOYMENT.md
│ ├── API_DOCUMENTATION.md
│ ├── DATA_MODELS.md
│ └── DEVELOPMENT.md
├── orchestration/ # Main application code
│ ├── ingestion/ # Data ingestion modules
│ │ ├── src/ # Source code
│ │ │ ├── flight_radar/ # Flight radar data collection
│ │ │ ├── asset_property/ # Real estate data collection
│ │ │ ├── user_product_mock/ # E-commerce mock data
│ │ │ └── lakehouse_libs/ # Shared libraries
│ │ └── tests/ # Test code
│ ├── transformations/ # dbt transformations
│ │ ├── models/ # dbt models
│ │ ├── tests/ # dbt tests
│ │ └── macros/ # dbt macros
│ ├── dagster_configurations/ # Dagster workflows
│ └── pyproject.toml # Python dependencies
├── docker-compose.yml # Docker services
├── pytest.ini # Test configuration
└── README.md # Project overview
Development Workflow
1. Feature Development
Branch Strategy
# Create feature branch
git checkout -b feature/new-data-source
# Make changes
# ... development work ...
# Commit changes
git add .
git commit -m "feat: add new data source integration"
# Push branch
git push origin feature/new-data-source
# Create pull request
Code Standards
# Follow the PEP 8 style guide and use type hints
from typing import Any, Dict, Iterator, List
ProcessedRecord = Dict[str, Any]  # illustrative alias; use the project's record type
def process_data(data: List[Dict[str, Any]]) -> Iterator[ProcessedRecord]:
    """Process raw data and yield cleaned records."""
    ...
# Use docstrings for modules, classes, and public functions
class DataProcessor:
    """
    Processes raw data from various sources.
    Attributes:
        source: Data source identifier
        validator: Data validation instance
    """
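To make these conventions concrete, the following is a hypothetical body for `process_data`, assuming that cleaning means trimming string values and skipping records without an `id`. `ProcessedRecord` is modeled as a `TypedDict` here; that shape is an assumption, not the project's definition.

```python
from typing import Any, Dict, Iterator, List, TypedDict


class ProcessedRecord(TypedDict):
    """Hypothetical shape of a cleaned record."""

    id: str
    value: str


def process_data(data: List[Dict[str, Any]]) -> Iterator[ProcessedRecord]:
    """Process raw data and yield cleaned records.

    Records without an ``id`` are skipped; values are converted to trimmed strings.
    """
    for raw in data:
        record_id = raw.get("id")
        if record_id is None:
            continue  # a record that cannot be identified is dropped
        yield ProcessedRecord(
            id=str(record_id).strip(),
            value=str(raw.get("value", "")).strip(),
        )
```

A generator keeps memory flat for large inputs; callers that need a list can wrap the call in `list(...)`.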
2. Testing Strategy
Unit Tests
# tests/test_data_processor.py
import pytest
# Assumption: the processor raises pydantic's ValidationError; import the project's actual exception
from pydantic import ValidationError
from ingestion.src.lakehouse_libs.processor import DataProcessor
class TestDataProcessor:
    def test_process_valid_data(self):
        """Test processing valid data."""
        processor = DataProcessor()
        data = [{"id": "1", "value": "test"}]
        result = list(processor.process(data))
        assert len(result) == 1
        assert result[0]["id"] == "1"
    def test_process_invalid_data(self):
        """Test handling invalid data."""
        processor = DataProcessor()
        data = [{"id": None, "value": "test"}]
        # Consume with list() so a lazy generator actually runs inside the raises block
        with pytest.raises(ValidationError):
            list(processor.process(data))
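The tests above expect `DataProcessor.process` to yield dict records and to raise `ValidationError` when `id` is null. The real class in `lakehouse_libs` is not shown in this guide, so the following is a hypothetical stand-in that satisfies the same contract; its `ValidationError` (a subclass of `ValueError`) is also an assumption.

```python
from typing import Any, Dict, Iterable, Iterator


class ValidationError(ValueError):
    """Raised when a record fails validation (hypothetical)."""


class DataProcessor:
    """Minimal stand-in: checks that required fields are present and non-null."""

    required_fields = ("id",)

    def process(self, data: Iterable[Dict[str, Any]]) -> Iterator[Dict[str, Any]]:
        for record in data:
            for field in self.required_fields:
                if record.get(field) is None:
                    raise ValidationError(f"missing required field: {field}")
            # Yield a copy so callers cannot mutate the caller's input records
            yield dict(record)
```

Because `process` is a generator, the error surfaces only when the result is consumed, which is why the tests wrap each call in `list(...)`.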
Integration Tests
# tests/integration/test_api_integration.py
from unittest.mock import MagicMock
import pytest
from ingestion.src.flight_radar.requests_api import RequestFlightRadarData
class TestFlightRadarIntegration:
    @pytest.fixture
    def mock_api(self):
        """Mock FlightRadar24 API client that returns one fake flight."""
        # The collector receives the API object, so a plain mock can be injected (no patching needed)
        api = MagicMock()
        api.get_flights.return_value = [MagicMock(name="flight")]
        return api
    def test_fetch_flight_data(self, mock_api):
        """Test fetching flight data from the API."""
        collector = RequestFlightRadarData(mock_api)
        result = list(collector.fetch_data())
        assert len(result) > 0
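If a collector constructs its own API client instead of receiving one, the client must be patched rather than injected. `unittest.mock.patch.object` swaps an attribute for the duration of a `with` block and restores it afterwards. `FlightClient` and `FlightCollector` below are hypothetical stand-ins for the FlightRadar24 client and `RequestFlightRadarData`.

```python
from typing import Any, Dict, Iterator, List
from unittest.mock import patch


class FlightClient:
    """Hypothetical API client; the real one would call the network."""

    def get_flights(self) -> List[Dict[str, Any]]:
        raise RuntimeError("network access is not allowed in tests")


class FlightCollector:
    """Hypothetical collector that creates its own client."""

    def __init__(self) -> None:
        self.client = FlightClient()

    def fetch_data(self) -> Iterator[Dict[str, Any]]:
        yield from self.client.get_flights()


def test_fetch_flight_data() -> None:
    fake_flight = {"id": "abc123", "latitude": 52.3, "longitude": 4.8}
    # Patch the class attribute so every instance created in the block sees the fake
    with patch.object(FlightClient, "get_flights", return_value=[fake_flight]) as mocked:
        result = list(FlightCollector().fetch_data())
    assert result == [fake_flight]
    mocked.assert_called_once_with()  # the mock is not bound, so no self argument
```

Once the `with` block exits, the original method is restored, so a test that forgets to patch fails loudly instead of reaching the network.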
dbt Tests
-- tests/assert_flight_data_quality.sql
SELECT *
FROM {{ ref('explode_complete_flight') }}
WHERE latitude < -90 OR latitude > 90
OR longitude < -180 OR longitude > 180
OR altitude < 0 OR altitude > 50000
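The same bounds can be checked in Python before records are uploaded to Bronze, catching bad coordinates earlier. The helper names below are hypothetical. The predicate mirrors the dbt test, including its treatment of NULLs: in SQL a comparison with NULL is unknown, so rows with a NULL coordinate are not returned, and the sketch skips `None` values for the same reason.

```python
from typing import Any, Dict, Iterable, List, Tuple

# (column, minimum, maximum), matching the dbt test above
BOUNDS: List[Tuple[str, float, float]] = [
    ("latitude", -90, 90),
    ("longitude", -180, 180),
    ("altitude", 0, 50000),
]


def out_of_range(record: Dict[str, Any]) -> List[str]:
    """Return the names of columns whose value lies outside its bounds."""
    bad = []
    for column, low, high in BOUNDS:
        value = record.get(column)
        # As in SQL, a NULL value makes the comparison unknown, so it is not flagged
        if value is not None and not (low <= value <= high):
            bad.append(column)
    return bad


def failing_records(records: Iterable[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Records the dbt test would return as failures."""
    return [r for r in records if out_of_range(r)]
```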
3. Data Pipeline Development
Adding New Data Source
1. Create Data Collector
# ingestion/src/new_domain/collector.py
from typing import Any, Dict, Iterator, List
from lakehouse_libs.ingestor.common_ingestor import CommonIngestor
# UploaderFactory comes from the project's lakehouse_libs package (import not shown in this guide)
class NewDomainCollector(CommonIngestor):
    """Collects data from new domain API."""
    def fetch_data(self) -> Iterator[Dict[str, Any]]:
        """Fetch data from new domain API."""
        # Implementation: call the API and yield one dict per record
        raise NotImplementedError
    def upload_to_bronze(self, data: List[Dict[str, Any]], format_type: str = "csv") -> None:
        """Upload data to the MinIO Bronze layer."""
        uploader = UploaderFactory.create_uploader(format_type)
        uploader.upload(data, "datalake", "bronze/new_domain/entity/")
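`upload_to_bronze` hands a list of dicts to an uploader chosen by `format_type`. The uploader classes are not shown in this guide; as an assumption about what a CSV uploader has to do before writing to MinIO, this sketch serializes records to CSV bytes in memory using only the standard library. `records_to_csv_bytes` is a hypothetical name.

```python
import csv
import io
from typing import Any, Dict, List


def records_to_csv_bytes(data: List[Dict[str, Any]]) -> bytes:
    """Serialize a list of dicts to UTF-8 CSV bytes (header row plus one row per record)."""
    if not data:
        return b""
    # Union of keys in first-seen order, so records with extra fields are kept
    fieldnames: List[str] = []
    for record in data:
        for key in record:
            if key not in fieldnames:
                fieldnames.append(key)
    buffer = io.StringIO()
    writer = csv.DictWriter(buffer, fieldnames=fieldnames, lineterminator="\n")
    writer.writeheader()
    writer.writerows(data)  # keys missing from a record are written as empty strings
    return buffer.getvalue().encode("utf-8")
```

With the MinIO Python client, the payload would then typically be written with `put_object(bucket_name, object_name, io.BytesIO(payload), len(payload))`.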
2. Create Data Models
# ingestion/src/new_domain/models.py
from datetime import datetime
from typing import Any, Dict
from pydantic import BaseModel, Field
class NewDomainModel(BaseModel):
    """Data model for new domain."""
    id: str
    name: str
    value: float
    created_at: datetime = Field(default_factory=datetime.now)
    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> "NewDomainModel":
        """Create a validated model from a dictionary."""
        return cls(**data)
3. Create Preparation Configuration
# dagster_configurations/configurations/loads/new_domain/entity.yml
configurations:
  catalog:
    host: "hive-metastore"
    port: "9083"
    user: "hiveuser"
    full_name: "new_domain.prepared.entity"
    full_name_idx: "new_domain.index.entity"
    catalog_table_path: "prepare/new_domain/entity"
  storage:
    minio_endpoint: "minio:9000"
    minio_access_key: "minioadmin"
    minio_secret_key: "minioadmin"
    bucket_name: "datalake"
  incremental_conditions:
    query:
      - condition: "start"
        source_column: "id"
        operator: "=="
        target_column: "id"
  read_parameter:
    format: "csv" # or "json", "parquet"
  schema:
    - name: "id"
      type_name: "STRING"
      description: "Unique identifier"
      nullable: false
    - name: "name"
      type_name: "STRING"
      description: "Entity name"
      nullable: true
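A malformed `schema` section tends to fail late, when the loader runs. A small check over the parsed configuration (the list of dicts that a YAML loader such as `yaml.safe_load` would produce for `schema`) can catch it earlier. `validate_schema` and the accepted `KNOWN_TYPES` set are assumptions; adjust the set to the types the loader actually supports.

```python
from typing import Any, Dict, List, Set

REQUIRED_KEYS = {"name", "type_name", "nullable"}
# Assumed set of accepted type names; extend to match the loader
KNOWN_TYPES = {"STRING", "INT", "BIGINT", "FLOAT", "DOUBLE", "BOOLEAN", "DATE", "TIMESTAMP"}


def validate_schema(schema: List[Dict[str, Any]]) -> List[str]:
    """Return human-readable problems found in a parsed `schema` section."""
    problems: List[str] = []
    seen: Set[str] = set()
    for position, column in enumerate(schema):
        missing = REQUIRED_KEYS - set(column)
        if missing:
            problems.append(f"column {position}: missing {sorted(missing)}")
            continue  # the remaining checks need the required keys
        name = column["name"]
        if name in seen:
            problems.append(f"column {name!r}: duplicate name")
        seen.add(name)
        if str(column["type_name"]).upper() not in KNOWN_TYPES:
            problems.append(f"column {name!r}: unknown type {column['type_name']!r}")
        if not isinstance(column["nullable"], bool):
            problems.append(f"column {name!r}: nullable must be true or false")
    return problems
```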
4. Create Dagster Asset
# dagster_configurations/new_domain_pipeline.py
from dagster import AssetExecutionContext, asset
from ingestion.src.new_domain.collector import NewDomainCollector
# MinIOUploader is a project helper (import not shown in this guide)
@asset(group_name="new_domain")
def new_domain_raw_data(context: AssetExecutionContext) -> int:
    """Collect raw data from new domain."""
    collector = NewDomainCollector()
    data = list(collector.fetch_data())
    context.log.info(f"Collected {len(data)} records")
    # Upload to MinIO
    uploader = MinIOUploader()
    uploader.upload_json(data, "datalake", "new_domain/raw_data.json")
    return len(data)
5. Create dbt Models
# transformations/models/new_domain/sources.yml
version: 2
sources:
  - name: new_domain_prepared
    database: spark_catalog
    schema: new_domain_prepared
    tables:
      - name: raw_data
-- transformations/models/new_domain/intermediate/stg_new_domain_data.sql
{{
  config(
    materialized='table',
    alias='stg_new_domain_data'
  )
}}
SELECT
  id,
  name,
  value,
  created_at,
  __TIMESTAMP
FROM {{ source('new_domain_prepared', 'raw_data') }}
4. Database Development
Trino Development
-- Connect with the Trino CLI from a shell:
--   trino --server http://localhost:8080
-- Create schema
CREATE SCHEMA IF NOT EXISTS iceberg.new_domain;
-- Create table
CREATE TABLE iceberg.new_domain.raw_data (
  id VARCHAR,
  name VARCHAR,
  value DOUBLE,
  created_at TIMESTAMP
) WITH (
  location = 's3a://datalake/new_domain/raw_data'
);
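When table definitions already live in the YAML `schema` section, the Trino DDL can be generated instead of written by hand. A hypothetical sketch: the mapping from `type_name` values to Trino types (for example `STRING` to `VARCHAR`) is an assumption, and nullability is not emitted, matching the `CREATE TABLE` above.

```python
from typing import Any, Dict, List

# Assumed mapping from config type_name values to Trino column types
TRINO_TYPES = {
    "STRING": "VARCHAR",
    "INT": "INTEGER",
    "BIGINT": "BIGINT",
    "FLOAT": "REAL",
    "DOUBLE": "DOUBLE",
    "BOOLEAN": "BOOLEAN",
    "DATE": "DATE",
    "TIMESTAMP": "TIMESTAMP",
}


def create_table_sql(table: str, schema: List[Dict[str, Any]], location: str) -> str:
    """Build a Trino CREATE TABLE statement for an Iceberg table."""
    # Names and location are interpolated verbatim: use only with trusted config
    columns = ",\n".join(
        f"    {col['name']} {TRINO_TYPES[col['type_name'].upper()]}" for col in schema
    )
    return (
        f"CREATE TABLE {table} (\n{columns}\n) WITH (\n"
        f"    location = '{location}'\n);"
    )
```

An unknown `type_name` raises `KeyError`, which is deliberate: it is better to fail than to guess a column type.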
dbt Development
# Install the dbt Trino adapter
pip install dbt-trino
# Initialize dbt project
dbt init new_project
# Run models (--select is the current name for the older --models flag)
dbt run --select new_domain
# Test models
dbt test --select new_domain
# Generate and serve documentation
dbt docs generate
dbt docs serve
Code Quality
1. Linting and Formatting
Python Code
# Run pylint
pylint orchestration/ingestion/src/
# Run black formatting
black orchestration/ingestion/src/
# Run isort imports
isort orchestration/ingestion/src/
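SQL Code
# Lint dbt SQL models with SQLFluff (same dialect as the pre-commit hook)
sqlfluff lint orchestration/transformations/models/ --dialect trino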
2. Pre-commit Hooks
# .pre-commit-config.yaml
repos:
  - repo: https://github.com/psf/black
    rev: 23.3.0
    hooks:
      - id: black
        language_version: python3.12
  - repo: https://github.com/pycqa/isort
    rev: 5.12.0
    hooks:
      - id: isort
  - repo: https://github.com/pycqa/pylint
    rev: v2.17.4
    hooks:
      - id: pylint
        args: [--rcfile=.pylintrc]
  - repo: https://github.com/sqlfluff/sqlfluff
    rev: 2.1.2
    hooks:
      - id: sqlfluff-lint
        args: [--dialect=trino]
      - id: sqlfluff-fix
        args: [--dialect=trino]
3. Testing Standards
# Test coverage requirements:
# - Minimum 80% code coverage
# - All new code must have tests
# - Integration tests for API endpoints
# - Unit tests for business logic
# Run tests with coverage (requires pytest-cov); the run fails below 80%
pytest --cov=orchestration/ingestion/src --cov-report=html --cov-fail-under=80
Debugging and Troubleshooting
1. Local Debugging
Python Debugging
# Drop into pdb at this point (built-in since Python 3.7)
breakpoint()
# Equivalent explicit form
import pdb; pdb.set_trace()
# Or use the IDE debugger:
# set breakpoints in VS Code/PyCharm
Docker Debugging
# Debug container
docker exec -it <container_name> /bin/bash
# View container logs
docker logs <container_name>
# Inspect container
docker inspect <container_name>
2. Common Issues
Service Connection Issues
# Check service health
curl -f http://localhost:8080/v1/info # Trino
curl -f http://localhost:3030/health # Dagster
# Check network connectivity
docker network ls
docker network inspect iceberg_data_engineering_default
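The same checks can be scripted, for example in a CI job or a wait-for-services step. A minimal sketch using only the standard library; `is_healthy` is a hypothetical helper, and the URLs are the ones from the commands above.

```python
import urllib.request

SERVICES = {
    "trino": "http://localhost:8080/v1/info",
    "dagster": "http://localhost:3030/health",
}


def is_healthy(url: str, timeout: float = 2.0) -> bool:
    """Return True if ``url`` answers with a 2xx status within ``timeout`` seconds."""
    try:
        with urllib.request.urlopen(url, timeout=timeout) as response:
            return 200 <= response.status < 300
    except OSError:
        # URLError (refused connection, DNS failure), HTTPError (4xx/5xx)
        # and socket timeouts are all OSError subclasses
        return False
```

Looping over `SERVICES` and printing `is_healthy(url)` for each entry gives a quick up/down table before running the pipeline.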
Data Quality Issues
-- Check data quality
SELECT
COUNT(*) as total_records,
COUNT(DISTINCT id) as unique_ids,
COUNT(CASE WHEN latitude IS NULL THEN 1 END) as null_latitudes
FROM iceberg.flight_radar.explode_complete_flight;
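When inspecting a sample outside Trino (for example a Bronze CSV batch loaded into Python), the same three counts can be computed in one pass. `quality_summary` is a hypothetical helper; like `COUNT(DISTINCT id)`, it ignores null ids.

```python
from typing import Any, Dict, Iterable, Set


def quality_summary(records: Iterable[Dict[str, Any]]) -> Dict[str, int]:
    """Compute the counts from the SQL data-quality query in a single pass."""
    total = 0
    ids: Set[Any] = set()
    null_latitudes = 0
    for record in records:
        total += 1
        # COUNT(DISTINCT id) skips NULLs, so None ids are not added
        if record.get("id") is not None:
            ids.add(record["id"])
        # A missing key is treated the same as an explicit NULL
        if record.get("latitude") is None:
            null_latitudes += 1
    return {"total_records": total, "unique_ids": len(ids), "null_latitudes": null_latitudes}
```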
Performance Issues
-- Check query performance
EXPLAIN (FORMAT JSON)
SELECT * FROM iceberg.flight_radar.explode_complete_flight
WHERE airline_iata = 'AA';
3. Logging and Monitoring
Application Logging
import logging
# Configure logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('app.log'),
logging.StreamHandler()
]
)
logger = logging.getLogger(__name__)
# Use in code
logger.info("Processing data batch")
logger.error("Failed to process record", exc_info=True)
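`exc_info=True` attaches a traceback only when the call happens while an exception is being handled; inside an `except` block, `logger.exception(...)` does the same at ERROR level. The following hypothetical loop logs and skips failing records instead of aborting the batch; the exception types it catches are an assumption.

```python
import logging
from typing import Any, Callable, Dict, Iterable, List

logger = logging.getLogger(__name__)


def process_all(
    records: Iterable[Dict[str, Any]],
    handle: Callable[[Dict[str, Any]], Any],
) -> List[Any]:
    """Apply ``handle`` to each record, logging (not raising) per-record failures."""
    results = []
    for record in records:
        try:
            results.append(handle(record))
        except (KeyError, ValueError, TypeError):
            # Logged at ERROR level together with the current traceback
            logger.exception("Failed to process record id=%s", record.get("id"))
    logger.info("Processed %d records", len(results))
    return results
```

Passing `%s` arguments instead of an f-string lets the logging module skip formatting when the message is filtered out.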
Dagster Monitoring
# Dagster asset monitoring
from dagster import AssetExecutionContext, FreshnessPolicy, asset
@asset(freshness_policy=FreshnessPolicy(maximum_lag_minutes=60))
def flight_data(context: AssetExecutionContext):
    """Flight data should be no more than 60 minutes old."""
    ...
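With `maximum_lag_minutes=60`, the asset is flagged once its data is more than 60 minutes old. Dagster's own evaluation is more involved (it considers upstream data time), so the function below is only a simplified, hypothetical illustration of the lag arithmetic, for example for an ad hoc check in a script.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


def is_stale(
    last_updated: datetime,
    maximum_lag_minutes: float,
    now: Optional[datetime] = None,
) -> bool:
    """True if ``last_updated`` is older than the allowed lag (use timezone-aware datetimes)."""
    if now is None:
        now = datetime.now(timezone.utc)
    return now - last_updated > timedelta(minutes=maximum_lag_minutes)
```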
Performance Optimization
1. Query Optimization
Trino Query Optimization
-- Use appropriate data types
-- Optimize WHERE clauses
-- Use LIMIT for testing
-- Avoid SELECT *
-- Good query
SELECT airline_iata, COUNT(*) as flight_count
FROM iceberg.flight_radar.explode_complete_flight
WHERE time >= CURRENT_DATE - INTERVAL '7 days'
GROUP BY airline_iata
ORDER BY flight_count DESC
LIMIT 10;
Spark Optimization
# Optimize Spark operations
from pyspark.sql import SparkSession
spark = (
    SparkSession.builder
    .appName("DataProcessing")
    # Adaptive query execution re-optimizes plans using runtime statistics
    .config("spark.sql.adaptive.enabled", "true")
    .config("spark.sql.adaptive.coalescePartitions.enabled", "true")
    .getOrCreate()
)
# Use appropriate partitioning (df is an existing DataFrame of flight records)
(
    df.write
    .partitionBy("airline_iata")
    .mode("overwrite")
    .saveAsTable("iceberg.flight_radar.partitioned_flights")
)
2. Data Processing Optimization
Batch Processing
# Process data in batches
from typing import Any, Dict, List
def process_data_in_batches(data: List[Dict[str, Any]], batch_size: int = 1000):
    """Process data in batches for better performance."""
    for i in range(0, len(data), batch_size):
        batch = data[i:i + batch_size]
        yield process_batch(batch)  # process_batch is the per-batch handler (not shown)
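Slicing by `len(data)` requires the whole list in memory. A variant that accepts any iterable, generators included, can pull fixed-size chunks with `itertools.islice`; on Python 3.12+, which this project targets, `itertools.batched` does the same grouping but yields tuples. `process_batch` is taken as a parameter here because its implementation is not shown, and both function names are hypothetical.

```python
from itertools import islice
from typing import Callable, Iterable, Iterator, List, TypeVar

T = TypeVar("T")
R = TypeVar("R")


def iter_batches(items: Iterable[T], batch_size: int = 1000) -> Iterator[List[T]]:
    """Yield consecutive lists of at most ``batch_size`` items."""
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    iterator = iter(items)
    # islice consumes from the shared iterator, so each loop pulls the next chunk
    while batch := list(islice(iterator, batch_size)):
        yield batch


def process_in_batches(
    items: Iterable[T], process_batch: Callable[[List[T]], R], batch_size: int = 1000
) -> Iterator[R]:
    """Apply ``process_batch`` to each batch lazily."""
    for batch in iter_batches(items, batch_size):
        yield process_batch(batch)
```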
Memory Management
# Use generators for large datasets
def process_large_dataset(file_path: str):
    """Process a large file line by line without loading it into memory."""
    with open(file_path, "r", encoding="utf-8") as file:
        for line in file:
            yield process_line(line)  # process_line is the per-line handler (not shown)
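The same pattern applies to structured files: `csv.DictReader` reads one row at a time, so a generator pipeline can aggregate a large CSV without loading it. Both helper names are hypothetical; the `airline_iata` column used in the usage note comes from the flight data above.

```python
import csv
from typing import Dict, Iterable, Iterator


def iter_csv_records(lines: Iterable[str]) -> Iterator[Dict[str, str]]:
    """Yield one dict per CSV row from a file object or any iterable of lines."""
    for row in csv.DictReader(lines):
        # Skip rows whose fields are all empty (for example a stray ",,")
        if any(row.values()):
            yield row


def count_by(records: Iterable[Dict[str, str]], column: str) -> Dict[str, int]:
    """Count records per value of ``column`` without materializing the input."""
    counts: Dict[str, int] = {}
    for record in records:
        key = record.get(column) or ""
        counts[key] = counts.get(key, 0) + 1
    return counts
```

For a real file, open it with `newline=""` as the `csv` documentation recommends: `with open(path, newline="", encoding="utf-8") as f: counts = count_by(iter_csv_records(f), "airline_iata")`.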
Deployment and CI/CD
1. Local Testing
# Test Docker services
docker-compose up -d
docker-compose exec dagster dagster-daemon run
# Test data pipeline
python -m orchestration.ingestion.src.flight_radar
# Test dbt models
dbt run --select flight_radar
2. Production Deployment
# Build production images
docker-compose -f docker-compose.prod.yml build
# Deploy to production
docker-compose -f docker-compose.prod.yml up -d
# Run health checks
./scripts/health-check.sh
3. CI/CD Pipeline
# .github/workflows/ci.yml
name: CI/CD Pipeline
on:
  push:
    branches: [main, develop]
  pull_request:
    branches: [main]
jobs:
  test:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3
      - name: Set up Python
        uses: actions/setup-python@v4
        with:
          python-version: '3.12'
      - name: Install dependencies
        run: |
          pip install uv
          # No virtual environment exists on the runner, so install into the system Python
          uv pip install --system -e ".[dev]"
      - name: Run tests
        run: pytest
      - name: Run linting
        run: pylint orchestration/ingestion/src/
Contributing Guidelines
1. Pull Request Process
- Fork the repository
- Create a feature branch
- Make your changes
- Add tests for new functionality
- Ensure all tests pass
- Update documentation
- Submit a pull request
2. Code Review Process
- All code must be reviewed
- At least one approval required
- Address all review comments
- Ensure CI/CD pipeline passes
- Update documentation if needed
3. Documentation Requirements
- Update README.md for new features
- Add API documentation for new endpoints
- Update data models documentation
- Include examples and use cases
Resources and References
1. Documentation Links
- Apache Iceberg Documentation
- Apache Spark Documentation
- Trino Documentation
- dbt Documentation
- Dagster Documentation
2. Best Practices
3. Community Resources
Last update: October 3, 2025
Created: October 3, 2025