Creating DAGs: Design Complex Workflows with Code
Creating effective DAGs (Directed Acyclic Graphs) in Apache Airflow is both an art and a science: it requires understanding workflow patterns, dependency management, and best practices for maintainable pipeline design. Like a well-architected software system, a well-designed DAG is modular, reusable, testable, and scalable. Whether you're building simple linear pipelines or complex branching workflows with hundreds of tasks, mastering DAG creation patterns is essential for production-grade orchestration. Let's explore DAG design and implementation!
The DAG Design Architecture
Think of DAG creation as building with intelligent LEGO blocks: each task is a self-contained unit that connects to others through dependencies, forming workflows that can branch, merge, and adapt dynamically. Because DAG files are ordinary Python, Airflow lets you create DAGs programmatically, generate them from configuration, implement complex patterns, and build reusable components. Understanding the DAG lifecycle, task patterns, and design principles is crucial for scalable orchestration!
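To make the LEGO-block picture concrete, here is a minimal sketch of how chaining tasks with `>>` builds a dependency graph and how a valid execution order can be derived from it. It is plain Python with no Airflow dependency; the `Task` class and `execution_order` function are hypothetical names that only mimic Airflow's bitshift syntax, and Airflow's scheduler performs its own, more involved, dependency resolution.

```python
from collections import deque
from typing import List, Set, Union


class Task:
    """Hypothetical stand-in for an Airflow operator: a named node with edges."""

    def __init__(self, task_id: str) -> None:
        self.task_id = task_id
        self.upstream: Set["Task"] = set()
        self.downstream: Set["Task"] = set()

    def __rshift__(self, other: Union["Task", List["Task"]]):
        # `a >> b` or `a >> [b, c]` records the edge(s) a -> b (and a -> c)
        targets = other if isinstance(other, list) else [other]
        for target in targets:
            self.downstream.add(target)
            target.upstream.add(self)
        return other  # returning the right-hand side lets `a >> b >> c` chain

    def __rrshift__(self, other: List["Task"]) -> "Task":
        # `[a, b] >> c`: lists have no >>, so Python calls c.__rrshift__([a, b])
        for source in other:
            source >> self
        return self


def execution_order(tasks: List[Task]) -> List[str]:
    """Kahn's algorithm: a task becomes ready once all of its upstream tasks have run."""
    waiting_on = {t: len(t.upstream) for t in tasks}
    ready = deque(sorted((t for t in tasks if not t.upstream), key=lambda t: t.task_id))
    order: List[str] = []
    while ready:
        current = ready.popleft()
        order.append(current.task_id)
        for child in sorted(current.downstream, key=lambda t: t.task_id):
            waiting_on[child] -= 1
            if waiting_on[child] == 0:
                ready.append(child)
    if len(order) != len(tasks):
        # Some tasks never became ready: they sit on (or behind) a cycle
        raise ValueError("dependency cycle detected: this graph is not a DAG")
    return order


if __name__ == "__main__":
    extract, clean, enrich, load = (Task(n) for n in ("extract", "clean", "enrich", "load"))
    extract >> [clean, enrich] >> load  # fan-out, then fan-in
    print(execution_order([extract, clean, enrich, load]))
    # ['extract', 'clean', 'enrich', 'load']
```

The final check is why the graph must be acyclic: a task caught in a cycle would wait forever for an upstream task that can only run after it.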
Real-World Scenario: The Enterprise Data Platform
You're building an enterprise data platform that orchestrates hundreds of data pipelines, handles complex ETL/ELT workflows with dependencies, manages machine learning model training and deployment, processes real-time and batch data streams, coordinates multi-cloud data operations, implements data quality gates and validation, provides self-service DAG generation for teams, and maintains audit trails and compliance. Your platform must support dynamic workflow generation, handle failures gracefully, scale horizontally, and provide clear visibility into pipeline operations. Let's build a comprehensive DAG creation framework!
# Advanced DAG Creation Patterns and Best Practices
# pip install apache-airflow pandas pyyaml jinja2
import os
import json
import yaml
from datetime import datetime, timedelta
from typing import Dict, List, Any, Optional, Callable, Union
from dataclasses import dataclass, field
from enum import Enum
from pathlib import Path
import logging
from functools import wraps
# Airflow imports
from airflow import DAG
from airflow.decorators import dag, task, task_group
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from airflow.operators.dummy import DummyOperator
from airflow.operators.subdag import SubDagOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.sensors.external_task import ExternalTaskSensor
from airflow.utils.task_group import TaskGroup
from airflow.utils.dates import days_ago
from airflow.utils.trigger_rule import TriggerRule
from airflow.models import Variable
from airflow.exceptions import AirflowException
# ==================== DAG Factory Pattern ====================
class DAGFactory:
    """Factory for creating DAGs with consistent configuration."""

    def __init__(self, default_args: Optional[Dict[str, Any]] = None):
        """Initialize DAG factory with default arguments."""
        self.default_args = default_args or {
            'owner': 'data_team',
            'depends_on_past': False,
            'email_on_failure': True,
            'email_on_retry': False,
            'retries': 2,
            'retry_delay': timedelta(minutes=5),
        }
        self.logger = logging.getLogger(__name__)

    def create_dag(
        self,
        dag_id: str,
        description: str,
        schedule_interval: str = '@daily',
        tags: Optional[List[str]] = None,
        **kwargs
    ) -> DAG:
        """Create a DAG with standard configuration."""
        # Merge default args with any provided args
        dag_args = {**self.default_args, **kwargs.get('default_args', {})}
        # Create DAG
        dag = DAG(
            dag_id=dag_id,
            description=description,
            default_args=dag_args,
            schedule_interval=schedule_interval,
            start_date=kwargs.get('start_date', days_ago(1)),
            catchup=kwargs.get('catchup', False),
            tags=tags or [],
            max_active_runs=kwargs.get('max_active_runs', 1),
            **{k: v for k, v in kwargs.items()
               if k not in ['default_args', 'start_date', 'catchup', 'max_active_runs']}
        )
        self.logger.info(f"Created DAG: {dag_id}")
        return dag

    def create_etl_dag(
        self,
        dag_id: str,
        source: str,
        destination: str,
        transform_func: Optional[Callable] = None,
        **kwargs
    ) -> DAG:
        """Create a standard ETL DAG."""
        dag = self.create_dag(
            dag_id=dag_id,
            description=f"ETL pipeline from {source} to {destination}",
            tags=['etl', source, destination],
            **kwargs
        )
        with dag:
            # Extract task
            extract = PythonOperator(
                task_id='extract',
                python_callable=self._create_extract_task(source),
                op_kwargs={'source': source}
            )
            # Transform task
            if transform_func:
                transform = PythonOperator(
                    task_id='transform',
                    python_callable=transform_func,
                )
            else:
                transform = DummyOperator(task_id='transform')
            # Load task
            load = PythonOperator(
                task_id='load',
                python_callable=self._create_load_task(destination),
                op_kwargs={'destination': destination}
            )
            # Set dependencies
            extract >> transform >> load
        return dag

    def _create_extract_task(self, source: str) -> Callable:
        """Create extraction function for source."""
        def extract_data(**context):
            print(f"Extracting data from {source}")
            # Implement extraction logic based on source type
            data = {'source': source, 'extracted_at': datetime.now().isoformat()}
            context['task_instance'].xcom_push(key='extracted_data', value=data)
            return data
        return extract_data

    def _create_load_task(self, destination: str) -> Callable:
        """Create load function for destination."""
        def load_data(**context):
            data = context['task_instance'].xcom_pull(task_ids='transform', key='return_value')
            print(f"Loading data to {destination}: {data}")
            # Implement load logic based on destination type
            return {'status': 'loaded', 'destination': destination}
        return load_data
# ==================== Configuration-Driven DAGs ====================
@dataclass
class PipelineConfig:
    """Configuration for a data pipeline."""
    name: str
    schedule: str
    source: Dict[str, Any]
    transformations: List[Dict[str, Any]]
    destination: Dict[str, Any]
    notifications: Optional[Dict[str, Any]] = None
    quality_checks: Optional[List[Dict[str, Any]]] = None
    tags: List[str] = field(default_factory=list)

class ConfigDrivenDAG:
    """Create DAGs from YAML/JSON configuration."""

    def __init__(self, config_path: str):
        """Initialize with configuration file path."""
        self.config_path = Path(config_path)
        self.configs = self._load_configs()
        self.dags = {}
        self._create_dags()

    def _load_configs(self) -> List[PipelineConfig]:
        """Load pipeline configurations from file."""
        configs = []
        if self.config_path.suffix == '.yaml':
            with open(self.config_path, 'r') as f:
                data = yaml.safe_load(f)
        elif self.config_path.suffix == '.json':
            with open(self.config_path, 'r') as f:
                data = json.load(f)
        else:
            raise ValueError(f"Unsupported config format: {self.config_path.suffix}")
        for pipeline_data in data.get('pipelines', []):
            configs.append(PipelineConfig(**pipeline_data))
        return configs

    def _create_dags(self):
        """Create DAGs from configurations."""
        factory = DAGFactory()
        for config in self.configs:
            dag_id = f"pipeline_{config.name}"
            dag = factory.create_dag(
                dag_id=dag_id,
                description=f"Auto-generated pipeline: {config.name}",
                schedule_interval=config.schedule,
                tags=config.tags
            )
            with dag:
                tasks = self._build_pipeline_tasks(config)
                self._set_dependencies(tasks)
            # Airflow only discovers DAG objects bound at module level, so the DAG file
            # using this class must expose them, e.g. globals().update(ConfigDrivenDAG(path).dags)
            self.dags[dag_id] = dag

    def _build_pipeline_tasks(self, config: PipelineConfig) -> Dict[str, Any]:
        """Build tasks for pipeline configuration."""
        tasks = {}
        # Create extraction task
        tasks['extract'] = self._create_source_task(config.source)
        # Create transformation tasks; the index keeps task IDs unique even when
        # the same transformation type appears more than once
        for i, transform in enumerate(config.transformations):
            task_id = f"transform_{i}_{transform['type']}"
            tasks[task_id] = self._create_transform_task(transform, task_id)
        # Create quality check tasks
        if config.quality_checks:
            for i, check in enumerate(config.quality_checks):
                task_id = f"quality_check_{i}"
                tasks[task_id] = self._create_quality_task(check, task_id)
        # Create load task
        tasks['load'] = self._create_destination_task(config.destination)
        # Create notification task
        if config.notifications:
            tasks['notify'] = self._create_notification_task(config.notifications)
        return tasks

    def _create_source_task(self, source_config: Dict[str, Any]):
        """Create source extraction task."""
        source_type = source_config['type']
        if source_type == 'database':
            from airflow.providers.postgres.operators.postgres import PostgresOperator
            return PostgresOperator(
                task_id='extract_from_database',
                sql=source_config['query'],
                postgres_conn_id=source_config['connection_id']
            )
        elif source_type == 'api':
            from airflow.providers.http.operators.http import SimpleHttpOperator
            return SimpleHttpOperator(
                task_id='extract_from_api',
                endpoint=source_config['endpoint'],
                method=source_config.get('method', 'GET'),
                http_conn_id=source_config['connection_id']
            )
        else:
            return DummyOperator(task_id=f'extract_{source_type}')

    def _create_transform_task(self, transform_config: Dict[str, Any], task_id: str):
        """Create transformation task."""
        transform_type = transform_config['type']

        def transform_data(**context):
            print(f"Applying transformation: {transform_type}")
            # Implementation would vary based on transform type
            return {'transformed': True, 'type': transform_type}

        return PythonOperator(
            task_id=task_id,
            python_callable=transform_data
        )

    def _create_quality_task(self, check_config: Dict[str, Any], task_id: str):
        """Create data quality check task."""
        def check_quality(**context):
            print(f"Running quality check: {check_config['name']}")
            # Implement quality check logic
            return {'check': check_config['name'], 'passed': True}

        return PythonOperator(
            task_id=task_id,
            python_callable=check_quality
        )

    def _create_destination_task(self, dest_config: Dict[str, Any]):
        """Create destination load task."""
        dest_type = dest_config['type']

        def load_to_destination(**context):
            print(f"Loading to {dest_type}: {dest_config.get('table', 'default')}")
            return {'loaded': True, 'destination': dest_type}

        return PythonOperator(
            task_id=f"load_to_{dest_type}",
            python_callable=load_to_destination
        )

    def _create_notification_task(self, notify_config: Dict[str, Any]):
        """Create notification task."""
        from airflow.operators.email import EmailOperator
        return EmailOperator(
            task_id='send_notification',
            to=notify_config['recipients'],
            subject=notify_config.get('subject', 'Pipeline Completed'),
            html_content=notify_config.get('template', 'Pipeline completed successfully')
        )

    def _set_dependencies(self, tasks: Dict[str, Any]):
        """Set task dependencies based on configuration."""
        # Simple linear dependency in insertion order for now
        # In production, parse dependency configuration
        task_list = list(tasks.values())
        for i in range(len(task_list) - 1):
            task_list[i] >> task_list[i + 1]
# ==================== Dynamic Task Generation ====================
@dag(
    'dynamic_task_generation',
    default_args={
        'owner': 'data_team',
        'retries': 2,
    },
    description='DAG with dynamically generated tasks',
    schedule_interval='@daily',
    start_date=days_ago(1),
    tags=['dynamic', 'advanced'],
)
def dynamic_task_dag():
    """DAG that generates tasks dynamically based on data."""

    @task
    def get_processing_list() -> List[str]:
        """Get list of items to process."""
        # In production, this could query a database or API
        return ['dataset_a', 'dataset_b', 'dataset_c', 'dataset_d']

    @task
    def process_dataset(dataset_name: str) -> Dict[str, Any]:
        """Process a single dataset."""
        import time
        import random
        print(f"Processing dataset: {dataset_name}")
        # Simulate processing
        time.sleep(random.uniform(1, 3))
        result = {
            'dataset': dataset_name,
            'records_processed': random.randint(1000, 10000),
            'status': 'success',
            'processed_at': datetime.now().isoformat()
        }
        return result

    @task
    def aggregate_results(results: List[Dict[str, Any]]) -> Dict[str, Any]:
        """Aggregate results from all processing tasks."""
        total_records = sum(r['records_processed'] for r in results)
        summary = {
            'total_datasets': len(results),
            'total_records': total_records,
            'all_successful': all(r['status'] == 'success' for r in results),
            'timestamp': datetime.now().isoformat()
        }
        print(f"Aggregated results: {summary}")
        return summary

    @task
    def send_report(summary: Dict[str, Any]):
        """Send summary report."""
        print(f"Sending report: {summary}")
        # Implement actual notification logic
        return "Report sent successfully"

    # Build dynamic workflow
    datasets = get_processing_list()
    # Process each dataset in parallel
    processing_results = process_dataset.expand(dataset_name=datasets)
    # Aggregate all results
    summary = aggregate_results(processing_results)
    # Send final report
    report = send_report(summary)

# Instantiate the dynamic DAG
dynamic_dag = dynamic_task_dag()
# ==================== Complex Dependency Patterns ====================
class DependencyPatterns:
    """Common DAG dependency patterns."""

    @staticmethod
    def fan_out_fan_in(dag: DAG, source_task, processing_tasks: List, sink_task):
        """
        Fan-out/fan-in pattern:
        source -> [task1, task2, task3] -> sink
        """
        source_task >> processing_tasks >> sink_task

    @staticmethod
    def conditional_branching(dag: DAG, condition_task, branch_tasks: Dict[str, List]):
        """
        Conditional branching pattern:
        condition -> branch_a or branch_b -> merge

        condition_task is expected to be a BranchPythonOperator whose callable
        returns the task_id of the first task in the branch to follow.
        """
        from airflow.operators.python import BranchPythonOperator
        with dag:
            # The branch that is not chosen gets skipped, so the merge task needs a
            # trigger rule that tolerates skipped upstream tasks
            merge = DummyOperator(
                task_id='merge',
                trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS
            )
            for branch_id, tasks in branch_tasks.items():
                condition_task >> tasks[0]
                tasks[-1] >> merge
        return merge

    @staticmethod
    def parallel_sequential(dag: DAG, parallel_groups: List[List]):
        """
        Parallel groups in sequence:
        [group1_tasks] -> [group2_tasks] -> [group3_tasks]
        """
        for i in range(len(parallel_groups) - 1):
            for task in parallel_groups[i]:
                task >> parallel_groups[i + 1]

    @staticmethod
    def diamond_dependency(dag: DAG, start_task, parallel_tasks: List, end_task):
        """
        Diamond pattern:
                 -> task1 ->
            start            end
                 -> task2 ->
        """
        start_task >> parallel_tasks >> end_task
# ==================== Task Group Patterns ====================
def create_data_quality_group(
    parent_dag: DAG,
    group_id: str,
    table_name: str
) -> TaskGroup:
    """Create a reusable data quality check task group.

    Call this inside a `with parent_dag:` block so the tasks are attached to the DAG.
    """
    with TaskGroup(group_id=group_id, dag=parent_dag) as quality_group:
        # No explicit task_id: inside a TaskGroup, Airflow prefixes each task ID
        # with the group ID automatically (e.g. "<group_id>.check_nulls")
        @task
        def check_nulls(table: str) -> Dict[str, Any]:
            """Check for null values."""
            print(f"Checking nulls in {table}")
            return {'table': table, 'nulls_found': False}

        @task
        def check_duplicates(table: str) -> Dict[str, Any]:
            """Check for duplicate records."""
            print(f"Checking duplicates in {table}")
            return {'table': table, 'duplicates_found': False}

        @task
        def check_schema(table: str) -> Dict[str, Any]:
            """Validate table schema."""
            print(f"Validating schema for {table}")
            return {'table': table, 'schema_valid': True}

        @task
        def aggregate_checks(*check_results) -> Dict[str, Any]:
            """Aggregate all quality check results."""
            all_passed = all(
                not r.get('nulls_found', False) and
                not r.get('duplicates_found', False) and
                r.get('schema_valid', True)
                for r in check_results
            )
            return {
                'table': table_name,
                'all_checks_passed': all_passed,
                'timestamp': datetime.now().isoformat()
            }

        # Define task dependencies within group
        null_check = check_nulls(table_name)
        dup_check = check_duplicates(table_name)
        schema_check = check_schema(table_name)
        result = aggregate_checks(null_check, dup_check, schema_check)
    return quality_group
# ==================== SubDAG Pattern ====================
# Note: SubDAGs are deprecated since Airflow 2.0; TaskGroups (above) are the
# recommended way to group tasks. This pattern is kept for legacy DAGs.
def create_subdag(
    parent_dag_id: str,
    child_dag_id: str,
    default_args: Dict[str, Any],
    schedule_interval: str
) -> DAG:
    """Create a SubDAG for modular workflow components."""
    subdag = DAG(
        dag_id=f'{parent_dag_id}.{child_dag_id}',
        default_args=default_args,
        schedule_interval=schedule_interval,
        start_date=default_args.get('start_date', days_ago(1))
    )
    with subdag:
        start = DummyOperator(task_id='subdag_start')
        # Add subdag tasks
        for i in range(3):
            task = PythonOperator(
                task_id=f'subdag_task_{i}',
                # The default argument binds the current i; a bare closure would see only the last value
                python_callable=lambda x=i: print(f"SubDAG task {x}")
            )
            start >> task
        end = DummyOperator(task_id='subdag_end')
        # Connect all tasks to end
        for task in subdag.tasks:
            if task.task_id != 'subdag_end' and task.task_id != 'subdag_start':
                task >> end
    return subdag
# ==================== Cross-DAG Dependencies ====================
@dag(
    'master_pipeline',
    default_args={'owner': 'data_team', 'retries': 1},
    description='Master pipeline coordinating multiple DAGs',
    schedule_interval='@daily',
    start_date=days_ago(1),
    tags=['master', 'orchestration'],
)
def master_pipeline():
    """Master DAG that coordinates multiple child DAGs."""

    @task
    def prepare_environment() -> Dict[str, Any]:
        """Prepare environment for child DAGs."""
        print("Preparing environment...")
        # Set Airflow Variables for child DAGs
        Variable.set('processing_date', datetime.now().strftime('%Y-%m-%d'))
        Variable.set('processing_status', 'started')
        return {
            'environment': 'prepared',
            'timestamp': datetime.now().isoformat()
        }

    # Prepare environment
    env_prep = prepare_environment()
    # Trigger child DAGs
    trigger_etl = TriggerDagRunOperator(
        task_id='trigger_etl_dag',
        trigger_dag_id='etl_pipeline',
        conf={'source': 'master', 'priority': 'high'},
        wait_for_completion=True,
        poke_interval=60,
    )
    trigger_ml = TriggerDagRunOperator(
        task_id='trigger_ml_dag',
        trigger_dag_id='ml_pipeline',
        conf={'model': 'latest', 'training': True},
        wait_for_completion=True,
        poke_interval=60,
    )

    @task
    def validate_results() -> Dict[str, Any]:
        """Validate results from child DAGs."""
        print("Validating results from child DAGs...")
        # Check Variables set by child DAGs
        etl_status = Variable.get('etl_status', default_var='unknown')
        ml_status = Variable.get('ml_status', default_var='unknown')
        return {
            'etl_status': etl_status,
            'ml_status': ml_status,
            'validation': 'passed' if etl_status == 'success' and ml_status == 'success' else 'failed'
        }

    @task
    def cleanup():
        """Clean up after pipeline completion."""
        print("Cleaning up...")
        # Clear Variables
        Variable.delete('processing_date')
        Variable.delete('processing_status')
        return "Cleanup completed"

    # Define dependencies
    validation = validate_results()
    clean = cleanup()
    env_prep >> [trigger_etl, trigger_ml] >> validation >> clean

master_dag = master_pipeline()
# ==================== Template-Based DAGs ====================
class TemplateDAG:
    """Create DAGs from Jinja2 templates."""

    def __init__(self, template_path: str):
        """Initialize with template path."""
        from jinja2 import Environment, FileSystemLoader
        self.template_dir = Path(template_path).parent
        self.template_name = Path(template_path).name
        self.env = Environment(loader=FileSystemLoader(self.template_dir))
        self.template = self.env.get_template(self.template_name)

    def render_dag(self, **context) -> str:
        """Render DAG code from template."""
        return self.template.render(**context)

    def create_dag_file(self, output_path: str, **context):
        """Create DAG file from template."""
        dag_code = self.render_dag(**context)
        with open(output_path, 'w') as f:
            f.write(dag_code)
        print(f"Created DAG file: {output_path}")
# ==================== DAG Validation and Testing ====================
class DAGValidator:
    """Validate DAG structure and configuration."""

    @staticmethod
    def validate_dag(dag: DAG) -> Dict[str, Any]:
        """Validate a DAG for common issues."""
        from airflow.exceptions import AirflowDagCycleException
        from airflow.utils.dag_cycle_tester import check_cycle

        issues = []
        warnings = []
        # Check for cycles: check_cycle raises AirflowDagCycleException if one exists
        try:
            check_cycle(dag)
        except AirflowDagCycleException:
            issues.append("DAG contains a cycle")
        # Check for duplicate task IDs
        task_ids = [task.task_id for task in dag.tasks]
        if len(task_ids) != len(set(task_ids)):
            issues.append("Duplicate task IDs found")
        # Check for orphaned tasks
        for task in dag.tasks:
            if not task.upstream_task_ids and not task.downstream_task_ids:
                warnings.append(f"Task '{task.task_id}' has no dependencies")
        # Check for proper start_date
        if not dag.start_date:
            issues.append("No start_date defined")
        # Check for email configuration
        if not dag.default_args.get('email'):
            warnings.append("No email configured for notifications")
        # Check task count
        if len(dag.tasks) > 100:
            warnings.append(f"DAG has {len(dag.tasks)} tasks - consider breaking it up")
        return {
            'valid': len(issues) == 0,
            'issues': issues,
            'warnings': warnings,
            'task_count': len(dag.tasks),
            'has_schedule': dag.schedule_interval is not None
        }

    @staticmethod
    def test_dag_execution(dag: DAG, execution_date: Optional[datetime] = None) -> Dict[str, Any]:
        """Test DAG execution in a controlled environment.

        Creates a DagRun, so an initialized Airflow metadata database is required.
        """
        from airflow.utils import timezone
        from airflow.utils.state import State
        from airflow.utils.types import DagRunType

        # Airflow stores dates in UTC; use an aware timestamp rather than naive datetime.now()
        execution_date = execution_date or timezone.utcnow()
        try:
            # Create a test DagRun
            dag_run = dag.create_dagrun(
                state=State.RUNNING,
                execution_date=execution_date,
                run_type=DagRunType.MANUAL,
                external_trigger=True
            )
            # Test task execution order
            execution_order = []
            for task in dag.topological_sort():
                execution_order.append(task.task_id)
            return {
                'success': True,
                'execution_order': execution_order,
                'dag_run_id': dag_run.run_id
            }
        except Exception as e:
            return {
                'success': False,
                'error': str(e)
            }
# ==================== DAG Monitoring and Metrics ====================
class DAGMetrics:
    """Collect and report DAG metrics."""

    @staticmethod
    def calculate_dag_complexity(dag: DAG) -> Dict[str, Any]:
        """Calculate complexity metrics for a DAG."""
        # Level of a task = number of edges on the longest path from a root task
        # to it (root tasks are level 0); memoized so each task is computed once
        levels: Dict[str, int] = {}

        def get_level(task) -> int:
            if task.task_id not in levels:
                levels[task.task_id] = 1 + max(
                    (get_level(up) for up in task.upstream_list), default=-1
                )
            return levels[task.task_id]

        for task in dag.tasks:
            get_level(task)
        # Depth = number of tasks on the longest path
        max_depth = max(levels.values()) + 1 if levels else 0
        # Width = largest number of tasks on one level; tasks on the same level
        # cannot depend on each other, so they may run in parallel
        level_counts: Dict[int, int] = {}
        for level in levels.values():
            level_counts[level] = level_counts.get(level, 0) + 1
        max_width = max(level_counts.values()) if level_counts else 0
        return {
            'task_count': len(dag.tasks),
            'depth': max_depth,
            'width': max_width,
            'complexity_score': len(dag.tasks) * max_depth * max_width,
            'average_dependencies': sum(len(t.upstream_task_ids) for t in dag.tasks) / len(dag.tasks) if dag.tasks else 0
        }
# Example usage
if __name__ == "__main__":
    print("Creating DAGs Examples\n")
    # Example 1: DAG Factory
    print("1️⃣ Using DAG Factory:")
    factory = DAGFactory()
    etl_dag = factory.create_etl_dag(
        dag_id='factory_etl_pipeline',
        source='database',
        destination='warehouse',
        schedule_interval='@daily'
    )
    print(f" Created DAG: {etl_dag.dag_id}")
    print(f" Tasks: {[t.task_id for t in etl_dag.tasks]}")
    # Example 2: Dynamic task generation
    print("\n2️⃣ Dynamic Task Generation:")
    print(" @task")
    print(" def process_item(item_id: str):")
    print("     return process(item_id)")
    print(" ")
    print(" items = get_items()")
    print(" results = process_item.expand(item_id=items)")
    # Example 3: Task group patterns
    print("\n3️⃣ Task Group Patterns:")
    patterns = [
        "Data Quality Checks",
        "Transformation Pipeline",
        "Notification Group",
        "Error Handling Group"
    ]
    for pattern in patterns:
        print(f" • {pattern}")
    # Example 4: Dependency patterns
    print("\n4️⃣ Common Dependency Patterns:")
    dep_patterns = [
        ("Linear", "task1 >> task2 >> task3"),
        ("Fan-out/Fan-in", "start >> [task1, task2, task3] >> end"),
        ("Conditional", "check >> [branch_a, branch_b]"),
        ("Diamond", "start >> [parallel1, parallel2] >> end"),
        ("Cross-DAG", "ExternalTaskSensor + TriggerDagRunOperator")
    ]
    for pattern, example in dep_patterns:
        print(f" {pattern}:")
        print(f"   {example}")
    # Example 5: Configuration-driven DAGs
    print("\n5️⃣ Configuration-Driven DAGs:")
    # YAML is indentation-sensitive, so the string body starts at column 0
    sample_config = """
pipelines:
  - name: sales_etl
    schedule: "@daily"
    source:
      type: database
      connection_id: sales_db
      query: "SELECT * FROM sales"
    transformations:
      - type: clean
      - type: aggregate
    destination:
      type: warehouse
      table: fact_sales
"""
    print(" Sample YAML configuration:")
    for line in sample_config.strip().split('\n'):
        print(f" {line}")
    # Example 6: DAG validation
    print("\n6️⃣ DAG Validation:")
    # Create a test DAG for validation
    test_dag = DAG(
        'test_dag',
        default_args={'owner': 'test'},
        start_date=days_ago(1),
        schedule_interval='@daily'
    )
    with test_dag:
        t1 = DummyOperator(task_id='task1')
        t2 = DummyOperator(task_id='task2')
        t3 = DummyOperator(task_id='task3')
        t1 >> t2 >> t3
    validator = DAGValidator()
    validation_result = validator.validate_dag(test_dag)
    print(f" Valid: {validation_result['valid']}")
    print(f" Task count: {validation_result['task_count']}")
    print(f" Has schedule: {validation_result['has_schedule']}")
    # Example 7: DAG complexity metrics
    print("\n7️⃣ DAG Complexity Metrics:")
    metrics = DAGMetrics.calculate_dag_complexity(test_dag)
    for metric, value in metrics.items():
        print(f" {metric}: {value}")
    # Example 8: Template variables
    print("\n8️⃣ Template Variables in DAGs:")
    variables = [
        ("{{ ds }}", "Execution date YYYY-MM-DD"),
        ("{{ ds_nodash }}", "Execution date YYYYMMDD"),
        ("{{ prev_ds }}", "Previous execution date"),
        ("{{ next_ds }}", "Next execution date"),
        ("{{ macros.ds_add(ds, 7) }}", "Add days to date"),
        ("{{ params.my_param }}", "Access DAG parameters"),
        ("{{ var.value.my_variable }}", "Access Airflow Variables"),
        ("{{ conn.my_connection }}", "Access connections")
    ]
    for var, desc in variables:
        print(f" {var}: {desc}")
    # Example 9: Best practices
    print("\n9️⃣ DAG Creation Best Practices:")
    practices = [
        "Use descriptive DAG and task IDs",
        "Keep DAGs focused - one workflow per DAG",
        "Use task groups for logical grouping",
        "Create reusable components",
        "Implement proper monitoring and alerting",
        "Test DAGs before production",
        "Document DAG purpose and dependencies",
        "Optimize for parallelism where possible",
        "Use connections and variables for secrets",
        "Tag DAGs for organization"
    ]
    for practice in practices:
        print(f" • {practice}")
    # Example 10: Anti-patterns to avoid
    print("\n🔟 DAG Anti-Patterns to Avoid:")
    antipatterns = [
        ("Top-level code", "Avoid database queries at module level"),
        ("Non-idempotent tasks", "Tasks should produce same result on retry"),
        ("Large XCom data", "Don't pass large objects through XCom"),
        ("Dynamic start_date", "Don't use datetime.now() for start_date"),
        ("Circular dependencies", "Ensure DAG is acyclic"),
        ("Too many tasks", "Break large DAGs into smaller ones"),
        ("Missing error handling", "Always handle and log errors"),
        ("Hardcoded credentials", "Use Airflow connections")
    ]
    for pattern, issue in antipatterns:
        print(f" ❌ {pattern}: {issue}")
    print("\n✅ DAG creation patterns demonstration complete!")
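The anti-pattern list above asks for tasks that produce the same result on retry. A common way to get there is to write each run's output to a partition keyed by the run's logical date (the `{{ ds }}` template variable from Example 8) and to overwrite that partition instead of appending to it. The stdlib sketch below contrasts the two approaches; the in-memory `warehouse` dict and both loader functions are hypothetical stand-ins for a real table and load step, and in an Airflow task `ds` would come from the template or the task context.

```python
from typing import Dict, List

Row = Dict[str, int]


def load_append(table: List[Row], rows: List[Row]) -> None:
    """Non-idempotent: every retry appends the same rows again."""
    table.extend(rows)


def load_partition(warehouse: Dict[str, List[Row]], ds: str, rows: List[Row]) -> None:
    """Idempotent: the partition for logical date `ds` is replaced on every attempt."""
    warehouse[ds] = list(rows)


if __name__ == "__main__":
    rows = [{"order_id": 1, "amount": 20}, {"order_id": 2, "amount": 35}]
    table: List[Row] = []
    warehouse: Dict[str, List[Row]] = {}
    for _attempt in range(2):  # the first try plus one retry
        load_append(table, rows)
        load_partition(warehouse, "2024-01-01", rows)
    print(len(table))                    # 4: the retry duplicated every row
    print(len(warehouse["2024-01-01"]))  # 2: the retry left the partition unchanged
```

Against a real warehouse, the overwrite would typically be a delete of the partition followed by an insert (or an engine-specific overwrite or merge statement) executed in a single transaction.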
Key Takeaways and Best Practices
- Modular Design: Create reusable components with task groups and factory patterns.
- Configuration-Driven: Generate DAGs from YAML/JSON for easier management.
- Dynamic Generation: Use dynamic task mapping for scalable workflows.
- Clear Dependencies: Define explicit task dependencies and use appropriate patterns.
- Validation: Test and validate DAGs before deployment.
- Documentation: Document DAG purpose, dependencies, and requirements.
- Error Handling: Implement comprehensive error handling and recovery.
- Performance: Optimize for parallelism and resource usage.
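Two of these takeaways, configuration-driven generation and validation, combine naturally. `ConfigDrivenDAG._set_dependencies` above chains tasks linearly and notes that production code should parse dependency configuration instead. One way to do that, assuming a hypothetical config format in which each task lists its `depends_on` entries, is to validate the spec before any operators are built. This sketch uses JSON from the standard library (the same structure would load from YAML with `yaml.safe_load`); `validate_spec` and `task_levels` are illustrative names, not Airflow APIs.

```python
import json
from typing import Any, Dict, List, Set

# Hypothetical config format: each task names the tasks it depends on
SPEC = """
{
  "name": "sales_etl",
  "tasks": {
    "extract":       {"depends_on": []},
    "clean":         {"depends_on": ["extract"]},
    "aggregate":     {"depends_on": ["clean"]},
    "quality_nulls": {"depends_on": ["clean"]},
    "load":          {"depends_on": ["aggregate", "quality_nulls"]}
  }
}
"""


def validate_spec(spec: Dict[str, Any]) -> List[str]:
    """Return a list of problems; an empty list means the spec describes a valid DAG."""
    tasks: Dict[str, Dict[str, Any]] = spec.get("tasks", {})
    problems: List[str] = []
    for task_id, cfg in tasks.items():
        for dep in cfg.get("depends_on", []):
            if dep not in tasks:
                problems.append(f"{task_id}: unknown dependency '{dep}'")
    # Depth-first search with three colours: reaching a GREY node again is a cycle
    WHITE, GREY, BLACK = 0, 1, 2
    colour = {t: WHITE for t in tasks}

    def has_cycle(task_id: str) -> bool:
        colour[task_id] = GREY
        for dep in tasks[task_id].get("depends_on", []):
            if dep not in tasks:
                continue  # already reported above
            if colour[dep] == GREY or (colour[dep] == WHITE and has_cycle(dep)):
                return True
        colour[task_id] = BLACK
        return False

    for task_id in tasks:
        if colour[task_id] == WHITE and has_cycle(task_id):
            problems.append(f"dependency cycle reachable from '{task_id}'")
            break
    return problems


def task_levels(spec: Dict[str, Any]) -> List[List[str]]:
    """Group tasks into levels; tasks in the same level may run in parallel."""
    deps = {t: set(cfg.get("depends_on", [])) for t, cfg in spec["tasks"].items()}
    levels: List[List[str]] = []
    done: Set[str] = set()
    while len(done) < len(deps):
        level = sorted(t for t, d in deps.items() if t not in done and d <= done)
        if not level:
            raise ValueError("cannot order tasks: cycle or unknown dependency")
        levels.append(level)
        done.update(level)
    return levels


if __name__ == "__main__":
    spec = json.loads(SPEC)
    print(validate_spec(spec))  # []
    print(task_levels(spec))
    # [['extract'], ['clean'], ['aggregate', 'quality_nulls'], ['load']]
```

Wiring a validated spec into Airflow would then be a loop that sets `operators[dep] >> operators[task_id]` for every `depends_on` entry, where `operators` is a hypothetical dict of already-created operators keyed by task ID.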
DAG Creation Best Practices
Mastering DAG creation enables you to build sophisticated, maintainable workflow orchestration at scale. You can now design complex workflows with multiple patterns, generate DAGs dynamically from configuration, create reusable components with task groups, implement cross-DAG dependencies, and validate workflows before deployment. Whether you're building data pipelines, ML workflows, or business process automation, these DAG creation skills provide the foundation for enterprise-grade orchestration!
Pro Tip: Think of DAG creation as software architecture: each DAG should have a single, clear purpose and be composed of modular, reusable components. Use the DAG Factory pattern to standardize DAG creation across your organization. Leverage configuration-driven DAGs for repetitive patterns: define pipelines in YAML/JSON and generate DAGs programmatically. Use task groups to organize related tasks visually and logically, and dynamic task mapping for workflows whose width depends on the data. Keep DAGs focused; several simple DAGs are easier to operate than one complex monolith. Match the dependency pattern to the job: fan-out/fan-in for parallel processing, conditional branching for decision logic, and cross-DAG dependencies for larger orchestration. Always validate DAGs before deployment by checking for cycles, orphaned tasks, and missing configuration. Use template variables such as {{ ds }} for run-specific behavior, and keep expensive work out of top-level code, which runs every time the scheduler parses the file. Implement monitoring with SLA checks and alerting. Version-control your DAGs and deploy them through CI/CD. Test DAGs thoroughly, for example by loading the DAG folder with DagBag and asserting there are no import errors, or by running a DAG locally with dag.test() (Airflow 2.5+). Remember: DAGs are code, so apply software engineering best practices including code review, testing, and documentation!
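The Pro Tip's conditional-branching advice depends on trigger rules. When a `BranchPythonOperator` picks one path, the tasks on the other path are skipped, and under Airflow's default `all_success` rule a task downstream of a skipped task is skipped as well; that is why `DependencyPatterns.conditional_branching` gives its `merge` task `TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS`. The function below is a simplified, hypothetical model of just those two rules in plain Python, not Airflow's scheduler code.

```python
from typing import Iterable

# Task-instance state names as they appear in Airflow
SUCCESS = "success"
FAILED = "failed"
UPSTREAM_FAILED = "upstream_failed"
SKIPPED = "skipped"


def should_run(trigger_rule: str, upstream_states: Iterable[str]) -> bool:
    """Simplified model: would a task run once all its upstream tasks have finished?

    Only two trigger rules are modelled. Airflow also decides whether a task that
    does not run ends up skipped or upstream_failed; that detail is ignored here.
    """
    states = list(upstream_states)
    if trigger_rule == "all_success":  # Airflow's default rule
        return all(s == SUCCESS for s in states)
    if trigger_rule == "none_failed_min_one_success":
        no_failures = all(s not in (FAILED, UPSTREAM_FAILED) for s in states)
        return no_failures and any(s == SUCCESS for s in states)
    raise ValueError(f"trigger rule not modelled in this sketch: {trigger_rule}")


if __name__ == "__main__":
    # After a branch: the chosen branch succeeded, the other one was skipped
    after_branch = [SUCCESS, SKIPPED]
    print(should_run("all_success", after_branch))                  # False
    print(should_run("none_failed_min_one_success", after_branch))  # True
```

The second rule still refuses to run the merge if every branch was skipped or any branch failed, which is usually the behavior you want after a decision point.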