
🚀 Apache Airflow Basics: Orchestrate Complex Workflows at Scale

Apache Airflow is a platform for programmatically authoring, scheduling, and monitoring workflows. It turns complex task dependencies into manageable, visual pipelines that scale from simple scripts to enterprise data operations. Like a conductor for your data orchestra, Airflow runs tasks in the right order, at the right time, with automatic retry logic and built-in monitoring. Whether you're building ETL pipelines, orchestrating machine learning workflows, or automating business processes, Airflow provides a foundation for reliable, scalable task orchestration. Let's explore the fundamentals of workflow automation with Apache Airflow! 🎯

The Apache Airflow Architecture

Think of Airflow as a workflow management system built from a few cooperating parts: Directed Acyclic Graphs (DAGs) define task dependencies, the scheduler triggers workflows, executors run tasks (in parallel where dependencies allow), and a web UI supports monitoring and troubleshooting. Built on Python, Airflow treats workflows as code, enabling version control, testing, and dynamic pipeline generation. Understanding DAGs, operators, tasks, and scheduling is essential for mastering workflow orchestration!

graph TB
    A[Apache Airflow] --> B[Core Components]
    A --> C[DAG Structure]
    A --> D[Operators]
    A --> E[Execution]

    B --> F[Scheduler]
    B --> G[Executor]
    B --> H[Metadata DB]
    B --> I[Web Server]

    C --> J[Tasks]
    C --> K[Dependencies]
    C --> L[Schedule]
    C --> M[Parameters]

    D --> N[BashOperator]
    D --> O[PythonOperator]
    D --> P[Sensors]
    D --> Q[Hooks]

    E --> R[Sequential]
    E --> S[Parallel]
    E --> T[Branching]
    E --> U[Dynamic]

    V[Use Cases] --> W[ETL/ELT]
    V --> X[ML Pipelines]
    V --> Y[Data Processing]
    V --> Z[DevOps]

    style A fill:#ff6b6b
    style B fill:#51cf66
    style C fill:#339af0
    style D fill:#ffd43b
    style E fill:#ff6b6b
    style V fill:#51cf66
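
To make the DAG idea concrete without needing an Airflow installation, the sketch below uses Python's standard `graphlib` module to compute a valid run order from task dependencies. It only illustrates dependency resolution and is not how Airflow's scheduler is implemented; the task names and the `run_order` and `has_cycle` helpers are hypothetical.

```python
from graphlib import CycleError, TopologicalSorter

# Map each task to the set of tasks it depends on (its upstream tasks).
# Task names are hypothetical and chosen only for illustration.
dependencies = {
    "extract": set(),
    "validate": {"extract"},
    "transform": {"validate"},
    "load": {"transform"},
    "notify": {"load"},
}


def run_order(deps):
    """Return one valid execution order for the given dependency map."""
    return list(TopologicalSorter(deps).static_order())


def has_cycle(deps):
    """Return True if the dependencies contain a cycle (so they are not a DAG)."""
    try:
        run_order(deps)
        return False
    except CycleError:
        return True


order = run_order(dependencies)
print(order)
print(has_cycle({"a": {"b"}, "b": {"a"}}))
```

The "acyclic" part matters: if two tasks depend on each other, no valid order exists, which is why a workflow with a cycle cannot be scheduled.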

Real-World Scenario: The Data Pipeline Platform 📊

You're building a data pipeline platform that processes daily sales data from multiple sources, runs data quality checks and transformations, trains machine learning models on the cleaned data, and generates reports and dashboards. It sends notifications on completion or failure, handles dependencies between complex tasks, retries failed operations automatically, and scales with varying workloads. The system must also support flexible scheduling, provide clear visibility into pipeline status, handle errors gracefully, and maintain audit trails. Let's build a robust Airflow orchestration system!
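
The automatic-retry requirement maps onto Airflow's `retries` and `retry_delay` task arguments. As a rough mental model only (this is not Airflow's implementation, and `run_with_retries` and `flaky_task` are hypothetical helpers), retrying amounts to a loop like this:

```python
import time


def run_with_retries(func, retries=2, retry_delay=0.0):
    """Call func(); on failure, retry up to `retries` more times."""
    attempt = 0
    while True:
        attempt += 1
        try:
            return func()
        except Exception:
            if attempt > retries:
                raise  # retries exhausted: the failure propagates
            time.sleep(retry_delay)  # wait before the next attempt


calls = {"count": 0}


def flaky_task():
    """Fail on the first two calls, then succeed."""
    calls["count"] += 1
    if calls["count"] < 3:
        raise RuntimeError("transient failure")
    return "ok"


result = run_with_retries(flaky_task, retries=2, retry_delay=0.0)
print(result, calls["count"])
```

Because a task may run more than once under this model, whatever it does must be safe to repeat.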

# First, install Apache Airflow:
# pip install apache-airflow==2.7.0
# pip install apache-airflow-providers-http
# pip install apache-airflow-providers-postgres
# pip install pandas numpy requests

# Initialize the Airflow metadata database
# (Airflow 2.7 deprecates `airflow db init` in favor of `airflow db migrate`):
# airflow db migrate
# Create a user:
# airflow users create --username admin --password admin --firstname Admin --lastname User --role Admin --email admin@example.com
# Start webserver: airflow webserver --port 8080
# Start scheduler: airflow scheduler

import os
import json
import logging
from datetime import datetime, timedelta
from typing import Dict, List, Any, Optional, Callable
from pathlib import Path
import pandas as pd
import numpy as np

# Airflow imports
from airflow import DAG
from airflow.decorators import dag, task
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from airflow.operators.dummy import DummyOperator
from airflow.operators.email import EmailOperator
from airflow.sensors.filesystem import FileSensor
from airflow.sensors.external_task import ExternalTaskSensor
from airflow.providers.http.operators.http import SimpleHttpOperator
from airflow.providers.http.sensors.http import HttpSensor
from airflow.models import Variable
from airflow.utils.dates import days_ago
from airflow.utils.task_group import TaskGroup
from airflow.utils.trigger_rule import TriggerRule
from airflow.exceptions import AirflowException, AirflowSkipException

# ==================== Configuration ====================

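# Default arguments applied to every task in a DAG
default_args = {
    'owner': 'data_team',
    'depends_on_past': False,
    # days_ago() keeps the demo simple, but it is deprecated and dynamic;
    # production DAGs should use a static date such as datetime(2024, 1, 1)
    'start_date': days_ago(1),
    'email': ['admin@example.com'],
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 2,
    'retry_delay': timedelta(minutes=5),
}
# Note: max_active_runs and catchup are DAG-level arguments. They have no effect
# inside default_args, so pass them to DAG(...) or @dag(...) directly.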

# ==================== Helper Functions ====================

def create_logger(name: str) -> logging.Logger:
    """Create a logger for the DAG."""
    logger = logging.getLogger(name)
    logger.setLevel(logging.INFO)
    
    # Create console handler
    handler = logging.StreamHandler()
    handler.setLevel(logging.INFO)
    
    # Create formatter
    formatter = logging.Formatter(
        '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
    )
    handler.setFormatter(formatter)
    
    # Add handler to logger
    if not logger.handlers:
        logger.addHandler(handler)
    
    return logger

# ==================== Basic DAG Example ====================

# Create a basic DAG
basic_dag = DAG(
    'basic_workflow',
    default_args=default_args,
    description='A basic Airflow workflow example',
    schedule_interval='@daily',  # Run daily
    catchup=False,  # Don't backfill missed runs
    max_active_runs=1,
    tags=['example', 'basic'],
)

# Define tasks
def print_hello():
    """Simple Python task."""
    print("Hello from Airflow!")
    return "Hello completed"

def process_data(**context):
    """Process data with context."""
    # Access context variables ('logical_date' replaces the deprecated 'execution_date')
    logical_date = context['logical_date']
    dag_run = context['dag_run']
    
    print(f"Processing data for date: {logical_date}")
    print(f"DAG run ID: {dag_run.run_id}")
    
    # Simulate data processing
    data = {'date': str(logical_date), 'status': 'processed'}
    
    # Push data to XCom for downstream tasks
    context['task_instance'].xcom_push(key='processed_data', value=data)
    
    return "Processing completed"

def save_results(**context):
    """Save results from upstream task."""
    # Pull data from XCom
    task_instance = context['task_instance']
    processed_data = task_instance.xcom_pull(
        task_ids='process_task',
        key='processed_data'
    )
    
    print(f"Saving results: {processed_data}")
    
    # Simulate saving to database
    return "Results saved"

# Create task instances
with basic_dag:
    hello_task = PythonOperator(
        task_id='hello_task',
        python_callable=print_hello,
    )
    
    # Airflow 2 passes the context to any callable that accepts **context,
    # so the deprecated provide_context=True flag is not needed
    process_task = PythonOperator(
        task_id='process_task',
        python_callable=process_data,
    )
    
    save_task = PythonOperator(
        task_id='save_task',
        python_callable=save_results,
    )
    
    # Define dependencies
    hello_task >> process_task >> save_task

# ==================== Advanced DAG with Task Groups ====================

@dag(
    'advanced_data_pipeline',
    default_args=default_args,
    description='Advanced data pipeline with task groups',
    schedule_interval='0 2 * * *',  # Run at 2 AM daily
    catchup=False,  # Don't backfill missed runs
    max_active_runs=1,
    tags=['advanced', 'data_pipeline'],
)
def advanced_data_pipeline():
    """Advanced DAG using TaskFlow API."""
    
    @task
    def extract_data(source: str) -> Dict[str, Any]:
        """Extract data from source."""
        print(f"Extracting data from {source}")
        
        # Simulate data extraction
        data = {
            'source': source,
            'records': np.random.randint(1000, 10000),
            'timestamp': datetime.now().isoformat()
        }
        
        return data
    
    @task
    def validate_data(data: Dict[str, Any]) -> Dict[str, Any]:
        """Validate extracted data."""
        print(f"Validating {data['records']} records from {data['source']}")
        
        # Simulate validation
        data['validation'] = {
            'passed': True,
            'errors': [],
            'warnings': []
        }
        
        if data['records'] < 5000:
            data['validation']['warnings'].append('Low record count')
        
        return data
    
    @task
    def transform_data(data: Dict[str, Any]) -> Dict[str, Any]:
        """Transform validated data."""
        print(f"Transforming data from {data['source']}")
        
        # Simulate transformation
        data['transformed'] = {
            'records_processed': data['records'],
            'transformations_applied': ['normalize', 'deduplicate', 'enrich']
        }
        
        return data
    
    @task
    def load_to_warehouse(data: Dict[str, Any], table: str) -> str:
        """Load data to data warehouse."""
        print(f"Loading {data['transformed']['records_processed']} records to {table}")
        
        # Simulate loading to warehouse
        return f"Loaded to {table} successfully"
    
    @task
    def send_notification(results: List[str]):
        """Send completion notification."""
        print("Sending notification...")
        for result in results:
            print(f"  - {result}")
        
        # Simulate sending email/slack notification
        return "Notification sent"
    
    # Define task groups for different data sources
    with TaskGroup('sales_pipeline') as sales_group:
        sales_data = extract_data('sales_database')
        validated_sales = validate_data(sales_data)
        transformed_sales = transform_data(validated_sales)
        loaded_sales = load_to_warehouse(transformed_sales, 'fact_sales')
    
    with TaskGroup('inventory_pipeline') as inventory_group:
        inventory_data = extract_data('inventory_system')
        validated_inventory = validate_data(inventory_data)
        transformed_inventory = transform_data(validated_inventory)
        loaded_inventory = load_to_warehouse(transformed_inventory, 'fact_inventory')
    
    with TaskGroup('customer_pipeline') as customer_group:
        customer_data = extract_data('crm_system')
        validated_customer = validate_data(customer_data)
        transformed_customer = transform_data(validated_customer)
        loaded_customer = load_to_warehouse(transformed_customer, 'dim_customer')
    
    # Combine results and send notification
    notification = send_notification([loaded_sales, loaded_inventory, loaded_customer])
    
    # Dependencies are automatically handled by TaskFlow API
    
# Instantiate the DAG
advanced_dag = advanced_data_pipeline()

# ==================== Dynamic DAG Generation ====================

def create_dynamic_dag(
    dag_id: str,
    sources: List[str],
    schedule: str = '@daily'
) -> DAG:
    """Dynamically create a DAG based on configuration."""
    
    dag = DAG(
        dag_id,
        default_args=default_args,
        description=f'Dynamic DAG for {", ".join(sources)}',
        schedule_interval=schedule,
        catchup=False,  # Don't backfill missed runs
        max_active_runs=1,
        tags=['dynamic'],
    )
    
    with dag:
        start = DummyOperator(task_id='start')
        end = DummyOperator(task_id='end')
        
        for source in sources:
            # Create tasks for each source
            extract = PythonOperator(
                task_id=f'extract_{source}',
                python_callable=lambda s=source: print(f"Extracting from {s}"),
            )
            
            process = PythonOperator(
                task_id=f'process_{source}',
                python_callable=lambda s=source: print(f"Processing {s}"),
            )
            
            # Define dependencies
            start >> extract >> process >> end
    
    return dag

# Create dynamic DAGs from configuration
data_sources = ['api', 'database', 'files']
dynamic_dag = create_dynamic_dag('dynamic_pipeline', data_sources)

# ==================== Sensor Example ====================

sensor_dag = DAG(
    'sensor_workflow',
    default_args=default_args,
    description='DAG with sensors',
    schedule_interval=None,  # Triggered manually or by other DAGs
    tags=['sensors'],
)

with sensor_dag:
    # Wait for file to appear
    wait_for_file = FileSensor(
        task_id='wait_for_file',
        filepath='/tmp/data/input.csv',
        fs_conn_id='fs_default',
        poke_interval=30,  # Check every 30 seconds
        timeout=300,  # Timeout after 5 minutes
        mode='poke',  # or 'reschedule' to free up worker
    )
    
    # Wait for API to be available
    wait_for_api = HttpSensor(
        task_id='wait_for_api',
        http_conn_id='api_default',
        endpoint='health',
        poke_interval=60,
        timeout=600,
    )
    
    # Process after both sensors succeed
    process = PythonOperator(
        task_id='process_when_ready',
        python_callable=lambda: print("All dependencies met, processing..."),
    )
    
    [wait_for_file, wait_for_api] >> process

# ==================== Branching Example ====================

@dag(
    'branching_workflow',
    default_args=default_args,
    description='DAG with conditional branching',
    schedule_interval='@daily',
    catchup=False,  # Don't backfill missed runs
    max_active_runs=1,
    tags=['branching'],
)
def branching_workflow():
    """DAG with conditional execution paths."""
    
    @task.branch
    def check_data_quality() -> str:
        """Check data quality and decide path."""
        import random
        
        # Simulate quality check
        quality_score = random.random()
        
        if quality_score > 0.8:
            return 'high_quality_path'
        elif quality_score > 0.5:
            return 'medium_quality_path'
        else:
            return 'low_quality_path'
    
    @task
    def high_quality_processing():
        """Process high quality data."""
        print("Processing high quality data - minimal cleaning needed")
        return "High quality processed"
    
    @task
    def medium_quality_processing():
        """Process medium quality data."""
        print("Processing medium quality data - standard cleaning")
        return "Medium quality processed"
    
    @task
    def low_quality_processing():
        """Process low quality data."""
        print("Processing low quality data - extensive cleaning")
        return "Low quality processed"
    
    @task(trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS)
    def merge_results(results: Optional[str] = None):
        """Merge results from any path."""
        print(f"Merging results: {results}")
        return "Results merged"
    
    # Define the workflow
    quality_check = check_data_quality()
    
    # override() sets each task_id so it matches the string returned by the
    # branch task; assigning .task_id on the returned object does not rename the task
    high_path = high_quality_processing.override(task_id='high_quality_path')()
    medium_path = medium_quality_processing.override(task_id='medium_quality_path')()
    low_path = low_quality_processing.override(task_id='low_quality_path')()
    
    merge = merge_results()
    
    # Set up branching: only the chosen path runs, the others are skipped
    quality_check >> [high_path, medium_path, low_path] >> merge

branching_dag = branching_workflow()

# ==================== Custom Operators ====================

from airflow.models import BaseOperator

class DataQualityOperator(BaseOperator):
    """Custom operator for data quality checks."""
    
    # Fields rendered with Jinja before execute() runs
    template_fields = ('table_name', 'quality_checks')
    
    # The @apply_defaults decorator is not needed in Airflow 2:
    # default_args are applied to operators automatically
    def __init__(
        self,
        table_name: str,
        quality_checks: List[Dict[str, Any]],
        conn_id: str = 'postgres_default',
        **kwargs
    ):
        super().__init__(**kwargs)
        self.table_name = table_name
        self.quality_checks = quality_checks
        self.conn_id = conn_id
    def execute(self, context):
        """Execute data quality checks."""
        self.log.info(f"Running quality checks on {self.table_name}")
        
        failed_checks = []
        
        for check in self.quality_checks:
            check_name = check.get('name', 'Unknown')
            check_sql = check.get('sql')
            expected = check.get('expected')
            
            self.log.info(f"Running check: {check_name}")
            
            # Simulate running SQL query
            # In real implementation, use PostgresHook
            result = self._run_check(check_sql)
            
            if result != expected:
                failed_checks.append(check_name)
                self.log.error(f"Check failed: {check_name}")
            else:
                self.log.info(f"Check passed: {check_name}")
        
        if failed_checks:
            raise AirflowException(f"Data quality checks failed: {failed_checks}")
        
        self.log.info("All quality checks passed")
        return True
    
    def _run_check(self, sql: str) -> Any:
        """Run a single quality check."""
        # Simulate SQL execution
        import random
        return random.choice([True, False])

# ==================== Configuration Management ====================

class AirflowConfig:
    """Centralized configuration for Airflow DAGs."""
    
    @staticmethod
    def get_config(key: str, default: Any = None) -> Any:
        """Get configuration value."""
        try:
            # Try to get from Airflow Variables
            return Variable.get(key, default_var=default)
        except Exception:
            # Fall back to environment variables (e.g. when the metadata DB is unreachable)
            return os.environ.get(key, default)
    
    @staticmethod
    def get_connection_string(conn_id: str) -> str:
        """Get connection string for database."""
        # In production, use Airflow Connections
        return f"postgresql://user:pass@localhost:5432/{conn_id}"
    
    @staticmethod
    def get_email_config() -> Dict[str, Any]:
        """Get email configuration."""
        return {
            'to': Variable.get('alert_emails', default_var=['admin@example.com']),
            'subject_prefix': '[Airflow Alert]',
            'smtp_host': Variable.get('smtp_host', default_var='localhost'),
            'smtp_port': Variable.get('smtp_port', default_var=587),
        }

# ==================== Monitoring and Alerting ====================

def create_monitoring_dag() -> DAG:
    """Create a DAG for monitoring other DAGs."""
    from airflow.operators.python import BranchPythonOperator
    
    dag = DAG(
        'airflow_monitoring',
        default_args=default_args,
        description='Monitor Airflow DAGs and send alerts',
        schedule_interval='*/30 * * * *',  # Every 30 minutes
        catchup=False,  # Don't backfill missed runs
        tags=['monitoring', 'meta'],
    )
    
    with dag:
        
        def check_dag_status(**context):
            """Check recent DAG runs and return the task_id of the branch to follow."""
            from airflow.models import DagRun
            from airflow.utils.state import DagRunState
            
            # Look at runs whose logical date falls within the last 24 hours
            since = context['logical_date'] - timedelta(hours=24)
            
            # Without a dag_id, DagRun.find() searches across all DAGs
            failed_runs = DagRun.find(
                state=DagRunState.FAILED,
                execution_start_date=since,
            )
            
            # Count failed runs per DAG
            failures = {}
            for run in failed_runs:
                failures[run.dag_id] = failures.get(run.dag_id, 0) + 1
            
            issues = [
                f"DAG {dag_id} has {count} failed runs"
                for dag_id, count in sorted(failures.items())
            ]
            
            if issues:
                context['task_instance'].xcom_push(key='issues', value=issues)
                return 'send_alert'
            return 'all_healthy'
        
        # A branch operator is required so that only the returned task_id runs
        check_status = BranchPythonOperator(
            task_id='check_dag_status',
            python_callable=check_dag_status,
        )
        
        send_alert = EmailOperator(
            task_id='send_alert',
            to=AirflowConfig.get_email_config()['to'],
            subject='Airflow Monitoring Alert',
            html_content="""
            <h3>Airflow Issues Detected</h3>
            <p>The following issues were detected:</p>
            <ul>
            {% for issue in task_instance.xcom_pull(task_ids='check_dag_status', key='issues') %}
                <li>{{ issue }}</li>
            {% endfor %}
            </ul>
            """,
            trigger_rule=TriggerRule.ONE_SUCCESS,
        )
        
        all_healthy = DummyOperator(
            task_id='all_healthy',
            trigger_rule=TriggerRule.ONE_SUCCESS,
        )
        
        check_status >> [send_alert, all_healthy]
    
    return dag

monitoring_dag = create_monitoring_dag()

# ==================== Best Practices Examples ====================

@dag(
    'best_practices_dag',
    default_args=default_args,
    description='DAG demonstrating Airflow best practices',
    schedule_interval='@daily',
    tags=['best_practices'],
    max_active_runs=1,
    catchup=False,
)
def best_practices_dag():
    """DAG demonstrating Airflow best practices."""
    
    @task(retries=3, retry_delay=timedelta(minutes=2))
    def idempotent_task(date: str) -> Dict[str, Any]:
        """
        Idempotent task - can be safely retried.
        Always produces same result for same input.
        """
        # Use execution date for idempotency
        result_key = f"result_{date}"
        
        # Check if already processed
        existing_result = check_if_processed(result_key)
        if existing_result:
            print(f"Already processed for {date}, returning cached result")
            return existing_result
        
        # Process data
        result = {
            'date': date,
            'processed_at': datetime.now().isoformat(),
            'status': 'success'
        }
        
        # Save result for idempotency
        save_result(result_key, result)
        
        return result
    
    @task
    def atomic_operation(data: Dict[str, Any]) -> bool:
        """
        Atomic operation - either fully completes or fully fails.
        No partial state left behind.
        """
        from contextlib import contextmanager
        
        @contextmanager
        def atomic_write(filepath):
            """Write atomically using temporary file and rename."""
            temp_path = f"{filepath}.tmp"
            try:
                yield temp_path
                # Atomic rename (os.replace also overwrites an existing target)
                os.replace(temp_path, filepath)
            except Exception:
                # Clean up on failure
                if os.path.exists(temp_path):
                    os.remove(temp_path)
                raise
        
        output_file = f"/tmp/output_{data['date']}.json"
        
        with atomic_write(output_file) as temp_file:
            with open(temp_file, 'w') as f:
                json.dump(data, f)
        
        return True
    
    @task(pool='limited_resources', pool_slots=1)
    def resource_limited_task(data: Dict[str, Any]) -> Dict[str, Any]:
        """
        Task with resource limitations.
        Uses Airflow pools to limit concurrent execution.
        """
        print(f"Processing with limited resources: {data}")
        
        # Simulate resource-intensive operation
        import time
        time.sleep(5)
        
        return {'processed': True, **data}
    
    # The trigger rule must be set on the decorator; assigning it to the
    # object returned by cleanup_task() would not change the task
    @task(trigger_rule=TriggerRule.ALL_DONE)
    def cleanup_task(**context):
        """
        Cleanup task that always runs.
        Uses trigger_rule to run even if upstream fails.
        """
        print("Performing cleanup...")
        
        # Clean up temporary files
        temp_dir = Path('/tmp')
        for temp_file in temp_dir.glob('*.tmp'):
            try:
                temp_file.unlink()
                print(f"Cleaned up: {temp_file}")
            except Exception as e:
                print(f"Failed to clean up {temp_file}: {e}")
        
        return "Cleanup completed"
    
    # Define workflow
    execution_date = "{{ ds }}"  # Airflow template variable
    
    processed_data = idempotent_task(execution_date)
    atomic_result = atomic_operation(processed_data)
    limited_result = resource_limited_task(processed_data)
    
    # Cleanup runs regardless of upstream success/failure
    cleanup = cleanup_task()
    
    [atomic_result, limited_result] >> cleanup

best_practices = best_practices_dag()

# ==================== Helper Functions ====================

def check_if_processed(key: str) -> Optional[Dict[str, Any]]:
    """Check if data has already been processed."""
    # In production, check database or cache
    # This is a simplified example
    cache_file = Path(f'/tmp/airflow_cache/{key}.json')
    if cache_file.exists():
        with open(cache_file, 'r') as f:
            return json.load(f)
    return None

def save_result(key: str, result: Dict[str, Any]):
    """Save processing result for idempotency."""
    cache_dir = Path('/tmp/airflow_cache')
    cache_dir.mkdir(parents=True, exist_ok=True)
    
    cache_file = cache_dir / f'{key}.json'
    with open(cache_file, 'w') as f:
        json.dump(result, f)

# Example usage and testing
if __name__ == "__main__":
    print("🚀 Apache Airflow DAG Examples\n")
    
    # Example 1: Basic DAG structure
    print("1️⃣ Basic DAG Structure:")
    print(f"   DAG ID: {basic_dag.dag_id}")
    print(f"   Schedule: {basic_dag.schedule_interval}")
    print(f"   Tasks: {[t.task_id for t in basic_dag.tasks]}")
    
    # Example 2: Task dependencies
    print("\n2️⃣ Task Dependencies:")
    # Loop variable is named dag_task to avoid shadowing the imported @task decorator
    for dag_task in basic_dag.tasks:
        upstream = [t.task_id for t in dag_task.upstream_list]
        downstream = [t.task_id for t in dag_task.downstream_list]
        print(f"   {dag_task.task_id}:")
        if upstream:
            print(f"      Upstream: {upstream}")
        if downstream:
            print(f"      Downstream: {downstream}")
    
    # Example 3: DAG configuration
    print("\n3️⃣ Default Arguments:")
    for key, value in default_args.items():
        print(f"   {key}: {value}")
    
    # Example 4: Available operators
    print("\n4️⃣ Common Operators:")
    operators = [
        "PythonOperator - Execute Python functions",
        "BashOperator - Run bash commands",
        "EmailOperator - Send emails",
        "SQLExecuteQueryOperator - Execute SQL queries",
        "SimpleHttpOperator - Make HTTP requests",
        "FileSensor - Wait for files",
        "S3KeySensor - Wait for S3 objects",
        "ExternalTaskSensor - Wait for other DAGs"
    ]
    for op in operators:
        print(f"   • {op}")
    
    # Example 5: Schedule intervals
    print("\n5️⃣ Schedule Intervals:")
    schedules = [
        ("@once", "Run once"),
        ("@hourly", "Run every hour"),
        ("@daily", "Run every day at midnight"),
        ("@weekly", "Run every week at midnight on Sunday"),
        ("@monthly", "Run every month at midnight on the first day"),
        ("@yearly", "Run every year at midnight on January 1"),
        ("*/5 * * * *", "Run every 5 minutes"),
        ("0 2 * * *", "Run daily at 2 AM")
    ]
    for schedule, description in schedules:
        print(f"   {schedule}: {description}")
    
    # Example 6: Trigger rules
    print("\n6️⃣ Trigger Rules:")
    rules = [
        ("all_success", "All upstream tasks succeeded (default)"),
        ("all_failed", "All upstream tasks failed"),
        ("all_done", "All upstream tasks completed"),
        ("one_success", "At least one upstream task succeeded"),
        ("one_failed", "At least one upstream task failed"),
        ("none_failed", "No upstream tasks failed"),
        ("none_failed_min_one_success", "No failures and at least one success"),
        ("always", "Dependencies are for show only (replaces the deprecated 'dummy')")
    ]
    for rule, description in rules:
        print(f"   {rule}: {description}")
    
    # Example 7: XCom usage
    print("\n7️⃣ XCom (Cross-Communication):")
    print("   Push: context['task_instance'].xcom_push(key='data', value=result)")
    print("   Pull: context['task_instance'].xcom_pull(task_ids='task_id', key='data')")
    
    # Example 8: Template variables
    print("\n8️⃣ Template Variables:")
    variables = [
        ("{{ ds }}", "Logical (execution) date as YYYY-MM-DD"),
        ("{{ ds_nodash }}", "Logical (execution) date as YYYYMMDD"),
        ("{{ prev_ds }}", "Previous execution date (deprecated since Airflow 2.2)"),
        ("{{ next_ds }}", "Next execution date (deprecated since Airflow 2.2)"),
        ("{{ ti }}", "Task instance object"),
        ("{{ task.task_id }}", "Current task ID"),
        ("{{ dag.dag_id }}", "Current DAG ID"),
        ("{{ run_id }}", "Current run ID")
    ]
    for var, description in variables:
        print(f"   {var}: {description}")
    
    # Example 9: Best practices
    print("\n9️⃣ Airflow Best Practices:")
    practices = [
        "🎯 Keep tasks idempotent - same input produces same output",
        "⚛️ Make tasks atomic - fully complete or fully fail",
        "📦 Keep tasks small and focused - one task, one purpose",
        "🔄 Use retries appropriately - configure retry logic",
        "💾 Don't store large data in XCom - use external storage",
        "📅 Set proper start_date - use static dates, not dynamic",
        "🚫 Avoid top-level code in DAG files - use functions",
        "📊 Use pools to limit resource usage",
        "🏷️ Tag DAGs for better organization",
        "📝 Document DAGs and tasks clearly"
    ]
    for practice in practices:
        print(f"   {practice}")
    
    # Example 10: CLI commands
    print("\n🔟 Useful CLI Commands:")
    commands = [
        "airflow db migrate - Initialize or upgrade database",
        "airflow webserver - Start web UI",
        "airflow scheduler - Start scheduler",
        "airflow dags list - List all DAGs",
        "airflow tasks list - List tasks in DAG",
        "airflow dags trigger - Manually trigger DAG",
        "airflow dags pause - Pause DAG",
        "airflow dags unpause - Unpause DAG",
        "airflow tasks test - Test single task"
    ]
    for cmd in commands:
        print(f"   {cmd}")
    
    print("\n✅ Apache Airflow basics demonstration complete!")

Key Takeaways and Best Practices 🎯

Apache Airflow Best Practices 📋

Pro Tip: Think of Airflow as the conductor of your data orchestra: it doesn't play the instruments (run the actual processing) but ensures everyone plays at the right time. Always design tasks to be idempotent, so that they produce the same result when run multiple times with the same input. This makes retries safe and debugging easier. Keep tasks atomic by using transactions, temporary files with an atomic rename, or two-phase commits. Use static start_dates (not datetime.now()) to ensure consistent scheduling. Use XCom for passing small values between tasks, and external storage (S3, a database) for large datasets. Set retry policies per task, because not all tasks should retry the same way. Use pools to limit concurrent access to resources such as database connections. Implement monitoring with SLAs and alerts. Use task groups to organize related tasks visually. Test tasks individually with 'airflow tasks test' before running the full DAG. Version control your DAGs and use CI/CD for deployment. Remember that DAG files are parsed frequently, so avoid heavy computation at the module level. Most importantly: treat your DAGs as production code with proper testing, documentation, and code review!
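
The idempotency and atomic-rename advice can be tried outside Airflow with the standard library alone. The sketch below is a minimal illustration under stated assumptions: `atomic_write_json` and `process_day` are hypothetical helpers, and the output location is a throwaway temporary directory rather than real pipeline storage.

```python
import json
import os
import tempfile
from pathlib import Path


def atomic_write_json(path, payload):
    """Write JSON so readers see either the old file or the complete new one."""
    path = Path(path)
    # Create the temp file next to the target so the rename stays on one filesystem.
    fd, tmp_name = tempfile.mkstemp(dir=path.parent, suffix=".tmp")
    try:
        with os.fdopen(fd, "w") as f:
            json.dump(payload, f)
        os.replace(tmp_name, path)  # atomic rename on POSIX systems
    except BaseException:
        # Leave no partial file behind if anything goes wrong.
        if os.path.exists(tmp_name):
            os.remove(tmp_name)
        raise


def process_day(ds, out_dir):
    """Idempotent step: the output path and content depend only on the date string."""
    out_path = Path(out_dir) / f"result_{ds}.json"
    if out_path.exists():
        # Already processed: reuse the stored result instead of redoing the work.
        return json.loads(out_path.read_text())
    result = {"date": ds, "status": "success"}
    atomic_write_json(out_path, result)
    return result


with tempfile.TemporaryDirectory() as workdir:
    first = process_day("2024-01-01", workdir)
    second = process_day("2024-01-01", workdir)  # simulated retry of the same run
    leftovers = list(Path(workdir).glob("*.tmp"))
    print(first == second, leftovers)
```

Keying the output on the run's date rather than on the wall-clock time is what makes the second call a no-op.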

Mastering Apache Airflow basics provides the foundation for building robust, scalable workflow orchestration. You can now create DAGs with complex dependencies, use various operators for different tasks, implement branching and dynamic workflows, handle errors with retries and alerts, and monitor pipeline execution effectively. Whether you're building ETL pipelines, orchestrating ML workflows, or automating business processes, Airflow provides the tools for reliable task orchestration at scale! 🚀