Skip to main content

ā° Scheduled Tasks: Automate with Cron and Task Scheduler

Time is the ultimate automation trigger. Whether it's backing up databases at 2 AM, generating reports every Monday, or cleaning temp files hourly, scheduled tasks are the heartbeat of system automation. Python gives you the power to create, manage, and orchestrate scheduled tasks across any operating system. Let's master the art of temporal automation! šŸ•

The Scheduling Ecosystem

Think of scheduled tasks as your digital workforce that never sleeps. Like a well-orchestrated factory where machines start and stop at precise times, your scheduled tasks ensure critical operations happen exactly when needed - whether you're awake or asleep, at work or on vacation!

graph TB A[Task Scheduling] --> B[Unix/Linux] A --> C[Windows] A --> D[Cross-Platform] B --> E[Cron] B --> F[Systemd Timers] B --> G[At Command] C --> H[Task Scheduler] C --> I[Scheduled Tasks API] C --> J[PowerShell Jobs] D --> K[Python Schedulers] D --> L[APScheduler] D --> M[Schedule Library] E --> N[Crontab Management] E --> O[Cron Expressions] E --> P[User/System Crons] H --> Q[Task Creation] H --> R[Triggers & Actions] H --> S[Task Management] K --> T[In-Process Scheduling] K --> U[Persistent Scheduling] K --> V[Distributed Scheduling] style A fill:#ff6b6b style E fill:#51cf66 style H fill:#339af0 style K fill:#ffd43b

Real-World Scenario: The Automation Orchestra šŸŽ­

You're managing a complex infrastructure where dozens of tasks need perfect timing: database backups at night, report generation before business hours, log rotation every hour, security scans on weekends, and monitoring checks every minute. Let's build a comprehensive scheduling system that conducts this orchestra flawlessly!

import os
import sys
import subprocess
import platform
import schedule
import time
import json
import sqlite3
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Callable, Any, Tuple
from dataclasses import dataclass, field, asdict
from enum import Enum
from pathlib import Path
import logging
import threading
import queue
from croniter import croniter
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.cron import CronTrigger
from apscheduler.triggers.interval import IntervalTrigger
from apscheduler.triggers.date import DateTrigger
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor
import tempfile
import shutil
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart

class ScheduleType(Enum):
    """Types of schedules."""
    CRON = "cron"
    INTERVAL = "interval"
    DATE = "date"
    DAILY = "daily"
    WEEKLY = "weekly"
    MONTHLY = "monthly"

class TaskStatus(Enum):
    """Task execution status."""
    PENDING = "pending"
    RUNNING = "running"
    SUCCESS = "success"
    FAILED = "failed"
    MISSED = "missed"
    DISABLED = "disabled"

@dataclass
class ScheduledTask:
    """Scheduled task definition."""
    id: str
    name: str
    command: str
    schedule_type: ScheduleType
    schedule_value: str
    enabled: bool = True
    description: str = ""
    working_directory: str = None
    environment: Dict[str, str] = field(default_factory=dict)
    timeout: int = 3600  # seconds
    retry_count: int = 0
    retry_delay: int = 60  # seconds
    notify_on_failure: bool = True
    notify_on_success: bool = False
    last_run: datetime = None
    next_run: datetime = None
    created_at: datetime = field(default_factory=datetime.now)
    updated_at: datetime = field(default_factory=datetime.now)

@dataclass
class TaskExecution:
    """Task execution record."""
    task_id: str
    start_time: datetime
    end_time: datetime = None
    status: TaskStatus = TaskStatus.PENDING
    output: str = ""
    error: str = ""
    exit_code: int = None

