Skip to main content

๐Ÿ›ก๏ธ Error Handling: Build Resilient Workflows That Never Fail

Robust error handling transforms fragile workflows into resilient production systems - it's the difference between pipelines that break at 3 AM requiring manual intervention and those that self-heal, retry intelligently, and gracefully degrade when necessary. Like building safety systems for a spacecraft, comprehensive error handling ensures your workflows can handle unexpected failures, recover automatically, and provide clear diagnostics when intervention is needed. Whether you're dealing with transient network issues, data quality problems, or system failures, mastering error handling is crucial for production-grade orchestration. Let's explore the comprehensive world of workflow error management! ๐Ÿšจ

The Error Handling Architecture

Think of error handling as your workflow's immune system - it detects problems, attempts recovery, isolates failures, and maintains overall system health even when individual components fail. Using Airflow's sophisticated retry mechanisms, callbacks, trigger rules, and custom error handlers, you can build workflows that anticipate failures, recover gracefully, and provide comprehensive diagnostics. Understanding error types, retry strategies, and recovery patterns is essential for building reliable automation!

graph TB A[Error Handling] --> B[Error Types] A --> C[Retry Strategies] A --> D[Recovery Patterns] A --> E[Monitoring] B --> F[Transient] B --> G[Permanent] B --> H[Data Quality] B --> I[System] C --> J[Fixed Retry] C --> K[Exponential Backoff] C --> L[Custom Logic] C --> M[Circuit Breaker] D --> N[Graceful Degradation] D --> O[Compensating Actions] D --> P[Rollback] D --> Q[Alternative Paths] E --> R[Alerting] E --> S[Logging] E --> T[Metrics] E --> U[Debugging] V[Implementation] --> W[Callbacks] V --> X[Hooks] V --> Y[Sensors] V --> Z[Handlers] style A fill:#ff6b6b style B fill:#51cf66 style C fill:#339af0 style D fill:#ffd43b style E fill:#ff6b6b style V fill:#51cf66

Real-World Scenario: The Fault-Tolerant Data Platform ๐Ÿ—๏ธ

You're building a fault-tolerant data platform that processes critical business data from unreliable sources, handles network failures and service outages gracefully, implements intelligent retry logic with backoff strategies, provides detailed error diagnostics and root cause analysis, maintains data consistency despite partial failures, implements circuit breakers for failing services, creates compensating transactions for rollback scenarios, and ensures zero data loss even during catastrophic failures. Your platform must handle thousands of potential failure points, provide clear visibility into error patterns, recover automatically when possible, and alert appropriately when manual intervention is required. Let's build a comprehensive error handling framework!

# Advanced Error Handling for Apache Airflow
# pip install apache-airflow tenacity requests circuitbreaker prometheus-client

import os
import json
import logging
import traceback
import time
from datetime import datetime, timedelta
from typing import Dict, List, Any, Optional, Callable, Union, Type
from dataclasses import dataclass, field
from enum import Enum, auto
from functools import wraps
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart

# Airflow imports
from airflow import DAG, AirflowException
from airflow.decorators import dag, task
from airflow.operators.python import PythonOperator
from airflow.operators.dummy import DummyOperator
from airflow.operators.email import EmailOperator
from airflow.utils.dates import days_ago
from airflow.utils.trigger_rule import TriggerRule
from airflow.models import Variable, TaskInstance, DagRun
from airflow.utils.state import State
from airflow.hooks.base import BaseHook

# Retry and resilience libraries
from tenacity import (
    retry,
    stop_after_attempt,
    wait_exponential,
    retry_if_exception_type,
    before_sleep_log,
    after_log
)
import circuitbreaker

# ==================== Error Types ====================

class ErrorType(Enum):
    """Classification of error types."""
    TRANSIENT = auto()      # Temporary, likely to succeed on retry
    PERMANENT = auto()       # Permanent failure, won't succeed on retry
    DATA_QUALITY = auto()    # Data validation/quality issues
    CONFIGURATION = auto()   # Configuration/setup errors
    RESOURCE = auto()        # Resource availability issues
    NETWORK = auto()         # Network/connectivity issues
    AUTHENTICATION = auto()  # Auth/permission issues
    UNKNOWN = auto()         # Unknown error type

