Skip to main content

💾 Backup Automation: Never Lose Important Files Again

Imagine this: Your hard drive crashes, your laptop gets stolen, or you accidentally delete that crucial project you've been working on for months. Without backups, it's gone forever. But with an automated backup system, you can recover from all of these. Think of backups as time travel for your files – you can always go back! ⏰

The 3-2-1 Backup Philosophy

The golden rule of backups: Keep 3 copies of important data, on 2 different media types, with 1 copy off-site. It's like having a safety net under your safety net, with another safety net in a different building! Let's build systems that implement this automatically.

graph TB A[Original Files] --> B[Local Backup] A --> C[Network Backup] A --> D[Cloud Backup] B --> E[Version Control] B --> F[Incremental Saves] C --> G[NAS/Server] C --> H[Remote Location] D --> I[Cloud Storage] D --> J[Multiple Providers] E --> K[Restoration Point] F --> K G --> K H --> K I --> K J --> K style A fill:#ff6b6b style K fill:#51cf66

Real-World Scenario: The Complete Disaster Recovery System 🛡️

You're a freelance designer with years of client work, a developer with countless projects, or a photographer with irreplaceable photos. Let's build a backup system that protects everything automatically, efficiently, and intelligently!

import os
import shutil
import hashlib
import json
import sqlite3
import zipfile
import tarfile
import boto3  # For AWS S3
from pathlib import Path
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Set, Tuple
import threading
import queue
from concurrent.futures import ThreadPoolExecutor
import logging

