🔗 Task Dependencies: Master Complex Workflow Relationships

Task dependencies are the backbone of workflow orchestration: they define execution order, opportunities for parallelism, and the conditional logic that turns individual tasks into pipelines. As with designing a supply chain, getting dependencies right lets you build workflows that adapt to data, handle failures gracefully, and keep resources busy by running independent tasks in parallel. Whether you're building a simple sequential pipeline or a conditional workflow with hundreds of interdependent tasks, understanding dependency patterns is crucial for effective orchestration. Let's explore task dependency management! 🎯

The Task Dependency Architecture

Think of task dependencies as the neural pathways of your workflow: they determine how data and control flow through your pipeline, when tasks can run in parallel, and how failures propagate. With Airflow's dependency system, you can create linear chains, parallel branches, conditional paths, and graphs that adapt to runtime conditions. Trigger rules, sensors, and cross-DAG dependencies are the main tools for building resilient, efficient workflows!

graph TB
    A[Task Dependencies] --> B[Basic Patterns]
    A --> C[Advanced Patterns]
    A --> D[Trigger Rules]
    A --> E[Dynamic Dependencies]

    B --> F[Sequential]
    B --> G[Parallel]
    B --> H[Branching]
    B --> I[Joining]

    C --> J[Conditional]
    C --> K[Short-Circuit]
    C --> L[Nested Groups]
    C --> M[Cross-DAG]

    D --> N[all_success]
    D --> O[one_success]
    D --> P[all_done]
    D --> Q[none_failed]

    E --> R[Data-Driven]
    E --> S[Time-Based]
    E --> T[External]
    E --> U[Computed]

    V[Optimization] --> W[Parallelism]
    V --> X[Resource Pools]
    V --> Y[Priority]
    V --> Z[Queues]

    style A fill:#ff6b6b
    style B fill:#51cf66
    style C fill:#339af0
    style D fill:#ffd43b
    style E fill:#ff6b6b
    style V fill:#51cf66
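
To make the trigger-rule branch of the diagram concrete, here is a minimal, Airflow-free sketch of how a few rules decide whether a task may run once all of its upstream tasks have finished. The `should_run` helper and the plain-string states are illustrative assumptions, not Airflow's scheduler code; real Airflow also tracks states such as `upstream_failed` and can fire `one_success` or `one_failed` before every upstream task is done.

```python
from typing import Callable, Dict, List

# Simplified terminal states; real Airflow has more (e.g. upstream_failed).
SUCCESS, FAILED, SKIPPED = "success", "failed", "skipped"

# Each rule maps the list of finished upstream states to a run/no-run decision.
RULES: Dict[str, Callable[[List[str]], bool]] = {
    "all_success": lambda s: all(x == SUCCESS for x in s),
    "all_failed": lambda s: all(x == FAILED for x in s),
    "all_done": lambda s: True,  # every upstream is finished by assumption
    "one_success": lambda s: SUCCESS in s,
    "one_failed": lambda s: FAILED in s,
    "none_failed": lambda s: FAILED not in s,
    "none_failed_min_one_success": lambda s: FAILED not in s and SUCCESS in s,
}


def should_run(rule: str, upstream_states: List[str]) -> bool:
    """Return True if a task with this trigger rule would run (hypothetical helper)."""
    return RULES[rule](upstream_states)


# A branch join: one path ran, the other was skipped.
branch_join = [SUCCESS, SKIPPED]
print(should_run("all_success", branch_join))                  # False
print(should_run("none_failed_min_one_success", branch_join))  # True
```

This is why a task that joins the paths of a branch usually needs a rule such as `none_failed_min_one_success` instead of the default `all_success`.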

Real-World Scenario: The Intelligent Pipeline System 🧠

You're building an intelligent pipeline system that processes data through multiple stages with complex dependencies. It runs steps conditionally based on data quality, processes data in parallel for performance, coordinates cross-system dependencies, applies retry and recovery strategies, manages resource usage through pools and priorities, adapts to varying data volumes, and keeps execution paths clear for debugging. The system must handle thousands of tasks efficiently, provide clear visibility into execution flow, recover from partial failures, and scale horizontally. Let's build a comprehensive dependency management system!

# Advanced Task Dependency Patterns and Management
# pip install apache-airflow networkx matplotlib

import os
import json
from datetime import datetime, timedelta
from typing import Dict, List, Any, Optional, Callable, Union, Set, Tuple
from dataclasses import dataclass, field
from enum import Enum
import logging
import random
import time

# Airflow imports
from airflow import DAG
from airflow.decorators import dag, task, task_group
from airflow.operators.python import PythonOperator, BranchPythonOperator, ShortCircuitOperator
from airflow.operators.bash import BashOperator
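# DummyOperator is deprecated in recent Airflow 2 releases; EmptyOperator is its
# replacement. It is aliased here so the rest of the listing stays unchanged.
from airflow.operators.empty import EmptyOperator as DummyOperator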
from airflow.operators.subdag import SubDagOperator
from airflow.sensors.base import BaseSensorOperator
from airflow.sensors.external_task import ExternalTaskSensor, ExternalTaskMarker
from airflow.sensors.filesystem import FileSensor
from airflow.sensors.time_delta import TimeDeltaSensor
from airflow.sensors.date_time import DateTimeSensor
from airflow.utils.task_group import TaskGroup
from airflow.utils.trigger_rule import TriggerRule
from airflow.utils.dates import days_ago
from airflow.models.baseoperator import chain, cross_downstream
from airflow.models import Variable
from airflow.exceptions import AirflowException, AirflowSkipException

# For dependency visualization
import networkx as nx
import matplotlib.pyplot as plt

# ==================== Dependency Analyzer ====================

class DependencyAnalyzer:
    """Analyze and visualize task dependencies."""
    
    def __init__(self, dag: DAG):
        """Initialize with a DAG."""
        self.dag = dag
        self.graph = self._build_dependency_graph()
        self.logger = logging.getLogger(__name__)
    
    def _build_dependency_graph(self) -> nx.DiGraph:
        """Build NetworkX graph from DAG dependencies."""
        graph = nx.DiGraph()
        
        # Add nodes
        for task in self.dag.tasks:
            graph.add_node(
                task.task_id,
                task_type=type(task).__name__,
                trigger_rule=task.trigger_rule.value if hasattr(task.trigger_rule, 'value') else str(task.trigger_rule),
                retries=task.retries,
                retry_delay=str(task.retry_delay)
            )
        
        # Add edges
        for task in self.dag.tasks:
            for downstream_task in task.downstream_list:
                graph.add_edge(task.task_id, downstream_task.task_id)
        
        return graph
    
    def get_execution_paths(self) -> List[List[str]]:
        """Get all possible execution paths through the DAG."""
        paths = []
        
        # Find start nodes (no predecessors)
        start_nodes = [n for n in self.graph.nodes() if self.graph.in_degree(n) == 0]
        
        # Find end nodes (no successors)
        end_nodes = [n for n in self.graph.nodes() if self.graph.out_degree(n) == 0]
        
        # Find all paths from start to end nodes
        for start in start_nodes:
            for end in end_nodes:
                try:
                    all_paths = list(nx.all_simple_paths(self.graph, start, end))
                    paths.extend(all_paths)
                except nx.NetworkXNoPath:
                    continue
        
        return paths
    
    def get_critical_path(self) -> List[str]:
        """Get the critical path (longest path) through the DAG."""
        try:
            return nx.dag_longest_path(self.graph)
        except nx.NetworkXException:
            # Also covers NetworkXUnfeasible, raised when the graph contains a cycle
            self.logger.warning("Could not determine critical path")
            return []
    
    def get_parallel_groups(self) -> List[Set[str]]:
        """Identify groups of tasks that can run in parallel."""
        parallel_groups = []
        
        # Use topological generations to find parallel tasks
        for generation in nx.topological_generations(self.graph):
            if len(generation) > 1:
                parallel_groups.append(set(generation))
        
        return parallel_groups
    
    def detect_bottlenecks(self) -> List[str]:
        """Identify potential bottleneck tasks."""
        bottlenecks = []
        
        for node in self.graph.nodes():
            # High fan-in (many dependencies)
            if self.graph.in_degree(node) > 3:
                bottlenecks.append(node)
            
            # High fan-out (many dependents)
            if self.graph.out_degree(node) > 3:
                bottlenecks.append(node)
        
        return list(set(bottlenecks))
    
    def visualize_dependencies(self, output_path: Optional[str] = None):
        """Visualize the dependency graph."""
        plt.figure(figsize=(12, 8))
        
        # Calculate layout
        pos = nx.spring_layout(self.graph, k=2, iterations=50)
        
        # Draw nodes
        nx.draw_networkx_nodes(
            self.graph, pos,
            node_color='lightblue',
            node_size=1000,
            alpha=0.9
        )
        
        # Draw edges
        nx.draw_networkx_edges(
            self.graph, pos,
            edge_color='gray',
            arrows=True,
            arrowsize=20,
            arrowstyle='->'
        )
        
        # Draw labels
        nx.draw_networkx_labels(
            self.graph, pos,
            font_size=8,
            font_weight='bold'
        )
        
        plt.title(f"Task Dependencies: {self.dag.dag_id}")
        plt.axis('off')
        
        if output_path:
            plt.savefig(output_path)
        else:
            plt.show()
        
        plt.close()
    
    def get_dependency_report(self) -> Dict[str, Any]:
        """Generate comprehensive dependency analysis report."""
        # Run each analysis once and reuse the results below
        critical_path = self.get_critical_path()
        parallel_groups = self.get_parallel_groups()
        
        return {
            'total_tasks': len(self.dag.tasks),
            'total_dependencies': self.graph.number_of_edges(),
            'critical_path': critical_path,
            'critical_path_length': len(critical_path),
            'parallel_groups': [list(g) for g in parallel_groups],
            'max_parallelism': max((len(g) for g in parallel_groups), default=0),
            'bottlenecks': self.detect_bottlenecks(),
            'execution_paths': len(self.get_execution_paths()),
            'is_acyclic': nx.is_directed_acyclic_graph(self.graph)
        }
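
# Illustrative sketch: get_parallel_groups() relies on nx.topological_generations.
# The helper below shows the same idea without networkx (Kahn's algorithm, level
# by level), assuming the graph is given as {task_id: [downstream_task_ids]}.
# The name `sketch_topological_levels` is hypothetical.
from typing import Dict, List, Set


def sketch_topological_levels(edges: Dict[str, List[str]]) -> List[Set[str]]:
    """Group task ids into levels; tasks in the same level do not depend on each other."""
    nodes = set(edges) | {d for downs in edges.values() for d in downs}
    in_degree = {n: 0 for n in nodes}
    for downs in edges.values():
        for d in downs:
            in_degree[d] += 1

    levels = []
    current = {n for n, deg in in_degree.items() if deg == 0}
    while current:
        levels.append(current)
        next_level = set()
        for n in current:
            for d in edges.get(n, []):
                in_degree[d] -= 1
                if in_degree[d] == 0:
                    next_level.add(d)
        current = next_level
    return levels

# Usage: for the diamond {'a': ['b', 'c'], 'b': ['d'], 'c': ['d']} the levels are
# [{'a'}, {'b', 'c'}, {'d'}], so 'b' and 'c' can run in parallel.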

# ==================== Advanced Dependency Patterns ====================

class DependencyPatterns:
    """Advanced dependency pattern implementations."""
    
    @staticmethod
    def create_conditional_chain(
        dag: DAG,
        condition_func: Callable,
        success_tasks: List,
        failure_tasks: List
    ):
        """Create conditional execution chain."""
        
        with dag:
            # Condition checker
            condition = BranchPythonOperator(
                task_id='check_condition',
                python_callable=condition_func
            )
            
            # Success path
            success_start = DummyOperator(
                task_id='success_path_start'
            )
            
            success_end = DummyOperator(
                task_id='success_path_end',
                trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS
            )
            
            # Failure path
            failure_start = DummyOperator(
                task_id='failure_path_start'
            )
            
            failure_end = DummyOperator(
                task_id='failure_path_end',
                trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS
            )
            
            # Join paths
            join = DummyOperator(
                task_id='join_paths',
                trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS
            )
            
            # Set up dependencies
            condition >> [success_start, failure_start]
            
            if success_tasks:
                success_start >> success_tasks[0]
                chain(*success_tasks)
                success_tasks[-1] >> success_end
            else:
                success_start >> success_end
            
            if failure_tasks:
                failure_start >> failure_tasks[0]
                chain(*failure_tasks)
                failure_tasks[-1] >> failure_end
            else:
                failure_start >> failure_end
            
            [success_end, failure_end] >> join
            
            return condition, join
    
    @staticmethod
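    def create_retry_pattern(
        dag: DAG,
        main_task: Any,
        max_retries: int = 3,
        backoff_factor: int = 2
    ):
        """Create a chain of exponential-backoff delay tasks that runs after main_task fails.

        Note: these tasks only wait; they do not re-run main_task. For real
        retries, prefer the operator arguments retries, retry_delay, and
        retry_exponential_backoff.
        """
        
        retry_tasks = []
        
        with dag:
            for i in range(max_retries):
                delay = backoff_factor ** i
                
                retry = PythonOperator(
                    task_id=f'retry_{i+1}_after_{delay}m',
                    # Pass the delay via op_kwargs: a bare `lambda: time.sleep(delay * 60)`
                    # late-binds `delay`, so every task would sleep for the last value
                    python_callable=lambda minutes: time.sleep(minutes * 60),
                    op_kwargs={'minutes': delay},
                    # The first delay fires only if main_task failed; later delays
                    # follow the previous one, which succeeds once it has slept
                    trigger_rule=TriggerRule.ALL_FAILED if i == 0 else TriggerRule.ALL_SUCCESS,
                    retries=0  # Don't retry the retry task
                )
                
                retry_tasks.append(retry)
            
            # Chain retries
            main_task >> retry_tasks[0]
            
            for i in range(len(retry_tasks) - 1):
                retry_tasks[i] >> retry_tasks[i + 1]
            
            # Final handler: runs once every delay task has completed
            exhausted = DummyOperator(
                task_id='all_retries_exhausted',
                trigger_rule=TriggerRule.ALL_SUCCESS
            )
            
            retry_tasks[-1] >> exhausted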
    
    @staticmethod
    def create_parallel_processor(
        dag: DAG,
        items: List[Any],
        process_func: Callable,
        batch_size: int = 10
    ):
        """Create parallel processing pattern with batching."""
        
        with dag:
            start = DummyOperator(task_id='parallel_start')
            end = DummyOperator(
                task_id='parallel_end',
                trigger_rule=TriggerRule.ALL_DONE
            )
            
            # Create batches
            batches = [items[i:i + batch_size] for i in range(0, len(items), batch_size)]
            
            batch_tasks = []
            for i, batch in enumerate(batches):
                batch_task = PythonOperator(
                    task_id=f'process_batch_{i}',
                    python_callable=process_func,
                    op_kwargs={'batch': batch, 'batch_id': i}
                )
                
                batch_tasks.append(batch_task)
                start >> batch_task >> end
            
            return start, batch_tasks, end
    
    @staticmethod
    def create_data_quality_gate(
        dag: DAG,
        data_task: Any,
        quality_checks: List[Dict[str, Any]]
    ):
        """Create data quality gate pattern."""
        
        with dag:
            # Quality check group
            with TaskGroup('data_quality_checks') as quality_group:
                check_results = []
                
                for check in quality_checks:
                    check_task = ShortCircuitOperator(
                        task_id=f"check_{check['name']}",
                        python_callable=check['function'],
                        op_kwargs=check.get('kwargs', {})
                    )
                    
                    check_results.append(check_task)
                
                # Aggregate results
                aggregate = PythonOperator(
                    task_id='aggregate_quality_results',
                    python_callable=lambda **context: all(
                        context['task_instance'].xcom_pull(task_ids=t.task_id)
                        for t in check_results
                    ),
                    trigger_rule=TriggerRule.ALL_SUCCESS
                )
                
                check_results >> aggregate
            
            # Gate task - only proceeds if quality checks pass
            gate = ShortCircuitOperator(
                task_id='quality_gate',
                python_callable=lambda **context: context['task_instance'].xcom_pull(
                    task_ids='data_quality_checks.aggregate_quality_results'
                )
            )
            
            data_task >> quality_group >> gate
            
            return gate
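
# Illustrative sketch: callables created in a loop late-bind loop variables.
# When generating one task per iteration (for example, one delay task per retry
# attempt), a bare lambda sees the *last* value of the loop variable. Binding
# the value as a default argument (or passing it through op_kwargs) avoids this.
# The helper names below are hypothetical.
from typing import Callable, List


def sketch_backoff_delays(max_retries: int = 3, backoff_factor: int = 2) -> List[int]:
    """Return exponential backoff delays in minutes: factor**0, factor**1, ..."""
    return [backoff_factor ** i for i in range(max_retries)]


def sketch_bound_callables(delays: List[int]) -> List[Callable[[], int]]:
    """Build one zero-argument callable per delay, each bound to its own value."""
    # `d=d` freezes the current value; a plain `lambda: d` would return
    # delays[-1] from every callable
    return [lambda d=d: d for d in delays]

# Usage: [f() for f in sketch_bound_callables(sketch_backoff_delays())]
# evaluates to [1, 2, 4].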

# ==================== Dynamic Dependency Builder ====================

class DynamicDependencyBuilder:
    """Build dependencies dynamically based on configuration or data."""
    
    def __init__(self, dag: DAG):
        """Initialize with DAG."""
        self.dag = dag
        self.tasks = {}
        self.logger = logging.getLogger(__name__)
    
    def add_task(self, task_id: str, operator: Any) -> Any:
        """Add task to the builder."""
        self.tasks[task_id] = operator
        return operator
    
    def build_from_config(self, config: Dict[str, Any]):
        """Build dependencies from configuration."""
        
        # Create tasks
        for task_config in config.get('tasks', []):
            task_id = task_config['id']
            task_type = task_config.get('type', 'dummy')
            
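            if task_type == 'python':
                # Resolve 'callable' as a dotted import path (e.g. 'pkg.module.func')
                # instead of eval()-ing config text, which would run arbitrary code.
                # Treating the value as an import path is an assumption of this fix.
                from airflow.utils.module_loading import import_string
                callable_path = task_config.get('callable')
                task = PythonOperator(
                    task_id=task_id,
                    python_callable=import_string(callable_path) if callable_path else (lambda: None),
                    dag=self.dag
                )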
            elif task_type == 'bash':
                task = BashOperator(
                    task_id=task_id,
                    bash_command=task_config.get('command', 'echo "No command"'),
                    dag=self.dag
                )
            else:
                task = DummyOperator(task_id=task_id, dag=self.dag)
            
            self.tasks[task_id] = task
        
        # Set dependencies
        for dep in config.get('dependencies', []):
            upstream_id = dep['upstream']
            downstream_id = dep['downstream']
            
            if upstream_id in self.tasks and downstream_id in self.tasks:
                self.tasks[upstream_id] >> self.tasks[downstream_id]
            else:
                self.logger.warning(f"Invalid dependency: {upstream_id} >> {downstream_id}")
    
    def build_from_graph(self, graph_definition: Dict[str, List[str]]):
        """Build dependencies from graph definition."""
        
        # Create tasks if they don't exist
        all_tasks = set(graph_definition.keys())
        for deps in graph_definition.values():
            all_tasks.update(deps)
        
        for task_id in all_tasks:
            if task_id not in self.tasks:
                self.tasks[task_id] = DummyOperator(task_id=task_id, dag=self.dag)
        
        # Set dependencies
        for upstream_id, downstream_ids in graph_definition.items():
            for downstream_id in downstream_ids:
                self.tasks[upstream_id] >> self.tasks[downstream_id]
    
    def build_dynamic_chain(self, task_count: int, pattern: str = 'linear'):
        """Build dynamic chain of tasks."""
        
        tasks = []
        for i in range(task_count):
            task = DummyOperator(
                task_id=f'dynamic_task_{i}',
                dag=self.dag
            )
            tasks.append(task)
            self.tasks[f'dynamic_task_{i}'] = task
        
        if pattern == 'linear':
            chain(*tasks)
        elif pattern == 'parallel':
            start = DummyOperator(task_id='parallel_start', dag=self.dag)
            end = DummyOperator(task_id='parallel_end', dag=self.dag)
            start >> tasks >> end
        elif pattern == 'diamond':
            if len(tasks) >= 4:
                tasks[0] >> [tasks[1], tasks[2]] >> tasks[3]
    
    def optimize_dependencies(self):
        """Optimize dependency structure for parallelism."""
        
        # Analyze current dependencies
        analyzer = DependencyAnalyzer(self.dag)
        report = analyzer.get_dependency_report()
        
        # Identify optimization opportunities
        optimizations = []
        
        # Check for unnecessary sequential dependencies
        for path in analyzer.get_execution_paths():
            if len(path) > 5:
                optimizations.append({
                    'type': 'long_chain',
                    'path': path,
                    'suggestion': 'Consider parallelizing independent tasks'
                })
        
        # Check for bottlenecks
        for bottleneck in report['bottlenecks']:
            optimizations.append({
                'type': 'bottleneck',
                'task': bottleneck,
                'suggestion': 'Consider splitting or optimizing this task'
            })
        
        return optimizations
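
# Illustrative sketch: validating a graph definition before building tasks.
# build_from_graph() accepts {upstream_id: [downstream_ids]}; a cycle in that
# mapping cannot form a valid DAG. The standard-library graphlib module
# (Python 3.9+) can detect this up front. `sketch_find_cycle` is a hypothetical
# helper, not an Airflow API.
from graphlib import CycleError, TopologicalSorter
from typing import Dict, List, Optional


def sketch_find_cycle(graph_definition: Dict[str, List[str]]) -> Optional[List[str]]:
    """Return the task ids forming a cycle, or None if the definition is acyclic."""
    sorter = TopologicalSorter()
    for upstream_id, downstream_ids in graph_definition.items():
        sorter.add(upstream_id)
        for downstream_id in downstream_ids:
            # graphlib expects add(node, *predecessors)
            sorter.add(downstream_id, upstream_id)
    try:
        sorter.prepare()
    except CycleError as err:
        # The second element of args holds the cycle; first and last node are equal
        return list(err.args[1])
    return None

# Usage: sketch_find_cycle({'a': ['b'], 'b': ['c']}) returns None, while a
# definition such as {'a': ['b'], 'b': ['a']} returns the offending cycle.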

# ==================== Cross-DAG Dependencies ====================

class CrossDAGDependencyManager:
    """Manage dependencies between different DAGs."""
    
    @staticmethod
    def create_external_sensor(
        dag: DAG,
        external_dag_id: str,
        external_task_id: str,
        execution_delta: timedelta = timedelta(0),
        timeout: int = 3600,
        poke_interval: int = 60
    ) -> ExternalTaskSensor:
        """Create sensor to wait for external DAG/task."""
        
        sensor = ExternalTaskSensor(
            task_id=f'wait_for_{external_dag_id}_{external_task_id}',
            external_dag_id=external_dag_id,
            external_task_id=external_task_id,
            execution_delta=execution_delta,
            timeout=timeout,
            poke_interval=poke_interval,
            mode='reschedule',  # Free up worker slot while waiting
            dag=dag
        )
        
        return sensor
    
    @staticmethod
    def create_external_marker(
        dag: DAG,
        external_dag_id: str,
        external_task_id: str
    ) -> ExternalTaskMarker:
        """Create marker for external DAG dependency."""
        
        marker = ExternalTaskMarker(
            task_id=f'marker_for_{external_dag_id}_{external_task_id}',
            external_dag_id=external_dag_id,
            external_task_id=external_task_id,
            dag=dag
        )
        
        return marker
    
    @staticmethod
    def create_dag_dependency_chain(
        parent_dag: DAG,
        child_dags: List[str],
        wait_for_completion: bool = True
    ):
        """Create chain of DAG dependencies."""
        
        from airflow.operators.trigger_dagrun import TriggerDagRunOperator
        
        trigger_tasks = []
        
        with parent_dag:
            for i, child_dag_id in enumerate(child_dags):
                trigger = TriggerDagRunOperator(
                    task_id=f'trigger_{child_dag_id}',
                    trigger_dag_id=child_dag_id,
                    wait_for_completion=wait_for_completion,
                    poke_interval=30 if wait_for_completion else 0
                )
                
                trigger_tasks.append(trigger)
                
                # Chain triggers
                if i > 0:
                    trigger_tasks[i-1] >> trigger_tasks[i]
        
        return trigger_tasks
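
# Illustrative sketch: how execution_delta selects the external run to wait for.
# ExternalTaskSensor looks for the external task at this run's logical date
# minus execution_delta. The helper below only mirrors that arithmetic; its
# name is hypothetical and it does not query Airflow.
from datetime import datetime, timedelta


def sketch_external_logical_date(logical_date: datetime, execution_delta: timedelta) -> datetime:
    """Return the logical date of the external run the sensor would look for."""
    return logical_date - execution_delta

# Usage: a DAG scheduled at 02:00 that waits on a DAG scheduled at 01:00 would
# pass execution_delta=timedelta(hours=1) to create_external_sensor().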

# ==================== Trigger Rule Examples ====================

@dag(
    'trigger_rule_examples',
    default_args={
        'owner': 'data_team',
        'retries': 1,
    },
    description='Examples of different trigger rules',
    schedule_interval='@daily',
    start_date=days_ago(1),
    tags=['examples', 'trigger_rules'],
)
def trigger_rule_examples():
    """Demonstrate various trigger rules."""
    
    @task
    def random_fail() -> bool:
        """Task that randomly fails."""
        if random.random() > 0.5:
            raise AirflowException("Random failure!")
        return True
    
    @task
    def always_succeed() -> str:
        """Task that always succeeds."""
        return "Success!"
    
    # Create tasks with different behaviors
    task_1 = random_fail.override(task_id='random_task_1')()
    task_2 = random_fail.override(task_id='random_task_2')()
    task_3 = always_succeed.override(task_id='always_succeed')()
    
    # All Success (default) - runs only if all upstream succeed
    @task(trigger_rule=TriggerRule.ALL_SUCCESS)
    def all_success_task():
        print("All upstream tasks succeeded")
    
    # All Failed - runs only if all upstream failed
    @task(trigger_rule=TriggerRule.ALL_FAILED)
    def all_failed_task():
        print("All upstream tasks failed")
    
    # All Done - runs when all upstream are done regardless of status
    @task(trigger_rule=TriggerRule.ALL_DONE)
    def all_done_task():
        print("All upstream tasks completed (success or failure)")
    
    # One Success - runs if at least one upstream succeeded
    @task(trigger_rule=TriggerRule.ONE_SUCCESS)
    def one_success_task():
        print("At least one upstream task succeeded")
    
    # One Failed - runs if at least one upstream failed
    @task(trigger_rule=TriggerRule.ONE_FAILED)
    def one_failed_task():
        print("At least one upstream task failed")
    
    # None Failed - runs if no upstream tasks failed (success or skip)
    @task(trigger_rule=TriggerRule.NONE_FAILED)
    def none_failed_task():
        print("No upstream tasks failed")
    
    # None Failed Min One Success - no failures and at least one success
    @task(trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS)
    def none_failed_min_one_success_task():
        print("No failures and at least one success")
    
    # Set up dependencies
    [task_1, task_2, task_3] >> all_success_task()
    [task_1, task_2, task_3] >> all_failed_task()
    [task_1, task_2, task_3] >> all_done_task()
    [task_1, task_2, task_3] >> one_success_task()
    [task_1, task_2, task_3] >> one_failed_task()
    [task_1, task_2, task_3] >> none_failed_task()
    [task_1, task_2, task_3] >> none_failed_min_one_success_task()

trigger_dag = trigger_rule_examples()

# ==================== Complex Dependency Example ====================

@dag(
    'complex_dependencies',
    default_args={
        'owner': 'data_team',
        'retries': 2,
        'retry_delay': timedelta(minutes=5),
    },
    description='Complex dependency patterns example',
    schedule_interval='@daily',
    start_date=days_ago(1),
    tags=['complex', 'dependencies'],
)
def complex_dependencies_dag():
    """DAG demonstrating complex dependency patterns."""
    
    # Start tasks
    @task
    def initialize():
        """Initialize pipeline."""
        print("Initializing complex pipeline")
        return {'status': 'initialized', 'timestamp': datetime.now().isoformat()}
    
    # Data extraction tasks (parallel)
    @task
    def extract_source_a():
        """Extract from source A."""
        time.sleep(random.uniform(1, 3))
        return {'source': 'A', 'records': random.randint(1000, 5000)}
    
    @task
    def extract_source_b():
        """Extract from source B."""
        time.sleep(random.uniform(1, 3))
        return {'source': 'B', 'records': random.randint(1000, 5000)}
    
    @task
    def extract_source_c():
        """Extract from source C."""
        time.sleep(random.uniform(1, 3))
        return {'source': 'C', 'records': random.randint(1000, 5000)}
    
    # Quality check (branching)
    @task.branch
    def quality_check(data: List[Dict]) -> str:
        """Check data quality and decide path."""
        total_records = sum(d['records'] for d in data)
        
        if total_records > 10000:
            return 'high_volume_processing'
        elif total_records > 5000:
            return 'normal_processing'
        else:
            return 'low_volume_processing'
    
    # Processing paths
    @task
    def high_volume_processing(data: List[Dict]):
        """Process high volume data."""
        print(f"High volume processing: {sum(d['records'] for d in data)} records")
        return {'processing': 'high_volume', 'status': 'complete'}
    
    @task
    def normal_processing(data: List[Dict]):
        """Process normal volume data."""
        print(f"Normal processing: {sum(d['records'] for d in data)} records")
        return {'processing': 'normal', 'status': 'complete'}
    
    @task
    def low_volume_processing(data: List[Dict]):
        """Process low volume data."""
        print(f"Low volume processing: {sum(d['records'] for d in data)} records")
        return {'processing': 'low_volume', 'status': 'complete'}
    
    # Convergence point
    @task(trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS)
    def merge_results(results: Optional[Dict] = None):
        """Merge results from any processing path."""
        print(f"Merging results: {results}")
        return {'merged': True, 'timestamp': datetime.now().isoformat()}
    
    # Final tasks
    @task
    def generate_report(merge_data: Dict):
        """Generate final report."""
        print(f"Generating report: {merge_data}")
        return {'report': 'generated'}
    
    @task(trigger_rule=TriggerRule.ALL_DONE)
    def cleanup():
        """Cleanup resources."""
        print("Cleaning up resources")
        return {'cleanup': 'complete'}
    
    # Build the complex dependency graph
    init = initialize()
    
    # Parallel extraction
    source_a = extract_source_a()
    source_b = extract_source_b()
    source_c = extract_source_c()
    
    # Wait for all extractions
    extracted_data = [source_a, source_b, source_c]
    
    # Quality check branches
    check = quality_check(extracted_data)
    
    # Processing paths. Each task_id defaults to its function name, which is
    # the string quality_check returns, so no manual renaming is needed.
    high_vol = high_volume_processing(extracted_data)
    normal = normal_processing(extracted_data)
    low_vol = low_volume_processing(extracted_data)
    
    # Convergence
    merge = merge_results()
    
    # Final steps
    report = generate_report(merge)
    clean = cleanup()
    
    # Set up dependencies
    init >> [source_a, source_b, source_c]
    extracted_data >> check
    check >> [high_vol, normal, low_vol]
    [high_vol, normal, low_vol] >> merge
    merge >> report
    [merge, report] >> clean

complex_dag = complex_dependencies_dag()
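
# Illustrative sketch: the branch decision in quality_check() is plain threshold
# logic, so it can live in an ordinary function and be unit-tested without a
# scheduler. `sketch_choose_processing_path` is a hypothetical helper; the
# thresholds and task ids mirror the DAG above.
from typing import Any, Dict, List


def sketch_choose_processing_path(extracted: List[Dict[str, Any]]) -> str:
    """Return the task_id of the processing branch for the given extraction results."""
    total_records = sum(d['records'] for d in extracted)
    if total_records > 10000:
        return 'high_volume_processing'
    if total_records > 5000:
        return 'normal_processing'
    return 'low_volume_processing'

# Usage: sketch_choose_processing_path([{'records': 4000}, {'records': 3000}])
# returns 'normal_processing' (7000 records in total).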

# ==================== Dependency Helper Functions ====================

class DependencyHelpers:
    """Helper functions for managing dependencies."""
    
    @staticmethod
    def create_linear_chain(tasks: List[Any]):
        """Create linear dependency chain."""
        for i in range(len(tasks) - 1):
            tasks[i] >> tasks[i + 1]
    
    @staticmethod
    def create_parallel_groups(groups: List[List[Any]]):
        """Create parallel task groups with dependencies."""
        for i in range(len(groups) - 1):
            for task in groups[i]:
                for next_task in groups[i + 1]:
                    task >> next_task
    
    @staticmethod
    def create_fan_out_in(source: Any, processors: List[Any], sink: Any):
        """Create fan-out/fan-in pattern."""
        source >> processors >> sink
    
    @staticmethod
    def create_conditional_skip(condition_task: Any, skip_tasks: List[Any]):
        """Create conditional skip pattern."""
        for task in skip_tasks:
            task.trigger_rule = TriggerRule.ALL_DONE
            condition_task >> task

# Example usage
if __name__ == "__main__":
    print("🔗 Task Dependencies Examples\n")
    
    # Example 1: Basic dependency patterns
    print("1️⃣ Basic Dependency Patterns:")
    patterns = [
        ("Sequential", "task_a >> task_b >> task_c"),
        ("Parallel", "[task_a, task_b, task_c]"),
        ("Fan-out", "task_a >> [task_b, task_c, task_d]"),
        ("Fan-in", "[task_a, task_b, task_c] >> task_d"),
        ("Diamond", "task_a >> [task_b, task_c] >> task_d")
    ]
    
    for pattern, example in patterns:
        print(f"   {pattern}: {example}")
    
    # Example 2: Trigger rules
    print("\n2️⃣ Trigger Rules:")
    rules = [
        ("ALL_SUCCESS", "All upstream tasks succeeded (default)"),
        ("ALL_FAILED", "All upstream tasks failed"),
        ("ALL_DONE", "All upstream tasks completed"),
        ("ONE_SUCCESS", "At least one upstream succeeded"),
        ("ONE_FAILED", "At least one upstream failed"),
        ("NONE_FAILED", "No upstream tasks failed"),
        ("NONE_FAILED_MIN_ONE_SUCCESS", "No failures and at least one success"),
        ("NONE_SKIPPED", "No upstream tasks were skipped"),
        ("ALWAYS", "Always run regardless of upstream")
    ]
    
    for rule, description in rules:
        print(f"   {rule}: {description}")
    
    # Example 3: Create test DAG for analysis
    test_dag = DAG(
        'test_dependencies',
        default_args={'owner': 'test'},
        start_date=days_ago(1),
        schedule_interval='@daily'
    )
    
    # Build complex dependencies
    with test_dag:
        start = DummyOperator(task_id='start')
        
        # Parallel extraction
        extract_1 = DummyOperator(task_id='extract_1')
        extract_2 = DummyOperator(task_id='extract_2')
        extract_3 = DummyOperator(task_id='extract_3')
        
        # Processing
        process = DummyOperator(task_id='process')
        
        # Quality checks (parallel)
        check_1 = DummyOperator(task_id='quality_check_1')
        check_2 = DummyOperator(task_id='quality_check_2')
        
        # Final tasks
        report = DummyOperator(task_id='generate_report')
        notify = DummyOperator(task_id='send_notification')
        cleanup = DummyOperator(
            task_id='cleanup',
            trigger_rule=TriggerRule.ALL_DONE
        )
        
        # Set dependencies
        start >> [extract_1, extract_2, extract_3] >> process
        process >> [check_1, check_2] >> report
        report >> [notify, cleanup]
    
    # Example 4: Analyze dependencies
    print("\n3️⃣ Dependency Analysis:")
    
    analyzer = DependencyAnalyzer(test_dag)
    # Named dep_report so it does not shadow the generate_report task above
    dep_report = analyzer.get_dependency_report()
    
    print(f"   Total tasks: {dep_report['total_tasks']}")
    print(f"   Total dependencies: {dep_report['total_dependencies']}")
    print(f"   Critical path length: {dep_report['critical_path_length']}")
    print(f"   Max parallelism: {dep_report['max_parallelism']}")
    print(f"   Is acyclic: {dep_report['is_acyclic']}")
    
    # Example 5: Dynamic dependencies
    print("\n4️⃣ Dynamic Dependency Building:")
    
    dynamic_dag = DAG(
        'dynamic_deps',
        default_args={'owner': 'test'},
        start_date=days_ago(1)
    )
    
    builder = DynamicDependencyBuilder(dynamic_dag)
    
    # Build from configuration
    config = {
        'tasks': [
            {'id': 'task_a', 'type': 'dummy'},
            {'id': 'task_b', 'type': 'python'},
            {'id': 'task_c', 'type': 'bash', 'command': 'echo hello'}
        ],
        'dependencies': [
            {'upstream': 'task_a', 'downstream': 'task_b'},
            {'upstream': 'task_b', 'downstream': 'task_c'}
        ]
    }
    
    builder.build_from_config(config)
    print("   Built DAG from configuration")
    
    # Example 6: Cross-DAG dependencies
    print("\n5️⃣ Cross-DAG Dependencies:")
    
    print("   # Wait for external DAG")
    print("   sensor = ExternalTaskSensor(")
    print("       task_id='wait_for_other_dag',  # illustrative name; task_id is required")
    print("       external_dag_id='other_dag',")
    print("       external_task_id='final_task',")
    print("       mode='reschedule'")
    print("   )")
    
    # Example 7: Dependency optimization
    print("\n6️⃣ Dependency Optimization:")
    
    optimizations = builder.optimize_dependencies()
    
    if optimizations:
        for opt in optimizations[:3]:  # Show first 3
            print(f"   {opt['type']}: {opt['suggestion']}")
    else:
        print("   No optimization opportunities found")
    
    # Example 8: Helper functions
    print("\n7️⃣ Helper Functions:")
    
    helpers = [
        ("chain()", "Create linear chain of tasks"),
        ("cross_downstream()", "Create cross product of dependencies"),
        ("chain_linear()", "Custom linear chaining"),
        ("create_fan_out_in()", "Fan-out/fan-in pattern")
    ]
    
    for func, description in helpers:
        print(f"   {func}: {description}")
    
    # Example 9: Best practices
    print("\n8️⃣ Dependency Best Practices:")
    
    practices = [
        "🎯 Keep dependencies simple and clear",
        "⚡ Maximize parallelism where possible",
        "🔄 Use appropriate trigger rules",
        "📊 Avoid deep dependency chains",
        "🔀 Split oversized workflows into smaller DAGs",
        "⏰ Consider execution time in dependencies",
        "🏷️ Use task groups for logical grouping",
        "📝 Document complex dependency logic",
        "🧪 Test dependency paths thoroughly",
        "📈 Monitor and optimize bottlenecks"
    ]
    
    for practice in practices:
        print(f"   {practice}")
    
    # Example 10: Common pitfalls
    print("\n9️⃣ Common Dependency Pitfalls:")
    
    pitfalls = [
        ("Circular dependencies", "Ensure DAG remains acyclic"),
        ("Over-serialization", "Don't chain tasks that could run in parallel"),
        ("Incorrect trigger rules", "Understand when tasks should run"),
        ("Missing dependencies", "Explicitly define all dependencies"),
        ("Bottleneck tasks", "Avoid single tasks blocking many others"),
        ("Complex branching", "Keep conditional logic simple")
    ]
    
    for pitfall, solution in pitfalls:
        print(f"   ❌ {pitfall}: {solution}")
    
    print("\n✅ Task dependencies demonstration complete!")
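
The trigger rules listed in the demo are easier to reason about with a small model. The sketch below is plain Python, not Airflow code: `State`, `FAILED_STATES`, and `should_run` are hypothetical names introduced here for illustration. It assumes every upstream task has already finished, so it ignores the fact that `one_success` and `one_failed` can fire before the remaining upstream tasks complete, and it does not model whether an unmet rule leaves the task skipped or upstream_failed.

```python
from enum import Enum
from typing import Sequence


class State(str, Enum):
    """Terminal upstream states used by this simplified model."""
    SUCCESS = "success"
    FAILED = "failed"
    UPSTREAM_FAILED = "upstream_failed"
    SKIPPED = "skipped"


FAILED_STATES = {State.FAILED, State.UPSTREAM_FAILED}


def should_run(rule: str, upstream: Sequence[State]) -> bool:
    """Return True if a task with `rule` would run once all upstream tasks finished."""
    total = len(upstream)
    n_success = sum(s is State.SUCCESS for s in upstream)
    n_failed = sum(s in FAILED_STATES for s in upstream)
    n_skipped = sum(s is State.SKIPPED for s in upstream)

    checks = {
        "all_success": n_success == total,
        "all_failed": n_failed == total,
        "all_done": True,  # every upstream is terminal by assumption
        "one_success": n_success >= 1,
        "one_failed": n_failed >= 1,
        "none_failed": n_failed == 0,
        "none_failed_min_one_success": n_failed == 0 and n_success >= 1,
        "none_skipped": n_skipped == 0,
        "always": True,
    }
    return checks[rule]


# A join after a branch: one path ran, the other was skipped.
after_branch = [State.SUCCESS, State.SKIPPED]
# A cleanup step after a run in which one upstream task failed.
after_failure = [State.SUCCESS, State.FAILED]

for rule in ("all_success", "none_failed_min_one_success", "all_done"):
    print(
        f"{rule}: after_branch={should_run(rule, after_branch)}, "
        f"after_failure={should_run(rule, after_failure)}"
    )
```

In this model the default `all_success` rule rejects both scenarios, `none_failed_min_one_success` accepts the join after a branch but not the failed run, and `all_done` accepts both, which is why it suits cleanup tasks.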

Key Takeaways and Best Practices 🎯

Task Dependency Best Practices 📋

Pro Tip: Think of task dependencies as the nervous system of your workflow: they control how signals (data and control) flow through your pipeline. Aim for as much parallelism as logical correctness allows, and use the simplest pattern that meets your needs: linear chains for sequential processing, fan-out/fan-in for parallel processing, and branching for conditional logic. Master trigger rules, which handle scenarios such as partial failures and optional paths. Use ALL_DONE for cleanup tasks that must run whether upstream tasks succeeded or failed, and NONE_FAILED_MIN_ONE_SUCCESS for join points after a branch, where the skipped path would otherwise cause the default ALL_SUCCESS rule to skip the join. Implement data quality gates to stop bad data from propagating through your pipeline. For complex workflows, use task groups to organize related tasks visually and logically. Analyze your dependency graph regularly: look for bottlenecks where many tasks depend on a single task, and relieve them by parallelizing or splitting that task. Use ExternalTaskSensor for cross-DAG dependencies, but watch execution timing: by default the sensor looks for an external run with the same logical date, so DAGs on different schedules need execution_delta or execution_date_fn. Build dependencies dynamically when dealing with variable workloads. Monitor critical paths, because the longest dependency chain sets the minimum execution time. Remember: well-designed dependencies make workflows resilient, efficient, and maintainable!
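
To make the critical-path and bottleneck advice concrete, here is a minimal sketch in plain Python that measures the demo DAG built earlier (start, three extracts, process, two quality checks, report, notification and cleanup). The `edges` dictionary and the `task_levels` function are hypothetical helpers written for this illustration; they are not the `DependencyAnalyzer` used in the demo, and that class may count path length differently (for example in edges rather than tasks). The widest level is only an estimate of achievable parallelism.

```python
from collections import Counter

# Edges of the demo DAG: upstream task -> list of downstream tasks.
edges = {
    "start": ["extract_1", "extract_2", "extract_3"],
    "extract_1": ["process"],
    "extract_2": ["process"],
    "extract_3": ["process"],
    "process": ["quality_check_1", "quality_check_2"],
    "quality_check_1": ["generate_report"],
    "quality_check_2": ["generate_report"],
    "generate_report": ["send_notification", "cleanup"],
    "send_notification": [],
    "cleanup": [],
}


def task_levels(graph):
    """Map each task to its depth: the number of tasks on the longest chain ending at it."""
    indegree = {task: 0 for task in graph}
    for downstream in graph.values():
        for task in downstream:
            indegree[task] += 1

    level = {task: 1 for task in graph}
    ready = [task for task, degree in indegree.items() if degree == 0]
    visited = 0
    # Kahn's algorithm: a task is processed only after all of its parents.
    while ready:
        task = ready.pop()
        visited += 1
        for child in graph[task]:
            level[child] = max(level[child], level[task] + 1)
            indegree[child] -= 1
            if indegree[child] == 0:
                ready.append(child)

    if visited != len(graph):
        raise ValueError("cycle detected: the graph is not a DAG")
    return level


levels = task_levels(edges)
critical_path_length = max(levels.values())         # longest chain, counted in tasks
widest_level = max(Counter(levels.values()).values())  # most tasks sharing one depth

print(f"critical path length: {critical_path_length} tasks")
print(f"widest level: {widest_level} tasks")
```

The same traversal doubles as a cycle check: if some tasks never reach an in-degree of zero, the graph contains a circular dependency and the function raises an error instead of returning levels.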

Mastering task dependencies enables you to build sophisticated workflows that maximize performance while maintaining reliability. You can now create complex dependency patterns, implement conditional execution logic, coordinate cross-DAG workflows, analyze and optimize dependency graphs, and handle failures gracefully with trigger rules. Whether you're building data pipelines, ML workflows, or business process automation, these dependency management skills are essential for production-grade orchestration! 🚀