💾 Backup Automation: Never Lose Important Files Again
Imagine this: Your hard drive crashes, your laptop gets stolen, or you accidentally delete that crucial project you've been working on for months. Without backups, it's gone forever. But with automated backup systems, you're invincible! Think of backups as time travel for your files — you can always go back! ⏰
The 3-2-1 Backup Philosophy
The golden rule of backups: Keep 3 copies of important data, on 2 different media types, with 1 copy off-site. It's like having a safety net under your safety net, with another safety net in a different building! Let's build systems that implement this automatically.
Real-World Scenario: The Complete Disaster Recovery System 🛡️
You're a freelance designer with years of client work, a developer with countless projects, or a photographer with irreplaceable photos. Let's build a backup system that protects everything automatically, efficiently, and intelligently!
import os
import shutil
import hashlib
import json
import sqlite3
import zipfile
import tarfile
import boto3 # For AWS S3
from pathlib import Path
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Set, Tuple
import threading
import queue
from concurrent.futures import ThreadPoolExecutor
import logging
class ComprehensiveBackupSystem:
    """
    Backup manager covering local, network, and cloud destinations.

    Offers full, incremental, differential, and mirror strategies, tracks
    every backed-up file in a SQLite catalog, and can restore from a backup.
    """

    def __init__(self, source_dirs: List[str], backup_config: Dict):
        """
        Prepare destinations, the catalog, logging, filters, and encryption.

        Args:
            source_dirs: Directories to protect; each is converted to a Path.
            backup_config: User settings, merged over defaults by load_config.
        """
        self.source_dirs = [Path(entry) for entry in source_dirs]
        self.config = self.load_config(backup_config)
        settings = self.config

        # Backup destinations; the local directory is created eagerly because
        # the catalog database and the logs live inside it.
        self.local_backup = Path(settings['local_backup_path'])
        self.network_backup = Path(settings.get('network_backup_path', ''))
        self.cloud_config = settings.get('cloud', {})
        self.local_backup.mkdir(parents=True, exist_ok=True)

        # Catalog database, then logging.
        self.db_path = self.local_backup / 'backup_catalog.db'
        self.init_database()
        self.setup_logging()

        # Strategy name -> bound method implementing it.
        self.strategies = dict(
            full=self.full_backup,
            incremental=self.incremental_backup,
            differential=self.differential_backup,
            mirror=self.mirror_backup,
        )

        # File filters.
        self.include_patterns = settings.get('include_patterns', ['*'])
        self.exclude_patterns = settings.get('exclude_patterns', [
            '*.tmp', '*.temp', '~*', '.DS_Store', 'Thumbs.db',
            '__pycache__', 'node_modules', '.git/objects'
        ])

        # Performance settings.
        self.chunk_size = settings.get('chunk_size', 65536)
        self.compression_level = settings.get('compression_level', 6)
        self.parallel_workers = settings.get('parallel_workers', 4)

        # Encryption is optional; the cryptography package is only imported
        # when it is switched on.
        self.encryption_enabled = settings.get('encryption', {}).get('enabled', False)
        if not self.encryption_enabled:
            return

        from cryptography.fernet import Fernet
        secret = settings['encryption'].get('key')
        if not secret:
            # No key supplied: generate one and persist it with the config so
            # later runs can decrypt what this run writes.
            secret = Fernet.generate_key()
            settings['encryption']['key'] = secret.decode()
            self.save_config()
        if isinstance(secret, str):
            secret = secret.encode()
        self.cipher = Fernet(secret)
def load_config(self, config_dict: Dict) -> Dict:
    """
    Merge user-supplied settings over the built-in defaults.

    Top-level keys from ``config_dict`` override the defaults. When both the
    default and the supplied value are dicts (e.g. ``backup_schedule``), the
    two are merged key by key so that overriding one nested setting does not
    discard the other nested defaults.

    Args:
        config_dict: User settings; ``None`` or an empty dict yields the
            defaults unchanged.

    Returns:
        A new dict holding the effective configuration.
    """
    merged = {
        'local_backup_path': '/backup/local',
        'backup_strategy': 'incremental',
        'retention_days': 30,
        'max_versions': 10,
        'compression': True,
        'verify_after_backup': True,
        'backup_schedule': {
            'full_backup_interval_days': 7,
            'incremental_interval_hours': 6
        }
    }
    for key, value in (config_dict or {}).items():
        default_value = merged.get(key)
        if isinstance(default_value, dict) and isinstance(value, dict):
            # Nested section: keep defaults the caller did not mention.
            merged[key] = {**default_value, **value}
        else:
            merged[key] = value
    return merged
def save_config(self):
    """
    Write the effective configuration to ``backup_config.json``.

    The configuration is serialized before the file is opened, so a value
    that cannot be encoded as JSON raises ``TypeError`` without truncating
    an existing config file.

    Raises:
        TypeError: If the configuration contains a non-JSON-serializable value.
        OSError: If the file cannot be written.
    """
    config_file = self.local_backup / 'backup_config.json'
    payload = json.dumps(self.config, indent=2)
    with open(config_file, 'w', encoding='utf-8') as f:
        f.write(payload)
    # The config may hold the encryption key and cloud credentials, so keep
    # it private to the owner. Best effort: some filesystems ignore modes.
    try:
        os.chmod(config_file, 0o600)
    except OSError:
        pass
def setup_logging(self):
    """
    Configure logging to a dated file under ``<local_backup>/logs`` and to
    the console, then store a module logger on ``self.logger``.
    """
    log_dir = self.local_backup / 'logs'
    log_dir.mkdir(exist_ok=True)

    day_stamp = datetime.now().strftime('%Y%m%d')
    log_file = log_dir / f"backup_{day_stamp}.log"

    # One handler for the daily log file, one for the console.
    handlers = [logging.FileHandler(log_file), logging.StreamHandler()]
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(levelname)s - %(message)s',
        handlers=handlers,
    )
    self.logger = logging.getLogger(__name__)
def init_database(self):
    """
    Create the catalog tables in the SQLite database if they do not exist.

    Tables: ``backup_sets`` (one row per backup run), ``file_records`` (one
    row per file copied in a run) and ``version_history`` (numbered versions
    per file path). Safe to call repeatedly.
    """
    from contextlib import closing

    schema = (
        # Backup sets table
        '''
        CREATE TABLE IF NOT EXISTS backup_sets (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            backup_id TEXT UNIQUE,
            backup_type TEXT,
            source_paths TEXT,
            destination_path TEXT,
            start_time DATETIME,
            end_time DATETIME,
            total_files INTEGER,
            total_size INTEGER,
            status TEXT,
            error_message TEXT
        )
        ''',
        # File records table
        '''
        CREATE TABLE IF NOT EXISTS file_records (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            backup_id TEXT,
            file_path TEXT,
            file_hash TEXT,
            file_size INTEGER,
            modified_time DATETIME,
            backed_up BOOLEAN,
            FOREIGN KEY (backup_id) REFERENCES backup_sets(backup_id)
        )
        ''',
        # Version history table
        '''
        CREATE TABLE IF NOT EXISTS version_history (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            file_path TEXT,
            version_number INTEGER,
            backup_id TEXT,
            file_hash TEXT,
            backup_time DATETIME,
            UNIQUE(file_path, version_number)
        )
        ''',
    )
    # closing() guarantees the connection is released even if a statement
    # fails part-way through.
    with closing(sqlite3.connect(self.db_path)) as conn:
        cursor = conn.cursor()
        for statement in schema:
            cursor.execute(statement)
        conn.commit()