@dataclass
class ErrorContext:
    """Context information for errors."""
    error_type: ErrorType
    task_id: str
    dag_id: str
    execution_date: datetime
    attempt_number: int
    error_message: str
    stack_trace: str
    metadata: Dict[str, Any] = field(default_factory=dict)
    timestamp: datetime = field(default_factory=datetime.now)

# ==================== Error Classification ====================

class ErrorClassifier:
    """Classify errors to determine handling strategy."""
    
    # Error patterns for classification
    TRANSIENT_PATTERNS = [
        'timeout', 'connection reset', 'temporary', 'unavailable',
        'too many requests', 'rate limit', 'throttled'
    ]
    
    PERMANENT_PATTERNS = [
        'not found', 'invalid', 'unauthorized', 'forbidden',
        'bad request', 'malformed', 'unsupported'
    ]
    
    NETWORK_PATTERNS = [
        'connection', 'network', 'socket', 'dns', 'resolution',
        'refused', 'unreachable'
    ]
    
    DATA_PATTERNS = [
        'validation', 'schema', 'constraint', 'duplicate',
        'integrity', 'null value', 'data type'
    ]
    
    @classmethod
    def classify(cls, exception: Exception) -> ErrorType:
        """Classify an exception into an error type."""
        error_str = str(exception).lower()
        
        # Check for specific exception types
        if isinstance(exception, (ConnectionError, TimeoutError)):
            return ErrorType.NETWORK
        elif isinstance(exception, PermissionError):
            return ErrorType.AUTHENTICATION
        elif isinstance(exception, ValueError):
            return ErrorType.DATA_QUALITY
        elif isinstance(exception, KeyError):
            return ErrorType.CONFIGURATION
        
        # Pattern matching
        for pattern in cls.TRANSIENT_PATTERNS:
            if pattern in error_str:
                return ErrorType.TRANSIENT
        
        for pattern in cls.PERMANENT_PATTERNS:
            if pattern in error_str:
                return ErrorType.PERMANENT
        
        for pattern in cls.NETWORK_PATTERNS:
            if pattern in error_str:
                return ErrorType.NETWORK
        
        for pattern in cls.DATA_PATTERNS:
            if pattern in error_str:
                return ErrorType.DATA_QUALITY
        
        return ErrorType.UNKNOWN
    
    @classmethod
    def should_retry(cls, error_type: ErrorType) -> bool:
        """Determine if error type should be retried."""
        retryable_types = [
            ErrorType.TRANSIENT,
            ErrorType.NETWORK,
            ErrorType.RESOURCE,
            ErrorType.UNKNOWN  # Retry unknown errors cautiously
        ]
        return error_type in retryable_types

# ==================== Retry Strategies ====================

