⏰ Scheduled Tasks: Automate with Cron and Task Scheduler
Time is the ultimate automation trigger. Whether it's backing up databases at 2 AM, generating reports every Monday, or cleaning temp files hourly, scheduled tasks are the heartbeat of system automation. Python gives you the power to create, manage, and orchestrate scheduled tasks across any operating system. Let's master the art of temporal automation!
The Scheduling Ecosystem
Think of scheduled tasks as your digital workforce that never sleeps. Like a well-orchestrated factory where machines start and stop at precise times, your scheduled tasks ensure critical operations happen exactly when needed - whether you're awake or asleep, at work or on vacation!
Real-World Scenario: The Automation Orchestra
You're managing a complex infrastructure where dozens of tasks need perfect timing: database backups at night, report generation before business hours, log rotation every hour, security scans on weekends, and monitoring checks every minute. Let's build a comprehensive scheduling system that conducts this orchestra flawlessly!
import os
import sys
import subprocess
import platform
import schedule
import time
import json
import sqlite3
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Callable, Any, Tuple
from dataclasses import dataclass, field, asdict
from enum import Enum
from pathlib import Path
import logging
import threading
import queue
from croniter import croniter
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.cron import CronTrigger
from apscheduler.triggers.interval import IntervalTrigger
from apscheduler.triggers.date import DateTrigger
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor
import tempfile
import shutil
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
class ScheduleType(Enum):
    """Kinds of schedule a ScheduledTask can use.

    Each member's value is the plain string written to the
    ``tasks.schedule_type`` database column.
    """

    CRON = "cron"          # five-field crontab expression
    INTERVAL = "interval"  # fixed period such as "5m"
    DATE = "date"          # single run at a given datetime
    DAILY = "daily"        # "HH:MM" every day
    WEEKLY = "weekly"      # "<day> HH:MM" every week
    MONTHLY = "monthly"    # "<day-of-month> HH:MM" every month
class TaskStatus(Enum):
    """State or outcome of a single task execution.

    The member value is the string stored in ``executions.status``.
    """

    PENDING = "pending"    # default for a freshly created execution record
    RUNNING = "running"
    SUCCESS = "success"    # command exited with code 0
    FAILED = "failed"      # non-zero exit, timeout, or launch error
    MISSED = "missed"
    DISABLED = "disabled"
@dataclass
class ScheduledTask:
    """Scheduled task definition.

    ``schedule_value`` is interpreted according to ``schedule_type``, e.g.
    CRON "0 3 * * 0", INTERVAL "5m", DAILY "02:00", WEEKLY "mon 09:00",
    MONTHLY "15 09:00", DATE an ISO datetime string.
    """
    id: str                # unique key; also used as the scheduler job id
    name: str
    command: str           # shell command line, executed with shell=True
    schedule_type: ScheduleType
    schedule_value: str
    enabled: bool = True
    description: str = ""
    working_directory: Optional[str] = None  # cwd for the command; None inherits the process cwd
    environment: Dict[str, str] = field(default_factory=dict)  # merged over os.environ
    timeout: int = 3600    # seconds
    retry_count: int = 0   # extra attempts after a failed run
    retry_delay: int = 60  # seconds to wait before each retry
    notify_on_failure: bool = True
    notify_on_success: bool = False
    last_run: Optional[datetime] = None   # start time of the most recent run
    next_run: Optional[datetime] = None   # as reported by the scheduler job
    created_at: datetime = field(default_factory=datetime.now)
    updated_at: datetime = field(default_factory=datetime.now)
@dataclass
class TaskExecution:
    """Record of one attempt at running a task (one row of ``executions``)."""
    task_id: str
    start_time: datetime
    end_time: Optional[datetime] = None   # None until the attempt finishes
    status: TaskStatus = TaskStatus.PENDING
    output: str = ""                      # captured stdout
    error: str = ""                       # captured stderr or failure message
    exit_code: Optional[int] = None       # None when the process never completed
