
šŸ—„ļø Database Automation: Master Data Persistence with Python

Databases are the memory banks of the digital world. They store everything from user profiles to financial transactions, from IoT sensor data to social media posts. But managing databases manually is like trying to organize a library while blindfolded. Python gives you X-ray vision and robotic precision for database operations! 💾

The Database Automation Ecosystem

Think of databases as massive, organized warehouses. SQL is your forklift, Python is your warehouse management system, and automation is your army of tireless workers who follow the same procedure every time. Whether you're dealing with PostgreSQL's reliability, MySQL's speed, MongoDB's flexibility, or Redis's lightning-fast caching, Python has mature client libraries for all of them!
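Most of the SQL drivers imported below (psycopg2, mysql.connector, sqlite3) follow the Python DB-API 2.0 interface from PEP 249. You connect, open a cursor, call `execute`, then call one of the `fetch*` methods. The main difference is the placeholder style each driver declares in its module-level `paramstyle` attribute. sqlite3 accepts `?` and `:name`, while psycopg2 and mysql.connector expect `%s` and `%(name)s`.

The sketch below uses the standard-library sqlite3 module so it runs without a server. The `fetch_as_dicts` helper is an illustrative name, not part of any library.

```python
import sqlite3

def fetch_as_dicts(conn, query, params=()):
    """Run a query through the DB-API cursor interface and return rows as dicts."""
    cursor = conn.cursor()
    cursor.execute(query, params)
    # cursor.description is None for statements that return no rows (e.g. UPDATE)
    if cursor.description is None:
        return []
    columns = [col[0] for col in cursor.description]
    return [dict(zip(columns, row)) for row in cursor.fetchall()]

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT)")
conn.executemany("INSERT INTO users (name) VALUES (?)", [("ada",), ("linus",)])
conn.commit()

# Each DB-API module declares its placeholder style; sqlite3's is "qmark"
print(sqlite3.paramstyle)  # qmark
# sqlite3 also accepts named placeholders bound from a dict
rows = fetch_as_dicts(conn, "SELECT id, name FROM users WHERE name = :name", {"name": "ada"})
print(rows)  # [{'id': 1, 'name': 'ada'}]
conn.close()
```

The same `description`/`fetchall` pattern underlies `execute_query` in the manager below. Only the placeholder syntax inside the SQL text has to change from driver to driver.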

graph TB
    A[Database Operations] --> B[Connection Management]
    A --> C[Query Automation]
    A --> D[Data Migration]
    A --> E[Backup & Recovery]
    A --> F[Performance Optimization]
    B --> G[Connection Pooling]
    B --> H[Failover Handling]
    C --> I[Query Building]
    C --> J[Batch Operations]
    C --> K[Stored Procedures]
    D --> L[Schema Migration]
    D --> M[Data Transfer]
    D --> N[ETL Pipelines]
    E --> O[Automated Backups]
    E --> P[Point-in-Time Recovery]
    F --> Q[Index Management]
    F --> R[Query Optimization]
    F --> S[Cache Management]
    style A fill:#ff6b6b
    style G fill:#51cf66
    style I fill:#51cf66
    style L fill:#51cf66
    style O fill:#51cf66
    style Q fill:#51cf66
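The Connection Management and Batch Operations branches of the diagram meet at one rule: a batch should either commit as a whole or not at all.

Here is a minimal sketch, assuming sqlite3 as a stand-in for any DB-API driver. The helpers `transaction`, `chunks`, and `batch_insert` are hypothetical names chosen for illustration. Table and column names are assumed to be trusted identifiers, since they cannot be bound as query parameters.

```python
import sqlite3
from contextlib import contextmanager
from typing import Iterator, List, Sequence, Tuple

@contextmanager
def transaction(conn: sqlite3.Connection) -> Iterator[sqlite3.Connection]:
    """Commit when the block finishes, roll back if it raises."""
    try:
        yield conn
        conn.commit()
    except Exception:
        conn.rollback()
        raise

def chunks(rows: Sequence[Tuple], size: int) -> Iterator[Sequence[Tuple]]:
    """Yield consecutive slices of at most `size` rows."""
    for start in range(0, len(rows), size):
        yield rows[start:start + size]

def batch_insert(conn: sqlite3.Connection, table: str, columns: List[str],
                 rows: Sequence[Tuple], chunk_size: int = 500) -> int:
    """Insert rows chunk by chunk inside a single all-or-nothing transaction."""
    placeholders = ", ".join("?" for _ in columns)
    # Identifiers can't be bound as parameters; they are assumed to be trusted here
    sql = f"INSERT INTO {table} ({', '.join(columns)}) VALUES ({placeholders})"
    inserted = 0
    with transaction(conn):
        for chunk in chunks(rows, chunk_size):
            conn.executemany(sql, chunk)
            inserted += len(chunk)
    return inserted

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE readings (sensor TEXT, value REAL CHECK (value >= 0))")
print(batch_insert(conn, "readings", ["sensor", "value"],
                   [("s1", 1.5), ("s2", 2.0), ("s3", 0.7)], chunk_size=2))  # 3

try:
    # The second row violates the CHECK constraint, so the whole batch is undone
    batch_insert(conn, "readings", ["sensor", "value"], [("s4", 3.0), ("s5", -1.0)])
except sqlite3.IntegrityError:
    pass
print(conn.execute("SELECT COUNT(*) FROM readings").fetchone()[0])  # 3
```

Chunking keeps each `executemany` call bounded in size. A production version might instead commit once per chunk, giving up all-or-nothing behavior in exchange for shorter transactions.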

Real-World Scenario: The Multi-Database Orchestra 🎭

You're the data architect for an e-commerce platform. You have PostgreSQL for transactions, MongoDB for product catalogs, Redis for session management, Elasticsearch for search, and MySQL for legacy systems. Each database needs maintenance, backups, migrations, and monitoring. Let's build an automation system that conducts this orchestra from a single control point!
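One way to keep a multi-database fleet manageable is a dispatch table. Each database type maps to a function that knows its tooling, so adding a backend means adding one entry rather than another `elif`.

The sketch below only *plans* nightly backups for a hypothetical inventory: it builds the command lines but does not run them. The hosts, database names, and helper functions are placeholders.

- Passwords are deliberately left out of the commands, on the assumption that they come from `PGPASSWORD` or client option files at run time.
- Redis has no builder, so it is reported as having no dump tool configured.

```python
from typing import Callable, Dict, List, Optional

# Hypothetical inventory for the e-commerce scenario; every value is a placeholder.
FLEET: Dict[str, dict] = {
    "orders":   {"type": "postgresql", "host": "pg.internal",    "port": 5432,  "database": "shop",    "username": "backup"},
    "catalog":  {"type": "mongodb",    "host": "mongo.internal", "port": 27017, "database": "catalog", "username": "backup"},
    "sessions": {"type": "redis",      "host": "cache.internal", "port": 6379,  "database": "0",       "username": ""},
    "legacy":   {"type": "mysql",      "host": "mysql.internal", "port": 3306,  "database": "legacy",  "username": "backup"},
}

def pg_dump_cmd(cfg: dict, out: str) -> List[str]:
    # Custom format is compressed and restorable with pg_restore
    return ["pg_dump", f"--host={cfg['host']}", f"--port={cfg['port']}",
            f"--username={cfg['username']}", f"--dbname={cfg['database']}",
            "--format=custom", f"--file={out}"]

def mysqldump_cmd(cfg: dict, out: str) -> List[str]:
    # --single-transaction takes a consistent snapshot of InnoDB tables
    return ["mysqldump", f"--host={cfg['host']}", f"--port={cfg['port']}",
            f"--user={cfg['username']}", "--single-transaction",
            f"--result-file={out}", cfg["database"]]

def mongodump_cmd(cfg: dict, out: str) -> List[str]:
    # mongodump writes a directory of BSON files at --out
    return ["mongodump", "--host", f"{cfg['host']}:{cfg['port']}",
            "--db", cfg["database"], "--out", out]

# Dispatch table: one entry per supported type instead of an if/elif chain
BACKUP_BUILDERS: Dict[str, Callable[[dict, str], List[str]]] = {
    "postgresql": pg_dump_cmd,
    "mysql": mysqldump_cmd,
    "mongodb": mongodump_cmd,
}

def plan_backups(fleet: Dict[str, dict], backup_dir: str, stamp: str) -> Dict[str, Optional[List[str]]]:
    """Return the backup command each database would run, or None if no builder exists."""
    plan: Dict[str, Optional[List[str]]] = {}
    for name, cfg in fleet.items():
        builder = BACKUP_BUILDERS.get(cfg["type"])
        out = f"{backup_dir}/{name}_{stamp}.backup"
        plan[name] = builder(cfg, out) if builder else None
    return plan

for name, cmd in plan_backups(FLEET, "/var/backups/db", "20240101_020000").items():
    print(f"{name}: {' '.join(cmd) if cmd else 'no dump tool configured'}")
```

Separating "what would run" from "run it" also makes the plan easy to log or review before anything touches production.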

import psycopg2
import pymongo
import mysql.connector
import redis
import sqlite3
from sqlalchemy import create_engine, MetaData, Table, Column, Integer, String, DateTime, Float, Boolean, ForeignKey, text
from sqlalchemy.orm import sessionmaker, declarative_base
from sqlalchemy.pool import QueuePool
import pandas as pd
import asyncio
import asyncpg
import aiomysql
import motor.motor_asyncio
from typing import Dict, List, Any, Optional, Union, Tuple
import logging
from datetime import datetime, timedelta
from pathlib import Path
import json
import yaml
from contextlib import contextmanager
import subprocess
import schedule
import time
from dataclasses import dataclass
from enum import Enum

class DatabaseType(Enum):
    """Supported database types."""
    POSTGRESQL = "postgresql"
    MYSQL = "mysql"
    MONGODB = "mongodb"
    REDIS = "redis"
    SQLITE = "sqlite"
    MSSQL = "mssql"
    ORACLE = "oracle"

@dataclass
class DatabaseConfig:
    """Database configuration."""
    db_type: DatabaseType
    host: str
    port: int
    database: str
    username: str
    password: str
    options: Optional[Dict] = None

class UniversalDatabaseManager:
    """
    Universal database management system for multiple database types.
    """
    
    def __init__(self, config_file: Optional[str] = None):
        self.configs = {}
        self.connections = {}
        self.engines = {}
        self.pools = {}
        
        if config_file:
            self.load_config(config_file)
        
        self.setup_logging()
    
    def setup_logging(self):
        """Setup comprehensive logging."""
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
        )
        self.logger = logging.getLogger(__name__)
    
    def load_config(self, config_file: str):
        """Load database configurations from a YAML or JSON file."""
        with open(config_file, 'r') as f:
            if config_file.endswith(('.yaml', '.yml')):
                configs = yaml.safe_load(f)
            else:
                configs = json.load(f)
        
        for name, config in configs.items():
            # Host, port and credentials are optional so SQLite entries can omit them
            self.configs[name] = DatabaseConfig(
                db_type=DatabaseType(config['type']),
                host=config.get('host', ''),
                port=config.get('port', 0),
                database=config['database'],
                username=config.get('username', ''),
                password=config.get('password', ''),
                options=config.get('options', {})
            )
    
    def get_connection_string(self, config: DatabaseConfig) -> str:
        """Generate a SQLAlchemy connection URL for the supported SQL databases."""
        from urllib.parse import quote_plus
        
        # Escape credentials so characters such as '@', ':' or '/' don't break the URL
        user = quote_plus(config.username)
        password = quote_plus(config.password)
        
        if config.db_type == DatabaseType.POSTGRESQL:
            return f"postgresql://{user}:{password}@{config.host}:{config.port}/{config.database}"
        
        elif config.db_type == DatabaseType.MYSQL:
            return f"mysql+mysqlconnector://{user}:{password}@{config.host}:{config.port}/{config.database}"
        
        elif config.db_type == DatabaseType.SQLITE:
            return f"sqlite:///{config.database}"
        
        elif config.db_type == DatabaseType.MSSQL:
            return f"mssql+pyodbc://{user}:{password}@{config.host}:{config.port}/{config.database}?driver=ODBC+Driver+17+for+SQL+Server"
        
        else:
            raise ValueError(f"Unsupported database type: {config.db_type}")
    
    @contextmanager
    def get_connection(self, db_name: str):
        """Yield a native driver connection; it is always closed on exit."""
        config = self.configs[db_name]
        mongo_client = None  # kept separately because a pymongo Database cannot be closed
        
        if config.db_type == DatabaseType.POSTGRESQL:
            conn = psycopg2.connect(
                host=config.host,
                port=config.port,
                database=config.database,
                user=config.username,
                password=config.password
            )
        
        elif config.db_type == DatabaseType.MYSQL:
            conn = mysql.connector.connect(
                host=config.host,
                port=config.port,
                database=config.database,
                user=config.username,
                password=config.password
            )
        
        elif config.db_type == DatabaseType.MONGODB:
            mongo_client = pymongo.MongoClient(
                host=config.host,
                port=config.port,
                username=config.username,
                password=config.password
            )
            conn = mongo_client[config.database]
        
        elif config.db_type == DatabaseType.REDIS:
            conn = redis.Redis(
                host=config.host,
                port=config.port,
                password=config.password,
                decode_responses=True
            )
        
        elif config.db_type == DatabaseType.SQLITE:
            conn = sqlite3.connect(config.database)
        
        else:
            raise ValueError(f"Unsupported database type: {config.db_type}")
        
        try:
            yield conn
        except Exception:
            # Discard uncommitted work on SQL connections before re-raising
            if config.db_type in (DatabaseType.POSTGRESQL, DatabaseType.MYSQL, DatabaseType.SQLITE):
                conn.rollback()
            raise
        finally:
            if mongo_client is not None:
                # pymongo turns unknown attributes into Collections, so `conn.close`
                # is not a method; close the owning client instead
                mongo_client.close()
            elif hasattr(conn, 'close'):
                conn.close()
    
    def get_sqlalchemy_engine(self, db_name: str, pool_size: int = 10):
        """Get SQLAlchemy engine with connection pooling."""
        if db_name not in self.engines:
            config = self.configs[db_name]
            connection_string = self.get_connection_string(config)
            
            self.engines[db_name] = create_engine(
                connection_string,
                poolclass=QueuePool,
                pool_size=pool_size,
                max_overflow=20,
                pool_timeout=30,
                pool_recycle=3600
            )
        
        return self.engines[db_name]
    
    def execute_query(self, db_name: str, query: str,
                      params: Optional[Union[Dict, Tuple]] = None) -> List[Dict]:
        """Execute a query and return rows as dicts.
        
        For MongoDB, `query` names the collection (anything after a '.' is ignored)
        and `params` is used as the find() filter.
        """
        config = self.configs[db_name]
        results = []
        
        if config.db_type == DatabaseType.MONGODB:
            with self.get_connection(db_name) as db:
                # "products" and "products.find" both target the "products" collection
                collection = db[query.split('.', 1)[0]]
                cursor = collection.find(params or {})
                results = list(cursor)
        
        elif config.db_type == DatabaseType.REDIS:
            raise ValueError("execute_query does not support Redis; use get_connection() directly")
        
        else:
            # SQL databases (DB-API drivers)
            with self.get_connection(db_name) as conn:
                cursor = conn.cursor()
                
                if params:
                    cursor.execute(query, params)
                else:
                    cursor.execute(query)
                
                # description is None for statements that return no rows
                if cursor.description:
                    columns = [desc[0] for desc in cursor.description]
                    results = [dict(zip(columns, row)) for row in cursor.fetchall()]
                
                conn.commit()
        
        self.logger.info(f"Executed query on {db_name}: {len(results)} results")
        return results
    
    def bulk_insert(self, db_name: str, table: str, data: List[Dict]) -> int:
        """Bulk insert data into database."""
        if not data:
            return 0
        
        config = self.configs[db_name]
        
        if config.db_type == DatabaseType.MONGODB:
            with self.get_connection(db_name) as db:
                collection = db[table]
                result = collection.insert_many(data)
                return len(result.inserted_ids)
        
        else:
            # SQL databases - use pandas for convenience
            df = pd.DataFrame(data)
            engine = self.get_sqlalchemy_engine(db_name)
            
            # to_sql creates the table if needed; method='multi' sends several rows per INSERT
            df.to_sql(
                table,
                engine,
                if_exists='append',
                index=False,
                method='multi',
                chunksize=1000
            )
            
            self.logger.info(f"Bulk inserted {len(data)} rows into {db_name}.{table}")
            return len(data)
    
    def migrate_data(self, source_db: str, target_db: str, 
                    table_mapping: Dict[str, str], 
                    transform_func: callable = None) -> Dict:
        """
        Migrate data between databases with optional transformation.
        """
        stats = {
            'tables_migrated': 0,
            'total_rows': 0,
            'errors': []
        }
        
        for source_table, target_table in table_mapping.items():
            try:
                # Extract from source
                self.logger.info(f"Extracting data from {source_db}.{source_table}")
                
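                if self.configs[source_db].db_type == DatabaseType.MONGODB:
                    data = self.execute_query(source_db, source_table)
                    if self.configs[target_db].db_type != DatabaseType.MONGODB:
                        # SQL drivers cannot bind bson ObjectId values; store _id as a string
                        for doc in data:
                            if '_id' in doc:
                                doc['_id'] = str(doc['_id'])
                else:
                    query = f"SELECT * FROM {source_table}"
                    data = self.execute_query(source_db, query)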
                
                # Transform if needed
                if transform_func:
                    data = transform_func(data)
                
                # Load to target
                if data:
                    self.logger.info(f"Loading {len(data)} rows to {target_db}.{target_table}")
                    self.bulk_insert(target_db, target_table, data)
                    
                    stats['tables_migrated'] += 1
                    stats['total_rows'] += len(data)
                
            except Exception as e:
                error_msg = f"Migration failed for {source_table}: {e}"
                self.logger.error(error_msg)
                stats['errors'].append(error_msg)
        
        return stats
    
    def backup_database(self, db_name: str, backup_path: str) -> bool:
        """
        Create a timestamped backup with the database's native dump tool.
        """
        import os  # needed to pass the parent environment (PATH etc.) to the dump tools
        
        config = self.configs[db_name]
        backup_dir = Path(backup_path)
        backup_dir.mkdir(parents=True, exist_ok=True)
        backup_file = backup_dir / f"{db_name}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.backup"
        
        try:
            if config.db_type == DatabaseType.POSTGRESQL:
                # Use pg_dump (custom format, restorable with pg_restore)
                cmd = [
                    'pg_dump',
                    f'--host={config.host}',
                    f'--port={config.port}',
                    f'--username={config.username}',
                    f'--dbname={config.database}',
                    '--format=custom',
                    '--verbose',
                    f'--file={backup_file}'
                ]
                
                # Extend the current environment; passing only PGPASSWORD would drop PATH
                env = {**os.environ, 'PGPASSWORD': config.password}
                result = subprocess.run(cmd, env=env, capture_output=True, text=True)
                
                if result.returncode != 0:
                    raise RuntimeError(f"Backup failed: {result.stderr}")
            
            elif config.db_type == DatabaseType.MYSQL:
                # Use mysqldump, with options before the database name. A --password
                # argument is visible in process listings; an option file is safer.
                cmd = [
                    'mysqldump',
                    f'--host={config.host}',
                    f'--port={config.port}',
                    f'--user={config.username}',
                    f'--password={config.password}',
                    '--single-transaction',
                    '--routines',
                    '--triggers',
                    '--events',
                    config.database
                ]
                
                with open(backup_file, 'w') as f:
                    result = subprocess.run(cmd, stdout=f, stderr=subprocess.PIPE, text=True)
                
                if result.returncode != 0:
                    raise RuntimeError(f"Backup failed: {result.stderr}")
            
            elif config.db_type == DatabaseType.MONGODB:
                # Use mongodump (writes a directory of BSON files at --out)
                cmd = [
                    'mongodump',
                    '--host', f'{config.host}:{config.port}',
                    '--db', config.database,
                    '--out', str(backup_file),
                    '--username', config.username,
                    '--password', config.password
                ]
                
                result = subprocess.run(cmd, capture_output=True, text=True)
                
                if result.returncode != 0:
                    raise RuntimeError(f"Backup failed: {result.stderr}")
            
            elif config.db_type == DatabaseType.SQLITE:
                # The online backup API takes a consistent snapshot even while the
                # database is being written; a plain file copy might not
                src = sqlite3.connect(config.database)
                dst = sqlite3.connect(str(backup_file))
                try:
                    src.backup(dst)
                finally:
                    dst.close()
                    src.close()
            
            else:
                raise ValueError(f"Backups are not implemented for {config.db_type.value}")
            
            self.logger.info(f"Backup created: {backup_file}")
            return True
            
        except Exception as e:
            self.logger.error(f"Backup failed for {db_name}: {e}")
            return False
    
    def restore_database(self, db_name: str, backup_file: str) -> bool:
        """
        Restore a database from a backup created by backup_database().
        """
        import os  # needed to pass the parent environment (PATH etc.) to the client tools
        
        config = self.configs[db_name]
        
        try:
            if config.db_type == DatabaseType.POSTGRESQL:
                # Use pg_restore; --clean drops objects before recreating them and
                # --if-exists avoids errors when they are not there yet
                cmd = [
                    'pg_restore',
                    f'--host={config.host}',
                    f'--port={config.port}',
                    f'--username={config.username}',
                    f'--dbname={config.database}',
                    '--verbose',
                    '--clean',
                    '--if-exists',
                    backup_file
                ]
                
                env = {**os.environ, 'PGPASSWORD': config.password}
                result = subprocess.run(cmd, env=env, capture_output=True, text=True)
                
                if result.returncode != 0:
                    raise RuntimeError(f"Restore failed: {result.stderr}")
            
            elif config.db_type == DatabaseType.MYSQL:
                # Replay the SQL dump through the mysql client
                cmd = [
                    'mysql',
                    f'--host={config.host}',
                    f'--port={config.port}',
                    f'--user={config.username}',
                    f'--password={config.password}',
                    config.database
                ]
                
                with open(backup_file, 'r') as f:
                    result = subprocess.run(cmd, stdin=f, capture_output=True, text=True)
                
                if result.returncode != 0:
                    raise RuntimeError(f"Restore failed: {result.stderr}")
            
            elif config.db_type == DatabaseType.SQLITE:
                # Copy the backup into the live database through the backup API
                src = sqlite3.connect(backup_file)
                dst = sqlite3.connect(config.database)
                try:
                    src.backup(dst)
                finally:
                    dst.close()
                    src.close()
            
            else:
                raise ValueError(f"Restore is not implemented for {config.db_type.value}")
            
            self.logger.info(f"Database restored from: {backup_file}")
            return True
            
        except Exception as e:
            self.logger.error(f"Restore failed for {db_name}: {e}")
            return False

class DatabaseOptimizer:
    """
    Database optimization and maintenance automation.
    """
    
    def __init__(self, db_manager: UniversalDatabaseManager):
        self.db_manager = db_manager
        self.logger = logging.getLogger(__name__)
    
    def analyze_query_performance(self, db_name: str, query: str) -> Dict:
        """
        Analyze query performance.
        """
        config = self.db_manager.configs[db_name]
        analysis = {}
        
        if config.db_type == DatabaseType.POSTGRESQL:
            # EXPLAIN ANALYZE actually executes the statement (and execute_query
            # commits), so only pass read-only queries here
            explain_query = f"EXPLAIN ANALYZE {query}"
            results = self.db_manager.execute_query(db_name, explain_query)
            
            # Parse execution plan
            analysis['execution_plan'] = results
            analysis['total_time'] = self._extract_postgres_time(results)
            
        elif config.db_type == DatabaseType.MYSQL:
            # Use EXPLAIN
            explain_query = f"EXPLAIN {query}"
            results = self.db_manager.execute_query(db_name, explain_query)
            
            analysis['execution_plan'] = results
            analysis['possible_keys'] = [r.get('possible_keys') for r in results]
            analysis['key_used'] = [r.get('key') for r in results]
        
        return analysis
    
    def _extract_postgres_time(self, explain_results: List) -> float:
        """Extract execution time from PostgreSQL EXPLAIN output."""
        for result in explain_results:
            if 'Execution Time' in str(result):
                # Parse execution time
                import re
                match = re.search(r'Execution Time: ([\d.]+)', str(result))
                if match:
                    return float(match.group(1))
        return 0.0
    
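    def optimize_indexes(self, db_name: str, table: str) -> List[str]:
        """
        Suggest and create indexes on high-cardinality columns.
        
        Cardinality is only a rough heuristic: an index pays off when queries
        actually filter, join, or sort on the column.
        """
        suggestions = []
        config = self.db_manager.configs[db_name]
        
        if config.db_type in [DatabaseType.POSTGRESQL, DatabaseType.MYSQL]:
            if config.db_type == DatabaseType.POSTGRESQL:
                # pg_stats is populated by ANALYZE; the table name is bound as a parameter
                query = """
                    SELECT 
                        attname AS column_name,
                        n_distinct,
                        null_frac
                    FROM pg_stats
                    WHERE schemaname = current_schema() AND tablename = %(table)s
                """
            else:  # MySQL
                # STATISTICS only lists columns that already belong to an index
                query = """
                    SELECT 
                        COLUMN_NAME AS column_name,
                        CARDINALITY AS n_distinct
                    FROM INFORMATION_SCHEMA.STATISTICS
                    WHERE TABLE_SCHEMA = DATABASE() AND TABLE_NAME = %(table)s
                """
            
            stats = self.db_manager.execute_query(db_name, query, {'table': table})
            
            seen = set()
            for stat in stats:
                column = stat['column_name']
                n_distinct = stat.get('n_distinct') or 0  # CARDINALITY can be NULL
                # In pg_stats a negative n_distinct means the distinct count grows with
                # the row count (-1 = unique), i.e. high cardinality
                if (n_distinct < 0 or n_distinct > 100) and column not in seen:
                    seen.add(column)
                    index_name = f"idx_{table}_{column}"
                    if config.db_type == DatabaseType.POSTGRESQL:
                        create_index = f"CREATE INDEX IF NOT EXISTS {index_name} ON {table} ({column})"
                    else:
                        # MySQL has no CREATE INDEX IF NOT EXISTS; duplicates fail and are logged below
                        create_index = f"CREATE INDEX {index_name} ON {table} ({column})"
                    suggestions.append(create_index)
            
            # Create suggested indexes
            for index_query in suggestions:
                try:
                    self.db_manager.execute_query(db_name, index_query)
                    self.logger.info(f"Created index: {index_query}")
                except Exception as e:
                    self.logger.warning(f"Index creation failed: {e}")
        
        return suggestions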
    
    def vacuum_database(self, db_name: str):
        """
        Perform database maintenance (VACUUM, ANALYZE, etc.)
        """
        config = self.db_manager.configs[db_name]
        
        if config.db_type == DatabaseType.POSTGRESQL:
            # VACUUM cannot run inside a transaction block, and psycopg2 opens one
            # implicitly before the first statement, so enable autocommit first
            with self.db_manager.get_connection(db_name) as conn:
                conn.autocommit = True
                with conn.cursor() as cursor:
                    cursor.execute("VACUUM ANALYZE")
            self.logger.info(f"PostgreSQL VACUUM ANALYZE completed for {db_name}")
            
        elif config.db_type == DatabaseType.MYSQL:
            # Optimize every table in the current database
            tables = self.db_manager.execute_query(db_name, "SHOW TABLES")
            
            for table in tables:
                table_name = list(table.values())[0]
                # Backticks keep reserved words and unusual names valid as identifiers
                self.db_manager.execute_query(db_name, f"OPTIMIZE TABLE `{table_name}`")
            
            self.logger.info(f"MySQL OPTIMIZE completed for {db_name}")
        
        elif config.db_type == DatabaseType.SQLITE:
            # Run VACUUM
            self.db_manager.execute_query(db_name, "VACUUM")
            self.logger.info(f"SQLite VACUUM completed for {db_name}")
    
    def analyze_table_sizes(self, db_name: str) -> List[Dict]:
        """
        Analyze table sizes and row counts.
        """
        config = self.db_manager.configs[db_name]
        sizes = []
        
        if config.db_type == DatabaseType.POSTGRESQL:
            # pg_stat_user_tables exposes relname/relid, not tablename
            query = """
                SELECT 
                    schemaname,
                    relname AS tablename,
                    pg_size_pretty(pg_total_relation_size(relid)) AS size,
                    n_live_tup AS row_count
                FROM pg_stat_user_tables
                ORDER BY pg_total_relation_size(relid) DESC
            """
            sizes = self.db_manager.execute_query(db_name, query)
            
        elif config.db_type == DatabaseType.MYSQL:
            query = """
                SELECT 
                    TABLE_NAME as tablename,
                    ROUND(((DATA_LENGTH + INDEX_LENGTH) / 1024 / 1024), 2) AS size_mb,
                    TABLE_ROWS as row_count
                FROM information_schema.TABLES
                WHERE TABLE_SCHEMA = DATABASE()
                ORDER BY (DATA_LENGTH + INDEX_LENGTH) DESC
            """
            sizes = self.db_manager.execute_query(db_name, query)
        
        return sizes

class AsyncDatabaseManager:
    """
    Asynchronous database operations for high performance.
    """
    
    def __init__(self):
        self.pools = {}
        self.logger = logging.getLogger(__name__)
    
    async def create_postgres_pool(self, config: DatabaseConfig):
        """Create async PostgreSQL connection pool."""
        pool = await asyncpg.create_pool(
            host=config.host,
            port=config.port,
            database=config.database,
            user=config.username,
            password=config.password,
            min_size=10,
            max_size=20
        )
        self.pools['postgres'] = pool
        return pool
    
    async def create_mysql_pool(self, config: DatabaseConfig):
        """Create async MySQL connection pool."""
        pool = await aiomysql.create_pool(
            host=config.host,
            port=config.port,
            db=config.database,
            user=config.username,
            password=config.password,
            minsize=10,
            maxsize=20
        )
        self.pools['mysql'] = pool
        return pool
    
    async def execute_postgres_query(self, query: str, params: tuple = None):
        """Execute async PostgreSQL query."""
        pool = self.pools.get('postgres')
        if not pool:
            raise Exception("PostgreSQL pool not initialized")
        
        async with pool.acquire() as connection:
            if params:
                result = await connection.fetch(query, *params)
            else:
                result = await connection.fetch(query)
            
            return [dict(row) for row in result]
    
    async def bulk_insert_async(self, table: str, data: List[Dict]):
        """Async bulk insert using PostgreSQL's COPY protocol."""
        if not data:
            return None
        
        pool = self.pools.get('postgres')
        if not pool:
            raise RuntimeError("PostgreSQL pool not initialized")
        
        async with pool.acquire() as connection:
            columns = list(data[0].keys())
            # Build every record in the same column order, whatever each dict's key order is
            records = [tuple(row[col] for col in columns) for row in data]
            
            # COPY is much faster than row-by-row INSERTs
            result = await connection.copy_records_to_table(
                table,
                records=records,
                columns=columns
            )
            
            self.logger.info(f"Async bulk inserted {len(data)} rows")
            return result
    
    async def parallel_queries(self, queries: List[str]):
        """Execute multiple queries in parallel."""
        tasks = [self.execute_postgres_query(query) for query in queries]
        results = await asyncio.gather(*tasks)
        return results

class DatabaseMonitor:
    """
    Database monitoring and alerting system.
    """
    
    def __init__(self, db_manager: UniversalDatabaseManager):
        self.db_manager = db_manager
        self.alerts = []
        self.metrics = {}
        self.logger = logging.getLogger(__name__)
    
    def check_connection_health(self, db_name: str) -> bool:
        """Check if database connection is healthy."""
        try:
            config = self.db_manager.configs[db_name]
            
            if config.db_type == DatabaseType.MONGODB:
                with self.db_manager.get_connection(db_name) as db:
                    db.command('ping')
            elif config.db_type == DatabaseType.REDIS:
                with self.db_manager.get_connection(db_name) as conn:
                    return conn.ping()
            else:
                # A trivial round trip is enough for every SQL database
                self.db_manager.execute_query(db_name, "SELECT 1")
            
            return True
            
        except Exception as e:
            self.logger.error(f"Connection health check failed for {db_name}: {e}")
            self.alerts.append({
                'database': db_name,
                'type': 'connection_failure',
                'message': str(e),
                'timestamp': datetime.now()
            })
            return False
    
    def monitor_performance_metrics(self, db_name: str) -> Dict:
        """Monitor database performance metrics."""
        config = self.db_manager.configs[db_name]
        metrics = {}
        
        if config.db_type == DatabaseType.POSTGRESQL:
            # Get PostgreSQL metrics
            queries = {
                'active_connections': """
                    SELECT count(*) as count 
                    FROM pg_stat_activity 
                    WHERE state = 'active'
                """,
                'database_size': """
                    SELECT pg_database_size(current_database()) as size
                """,
                'cache_hit_ratio': """
                    SELECT 
                        sum(heap_blks_hit) / NULLIF(sum(heap_blks_hit) + sum(heap_blks_read), 0) AS ratio
                    FROM pg_statio_user_tables
                """,
                'transaction_count': """
                    SELECT xact_commit + xact_rollback as count
                    FROM pg_stat_database
                    WHERE datname = current_database()
                """
            }
            
            for metric_name, query in queries.items():
                try:
                    result = self.db_manager.execute_query(db_name, query)
                    if result:
                        # Each query returns a single column; read it directly so a legitimate 0 isn't lost
                        metrics[metric_name] = next(iter(result[0].values()))
                except Exception as e:
                    self.logger.warning(f"Failed to get metric {metric_name}: {e}")
        
        elif config.db_type == DatabaseType.MYSQL:
            # Get MySQL metrics
            queries = {
                'threads_connected': "SHOW STATUS LIKE 'Threads_connected'",
                'questions': "SHOW STATUS LIKE 'Questions'",
                'slow_queries': "SHOW STATUS LIKE 'Slow_queries'",
                'uptime': "SHOW STATUS LIKE 'Uptime'"
            }
            
            for metric_name, query in queries.items():
                try:
                    result = self.db_manager.execute_query(db_name, query)
                    if result:
                        metrics[metric_name] = result[0].get('Value')
                except Exception as e:
                    self.logger.warning(f"Failed to get metric {metric_name}: {e}")
        
        self.metrics[db_name] = metrics
        return metrics
    
    def check_slow_queries(self, db_name: str, threshold_ms: int = 1000) -> List[Dict]:
        """Check for slow queries."""
        config = self.db_manager.configs[db_name]
        slow_queries = []
        
        if config.db_type == DatabaseType.POSTGRESQL:
            query = f"""
                SELECT 
                    query,
                    calls,
                    mean_exec_time,
                    max_exec_time
                FROM pg_stat_statements
                WHERE mean_exec_time > {threshold_ms}
                ORDER BY mean_exec_time DESC
                LIMIT 10
            """
            
            try:
                slow_queries = self.db_manager.execute_query(db_name, query)
            except Exception:
                # pg_stat_statements must be preloaded and created as an extension;
                # mean_exec_time is the PostgreSQL 13+ column name
                self.logger.warning("pg_stat_statements not available")
        
        elif config.db_type == DatabaseType.MYSQL:
            # The statement digest summary lives in performance_schema, so check it is enabled
            query = "SHOW VARIABLES LIKE 'performance_schema'"
            result = self.db_manager.execute_query(db_name, query)
            
            if result and result[0].get('Value') == 'ON':
                # Timer columns are in picoseconds: 1 ms = 1,000,000,000 ps
                query = """
                    SELECT 
                        DIGEST_TEXT AS `query`,
                        COUNT_STAR AS calls,
                        AVG_TIMER_WAIT / 1000000000 AS mean_exec_time,
                        MAX_TIMER_WAIT / 1000000000 AS max_exec_time
                    FROM performance_schema.events_statements_summary_by_digest
                    WHERE AVG_TIMER_WAIT > %s * 1000000000
                    ORDER BY AVG_TIMER_WAIT DESC
                    LIMIT 10
                """
                
                try:
                    slow_queries = self.db_manager.execute_query(db_name, query, (threshold_ms,))
                except Exception:
                    self.logger.warning("Performance schema not available")
        
        if slow_queries:
            self.alerts.append({
                'database': db_name,
                'type': 'slow_queries',
                'count': len(slow_queries),
                'queries': slow_queries,
                'timestamp': datetime.now()
            })
        
        return slow_queries

class DatabaseScheduler:
    """
    Schedule and automate database tasks.
    """
    
    def __init__(self, db_manager: UniversalDatabaseManager):
        self.db_manager = db_manager
        self.optimizer = DatabaseOptimizer(db_manager)
        self.monitor = DatabaseMonitor(db_manager)
        self.jobs = []
        self.logger = logging.getLogger(__name__)
    
    def schedule_backup(self, db_name: str, backup_path: str, 
                       schedule_time: str = "02:00"):
        """Schedule daily backups."""
        def backup_job():
            self.logger.info(f"Starting scheduled backup for {db_name}")
            success = self.db_manager.backup_database(db_name, backup_path)
            
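            if success:
                # Clean old backups (keep last 7 days)
                self.cleanup_old_backups(backup_path, days_to_keep=7)
            else:
                # Keep the existing backups and report the failure
                self.send_alert(f"Scheduled backup failed for {db_name}")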
        
        schedule.every().day.at(schedule_time).do(backup_job)
        self.jobs.append(f"Daily backup of {db_name} at {schedule_time}")
        self.logger.info(f"Scheduled daily backup for {db_name} at {schedule_time}")
    
    def schedule_optimization(self, db_name: str, schedule_day: str = "sunday"):
        """Schedule weekly optimization."""
        def optimization_job():
            self.logger.info(f"Starting scheduled optimization for {db_name}")
            
            # Run VACUUM/OPTIMIZE
            self.optimizer.vacuum_database(db_name)
            
            # Analyze and create indexes
            tables = self.get_tables(db_name)
            for table in tables:
                self.optimizer.optimize_indexes(db_name, table)
        
        # schedule exposes weekday names as lowercase attributes (e.g. .sunday)
        getattr(schedule.every(), schedule_day.lower()).do(optimization_job)
        self.jobs.append(f"Weekly optimization of {db_name} on {schedule_day}")
        self.logger.info(f"Scheduled weekly optimization for {db_name} on {schedule_day}")
    
    def schedule_monitoring(self, db_names: List[str], interval_minutes: int = 5):
        """Schedule regular monitoring."""
        def monitoring_job():
            for db_name in db_names:
                # Check connection health
                if not self.monitor.check_connection_health(db_name):
                    self.send_alert(f"Connection failed for {db_name}")
                
                # Get performance metrics
                metrics = self.monitor.monitor_performance_metrics(db_name)
                
                # Check for anomalies
                self.check_metric_thresholds(db_name, metrics)
                
                # Check slow queries
                self.monitor.check_slow_queries(db_name)
        
        schedule.every(interval_minutes).minutes.do(monitoring_job)
        self.jobs.append(f"Monitoring every {interval_minutes} minutes")
        self.logger.info(f"Scheduled monitoring every {interval_minutes} minutes")
    
    def cleanup_old_backups(self, backup_path: str, days_to_keep: int):
        """Clean up old backup files."""
        backup_dir = Path(backup_path)
        cutoff_date = datetime.now() - timedelta(days=days_to_keep)
        
        for backup_file in backup_dir.glob("*.backup"):
            if backup_file.stat().st_mtime < cutoff_date.timestamp():
                backup_file.unlink()
                self.logger.info(f"Deleted old backup: {backup_file}")
    
    def get_tables(self, db_name: str) -> List[str]:
        """Get list of tables in database."""
        config = self.db_manager.configs[db_name]
        
        if config.db_type == DatabaseType.POSTGRESQL:
            query = """
                SELECT tablename 
                FROM pg_tables 
                WHERE schemaname = 'public'
            """
        elif config.db_type == DatabaseType.MYSQL:
            query = "SHOW TABLES"
        else:
            return []
        
        results = self.db_manager.execute_query(db_name, query)
        return [list(r.values())[0] for r in results]
    
    def check_metric_thresholds(self, db_name: str, metrics: Dict):
        """Check if metrics exceed thresholds."""
        thresholds = {
            'active_connections': 100,
            'cache_hit_ratio': 0.9,  # Alert if below
            'slow_queries': 10
        }
        
        for metric, threshold in thresholds.items():
            if metric in metrics:
                value = metrics[metric]
                
                if metric == 'cache_hit_ratio':
                    if value < threshold:
                        self.send_alert(f"Low cache hit ratio in {db_name}: {value:.2%}")
                else:
                    if value > threshold:
                        self.send_alert(f"High {metric} in {db_name}: {value}")
    
    def send_alert(self, message: str):
        """Send alert (implement your notification method)."""
        self.logger.warning(f"ALERT: {message}")
        
        # Implement your alerting mechanism here:
        # - Email
        # - Slack
        # - PagerDuty
        # - SMS
        # etc.
    
    def run(self):
        """Run the scheduler."""
        self.logger.info("Database scheduler started")
        self.logger.info(f"Scheduled jobs: {self.jobs}")
        
        while True:
            schedule.run_pending()
            time.sleep(60)  # Check every minute

# Example usage
if __name__ == "__main__":
    # Configuration (placeholder credentials; load real ones from environment variables or a secrets manager)
    db_config = {
        'postgres_main': {
            'type': 'postgresql',
            'host': 'localhost',
            'port': 5432,
            'database': 'maindb',
            'username': 'admin',
            'password': 'password'
        },
        'mysql_legacy': {
            'type': 'mysql',
            'host': 'localhost',
            'port': 3306,
            'database': 'legacydb',
            'username': 'root',
            'password': 'password'
        },
        'mongodb_products': {
            'type': 'mongodb',
            'host': 'localhost',
            'port': 27017,
            'database': 'products',
            'username': 'mongo_user',
            'password': 'password'
        }
    }
    
    # Initialize manager
    db_manager = UniversalDatabaseManager()
    
    # Add configurations
    for name, config in db_config.items():
        db_manager.configs[name] = DatabaseConfig(
            db_type=DatabaseType(config['type']),
            host=config['host'],
            port=config['port'],
            database=config['database'],
            username=config['username'],
            password=config['password']
        )
    
    # Example: Execute query
    results = db_manager.execute_query('postgres_main', 'SELECT * FROM users LIMIT 10')
    print(f"Query results: {results}")
    
    # Example: Bulk insert
    data_to_insert = [
        {'name': 'John Doe', 'email': 'john@example.com', 'age': 30},
        {'name': 'Jane Smith', 'email': 'jane@example.com', 'age': 25}
    ]
    db_manager.bulk_insert('postgres_main', 'users', data_to_insert)
    
    # Example: Backup database
    db_manager.backup_database('postgres_main', '/backups')
    
    # Example: Optimize database
    optimizer = DatabaseOptimizer(db_manager)
    optimizer.vacuum_database('postgres_main')
    optimizer.optimize_indexes('postgres_main', 'users')
    
    # Example: Monitor database
    monitor = DatabaseMonitor(db_manager)
    health = monitor.check_connection_health('postgres_main')
    metrics = monitor.monitor_performance_metrics('postgres_main')
    print(f"Database health: {health}")
    print(f"Performance metrics: {metrics}")
    
    # Example: Schedule tasks
    scheduler = DatabaseScheduler(db_manager)
    scheduler.schedule_backup('postgres_main', '/backups', '02:00')
    scheduler.schedule_optimization('postgres_main', 'sunday')
    scheduler.schedule_monitoring(['postgres_main', 'mysql_legacy'], 5)
    
    # Run scheduler (in production, this would be a daemon/service)
    # scheduler.run()
    
    print("\n✅ Database automation complete!")
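A note on units in the MySQL branch of check_slow_queries: performance_schema timer columns such as AVG_TIMER_WAIT and SUM_TIMER_WAIT are recorded in picoseconds, so a threshold expressed in milliseconds has to be scaled by 10^9 before the comparison. The sketch below shows that conversion in plain Python. The helper names and the dict-shaped rows passed to summarize_digests are assumptions for illustration, not part of UniversalDatabaseManager.

```python
from typing import Dict, List

# performance_schema timer columns (AVG_TIMER_WAIT, SUM_TIMER_WAIT, ...)
# are in picoseconds: 1 ms = 10**-3 s = 10**9 ps
PICOSECONDS_PER_MS = 1_000_000_000


def ms_to_picoseconds(ms: float) -> int:
    """Convert a millisecond threshold into performance_schema timer units."""
    return int(ms * PICOSECONDS_PER_MS)


def picoseconds_to_ms(ps: int) -> float:
    """Convert a performance_schema timer value back into milliseconds."""
    return ps / PICOSECONDS_PER_MS


def summarize_digests(rows: List[Dict], threshold_ms: float) -> List[Dict]:
    """Keep digests whose average wait exceeds threshold_ms, slowest first.

    Assumes each row is a dict with DIGEST_TEXT, COUNT_STAR and
    AVG_TIMER_WAIT keys (hypothetical shape, as a dict cursor might return).
    """
    limit_ps = ms_to_picoseconds(threshold_ms)
    slow = [r for r in rows if r["AVG_TIMER_WAIT"] > limit_ps]
    slow.sort(key=lambda r: r["AVG_TIMER_WAIT"], reverse=True)
    return [
        {
            "sql_text": r["DIGEST_TEXT"],
            "execution_count": r["COUNT_STAR"],
            "avg_ms": picoseconds_to_ms(r["AVG_TIMER_WAIT"]),
        }
        for r in slow
    ]


if __name__ == "__main__":
    sample = [
        {"DIGEST_TEXT": "SELECT * FROM orders WHERE id = ?", "COUNT_STAR": 42,
         "AVG_TIMER_WAIT": 250_000_000_000},  # 250 ms
        {"DIGEST_TEXT": "SELECT 1", "COUNT_STAR": 1000,
         "AVG_TIMER_WAIT": 5_000_000},        # 0.005 ms
    ]
    print(summarize_digests(sample, threshold_ms=100))
```

Filtering in SQL, as check_slow_queries does, keeps the result set small; a pure-Python version like this is mainly handy for unit-testing alert logic without a live MySQL server.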

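The schedule library does not catch exceptions raised inside a job, so an error in backup_job or monitoring_job bubbles up through schedule.run_pending() and, because DatabaseScheduler.run() has no try/except, stops the whole loop. One common pattern is to wrap each job so failures are logged and the scheduler keeps going. This is a minimal, standard-library-only sketch; catch_exceptions and the two demo jobs are hypothetical names.

```python
import functools
import logging
from typing import Any, Callable, Optional

logger = logging.getLogger("db_scheduler")


def catch_exceptions(job: Callable[..., Any]) -> Callable[..., Optional[Any]]:
    """Wrap a scheduled job so an exception is logged instead of re-raised."""
    @functools.wraps(job)
    def wrapper(*args: Any, **kwargs: Any) -> Optional[Any]:
        try:
            return job(*args, **kwargs)
        except Exception:
            # logger.exception logs at ERROR level and includes the traceback
            logger.exception("Scheduled job %s failed", job.__name__)
            return None
    return wrapper


@catch_exceptions
def failing_backup_job() -> str:
    # Hypothetical stand-in for a backup job whose database is unreachable
    raise ConnectionError("database unreachable")


@catch_exceptions
def healthy_monitoring_job() -> str:
    # Hypothetical stand-in for a job that completes normally
    return "ok"


if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    print(failing_backup_job())      # traceback is logged, then None is printed
    print(healthy_monitoring_job())  # ok
```

To use it, wrap the job when registering it, for example schedule.every().day.at(schedule_time).do(catch_exceptions(backup_job)). Because the wrapper catches Exception rather than BaseException, Ctrl+C still stops the scheduler. Pair it with send_alert if a log entry alone is not enough.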
Key Takeaways and Best Practices 🎯

Database Automation Best Practices 📋

Pro Tip: Think of database automation as building a self-healing, self-optimizing data infrastructure. Start with the basics: automated backups and monitoring. Then add optimization, migration, and scaling capabilities. Remember that the goal isn't to replace DBAs but to free them from repetitive tasks so they can focus on architecture and optimization. Always test automation scripts thoroughly, ideally against a staging copy rather than production, because a bug in database automation can be catastrophic! And a backup you have never restored is not a proven backup, so test restores regularly too.
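Retention logic is a good first candidate for that kind of testing, since a wrong cutoff deletes real backups. The sketch below is a standalone variant of DatabaseScheduler.cleanup_old_backups with two hypothetical additions, a dry_run flag that only reports what would be removed and a pattern parameter, exercised against throwaway files in a temporary directory.

```python
import os
import tempfile
import time
from datetime import datetime, timedelta
from pathlib import Path
from typing import List, Tuple


def cleanup_old_backups(backup_path: str, days_to_keep: int,
                        pattern: str = "*.backup",
                        dry_run: bool = False) -> List[Path]:
    """Delete backups older than days_to_keep; with dry_run, only list them."""
    cutoff = (datetime.now() - timedelta(days=days_to_keep)).timestamp()
    expired = [p for p in Path(backup_path).glob(pattern)
               if p.stat().st_mtime < cutoff]
    if not dry_run:
        for p in expired:
            p.unlink()
    return sorted(expired)


def run_retention_check() -> Tuple[List[str], List[str], List[str]]:
    """Exercise cleanup_old_backups on throwaway files in a temp directory."""
    with tempfile.TemporaryDirectory() as tmp:
        for name in ("maindb_old.backup", "maindb_new.backup", "notes.txt"):
            Path(tmp, name).write_text("placeholder")
        # Backdate one backup and one unrelated file by 10 days (atime, mtime)
        ten_days_ago = time.time() - 10 * 24 * 3600
        for name in ("maindb_old.backup", "notes.txt"):
            os.utime(Path(tmp, name), (ten_days_ago, ten_days_ago))

        planned = [p.name for p in cleanup_old_backups(tmp, 7, dry_run=True)]
        after_dry_run = sorted(p.name for p in Path(tmp).iterdir())
        cleanup_old_backups(tmp, 7)
        remaining = sorted(p.name for p in Path(tmp).iterdir())
    return planned, after_dry_run, remaining


if __name__ == "__main__":
    planned, after_dry_run, remaining = run_retention_check()
    print("Would delete:", planned)        # ['maindb_old.backup']
    print("After dry run:", after_dry_run)
    print("After cleanup:", remaining)     # ['maindb_new.backup', 'notes.txt']
```

The old notes.txt survives because it does not match the backup pattern, which is exactly the kind of boundary worth asserting. Running a dry run first and checking the list it returns is a cheap safeguard before the scheduler deletes anything for real.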

Database automation mastery transforms you from a data janitor to a data architect. You can manage multiple databases, ensure data integrity, optimize performance, and sleep soundly knowing your backups are running automatically. Whether you're managing a startup's single database or an enterprise's database fleet, these automation skills are invaluable! 🚀