class RetryStrategy:
    """Advanced retry strategies."""
    
    @staticmethod
    def fixed_retry(max_attempts: int = 3, delay: float = 1.0):
        """Fixed delay retry strategy."""
        def decorator(func):
            @wraps(func)
            def wrapper(*args, **kwargs):
                for attempt in range(max_attempts):
                    try:
                        return func(*args, **kwargs)
                    except Exception as e:
                        if attempt == max_attempts - 1:
                            raise
                        logging.warning(f"Attempt {attempt + 1} failed: {e}")
                        time.sleep(delay)
                return None
            return wrapper
        return decorator
    
    @staticmethod
    def exponential_backoff(
        max_attempts: int = 5,
        initial_delay: float = 1.0,
        max_delay: float = 60.0,
        exponential_base: float = 2.0
    ):
        """Exponential backoff retry strategy."""
        def decorator(func):
            @wraps(func)
            def wrapper(*args, **kwargs):
                delay = initial_delay
                
                for attempt in range(max_attempts):
                    try:
                        return func(*args, **kwargs)
                    except Exception as e:
                        if attempt == max_attempts - 1:
                            raise
                        
                        # Classify error
                        error_type = ErrorClassifier.classify(e)
                        
                        if not ErrorClassifier.should_retry(error_type):
                            logging.error(f"Non-retryable error: {e}")
                            raise
                        
                        logging.warning(f"Attempt {attempt + 1} failed, retrying in {delay}s: {e}")
                        time.sleep(delay)
                        
                        # Increase delay exponentially
                        delay = min(delay * exponential_base, max_delay)
                
                return None
            return wrapper
        return decorator
    
    @staticmethod
    def fibonacci_backoff(max_attempts: int = 7):
        """Fibonacci sequence backoff strategy."""
        def decorator(func):
            @wraps(func)
            def wrapper(*args, **kwargs):
                fib_prev, fib_curr = 0, 1
                
                for attempt in range(max_attempts):
                    try:
                        return func(*args, **kwargs)
                    except Exception as e:
                        if attempt == max_attempts - 1:
                            raise
                        
                        delay = fib_curr
                        logging.warning(f"Attempt {attempt + 1} failed, retrying in {delay}s: {e}")
                        time.sleep(delay)
                        
                        # Next Fibonacci number
                        fib_prev, fib_curr = fib_curr, fib_prev + fib_curr
                
                return None
            return wrapper
        return decorator
    
    @staticmethod
    def adaptive_retry(context: Dict[str, Any]):
        """Adaptive retry based on error history."""
        error_history = context.get('error_history', [])
        
        # Analyze error pattern
        if len(error_history) > 5:
            # Too many errors, use longer delays
            return RetryStrategy.exponential_backoff(max_attempts=3, initial_delay=10)
        elif any(e.error_type == ErrorType.NETWORK for e in error_history):
            # Network issues, use exponential backoff
            return RetryStrategy.exponential_backoff(max_attempts=5)
        else:
            # Default to fixed retry
            return RetryStrategy.fixed_retry(max_attempts=3)

# ==================== Circuit Breaker Pattern ====================

class CircuitBreakerManager:
    """Manage circuit breakers for external services."""
    
    def __init__(self):
        self.breakers = {}
        self.logger = logging.getLogger(__name__)
    
    def get_breaker(
        self,
        service_name: str,
        failure_threshold: int = 5,
        recovery_timeout: float = 60.0
    ) -> circuitbreaker.CircuitBreaker:
        """Get or create circuit breaker for service."""
        if service_name not in self.breakers:
            self.breakers[service_name] = circuitbreaker.CircuitBreaker(
                failure_threshold=failure_threshold,
                recovery_timeout=recovery_timeout,
                expected_exception=Exception
            )
        return self.breakers[service_name]
    
    def call_with_breaker(
        self,
        service_name: str,
        func: Callable,
        *args,
        **kwargs
    ) -> Any:
        """Call function with circuit breaker protection."""
        breaker = self.get_breaker(service_name)
        
        try:
            with breaker:
                return func(*args, **kwargs)
        except circuitbreaker.CircuitBreakerError:
            self.logger.error(f"Circuit breaker open for {service_name}")
            raise AirflowException(f"Service {service_name} is currently unavailable")
        except Exception as e:
            self.logger.error(f"Error calling {service_name}: {e}")
            raise

# ==================== Error Recovery Patterns ====================

class ErrorRecovery:
    """Error recovery patterns and strategies."""
    
    @staticmethod
    def graceful_degradation(primary_func: Callable, fallback_func: Callable):
        """Gracefully degrade to fallback function on error."""
        def wrapper(*args, **kwargs):
            try:
                return primary_func(*args, **kwargs)
            except Exception as e:
                logging.warning(f"Primary function failed: {e}, using fallback")
                return fallback_func(*args, **kwargs)
        return wrapper
    
    @staticmethod
    def compensating_transaction(
        action_func: Callable,
        compensate_func: Callable
    ):
        """Execute compensating transaction on failure."""
        def wrapper(*args, **kwargs):
            try:
                result = action_func(*args, **kwargs)
                return result
            except Exception as e:
                logging.error(f"Action failed: {e}, executing compensation")
                try:
                    compensate_func(*args, **kwargs)
                except Exception as comp_error:
                    logging.error(f"Compensation also failed: {comp_error}")
                raise e
        return wrapper
    
    @staticmethod
    def checkpoint_recovery(
        func: Callable,
        checkpoint_manager: 'CheckpointManager'
    ):
        """Recover from last checkpoint on failure."""
        def wrapper(*args, **kwargs):
            checkpoint_id = f"{func.__name__}_{datetime.now().isoformat()}"
            
            try:
                # Try to restore from checkpoint
                checkpoint_data = checkpoint_manager.restore(checkpoint_id)
                if checkpoint_data:
                    logging.info(f"Restored from checkpoint: {checkpoint_id}")
                    kwargs.update(checkpoint_data)
                
                # Execute function
                result = func(*args, **kwargs)
                
                # Clear checkpoint on success
                checkpoint_manager.clear(checkpoint_id)
                
                return result
                
            except Exception as e:
                # Save checkpoint for recovery
                checkpoint_manager.save(checkpoint_id, kwargs)
                logging.error(f"Failed, checkpoint saved: {checkpoint_id}")
                raise e
        
        return wrapper