class TaskScheduler:
    """
    Comprehensive task scheduling system for cross-platform automation.

    Task definitions are persisted in the SQLite ``tasks`` table and every
    run is recorded in ``executions``. Construction creates the tables,
    builds an APScheduler ``BackgroundScheduler`` and re-registers every
    stored, enabled task; call :meth:`start` to begin firing jobs and
    :meth:`stop` to shut down.
    """

    # Column order of the ``tasks`` table, shared by INSERT and row decoding.
    _TASK_COLUMNS = (
        'id', 'name', 'command', 'schedule_type', 'schedule_value', 'enabled',
        'description', 'working_directory', 'environment', 'timeout',
        'retry_count', 'retry_delay', 'notify_on_failure', 'notify_on_success',
        'last_run', 'next_run', 'created_at', 'updated_at',
    )

    # APScheduler's CronTrigger only accepts three-letter weekday names, so
    # WEEKLY values such as "Monday 09:00" are mapped to "mon".
    _WEEKDAY_ABBREVIATIONS = {
        'monday': 'mon', 'tuesday': 'tue', 'wednesday': 'wed',
        'thursday': 'thu', 'friday': 'fri', 'saturday': 'sat',
        'sunday': 'sun',
    }

    def __init__(self, config: Optional[Dict] = None):
        """Create the scheduler, its database tables and the APScheduler instance.

        Args:
            config: Settings overriding :meth:`get_default_config`. Keys that
                are omitted keep their default value.
        """
        # Overlay caller settings on the defaults: other methods index keys
        # such as 'max_workers' and 'job_defaults' directly, so a partial
        # config would otherwise fail with KeyError.
        self.config = {**self.get_default_config(), **(config or {})}
        self.tasks = {}  # task id -> ScheduledTask
        self.executions = []
        self.scheduler = None
        self.setup_logging()
        self.setup_database()
        self.setup_scheduler()
        self.load_tasks()

    def get_default_config(self) -> Dict:
        """Return the default configuration dictionary."""
        return {
            'db_path': 'scheduler.db',
            'log_file': 'scheduler.log',
            'max_workers': 10,
            'job_defaults': {
                'coalesce': False,
                'max_instances': 3,
                'misfire_grace_time': 30
            },
            'smtp_server': None,
            'smtp_port': 587,
            'email_from': None,
            'email_to': [],
            'enable_notifications': False
        }

    def setup_logging(self):
        """Log to ``config['log_file']`` and to the console.

        ``logging.basicConfig`` is a no-op when the root logger already has
        handlers, so an application's own logging setup takes precedence.
        """
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
            handlers=[
                logging.FileHandler(self.config['log_file']),
                logging.StreamHandler()
            ]
        )
        self.logger = logging.getLogger(__name__)

    # ------------------------------------------------------------------ #
    # Database helpers
    # ------------------------------------------------------------------ #
    def _connect(self):
        """Open the task database wrapped so that ``with`` always closes it.

        A bare ``sqlite3.Connection`` used as a context manager only commits
        or rolls back; ``contextlib.closing`` guarantees the close even when
        a statement raises.
        """
        from contextlib import closing
        return closing(sqlite3.connect(self.config['db_path']))

    @staticmethod
    def _iso(value: Optional[datetime]) -> Optional[str]:
        """Serialize a datetime for storage (None stays None)."""
        return value.isoformat() if value else None

    @staticmethod
    def _from_iso(value: Optional[str]) -> Optional[datetime]:
        """Parse a stored ISO datetime string (empty/NULL becomes None)."""
        return datetime.fromisoformat(value) if value else None

    def setup_database(self):
        """Create the ``tasks`` and ``executions`` tables and their indexes if missing."""
        with self._connect() as conn:
            conn.execute('''
                CREATE TABLE IF NOT EXISTS tasks (
                    id TEXT PRIMARY KEY,
                    name TEXT NOT NULL,
                    command TEXT NOT NULL,
                    schedule_type TEXT,
                    schedule_value TEXT,
                    enabled BOOLEAN DEFAULT 1,
                    description TEXT,
                    working_directory TEXT,
                    environment TEXT,
                    timeout INTEGER DEFAULT 3600,
                    retry_count INTEGER DEFAULT 0,
                    retry_delay INTEGER DEFAULT 60,
                    notify_on_failure BOOLEAN DEFAULT 1,
                    notify_on_success BOOLEAN DEFAULT 0,
                    last_run DATETIME,
                    next_run DATETIME,
                    created_at DATETIME,
                    updated_at DATETIME
                )
            ''')
            conn.execute('''
                CREATE TABLE IF NOT EXISTS executions (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    task_id TEXT,
                    start_time DATETIME,
                    end_time DATETIME,
                    status TEXT,
                    output TEXT,
                    error TEXT,
                    exit_code INTEGER,
                    FOREIGN KEY (task_id) REFERENCES tasks (id)
                )
            ''')
            # SQLite has no inline INDEX clause inside CREATE TABLE (that is
            # MySQL syntax and fails with a syntax error); indexes are
            # created with separate statements instead.
            conn.execute(
                'CREATE INDEX IF NOT EXISTS idx_task_id ON executions (task_id)')
            conn.execute(
                'CREATE INDEX IF NOT EXISTS idx_start_time ON executions (start_time)')
            conn.commit()

    # ------------------------------------------------------------------ #
    # APScheduler setup
    # ------------------------------------------------------------------ #
    def setup_scheduler(self):
        """Create the APScheduler instance and register the job event listener."""
        # Event constants live in apscheduler.events; they are not attributes
        # of the scheduler object.
        from apscheduler.events import (
            EVENT_JOB_ERROR, EVENT_JOB_EXECUTED, EVENT_JOB_MISSED)
        from apscheduler.jobstores.memory import MemoryJobStore

        # Jobs target the bound method self._execute_task, which a persistent
        # (SQLAlchemy) job store cannot serialize and restore. Persistence is
        # already provided by the ``tasks`` table: load_tasks() re-registers
        # every enabled task at start-up, so an in-memory job store suffices.
        jobstores = {'default': MemoryJobStore()}
        executors = {
            'default': ThreadPoolExecutor(self.config['max_workers']),
            # Kept for jobs that explicitly request executor='processpool';
            # jobs added by this class use the default thread pool.
            'processpool': ProcessPoolExecutor(5)
        }
        self.scheduler = BackgroundScheduler(
            jobstores=jobstores,
            executors=executors,
            job_defaults=self.config['job_defaults'],
            timezone='UTC'
        )
        self.scheduler.add_listener(
            self._job_listener,
            mask=EVENT_JOB_EXECUTED | EVENT_JOB_ERROR | EVENT_JOB_MISSED
        )

    def _job_listener(self, event):
        """Log executed, failed and missed job events."""
        from apscheduler.events import EVENT_JOB_MISSED

        if event.code == EVENT_JOB_MISSED:
            # Missed events carry no exception; without this branch they were
            # reported as successful executions.
            self.logger.warning(
                f"Job {event.job_id} missed its run scheduled for {event.scheduled_run_time}")
        elif event.exception:
            self.logger.error(f"Job {event.job_id} crashed: {event.exception}")
        else:
            self.logger.info(f"Job {event.job_id} executed successfully")

    # ------------------------------------------------------------------ #
    # Task persistence
    # ------------------------------------------------------------------ #
    @classmethod
    def _task_from_row(cls, row) -> ScheduledTask:
        """Build a ScheduledTask from a ``tasks`` row (sqlite3.Row)."""
        return ScheduledTask(
            id=row['id'],
            name=row['name'],
            command=row['command'],
            schedule_type=ScheduleType(row['schedule_type']),
            schedule_value=row['schedule_value'],
            enabled=bool(row['enabled']),
            description=row['description'] or "",
            working_directory=row['working_directory'],
            environment=json.loads(row['environment']) if row['environment'] else {},
            timeout=row['timeout'],
            retry_count=row['retry_count'],
            retry_delay=row['retry_delay'],
            notify_on_failure=bool(row['notify_on_failure']),
            notify_on_success=bool(row['notify_on_success']),
            last_run=cls._from_iso(row['last_run']),
            next_run=cls._from_iso(row['next_run']),
            created_at=cls._from_iso(row['created_at']) or datetime.now(),
            updated_at=cls._from_iso(row['updated_at']) or datetime.now(),
        )

    def _task_to_params(self, task: ScheduledTask) -> tuple:
        """Return the task's values in ``_TASK_COLUMNS`` order for an INSERT."""
        return (
            task.id,
            task.name,
            task.command,
            task.schedule_type.value,
            task.schedule_value,
            task.enabled,
            task.description,
            task.working_directory,
            json.dumps(task.environment),
            task.timeout,
            task.retry_count,
            task.retry_delay,
            task.notify_on_failure,
            task.notify_on_success,
            self._iso(task.last_run),
            self._iso(task.next_run),
            self._iso(task.created_at),
            self._iso(task.updated_at),
        )

    def load_tasks(self):
        """Load every stored task and schedule the enabled ones.

        Disabled tasks are loaded too (but not scheduled) so that
        enable_task(), delete_task() and list_tasks() can still find them
        after a restart. A row that cannot be decoded is logged and skipped
        rather than aborting start-up.
        """
        with self._connect() as conn:
            conn.row_factory = sqlite3.Row
            rows = conn.execute("SELECT * FROM tasks").fetchall()

        for row in rows:
            try:
                task = self._task_from_row(row)
            except (ValueError, TypeError, KeyError) as e:
                self.logger.error(f"Skipping unreadable task {row['id']}: {e}")
                continue
            self.tasks[task.id] = task
            if task.enabled:
                self._schedule_task(task)

        self.logger.info(f"Loaded {len(self.tasks)} tasks")

    def add_task(self, task: ScheduledTask) -> bool:
        """Persist a task (insert or replace by id) and schedule it if enabled.

        Returns:
            True on success, False if anything failed (the error is logged).
        """
        try:
            columns = ', '.join(self._TASK_COLUMNS)
            placeholders = ', '.join('?' * len(self._TASK_COLUMNS))
            with self._connect() as conn:
                conn.execute(
                    f"INSERT OR REPLACE INTO tasks ({columns}) VALUES ({placeholders})",
                    self._task_to_params(task),
                )
                conn.commit()

            # Track the task in memory only once it is safely stored.
            self.tasks[task.id] = task

            if task.enabled:
                self._schedule_task(task)
            elif self.scheduler.get_job(task.id):
                # Replacing an enabled task with a disabled one must also stop
                # the job registered for the old definition.
                self.scheduler.remove_job(task.id)

            self.logger.info(f"Added task: {task.name}")
            return True
        except Exception as e:
            self.logger.error(f"Failed to add task: {e}")
            return False

    # ------------------------------------------------------------------ #
    # Scheduling
    # ------------------------------------------------------------------ #
    def _build_trigger(self, task: ScheduledTask):
        """Translate a task's schedule into an APScheduler trigger.

        Accepted ``schedule_value`` formats per ``schedule_type``:
            CRON     - five-field crontab expression, e.g. "0 3 * * 0"
            INTERVAL - integer plus unit s/m/h/d/w, e.g. "5m"
            DATE     - string accepted by ``datetime.fromisoformat``
            DAILY    - "HH:MM"
            WEEKLY   - "<day> HH:MM", day as "mon" or "monday"
            MONTHLY  - "<day-of-month> HH:MM", e.g. "15 09:00"

        Raises:
            ValueError: for an unknown schedule type or a malformed value.
        """
        kind = task.schedule_type
        value = task.schedule_value

        if kind == ScheduleType.CRON:
            return CronTrigger.from_crontab(value)
        if kind == ScheduleType.INTERVAL:
            return IntervalTrigger(seconds=self._parse_interval(value))
        if kind == ScheduleType.DATE:
            return DateTrigger(run_date=datetime.fromisoformat(value))
        if kind == ScheduleType.DAILY:
            hour, minute = map(int, value.split(':'))
            return CronTrigger(hour=hour, minute=minute)
        if kind in (ScheduleType.WEEKLY, ScheduleType.MONTHLY):
            day, time_str = value.split()
            hour, minute = map(int, time_str.split(':'))
            if kind == ScheduleType.WEEKLY:
                day = day.lower()
                day = self._WEEKDAY_ABBREVIATIONS.get(day, day)
                return CronTrigger(day_of_week=day, hour=hour, minute=minute)
            return CronTrigger(day=int(day), hour=hour, minute=minute)
        raise ValueError(f"Unknown schedule type: {kind}")

    def _schedule_task(self, task: ScheduledTask):
        """Register (or replace) the job for *task*; failures are logged, not raised."""
        try:
            self.scheduler.add_job(
                func=self._execute_task,
                trigger=self._build_trigger(task),
                args=[task.id],
                id=task.id,
                name=task.name,
                replace_existing=True
            )
            job = self.scheduler.get_job(task.id)
            # In APScheduler 3.x a job added before start() is still "pending"
            # and has no next_run_time attribute yet; start() records it later.
            next_run = getattr(job, 'next_run_time', None) if job else None
            if next_run is not None:
                task.next_run = next_run
            self.logger.info(f"Scheduled task: {task.name}")
        except Exception as e:
            self.logger.error(f"Failed to schedule task {task.name}: {e}")

    def _parse_interval(self, interval_str: str) -> int:
        """Convert an interval such as "30s", "5m", "1h", "2d" or "1w" to seconds.

        Raises:
            ValueError: if the string is not ``<digits><unit>``.
        """
        import re
        units = {'s': 1, 'm': 60, 'h': 3600, 'd': 86400, 'w': 604800}
        match = re.fullmatch(r'(\d+)([smhdw])', interval_str.strip().lower())
        if not match:
            raise ValueError(f"Invalid interval format: {interval_str}")
        return int(match.group(1)) * units[match.group(2)]

    # ------------------------------------------------------------------ #
    # Execution
    # ------------------------------------------------------------------ #
    def _run_command(self, task: ScheduledTask) -> subprocess.CompletedProcess:
        """Run the task's command with its environment, cwd and timeout.

        ``shell=True`` is deliberate: commands are shell strings (pipes,
        redirections, ``$(...)``) and must come from a trusted source.
        """
        env = os.environ.copy()
        if task.environment:
            env.update(task.environment)
        return subprocess.run(
            task.command,
            shell=True,
            capture_output=True,
            text=True,
            timeout=task.timeout,
            cwd=task.working_directory,
            env=env
        )

    def _attempt(self, task: ScheduledTask) -> TaskExecution:
        """Run the command once and return a finished TaskExecution.

        Command problems never raise: a timeout or a launch error (for
        example a missing working directory) is recorded as FAILED with a
        message in ``error`` and ``exit_code`` left as None.
        """
        execution = TaskExecution(task_id=task.id, start_time=datetime.now())
        try:
            result = self._run_command(task)
        except subprocess.TimeoutExpired:
            execution.status = TaskStatus.FAILED
            execution.error = f"Task timed out after {task.timeout} seconds"
        except Exception as e:
            execution.status = TaskStatus.FAILED
            execution.error = str(e)
        else:
            execution.exit_code = result.returncode
            execution.output = result.stdout
            execution.error = result.stderr
            execution.status = (TaskStatus.SUCCESS if result.returncode == 0
                                else TaskStatus.FAILED)
        # Set on every path so timed-out and failed launches also get a duration.
        execution.end_time = datetime.now()
        return execution

    def _execute_task(self, task_id: str) -> Optional[TaskExecution]:
        """Run one scheduled task, record it, notify, and retry on failure.

        This is the callable the scheduler invokes; it blocks until the
        command and any retries have finished.

        Returns:
            The successful retry if one succeeded, otherwise the last
            attempt made; None when *task_id* is unknown.
        """
        task = self.tasks.get(task_id)
        if not task:
            self.logger.error(f"Task not found: {task_id}")
            return None

        self.logger.info(f"Executing task: {task.name}")
        execution = self._attempt(task)

        if execution.status == TaskStatus.SUCCESS:
            self.logger.info(f"Task {task.name} completed successfully")
            if task.notify_on_success:
                self._send_notification(task, execution)
        else:
            if execution.exit_code is not None:
                self.logger.error(
                    f"Task {task.name} failed with exit code {execution.exit_code}")
            else:
                self.logger.error(f"Task {task.name} failed: {execution.error}")
            if task.notify_on_failure:
                self._send_notification(task, execution)

        # Persist the first attempt before any retries so execution rows are
        # inserted in chronological order.
        task.last_run = execution.start_time
        self._store_execution(execution)
        self._update_task(task)

        if execution.status != TaskStatus.SUCCESS and task.retry_count > 0:
            retried = self._retry_task(task, execution)
            if retried is not None:
                return retried
        return execution

    def _retry_task(self, task: ScheduledTask,
                    execution: TaskExecution) -> Optional[TaskExecution]:
        """Re-run a failed task up to ``task.retry_count`` times.

        Each retry waits ``task.retry_delay`` seconds first, uses the same
        environment/cwd/timeout as the original run, and is stored as its
        own execution record.

        Returns:
            The first successful retry, else the last failed retry
            (None if retry_count is 0).
        """
        attempts = task.retry_count
        last = None
        for attempt in range(1, attempts + 1):
            self.logger.info(f"Retrying task {task.name} (attempt {attempt}/{attempts})")
            time.sleep(task.retry_delay)
            last = self._attempt(task)
            self._store_execution(last)
            if last.status == TaskStatus.SUCCESS:
                self.logger.info(f"Task {task.name} succeeded on retry {attempt}")
                return last
        self.logger.error(f"Task {task.name} failed after {attempts} retries")
        return last

    def _update_task(self, task: ScheduledTask):
        """Persist the task's mutable state: enabled flag, run times, updated_at."""
        now = datetime.now()
        with self._connect() as conn:
            # 'enabled' must be written here too, otherwise enable_task() /
            # disable_task() are forgotten on the next restart.
            conn.execute('''
                UPDATE tasks SET
                    enabled = ?,
                    last_run = ?,
                    next_run = ?,
                    updated_at = ?
                WHERE id = ?
            ''', (
                task.enabled,
                self._iso(task.last_run),
                self._iso(task.next_run),
                now.isoformat(),
                task.id
            ))
            conn.commit()
        task.updated_at = now

    def _store_execution(self, execution: TaskExecution):
        """Insert one execution record."""
        with self._connect() as conn:
            conn.execute('''
                INSERT INTO executions
                (task_id, start_time, end_time, status, output, error, exit_code)
                VALUES (?, ?, ?, ?, ?, ?, ?)
            ''', (
                execution.task_id,
                execution.start_time.isoformat(),
                self._iso(execution.end_time),
                execution.status.value,
                execution.output,
                execution.error,
                execution.exit_code
            ))
            conn.commit()

    # ------------------------------------------------------------------ #
    # Notifications
    # ------------------------------------------------------------------ #
    def _send_notification(self, task: ScheduledTask, execution: TaskExecution):
        """E-mail a summary of *execution* when notifications are configured.

        Requires ``enable_notifications`` plus ``smtp_server``, ``email_to``
        and ``email_from``; errors are logged, never raised.
        """
        if not self.config.get('enable_notifications'):
            return
        try:
            subject = f"Task {task.name}: {execution.status.value}"
            body = "\n".join([
                f"Task: {task.name}",
                f"Status: {execution.status.value}",
                f"Start Time: {execution.start_time}",
                f"End Time: {execution.end_time}",
                f"Exit Code: {execution.exit_code}",
                "Output:",
                (execution.output or "")[:1000],
                "Error:",
                (execution.error or "")[:1000],
            ])
            if (self.config.get('smtp_server') and self.config.get('email_to')
                    and self.config.get('email_from')):
                self._send_email(subject, body)
            else:
                self.logger.debug("Notification skipped: SMTP settings incomplete")
        except Exception as e:
            self.logger.error(f"Failed to send notification: {e}")

    def _send_email(self, subject: str, body: str):
        """Send a plain-text e-mail via STARTTLS, logging in if credentials are set."""
        msg = MIMEMultipart()
        msg['From'] = self.config['email_from']
        msg['To'] = ', '.join(self.config['email_to'])
        msg['Subject'] = subject
        msg.attach(MIMEText(body, 'plain'))
        with smtplib.SMTP(self.config['smtp_server'],
                          self.config.get('smtp_port', 587)) as server:
            server.starttls()
            if self.config.get('smtp_username') and self.config.get('smtp_password'):
                server.login(self.config['smtp_username'], self.config['smtp_password'])
            server.send_message(msg)

    # ------------------------------------------------------------------ #
    # Lifecycle and management API
    # ------------------------------------------------------------------ #
    def start(self):
        """Start firing jobs and record each task's real next run time."""
        self.scheduler.start()
        # Jobs added before start() had no next run time yet.
        for task in self.tasks.values():
            job = self.scheduler.get_job(task.id)
            next_run = getattr(job, 'next_run_time', None) if job else None
            if next_run is not None:
                task.next_run = next_run
        self.logger.info("Scheduler started")

    def stop(self):
        """Shut the scheduler down; safe to call when it is not running."""
        if self.scheduler.running:
            self.scheduler.shutdown()
        self.logger.info("Scheduler stopped")

    def get_task_status(self, task_id: str) -> Optional[Dict]:
        """Return the task definition plus its 10 most recent executions, or None."""
        task = self.tasks.get(task_id)
        if not task:
            return None

        with self._connect() as conn:
            rows = conn.execute('''
                SELECT start_time, end_time, status, exit_code
                FROM executions
                WHERE task_id = ?
                ORDER BY start_time DESC
                LIMIT 10
            ''', (task_id,)).fetchall()

        return {
            'task': asdict(task),
            'executions': [
                {
                    'start_time': start_time,
                    'end_time': end_time,
                    'status': status,
                    'exit_code': exit_code
                }
                for start_time, end_time, status, exit_code in rows
            ]
        }

    def list_tasks(self) -> List[Dict]:
        """Return every known task as a dict (via dataclasses.asdict)."""
        return [asdict(task) for task in self.tasks.values()]

    def enable_task(self, task_id: str):
        """Mark a task enabled, persist the flag and schedule it."""
        task = self.tasks.get(task_id)
        if not task:
            return
        task.enabled = True
        self._update_task(task)
        self._schedule_task(task)
        self.logger.info(f"Enabled task: {task.name}")

    def disable_task(self, task_id: str):
        """Mark a task disabled, persist the flag and remove its job if present."""
        task = self.tasks.get(task_id)
        if not task:
            return
        task.enabled = False
        self._update_task(task)
        # remove_job raises if no such job exists (already disabled, or its
        # scheduling had failed), so check first.
        if self.scheduler.get_job(task_id):
            self.scheduler.remove_job(task_id)
        self.logger.info(f"Disabled task: {task.name}")

    def delete_task(self, task_id: str):
        """Remove a task's job, in-memory entry, database row and execution history."""
        if task_id not in self.tasks:
            return
        if self.scheduler.get_job(task_id):
            self.scheduler.remove_job(task_id)
        del self.tasks[task_id]
        with self._connect() as conn:
            conn.execute("DELETE FROM tasks WHERE id = ?", (task_id,))
            conn.execute("DELETE FROM executions WHERE task_id = ?", (task_id,))
            conn.commit()
        self.logger.info(f"Deleted task: {task_id}")
