Database Automation: Master Data Persistence with Python
Databases are the memory banks of the digital world. They store everything from user profiles to financial transactions, from IoT sensor data to social media posts. But managing databases manually is like trying to organize a library while blindfolded. Python gives you X-ray vision and robotic precision for database operations! 💾
The Database Automation Ecosystem
Think of databases as massive, organized warehouses. SQL is your forklift, Python is your warehouse management system, and automation is your army of tireless workers who never make mistakes. Whether you're dealing with PostgreSQL's reliability, MySQL's speed, MongoDB's flexibility, or Redis's lightning-fast caching, Python speaks all their languages fluently!
Real-World Scenario: The Multi-Database Orchestra
You're the data architect for an e-commerce platform. You have PostgreSQL for transactions, MongoDB for product catalogs, Redis for session management, Elasticsearch for search, and MySQL for legacy systems. Each database needs maintenance, backups, migrations, and monitoring. Let's build an automation system that conducts this orchestra flawlessly!
import asyncio
import json
import logging
import sqlite3
import subprocess
import time
from contextlib import contextmanager
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from enum import Enum
from pathlib import Path
from typing import Dict, List, Any, Optional, Union, Tuple

import aiomysql
import asyncpg
import motor.motor_asyncio
import mysql.connector
import pandas as pd
import psycopg2
import pymongo
import redis
import schedule
import yaml
from sqlalchemy import create_engine, MetaData, Table, Column, Integer, String, DateTime, Float, Boolean, ForeignKey, text
from sqlalchemy.orm import sessionmaker, declarative_base
from sqlalchemy.pool import QueuePool
class DatabaseType(Enum):
    """Enumerate the database engines this module knows about.

    Each member's value is the lowercase identifier used in the ``type``
    field of a configuration entry, so ``DatabaseType("mysql")`` maps a
    config string onto its member.
    """

    POSTGRESQL = "postgresql"
    MYSQL = "mysql"
    MONGODB = "mongodb"
    REDIS = "redis"
    SQLITE = "sqlite"
    MSSQL = "mssql"    # only used for SQLAlchemy connection strings in this file
    ORACLE = "oracle"  # declared, but no code path in this file handles it
@dataclass
class DatabaseConfig:
    """Connection settings for one named database.

    ``password`` is excluded from the generated ``repr`` so that printing or
    logging a config object does not leak the credential. It still takes
    part in equality comparisons.
    """

    db_type: "DatabaseType"   # engine; selects the driver / CLI branch used
    host: str
    port: int
    database: str             # database name (a file path for SQLite)
    username: str
    password: str = field(repr=False)
    # Extra driver options as loaded from the config file. Nothing in this
    # module currently reads them.
    options: Optional[Dict[str, Any]] = None