# ==================== Checkpoint Manager ====================

class CheckpointManager:
    """Manage checkpoints for recovery."""
    
    def __init__(self, storage_path: str = "/tmp/airflow_checkpoints"):
        self.storage_path = Path(storage_path)
        self.storage_path.mkdir(exist_ok=True)
        self.logger = logging.getLogger(__name__)
    
    def save(self, checkpoint_id: str, data: Any):
        """Save checkpoint data."""
        checkpoint_file = self.storage_path / f"{checkpoint_id}.json"
        
        try:
            with open(checkpoint_file, 'w') as f:
                json.dump(data, f, default=str)
            self.logger.info(f"Checkpoint saved: {checkpoint_id}")
        except Exception as e:
            self.logger.error(f"Failed to save checkpoint: {e}")
    
    def restore(self, checkpoint_id: str) -> Optional[Any]:
        """Restore checkpoint data."""
        checkpoint_file = self.storage_path / f"{checkpoint_id}.json"
        
        if not checkpoint_file.exists():
            return None
        
        try:
            with open(checkpoint_file, 'r') as f:
                data = json.load(f)
            self.logger.info(f"Checkpoint restored: {checkpoint_id}")
            return data
        except Exception as e:
            self.logger.error(f"Failed to restore checkpoint: {e}")
            return None
    
    def clear(self, checkpoint_id: str):
        """Clear checkpoint."""
        checkpoint_file = self.storage_path / f"{checkpoint_id}.json"
        
        if checkpoint_file.exists():
            checkpoint_file.unlink()
            self.logger.info(f"Checkpoint cleared: {checkpoint_id}")

# ==================== Error Callbacks ====================

