Error Handling: Build Resilient Workflows That Never Fail
Robust error handling transforms fragile workflows into resilient production systems. It is the difference between pipelines that break at 3 AM and require manual intervention and those that self-heal, retry intelligently, and degrade gracefully when necessary. Like the safety systems on a spacecraft, comprehensive error handling lets your workflows absorb unexpected failures, recover automatically, and provide clear diagnostics when intervention is needed. Whether you're dealing with transient network issues, data quality problems, or system failures, mastering error handling is crucial for production-grade orchestration. Let's explore the world of workflow error management!
The Error Handling Architecture
Think of error handling as your workflow's immune system - it detects problems, attempts recovery, isolates failures, and maintains overall system health even when individual components fail. Using Airflow's sophisticated retry mechanisms, callbacks, trigger rules, and custom error handlers, you can build workflows that anticipate failures, recover gracefully, and provide comprehensive diagnostics. Understanding error types, retry strategies, and recovery patterns is essential for building reliable automation!
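To make the idea of a retry strategy concrete, here is a minimal sketch of how an exponential backoff schedule can be computed. The function name `backoff_delays` and its parameters are illustrative assumptions, not part of Airflow's API; Airflow's own `retry_exponential_backoff` option computes its delays internally and may differ in detail.

```python
from typing import List


def backoff_delays(
    attempts: int,
    initial_delay: float = 1.0,
    base: float = 2.0,
    max_delay: float = 60.0,
) -> List[float]:
    """Return the wait before each retry: initial_delay * base**n, capped at max_delay."""
    return [min(initial_delay * base ** n, max_delay) for n in range(attempts)]


if __name__ == "__main__":
    # Hypothetical schedule for eight retries with the default settings.
    print(backoff_delays(8))
```

The cap matters in practice: without `max_delay`, a task that keeps failing would soon wait for hours between attempts.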
Real-World Scenario: The Fault-Tolerant Data Platform
You're building a fault-tolerant data platform that processes critical business data from unreliable sources. It must handle network failures and service outages gracefully, retry intelligently with backoff, provide detailed error diagnostics for root cause analysis, and keep data consistent despite partial failures. It also needs circuit breakers for failing services, compensating transactions for rollback scenarios, and safeguards against data loss during severe failures. With thousands of potential failure points, the platform must give clear visibility into error patterns, recover automatically when possible, and alert appropriately when manual intervention is required. Let's build a comprehensive error handling framework!
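Of the requirements above, compensating transactions are perhaps the least familiar, so here is a small standard-library sketch of the idea: run steps in order and, if one fails, undo the completed steps in reverse. The names `run_with_compensation`, `Step`, and `demo` are hypothetical helpers for illustration and are independent of Airflow.

```python
from typing import Callable, List, Tuple

# (name, action, compensation): all three are illustrative assumptions.
Step = Tuple[str, Callable[[], None], Callable[[], None]]


def run_with_compensation(steps: List[Step]) -> List[str]:
    """Run steps in order; on the first failure, undo completed steps in reverse."""
    log: List[str] = []
    completed: List[Step] = []
    for step in steps:
        name, action, _ = step
        try:
            action()
            completed.append(step)
            log.append(f"done:{name}")
        except Exception as exc:
            log.append(f"failed:{name}:{exc}")
            # Roll back everything that succeeded so far, newest first.
            for done_name, _, compensate in reversed(completed):
                compensate()
                log.append(f"undone:{done_name}")
            break
    return log


def _fail() -> None:
    raise RuntimeError("warehouse unavailable")


def demo() -> List[str]:
    """Three steps where the last one fails, forcing a rollback of the first two."""
    steps: List[Step] = [
        ("stage", lambda: None, lambda: None),
        ("load", lambda: None, lambda: None),
        ("publish", _fail, lambda: None),
    ]
    return run_with_compensation(steps)


if __name__ == "__main__":
    print(demo())
```

Undoing in reverse order mirrors how the steps built on each other, which is usually the safest default when later steps depend on earlier ones.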
# Advanced Error Handling for Apache Airflow
# pip install apache-airflow tenacity requests circuitbreaker prometheus-client
# Note: the code below uses Airflow 2.x APIs (schedule_interval, days_ago, execution_date).
import os
import json
import logging
import traceback
import time
from datetime import datetime, timedelta
from pathlib import Path  # Needed by CheckpointManager, which runs when the DAG is built
from typing import Dict, List, Any, Optional, Callable, Union, Type
from dataclasses import dataclass, field
from enum import Enum, auto
from functools import wraps
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart

# Airflow imports
from airflow import DAG
from airflow.exceptions import AirflowException
from airflow.decorators import dag, task
from airflow.operators.python import PythonOperator
from airflow.operators.dummy import DummyOperator
from airflow.operators.email import EmailOperator
from airflow.utils.dates import days_ago
from airflow.utils.trigger_rule import TriggerRule
from airflow.models import Variable, TaskInstance, DagRun
from airflow.utils.state import State
from airflow.hooks.base import BaseHook

# Retry and resilience libraries
from tenacity import (
    retry,
    stop_after_attempt,
    wait_exponential,
    retry_if_exception_type,
    before_sleep_log,
    after_log
)
import circuitbreaker

# ==================== Error Types ====================
class ErrorType(Enum):
    """Classification of error types."""
    TRANSIENT = auto()       # Temporary, likely to succeed on retry
    PERMANENT = auto()       # Permanent failure, won't succeed on retry
    DATA_QUALITY = auto()    # Data validation/quality issues
    CONFIGURATION = auto()   # Configuration/setup errors
    RESOURCE = auto()        # Resource availability issues
    NETWORK = auto()         # Network/connectivity issues
    AUTHENTICATION = auto()  # Auth/permission issues
    UNKNOWN = auto()         # Unknown error type


@dataclass
class ErrorContext:
    """Context information for errors."""
    error_type: ErrorType
    task_id: str
    dag_id: str
    execution_date: datetime
    attempt_number: int
    error_message: str
    stack_trace: str
    metadata: Dict[str, Any] = field(default_factory=dict)
    timestamp: datetime = field(default_factory=datetime.now)

# ==================== Error Classification ====================
class ErrorClassifier:
    """Classify errors to determine handling strategy."""

    # Error patterns for classification
    TRANSIENT_PATTERNS = [
        'timeout', 'connection reset', 'temporary', 'unavailable',
        'too many requests', 'rate limit', 'throttled'
    ]
    PERMANENT_PATTERNS = [
        'not found', 'invalid', 'unauthorized', 'forbidden',
        'bad request', 'malformed', 'unsupported'
    ]
    NETWORK_PATTERNS = [
        'connection', 'network', 'socket', 'dns', 'resolution',
        'refused', 'unreachable'
    ]
    DATA_PATTERNS = [
        'validation', 'schema', 'constraint', 'duplicate',
        'integrity', 'null value', 'data type'
    ]

    @classmethod
    def classify(cls, exception: Exception) -> ErrorType:
        """Classify an exception into an error type."""
        error_str = str(exception).lower()

        # Check for specific exception types first
        if isinstance(exception, (ConnectionError, TimeoutError)):
            return ErrorType.NETWORK
        elif isinstance(exception, PermissionError):
            return ErrorType.AUTHENTICATION
        elif isinstance(exception, ValueError):
            return ErrorType.DATA_QUALITY
        elif isinstance(exception, KeyError):
            return ErrorType.CONFIGURATION

        # Fall back to pattern matching on the message
        for pattern in cls.TRANSIENT_PATTERNS:
            if pattern in error_str:
                return ErrorType.TRANSIENT
        for pattern in cls.PERMANENT_PATTERNS:
            if pattern in error_str:
                return ErrorType.PERMANENT
        for pattern in cls.NETWORK_PATTERNS:
            if pattern in error_str:
                return ErrorType.NETWORK
        for pattern in cls.DATA_PATTERNS:
            if pattern in error_str:
                return ErrorType.DATA_QUALITY
        return ErrorType.UNKNOWN

    @classmethod
    def should_retry(cls, error_type: ErrorType) -> bool:
        """Determine if error type should be retried."""
        retryable_types = [
            ErrorType.TRANSIENT,
            ErrorType.NETWORK,
            ErrorType.RESOURCE,
            ErrorType.UNKNOWN  # Retry unknown errors cautiously
        ]
        return error_type in retryable_types

# ==================== Retry Strategies ====================
class RetryStrategy:
    """Advanced retry strategies."""

    @staticmethod
    def fixed_retry(max_attempts: int = 3, delay: float = 1.0):
        """Fixed delay retry strategy."""
        def decorator(func):
            @wraps(func)
            def wrapper(*args, **kwargs):
                for attempt in range(max_attempts):
                    try:
                        return func(*args, **kwargs)
                    except Exception as e:
                        if attempt == max_attempts - 1:
                            raise
                        logging.warning(f"Attempt {attempt + 1} failed: {e}")
                        time.sleep(delay)
                return None
            return wrapper
        return decorator

    @staticmethod
    def exponential_backoff(
        max_attempts: int = 5,
        initial_delay: float = 1.0,
        max_delay: float = 60.0,
        exponential_base: float = 2.0
    ):
        """Exponential backoff retry strategy."""
        def decorator(func):
            @wraps(func)
            def wrapper(*args, **kwargs):
                delay = initial_delay
                for attempt in range(max_attempts):
                    try:
                        return func(*args, **kwargs)
                    except Exception as e:
                        if attempt == max_attempts - 1:
                            raise
                        # Classify error
                        error_type = ErrorClassifier.classify(e)
                        if not ErrorClassifier.should_retry(error_type):
                            logging.error(f"Non-retryable error: {e}")
                            raise
                        logging.warning(f"Attempt {attempt + 1} failed, retrying in {delay}s: {e}")
                        time.sleep(delay)
                        # Increase delay exponentially
                        delay = min(delay * exponential_base, max_delay)
                return None
            return wrapper
        return decorator

    @staticmethod
    def fibonacci_backoff(max_attempts: int = 7):
        """Fibonacci sequence backoff strategy."""
        def decorator(func):
            @wraps(func)
            def wrapper(*args, **kwargs):
                fib_prev, fib_curr = 0, 1
                for attempt in range(max_attempts):
                    try:
                        return func(*args, **kwargs)
                    except Exception as e:
                        if attempt == max_attempts - 1:
                            raise
                        delay = fib_curr
                        logging.warning(f"Attempt {attempt + 1} failed, retrying in {delay}s: {e}")
                        time.sleep(delay)
                        # Next Fibonacci number
                        fib_prev, fib_curr = fib_curr, fib_prev + fib_curr
                return None
            return wrapper
        return decorator

    @staticmethod
    def adaptive_retry(context: Dict[str, Any]):
        """Adaptive retry based on error history."""
        error_history = context.get('error_history', [])
        # Analyze error pattern
        if len(error_history) > 5:
            # Too many errors, use longer delays
            return RetryStrategy.exponential_backoff(max_attempts=3, initial_delay=10)
        elif any(e.error_type == ErrorType.NETWORK for e in error_history):
            # Network issues, use exponential backoff
            return RetryStrategy.exponential_backoff(max_attempts=5)
        else:
            # Default to fixed retry
            return RetryStrategy.fixed_retry(max_attempts=3)

# ==================== Circuit Breaker Pattern ====================
class CircuitBreakerManager:
    """Manage circuit breakers for external services.

    Note: breaker state lives in process memory. Airflow tasks usually run in
    separate processes, so sharing state across tasks needs external storage.
    """

    def __init__(self):
        self.breakers = {}
        self.logger = logging.getLogger(__name__)

    def get_breaker(
        self,
        service_name: str,
        failure_threshold: int = 5,
        recovery_timeout: float = 60.0
    ) -> circuitbreaker.CircuitBreaker:
        """Get or create circuit breaker for service."""
        if service_name not in self.breakers:
            self.breakers[service_name] = circuitbreaker.CircuitBreaker(
                failure_threshold=failure_threshold,
                recovery_timeout=recovery_timeout,
                expected_exception=Exception
            )
        return self.breakers[service_name]

    def call_with_breaker(
        self,
        service_name: str,
        func: Callable,
        *args,
        **kwargs
    ) -> Any:
        """Call function with circuit breaker protection."""
        breaker = self.get_breaker(service_name)
        try:
            # breaker.call() checks whether the circuit is open before invoking func;
            # using the breaker only as a context manager records failures but
            # does not reject calls while the circuit is open.
            return breaker.call(func, *args, **kwargs)
        except circuitbreaker.CircuitBreakerError:
            self.logger.error(f"Circuit breaker open for {service_name}")
            raise AirflowException(f"Service {service_name} is currently unavailable")
        except Exception as e:
            self.logger.error(f"Error calling {service_name}: {e}")
            raise

# ==================== Error Recovery Patterns ====================
class ErrorRecovery:
    """Error recovery patterns and strategies."""

    @staticmethod
    def graceful_degradation(primary_func: Callable, fallback_func: Callable):
        """Gracefully degrade to fallback function on error."""
        def wrapper(*args, **kwargs):
            try:
                return primary_func(*args, **kwargs)
            except Exception as e:
                logging.warning(f"Primary function failed: {e}, using fallback")
                return fallback_func(*args, **kwargs)
        return wrapper

    @staticmethod
    def compensating_transaction(
        action_func: Callable,
        compensate_func: Callable
    ):
        """Execute compensating transaction on failure."""
        def wrapper(*args, **kwargs):
            try:
                result = action_func(*args, **kwargs)
                return result
            except Exception as e:
                logging.error(f"Action failed: {e}, executing compensation")
                try:
                    compensate_func(*args, **kwargs)
                except Exception as comp_error:
                    logging.error(f"Compensation also failed: {comp_error}")
                # Re-raise the original failure after attempting compensation
                raise
        return wrapper

    @staticmethod
    def checkpoint_recovery(
        func: Callable,
        checkpoint_manager: 'CheckpointManager'
    ):
        """Recover from last checkpoint on failure."""
        def wrapper(*args, **kwargs):
            # Use a stable ID so a later run can find the saved checkpoint.
            # A timestamped ID would be different on every call and never match.
            checkpoint_id = func.__name__
            try:
                # Try to restore from checkpoint
                checkpoint_data = checkpoint_manager.restore(checkpoint_id)
                if checkpoint_data:
                    logging.info(f"Restored from checkpoint: {checkpoint_id}")
                    kwargs.update(checkpoint_data)
                # Execute function
                result = func(*args, **kwargs)
                # Clear checkpoint on success
                checkpoint_manager.clear(checkpoint_id)
                return result
            except Exception:
                # Save checkpoint for recovery
                checkpoint_manager.save(checkpoint_id, kwargs)
                logging.error(f"Failed, checkpoint saved: {checkpoint_id}")
                raise
        return wrapper

# ==================== Checkpoint Manager ====================
class CheckpointManager:
    """Manage checkpoints for recovery."""

    def __init__(self, storage_path: str = "/tmp/airflow_checkpoints"):
        self.storage_path = Path(storage_path)
        self.storage_path.mkdir(exist_ok=True)
        self.logger = logging.getLogger(__name__)

    def save(self, checkpoint_id: str, data: Any):
        """Save checkpoint data."""
        checkpoint_file = self.storage_path / f"{checkpoint_id}.json"
        try:
            with open(checkpoint_file, 'w') as f:
                json.dump(data, f, default=str)
            self.logger.info(f"Checkpoint saved: {checkpoint_id}")
        except Exception as e:
            self.logger.error(f"Failed to save checkpoint: {e}")

    def restore(self, checkpoint_id: str) -> Optional[Any]:
        """Restore checkpoint data."""
        checkpoint_file = self.storage_path / f"{checkpoint_id}.json"
        if not checkpoint_file.exists():
            return None
        try:
            with open(checkpoint_file, 'r') as f:
                data = json.load(f)
            self.logger.info(f"Checkpoint restored: {checkpoint_id}")
            return data
        except Exception as e:
            self.logger.error(f"Failed to restore checkpoint: {e}")
            return None

    def clear(self, checkpoint_id: str):
        """Clear checkpoint."""
        checkpoint_file = self.storage_path / f"{checkpoint_id}.json"
        if checkpoint_file.exists():
            checkpoint_file.unlink()
            self.logger.info(f"Checkpoint cleared: {checkpoint_id}")

# ==================== Error Callbacks ====================
class ErrorCallbacks:
    """Airflow callback functions for error handling."""

    @staticmethod
    def on_failure_callback(context: Dict[str, Any]):
        """Callback for task failure."""
        task_instance = context['task_instance']
        exception = context.get('exception')

        # Build the stack trace from the exception itself: the callback does not
        # run inside an except block, so traceback.format_exc() has nothing to show.
        if isinstance(exception, BaseException):
            stack_trace = ''.join(
                traceback.format_exception(type(exception), exception, exception.__traceback__)
            )
        else:
            stack_trace = ''

        # Create error context
        error_context = ErrorContext(
            error_type=ErrorClassifier.classify(exception) if exception else ErrorType.UNKNOWN,
            task_id=task_instance.task_id,
            dag_id=task_instance.dag_id,
            execution_date=context['execution_date'],
            attempt_number=task_instance.try_number,
            error_message=str(exception) if exception else 'Unknown error',
            stack_trace=stack_trace,
            metadata={
                'log_url': task_instance.log_url,
                'hostname': task_instance.hostname,
                'state': task_instance.state
            }
        )
        # Log error
        logging.error(f"Task failed: {error_context}")
        # Send alert
        ErrorCallbacks._send_failure_alert(error_context)
        # Store error for analysis
        ErrorCallbacks._store_error(error_context)

    @staticmethod
    def on_retry_callback(context: Dict[str, Any]):
        """Callback for task retry."""
        task_instance = context['task_instance']
        logging.info(f"Task {task_instance.task_id} retrying (attempt {task_instance.try_number})")
        # Check if we should continue retrying
        if task_instance.try_number >= 3:
            # Consider circuit breaking or changing strategy
            logging.warning(f"Task {task_instance.task_id} has failed {task_instance.try_number} times")

    @staticmethod
    def on_success_callback(context: Dict[str, Any]):
        """Callback for task success."""
        task_instance = context['task_instance']
        if task_instance.try_number > 1:
            logging.info(f"Task {task_instance.task_id} succeeded after {task_instance.try_number} attempts")

    @staticmethod
    def _send_failure_alert(error_context: ErrorContext):
        """Send failure alert."""
        try:
            # In production, integrate with alerting system
            alert_config = Variable.get('alert_config', deserialize_json=True, default_var={})
            if alert_config.get('enabled', False):
                # Build the email alert
                subject = f"[Airflow Alert] Task Failed: {error_context.task_id}"
                body = f"""
                Task: {error_context.task_id}
                DAG: {error_context.dag_id}
                Error Type: {error_context.error_type.name}
                Message: {error_context.error_message}
                Attempt: {error_context.attempt_number}
                Time: {error_context.timestamp}
                Stack Trace:
                {error_context.stack_trace}
                """
                # Placeholder: send subject and body via SMTP or a notification service here
                logging.info(f"Alert sent for {error_context.task_id}")
        except Exception as e:
            logging.error(f"Failed to send alert: {e}")

    @staticmethod
    def _store_error(error_context: ErrorContext):
        """Store error for analysis."""
        try:
            # Store in database or monitoring system
            # This is a simplified example
            error_log_path = Path('/tmp/airflow_errors')
            error_log_path.mkdir(exist_ok=True)
            error_file = error_log_path / f"{error_context.dag_id}_{error_context.task_id}_{error_context.timestamp.isoformat()}.json"
            with open(error_file, 'w') as f:
                json.dump({
                    'error_type': error_context.error_type.name,
                    'task_id': error_context.task_id,
                    'dag_id': error_context.dag_id,
                    'execution_date': error_context.execution_date.isoformat(),
                    'attempt_number': error_context.attempt_number,
                    'error_message': error_context.error_message,
                    'stack_trace': error_context.stack_trace,
                    'metadata': error_context.metadata,
                    'timestamp': error_context.timestamp.isoformat()
                }, f, indent=2)
        except Exception as e:
            logging.error(f"Failed to store error: {e}")

# ==================== DAG with Error Handling ====================
@dag(
    'resilient_pipeline',
    default_args={
        'owner': 'data_team',
        'retries': 3,
        'retry_delay': timedelta(minutes=5),
        'retry_exponential_backoff': True,
        'max_retry_delay': timedelta(hours=1),
        'on_failure_callback': ErrorCallbacks.on_failure_callback,
        'on_retry_callback': ErrorCallbacks.on_retry_callback,
        'on_success_callback': ErrorCallbacks.on_success_callback,
    },
    description='Resilient pipeline with comprehensive error handling',
    schedule_interval='@daily',
    start_date=days_ago(1),
    tags=['resilient', 'error_handling'],
    catchup=False,
)
def resilient_pipeline():
    """DAG demonstrating comprehensive error handling."""
    # Initialize managers
    circuit_manager = CircuitBreakerManager()
    checkpoint_manager = CheckpointManager()

    @task(retries=5, retry_delay=timedelta(seconds=30))
    @RetryStrategy.exponential_backoff(max_attempts=3)
    def extract_with_retry():
        """Extract data with retry logic."""
        import random
        # Simulate occasional failures
        if random.random() < 0.3:
            raise ConnectionError("Failed to connect to data source")
        return {'data': 'extracted', 'records': 1000}

    @task
    def validate_data(data: Dict[str, Any]):
        """Validate data with quality checks."""
        if not data.get('records'):
            raise ValueError("No records found in data")
        if data['records'] < 100:
            raise AirflowException("Insufficient data for processing")
        return {**data, 'validated': True}

    @task
    def process_with_circuit_breaker(data: Dict[str, Any]):
        """Process data with circuit breaker protection."""
        def api_call():
            # Simulate external API call
            import random
            if random.random() < 0.2:
                raise Exception("API service unavailable")
            return {'processed': True}

        # Use circuit breaker
        try:
            result = circuit_manager.call_with_breaker(
                'external_api',
                api_call
            )
            return {**data, **result}
        except AirflowException:
            # Circuit breaker is open, use fallback
            return {**data, 'processed': False, 'fallback': True}

    @task(trigger_rule=TriggerRule.ALL_DONE)
    def error_handler(**context):
        """Handle any errors from upstream tasks."""
        task_instance = context['task_instance']
        dag_run = context['dag_run']
        # Look up upstream task states in the current DAG run
        upstream_states = {}
        for task_id in task_instance.task.upstream_task_ids:
            upstream_ti = dag_run.get_task_instance(task_id)
            upstream_states[task_id] = upstream_ti.state if upstream_ti else None
        failed_tasks = [
            task_id for task_id, state in upstream_states.items()
            if state in [State.FAILED, State.UPSTREAM_FAILED]
        ]
        if failed_tasks:
            logging.error(f"Failed tasks: {failed_tasks}")
            # Implement recovery logic
            return {'recovery_needed': True, 'failed_tasks': failed_tasks}
        return {'recovery_needed': False}

    @task
    def compensate_on_failure(error_info: Dict[str, Any]):
        """Execute compensating actions for failures."""
        if error_info.get('recovery_needed'):
            logging.info("Executing compensating transactions")
            # Rollback or cleanup actions
            for task_id in error_info.get('failed_tasks', []):
                logging.info(f"Compensating for {task_id}")
                # Implement specific compensation logic
            return {'compensated': True}
        return {'compensated': False}

    # Don't retry cleanup. The trigger rule belongs on the decorator, not in the
    # function signature, so that cleanup runs regardless of upstream state.
    @task(retries=0, trigger_rule=TriggerRule.ALL_DONE)
    def cleanup():
        """Cleanup resources regardless of pipeline status."""
        logging.info("Cleaning up resources")
        # Cleanup temporary files
        temp_dirs = ['/tmp/airflow_checkpoints', '/tmp/airflow_errors']
        for temp_dir in temp_dirs:
            if Path(temp_dir).exists():
                try:
                    # Keep last 7 days of data
                    cutoff_date = datetime.now() - timedelta(days=7)
                    for file in Path(temp_dir).iterdir():
                        if file.stat().st_mtime < cutoff_date.timestamp():
                            file.unlink()
                except Exception as e:
                    logging.warning(f"Cleanup failed for {temp_dir}: {e}")
        return "Cleanup completed"

    # Build pipeline
    extracted = extract_with_retry()
    validated = validate_data(extracted)
    processed = process_with_circuit_breaker(validated)
    # Error handling branch
    error_info = error_handler()
    compensation = compensate_on_failure(error_info)
    # Cleanup always runs
    clean = cleanup()
    # Dependencies
    processed >> error_info >> compensation >> clean


# Instantiate DAG
resilient_dag = resilient_pipeline()

# ==================== Error Monitoring and Analysis ====================
from collections import Counter  # Imported at module level because it is used in annotations below


class ErrorMonitor:
    """Monitor and analyze errors across DAGs."""

    def __init__(self):
        self.logger = logging.getLogger(__name__)
        self.error_stats = {}

    def analyze_error_patterns(self, dag_id: str, days: int = 7) -> Dict[str, Any]:
        """Analyze error patterns for a DAG."""
        from airflow.models import TaskFail
        from airflow.utils import timezone
        from airflow.utils.session import create_session

        # Get recent task failures
        # Note: This is simplified and assumes the Airflow 2.x TaskFail model;
        # use proper Airflow DB queries in production.
        cutoff_date = timezone.utcnow() - timedelta(days=days)
        with create_session() as session:
            failures = (
                session.query(TaskFail)
                .filter(TaskFail.dag_id == dag_id, TaskFail.start_date >= cutoff_date)
                .all()
            )
            # Analyze patterns
            # TaskFail stores no error message, so error_types stays empty in this sketch
            error_types = Counter()
            task_failures = Counter(f.task_id for f in failures)
            time_patterns = Counter(f.start_date.hour for f in failures if f.start_date)

        # Return analysis
        return {
            'total_failures': len(failures),
            'error_types': dict(error_types),
            'failed_tasks': dict(task_failures),
            'time_patterns': dict(time_patterns),
            'recommendations': self._generate_recommendations(error_types, task_failures)
        }

    def _generate_recommendations(
        self,
        error_types: Counter,
        task_failures: Counter
    ) -> List[str]:
        """Generate recommendations based on error patterns."""
        recommendations = []
        # Check for high network errors
        if error_types.get('NETWORK', 0) > 10:
            recommendations.append("High network errors - consider increasing timeouts or adding circuit breakers")
        # Check for recurring task failures
        for task, count in task_failures.items():
            if count > 5:
                recommendations.append(f"Task '{task}' failing frequently - review logic and add better error handling")
        return recommendations

    def create_error_dashboard(self) -> Dict[str, Any]:
        """Create error dashboard data."""
        return {
            'total_dags_with_errors': len(self.error_stats),
            'top_error_types': self._get_top_errors(),
            'recent_failures': self._get_recent_failures(),
            'recovery_success_rate': self._calculate_recovery_rate()
        }

    def _get_top_errors(self, limit: int = 10) -> List[Dict[str, Any]]:
        """Get top error types."""
        # Implementation would query error storage
        return []

    def _get_recent_failures(self, limit: int = 20) -> List[Dict[str, Any]]:
        """Get recent failures."""
        # Implementation would query error storage
        return []

    def _calculate_recovery_rate(self) -> float:
        """Calculate automatic recovery success rate."""
        # Implementation would analyze retry success
        return 0.0

# ==================== Testing Error Handling ====================
class ErrorHandlingTester:
    """Test error handling mechanisms."""

    @staticmethod
    def test_retry_strategy():
        """Test different retry strategies."""
        print("Testing retry strategies...")

        # Test exponential backoff
        @RetryStrategy.exponential_backoff(max_attempts=3, initial_delay=0.1)
        def flaky_function():
            import random
            if random.random() < 0.7:
                raise ConnectionError("Simulated failure")
            return "Success"

        try:
            result = flaky_function()
            print(f"Result: {result}")
        except Exception as e:
            print(f"Failed after retries: {e}")

    @staticmethod
    def test_circuit_breaker():
        """Test circuit breaker pattern."""
        print("Testing circuit breaker...")
        manager = CircuitBreakerManager()

        def unreliable_service():
            import random
            if random.random() < 0.8:
                raise Exception("Service error")
            return "Service response"

        # Test until circuit opens
        for i in range(10):
            try:
                result = manager.call_with_breaker(
                    'test_service',
                    unreliable_service
                )
                print(f"Call {i}: Success - {result}")
            except Exception as e:
                print(f"Call {i}: Failed - {e}")
            time.sleep(0.1)

# Example usage
if __name__ == "__main__":
    print("Error Handling Examples\n")

    # Example 1: Error classification
    print("1. Error Classification:")
    errors = [
        ConnectionError("Connection reset by peer"),
        ValueError("Invalid data format"),
        PermissionError("Access denied"),
        TimeoutError("Operation timed out"),
        KeyError("Missing configuration key")
    ]
    for error in errors:
        error_type = ErrorClassifier.classify(error)
        should_retry = ErrorClassifier.should_retry(error_type)
        print(f"   {error.__class__.__name__}: {error_type.name} (Retry: {should_retry})")

    # Example 2: Retry strategies
    print("\n2. Retry Strategies:")
    strategies = [
        "Fixed retry - constant delay between attempts",
        "Exponential backoff - increasing delays",
        "Fibonacci backoff - Fibonacci sequence delays",
        "Adaptive retry - based on error history",
        "Circuit breaker - stop trying after threshold"
    ]
    for strategy in strategies:
        print(f"   • {strategy}")

    # Example 3: Error handling patterns
    print("\n3. Error Handling Patterns:")
    patterns = [
        ("Graceful Degradation", "Fall back to simpler functionality"),
        ("Compensating Transaction", "Undo actions on failure"),
        ("Checkpoint Recovery", "Resume from last good state"),
        ("Circuit Breaker", "Prevent cascading failures"),
        ("Bulkhead", "Isolate failures to compartments"),
        ("Timeout", "Fail fast on slow operations")
    ]
    for pattern, description in patterns:
        print(f"   {pattern}: {description}")

    # Example 4: Trigger rules for error handling
    print("\n4. Trigger Rules for Error Handling:")
    rules = [
        ("ALL_DONE", "Cleanup tasks that run regardless"),
        ("ONE_FAILED", "Error handlers that activate on failure"),
        ("NONE_FAILED_MIN_ONE_SUCCESS", "Success path convergence"),
        ("ALL_FAILED", "Disaster recovery tasks")
    ]
    for rule, usage in rules:
        print(f"   {rule}: {usage}")

    # Example 5: Error monitoring metrics
    print("\n5. Error Monitoring Metrics:")
    metrics = [
        "Error rate by type",
        "Mean time to recovery (MTTR)",
        "Retry success rate",
        "Circuit breaker trip frequency",
        "Error distribution by time",
        "Task failure patterns",
        "Resource exhaustion events"
    ]
    for metric in metrics:
        print(f"   • {metric}")

    # Example 6: Best practices
    print("\n6. Error Handling Best Practices:")
    practices = [
        "Classify errors to determine handling strategy",
        "Use appropriate retry strategies for error types",
        "Implement circuit breakers for external services",
        "Create checkpoints for long-running tasks",
        "Monitor error patterns and trends",
        "Set up intelligent alerting (not alert fatigue)",
        "Log comprehensive error context",
        "Test error handling paths explicitly",
        "Implement compensating transactions where needed",
        "Always have cleanup tasks with ALL_DONE"
    ]
    for practice in practices:
        print(f"   {practice}")

    # Example 7: Common anti-patterns
    print("\n7. Error Handling Anti-Patterns:")
    antipatterns = [
        ("Catching all exceptions", "Be specific about what you handle"),
        ("Infinite retries", "Always have a maximum retry limit"),
        ("Ignoring errors", "Log and monitor even handled errors"),
        ("No cleanup", "Always clean up resources"),
        ("Alert fatigue", "Alert on actionable issues only"),
        ("No error classification", "Different errors need different handling")
    ]
    for antipattern, better in antipatterns:
        print(f"   ❌ {antipattern}")
        print(f"   ✅ {better}")

    # Example 8: Test error handling
    print("\n8. Testing Error Handling:")
    print("   Testing retry strategy...")
    ErrorHandlingTester.test_retry_strategy()
    print("\n   Testing circuit breaker...")
    # ErrorHandlingTester.test_circuit_breaker()  # Commented to avoid long output
    print("\n✅ Error handling demonstration complete!")
# Path import fix
from pathlib import Path
Key Takeaways and Best Practices
- Error Classification: Categorize errors to determine appropriate handling strategies.
- Intelligent Retries: Use exponential backoff and respect retry limits.
- Circuit Breakers: Prevent cascading failures with the circuit breaker pattern.
- Graceful Degradation: Provide fallback functionality when primary fails.
- Comprehensive Logging: Log detailed context for debugging and analysis.
- Cleanup Tasks: Always clean up resources with the ALL_DONE trigger rule.
- Error Monitoring: Track patterns and trends for proactive improvement.
- Testing: Explicitly test error handling paths and recovery mechanisms.
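The circuit breaker takeaway is easier to reason about with the state machine spelled out. The sketch below is a minimal standard-library illustration, not the `circuitbreaker` package used in the listing; the class `SimpleCircuitBreaker`, the exception `CircuitOpenError`, and the injectable `clock` parameter are assumptions introduced here so the behavior can be tested without real waiting.

```python
import time
from typing import Any, Callable, Optional


class CircuitOpenError(Exception):
    """Raised when a call is rejected because the circuit is open."""


class SimpleCircuitBreaker:
    """Closed -> open after repeated failures -> half-open after a timeout."""

    def __init__(
        self,
        failure_threshold: int = 3,
        recovery_timeout: float = 30.0,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.clock = clock
        self.failures = 0
        self.opened_at: Optional[float] = None

    @property
    def state(self) -> str:
        if self.opened_at is None:
            return "closed"
        if self.clock() - self.opened_at >= self.recovery_timeout:
            return "half_open"  # allow one trial call through
        return "open"

    def call(self, func: Callable[..., Any], *args: Any, **kwargs: Any) -> Any:
        if self.state == "open":
            raise CircuitOpenError("circuit is open; failing fast")
        try:
            result = func(*args, **kwargs)
        except Exception:
            self.failures += 1
            if self.failures >= self.failure_threshold:
                self.opened_at = self.clock()  # open, or re-open after a failed trial
            raise
        # Any success closes the circuit and resets the failure count.
        self.failures = 0
        self.opened_at = None
        return result
```

Injecting the clock is a deliberate choice: a test can advance time by changing a variable instead of sleeping through the recovery timeout.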
Error Handling Best Practices
Mastering error handling transforms brittle workflows into resilient production systems. You can now classify and handle different error types appropriately, implement sophisticated retry strategies with backoff, use circuit breakers to prevent cascade failures, create checkpoints for recovery, and monitor error patterns for continuous improvement. Whether you're building critical data pipelines, ML training workflows, or business automation, these error handling skills ensure your systems stay resilient and self-healing!
Pro Tip: Think of error handling as your workflow's safety net and immune system combined - it should catch problems, attempt recovery, and learn from failures. Always classify errors first - transient errors (network timeouts) should be retried with exponential backoff, while permanent errors (invalid configuration) should fail fast with clear diagnostics. Implement the circuit breaker pattern for external services to prevent cascading failures - after a threshold of failures, stop trying and fail fast. Use checkpoint recovery for long-running tasks so they can resume from the last good state rather than starting over. Design for graceful degradation - if the primary data source fails, can you use a cache or alternative source? Implement compensating transactions for critical operations that must be rolled back on failure. Set up comprehensive monitoring to detect patterns - if a task fails every Monday morning, there's likely a systemic issue. Use appropriate trigger rules: ALL_DONE for cleanup tasks that must always run, ONE_FAILED for error handlers, NONE_FAILED_MIN_ONE_SUCCESS for convergence points. Log rich context with errors - not just the error message but the full stack trace, input parameters, and system state. Test your error handling explicitly - inject failures and verify recovery. Remember: production systems fail in unexpected ways, so build defense in depth with multiple layers of error handling!
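The advice to inject failures and verify recovery can be sketched with a deterministic test double in place of random failures. Everything named here (`retry`, `FlakyService`, and the two test functions) is a hypothetical helper written for this illustration; the `sleep` argument is injected so the tests record delays instead of actually waiting.

```python
from typing import Callable, List


def retry(
    func: Callable[[], str],
    max_attempts: int,
    delays: List[float],
    sleep: Callable[[float], None],
) -> str:
    """Call func up to max_attempts times, sleeping between failed attempts.

    delays must be non-empty; its last value is reused if attempts outnumber it.
    """
    for attempt in range(max_attempts):
        try:
            return func()
        except ConnectionError:
            if attempt == max_attempts - 1:
                raise
            sleep(delays[min(attempt, len(delays) - 1)])
    raise ValueError("max_attempts must be at least 1")


class FlakyService:
    """Test double that fails a fixed number of times before succeeding."""

    def __init__(self, failures_before_success: int) -> None:
        self.remaining_failures = failures_before_success
        self.calls = 0

    def __call__(self) -> str:
        self.calls += 1
        if self.remaining_failures > 0:
            self.remaining_failures -= 1
            raise ConnectionError("injected failure")
        return "ok"


def test_recovers_after_two_failures() -> None:
    service = FlakyService(2)
    slept: List[float] = []
    assert retry(service, 4, [1.0, 2.0, 4.0], slept.append) == "ok"
    assert service.calls == 3
    assert slept == [1.0, 2.0]


def test_gives_up_at_the_retry_limit() -> None:
    service = FlakyService(5)
    slept: List[float] = []
    try:
        retry(service, 3, [1.0, 2.0, 4.0], slept.append)
        raised = False
    except ConnectionError:
        raised = True
    assert raised
    assert service.calls == 3
    assert slept == [1.0, 2.0]


if __name__ == "__main__":
    test_recovers_after_two_failures()
    test_gives_up_at_the_retry_limit()
    print("retry tests passed")
```

Because the failure count is fixed rather than random, both the recovery path and the give-up path are exercised on every run.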