class UniversalDatabaseManager:
    """
    Universal database management system for multiple database types.

    Named ``DatabaseConfig`` entries are kept in ``self.configs``. The
    methods below do the following for those entries:

    * open connections;
    * run queries;
    * bulk-load rows;
    * migrate tables between databases;
    * back up and restore databases with the vendors' command-line tools.
    """

    def __init__(self, config_file: Optional[str] = None):
        self.configs = {}      # name -> DatabaseConfig
        self.connections = {}  # not populated by this class
        self.engines = {}      # name -> cached SQLAlchemy engine
        self.pools = {}        # not populated by this class
        if config_file:
            self.load_config(config_file)
        self.setup_logging()

    def setup_logging(self):
        """Configure root logging at INFO level and bind ``self.logger``."""
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
        )
        self.logger = logging.getLogger(__name__)

    def load_config(self, config_file: str):
        """Load named database configurations from a YAML or JSON file.

        Files ending in ``.yaml`` or ``.yml`` (any case) are parsed with
        ``yaml.safe_load``; anything else is parsed as JSON. Each top-level
        key becomes a config name. Its value must provide ``type``, ``host``,
        ``port``, ``database``, ``username`` and ``password``; ``options`` is
        optional.

        Raises:
            KeyError: if a required field is missing.
            ValueError: if ``type`` is not a ``DatabaseType`` value, or the
                JSON is malformed.
        """
        path = str(config_file)
        with open(path, 'r', encoding='utf-8') as f:
            if path.lower().endswith(('.yaml', '.yml')):
                configs = yaml.safe_load(f)
            else:
                configs = json.load(f)
        # An empty YAML document parses to None.
        for name, config in (configs or {}).items():
            self.configs[name] = DatabaseConfig(
                db_type=DatabaseType(config['type']),
                host=config['host'],
                port=config['port'],
                database=config['database'],
                username=config['username'],
                password=config['password'],
                options=config.get('options', {})
            )

    def get_connection_string(self, config: "DatabaseConfig") -> str:
        """
        Build a SQLAlchemy URL for *config*.

        The username and password are percent-encoded. This keeps characters
        such as ``@``, ``:`` or ``/`` in a password from corrupting the URL.

        Raises:
            ValueError: for engines without a SQLAlchemy URL here
                (MongoDB, Redis, Oracle).
        """
        from urllib.parse import quote_plus

        db_type = config.db_type
        if db_type == DatabaseType.SQLITE:
            return f"sqlite:///{config.database}"
        credentials = f"{quote_plus(str(config.username))}:{quote_plus(str(config.password))}"
        location = f"{config.host}:{config.port}/{config.database}"
        if db_type == DatabaseType.POSTGRESQL:
            return f"postgresql://{credentials}@{location}"
        if db_type == DatabaseType.MYSQL:
            return f"mysql+mysqlconnector://{credentials}@{location}"
        if db_type == DatabaseType.MSSQL:
            return f"mssql+pyodbc://{credentials}@{location}?driver=ODBC+Driver+17+for+SQL+Server"
        raise ValueError(f"Unsupported database type: {db_type}")

    @contextmanager
    def get_connection(self, db_name: str):
        """
        Yield a native connection/handle for *db_name* and close it on exit.

        What is yielded depends on the engine:

        * a psycopg2, mysql.connector or sqlite3 connection for SQL engines;
        * a pymongo ``Database`` for MongoDB;
        * a ``redis.Redis`` client for Redis.

        For MongoDB, the owning ``MongoClient`` is what gets closed. A pymongo
        ``Database`` has no ``close()`` method, so closing it would fail and
        leak the client.

        Raises:
            KeyError: if *db_name* is not configured.
            ValueError: for unsupported database types.
        """
        config = self.configs[db_name]
        db_type = config.db_type

        if db_type == DatabaseType.POSTGRESQL:
            conn = psycopg2.connect(
                host=config.host,
                port=config.port,
                database=config.database,
                user=config.username,
                password=config.password
            )
            owner = conn
        elif db_type == DatabaseType.MYSQL:
            conn = mysql.connector.connect(
                host=config.host,
                port=config.port,
                database=config.database,
                user=config.username,
                password=config.password
            )
            owner = conn
        elif db_type == DatabaseType.MONGODB:
            owner = pymongo.MongoClient(
                host=config.host,
                port=config.port,
                username=config.username,
                password=config.password
            )
            conn = owner[config.database]
        elif db_type == DatabaseType.REDIS:
            conn = redis.Redis(
                host=config.host,
                port=config.port,
                password=config.password,
                decode_responses=True
            )
            owner = conn
        elif db_type == DatabaseType.SQLITE:
            conn = sqlite3.connect(config.database)
            owner = conn
        else:
            raise ValueError(f"Unsupported database type: {db_type}")

        try:
            yield conn
        finally:
            close = getattr(owner, 'close', None)
            if callable(close):
                close()

    def get_sqlalchemy_engine(self, db_name: str, pool_size: int = 10):
        """Return a cached SQLAlchemy engine (QueuePool) for *db_name*.

        The engine is created on first use; *pool_size* only takes effect on
        that first call.
        """
        if db_name not in self.engines:
            config = self.configs[db_name]
            connection_string = self.get_connection_string(config)
            self.engines[db_name] = create_engine(
                connection_string,
                poolclass=QueuePool,
                pool_size=pool_size,
                max_overflow=20,
                pool_timeout=30,
                pool_recycle=3600
            )
        return self.engines[db_name]

    def execute_query(self, db_name: str, query: str,
                      params: Optional[Union[Dict, Tuple, List]] = None) -> List[Dict]:
        """
        Run *query* on *db_name* and return the result rows as dicts.

        SQL engines:
            *query* is executed with the optional driver *params*; the
            placeholder style depends on the driver. Rows are mapped
            column-name -> value, and the connection is committed afterwards.

        MongoDB:
            *query* names the collection. The text before the first ``'.'``
            is used, or the whole string if there is no dot; an empty string
            means ``'default'``. *params* is used as the ``find()`` filter.
        """
        config = self.configs[db_name]
        results = []

        if config.db_type == DatabaseType.MONGODB:
            with self.get_connection(db_name) as db:
                collection_name = query.split('.', 1)[0] or 'default'
                collection = db[collection_name]
                cursor = collection.find(params) if params else collection.find()
                results = list(cursor)
        else:
            with self.get_connection(db_name) as conn:
                cursor = conn.cursor()
                try:
                    if params:
                        cursor.execute(query, params)
                    else:
                        cursor.execute(query)
                    # description is None for statements that return no rows.
                    if cursor.description:
                        columns = [desc[0] for desc in cursor.description]
                        results = [dict(zip(columns, row)) for row in cursor.fetchall()]
                    if hasattr(conn, 'commit'):
                        conn.commit()
                finally:
                    cursor.close()

        self.logger.info(f"Executed query on {db_name}: {len(results)} results")
        return results

    def bulk_insert(self, db_name: str, table: str, data: List[Dict]) -> int:
        """Insert *data* rows into *table*; return the number of rows given.

        MongoDB uses ``insert_many`` (which adds ``_id`` keys to the input
        dicts). SQL engines go through ``pandas.DataFrame.to_sql`` in
        append mode, with multi-row INSERTs in chunks of 1000.
        """
        if not data:
            return 0

        config = self.configs[db_name]
        if config.db_type == DatabaseType.MONGODB:
            with self.get_connection(db_name) as db:
                result = db[table].insert_many(data)
                return len(result.inserted_ids)

        # SQL databases - use pandas for convenience
        df = pd.DataFrame(data)
        engine = self.get_sqlalchemy_engine(db_name)
        df.to_sql(
            table,
            engine,
            if_exists='append',
            index=False,
            method='multi',
            chunksize=1000
        )
        self.logger.info(f"Bulk inserted {len(data)} rows into {db_name}.{table}")
        return len(data)

    def migrate_data(self, source_db: str, target_db: str,
                     table_mapping: Dict[str, str],
                     transform_func: callable = None) -> Dict:
        """
        Copy every ``source_table -> target_table`` pair in *table_mapping*.

        Each source table (or Mongo collection) is read in full. The rows
        are passed through *transform_func* when it is given, and the result
        is appended to the target table.

        A failure for one table is logged and recorded in ``errors``; the
        remaining tables are still processed.

        Returns:
            ``{'tables_migrated': int, 'total_rows': int, 'errors': [str]}``.
        """
        stats = {
            'tables_migrated': 0,
            'total_rows': 0,
            'errors': []
        }

        for source_table, target_table in table_mapping.items():
            try:
                self.logger.info(f"Extracting data from {source_db}.{source_table}")
                if self.configs[source_db].db_type == DatabaseType.MONGODB:
                    data = self.execute_query(source_db, source_table)
                else:
                    # NOTE(review): table name is interpolated; callers must
                    # pass trusted identifiers.
                    data = self.execute_query(source_db, f"SELECT * FROM {source_table}")

                if transform_func:
                    data = transform_func(data)

                if data:
                    self.logger.info(f"Loading {len(data)} rows to {target_db}.{target_table}")
                    self.bulk_insert(target_db, target_table, data)

                stats['tables_migrated'] += 1
                stats['total_rows'] += len(data)
            except Exception as e:
                error_msg = f"Migration failed for {source_table}: {e}"
                self.logger.error(error_msg)
                stats['errors'].append(error_msg)

        return stats

    @staticmethod
    def _subprocess_env(extra: Dict[str, str]) -> Dict[str, str]:
        """Return a copy of ``os.environ`` with *extra* overlaid.

        ``subprocess.run(env=...)`` replaces the child's entire environment.
        Passing only the password variable would drop PATH (so tools outside
        the interpreter's default search path are not found), HOME, locale
        settings, and so on.
        """
        import os

        env = dict(os.environ)
        env.update(extra)
        return env

    @staticmethod
    def _run_checked(cmd: List[str], action: str, **kwargs) -> None:
        """Run *cmd* in text mode.

        Raises:
            RuntimeError: ``"<action> failed: <stderr>"`` on a non-zero exit.
        """
        result = subprocess.run(cmd, text=True, **kwargs)
        if result.returncode != 0:
            raise RuntimeError(f"{action} failed: {result.stderr}")

    def backup_database(self, db_name: str, backup_path: str) -> bool:
        """
        Create a timestamped backup of *db_name* inside *backup_path*.

        The target is named ``<db_name>_<YYYYmmdd_HHMMSS>.backup``, and the
        backup directory is created if it is missing. The method depends on
        the engine:

        * PostgreSQL: ``pg_dump --format=custom``.
        * MySQL: ``mysqldump`` (a plain SQL dump).
        * MongoDB: ``mongodump``, which writes a *directory* with that name.
        * SQLite: the sqlite3 online-backup API, which gives a consistent copy
          even while other connections write.

        Other engines are reported as failures.

        Returns:
            True on success, False on any failure (the error is logged).
        """
        config = self.configs[db_name]
        db_type = config.db_type
        backup_file = Path(backup_path) / f"{db_name}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.backup"

        try:
            backup_file.parent.mkdir(parents=True, exist_ok=True)

            if db_type == DatabaseType.POSTGRESQL:
                cmd = [
                    'pg_dump',
                    f'--host={config.host}',
                    f'--port={config.port}',
                    f'--username={config.username}',
                    f'--dbname={config.database}',
                    '--format=custom',
                    '--verbose',
                    f'--file={backup_file}'
                ]
                env = self._subprocess_env({'PGPASSWORD': config.password})
                self._run_checked(cmd, "Backup", env=env, capture_output=True)

            elif db_type == DatabaseType.MYSQL:
                # NOTE(review): the password on the command line is visible in
                # the process list; consider an option file (--defaults-extra-file).
                cmd = [
                    'mysqldump',
                    f'--host={config.host}',
                    f'--port={config.port}',
                    f'--user={config.username}',
                    f'--password={config.password}',
                    config.database,
                    '--single-transaction',
                    '--routines',
                    '--triggers',
                    '--events'
                ]
                with open(backup_file, 'w') as f:
                    self._run_checked(cmd, "Backup", stdout=f, stderr=subprocess.PIPE)

            elif db_type == DatabaseType.MONGODB:
                cmd = [
                    'mongodump',
                    '--host', f'{config.host}:{config.port}',
                    '--db', config.database,
                    '--out', str(backup_file),
                    '--username', config.username,
                    '--password', config.password
                ]
                self._run_checked(cmd, "Backup", capture_output=True)

            elif db_type == DatabaseType.SQLITE:
                # sqlite3.connect would silently create an empty database.
                if not Path(config.database).is_file():
                    raise FileNotFoundError(config.database)
                source = sqlite3.connect(config.database)
                try:
                    target = sqlite3.connect(str(backup_file))
                    try:
                        source.backup(target)
                    finally:
                        target.close()
                finally:
                    source.close()

            else:
                raise ValueError(f"Backup not supported for database type: {db_type}")

            self.logger.info(f"Backup created: {backup_file}")
            return True

        except Exception as e:
            self.logger.error(f"Backup failed for {db_name}: {e}")
            return False

    def restore_database(self, db_name: str, backup_file: str) -> bool:
        """
        Restore *db_name* from *backup_file*.

        The method depends on the engine:

        * PostgreSQL: ``pg_restore --clean`` (expects a custom-format dump).
        * MySQL: the SQL file is fed to the ``mysql`` client.
        * SQLite: the file is copied over the database file.

        Other engines are reported as failures.

        Returns:
            True on success, False on any failure (the error is logged).
        """
        config = self.configs[db_name]
        db_type = config.db_type

        try:
            if db_type == DatabaseType.POSTGRESQL:
                cmd = [
                    'pg_restore',
                    f'--host={config.host}',
                    f'--port={config.port}',
                    f'--username={config.username}',
                    f'--dbname={config.database}',
                    '--verbose',
                    '--clean',
                    backup_file
                ]
                env = self._subprocess_env({'PGPASSWORD': config.password})
                self._run_checked(cmd, "Restore", env=env, capture_output=True)

            elif db_type == DatabaseType.MYSQL:
                cmd = [
                    'mysql',
                    f'--host={config.host}',
                    f'--port={config.port}',
                    f'--user={config.username}',
                    f'--password={config.password}',
                    config.database
                ]
                with open(backup_file, 'r') as f:
                    self._run_checked(cmd, "Restore", stdin=f, capture_output=True)

            elif db_type == DatabaseType.SQLITE:
                import shutil
                shutil.copy2(backup_file, config.database)

            else:
                raise ValueError(f"Restore not supported for database type: {db_type}")

            self.logger.info(f"Database restored from: {backup_file}")
            return True

        except Exception as e:
            self.logger.error(f"Restore failed for {db_name}: {e}")
            return False