class ErrorCallbacks:
    """Airflow callback functions for error handling."""
    
    @staticmethod
    def on_failure_callback(context: Dict[str, Any]):
        """Callback for task failure."""
        task_instance = context['task_instance']
        exception = context.get('exception')
        
        # Create error context
        error_context = ErrorContext(
            error_type=ErrorClassifier.classify(exception) if exception else ErrorType.UNKNOWN,
            task_id=task_instance.task_id,
            dag_id=task_instance.dag_id,
            execution_date=context['execution_date'],
            attempt_number=task_instance.try_number,
            error_message=str(exception) if exception else 'Unknown error',
            stack_trace=traceback.format_exc(),
            metadata={
                'log_url': task_instance.log_url,
                'hostname': task_instance.hostname,
                'state': task_instance.state
            }
        )
        
        # Log error
        logging.error(f"Task failed: {error_context}")
        
        # Send alert
        ErrorCallbacks._send_failure_alert(error_context)
        
        # Store error for analysis
        ErrorCallbacks._store_error(error_context)
    
    @staticmethod
    def on_retry_callback(context: Dict[str, Any]):
        """Callback for task retry."""
        task_instance = context['task_instance']
        
        logging.info(f"Task {task_instance.task_id} retrying (attempt {task_instance.try_number})")
        
        # Check if we should continue retrying
        if task_instance.try_number >= 3:
            # Consider circuit breaking or changing strategy
            logging.warning(f"Task {task_instance.task_id} has failed {task_instance.try_number} times")
    
    @staticmethod
    def on_success_callback(context: Dict[str, Any]):
        """Callback for task success."""
        task_instance = context['task_instance']
        
        if task_instance.try_number > 1:
            logging.info(f"Task {task_instance.task_id} succeeded after {task_instance.try_number} attempts")
    
    @staticmethod
    def _send_failure_alert(error_context: ErrorContext):
        """Send failure alert."""
        try:
            # In production, integrate with alerting system
            alert_config = Variable.get('alert_config', deserialize_json=True, default_var={})
            
            if alert_config.get('enabled', False):
                # Send email alert
                subject = f"[Airflow Alert] Task Failed: {error_context.task_id}"
                body = f"""
                Task: {error_context.task_id}
                DAG: {error_context.dag_id}
                Error Type: {error_context.error_type.name}
                Message: {error_context.error_message}
                Attempt: {error_context.attempt_number}
                Time: {error_context.timestamp}
                
                Stack Trace:
                {error_context.stack_trace}
                """
                
                # Send via SMTP or notification service
                logging.info(f"Alert sent for {error_context.task_id}")
        
        except Exception as e:
            logging.error(f"Failed to send alert: {e}")
    
    @staticmethod
    def _store_error(error_context: ErrorContext):
        """Store error for analysis."""
        try:
            # Store in database or monitoring system
            # This is a simplified example
            error_log_path = Path('/tmp/airflow_errors')
            error_log_path.mkdir(exist_ok=True)
            
            error_file = error_log_path / f"{error_context.dag_id}_{error_context.task_id}_{error_context.timestamp.isoformat()}.json"
            
            with open(error_file, 'w') as f:
                json.dump({
                    'error_type': error_context.error_type.name,
                    'task_id': error_context.task_id,
                    'dag_id': error_context.dag_id,
                    'execution_date': error_context.execution_date.isoformat(),
                    'attempt_number': error_context.attempt_number,
                    'error_message': error_context.error_message,
                    'stack_trace': error_context.stack_trace,
                    'metadata': error_context.metadata,
                    'timestamp': error_context.timestamp.isoformat()
                }, f, indent=2)
        
        except Exception as e:
            logging.error(f"Failed to store error: {e}")

# ==================== DAG with Error Handling ====================

