
🔧 Creating DAGs: Design Complex Workflows with Code

Creating effective DAGs (Directed Acyclic Graphs) in Apache Airflow is both an art and a science: it requires understanding workflow patterns, dependency management, and the practices that keep pipelines maintainable. Like a well-architected system, a well-designed DAG is modular, reusable, testable, and scalable. Whether you're building a simple linear pipeline or a branching workflow with hundreds of tasks, mastering DAG creation patterns is essential for production-grade orchestration. Let's explore DAG design and implementation! 🏗️

The DAG Design Architecture

Think of DAG creation as building with intelligent LEGO blocks: each task is a self-contained unit that connects to others through dependencies, forming workflows that can branch, merge, and adapt dynamically. Because DAGs are plain Python, Airflow lets you create them programmatically, generate them from configuration, implement complex patterns, and build reusable components. Understanding the DAG lifecycle, task patterns, and design principles is crucial for scalable orchestration!
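Because the graph must be acyclic, there is always at least one order in which every task runs after all of its upstream tasks. The following plain-Python sketch (an illustration of the idea, not Airflow's scheduler code) computes such an order with Kahn's algorithm and rejects a cyclic dependency map. The `execution_order` helper and the task IDs are hypothetical.

```python
from collections import deque
from typing import Dict, List


def execution_order(upstreams: Dict[str, List[str]]) -> List[str]:
    """Return one valid run order for tasks given their upstream dependencies.

    `upstreams` maps every task ID to the IDs it depends on (every task must
    appear as a key). Raises ValueError if the dependencies form a cycle.
    """
    # How many upstream tasks each task is still waiting on
    remaining = {t: len(deps) for t, deps in upstreams.items()}
    # Reverse edges: which tasks are waiting on each task
    downstream: Dict[str, List[str]] = {t: [] for t in upstreams}
    for t, deps in upstreams.items():
        for d in deps:
            downstream[d].append(t)

    # Kahn's algorithm: repeatedly "run" a task whose dependencies are all done
    ready = deque(sorted(t for t, n in remaining.items() if n == 0))
    order: List[str] = []
    while ready:
        t = ready.popleft()
        order.append(t)
        for child in downstream[t]:
            remaining[child] -= 1
            if remaining[child] == 0:
                ready.append(child)

    # Leftover tasks sit on, or downstream of, a cycle
    if len(order) != len(upstreams):
        raise ValueError("dependency graph contains a cycle")
    return order


etl = {"extract": [], "transform": ["extract"], "quality_check": ["transform"], "load": ["quality_check"]}
print(execution_order(etl))  # ['extract', 'transform', 'quality_check', 'load']
```

Airflow performs the equivalent bookkeeping itself; the point of the sketch is that "acyclic" is exactly the property that guarantees such an order exists.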

graph TB
    A[DAG Creation] --> B[Design Patterns]
    A --> C[Task Patterns]
    A --> D[Dependencies]
    A --> E[Advanced Features]
    B --> F[Linear Pipeline]
    B --> G[Fan-Out/Fan-In]
    B --> H[Conditional]
    B --> I[Dynamic]
    C --> J[ETL Tasks]
    C --> K[ML Tasks]
    C --> L[Notification]
    C --> M[Validation]
    D --> N[Sequential]
    D --> O[Parallel]
    D --> P[Cross-DAG]
    D --> Q[External]
    E --> R[SubDAGs]
    E --> S[Task Groups]
    E --> T[Dynamic Tasks]
    E --> U[Templating]
    V[Components] --> W[Operators]
    V --> X[Sensors]
    V --> Y[Hooks]
    V --> Z[Executors]
    style A fill:#ff6b6b
    style B fill:#51cf66
    style C fill:#339af0
    style D fill:#ffd43b
    style E fill:#ff6b6b
    style V fill:#51cf66
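The Sequential, Parallel, and Fan-Out/Fan-In branches of the diagram come down to one question: which tasks are independent of each other? A minimal sketch, assuming dependencies are given as a plain dict from (hypothetical) task ID to upstream IDs, groups tasks into stages by the length of the longest chain leading to each one; tasks that share a stage have no dependency on one another, so an executor could run them concurrently.

```python
from collections import defaultdict
from typing import Dict, List


def parallel_stages(upstreams: Dict[str, List[str]]) -> List[List[str]]:
    """Group tasks into stages by longest upstream chain (roots are stage 0).

    Assumes `upstreams` is acyclic and lists every task as a key.
    """
    level: Dict[str, int] = {}

    def level_of(task_id: str) -> int:
        # Memoized: one more than the deepest upstream task, or 0 for roots
        if task_id not in level:
            level[task_id] = max((level_of(u) + 1 for u in upstreams[task_id]), default=0)
        return level[task_id]

    stages: Dict[int, List[str]] = defaultdict(list)
    for task_id in upstreams:
        stages[level_of(task_id)].append(task_id)
    # Stage numbers are contiguous from 0, so index them directly
    return [sorted(stages[i]) for i in range(len(stages))]


linear = {"t1": [], "t2": ["t1"], "t3": ["t2"]}
fan = {"start": [], "a": ["start"], "b": ["start"], "c": ["start"], "end": ["a", "b", "c"]}
print(parallel_stages(linear))  # [['t1'], ['t2'], ['t3']]
print(parallel_stages(fan))     # [['start'], ['a', 'b', 'c'], ['end']]
```

A linear pipeline never has more than one task per stage, while fan-out/fan-in widens the middle stage; the widest stage is a reasonable first estimate of how much parallelism a DAG can use.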

Real-World Scenario: The Enterprise Data Platform 🏢

You're building an enterprise data platform that orchestrates hundreds of data pipelines, handles complex ETL/ELT workflows with dependencies, manages machine learning model training and deployment, processes real-time and batch data streams, coordinates multi-cloud data operations, implements data quality gates and validation, provides self-service DAG generation for teams, and maintains audit trails and compliance. Your platform must support dynamic workflow generation, handle failures gracefully, scale horizontally, and provide clear visibility into pipeline operations. Let's build a comprehensive DAG creation framework!

# Advanced DAG Creation Patterns and Best Practices
# pip install apache-airflow apache-airflow-providers-postgres apache-airflow-providers-http pandas pyyaml jinja2

import os
import json
import yaml
from datetime import datetime, timedelta
from typing import Dict, List, Any, Optional, Callable, Union
from dataclasses import dataclass, field
from enum import Enum
from pathlib import Path
import logging
from functools import wraps

# Airflow imports
from airflow import DAG
from airflow.decorators import dag, task, task_group
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from airflow.operators.dummy import DummyOperator
from airflow.operators.subdag import SubDagOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.sensors.external_task import ExternalTaskSensor
from airflow.utils.task_group import TaskGroup
from airflow.utils.dates import days_ago
from airflow.utils.trigger_rule import TriggerRule
from airflow.models import Variable
from airflow.exceptions import AirflowException

# ==================== DAG Factory Pattern ====================

class DAGFactory:
    """Factory for creating DAGs with consistent configuration."""
    
    def __init__(self, default_args: Optional[Dict[str, Any]] = None):
        """Initialize DAG factory with default arguments."""
        self.default_args = default_args or {
            'owner': 'data_team',
            'depends_on_past': False,
            'email_on_failure': True,
            'email_on_retry': False,
            'retries': 2,
            'retry_delay': timedelta(minutes=5),
        }
        self.logger = logging.getLogger(__name__)
    
    def create_dag(
        self,
        dag_id: str,
        description: str,
        schedule_interval: str = '@daily',
        tags: Optional[List[str]] = None,
        **kwargs
    ) -> DAG:
        """Create a DAG with standard configuration."""
        
        # Merge default args with any provided args
        dag_args = {**self.default_args, **kwargs.get('default_args', {})}
        
        # Create DAG
        dag = DAG(
            dag_id=dag_id,
            description=description,
            default_args=dag_args,
            schedule_interval=schedule_interval,
            start_date=kwargs.get('start_date', days_ago(1)),
            catchup=kwargs.get('catchup', False),
            tags=tags or [],
            max_active_runs=kwargs.get('max_active_runs', 1),
            **{k: v for k, v in kwargs.items() 
               if k not in ['default_args', 'start_date', 'catchup', 'max_active_runs']}
        )
        
        self.logger.info(f"Created DAG: {dag_id}")
        return dag
    
    def create_etl_dag(
        self,
        dag_id: str,
        source: str,
        destination: str,
        transform_func: Optional[Callable] = None,
        **kwargs
    ) -> DAG:
        """Create a standard ETL DAG."""
        
        dag = self.create_dag(
            dag_id=dag_id,
            description=f"ETL pipeline from {source} to {destination}",
            tags=['etl', source, destination],
            **kwargs
        )
        
        with dag:
            # Extract task
            extract = PythonOperator(
                task_id='extract',
                python_callable=self._create_extract_task(source),
                op_kwargs={'source': source}
            )
            
            # Transform task
            if transform_func:
                transform = PythonOperator(
                    task_id='transform',
                    python_callable=transform_func,
                )
            else:
                transform = DummyOperator(task_id='transform')
            
            # Load task
            load = PythonOperator(
                task_id='load',
                python_callable=self._create_load_task(destination),
                op_kwargs={'destination': destination}
            )
            
            # Set dependencies
            extract >> transform >> load
        
        return dag
    
    def _create_extract_task(self, source: str) -> Callable:
        """Create extraction function for source."""
        def extract_data(**context):
            print(f"Extracting data from {source}")
            # Implement extraction logic based on source type
            data = {'source': source, 'extracted_at': datetime.now().isoformat()}
            context['task_instance'].xcom_push(key='extracted_data', value=data)
            return data
        return extract_data
    
    def _create_load_task(self, destination: str) -> Callable:
        """Create load function for destination."""
        def load_data(**context):
            data = context['task_instance'].xcom_pull(task_ids='transform', key='return_value')
            print(f"Loading data to {destination}: {data}")
            # Implement load logic based on destination type
            return {'status': 'loaded', 'destination': destination}
        return load_data

# ==================== Configuration-Driven DAGs ====================

@dataclass
class PipelineConfig:
    """Configuration for a data pipeline."""
    name: str
    schedule: str
    source: Dict[str, Any]
    transformations: List[Dict[str, Any]]
    destination: Dict[str, Any]
    notifications: Optional[Dict[str, Any]] = None
    quality_checks: Optional[List[Dict[str, Any]]] = None
    tags: List[str] = field(default_factory=list)

class ConfigDrivenDAG:
    """Create DAGs from YAML/JSON configuration."""
    
    def __init__(self, config_path: str):
        """Initialize with configuration file path."""
        self.config_path = Path(config_path)
        self.configs = self._load_configs()
        self.dags = {}
        self._create_dags()
    
    def _load_configs(self) -> List[PipelineConfig]:
        """Load pipeline configurations from file."""
        configs = []
        
        if self.config_path.suffix in ('.yaml', '.yml'):
            with open(self.config_path, 'r') as f:
                data = yaml.safe_load(f)
        elif self.config_path.suffix == '.json':
            with open(self.config_path, 'r') as f:
                data = json.load(f)
        else:
            raise ValueError(f"Unsupported config format: {self.config_path.suffix}")
        
        for pipeline_data in data.get('pipelines', []):
            configs.append(PipelineConfig(**pipeline_data))
        
        return configs
    
    def _create_dags(self):
        """Create DAGs from configurations."""
        factory = DAGFactory()
        
        for config in self.configs:
            dag_id = f"pipeline_{config.name}"
            
            dag = factory.create_dag(
                dag_id=dag_id,
                description=f"Auto-generated pipeline: {config.name}",
                schedule_interval=config.schedule,
                tags=config.tags
            )
            
            with dag:
                tasks = self._build_pipeline_tasks(config)
                self._set_dependencies(tasks)
            
            self.dags[dag_id] = dag
    
    def _build_pipeline_tasks(self, config: PipelineConfig) -> Dict[str, Any]:
        """Build tasks for pipeline configuration."""
        tasks = {}
        
        # Create extraction task
        tasks['extract'] = self._create_source_task(config.source)
        
        # Create transformation tasks
        for i, transform in enumerate(config.transformations):
            task_id = f"transform_{i}_{transform['type']}"
            tasks[task_id] = self._create_transform_task(transform, task_id)
        
        # Create quality check tasks
        if config.quality_checks:
            for i, check in enumerate(config.quality_checks):
                task_id = f"quality_check_{i}"
                tasks[task_id] = self._create_quality_task(check, task_id)
        
        # Create load task
        tasks['load'] = self._create_destination_task(config.destination)
        
        # Create notification task
        if config.notifications:
            tasks['notify'] = self._create_notification_task(config.notifications)
        
        return tasks
    
    def _create_source_task(self, source_config: Dict[str, Any]):
        """Create source extraction task."""
        source_type = source_config['type']
        
        if source_type == 'database':
            from airflow.providers.postgres.operators.postgres import PostgresOperator
            return PostgresOperator(
                task_id='extract_from_database',
                sql=source_config['query'],
                postgres_conn_id=source_config['connection_id']
            )
        elif source_type == 'api':
            from airflow.providers.http.operators.http import SimpleHttpOperator
            return SimpleHttpOperator(
                task_id='extract_from_api',
                endpoint=source_config['endpoint'],
                method=source_config.get('method', 'GET'),
                http_conn_id=source_config['connection_id']
            )
        else:
            return DummyOperator(task_id=f'extract_{source_type}')
    
    def _create_transform_task(self, transform_config: Dict[str, Any], task_id: str):
        """Create transformation task.
        
        task_id is passed in (instead of being derived from the type alone)
        so that two transformations of the same type do not collide.
        """
        transform_type = transform_config['type']
        
        def transform_data(**context):
            print(f"Applying transformation: {transform_type}")
            # Implementation would vary based on transform type
            return {'transformed': True, 'type': transform_type}
        
        return PythonOperator(
            task_id=task_id,
            python_callable=transform_data
        )
    
    def _create_quality_task(self, check_config: Dict[str, Any], task_id: str):
        """Create data quality check task."""
        def check_quality(**context):
            print(f"Running quality check: {check_config['name']}")
            # Implement quality check logic
            return {'check': check_config['name'], 'passed': True}
        
        return PythonOperator(
            task_id=task_id,
            python_callable=check_quality
        )
    
    def _create_destination_task(self, dest_config: Dict[str, Any]):
        """Create destination load task."""
        dest_type = dest_config['type']
        
        def load_to_destination(**context):
            print(f"Loading to {dest_type}: {dest_config.get('table', 'default')}")
            return {'loaded': True, 'destination': dest_type}
        
        return PythonOperator(
            task_id=f"load_to_{dest_type}",
            python_callable=load_to_destination
        )
    
    def _create_notification_task(self, notify_config: Dict[str, Any]):
        """Create notification task."""
        from airflow.operators.email import EmailOperator
        
        return EmailOperator(
            task_id='send_notification',
            to=notify_config['recipients'],
            subject=notify_config.get('subject', 'Pipeline Completed'),
            html_content=notify_config.get('template', '<p>Pipeline completed successfully</p>')
        )
    
    def _set_dependencies(self, tasks: Dict[str, Any]):
        """Set task dependencies based on configuration."""
        # Simple linear dependency for now
        # In production, parse dependency configuration
        task_list = list(tasks.values())
        for i in range(len(task_list) - 1):
            task_list[i] >> task_list[i + 1]

# ==================== Dynamic Task Generation ====================

@dag(
    'dynamic_task_generation',
    default_args={
        'owner': 'data_team',
        'retries': 2,
    },
    description='DAG with dynamically generated tasks',
    schedule_interval='@daily',
    start_date=days_ago(1),
    tags=['dynamic', 'advanced'],
)
def dynamic_task_dag():
    """DAG that generates tasks dynamically based on data."""
    
    @task
    def get_processing_list() -> List[str]:
        """Get list of items to process."""
        # In production, this could query a database or API
        return ['dataset_a', 'dataset_b', 'dataset_c', 'dataset_d']
    
    @task
    def process_dataset(dataset_name: str) -> Dict[str, Any]:
        """Process a single dataset."""
        import time
        import random
        
        print(f"Processing dataset: {dataset_name}")
        # Simulate processing
        time.sleep(random.uniform(1, 3))
        
        result = {
            'dataset': dataset_name,
            'records_processed': random.randint(1000, 10000),
            'status': 'success',
            'processed_at': datetime.now().isoformat()
        }
        return result
    
    @task
    def aggregate_results(results: List[Dict[str, Any]]) -> Dict[str, Any]:
        """Aggregate results from all processing tasks."""
        total_records = sum(r['records_processed'] for r in results)
        
        summary = {
            'total_datasets': len(results),
            'total_records': total_records,
            'all_successful': all(r['status'] == 'success' for r in results),
            'timestamp': datetime.now().isoformat()
        }
        
        print(f"Aggregated results: {summary}")
        return summary
    
    @task
    def send_report(summary: Dict[str, Any]):
        """Send summary report."""
        print(f"Sending report: {summary}")
        # Implement actual notification logic
        return "Report sent successfully"
    
    # Build dynamic workflow
    datasets = get_processing_list()
    
    # Process each dataset in parallel (dynamic task mapping, Airflow 2.3+)
    processing_results = process_dataset.expand(dataset_name=datasets)
    
    # Aggregate all results
    summary = aggregate_results(processing_results)
    
    # Send final report
    report = send_report(summary)

# Instantiate the dynamic DAG
dynamic_dag = dynamic_task_dag()

# ==================== Complex Dependency Patterns ====================

class DependencyPatterns:
    """Common DAG dependency patterns."""
    
    @staticmethod
    def fan_out_fan_in(dag: DAG, source_task, processing_tasks: List, sink_task):
        """
        Fan-out/fan-in pattern:
        source -> [task1, task2, task3] -> sink
        """
        source_task >> processing_tasks >> sink_task
    
    @staticmethod
    def conditional_branching(dag: DAG, condition_task, branch_tasks: Dict[str, List]):
        """
        Conditional branching pattern:
        condition -> branch_a or branch_b -> merge
        
        condition_task should be a BranchPythonOperator whose callable returns
        the task_id of the first task in the branch to follow; the tasks in
        the other branches are skipped.
        """
        with dag:
            # NONE_FAILED_MIN_ONE_SUCCESS lets the merge run even though the
            # branches that were not chosen end up skipped
            merge = DummyOperator(
                task_id='merge',
                trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS
            )
            
            for branch_id, branch in branch_tasks.items():
                # Chain the tasks inside the branch, then place the branch
                # between the condition and the merge point
                for upstream, downstream in zip(branch, branch[1:]):
                    upstream >> downstream
                condition_task >> branch[0]
                branch[-1] >> merge
        
        return merge
    
    @staticmethod
    def parallel_sequential(dag: DAG, parallel_groups: List[List]):
        """
        Parallel groups in sequence:
        [group1_tasks] -> [group2_tasks] -> [group3_tasks]
        """
        # Every task in one group must finish before any task in the next group starts
        for i in range(len(parallel_groups) - 1):
            for upstream in parallel_groups[i]:
                upstream >> parallel_groups[i + 1]
    
    @staticmethod
    def diamond_dependency(dag: DAG, start_task, parallel_tasks: List, end_task):
        """
        Diamond pattern:
                -> task1 ->
          start             end
                -> task2 ->
        """
        start_task >> parallel_tasks >> end_task

# ==================== Task Group Patterns ====================

def create_data_quality_group(
    parent_dag: DAG,
    group_id: str,
    table_name: str
) -> TaskGroup:
    """Create a reusable data quality check task group."""
    
    # Entering the DAG context attaches the decorated tasks to parent_dag.
    # The TaskGroup prefixes each task_id with group_id automatically
    # (e.g. '<group_id>.check_nulls'), so the task IDs below stay short.
    with parent_dag, TaskGroup(group_id=group_id, dag=parent_dag) as quality_group:
        
        @task
        def check_nulls(table: str) -> Dict[str, Any]:
            """Check for null values."""
            print(f"Checking nulls in {table}")
            return {'table': table, 'nulls_found': False}
        
        @task
        def check_duplicates(table: str) -> Dict[str, Any]:
            """Check for duplicate records."""
            print(f"Checking duplicates in {table}")
            return {'table': table, 'duplicates_found': False}
        
        @task
        def check_schema(table: str) -> Dict[str, Any]:
            """Validate table schema."""
            print(f"Validating schema for {table}")
            return {'table': table, 'schema_valid': True}
        
        @task
        def aggregate_checks(*check_results) -> Dict[str, Any]:
            """Aggregate all quality check results."""
            all_passed = all(
                not r.get('nulls_found', False) and
                not r.get('duplicates_found', False) and
                r.get('schema_valid', True)
                for r in check_results
            )
            
            return {
                'table': table_name,
                'all_checks_passed': all_passed,
                'timestamp': datetime.now().isoformat()
            }
        
        # Define task dependencies within group: the three checks run in
        # parallel and all feed the aggregate step
        null_check = check_nulls(table_name)
        dup_check = check_duplicates(table_name)
        schema_check = check_schema(table_name)
        
        result = aggregate_checks(null_check, dup_check, schema_check)
    
    return quality_group

# ==================== SubDAG Pattern ====================
# Note: SubDAGs (SubDagOperator) are deprecated since Airflow 2.0; TaskGroups,
# shown above, are the recommended replacement for grouping tasks.

def create_subdag(
    parent_dag_id: str,
    child_dag_id: str,
    default_args: Dict[str, Any],
    schedule_interval: str
) -> DAG:
    """Create a SubDAG for modular workflow components."""
    
    # SubDagOperator expects the child dag_id to be '<parent_dag_id>.<task_id>',
    # so child_dag_id must match the task_id of the SubDagOperator
    subdag = DAG(
        dag_id=f'{parent_dag_id}.{child_dag_id}',
        default_args=default_args,
        schedule_interval=schedule_interval,
        start_date=default_args.get('start_date', days_ago(1))
    )
    
    with subdag:
        start = DummyOperator(task_id='subdag_start')
        end = DummyOperator(task_id='subdag_end')
        
        # Three parallel tasks between start and end
        for i in range(3):
            subdag_task = PythonOperator(
                task_id=f'subdag_task_{i}',
                python_callable=lambda x=i: print(f"SubDAG task {x}")
            )
            start >> subdag_task >> end
    
    return subdag

# ==================== Cross-DAG Dependencies ====================

@dag(
    'master_pipeline',
    default_args={'owner': 'data_team', 'retries': 1},
    description='Master pipeline coordinating multiple DAGs',
    schedule_interval='@daily',
    start_date=days_ago(1),
    tags=['master', 'orchestration'],
)
def master_pipeline():
    """Master DAG that coordinates multiple child DAGs."""
    
    @task
    def prepare_environment() -> Dict[str, Any]:
        """Prepare environment for child DAGs."""
        print("Preparing environment...")
        
        # Set Airflow Variables for child DAGs (runs at task time, not at parse time)
        Variable.set('processing_date', datetime.now().strftime('%Y-%m-%d'))
        Variable.set('processing_status', 'started')
        
        return {
            'environment': 'prepared',
            'timestamp': datetime.now().isoformat()
        }
    
    # Prepare environment
    env_prep = prepare_environment()
    
    # Trigger child DAGs and wait for each child run to finish
    # ('etl_pipeline' and 'ml_pipeline' must exist in the same deployment)
    trigger_etl = TriggerDagRunOperator(
        task_id='trigger_etl_dag',
        trigger_dag_id='etl_pipeline',
        conf={'source': 'master', 'priority': 'high'},
        wait_for_completion=True,
        poke_interval=60,
    )
    
    trigger_ml = TriggerDagRunOperator(
        task_id='trigger_ml_dag',
        trigger_dag_id='ml_pipeline',
        conf={'model': 'latest', 'training': True},
        wait_for_completion=True,
        poke_interval=60,
    )
    
    @task
    def validate_results() -> Dict[str, Any]:
        """Validate results from child DAGs."""
        print("Validating results from child DAGs...")
        
        # Check Variables that the child DAGs are expected to set
        etl_status = Variable.get('etl_status', default_var='unknown')
        ml_status = Variable.get('ml_status', default_var='unknown')
        
        return {
            'etl_status': etl_status,
            'ml_status': ml_status,
            'validation': 'passed' if etl_status == 'success' and ml_status == 'success' else 'failed'
        }
    
    @task
    def cleanup():
        """Clean up after pipeline completion."""
        print("Cleaning up...")
        
        # Clear Variables
        Variable.delete('processing_date')
        Variable.delete('processing_status')
        
        return "Cleanup completed"
    
    # Define dependencies
    validation = validate_results()
    clean = cleanup()
    
    env_prep >> [trigger_etl, trigger_ml] >> validation >> clean

master_dag = master_pipeline()

# ==================== Template-Based DAGs ====================

class TemplateDAG:
    """Create DAGs from Jinja2 templates."""
    
    def __init__(self, template_path: str):
        """Initialize with template path."""
        from jinja2 import Environment, FileSystemLoader
        
        self.template_dir = Path(template_path).parent
        self.template_name = Path(template_path).name
        self.env = Environment(loader=FileSystemLoader(str(self.template_dir)))
        self.template = self.env.get_template(self.template_name)
    
    def render_dag(self, **context) -> str:
        """Render DAG code from template."""
        return self.template.render(**context)
    
    def create_dag_file(self, output_path: str, **context):
        """Create DAG file from template."""
        dag_code = self.render_dag(**context)
        
        with open(output_path, 'w') as f:
            f.write(dag_code)
        
        print(f"Created DAG file: {output_path}")

# ==================== DAG Validation and Testing ====================

class DAGValidator:
    """Validate DAG structure and configuration."""
    
    @staticmethod
    def validate_dag(dag: DAG) -> Dict[str, Any]:
        """Validate a DAG for common issues."""
        from airflow.exceptions import AirflowDagCycleException
        from airflow.utils.dag_cycle_tester import check_cycle
        
        issues = []
        warnings = []
        
        # Check for cycles (check_cycle raises if it finds one)
        try:
            check_cycle(dag)
        except AirflowDagCycleException as e:
            issues.append(f"DAG contains a cycle: {e}")
        
        # Duplicate task IDs are already rejected when a task is added to a DAG
        # (DuplicateTaskIdFound), so this check is only a safety net
        task_ids = [task.task_id for task in dag.tasks]
        if len(task_ids) != len(set(task_ids)):
            issues.append("Duplicate task IDs found")
        
        # Check for orphaned tasks
        for task in dag.tasks:
            if not task.upstream_task_ids and not task.downstream_task_ids:
                warnings.append(f"Task '{task.task_id}' has no dependencies")
        
        # Check for proper start_date
        if not dag.start_date:
            issues.append("No start_date defined")
        
        # Check for email configuration
        if not dag.default_args.get('email'):
            warnings.append("No email configured for notifications")
        
        # Check task count
        if len(dag.tasks) > 100:
            warnings.append(f"DAG has {len(dag.tasks)} tasks - consider breaking it up")
        
        return {
            'valid': len(issues) == 0,
            'issues': issues,
            'warnings': warnings,
            'task_count': len(dag.tasks),
            'has_schedule': dag.schedule_interval is not None
        }
    
    @staticmethod
    def test_dag_execution(dag: DAG, execution_date: Optional[datetime] = None) -> Dict[str, Any]:
        """Create a manual DagRun and report the task execution order.
        
        Requires an initialized Airflow metadata database, because
        create_dagrun writes a DagRun record.
        """
        from airflow.utils import timezone
        from airflow.utils.state import State
        from airflow.utils.types import DagRunType
        
        # Airflow expects timezone-aware dates
        execution_date = execution_date or timezone.utcnow()
        
        try:
            # Create a test DagRun
            dag_run = dag.create_dagrun(
                state=State.RUNNING,
                execution_date=execution_date,
                run_type=DagRunType.MANUAL,
                external_trigger=True
            )
            
            # Tasks in dependency order
            execution_order = [task.task_id for task in dag.topological_sort()]
            
            return {
                'success': True,
                'execution_order': execution_order,
                'dag_run_id': dag_run.run_id
            }
        except Exception as e:
            return {
                'success': False,
                'error': str(e)
            }

# ==================== DAG Monitoring and Metrics ====================

class DAGMetrics:
    """Collect and report DAG metrics."""
    
    @staticmethod
    def calculate_dag_complexity(dag: DAG) -> Dict[str, Any]:
        """Calculate complexity metrics for a DAG."""
        
        # Level of a task = length of the longest upstream chain leading to it
        # (root tasks are level 0); memoized so each task is computed once
        levels: Dict[str, int] = {}
        
        def get_level(op) -> int:
            if op.task_id not in levels:
                levels[op.task_id] = max(
                    (get_level(up) + 1 for up in op.upstream_list), default=0
                )
            return levels[op.task_id]
        
        for op in dag.tasks:
            get_level(op)
        
        # Depth = number of levels on the longest path
        max_depth = max(levels.values()) + 1 if levels else 0
        
        # Width = most tasks sharing one level; tasks on the same level are
        # mutually independent, so this approximates peak parallelism
        tasks_per_level: Dict[int, int] = {}
        for level in levels.values():
            tasks_per_level[level] = tasks_per_level.get(level, 0) + 1
        max_width = max(tasks_per_level.values()) if tasks_per_level else 0
        
        return {
            'task_count': len(dag.tasks),
            'depth': max_depth,
            'width': max_width,
            'complexity_score': len(dag.tasks) * max_depth * max_width,
            'average_dependencies': sum(len(t.upstream_task_ids) for t in dag.tasks) / len(dag.tasks) if dag.tasks else 0
        }

# Example usage
if __name__ == "__main__":
    print("🔧 Creating DAGs Examples\n")
    
    # Example 1: DAG Factory
    print("1️⃣ Using DAG Factory:")
    factory = DAGFactory()
    etl_dag = factory.create_etl_dag(
        dag_id='factory_etl_pipeline',
        source='database',
        destination='warehouse',
        schedule_interval='@daily'
    )
    print(f"   Created DAG: {etl_dag.dag_id}")
    print(f"   Tasks: {[t.task_id for t in etl_dag.tasks]}")
    
    # Example 2: Dynamic task generation
    print("\n2️⃣ Dynamic Task Generation:")
    print("   @task")
    print("   def process_item(item_id: str):")
    print("       return process(item_id)")
    print("   ")
    print("   items = get_items()")
    print("   results = process_item.expand(item_id=items)")
    
    # Example 3: Task group patterns
    print("\n3️⃣ Task Group Patterns:")
    patterns = [
        "Data Quality Checks",
        "Transformation Pipeline",
        "Notification Group",
        "Error Handling Group"
    ]
    for pattern in patterns:
        print(f"   • {pattern}")
    
    # Example 4: Dependency patterns
    print("\n4️⃣ Common Dependency Patterns:")
    dep_patterns = [
        ("Linear", "task1 >> task2 >> task3"),
        ("Fan-out/Fan-in", "start >> [task1, task2, task3] >> end"),
        ("Conditional", "check >> [branch_a, branch_b]"),
        ("Diamond", "start >> [parallel1, parallel2] >> end"),
        ("Cross-DAG", "ExternalTaskSensor + TriggerDagRunOperator")
    ]
    for pattern, example in dep_patterns:
        print(f"   {pattern}:")
        print(f"     {example}")
    
    # Example 5: Configuration-driven DAGs
    print("\n5️⃣ Configuration-Driven DAGs:")
    sample_config = """
pipelines:
  - name: sales_etl
    schedule: "@daily"
    source:
      type: database
      connection_id: sales_db
      query: "SELECT * FROM sales"
    transformations:
      - type: clean
      - type: aggregate
    destination:
      type: warehouse
      table: fact_sales
"""
    print("   Sample YAML configuration:")
    for line in sample_config.strip().split('\n'):
        print(f"   {line}")
    
    # Example 6: DAG validation
    print("\n6️⃣ DAG Validation:")
    
    # Create a test DAG for validation
    test_dag = DAG(
        'test_dag',
        default_args={'owner': 'test'},
        start_date=days_ago(1),
        schedule_interval='@daily'
    )
    
    with test_dag:
        t1 = DummyOperator(task_id='task1')
        t2 = DummyOperator(task_id='task2')
        t3 = DummyOperator(task_id='task3')
        t1 >> t2 >> t3
    
    validator = DAGValidator()
    validation_result = validator.validate_dag(test_dag)
    print(f"   Valid: {validation_result['valid']}")
    print(f"   Task count: {validation_result['task_count']}")
    print(f"   Has schedule: {validation_result['has_schedule']}")
    
    # Example 7: DAG complexity metrics
    print("\n7️⃣ DAG Complexity Metrics:")
    metrics = DAGMetrics.calculate_dag_complexity(test_dag)
    for metric, value in metrics.items():
        print(f"   {metric}: {value}")
    
    # Example 8: Template variables
    print("\n8️⃣ Template Variables in DAGs:")
    variables = [
        ("{{ ds }}", "Execution date YYYY-MM-DD"),
        ("{{ ds_nodash }}", "Execution date YYYYMMDD"),
        ("{{ prev_ds }}", "Previous execution date"),
        ("{{ next_ds }}", "Next execution date"),
        ("{{ macros.ds_add(ds, 7) }}", "Add days to date"),
        ("{{ params.my_param }}", "Access DAG parameters"),
        ("{{ var.value.my_variable }}", "Access Airflow Variables"),
        ("{{ conn.my_connection }}", "Access connections")
    ]
    for var, desc in variables:
        print(f"   {var}: {desc}")
    
    # Example 9: Best practices
    print("\n9️⃣ DAG Creation Best Practices:")
    practices = [
        "📝 Use descriptive DAG and task IDs",
        "🎯 Keep DAGs focused - one workflow per DAG",
        "🔧 Use task groups for logical grouping",
        "♻️ Create reusable components",
        "📊 Implement proper monitoring and alerting",
        "🧪 Test DAGs before production",
        "📚 Document DAG purpose and dependencies",
        "⚡ Optimize for parallelism where possible",
        "🔒 Use connections and variables for secrets",
        "🏷️ Tag DAGs for organization"
    ]
    for practice in practices:
        print(f"   {practice}")
    
    # Example 10: Anti-patterns to avoid
    print("\n🔟 DAG Anti-Patterns to Avoid:")
    antipatterns = [
        ("Top-level code", "Avoid database queries at module level"),
        ("Non-idempotent tasks", "Tasks should produce same result on retry"),
        ("Large XCom data", "Don't pass large objects through XCom"),
        ("Dynamic start_date", "Don't use datetime.now() for start_date"),
        ("Circular dependencies", "Ensure DAG is acyclic"),
        ("Too many tasks", "Break large DAGs into smaller ones"),
        ("Missing error handling", "Always handle and log errors"),
        ("Hardcoded credentials", "Use Airflow connections")
    ]
    for pattern, issue in antipatterns:
        print(f"   ❌ {pattern}: {issue}")
    
    print("\n✅ DAG creation patterns demonstration complete!")
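The `ConfigDrivenDAG` class turns each YAML pipeline entry into tasks wired in a straight line. A stripped-down, Airflow-free sketch of that planning step makes the resulting task IDs and edges easy to inspect and unit-test before any DAG object exists. The `plan_pipeline` helper and its ID scheme are hypothetical; the config keys match the `sample_config` YAML in Example 5.

```python
from typing import Any, Dict, List, Tuple


def plan_pipeline(config: Dict[str, Any]) -> Tuple[List[str], List[Tuple[str, str]]]:
    """Return (task_ids, edges) for one pipeline entry.

    `config` uses the same keys as the sample YAML (source, transformations,
    quality_checks, destination, notifications). The ID scheme is illustrative.
    """
    task_ids = [f"extract_{config['source']['type']}"]
    # Index + type keeps IDs unique even if two steps share a type
    task_ids += [
        f"transform_{i}_{step['type']}"
        for i, step in enumerate(config.get("transformations", []))
    ]
    task_ids += [
        f"quality_check_{i}_{check['name']}"
        for i, check in enumerate(config.get("quality_checks") or [])
    ]
    task_ids.append(f"load_to_{config['destination']['type']}")
    if config.get("notifications"):
        task_ids.append("send_notification")

    # Linear chain: each task feeds the next one
    edges = list(zip(task_ids, task_ids[1:]))
    return task_ids, edges


sales_etl = {
    "name": "sales_etl",
    "schedule": "@daily",
    "source": {"type": "database", "connection_id": "sales_db", "query": "SELECT * FROM sales"},
    "transformations": [{"type": "clean"}, {"type": "aggregate"}],
    "destination": {"type": "warehouse", "table": "fact_sales"},
}
ids, edges = plan_pipeline(sales_etl)
print(ids)       # ['extract_database', 'transform_0_clean', 'transform_1_aggregate', 'load_to_warehouse']
print(edges[0])  # ('extract_database', 'transform_0_clean')
```

Keeping the plan as plain data means a CI job can reject a malformed config (for example, a missing `destination`) with an ordinary `KeyError` before the scheduler ever parses the generated DAG.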

Key Takeaways and Best Practices 🎯

DAG Creation Best Practices 📋

Pro Tip: Think of DAG creation as software architecture: each DAG should have a single, clear purpose and be composed of modular, reusable components. Use the DAG Factory pattern to standardize DAG creation across your organization. Use configuration-driven DAGs for repetitive patterns: define pipelines in YAML/JSON and generate DAGs programmatically. Use task groups to organize related tasks visually and logically, and dynamic task mapping (`.expand()`) for workflows whose fan-out depends on runtime data. Keep DAGs focused; several simple DAGs are easier to operate than one complex monolith. Match the dependency pattern to the problem: fan-out/fan-in for parallel processing, conditional branching for decision logic, and cross-DAG triggers or sensors for orchestration across pipelines. Always validate DAGs before deployment by checking for cycles, orphaned tasks, and missing configuration. Use template variables such as {{ ds }} for run-specific values, and keep expensive work out of top-level code, because the scheduler re-parses DAG files periodically. Add monitoring with SLA checks and alerting. Keep DAGs under version control and deploy them through CI/CD. Test DAGs thoroughly, for example by loading them with DagBag in CI to catch import errors and cycles. Remember: DAGs are code, so apply software engineering practices including code review, testing, and documentation!
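The pre-deployment checks named in the Pro Tip (cycles, orphaned tasks, missing configuration) can run in CI without Airflow installed if pipelines are described as data. A minimal sketch, assuming a hypothetical spec format of task ID to upstream IDs plus a `default_args` dict, and looking for `start_date` only in `default_args`; in a real deployment, loading the DAG folder with Airflow's `DagBag` and inspecting `import_errors` catches cycles, and `DAGValidator` covers the remaining checks.

```python
from typing import Any, Dict, List


def validate_spec(upstreams: Dict[str, List[str]], default_args: Dict[str, Any]) -> Dict[str, List[str]]:
    """Return {'issues': [...], 'warnings': [...]} for a plain dependency map."""
    issues: List[str] = []
    warnings: List[str] = []

    # Unknown upstream references usually mean a typo in a task ID
    for task_id, deps in upstreams.items():
        for dep in deps:
            if dep not in upstreams:
                issues.append(f"{task_id} depends on unknown task {dep}")

    # Orphans: no upstream and no downstream in a multi-task pipeline
    has_downstream = {dep for deps in upstreams.values() for dep in deps}
    if len(upstreams) > 1:
        for task_id, deps in upstreams.items():
            if not deps and task_id not in has_downstream:
                warnings.append(f"{task_id} has no dependencies")

    # Cycle check: depth-first search; reaching a GREY (in-progress) task means a cycle
    WHITE, GREY, BLACK = 0, 1, 2
    colour = {t: WHITE for t in upstreams}

    def visit(t: str) -> bool:
        colour[t] = GREY
        for dep in upstreams[t]:
            if dep not in colour:
                continue  # already reported as unknown
            if colour[dep] == GREY or (colour[dep] == WHITE and visit(dep)):
                return True
        colour[t] = BLACK
        return False

    if any(colour[t] == WHITE and visit(t) for t in upstreams):
        issues.append("dependency graph contains a cycle")

    # Missing configuration
    if "start_date" not in default_args:
        issues.append("no start_date defined")
    if not default_args.get("email"):
        warnings.append("no email configured for failure notifications")
    return {"issues": issues, "warnings": warnings}


report = validate_spec({"a": ["b"], "b": ["a"], "c": []}, {})
print(report["issues"])    # ['dependency graph contains a cycle', 'no start_date defined']
print(report["warnings"])  # ['c has no dependencies', 'no email configured for failure notifications']
```

Issues are meant to fail the build, while warnings are reported but allowed, mirroring the split that `DAGValidator` uses.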

Mastering DAG creation enables you to build sophisticated, maintainable workflow orchestration at scale. You can now design complex workflows with multiple patterns, generate DAGs dynamically from configuration, create reusable components with task groups, implement cross-DAG dependencies, and validate workflows before deployment. Whether you're building data pipelines, ML workflows, or business process automation, these DAG creation skills provide the foundation for enterprise-grade orchestration! 🚀