class DatabaseOptimizer:
    """
    Database optimization and maintenance automation.

    Wraps a ``UniversalDatabaseManager`` and issues engine-specific
    maintenance statements through it: EXPLAIN, index creation,
    VACUUM/OPTIMIZE and table-size reports.
    """

    def __init__(self, db_manager: "UniversalDatabaseManager"):
        self.db_manager = db_manager
        self.logger = logging.getLogger(__name__)

    def analyze_query_performance(self, db_name: str, query: str) -> Dict:
        """
        Return the execution plan for *query*, plus engine-specific details.

        PostgreSQL:
            Runs ``EXPLAIN ANALYZE``, which really executes *query*.
            ``execute_query`` commits afterwards, so data-modifying
            statements take effect. ``total_time`` is the reported
            execution time in ms.

        MySQL:
            Runs plain ``EXPLAIN`` and also lists ``possible_keys`` and
            ``key`` per plan row.

        Other engines:
            Returns an empty dict.
        """
        config = self.db_manager.configs[db_name]
        analysis = {}

        if config.db_type == DatabaseType.POSTGRESQL:
            results = self.db_manager.execute_query(db_name, f"EXPLAIN ANALYZE {query}")
            analysis['execution_plan'] = results
            analysis['total_time'] = self._extract_postgres_time(results)

        elif config.db_type == DatabaseType.MYSQL:
            results = self.db_manager.execute_query(db_name, f"EXPLAIN {query}")
            analysis['execution_plan'] = results
            analysis['possible_keys'] = [r.get('possible_keys') for r in results]
            analysis['key_used'] = [r.get('key') for r in results]

        return analysis

    def _extract_postgres_time(self, explain_results: List) -> float:
        """Return the ``Execution Time`` (ms) from EXPLAIN ANALYZE rows, or 0.0."""
        import re

        pattern = re.compile(r'Execution Time: ([\d.]+)')
        for row in explain_results:
            match = pattern.search(str(row))
            if match:
                return float(match.group(1))
        return 0.0

    @staticmethod
    def _is_high_cardinality(n_distinct, min_distinct) -> bool:
        """Decide whether a column-statistics value marks an index candidate.

        PostgreSQL's ``pg_stats.n_distinct`` is negative when the distinct
        count is expected to grow with the table. The value is then minus
        the distinct count divided by the row count, and -1 means unique.
        Such columns are treated as high cardinality. ``None`` (no
        statistics) is never a candidate.
        """
        if n_distinct is None:
            return False
        value = float(n_distinct)
        return value < 0 or value > min_distinct

    def optimize_indexes(self, db_name: str, table: str,
                         min_distinct: int = 100) -> List[str]:
        """
        Suggest and create single-column indexes on high-cardinality columns.

        A column qualifies when ``_is_high_cardinality`` says so for its
        statistics, using *min_distinct* as the positive threshold. Each
        suggested ``CREATE INDEX`` statement is executed. Failures are logged
        as warnings and do not stop the remaining statements.

        Note: on MySQL, ``INFORMATION_SCHEMA.STATISTICS`` only lists columns
        that already belong to an index. Suggestions there therefore tend to
        duplicate existing indexes.

        Returns:
            The suggested statements (whether or not creation succeeded).
        """
        suggestions: List[str] = []
        config = self.db_manager.configs[db_name]
        if config.db_type not in (DatabaseType.POSTGRESQL, DatabaseType.MYSQL):
            return suggestions

        if config.db_type == DatabaseType.POSTGRESQL:
            query = """
                SELECT
                    attname AS column_name,
                    n_distinct,
                    null_frac
                FROM pg_stats
                WHERE tablename = %s
            """
            # PostgreSQL supports IF NOT EXISTS on CREATE INDEX; MySQL does not.
            create_prefix = "CREATE INDEX IF NOT EXISTS"
        else:
            query = """
                SELECT
                    COLUMN_NAME AS column_name,
                    CARDINALITY AS n_distinct
                FROM INFORMATION_SCHEMA.STATISTICS
                WHERE TABLE_SCHEMA = DATABASE() AND TABLE_NAME = %s
            """
            create_prefix = "CREATE INDEX"

        # The table name is passed as a bound parameter, not interpolated.
        stats = self.db_manager.execute_query(db_name, query, (table,))

        seen = set()
        for stat in stats:
            column = stat.get('column_name')
            # pg_stats can list the same table name in several schemas.
            if not column or column in seen:
                continue
            if self._is_high_cardinality(stat.get('n_distinct'), min_distinct):
                seen.add(column)
                index_name = f"idx_{table}_{column}"
                suggestions.append(f"{create_prefix} {index_name} ON {table}({column})")

        for index_query in suggestions:
            try:
                self.db_manager.execute_query(db_name, index_query)
                self.logger.info(f"Created index: {index_query}")
            except Exception as e:
                self.logger.warning(f"Index creation failed: {e}")

        return suggestions

    def vacuum_database(self, db_name: str):
        """
        Perform engine-specific maintenance.

        * PostgreSQL: ``VACUUM ANALYZE``.
        * MySQL: ``OPTIMIZE TABLE`` for every table.
        * SQLite: ``VACUUM``.

        Other engines are left untouched.
        """
        config = self.db_manager.configs[db_name]

        if config.db_type == DatabaseType.POSTGRESQL:
            # VACUUM cannot run inside a transaction block, and psycopg2 opens
            # one implicitly, so this connection must be in autocommit mode.
            with self.db_manager.get_connection(db_name) as conn:
                conn.autocommit = True
                with conn.cursor() as cursor:
                    cursor.execute("VACUUM ANALYZE")
            self.logger.info(f"PostgreSQL VACUUM ANALYZE completed for {db_name}")

        elif config.db_type == DatabaseType.MYSQL:
            tables = self.db_manager.execute_query(db_name, "SHOW TABLES")
            for row in tables:
                table_name = next(iter(row.values()))
                if isinstance(table_name, (bytes, bytearray)):
                    table_name = table_name.decode()
                # Backtick-quote the identifier (doubling embedded backticks).
                quoted = "`" + str(table_name).replace("`", "``") + "`"
                self.db_manager.execute_query(db_name, f"OPTIMIZE TABLE {quoted}")
            self.logger.info(f"MySQL OPTIMIZE completed for {db_name}")

        elif config.db_type == DatabaseType.SQLITE:
            self.db_manager.execute_query(db_name, "VACUUM")
            self.logger.info(f"SQLite VACUUM completed for {db_name}")

    def analyze_table_sizes(self, db_name: str) -> List[Dict]:
        """
        Report table sizes and row counts, largest first.

        PostgreSQL rows have ``schemaname``, ``tablename``, ``size`` (a
        pretty-printed string) and ``row_count``. MySQL rows have
        ``tablename``, ``size_mb`` and ``row_count``. Other engines return an
        empty list.
        """
        config = self.db_manager.configs[db_name]
        sizes = []

        if config.db_type == DatabaseType.POSTGRESQL:
            # pg_stat_user_tables exposes relname/relid (not tablename), and
            # sizing by relid avoids quoting problems with mixed-case names.
            query = """
                SELECT
                    schemaname,
                    relname AS tablename,
                    pg_size_pretty(pg_total_relation_size(relid)) AS size,
                    n_live_tup AS row_count
                FROM pg_stat_user_tables
                ORDER BY pg_total_relation_size(relid) DESC
            """
            sizes = self.db_manager.execute_query(db_name, query)

        elif config.db_type == DatabaseType.MYSQL:
            query = """
                SELECT
                    TABLE_NAME as tablename,
                    ROUND(((DATA_LENGTH + INDEX_LENGTH) / 1024 / 1024), 2) AS size_mb,
                    TABLE_ROWS as row_count
                FROM information_schema.TABLES
                WHERE TABLE_SCHEMA = DATABASE()
                ORDER BY (DATA_LENGTH + INDEX_LENGTH) DESC
            """
            sizes = self.db_manager.execute_query(db_name, query)

        return sizes
