⚙️ Process Management: Control System Resources with Python
Your computer is like a bustling city, with thousands of processes running like cars on highways, each consuming resources and performing tasks. Process management is your traffic control system: you can start new processes, stop runaway ones, monitor resource usage, and orchestrate complex workflows. Let's become the mayors of our digital cities! 🏙️
The Process Management Ecosystem
Think of processes as workers in a factory. Some are assembly line workers (CPU-intensive), others are delivery drivers (I/O-intensive), and some are managers (orchestrating other processes). Python gives you the power to hire, fire, monitor, and coordinate all these workers efficiently!
Real-World Scenario: The Server Resource Manager 🖥️
You're managing a server farm running web applications, background jobs, databases, and monitoring tools. Some processes consume too much memory, others spawn zombie children, and occasionally something goes haywire and needs immediate termination. Let's build a comprehensive process management system that keeps everything running smoothly!
import psutil
import subprocess
import os
import signal
import time
import threading
import multiprocessing
from multiprocessing import Process, Queue, Pool, Manager, Lock
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import logging
from typing import Dict, List, Optional, Tuple, Any, Callable
from dataclasses import dataclass
from datetime import datetime, timedelta
from pathlib import Path
import json
import sys
import resource
import queue
from collections import defaultdict, deque
import warnings
@dataclass
class ProcessInfo:
    """Point-in-time snapshot of one OS process.

    Populated field by field from psutil queries by
    ``ProcessManager.get_process_info``.
    """

    pid: int                     # process id
    name: str                    # value of psutil Process.name()
    status: str                  # psutil status string (e.g. 'running', 'zombie')
    cpu_percent: float           # CPU utilisation measured by cpu_percent()
    memory_percent: float        # memory usage as a percentage (memory_percent())
    memory_info: Dict[str, int]  # memory_info() named tuple converted to a dict
    create_time: float           # value of create_time()
    username: str                # owning user name
    cmdline: List[str]           # command line, one argument per element
    connections: List[Any]       # socket connection records
    open_files: List[Any]        # open file records
    threads: int                 # number of threads
    children: List[int]          # pids returned by children()
class ProcessManager:
    """
    Comprehensive process management system for monitoring,
    controlling, and optimizing system processes.

    Process inspection and signalling go through psutil; processes launched
    with start_process() are remembered in ``monitored_processes``.
    """

    def __init__(self, config: Dict = None):
        """Create a manager.

        Args:
            config: Settings dict. When omitted (or empty) the values from
                get_default_config() are used.
        """
        self.config = config or self.get_default_config()
        # pid -> {'command', 'start_time', 'process'} for processes started here.
        self.monitored_processes = {}
        # pid -> deque of metric dicts recorded by monitor_process().
        self.process_history = defaultdict(deque)
        self.alerts = []
        self.setup_logging()

        # Resource thresholds
        self.thresholds = {
            'cpu_percent': 80.0,
            'memory_percent': 75.0,
            'disk_io_rate': 100 * 1024 * 1024,  # 100 MB/s
            'network_io_rate': 50 * 1024 * 1024,  # 50 MB/s
            'process_count': 500,
            'zombie_count': 10,
        }

        # Process patterns to watch
        self.watch_patterns = self.config.get('watch_patterns', [])

        # Process names kill_process() refuses to stop unless force=True.
        self.protected_processes = {
            'systemd', 'init', 'kernel', 'sshd', 'systemd-resolved'
        }

    def get_default_config(self) -> Dict:
        """Return the default configuration dict."""
        return {
            'monitoring_interval': 5,  # seconds
            'history_size': 100,  # number of records to keep
            'auto_kill_zombies': True,
            'auto_restart_crashed': False,
            'log_file': '/var/log/process_manager.log',
            'alert_methods': ['log', 'email', 'slack'],
        }

    def setup_logging(self):
        """Configure root logging to ``config['log_file']`` plus stderr.

        If the root logger already has handlers, logging.basicConfig() would be
        a no-op, so nothing is created in that case (no stray open log file).
        The default log path is under /var/log, which unprivileged users
        usually cannot write to. Rather than letting FileHandler raise and abort
        construction, fall back to stderr-only logging and warn about it.
        """
        log_file = self.config.get('log_file', 'process_manager.log')
        file_error = None
        if not logging.getLogger().handlers:
            handlers = [logging.StreamHandler()]
            try:
                handlers.insert(0, logging.FileHandler(log_file))
            except OSError as err:
                file_error = err
            logging.basicConfig(
                level=logging.INFO,
                format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
                handlers=handlers,
            )
        self.logger = logging.getLogger(__name__)
        if file_error is not None:
            self.logger.warning(
                "Cannot open log file %s (%s); logging to stderr only", log_file, file_error
            )

    @staticmethod
    def _access_or(default: Any, getter: Callable[[], Any]) -> Any:
        """Return ``getter()``, or *default* if the OS denies access to that field.

        Socket, file-descriptor and I/O details of other users' processes are
        often off-limits to unprivileged callers. Losing one such field should
        not discard the rest of the sample.
        """
        try:
            return getter()
        except psutil.AccessDenied:
            return default

    def _build_process_info(self, proc, cpu_percent: float) -> 'ProcessInfo':
        """Assemble a ProcessInfo for *proc* using an already-measured CPU %.

        Raises psutil.NoSuchProcess / psutil.AccessDenied if a required field
        (name, status, memory, user, cmdline, ...) cannot be read.
        """
        # psutil >= 6 renamed connections() to net_connections().
        get_connections = getattr(proc, 'net_connections', None) or proc.connections
        with proc.oneshot():
            return ProcessInfo(
                pid=proc.pid,
                name=proc.name(),
                status=proc.status(),
                cpu_percent=cpu_percent,
                memory_percent=proc.memory_percent(),
                memory_info=proc.memory_info()._asdict(),
                create_time=proc.create_time(),
                username=proc.username(),
                cmdline=proc.cmdline(),
                connections=self._access_or([], lambda: get_connections(kind='all')),
                open_files=self._access_or([], proc.open_files),
                threads=proc.num_threads(),
                children=[child.pid for child in proc.children()],
            )

    def get_process_info(self, pid: int) -> Optional['ProcessInfo']:
        """Return detailed information about *pid*, or None if it is inaccessible.

        Blocks ~0.1 s to sample CPU usage. The sample is taken *before* entering
        psutil's oneshot() context. Inside oneshot() psutil caches cpu_times(),
        so an interval measurement there compares a cached value with itself
        and always reports 0.0%.
        """
        try:
            proc = psutil.Process(pid)
            cpu = proc.cpu_percent(interval=0.1)
            return self._build_process_info(proc, cpu)
        except (psutil.NoSuchProcess, psutil.AccessDenied) as e:
            self.logger.warning(f"Cannot access process {pid}: {e}")
            return None

    def list_all_processes(self, sort_by: str = 'cpu') -> List['ProcessInfo']:
        """List all accessible processes sorted by 'cpu', 'memory', 'pid' or 'name'.

        Any other *sort_by* value leaves the list in iteration order. CPU usage
        is measured with a single shared 0.1 s window: every process is primed
        first, and then all are read. Processes that exit or deny access are
        skipped.
        """
        primed = []
        for proc in psutil.process_iter():
            try:
                proc.cpu_percent(interval=None)  # first call only primes the counter
            except (psutil.NoSuchProcess, psutil.AccessDenied):
                continue
            primed.append(proc)

        time.sleep(0.1)

        processes = []
        for proc in primed:
            try:
                cpu = proc.cpu_percent(interval=None)
                processes.append(self._build_process_info(proc, cpu))
            except (psutil.NoSuchProcess, psutil.AccessDenied):
                continue

        sort_spec = {
            'cpu': (lambda p: p.cpu_percent, True),
            'memory': (lambda p: p.memory_percent, True),
            'pid': (lambda p: p.pid, False),
            'name': (lambda p: p.name, False),
        }.get(sort_by)
        if sort_spec is not None:
            key, reverse = sort_spec
            processes.sort(key=key, reverse=reverse)
        return processes

    def find_processes(self, pattern: str) -> List['ProcessInfo']:
        """Return processes whose name or command line contains *pattern* (case-insensitive)."""
        needle = pattern.lower()
        matching_processes = []
        for proc in psutil.process_iter():
            try:
                # The short-circuit means cmdline() is only read when the name does not match.
                matched = (needle in proc.name().lower()
                           or needle in ' '.join(proc.cmdline()).lower())
            except (psutil.NoSuchProcess, psutil.AccessDenied):
                continue
            if matched:
                info = self.get_process_info(proc.pid)
                if info:
                    matching_processes.append(info)
        return matching_processes

    def kill_process(self, pid: int, force: bool = False) -> bool:
        """Terminate *pid* (SIGTERM, then SIGKILL after 5 s); return True on success.

        Processes named in ``protected_processes`` are refused unless *force*.
        Returns False if the process does not exist, access is denied, or it
        cannot be stopped.
        """
        try:
            proc = psutil.Process(pid)

            # Check if process is protected
            if proc.name() in self.protected_processes and not force:
                self.logger.warning(f"Process {proc.name()} ({pid}) is protected")
                return False

            # Try graceful termination first
            proc.terminate()
            try:
                proc.wait(timeout=5)
                self.logger.info(f"Process {pid} terminated gracefully")
                return True
            except psutil.TimeoutExpired:
                # Force kill if graceful termination failed
                self.logger.warning(f"Process {pid} did not terminate, forcing kill")
                proc.kill()
                proc.wait(timeout=5)
                self.logger.info(f"Process {pid} killed forcefully")
                return True

        except psutil.NoSuchProcess:
            self.logger.warning(f"Process {pid} does not exist")
            return False
        except psutil.AccessDenied:
            self.logger.error(f"Access denied to kill process {pid}")
            return False
        except Exception as e:
            self.logger.error(f"Error killing process {pid}: {e}")
            return False

    def kill_process_tree(self, pid: int, include_parent: bool = True) -> int:
        """Terminate *pid*'s descendants (and *pid* itself if *include_parent*).

        SIGTERM is sent to every target, then each one is given 5 s. Survivors
        get SIGKILL and are waited on as well. Targets that have already exited
        are ignored. Returns the number of processes targeted, or 0 if *pid*
        does not exist or an error occurs.
        """
        try:
            parent = psutil.Process(pid)
            victims = parent.children(recursive=True)
            child_count = len(victims)
            if include_parent:
                victims.append(parent)

            for proc in victims:
                try:
                    proc.terminate()
                except psutil.NoSuchProcess:
                    pass

            _, alive = psutil.wait_procs(victims, timeout=5)

            for proc in alive:
                try:
                    proc.kill()
                except psutil.NoSuchProcess:
                    pass
            # Wait for the SIGKILLed processes too, so the count reflects
            # processes that are actually gone.
            psutil.wait_procs(alive, timeout=5)

            killed_count = child_count + (1 if include_parent else 0)
            self.logger.info(f"Killed process tree for PID {pid}: {killed_count} processes")
            return killed_count

        except psutil.NoSuchProcess:
            self.logger.warning(f"Process {pid} does not exist")
            return 0
        except Exception as e:
            self.logger.error(f"Error killing process tree {pid}: {e}")
            return 0

    def start_process(self, command: List[str], **kwargs) -> subprocess.Popen:
        """Launch *command* with subprocess.Popen and track it in monitored_processes.

        By default stdout/stderr are captured as text pipes. Callers must read
        them (e.g. ``communicate()``) or the child can block once a pipe buffer
        fills. Pass ``stdout=subprocess.DEVNULL`` etc. to override. Extra
        keyword arguments go straight to Popen. Startup errors are logged and
        re-raised.
        """
        try:
            default_options = {
                'stdout': subprocess.PIPE,
                'stderr': subprocess.PIPE,
                'text': True,
            }
            default_options.update(kwargs)

            process = subprocess.Popen(command, **default_options)
            # map(str, ...) so path-like arguments do not break the log line.
            self.logger.info(
                f"Started process: {' '.join(map(str, command))} (PID: {process.pid})"
            )

            self.monitored_processes[process.pid] = {
                'command': command,
                'start_time': datetime.now(),
                'process': process,
            }
            return process

        except Exception as e:
            self.logger.error(f"Failed to start process: {e}")
            raise

    def restart_process(self, pid: int) -> Optional[subprocess.Popen]:
        """Stop *pid* and relaunch it with the same command line and working directory.

        Returns the new Popen, or None on any of these:
          * the process cannot be inspected;
          * it has no command line (e.g. kernel threads);
          * it could not be stopped. In that case nothing new is started, so no
            duplicate instance runs alongside the old one.
        """
        try:
            proc = psutil.Process(pid)
            cmdline = proc.cmdline()
            cwd = proc.cwd()

            if not cmdline:
                self.logger.error(f"Failed to restart process {pid}: no command line available")
                return None

            # kill_process() also returns False when the process already exited
            # on its own; that is fine. Only abort if it is genuinely still alive.
            if not self.kill_process(pid) and proc.is_running():
                self.logger.error(f"Failed to restart process {pid}: process could not be stopped")
                return None

            # Give the OS a moment to release resources (ports, locks) held by the old instance.
            time.sleep(1)

            new_process = self.start_process(cmdline, cwd=cwd)
            self.logger.info(f"Restarted process {pid} as {new_process.pid}")
            return new_process

        except Exception as e:
            self.logger.error(f"Failed to restart process {pid}: {e}")
            return None

    def monitor_process(self, pid: int, callback: Callable = None) -> Dict:
        """Sample *pid*'s metrics, append them to history, check thresholds.

        Blocks ~1 s for the CPU sample. If the OS refuses to reveal the fd count
        or I/O counters, those fields are reported as 0 / {} and the rest of
        the sample is kept. *callback*, if given, receives the metrics dict.
        Returns the metrics, or {} if the process vanished or an error occurred.
        """
        try:
            proc = psutil.Process(pid)

            metrics = {
                'timestamp': datetime.now(),
                'pid': pid,
                'name': proc.name(),
                'status': proc.status(),
                'cpu_percent': proc.cpu_percent(interval=1),
                'memory_percent': proc.memory_percent(),
            }
            mem = proc.memory_info()  # one snapshot, so rss and vms are consistent
            metrics['memory_rss'] = mem.rss
            metrics['memory_vms'] = mem.vms
            metrics['num_threads'] = proc.num_threads()
            metrics['num_fds'] = (
                self._access_or(0, proc.num_fds) if hasattr(proc, 'num_fds') else 0
            )
            metrics['io_counters'] = (
                self._access_or({}, lambda: proc.io_counters()._asdict())
                if hasattr(proc, 'io_counters') else {}
            )

            history = self.process_history[pid]
            history.append(metrics)
            max_history = self.config.get('history_size', 100)
            while len(history) > max_history:
                history.popleft()

            self.check_thresholds(metrics)

            if callback:
                callback(metrics)

            return metrics

        except psutil.NoSuchProcess:
            self.logger.warning(f"Process {pid} no longer exists")
            return {}
        except Exception as e:
            self.logger.error(f"Error monitoring process {pid}: {e}")
            return {}

    def check_thresholds(self, metrics: Dict):
        """Send an alert for each CPU/memory percentage above ``self.thresholds``."""
        alerts = []

        if metrics.get('cpu_percent', 0) > self.thresholds['cpu_percent']:
            alerts.append(
                f"High CPU usage for process {metrics['pid']} ({metrics['name']}): "
                f"{metrics['cpu_percent']:.1f}%"
            )

        if metrics.get('memory_percent', 0) > self.thresholds['memory_percent']:
            alerts.append(
                f"High memory usage for process {metrics['pid']} ({metrics['name']}): "
                f"{metrics['memory_percent']:.1f}%"
            )

        for alert in alerts:
            self.send_alert(alert)

    def send_alert(self, message: str):
        """Log *message* as a warning and record it in ``self.alerts``.

        Only the 'log' channel is implemented; other entries in
        ``config['alert_methods']`` (email, Slack, ...) are not acted on here.
        """
        self.logger.warning(f"ALERT: {message}")
        self.alerts.append({
            'timestamp': datetime.now(),
            'message': message,
        })

    def get_resource_usage(self) -> Dict:
        """Return system-wide CPU, memory, disk, network and process statistics.

        Blocks ~2 s (two 1-second CPU samples). Each psutil query is made once,
        so related fields come from the same reading. Process states are
        tallied in one pass that tolerates processes exiting mid-scan.
        ``network.connections`` is None when listing sockets is not permitted
        (it needs elevated privileges on some platforms, e.g. macOS).
        """
        cpu_percent = psutil.cpu_percent(interval=1)
        per_cpu = psutil.cpu_percent(interval=1, percpu=True)
        freq = psutil.cpu_freq()
        vm = psutil.virtual_memory()
        disk_io = psutil.disk_io_counters()
        net_io = psutil.net_io_counters()
        try:
            connection_count = len(psutil.net_connections())
        except psutil.AccessDenied:
            connection_count = None

        status_counts = defaultdict(int)
        # With attrs, process_iter() skips processes that vanish and reports None
        # for ones we may not inspect, instead of raising halfway through.
        for proc in psutil.process_iter(['status']):
            status_counts[proc.info['status']] += 1

        return {
            'cpu': {
                'percent': cpu_percent,
                'count': psutil.cpu_count(),
                'freq': freq._asdict() if freq else {},
                'per_cpu': per_cpu,
            },
            'memory': {
                'percent': vm.percent,
                'total': vm.total,
                'available': vm.available,
                'used': vm.used,
                'free': vm.free,
                'swap': psutil.swap_memory()._asdict(),
            },
            'disk': {
                'usage': psutil.disk_usage('/')._asdict(),
                'io_counters': disk_io._asdict() if disk_io else {},
            },
            'network': {
                'io_counters': net_io._asdict() if net_io else {},
                'connections': connection_count,
            },
            'processes': {
                'count': len(psutil.pids()),
                'running': status_counts[psutil.STATUS_RUNNING],
                'sleeping': status_counts[psutil.STATUS_SLEEPING],
                'zombies': status_counts[psutil.STATUS_ZOMBIE],
            },
        }

    def find_resource_hogs(self, resource_type: str = 'cpu', top_n: int = 10) -> List['ProcessInfo']:
        """Return the *top_n* processes ordered by *resource_type* (any list_all_processes sort key)."""
        return self.list_all_processes(sort_by=resource_type)[:top_n]

    def cleanup_zombies(self) -> int:
        """Reap zombie children of *this* process and return how many were reaped.

        Only a zombie's parent can reap it, so only our own children are
        examined. Children launched via start_process() are reaped through their
        Popen object, which keeps its returncode accurate. Returns 0 on platforms
        without os.WNOHANG (e.g. Windows).
        """
        if not hasattr(os, 'WNOHANG'):
            return 0
        try:
            children = psutil.Process().children()
        except psutil.Error:
            return 0

        zombies_cleaned = 0
        for child in children:
            try:
                if child.status() != psutil.STATUS_ZOMBIE:
                    continue
            except (psutil.NoSuchProcess, psutil.AccessDenied):
                continue

            tracked = self.monitored_processes.get(child.pid)
            if tracked is not None:
                reaped = tracked['process'].poll() is not None
            else:
                try:
                    reaped_pid, _ = os.waitpid(child.pid, os.WNOHANG)
                except ChildProcessError:
                    continue  # already reaped elsewhere
                reaped = reaped_pid != 0

            if reaped:
                zombies_cleaned += 1
                self.logger.info(f"Cleaned up zombie process: {child.pid}")

        if zombies_cleaned > 0:
            self.logger.info(f"Cleaned up {zombies_cleaned} zombie processes")
        return zombies_cleaned

    def set_process_priority(self, pid: int, priority: int) -> bool:
        """Set *pid*'s nice value (Unix: -20..19, lower = higher priority); True on success."""
        try:
            psutil.Process(pid).nice(priority)
            self.logger.info(f"Set process {pid} priority to {priority}")
            return True
        except Exception as e:
            self.logger.error(f"Failed to set process priority: {e}")
            return False

    def set_process_affinity(self, pid: int, cpus: List[int]) -> bool:
        """Pin *pid* to the given CPU indices; True on success (unsupported on some OSes)."""
        try:
            psutil.Process(pid).cpu_affinity(cpus)
            self.logger.info(f"Set process {pid} CPU affinity to CPUs {cpus}")
            return True
        except Exception as e:
            self.logger.error(f"Failed to set process affinity: {e}")
            return False

    def limit_process_resources(self, pid: int, limits: Dict) -> bool:
        """Apply rlimits to *pid*. Keys: 'memory' (bytes), 'cpu_time' (seconds), 'open_files'.

        Each limit is set as both soft and hard value. Lowering a hard limit
        cannot be undone without privileges, and raising one needs root.
        resource.prlimit is only available on Linux; elsewhere this returns False.
        """
        if sys.platform == 'win32':
            self.logger.warning("Resource limits not supported on Windows")
            return False
        if not hasattr(resource, 'prlimit'):
            self.logger.warning("resource.prlimit is not available on this platform")
            return False
        try:
            psutil.Process(pid)  # raises NoSuchProcess early for a bad pid
            rlimits = {
                'memory': resource.RLIMIT_AS,
                'cpu_time': resource.RLIMIT_CPU,
                'open_files': resource.RLIMIT_NOFILE,
            }
            for key, rlimit in rlimits.items():
                if key in limits:
                    resource.prlimit(pid, rlimit, (limits[key], limits[key]))
            self.logger.info(f"Set resource limits for process {pid}")
            return True
        except Exception as e:
            self.logger.error(f"Failed to set resource limits: {e}")
            return False
class ProcessPoolManager:
    """
    Manage named pools of worker processes for parallel execution.

    Usable as a context manager. Leaving the ``with`` block closes and joins
    every pool. If the block raised, the pools are terminated instead, so
    queued work is not waited for.
    """

    def __init__(self, max_workers: int = None):
        """Create a manager whose pools default to *max_workers* (CPU count if None)."""
        self.max_workers = max_workers or multiprocessing.cpu_count()
        self.pools = {}    # pool name -> multiprocessing Pool
        self.results = {}  # pool name -> AsyncResults from the latest execute_async call
        self.logger = logging.getLogger(__name__)

    def __enter__(self) -> 'ProcessPoolManager':
        return self

    def __exit__(self, exc_type, exc, tb) -> bool:
        # On error, do not block waiting for tasks that are still queued.
        self.cleanup(terminate=exc_type is not None)
        return False

    def create_pool(self, name: str, workers: int = None) -> Pool:
        """Create (or replace) the pool registered under *name* and return it."""
        workers = workers or self.max_workers
        pool = Pool(processes=workers)
        self.pools[name] = pool
        self.logger.info(f"Created process pool '{name}' with {workers} workers")
        return pool

    def _get_pool(self, pool_name: str) -> Pool:
        """Return the pool called *pool_name*, creating it on first use."""
        if pool_name not in self.pools:
            self.create_pool(pool_name)
        return self.pools[pool_name]

    def execute_parallel(self, func: Callable, tasks: List,
                         pool_name: str = 'default',
                         chunk_size: int = 1) -> List:
        """Run ``func(task)`` for every task in parallel and return results in order.

        *func* must be picklable, i.e. a module-level function. Task exceptions
        are logged and re-raised.
        """
        pool = self._get_pool(pool_name)
        try:
            results = pool.map(func, tasks, chunksize=chunk_size)
            self.logger.info(f"Executed {len(tasks)} tasks in parallel")
            return results
        except Exception as e:
            self.logger.error(f"Parallel execution failed: {e}")
            raise

    def execute_async(self, func: Callable, tasks: List,
                      pool_name: str = 'default',
                      callback: Callable = None) -> List:
        """Submit ``func(task)`` for every task without waiting; return the AsyncResults.

        The handles replace any earlier ones stored for *pool_name*.
        Collect them with get_results().
        """
        pool = self._get_pool(pool_name)
        async_results = [
            pool.apply_async(func, args=(task,), callback=callback) for task in tasks
        ]
        self.results[pool_name] = async_results
        self.logger.info(f"Started {len(tasks)} async tasks")
        return async_results

    def get_results(self, pool_name: str = 'default',
                    timeout: int = None) -> List:
        """Wait for the async tasks of *pool_name*; return a list of results.

        A task that times out or raises contributes None, and the problem is
        logged. Returns [] when no async work was submitted for *pool_name*.
        """
        if pool_name not in self.results:
            return []

        results = []
        for async_result in self.results[pool_name]:
            try:
                results.append(async_result.get(timeout=timeout))
            except multiprocessing.TimeoutError:
                self.logger.warning("Task timed out")
                results.append(None)
            except Exception as e:
                self.logger.error(f"Task failed: {e}")
                results.append(None)
        return results

    def cleanup(self, terminate: bool = False):
        """Shut down every pool and forget stored results.

        With *terminate* False (the default), pools finish their queued work
        (close + join). With True, workers are stopped immediately
        (terminate + join).
        """
        for name, pool in self.pools.items():
            if terminate:
                pool.terminate()
            else:
                pool.close()
            pool.join()
            self.logger.info(f"Closed process pool '{name}'")
        self.pools.clear()
        self.results.clear()
class ProcessOrchestrator:
    """
    Orchestrate multi-step workflows in which each step runs in its own process.
    """

    def __init__(self):
        self.workflows = {}          # workflow name -> {'steps', 'created'}
        self.running_workflows = {}  # workflow_id -> execution state dict
        # Manager() starts a server process. shared_state is offered to callers
        # who need process-shared data; the orchestrator itself does not use it.
        self.manager = Manager()
        self.shared_state = self.manager.dict()
        self.logger = logging.getLogger(__name__)

    def create_workflow(self, name: str, steps: List[Dict]) -> bool:
        """
        Register a workflow definition under *name*.

        steps = [
            {'name': 'step1', 'function': func1, 'args': [], 'depends_on': []},
            {'name': 'step2', 'function': func2, 'args': [], 'depends_on': ['step1'],
             'timeout': 60},
        ]

        Steps may be listed in any order; execute_workflow() runs each one after
        its dependencies. 'timeout' (seconds, default 300) is optional.
        """
        self.workflows[name] = {
            'steps': steps,
            'created': datetime.now(),
        }
        self.logger.info(f"Created workflow '{name}' with {len(steps)} steps")
        return True

    def _new_workflow_id(self, name: str) -> str:
        """Return '<name>_<YYYYmmdd_HHMMSS>', adding '_<n>' if that id is already in use."""
        base = f"{name}_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
        workflow_id, suffix = base, 1
        while workflow_id in self.running_workflows:
            workflow_id = f"{base}_{suffix}"
            suffix += 1
        return workflow_id

    def execute_workflow(self, name: str, context: Dict = None) -> Dict:
        """Run workflow *name* and return its execution-state dict.

        Steps run in dependency order. Independent steps keep their declared
        order. Steps whose dependencies never complete (missing names or cycles)
        are skipped with a warning and listed under 'skipped_steps'. A failing
        step marks the state 'failed' and its exception propagates.

        Raises:
            ValueError: if *name* was never registered.
        """
        if name not in self.workflows:
            raise ValueError(f"Workflow '{name}' not found")

        workflow = self.workflows[name]
        workflow_id = self._new_workflow_id(name)
        state = {
            'status': 'running',
            'start_time': datetime.now(),
            'completed_steps': [],
            'results': {},
            'context': context or {},
        }
        self.running_workflows[workflow_id] = state

        pending = list(workflow['steps'])
        while pending:
            done = state['completed_steps']
            ready = [s for s in pending
                     if all(dep in done for dep in s.get('depends_on', []))]
            if not ready:
                break
            for step in ready:
                self.execute_step(workflow_id, step)
            ready_ids = {id(s) for s in ready}
            pending = [s for s in pending if id(s) not in ready_ids]

        # Whatever remains cannot run; execute_step logs the unmet dependency and returns.
        for step in pending:
            self.execute_step(workflow_id, step)
        state['skipped_steps'] = [s['name'] for s in pending]

        state['status'] = 'completed'
        state['end_time'] = datetime.now()
        self.logger.info(f"Workflow '{workflow_id}' completed")
        return state

    @staticmethod
    def _run_in_subprocess(func: Callable, args: List, timeout: float) -> Any:
        """Run ``func(*args)`` in a one-worker pool and return its result.

        On timeout the worker is terminated, so a hung step cannot keep us
        waiting past *timeout*; a builtin TimeoutError is raised. Exceptions
        raised by *func* are re-raised here.
        """
        pool = Pool(processes=1)
        try:
            pending = pool.apply_async(func, tuple(args))
            try:
                return pending.get(timeout=timeout)
            except multiprocessing.TimeoutError as err:
                raise TimeoutError(f"Step timed out after {timeout} seconds") from err
        finally:
            pool.terminate()
            pool.join()

    def execute_step(self, workflow_id: str, step: Dict):
        """Execute one step of running workflow *workflow_id* in a separate process.

        Returns without running if a dependency has not completed. When the
        workflow context is non-empty, it is passed as the first positional
        argument, ahead of the step's own 'args'. Results of earlier steps are
        not passed. On failure the workflow is marked 'failed' and the
        exception is re-raised.
        """
        workflow_state = self.running_workflows[workflow_id]

        for dep in step.get('depends_on', []):
            if dep not in workflow_state['completed_steps']:
                self.logger.warning(f"Dependency '{dep}' not met for step '{step['name']}'")
                return

        try:
            func = step['function']
            args = list(step.get('args', []))
            if workflow_state['context']:
                args = [workflow_state['context']] + args

            result = self._run_in_subprocess(func, args, step.get('timeout', 300))

            workflow_state['results'][step['name']] = result
            workflow_state['completed_steps'].append(step['name'])
            self.logger.info(f"Step '{step['name']}' completed in workflow '{workflow_id}'")

        except Exception as e:
            self.logger.error(f"Step '{step['name']}' failed: {e}")
            workflow_state['status'] = 'failed'
            workflow_state['error'] = str(e)
            raise
class SystemProcessMonitor:
    """
    Real-time system process monitoring and alerting.

    A background thread collects metrics every *interval* seconds, keeps the
    latest 1000 snapshots, and raises alerts for high CPU or memory use, many
    zombies, and missing critical processes.
    """

    DEFAULT_CRITICAL_PROCESSES = ('nginx', 'mysql', 'postgresql', 'redis')

    def __init__(self, interval: int = 5, critical_processes: Optional[List[str]] = None):
        """Create a monitor.

        Args:
            interval: Seconds to wait between collection rounds.
            critical_processes: Name/command-line substrings that must be
                running. An alert fires for each one with no match. Defaults
                to DEFAULT_CRITICAL_PROCESSES; pass [] to disable the check.
        """
        self.interval = interval
        self.monitoring = False
        self.monitor_thread = None
        # Set by stop_monitoring() so the loop wakes up at once instead of
        # finishing its sleep.
        self._stop_event = threading.Event()
        self.process_manager = ProcessManager()
        self.metrics_history = deque(maxlen=1000)
        self.alerts = []
        self.critical_processes = list(
            self.DEFAULT_CRITICAL_PROCESSES if critical_processes is None else critical_processes
        )
        self.logger = logging.getLogger(__name__)

    def start_monitoring(self):
        """Start the background monitoring thread (a no-op with a warning if already running)."""
        if self.monitoring:
            self.logger.warning("Monitoring already running")
            return

        self.monitoring = True
        self._stop_event.clear()
        self.monitor_thread = threading.Thread(target=self._monitor_loop, daemon=True)
        self.monitor_thread.start()
        self.logger.info("System monitoring started")

    def stop_monitoring(self):
        """Signal the monitoring thread to stop and wait for it to exit."""
        self.monitoring = False
        self._stop_event.set()
        if self.monitor_thread:
            self.monitor_thread.join()
        self.logger.info("System monitoring stopped")

    def _monitor_loop(self):
        """Collect, store and check metrics until monitoring is switched off."""
        while self.monitoring:
            try:
                metrics = self.collect_metrics()
                self.metrics_history.append(metrics)
                self.check_system_health(metrics)
            except Exception as e:
                self.logger.error(f"Monitoring error: {e}")
            # Wait after errors too, so a persistent failure does not turn into
            # a tight error loop. Return early when stop is requested.
            if self._stop_event.wait(self.interval):
                break

    @staticmethod
    def _matches(info, needle: str) -> bool:
        """True if lower-case *needle* occurs in the process name or joined command line.

        Uses the same rule as ProcessManager.find_processes, applied to an
        already-collected ProcessInfo.
        """
        return needle in info.name.lower() or needle in ' '.join(info.cmdline).lower()

    def collect_metrics(self) -> Dict:
        """Collect system usage, the top 5 CPU/memory processes, and critical-process status.

        The process table is scanned once and reused for every view. Every
        configured critical process gets an entry, including those with a
        count of 0, so check_system_health() can report the missing ones.
        """
        metrics = {
            'timestamp': datetime.now(),
            'system': self.process_manager.get_resource_usage(),
            'top_cpu': [],
            'top_memory': [],
            'critical_processes': [],
        }

        processes = self.process_manager.list_all_processes(sort_by='cpu')
        by_memory = sorted(processes, key=lambda p: p.memory_percent, reverse=True)

        metrics['top_cpu'] = [
            {'pid': p.pid, 'name': p.name, 'cpu': p.cpu_percent} for p in processes[:5]
        ]
        metrics['top_memory'] = [
            {'pid': p.pid, 'name': p.name, 'memory': p.memory_percent} for p in by_memory[:5]
        ]

        for proc_name in self.critical_processes:
            needle = proc_name.lower()
            pids = [p.pid for p in processes if self._matches(p, needle)]
            metrics['critical_processes'].append({
                'name': proc_name,
                'count': len(pids),
                'pids': pids,
            })

        return metrics

    def check_system_health(self, metrics: Dict):
        """Turn a metrics snapshot into alerts.

        Alerts fire for CPU or memory above 75% (high) or 90% (critical), for
        more than 10 zombies, and for each critical process with a count of 0.
        When there are too many zombies and ``config['auto_kill_zombies']`` is
        set, the ProcessManager's zombie cleanup is called.
        """
        alerts = []

        cpu_percent = metrics['system']['cpu']['percent']
        if cpu_percent > 90:
            alerts.append(f"Critical CPU usage: {cpu_percent:.1f}%")
        elif cpu_percent > 75:
            alerts.append(f"High CPU usage: {cpu_percent:.1f}%")

        mem_percent = metrics['system']['memory']['percent']
        if mem_percent > 90:
            alerts.append(f"Critical memory usage: {mem_percent:.1f}%")
        elif mem_percent > 75:
            alerts.append(f"High memory usage: {mem_percent:.1f}%")

        zombie_count = metrics['system']['processes']['zombies']
        if zombie_count > 10:
            alerts.append(f"High number of zombie processes: {zombie_count}")
            if self.process_manager.config.get('auto_kill_zombies'):
                self.process_manager.cleanup_zombies()

        for crit_proc in metrics['critical_processes']:
            if crit_proc['count'] == 0:
                alerts.append(f"Critical process '{crit_proc['name']}' is not running")

        for alert in alerts:
            self.send_alert(alert)

    def send_alert(self, message: str):
        """Log *message* as a system alert and record it in ``self.alerts``."""
        self.logger.warning(f"SYSTEM ALERT: {message}")
        self.alerts.append({
            'timestamp': datetime.now(),
            'message': message,
        })

    def get_metrics_summary(self) -> Dict:
        """Return the period, the CPU/memory avg/max/min, and the alert count over stored history ({} if empty)."""
        if not self.metrics_history:
            return {}

        cpu_values = [m['system']['cpu']['percent'] for m in self.metrics_history]
        mem_values = [m['system']['memory']['percent'] for m in self.metrics_history]

        return {
            'period': {
                'start': self.metrics_history[0]['timestamp'],
                'end': self.metrics_history[-1]['timestamp'],
                'samples': len(self.metrics_history),
            },
            'cpu': {
                'avg': sum(cpu_values) / len(cpu_values),
                'max': max(cpu_values),
                'min': min(cpu_values),
            },
            'memory': {
                'avg': sum(mem_values) / len(mem_values),
                'max': max(mem_values),
                'min': min(mem_values),
            },
            'alerts_count': len(self.alerts),
        }
def cpu_intensive_task(n):
    """Example CPU-intensive task: return the sum of i*i for i in range(n).

    Defined at module level, outside the ``__main__`` guard, so that pool
    workers can import it by name. That lookup fails for functions defined
    only inside ``if __name__ == "__main__":`` whenever workers do not inherit
    the parent's memory (e.g. the 'spawn' start method).
    """
    return sum(i * i for i in range(n))


# Example usage
if __name__ == "__main__":
    process_manager = ProcessManager()

    # List top CPU consuming processes
    print("\n🔥 Top CPU Consumers:")
    for proc in process_manager.find_resource_hogs('cpu', 5):
        print(f" {proc.name} (PID: {proc.pid}): {proc.cpu_percent:.1f}%")

    # List top memory consuming processes
    print("\n💾 Top Memory Consumers:")
    for proc in process_manager.find_resource_hogs('memory', 5):
        print(f" {proc.name} (PID: {proc.pid}): {proc.memory_percent:.1f}%")

    # Get system resource usage
    print("\n📊 System Resources:")
    resources = process_manager.get_resource_usage()
    print(f" CPU: {resources['cpu']['percent']:.1f}%")
    print(f" Memory: {resources['memory']['percent']:.1f}%")
    print(f" Processes: {resources['processes']['count']} total, "
          f"{resources['processes']['zombies']} zombies")

    # Find specific processes
    print("\n🔍 Finding Python processes:")
    for proc in process_manager.find_processes('python')[:5]:
        print(f" {proc.name} (PID: {proc.pid})")

    # Start a new process, then collect its output. communicate() also reaps
    # the child, so it does not linger as a zombie.
    print("\n🚀 Starting new process:")
    new_proc = process_manager.start_process(['echo', 'Hello from subprocess'])
    print(f" Started with PID: {new_proc.pid}")
    out, _ = new_proc.communicate(timeout=10)
    print(f" Output: {out.strip()}")

    # Clean up zombies
    print("\n🧹 Cleaning up zombie processes:")
    zombies_cleaned = process_manager.cleanup_zombies()
    print(f" Cleaned {zombies_cleaned} zombie processes")

    # Process pool for parallel execution; always shut the pool down.
    print("\n⚡ Parallel Processing Example:")
    pool_manager = ProcessPoolManager(max_workers=4)
    try:
        tasks = [1000000, 2000000, 3000000, 4000000]
        results = pool_manager.execute_parallel(cpu_intensive_task, tasks)
        print(f" Parallel results: {results}")
    finally:
        pool_manager.cleanup()

    # System monitoring; always stop the background thread.
    print("\n👁️ Starting System Monitor:")
    monitor = SystemProcessMonitor(interval=5)
    monitor.start_monitoring()
    try:
        time.sleep(10)  # let it run for a bit
        summary = monitor.get_metrics_summary()
        if summary:
            print(f" Average CPU: {summary['cpu']['avg']:.1f}%")
            print(f" Average Memory: {summary['memory']['avg']:.1f}%")
            print(f" Alerts: {summary['alerts_count']}")
    finally:
        monitor.stop_monitoring()

    print("\n✅ Process management demonstration complete!")
Advanced Process Control Techniques 🎮
Let's explore more sophisticated process control patterns for complex scenarios!
class AdvancedProcessControl:
    """
    Advanced process control techniques: daemonizing, supervision, and a
    circuit breaker.
    """

    @staticmethod
    def create_daemon_process(target_func: Callable, args: tuple = ()):
        """Run ``target_func(*args)`` in a fully detached daemon (Unix double fork).

        In the calling process this returns the pid of the *intermediate*
        child. That child has already exited and been reaped by the time this
        returns. The daemon is its child and has a different pid.

        Forked processes leave via ``os._exit`` and never return into the
        caller's code. That also means they never run its ``atexit`` handlers
        or flush its stdio buffers a second time. The daemon's exit status is
        0, the integer from a ``SystemExit`` raised by *target_func*, or 1 for
        any other exception. Its stdin/stdout/stderr point at os.devnull.
        """
        # Flush before forking so buffered output is not duplicated in the children.
        sys.stdout.flush()
        sys.stderr.flush()

        try:
            pid = os.fork()
        except OSError as e:
            sys.stderr.write(f"First fork failed: {e}\n")
            sys.exit(1)
        if pid > 0:
            # Reap the intermediate child (it exits right after the second fork)
            # so it does not become a zombie of the caller.
            os.waitpid(pid, 0)
            return pid

        # --- intermediate child: detach from the parent's session and cwd ---
        try:
            os.chdir("/")
            os.setsid()
            os.umask(0)
            pid = os.fork()
        except OSError as e:
            sys.stderr.write(f"Second fork failed: {e}\n")
            sys.stderr.flush()
            os._exit(1)
        if pid > 0:
            os._exit(0)

        # --- daemon (grandchild) ---
        exit_code = 0
        try:
            devnull = os.open(os.devnull, os.O_RDWR)
            for fd in (0, 1, 2):  # OS-level stdin/stdout/stderr
                os.dup2(devnull, fd)
            if devnull > 2:
                os.close(devnull)
            target_func(*args)
        except SystemExit as exc:
            if exc.code is None:
                exit_code = 0
            elif isinstance(exc.code, int):
                exit_code = exc.code
            else:
                exit_code = 1
        except BaseException:
            exit_code = 1
        finally:
            os._exit(exit_code)

    @staticmethod
    def create_supervised_process(command: List[str],
                                  restart_on_failure: bool = True,
                                  max_restarts: int = 5) -> Optional[int]:
        """Run *command*, restarting it up to *max_restarts* times after failures.

        A failure is a non-zero exit or an exception while starting. A clean
        exit (code 0) ends supervision. The loop waits 2 s before each
        restart. Returns the last exit code, or None if the process never
        started. If the supervisor is interrupted (e.g. KeyboardInterrupt),
        the running child is killed before the exception propagates.
        """
        restarts = 0
        last_code = None
        while True:
            try:
                process = subprocess.Popen(command)
            except Exception as e:
                logging.error(f"Failed to start process: {e}")
                failed = True
            else:
                try:
                    last_code = process.wait()
                except BaseException:
                    process.kill()
                    process.wait()
                    raise
                failed = last_code != 0

            if not failed or not restart_on_failure:
                return last_code
            if restarts >= max_restarts:
                logging.error(f"Process failed {restarts + 1} times, giving up")
                return last_code

            restarts += 1
            logging.warning(
                f"Process failed with code {last_code}, restarting... ({restarts}/{max_restarts})"
            )
            time.sleep(2)  # back off before restarting

    @staticmethod
    def implement_circuit_breaker(func: Callable,
                                  failure_threshold: int = 5,
                                  timeout: int = 60):
        """Wrap *func* in a circuit breaker.

        After *failure_threshold* consecutive exceptions the circuit opens. While
        it is open, calls fail fast with ``Exception("Circuit breaker is open")``.
        Once *timeout* seconds have passed since the last failure, one trial call
        is allowed (half-open). A success closes the circuit; a failure opens it
        again immediately. Elapsed time uses time.monotonic(), so wall-clock
        changes do not affect it. The wrapper is not thread-safe.
        """
        import functools

        failure_count = 0
        last_failure_time = None
        circuit_open = False

        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            nonlocal failure_count, last_failure_time, circuit_open

            if circuit_open:
                if time.monotonic() - last_failure_time > timeout:
                    # Half-open: one more failure is enough to re-open.
                    circuit_open = False
                    failure_count = max(failure_threshold - 1, 0)
                else:
                    raise Exception("Circuit breaker is open")

            try:
                result = func(*args, **kwargs)
            except Exception:
                failure_count += 1
                last_failure_time = time.monotonic()
                if failure_count >= failure_threshold:
                    circuit_open = True
                    logging.error(f"Circuit breaker opened after {failure_count} failures")
                raise

            failure_count = 0  # reset on success
            return result

        return wrapper
Key Takeaways and Best Practices 🎯
- Always Use Context Managers: Ensure proper cleanup of processes and resources.
- Monitor Resource Usage: Track CPU, memory, and I/O to prevent resource exhaustion.
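- Reap Child Processes: Wait on (or poll) every child you start so it doesn't linger as a zombie. Only a zombie's parent can reap it, and each zombie holds a process-table slot until it is reaped.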
- Implement Graceful Shutdown: Always try SIGTERM before SIGKILL.
- Use Process Pools: For parallel execution, use process pools instead of creating processes manually.
- Set Resource Limits: Prevent runaway processes from consuming all system resources.
- Log Everything: Comprehensive logging helps debug process-related issues.
Process Management Best Practices 📋
Process management mastery gives you complete control over your system's computational resources. You can orchestrate complex workflows, optimize resource usage, and ensure system stability. Whether you're managing a single server or a distributed system, these skills are essential for system automation! 🚀
Pro Tip: Process management is like conducting an orchestra - you need to know when to start instruments (processes), when to stop them, and how to keep them in harmony. Always implement proper monitoring and alerting. Remember that killing processes is easy, but understanding why they misbehave is valuable. Use tools like strace and ltrace to debug process issues, and always test process management scripts thoroughly before deploying to production!