🛠️ Development Guide

Development Environment Setup

Prerequisites

Required Software

  • Python: 3.12+ (recommended: 3.12.0)
  • Docker: 20.10+
  • Docker Compose: 2.0+
  • Git: Latest version
  • UV: Python package manager (recommended)
  • Node.js: 18+ (for dbt development)

Optional Tools

  • VS Code: With Python, Docker, and SQL extensions
  • PyCharm: Professional or Community edition
  • DBeaver: Database management tool
  • Postman: API testing

Local Development Setup

1. Clone Repository

git clone <repository-url>
cd iceberg_data_engineering

2. Python Environment Setup

# Install UV (if not already installed)
curl -LsSf https://astral.sh/uv/install.sh | sh

# Create virtual environment
uv venv

# Activate virtual environment
source .venv/bin/activate  # Linux/macOS
# or
.venv\Scripts\activate     # Windows

# Install dependencies
uv pip install -e .
uv pip install -e ".[dev]"

3. Docker Environment

# Start development services
docker-compose up -d

# Check service status
docker-compose ps

# View logs
docker-compose logs -f

4. Development Tools Setup

# Install pre-commit hooks
pre-commit install

# Run linting
pylint orchestration/ingestion/src/

# Run tests
pytest orchestration/ingestion/tests/

Project Structure

iceberg_data_engineering/
├── docs/                           # Documentation
│   ├── ARCHITECTURE.md
│   ├── DEPLOYMENT.md
│   ├── API_DOCUMENTATION.md
│   ├── DATA_MODELS.md
│   └── DEVELOPMENT.md
├── orchestration/                  # Main application code
│   ├── ingestion/                  # Data ingestion modules
│   │   ├── src/                   # Source code
│   │   │   ├── flight_radar/     # Flight radar data collection
│   │   │   ├── asset_property/    # Real estate data collection
│   │   │   ├── user_product_mock/ # E-commerce mock data
│   │   │   └── lakehouse_libs/    # Shared libraries
│   │   └── tests/                 # Test code
│   ├── transformations/          # dbt transformations
│   │   ├── models/               # dbt models
│   │   ├── tests/                # dbt tests
│   │   └── macros/               # dbt macros
│   ├── dagster_configurations/    # Dagster workflows
│   └── pyproject.toml            # Python dependencies
├── docker-compose.yml            # Docker services
├── pytest.ini                   # Test configuration
└── README.md                    # Project overview

Development Workflow

1. Feature Development

Branch Strategy

# Create feature branch
git checkout -b feature/new-data-source

# Make changes
# ... development work ...

# Commit changes
git add .
git commit -m "feat: add new data source integration"

# Push branch
git push origin feature/new-data-source

# Create pull request

Code Standards

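# Follow the PEP 8 style guide
# Use type hints
from typing import Any, Dict, Iterator, List

# ProcessedRecord is the project's cleaned-record type; it is quoted here as a forward reference
def process_data(data: List[Dict[str, Any]]) -> Iterator["ProcessedRecord"]:
    """Process raw data and return cleaned records."""
    pass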

# Use docstrings
class DataProcessor:
    """
    Processes raw data from various sources.

    Attributes:
        source: Data source identifier
        validator: Data validation instance
    """
    pass

2. Testing Strategy

Unit Tests

# tests/test_data_processor.py
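import pytest
from unittest.mock import Mock, patch
from pydantic import ValidationError  # assumption: replace with the project's own exception if it defines one
from ingestion.src.lakehouse_libs.processor import DataProcessor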

class TestDataProcessor:
    def test_process_valid_data(self):
        """Test processing valid data."""
        processor = DataProcessor()
        data = [{"id": "1", "value": "test"}]
        result = list(processor.process(data))
        assert len(result) == 1
        assert result[0]["id"] == "1"

    def test_process_invalid_data(self):
        """Test handling invalid data."""
        processor = DataProcessor()
        data = [{"id": None, "value": "test"}]
        with pytest.raises(ValidationError):
            list(processor.process(data))
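
The two tests above imply a contract: `process` yields one cleaned record per valid input and raises on a record whose `id` is missing. The project's real `DataProcessor` is not shown here, so the following is only a minimal, hypothetical stand-in that satisfies that contract. A plain `ValueError` subclass stands in for whichever `ValidationError` the project actually uses.

```python
from typing import Any, Dict, Iterable, Iterator


class ValidationError(ValueError):
    """Hypothetical error type; the project may raise a different exception."""


class DataProcessor:
    """Hypothetical stand-in for the project's DataProcessor."""

    def process(self, data: Iterable[Dict[str, Any]]) -> Iterator[Dict[str, Any]]:
        for record in data:
            # Reject records without an identifier, as the invalid-data test expects
            if record.get("id") is None:
                raise ValidationError(f"record is missing an id: {record!r}")
            # Yield a copy so callers cannot mutate the input by accident
            yield dict(record)


if __name__ == "__main__":
    processor = DataProcessor()
    print(list(processor.process([{"id": "1", "value": "test"}])))
```

Because `process` is a generator, the error surfaces only when the result is consumed, which is why the test wraps the call in `list(...)`.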

Integration Tests

# tests/integration/test_api_integration.py
from unittest.mock import Mock, patch

import pytest
from ingestion.src.flight_radar.requests_api import RequestFlightRadarData

class TestFlightRadarIntegration:
    @pytest.fixture
    def mock_api(self):
        """Mock FlightRadar24 API."""
        with patch('FlightRadar24.api.FlightRadar24API') as mock:
            yield mock

    def test_fetch_flight_data(self, mock_api):
        """Test fetching flight data from API."""
        mock_flight = Mock()  # stand-in for a single flight object returned by the API
        mock_api.get_flights.return_value = [mock_flight]
        collector = RequestFlightRadarData(mock_api)
        result = list(collector.fetch_data())
        assert len(result) > 0

dbt Tests

-- tests/assert_flight_data_quality.sql
SELECT *
FROM {{ ref('explode_complete_flight') }}
WHERE latitude < -90 OR latitude > 90
   OR longitude < -180 OR longitude > 180
   OR altitude < 0 OR altitude > 50000
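
The dbt test above passes only when the query returns no rows. The same bounds can be checked in Python before data reaches the warehouse. This is a sketch only: `LIMITS` and `out_of_range_fields` are hypothetical names, not part of the project.

```python
from typing import Any, Dict, List

# Bounds copied from the dbt test above
LIMITS = {
    "latitude": (-90, 90),
    "longitude": (-180, 180),
    "altitude": (0, 50000),
}


def out_of_range_fields(row: Dict[str, Any]) -> List[str]:
    """Return the names of fields whose values fall outside the dbt test's bounds."""
    bad = []
    for field, (low, high) in LIMITS.items():
        value = row.get(field)
        # As in the SQL comparison, a NULL (None) value does not count as a violation
        if value is not None and not (low <= value <= high):
            bad.append(field)
    return bad


if __name__ == "__main__":
    print(out_of_range_fields({"latitude": 95.0, "longitude": 10.0, "altitude": 35000}))
```

An empty list means the row would not be selected by the dbt test.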

3. Data Pipeline Development

Adding New Data Source

1. Create Data Collector
# ingestion/src/new_domain/collector.py
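from typing import Any, Dict, Iterator, List
from lakehouse_libs.ingestor.common_ingestor import CommonIngestor
# UploaderFactory must also be imported from the project's shared libraries (module path not shown here)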

class NewDomainCollector(CommonIngestor):
    """Collects data from new domain API."""

    def fetch_data(self) -> Iterator[Dict[str, Any]]:
        """Fetch data from new domain API."""
        # Implementation
        pass

    def upload_to_bronze(self, data: List[Dict], format_type: str = "csv"):
        """Upload data to MinIO Bronze layer."""
        uploader = UploaderFactory.create_uploader(format_type)
        uploader.upload(data, "datalake", "bronze/new_domain/entity/")
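
`upload_to_bronze` picks an uploader by `format_type` through a factory. The internals of the project's `UploaderFactory` are not shown, so the sketch below only illustrates the pattern with hypothetical classes (`CsvSerializer`, `JsonSerializer`, `SerializerFactory`) that turn records into bytes instead of writing to MinIO.

```python
import csv
import io
import json
from typing import Any, Dict, List


class CsvSerializer:
    """Hypothetical: renders records as CSV with a header row."""

    def serialize(self, data: List[Dict[str, Any]]) -> bytes:
        if not data:
            return b""
        buffer = io.StringIO()
        # Column order follows the keys of the first record
        writer = csv.DictWriter(buffer, fieldnames=list(data[0].keys()), lineterminator="\n")
        writer.writeheader()
        writer.writerows(data)
        return buffer.getvalue().encode("utf-8")


class JsonSerializer:
    """Hypothetical: renders records as a single JSON array."""

    def serialize(self, data: List[Dict[str, Any]]) -> bytes:
        return json.dumps(data).encode("utf-8")


class SerializerFactory:
    """Hypothetical factory mapping a format name to a serializer class."""

    _registry = {"csv": CsvSerializer, "json": JsonSerializer}

    @classmethod
    def create(cls, format_type: str):
        try:
            return cls._registry[format_type]()
        except KeyError:
            raise ValueError(f"unsupported format: {format_type!r}") from None


if __name__ == "__main__":
    print(SerializerFactory.create("csv").serialize([{"id": "1", "name": "a"}]))
```

Keeping the format lookup in one registry means a new format needs one new class and one new registry entry, and callers such as `upload_to_bronze` stay unchanged.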
2. Create Data Models
# ingestion/src/new_domain/models.py
from datetime import datetime
from typing import Any, Dict

from pydantic import BaseModel, Field

class NewDomainModel(BaseModel):
    """Data model for new domain."""
    id: str
    name: str
    value: float
    created_at: datetime = Field(default_factory=datetime.now)

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> "NewDomainModel":
        """Create model from dictionary."""
        return cls(**data)
3. Create Preparation Configuration
# dagster_configurations/configurations/loads/new_domain/entity.yml
configurations:
  catalog:
    host: "hive-metastore"
    port: "9083"
    user: "hiveuser"
    full_name: "new_domain.prepared.entity"
    full_name_idx: "new_domain.index.entity"
    catalog_table_path: "prepare/new_domain/entity"
  storage:
    minio_endpoint: "minio:9000"
    minio_access_key: "minioadmin"
    minio_secret_key: "minioadmin"
    bucket_name: "datalake"
  incremental_conditions:
    query:
      - condition: "start"
        source_column: "id"
        operator: "=="
        target_column: "id"
  read_parameter:
    format: "csv"  # or "json", "parquet"
  schema:
    - name: "id"
      type_name: "STRING"
      description: "Unique identifier"
      nullable: false
    - name: "name"
      type_name: "STRING"
      description: "Entity name"
      nullable: true
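
The `schema` section above marks each column as nullable or not. How the pipeline enforces that flag is not shown here, so the following is only a sketch of one possible check. `SCHEMA` repeats the YAML as the structure a YAML loader would return, and `null_violations` is a hypothetical helper.

```python
from typing import Any, Dict, List

# The `schema` section of the YAML above, as a loader would return it
SCHEMA = [
    {"name": "id", "type_name": "STRING", "description": "Unique identifier", "nullable": False},
    {"name": "name", "type_name": "STRING", "description": "Entity name", "nullable": True},
]


def null_violations(record: Dict[str, Any], schema: List[Dict[str, Any]]) -> List[str]:
    """Return the names of non-nullable columns that are missing or None in the record."""
    return [
        column["name"]
        for column in schema
        if not column["nullable"] and record.get(column["name"]) is None
    ]


if __name__ == "__main__":
    print(null_violations({"name": "example"}, SCHEMA))
```

A record passes when the returned list is empty.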
4. Create Dagster Asset
# dagster_configurations/new_domain_pipeline.py
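from dagster import asset, AssetExecutionContext
from ingestion.src.new_domain.collector import NewDomainCollector
# MinIOUploader must also be imported from the project's shared libraries (module path not shown here)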

@asset(group_name="new_domain")
def new_domain_raw_data(context: AssetExecutionContext):
    """Collect raw data from new domain."""
    collector = NewDomainCollector()
    data = list(collector.fetch_data())

    # Upload to MinIO
    uploader = MinIOUploader()
    uploader.upload_json(data, "datalake", "new_domain/raw_data.json")

    return len(data)
5. Create dbt Models
# transformations/models/new_domain/sources.yml
version: 2

sources:
  - name: new_domain_prepared
    database: spark_catalog
    schema: new_domain_prepared
    tables:
      - name: raw_data
-- transformations/models/new_domain/intermediate/stg_new_domain_data.sql
{{
  config(
    materialized='table',
    alias='stg_new_domain_data'
  )
}}

SELECT
    id,
    name,
    value,
    created_at,
    __TIMESTAMP
FROM {{ source('new_domain_prepared', 'raw_data') }}

4. Database Development

Trino Development

# Connect to Trino from a shell; the SQL below is entered at the trino> prompt
trino --server http://localhost:8080

-- Create schema
CREATE SCHEMA IF NOT EXISTS iceberg.new_domain;

-- Create table
CREATE TABLE iceberg.new_domain.raw_data (
    id VARCHAR,
    name VARCHAR,
    value DOUBLE,
    created_at TIMESTAMP
) WITH (
    location = 's3a://datalake/new_domain/raw_data'
);

dbt Development

# Install dbt
pip install dbt-trino

# Initialize dbt project
dbt init new_project

# Run models
dbt run --select new_domain

# Test models
dbt test --select new_domain

# Generate documentation
dbt docs generate
dbt docs serve

Code Quality

1. Linting and Formatting

Python Code

# Run pylint
pylint orchestration/ingestion/src/

# Run black formatting
black orchestration/ingestion/src/

# Run isort imports
isort orchestration/ingestion/src/

SQL Code

# Run sqlfluff
sqlfluff lint transformations/models/
sqlfluff fix transformations/models/

2. Pre-commit Hooks

# .pre-commit-config.yaml
repos:
  - repo: https://github.com/psf/black
    rev: 23.3.0
    hooks:
      - id: black
        language_version: python3.12

  - repo: https://github.com/pycqa/isort
    rev: 5.12.0
    hooks:
      - id: isort

  - repo: https://github.com/pycqa/pylint
    rev: v2.17.4
    hooks:
      - id: pylint
        args: [--rcfile=.pylintrc]

  - repo: https://github.com/sqlfluff/sqlfluff
    rev: 2.1.2
    hooks:
      - id: sqlfluff-lint
        args: [--dialect=trino]
      - id: sqlfluff-fix
        args: [--dialect=trino]

3. Testing Standards

# Test coverage requirements
# Minimum 80% code coverage
# All new code must have tests
# Integration tests for API endpoints
# Unit tests for business logic

# Run tests with coverage
pytest --cov=orchestration/ingestion/src --cov-report=html

Debugging and Troubleshooting

1. Local Debugging

Python Debugging

# Use debugger
import pdb; pdb.set_trace()

# Or use IDE debugger
# Set breakpoints in VS Code/PyCharm

Docker Debugging

# Debug container
docker exec -it <container_name> /bin/bash

# View container logs
docker logs <container_name>

# Inspect container
docker inspect <container_name>

2. Common Issues

Service Connection Issues

# Check service health
curl -f http://localhost:8080/v1/info  # Trino
curl -f http://localhost:3030/health     # Dagster

# Check network connectivity
docker network ls
docker network inspect iceberg_data_engineering_default
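
The same health checks can be scripted, for example to poll services after `docker-compose up -d`. The sketch below uses only the standard library and reuses the two URLs from the `curl` commands above. `is_healthy` is a hypothetical helper, not part of the project.

```python
import http.client
import urllib.request


def is_healthy(url: str, timeout: float = 3.0) -> bool:
    """Return True when the URL answers with a 2xx status, roughly like `curl -f`."""
    try:
        with urllib.request.urlopen(url, timeout=timeout) as response:
            return 200 <= response.status < 300
    except (OSError, http.client.HTTPException, ValueError):
        # Connection refused, timeout, HTTP 4xx/5xx, or a malformed URL
        return False


if __name__ == "__main__":
    services = {
        "Trino": "http://localhost:8080/v1/info",
        "Dagster": "http://localhost:3030/health",
    }
    for name, url in services.items():
        print(f"{name}: {'up' if is_healthy(url) else 'down'}")
```

`urlopen` raises `HTTPError` for 4xx and 5xx responses, and `HTTPError` is an `OSError` subclass, so error statuses are reported as unhealthy.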

Data Quality Issues

-- Check data quality
SELECT 
    COUNT(*) as total_records,
    COUNT(DISTINCT id) as unique_ids,
    COUNT(CASE WHEN latitude IS NULL THEN 1 END) as null_latitudes
FROM iceberg.flight_radar.explode_complete_flight;

Performance Issues

-- Check query performance
EXPLAIN (FORMAT JSON) 
SELECT * FROM iceberg.flight_radar.explode_complete_flight 
WHERE airline_iata = 'AA';

3. Logging and Monitoring

Application Logging

import logging

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('app.log'),
        logging.StreamHandler()
    ]
)

logger = logging.getLogger(__name__)

# Use in code
logger.info("Processing data batch")
logger.error("Failed to process record", exc_info=True)

Dagster Monitoring

# Dagster asset monitoring
from dagster import AssetExecutionContext, FreshnessPolicy, asset

@asset(freshness_policy=FreshnessPolicy(maximum_lag_minutes=60))
def flight_data(context: AssetExecutionContext):
    """Flight data should be updated every hour."""
    pass

Performance Optimization

1. Query Optimization

Trino Query Optimization

-- Use appropriate data types
-- Optimize WHERE clauses
-- Use LIMIT for testing
-- Avoid SELECT *

-- Good query
SELECT airline_iata, COUNT(*) as flight_count
FROM iceberg.flight_radar.explode_complete_flight
WHERE time >= CURRENT_DATE - INTERVAL '7' DAY
GROUP BY airline_iata
ORDER BY flight_count DESC
LIMIT 10;

Spark Optimization

# Optimize Spark operations
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("DataProcessing") \
    .config("spark.sql.adaptive.enabled", "true") \
    .config("spark.sql.adaptive.coalescePartitions.enabled", "true") \
    .getOrCreate()

# Use appropriate partitioning (df is a DataFrame built earlier in the job)
df.write \
    .partitionBy("airline_iata") \
    .mode("overwrite") \
    .saveAsTable("iceberg.flight_radar.partitioned_flights")

2. Data Processing Optimization

Batch Processing

# Process data in batches
from typing import Dict, List

def process_data_in_batches(data: List[Dict], batch_size: int = 1000):
    """Yield one processed result per batch of at most batch_size records."""
    for i in range(0, len(data), batch_size):
        batch = data[i:i + batch_size]
        yield process_batch(batch)  # process_batch is the per-batch handler, defined elsewhere
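
The slicing approach above needs the whole list in memory. When the input is a generator or another one-pass iterable, batches can be drawn with `itertools.islice` instead. A minimal sketch, with `iter_batches` as a hypothetical helper name:

```python
from itertools import islice
from typing import Iterable, Iterator, List, TypeVar

T = TypeVar("T")


def iter_batches(items: Iterable[T], batch_size: int = 1000) -> Iterator[List[T]]:
    """Yield lists of at most batch_size items from any iterable."""
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    iterator = iter(items)
    while True:
        # islice consumes up to batch_size items from the shared iterator
        batch = list(islice(iterator, batch_size))
        if not batch:
            return
        yield batch


if __name__ == "__main__":
    for batch in iter_batches(range(5), batch_size=2):
        print(batch)
```

Only one batch is held in memory at a time, and the last batch may be shorter than `batch_size`.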

Memory Management

# Use generators for large datasets
def process_large_dataset(file_path: str):
    """Process large dataset without loading into memory."""
    with open(file_path, 'r') as file:
        for line in file:
            yield process_line(line)
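
What `process_line` does depends on the file format, which is not specified here. Assuming newline-delimited JSON (one object per line), the generator could look like the sketch below. `iter_json_lines` is a hypothetical name.

```python
import json
from typing import Any, Dict, Iterator


def iter_json_lines(file_path: str) -> Iterator[Dict[str, Any]]:
    """Yield one parsed record per non-empty line without loading the whole file."""
    with open(file_path, "r", encoding="utf-8") as file:
        for line in file:
            line = line.strip()
            if line:  # skip blank lines
                yield json.loads(line)
```

The file stays open only while the generator is being consumed, and each record can be garbage-collected once the caller is done with it.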

Deployment and CI/CD

1. Local Testing

# Test Docker services
docker-compose up -d
docker-compose exec dagster dagster-daemon run

# Test data pipeline
python -m orchestration.ingestion.src.flight_radar

# Test dbt models
dbt run --select flight_radar

2. Production Deployment

# Build production images
docker-compose -f docker-compose.prod.yml build

# Deploy to production
docker-compose -f docker-compose.prod.yml up -d

# Run health checks
./scripts/health-check.sh

3. CI/CD Pipeline

# .github/workflows/ci.yml
name: CI/CD Pipeline

on:
  push:
    branches: [main, develop]
  pull_request:
    branches: [main]

jobs:
  test:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3
      - name: Set up Python
        uses: actions/setup-python@v4
        with:
          python-version: '3.12'
      - name: Install dependencies
        run: |
          pip install uv
          uv pip install --system -e ".[dev]"
      - name: Run tests
        run: pytest
      - name: Run linting
        run: pylint orchestration/ingestion/src/

Contributing Guidelines

1. Pull Request Process

  1. Fork the repository
  2. Create a feature branch
  3. Make your changes
  4. Add tests for new functionality
  5. Ensure all tests pass
  6. Update documentation
  7. Submit a pull request

2. Code Review Process

  1. All code must be reviewed
  2. At least one approval required
  3. Address all review comments
  4. Ensure CI/CD pipeline passes
  5. Update documentation if needed

3. Documentation Requirements

  • Update README.md for new features
  • Add API documentation for new endpoints
  • Update data models documentation
  • Include examples and use cases


Last update: October 3, 2025
Created: October 3, 2025