class AsyncDatabaseManager:
    """
    Asynchronous database operations for high performance.

    Pools are stored in ``self.pools`` under the fixed keys ``'postgres'``
    and ``'mysql'``, so one manager holds at most one pool per engine.
    """

    def __init__(self):
        self.pools = {}
        self.logger = logging.getLogger(__name__)

    async def create_postgres_pool(self, config: "DatabaseConfig",
                                   min_size: int = 10, max_size: int = 20):
        """Create (and register as ``'postgres'``) an asyncpg connection pool."""
        pool = await asyncpg.create_pool(
            host=config.host,
            port=config.port,
            database=config.database,
            user=config.username,
            password=config.password,
            min_size=min_size,
            max_size=max_size
        )
        self.pools['postgres'] = pool
        return pool

    async def create_mysql_pool(self, config: "DatabaseConfig",
                                min_size: int = 10, max_size: int = 20):
        """Create (and register as ``'mysql'``) an aiomysql connection pool."""
        pool = await aiomysql.create_pool(
            host=config.host,
            port=config.port,
            db=config.database,
            user=config.username,
            password=config.password,
            minsize=min_size,
            maxsize=max_size
        )
        self.pools['mysql'] = pool
        return pool

    async def execute_postgres_query(self, query: str, params: Optional[tuple] = None):
        """Run *query* on the PostgreSQL pool and return rows as dicts.

        *params* are bound positionally ($1, $2, ...).

        Raises:
            Exception: if the PostgreSQL pool has not been created.
        """
        pool = self.pools.get('postgres')
        if not pool:
            raise Exception("PostgreSQL pool not initialized")

        async with pool.acquire() as connection:
            if params:
                result = await connection.fetch(query, *params)
            else:
                result = await connection.fetch(query)
            return [dict(row) for row in result]

    async def bulk_insert_async(self, table: str, data: List[Dict]):
        """
        Insert *data* into *table* with PostgreSQL ``COPY`` (via asyncpg).

        Column names come from the first row. Every row's values are then
        looked up by those names, and missing keys become NULL, so rows whose
        dicts list keys in a different order still line up.

        Returns:
            asyncpg's COPY status string, or None when *data* is empty.

        Raises:
            Exception: if the PostgreSQL pool has not been created.
        """
        pool = self.pools.get('postgres')
        if not pool:
            raise Exception("PostgreSQL pool not initialized")
        if not data:
            return None

        columns = list(data[0].keys())
        records = [tuple(row.get(column) for column in columns) for row in data]

        async with pool.acquire() as connection:
            # COPY is the fastest bulk-load path in PostgreSQL.
            result = await connection.copy_records_to_table(
                table,
                records=records,
                columns=columns
            )

        self.logger.info(f"Async bulk inserted {len(data)} rows")
        return result

    async def parallel_queries(self, queries: List[str]):
        """Run *queries* concurrently on the PostgreSQL pool; results keep input order."""
        tasks = [self.execute_postgres_query(query) for query in queries]
        return await asyncio.gather(*tasks)

    async def close(self):
        """Close every pool created by this manager and forget them."""
        for name, pool in list(self.pools.items()):
            if name == 'mysql':
                # aiomysql: close() is synchronous; wait_closed() awaits shutdown.
                pool.close()
                await pool.wait_closed()
            else:
                await pool.close()
        self.pools.clear()