class TaskScheduler:
    """
    Comprehensive task scheduling system for cross-platform automation.
    """
    
    def __init__(self, config: Dict = None):
        self.config = config or self.get_default_config()
        self.tasks = {}
        self.executions = []
        self.scheduler = None
        self.setup_logging()
        self.setup_database()
        self.setup_scheduler()
        self.load_tasks()
        
    def get_default_config(self) -> Dict:
        """Get default configuration."""
        return {
            'db_path': 'scheduler.db',
            'log_file': 'scheduler.log',
            'max_workers': 10,
            'job_defaults': {
                'coalesce': False,
                'max_instances': 3,
                'misfire_grace_time': 30
            },
            'smtp_server': None,
            'smtp_port': 587,
            'email_from': None,
            'email_to': [],
            'enable_notifications': False
        }
    
    def setup_logging(self):
        """Setup logging configuration."""
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
            handlers=[
                logging.FileHandler(self.config['log_file']),
                logging.StreamHandler()
            ]
        )
        self.logger = logging.getLogger(__name__)
    
    def setup_database(self):
        """Setup SQLite database for task storage."""
        conn = sqlite3.connect(self.config['db_path'])
        cursor = conn.cursor()
        
        # Create tasks table
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS tasks (
                id TEXT PRIMARY KEY,
                name TEXT NOT NULL,
                command TEXT NOT NULL,
                schedule_type TEXT,
                schedule_value TEXT,
                enabled BOOLEAN DEFAULT 1,
                description TEXT,
                working_directory TEXT,
                environment TEXT,
                timeout INTEGER DEFAULT 3600,
                retry_count INTEGER DEFAULT 0,
                retry_delay INTEGER DEFAULT 60,
                notify_on_failure BOOLEAN DEFAULT 1,
                notify_on_success BOOLEAN DEFAULT 0,
                last_run DATETIME,
                next_run DATETIME,
                created_at DATETIME,
                updated_at DATETIME
            )
        ''')
        
        # Create executions table
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS executions (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                task_id TEXT,
                start_time DATETIME,
                end_time DATETIME,
                status TEXT,
                output TEXT,
                error TEXT,
                exit_code INTEGER,
                FOREIGN KEY (task_id) REFERENCES tasks (id),
                INDEX idx_task_id (task_id),
                INDEX idx_start_time (start_time)
            )
        ''')
        
        conn.commit()
        conn.close()
    
    def setup_scheduler(self):
        """Setup APScheduler."""
        jobstores = {
            'default': SQLAlchemyJobStore(
                url=f'sqlite:///{self.config["db_path"]}'
            )
        }
        
        executors = {
            'default': ThreadPoolExecutor(self.config['max_workers']),
            'processpool': ProcessPoolExecutor(5)
        }
        
        job_defaults = self.config['job_defaults']
        
        self.scheduler = BackgroundScheduler(
            jobstores=jobstores,
            executors=executors,
            job_defaults=job_defaults,
            timezone='UTC'
        )
        
        # Add listener for job events
        self.scheduler.add_listener(
            self._job_listener,
            mask=(
                self.scheduler.EVENT_JOB_EXECUTED |
                self.scheduler.EVENT_JOB_ERROR |
                self.scheduler.EVENT_JOB_MISSED
            )
        )
    
    def _job_listener(self, event):
        """Handle scheduler events."""
        if event.exception:
            self.logger.error(f"Job {event.job_id} crashed: {event.exception}")
        else:
            self.logger.info(f"Job {event.job_id} executed successfully")
    
    def load_tasks(self):
        """Load tasks from database."""
        conn = sqlite3.connect(self.config['db_path'])
        cursor = conn.cursor()
        
        cursor.execute("SELECT * FROM tasks WHERE enabled = 1")
        rows = cursor.fetchall()
        
        for row in rows:
            task = ScheduledTask(
                id=row[0],
                name=row[1],
                command=row[2],
                schedule_type=ScheduleType(row[3]),
                schedule_value=row[4],
                enabled=bool(row[5]),
                description=row[6],
                working_directory=row[7],
                environment=json.loads(row[8]) if row[8] else {},
                timeout=row[9],
                retry_count=row[10],
                retry_delay=row[11],
                notify_on_failure=bool(row[12]),
                notify_on_success=bool(row[13]),
                last_run=datetime.fromisoformat(row[14]) if row[14] else None,
                next_run=datetime.fromisoformat(row[15]) if row[15] else None,
                created_at=datetime.fromisoformat(row[16]) if row[16] else datetime.now(),
                updated_at=datetime.fromisoformat(row[17]) if row[17] else datetime.now()
            )
            
            self.tasks[task.id] = task
            
            # Schedule the task
            if task.enabled:
                self._schedule_task(task)
        
        conn.close()
        
        self.logger.info(f"Loaded {len(self.tasks)} tasks")
    
    def add_task(self, task: ScheduledTask) -> bool:
        """Add a new scheduled task."""
        try:
            # Store in memory
            self.tasks[task.id] = task
            
            # Store in database
            conn = sqlite3.connect(self.config['db_path'])
            cursor = conn.cursor()
            
            cursor.execute('''
                INSERT OR REPLACE INTO tasks VALUES 
                (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            ''', (
                task.id,
                task.name,
                task.command,
                task.schedule_type.value,
                task.schedule_value,
                task.enabled,
                task.description,
                task.working_directory,
                json.dumps(task.environment),
                task.timeout,
                task.retry_count,
                task.retry_delay,
                task.notify_on_failure,
                task.notify_on_success,
                task.last_run.isoformat() if task.last_run else None,
                task.next_run.isoformat() if task.next_run else None,
                task.created_at.isoformat(),
                task.updated_at.isoformat()
            ))
            
            conn.commit()
            conn.close()
            
            # Schedule if enabled
            if task.enabled:
                self._schedule_task(task)
            
            self.logger.info(f"Added task: {task.name}")
            return True
            
        except Exception as e:
            self.logger.error(f"Failed to add task: {e}")
            return False
    
    def _schedule_task(self, task: ScheduledTask):
        """Schedule a task with APScheduler."""
        try:
            # Create trigger based on schedule type
            if task.schedule_type == ScheduleType.CRON:
                trigger = CronTrigger.from_crontab(task.schedule_value)
            
            elif task.schedule_type == ScheduleType.INTERVAL:
                # Parse interval (e.g., "5m", "1h", "30s")
                interval = self._parse_interval(task.schedule_value)
                trigger = IntervalTrigger(seconds=interval)
            
            elif task.schedule_type == ScheduleType.DATE:
                # One-time execution
                trigger = DateTrigger(run_date=datetime.fromisoformat(task.schedule_value))
            
            elif task.schedule_type == ScheduleType.DAILY:
                # Daily at specified time
                hour, minute = map(int, task.schedule_value.split(':'))
                trigger = CronTrigger(hour=hour, minute=minute)
            
            elif task.schedule_type == ScheduleType.WEEKLY:
                # Weekly on specified day and time
                day, time_str = task.schedule_value.split(' ')
                hour, minute = map(int, time_str.split(':'))
                trigger = CronTrigger(day_of_week=day.lower(), hour=hour, minute=minute)
            
            elif task.schedule_type == ScheduleType.MONTHLY:
                # Monthly on specified day and time
                day, time_str = task.schedule_value.split(' ')
                hour, minute = map(int, time_str.split(':'))
                trigger = CronTrigger(day=int(day), hour=hour, minute=minute)
            
            else:
                raise ValueError(f"Unknown schedule type: {task.schedule_type}")
            
            # Add job to scheduler
            self.scheduler.add_job(
                func=self._execute_task,
                trigger=trigger,
                args=[task.id],
                id=task.id,
                name=task.name,
                replace_existing=True
            )
            
            # Update next run time
            job = self.scheduler.get_job(task.id)
            if job:
                task.next_run = job.next_run_time
            
            self.logger.info(f"Scheduled task: {task.name}")
            
        except Exception as e:
            self.logger.error(f"Failed to schedule task {task.name}: {e}")
    
    def _parse_interval(self, interval_str: str) -> int:
        """Parse interval string to seconds."""
        units = {
            's': 1,
            'm': 60,
            'h': 3600,
            'd': 86400,
            'w': 604800
        }
        
        # Extract number and unit
        import re
        match = re.match(r'^(\d+)([smhdw])$', interval_str.lower())
        if not match:
            raise ValueError(f"Invalid interval format: {interval_str}")
        
        value = int(match.group(1))
        unit = match.group(2)
        
        return value * units[unit]
    
    def _execute_task(self, task_id: str):
        """Execute a scheduled task."""
        task = self.tasks.get(task_id)
        if not task:
            self.logger.error(f"Task not found: {task_id}")
            return
        
        execution = TaskExecution(
            task_id=task_id,
            start_time=datetime.now()
        )
        
        self.logger.info(f"Executing task: {task.name}")
        
        try:
            # Prepare environment
            env = os.environ.copy()
            if task.environment:
                env.update(task.environment)
            
            # Execute command
            result = subprocess.run(
                task.command,
                shell=True,
                capture_output=True,
                text=True,
                timeout=task.timeout,
                cwd=task.working_directory,
                env=env
            )
            
            execution.end_time = datetime.now()
            execution.exit_code = result.returncode
            execution.output = result.stdout
            execution.error = result.stderr
            
            if result.returncode == 0:
                execution.status = TaskStatus.SUCCESS
                self.logger.info(f"Task {task.name} completed successfully")
                
                if task.notify_on_success:
                    self._send_notification(task, execution)
            else:
                execution.status = TaskStatus.FAILED
                self.logger.error(f"Task {task.name} failed with exit code {result.returncode}")
                
                if task.notify_on_failure:
                    self._send_notification(task, execution)
                
                # Retry if configured
                if task.retry_count > 0:
                    self._retry_task(task, execution)
            
            # Update task last run
            task.last_run = execution.start_time
            self._update_task(task)
            
        except subprocess.TimeoutExpired:
            execution.status = TaskStatus.FAILED
            execution.error = f"Task timed out after {task.timeout} seconds"
            self.logger.error(f"Task {task.name} timed out")
            
            if task.notify_on_failure:
                self._send_notification(task, execution)
            
        except Exception as e:
            execution.status = TaskStatus.FAILED
            execution.error = str(e)
            self.logger.error(f"Task {task.name} failed: {e}")
            
            if task.notify_on_failure:
                self._send_notification(task, execution)
        
        finally:
            # Store execution record
            self._store_execution(execution)
    
    def _retry_task(self, task: ScheduledTask, execution: TaskExecution):
        """Retry a failed task."""
        retry_count = task.retry_count
        
        for i in range(retry_count):
            self.logger.info(f"Retrying task {task.name} (attempt {i+1}/{retry_count})")
            
            # Wait before retry
            time.sleep(task.retry_delay)
            
            # Try again
            retry_execution = TaskExecution(
                task_id=task.id,
                start_time=datetime.now()
            )
            
            try:
                result = subprocess.run(
                    task.command,
                    shell=True,
                    capture_output=True,
                    text=True,
                    timeout=task.timeout,
                    cwd=task.working_directory
                )
                
                retry_execution.end_time = datetime.now()
                retry_execution.exit_code = result.returncode
                retry_execution.output = result.stdout
                retry_execution.error = result.stderr
                
                if result.returncode == 0:
                    retry_execution.status = TaskStatus.SUCCESS
                    self.logger.info(f"Task {task.name} succeeded on retry {i+1}")
                    self._store_execution(retry_execution)
                    return  # Success, no more retries needed
                else:
                    retry_execution.status = TaskStatus.FAILED
                    
            except Exception as e:
                retry_execution.status = TaskStatus.FAILED
                retry_execution.error = str(e)
            
            self._store_execution(retry_execution)
        
        self.logger.error(f"Task {task.name} failed after {retry_count} retries")
    
    def _update_task(self, task: ScheduledTask):
        """Update task in database."""
        conn = sqlite3.connect(self.config['db_path'])
        cursor = conn.cursor()
        
        cursor.execute('''
            UPDATE tasks SET 
                last_run = ?,
                next_run = ?,
                updated_at = ?
            WHERE id = ?
        ''', (
            task.last_run.isoformat() if task.last_run else None,
            task.next_run.isoformat() if task.next_run else None,
            datetime.now().isoformat(),
            task.id
        ))
        
        conn.commit()
        conn.close()
    
    def _store_execution(self, execution: TaskExecution):
        """Store execution record in database."""
        conn = sqlite3.connect(self.config['db_path'])
        cursor = conn.cursor()
        
        cursor.execute('''
            INSERT INTO executions 
            (task_id, start_time, end_time, status, output, error, exit_code)
            VALUES (?, ?, ?, ?, ?, ?, ?)
        ''', (
            execution.task_id,
            execution.start_time.isoformat(),
            execution.end_time.isoformat() if execution.end_time else None,
            execution.status.value,
            execution.output,
            execution.error,
            execution.exit_code
        ))
        
        conn.commit()
        conn.close()
    
    def _send_notification(self, task: ScheduledTask, execution: TaskExecution):
        """Send notification about task execution."""
        if not self.config['enable_notifications']:
            return
        
        try:
            subject = f"Task {task.name}: {execution.status.value}"
            
            body = f"""
Task: {task.name}
Status: {execution.status.value}
Start Time: {execution.start_time}
End Time: {execution.end_time}
Exit Code: {execution.exit_code}

Output:
{execution.output[:1000]}

Error:
{execution.error[:1000]}
            """
            
            # Send email
            if self.config['smtp_server'] and self.config['email_to']:
                self._send_email(subject, body)
            
        except Exception as e:
            self.logger.error(f"Failed to send notification: {e}")
    
    def _send_email(self, subject: str, body: str):
        """Send email notification."""
        msg = MIMEMultipart()
        msg['From'] = self.config['email_from']
        msg['To'] = ', '.join(self.config['email_to'])
        msg['Subject'] = subject
        
        msg.attach(MIMEText(body, 'plain'))
        
        with smtplib.SMTP(self.config['smtp_server'], self.config['smtp_port']) as server:
            server.starttls()
            if self.config.get('smtp_username') and self.config.get('smtp_password'):
                server.login(self.config['smtp_username'], self.config['smtp_password'])
            
            server.send_message(msg)
    
    def start(self):
        """Start the scheduler."""
        self.scheduler.start()
        self.logger.info("Scheduler started")
    
    def stop(self):
        """Stop the scheduler."""
        self.scheduler.shutdown()
        self.logger.info("Scheduler stopped")
    
    def get_task_status(self, task_id: str) -> Dict:
        """Get task status and history."""
        task = self.tasks.get(task_id)
        if not task:
            return None
        
        # Get recent executions
        conn = sqlite3.connect(self.config['db_path'])
        cursor = conn.cursor()
        
        cursor.execute('''
            SELECT * FROM executions 
            WHERE task_id = ?
            ORDER BY start_time DESC
            LIMIT 10
        ''', (task_id,))
        
        executions = cursor.fetchall()
        conn.close()
        
        return {
            'task': asdict(task),
            'executions': [
                {
                    'start_time': e[2],
                    'end_time': e[3],
                    'status': e[4],
                    'exit_code': e[7]
                }
                for e in executions
            ]
        }
    
    def list_tasks(self) -> List[Dict]:
        """List all scheduled tasks."""
        return [asdict(task) for task in self.tasks.values()]
    
    def enable_task(self, task_id: str):
        """Enable a task."""
        task = self.tasks.get(task_id)
        if task:
            task.enabled = True
            self._update_task(task)
            self._schedule_task(task)
            self.logger.info(f"Enabled task: {task.name}")
    
    def disable_task(self, task_id: str):
        """Disable a task."""
        task = self.tasks.get(task_id)
        if task:
            task.enabled = False
            self._update_task(task)
            self.scheduler.remove_job(task_id)
            self.logger.info(f"Disabled task: {task.name}")
    
    def delete_task(self, task_id: str):
        """Delete a task."""
        if task_id in self.tasks:
            # Remove from scheduler
            if self.scheduler.get_job(task_id):
                self.scheduler.remove_job(task_id)
            
            # Remove from memory
            del self.tasks[task_id]
            
            # Remove from database
            conn = sqlite3.connect(self.config['db_path'])
            cursor = conn.cursor()
            
            cursor.execute("DELETE FROM tasks WHERE id = ?", (task_id,))
            cursor.execute("DELETE FROM executions WHERE task_id = ?", (task_id,))
            
            conn.commit()
            conn.close()
            
            self.logger.info(f"Deleted task: {task_id}")