class CronManager:
    """
    Manage system cron jobs on Unix/Linux systems via the ``crontab`` command.

    Every modification reads the whole crontab, edits the list of lines and
    installs the result from a temporary file.
    """

    def __init__(self):
        self.logger = logging.getLogger(__name__)
        if platform.system() not in ['Linux', 'Darwin']:
            raise OSError("CronManager only works on Unix-like systems")

    @staticmethod
    def _crontab_command(args: List[str], user: Optional[str] = None) -> List[str]:
        """Build a ``crontab`` argv with ``-u user`` placed before *args*.

        The documented forms are ``crontab [-u user] file`` and
        ``crontab [-u user] -l``; BSD/macOS option parsing stops at the
        first operand, so ``-u`` must not come after the file name.
        """
        command = ['crontab']
        if user:
            command += ['-u', user]
        command += list(args)
        return command

    def _read_crontab(self, user: Optional[str] = None) -> List[str]:
        """Return the crontab lines for *user*; an uninstalled crontab gives [].

        Raises:
            RuntimeError: if ``crontab -l`` fails for any reason other than
                "no crontab for <user>".
        """
        result = subprocess.run(self._crontab_command(['-l'], user),
                                capture_output=True, text=True)
        if result.returncode == 0:
            content = result.stdout.strip()
            return content.split('\n') if content else []
        if 'no crontab' in result.stderr.lower():
            return []
        raise RuntimeError(f"crontab -l failed: {result.stderr.strip()}")

    def list_crontab(self, user: Optional[str] = None) -> List[str]:
        """List crontab entries; returns [] when none exist or they cannot be read."""
        try:
            return self._read_crontab(user)
        except Exception as e:
            self.logger.error(f"Failed to list crontab: {e}")
            return []

    def add_cron_job(self, schedule: str, command: str, user: Optional[str] = None) -> bool:
        """Append ``"<schedule> <command>"`` unless an identical line already exists.

        The current crontab must be read successfully first; otherwise
        nothing is written, so a read error can never replace the user's
        existing jobs with just the new one.
        """
        try:
            current_crontab = self._read_crontab(user)
            new_job = f"{schedule} {command}"
            if new_job in current_crontab:
                self.logger.info(f"Cron job already present: {new_job}")
                return True
            return self._write_crontab(current_crontab + [new_job], user)
        except Exception as e:
            self.logger.error(f"Failed to add cron job: {e}")
            return False

    def remove_cron_job(self, pattern: str, user: Optional[str] = None) -> bool:
        """Remove every crontab line containing *pattern* (plain substring match)."""
        try:
            current_crontab = self._read_crontab(user)
            filtered_crontab = [line for line in current_crontab if pattern not in line]
            if len(filtered_crontab) == len(current_crontab):
                return True  # nothing matched; leave the crontab untouched
            return self._write_crontab(filtered_crontab, user)
        except Exception as e:
            self.logger.error(f"Failed to remove cron job: {e}")
            return False

    def _write_crontab(self, lines: List[str], user: Optional[str] = None) -> bool:
        """Install *lines* as the complete crontab; the temp file is always removed."""
        temp_file = None
        try:
            with tempfile.NamedTemporaryFile(mode='w', suffix='.cron', delete=False) as f:
                temp_file = f.name
                f.write('\n'.join(lines))
                f.write('\n')  # cron requires a trailing newline
            result = subprocess.run(self._crontab_command([temp_file], user),
                                    capture_output=True, text=True)
            if result.returncode != 0:
                self.logger.error(f"Failed to write crontab: {result.stderr.strip()}")
            return result.returncode == 0
        except Exception as e:
            self.logger.error(f"Failed to write crontab: {e}")
            return False
        finally:
            if temp_file and os.path.exists(temp_file):
                os.unlink(temp_file)

    def validate_cron_expression(self, expression: str) -> bool:
        """Return True if croniter accepts *expression*."""
        try:
            croniter(expression)
            return True
        except Exception:
            return False

    def get_next_run(self, expression: str, n: int = 5) -> List[datetime]:
        """Return the next *n* run times after now, or [] for an invalid expression."""
        try:
            cron = croniter(expression, datetime.now())
            return [cron.get_next(datetime) for _ in range(n)]
        except Exception:
            return []