class DatabaseMonitor:
    """
    Database monitoring and alerting system.

    Alerts are appended to ``self.alerts`` as dicts. Each has ``database``,
    ``type`` and ``timestamp``, plus type-specific keys. The latest metrics
    per database are kept in ``self.metrics``.
    """

    def __init__(self, db_manager: "UniversalDatabaseManager"):
        self.db_manager = db_manager
        self.alerts = []
        self.metrics = {}
        self.logger = logging.getLogger(__name__)

    @staticmethod
    def _coerce_number(value):
        """Convert a numeric status string (e.g. ``'42'``) to int/float.

        Bytes are decoded first. Non-numeric strings and non-string values
        are returned unchanged.
        """
        if isinstance(value, (bytes, bytearray)):
            value = value.decode()
        if not isinstance(value, str):
            return value
        try:
            return int(value)
        except ValueError:
            pass
        try:
            return float(value)
        except ValueError:
            return value

    def check_connection_health(self, db_name: str) -> bool:
        """
        Return True if *db_name* answers a trivial request, else False.

        MongoDB is checked with the ``ping`` command, Redis with ``PING``,
        and every other engine with ``SELECT 1``. Any failure, including an
        unknown *db_name*, is logged and recorded as a ``connection_failure``
        alert.
        """
        try:
            db_type = self.db_manager.configs[db_name].db_type

            if db_type == DatabaseType.MONGODB:
                with self.db_manager.get_connection(db_name) as db:
                    db.command('ping')
                return True

            if db_type == DatabaseType.REDIS:
                with self.db_manager.get_connection(db_name) as conn:
                    return bool(conn.ping())

            self.db_manager.execute_query(db_name, "SELECT 1")
            return True

        except Exception as e:
            self.logger.error(f"Connection health check failed for {db_name}: {e}")
            self.alerts.append({
                'database': db_name,
                'type': 'connection_failure',
                'message': str(e),
                'timestamp': datetime.now()
            })
            return False

    def monitor_performance_metrics(self, db_name: str) -> Dict:
        """
        Collect engine-specific performance metrics for *db_name*.

        * PostgreSQL: ``active_connections``, ``database_size`` (bytes),
          ``cache_hit_ratio`` (None before any heap blocks are touched) and
          ``transaction_count``.
        * MySQL: ``threads_connected``, ``questions``, ``slow_queries`` and
          ``uptime``, converted to numbers where possible.

        A metric whose query fails is logged and omitted. The result is also
        stored in ``self.metrics[db_name]``.
        """
        config = self.db_manager.configs[db_name]
        metrics = {}

        if config.db_type == DatabaseType.POSTGRESQL:
            queries = {
                'active_connections': """
                    SELECT count(*) as count
                    FROM pg_stat_activity
                    WHERE state = 'active'
                """,
                'database_size': """
                    SELECT pg_database_size(current_database()) as size
                """,
                # NULLIF turns 0/0 (no heap blocks touched yet) into NULL
                # instead of a division-by-zero error.
                'cache_hit_ratio': """
                    SELECT
                        sum(heap_blks_hit)
                        / NULLIF(sum(heap_blks_hit) + sum(heap_blks_read), 0) as ratio
                    FROM pg_statio_user_tables
                """,
                'transaction_count': """
                    SELECT xact_commit + xact_rollback as count
                    FROM pg_stat_database
                    WHERE datname = current_database()
                """
            }
            for metric_name, query in queries.items():
                try:
                    result = self.db_manager.execute_query(db_name, query)
                    if result:
                        # Each query selects exactly one column. Take it directly
                        # so that a legitimate 0 is kept rather than dropped.
                        metrics[metric_name] = next(iter(result[0].values()), None)
                except Exception as e:
                    self.logger.warning(f"Failed to get metric {metric_name}: {e}")

        elif config.db_type == DatabaseType.MYSQL:
            queries = {
                'threads_connected': "SHOW STATUS LIKE 'Threads_connected'",
                'questions': "SHOW STATUS LIKE 'Questions'",
                'slow_queries': "SHOW STATUS LIKE 'Slow_queries'",
                'uptime': "SHOW STATUS LIKE 'Uptime'"
            }
            for metric_name, query in queries.items():
                try:
                    result = self.db_manager.execute_query(db_name, query)
                    if result:
                        # SHOW STATUS returns values as strings.
                        metrics[metric_name] = self._coerce_number(result[0].get('Value'))
                except Exception as e:
                    self.logger.warning(f"Failed to get metric {metric_name}: {e}")

        self.metrics[db_name] = metrics
        return metrics

    def check_slow_queries(self, db_name: str, threshold_ms: int = 1000) -> List[Dict]:
        """
        Return up to 10 statements whose average time exceeds *threshold_ms*.

        PostgreSQL:
            Reads ``pg_stat_statements``. The extension must be installed,
            and the ``mean_exec_time`` column needs PostgreSQL 13+.

        MySQL:
            Only checked when ``slow_query_log`` is ON. Reads the
            performance-schema digest summary, whose timers are in
            picoseconds. Rows have ``sql_text``, ``execution_count`` and
            ``total_latency`` (picoseconds).

        Unavailable sources are logged as warnings. A non-empty result is
        also recorded as a ``slow_queries`` alert.
        """
        config = self.db_manager.configs[db_name]
        slow_queries = []

        if config.db_type == DatabaseType.POSTGRESQL:
            query = """
                SELECT
                    query,
                    calls,
                    mean_exec_time,
                    max_exec_time
                FROM pg_stat_statements
                WHERE mean_exec_time > %s
                ORDER BY mean_exec_time DESC
                LIMIT 10
            """
            try:
                slow_queries = self.db_manager.execute_query(db_name, query, (threshold_ms,))
            except Exception as e:
                # pg_stat_statements might not be enabled
                self.logger.warning(f"pg_stat_statements not available: {e}")

        elif config.db_type == DatabaseType.MYSQL:
            try:
                result = self.db_manager.execute_query(
                    db_name, "SHOW VARIABLES LIKE 'slow_query_log'"
                )
                if result and result[0].get('Value') == 'ON':
                    query = """
                        SELECT
                            DIGEST_TEXT AS sql_text,
                            COUNT_STAR AS execution_count,
                            SUM_TIMER_WAIT AS total_latency
                        FROM performance_schema.events_statements_summary_by_digest
                        WHERE AVG_TIMER_WAIT > %s
                        ORDER BY AVG_TIMER_WAIT DESC
                        LIMIT 10
                    """
                    # Performance-schema timers are picoseconds: 1 ms = 1e9 ps.
                    threshold_ps = int(threshold_ms * 1_000_000_000)
                    slow_queries = self.db_manager.execute_query(db_name, query, (threshold_ps,))
            except Exception as e:
                self.logger.warning(f"Performance schema not available: {e}")

        if slow_queries:
            self.alerts.append({
                'database': db_name,
                'type': 'slow_queries',
                'count': len(slow_queries),
                'queries': slow_queries,
                'timestamp': datetime.now()
            })

        return slow_queries