class CronManager:
    """
    Manage system cron jobs on Unix/Linux systems.
    """
    
    def __init__(self):
        self.logger = logging.getLogger(__name__)
        
        if platform.system() not in ['Linux', 'Darwin']:
            raise OSError("CronManager only works on Unix-like systems")
    
    def list_crontab(self, user: str = None) -> List[str]:
        """List crontab entries."""
        cmd = ['crontab', '-l']
        if user:
            cmd.extend(['-u', user])
        
        try:
            result = subprocess.run(cmd, capture_output=True, text=True)
            
            if result.returncode == 0:
                return result.stdout.strip().split('\n')
            else:
                return []
                
        except Exception as e:
            self.logger.error(f"Failed to list crontab: {e}")
            return []
    
    def add_cron_job(self, schedule: str, command: str, user: str = None) -> bool:
        """Add a cron job."""
        try:
            # Get current crontab
            current_crontab = self.list_crontab(user)
            
            # Add new job
            new_job = f"{schedule} {command}"
            current_crontab.append(new_job)
            
            # Write back
            return self._write_crontab(current_crontab, user)
            
        except Exception as e:
            self.logger.error(f"Failed to add cron job: {e}")
            return False
    
    def remove_cron_job(self, pattern: str, user: str = None) -> bool:
        """Remove cron jobs matching pattern."""
        try:
            # Get current crontab
            current_crontab = self.list_crontab(user)
            
            # Filter out matching jobs
            filtered_crontab = [
                line for line in current_crontab
                if pattern not in line
            ]
            
            # Write back
            return self._write_crontab(filtered_crontab, user)
            
        except Exception as e:
            self.logger.error(f"Failed to remove cron job: {e}")
            return False
    
    def _write_crontab(self, lines: List[str], user: str = None) -> bool:
        """Write crontab entries."""
        try:
            # Create temporary file
            with tempfile.NamedTemporaryFile(mode='w', suffix='.cron', delete=False) as f:
                f.write('\n'.join(lines))
                f.write('\n')
                temp_file = f.name
            
            # Install new crontab
            cmd = ['crontab', temp_file]
            if user:
                cmd.extend(['-u', user])
            
            result = subprocess.run(cmd, capture_output=True, text=True)
            
            # Clean up
            os.unlink(temp_file)
            
            return result.returncode == 0
            
        except Exception as e:
            self.logger.error(f"Failed to write crontab: {e}")
            return False
    
    def validate_cron_expression(self, expression: str) -> bool:
        """Validate cron expression."""
        try:
            croniter(expression)
            return True
        except:
            return False
    
    def get_next_run(self, expression: str, n: int = 5) -> List[datetime]:
        """Get next N run times for a cron expression."""
        try:
            cron = croniter(expression, datetime.now())
            return [cron.get_next(datetime) for _ in range(n)]
        except:
            return []