class WindowsTaskScheduler:
    """
    Manage Windows Task Scheduler tasks through the ``schtasks`` command.
    """

    # Weekday names recognised in schedule strings and their schtasks codes.
    _WEEKDAY_CODES = (
        ('monday', 'MON'), ('tuesday', 'TUE'), ('wednesday', 'WED'),
        ('thursday', 'THU'), ('friday', 'FRI'), ('saturday', 'SAT'),
        ('sunday', 'SUN'),
    )

    def __init__(self):
        self.logger = logging.getLogger(__name__)
        if platform.system() != 'Windows':
            raise OSError("WindowsTaskScheduler only works on Windows")

    def _run_schtasks(self, args: List[str]) -> subprocess.CompletedProcess:
        """Run ``schtasks`` with *args* and capture its text output.

        The argument list is executed directly (no ``shell=True``) so task
        names and commands are not re-parsed by cmd.exe, where characters
        such as ``&`` or ``|`` would be interpreted.
        """
        return subprocess.run(['schtasks', *args], capture_output=True, text=True)

    def list_tasks(self, folder: str = "\\") -> List[Dict]:
        """List tasks located in *folder* or its subfolders.

        Returns dicts with keys name/status/next_run/last_run, or [] when
        schtasks fails or its CSV cannot be parsed (column names depend on
        the system locale).
        """
        import csv
        try:
            result = self._run_schtasks(['/query', '/fo', 'csv', '/v'])
            if result.returncode != 0:
                return []

            # Compare whole path components: "\MyApp" must not match "\MyAppOld\...".
            prefix = folder.rstrip('\\') + '\\'
            tasks = []
            for row in csv.DictReader(result.stdout.splitlines()):
                task_name = row.get('TaskName') or ''
                # Rows whose TaskName is not a path under the folder (e.g. any
                # repeated header lines) are skipped.
                if not task_name.startswith(prefix):
                    continue
                tasks.append({
                    'name': task_name,
                    'status': row['Status'],
                    'next_run': row['Next Run Time'],
                    'last_run': row['Last Run Time']
                })
            return tasks
        except Exception as e:
            self.logger.error(f"Failed to list tasks: {e}")
            return []

    def create_task(self, name: str, command: str,
                    schedule: str, description: str = "") -> bool:
        """Create (or overwrite, via /f) a scheduled task.

        *schedule* is free text such as "daily 02:00" or "weekly monday 09:30";
        see _parse_schedule_type and _parse_schedule_params.
        """
        try:
            cmd = [
                '/create',
                '/tn', name,
                '/tr', command,
                '/sc', self._parse_schedule_type(schedule),
                '/f'  # Force create
            ]
            if description:
                cmd.extend(['/desc', description])
            cmd.extend(self._parse_schedule_params(schedule))

            result = self._run_schtasks(cmd)
            if result.returncode == 0:
                self.logger.info(f"Created task: {name}")
                return True
            self.logger.error(f"Failed to create task: {result.stderr}")
            return False
        except Exception as e:
            self.logger.error(f"Failed to create task: {e}")
            return False

    def delete_task(self, name: str) -> bool:
        """Delete a scheduled task without prompting."""
        try:
            result = self._run_schtasks(['/delete', '/tn', name, '/f'])
            if result.returncode == 0:
                self.logger.info(f"Deleted task: {name}")
                return True
            self.logger.error(f"Failed to delete task: {result.stderr}")
            return False
        except Exception as e:
            self.logger.error(f"Failed to delete task: {e}")
            return False

    def run_task(self, name: str) -> bool:
        """Run a task immediately."""
        try:
            result = self._run_schtasks(['/run', '/tn', name])
            if result.returncode != 0:
                self.logger.error(f"Failed to run task: {result.stderr}")
            return result.returncode == 0
        except Exception as e:
            self.logger.error(f"Failed to run task: {e}")
            return False

    def _parse_schedule_type(self, schedule: str) -> str:
        """Map a free-text schedule to a schtasks /sc value (default 'daily')."""
        schedule_lower = schedule.lower()
        for keyword in ('daily', 'weekly', 'monthly', 'once', 'onstart', 'onlogon'):
            if keyword in schedule_lower:
                return keyword
        return 'daily'  # Default

    def _parse_schedule_params(self, schedule: str) -> List[str]:
        """Extract /st (start time) and /d (weekdays) arguments from *schedule*.

        The first "H:MM"/"HH:MM" found becomes ``/st HH:MM`` (schtasks wants
        a two-digit hour). Every weekday name present becomes part of a
        comma-separated ``/d`` list, e.g. "monday, friday" -> "MON,FRI".
        """
        import re
        params = []

        time_match = re.search(r'(\d{1,2}):(\d{2})', schedule)
        if time_match:
            hour, minute = time_match.groups()
            params.extend(['/st', f"{int(hour):02d}:{minute}"])

        lowered = schedule.lower()
        days = [code for day_name, code in self._WEEKDAY_CODES if day_name in lowered]
        if days:
            params.extend(['/d', ','.join(days)])

        return params