def calculate_file_hash(self, file_path: Path) -> str:
    """
    Return the SHA-256 hex digest of ``file_path``.

    The file is read in ``self.chunk_size`` pieces. Any error is logged and
    reported to the caller as an empty string rather than raised.
    """
    digest = hashlib.sha256()
    try:
        with open(file_path, 'rb') as stream:
            while True:
                block = stream.read(self.chunk_size)
                if block == b'':
                    break
                digest.update(block)
        return digest.hexdigest()
    except Exception as e:
        self.logger.error(f"Error hashing {file_path}: {e}")
        return ""
def should_backup_file(self, file_path: Path) -> bool:
    """
    Decide whether ``file_path`` passes the exclude/include/size filters.

    Exclude patterns are tested against the file and against each of its
    parent directories (up to the configured source directory that contains
    it, when there is one). This makes directory patterns such as
    ``node_modules`` or ``.git/objects`` exclude everything beneath them,
    which a match on the file path alone cannot do.

    Returns:
        True if the file should be backed up; False if it is excluded, not
        included, larger than ``max_file_size_mb`` (default 1000), or cannot
        be stat'ed.
    """
    # Limit ancestor matching to the part of the path inside the source
    # directory so an unrelated directory name higher up cannot exclude
    # an entire source tree.
    source_root = next(
        (d for d in getattr(self, 'source_dirs', ()) if d in file_path.parents),
        None,
    )
    candidates = [file_path]
    for parent in file_path.parents:
        if parent == source_root:
            break
        candidates.append(parent)

    # Check exclude patterns
    for pattern in self.exclude_patterns:
        if any(candidate.match(pattern) for candidate in candidates):
            return False

    # Check include patterns (['*'] means "everything", so skip the scan)
    if self.include_patterns != ['*']:
        if not any(file_path.match(pattern) for pattern in self.include_patterns):
            return False

    # Check file size limits
    max_size = self.config.get('max_file_size_mb', 1000) * 1024 * 1024
    try:
        size = file_path.stat().st_size
    except OSError as e:
        # The file may have vanished or become unreadable since it was listed.
        self.logger.warning(f"Skipping {file_path.name}: cannot stat ({e})")
        return False
    if size > max_size:
        self.logger.warning(f"Skipping {file_path.name}: exceeds size limit")
        return False
    return True
def full_backup(self, source_dir: Path, backup_id: str) -> Tuple[int, int]:
    """
    Copy every eligible file under ``source_dir`` into
    ``<local_backup>/full/<backup_id>`` using a thread pool.

    Files are selected with ``should_backup_file`` and copied with
    ``backup_single_file``. A file counts as failed both when its worker
    raises and when it reports ``success == False``; failures are summarized
    in the log after the manifest is written.

    Returns:
        (files_backed_up, total_size) for the files copied successfully.
    """
    self.logger.info(f"Starting full backup of {source_dir}")
    backup_dir = self.local_backup / 'full' / backup_id
    backup_dir.mkdir(parents=True, exist_ok=True)

    files_backed_up = 0
    total_size = 0
    errors = []

    # Collect all files to backup
    files_to_backup = [
        candidate for candidate in source_dir.rglob('*')
        if candidate.is_file() and self.should_backup_file(candidate)
    ]
    total_files = len(files_to_backup)
    self.logger.info(f"Found {total_files} files to backup")

    with ThreadPoolExecutor(max_workers=self.parallel_workers) as executor:
        futures = [
            (file_path, executor.submit(
                self.backup_single_file,
                file_path, source_dir, backup_dir, backup_id
            ))
            for file_path in files_to_backup
        ]
        for file_path, future in futures:
            try:
                success, file_size = future.result()
            except Exception as e:
                errors.append((file_path, str(e)))
                self.logger.error(f"Error backing up {file_path}: {e}")
                continue
            if not success:
                # The worker handled and logged the failure itself; record it
                # here so the summary below reflects it.
                errors.append((file_path, "backup failed (see earlier log entry)"))
                continue
            files_backed_up += 1
            total_size += file_size
            # Progress update every 100 files
            if files_backed_up % 100 == 0:
                progress = (files_backed_up / total_files) * 100
                self.logger.info(f"Progress: {progress:.1f}% ({files_backed_up}/{total_files})")

    # Create backup manifest
    self.create_manifest(backup_dir, backup_id, files_backed_up, total_size)

    # Report errors if any
    if errors:
        self.logger.warning(f"Backup completed with {len(errors)} errors")
        for file_path, error in errors[:10]:  # Show first 10 errors
            self.logger.error(f" {file_path}: {error}")
    return files_backed_up, total_size
def backup_single_file(self, file_path: Path, source_base: Path,
                       backup_dir: Path, backup_id: str) -> Tuple[bool, int]:
    """
    Copy one file into ``backup_dir``, mirroring its path below
    ``source_base``, then record it in the catalog.

    When encryption is enabled the content is encrypted and ``.enc`` is
    appended to the name; when compression is enabled the (possibly
    encrypted) content is gzip-compressed and ``.gz`` is appended.

    Returns:
        (True, original_size_in_bytes) on success, (False, 0) on any error
        (the error is logged, not raised).
    """
    try:
        target = backup_dir / file_path.relative_to(source_base)
        target.parent.mkdir(parents=True, exist_ok=True)

        # The whole file is held in memory while it is transformed.
        payload = file_path.read_bytes()
        file_size = len(payload)

        if self.encryption_enabled:
            payload = self.cipher.encrypt(payload)
            target = target.with_suffix(target.suffix + '.enc')

        if self.config.get('compression', True):
            import gzip
            payload = gzip.compress(payload, compresslevel=self.compression_level)
            target = target.with_suffix(target.suffix + '.gz')

        target.write_bytes(payload)

        # Catalog entry uses the size of the original, untransformed data.
        self.update_file_record(backup_id, file_path, file_size)
        return True, file_size
    except Exception as e:
        self.logger.error(f"Failed to backup {file_path}: {e}")
        return False, 0
def incremental_backup(self, source_dir: Path, backup_id: str) -> Tuple[int, int]:
    """
    Copy only the files that changed since the most recent backup of
    ``source_dir`` into ``<local_backup>/incremental/<backup_id>``.

    Falls back to ``full_backup`` when no earlier backup is on record.

    Returns:
        (files_backed_up, total_size) for the files copied successfully.
    """
    self.logger.info(f"Starting incremental backup of {source_dir}")

    previous = self.get_last_backup_info(source_dir)
    if not previous:
        self.logger.info("No previous backup found, performing full backup")
        return self.full_backup(source_dir, backup_id)

    base_backup_id = previous['backup_id']
    backup_dir = self.local_backup / 'incremental' / backup_id
    backup_dir.mkdir(parents=True, exist_ok=True)

    # NOTE(review): the baseline is the record set of the immediately
    # preceding backup only. If that backup was itself incremental, its
    # records presumably cover just the files it copied, so untouched files
    # may look "new" here -- confirm against find_changed_files.
    changed_files = self.find_changed_files(source_dir, base_backup_id)
    self.logger.info(f"Found {len(changed_files)} changed files")

    copied_sizes = []
    for changed in changed_files:
        ok, size = self.backup_single_file(changed, source_dir, backup_dir, backup_id)
        if ok:
            copied_sizes.append(size)
    files_backed_up = len(copied_sizes)
    total_size = sum(copied_sizes)

    # The manifest links this backup to the one it is based on.
    self.create_incremental_manifest(
        backup_dir, backup_id, base_backup_id, files_backed_up, total_size
    )
    return files_backed_up, total_size