@dag(
    'resilient_pipeline',
    default_args={
        'owner': 'data_team',
        'retries': 3,
        'retry_delay': timedelta(minutes=5),
        'retry_exponential_backoff': True,
        'max_retry_delay': timedelta(hours=1),
        'on_failure_callback': ErrorCallbacks.on_failure_callback,
        'on_retry_callback': ErrorCallbacks.on_retry_callback,
        'on_success_callback': ErrorCallbacks.on_success_callback,
    },
    description='Resilient pipeline with comprehensive error handling',
    schedule_interval='@daily',
    start_date=days_ago(1),
    tags=['resilient', 'error_handling'],
    catchup=False,
)
def resilient_pipeline():
    """DAG demonstrating comprehensive error handling."""
    
    # Initialize managers
    circuit_manager = CircuitBreakerManager()
    checkpoint_manager = CheckpointManager()
    
    @task(retries=5, retry_delay=timedelta(seconds=30))
    @RetryStrategy.exponential_backoff(max_attempts=3)
    def extract_with_retry():
        """Extract data with retry logic."""
        import random
        
        # Simulate occasional failures
        if random.random() < 0.3:
            raise ConnectionError("Failed to connect to data source")
        
        return {'data': 'extracted', 'records': 1000}
    
    @task
    def validate_data(data: Dict[str, Any]):
        """Validate data with quality checks."""
        if not data.get('records'):
            raise ValueError("No records found in data")
        
        if data['records'] < 100:
            raise AirflowException("Insufficient data for processing")
        
        return {**data, 'validated': True}
    
    @task
    def process_with_circuit_breaker(data: Dict[str, Any]):
        """Process data with circuit breaker protection."""
        
        def api_call():
            # Simulate external API call
            import random
            if random.random() < 0.2:
                raise Exception("API service unavailable")
            return {'processed': True}
        
        # Use circuit breaker
        try:
            result = circuit_manager.call_with_breaker(
                'external_api',
                api_call
            )
            return {**data, **result}
        except AirflowException:
            # Circuit breaker is open, use fallback
            return {**data, 'processed': False, 'fallback': True}
    
    @task(trigger_rule=TriggerRule.ALL_DONE)
    def error_handler(**context):
        """Handle any errors from upstream tasks."""
        task_instance = context['task_instance']
        
        # Check upstream task states
        upstream_states = {}
        for task_id in task_instance.task.upstream_task_ids:
            ti = TaskInstance(
                task=task_instance.task.dag.get_task(task_id),
                execution_date=task_instance.execution_date
            )
            upstream_states[task_id] = ti.current_state()
        
        failed_tasks = [
            task_id for task_id, state in upstream_states.items()
            if state in [State.FAILED, State.UPSTREAM_FAILED]
        ]
        
        if failed_tasks:
            logging.error(f"Failed tasks: {failed_tasks}")
            # Implement recovery logic
            return {'recovery_needed': True, 'failed_tasks': failed_tasks}
        
        return {'recovery_needed': False}
    
    @task
    def compensate_on_failure(error_info: Dict[str, Any]):
        """Execute compensating actions for failures."""
        if error_info.get('recovery_needed'):
            logging.info("Executing compensating transactions")
            
            # Rollback or cleanup actions
            for task_id in error_info.get('failed_tasks', []):
                logging.info(f"Compensating for {task_id}")
                # Implement specific compensation logic
            
            return {'compensated': True}
        
        return {'compensated': False}
    
    @task(retries=0)  # Don't retry cleanup
    def cleanup(trigger_rule=TriggerRule.ALL_DONE):
        """Cleanup resources regardless of pipeline status."""
        logging.info("Cleaning up resources")
        
        # Cleanup temporary files
        import shutil
        temp_dirs = ['/tmp/airflow_checkpoints', '/tmp/airflow_errors']
        
        for temp_dir in temp_dirs:
            if Path(temp_dir).exists():
                try:
                    # Keep last 7 days of data
                    cutoff_date = datetime.now() - timedelta(days=7)
                    for file in Path(temp_dir).iterdir():
                        if file.stat().st_mtime < cutoff_date.timestamp():
                            file.unlink()
                except Exception as e:
                    logging.warning(f"Cleanup failed for {temp_dir}: {e}")
        
        return "Cleanup completed"
    
    # Build pipeline
    extracted = extract_with_retry()
    validated = validate_data(extracted)
    processed = process_with_circuit_breaker(validated)
    
    # Error handling branch
    error_info = error_handler()
    compensation = compensate_on_failure(error_info)
    
    # Cleanup always runs
    clean = cleanup()
    
    # Dependencies
    processed >> error_info >> compensation >> clean

# Instantiate DAG
resilient_dag = resilient_pipeline()

# ==================== Error Monitoring and Analysis ====================

class ErrorMonitor:
    """Monitor and analyze errors across DAGs."""
    
    def __init__(self):
        self.logger = logging.getLogger(__name__)
        self.error_stats = {}
    
    def analyze_error_patterns(self, dag_id: str, days: int = 7) -> Dict[str, Any]:
        """Analyze error patterns for a DAG."""
        from airflow.models import TaskFail
        from collections import Counter
        
        # Get recent task failures
        cutoff_date = datetime.now() - timedelta(days=days)
        
        failures = TaskFail.dag_id == dag_id
        # Note: This is simplified - use proper Airflow DB queries in production
        
        # Analyze patterns
        error_types = Counter()
        task_failures = Counter()
        time_patterns = Counter()
        
        # Return analysis
        return {
            'total_failures': len(failures),
            'error_types': dict(error_types),
            'failed_tasks': dict(task_failures),
            'time_patterns': dict(time_patterns),
            'recommendations': self._generate_recommendations(error_types, task_failures)
        }
    
    def _generate_recommendations(
        self,
        error_types: Counter,
        task_failures: Counter
    ) -> List[str]:
        """Generate recommendations based on error patterns."""
        recommendations = []
        
        # Check for high network errors
        if error_types.get('NETWORK', 0) > 10:
            recommendations.append("High network errors - consider increasing timeouts or adding circuit breakers")
        
        # Check for recurring task failures
        for task, count in task_failures.items():
            if count > 5:
                recommendations.append(f"Task '{task}' failing frequently - review logic and add better error handling")
        
        return recommendations
    
    def create_error_dashboard(self) -> Dict[str, Any]:
        """Create error dashboard data."""
        return {
            'total_dags_with_errors': len(self.error_stats),
            'top_error_types': self._get_top_errors(),
            'recent_failures': self._get_recent_failures(),
            'recovery_success_rate': self._calculate_recovery_rate()
        }
    
    def _get_top_errors(self, limit: int = 10) -> List[Dict[str, Any]]:
        """Get top error types."""
        # Implementation would query error storage
        return []
    
    def _get_recent_failures(self, limit: int = 20) -> List[Dict[str, Any]]:
        """Get recent failures."""
        # Implementation would query error storage
        return []
    
    def _calculate_recovery_rate(self) -> float:
        """Calculate automatic recovery success rate."""
        # Implementation would analyze retry success
        return 0.0