class TaskOrchestrator:
    """
    High-level task orchestration with dependencies and workflows.

    Tasks without dependencies are registered as cron-scheduled tasks;
    dependent tasks are stored disabled and only run through
    :meth:`execute_workflow`, which runs the whole workflow in dependency
    order.
    """

    def __init__(self, scheduler: TaskScheduler):
        self.scheduler = scheduler
        self.workflows = {}  # workflow name -> {'name', 'tasks', 'created_at'}
        self.logger = logging.getLogger(__name__)

    def create_workflow(self, name: str, tasks: List[Dict]) -> bool:
        """
        Create a workflow with dependent tasks.

        tasks = [
            {'id': 'backup', 'command': '...', 'schedule': '0 2 * * *'},
            {'id': 'compress', 'command': '...', 'depends_on': ['backup']},
            {'id': 'upload', 'command': '...', 'depends_on': ['compress']}
        ]

        Each task is stored under the id ``f"{name}_{task['id']}"``. A task
        with no (or an empty) ``depends_on`` is scheduled on its ``schedule``
        cron expression (hourly by default).

        Returns:
            False (and logs why) for duplicate ids, unknown dependencies or
            dependency cycles; True otherwise.
        """
        task_ids = [t['id'] for t in tasks]
        if len(set(task_ids)) != len(task_ids):
            self.logger.error(f"Duplicate task ids in workflow: {name}")
            return False

        known_ids = set(task_ids)
        for task_def in tasks:
            for dep in task_def.get('depends_on', []):
                if dep not in known_ids:
                    self.logger.error(f"Invalid dependency: {dep}")
                    return False

        try:
            self._topological_sort(tasks)
        except ValueError as e:
            self.logger.error(f"Invalid workflow {name}: {e}")
            return False

        workflow = {
            'name': name,
            'tasks': tasks,
            'created_at': datetime.now()
        }

        for task_def in tasks:
            full_id = f"{name}_{task_def['id']}"
            if not task_def.get('depends_on'):
                # Root task - schedule normally.
                task = ScheduledTask(
                    id=full_id,
                    name=task_def['id'],
                    command=task_def['command'],
                    schedule_type=ScheduleType.CRON,
                    schedule_value=task_def.get('schedule', '0 * * * *')
                )
            else:
                # Dependent task - stored disabled; run via execute_workflow().
                task = ScheduledTask(
                    id=full_id,
                    name=task_def['id'],
                    command=task_def['command'],
                    schedule_type=ScheduleType.DATE,
                    schedule_value=datetime.now().isoformat(),
                    enabled=False
                )
            self.scheduler.add_task(task)

        self.workflows[name] = workflow
        self.logger.info(f"Created workflow: {name}")
        return True

    def execute_workflow(self, name: str):
        """Run every task of a workflow once, in dependency order."""
        workflow = self.workflows.get(name)
        if not workflow:
            self.logger.error(f"Workflow not found: {name}")
            return

        try:
            execution_order = self._topological_sort(workflow['tasks'])
        except ValueError as e:
            self.logger.error(f"Cannot execute workflow {name}: {e}")
            return

        for task_id in execution_order:
            full_task_id = f"{name}_{task_id}"
            # _execute_task runs the command synchronously (subprocess.run),
            # so the next task starts only after this one has finished; no
            # extra waiting is needed.
            outcome = self.scheduler._execute_task(full_task_id)
            # If the scheduler reports an outcome object with a status, do not
            # run dependents of a task that did not succeed.
            status = getattr(outcome, 'status', None)
            if status is not None and status != TaskStatus.SUCCESS:
                self.logger.error(
                    f"Workflow {name} stopped: task {task_id} finished with status {status.value}")
                return

    def _topological_sort(self, tasks: List[Dict]) -> List[str]:
        """Order task ids so each task comes after its dependencies (Kahn's algorithm).

        Ties keep the order of *tasks*.

        Raises:
            ValueError: if some tasks can never become ready (a dependency
                cycle or a dependency on an id that is not in *tasks*).
        """
        from collections import defaultdict, deque

        dependents = defaultdict(list)  # task id -> ids that depend on it
        in_degree = {task['id']: 0 for task in tasks}
        for task in tasks:
            for dep in task.get('depends_on', []):
                dependents[dep].append(task['id'])
                in_degree[task['id']] += 1

        ready = deque(task_id for task_id, degree in in_degree.items() if degree == 0)
        order = []
        while ready:
            current = ready.popleft()
            order.append(current)
            for dependent in dependents[current]:
                in_degree[dependent] -= 1
                if in_degree[dependent] == 0:
                    ready.append(dependent)

        if len(order) != len(in_degree):
            stuck = sorted(task_id for task_id, degree in in_degree.items() if degree > 0)
            raise ValueError(f"Unresolvable dependencies (cycle or unknown task): {stuck}")
        return order