def differential_backup(self, source_dir: Path, backup_id: str) -> Tuple[int, int]:
    """
    Copy every file that changed since the last full backup of
    ``source_dir`` into ``<local_backup>/differential/<backup_id>``.

    Falls back to ``full_backup`` when no full backup is on record.

    Returns:
        (files_backed_up, total_size) for the files copied successfully.
    """
    self.logger.info(f"Starting differential backup of {source_dir}")

    baseline = self.get_last_full_backup_info(source_dir)
    if not baseline:
        self.logger.info("No previous full backup found, performing full backup")
        return self.full_backup(source_dir, backup_id)

    backup_dir = self.local_backup / 'differential' / backup_id
    backup_dir.mkdir(parents=True, exist_ok=True)

    # Everything that differs from the full backup, not just from the
    # previous run.
    changed_files = self.find_changed_files(source_dir, baseline['backup_id'])
    self.logger.info(f"Found {len(changed_files)} changed files since last full backup")

    copied_sizes = []
    for changed in changed_files:
        ok, size = self.backup_single_file(changed, source_dir, backup_dir, backup_id)
        if ok:
            copied_sizes.append(size)
    return len(copied_sizes), sum(copied_sizes)
def mirror_backup(self, source_dir: Path, backup_id: str) -> Tuple[int, int]:
    """
    Keep ``<local_backup>/mirror/<source_dir.name>`` in step with
    ``source_dir``.

    Stored files that no longer correspond to an eligible source file are
    deleted; source files that are missing from the mirror or newer than
    their stored copy are (re)copied with ``backup_single_file``.

    Stored names are mapped back to source names by removing exactly the
    suffix that ``backup_single_file`` appends under the current settings
    (``.enc`` and/or ``.gz``). Stored files without that suffix (for example
    written under different settings) are treated as stale and removed.

    Returns:
        (files_backed_up, total_size) for the files copied in this run.
    """
    self.logger.info(f"Starting mirror backup of {source_dir}")
    backup_dir = self.local_backup / 'mirror' / source_dir.name
    backup_dir.mkdir(parents=True, exist_ok=True)

    files_backed_up = 0
    total_size = 0
    files_removed = 0

    # Suffix appended to every stored file under the current configuration.
    stored_suffix = ''
    if self.encryption_enabled:
        stored_suffix += '.enc'
    if self.config.get('compression', True):
        stored_suffix += '.gz'

    # Eligible source files, keyed by path relative to the source directory.
    source_files = {
        str(path.relative_to(source_dir)): path
        for path in source_dir.rglob('*')
        if path.is_file() and self.should_backup_file(path)
    }

    # Remove stored files that no longer exist in source. The listing is
    # materialized first so deletions do not disturb the directory walk.
    for stored in list(backup_dir.rglob('*')):
        if not stored.is_file():
            continue
        stored_rel = str(stored.relative_to(backup_dir))
        if stored_suffix and not stored_rel.endswith(stored_suffix):
            original_rel = None
        else:
            original_rel = stored_rel[:len(stored_rel) - len(stored_suffix)]
        if original_rel is None or original_rel not in source_files:
            stored.unlink()
            files_removed += 1
            self.logger.info(f"Removed from mirror: {original_rel or stored_rel}")

    # Copy files that are missing from the mirror or newer than their copy.
    for rel_path, source_path in source_files.items():
        stored = backup_dir / (rel_path + stored_suffix)
        if stored.exists() and source_path.stat().st_mtime <= stored.stat().st_mtime:
            continue
        success, file_size = self.backup_single_file(
            source_path, source_dir, backup_dir, backup_id
        )
        if success:
            files_backed_up += 1
            total_size += file_size

    self.logger.info(f"Mirror complete: {files_backed_up} updated, {files_removed} removed")
    return files_backed_up, total_size
def find_changed_files(self, source_dir: Path, since_backup_id: str) -> List[Path]:
    """
    List eligible files under ``source_dir`` that are new or different
    relative to the catalog records of backup ``since_backup_id``.

    A file is reported when it has no record in that backup, or when its
    modification time differs from the recorded one (in either direction)
    and its SHA-256 hash differs from the recorded hash. Comparing for
    inequality rather than "newer than" also catches a file that was
    replaced by a version carrying an older timestamp.

    Returns:
        Paths of the new or changed files, in directory-walk order.
    """
    from contextlib import closing

    # Load the baseline and release the connection before walking the tree.
    with closing(sqlite3.connect(self.db_path)) as conn:
        rows = conn.execute('''
            SELECT file_path, file_hash, modified_time
            FROM file_records
            WHERE backup_id = ?
        ''', (since_backup_id,)).fetchall()
    baseline = {
        path: (file_hash, datetime.fromisoformat(modified))
        for path, file_hash, modified in rows
    }

    changed_files = []
    for file_path in source_dir.rglob('*'):
        if not file_path.is_file() or not self.should_backup_file(file_path):
            continue
        recorded = baseline.get(str(file_path))
        if recorded is None:
            # New file
            changed_files.append(file_path)
            continue
        recorded_hash, recorded_mtime = recorded
        current_mtime = datetime.fromtimestamp(file_path.stat().st_mtime)
        if current_mtime == recorded_mtime:
            continue
        # Timestamp moved: confirm an actual content change with the hash.
        if self.calculate_file_hash(file_path) != recorded_hash:
            changed_files.append(file_path)
    return changed_files
def update_file_record(self, backup_id: str, file_path: Path, file_size: int):
    """
    Record a backed-up file in the catalog and append a version entry.

    Inserts one ``file_records`` row (hash, size, modification time) and one
    ``version_history`` row whose version number is one more than the
    highest existing version for the same path (starting at 1).

    Timestamps are stored as explicit ``YYYY-MM-DD HH:MM:SS[.ffffff]`` text.
    That is the text sqlite3's implicit datetime adapter produced, but it
    no longer depends on that adapter, which is deprecated since Python 3.12.
    """
    from contextlib import closing

    path_text = str(file_path)
    # Hash and stat before opening the database so the connection is not
    # held while a large file is being read.
    file_hash = self.calculate_file_hash(file_path)
    modified_text = datetime.fromtimestamp(file_path.stat().st_mtime).isoformat(' ')

    with closing(sqlite3.connect(self.db_path)) as conn:
        cursor = conn.cursor()
        cursor.execute('''
            INSERT INTO file_records
            (backup_id, file_path, file_hash, file_size, modified_time, backed_up)
            VALUES (?, ?, ?, ?, ?, ?)
        ''', (backup_id, path_text, file_hash, file_size, modified_text, True))

        # Update version history
        cursor.execute('''
            SELECT MAX(version_number) FROM version_history WHERE file_path = ?
        ''', (path_text,))
        latest = cursor.fetchone()[0]
        version = (latest + 1) if latest else 1
        cursor.execute('''
            INSERT INTO version_history
            (file_path, version_number, backup_id, file_hash, backup_time)
            VALUES (?, ?, ?, ?, ?)
        ''', (path_text, version, backup_id, file_hash, datetime.now().isoformat(' ')))
        conn.commit()