# ==================== Testing Error Handling ====================

class ErrorHandlingTester:
    """Test error handling mechanisms."""
    
    @staticmethod
    def test_retry_strategy():
        """Test different retry strategies."""
        print("Testing retry strategies...")
        
        # Test exponential backoff
        @RetryStrategy.exponential_backoff(max_attempts=3, initial_delay=0.1)
        def flaky_function():
            import random
            if random.random() < 0.7:
                raise ConnectionError("Simulated failure")
            return "Success"
        
        try:
            result = flaky_function()
            print(f"Result: {result}")
        except Exception as e:
            print(f"Failed after retries: {e}")
    
    @staticmethod
    def test_circuit_breaker():
        """Test circuit breaker pattern."""
        print("Testing circuit breaker...")
        
        manager = CircuitBreakerManager()
        
        def unreliable_service():
            import random
            if random.random() < 0.8:
                raise Exception("Service error")
            return "Service response"
        
        # Test until circuit opens
        for i in range(10):
            try:
                result = manager.call_with_breaker(
                    'test_service',
                    unreliable_service
                )
                print(f"Call {i}: Success - {result}")
            except Exception as e:
                print(f"Call {i}: Failed - {e}")
            
            time.sleep(0.1)

# Example usage
if __name__ == "__main__":
    print("๐Ÿ›ก๏ธ Error Handling Examples\n")
    
    # Example 1: Error classification
    print("1๏ธโƒฃ Error Classification:")
    
    errors = [
        ConnectionError("Connection reset by peer"),
        ValueError("Invalid data format"),
        PermissionError("Access denied"),
        TimeoutError("Operation timed out"),
        KeyError("Missing configuration key")
    ]
    
    for error in errors:
        error_type = ErrorClassifier.classify(error)
        should_retry = ErrorClassifier.should_retry(error_type)
        print(f"   {error.__class__.__name__}: {error_type.name} (Retry: {should_retry})")
    
    # Example 2: Retry strategies
    print("\n2๏ธโƒฃ Retry Strategies:")
    
    strategies = [
        "Fixed retry - constant delay between attempts",
        "Exponential backoff - increasing delays",
        "Fibonacci backoff - Fibonacci sequence delays",
        "Adaptive retry - based on error history",
        "Circuit breaker - stop trying after threshold"
    ]
    
    for strategy in strategies:
        print(f"   โ€ข {strategy}")
    
    # Example 3: Error handling patterns
    print("\n3๏ธโƒฃ Error Handling Patterns:")
    
    patterns = [
        ("Graceful Degradation", "Fall back to simpler functionality"),
        ("Compensating Transaction", "Undo actions on failure"),
        ("Checkpoint Recovery", "Resume from last good state"),
        ("Circuit Breaker", "Prevent cascading failures"),
        ("Bulkhead", "Isolate failures to compartments"),
        ("Timeout", "Fail fast on slow operations")
    ]
    
    for pattern, description in patterns:
        print(f"   {pattern}: {description}")
    
    # Example 4: Trigger rules for error handling
    print("\n4๏ธโƒฃ Trigger Rules for Error Handling:")
    
    rules = [
        ("ALL_DONE", "Cleanup tasks that run regardless"),
        ("ONE_FAILED", "Error handlers that activate on failure"),
        ("NONE_FAILED_MIN_ONE_SUCCESS", "Success path convergence"),
        ("ALL_FAILED", "Disaster recovery tasks")
    ]
    
    for rule, usage in rules:
        print(f"   {rule}: {usage}")
    
    # Example 5: Error monitoring metrics
    print("\n5๏ธโƒฃ Error Monitoring Metrics:")
    
    metrics = [
        "Error rate by type",
        "Mean time to recovery (MTTR)",
        "Retry success rate",
        "Circuit breaker trip frequency",
        "Error distribution by time",
        "Task failure patterns",
        "Resource exhaustion events"
    ]
    
    for metric in metrics:
        print(f"   โ€ข {metric}")
    
    # Example 6: Best practices
    print("\n6๏ธโƒฃ Error Handling Best Practices:")
    
    practices = [
        "๐ŸŽฏ Classify errors to determine handling strategy",
        "๐Ÿ”„ Use appropriate retry strategies for error types",
        "โšก Implement circuit breakers for external services",
        "๐Ÿ’พ Create checkpoints for long-running tasks",
        "๐Ÿ“Š Monitor error patterns and trends",
        "๐Ÿ”” Set up intelligent alerting (not alert fatigue)",
        "๐Ÿ“ Log comprehensive error context",
        "๐Ÿงช Test error handling paths explicitly",
        "โ™ป๏ธ Implement compensating transactions where needed",
        "๐Ÿ›ก๏ธ Always have cleanup tasks with ALL_DONE"
    ]
    
    for practice in practices:
        print(f"   {practice}")
    
    # Example 7: Common anti-patterns
    print("\n7๏ธโƒฃ Error Handling Anti-Patterns:")
    
    antipatterns = [
        ("Catching all exceptions", "Be specific about what you handle"),
        ("Infinite retries", "Always have a maximum retry limit"),
        ("Ignoring errors", "Log and monitor even handled errors"),
        ("No cleanup", "Always clean up resources"),
        ("Alert fatigue", "Alert on actionable issues only"),
        ("No error classification", "Different errors need different handling")
    ]
    
    for antipattern, better in antipatterns:
        print(f"   โŒ {antipattern}")
        print(f"      โœ… {better}")
    
    # Example 8: Test error handling
    print("\n8๏ธโƒฃ Testing Error Handling:")
    
    print("   Testing retry strategy...")
    ErrorHandlingTester.test_retry_strategy()
    
    print("\n   Testing circuit breaker...")
    # ErrorHandlingTester.test_circuit_breaker()  # Commented to avoid long output
    
    print("\nโœ… Error handling demonstration complete!")