class ComprehensiveBackupSystem:
    """
    A complete backup solution with local, network, and cloud backups,
    incremental backups, version control, and automatic restoration.
    """
    
    def __init__(self, source_dirs: List[str], backup_config: Dict):
        self.source_dirs = [Path(d) for d in source_dirs]
        self.config = self.load_config(backup_config)
        
        # Setup backup destinations
        self.local_backup = Path(self.config['local_backup_path'])
        self.network_backup = Path(self.config.get('network_backup_path', ''))
        self.cloud_config = self.config.get('cloud', {})
        
        # Create backup directories
        self.local_backup.mkdir(parents=True, exist_ok=True)
        
        # Setup database for tracking
        self.db_path = self.local_backup / 'backup_catalog.db'
        self.init_database()
        
        # Setup logging
        self.setup_logging()
        
        # Backup strategies
        self.strategies = {
            'full': self.full_backup,
            'incremental': self.incremental_backup,
            'differential': self.differential_backup,
            'mirror': self.mirror_backup
        }
        
        # File filters
        self.include_patterns = self.config.get('include_patterns', ['*'])
        self.exclude_patterns = self.config.get('exclude_patterns', [
            '*.tmp', '*.temp', '~*', '.DS_Store', 'Thumbs.db',
            '__pycache__', 'node_modules', '.git/objects'
        ])
        
        # Performance settings
        self.chunk_size = self.config.get('chunk_size', 65536)
        self.compression_level = self.config.get('compression_level', 6)
        self.parallel_workers = self.config.get('parallel_workers', 4)
        
        # Encryption settings (if enabled)
        self.encryption_enabled = self.config.get('encryption', {}).get('enabled', False)
        if self.encryption_enabled:
            from cryptography.fernet import Fernet
            key = self.config['encryption'].get('key')
            if not key:
                key = Fernet.generate_key()
                self.config['encryption']['key'] = key.decode()
                self.save_config()
            self.cipher = Fernet(key.encode() if isinstance(key, str) else key)
    
    def load_config(self, config_dict: Dict) -> Dict:
        """Load and validate backup configuration."""
        default_config = {
            'local_backup_path': '/backup/local',
            'backup_strategy': 'incremental',
            'retention_days': 30,
            'max_versions': 10,
            'compression': True,
            'verify_after_backup': True,
            'backup_schedule': {
                'full_backup_interval_days': 7,
                'incremental_interval_hours': 6
            }
        }
        
        # Merge with provided config
        for key, value in config_dict.items():
            default_config[key] = value
        
        return default_config
    
    def save_config(self):
        """Save configuration to file."""
        config_file = self.local_backup / 'backup_config.json'
        with open(config_file, 'w') as f:
            json.dump(self.config, f, indent=2)
    
    def setup_logging(self):
        """Setup comprehensive logging."""
        log_dir = self.local_backup / 'logs'
        log_dir.mkdir(exist_ok=True)
        
        log_file = log_dir / f"backup_{datetime.now().strftime('%Y%m%d')}.log"
        
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(levelname)s - %(message)s',
            handlers=[
                logging.FileHandler(log_file),
                logging.StreamHandler()
            ]
        )
        self.logger = logging.getLogger(__name__)
    
    def init_database(self):
        """Initialize SQLite database for backup tracking."""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        
        # Backup sets table
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS backup_sets (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                backup_id TEXT UNIQUE,
                backup_type TEXT,
                source_paths TEXT,
                destination_path TEXT,
                start_time DATETIME,
                end_time DATETIME,
                total_files INTEGER,
                total_size INTEGER,
                status TEXT,
                error_message TEXT
            )
        ''')
        
        # File records table
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS file_records (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                backup_id TEXT,
                file_path TEXT,
                file_hash TEXT,
                file_size INTEGER,
                modified_time DATETIME,
                backed_up BOOLEAN,
                FOREIGN KEY (backup_id) REFERENCES backup_sets(backup_id)
            )
        ''')
        
        # Version history table
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS version_history (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                file_path TEXT,
                version_number INTEGER,
                backup_id TEXT,
                file_hash TEXT,
                backup_time DATETIME,
                UNIQUE(file_path, version_number)
            )
        ''')
        
        conn.commit()
        conn.close()
    
    def calculate_file_hash(self, file_path: Path) -> str:
        """Calculate SHA256 hash of a file."""
        sha256 = hashlib.sha256()
        try:
            with open(file_path, 'rb') as f:
                for chunk in iter(lambda: f.read(self.chunk_size), b''):
                    sha256.update(chunk)
            return sha256.hexdigest()
        except Exception as e:
            self.logger.error(f"Error hashing {file_path}: {e}")
            return ""
    
    def should_backup_file(self, file_path: Path) -> bool:
        """Determine if a file should be backed up based on filters."""
        # Check exclude patterns
        for pattern in self.exclude_patterns:
            if file_path.match(pattern):
                return False
        
        # Check include patterns
        if self.include_patterns != ['*']:
            included = False
            for pattern in self.include_patterns:
                if file_path.match(pattern):
                    included = True
                    break
            if not included:
                return False
        
        # Check file size limits
        max_size = self.config.get('max_file_size_mb', 1000) * 1024 * 1024
        if file_path.stat().st_size > max_size:
            self.logger.warning(f"Skipping {file_path.name}: exceeds size limit")
            return False
        
        return True
    
    def full_backup(self, source_dir: Path, backup_id: str) -> Tuple[int, int]:
        """
        Perform a full backup of all files.
        Returns (files_backed_up, total_size)
        """
        self.logger.info(f"Starting full backup of {source_dir}")
        
        backup_dir = self.local_backup / 'full' / backup_id
        backup_dir.mkdir(parents=True, exist_ok=True)
        
        files_backed_up = 0
        total_size = 0
        errors = []
        
        # Collect all files to backup
        files_to_backup = []
        for file_path in source_dir.rglob('*'):
            if file_path.is_file() and self.should_backup_file(file_path):
                files_to_backup.append(file_path)
        
        # Backup files with progress tracking
        total_files = len(files_to_backup)
        self.logger.info(f"Found {total_files} files to backup")
        
        with ThreadPoolExecutor(max_workers=self.parallel_workers) as executor:
            futures = []
            for file_path in files_to_backup:
                future = executor.submit(
                    self.backup_single_file,
                    file_path, source_dir, backup_dir, backup_id
                )
                futures.append((file_path, future))
            
            for file_path, future in futures:
                try:
                    success, file_size = future.result()
                    if success:
                        files_backed_up += 1
                        total_size += file_size
                        
                        # Progress update
                        if files_backed_up % 100 == 0:
                            progress = (files_backed_up / total_files) * 100
                            self.logger.info(f"Progress: {progress:.1f}% ({files_backed_up}/{total_files})")
                except Exception as e:
                    errors.append((file_path, str(e)))
                    self.logger.error(f"Error backing up {file_path}: {e}")
        
        # Create backup manifest
        self.create_manifest(backup_dir, backup_id, files_backed_up, total_size)
        
        # Report errors if any
        if errors:
            self.logger.warning(f"Backup completed with {len(errors)} errors")
            for file_path, error in errors[:10]:  # Show first 10 errors
                self.logger.error(f"  {file_path}: {error}")
        
        return files_backed_up, total_size
    
    def backup_single_file(self, file_path: Path, source_base: Path, 
                          backup_dir: Path, backup_id: str) -> Tuple[bool, int]:
        """
        Backup a single file with compression and encryption if enabled.
        Returns (success, file_size)
        """
        try:
            # Calculate relative path
            rel_path = file_path.relative_to(source_base)
            dest_path = backup_dir / rel_path
            
            # Create destination directory
            dest_path.parent.mkdir(parents=True, exist_ok=True)
            
            # Read file
            with open(file_path, 'rb') as f:
                data = f.read()
            
            file_size = len(data)
            
            # Encrypt if enabled
            if self.encryption_enabled:
                data = self.cipher.encrypt(data)
                dest_path = dest_path.with_suffix(dest_path.suffix + '.enc')
            
            # Compress if enabled
            if self.config.get('compression', True):
                import gzip
                data = gzip.compress(data, compresslevel=self.compression_level)
                dest_path = dest_path.with_suffix(dest_path.suffix + '.gz')
            
            # Write backup file
            with open(dest_path, 'wb') as f:
                f.write(data)
            
            # Update database
            self.update_file_record(backup_id, file_path, file_size)
            
            return True, file_size
            
        except Exception as e:
            self.logger.error(f"Failed to backup {file_path}: {e}")
            return False, 0
    
    def incremental_backup(self, source_dir: Path, backup_id: str) -> Tuple[int, int]:
        """
        Perform incremental backup - only backup changed files since last backup.
        """
        self.logger.info(f"Starting incremental backup of {source_dir}")
        
        # Get last backup info
        last_backup = self.get_last_backup_info(source_dir)
        
        if not last_backup:
            self.logger.info("No previous backup found, performing full backup")
            return self.full_backup(source_dir, backup_id)
        
        backup_dir = self.local_backup / 'incremental' / backup_id
        backup_dir.mkdir(parents=True, exist_ok=True)
        
        files_backed_up = 0
        total_size = 0
        
        # Find changed files
        changed_files = self.find_changed_files(source_dir, last_backup['backup_id'])
        
        self.logger.info(f"Found {len(changed_files)} changed files")
        
        # Backup changed files
        for file_path in changed_files:
            success, file_size = self.backup_single_file(
                file_path, source_dir, backup_dir, backup_id
            )
            if success:
                files_backed_up += 1
                total_size += file_size
        
        # Create incremental manifest
        self.create_incremental_manifest(
            backup_dir, backup_id, last_backup['backup_id'],
            files_backed_up, total_size
        )
        
        return files_backed_up, total_size
    
    def differential_backup(self, source_dir: Path, backup_id: str) -> Tuple[int, int]:
        """
        Perform differential backup - backup all changes since last full backup.
        """
        self.logger.info(f"Starting differential backup of {source_dir}")
        
        # Get last full backup info
        last_full_backup = self.get_last_full_backup_info(source_dir)
        
        if not last_full_backup:
            self.logger.info("No previous full backup found, performing full backup")
            return self.full_backup(source_dir, backup_id)
        
        backup_dir = self.local_backup / 'differential' / backup_id
        backup_dir.mkdir(parents=True, exist_ok=True)
        
        files_backed_up = 0
        total_size = 0
        
        # Find all changes since last full backup
        changed_files = self.find_changed_files(
            source_dir, last_full_backup['backup_id']
        )
        
        self.logger.info(f"Found {len(changed_files)} changed files since last full backup")
        
        # Backup changed files
        for file_path in changed_files:
            success, file_size = self.backup_single_file(
                file_path, source_dir, backup_dir, backup_id
            )
            if success:
                files_backed_up += 1
                total_size += file_size
        
        return files_backed_up, total_size
    
    def mirror_backup(self, source_dir: Path, backup_id: str) -> Tuple[int, int]:
        """
        Create an exact mirror of the source directory.
        Removes files from backup that no longer exist in source.
        """
        self.logger.info(f"Starting mirror backup of {source_dir}")
        
        backup_dir = self.local_backup / 'mirror' / source_dir.name
        backup_dir.mkdir(parents=True, exist_ok=True)
        
        files_backed_up = 0
        total_size = 0
        files_removed = 0
        
        # Get all source files
        source_files = {}
        for file_path in source_dir.rglob('*'):
            if file_path.is_file() and self.should_backup_file(file_path):
                rel_path = file_path.relative_to(source_dir)
                source_files[str(rel_path)] = file_path
        
        # Get all backup files
        backup_files = {}
        for file_path in backup_dir.rglob('*'):
            if file_path.is_file():
                rel_path = file_path.relative_to(backup_dir)
                # Remove compression/encryption extensions for comparison
                clean_path = str(rel_path).replace('.gz', '').replace('.enc', '')
                backup_files[clean_path] = file_path
        
        # Remove files that no longer exist in source
        for rel_path, backup_path in backup_files.items():
            if rel_path not in source_files:
                backup_path.unlink()
                files_removed += 1
                self.logger.info(f"Removed from mirror: {rel_path}")
        
        # Update changed files
        for rel_path, source_path in source_files.items():
            backup_path = backup_dir / rel_path
            
            # Check if backup exists and is up to date
            needs_backup = True
            if backup_path.exists() or backup_path.with_suffix('.gz').exists():
                source_mtime = source_path.stat().st_mtime
                backup_mtime = (backup_path if backup_path.exists() 
                               else backup_path.with_suffix('.gz')).stat().st_mtime
                
                if source_mtime <= backup_mtime:
                    needs_backup = False
            
            if needs_backup:
                success, file_size = self.backup_single_file(
                    source_path, source_dir, backup_dir, backup_id
                )
                if success:
                    files_backed_up += 1
                    total_size += file_size
        
        self.logger.info(f"Mirror complete: {files_backed_up} updated, {files_removed} removed")
        return files_backed_up, total_size
    
    def find_changed_files(self, source_dir: Path, since_backup_id: str) -> List[Path]:
        """Find files that have changed since a specific backup."""
        changed_files = []
        
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        
        # Get file records from last backup
        cursor.execute('''
            SELECT file_path, file_hash, modified_time 
            FROM file_records 
            WHERE backup_id = ?
        ''', (since_backup_id,))
        
        last_backup_files = {}
        for row in cursor.fetchall():
            last_backup_files[row[0]] = {
                'hash': row[1],
                'modified': datetime.fromisoformat(row[2])
            }
        
        # Check current files
        for file_path in source_dir.rglob('*'):
            if not file_path.is_file() or not self.should_backup_file(file_path):
                continue
            
            str_path = str(file_path)
            
            # New file
            if str_path not in last_backup_files:
                changed_files.append(file_path)
                continue
            
            # Check if modified
            current_mtime = datetime.fromtimestamp(file_path.stat().st_mtime)
            if current_mtime > last_backup_files[str_path]['modified']:
                # Verify actual change with hash
                current_hash = self.calculate_file_hash(file_path)
                if current_hash != last_backup_files[str_path]['hash']:
                    changed_files.append(file_path)
        
        conn.close()
        return changed_files
    
    def update_file_record(self, backup_id: str, file_path: Path, file_size: int):
        """Update file record in database."""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        
        file_hash = self.calculate_file_hash(file_path)
        
        cursor.execute('''
            INSERT INTO file_records 
            (backup_id, file_path, file_hash, file_size, modified_time, backed_up)
            VALUES (?, ?, ?, ?, ?, ?)
        ''', (
            backup_id,
            str(file_path),
            file_hash,
            file_size,
            datetime.fromtimestamp(file_path.stat().st_mtime),
            True
        ))
        
        # Update version history
        cursor.execute('''
            SELECT MAX(version_number) FROM version_history WHERE file_path = ?
        ''', (str(file_path),))
        
        result = cursor.fetchone()
        version = (result[0] + 1) if result[0] else 1
        
        cursor.execute('''
            INSERT INTO version_history 
            (file_path, version_number, backup_id, file_hash, backup_time)
            VALUES (?, ?, ?, ?, ?)
        ''', (
            str(file_path),
            version,
            backup_id,
            file_hash,
            datetime.now()
        ))
        
        conn.commit()
        conn.close()
    
    def get_last_backup_info(self, source_dir: Path) -> Optional[Dict]:
        """Get information about the last backup for a source directory."""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        
        cursor.execute('''
            SELECT backup_id, backup_type, end_time, total_files, total_size
            FROM backup_sets
            WHERE source_paths LIKE ? AND status = 'completed'
            ORDER BY end_time DESC
            LIMIT 1
        ''', (f'%{str(source_dir)}%',))
        
        result = cursor.fetchone()
        conn.close()
        
        if result:
            return {
                'backup_id': result[0],
                'backup_type': result[1],
                'end_time': result[2],
                'total_files': result[3],
                'total_size': result[4]
            }
        return None
    
    def get_last_full_backup_info(self, source_dir: Path) -> Optional[Dict]:
        """Get information about the last full backup."""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        
        cursor.execute('''
            SELECT backup_id, end_time, total_files, total_size
            FROM backup_sets
            WHERE source_paths LIKE ? AND backup_type = 'full' AND status = 'completed'
            ORDER BY end_time DESC
            LIMIT 1
        ''', (f'%{str(source_dir)}%',))
        
        result = cursor.fetchone()
        conn.close()
        
        if result:
            return {
                'backup_id': result[0],
                'end_time': result[1],
                'total_files': result[2],
                'total_size': result[3]
            }
        return None
    
    def create_manifest(self, backup_dir: Path, backup_id: str, 
                       files_count: int, total_size: int):
        """Create a backup manifest file with all backup information."""
        manifest = {
            'backup_id': backup_id,
            'timestamp': datetime.now().isoformat(),
            'files_count': files_count,
            'total_size': total_size,
            'compression': self.config.get('compression', True),
            'encryption': self.encryption_enabled,
            'source_dirs': [str(d) for d in self.source_dirs]
        }
        
        manifest_file = backup_dir / 'manifest.json'
        with open(manifest_file, 'w') as f:
            json.dump(manifest, f, indent=2)
    
    def create_incremental_manifest(self, backup_dir: Path, backup_id: str,
                                   base_backup_id: str, files_count: int, 
                                   total_size: int):
        """Create manifest for incremental backup."""
        manifest = {
            'backup_id': backup_id,
            'backup_type': 'incremental',
            'base_backup_id': base_backup_id,
            'timestamp': datetime.now().isoformat(),
            'files_count': files_count,
            'total_size': total_size
        }
        
        manifest_file = backup_dir / 'manifest.json'
        with open(manifest_file, 'w') as f:
            json.dump(manifest, f, indent=2)
    
    def verify_backup(self, backup_id: str) -> bool:
        """
        Verify backup integrity by checking hashes.
        """
        self.logger.info(f"Verifying backup {backup_id}")
        
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        
        cursor.execute('''
            SELECT file_path, file_hash FROM file_records WHERE backup_id = ?
        ''', (backup_id,))
        
        files_to_verify = cursor.fetchall()
        conn.close()
        
        errors = 0
        for file_path, expected_hash in files_to_verify:
            current_hash = self.calculate_file_hash(Path(file_path))
            if current_hash != expected_hash:
                self.logger.error(f"Verification failed for {file_path}")
                errors += 1
        
        if errors == 0:
            self.logger.info("Backup verification completed successfully")
            return True
        else:
            self.logger.error(f"Backup verification failed with {errors} errors")
            return False
    
    def restore_backup(self, backup_id: str, restore_path: Path, 
                      file_pattern: Optional[str] = None):
        """
        Restore files from a backup.
        """
        self.logger.info(f"Starting restoration of backup {backup_id}")
        
        restore_path.mkdir(parents=True, exist_ok=True)
        
        # Find backup directory
        backup_dirs = [
            self.local_backup / 'full' / backup_id,
            self.local_backup / 'incremental' / backup_id,
            self.local_backup / 'differential' / backup_id
        ]
        
        backup_dir = None
        for d in backup_dirs:
            if d.exists():
                backup_dir = d
                break
        
        if not backup_dir:
            self.logger.error(f"Backup {backup_id} not found")
            return False
        
        # Restore files
        restored_count = 0
        for backup_file in backup_dir.rglob('*'):
            if not backup_file.is_file():
                continue
            
            # Skip if pattern specified and doesn't match
            if file_pattern and not backup_file.match(file_pattern):
                continue
            
            # Calculate restore path
            rel_path = backup_file.relative_to(backup_dir)
            
            # Remove compression/encryption extensions
            clean_name = str(rel_path)
            if clean_name.endswith('.gz'):
                clean_name = clean_name[:-3]
            if clean_name.endswith('.enc'):
                clean_name = clean_name[:-4]
            
            dest_file = restore_path / clean_name
            dest_file.parent.mkdir(parents=True, exist_ok=True)
            
            # Read and process backup file
            with open(backup_file, 'rb') as f:
                data = f.read()
            
            # Decompress if needed
            if str(backup_file).endswith('.gz'):
                import gzip
                data = gzip.decompress(data)
            
            # Decrypt if needed
            if str(backup_file).endswith('.enc'):
                if self.encryption_enabled:
                    data = self.cipher.decrypt(data)
                else:
                    self.logger.error(f"Cannot decrypt {backup_file}: encryption key not available")
                    continue
            
            # Write restored file
            with open(dest_file, 'wb') as f:
                f.write(data)
            
            restored_count += 1
            
            if restored_count % 100 == 0:
                self.logger.info(f"Restored {restored_count} files...")
        
        self.logger.info(f"Restoration complete: {restored_count} files restored")
        return True
    
    def cleanup_old_backups(self):
        """Remove old backups based on retention policy."""
        retention_days = self.config.get('retention_days', 30)
        cutoff_date = datetime.now() - timedelta(days=retention_days)
        
        self.logger.info(f"Cleaning up backups older than {retention_days} days")
        
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        
        # Find old backups
        cursor.execute('''
            SELECT backup_id, destination_path 
            FROM backup_sets 
            WHERE end_time < ?
        ''', (cutoff_date,))
        
        old_backups = cursor.fetchall()
        
        for backup_id, dest_path in old_backups:
            # Remove backup directory
            if Path(dest_path).exists():
                shutil.rmtree(dest_path)
                self.logger.info(f"Removed old backup: {backup_id}")
            
            # Remove from database
            cursor.execute('DELETE FROM file_records WHERE backup_id = ?', (backup_id,))
            cursor.execute('DELETE FROM backup_sets WHERE backup_id = ?', (backup_id,))
        
        conn.commit()
        conn.close()
        
        self.logger.info(f"Cleaned up {len(old_backups)} old backups")
    
    def backup_to_cloud(self, backup_id: str):
        """
        Upload backup to cloud storage (AWS S3 example).
        """
        if not self.cloud_config.get('enabled'):
            return
        
        self.logger.info(f"Uploading backup {backup_id} to cloud")
        
        # Initialize S3 client
        s3 = boto3.client(
            's3',
            aws_access_key_id=self.cloud_config['aws_access_key'],
            aws_secret_access_key=self.cloud_config['aws_secret_key']
        )
        
        bucket = self.cloud_config['s3_bucket']
        
        # Find backup files
        backup_dir = self.local_backup / 'full' / backup_id
        if not backup_dir.exists():
            backup_dir = self.local_backup / 'incremental' / backup_id
        
        # Upload files
        for file_path in backup_dir.rglob('*'):
            if file_path.is_file():
                key = f"backups/{backup_id}/{file_path.relative_to(backup_dir)}"
                
                try:
                    s3.upload_file(str(file_path), bucket, key)
                    self.logger.info(f"Uploaded to S3: {key}")
                except Exception as e:
                    self.logger.error(f"Failed to upload {file_path}: {e}")
        
        self.logger.info("Cloud backup complete")
    
    def perform_backup(self, strategy: str = 'incremental'):
        """
        Main method to perform backup based on selected strategy.
        """
        backup_id = datetime.now().strftime('%Y%m%d_%H%M%S')
        
        self.logger.info(f"Starting {strategy} backup with ID: {backup_id}")
        
        # Record backup start
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        
        cursor.execute('''
            INSERT INTO backup_sets 
            (backup_id, backup_type, source_paths, start_time, status)
            VALUES (?, ?, ?, ?, ?)
        ''', (
            backup_id,
            strategy,
            json.dumps([str(d) for d in self.source_dirs]),
            datetime.now(),
            'in_progress'
        ))
        conn.commit()
        
        total_files = 0
        total_size = 0
        
        # Perform backup for each source directory
        backup_func = self.strategies.get(strategy, self.incremental_backup)
        
        for source_dir in self.source_dirs:
            files, size = backup_func(source_dir, backup_id)
            total_files += files
            total_size += size
        
        # Update backup record
        cursor.execute('''
            UPDATE backup_sets 
            SET end_time = ?, total_files = ?, total_size = ?, status = ?
            WHERE backup_id = ?
        ''', (
            datetime.now(),
            total_files,
            total_size,
            'completed',
            backup_id
        ))
        conn.commit()
        conn.close()
        
        # Verify if configured
        if self.config.get('verify_after_backup', True):
            self.verify_backup(backup_id)
        
        # Upload to cloud if configured
        if self.cloud_config.get('enabled'):
            self.backup_to_cloud(backup_id)
        
        # Cleanup old backups
        self.cleanup_old_backups()
        
        self.logger.info(f"Backup complete: {total_files} files, {total_size / (1024*1024):.2f} MB")
        
        return backup_id

# Real-world usage example
if __name__ == "__main__":
    # Configure backup system.
    # The paths and AWS values below are placeholders for illustration; load
    # real credentials from a secure source rather than hard-coding them.
    config = {
        'local_backup_path': '/backup/automated',
        'network_backup_path': '//nas/backups',
        'backup_strategy': 'incremental',
        'compression': True,
        'encryption': {
            'enabled': True
        },
        'cloud': {
            'enabled': True,
            'aws_access_key': 'your_key',
            'aws_secret_key': 'your_secret',
            's3_bucket': 'my-backups'
        },
        'retention_days': 30,
        'max_versions': 10
    }
    
    # Create backup system
    backup_system = ComprehensiveBackupSystem(
        source_dirs=[
            '/home/user/Documents',
            '/home/user/Projects',
            '/home/user/Pictures'
        ],
        backup_config=config
    )
    
    # Perform backup
    backup_id = backup_system.perform_backup('incremental')
    
    print(f"✅ Backup completed successfully: {backup_id}")

Specialized Backup Strategies 🎯

Different types of data require different backup strategies. Let's explore specialized approaches for various scenarios, from database backups to real-time synchronization!

graph LR A[Data Type] --> B{Classification} B --> C[Databases] B --> D[Media Files] B --> E[Code Projects] B --> F[System Config] C --> G[Dump & Compress] D --> H[Deduplicate & Archive] E --> I[Version Control] F --> J[Snapshot & Document] G --> K[Automated Restoration] H --> K I --> K J --> K
import subprocess
import pymongo  # For MongoDB
import psycopg2  # For PostgreSQL
import mysql.connector  # For MySQL
from git import Repo  # GitPython
import docker  # For Docker backups

class SpecializedBackupStrategies:
    """
    Specialized backup strategies for different types of data and systems.

    Every backup is written to its own sub-directory of ``backup_base`` and
    is stamped with the local time at which the method was called
    (``YYYYmmdd_HHMMSS``).
    """

    def __init__(self, backup_base: Path):
        """Remember the backup root and create it if it does not exist yet."""
        self.backup_base = Path(backup_base)
        self.backup_base.mkdir(parents=True, exist_ok=True)

    @staticmethod
    def _gzip_and_remove(path: Path) -> Path:
        """Gzip *path* to ``<path>.gz``, delete the original, return the new path."""
        # Imported here because gzip is not imported at module level.
        import gzip

        compressed = Path(f"{path}.gz")
        with open(path, 'rb') as f_in, gzip.open(compressed, 'wb') as f_out:
            shutil.copyfileobj(f_in, f_out)
        path.unlink()
        return compressed

    def backup_postgresql(self, connection_params: Dict, backup_name: str):
        """
        Backup PostgreSQL database with pg_dump.

        Args:
            connection_params: Needs 'host', 'user', 'password' and
                'database'; 'port' is optional (default 5432).
            backup_name: Prefix of the generated file name.

        Returns:
            Path of the dump on success, None when pg_dump fails or cannot
            be started.

        Raises:
            KeyError: If a required connection parameter is missing.
        """
        backup_dir = self.backup_base / 'databases' / 'postgresql'
        backup_dir.mkdir(parents=True, exist_ok=True)

        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        # NOTE: the dump is requested with --format=custom (a pg_dump archive
        # that pg_restore reads); the '.sql.gz' suffix is kept only so that
        # existing file names do not change.
        backup_file = backup_dir / f"{backup_name}_{timestamp}.sql.gz"

        # Build pg_dump command
        cmd = [
            'pg_dump',
            f"--host={connection_params['host']}",
            f"--port={connection_params.get('port', 5432)}",
            f"--username={connection_params['user']}",
            f"--dbname={connection_params['database']}",
            '--verbose',
            '--format=custom',
            '--compress=9',
            f"--file={backup_file}"
        ]

        # Set password via environment variable so it is not on the command line
        env = os.environ.copy()
        env['PGPASSWORD'] = connection_params['password']

        try:
            result = subprocess.run(cmd, env=env, capture_output=True, text=True)
            if result.returncode != 0:
                print(f"āŒ PostgreSQL backup failed: {result.stderr}")
                return None

            print(f"āœ… PostgreSQL backup successful: {backup_file.name}")

            # Verify backup: pg_restore must be able to list the archive
            verify_cmd = ['pg_restore', '--list', str(backup_file)]
            verify_result = subprocess.run(verify_cmd, capture_output=True)
            if verify_result.returncode == 0:
                print("  āœ“ Backup verified")

            return backup_file

        except Exception as e:
            print(f"āŒ Error backing up PostgreSQL: {e}")
            return None

    def backup_mysql(self, connection_params: Dict, backup_name: str):
        """
        Backup MySQL database with mysqldump.

        Args:
            connection_params: Needs 'host', 'user', 'password' and
                'database'; 'port' is optional (default 3306).
            backup_name: Prefix of the generated file name.

        Returns:
            Path (as a string) of the gzip-compressed dump on success,
            None on any failure.
        """
        backup_dir = self.backup_base / 'databases' / 'mysql'
        backup_dir.mkdir(parents=True, exist_ok=True)

        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        backup_file = backup_dir / f"{backup_name}_{timestamp}.sql"

        dump_ok = False
        try:
            port = connection_params.get('port', 3306)

            # Pre-flight check: fail early on bad host/credentials. The
            # connection is not used for the dump, so close it right away.
            connection = mysql.connector.connect(
                host=connection_params['host'],
                port=port,
                user=connection_params['user'],
                password=connection_params['password'],
                database=connection_params['database']
            )
            connection.close()

            # Build mysqldump command
            # NOTE(review): the password is passed on the command line and is
            # therefore visible in the process list while the dump runs.
            cmd = [
                'mysqldump',
                f"--host={connection_params['host']}",
                f"--port={port}",
                f"--user={connection_params['user']}",
                f"--password={connection_params['password']}",
                '--single-transaction',
                '--routines',
                '--triggers',
                '--events',
                connection_params['database']
            ]

            # Execute dump, streaming stdout straight into the file
            with open(backup_file, 'w') as f:
                result = subprocess.run(cmd, stdout=f, stderr=subprocess.PIPE, text=True)

            if result.returncode != 0:
                print(f"āŒ MySQL backup failed: {result.stderr}")
                return None

            dump_ok = True
            compressed = self._gzip_and_remove(backup_file)
            print(f"āœ… MySQL backup successful: {backup_file.name}.gz")
            return str(compressed)

        except Exception as e:
            print(f"āŒ Error backing up MySQL: {e}")
            return None

        finally:
            # Drop the empty/partial .sql left behind by a failed dump. A dump
            # that succeeded but could not be compressed is kept on purpose.
            if not dump_ok and backup_file.exists():
                backup_file.unlink()

    def backup_mongodb(self, connection_params: Dict, backup_name: str):
        """
        Backup MongoDB database.

        Runs mongodump into a timestamped directory, packs that directory
        into a ``.tar.gz`` and removes the uncompressed copy.

        Args:
            connection_params: Needs 'host' and 'database'; 'port' is
                optional (default 27017). When 'user' is present,
                'password' must be present too.
            backup_name: Prefix of the generated archive name.

        Returns:
            Path (as a string) of the archive on success, None on failure.
        """
        backup_dir = self.backup_base / 'databases' / 'mongodb'
        backup_dir.mkdir(parents=True, exist_ok=True)

        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        backup_path = backup_dir / f"{backup_name}_{timestamp}"

        try:
            port = connection_params.get('port', 27017)

            # The client is not used for the dump itself (mongodump does the
            # work), so release it immediately instead of leaking it.
            client = pymongo.MongoClient(
                host=connection_params['host'],
                port=port,
                username=connection_params.get('user'),
                password=connection_params.get('password')
            )
            client.close()

            # Use mongodump
            cmd = [
                'mongodump',
                '--host', f"{connection_params['host']}:{port}",
                '--db', connection_params['database'],
                '--out', str(backup_path)
            ]

            if 'user' in connection_params:
                cmd.extend(['--username', connection_params['user']])
                cmd.extend(['--password', connection_params['password']])

            result = subprocess.run(cmd, capture_output=True, text=True)

            if result.returncode != 0:
                # Do not leave a half-written dump directory behind
                shutil.rmtree(backup_path, ignore_errors=True)
                print(f"āŒ MongoDB backup failed: {result.stderr}")
                return None

            # Create tar archive
            tar_file = f"{backup_path}.tar.gz"
            with tarfile.open(tar_file, 'w:gz') as tar:
                tar.add(backup_path, arcname=backup_path.name)

            # Remove uncompressed directory
            shutil.rmtree(backup_path)

            print(f"āœ… MongoDB backup successful: {backup_path.name}.tar.gz")
            return tar_file

        except Exception as e:
            print(f"āŒ Error backing up MongoDB: {e}")
            return None

    def backup_git_repositories(self, repo_paths: List[Path]):
        """
        Backup Git repositories with full history.

        Each repository is exported as a gzip-compressed ``git bundle`` of
        all refs. When the working tree has uncommitted or untracked
        changes, the working directory (without ``.git``) is archived too,
        because those changes are not part of the bundle.

        Args:
            repo_paths: Repository root directories. An empty list only
                creates the target directory.
        """
        backup_dir = self.backup_base / 'repositories'
        backup_dir.mkdir(parents=True, exist_ok=True)

        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')

        def _without_git_dir(info):
            # TarInfo.name is the '/'-separated path inside the archive. Only
            # drop entries with a path component that is exactly '.git', so
            # files such as .gitignore or .github/ are kept.
            return None if '.git' in info.name.split('/') else info

        for repo_path in repo_paths:
            repo_path = Path(repo_path)
            try:
                repo = Repo(repo_path)

                # Untracked files are included in the check: they are work
                # that the bundle alone would not capture.
                dirty = repo.is_dirty(untracked_files=True)
                if dirty:
                    print(f"āš ļø Repository {repo_path.name} has uncommitted changes")

                # Create bundle (includes all branches and tags)
                bundle_name = f"{repo_path.name}_{timestamp}.bundle"
                bundle_path = backup_dir / bundle_name

                repo.git.bundle('create', str(bundle_path), '--all')

                # Compress bundle
                self._gzip_and_remove(bundle_path)

                print(f"āœ… Git repository backed up: {bundle_name}.gz")

                # Also backup working directory if dirty
                if dirty:
                    working_backup = backup_dir / f"{repo_path.name}_{timestamp}_working.tar.gz"
                    with tarfile.open(working_backup, 'w:gz') as tar:
                        tar.add(repo_path, arcname=repo_path.name,
                                filter=_without_git_dir)
                    print(f"  šŸ“ Working directory backed up: {working_backup.name}")

            except Exception as e:
                print(f"āŒ Error backing up repository {repo_path}: {e}")

    def backup_docker_containers(self):
        """
        Backup Docker containers and volumes.

        Running containers are committed to a temporary image, exported,
        gzip-compressed and the temporary image is removed again. Each
        volume is tarred inside a throw-away ``alpine`` container and the
        resulting ``.tar.gz`` is copied out. A failure for one container or
        volume is reported and does not stop the remaining ones.
        """
        # Imported here because tempfile is not imported at module level.
        import tempfile

        backup_dir = self.backup_base / 'docker'
        backup_dir.mkdir(parents=True, exist_ok=True)

        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')

        try:
            client = docker.from_env()

            # Backup running containers
            for container in client.containers.list():
                try:
                    # Commit container to image. Image repository names must
                    # be lowercase, container names need not be.
                    image_name = f"backup_{container.name}_{timestamp}".lower()
                    container.commit(repository=image_name)

                    try:
                        # Export image
                        image_file = backup_dir / f"{container.name}_{timestamp}.tar"
                        with open(image_file, 'wb') as f:
                            for chunk in client.images.get(image_name).save():
                                f.write(chunk)

                        # Compress
                        self._gzip_and_remove(image_file)

                        print(f"āœ… Docker container backed up: {container.name}")
                    finally:
                        # Remove temporary image even when the export failed
                        client.images.remove(image_name)

                except Exception as e:
                    print(f"āŒ Error backing up container {container.name}: {e}")

            # Backup volumes
            for volume in client.volumes.list():
                try:
                    volume_backup = backup_dir / f"volume_{volume.name}_{timestamp}.tar.gz"

                    # Create temporary container to access volume
                    temp_container = client.containers.run(
                        'alpine',
                        'tar czf /backup.tar.gz /data',
                        volumes={volume.name: {'bind': '/data', 'mode': 'ro'}},
                        detach=True,
                        remove=False
                    )

                    try:
                        # Wait for completion
                        temp_container.wait()

                        # get_archive() wraps the requested file in a tar
                        # stream; unwrap it so the file on disk really is the
                        # .tar.gz produced inside the container.
                        bits, _ = temp_container.get_archive('/backup.tar.gz')
                        with tempfile.TemporaryFile() as spool:
                            for chunk in bits:
                                spool.write(chunk)
                            spool.seek(0)
                            with tarfile.open(fileobj=spool, mode='r') as wrapper:
                                payload = wrapper.extractfile(wrapper.next())
                                with open(volume_backup, 'wb') as f:
                                    shutil.copyfileobj(payload, f)
                    finally:
                        # Remove temporary container, also after a failure
                        temp_container.remove(force=True)

                    print(f"āœ… Docker volume backed up: {volume.name}")

                except Exception as e:
                    print(f"āŒ Error backing up volume {volume.name}: {e}")

        except Exception as e:
            print(f"āŒ Error connecting to Docker: {e}")

    def backup_system_configuration(self, config_locations: Optional[List[str]] = None):
        """
        Backup important system configuration files.

        Args:
            config_locations: Absolute paths or glob patterns of files and
                directories to archive. Defaults to a list of common Linux
                configuration locations.

        Returns:
            Path of the created ``.tar.gz`` archive. Entries that cannot be
            read because of missing permissions are reported and skipped.
        """
        backup_dir = self.backup_base / 'system_config'
        backup_dir.mkdir(parents=True, exist_ok=True)

        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')

        if config_locations is None:
            # Important config locations (Linux example)
            config_locations = [
                '/etc/fstab',
                '/etc/hosts',
                '/etc/hostname',
                '/etc/network/interfaces',
                '/etc/crontab',
                '/etc/ssh/sshd_config',
                '/etc/apache2',
                '/etc/nginx',
                '/home/*/.bashrc',
                '/home/*/.ssh/config'
            ]

        backup_archive = backup_dir / f"system_config_{timestamp}.tar.gz"

        with tarfile.open(backup_archive, 'w:gz') as tar:
            for pattern in config_locations:
                relative_pattern = pattern.lstrip('/')
                if not relative_pattern:
                    continue  # glob() rejects an empty pattern
                # Patterns are absolute, so expand them relative to the root
                for path in Path('/').glob(relative_pattern):
                    if not path.exists():
                        continue  # e.g. a dangling symlink matched by glob
                    try:
                        tar.add(path, arcname=str(path))
                        print(f"  šŸ“„ Added: {path}")
                    except PermissionError:
                        print(f"  āš ļø Permission denied: {path}")

        print(f"āœ… System configuration backed up: {backup_archive.name}")
        return backup_archive

class RealTimeBackupSync:
    """
    Real-time backup synchronization using file system monitoring.

    Mirrors every file below ``source`` into ``destination``: an initial
    full copy followed by copies/removals driven by watchdog events.
    """

    # The nested handler class needs its base class while this class body is
    # executed, so the import cannot live inside __init__. Falling back to
    # ``object`` keeps the module importable without watchdog; creating an
    # instance still fails with ImportError in __init__ in that case.
    try:
        from watchdog.events import FileSystemEventHandler as _HandlerBase
    except ImportError:
        _HandlerBase = object

    def __init__(self, source: Path, destination: Path):
        """
        Args:
            source: Directory to watch.
            destination: Directory that receives the mirrored files; it is
                created if missing.

        Raises:
            ImportError: If the watchdog package is not installed.
        """
        self.source = Path(source)
        self.destination = Path(destination)
        self.destination.mkdir(parents=True, exist_ok=True)

        from watchdog.observers import Observer

        self.observer = Observer()
        self.event_handler = self.BackupEventHandler(self)

    class BackupEventHandler(_HandlerBase):
        """Forwards file (not directory) events to the owning sync object."""

        def __init__(self, parent):
            super().__init__()
            self.parent = parent

        def on_created(self, event):
            if not event.is_directory:
                self.parent.backup_file(Path(event.src_path))

        def on_modified(self, event):
            if not event.is_directory:
                self.parent.backup_file(Path(event.src_path))

        def on_deleted(self, event):
            if not event.is_directory:
                self.parent.remove_backup(Path(event.src_path))

    def backup_file(self, source_file: Path):
        """Backup a single file in real-time.

        Copy errors (including failure to create the target directory) are
        reported and swallowed so that one bad file does not stop the sync.

        Raises:
            ValueError: If *source_file* is not located below ``source``.
        """
        rel_path = source_file.relative_to(self.source)
        dest_file = self.destination / rel_path

        try:
            dest_file.parent.mkdir(parents=True, exist_ok=True)
            shutil.copy2(source_file, dest_file)
            print(f"  ↻ Synced: {rel_path}")
        except Exception as e:
            print(f"  āŒ Sync failed for {rel_path}: {e}")

    def remove_backup(self, source_file: Path):
        """Remove file from backup when deleted from source.

        Raises:
            ValueError: If *source_file* is not located below ``source``.
        """
        rel_path = source_file.relative_to(self.source)
        dest_file = self.destination / rel_path

        if dest_file.exists():
            dest_file.unlink()
            print(f"  šŸ—‘ļø Removed from backup: {rel_path}")

    def start(self):
        """Start real-time synchronization.

        Performs the initial sync, then blocks until Ctrl+C is pressed.
        """
        # Imported here because time is not imported at module level.
        import time

        print(f"šŸ”„ Starting real-time backup sync")
        print(f"  Source: {self.source}")
        print(f"  Destination: {self.destination}")

        # Initial sync
        self.initial_sync()

        # Start monitoring
        self.observer.schedule(
            self.event_handler,
            str(self.source),
            recursive=True
        )
        self.observer.start()

        print("āœ… Real-time sync active (press Ctrl+C to stop)")

        try:
            while True:
                time.sleep(1)
        except KeyboardInterrupt:
            print("\nā¹ļø Real-time sync stopped")
        finally:
            # Always shut the observer thread down, whatever ended the loop
            self.observer.stop()
            self.observer.join()

    def initial_sync(self):
        """Perform initial synchronization by copying every file under source."""
        print("  Performing initial sync...")

        for source_file in self.source.rglob('*'):
            if source_file.is_file():
                self.backup_file(source_file)

        print("  Initial sync complete")

# Usage examples
if __name__ == "__main__":
    # One strategy object serves both the database and the repository backups
    db_backup = SpecializedBackupStrategies(Path("/backup/specialized"))

    # PostgreSQL: connection details first, then the dump itself
    postgres_params = dict(
        host='localhost',
        user='dbuser',
        password='dbpass',
        database='production',
    )
    db_backup.backup_postgresql(postgres_params, 'production_db')

    # Git repositories that live side by side under one projects folder
    projects_root = Path("/home/user/projects")
    db_backup.backup_git_repositories(
        [projects_root / "website", projects_root / "api"]
    )

    # Real-time sync for critical folders
    realtime = RealTimeBackupSync(
        source="/home/user/critical_documents",
        destination="/backup/realtime/documents",
    )
    realtime.start()

Key Takeaways and Best Practices 🎯

The Backup Commandments 📜

Pro Tip: Remember the backup paradox: The value of backups is only realized when you need them, but by then it's too late to create them. Start your backup automation today, test it tomorrow, and sleep peacefully knowing your data is safe!

Backup automation is your insurance policy against digital disasters. Whether it's hardware failure, ransomware, or human error, a well-designed backup system ensures you can always recover. The scripts and strategies we've covered give you enterprise-level data protection with Python's power and flexibility! 🚀