def get_last_backup_info(self, source_dir: Path) -> Optional[Dict]:
    """
    Return details of the most recent completed backup that covered
    ``source_dir``, or ``None`` if there is none.

    ``backup_sets.source_paths`` holds a JSON list of directories. It is
    decoded and compared for an exact match, so a directory is not confused
    with a sibling that merely shares its prefix (``/data/projects`` vs
    ``/data/projects-old``) and JSON-escaped characters compare correctly.

    Returns:
        Dict with ``backup_id``, ``backup_type``, ``end_time``,
        ``total_files`` and ``total_size``, or ``None``.
    """
    from contextlib import closing

    wanted = str(source_dir)
    with closing(sqlite3.connect(self.db_path)) as conn:
        cursor = conn.execute('''
            SELECT backup_id, backup_type, end_time, total_files, total_size,
                   source_paths
            FROM backup_sets
            WHERE status = 'completed'
            ORDER BY end_time DESC
        ''')
        for backup_id, backup_type, end_time, total_files, total_size, raw_paths in cursor:
            try:
                recorded_paths = json.loads(raw_paths)
            except (TypeError, ValueError):
                # Row with missing or malformed source_paths: cannot match.
                continue
            if isinstance(recorded_paths, list) and wanted in recorded_paths:
                return {
                    'backup_id': backup_id,
                    'backup_type': backup_type,
                    'end_time': end_time,
                    'total_files': total_files,
                    'total_size': total_size
                }
    return None
def get_last_full_backup_info(self, source_dir: Path) -> Optional[Dict]:
    """
    Return details of the most recent completed backup of type ``'full'``
    that covered ``source_dir``, or ``None`` if there is none.

    ``backup_sets.source_paths`` holds a JSON list of directories. It is
    decoded and compared for an exact match, so a directory is not confused
    with a sibling that merely shares its prefix.

    Returns:
        Dict with ``backup_id``, ``end_time``, ``total_files`` and
        ``total_size``, or ``None``.
    """
    from contextlib import closing

    wanted = str(source_dir)
    with closing(sqlite3.connect(self.db_path)) as conn:
        cursor = conn.execute('''
            SELECT backup_id, end_time, total_files, total_size, source_paths
            FROM backup_sets
            WHERE backup_type = 'full' AND status = 'completed'
            ORDER BY end_time DESC
        ''')
        for backup_id, end_time, total_files, total_size, raw_paths in cursor:
            try:
                recorded_paths = json.loads(raw_paths)
            except (TypeError, ValueError):
                # Row with missing or malformed source_paths: cannot match.
                continue
            if isinstance(recorded_paths, list) and wanted in recorded_paths:
                return {
                    'backup_id': backup_id,
                    'end_time': end_time,
                    'total_files': total_files,
                    'total_size': total_size
                }
    return None
def create_manifest(self, backup_dir: Path, backup_id: str,
                    files_count: int, total_size: int):
    """
    Write ``manifest.json`` into ``backup_dir`` describing the backup:
    id, creation timestamp, file count, total size, whether compression and
    encryption were on, and the configured source directories.
    """
    source_list = [str(directory) for directory in self.source_dirs]
    manifest = dict(
        backup_id=backup_id,
        timestamp=datetime.now().isoformat(),
        files_count=files_count,
        total_size=total_size,
        compression=self.config.get('compression', True),
        encryption=self.encryption_enabled,
        source_dirs=source_list,
    )
    with open(backup_dir / 'manifest.json', 'w') as out:
        json.dump(manifest, out, indent=2)
def create_incremental_manifest(self, backup_dir: Path, backup_id: str,
                                base_backup_id: str, files_count: int,
                                total_size: int):
    """
    Write ``manifest.json`` for an incremental backup into ``backup_dir``.

    Besides the id, timestamp, file count and size, the manifest names the
    backup this one is based on (``base_backup_id``).
    """
    manifest = dict(
        backup_id=backup_id,
        backup_type='incremental',
        base_backup_id=base_backup_id,
        timestamp=datetime.now().isoformat(),
        files_count=files_count,
        total_size=total_size,
    )
    with open(backup_dir / 'manifest.json', 'w') as out:
        json.dump(manifest, out, indent=2)
def verify_backup(self, backup_id: str) -> bool:
    """
    Verify the stored copies of backup ``backup_id`` against the catalog.

    For every ``file_records`` row of the backup, the stored copy is located
    under the backup's directory (``full``, ``incremental`` or
    ``differential`` / ``<backup_id>``, or the mirror directory of the owning
    source), decompressed and decrypted as its suffix requires, hashed with
    SHA-256 and compared with the recorded hash. Checking the stored copy,
    rather than re-hashing the live source file, is what detects a damaged
    or missing backup.

    Returns:
        True if every recorded file has an intact stored copy, else False.
    """
    import gzip
    from contextlib import closing

    self.logger.info(f"Verifying backup {backup_id}")
    with closing(sqlite3.connect(self.db_path)) as conn:
        files_to_verify = conn.execute('''
            SELECT file_path, file_hash FROM file_records WHERE backup_id = ?
        ''', (backup_id,)).fetchall()

    snapshot_roots = [
        self.local_backup / kind / backup_id
        for kind in ('full', 'incremental', 'differential')
    ]
    # Try the suffix produced by the current settings first, then the others.
    expected_suffix = (('.enc' if self.encryption_enabled else '')
                       + ('.gz' if self.config.get('compression', True) else ''))
    suffixes = [expected_suffix] + [
        s for s in ('.enc.gz', '.gz', '.enc', '') if s != expected_suffix
    ]

    def locate_stored_copy(source_file):
        """Return (stored_path, suffix) for a source file, or (None, None)."""
        for source_dir in self.source_dirs:
            try:
                rel_path = source_file.relative_to(source_dir)
            except ValueError:
                continue
            roots = snapshot_roots + [self.local_backup / 'mirror' / source_dir.name]
            for root in roots:
                for suffix in suffixes:
                    candidate = root / (str(rel_path) + suffix)
                    if candidate.is_file():
                        return candidate, suffix
        return None, None

    def hash_stored_copy(stored, suffix):
        """Undo compression/encryption per ``suffix`` and hash the content."""
        data = stored.read_bytes()
        if suffix.endswith('.gz'):
            data = gzip.decompress(data)
        if suffix.startswith('.enc'):
            if not self.encryption_enabled:
                raise ValueError("encryption key not available")
            data = self.cipher.decrypt(data)
        return hashlib.sha256(data).hexdigest()

    errors = 0
    for file_path, expected_hash in files_to_verify:
        stored, suffix = locate_stored_copy(Path(file_path))
        if stored is None:
            self.logger.error(f"Verification failed for {file_path}: backup copy not found")
            errors += 1
            continue
        try:
            actual_hash = hash_stored_copy(stored, suffix)
        except Exception as e:
            self.logger.error(f"Verification failed for {file_path}: {e}")
            errors += 1
            continue
        if actual_hash != expected_hash:
            self.logger.error(f"Verification failed for {file_path}")
            errors += 1

    if errors == 0:
        self.logger.info("Backup verification completed successfully")
        return True
    self.logger.error(f"Backup verification failed with {errors} errors")
    return False