class WindowsTaskScheduler:
    """
    Manage Windows Task Scheduler tasks.
    """
    
    def __init__(self):
        self.logger = logging.getLogger(__name__)
        
        if platform.system() != 'Windows':
            raise OSError("WindowsTaskScheduler only works on Windows")
    
    def list_tasks(self, folder: str = "\\") -> List[Dict]:
        """List scheduled tasks."""
        try:
            # Use schtasks command
            result = subprocess.run(
                ['schtasks', '/query', '/fo', 'csv', '/v'],
                capture_output=True,
                text=True,
                shell=True
            )
            
            if result.returncode != 0:
                return []
            
            # Parse CSV output
            import csv
            reader = csv.DictReader(result.stdout.splitlines())
            
            tasks = []
            for row in reader:
                if row['TaskName'].startswith(folder):
                    tasks.append({
                        'name': row['TaskName'],
                        'status': row['Status'],
                        'next_run': row['Next Run Time'],
                        'last_run': row['Last Run Time']
                    })
            
            return tasks
            
        except Exception as e:
            self.logger.error(f"Failed to list tasks: {e}")
            return []
    
    def create_task(self, name: str, command: str, 
                   schedule: str, description: str = "") -> bool:
        """Create a scheduled task."""
        try:
            # Build schtasks command
            cmd = [
                'schtasks', '/create',
                '/tn', name,
                '/tr', command,
                '/sc', self._parse_schedule_type(schedule),
                '/f'  # Force create
            ]
            
            if description:
                cmd.extend(['/desc', description])
            
            # Add schedule-specific parameters
            schedule_params = self._parse_schedule_params(schedule)
            cmd.extend(schedule_params)
            
            result = subprocess.run(cmd, capture_output=True, text=True, shell=True)
            
            if result.returncode == 0:
                self.logger.info(f"Created task: {name}")
                return True
            else:
                self.logger.error(f"Failed to create task: {result.stderr}")
                return False
                
        except Exception as e:
            self.logger.error(f"Failed to create task: {e}")
            return False
    
    def delete_task(self, name: str) -> bool:
        """Delete a scheduled task."""
        try:
            result = subprocess.run(
                ['schtasks', '/delete', '/tn', name, '/f'],
                capture_output=True,
                text=True,
                shell=True
            )
            
            if result.returncode == 0:
                self.logger.info(f"Deleted task: {name}")
                return True
            else:
                self.logger.error(f"Failed to delete task: {result.stderr}")
                return False
                
        except Exception as e:
            self.logger.error(f"Failed to delete task: {e}")
            return False
    
    def run_task(self, name: str) -> bool:
        """Run a task immediately."""
        try:
            result = subprocess.run(
                ['schtasks', '/run', '/tn', name],
                capture_output=True,
                text=True,
                shell=True
            )
            
            return result.returncode == 0
            
        except Exception as e:
            self.logger.error(f"Failed to run task: {e}")
            return False
    
    def _parse_schedule_type(self, schedule: str) -> str:
        """Parse schedule type for schtasks."""
        schedule_lower = schedule.lower()
        
        if 'daily' in schedule_lower:
            return 'daily'
        elif 'weekly' in schedule_lower:
            return 'weekly'
        elif 'monthly' in schedule_lower:
            return 'monthly'
        elif 'once' in schedule_lower:
            return 'once'
        elif 'onstart' in schedule_lower:
            return 'onstart'
        elif 'onlogon' in schedule_lower:
            return 'onlogon'
        else:
            return 'daily'  # Default
    
    def _parse_schedule_params(self, schedule: str) -> List[str]:
        """Parse schedule parameters for schtasks."""
        params = []
        
        # Extract time if present
        import re
        time_match = re.search(r'(\d{1,2}):(\d{2})', schedule)
        if time_match:
            params.extend(['/st', time_match.group(0)])
        
        # Extract day for weekly/monthly
        if 'monday' in schedule.lower():
            params.extend(['/d', 'MON'])
        elif 'tuesday' in schedule.lower():
            params.extend(['/d', 'TUE'])
        # ... etc
        
        return params