class DatabaseScheduler:
    """
    Schedule and automate database tasks.

    Jobs are registered on the ``schedule`` library's default scheduler and
    executed by ``run()``. Each job body catches and logs its own
    exceptions. ``schedule`` does not catch job exceptions, so an uncaught
    error would propagate out of ``schedule.run_pending()`` and end
    ``run()``.
    """

    def __init__(self, db_manager: "UniversalDatabaseManager"):
        self.db_manager = db_manager
        self.optimizer = DatabaseOptimizer(db_manager)
        self.monitor = DatabaseMonitor(db_manager)
        self.jobs = []  # human-readable descriptions of registered jobs
        self.logger = logging.getLogger(__name__)

    def schedule_backup(self, db_name: str, backup_path: str,
                        schedule_time: str = "02:00", days_to_keep: int = 7):
        """Schedule a daily backup of *db_name* at *schedule_time* (``HH:MM``).

        After each successful backup, ``*.backup`` entries in *backup_path*
        older than *days_to_keep* days are deleted.
        """
        def backup_job():
            self.logger.info(f"Starting scheduled backup for {db_name}")
            try:
                if self.db_manager.backup_database(db_name, backup_path):
                    self.cleanup_old_backups(backup_path, days_to_keep=days_to_keep)
            except Exception:
                self.logger.exception(f"Scheduled backup job failed for {db_name}")

        schedule.every().day.at(schedule_time).do(backup_job)
        self.jobs.append(f"Daily backup of {db_name} at {schedule_time}")
        self.logger.info(f"Scheduled daily backup for {db_name} at {schedule_time}")

    def schedule_optimization(self, db_name: str, schedule_day: str = "sunday"):
        """Schedule weekly VACUUM/OPTIMIZE plus index tuning for *db_name*.

        *schedule_day* is a weekday name and is matched case-insensitively,
        e.g. ``"Sunday"``.
        """
        day = schedule_day.lower()

        def optimization_job():
            self.logger.info(f"Starting scheduled optimization for {db_name}")
            try:
                self.optimizer.vacuum_database(db_name)
                for table in self.get_tables(db_name):
                    self.optimizer.optimize_indexes(db_name, table)
            except Exception:
                self.logger.exception(f"Scheduled optimization failed for {db_name}")

        getattr(schedule.every(), day).do(optimization_job)
        self.jobs.append(f"Weekly optimization of {db_name} on {schedule_day}")
        self.logger.info(f"Scheduled weekly optimization for {db_name} on {schedule_day}")

    def schedule_monitoring(self, db_names: List[str], interval_minutes: int = 5):
        """Every *interval_minutes*, run health, metrics and slow-query checks.

        A database that fails its health check is alerted on and skipped for
        that run. An error while checking one database is logged and does not
        stop the others.
        """
        def monitoring_job():
            for db_name in db_names:
                try:
                    if not self.monitor.check_connection_health(db_name):
                        self.send_alert(f"Connection failed for {db_name}")
                        # Further queries would only fail the same way.
                        continue
                    metrics = self.monitor.monitor_performance_metrics(db_name)
                    self.check_metric_thresholds(db_name, metrics)
                    self.monitor.check_slow_queries(db_name)
                except Exception:
                    self.logger.exception(f"Monitoring failed for {db_name}")

        schedule.every(interval_minutes).minutes.do(monitoring_job)
        self.jobs.append(f"Monitoring every {interval_minutes} minutes")
        self.logger.info(f"Scheduled monitoring every {interval_minutes} minutes")

    def cleanup_old_backups(self, backup_path: str, days_to_keep: int):
        """Delete ``*.backup`` entries in *backup_path* older than *days_to_keep* days.

        Age is judged by modification time. Directory entries (mongodump
        output) are removed recursively. A missing directory is a no-op.
        """
        import shutil

        backup_dir = Path(backup_path)
        if not backup_dir.is_dir():
            return
        cutoff = (datetime.now() - timedelta(days=days_to_keep)).timestamp()

        for backup_file in backup_dir.glob("*.backup"):
            if backup_file.stat().st_mtime >= cutoff:
                continue
            if backup_file.is_dir():
                shutil.rmtree(backup_file)
            else:
                backup_file.unlink()
            self.logger.info(f"Deleted old backup: {backup_file}")

    def get_tables(self, db_name: str) -> List[str]:
        """List table names for *db_name*.

        PostgreSQL tables come from the ``public`` schema. MySQL tables come
        from the current database. Other engines return ``[]``.
        """
        config = self.db_manager.configs[db_name]

        if config.db_type == DatabaseType.POSTGRESQL:
            query = """
                SELECT tablename
                FROM pg_tables
                WHERE schemaname = 'public'
            """
        elif config.db_type == DatabaseType.MYSQL:
            query = "SHOW TABLES"
        else:
            return []

        results = self.db_manager.execute_query(db_name, query)
        return [list(r.values())[0] for r in results]

    def check_metric_thresholds(self, db_name: str, metrics: Dict,
                                thresholds: Optional[Dict[str, float]] = None):
        """Send alerts for metrics that cross their thresholds.

        ``cache_hit_ratio`` alerts when the value is *below* its threshold;
        every other metric alerts when it is *above*. Missing, ``None`` and
        non-numeric values are skipped rather than raising.

        Args:
            thresholds: overrides the defaults (``active_connections`` 100,
                ``cache_hit_ratio`` 0.9, ``slow_queries`` 10).
        """
        if thresholds is None:
            thresholds = {
                'active_connections': 100,
                'cache_hit_ratio': 0.9,  # Alert if below
                'slow_queries': 10
            }

        for metric, threshold in thresholds.items():
            value = metrics.get(metric)
            if value is None:
                continue
            try:
                numeric = float(value)
            except (TypeError, ValueError):
                self.logger.warning(f"Non-numeric value for {metric} in {db_name}: {value!r}")
                continue

            if metric == 'cache_hit_ratio':
                if numeric < threshold:
                    self.send_alert(f"Low cache hit ratio in {db_name}: {numeric:.2%}")
            elif numeric > threshold:
                self.send_alert(f"High {metric} in {db_name}: {value}")

    def send_alert(self, message: str):
        """Send an alert. Currently it is only logged at WARNING level.

        Hook your notification channel in here (email, Slack, PagerDuty,
        SMS, ...).
        """
        self.logger.warning(f"ALERT: {message}")

    def run(self, poll_seconds: int = 60):
        """Run pending jobs forever, polling every *poll_seconds* seconds."""
        self.logger.info("Database scheduler started")
        self.logger.info(f"Scheduled jobs: {self.jobs}")

        while True:
            schedule.run_pending()
            time.sleep(poll_seconds)