def restore_backup(self, backup_id: str, restore_path: Path,
                   file_pattern: Optional[str] = None):
    """
    Restore the files of backup ``backup_id`` into ``restore_path``.

    The backup is looked up under ``full``, ``incremental`` and
    ``differential``. Each stored file is decompressed when its name ends in
    ``.gz`` and then decrypted when the remaining name ends in ``.enc``
    (the reverse of the order in which the backup applies them), and written
    under its original relative path. The backup's own top-level
    ``manifest.json`` is not restored.

    Args:
        backup_id: Identifier of the backup to restore.
        restore_path: Destination directory; created if missing.
        file_pattern: Optional glob. A file is restored when the pattern
            matches its original name (e.g. ``*.jpg``) or its stored name.

    Returns:
        True when the backup directory was found and processed, False when
        no directory exists for ``backup_id``.
    """
    import gzip

    self.logger.info(f"Starting restoration of backup {backup_id}")
    restore_path.mkdir(parents=True, exist_ok=True)

    # Find backup directory
    backup_dir = next(
        (candidate for candidate in (
            self.local_backup / 'full' / backup_id,
            self.local_backup / 'incremental' / backup_id,
            self.local_backup / 'differential' / backup_id,
        ) if candidate.exists()),
        None,
    )
    if not backup_dir:
        self.logger.error(f"Backup {backup_id} not found")
        return False

    restored_count = 0
    for backup_file in backup_dir.rglob('*'):
        if not backup_file.is_file():
            continue
        rel_path = backup_file.relative_to(backup_dir)
        # The manifest describes the backup; it is not user data.
        if rel_path == Path('manifest.json'):
            continue

        # Work out the original name and which transformations to undo.
        clean_name = str(rel_path)
        is_compressed = clean_name.endswith('.gz')
        if is_compressed:
            clean_name = clean_name[:-3]
        is_encrypted = clean_name.endswith('.enc')
        if is_encrypted:
            clean_name = clean_name[:-4]

        # Skip if pattern specified and doesn't match
        if file_pattern and not (Path(clean_name).match(file_pattern)
                                 or backup_file.match(file_pattern)):
            continue

        if is_encrypted and not self.encryption_enabled:
            self.logger.error(f"Cannot decrypt {backup_file}: encryption key not available")
            continue

        data = backup_file.read_bytes()
        if is_compressed:
            data = gzip.decompress(data)
        if is_encrypted:
            data = self.cipher.decrypt(data)

        dest_file = restore_path / clean_name
        dest_file.parent.mkdir(parents=True, exist_ok=True)
        dest_file.write_bytes(data)

        restored_count += 1
        if restored_count % 100 == 0:
            self.logger.info(f"Restored {restored_count} files...")

    self.logger.info(f"Restoration complete: {restored_count} files restored")
    return True
def cleanup_old_backups(self):
    """
    Delete backups whose ``end_time`` is older than ``retention_days``
    (default 30): their directories on disk and their catalog rows.

    When a row has no ``destination_path`` recorded, the backup directory is
    looked up by id under ``full``, ``incremental`` and ``differential``
    instead of failing on the missing value.

    NOTE(review): retention is applied per backup; a full backup is removed
    even if newer incremental backups were based on it -- confirm this is
    the intended policy.
    """
    from contextlib import closing

    retention_days = self.config.get('retention_days', 30)
    cutoff_date = datetime.now() - timedelta(days=retention_days)
    self.logger.info(f"Cleaning up backups older than {retention_days} days")

    with closing(sqlite3.connect(self.db_path)) as conn:
        cursor = conn.cursor()
        # Find old backups; the cutoff is passed as text in the same
        # "YYYY-MM-DD HH:MM:SS" layout the timestamps are stored in.
        cursor.execute('''
            SELECT backup_id, destination_path
            FROM backup_sets
            WHERE end_time < ?
        ''', (cutoff_date.isoformat(' '),))
        old_backups = cursor.fetchall()

        for backup_id, dest_path in old_backups:
            if dest_path:
                candidates = [Path(dest_path)]
            elif backup_id:
                candidates = [
                    self.local_backup / kind / backup_id
                    for kind in ('full', 'incremental', 'differential')
                ]
            else:
                candidates = []
            # Remove backup directory
            for directory in candidates:
                if directory.exists():
                    shutil.rmtree(directory)
                    self.logger.info(f"Removed old backup: {backup_id}")
            # Remove from database
            cursor.execute('DELETE FROM file_records WHERE backup_id = ?', (backup_id,))
            cursor.execute('DELETE FROM backup_sets WHERE backup_id = ?', (backup_id,))
        conn.commit()

    self.logger.info(f"Cleaned up {len(old_backups)} old backups")
def backup_to_cloud(self, backup_id: str):
    """
    Upload the files of backup ``backup_id`` to the configured S3 bucket.

    Does nothing when cloud backup is not enabled. The backup directory is
    looked up under ``full``, ``incremental`` and ``differential``; if none
    exists an error is logged and nothing is uploaded. Object keys are
    ``backups/<backup_id>/<relative path>`` with forward slashes on every
    platform. Individual upload failures are logged and counted without
    stopping the remaining uploads.
    """
    if not self.cloud_config.get('enabled'):
        return

    # Locate the backup before creating a client so a missing backup is
    # reported instead of being treated as an empty, "complete" upload.
    backup_dir = next(
        (candidate for candidate in (
            self.local_backup / 'full' / backup_id,
            self.local_backup / 'incremental' / backup_id,
            self.local_backup / 'differential' / backup_id,
        ) if candidate.exists()),
        None,
    )
    if backup_dir is None:
        self.logger.error(f"Cannot upload backup {backup_id}: backup directory not found")
        return

    self.logger.info(f"Uploading backup {backup_id} to cloud")
    # Initialize S3 client
    s3 = boto3.client(
        's3',
        aws_access_key_id=self.cloud_config['aws_access_key'],
        aws_secret_access_key=self.cloud_config['aws_secret_key']
    )
    bucket = self.cloud_config['s3_bucket']

    failures = 0
    for file_path in backup_dir.rglob('*'):
        if not file_path.is_file():
            continue
        # as_posix() keeps "/" separators in the key on Windows too.
        key = f"backups/{backup_id}/{file_path.relative_to(backup_dir).as_posix()}"
        try:
            s3.upload_file(str(file_path), bucket, key)
            self.logger.info(f"Uploaded to S3: {key}")
        except Exception as e:
            failures += 1
            self.logger.error(f"Failed to upload {file_path}: {e}")

    if failures:
        self.logger.warning(f"Cloud backup finished with {failures} failed uploads")
    else:
        self.logger.info("Cloud backup complete")
def perform_backup(self, strategy: str = 'incremental'):
    """
    Run one backup of all source directories with the given strategy.

    A ``backup_sets`` row is created with status ``in_progress`` and updated
    to ``completed`` with totals when every source directory has been
    processed. If the strategy raises, the row is marked ``failed`` with the
    error message before the exception is re-raised, so an aborted run is
    not left looking as if it were still in progress. After a successful
    run the backup is optionally verified and uploaded to the cloud, and old
    backups are cleaned up.

    Args:
        strategy: Key of ``self.strategies``; unknown names fall back to the
            incremental strategy.

    Returns:
        The generated backup id (timestamp ``YYYYmmdd_HHMMSS``).

    Raises:
        Exception: Whatever the strategy raised, after recording the failure.
    """
    from contextlib import closing

    backup_id = datetime.now().strftime('%Y%m%d_%H%M%S')
    self.logger.info(f"Starting {strategy} backup with ID: {backup_id}")
    backup_func = self.strategies.get(strategy, self.incremental_backup)

    total_files = 0
    total_size = 0
    with closing(sqlite3.connect(self.db_path)) as conn:
        cursor = conn.cursor()
        # Record backup start
        cursor.execute('''
            INSERT INTO backup_sets
            (backup_id, backup_type, source_paths, start_time, status)
            VALUES (?, ?, ?, ?, ?)
        ''', (
            backup_id,
            strategy,
            json.dumps([str(d) for d in self.source_dirs]),
            datetime.now().isoformat(' '),
            'in_progress'
        ))
        conn.commit()

        update_sql = '''
            UPDATE backup_sets
            SET end_time = ?, total_files = ?, total_size = ?, status = ?,
                error_message = ?
            WHERE backup_id = ?
        '''
        try:
            # Perform backup for each source directory
            for source_dir in self.source_dirs:
                files, size = backup_func(source_dir, backup_id)
                total_files += files
                total_size += size
        except Exception as exc:
            cursor.execute(update_sql, (
                datetime.now().isoformat(' '), total_files, total_size,
                'failed', str(exc), backup_id
            ))
            conn.commit()
            self.logger.error(f"Backup {backup_id} failed: {exc}")
            raise

        # Update backup record
        cursor.execute(update_sql, (
            datetime.now().isoformat(' '), total_files, total_size,
            'completed', None, backup_id
        ))
        conn.commit()

    # Verify if configured
    if self.config.get('verify_after_backup', True):
        self.verify_backup(backup_id)
    # Upload to cloud if configured
    if self.cloud_config.get('enabled'):
        self.backup_to_cloud(backup_id)
    # Cleanup old backups
    self.cleanup_old_backups()

    self.logger.info(f"Backup complete: {total_files} files, {total_size / (1024*1024):.2f} MB")
    return backup_id