class TaskOrchestrator:
    """
    High-level task orchestration with dependencies and workflows.
    """
    
    def __init__(self, scheduler: TaskScheduler):
        self.scheduler = scheduler
        self.workflows = {}
        self.logger = logging.getLogger(__name__)
    
    def create_workflow(self, name: str, tasks: List[Dict]) -> bool:
        """
        Create a workflow with dependent tasks.
        
        tasks = [
            {'id': 'backup', 'command': '...', 'schedule': '0 2 * * *'},
            {'id': 'compress', 'command': '...', 'depends_on': ['backup']},
            {'id': 'upload', 'command': '...', 'depends_on': ['compress']}
        ]
        """
        workflow = {
            'name': name,
            'tasks': tasks,
            'created_at': datetime.now()
        }
        
        # Validate dependencies
        task_ids = {t['id'] for t in tasks}
        for task in tasks:
            deps = task.get('depends_on', [])
            for dep in deps:
                if dep not in task_ids:
                    self.logger.error(f"Invalid dependency: {dep}")
                    return False
        
        # Create tasks
        for task_def in tasks:
            if 'depends_on' not in task_def:
                # Root task - schedule normally
                task = ScheduledTask(
                    id=f"{name}_{task_def['id']}",
                    name=task_def['id'],
                    command=task_def['command'],
                    schedule_type=ScheduleType.CRON,
                    schedule_value=task_def.get('schedule', '0 * * * *')
                )
                self.scheduler.add_task(task)
            else:
                # Dependent task - will be triggered by parent completion
                task = ScheduledTask(
                    id=f"{name}_{task_def['id']}",
                    name=task_def['id'],
                    command=task_def['command'],
                    schedule_type=ScheduleType.DATE,
                    schedule_value=datetime.now().isoformat(),
                    enabled=False  # Will be triggered by dependencies
                )
                self.scheduler.add_task(task)
        
        self.workflows[name] = workflow
        self.logger.info(f"Created workflow: {name}")
        return True
    
    def execute_workflow(self, name: str):
        """Execute a complete workflow."""
        workflow = self.workflows.get(name)
        if not workflow:
            self.logger.error(f"Workflow not found: {name}")
            return
        
        # Build execution order
        execution_order = self._topological_sort(workflow['tasks'])
        
        # Execute tasks in order
        for task_id in execution_order:
            full_task_id = f"{name}_{task_id}"
            self.scheduler._execute_task(full_task_id)
            
            # Wait for completion
            # In production, use proper synchronization
            time.sleep(1)
    
    def _topological_sort(self, tasks: List[Dict]) -> List[str]:
        """Sort tasks by dependencies."""
        # Build dependency graph
        graph = defaultdict(list)
        in_degree = defaultdict(int)
        
        for task in tasks:
            task_id = task['id']
            deps = task.get('depends_on', [])
            
            for dep in deps:
                graph[dep].append(task_id)
                in_degree[task_id] += 1
        
        # Find all tasks with no dependencies
        queue = []
        for task in tasks:
            if in_degree[task['id']] == 0:
                queue.append(task['id'])
        
        # Process queue
        result = []
        while queue:
            task_id = queue.pop(0)
            result.append(task_id)
            
            # Remove this task from dependencies
            for dependent in graph[task_id]:
                in_degree[dependent] -= 1
                if in_degree[dependent] == 0:
                    queue.append(dependent)
        
        return result

