📑 API Documentation

Overview

The Iceberg Data Engineering Platform provides APIs for ingesting data from three domains: flight tracking, real estate, and e-commerce. Each domain has its own data collector, and all collectors share a validation layer and standardized data contracts.

API Architecture

graph TD
    A[External APIs] --> B[Data Collectors]
    B --> C[Validation Layer]
    C --> D[Data Models]
    D --> E[Storage Layer]

    subgraph "Data Collectors"
        B1[Flight Radar Collector]
        B2[Real Estate Collector]
        B3[E-commerce Collector]
    end

    subgraph "Validation Layer"
        C1[Pydantic Models]
        C2[Schema Validation]
        C3[Data Quality Checks]
    end

    subgraph "Data Models"
        D1[FlightModel]
        D2[RealEstateModel]
        D3[EcommerceModel]
    end

Data Domains

1. Flight Radar API

Overview

The Flight Radar API integration collects real-time flight data from FlightRadar24, including aircraft information, airline details, and airport data.

Base URL

  • FlightRadar24 API: https://www.flightradar24.com/
  • Internal API: flight_radar/requests_api/

Endpoints

Get Flight Data
# Method: GET
# Endpoint: /flights
# Description: Retrieves current flight data

class RequestFlightRadarData:
    def fetch_data(self) -> Iterator[FlightModel | MissingFlightModel]:
        """
        Fetches flight data from FlightRadar24 API

        Returns:
            Iterator[FlightModel | MissingFlightModel]: Flight data records
        """
Get Flight Details
# Method: GET
# Endpoint: /flight/{flight_id}/details
# Description: Retrieves detailed information for a specific flight

def get_details(self, flight: Flight) -> Tuple[Dict, str]:
    """
    Gets detailed information for a specific flight

    Args:
        flight: Flight object containing basic flight information

    Returns:
        Tuple[Dict, str]: Flight details and URL for additional data
    """

Data Models

FlightModel
class FlightModel(BaseModel):
    # Core flight information
    latitude: float
    longitude: float
    id: str
    icao_24bit: str
    heading: int
    altitude: int
    ground_speed: int
    squawk: str
    registration: str
    time: datetime
    number: str
    on_ground: int
    vertical_speed: int
    callsign: str
    model_missing_data: bool

    # Nested models
    departure: DepartureModel
    arrival: ArrivalModel
    airline: AirlineModel
    aircraft: AirCraftModel
AirlineModel
class AirlineModel(BaseModel):
    iata: str
    icao: Optional[str]
    name: Optional[str]
    short_name: Optional[str]
AirCraftModel
class AirCraftModel(BaseModel):
    code: str
    age: Optional[str]
    country_id: Optional[int]
    model: Optional[str]
AirportModel (Departure/Arrival)
class CommonArrivalAndDepartureModel(BaseModel):
    iata: str
    altitude: Optional[int]
    country_code: Optional[str]
    country_name: Optional[str]
    latitude: Optional[float]
    longitude: Optional[float]
    icao: Optional[str]
    baggage: Optional[str]
    gate: Optional[str]
    name: Optional[str]
    terminal: Optional[str]
    visible: Optional[bool]
    website: Optional[str]
    timezone_abbr: Optional[str]
    timezone_abbr_name: Optional[str]
    timezone_name: Optional[str]
    timezone_offset: Optional[int]
    timezone_offsetHours: Optional[str]

Error Handling

# Retry mechanism for Cloudflare errors
@retry(stop=stop_after_attempt(5), retry=retry_if_exception_type(CloudflareError))
def get_details(self, flight: Flight):
    # Implementation with retry logic
    pass

# Missing flight handling
def _handle_missing_flight(self, flight: Flight) -> MissingFlightModel:
    """
    Handles cases where flight data is incomplete or missing

    Args:
        flight: Flight object with missing data

    Returns:
        MissingFlightModel: Model representing missing flight data
    """

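Because fetch_data yields either a FlightModel or a MissingFlightModel, downstream code has to separate the two before storing them. The sketch below shows one way to do that. The dataclasses are simplified stand-ins for the real Pydantic models (the fields of MissingFlightModel are not documented here, so `id` is an assumption), and `partition_flights` is a hypothetical helper, not part of the platform.

```python
from dataclasses import dataclass
from typing import Iterable, List, Tuple, Union


# Stand-ins for the real Pydantic models; only the fields used here are included.
@dataclass
class FlightModel:
    id: str
    callsign: str


@dataclass
class MissingFlightModel:
    id: str  # assumed field, for illustration only


def partition_flights(
    records: Iterable[Union[FlightModel, MissingFlightModel]],
) -> Tuple[List[FlightModel], List[MissingFlightModel]]:
    """Split a mixed stream into complete and missing flight records."""
    complete: List[FlightModel] = []
    missing: List[MissingFlightModel] = []
    for record in records:
        if isinstance(record, MissingFlightModel):
            missing.append(record)
        else:
            complete.append(record)
    return complete, missing


sample = [
    FlightModel("a1", "DLH400"),
    MissingFlightModel("b2"),
    FlightModel("c3", "BAW117"),
]
complete, missing = partition_flights(iter(sample))
```

Keeping the missing records in their own list makes it possible to log or re-request them without blocking the upload of the complete ones.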
2. Real Estate API

Overview

The Real Estate API collects property sales data from an ArcGIS FeatureServer query endpoint.

Base URL

  • ArcGIS API: https://services1.arcgis.com/ioennV6PpG5Xodq0/ArcGIS/rest/services/OpenData_A5/FeatureServer/1/query
  • Internal API: asset_property/real_estate_request.py

Endpoints

Get Real Estate Data
# Method: POST
# Endpoint: /query
# Description: Retrieves real estate sales data

def fetch_real_estate_data() -> Iterator[Dict[str, Any] | None]:
    """
    Fetches real estate data from ArcGIS API

    Returns:
        Iterator[Dict[str, Any] | None]: Real estate records
    """

Request Parameters

api_params = {
    "where": "1=1",           # SQL WHERE clause
    "outFields": "*",         # Fields to return
    "outSR": "4326",          # Spatial reference system
    "f": "json"              # Response format
}
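To make the parameters concrete, the sketch below URL-encodes them with the standard library. `build_query_url` is a hypothetical helper used only for illustration. The same encoded string could also serve as the form body of a POST request.

```python
from urllib.parse import urlencode

BASE_URL = (
    "https://services1.arcgis.com/ioennV6PpG5Xodq0/ArcGIS/rest/services/"
    "OpenData_A5/FeatureServer/1/query"
)

api_params = {
    "where": "1=1",    # SQL WHERE clause (match every row)
    "outFields": "*",  # Fields to return
    "outSR": "4326",   # Spatial reference system (WGS 84)
    "f": "json",       # Response format
}


def build_query_url(base_url: str, params: dict) -> str:
    """Return the base URL with the parameters appended as a query string."""
    return f"{base_url}?{urlencode(params)}"


url = build_query_url(BASE_URL, api_params)
```

Note that `urlencode` percent-encodes the `=` inside the WHERE clause, so `1=1` is sent as `1%3D1`.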

Headers

headers = {
    "User-Agent": (
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
        "AppleWebKit/537.36 (KHTML, like Gecko) "
        "Chrome/91.0.4472.124 Safari/537.36"
    )
}

Error Handling

class ValidationError(Exception):
    """Custom exception for API validation errors"""
    def __init__(self, message):
        super().__init__(message)  # keeps str(exc) and tracebacks informative
        self.message = message

# API response validation
if "error" in data:
    raise ValidationError(f"API returned an error: {data['error']}")

if "features" not in data:
    raise ValidationError("API response missing 'features' key.")
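The two checks above can be wrapped in a function that also unpacks the records. This is a minimal sketch, assuming the usual ArcGIS response shape in which each entry of "features" carries its column values under "attributes". `extract_attributes` and the `price` field in the usage line are hypothetical names.

```python
from typing import Any, Dict, List


class ValidationError(Exception):
    """Raised when the API response fails validation."""


def extract_attributes(data: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Validate a decoded response and return one dict per feature."""
    if "error" in data:
        raise ValidationError(f"API returned an error: {data['error']}")
    if "features" not in data:
        raise ValidationError("API response missing 'features' key.")
    # Each feature keeps its column values under "attributes".
    return [feature.get("attributes", {}) for feature in data["features"]]


records = extract_attributes({"features": [{"attributes": {"price": 250000}}]})
```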

3. E-commerce API

Overview

The E-commerce API generates mock user and product data for testing and demonstration purposes.

Base URL

  • Internal API: user_product_mock/

Endpoints

Generate Mock Data
# Method: POST
# Endpoint: /generate
# Description: Generates mock e-commerce data

def generate_mock_data() -> Iterator[Dict[str, Any]]:
    """
    Generates mock e-commerce data using Faker

    Returns:
        Iterator[Dict[str, Any]]: Mock e-commerce records
    """

Data Contracts

Base Contract Model

class ContractModel(BaseModel):
    """Base contract model for all data entities"""
    id_record: str = Field(default_factory=lambda: str(uuid.uuid1()))
    created_at: datetime = Field(default_factory=datetime.now)
    updated_at: datetime = Field(default_factory=datetime.now)

Data Quality Contracts

Schema Validation

# Pydantic model validation (ContractModel already derives from BaseModel)
class FlightModel(ContractModel):
    latitude: float = Field(ge=-90, le=90)
    longitude: float = Field(ge=-180, le=180)
    altitude: int = Field(ge=0, le=50000)
    ground_speed: int = Field(ge=0, le=1000)

Data Type Conversion

def convert_type_func(string: Optional[str], function: Callable):
    """
    Converts string values to specified types

    Args:
        string: String value to convert
        function: Conversion function (int, float, etc.)

    Returns:
        Converted value, or None when the input is None or "N/A".
        Any other unparseable string raises the conversion function's own error.
    """
    return function(string) if string not in [None, "N/A"] else None
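A short usage sketch makes the behavior explicit: only None and "N/A" map to None, while any other bad input raises. The function is repeated so the example runs on its own. `safe_convert` is a hypothetical wrapper, not part of the platform, for callers that prefer a default over an exception.

```python
from typing import Any, Callable, Optional


def convert_type_func(string: Optional[str], function: Callable[[str], Any]) -> Any:
    """Same logic as above, repeated so the example is self-contained."""
    return function(string) if string not in [None, "N/A"] else None


def safe_convert(
    string: Optional[str], function: Callable[[str], Any], default: Any = None
) -> Any:
    """Hypothetical wrapper: return `default` when the conversion raises."""
    try:
        return convert_type_func(string, function)
    except (TypeError, ValueError):
        return default


altitude = convert_type_func("35000", int)   # converted normally
unknown = convert_type_func("N/A", float)    # placeholder becomes None
fallback = safe_convert("abc", int)          # int("abc") raises, default returned
```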

Storage Integration

MinIO Uploaders

CSV Uploader

class CSVUploader:
    def upload(self, data: List[Dict], bucket: str, key: str) -> bool:
        """
        Uploads data as CSV to MinIO Bronze layer

        Args:
            data: List of dictionaries to upload
            bucket: MinIO bucket name (datalake)
            key: Object key/path (bronze/{domain}/{entity}/)

        Returns:
            bool: Upload success status
        """

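The body of CSVUploader.upload is not shown here. As a rough sketch of the step that would precede the actual MinIO call, the code below serialises the rows to CSV bytes in memory and builds an object key following the bronze/{domain}/{entity}/ convention. Both helper names, the file name, and the domain and entity values are assumptions made for illustration.

```python
import csv
import io
from typing import Dict, List


def rows_to_csv_bytes(rows: List[Dict[str, object]]) -> bytes:
    """Serialise a list of dictionaries to UTF-8 encoded CSV.

    Assumes every row has the same keys as the first row.
    """
    if not rows:
        return b""
    buffer = io.StringIO()
    writer = csv.DictWriter(buffer, fieldnames=list(rows[0].keys()))
    writer.writeheader()
    writer.writerows(rows)
    return buffer.getvalue().encode("utf-8")


def bronze_key(domain: str, entity: str, filename: str) -> str:
    """Build an object key under the Bronze layer prefix."""
    return f"bronze/{domain}/{entity}/{filename}"


payload = rows_to_csv_bytes([{"id": "a1", "altitude": 35000}])
key = bronze_key("flight_radar", "flights", "flights.csv")
```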
JSON Uploader

class JSONUploader:
    def upload(self, data: Dict, bucket: str, key: str) -> bool:
        """
        Uploads data as JSON to MinIO Bronze layer

        Args:
            data: Dictionary to upload
            bucket: MinIO bucket name (datalake)
            key: Object key/path (bronze/{domain}/{entity}/)

        Returns:
            bool: Upload success status
        """

Parquet Uploader

class ParquetUploader:
    def upload(self, data: List[Dict], bucket: str, key: str) -> bool:
        """
        Uploads data as Parquet to MinIO Bronze layer

        Args:
            data: List of dictionaries to upload
            bucket: MinIO bucket name (datalake)
            key: Object key/path (bronze/{domain}/{entity}/)

        Returns:
            bool: Upload success status
        """

Uploader Factory

class UploaderFactory:
    @staticmethod
    def create_uploader(file_type: str) -> Uploader:
        """
        Creates appropriate uploader based on file type

        Args:
            file_type: Type of file (csv, json, parquet)

        Returns:
            Uploader: Matching uploader instance; unrecognized file types
                fall back to CSVUploader
        """
        uploaders = {
            "csv": CSVUploader(),
            "json": JSONUploader(),
            "parquet": ParquetUploader()
        }
        return uploaders.get(file_type.lower(), CSVUploader())
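The factory above silently falls back to CSV for unknown file types and instantiates all three uploaders on every call. If a typo such as "parqet" should fail loudly instead, a stricter variant could look like the sketch below. The empty classes are stand-ins for the real uploaders, and `create_uploader_strict` is a hypothetical name.

```python
from typing import Dict, Type


class Uploader:
    """Minimal stand-in for the platform's uploader base class."""


class CSVUploader(Uploader):
    pass


class JSONUploader(Uploader):
    pass


class ParquetUploader(Uploader):
    pass


# Map file types to classes so only the requested uploader is instantiated.
_UPLOADERS: Dict[str, Type[Uploader]] = {
    "csv": CSVUploader,
    "json": JSONUploader,
    "parquet": ParquetUploader,
}


def create_uploader_strict(file_type: str) -> Uploader:
    """Return an uploader for `file_type`, or raise ValueError if unsupported."""
    try:
        uploader_cls = _UPLOADERS[file_type.lower()]
    except KeyError:
        raise ValueError(f"Unsupported file type: {file_type!r}") from None
    return uploader_cls()
```

Whether to fail or fall back is a design choice: the fallback keeps pipelines running, while the strict version surfaces configuration mistakes early.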

Error Handling

Retry Mechanisms

from FlightRadar24.errors import CloudflareError
from requests.exceptions import JSONDecodeError
from tenacity import retry, retry_if_exception_type, stop_after_attempt

# Retry on Cloudflare errors
@retry(stop=stop_after_attempt(5), retry=retry_if_exception_type(CloudflareError))
def api_call():
    pass

# Retry for JSON decode errors
@retry(stop=stop_after_attempt(5), retry=retry_if_exception_type(JSONDecodeError))
def parse_json():
    pass

Exception Types

# Custom exceptions
class ValidationError(Exception):
    """Raised when data validation fails"""
    pass

class NoCalledMethodException(Exception):
    """Raised when required method is not called"""
    pass

# HTTP exceptions
from requests.exceptions import HTTPError, JSONDecodeError
from FlightRadar24.errors import CloudflareError

Data Processing Pipeline

Ingestion Flow

sequenceDiagram
    participant API as External API
    participant Collector as Data Collector
    participant Validator as Validation Layer
    participant Bronze as MinIO Bronze Storage
    participant Prep as Spark Preparation
    participant Prepare as MinIO Prepare Storage
    participant Delta as Delta Tables

    API->>Collector: Fetch data
    Collector->>Validator: Validate schema
    Validator->>Bronze: Upload CSV/JSON/Parquet
    Bronze->>Prep: Process raw data
    Prep->>Prepare: Store Delta tables
    Prepare->>Delta: Create table metadata

Data Quality Checks

def validate_data_quality(data: Dict) -> bool:
    """
    Performs data quality validation

    Args:
        data: Data dictionary to validate

    Returns:
        bool: Validation result
    """
    checks = [
        check_required_fields(data),
        check_data_types(data),
        check_value_ranges(data),
        check_consistency(data)
    ]
    return all(checks)
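The four check functions are not defined in this document. The sketch below shows what two of them might look like for flight records, reusing the bounds from the Schema Validation section. The field list, the default arguments, and the function bodies are assumptions; check_data_types and check_consistency are left out.

```python
from typing import Any, Dict, Sequence, Tuple

# Assumed configuration, taken from the FlightModel Field constraints.
REQUIRED_FIELDS = ("latitude", "longitude", "altitude", "ground_speed")
VALUE_RANGES: Dict[str, Tuple[float, float]] = {
    "latitude": (-90, 90),
    "longitude": (-180, 180),
    "altitude": (0, 50000),
    "ground_speed": (0, 1000),
}


def check_required_fields(
    data: Dict[str, Any], required: Sequence[str] = REQUIRED_FIELDS
) -> bool:
    """True when every required field is present and not None."""
    return all(data.get(name) is not None for name in required)


def check_value_ranges(
    data: Dict[str, Any], ranges: Dict[str, Tuple[float, float]] = VALUE_RANGES
) -> bool:
    """True when every present field lies within its inclusive range."""
    for name, (low, high) in ranges.items():
        value = data.get(name)
        if value is None:
            continue  # presence is the job of check_required_fields
        if not low <= value <= high:
            return False
    return True


good = {"latitude": 52.5, "longitude": 13.4, "altitude": 11000, "ground_speed": 450}
bad = dict(good, latitude=120.0)
```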

Configuration

API Configuration

# Flight Radar API
FLIGHT_RADAR_API_CONFIG = {
    "base_url": "https://www.flightradar24.com/",
    "timeout": 30,
    "retry_attempts": 5,
    "rate_limit": 100  # requests per minute
}

# Real Estate API
REAL_ESTATE_API_CONFIG = {
    "base_url": "https://services1.arcgis.com/ioennV6PpG5Xodq0/ArcGIS/rest/services/OpenData_A5/FeatureServer/1/query",
    "timeout": 300,
    "retry_attempts": 3
}
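The "rate_limit" value only has an effect if the collector enforces it. One simple way, sketched below under the assumption that calls are made sequentially from a single thread, is to space requests at least 60 / rate_limit seconds apart. `MinIntervalLimiter` is a hypothetical class; its clock and sleep functions are injectable so it can be tested without waiting.

```python
import time
from typing import Callable


class MinIntervalLimiter:
    """Spaces calls at least 60 / rate_per_minute seconds apart."""

    def __init__(
        self,
        rate_per_minute: int,
        clock: Callable[[], float] = time.monotonic,
        sleep: Callable[[float], None] = time.sleep,
    ) -> None:
        self.min_interval = 60.0 / rate_per_minute
        self._clock = clock
        self._sleep = sleep
        self._next_allowed = 0.0

    def wait(self) -> float:
        """Block until the next call is allowed; return the seconds slept."""
        now = self._clock()
        delay = max(0.0, self._next_allowed - now)
        if delay:
            self._sleep(delay)
        # Schedule the following call one interval after this one.
        self._next_allowed = max(now, self._next_allowed) + self.min_interval
        return delay


limiter = MinIntervalLimiter(rate_per_minute=100)  # 0.6 s between requests
```

A collector would call `limiter.wait()` immediately before each outgoing request.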

MinIO Configuration

MINIO_CONFIG = {
    "endpoint": "http://minio:9000",
    "access_key": "minioadmin",
    "secret_key": "minioadmin",
    "buckets": ["datalake", "warehouse", "logger"]
}
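The credentials above are the MinIO defaults and are hardcoded. A common alternative is to read them from the environment and keep the documented values only as local-development fallbacks, as in the sketch below. The variable names MINIO_ENDPOINT, MINIO_ACCESS_KEY and MINIO_SECRET_KEY and the function `load_minio_config` are assumptions, not names the platform is known to use.

```python
import os
from typing import Dict, Mapping, Optional


def load_minio_config(env: Optional[Mapping[str, str]] = None) -> Dict[str, object]:
    """Build the MinIO settings from environment variables.

    Falls back to the documented defaults when a variable is unset.
    """
    env = os.environ if env is None else env
    return {
        "endpoint": env.get("MINIO_ENDPOINT", "http://minio:9000"),
        "access_key": env.get("MINIO_ACCESS_KEY", "minioadmin"),
        "secret_key": env.get("MINIO_SECRET_KEY", "minioadmin"),
        "buckets": ["datalake", "warehouse", "logger"],
    }


config = load_minio_config({"MINIO_ACCESS_KEY": "ingest-service"})
```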

Monitoring and Logging

Logging Configuration

import logging

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s",
    handlers=[
        logging.FileHandler("api.log"),
        logging.StreamHandler()
    ]
)

logger = logging.getLogger(__name__)

Metrics Collection

def collect_api_metrics():
    """
    Collects API performance metrics

    Returns:
        Dict: Metrics dictionary
    """
    return {
        "requests_count": get_request_count(),
        "success_rate": get_success_rate(),
        "average_response_time": get_avg_response_time(),
        "error_count": get_error_count()
    }
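The getter functions used above are not defined in this document. One possible backing store, sketched here as an assumption rather than the platform's actual implementation, is a small in-memory recorder that derives all four values from the recorded calls. `ApiMetrics` and its methods are hypothetical names.

```python
from dataclasses import dataclass, field
from typing import Dict, List


@dataclass
class ApiMetrics:
    """In-memory record of request outcomes and response times."""

    response_times: List[float] = field(default_factory=list)
    error_count: int = 0

    def record(self, response_time: float, ok: bool) -> None:
        """Register one finished request."""
        self.response_times.append(response_time)
        if not ok:
            self.error_count += 1

    def snapshot(self) -> Dict[str, float]:
        """Return the same keys as collect_api_metrics."""
        total = len(self.response_times)
        return {
            "requests_count": total,
            "success_rate": (total - self.error_count) / total if total else 0.0,
            "average_response_time": sum(self.response_times) / total if total else 0.0,
            "error_count": self.error_count,
        }


metrics = ApiMetrics()
metrics.record(0.2, ok=True)
metrics.record(0.4, ok=False)
snapshot = metrics.snapshot()
```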

Testing

Unit Tests

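import pytest
from unittest.mock import Mock, patch

from requests.exceptions import HTTPError

# `api` and `mock_flight_data` are assumed to be defined elsewhere, e.g. as fixtures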

class TestFlightRadarAPI:
    def test_fetch_data_success(self):
        """Test successful data fetching"""
        with patch('requests.get') as mock_get:
            mock_get.return_value.json.return_value = mock_flight_data
            result = api.fetch_data()
            assert len(list(result)) > 0

    def test_fetch_data_error_handling(self):
        """Test error handling in data fetching"""
        with patch('requests.get') as mock_get:
            mock_get.side_effect = HTTPError("API Error")
            with pytest.raises(HTTPError):
                # fetch_data returns an iterator, so consume it to trigger the request
                list(api.fetch_data())

Integration Tests

class TestAPIIntegration:
    def test_end_to_end_flow(self):
        """Test complete data flow from API to storage"""
        # Test data collection
        data = collect_data()

        # Test validation
        validated_data = validate_data(data)

        # Test storage
        upload_result = upload_to_storage(validated_data)

        assert upload_result is True

Best Practices

API Design

  1. Consistent Error Handling: Use standardized error responses
  2. Rate Limiting: Implement appropriate rate limiting
  3. Retry Logic: Use exponential backoff for retries
  4. Data Validation: Validate all input data
  5. Logging: Log all API interactions
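The retry decorators shown earlier stop after five attempts but do not configure a wait between attempts. To illustrate the exponential backoff recommended in item 3, here is a minimal standard-library sketch. `retry_with_backoff` and its parameters are hypothetical; with tenacity, a wait strategy such as `wait_exponential` would play the same role.

```python
import functools
import time
from typing import Callable, Tuple, Type


def retry_with_backoff(
    exceptions: Tuple[Type[BaseException], ...],
    max_attempts: int = 5,
    base_delay: float = 1.0,
    max_delay: float = 30.0,
    sleep: Callable[[float], None] = time.sleep,
):
    """Retry the decorated function, doubling the wait after each failure."""

    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(1, max_attempts + 1):
                try:
                    return func(*args, **kwargs)
                except exceptions:
                    if attempt == max_attempts:
                        raise  # out of attempts, surface the last error
                    # Waits base_delay, 2x, 4x, ... capped at max_delay.
                    sleep(min(max_delay, base_delay * 2 ** (attempt - 1)))

        return wrapper

    return decorator
```

Usage would look like `@retry_with_backoff((ConnectionError,), max_attempts=5)` on the function that performs the request. Adding random jitter to each delay is a common refinement when many workers retry at once.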

Data Quality

  1. Schema Validation: Use Pydantic for data validation
  2. Type Safety: Ensure proper data type conversion
  3. Null Handling: Handle null/empty values appropriately
  4. Data Consistency: Maintain data consistency across domains

Performance

  1. Async Operations: Use async/await for I/O operations
  2. Connection Pooling: Reuse HTTP connections
  3. Caching: Implement appropriate caching strategies
  4. Batch Processing: Process data in batches when possible
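Since the collectors return iterators, batch processing (item 4) can be layered on top without loading everything into memory. The helper below is a minimal sketch of that idea; `batched` is a hypothetical name and the batch size is up to the caller.

```python
from itertools import islice
from typing import Iterable, Iterator, List, TypeVar

T = TypeVar("T")


def batched(records: Iterable[T], size: int) -> Iterator[List[T]]:
    """Yield consecutive lists of at most `size` records."""
    if size < 1:
        raise ValueError("size must be at least 1")
    iterator = iter(records)
    while True:
        batch = list(islice(iterator, size))
        if not batch:
            return
        yield batch


batches = list(batched(range(5), 2))
```

Each batch could then be passed to an uploader in a single call instead of uploading record by record.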

Last update: October 3, 2025
Created: October 3, 2025