# Real-world usage example: configure the system, back up three directories
# incrementally, and report the resulting backup id.
if __name__ == "__main__":
    # Configure backup system. The AWS values are placeholders to replace
    # with real credentials.
    config = {
        'local_backup_path': '/backup/automated',
        'network_backup_path': '//nas/backups',
        'backup_strategy': 'incremental',
        'compression': True,
        'encryption': {
            'enabled': True
        },
        'cloud': {
            'enabled': True,
            'aws_access_key': 'your_key',
            'aws_secret_key': 'your_secret',
            's3_bucket': 'my-backups'
        },
        'retention_days': 30,
        'max_versions': 10
    }

    # Create backup system
    backup_system = ComprehensiveBackupSystem(
        source_dirs=[
            '/home/user/Documents',
            '/home/user/Projects',
            '/home/user/Pictures'
        ],
        backup_config=config
    )

    # Perform backup
    backup_id = backup_system.perform_backup('incremental')
    print(f"✅ Backup completed successfully: {backup_id}")
Specialized Backup Strategies 🎯
Different types of data require different backup strategies. Let's explore specialized approaches for various scenarios, from database backups to real-time synchronization!
import subprocess
import pymongo # For MongoDB
import psycopg2 # For PostgreSQL
import mysql.connector # For MySQL
from git import Repo # GitPython
import docker # For Docker backups
class SpecializedBackupStrategies:
    """
    Backup routines tailored to particular kinds of data and systems
    (databases, Git repositories, Docker, system configuration).
    """

    def __init__(self, backup_base: Path):
        """Remember the base directory for all backups, creating it if needed."""
        base_dir = Path(backup_base)
        base_dir.mkdir(parents=True, exist_ok=True)
        self.backup_base = base_dir
def backup_postgresql(self, connection_params: Dict, backup_name: str):
    """
    Dump a PostgreSQL database with ``pg_dump`` (custom format, compression
    level 9) into ``<backup_base>/databases/postgresql``.

    Args:
        connection_params: Needs ``host``, ``user`` and ``database``;
            ``port`` defaults to 5432. ``password`` is optional and, when
            given, is handed to ``pg_dump`` through ``PGPASSWORD``.
        backup_name: Prefix of the dump file name.

    Returns:
        Path of the dump file on success, ``None`` on failure. A partial
        dump left by a failed run is removed.
    """
    backup_dir = self.backup_base / 'databases' / 'postgresql'
    backup_dir.mkdir(parents=True, exist_ok=True)
    timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
    # NOTE(review): the file is named ".sql.gz" but --format=custom produces
    # pg_dump's own archive format; the name is kept for compatibility.
    backup_file = backup_dir / f"{backup_name}_{timestamp}.sql.gz"

    # Build pg_dump command
    cmd = [
        'pg_dump',
        f"--host={connection_params['host']}",
        f"--port={connection_params.get('port', 5432)}",
        f"--username={connection_params['user']}",
        f"--dbname={connection_params['database']}",
        '--verbose',
        '--format=custom',
        '--compress=9',
        f"--file={backup_file}"
    ]

    # Set password via environment variable (kept off the command line)
    env = os.environ.copy()
    password = connection_params.get('password')
    if password is not None:
        env['PGPASSWORD'] = password

    def discard_partial_dump():
        """Remove an incomplete dump so it is not mistaken for a backup."""
        try:
            if backup_file.exists():
                backup_file.unlink()
        except OSError:
            pass

    try:
        result = subprocess.run(cmd, env=env, capture_output=True, text=True)
        if result.returncode != 0:
            print(f"❌ PostgreSQL backup failed: {result.stderr}")
            discard_partial_dump()
            return None

        print(f"✅ PostgreSQL backup successful: {backup_file.name}")
        # Verify backup: pg_restore must be able to list the archive.
        verify_cmd = ['pg_restore', '--list', str(backup_file)]
        verify_result = subprocess.run(verify_cmd, capture_output=True)
        if verify_result.returncode == 0:
            print("  ✓ Backup verified")
        else:
            print("  ⚠️ Backup could not be verified with pg_restore --list")
        return backup_file
    except Exception as e:
        print(f"❌ Error backing up PostgreSQL: {e}")
        discard_partial_dump()
        return None
def backup_mysql(self, connection_params: Dict, backup_name: str):
    """
    Dump a MySQL database with ``mysqldump`` and gzip the result into
    ``<backup_base>/databases/mysql``.

    Args:
        connection_params: Needs ``host``, ``user``, ``password`` and
            ``database``.
        backup_name: Prefix of the dump file name.

    Returns:
        Path (as a string) of the ``.sql.gz`` file on success, ``None`` on
        failure. An incomplete ``.sql`` file left by a failed run is removed.
    """
    import gzip

    backup_dir = self.backup_base / 'databases' / 'mysql'
    backup_dir.mkdir(parents=True, exist_ok=True)
    timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
    backup_file = backup_dir / f"{backup_name}_{timestamp}.sql"

    def discard_partial_dump():
        """Remove an incomplete dump so it is not mistaken for a backup."""
        try:
            if backup_file.exists():
                backup_file.unlink()
        except OSError:
            pass

    try:
        # Connect once as a connectivity/credentials check, then release the
        # connection straight away; the dump itself is done by mysqldump.
        connection = mysql.connector.connect(
            host=connection_params['host'],
            user=connection_params['user'],
            password=connection_params['password'],
            database=connection_params['database']
        )
        connection.close()

        # Build mysqldump command
        # NOTE(review): the password is passed on the command line, where it
        # may be visible to other local users in the process list -- consider
        # an option file instead.
        cmd = [
            'mysqldump',
            f"--host={connection_params['host']}",
            f"--user={connection_params['user']}",
            f"--password={connection_params['password']}",
            '--single-transaction',
            '--routines',
            '--triggers',
            '--events',
            connection_params['database']
        ]

        # Execute dump
        with open(backup_file, 'w') as f:
            result = subprocess.run(cmd, stdout=f, stderr=subprocess.PIPE, text=True)
        if result.returncode != 0:
            print(f"❌ MySQL backup failed: {result.stderr}")
            discard_partial_dump()
            return None

        # Compress the backup
        compressed_file = f"{backup_file}.gz"
        with open(backup_file, 'rb') as f_in, gzip.open(compressed_file, 'wb') as f_out:
            shutil.copyfileobj(f_in, f_out)
        backup_file.unlink()  # Remove uncompressed file
        print(f"✅ MySQL backup successful: {backup_file.name}.gz")
        return compressed_file
    except Exception as e:
        print(f"❌ Error backing up MySQL: {e}")
        discard_partial_dump()
        return None