# Example usage
if __name__ == "__main__":
    # Initialize scheduler
    config = {
        'db_path': 'scheduler.db',
        'log_file': 'scheduler.log',
        'enable_notifications': True,
        'email_to': ['admin@example.com']
    }
    
    scheduler = TaskScheduler(config)
    
    # Example 1: Add a daily backup task
    backup_task = ScheduledTask(
        id="daily_backup",
        name="Daily Database Backup",
        command="pg_dump mydb > /backups/mydb_$(date +%Y%m%d).sql",
        schedule_type=ScheduleType.DAILY,
        schedule_value="02:00",
        description="Daily database backup at 2 AM",
        working_directory="/opt/scripts",
        notify_on_failure=True
    )
    scheduler.add_task(backup_task)
    
    # Example 2: Add an interval task (every 5 minutes)
    monitor_task = ScheduledTask(
        id="system_monitor",
        name="System Monitor",
        command="python /opt/scripts/monitor.py",
        schedule_type=ScheduleType.INTERVAL,
        schedule_value="5m",
        description="Monitor system resources every 5 minutes"
    )
    scheduler.add_task(monitor_task)
    
    # Example 3: Add a cron-based task
    cleanup_task = ScheduledTask(
        id="log_cleanup",
        name="Log Cleanup",
        command="find /var/log -name '*.log' -mtime +30 -delete",
        schedule_type=ScheduleType.CRON,
        schedule_value="0 3 * * 0",  # Every Sunday at 3 AM
        description="Clean old log files weekly"
    )
    scheduler.add_task(cleanup_task)
    
    # Example 4: Platform-specific scheduling
    if platform.system() in ['Linux', 'Darwin']:
        # Unix/Linux - Use cron
        cron_manager = CronManager()
        
        # Add a cron job
        cron_manager.add_cron_job(
            "0 4 * * *",  # 4 AM daily
            "/usr/local/bin/backup.sh"
        )
        
        # List current crontab
        print("\nšŸ“… Current Crontab:")
        for job in cron_manager.list_crontab():
            if job.strip() and not job.startswith('#'):
                print(f"  {job}")
        
        # Validate and show next runs
        expression = "*/15 * * * *"  # Every 15 minutes
        if cron_manager.validate_cron_expression(expression):
            print(f"\nā° Next runs for '{expression}':")
            for run_time in cron_manager.get_next_run(expression):
                print(f"  {run_time}")
    
    elif platform.system() == 'Windows':
        # Windows - Use Task Scheduler
        win_scheduler = WindowsTaskScheduler()
        
        # Create a Windows task
        win_scheduler.create_task(
            name="MyApp\\DailyBackup",
            command="C:\\Scripts\\backup.bat",
            schedule="daily 02:00",
            description="Daily backup task"
        )
        
        # List tasks
        print("\nšŸ“‹ Windows Scheduled Tasks:")
        for task in win_scheduler.list_tasks("\\MyApp"):
            print(f"  {task['name']}: {task['status']}")
    
    # Example 5: Create a workflow with dependencies
    orchestrator = TaskOrchestrator(scheduler)
    
    workflow_tasks = [
        {
            'id': 'download',
            'command': 'wget https://example.com/data.zip',
            'schedule': '0 1 * * *'
        },
        {
            'id': 'extract',
            'command': 'unzip data.zip',
            'depends_on': ['download']
        },
        {
            'id': 'process',
            'command': 'python process_data.py',
            'depends_on': ['extract']
        },
        {
            'id': 'upload',
            'command': 'aws s3 cp processed_data.csv s3://bucket/',
            'depends_on': ['process']
        }
    ]
    
    orchestrator.create_workflow("data_pipeline", workflow_tasks)
    
    # Start scheduler
    scheduler.start()
    print("\nāœ… Task scheduler started")
    
    # List all tasks
    print("\nšŸ“‹ Scheduled Tasks:")
    for task in scheduler.list_tasks():
        print(f"  {task['name']}:")
        print(f"    Schedule: {task['schedule_type']} - {task['schedule_value']}")
        print(f"    Enabled: {task['enabled']}")
        print(f"    Last Run: {task['last_run']}")
        print(f"    Next Run: {task['next_run']}")
    
    # Check task status
    print("\nšŸ“Š Task Status:")
    status = scheduler.get_task_status("daily_backup")
    if status:
        print(f"  Task: {status['task']['name']}")
        print(f"  Recent Executions:")
        for exec in status['executions'][:5]:
            print(f"    {exec['start_time']}: {exec['status']} (exit code: {exec['exit_code']})")
    
    # Keep running for demonstration
    try:
        print("\nā³ Scheduler is running... Press Ctrl+C to stop")
        while True:
            time.sleep(60)
    except KeyboardInterrupt:
        print("\n\nStopping scheduler...")
        scheduler.stop()
        print("āœ… Scheduler stopped")

Key Takeaways and Best Practices šŸŽÆ

Task Scheduling Best Practices šŸ“‹

Pro Tip: Scheduled tasks are the backbone of automation - they ensure critical operations happen reliably without human intervention. Always design with failure in mind: what happens if a task fails? How do you know? Can it recover automatically? Use proper locking mechanisms to prevent task overlap, implement health checks to ensure tasks are actually running, and always have a manual override option. Remember that scheduled tasks run in the background with limited interaction capability, so make them self-sufficient with proper error handling and logging. And most importantly: test your schedules thoroughly - there's nothing worse than discovering your backup task hasn't run for months!

Task scheduling mastery transforms you from a manual operator to an automation architect. You can orchestrate complex workflows, ensure critical maintenance happens on schedule, and sleep soundly knowing your systems are taking care of themselves. Whether you're managing a single server or a distributed infrastructure, these scheduling skills are essential for reliable automation! šŸš€