Task Dependencies: Master Complex Workflow Relationships
Task dependencies are the backbone of workflow orchestration: they define the execution order, the opportunities for parallelism, and the conditional logic that turn individual tasks into pipelines. Like the links in a supply chain, well-designed dependencies let a workflow adapt to its data, handle failures gracefully, and keep resources busy by running independent work in parallel. Whether you're building a simple sequential pipeline or a conditional workflow with hundreds of interdependent tasks, understanding dependency patterns is crucial for effective orchestration. Let's explore task dependency management!
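To make "opportunities for parallelism" concrete, here is a plain-Python sketch (no Airflow required) that groups tasks into waves: every task in a wave has all of its upstream tasks in earlier waves, so tasks in the same wave can run concurrently. It uses Kahn's topological sort, the same idea as the `nx.topological_generations` call in `DependencyAnalyzer` later on. The function name and the dict-of-downstream-lists graph format are illustrative assumptions, not an Airflow API.

```python
from typing import Dict, List


def execution_waves(edges: Dict[str, List[str]]) -> List[List[str]]:
    """Group tasks into waves that can each run in parallel.

    `edges` maps each task to the tasks directly downstream of it.
    """
    # Collect every task, including ones that only appear as downstream targets.
    tasks = set(edges)
    for downstream in edges.values():
        tasks.update(downstream)

    # Count how many upstream dependencies each task is still waiting on.
    in_degree = {t: 0 for t in tasks}
    for downstream in edges.values():
        for t in downstream:
            in_degree[t] += 1

    waves: List[List[str]] = []
    ready = sorted(t for t in tasks if in_degree[t] == 0)
    while ready:
        waves.append(ready)
        next_ready = []
        for t in ready:
            for child in edges.get(t, []):
                in_degree[child] -= 1
                if in_degree[child] == 0:  # all upstream tasks are in earlier waves
                    next_ready.append(child)
        ready = sorted(next_ready)

    # Tasks stuck with a nonzero in-degree can only be explained by a cycle.
    if sum(len(w) for w in waves) != len(tasks):
        raise ValueError("dependency graph contains a cycle")
    return waves


# Diamond: extract fans out to two transforms, which fan back in to load.
pipeline = {
    "extract": ["transform_a", "transform_b"],
    "transform_a": ["load"],
    "transform_b": ["load"],
}
print(execution_waves(pipeline))
# [['extract'], ['transform_a', 'transform_b'], ['load']]
```

The number of waves is the minimum number of sequential steps the pipeline needs, and the widest wave is the most worker slots it can use at once.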
The Task Dependency Architecture
Think of task dependencies as the neural pathways of your workflow: they determine how data and control flow through your pipeline, which tasks can run in parallel, and how failures propagate. Airflow's dependency system supports linear chains, parallel branches, conditional paths, and arbitrary acyclic graphs whose active paths are chosen at runtime. Trigger rules, sensors, and the advanced patterns below are the tools for making those workflows resilient and efficient!
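Trigger rules decide whether a task runs based on how its upstream tasks ended. The plain-Python model below is a simplified sketch of that decision, taking the final states of a task's upstream tasks. The rule names mirror the string values of Airflow's `TriggerRule` enum, but `should_run` is a hypothetical helper, not Airflow's implementation, and it ignores timing: in Airflow, `one_success` and `one_failed` fire as soon as a single upstream task reaches that state, and `always` does not wait at all.

```python
from typing import Iterable

# Both states count as failures when trigger rules are evaluated.
FAILED_STATES = {"failed", "upstream_failed"}


def should_run(trigger_rule: str, upstream_states: Iterable[str]) -> bool:
    """Simplified model: does a task run, given its upstream tasks' final states?"""
    states = list(upstream_states)
    any_failed = any(s in FAILED_STATES for s in states)
    any_success = "success" in states
    rules = {
        "all_success": all(s == "success" for s in states),
        "all_failed": all(s in FAILED_STATES for s in states),
        "all_done": True,  # any final state counts as done
        "one_success": any_success,
        "one_failed": any_failed,
        "none_failed": not any_failed,  # successes and skips are both fine
        "none_failed_min_one_success": not any_failed and any_success,
        "none_skipped": "skipped" not in states,
        "always": True,
    }
    return rules[trigger_rule]


# A join after a branch: one path succeeded, the two paths not taken were skipped.
after_branch = ["success", "skipped", "skipped"]
print(should_run("all_success", after_branch))                  # False
print(should_run("none_failed_min_one_success", after_branch))  # True
```

The example shows why a join after a branch needs `none_failed_min_one_success`: under the default `all_success`, the skipped branches would cause the join to be skipped too.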
Real-World Scenario: The Intelligent Pipeline System
You're building an intelligent pipeline system that processes data through multiple stages. It must branch on data quality, process data in parallel for performance, coordinate with other systems, retry and recover from failures, optimize resource usage through pools and priorities, adapt to varying data volumes, and keep execution paths clear enough to debug. At a scale of thousands of tasks, it also needs clear visibility into execution flow, recovery from partial failures, and horizontal scaling. Let's build a comprehensive dependency management system!
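Recovering from a partial failure rarely means re-running the whole pipeline: only the failed tasks and everything downstream of them need to run again. That is roughly what clearing a failed task together with its downstream tasks does in Airflow (the Downstream option when clearing in the UI, or `airflow tasks clear --downstream`). A plain-Python sketch of that selection; `tasks_to_rerun` and the dict-of-downstream-lists graph format are illustrative assumptions:

```python
from collections import deque
from typing import Dict, List, Set


def tasks_to_rerun(edges: Dict[str, List[str]], failed: Set[str]) -> Set[str]:
    """Failed tasks plus every task downstream of them (breadth-first search).

    Upstream tasks that already succeeded are not included, so their work
    is reused instead of recomputed.
    """
    selected = set(failed)
    queue = deque(failed)
    while queue:
        current = queue.popleft()
        for child in edges.get(current, []):
            if child not in selected:
                selected.add(child)
                queue.append(child)
    return selected


pipeline = {
    "extract_a": ["merge"],
    "extract_b": ["merge"],
    "merge": ["report", "cleanup"],
}
print(sorted(tasks_to_rerun(pipeline, {"extract_b"})))
# ['cleanup', 'extract_b', 'merge', 'report']
```

`extract_a` is left out: its output is still valid, so only the branch affected by the failure is repeated.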
# Advanced Task Dependency Patterns and Management
# pip install apache-airflow networkx matplotlib
# Assumes a recent Airflow 2.x release (EmptyOperator, @task.branch, the `schedule` argument)
import logging
import random
import time
from datetime import datetime, timedelta
from typing import Any, Callable, Dict, List, Optional, Set

# Airflow imports
from airflow import DAG
from airflow.decorators import dag, task
from airflow.exceptions import AirflowException
from airflow.models.baseoperator import chain, cross_downstream
from airflow.operators.bash import BashOperator
from airflow.operators.empty import EmptyOperator  # successor to the deprecated DummyOperator
from airflow.operators.python import BranchPythonOperator, PythonOperator, ShortCircuitOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.sensors.external_task import ExternalTaskMarker, ExternalTaskSensor
from airflow.utils.task_group import TaskGroup
from airflow.utils.trigger_rule import TriggerRule

# For dependency visualization
import matplotlib.pyplot as plt
import networkx as nx

# Static start date (days_ago() is deprecated)
START_DATE = datetime(2024, 1, 1)


# ==================== Dependency Analyzer ====================
class DependencyAnalyzer:
    """Analyze and visualize task dependencies."""

    def __init__(self, dag: DAG, fan_threshold: int = 3):
        """Initialize with a DAG.

        Tasks with more than fan_threshold upstream or downstream tasks
        are reported as potential bottlenecks.
        """
        self.dag = dag
        self.fan_threshold = fan_threshold
        self.logger = logging.getLogger(__name__)
        self.graph = self._build_dependency_graph()

    def _build_dependency_graph(self) -> nx.DiGraph:
        """Build a NetworkX graph from the DAG's dependencies."""
        graph = nx.DiGraph()
        # One node per task, annotated with its scheduling settings
        for task in self.dag.tasks:
            graph.add_node(
                task.task_id,
                task_type=type(task).__name__,
                trigger_rule=task.trigger_rule.value if hasattr(task.trigger_rule, 'value') else str(task.trigger_rule),
                retries=task.retries,
                retry_delay=str(task.retry_delay),
            )
        # One edge per upstream -> downstream relationship
        for task in self.dag.tasks:
            for downstream_task in task.downstream_list:
                graph.add_edge(task.task_id, downstream_task.task_id)
        return graph

    def get_execution_paths(self) -> List[List[str]]:
        """Get every path from a start task to an end task.

        The number of paths can grow exponentially in wide DAGs.
        """
        paths = []
        # Start nodes have no upstream tasks; end nodes have no downstream tasks
        start_nodes = [n for n in self.graph.nodes() if self.graph.in_degree(n) == 0]
        end_nodes = [n for n in self.graph.nodes() if self.graph.out_degree(n) == 0]
        for start in start_nodes:
            for end in end_nodes:
                if start == end:
                    paths.append([start])  # an isolated task is its own path
                else:
                    # Yields nothing when no path connects start and end
                    paths.extend(nx.all_simple_paths(self.graph, start, end))
        return paths

    def get_critical_path(self) -> List[str]:
        """Get the longest path through the DAG, measured in tasks (not duration)."""
        try:
            return nx.dag_longest_path(self.graph)
        except nx.NetworkXException:
            # e.g. NetworkXUnfeasible for a cyclic graph, which Airflow rejects anyway
            self.logger.warning("Could not determine critical path")
            return []

    def get_parallel_groups(self) -> List[Set[str]]:
        """Identify groups of tasks that can run in parallel."""
        # Tasks in the same topological generation do not depend on each other
        return [
            set(generation)
            for generation in nx.topological_generations(self.graph)
            if len(generation) > 1
        ]

    def detect_bottlenecks(self) -> List[str]:
        """Identify tasks with high fan-in (many upstream) or fan-out (many downstream)."""
        return sorted(
            node for node in self.graph.nodes()
            if self.graph.in_degree(node) > self.fan_threshold
            or self.graph.out_degree(node) > self.fan_threshold
        )

    def visualize_dependencies(self, output_path: Optional[str] = None):
        """Draw the dependency graph; save it to output_path if given, else show it."""
        plt.figure(figsize=(12, 8))
        pos = nx.spring_layout(self.graph, k=2, iterations=50)
        nx.draw_networkx_nodes(
            self.graph, pos,
            node_color='lightblue',
            node_size=1000,
            alpha=0.9,
        )
        nx.draw_networkx_edges(
            self.graph, pos,
            edge_color='gray',
            arrows=True,
            arrowsize=20,
            arrowstyle='->',
        )
        nx.draw_networkx_labels(
            self.graph, pos,
            font_size=8,
            font_weight='bold',
        )
        plt.title(f"Task Dependencies: {self.dag.dag_id}")
        plt.axis('off')
        if output_path:
            plt.savefig(output_path)
        else:
            plt.show()
        plt.close()

    def get_dependency_report(self) -> Dict[str, Any]:
        """Generate a dependency analysis report."""
        # Compute each analysis once instead of once per report field
        critical_path = self.get_critical_path()
        parallel_groups = self.get_parallel_groups()
        return {
            'total_tasks': len(self.dag.tasks),
            'total_dependencies': self.graph.number_of_edges(),
            'critical_path': critical_path,
            'critical_path_length': len(critical_path),
            'parallel_groups': [sorted(g) for g in parallel_groups],
            # Size of the widest generation; 1 for a purely linear DAG
            'max_parallelism': max((len(g) for g in nx.topological_generations(self.graph)), default=0),
            'bottlenecks': self.detect_bottlenecks(),
            'execution_paths': len(self.get_execution_paths()),
            'is_acyclic': nx.is_directed_acyclic_graph(self.graph),
        }


# ==================== Advanced Dependency Patterns ====================
class DependencyPatterns:
    """Advanced dependency pattern implementations."""

    @staticmethod
    def create_conditional_chain(
        dag: DAG,
        condition_func: Callable,
        success_tasks: List,
        failure_tasks: List,
    ):
        """Create a branch that runs either success_tasks or failure_tasks, then joins.

        condition_func must return 'success_path_start' or 'failure_path_start'.
        """
        with dag:
            # Follows the task_id returned by condition_func and skips the other path
            condition = BranchPythonOperator(
                task_id='check_condition',
                python_callable=condition_func,
            )
            # Success path
            success_start = EmptyOperator(task_id='success_path_start')
            success_end = EmptyOperator(
                task_id='success_path_end',
                trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS,
            )
            # Failure path
            failure_start = EmptyOperator(task_id='failure_path_start')
            failure_end = EmptyOperator(
                task_id='failure_path_end',
                trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS,
            )
            # Join: runs when one path succeeded and the other was skipped
            join = EmptyOperator(
                task_id='join_paths',
                trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS,
            )
            condition >> [success_start, failure_start]
            # start -> tasks -> end for each path (also works for empty task lists)
            chain(success_start, *success_tasks, success_end)
            chain(failure_start, *failure_tasks, failure_end)
            [success_end, failure_end] >> join
        return condition, join

    @staticmethod
    def create_retry_pattern(
        dag: DAG,
        main_task: Any,
        max_retries: int = 3,
        base_delay: timedelta = timedelta(minutes=1),
        max_delay: timedelta = timedelta(minutes=30),
    ):
        """Give main_task exponential-backoff retries plus a handler for when they run out.

        Airflow re-executes the task itself on each retry; separate "retry" tasks
        that only sleep would hold worker slots without re-running main_task.
        """
        # Built-in retry settings: each wait roughly doubles, capped at max_delay
        main_task.retries = max_retries
        main_task.retry_delay = base_delay
        main_task.retry_exponential_backoff = True
        main_task.max_retry_delay = max_delay
        with dag:
            # Runs only if main_task still fails after its final retry
            # (replace with an alerting task in a real pipeline)
            retries_exhausted = EmptyOperator(
                task_id=f'{main_task.task_id}_retries_exhausted',
                trigger_rule=TriggerRule.ALL_FAILED,
            )
            main_task >> retries_exhausted
        return retries_exhausted

    @staticmethod
    def create_parallel_processor(
        dag: DAG,
        items: List[Any],
        process_func: Callable,
        batch_size: int = 10,
    ):
        """Create a fan-out/fan-in pattern that processes items in parallel batches."""
        with dag:
            start = EmptyOperator(task_id='parallel_start')
            # ALL_DONE: finish the fan-in even if some batches failed
            end = EmptyOperator(
                task_id='parallel_end',
                trigger_rule=TriggerRule.ALL_DONE,
            )
            # Split items into fixed-size batches, one task per batch
            batches = [items[i:i + batch_size] for i in range(0, len(items), batch_size)]
            batch_tasks = []
            for i, batch in enumerate(batches):
                batch_task = PythonOperator(
                    task_id=f'process_batch_{i}',
                    python_callable=process_func,
                    op_kwargs={'batch': batch, 'batch_id': i},
                )
                batch_tasks.append(batch_task)
                start >> batch_task >> end
        return start, batch_tasks, end

    @staticmethod
    def create_data_quality_gate(
        dag: DAG,
        data_task: Any,
        quality_checks: List[Dict[str, Any]],
    ):
        """Create a data quality gate: downstream tasks run only if every check passes."""
        with dag:
            with TaskGroup('data_quality_checks') as quality_group:
                check_results = []
                for check in quality_checks:
                    # A falsy return value skips every downstream task, including the gate
                    check_task = ShortCircuitOperator(
                        task_id=f"check_{check['name']}",
                        python_callable=check['function'],
                        op_kwargs=check.get('kwargs', {}),
                    )
                    check_results.append(check_task)
                # Combine the individual check results pushed to XCom
                aggregate = PythonOperator(
                    task_id='aggregate_quality_results',
                    python_callable=lambda **context: all(
                        context['task_instance'].xcom_pull(task_ids=t.task_id)
                        for t in check_results
                    ),
                    trigger_rule=TriggerRule.ALL_SUCCESS,
                )
                check_results >> aggregate
            # Gate: proceeds only if the aggregated result is truthy
            gate = ShortCircuitOperator(
                task_id='quality_gate',
                python_callable=lambda **context: context['task_instance'].xcom_pull(
                    task_ids='data_quality_checks.aggregate_quality_results'
                ),
            )
            data_task >> quality_group >> gate
        return gate


# ==================== Dynamic Dependency Builder ====================
class DynamicDependencyBuilder:
    """Build dependencies dynamically from configuration or data."""

    def __init__(self, dag: DAG, callables: Optional[Dict[str, Callable]] = None):
        """Initialize with a DAG and an optional registry of named Python callables."""
        self.dag = dag
        self.tasks: Dict[str, Any] = {}
        # Config refers to callables by name; never eval() strings taken from config
        self.callables = callables or {}
        self.logger = logging.getLogger(__name__)

    def add_task(self, task_id: str, operator: Any) -> Any:
        """Add an existing task to the builder."""
        self.tasks[task_id] = operator
        return operator

    def build_from_config(self, config: Dict[str, Any]):
        """Build tasks and dependencies from a configuration dict."""
        # Create tasks
        for task_config in config.get('tasks', []):
            task_id = task_config['id']
            task_type = task_config.get('type', 'empty')
            if task_type == 'python':
                op = PythonOperator(
                    task_id=task_id,
                    # Unknown or missing callable names fall back to a no-op
                    python_callable=self.callables.get(task_config.get('callable'), lambda: None),
                    dag=self.dag,
                )
            elif task_type == 'bash':
                op = BashOperator(
                    task_id=task_id,
                    bash_command=task_config.get('command', 'echo "No command"'),
                    dag=self.dag,
                )
            else:
                op = EmptyOperator(task_id=task_id, dag=self.dag)
            self.tasks[task_id] = op
        # Set dependencies, skipping references to unknown tasks
        for dep in config.get('dependencies', []):
            upstream_id = dep['upstream']
            downstream_id = dep['downstream']
            if upstream_id in self.tasks and downstream_id in self.tasks:
                self.tasks[upstream_id] >> self.tasks[downstream_id]
            else:
                self.logger.warning(f"Invalid dependency: {upstream_id} >> {downstream_id}")

    def build_from_graph(self, graph_definition: Dict[str, List[str]]):
        """Build dependencies from a {upstream: [downstream, ...]} mapping."""
        # Create placeholder tasks for any id not registered yet
        all_tasks = set(graph_definition.keys())
        for deps in graph_definition.values():
            all_tasks.update(deps)
        for task_id in all_tasks:
            if task_id not in self.tasks:
                self.tasks[task_id] = EmptyOperator(task_id=task_id, dag=self.dag)
        # Set dependencies
        for upstream_id, downstream_ids in graph_definition.items():
            for downstream_id in downstream_ids:
                self.tasks[upstream_id] >> self.tasks[downstream_id]

    def build_dynamic_chain(self, task_count: int, pattern: str = 'linear'):
        """Build task_count tasks wired as 'linear', 'parallel', or 'diamond'."""
        tasks = []
        for i in range(task_count):
            op = EmptyOperator(task_id=f'dynamic_task_{i}', dag=self.dag)
            tasks.append(op)
            self.tasks[f'dynamic_task_{i}'] = op
        if pattern == 'linear':
            chain(*tasks)
        elif pattern == 'parallel':
            start = EmptyOperator(task_id='parallel_start', dag=self.dag)
            end = EmptyOperator(task_id='parallel_end', dag=self.dag)
            start >> tasks >> end
        elif pattern == 'diamond':
            if len(tasks) < 4:
                raise ValueError("The diamond pattern needs at least 4 tasks")
            # Only the first four tasks form the diamond
            tasks[0] >> [tasks[1], tasks[2]] >> tasks[3]
        else:
            raise ValueError(f"Unknown pattern: {pattern}")
        return tasks

    def optimize_dependencies(self):
        """Suggest changes that could increase parallelism."""
        analyzer = DependencyAnalyzer(self.dag)
        report = analyzer.get_dependency_report()
        optimizations = []
        # Long sequential chains may contain tasks that could run in parallel
        for path in analyzer.get_execution_paths():
            if len(path) > 5:
                optimizations.append({
                    'type': 'long_chain',
                    'path': path,
                    'suggestion': 'Consider parallelizing independent tasks',
                })
        # High fan-in/fan-out tasks
        for bottleneck in report['bottlenecks']:
            optimizations.append({
                'type': 'bottleneck',
                'task': bottleneck,
                'suggestion': 'Consider splitting or optimizing this task',
            })
        return optimizations


# ==================== Cross-DAG Dependencies ====================
class CrossDAGDependencyManager:
    """Manage dependencies between different DAGs."""

    @staticmethod
    def create_external_sensor(
        dag: DAG,
        external_dag_id: str,
        external_task_id: str,
        execution_delta: timedelta = timedelta(0),
        timeout: int = 3600,
        poke_interval: int = 60,
    ) -> ExternalTaskSensor:
        """Wait for a task in another DAG (logical date shifted back by execution_delta)."""
        return ExternalTaskSensor(
            task_id=f'wait_for_{external_dag_id}_{external_task_id}',
            external_dag_id=external_dag_id,
            external_task_id=external_task_id,
            execution_delta=execution_delta,
            timeout=timeout,
            poke_interval=poke_interval,
            mode='reschedule',  # free up the worker slot while waiting
            dag=dag,
        )

    @staticmethod
    def create_external_marker(
        dag: DAG,
        external_dag_id: str,
        external_task_id: str,
    ) -> ExternalTaskMarker:
        """Mark a task that another DAG depends on, so clearing it also clears the dependent task."""
        return ExternalTaskMarker(
            task_id=f'marker_for_{external_dag_id}_{external_task_id}',
            external_dag_id=external_dag_id,
            external_task_id=external_task_id,
            dag=dag,
        )

    @staticmethod
    def create_dag_dependency_chain(
        parent_dag: DAG,
        child_dags: List[str],
        wait_for_completion: bool = True,
    ):
        """Trigger child DAGs one after another from the parent DAG."""
        trigger_tasks = []
        with parent_dag:
            for child_dag_id in child_dags:
                trigger_tasks.append(TriggerDagRunOperator(
                    task_id=f'trigger_{child_dag_id}',
                    trigger_dag_id=child_dag_id,
                    wait_for_completion=wait_for_completion,
                    poke_interval=30,  # only used when waiting for completion
                ))
            # Each trigger starts after the previous one finished
            chain(*trigger_tasks)
        return trigger_tasks


# ==================== Trigger Rule Examples ====================
@dag(
    'trigger_rule_examples',
    default_args={
        'owner': 'data_team',
        'retries': 1,
    },
    description='Examples of different trigger rules',
    schedule='@daily',
    start_date=START_DATE,
    catchup=False,
    tags=['examples', 'trigger_rules'],
)
def trigger_rule_examples():
    """Demonstrate various trigger rules."""

    @task
    def random_fail() -> bool:
        """Task that fails about half the time."""
        if random.random() > 0.5:
            raise AirflowException("Random failure!")
        return True

    @task
    def always_succeed() -> str:
        """Task that always succeeds."""
        return "Success!"

    # Create tasks with different behaviors
    task_1 = random_fail.override(task_id='random_task_1')()
    task_2 = random_fail.override(task_id='random_task_2')()
    task_3 = always_succeed.override(task_id='always_succeed')()

    # All Success (default): runs only if all upstream tasks succeeded
    @task(trigger_rule=TriggerRule.ALL_SUCCESS)
    def all_success_task():
        print("All upstream tasks succeeded")

    # All Failed: runs only if all upstream tasks failed (never here: always_succeed cannot fail)
    @task(trigger_rule=TriggerRule.ALL_FAILED)
    def all_failed_task():
        print("All upstream tasks failed")

    # All Done: runs when all upstream tasks finished, in any state
    @task(trigger_rule=TriggerRule.ALL_DONE)
    def all_done_task():
        print("All upstream tasks completed (success or failure)")

    # One Success: runs as soon as one upstream task succeeds
    @task(trigger_rule=TriggerRule.ONE_SUCCESS)
    def one_success_task():
        print("At least one upstream task succeeded")

    # One Failed: runs as soon as one upstream task fails
    @task(trigger_rule=TriggerRule.ONE_FAILED)
    def one_failed_task():
        print("At least one upstream task failed")

    # None Failed: runs if every upstream task succeeded or was skipped
    @task(trigger_rule=TriggerRule.NONE_FAILED)
    def none_failed_task():
        print("No upstream tasks failed")

    # None Failed Min One Success: no failures and at least one success
    @task(trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS)
    def none_failed_min_one_success_task():
        print("No failures and at least one success")

    # Set up dependencies
    [task_1, task_2, task_3] >> all_success_task()
    [task_1, task_2, task_3] >> all_failed_task()
    [task_1, task_2, task_3] >> all_done_task()
    [task_1, task_2, task_3] >> one_success_task()
    [task_1, task_2, task_3] >> one_failed_task()
    [task_1, task_2, task_3] >> none_failed_task()
    [task_1, task_2, task_3] >> none_failed_min_one_success_task()


trigger_dag = trigger_rule_examples()


# ==================== Complex Dependency Example ====================
@dag(
    'complex_dependencies',
    default_args={
        'owner': 'data_team',
        'retries': 2,
        'retry_delay': timedelta(minutes=5),
    },
    description='Complex dependency patterns example',
    schedule='@daily',
    start_date=START_DATE,
    catchup=False,
    tags=['complex', 'dependencies'],
)
def complex_dependencies_dag():
    """DAG demonstrating complex dependency patterns."""

    @task
    def initialize():
        """Initialize pipeline."""
        print("Initializing complex pipeline")
        return {'status': 'initialized', 'timestamp': datetime.now().isoformat()}

    # Data extraction tasks (run in parallel)
    @task
    def extract_source_a():
        """Extract from source A."""
        time.sleep(random.uniform(1, 3))
        return {'source': 'A', 'records': random.randint(1000, 5000)}

    @task
    def extract_source_b():
        """Extract from source B."""
        time.sleep(random.uniform(1, 3))
        return {'source': 'B', 'records': random.randint(1000, 5000)}

    @task
    def extract_source_c():
        """Extract from source C."""
        time.sleep(random.uniform(1, 3))
        return {'source': 'C', 'records': random.randint(1000, 5000)}

    # Branching: return the task_id of the path to follow; the others are skipped
    @task.branch
    def quality_check(data: List[Dict]) -> str:
        """Check data volume and decide which processing path to run."""
        total_records = sum(d['records'] for d in data)
        if total_records > 10000:
            return 'high_volume_processing'
        elif total_records > 5000:
            return 'normal_processing'
        else:
            return 'low_volume_processing'

    # Processing paths
    @task
    def high_volume_processing(data: List[Dict]):
        """Process high volume data."""
        print(f"High volume processing: {sum(d['records'] for d in data)} records")
        return {'processing': 'high_volume', 'status': 'complete'}

    @task
    def normal_processing(data: List[Dict]):
        """Process normal volume data."""
        print(f"Normal processing: {sum(d['records'] for d in data)} records")
        return {'processing': 'normal', 'status': 'complete'}

    @task
    def low_volume_processing(data: List[Dict]):
        """Process low volume data."""
        print(f"Low volume processing: {sum(d['records'] for d in data)} records")
        return {'processing': 'low_volume', 'status': 'complete'}

    # Convergence point: one path succeeded, the other two were skipped
    @task(trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS)
    def merge_results(results: Optional[Dict] = None):
        """Merge results from whichever processing path ran."""
        print(f"Merging results: {results}")
        return {'merged': True, 'timestamp': datetime.now().isoformat()}

    # Final tasks
    @task
    def generate_report(merge_data: Dict):
        """Generate final report."""
        print(f"Generating report: {merge_data}")
        return {'report': 'generated'}

    @task(trigger_rule=TriggerRule.ALL_DONE)
    def cleanup():
        """Clean up resources, whatever happened upstream."""
        print("Cleaning up resources")
        return {'cleanup': 'complete'}

    # Build the dependency graph
    init = initialize()

    # Parallel extraction
    source_a = extract_source_a()
    source_b = extract_source_b()
    source_c = extract_source_c()
    extracted_data = [source_a, source_b, source_c]

    # Branch on the combined extraction results
    check = quality_check(extracted_data)

    # Processing paths: TaskFlow task_ids default to the function names,
    # which are exactly the ids quality_check returns
    high_vol = high_volume_processing(extracted_data)
    normal = normal_processing(extracted_data)
    low_vol = low_volume_processing(extracted_data)

    # Convergence and final steps
    merge = merge_results()
    report = generate_report(merge)
    clean = cleanup()

    # Explicit dependencies (passing XComArgs already wires data dependencies)
    init >> [source_a, source_b, source_c]
    extracted_data >> check
    check >> [high_vol, normal, low_vol]
    [high_vol, normal, low_vol] >> merge
    merge >> report
    [merge, report] >> clean


complex_dag = complex_dependencies_dag()


# ==================== Dependency Helper Functions ====================
class DependencyHelpers:
    """Helper functions for managing dependencies."""

    @staticmethod
    def create_linear_chain(tasks: List[Any]):
        """Create a linear chain: tasks[0] >> tasks[1] >> ..."""
        chain(*tasks)

    @staticmethod
    def create_parallel_groups(groups: List[List[Any]]):
        """Make every task in each group upstream of every task in the next group."""
        for i in range(len(groups) - 1):
            cross_downstream(groups[i], groups[i + 1])

    @staticmethod
    def create_fan_out_in(source: Any, processors: List[Any], sink: Any):
        """Create a fan-out/fan-in pattern."""
        source >> processors >> sink

    @staticmethod
    def create_conditional_skip(condition_task: ShortCircuitOperator, skip_tasks: List[Any]):
        """Skip skip_tasks whenever condition_task returns a falsy value.

        By default a ShortCircuitOperator skips all of its downstream tasks
        (ignore_downstream_trigger_rules=True), whatever their trigger rules.
        """
        condition_task >> skip_tasks


# Example usage
if __name__ == "__main__":
    print("Task Dependencies Examples\n")

    # Example 1: Basic dependency patterns
    print("1️⃣ Basic Dependency Patterns:")
    patterns = [
        ("Sequential", "task_a >> task_b >> task_c"),
        ("Parallel", "[task_a, task_b, task_c]  (no edges between them)"),
        ("Fan-out", "task_a >> [task_b, task_c, task_d]"),
        ("Fan-in", "[task_a, task_b, task_c] >> task_d"),
        ("Diamond", "task_a >> [task_b, task_c] >> task_d"),
    ]
    for pattern, example in patterns:
        print(f"   {pattern}: {example}")

    # Example 2: Trigger rules
    print("\n2️⃣ Trigger Rules:")
    rules = [
        ("ALL_SUCCESS", "All upstream tasks succeeded (default)"),
        ("ALL_FAILED", "All upstream tasks failed or were upstream_failed"),
        ("ALL_DONE", "All upstream tasks finished, in any state"),
        ("ONE_SUCCESS", "At least one upstream succeeded (fires without waiting for the rest)"),
        ("ONE_FAILED", "At least one upstream failed (fires without waiting for the rest)"),
        ("NONE_FAILED", "No upstream task failed (all succeeded or were skipped)"),
        ("NONE_FAILED_MIN_ONE_SUCCESS", "No failures and at least one success"),
        ("NONE_SKIPPED", "No upstream task was skipped"),
        ("ALWAYS", "Run at any time, regardless of upstream state"),
    ]
    for rule, description in rules:
        print(f"   {rule}: {description}")

    # Example 3: Create a test DAG for analysis
    test_dag = DAG(
        'test_dependencies',
        default_args={'owner': 'test'},
        start_date=START_DATE,
        schedule='@daily',
        catchup=False,
    )
    with test_dag:
        start = EmptyOperator(task_id='start')
        # Parallel extraction
        extract_1 = EmptyOperator(task_id='extract_1')
        extract_2 = EmptyOperator(task_id='extract_2')
        extract_3 = EmptyOperator(task_id='extract_3')
        # Processing
        process = EmptyOperator(task_id='process')
        # Quality checks (parallel)
        check_1 = EmptyOperator(task_id='quality_check_1')
        check_2 = EmptyOperator(task_id='quality_check_2')
        # Final tasks
        report = EmptyOperator(task_id='generate_report')
        notify = EmptyOperator(task_id='send_notification')
        cleanup = EmptyOperator(
            task_id='cleanup',
            trigger_rule=TriggerRule.ALL_DONE,
        )
        # Set dependencies
        start >> [extract_1, extract_2, extract_3] >> process
        process >> [check_1, check_2] >> report
        report >> [notify, cleanup]

    # Example 4: Analyze dependencies
    print("\n3️⃣ Dependency Analysis:")
    analyzer = DependencyAnalyzer(test_dag)
    analysis = analyzer.get_dependency_report()
    print(f"   Total tasks: {analysis['total_tasks']}")
    print(f"   Total dependencies: {analysis['total_dependencies']}")
    print(f"   Critical path length: {analysis['critical_path_length']}")
    print(f"   Max parallelism: {analysis['max_parallelism']}")
    print(f"   Is acyclic: {analysis['is_acyclic']}")

    # Example 5: Dynamic dependencies
    print("\n4️⃣ Dynamic Dependency Building:")
    dynamic_dag = DAG(
        'dynamic_deps',
        default_args={'owner': 'test'},
        start_date=START_DATE,
    )
    # Python tasks reference callables by name from this registry
    builder = DynamicDependencyBuilder(
        dynamic_dag,
        callables={'say_hello': lambda: print("hello")},
    )
    config = {
        'tasks': [
            {'id': 'task_a', 'type': 'empty'},
            {'id': 'task_b', 'type': 'python', 'callable': 'say_hello'},
            {'id': 'task_c', 'type': 'bash', 'command': 'echo hello'},
        ],
        'dependencies': [
            {'upstream': 'task_a', 'downstream': 'task_b'},
            {'upstream': 'task_b', 'downstream': 'task_c'},
        ],
    }
    builder.build_from_config(config)
    print("   Built DAG from configuration")

    # Example 6: Cross-DAG dependencies
    print("\n5️⃣ Cross-DAG Dependencies:")
    print("   # Wait for external DAG")
    print("   sensor = ExternalTaskSensor(")
    print("       external_dag_id='other_dag',")
    print("       external_task_id='final_task',")
    print("       mode='reschedule'")
    print("   )")

    # Example 7: Dependency optimization
    print("\n6️⃣ Dependency Optimization:")
    optimizations = builder.optimize_dependencies()
    if optimizations:
        for opt in optimizations[:3]:  # show the first 3
            print(f"   {opt['type']}: {opt['suggestion']}")
    else:
        print("   No optimization opportunities found")

    # Example 8: Helper functions
    print("\n7️⃣ Helper Functions:")
    helpers = [
        ("chain()", "Create a linear chain of tasks (adjacent lists are paired element-wise)"),
        ("cross_downstream()", "Make every task in one list upstream of every task in another"),
        ("chain_linear()", "Connect every task in each group to every task in the next (Airflow 2.7+)"),
        ("create_fan_out_in()", "Fan-out/fan-in pattern (DependencyHelpers)"),
    ]
    for func, description in helpers:
        print(f"   {func}: {description}")

    # Example 9: Best practices
    print("\n8️⃣ Dependency Best Practices:")
    practices = [
        "Keep dependencies simple and clear",
        "Maximize parallelism where possible",
        "Use appropriate trigger rules",
        "Avoid deep dependency chains",
        "Split very large workflows into separate DAGs (SubDAGs are deprecated)",
        "Consider execution time in dependencies",
        "Use task groups for logical grouping",
        "Document complex dependency logic",
        "Test dependency paths thoroughly",
        "Monitor and optimize bottlenecks",
    ]
    for practice in practices:
        print(f"   - {practice}")

    # Example 10: Common pitfalls
    print("\n9️⃣ Common Dependency Pitfalls:")
    pitfalls = [
        ("Circular dependencies", "Ensure the DAG remains acyclic"),
        ("Over-serialization", "Don't chain tasks that could run in parallel"),
        ("Incorrect trigger rules", "Understand when each task should run"),
        ("Missing dependencies", "Explicitly define all dependencies"),
        ("Bottleneck tasks", "Avoid single tasks blocking many others"),
        ("Complex branching", "Keep conditional logic simple"),
    ]
    for pitfall, solution in pitfalls:
        print(f"   - {pitfall}: {solution}")

    print("\n✅ Task dependencies demonstration complete!")
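The first pitfall, circular dependencies, is enforced by Airflow itself: it refuses to load a DAG whose dependencies form a cycle. When dependencies come from configuration, as with `DynamicDependencyBuilder.build_from_graph`, checking the edge list before creating any operators gives a more direct error message. A minimal depth-first sketch; `find_cycle` is a hypothetical helper, not part of Airflow:

```python
from typing import Dict, List, Optional


def find_cycle(edges: Dict[str, List[str]]) -> Optional[List[str]]:
    """Return one dependency cycle as a list of task ids, or None if acyclic."""
    WHITE, GRAY, BLACK = 0, 1, 2  # unvisited, on the current DFS path, finished
    color: Dict[str, int] = {}
    path: List[str] = []

    def visit(task: str) -> Optional[List[str]]:
        color[task] = GRAY
        path.append(task)
        for child in edges.get(task, []):
            state = color.get(child, WHITE)
            if state == GRAY:
                # child is already on the current path: slice out the loop and close it
                return path[path.index(child):] + [child]
            if state == WHITE:
                cycle = visit(child)
                if cycle:
                    return cycle
        path.pop()
        color[task] = BLACK
        return None

    for task in list(edges):
        if color.get(task, WHITE) == WHITE:
            cycle = visit(task)
            if cycle:
                return cycle
    return None


print(find_cycle({"extract": ["load"], "load": ["notify"], "notify": ["extract"]}))
# ['extract', 'load', 'notify', 'extract']
print(find_cycle({"extract": ["load"], "load": []}))
# None
```

The recursive search is fine for typical DAG depths; a single chain of roughly a thousand tasks would hit Python's default recursion limit, where an iterative version is safer.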
Key Takeaways and Best Practices
- Clear Dependencies: Define explicit, understandable task relationships.
- Maximize Parallelism: Run independent tasks concurrently for performance.
- Appropriate Trigger Rules: Use the right trigger rule for each scenario.
- Avoid Deep Chains: Break long sequential dependencies into parallel groups.
- Handle Failures: Design dependencies that gracefully handle task failures.
- Dynamic Dependencies: Build flexible workflows that adapt to data.
- Cross-DAG Coordination: Use sensors and triggers for multi-DAG workflows.
- Performance Analysis: Monitor and optimize dependency bottlenecks.
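"Avoid Deep Chains" and "Performance Analysis" both come down to the critical path. `DependencyAnalyzer.get_critical_path` above calls `nx.dag_longest_path` on a graph without edge weights, so it measures path length in tasks rather than time. Weighting by task duration gives the floor on a run's wall-clock time, however many workers are available. A plain-Python sketch, assuming a `durations` value (for example minutes, perhaps averaged from past runs) is provided for every task; `critical_path` is a hypothetical helper:

```python
from typing import Dict, List, Optional, Tuple


def critical_path(
    edges: Dict[str, List[str]], durations: Dict[str, float]
) -> Tuple[float, List[str]]:
    """Duration-weighted longest path. Every task must appear in `durations`."""
    # Depth-first post-order, reversed, yields a topological order.
    order: List[str] = []
    seen = set()

    def visit(task: str) -> None:
        if task in seen:
            return
        seen.add(task)
        for child in edges.get(task, []):
            visit(child)
        order.append(task)

    for task in durations:
        visit(task)
    order.reverse()

    # start[t]: earliest start time of t; parent[t]: the upstream task that delays it most.
    start = {task: 0.0 for task in order}
    parent: Dict[str, Optional[str]] = {task: None for task in order}
    for task in order:
        end = start[task] + durations[task]
        for child in edges.get(task, []):
            if end > start[child]:
                start[child] = end
                parent[child] = task

    # The task that finishes last ends the critical path; walk parents back to its start.
    last = max(order, key=lambda t: start[t] + durations[t])
    total = start[last] + durations[last]
    path = [last]
    while parent[path[-1]] is not None:
        path.append(parent[path[-1]])
    return total, path[::-1]


edges = {"extract_a": ["transform"], "extract_b": ["transform"], "transform": ["load"]}
durations = {"extract_a": 5, "extract_b": 20, "transform": 10, "load": 3}
print(critical_path(edges, durations))  # (33.0, ['extract_b', 'transform', 'load'])
```

Here `extract_a` finishes well before `extract_b`, so speeding it up would not shorten the run; only tasks on the critical path matter.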
Task Dependency Best Practices
Mastering task dependencies enables you to build sophisticated workflows that maximize performance while maintaining reliability. You can now create complex dependency patterns, implement conditional execution logic, coordinate cross-DAG workflows, analyze and optimize dependency graphs, and handle failures gracefully with trigger rules. Whether you're building data pipelines, ML workflows, or business process automation, these dependency management skills are essential for production-grade orchestration!
Pro Tip: Think of task dependencies as the nervous system of your workflow: they control how signals (data and control) flow through your pipeline. Aim for maximum parallelism while maintaining logical correctness, and use the simplest dependency pattern that meets your needs: linear chains for sequential processing, fan-out/fan-in for parallel processing, and branching for conditional logic. Master trigger rules, since they handle partial failures and optional paths. Use ALL_DONE for cleanup tasks that must run regardless of success or failure. Use NONE_FAILED_MIN_ONE_SUCCESS for join points after a branch: under the default ALL_SUCCESS, the skipped branches would cause the join to be skipped as well. Implement data quality gates to prevent bad data from propagating through your pipeline. For complex workflows, use task groups to organize related tasks visually and logically. Analyze your dependency graph regularly: look for bottlenecks where many tasks depend on a single task, and parallelize or split them. Use ExternalTaskSensor for cross-DAG dependencies, but align the schedules: by default the sensor waits for the external run with the same logical date, so DAGs scheduled at different times need execution_delta or execution_date_fn. Build dependencies dynamically when dealing with variable workloads. Monitor the critical path: the longest chain of task durations sets the minimum run time, no matter how many workers are available. Remember: well-designed dependencies make workflows resilient, efficient, and maintainable!
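The execution-timing caveat comes from how `ExternalTaskSensor` matches runs: by default it waits for the external run with the same logical date, and a positive `execution_delta` moves that target into the past (target = this run's logical date minus `execution_delta`). When two DAGs run at different times of day, no upstream run shares the downstream run's logical date, so without a delta the sensor keeps waiting until it times out. A plain-Python sketch of the arithmetic, assuming both DAGs are scheduled once a day; the helper names are hypothetical:

```python
from datetime import date, datetime, time, timedelta


def sensor_target(logical_date: datetime, execution_delta: timedelta = timedelta(0)) -> datetime:
    """Logical date of the external run the sensor waits for."""
    return logical_date - execution_delta


def delta_for_daily_dags(upstream_at: time, downstream_at: time) -> timedelta:
    """execution_delta that points a daily downstream run at the same day's upstream run."""
    anchor = date(2024, 1, 1)  # arbitrary day; only the time-of-day difference matters
    delta = datetime.combine(anchor, downstream_at) - datetime.combine(anchor, upstream_at)
    if delta < timedelta(0):
        raise ValueError("expected the upstream DAG to be scheduled no later than the downstream one")
    return delta


# Upstream DAG runs daily at 01:00, downstream DAG daily at 03:00.
delta = delta_for_daily_dags(time(1, 0), time(3, 0))
print(delta)                                              # 2:00:00
print(sensor_target(datetime(2024, 5, 1, 3, 0), delta))  # 2024-05-01 01:00:00
```

The resulting `delta` is the value you would pass as `execution_delta`; for schedules that are not a fixed offset apart, `execution_date_fn` computes the target per run instead.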