def backup_mongodb(self, connection_params: Dict, backup_name: str):
    """
    Dump a MongoDB database with ``mongodump`` and pack the dump directory
    into a ``.tar.gz`` under ``<backup_base>/databases/mongodb``.

    Args:
        connection_params: Needs ``host`` and ``database``; ``port``
            defaults to 27017. When ``user`` is present, ``password`` is
            required as well.
        backup_name: Prefix of the archive name.

    Returns:
        Path (as a string) of the ``.tar.gz`` archive on success, ``None``
        on failure. A partial dump directory from a failed run is removed.
    """
    backup_dir = self.backup_base / 'databases' / 'mongodb'
    backup_dir.mkdir(parents=True, exist_ok=True)
    timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
    backup_path = backup_dir / f"{backup_name}_{timestamp}"

    try:
        # Build a client from the parameters and release it again; the dump
        # itself is done by mongodump below.
        client = pymongo.MongoClient(
            host=connection_params['host'],
            port=connection_params.get('port', 27017),
            username=connection_params.get('user'),
            password=connection_params.get('password')
        )
        client.close()

        # Use mongodump
        cmd = [
            'mongodump',
            '--host', f"{connection_params['host']}:{connection_params.get('port', 27017)}",
            '--db', connection_params['database'],
            '--out', str(backup_path)
        ]
        if 'user' in connection_params:
            cmd.extend(['--username', connection_params['user']])
            cmd.extend(['--password', connection_params['password']])

        result = subprocess.run(cmd, capture_output=True, text=True)
        if result.returncode != 0:
            print(f"❌ MongoDB backup failed: {result.stderr}")
            shutil.rmtree(backup_path, ignore_errors=True)
            return None

        # Create tar archive
        tar_file = f"{backup_path}.tar.gz"
        with tarfile.open(tar_file, 'w:gz') as tar:
            tar.add(backup_path, arcname=backup_path.name)
        # Remove uncompressed directory
        shutil.rmtree(backup_path)
        print(f"✅ MongoDB backup successful: {backup_path.name}.tar.gz")
        return tar_file
    except Exception as e:
        print(f"❌ Error backing up MongoDB: {e}")
        shutil.rmtree(backup_path, ignore_errors=True)
        return None
def backup_git_repositories(self, repo_paths: List[Path]):
    """
    Back up Git repositories, including full history, into
    ``<backup_base>/repositories``.

    For each repository a ``git bundle`` of all refs is created and
    gzip-compressed. If the repository has uncommitted changes, its working
    directory (without the ``.git`` directory) is additionally archived as
    ``<name>_<timestamp>_working.tar.gz``. Errors are reported per
    repository and do not stop the remaining ones.

    Args:
        repo_paths: Repository directories (paths or path strings).
    """
    import gzip  # not imported at module level in this listing

    backup_dir = self.backup_base / 'repositories'
    backup_dir.mkdir(parents=True, exist_ok=True)
    timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')

    def skip_git_directory(member):
        """Drop members inside a ``.git`` directory; keep e.g. ``.gitignore``."""
        if '.git' in member.name.split('/'):
            return None
        return member

    for repo_path in repo_paths:
        repo_path = Path(repo_path)
        try:
            repo = Repo(repo_path)
            # Check for uncommitted changes
            has_uncommitted = repo.is_dirty()
            if has_uncommitted:
                print(f"⚠️ Repository {repo_path.name} has uncommitted changes")

            # Create bundle (includes all branches and tags)
            bundle_name = f"{repo_path.name}_{timestamp}.bundle"
            bundle_path = backup_dir / bundle_name
            repo.git.bundle('create', str(bundle_path), '--all')

            # Compress bundle
            with open(bundle_path, 'rb') as f_in, gzip.open(f"{bundle_path}.gz", 'wb') as f_out:
                shutil.copyfileobj(f_in, f_out)
            bundle_path.unlink()
            print(f"✅ Git repository backed up: {bundle_name}.gz")

            # Also backup working directory if dirty
            if has_uncommitted:
                working_backup = backup_dir / f"{repo_path.name}_{timestamp}_working.tar.gz"
                with tarfile.open(working_backup, 'w:gz') as tar:
                    tar.add(repo_path, arcname=repo_path.name,
                            filter=skip_git_directory)
                print(f"  📁 Working directory backed up: {working_backup.name}")
        except Exception as e:
            print(f"❌ Error backing up repository {repo_path}: {e}")
def backup_docker_containers(self):
    """
    Back up running Docker containers and all volumes into
    ``<backup_base>/docker``.

    Each running container is committed to a temporary image, the image is
    exported and gzip-compressed, and the temporary image is removed again.
    Each volume is archived by a short-lived ``alpine`` container that tars
    ``/data``; the resulting ``backup.tar.gz`` is extracted from the tar
    stream Docker returns, so the saved file really is a gzip-compressed
    tar. Temporary images and containers are removed even when a step fails.
    Errors are reported per container/volume.
    """
    import gzip  # not imported at module level in this listing

    backup_dir = self.backup_base / 'docker'
    backup_dir.mkdir(parents=True, exist_ok=True)
    timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')

    try:
        client = docker.from_env()

        # Backup running containers
        for container in client.containers.list():
            image_name = f"backup_{container.name}_{timestamp}"
            image_committed = False
            try:
                # Commit container to image
                container.commit(repository=image_name)
                image_committed = True

                # Export image
                image_file = backup_dir / f"{container.name}_{timestamp}.tar"
                with open(image_file, 'wb') as f:
                    for chunk in client.images.get(image_name).save():
                        f.write(chunk)

                # Compress
                with open(image_file, 'rb') as f_in, gzip.open(f"{image_file}.gz", 'wb') as f_out:
                    shutil.copyfileobj(f_in, f_out)
                image_file.unlink()
                print(f"✅ Docker container backed up: {container.name}")
            except Exception as e:
                print(f"❌ Error backing up container {container.name}: {e}")
            finally:
                # Remove temporary image, also after a failed export.
                if image_committed:
                    try:
                        client.images.remove(image_name)
                    except Exception as e:
                        print(f"⚠️ Could not remove temporary image {image_name}: {e}")

        # Backup volumes
        for volume in client.volumes.list():
            temp_container = None
            try:
                volume_backup = backup_dir / f"volume_{volume.name}_{timestamp}.tar.gz"
                # Create temporary container to access volume
                temp_container = client.containers.run(
                    'alpine',
                    'tar czf /backup.tar.gz /data',
                    volumes={volume.name: {'bind': '/data', 'mode': 'ro'}},
                    detach=True,
                    remove=False
                )
                # Wait for completion
                temp_container.wait()

                # get_archive returns a tar stream that wraps the requested
                # file; spool it to disk, then pull the inner archive out.
                bits, _ = temp_container.get_archive('/backup.tar.gz')
                wrapper_file = backup_dir / f"{volume_backup.name}.part"
                try:
                    with open(wrapper_file, 'wb') as f:
                        for chunk in bits:
                            f.write(chunk)
                    with tarfile.open(wrapper_file) as wrapper:
                        inner = wrapper.extractfile('backup.tar.gz')
                        if inner is None:
                            raise ValueError("backup.tar.gz missing from container archive")
                        with open(volume_backup, 'wb') as out:
                            shutil.copyfileobj(inner, out)
                finally:
                    if wrapper_file.exists():
                        wrapper_file.unlink()
                print(f"✅ Docker volume backed up: {volume.name}")
            except Exception as e:
                print(f"❌ Error backing up volume {volume.name}: {e}")
            finally:
                # Remove temporary container, also after a failure.
                if temp_container is not None:
                    try:
                        temp_container.remove()
                    except Exception as e:
                        print(f"⚠️ Could not remove temporary container: {e}")
    except Exception as e:
        print(f"❌ Error connecting to Docker: {e}")