# Example usage
if __name__ == "__main__":
    # Configuration (demo credentials only; real deployments should load them
    # from a config file or secret store, e.g. UniversalDatabaseManager(path)).
    db_config = {
        'postgres_main': {
            'type': 'postgresql',
            'host': 'localhost',
            'port': 5432,
            'database': 'maindb',
            'username': 'admin',
            'password': 'password'
        },
        'mysql_legacy': {
            'type': 'mysql',
            'host': 'localhost',
            'port': 3306,
            'database': 'legacydb',
            'username': 'root',
            'password': 'password'
        },
        'mongodb_products': {
            'type': 'mongodb',
            'host': 'localhost',
            'port': 27017,
            'database': 'products',
            'username': 'mongo_user',
            'password': 'password'
        }
    }

    # Initialize manager
    db_manager = UniversalDatabaseManager()

    # Add configurations
    for name, config in db_config.items():
        db_manager.configs[name] = DatabaseConfig(
            db_type=DatabaseType(config['type']),
            host=config['host'],
            port=config['port'],
            database=config['database'],
            username=config['username'],
            password=config['password']
        )

    # Example: Execute query
    results = db_manager.execute_query('postgres_main', 'SELECT * FROM users LIMIT 10')
    print(f"Query results: {results}")

    # Example: Bulk insert
    data_to_insert = [
        {'name': 'John Doe', 'email': 'john@example.com', 'age': 30},
        {'name': 'Jane Smith', 'email': 'jane@example.com', 'age': 25}
    ]
    db_manager.bulk_insert('postgres_main', 'users', data_to_insert)

    # Example: Backup database
    db_manager.backup_database('postgres_main', '/backups')

    # Example: Optimize database
    optimizer = DatabaseOptimizer(db_manager)
    optimizer.vacuum_database('postgres_main')
    optimizer.optimize_indexes('postgres_main', 'users')

    # Example: Monitor database
    monitor = DatabaseMonitor(db_manager)
    health = monitor.check_connection_health('postgres_main')
    metrics = monitor.monitor_performance_metrics('postgres_main')
    print(f"Database health: {health}")
    print(f"Performance metrics: {metrics}")

    # Example: Schedule tasks
    scheduler = DatabaseScheduler(db_manager)
    scheduler.schedule_backup('postgres_main', '/backups', '02:00')
    scheduler.schedule_optimization('postgres_main', 'sunday')
    scheduler.schedule_monitoring(['postgres_main', 'mysql_legacy'], 5)

    # Run scheduler (in production, this would be a daemon/service)
    # scheduler.run()

    # The original literal was split across two lines by a mis-encoded emoji,
    # which is a syntax error.
    print("\n✅ Database automation complete!")
Key Takeaways and Best Practices 🎯
- Use Connection Pooling: Always use connection pools to avoid the overhead of creating new connections.
- Implement Retry Logic: Database connections can fail - always have retry mechanisms.
- Monitor Performance: Track query execution times, connection counts, and resource usage.
- Automate Backups: Regular, automated backups are your insurance against data loss.
- Use Transactions: Group related operations in transactions to ensure data consistency.
- Index Strategically: Good indexes can make queries 100x faster, bad ones slow everything down.
- Handle Migrations Carefully: Always test migrations on a copy of production data first.
Database Automation Best Practices
Database automation mastery transforms you from a data janitor to a data architect. You can manage multiple databases, ensure data integrity, optimize performance, and sleep soundly knowing your backups are running automatically. Whether you're managing a startup's single database or an enterprise's database fleet, these automation skills are invaluable!
Pro Tip: Think of database automation as building a self-healing, self-optimizing data infrastructure. Start with the basics - automated backups and monitoring. Then add optimization, migration, and scaling capabilities. Remember: the goal isn't to replace DBAs, but to free them from repetitive tasks so they can focus on architecture and optimization. Always test automation scripts thoroughly - a bug in database automation can be catastrophic!