# Example usage
def main():
    """Demonstrate the scheduler, platform-native scheduling and a workflow.

    Side effects: creates scheduler.db and scheduler.log in the working
    directory, may add an entry to the current user's crontab (Unix) or a
    Windows scheduled task, and blocks until interrupted with Ctrl+C.
    """
    # Every setting is spelled out so the demo does not depend on
    # TaskScheduler filling in missing keys from its defaults.
    config = {
        'db_path': 'scheduler.db',
        'log_file': 'scheduler.log',
        'max_workers': 10,
        'job_defaults': {
            'coalesce': False,
            'max_instances': 3,
            'misfire_grace_time': 30
        },
        'smtp_server': None,
        'smtp_port': 587,
        'email_from': None,
        'email_to': ['admin@example.com'],
        'enable_notifications': True
    }
    scheduler = TaskScheduler(config)

    # Example 1: Add a daily backup task
    backup_task = ScheduledTask(
        id="daily_backup",
        name="Daily Database Backup",
        command="pg_dump mydb > /backups/mydb_$(date +%Y%m%d).sql",
        schedule_type=ScheduleType.DAILY,
        schedule_value="02:00",
        description="Daily database backup at 2 AM",
        working_directory="/opt/scripts",
        notify_on_failure=True
    )
    scheduler.add_task(backup_task)

    # Example 2: Add an interval task (every 5 minutes)
    monitor_task = ScheduledTask(
        id="system_monitor",
        name="System Monitor",
        command="python /opt/scripts/monitor.py",
        schedule_type=ScheduleType.INTERVAL,
        schedule_value="5m",
        description="Monitor system resources every 5 minutes"
    )
    scheduler.add_task(monitor_task)

    # Example 3: Add a cron-based task
    cleanup_task = ScheduledTask(
        id="log_cleanup",
        name="Log Cleanup",
        command="find /var/log -name '*.log' -mtime +30 -delete",
        schedule_type=ScheduleType.CRON,
        schedule_value="0 3 * * 0",  # Every Sunday at 3 AM
        description="Clean old log files weekly"
    )
    scheduler.add_task(cleanup_task)

    # Example 4: Platform-specific scheduling
    if platform.system() in ['Linux', 'Darwin']:
        # Unix/Linux - Use cron
        cron_manager = CronManager()

        # Add the cron job only once; re-running the demo must not append
        # a duplicate line to the user's crontab each time.
        cron_schedule = "0 4 * * *"  # 4 AM daily
        cron_command = "/usr/local/bin/backup.sh"
        if f"{cron_schedule} {cron_command}" not in cron_manager.list_crontab():
            cron_manager.add_cron_job(cron_schedule, cron_command)

        # List current crontab
        print("\n📅 Current Crontab:")
        for job in cron_manager.list_crontab():
            if job.strip() and not job.startswith('#'):
                print(f" {job}")

        # Validate and show next runs
        expression = "*/15 * * * *"  # Every 15 minutes
        if cron_manager.validate_cron_expression(expression):
            print(f"\n⏰ Next runs for '{expression}':")
            for run_time in cron_manager.get_next_run(expression):
                print(f" {run_time}")

    elif platform.system() == 'Windows':
        # Windows - Use Task Scheduler
        win_scheduler = WindowsTaskScheduler()

        # Create a Windows task
        win_scheduler.create_task(
            name="MyApp\\DailyBackup",
            command="C:\\Scripts\\backup.bat",
            schedule="daily 02:00",
            description="Daily backup task"
        )

        # List tasks
        print("\nWindows Scheduled Tasks:")
        for task in win_scheduler.list_tasks("\\MyApp"):
            print(f" {task['name']}: {task['status']}")

    # Example 5: Create a workflow with dependencies
    orchestrator = TaskOrchestrator(scheduler)
    workflow_tasks = [
        {
            'id': 'download',
            'command': 'wget https://example.com/data.zip',
            'schedule': '0 1 * * *'
        },
        {
            'id': 'extract',
            'command': 'unzip data.zip',
            'depends_on': ['download']
        },
        {
            'id': 'process',
            'command': 'python process_data.py',
            'depends_on': ['extract']
        },
        {
            'id': 'upload',
            'command': 'aws s3 cp processed_data.csv s3://bucket/',
            'depends_on': ['process']
        }
    ]
    orchestrator.create_workflow("data_pipeline", workflow_tasks)

    # Start scheduler
    scheduler.start()
    print("\n✅ Task scheduler started")

    # List all tasks
    print("\nScheduled Tasks:")
    for task in scheduler.list_tasks():
        schedule_type = task['schedule_type']
        # asdict() keeps the ScheduleType member; show its plain value.
        schedule_label = getattr(schedule_type, 'value', schedule_type)
        print(f" {task['name']}:")
        print(f" Schedule: {schedule_label} - {task['schedule_value']}")
        print(f" Enabled: {task['enabled']}")
        print(f" Last Run: {task['last_run']}")
        print(f" Next Run: {task['next_run']}")

    # Check task status
    print("\nTask Status:")
    status = scheduler.get_task_status("daily_backup")
    if status:
        print(f" Task: {status['task']['name']}")
        print(" Recent Executions:")
        for record in status['executions'][:5]:
            print(f" {record['start_time']}: {record['status']} (exit code: {record['exit_code']})")

    # Keep running for demonstration
    try:
        print("\n⏳ Scheduler is running... Press Ctrl+C to stop")
        while True:
            time.sleep(60)
    except KeyboardInterrupt:
        print("\n\nStopping scheduler...")
        scheduler.stop()
        print("✅ Scheduler stopped")