def backup_system_configuration(self, config_locations: Optional[List[str]] = None):
    """
    Backup important system configuration files.

    Args:
        config_locations: Absolute paths or glob patterns to archive.
            When omitted, a built-in list of common Linux configuration
            locations is used.

    Returns:
        Path of the created ``system_config_<timestamp>.tar.gz`` archive
        inside ``<backup_base>/system_config``.

    Paths that cannot be read are reported and skipped; they do not abort
    the backup.
    """
    backup_dir = self.backup_base / 'system_config'
    backup_dir.mkdir(parents=True, exist_ok=True)
    timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
    if config_locations is None:
        # Important config locations (Linux example)
        config_locations = [
            '/etc/fstab',
            '/etc/hosts',
            '/etc/hostname',
            '/etc/network/interfaces',
            '/etc/crontab',
            '/etc/ssh/sshd_config',
            '/etc/apache2',
            '/etc/nginx',
            '/home/*/.bashrc',
            '/home/*/.ssh/config',
        ]
    backup_archive = backup_dir / f"system_config_{timestamp}.tar.gz"
    with tarfile.open(backup_archive, 'w:gz') as tar:
        for pattern in config_locations:
            # glob() only accepts relative patterns, so anchor at '/'.
            relative_pattern = pattern.lstrip('/')
            if not relative_pattern:
                # An empty pattern makes glob() raise.
                continue
            for path in Path('/').glob(relative_pattern):
                if not path.exists():
                    # glob() can yield dangling symlinks.
                    continue
                try:
                    tar.add(path, arcname=str(path))
                    print(f" 📄 Added: {path}")
                except PermissionError:
                    print(f" ⚠️ Permission denied: {path}")
                except OSError as e:
                    # e.g. the file vanished while being read; keep going.
                    print(f" ⚠️ Skipped {path}: {e}")
    print(f"✅ System configuration backed up: {backup_archive.name}")
    return backup_archive
class RealTimeBackupSync:
    """
    Real-time backup synchronization using file system monitoring.

    Every file below ``source`` is mirrored into ``destination`` under the
    same relative path. The third-party ``watchdog`` package is imported
    when an instance is created, not when this module is imported.
    """

    def __init__(self, source: Path, destination: Path):
        """
        Create the destination directory and prepare the watchdog observer.

        Args:
            source: Directory to watch (anything ``Path()`` accepts).
            destination: Directory receiving the mirrored files; created
                if it does not exist.
        """
        self.source = Path(source)
        self.destination = Path(destination)
        self.destination.mkdir(parents=True, exist_ok=True)
        from watchdog.observers import Observer
        self.observer = Observer()
        self.event_handler = self._build_event_handler()

    def _build_event_handler(self):
        """Return a watchdog handler that forwards file events to this object."""
        # The handler class is defined here because its base class is only
        # available after the lazy watchdog import; a class-level definition
        # would fail with NameError when the module is loaded.
        from watchdog.events import FileSystemEventHandler

        class BackupEventHandler(FileSystemEventHandler):
            def __init__(self, parent):
                super().__init__()
                self.parent = parent

            def on_created(self, event):
                if not event.is_directory:
                    self.parent.backup_file(Path(event.src_path))

            def on_modified(self, event):
                if not event.is_directory:
                    self.parent.backup_file(Path(event.src_path))

            def on_deleted(self, event):
                if not event.is_directory:
                    self.parent.remove_backup(Path(event.src_path))

            def on_moved(self, event):
                # A rename is mirrored as "drop the old copy, add the new".
                if not event.is_directory:
                    self.parent.remove_backup(Path(event.src_path))
                    self.parent.backup_file(Path(event.dest_path))

        return BackupEventHandler(self)

    def backup_file(self, source_file: Path):
        """
        Backup a single file in real-time.

        Copy failures are printed, not raised.

        Raises:
            ValueError: If ``source_file`` is not located below ``source``.
        """
        rel_path = source_file.relative_to(self.source)
        dest_file = self.destination / rel_path
        try:
            # mkdir is inside the try so a failure here is reported like any
            # other sync failure instead of escaping into the observer thread.
            dest_file.parent.mkdir(parents=True, exist_ok=True)
            shutil.copy2(source_file, dest_file)
            print(f" ↻ Synced: {rel_path}")
        except Exception as e:
            print(f" ❌ Sync failed for {rel_path}: {e}")

    def remove_backup(self, source_file: Path):
        """
        Remove file from backup when deleted from source.

        Raises:
            ValueError: If ``source_file`` is not located below ``source``.
        """
        rel_path = source_file.relative_to(self.source)
        dest_file = self.destination / rel_path
        if dest_file.exists():
            try:
                dest_file.unlink()
            except OSError as e:
                print(f" ❌ Could not remove {rel_path} from backup: {e}")
                return
            print(f" 🗑️ Removed from backup: {rel_path}")

    def start(self):
        """
        Start real-time synchronization.

        Runs an initial full sync, then blocks while the observer mirrors
        changes until Ctrl+C is pressed. The observer is stopped and joined
        on every exit path.
        """
        import time

        print("🔄 Starting real-time backup sync")
        print(f" Source: {self.source}")
        print(f" Destination: {self.destination}")
        # Initial sync
        self.initial_sync()
        # Start monitoring
        self.observer.schedule(
            self.event_handler,
            str(self.source),
            recursive=True,
        )
        self.observer.start()
        print("✅ Real-time sync active (press Ctrl+C to stop)")
        try:
            while True:
                time.sleep(1)
        except KeyboardInterrupt:
            print("\n⏹️ Real-time sync stopped")
        finally:
            self.observer.stop()
            self.observer.join()

    def initial_sync(self):
        """Perform initial synchronization of every file below ``source``."""
        print(" Performing initial sync...")
        for source_file in self.source.rglob('*'):
            if source_file.is_file():
                self.backup_file(source_file)
        print(" Initial sync complete")
# Usage examples
if __name__ == "__main__":
    # One strategy object handles both the database and repository backups
    strategies = SpecializedBackupStrategies(Path("/backup/specialized"))

    # Backup the PostgreSQL production database
    postgres_connection = {
        'host': 'localhost',
        'user': 'dbuser',
        'password': 'dbpass',
        'database': 'production',
    }
    strategies.backup_postgresql(postgres_connection, 'production_db')

    # Backup the project Git repositories
    repositories = [
        Path("/home/user/projects/website"),
        Path("/home/user/projects/api"),
    ]
    strategies.backup_git_repositories(repositories)

    # Keep the critical documents folder mirrored continuously;
    # start() blocks until Ctrl+C is pressed.
    document_sync = RealTimeBackupSync(
        source="/home/user/critical_documents",
        destination="/backup/realtime/documents",
    )
    document_sync.start()
Key Takeaways and Best Practices 🎯
- Automate Everything: Manual backups are forgotten backups. Schedule automatic backups that run without your intervention.
- Test Restores Regularly: A backup that can't be restored is worse than no backup. Test your restoration process monthly.
- Use Multiple Strategies: Combine full, incremental, and differential backups for optimal storage and recovery time.
- Implement Versioning: Keep multiple versions of files to protect against corruption and accidental changes.
- Encrypt Sensitive Data: Always encrypt backups containing personal or sensitive information.
- Monitor and Alert: Set up notifications for backup failures and successes.
- Document Your System: Keep clear documentation of your backup strategy and restoration procedures.
The Backup Commandments 📜
Backup automation is your insurance policy against digital disasters. Whether it's hardware failure, ransomware, or human error, a well-designed backup system ensures you can always recover. The scripts and strategies we've covered give you enterprise-level data protection with Python's power and flexibility! 🚀
Pro Tip: Remember the backup paradox: The value of backups is only realized when you need them, but by then it's too late to create them. Start your backup automation today, test it tomorrow, and sleep peacefully knowing your data is safe!