API Documentation
Overview
The Iceberg Data Engineering Platform provides a comprehensive set of APIs for data ingestion across multiple domains. The system implements a modular architecture with domain-specific data collectors, validation layers, and standardized data contracts.
API Architecture
graph TD
A[External APIs] --> B[Data Collectors]
B --> C[Validation Layer]
C --> D[Data Models]
D --> E[Storage Layer]
subgraph "Data Collectors"
B1[Flight Radar Collector]
B2[Real Estate Collector]
B3[E-commerce Collector]
end
subgraph "Validation Layer"
C1[Pydantic Models]
C2[Schema Validation]
C3[Data Quality Checks]
end
subgraph "Data Models"
D1[FlightModel]
D2[RealEstateModel]
D3[EcommerceModel]
end
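The flow in the diagram (collectors feed a validation layer, which feeds storage) can be sketched as a small pipeline. This is a minimal illustration, not the platform's code. The `Collector`, `Validator` and `Uploader` protocols and the `run_pipeline` helper are hypothetical names chosen for the example. The `upload(data, bucket, key)` shape mirrors the uploader signatures documented under Storage Integration.

```python
from typing import Any, Dict, Iterable, List, Protocol

Record = Dict[str, Any]


class Collector(Protocol):
    def fetch_data(self) -> Iterable[Record]: ...


class Validator(Protocol):
    def validate(self, record: Record) -> bool: ...


class Uploader(Protocol):
    def upload(self, data: List[Record], bucket: str, key: str) -> bool: ...


def run_pipeline(collector: Collector, validator: Validator, uploader: Uploader,
                 bucket: str, key: str) -> bool:
    """Fetch records, keep only those that pass validation, and upload them in one batch."""
    valid = [record for record in collector.fetch_data() if validator.validate(record)]
    return uploader.upload(valid, bucket, key)
```

Because the stages only meet through these narrow interfaces, a domain collector (flights, real estate, e-commerce) can be swapped without touching validation or storage.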
Data Domains
1. Flight Radar API
Overview
The Flight Radar API integration collects real-time flight data from FlightRadar24, including aircraft information, airline details, and airport data.
Base URL
- FlightRadar24 API: https://www.flightradar24.com/
- Internal API: flight_radar/requests_api/
Endpoints
Get Flight Data
# Method: GET
# Endpoint: /flights
# Description: Retrieves current flight data
from typing import Iterator

class RequestFlightRadarData:
    def fetch_data(self) -> Iterator[FlightModel | MissingFlightModel]:
        """
        Fetches flight data from the FlightRadar24 API.

        Returns:
            Iterator[FlightModel | MissingFlightModel]: Flight data records
        """
        ...
Get Flight Details
# Method: GET
# Endpoint: /flight/{flight_id}/details
# Description: Retrieves detailed information for a specific flight
from typing import Dict, Tuple

class RequestFlightRadarData:  # continued
    def get_details(self, flight: Flight) -> Tuple[Dict, str]:
        """
        Gets detailed information for a specific flight.

        Args:
            flight: Flight object containing basic flight information

        Returns:
            Tuple[Dict, str]: Flight details and URL for additional data
        """
        ...
Data Models
FlightModel
from datetime import datetime

from pydantic import BaseModel

class FlightModel(BaseModel):
    # Core flight information
    latitude: float
    longitude: float
    id: str
    icao_24bit: str
    heading: int
    altitude: int
    ground_speed: int
    squawk: str
    registration: str
    time: datetime
    number: str
    on_ground: int
    vertical_speed: int
    callsign: str
    model_missing_data: bool
    # Nested models (described below)
    departure: DepartureModel
    arrival: ArrivalModel
    airline: AirlineModel
    aircraft: AirCraftModel
AirlineModel
class AirlineModel(BaseModel):
    iata: str
    icao: Optional[str]
    name: Optional[str]
    short_name: Optional[str]
AirCraftModel
class AirCraftModel(BaseModel):
    code: str
    age: Optional[str]
    country_id: Optional[int]
    model: Optional[str]
AirportModel (Departure/Arrival)
class CommonArrivalAndDepartureModel(BaseModel):
    iata: str
    altitude: Optional[int]
    country_code: Optional[str]
    country_name: Optional[str]
    latitude: Optional[float]
    longitude: Optional[float]
    icao: Optional[str]
    baggage: Optional[str]
    gate: Optional[str]
    name: Optional[str]
    terminal: Optional[str]
    visible: Optional[bool]
    website: Optional[str]
    timezone_abbr: Optional[str]
    timezone_abbr_name: Optional[str]
    timezone_name: Optional[str]
    timezone_offset: Optional[int]
    timezone_offsetHours: Optional[str]
Error Handling
from FlightRadar24.errors import CloudflareError
from tenacity import retry, retry_if_exception_type, stop_after_attempt

class RequestFlightRadarData:  # continued
    # Retry mechanism: up to 5 attempts when FlightRadar24 answers with a Cloudflare error
    @retry(stop=stop_after_attempt(5), retry=retry_if_exception_type(CloudflareError))
    def get_details(self, flight: Flight):
        ...  # Implementation with retry logic

    # Missing flight handling
    def _handle_missing_flight(self, flight: Flight) -> MissingFlightModel:
        """
        Handles cases where flight data is incomplete or missing.

        Args:
            flight: Flight object with missing data

        Returns:
            MissingFlightModel: Model representing missing flight data
        """
        ...
2. Real Estate API
Overview
The Real Estate API collects property sales records from an ArcGIS Feature Service query endpoint.
Base URL
- ArcGIS API: https://services1.arcgis.com/ioennV6PpG5Xodq0/ArcGIS/rest/services/OpenData_A5/FeatureServer/1/query
- Internal API: asset_property/real_estate_request.py
Endpoints
Get Real Estate Data
# Method: POST
# Endpoint: /query
# Description: Retrieves real estate sales data
from typing import Any, Dict, Iterator

def fetch_real_estate_data() -> Iterator[Dict[str, Any] | None]:
    """
    Fetches real estate data from the ArcGIS API.

    Returns:
        Iterator[Dict[str, Any] | None]: Real estate records
    """
    ...
Request Parameters
api_params = {
    "where": "1=1",    # SQL WHERE clause; "1=1" matches every record
    "outFields": "*",  # Fields to return ("*" = all attributes)
    "outSR": "4326",   # Output spatial reference: WGS84 latitude/longitude
    "f": "json",       # Response format
}
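ArcGIS feature services cap how many features a single query returns. When a response is truncated, it carries `"exceededTransferLimit": true`. On layers that support pagination, further pages can be requested with the `resultOffset` and `resultRecordCount` query parameters. The sketch below shows one way to page through the endpoint with the parameters above.

The `post` callable is injected so the logic can be exercised without network access. In practice it could wrap `requests.post(url, data=params, headers=headers, timeout=...)`. `iter_features` and the default page size of 1000 are assumptions, not part of `real_estate_request.py`.

```python
from typing import Any, Callable, Dict, Iterator

Params = Dict[str, Any]


def iter_features(post: Callable[[Params], Dict[str, Any]],
                  base_params: Params,
                  page_size: int = 1000) -> Iterator[Dict[str, Any]]:
    """Yield features page by page until the service stops reporting truncation."""
    offset = 0
    while True:
        # Copy the base parameters so the caller's dict is never mutated
        params = {**base_params, "resultOffset": offset, "resultRecordCount": page_size}
        data = post(params)
        features = data.get("features", [])
        yield from features
        # Stop when the server did not truncate this page, or returned nothing
        if not data.get("exceededTransferLimit") or not features:
            break
        offset += len(features)
```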
Headers
headers = {
"User-Agent": (
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
"AppleWebKit/537.36 (KHTML, like Gecko) "
"Chrome/91.0.4472.124 Safari/537.36"
)
}
Error Handling
class ValidationError(Exception):
    """Custom exception for API validation errors"""

    def __init__(self, message: str):
        # Pass the message to Exception so str(err) and tracebacks show it
        super().__init__(message)
        self.message = message

# API response validation (`data` is the decoded JSON response)
if "error" in data:
    raise ValidationError(f"API returned an error: {data['error']}")
if "features" not in data:
    raise ValidationError("API response missing 'features' key.")
3. E-commerce API
Overview
The E-commerce API generates mock user and product data for testing and demonstration purposes.
Base URL
- Internal API: user_product_mock/
Endpoints
Generate Mock Data
# Method: POST
# Endpoint: /generate
# Description: Generates mock e-commerce data
from typing import Any, Dict, Iterator

def generate_mock_data() -> Iterator[Dict[str, Any]]:
    """
    Generates mock e-commerce data using Faker.

    Returns:
        Iterator[Dict[str, Any]]: Mock e-commerce records
    """
    ...
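Data Contracts
Base Contract Model
import uuid
from datetime import datetime

from pydantic import BaseModel, Field

class ContractModel(BaseModel):
    """Base contract model for all data entities"""

    # default_factory runs per instance, so each record gets its own ID and timestamps
    id_record: str = Field(default_factory=lambda: str(uuid.uuid1()))
    created_at: datetime = Field(default_factory=datetime.now)
    updated_at: datetime = Field(default_factory=datetime.now)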
Data Quality Contracts
Schema Validation
# Pydantic model validation: Field constraints reject out-of-range values
from pydantic import Field

class FlightModel(ContractModel):  # ContractModel already subclasses BaseModel
    latitude: float = Field(ge=-90, le=90)
    longitude: float = Field(ge=-180, le=180)
    altitude: int = Field(ge=0, le=50000)
    ground_speed: int = Field(ge=0, le=1000)
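Data Type Conversion
from typing import Any, Callable, Optional

def convert_type_func(string: Optional[str], function: Callable[[str], Any]) -> Any:
    """
    Converts string values to specified types.

    Args:
        string: String value to convert
        function: Conversion function (int, float, etc.)

    Returns:
        The converted value, or None if the value is missing (None or "N/A").
        Other unparseable strings raise the conversion function's error (e.g. ValueError).
    """
    return function(string) if string not in (None, "N/A") else None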
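`convert_type_func` maps the missing markers to `None`, but any other unparseable string still raises. If a lenient behaviour is wanted, a variant that also treats conversion failures as missing could look like `convert_type_safe` below. That function is hypothetical, not part of the codebase; `convert_type_func` is restated so the snippet runs on its own.

```python
from typing import Any, Callable, Optional


def convert_type_func(string: Optional[str], function: Callable[[str], Any]) -> Any:
    return function(string) if string not in (None, "N/A") else None


def convert_type_safe(string: Optional[str], function: Callable[[str], Any]) -> Any:
    """Like convert_type_func, but returns None instead of raising on bad input."""
    try:
        return convert_type_func(string, function)
    except (TypeError, ValueError):
        return None


print(convert_type_func("350", int))    # 350
print(convert_type_func("N/A", float))  # None
print(convert_type_safe("12a", int))    # None
```

The strict version surfaces unexpected formats early. The lenient one suits optional fields where a bad value should simply become null.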
Storage Integration
MinIO Uploaders
CSV Uploader
from typing import Dict, List

class CSVUploader:
    def upload(self, data: List[Dict], bucket: str, key: str) -> bool:
        """
        Uploads data as CSV to the MinIO Bronze layer.

        Args:
            data: List of dictionaries to upload
            bucket: MinIO bucket name (datalake)
            key: Object key/path (bronze/{domain}/{entity}/)

        Returns:
            bool: Upload success status
        """
        ...
JSON Uploader
class JSONUploader:
    def upload(self, data: Dict, bucket: str, key: str) -> bool:
        """
        Uploads data as JSON to the MinIO Bronze layer.

        Args:
            data: Dictionary to upload
            bucket: MinIO bucket name (datalake)
            key: Object key/path (bronze/{domain}/{entity}/)

        Returns:
            bool: Upload success status
        """
        ...
Parquet Uploader
class ParquetUploader:
    def upload(self, data: List[Dict], bucket: str, key: str) -> bool:
        """
        Uploads data as Parquet to the MinIO Bronze layer.

        Args:
            data: List of dictionaries to upload
            bucket: MinIO bucket name (datalake)
            key: Object key/path (bronze/{domain}/{entity}/)

        Returns:
            bool: Upload success status
        """
        ...
Uploader Factory
class UploaderFactory:
    @staticmethod
    def create_uploader(file_type: str) -> Uploader:
        """
        Creates the appropriate uploader based on file type.

        Args:
            file_type: Type of file (csv, json, parquet)

        Returns:
            Uploader: Appropriate uploader instance (falls back to CSVUploader
            for unrecognized types)
        """
        # Map types to classes so only the requested uploader is instantiated
        uploaders = {
            "csv": CSVUploader,
            "json": JSONUploader,
            "parquet": ParquetUploader,
        }
        return uploaders.get(file_type.lower(), CSVUploader)()
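Error Handling
Retry Mechanisms
from FlightRadar24.errors import CloudflareError
from requests.exceptions import JSONDecodeError
from tenacity import retry, retry_if_exception_type, stop_after_attempt

# Retry when FlightRadar24 answers with a Cloudflare error
@retry(stop=stop_after_attempt(5), retry=retry_if_exception_type(CloudflareError))
def api_call():
    pass

# Retry on invalid JSON; the decorated function should re-issue the request,
# since re-parsing the same response text would fail again
@retry(stop=stop_after_attempt(5), retry=retry_if_exception_type(JSONDecodeError))
def parse_json():
    pass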
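These decorators set no `wait` strategy, so tenacity retries immediately between attempts. Exponential backoff can be added with tenacity's `wait=wait_exponential(...)` argument to `@retry`. To show what that does, here is a stdlib-only sketch of a retry decorator that doubles the delay after each failure, capped at `max_delay`. The `retry_with_backoff` name and its parameters are hypothetical. `sleep` is injectable so the behaviour can be tested without waiting.

```python
import functools
import time
from typing import Callable, Tuple, Type, TypeVar

T = TypeVar("T")


def retry_with_backoff(exceptions: Tuple[Type[BaseException], ...],
                       attempts: int = 5,
                       base_delay: float = 1.0,
                       max_delay: float = 30.0,
                       sleep: Callable[[float], None] = time.sleep):
    """Retry on the given exceptions, waiting base_delay * 2**n (capped) between attempts."""
    if attempts < 1:
        raise ValueError("attempts must be >= 1")

    def decorator(func: Callable[..., T]) -> Callable[..., T]:
        @functools.wraps(func)
        def wrapper(*args, **kwargs) -> T:
            for attempt in range(attempts):
                try:
                    return func(*args, **kwargs)
                except exceptions:
                    if attempt == attempts - 1:
                        raise  # out of attempts: surface the last error
                    sleep(min(base_delay * 2 ** attempt, max_delay))
            raise AssertionError("unreachable")
        return wrapper
    return decorator
```

With the defaults, the waits between five attempts are 1, 2, 4 and 8 seconds. This gives a rate-limited or challenged endpoint time to recover instead of hammering it.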
Exception Types
# Third-party exceptions (requests, FlightRadar24)
from FlightRadar24.errors import CloudflareError
from requests.exceptions import HTTPError, JSONDecodeError

# Custom exceptions
class ValidationError(Exception):
    """Raised when data validation fails"""

class NoCalledMethodException(Exception):
    """Raised when a required method has not been called"""
Data Processing Pipeline
Ingestion Flow
sequenceDiagram
participant API as External API
participant Collector as Data Collector
participant Validator as Validation Layer
participant Bronze as MinIO Bronze Storage
participant Prep as Spark Preparation
participant Prepare as MinIO Prepare Storage
participant Delta as Delta Tables
API->>Collector: Fetch data
Collector->>Validator: Validate schema
Validator->>Bronze: Upload CSV/JSON/Parquet
Bronze->>Prep: Process raw data
Prep->>Prepare: Store Delta tables
Prepare->>Delta: Create table metadata
Data Quality Checks
from typing import Dict

def validate_data_quality(data: Dict) -> bool:
    """
    Performs data quality validation.

    Args:
        data: Data dictionary to validate

    Returns:
        bool: True only if every check passes
    """
    # All checks are evaluated eagerly (no short-circuit) before combining
    checks = [
        check_required_fields(data),
        check_data_types(data),
        check_value_ranges(data),
        check_consistency(data),
    ]
    return all(checks)
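`validate_data_quality` relies on helper checks that are not shown. Two of them could look like the sketches below. They reuse the coordinate, altitude and speed bounds from the Schema Validation example. The `REQUIRED_FLIGHT_FIELDS` tuple and the `RANGES` table are assumptions, not the project's definitions.

```python
from typing import Any, Dict, Tuple

REQUIRED_FLIGHT_FIELDS = ("id", "latitude", "longitude", "time")  # hypothetical

# Inclusive bounds taken from the FlightModel Field constraints
RANGES: Dict[str, Tuple[float, float]] = {
    "latitude": (-90, 90),
    "longitude": (-180, 180),
    "altitude": (0, 50000),
    "ground_speed": (0, 1000),
}


def check_required_fields(data: Dict[str, Any]) -> bool:
    """Every required field must be present and not None."""
    return all(data.get(name) is not None for name in REQUIRED_FLIGHT_FIELDS)


def check_value_ranges(data: Dict[str, Any]) -> bool:
    """Numeric fields that are present must fall inside their inclusive bounds."""
    for name, (low, high) in RANGES.items():
        value = data.get(name)
        if value is None:
            continue  # absence is check_required_fields' responsibility
        if not isinstance(value, (int, float)) or not low <= value <= high:
            return False
    return True
```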
Configuration
API Configuration
# Flight Radar API
FLIGHT_RADAR_API_CONFIG = {
"base_url": "https://www.flightradar24.com/",
"timeout": 30,
"retry_attempts": 5,
"rate_limit": 100 # requests per minute
}
# Real Estate API
REAL_ESTATE_API_CONFIG = {
"base_url": "https://services1.arcgis.com/ioennV6PpG5Xodq0/ArcGIS/rest/services/OpenData_A5/FeatureServer/1/query",
"timeout": 300,
"retry_attempts": 3
}
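`FLIGHT_RADAR_API_CONFIG` declares a `rate_limit` of 100 requests per minute, i.e. at most one request every 0.6 seconds on average. One simple way to honour such a limit is a minimum-interval throttle. The sketch below assumes calls are made from a single thread. The `Throttle` class is hypothetical, and `clock`/`sleep` are injectable for testing.

```python
import time
from typing import Callable


class Throttle:
    """Space calls at least 60 / requests_per_minute seconds apart."""

    def __init__(self, requests_per_minute: int,
                 clock: Callable[[], float] = time.monotonic,
                 sleep: Callable[[float], None] = time.sleep):
        self.interval = 60.0 / requests_per_minute
        self.clock = clock
        self.sleep = sleep
        self._next_allowed = float("-inf")

    def wait(self) -> None:
        """Block until the next request is allowed, then reserve the following slot."""
        now = self.clock()
        if now < self._next_allowed:
            self.sleep(self._next_allowed - now)
            now = self._next_allowed
        self._next_allowed = now + self.interval
```

A collector would create `Throttle(FLIGHT_RADAR_API_CONFIG["rate_limit"])` once and call `wait()` before each request. `time.monotonic` is used so wall-clock adjustments cannot shorten the interval.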
MinIO Configuration
MINIO_CONFIG = {
"endpoint": "http://minio:9000",
"access_key": "minioadmin",
"secret_key": "minioadmin",
"buckets": ["datalake", "warehouse", "logger"]
}
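`minioadmin`/`minioadmin` are MinIO's default root credentials. They suit a local Docker setup but should not be hard-coded elsewhere. The sketch below builds the same dictionary from environment variables, falling back to the values above. The variable names `MINIO_ENDPOINT`, `MINIO_ACCESS_KEY` and `MINIO_SECRET_KEY` are assumptions, not names the project is known to read.

```python
import os
from typing import Any, Dict, Mapping, Optional


def load_minio_config(env: Optional[Mapping[str, str]] = None) -> Dict[str, Any]:
    """Build MINIO_CONFIG from (hypothetically named) environment variables, with local defaults."""
    env = os.environ if env is None else env
    return {
        "endpoint": env.get("MINIO_ENDPOINT", "http://minio:9000"),
        "access_key": env.get("MINIO_ACCESS_KEY", "minioadmin"),
        "secret_key": env.get("MINIO_SECRET_KEY", "minioadmin"),
        "buckets": ["datalake", "warehouse", "logger"],
    }
```

Accepting an explicit mapping keeps the function testable without modifying the real process environment.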
Monitoring and Logging
Logging Configuration
import logging

# Configure logging to both a file (api.log) and the console
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s",
    handlers=[
        logging.FileHandler("api.log"),
        logging.StreamHandler(),
    ],
)
logger = logging.getLogger(__name__)
Metrics Collection
from typing import Any, Dict

def collect_api_metrics() -> Dict[str, Any]:
    """
    Collects API performance metrics.

    Returns:
        Dict: Metrics dictionary
    """
    return {
        "requests_count": get_request_count(),
        "success_rate": get_success_rate(),
        "average_response_time": get_avg_response_time(),
        "error_count": get_error_count(),
    }
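The getters used by `collect_api_metrics` (`get_request_count`, `get_success_rate`, etc.) are not shown. A minimal in-process tracker that could back them is sketched below. The `ApiMetrics` class and its `record` and `snapshot` methods are hypothetical. Rates and averages are reported as `None` until at least one request has been recorded.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, List


@dataclass
class ApiMetrics:
    """Accumulates per-request outcomes and derives the metrics collect_api_metrics reports."""
    durations: List[float] = field(default_factory=list)
    errors: int = 0

    def record(self, duration_s: float, ok: bool) -> None:
        self.durations.append(duration_s)
        if not ok:
            self.errors += 1

    def snapshot(self) -> Dict[str, Any]:
        count = len(self.durations)
        return {
            "requests_count": count,
            "success_rate": (count - self.errors) / count if count else None,
            "average_response_time": sum(self.durations) / count if count else None,
            "error_count": self.errors,
        }
```

A caller would time each request (for example with `time.perf_counter()`) and call `record(elapsed, ok=response_ok)` afterwards.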
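Testing
Unit Tests
import pytest
from unittest.mock import patch
from requests.exceptions import HTTPError

# Assumes RequestFlightRadarData and a mock_flight_data payload are importable
# from the project's collector and test-fixture modules.

@pytest.fixture
def api():
    return RequestFlightRadarData()

class TestFlightRadarAPI:
    def test_fetch_data_success(self, api):
        """Test successful data fetching"""
        with patch("requests.get") as mock_get:
            mock_get.return_value.json.return_value = mock_flight_data
            result = api.fetch_data()
            assert len(list(result)) > 0

    def test_fetch_data_error_handling(self, api):
        """Test error handling in data fetching"""
        with patch("requests.get") as mock_get:
            mock_get.side_effect = HTTPError("API Error")
            with pytest.raises(HTTPError):
                # fetch_data returns an iterator: consume it so the request is actually made
                list(api.fetch_data())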
Integration Tests
class TestAPIIntegration:
    def test_end_to_end_flow(self):
        """Test the complete data flow from API to storage"""
        # Test data collection
        data = collect_data()
        # Test validation
        validated_data = validate_data(data)
        # Test storage
        upload_result = upload_to_storage(validated_data)
        assert upload_result is True
Best Practices
API Design
- Consistent Error Handling: Use standardized error responses
- Rate Limiting: Implement appropriate rate limiting
- Retry Logic: Use exponential backoff for retries
- Data Validation: Validate all input data
- Logging: Log all API interactions
Data Quality
- Schema Validation: Use Pydantic for data validation
- Type Safety: Ensure proper data type conversion
- Null Handling: Handle null/empty values appropriately
- Data Consistency: Maintain data consistency across domains
Performance
- Async Operations: Use async/await for I/O operations
- Connection Pooling: Reuse HTTP connections
- Caching: Implement appropriate caching strategies
- Batch Processing: Process data in batches when possible
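For the batch-processing recommendation, a generic helper that groups any iterable into fixed-size lists is sketched below. This covers, for example, the record generators returned by the collectors. Python 3.12 ships `itertools.batched` for this, which yields tuples. The hypothetical `chunked` helper here also works on older versions and yields lists.

```python
from itertools import islice
from typing import Iterable, Iterator, List, TypeVar

T = TypeVar("T")


def chunked(items: Iterable[T], size: int) -> Iterator[List[T]]:
    """Yield consecutive lists of up to `size` items; the last batch may be shorter."""
    if size < 1:
        raise ValueError("size must be >= 1")
    iterator = iter(items)
    while batch := list(islice(iterator, size)):
        yield batch
```

Because it pulls lazily from the source iterator, only one batch is held in memory at a time. An illustrative pattern is `for batch in chunked(collector.fetch_data(), 500): uploader.upload(batch, "datalake", key)`, where `collector`, `uploader` and `key` are placeholders.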
Last update: October 3, 2025
Created: October 3, 2025