
🎯 Dagster Orchestration Documentation¶

Overview¶

The Dagster orchestration system in this project implements a configuration-driven data pipeline architecture. A generic processing framework reads YAML configuration files and builds each domain's ingestion, preparation, and dbt transformation workflows at runtime, so pipelines are defined in configuration rather than in hand-written orchestration code.

Pipeline Architecture Overview¶

graph TD
    A[YAML Configuration] --> B[GenericProcessingJob]
    B --> C[Dynamic Pipeline Generation]
    C --> D[Ingestion Operations]
    C --> E[Preparation Operations]
    C --> F[dbt Transformations]

    D --> G[External APIs]
    D --> H[MinIO Bronze Storage]
    E --> I[MinIO Prepare Storage]
    E --> J[Delta Tables]
    F --> K[MinIO Warehouse Storage]
    F --> L[Iceberg Tables]

    M[Scheduler] --> N[Ingestion Job]
    N --> O[Success Sensor]
    O --> P[dbt Job]

    subgraph "Data Layers"
        Q[Bronze Layer: CSV/JSON/Parquet]
        R[Prepare Layer: Delta Tables]
        S[Warehouse Layer: Iceberg Tables]
    end

    subgraph "Storage Buckets"
        T[datalake/bronze: Raw Data]
        U[datalake/prepare: Delta Tables]
        V[warehouse: Iceberg Tables]
    end

    H --> Q
    I --> R
    J --> R
    K --> S
    L --> S

    H --> T
    I --> U
    K --> V

Core Components¶

1. Generic Processing Framework¶

GenericProcessingJob Class¶

The heart of the orchestration system is the GenericProcessingJob class, which provides a flexible framework for creating data pipelines.

class GenericProcessingJob:
    """Generic class for ingestion and preparation jobs using YAML config."""

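    def __init__(self, config_path: str):
        # Parsed domain YAML: paths, dbt command, schedule, and workflows
        self.config = self._load_config(config_path)
        # Ingestion configuration model resolved from config_type_path
        self.config_type = self._load_config_type()
        # Ingestion entry class resolved from main_ingestion_path
        self.MainIngestion = self._load_main_ingestion_class()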

Key Features:¶

  • Dynamic Configuration Loading: Loads pipeline definitions from YAML files
  • Type-Safe Configuration: Uses Pydantic models for configuration validation
  • Dynamic Class Loading: Loads ingestion classes at runtime
  • Dependency Management: Wires each preparation op to the ingestion op named in its depends_on setting

2. Configuration Structure¶

Workflow Configuration¶

Each domain (flight_radar, ecommerce, asset_property) has a main configuration file:

# flight_radar_config.yml
domain: flight_radar
unity_catalog_name: flight_radar
ingestion_config_path: "dagster_configurations/configurations/ingestion/flight_radar"
prepare_config_path: "dagster_configurations/configurations/loads/flight_radar"
dbt_models: "run --models +flight_radar.intermediate.full_flight_obt+ --target flight_radar"
cron_schedule: "0 */2 * * *"
main_ingestion_path: "flight_radar.__main__.MainIngestion"
config_type_path: "flight_radar.__main__.FlightRadarIngestionConfig"

workflows:
  airport:
    ingestion:
      method: "ingest_airport"
    preparation:
      - config_key: "airport"
        depends_on: "ingest_airport"

Configuration Components:¶

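  • Domain: Identifies the data domain
  • Unity Catalog: Spark catalog name for the domain (unity_catalog_name)
  • Paths: Directories holding the ingestion and preparation YAML configuration files
  • dbt Models: dbt command arguments run by the transformation job
  • Schedule: Cron expression for automation
  • Class Paths: Dotted import paths to the MainIngestion class and its configuration model
  • Workflows: Individual data processing workflows, each pairing an ingestion method with its preparation steps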
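The project validates configuration with Pydantic models. As a standard-library illustration of the same idea, the sketch below uses a hypothetical `WorkflowConfigSketch` dataclass (not the project's `WorkflowConfig`) and a dict shaped like the parsed `flight_radar_config.yml`. Unlike Pydantic, a dataclass does not check or coerce field types; it only rejects missing or unknown keys, so the extra checks live in `__post_init__`.

```python
from dataclasses import dataclass, field
from typing import Any, Dict

@dataclass
class WorkflowConfigSketch:
    """Hypothetical stand-in for WorkflowConfig; fields mirror the YAML keys above."""
    domain: str
    unity_catalog_name: str
    ingestion_config_path: str
    prepare_config_path: str
    dbt_models: str
    cron_schedule: str
    main_ingestion_path: str
    config_type_path: str
    workflows: Dict[str, Dict[str, Any]] = field(default_factory=dict)

    def __post_init__(self) -> None:
        # Standard cron expressions have five fields: minute hour day-of-month month day-of-week
        if len(self.cron_schedule.split()) != 5:
            raise ValueError(f"cron_schedule needs 5 fields: {self.cron_schedule!r}")
        for name, wf in self.workflows.items():
            if "method" not in wf.get("ingestion", {}):
                raise ValueError(f"workflow {name!r} has no ingestion.method")
            for prep in wf.get("preparation", []):
                if "config_key" not in prep:
                    raise ValueError(f"workflow {name!r} has a preparation step without config_key")

# The dict yaml.safe_load would return for flight_radar_config.yml (values copied from above)
raw = {
    "domain": "flight_radar",
    "unity_catalog_name": "flight_radar",
    "ingestion_config_path": "dagster_configurations/configurations/ingestion/flight_radar",
    "prepare_config_path": "dagster_configurations/configurations/loads/flight_radar",
    "dbt_models": "run --models +flight_radar.intermediate.full_flight_obt+ --target flight_radar",
    "cron_schedule": "0 */2 * * *",
    "main_ingestion_path": "flight_radar.__main__.MainIngestion",
    "config_type_path": "flight_radar.__main__.FlightRadarIngestionConfig",
    "workflows": {
        "airport": {
            "ingestion": {"method": "ingest_airport"},
            "preparation": [{"config_key": "airport", "depends_on": "ingest_airport"}],
        }
    },
}

cfg = WorkflowConfigSketch(**raw)  # unknown or missing top-level keys raise TypeError
print(cfg.workflows["airport"]["ingestion"]["method"])  # ingest_airport
```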

3. Pipeline Generation Process¶

Step 1: Configuration Loading¶

def _load_config(self, config_path: str) -> WorkflowConfig:
    """Load YAML configuration file."""
    with open(config_path, "r") as f:
        config_dict = yaml.safe_load(f)
    return WorkflowConfig(config_dict)

Step 2: Dynamic Class Loading¶

def _load_main_ingestion_class(self):
    """Dynamically load the MainIngestion class."""
    module_path, class_name = self.config.main_ingestion_path.rsplit(".", 1)
    module = importlib.import_module(module_path)
    return getattr(module, class_name)
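The same three steps (split the dotted path, import the module, fetch the attribute) work for any importable path. The hypothetical `load_class` helper below wraps them and reports a missing class as an `ImportError` that names the module; it is demonstrated on a standard-library class so it runs anywhere.

```python
import importlib
from typing import Any

def load_class(dotted_path: str) -> Any:
    """Resolve 'package.module.ClassName' to an object (hypothetical helper, same steps as above)."""
    module_path, class_name = dotted_path.rsplit(".", 1)
    # import_module raises ModuleNotFoundError (a subclass of ImportError) if the module is missing
    module = importlib.import_module(module_path)
    try:
        return getattr(module, class_name)
    except AttributeError as exc:
        # Report a missing class as an import problem, naming both parts of the path
        raise ImportError(f"module {module_path!r} has no attribute {class_name!r}") from exc

cls = load_class("collections.OrderedDict")
print(cls.__name__)  # OrderedDict
```

For a path such as `flight_radar.__main__.MainIngestion`, importing `flight_radar.__main__` runs that module's top-level code; because the module's `__name__` is then `"flight_radar.__main__"`, anything guarded by `if __name__ == "__main__":` does not run.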

Step 3: Operation Creation¶

def create_ingestion_op(self, workflow_name: str, method_name: str):
    """Factory method to create ingestion ops."""

    @op(
        name=f"{workflow_name}_ingestion_op",
        required_resource_keys={"spark", "app_ingest_config"},
        out=Out(is_required=False),
    )
    def ingestion_op(context: OpExecutionContext) -> None:
        spark = context.resources.spark
        configuration = context.resources.app_ingest_config["configurations"].config

        try:
            # Initialize the dynamically loaded MainIngestion class
            ingestion_instance = self.MainIngestion(
                spark=spark,
                ingestion_config=configuration,
                logger=logger,
            )

            # Dynamically call the method named in the workflow config (e.g. "ingest_airport")
            getattr(ingestion_instance, method_name)()
        finally:
            # Stop the Spark session even if ingestion raises
            spark.stop()

    return ingestion_op
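One likely reason for building each op inside a factory method, rather than inline in a loop over workflows, is that Python closures bind variables late. The plain-Python sketch below (no Dagster; `FakeIngestion` and `make_runner` are hypothetical) shows a factory keeping each `method_name` separate, and an inline loop of closures where every closure ends up with the last value.

```python
from typing import Callable, Dict, List

class FakeIngestion:
    """Hypothetical stand-in for a dynamically loaded MainIngestion class."""

    def __init__(self, calls: List[str]) -> None:
        self.calls = calls

    def ingest_airport(self) -> None:
        self.calls.append("ingest_airport")

    def ingest_flight(self) -> None:
        self.calls.append("ingest_flight")

def make_runner(method_name: str, calls: List[str]) -> Callable[[], None]:
    """Factory: each call binds its own method_name into a new closure."""
    def run() -> None:
        getattr(FakeIngestion(calls), method_name)()
    return run

methods = {"airport": "ingest_airport", "flight": "ingest_flight"}
calls: List[str] = []
runners: Dict[str, Callable[[], None]] = {
    name: make_runner(method, calls) for name, method in methods.items()
}
runners["airport"]()
runners["flight"]()
print(calls)  # ['ingest_airport', 'ingest_flight']

# Pitfall for comparison: closures created inline in a loop share the loop variable,
# so every one of them sees its final value.
late_bound = [lambda: method for method in methods.values()]
print([f() for f in late_bound])  # ['ingest_flight', 'ingest_flight']
```

The factory also gives every generated op its own name (`f"{workflow_name}_ingestion_op"`), so several ingestion ops can coexist in one job.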

Pipeline Workflows¶

1. Flight Radar Pipeline¶

Workflow Structure¶

graph TD
    A[Flight Radar Config] --> B[Airport Workflow]
    A --> C[Aircraft Workflow]
    A --> D[Airline Workflow]
    A --> E[Flight Workflow]

    B --> B1[ingest_airport]
    B1 --> B2[airport_preparation]
    B2 --> B3[Delta Table: airport]

    C --> C1[ingest_aircraft]
    C1 --> C2[aircraft_preparation]
    C2 --> C3[Delta Table: aircraft]

    D --> D1[ingest_airline]
    D1 --> D2[airline_preparation]
    D2 --> D3[Delta Table: airline]

    E --> E1[ingest_flight]
    E1 --> E2[complete_flight_preparation]
    E1 --> E3[missing_flight_preparation]
    E2 --> E4[Delta Table: complete_flight]
    E3 --> E5[Delta Table: missing_flight]

    B3 --> F[Success Sensor]
    C3 --> F
    D3 --> F
    E4 --> F
    E5 --> F
    F --> G[dbt Transformations]

Data Flow Details¶

Airport Workflow¶
  1. Ingestion: ingest_airport → CSV/JSON files in datalake/bronze/flight_radar/airports/
  2. Preparation: airport_preparation → Delta table in datalake/prepare/flight_radar_prepared/airport/
  3. Transformation: Success sensor triggers dbt models for analytical processing
Flight Workflow¶
  1. Ingestion: ingest_flight → CSV/JSON files in datalake/bronze/flight_radar/flights/
  2. Preparation:
     • complete_flight_preparation → Delta table in datalake/prepare/flight_radar_prepared/complete_flight/
     • missing_flight_preparation → Delta table in datalake/prepare/flight_radar_prepared/missing_flight/
  3. Transformation: Success sensor triggers dbt models for analytical processing

2. E-commerce Pipeline¶

Workflow Structure¶

graph TD
    A[E-commerce Config] --> B[Product Workflow]
    A --> C[Customer Workflow]

    B --> B1[product_ingestion]
    B1 --> B2[product_preparation]
    B2 --> B3[Delta Table: product]

    C --> C1[customer_ingestion]
    C1 --> C2[customer_preparation]
    C2 --> C3[Delta Table: customer]

    B3 --> D[Success Sensor]
    C3 --> D
    D --> E[dbt Transformations]

3. Asset Property Pipeline¶

Workflow Structure¶

graph TD
    A[Asset Property Config] --> B[Real Estate Workflow]

    B --> B1[real_estate_ingestion]
    B1 --> B2[real_estate_preparation]
    B2 --> B3[Delta Table: real_estate_sales]

    B3 --> C[Success Sensor]
    C --> D[dbt Transformations]

Operation Types¶

1. Ingestion Operations¶

Purpose¶

  • Collect data from external sources (APIs, databases, files)
  • Validate data quality and schema
  • Store raw data in MinIO Bronze layer as CSV/JSON/Parquet files
  • Support multiple output formats based on data characteristics

Output Storage¶

  • Bronze Layer: datalake/bronze/{domain}/{entity}/
  • Formats: CSV, JSON, Parquet based on data structure
  • Partitioning: Time-based partitioning for efficient querying
  • Metadata: Schema information and data lineage tracking

Implementation¶

@op(
    name=f"{workflow_name}_ingestion_op",
    required_resource_keys={"spark", "app_ingest_config"},
    out=Out(is_required=False),
)
def ingestion_op(context: OpExecutionContext) -> None:
    spark = context.resources.spark
    configuration = context.resources.app_ingest_config["configurations"].config

    try:
        # Initialize ingestion instance
        ingestion_instance = self.MainIngestion(
            spark=spark,
            ingestion_config=configuration,
            logger=logger,
        )

        # Execute ingestion method
        getattr(ingestion_instance, method_name)()
    finally:
        # Stop the Spark session even if ingestion fails
        spark.stop()

2. Preparation Operations¶

Purpose¶

  • Process raw data from Bronze layer
  • Apply data cleaning and standardization
  • Transform data into structured Delta tables
  • Store processed data in MinIO Prepare layer

Output Storage¶

  • Prepare Layer: datalake/prepare/{domain}_prepared/{entity}/
  • Format: Delta tables with ACID transactions
  • Features: Schema evolution, time travel, partitioning
  • Metadata: Hive Metastore integration for table catalog

Implementation¶

@op(
    name=f"{workflow_name}_preparation_op",
    required_resource_keys={"spark", "app_prepare_config"},
    out=Out(is_required=False),
)
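def preparation_op(context: OpExecutionContext) -> None:
    spark = context.resources.spark
    # config_key is the YAML file stem (e.g. "airport" for airport.yml) in the prepare config resource
    prepare_parameter = context.resources.app_prepare_config[config_key].config

    try:
        # Execute preparation logic
        load_entrypoint(prepare_parameter, spark, logger_=logger)
    finally:
        # Stop the Spark session even if preparation fails
        spark.stop()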

3. dbt Transformations¶

Purpose¶

  • Create analytical models from prepared Delta tables
  • Apply business logic and transformations
  • Build data marts for specific use cases
  • Generate Iceberg tables in warehouse bucket
  • Provide domain-specific analytical catalogs

Output Storage¶

  • Warehouse Layer: warehouse/{domain}/{schema}/{table}/
  • Format: Apache Iceberg tables
  • Catalogs: Domain-specific Trino catalogs
  • Features: ACID transactions, schema evolution, time travel
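The three storage layouts are plain path templates. The hypothetical helpers below fill them in; the templates are copied from the Output Storage lists, the airport values come from the Data Flow Details, and the warehouse example assumes the template applies to the `full_flight_obt` model named in the flight_radar `dbt_models` setting.

```python
# Path templates copied from the Output Storage lists above
BRONZE = "datalake/bronze/{domain}/{entity}/"
PREPARE = "datalake/prepare/{domain}_prepared/{entity}/"
WAREHOUSE = "warehouse/{domain}/{schema}/{table}/"

def bronze_path(domain: str, entity: str) -> str:
    """Raw files written by ingestion ops."""
    return BRONZE.format(domain=domain, entity=entity)

def prepare_path(domain: str, entity: str) -> str:
    """Delta tables written by preparation ops."""
    return PREPARE.format(domain=domain, entity=entity)

def warehouse_path(domain: str, schema: str, table: str) -> str:
    """Iceberg tables written by dbt."""
    return WAREHOUSE.format(domain=domain, schema=schema, table=table)

# Airport example from the Data Flow Details: the Bronze folder is plural, the Delta table is not
print(bronze_path("flight_radar", "airports"))  # datalake/bronze/flight_radar/airports/
print(prepare_path("flight_radar", "airport"))  # datalake/prepare/flight_radar_prepared/airport/
print(warehouse_path("flight_radar", "intermediate", "full_flight_obt"))
# warehouse/flight_radar/intermediate/full_flight_obt/
```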

Domain-Specific Catalogs¶

  • Flight Radar: flight_radar catalog with prepared, intermediate, marts schemas
  • E-commerce: ecommerce catalog with prepared, intermediate, marts schemas
  • Asset Property: asset_property catalog with prepared, intermediate, marts schemas
  • Preparation Access: spark_catalog for Delta table access during preparation tasks

Success Sensor Integration¶

  • Trigger: Successful completion of the ingestion job, which also runs the preparation ops
  • Action: Automatic dbt job execution
  • Monitoring: Pipeline success/failure tracking
  • Retry Logic: dbt assets retry up to 2 times with a 5-second delay (RetryPolicy)

Implementation¶

@dbt_assets(
    name=asset_name,
    select=f"tag:{domain}",
    manifest=dbt_project_object.manifest_path,
    retry_policy=RetryPolicy(max_retries=2, delay=5),
)
def run_dbt_asset(context: AssetExecutionContext, config: DbtAssetConfig, dbt: DbtCliResource):
    """Dagster asset function that runs the configured dbt command and streams its events."""
    command = config.command.replace(" --select fqn:*", "")
    logger.info(f"Final command for dbt: {command}")
    yield from dbt.cli(command.split(" "), context=context).stream()
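Splitting the command on single spaces, as `command.split(" ")` does, yields empty-string arguments when a configured command contains doubled spaces, and it breaks quoted values such as a `--vars` argument. A hedged alternative is the standard library's `shlex.split`; `dbt_args` below is a hypothetical helper, not part of dagster-dbt.

```python
import shlex
from typing import List

def dbt_args(command: str) -> List[str]:
    """Hypothetical helper: turn a dbt_models string into the argument list for dbt.cli."""
    # Same clean-up as run_dbt_asset: drop the default " --select fqn:*" selector
    command = command.replace(" --select fqn:*", "")
    # shlex.split ignores repeated whitespace and keeps quoted values together
    return shlex.split(command)

print(dbt_args("run --models +flight_radar.intermediate.full_flight_obt+ --target flight_radar"))
# ['run', '--models', '+flight_radar.intermediate.full_flight_obt+', '--target', 'flight_radar']
print("run  --target flight_radar".split(" "))  # ['run', '', '--target', 'flight_radar']
```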

Dependency Management¶

1. Dependency Graph Construction¶

Process¶

def build_dependency_graph(self) -> Tuple[Dict[str, OpDefinition], Dict[str, str | None]]:
    """Build the dependency graph based on configuration."""
    ops, depends_on_op = {}, {}

    # Create ingestion ops
    for workflow_name, workflow in self.config.workflows.items():
        ingestion_op = self.create_ingestion_op(workflow_name, workflow["ingestion"]["method"])
        ops[f"{workflow_name}_ingestion"] = ingestion_op

        # Create preparation ops with dependencies
        for prep_config in workflow["preparation"]:
            depends_on = prep_config["depends_on"]
            prep_op = self.create_preparation_op(
                workflow_name=f"{prep_config['config_key']}",
                config_key=prep_config["config_key"],
                depends_on=depends_on,
            )
            prep_op_name = f"{prep_config['config_key']}_prep"
            ops[prep_op_name] = prep_op

            if depends_on == workflow["ingestion"]["method"]:
                depends_on_op[prep_op_name] = f"{workflow_name}_ingestion"
            else:
                depends_on_op[prep_op_name] = None

    return ops, depends_on_op
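The dependency rule can be traced without Dagster. The sketch below keeps only the `depends_on_op` half of the method, with op keys as plain strings; `dependency_map` is a hypothetical name, it uses `.get("depends_on")` where the original indexes the key directly, and the flight workflow's YAML is assumed from the Flight Radar workflow diagram (two preparation steps after `ingest_flight`).

```python
from typing import Dict, Optional

def dependency_map(workflows: Dict[str, dict]) -> Dict[str, Optional[str]]:
    """Map each preparation op key to the ingestion op key it waits on (None if standalone)."""
    depends_on_op: Dict[str, Optional[str]] = {}
    for workflow_name, workflow in workflows.items():
        method = workflow["ingestion"]["method"]
        for prep in workflow["preparation"]:
            prep_key = f"{prep['config_key']}_prep"
            # Same rule as build_dependency_graph: only depends_on == this workflow's method links the ops
            depends_on_op[prep_key] = (
                f"{workflow_name}_ingestion" if prep.get("depends_on") == method else None
            )
    return depends_on_op

workflows = {
    "airport": {
        "ingestion": {"method": "ingest_airport"},
        "preparation": [{"config_key": "airport", "depends_on": "ingest_airport"}],
    },
    "flight": {
        "ingestion": {"method": "ingest_flight"},
        "preparation": [
            {"config_key": "complete_flight", "depends_on": "ingest_flight"},
            {"config_key": "missing_flight", "depends_on": "ingest_flight"},
        ],
    },
}
print(dependency_map(workflows))
# {'airport_prep': 'airport_ingestion', 'complete_flight_prep': 'flight_ingestion', 'missing_flight_prep': 'flight_ingestion'}
```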

2. Dependency Resolution¶

Grouping Strategy¶

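def make_group(prep_names: List[str], ing_name: str | None = None) -> GraphDefinition:
    """Create operation groups based on dependencies."""
    # Drop the op-key suffixes ("_ingestion", "_prep") so multi-word names such as
    # real_estate stay intact and standalone groups do not all become "prep_pipeline"
    graph_name = (
        f"{ing_name.removesuffix('_ingestion')}_pipeline" if ing_name
        else f"{prep_names[0].removesuffix('_prep')}_pipeline"
    )

    # `ops` maps op keys to op definitions (as built by build_dependency_graph)
    @graph(name=graph_name, tags={"subgraph": graph_name})
    def op_group():
        if ing_name is not None:
            # Run ingestion first, then fan out to every preparation op that depends on it
            ing_result = ops[ing_name]()
            for prep_name in prep_names:
                ops[prep_name](ing_result)
        else:
            # No upstream ingestion: preparation ops run independently
            for prep_name in prep_names:
                ops[prep_name]()

    return op_group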
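`make_group` receives preparation ops that are already grouped by the ingestion op they depend on. One plausible way to derive those groups from the `depends_on_op` mapping of `build_dependency_graph` is sketched below in plain Python; `group_preparations` and `pipeline_name` are hypothetical helpers, `manual_upload_prep` is an invented standalone step, and collecting all standalone preparation ops into a single group is an assumption.

```python
from collections import defaultdict
from typing import Dict, List, Optional

def group_preparations(depends_on_op: Dict[str, Optional[str]]) -> Dict[Optional[str], List[str]]:
    """Invert prep -> ingestion into ingestion -> [preps]; standalone preps share the None key."""
    groups: Dict[Optional[str], List[str]] = defaultdict(list)
    for prep_name, ing_name in depends_on_op.items():
        groups[ing_name].append(prep_name)
    return dict(groups)

def pipeline_name(prep_names: List[str], ing_name: Optional[str] = None) -> str:
    """Name a group after its ingestion op, or after its first standalone prep op."""
    base = ing_name.removesuffix("_ingestion") if ing_name else prep_names[0].removesuffix("_prep")
    return f"{base}_pipeline"

depends_on_op = {
    "airport_prep": "airport_ingestion",
    "complete_flight_prep": "flight_ingestion",
    "missing_flight_prep": "flight_ingestion",
    "manual_upload_prep": None,  # hypothetical standalone preparation step
}
for ing_name, prep_names in group_preparations(depends_on_op).items():
    print(pipeline_name(prep_names, ing_name), prep_names)
# airport_pipeline ['airport_prep']
# flight_pipeline ['complete_flight_prep', 'missing_flight_prep']
# manual_upload_pipeline ['manual_upload_prep']
```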

Scheduling and Automation¶

1. Schedule Definition¶

Cron-based Scheduling¶

def create_schedule(self):
    """Create a schedule for the ingestion job."""
    ingestion_job = self.create_ingestion_job()
    return ScheduleDefinition(
        job=ingestion_job,
        cron_schedule=self.config.cron_schedule,
    )

Schedule Examples:¶

  • Flight Radar: "0 */2 * * *" (Every 2 hours)
  • E-commerce: "0 0 * * *" (Daily at midnight)
  • Asset Property: "0 6 * * *" (Daily at 6 AM)
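The five cron fields are minute, hour, day of month, month, and day of week, so `0 */2 * * *` fires at minute 0 of every hour divisible by 2. The hypothetical `cron_matches` helper below checks whether a timestamp fires for a simplified subset of cron syntax (`*`, `*/n`, comma lists, plain numbers); it ignores ranges and day names, and unlike real cron it requires every field to match instead of OR-ing day of month with day of week when both are restricted.

```python
from datetime import datetime
from typing import Tuple

# Lowest allowed value of minute, hour, day of month, month, day of week (0 = Sunday)
_FIELD_MINIMUMS: Tuple[int, ...] = (0, 0, 1, 1, 0)

def _field_matches(field: str, value: int, lowest: int) -> bool:
    """Match one field: '*', '*/n', comma lists, or plain integers (no ranges or names)."""
    for part in field.split(","):
        if part == "*":
            return True
        if part.startswith("*/"):
            # */n fires on every n-th value counting from the field's lowest value
            if (value - lowest) % int(part[2:]) == 0:
                return True
        elif int(part) == value:
            return True
    return False

def cron_matches(expr: str, when: datetime) -> bool:
    """Simplified check that every field of a five-field expression matches `when`."""
    fields = expr.split()
    if len(fields) != 5:
        raise ValueError(f"expected 5 cron fields, got {len(fields)}")
    cron_weekday = (when.weekday() + 1) % 7  # Python: Monday=0; cron: Sunday=0
    values = (when.minute, when.hour, when.day, when.month, cron_weekday)
    return all(_field_matches(f, v, lo) for f, v, lo in zip(fields, values, _FIELD_MINIMUMS))

print(cron_matches("0 */2 * * *", datetime(2025, 10, 3, 2, 0)))  # True
print(cron_matches("0 */2 * * *", datetime(2025, 10, 3, 3, 0)))  # False
print(cron_matches("0 6 * * *", datetime(2025, 10, 3, 6, 0)))    # True
```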

2. Sensor-based Automation¶

Success Sensor¶

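from dagster import DagsterRunStatus, RunRequest, RunStatusSensorContext, run_status_sensor

@run_status_sensor(
    run_status=DagsterRunStatus.SUCCESS,
    monitored_jobs=[ingestion_job],
    request_job=dbt_job,
)
def run_job_generic_sensor(context: RunStatusSensorContext):
    """Sensor that triggers the dbt job after a successful ingestion run."""
    # Keying the request on the upstream run ID launches at most one dbt run per ingestion success
    return RunRequest(run_key=context.dagster_run.run_id)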

Sensor Flow:¶

  1. Ingestion job completes successfully
  2. Sensor detects success status
  3. Sensor triggers dbt transformation job
  4. dbt job processes the new data

Resource Management¶

1. Spark Resource¶

Configuration¶

@resource(
    config_schema={"unity_catalog_name": str},
    description="Spark resources.",
)
def spark_resource(context: InitResourceContext):
    final_config = {
        **SPARK_CONNECTION,
    }
    context.log.info("Building Spark session with packages...")
    spark = SparkUtils.get_spark_session(**final_config)

    context.log.info("Initializing Spark packages...")
    spark.range(1).count()
    context.log.info("Package initialization complete")
    return spark

Spark Configuration:¶

import os

# Defaults point at the local containers; set the environment variables to override them
SPARK_CONNECTION = {
    "master": "spark://spark-master:7077",
    "minio_endpoint": os.getenv("MINIO_ENDPOINT", "http://minio:9000"),
    "minio_access_key": os.getenv("MINIO_ACCESS_KEY", "minioadmin"),
    "minio_secret_key": os.getenv("MINIO_SECRET_KEY", "minioadmin"),
    "hive_uri": os.getenv("HIVE_URI", "thrift://hive-metastore:9083"),
}

2. Configuration Resources¶

Pydantic-based Configuration¶

def create_pydantic_config_resource(
    config_model: Type[ModelType],
) -> Callable[[InitResourceContext], Dict[str, ConfigResource[ModelType]]]:
    def pydantic_config_resource(
        context: InitResourceContext,
    ) -> Dict[str, ConfigResource[ModelType]]:
        """Resource that loads and validates configuration from YAML files."""
        configuration_path = context.resource_config["configuration_path"]
        result_dict: Dict[str, ConfigResource[ModelType]] = {}

        for path in Path(configuration_path).glob("*.yml"):
            with open(path, "r") as f:
                config_dict = yaml.safe_load(f) or {}
            config_name = path.parts[-1].split(".")[0]
            result_dict[config_name] = ConfigResource(config_model, config_dict["configurations"])

        return result_dict

    return pydantic_config_resource
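Because the resource keys each entry by file stem, a preparation op finds its settings through the workflow's `config_key` (`airport` for `airport.yml`), and the ingestion op's lookup of `"configurations"` suggests a `configurations.yml` in the ingestion directory. The sketch below mimics that shape with the standard library; `ConfigResourceSketch` and `PrepareConfigSketch` are hypothetical substitutes for the project's `ConfigResource` and Pydantic models, and the dataclass only checks field names, not types.

```python
from dataclasses import dataclass
from typing import Any, Dict, Generic, List, Type, TypeVar

ModelType = TypeVar("ModelType")

class ConfigResourceSketch(Generic[ModelType]):
    """Hypothetical stand-in for ConfigResource: builds a model from a raw dict, exposed as .config."""

    def __init__(self, model: Type[ModelType], raw: Dict[str, Any]) -> None:
        self.config: ModelType = model(**raw)

@dataclass
class PrepareConfigSketch:
    """Hypothetical model for the `configurations` block of a preparation file such as airport.yml."""
    catalog: Dict[str, Any]
    storage: Dict[str, Any]
    schema: List[Dict[str, Any]]

# What the resource would hold after reading airport.yml: key = file stem, value = wrapped config
app_prepare_config = {
    "airport": ConfigResourceSketch(
        PrepareConfigSketch,
        {
            "catalog": {"full_name": "flight_radar.prepared.airport"},
            "storage": {"bucket_name": "datalake"},
            "schema": [{"name": "latitude", "type_name": "FLOAT", "nullable": False}],
        },
    )
}

# A preparation op with config_key="airport" reads its settings like this
print(app_prepare_config["airport"].config.catalog["full_name"])  # flight_radar.prepared.airport
```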

Data Flow Architecture¶

1. Complete Pipeline Flow¶

sequenceDiagram
    participant Scheduler as Dagster Scheduler
    participant Ingestion as Ingestion Job
    participant Preparation as Preparation Ops
    participant Sensor as Success Sensor
    participant dbt as dbt Job
    participant MinIO as MinIO Storage
    participant Iceberg as Iceberg Tables

    Scheduler->>Ingestion: Trigger scheduled run
    Ingestion->>MinIO: Store raw data (datalake/bronze)
    Ingestion->>Preparation: Run dependent preparation ops
    Preparation->>MinIO: Store Delta tables (datalake/prepare)
    Preparation->>Sensor: Ingestion job run succeeds
    Sensor->>dbt: Trigger transformation
    dbt->>Iceberg: Create analytical models (warehouse bucket)

2. Error Handling and Retry¶

Retry Policies¶

retry_policy=RetryPolicy(
    max_retries=2,
    delay=5,
)

Error Handling Strategy:¶

  1. Ingestion Errors: Retry with exponential backoff
  2. Preparation Errors: Log and continue with other workflows
  3. dbt Errors: Retry up to 2 times with 5-second delay
  4. Resource Errors: Fail fast and alert
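The `RetryPolicy(max_retries=2, delay=5)` shown above waits a fixed 5 seconds before each retry; exponential backoff, listed for ingestion errors, lengthens the wait after each failure. Dagster's `RetryPolicy` also has a `backoff` parameter (`Backoff.LINEAR`, `Backoff.EXPONENTIAL`) for this. The framework-free sketch below only illustrates the idea and does not reproduce Dagster's exact delay formula; `retry_with_backoff` is a hypothetical helper, and `sleep` is injectable so the waits can be recorded instead of slept.

```python
import time
from typing import Callable, List, TypeVar

T = TypeVar("T")

def retry_with_backoff(
    fn: Callable[[], T],
    max_retries: int = 2,
    delay: float = 5.0,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call fn; after each failure wait delay * 2**attempt seconds, giving up after max_retries retries."""
    for attempt in range(max_retries + 1):
        try:
            return fn()
        except Exception:
            if attempt == max_retries:
                raise  # retries exhausted: surface the last error
            sleep(delay * 2 ** attempt)  # 5, 10, 20, ... seconds for delay=5
    raise ValueError("max_retries must be >= 0")

# Usage: a call that fails twice, then succeeds; waits are recorded instead of slept
waits: List[float] = []
attempts = {"count": 0}

def flaky_ingest() -> str:
    attempts["count"] += 1
    if attempts["count"] < 3:
        raise ConnectionError("transient API error")
    return "ok"

print(retry_with_backoff(flaky_ingest, max_retries=2, delay=5, sleep=waits.append))  # ok
print(waits)  # [5, 10]
```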

Monitoring and Observability¶

1. Logging¶

Structured Logging¶

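from dagster import get_dagster_logger
logger = get_dagster_logger()

# In operations
context.log.info("Starting ingestion process")
try:
    ...  # processing step
except Exception:
    context.log.error("Failed to process data", exc_info=True)  # attaches the active traceback
    raise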

Log Levels:¶

  • INFO: Normal operation progress
  • WARNING: Non-critical issues
  • ERROR: Critical failures
  • DEBUG: Detailed debugging information

2. Asset Monitoring¶

Freshness Policies¶

@asset(freshness_policy=FreshnessPolicy(maximum_lag_minutes=60))
def flight_data(context: AssetExecutionContext):
    """Flight data should be updated every hour."""
    pass

Monitoring Metrics:¶

  • Data Freshness: Time since last update
  • Pipeline Success Rate: Percentage of successful runs
  • Processing Time: Duration of each operation
  • Data Quality: Validation results
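Three of these metrics are simple aggregates over run history. The sketch below computes them from a hypothetical `RunRecord` list with invented timestamps; in practice the data would come from Dagster's run storage or UI, which this sketch does not query. A lag at or under 60 minutes would satisfy the `maximum_lag_minutes=60` freshness policy shown above.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import List, Optional

@dataclass
class RunRecord:
    """Hypothetical, simplified record of one finished pipeline run."""
    succeeded: bool
    started: datetime
    finished: datetime

def success_rate(runs: List[RunRecord]) -> float:
    """Fraction of runs that succeeded (0.0 when there are no runs)."""
    return sum(r.succeeded for r in runs) / len(runs) if runs else 0.0

def freshness_lag(runs: List[RunRecord], now: datetime) -> Optional[timedelta]:
    """Time since the last successful run finished, or None if none succeeded."""
    finished = [r.finished for r in runs if r.succeeded]
    return now - max(finished) if finished else None

def durations(runs: List[RunRecord]) -> List[timedelta]:
    """Processing time of each run."""
    return [r.finished - r.started for r in runs]

t0 = datetime(2025, 10, 3, 0, 0)
runs = [
    RunRecord(True, t0, t0 + timedelta(minutes=12)),
    RunRecord(False, t0 + timedelta(hours=2), t0 + timedelta(hours=2, minutes=3)),
    RunRecord(True, t0 + timedelta(hours=4), t0 + timedelta(hours=4, minutes=10)),
]
now = t0 + timedelta(hours=5)
print(round(success_rate(runs), 2))  # 0.67
print(freshness_lag(runs, now))      # 0:50:00
print(max(durations(runs)))          # 0:12:00
```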

Configuration Management¶

1. Domain-specific Configurations¶

Flight Radar Configuration¶

# flight_radar_config.yml
domain: flight_radar
unity_catalog_name: flight_radar
ingestion_config_path: "dagster_configurations/configurations/ingestion/flight_radar"
prepare_config_path: "dagster_configurations/configurations/loads/flight_radar"
dbt_models: "run --models +flight_radar.intermediate.full_flight_obt+ --target flight_radar"
cron_schedule: "0 */2 * * *"
main_ingestion_path: "flight_radar.__main__.MainIngestion"
config_type_path: "flight_radar.__main__.FlightRadarIngestionConfig"

E-commerce Configuration¶

# ecommerce_config.yml
domain: ecommerce
unity_catalog_name: ecommerce
ingestion_config_path: "dagster_configurations/configurations/ingestion/ecommerce"
prepare_config_path: "dagster_configurations/configurations/loads/ecommerce"
dbt_models: "run --select ecommerce.intermediate.customer_order+ --target ecommerce_dev"
cron_schedule: "0 0 * * *"
main_ingestion_path: "user_product_mock.__main__.MainIngestion"
config_type_path: "user_product_mock.__main__.IngestionConfig"

2. Preparation Configurations¶

Airport Preparation Config¶

# airport.yml
configurations:
  catalog:
    host: "hive-metastore"
    port: "9083"
    user: "hiveuser"
    full_name: "flight_radar.prepared.airport"
    full_name_idx: "flight_radar.index.airport"
    catalog_table_path: "prepare/flight_radar/airport"
  storage:
    minio_endpoint: "minio:9000"
    minio_access_key: "minioadmin"
    minio_secret_key: "minioadmin"
    bucket_name: "datalake"
  schema:
    - name: "latitude"
      type_name: "FLOAT"
      description: "latitude of the airport"
      nullable: false
    - name: "longitude"
      type_name: "FLOAT"
      description: "longitude of the airport"
      nullable: false
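Each `schema` entry carries a column name, a type, a description, and a nullability flag. How `load_entrypoint` applies them is not shown in this document; as one illustration, the hypothetical `to_spark_ddl` helper below renders such entries as a Spark SQL column list that could be used in a `CREATE TABLE ... USING DELTA` statement for the `full_name` table.

```python
from typing import Any, Dict, List

def to_spark_ddl(columns: List[Dict[str, Any]]) -> str:
    """Render schema entries as a Spark SQL column list (hypothetical helper, not load_entrypoint)."""
    parts = []
    for col in columns:
        ddl = f"{col['name']} {col['type_name']}"
        if not col.get("nullable", True):
            ddl += " NOT NULL"
        if col.get("description"):
            # Escape single quotes so the comment stays a valid SQL string literal
            comment = col["description"].replace("'", "\\'")
            ddl += f" COMMENT '{comment}'"
        parts.append(ddl)
    return ", ".join(parts)

# The two schema entries from airport.yml
schema = [
    {"name": "latitude", "type_name": "FLOAT", "description": "latitude of the airport", "nullable": False},
    {"name": "longitude", "type_name": "FLOAT", "description": "longitude of the airport", "nullable": False},
]
print(f"CREATE TABLE IF NOT EXISTS flight_radar.prepared.airport ({to_spark_ddl(schema)}) USING DELTA")
```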

Best Practices¶

1. Configuration Management¶

  • Use YAML: Human-readable configuration format
  • Validate Configs: Use Pydantic for type safety
  • Environment Variables: Use for sensitive data
  • Version Control: Track configuration changes

2. Error Handling¶

  • Retry Logic: Implement exponential backoff
  • Graceful Degradation: Continue processing other workflows
  • Alerting: Notify on critical failures
  • Logging: Comprehensive error logging

3. Performance Optimization¶

  • Resource Management: Properly manage Spark sessions
  • Parallel Processing: Use Dagster's parallel execution
  • Caching: Cache expensive computations
  • Monitoring: Track performance metrics

4. Testing¶

  • Unit Tests: Test individual operations
  • Integration Tests: Test complete pipelines
  • Configuration Tests: Validate YAML configurations
  • Mocking: Mock external dependencies

Troubleshooting¶

1. Common Issues¶

Configuration Errors¶

# Check configuration syntax
python -c "import yaml; yaml.safe_load(open('config.yml'))"

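# Validate Pydantic models (config_model and ConfigModel are placeholders for your model module and class)
python -c "import yaml; from config_model import ConfigModel; ConfigModel.model_validate(yaml.safe_load(open('config.yml')))"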

Resource Issues¶

# Check Spark connectivity
docker exec -it spark-driver spark-submit --version

# Check MinIO connectivity
docker exec -it minio-setup-bucket /usr/bin/mc ls minio/

Dependency Issues¶

# List the assets Dagster loads from the code location
dagster asset list

# Install the dbt packages declared in packages.yml
dbt deps

2. Debugging Techniques¶

Enable Debug Logging¶

import logging
logging.basicConfig(level=logging.DEBUG)

Use Dagster UI¶

  • Access Dagster UI at http://localhost:3030
  • View run logs and execution details
  • Monitor asset lineage and dependencies

Check Resource Status¶

# In Dagster operations
context.log.info(f"Spark session: {spark}")
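# Configuration objects can hold credentials (e.g. minio_secret_key in preparation files); avoid logging them outside local debugging
context.log.info(f"Configuration: {configuration}")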

Future Enhancements¶

1. Advanced Features¶

  • Dynamic Scaling: Auto-scale resources based on workload
  • Data Quality Monitoring: Real-time data quality checks
  • ML Integration: Machine learning pipeline integration
  • Streaming Support: Real-time data processing

2. Monitoring Improvements¶

  • Custom Metrics: Business-specific metrics
  • Alerting: Advanced alerting rules
  • Dashboards: Real-time monitoring dashboards
  • Anomaly Detection: Automated anomaly detection

3. Performance Optimizations¶

  • Caching: Advanced caching strategies
  • Partitioning: Intelligent data partitioning
  • Compression: Data compression optimization
  • Indexing: Automated index management

Last update: October 3, 2025
Created: October 3, 2025