if __name__ == "__main__":
    main()
Key Takeaways and Best Practices 🎯
- Use the Right Tool: Cron for Unix/Linux, Task Scheduler for Windows, Python schedulers for cross-platform.
- Handle Failures Gracefully: Always implement retry logic and notifications for critical tasks.
- Log Everything: Keep detailed logs of task executions for debugging and auditing.
- Test Schedules: Always test cron expressions and schedules before deploying.
- Consider Time Zones: Be explicit about time zones, especially for distributed systems.
- Monitor Task Health: Track success rates, execution times, and resource usage.
- Implement Dependencies: Use workflows for tasks that depend on each other.
Task Scheduling Best Practices
Task scheduling mastery transforms you from a manual operator to an automation architect. You can orchestrate complex workflows, ensure critical maintenance happens on schedule, and sleep soundly knowing your systems are taking care of themselves. Whether you're managing a single server or a distributed infrastructure, these scheduling skills are essential for reliable automation!
Pro Tip: Scheduled tasks are the backbone of automation - they ensure critical operations happen reliably without human intervention. Always design with failure in mind: what happens if a task fails? How do you know? Can it recover automatically? Use proper locking mechanisms to prevent task overlap, implement health checks to ensure tasks are actually running, and always have a manual override option. Remember that scheduled tasks run in the background with limited interaction capability, so make them self-sufficient with proper error handling and logging. And most importantly: test your schedules thoroughly - there's nothing worse than discovering your backup task hasn't run for months!