# Path import fix
from pathlib import Path

Key Takeaways and Best Practices ๐ŸŽฏ

Error Handling Best Practices ๐Ÿ“‹

Pro Tip: Think of error handling as your workflow's safety net and immune system combined - it should catch problems, attempt recovery, and learn from failures. Always classify errors first - transient errors (network timeouts) should be retried with exponential backoff, while permanent errors (invalid configuration) should fail fast with clear diagnostics. Implement the circuit breaker pattern for external services to prevent cascading failures - after a threshold of failures, stop trying and fail fast. Use checkpoint recovery for long-running tasks so they can resume from the last good state rather than starting over. Design for graceful degradation - if the primary data source fails, can you use a cache or alternative source? Implement compensating transactions for critical operations that must be rolled back on failure. Set up comprehensive monitoring to detect patterns - if a task fails every Monday morning, there's likely a systemic issue. Use appropriate trigger rules: ALL_DONE for cleanup tasks that must always run, ONE_FAILED for error handlers, NONE_FAILED_MIN_ONE_SUCCESS for convergence points. Log rich context with errors - not just the error message but the full stack trace, input parameters, and system state. Test your error handling explicitly - inject failures and verify recovery. Remember: production systems fail in unexpected ways, so build defense in depth with multiple layers of error handling!

Mastering error handling transforms brittle workflows into resilient production systems. You can now classify and handle different error types appropriately, implement sophisticated retry strategies with backoff, use circuit breakers to prevent cascade failures, create checkpoints for recovery, and monitor error patterns for continuous improvement. Whether you're building critical data pipelines, ML training workflows, or business automation, these error handling skills ensure your systems stay resilient and self-healing! ๐Ÿš€