
⚙️ Process Management: Control System Resources with Python

Your computer is like a bustling city, with thousands of processes running like cars on highways, each consuming resources and performing tasks. Process management is your traffic control system: you can start new processes, stop runaway ones, monitor resource usage, and orchestrate complex workflows. Let's become the mayors of our digital cities! 🏙️

The Process Management Ecosystem

Think of processes as workers in a factory. Some are assembly line workers (CPU-intensive), others are delivery drivers (I/O-intensive), and some are managers (orchestrating other processes). Python gives you the power to hire, fire, monitor, and coordinate all these workers efficiently!
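To make the factory analogy concrete, here is a minimal sketch using only the standard library. The names `cpu_bound`, `io_bound`, and `run_workers` are illustrative assumptions, not part of any library, and `time.sleep` merely stands in for real I/O such as a network call.

```python
import time
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor


def cpu_bound(n: int) -> int:
    """Assembly-line work: pure computation that keeps a core busy."""
    return sum(i * i for i in range(n))


def io_bound(delay: float) -> float:
    """Delivery-driver work: mostly waiting (sleep stands in for I/O)."""
    time.sleep(delay)
    return delay


def run_workers() -> tuple:
    """The manager: hands each kind of work to a suitable pool."""
    # Separate processes sidestep the GIL, so CPU-bound tasks can use several cores.
    with ProcessPoolExecutor(max_workers=2) as procs:
        squares = list(procs.map(cpu_bound, [10, 100, 1000]))
    # Threads are cheaper and usually sufficient when tasks spend their time waiting.
    with ThreadPoolExecutor(max_workers=3) as threads:
        waits = list(threads.map(io_bound, [0.05, 0.05, 0.05]))
    return squares, waits
```

When running this as a script, call `run_workers()` from under an `if __name__ == "__main__":` guard, since platforms that spawn worker processes re-import the main module.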

graph TB
    A[Process Management] --> B[Process Control]
    A --> C[Resource Monitoring]
    A --> D[Inter-Process Communication]
    A --> E[Process Scheduling]
    A --> F[System Optimization]
    B --> G[Start/Stop Processes]
    B --> H[Signal Handling]
    B --> I[Process Trees]
    C --> J[CPU Usage]
    C --> K[Memory Usage]
    C --> L[Disk I/O]
    C --> M[Network I/O]
    D --> N[Pipes]
    D --> O[Queues]
    D --> P[Shared Memory]
    E --> Q[Priority Management]
    E --> R[Resource Limits]
    E --> S[Affinity Control]
    F --> T[Load Balancing]
    F --> U[Resource Allocation]
    F --> V[Performance Tuning]
    style A fill:#ff6b6b
    style G fill:#51cf66
    style J fill:#51cf66
    style N fill:#51cf66
    style Q fill:#51cf66
    style T fill:#51cf66
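Of the branches in the diagram, inter-process communication through pipes is perhaps the easiest to try out. The sketch below is one possible illustration: the parent writes a message to a child's stdin pipe and reads the reply from its stdout pipe. `CHILD_CODE` and `shout_via_pipe` are hypothetical names chosen for this example.

```python
import subprocess
import sys

# Hypothetical child program: read everything from stdin, reply in upper case.
CHILD_CODE = "import sys; print(sys.stdin.read().upper(), end='')"


def shout_via_pipe(message: str) -> str:
    """Send a message to a child process over a pipe and return its reply."""
    result = subprocess.run(
        [sys.executable, "-c", CHILD_CODE],
        input=message,        # written to the child's stdin pipe
        capture_output=True,  # stdout and stderr are read back through pipes
        text=True,            # exchange str instead of bytes
        check=True,           # raise CalledProcessError on a non-zero exit
        timeout=10,           # do not wait forever on a stuck child
    )
    return result.stdout


if __name__ == "__main__":
    print(shout_via_pipe("hello from the parent"))
```

`subprocess.run` handles the writing, reading, and waiting in one call, which avoids the deadlocks that can occur when a parent fills one pipe while the child is blocked on another.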

Real-World Scenario: The Server Resource Manager 🖥️

You're managing a server farm running web applications, background jobs, databases, and monitoring tools. Some processes consume too much memory, others spawn zombie children, and occasionally something goes haywire and needs immediate termination. Let's build a comprehensive process management system that keeps everything running smoothly!
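Before the full system, it may help to see the core "ask politely, then insist" termination pattern in isolation. This is a minimal sketch that assumes you started the child yourself with `subprocess.Popen`; `stop_gracefully` and `grace_seconds` are illustrative names, not an existing API.

```python
import subprocess
import sys


def stop_gracefully(proc: subprocess.Popen, grace_seconds: float = 5.0) -> int:
    """Ask a child process to exit, then force it if it ignores the request."""
    proc.terminate()  # SIGTERM on POSIX, TerminateProcess() on Windows
    try:
        return proc.wait(timeout=grace_seconds)
    except subprocess.TimeoutExpired:
        proc.kill()  # SIGKILL on POSIX, which cannot be caught or ignored
        # Waiting collects the exit status, so the child does not linger as a zombie.
        return proc.wait()


if __name__ == "__main__":
    # A stand-in for a runaway process: a child that would sleep for a minute.
    runaway = subprocess.Popen([sys.executable, "-c", "import time; time.sleep(60)"])
    exit_code = stop_gracefully(runaway)
    print(f"Child {runaway.pid} exited with code {exit_code}")
```

Calling `wait()` after either signal matters as much as the signal itself: a terminated child that is never waited on stays in the process table as a zombie until its parent collects it.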

import psutil
import subprocess
import os
import signal
import time
import threading
import multiprocessing
from multiprocessing import Process, Queue, Pool, Manager, Lock
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import logging
from typing import Dict, List, Optional, Tuple, Any, Callable
from dataclasses import dataclass
from datetime import datetime, timedelta
from pathlib import Path
import json
import sys
import resource
import queue
from collections import defaultdict, deque
import warnings

@dataclass
class ProcessInfo:
    """Detailed process information."""
    pid: int
    name: str
    status: str
    cpu_percent: float
    memory_percent: float
    memory_info: Dict
    create_time: float
    username: str
    cmdline: List[str]
    connections: List
    open_files: List
    threads: int
    children: List[int]

class ProcessManager:
    """
    Comprehensive process management system for monitoring,
    controlling, and optimizing system processes.
    """
    
    def __init__(self, config: Dict = None):
        self.config = config or self.get_default_config()
        self.monitored_processes = {}
        self.process_history = defaultdict(deque)
        self.alerts = []
        self.setup_logging()
        
        # Resource thresholds
        self.thresholds = {
            'cpu_percent': 80.0,
            'memory_percent': 75.0,
            'disk_io_rate': 100 * 1024 * 1024,  # 100 MB/s
            'network_io_rate': 50 * 1024 * 1024,  # 50 MB/s
            'process_count': 500,
            'zombie_count': 10
        }
        
        # Process patterns to watch
        self.watch_patterns = self.config.get('watch_patterns', [])
        
        # Blacklist of processes that should never be killed
        self.protected_processes = {
            'systemd', 'init', 'kernel', 'sshd', 'systemd-resolved'
        }
    
    def get_default_config(self) -> Dict:
        """Get default configuration."""
        return {
            'monitoring_interval': 5,  # seconds
            'history_size': 100,  # number of records to keep
            'auto_kill_zombies': True,
            'auto_restart_crashed': False,
            'log_file': 'process_manager.log',  # relative path; writing under /var/log usually needs root
            'alert_methods': ['log', 'email', 'slack']
        }
    
    def setup_logging(self):
        """Setup logging configuration."""
        log_file = self.config.get('log_file', 'process_manager.log')
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
            handlers=[
                logging.FileHandler(log_file),
                logging.StreamHandler()
            ]
        )
        self.logger = logging.getLogger(__name__)
    
    def get_process_info(self, pid: int) -> Optional[ProcessInfo]:
        """Get detailed information about a process."""
        try:
            proc = psutil.Process(pid)
            
            # Gather process information
            with proc.oneshot():
                info = ProcessInfo(
                    pid=pid,
                    name=proc.name(),
                    status=proc.status(),
                    cpu_percent=proc.cpu_percent(interval=0.1),
                    memory_percent=proc.memory_percent(),
                    memory_info=proc.memory_info()._asdict(),
                    create_time=proc.create_time(),
                    username=proc.username(),
                    cmdline=proc.cmdline(),
                    connections=proc.connections(kind='all'),
                    open_files=proc.open_files(),
                    threads=proc.num_threads(),
                    children=[p.pid for p in proc.children()]
                )
            
            return info
            
        except (psutil.NoSuchProcess, psutil.AccessDenied) as e:
            self.logger.warning(f"Cannot access process {pid}: {e}")
            return None
    
    def list_all_processes(self, sort_by: str = 'cpu') -> List[ProcessInfo]:
        """List all running processes sorted by specified criteria."""
        processes = []
        
        for proc in psutil.process_iter(['pid', 'name', 'cpu_percent', 'memory_percent']):
            try:
                info = self.get_process_info(proc.pid)
                if info:
                    processes.append(info)
            except Exception:
                continue
        
        # Sort processes
        if sort_by == 'cpu':
            processes.sort(key=lambda x: x.cpu_percent, reverse=True)
        elif sort_by == 'memory':
            processes.sort(key=lambda x: x.memory_percent, reverse=True)
        elif sort_by == 'pid':
            processes.sort(key=lambda x: x.pid)
        elif sort_by == 'name':
            processes.sort(key=lambda x: x.name)
        
        return processes
    
    def find_processes(self, pattern: str) -> List[ProcessInfo]:
        """Find processes matching a pattern."""
        matching_processes = []
        
        for proc in psutil.process_iter():
            try:
                # Check process name
                if pattern.lower() in proc.name().lower():
                    info = self.get_process_info(proc.pid)
                    if info:
                        matching_processes.append(info)
                    continue
                
                # Check command line
                cmdline = ' '.join(proc.cmdline())
                if pattern.lower() in cmdline.lower():
                    info = self.get_process_info(proc.pid)
                    if info:
                        matching_processes.append(info)
            except (psutil.NoSuchProcess, psutil.AccessDenied):
                continue
        
        return matching_processes
    
    def kill_process(self, pid: int, force: bool = False) -> bool:
        """Kill a process by PID."""
        try:
            proc = psutil.Process(pid)
            
            # Check if process is protected
            if proc.name() in self.protected_processes and not force:
                self.logger.warning(f"Process {proc.name()} ({pid}) is protected")
                return False
            
            # Try graceful termination first
            proc.terminate()
            
            # Wait for process to terminate
            try:
                proc.wait(timeout=5)
                self.logger.info(f"Process {pid} terminated gracefully")
                return True
            except psutil.TimeoutExpired:
                # Force kill if graceful termination failed
                self.logger.warning(f"Process {pid} did not terminate, forcing kill")
                proc.kill()
                proc.wait(timeout=5)
                self.logger.info(f"Process {pid} killed forcefully")
                return True
                
        except psutil.NoSuchProcess:
            self.logger.warning(f"Process {pid} does not exist")
            return False
        except psutil.AccessDenied:
            self.logger.error(f"Access denied to kill process {pid}")
            return False
        except Exception as e:
            self.logger.error(f"Error killing process {pid}: {e}")
            return False
    
    def kill_process_tree(self, pid: int, include_parent: bool = True) -> int:
        """Kill a process and all its children."""
        try:
            parent = psutil.Process(pid)
            children = parent.children(recursive=True)
            
            # Kill children first
            for child in children:
                try:
                    child.terminate()
                except psutil.NoSuchProcess:
                    pass
            
            # Wait for children to terminate
            gone, alive = psutil.wait_procs(children, timeout=5)
            
            # Force kill remaining children
            for p in alive:
                try:
                    p.kill()
                except psutil.NoSuchProcess:
                    pass
            
            # Kill parent if requested
            if include_parent:
                parent.terminate()
                try:
                    parent.wait(timeout=5)
                except psutil.TimeoutExpired:
                    parent.kill()
            
            killed_count = len(children) + (1 if include_parent else 0)
            self.logger.info(f"Killed process tree for PID {pid}: {killed_count} processes")
            return killed_count
            
        except psutil.NoSuchProcess:
            self.logger.warning(f"Process {pid} does not exist")
            return 0
        except Exception as e:
            self.logger.error(f"Error killing process tree {pid}: {e}")
            return 0
    
    def start_process(self, command: List[str], **kwargs) -> subprocess.Popen:
        """Start a new process."""
        try:
            # Default subprocess options
            default_options = {
                'stdout': subprocess.PIPE,
                'stderr': subprocess.PIPE,
                'text': True
            }
            default_options.update(kwargs)
            
            # Start the process
            process = subprocess.Popen(command, **default_options)
            
            self.logger.info(f"Started process: {' '.join(command)} (PID: {process.pid})")
            
            # Add to monitored processes
            self.monitored_processes[process.pid] = {
                'command': command,
                'start_time': datetime.now(),
                'process': process
            }
            
            return process
            
        except Exception as e:
            self.logger.error(f"Failed to start process: {e}")
            raise
    
    def restart_process(self, pid: int) -> Optional[subprocess.Popen]:
        """Restart a process."""
        try:
            # Get process info before killing
            proc = psutil.Process(pid)
            cmdline = proc.cmdline()
            cwd = proc.cwd()
            
            # Kill the process
            self.kill_process(pid)
            
            # Wait a moment
            time.sleep(1)
            
            # Restart the process
            new_process = self.start_process(cmdline, cwd=cwd)
            
            self.logger.info(f"Restarted process {pid} as {new_process.pid}")
            return new_process
            
        except Exception as e:
            self.logger.error(f"Failed to restart process {pid}: {e}")
            return None
    
    def monitor_process(self, pid: int, callback: Callable = None) -> Dict:
        """Monitor a specific process."""
        try:
            proc = psutil.Process(pid)
            
            # Collect metrics
            metrics = {
                'timestamp': datetime.now(),
                'pid': pid,
                'name': proc.name(),
                'status': proc.status(),
                'cpu_percent': proc.cpu_percent(interval=1),
                'memory_percent': proc.memory_percent(),
                'memory_rss': proc.memory_info().rss,
                'memory_vms': proc.memory_info().vms,
                'num_threads': proc.num_threads(),
                'num_fds': proc.num_fds() if hasattr(proc, 'num_fds') else 0,
                'io_counters': proc.io_counters()._asdict() if hasattr(proc, 'io_counters') else {}
            }
            
            # Store in history
            self.process_history[pid].append(metrics)
            
            # Limit history size
            max_history = self.config.get('history_size', 100)
            if len(self.process_history[pid]) > max_history:
                self.process_history[pid].popleft()
            
            # Check thresholds
            self.check_thresholds(metrics)
            
            # Call callback if provided
            if callback:
                callback(metrics)
            
            return metrics
            
        except psutil.NoSuchProcess:
            self.logger.warning(f"Process {pid} no longer exists")
            return {}
        except Exception as e:
            self.logger.error(f"Error monitoring process {pid}: {e}")
            return {}
    
    def check_thresholds(self, metrics: Dict):
        """Check if metrics exceed thresholds."""
        alerts = []
        
        # Check CPU usage
        if metrics.get('cpu_percent', 0) > self.thresholds['cpu_percent']:
            alert = f"High CPU usage for process {metrics['pid']} ({metrics['name']}): {metrics['cpu_percent']:.1f}%"
            alerts.append(alert)
        
        # Check memory usage
        if metrics.get('memory_percent', 0) > self.thresholds['memory_percent']:
            alert = f"High memory usage for process {metrics['pid']} ({metrics['name']}): {metrics['memory_percent']:.1f}%"
            alerts.append(alert)
        
        # Send alerts
        for alert in alerts:
            self.send_alert(alert)
    
    def send_alert(self, message: str):
        """Send alert through configured channels."""
        self.logger.warning(f"ALERT: {message}")
        self.alerts.append({
            'timestamp': datetime.now(),
            'message': message
        })
        
        # Implement additional alert methods here
        # Email, Slack, SMS, etc.
    
    def get_resource_usage(self) -> Dict:
        """Get system-wide resource usage."""
        # Take one snapshot per source so related values stay consistent
        cpu_freq = psutil.cpu_freq()
        memory = psutil.virtual_memory()
        disk_io = psutil.disk_io_counters()
        net_io = psutil.net_io_counters()
        
        # Pre-fetching 'status' lets process_iter() skip processes that exit
        # mid-scan instead of raising NoSuchProcess in this method
        statuses = [p.info['status'] for p in psutil.process_iter(['status'])]
        
        return {
            'cpu': {
                'percent': psutil.cpu_percent(interval=1),
                'count': psutil.cpu_count(),
                'freq': cpu_freq._asdict() if cpu_freq else {},
                'per_cpu': psutil.cpu_percent(interval=1, percpu=True)
            },
            'memory': {
                'percent': memory.percent,
                'total': memory.total,
                'available': memory.available,
                'used': memory.used,
                'free': memory.free,
                'swap': psutil.swap_memory()._asdict()
            },
            'disk': {
                'usage': psutil.disk_usage('/')._asdict(),
                'io_counters': disk_io._asdict() if disk_io else {}
            },
            'network': {
                'io_counters': net_io._asdict() if net_io else {},
                'connections': len(psutil.net_connections())
            },
            'processes': {
                'count': len(psutil.pids()),
                'running': statuses.count(psutil.STATUS_RUNNING),
                'sleeping': statuses.count(psutil.STATUS_SLEEPING),
                'zombies': statuses.count(psutil.STATUS_ZOMBIE)
            }
        }
    
    def find_resource_hogs(self, resource_type: str = 'cpu', top_n: int = 10) -> List[ProcessInfo]:
        """Find processes consuming the most resources."""
        processes = self.list_all_processes(sort_by=resource_type)
        return processes[:top_n]
    
    def cleanup_zombies(self) -> int:
        """Reap zombie processes that are children of this process."""
        zombies_cleaned = 0
        
        for proc in psutil.process_iter():
            try:
                if proc.status() == psutil.STATUS_ZOMBIE:
                    # Only a zombie's parent can reap it; for any other
                    # process os.waitpid() raises ChildProcessError
                    reaped_pid, _ = os.waitpid(proc.pid, os.WNOHANG)
                    if reaped_pid == proc.pid:
                        zombies_cleaned += 1
                        self.logger.info(f"Cleaned up zombie process: {proc.pid}")
            except (psutil.NoSuchProcess, psutil.AccessDenied, OSError):
                continue
        
        if zombies_cleaned > 0:
            self.logger.info(f"Cleaned up {zombies_cleaned} zombie processes")
        
        return zombies_cleaned
    
    def set_process_priority(self, pid: int, priority: int) -> bool:
        """Set process priority (nice value)."""
        try:
            proc = psutil.Process(pid)
            
            # Set nice value (-20 to 19, lower is higher priority)
            proc.nice(priority)
            
            self.logger.info(f"Set process {pid} priority to {priority}")
            return True
            
        except Exception as e:
            self.logger.error(f"Failed to set process priority: {e}")
            return False
    
    def set_process_affinity(self, pid: int, cpus: List[int]) -> bool:
        """Set CPU affinity for a process."""
        try:
            proc = psutil.Process(pid)
            
            # Set CPU affinity
            proc.cpu_affinity(cpus)
            
            self.logger.info(f"Set process {pid} CPU affinity to CPUs {cpus}")
            return True
            
        except Exception as e:
            self.logger.error(f"Failed to set process affinity: {e}")
            return False
    
    def limit_process_resources(self, pid: int, limits: Dict) -> bool:
        """Set resource limits for a process."""
        try:
            # resource.prlimit() is only available on Linux; changing limits
            # for another user's process requires root privileges
            if sys.platform.startswith('linux'):
                # Raises NoSuchProcess early if the PID is already gone
                proc = psutil.Process(pid)
                
                # Set memory limit
                if 'memory' in limits:
                    resource.prlimit(pid, resource.RLIMIT_AS, 
                                   (limits['memory'], limits['memory']))
                
                # Set CPU time limit
                if 'cpu_time' in limits:
                    resource.prlimit(pid, resource.RLIMIT_CPU, 
                                   (limits['cpu_time'], limits['cpu_time']))
                
                # Set number of open files limit
                if 'open_files' in limits:
                    resource.prlimit(pid, resource.RLIMIT_NOFILE, 
                                   (limits['open_files'], limits['open_files']))
                
                self.logger.info(f"Set resource limits for process {pid}")
                return True
            else:
                self.logger.warning("Resource limits via prlimit are only supported on Linux")
                return False
                
        except Exception as e:
            self.logger.error(f"Failed to set resource limits: {e}")
            return False

class ProcessPoolManager:
    """
    Manage pools of worker processes for parallel execution.
    """
    
    def __init__(self, max_workers: int = None):
        self.max_workers = max_workers or multiprocessing.cpu_count()
        self.pools = {}
        self.results = {}
        self.logger = logging.getLogger(__name__)
    
    def create_pool(self, name: str, workers: int = None) -> Pool:
        """Create a named process pool."""
        workers = workers or self.max_workers
        
        pool = Pool(processes=workers)
        self.pools[name] = pool
        
        self.logger.info(f"Created process pool '{name}' with {workers} workers")
        return pool
    
    def execute_parallel(self, func: Callable, tasks: List, 
                        pool_name: str = 'default',
                        chunk_size: int = 1) -> List:
        """Execute tasks in parallel using process pool."""
        if pool_name not in self.pools:
            self.create_pool(pool_name)
        
        pool = self.pools[pool_name]
        
        try:
            # Execute tasks in parallel
            results = pool.map(func, tasks, chunksize=chunk_size)
            
            self.logger.info(f"Executed {len(tasks)} tasks in parallel")
            return results
            
        except Exception as e:
            self.logger.error(f"Parallel execution failed: {e}")
            raise
    
    def execute_async(self, func: Callable, tasks: List, 
                     pool_name: str = 'default',
                     callback: Callable = None) -> List:
        """Execute tasks asynchronously."""
        if pool_name not in self.pools:
            self.create_pool(pool_name)
        
        pool = self.pools[pool_name]
        async_results = []
        
        for task in tasks:
            result = pool.apply_async(func, args=(task,), callback=callback)
            async_results.append(result)
        
        # Store results for later retrieval
        self.results[pool_name] = async_results
        
        self.logger.info(f"Started {len(tasks)} async tasks")
        return async_results
    
    def get_results(self, pool_name: str = 'default', 
                   timeout: int = None) -> List:
        """Get results from async execution."""
        if pool_name not in self.results:
            return []
        
        async_results = self.results[pool_name]
        results = []
        
        for async_result in async_results:
            try:
                result = async_result.get(timeout=timeout)
                results.append(result)
            except multiprocessing.TimeoutError:
                self.logger.warning("Task timed out")
                results.append(None)
            except Exception as e:
                self.logger.error(f"Task failed: {e}")
                results.append(None)
        
        return results
    
    def cleanup(self):
        """Clean up all process pools."""
        for name, pool in self.pools.items():
            pool.close()
            pool.join()
            self.logger.info(f"Closed process pool '{name}'")
        
        self.pools.clear()
        self.results.clear()

class ProcessOrchestrator:
    """
    Orchestrate complex multi-process workflows.
    """
    
    def __init__(self):
        self.workflows = {}
        self.running_workflows = {}
        self.manager = Manager()
        self.shared_state = self.manager.dict()
        self.logger = logging.getLogger(__name__)
    
    def create_workflow(self, name: str, steps: List[Dict]) -> bool:
        """
        Create a workflow definition.
        
        steps = [
            {'name': 'step1', 'function': func1, 'args': [], 'depends_on': []},
            {'name': 'step2', 'function': func2, 'args': [], 'depends_on': ['step1']},
        ]
        """
        self.workflows[name] = {
            'steps': steps,
            'created': datetime.now()
        }
        
        self.logger.info(f"Created workflow '{name}' with {len(steps)} steps")
        return True
    
    def execute_workflow(self, name: str, context: Dict = None) -> Dict:
        """Execute a complete workflow."""
        if name not in self.workflows:
            raise ValueError(f"Workflow '{name}' not found")
        
        workflow = self.workflows[name]
        workflow_id = f"{name}_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
        
        # Initialize workflow execution
        self.running_workflows[workflow_id] = {
            'status': 'running',
            'start_time': datetime.now(),
            'completed_steps': [],
            'results': {},
            'context': context or {}
        }
        
        # Execute steps
        for step in workflow['steps']:
            self.execute_step(workflow_id, step)
        
        # Mark workflow as complete
        self.running_workflows[workflow_id]['status'] = 'completed'
        self.running_workflows[workflow_id]['end_time'] = datetime.now()
        
        self.logger.info(f"Workflow '{workflow_id}' completed")
        return self.running_workflows[workflow_id]
    
    def execute_step(self, workflow_id: str, step: Dict):
        """Execute a single workflow step."""
        workflow_state = self.running_workflows[workflow_id]
        
        # Check dependencies
        for dep in step.get('depends_on', []):
            if dep not in workflow_state['completed_steps']:
                self.logger.warning(f"Dependency '{dep}' not met for step '{step['name']}'")
                return
        
        # Execute step
        try:
            func = step['function']
            args = step.get('args', [])
            
            # Add context and previous results to args
            if workflow_state['context']:
                args = [workflow_state['context']] + list(args)
            
            # Execute in separate process
            with ProcessPoolExecutor(max_workers=1) as executor:
                future = executor.submit(func, *args)
                result = future.result(timeout=step.get('timeout', 300))
            
            # Store result
            workflow_state['results'][step['name']] = result
            workflow_state['completed_steps'].append(step['name'])
            
            self.logger.info(f"Step '{step['name']}' completed in workflow '{workflow_id}'")
            
        except Exception as e:
            self.logger.error(f"Step '{step['name']}' failed: {e}")
            workflow_state['status'] = 'failed'
            workflow_state['error'] = str(e)
            raise

class SystemProcessMonitor:
    """
    Real-time system process monitoring and alerting.
    """
    
    def __init__(self, interval: int = 5):
        self.interval = interval
        self.monitoring = False
        self.monitor_thread = None
        self.process_manager = ProcessManager()
        self.metrics_history = deque(maxlen=1000)
        self.alerts = []
        self.logger = logging.getLogger(__name__)
    
    def start_monitoring(self):
        """Start continuous monitoring."""
        if self.monitoring:
            self.logger.warning("Monitoring already running")
            return
        
        self.monitoring = True
        self.monitor_thread = threading.Thread(target=self._monitor_loop)
        self.monitor_thread.daemon = True
        self.monitor_thread.start()
        
        self.logger.info("System monitoring started")
    
    def stop_monitoring(self):
        """Stop monitoring."""
        self.monitoring = False
        if self.monitor_thread:
            self.monitor_thread.join()
        
        self.logger.info("System monitoring stopped")
    
    def _monitor_loop(self):
        """Main monitoring loop."""
        while self.monitoring:
            try:
                # Collect system metrics
                metrics = self.collect_metrics()
                
                # Store metrics
                self.metrics_history.append(metrics)
                
                # Check for issues
                self.check_system_health(metrics)
                
            except Exception as e:
                self.logger.error(f"Monitoring error: {e}")
            
            # Sleep outside the try block so a failing iteration
            # cannot turn the loop into a busy spin
            time.sleep(self.interval)
    
    def collect_metrics(self) -> Dict:
        """Collect comprehensive system metrics."""
        metrics = {
            'timestamp': datetime.now(),
            'system': self.process_manager.get_resource_usage(),
            'top_cpu': [],
            'top_memory': [],
            'critical_processes': []
        }
        
        # Get top resource consumers
        top_cpu = self.process_manager.find_resource_hogs('cpu', 5)
        metrics['top_cpu'] = [
            {'pid': p.pid, 'name': p.name, 'cpu': p.cpu_percent}
            for p in top_cpu
        ]
        
        top_memory = self.process_manager.find_resource_hogs('memory', 5)
        metrics['top_memory'] = [
            {'pid': p.pid, 'name': p.name, 'memory': p.memory_percent}
            for p in top_memory
        ]
        
        # Check critical processes
        critical_processes = ['nginx', 'mysql', 'postgresql', 'redis']
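        for proc_name in critical_processes:
            procs = self.process_manager.find_processes(proc_name)
            # Record every critical process, even with zero matches, so
            # check_system_health() can alert when one is not running
            metrics['critical_processes'].append({
                'name': proc_name,
                'count': len(procs),
                'pids': [p.pid for p in procs]
            })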
        
        return metrics
    
    def check_system_health(self, metrics: Dict):
        """Check system health and raise alerts."""
        alerts = []
        
        # Check CPU usage
        cpu_percent = metrics['system']['cpu']['percent']
        if cpu_percent > 90:
            alerts.append(f"Critical CPU usage: {cpu_percent:.1f}%")
        elif cpu_percent > 75:
            alerts.append(f"High CPU usage: {cpu_percent:.1f}%")
        
        # Check memory usage
        mem_percent = metrics['system']['memory']['percent']
        if mem_percent > 90:
            alerts.append(f"Critical memory usage: {mem_percent:.1f}%")
        elif mem_percent > 75:
            alerts.append(f"High memory usage: {mem_percent:.1f}%")
        
        # Check zombie processes
        zombie_count = metrics['system']['processes']['zombies']
        if zombie_count > 10:
            alerts.append(f"High number of zombie processes: {zombie_count}")
            # Auto-clean if configured
            if self.process_manager.config.get('auto_kill_zombies'):
                self.process_manager.cleanup_zombies()
        
        # Check critical processes
        for crit_proc in metrics['critical_processes']:
            if crit_proc['count'] == 0:
                alerts.append(f"Critical process '{crit_proc['name']}' is not running")
        
        # Send alerts
        for alert in alerts:
            self.send_alert(alert)
    
    def send_alert(self, message: str):
        """Send system alert."""
        self.logger.warning(f"SYSTEM ALERT: {message}")
        self.alerts.append({
            'timestamp': datetime.now(),
            'message': message
        })
        
        # Implement additional alerting mechanisms here
    
    def get_metrics_summary(self) -> Dict:
        """Get summary of collected metrics."""
        if not self.metrics_history:
            return {}
        
        # Calculate averages
        cpu_values = [m['system']['cpu']['percent'] for m in self.metrics_history]
        mem_values = [m['system']['memory']['percent'] for m in self.metrics_history]
        
        return {
            'period': {
                'start': self.metrics_history[0]['timestamp'],
                'end': self.metrics_history[-1]['timestamp'],
                'samples': len(self.metrics_history)
            },
            'cpu': {
                'avg': sum(cpu_values) / len(cpu_values),
                'max': max(cpu_values),
                'min': min(cpu_values)
            },
            'memory': {
                'avg': sum(mem_values) / len(mem_values),
                'max': max(mem_values),
                'min': min(mem_values)
            },
            'alerts_count': len(self.alerts)
        }

# Example usage
if __name__ == "__main__":
    # Initialize process manager
    process_manager = ProcessManager()
    
    # List top CPU consuming processes
    print("\n🔥 Top CPU Consumers:")
    top_cpu = process_manager.find_resource_hogs('cpu', 5)
    for proc in top_cpu:
        print(f"  {proc.name} (PID: {proc.pid}): {proc.cpu_percent:.1f}%")
    
    # List top memory consuming processes
    print("\n💾 Top Memory Consumers:")
    top_memory = process_manager.find_resource_hogs('memory', 5)
    for proc in top_memory:
        print(f"  {proc.name} (PID: {proc.pid}): {proc.memory_percent:.1f}%")
    
    # Get system resource usage
    print("\n📊 System Resources:")
    resources = process_manager.get_resource_usage()
    print(f"  CPU: {resources['cpu']['percent']:.1f}%")
    print(f"  Memory: {resources['memory']['percent']:.1f}%")
    print(f"  Processes: {resources['processes']['count']} total, "
          f"{resources['processes']['zombies']} zombies")
    
    # Find specific processes
    print("\n🔍 Finding Python processes:")
    python_procs = process_manager.find_processes('python')
    for proc in python_procs[:5]:
        print(f"  {proc.name} (PID: {proc.pid})")
    
    # Start a new process
    print("\n🚀 Starting new process:")
    new_proc = process_manager.start_process(['echo', 'Hello from subprocess'])
    print(f"  Started with PID: {new_proc.pid}")
    
    # Clean up zombies
    print("\n🧹 Cleaning up zombie processes:")
    zombies_cleaned = process_manager.cleanup_zombies()
    print(f"  Cleaned {zombies_cleaned} zombie processes")
    
    # Create process pool for parallel execution
    print("\n⚡ Parallel Processing Example:")
    pool_manager = ProcessPoolManager(max_workers=4)
    
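    # Note: if the pool hands this function to worker processes, it must be
    # importable by them. Under the "spawn" start method (the default on
    # Windows and macOS) workers re-import this module without running the
    # __main__ block, so define task functions at module top level there.
    def cpu_intensive_task(n):
        """Example CPU-intensive task: sum of squares below n."""
        return sum(i * i for i in range(n))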
    
    tasks = [1000000, 2000000, 3000000, 4000000]
    results = pool_manager.execute_parallel(cpu_intensive_task, tasks)
    print(f"  Parallel results: {results}")
    
    # Clean up
    pool_manager.cleanup()
    
    # Start system monitoring
    print("\n👁️ Starting System Monitor:")
    monitor = SystemProcessMonitor(interval=5)
    monitor.start_monitoring()
    
    # Let it run for a bit
    time.sleep(10)
    
    # Get monitoring summary
    summary = monitor.get_metrics_summary()
    if summary:
        print(f"  Average CPU: {summary['cpu']['avg']:.1f}%")
        print(f"  Average Memory: {summary['memory']['avg']:.1f}%")
        print(f"  Alerts: {summary['alerts_count']}")
    
    # Stop monitoring
    monitor.stop_monitoring()
    
    print("\n✅ Process management demonstration complete!")

Advanced Process Control Techniques 🎮

Let's explore more sophisticated process control patterns for complex scenarios!

sequenceDiagram
    participant User
    participant Manager
    participant Process
    participant Monitor
    participant System
    User->>Manager: Start Process
    Manager->>Process: Create & Launch
    Process->>Monitor: Register for Monitoring
    Monitor->>System: Check Resources
    System-->>Monitor: Resource Status
    Monitor->>Manager: Health Report
    Manager->>User: Process Status
    Note over Monitor: Continuous Monitoring
    Monitor->>Manager: Alert: High Resource Usage
    Manager->>Process: Apply Limits
    Process->>System: Reduced Resource Usage

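The "Apply Limits" step in the diagram can take many forms. One possible approach, sketched here for POSIX systems only, is to set a resource limit in the child just before it starts running. The helper name `run_with_cpu_limit` is hypothetical and not part of the classes in this tutorial; it relies on the standard library's `resource.setrlimit` and the `preexec_fn` argument of `subprocess.run`, which is best avoided in multi-threaded programs.

```python
import resource  # POSIX only; this module does not exist on Windows
import subprocess
import sys


def run_with_cpu_limit(command, cpu_seconds):
    """Run command with a cap on CPU time (hypothetical helper)."""

    def apply_limits():
        # Runs in the child between fork() and exec(),
        # so only the child process is limited
        resource.setrlimit(resource.RLIMIT_CPU, (cpu_seconds, cpu_seconds))

    return subprocess.run(
        command,
        preexec_fn=apply_limits,
        capture_output=True,
        text=True,
    )


# Ask the child to report the CPU-time limit it inherited
report = run_with_cpu_limit(
    [sys.executable, "-c",
     "import resource; print(resource.getrlimit(resource.RLIMIT_CPU))"],
    cpu_seconds=2,
)
print(report.stdout.strip())
```

A process that exceeds its soft CPU limit receives SIGXCPU, so a runaway child is stopped by the kernel rather than by your monitoring loop. Limiting a process that is already running needs a different mechanism and is not covered by this sketch.
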
class AdvancedProcessControl:
    """
    Advanced process control techniques.
    """
    
    @staticmethod
    def create_daemon_process(target_func: Callable, args: tuple = ()):
        """Create a true daemon process."""
        # First fork
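        try:
            pid = os.fork()
        except OSError as e:
            sys.stderr.write(f"First fork failed: {e}\n")
            sys.exit(1)
        if pid > 0:
            # Parent: reap the short-lived first child so it does not linger
            # as a zombie, then return. This PID belongs to the intermediate
            # child, not to the final daemon.
            os.waitpid(pid, 0)
            return pid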
        
        # Decouple from parent environment
        os.chdir("/")
        os.setsid()
        os.umask(0)
        
        # Second fork
        try:
            pid = os.fork()
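            if pid > 0:
                # First child exits. os._exit() ends it immediately, without
                # running cleanup handlers or finally blocks inherited from
                # the parent, which sys.exit() would trigger.
                os._exit(0)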
        except OSError as e:
            sys.stderr.write(f"Second fork failed: {e}\n")
            sys.exit(1)
        
        # Redirect standard file descriptors
        sys.stdout.flush()
        sys.stderr.flush()
        si = open('/dev/null', 'r')
        so = open('/dev/null', 'a+')
        se = open('/dev/null', 'a+')
        os.dup2(si.fileno(), sys.stdin.fileno())
        os.dup2(so.fileno(), sys.stdout.fileno())
        os.dup2(se.fileno(), sys.stderr.fileno())
        
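        # Run the daemon process, then exit so the daemon never falls
        # through into the code that called create_daemon_process()
        try:
            target_func(*args)
        finally:
            os._exit(0)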
    
    @staticmethod
    def create_supervised_process(command: List[str], 
                                 restart_on_failure: bool = True,
                                 max_restarts: int = 5):
        """Create a supervised process that auto-restarts on failure."""
        restart_count = 0
        
        while restart_count < max_restarts:
            try:
                process = subprocess.Popen(command)
                
                # Monitor the process
                while True:
                    ret = process.poll()
                    if ret is None:
                        # Process still running
                        time.sleep(1)
                    else:
                        # Process terminated
                        if ret == 0:
                            # Clean exit
                            break
                        elif restart_on_failure:
                            # Restart on failure
                            restart_count += 1
                            logging.warning(f"Process failed with code {ret}, restarting... ({restart_count}/{max_restarts})")
                            time.sleep(2)  # Wait before restart
                            break
                        else:
                            # Don't restart
                            break
                
                if ret == 0 or not restart_on_failure:
                    break
                    
            except Exception as e:
                logging.error(f"Failed to start process: {e}")
                if restart_on_failure:
                    restart_count += 1
                    time.sleep(2)
                else:
                    break
        
        if restart_count >= max_restarts:
            logging.error(f"Process failed {max_restarts} times, giving up")
    
    @staticmethod
    def implement_circuit_breaker(func: Callable, 
                                 failure_threshold: int = 5,
                                 timeout: int = 60):
        """Implement circuit breaker pattern for process execution."""
        failure_count = 0
        last_failure_time = None
        circuit_open = False
        
        def wrapper(*args, **kwargs):
            nonlocal failure_count, last_failure_time, circuit_open
            
            # Check if circuit is open
            if circuit_open:
                if time.time() - last_failure_time > timeout:
                    # Try to close circuit
                    circuit_open = False
                    failure_count = 0
                else:
                    raise Exception("Circuit breaker is open")
            
            try:
                result = func(*args, **kwargs)
                failure_count = 0  # Reset on success
                return result
                
            except Exception:
                failure_count += 1
                last_failure_time = time.time()
                
                if failure_count >= failure_threshold:
                    circuit_open = True
                    logging.error(f"Circuit breaker opened after {failure_count} failures")
                
                # Bare raise re-raises the original exception with its traceback intact
                raise
        
        return wrapper
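
To see how a circuit breaker behaves in practice, here is a minimal standalone sketch. It is a condensed variant of the idea above, not the class method itself: the names `circuit_breaker`, `CircuitOpenError`, and `flaky_call` are hypothetical, and the always-failing function simply stands in for a misbehaving process.

```python
import time


class CircuitOpenError(RuntimeError):
    """Raised when a call is rejected because the circuit is open (hypothetical name)."""


def circuit_breaker(func, failure_threshold=3, timeout=30.0):
    """Wrap func so repeated failures block further calls for `timeout` seconds."""
    state = {"failures": 0, "opened_at": None}

    def wrapper(*args, **kwargs):
        if state["opened_at"] is not None:
            if time.monotonic() - state["opened_at"] < timeout:
                raise CircuitOpenError("circuit breaker is open")
            # Timeout elapsed: close the circuit and allow a fresh attempt
            state["opened_at"] = None
            state["failures"] = 0
        try:
            result = func(*args, **kwargs)
        except Exception:
            state["failures"] += 1
            if state["failures"] >= failure_threshold:
                state["opened_at"] = time.monotonic()
            raise
        state["failures"] = 0  # Any success resets the failure count
        return result

    return wrapper


calls = []  # Records every time the wrapped function really runs


def flaky_call():
    """Hypothetical task that always fails."""
    calls.append(time.monotonic())
    raise ValueError("backend unavailable")


guarded = circuit_breaker(flaky_call, failure_threshold=3, timeout=30.0)

outcomes = []
for _ in range(5):
    try:
        guarded()
    except CircuitOpenError:
        outcomes.append("rejected")
    except ValueError:
        outcomes.append("failed")

print(outcomes)
print(f"underlying function ran {len(calls)} times")
```

The first three calls reach the function and fail. After that the circuit is open, so the remaining calls are rejected without touching the function at all, which is what protects an already struggling resource from extra load.
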

Key Takeaways and Best Practices 🎯

Process Management Best Practices 📋

Pro Tip: Process management is like conducting an orchestra - you need to know when to start instruments (processes), when to stop them, and how to keep them in harmony. Always implement proper monitoring and alerting. Remember that killing processes is easy, but understanding why they misbehave is valuable. Use tools like strace and ltrace to debug process issues, and always test process management scripts thoroughly before deploying to production!
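
Knowing how to stop a process politely is part of that harmony. The sketch below shows one common pattern, assuming a child started with `subprocess.Popen`: ask it to terminate, wait for a grace period, and only force-kill it if it does not exit. The helper name `stop_gracefully` and the 5-second grace period are illustrative choices, not part of the classes in this tutorial.

```python
import subprocess
import sys


def stop_gracefully(proc: subprocess.Popen, grace_period: float = 5.0) -> int:
    """Ask a child process to exit, then force-kill it if it ignores the request."""
    if proc.poll() is not None:
        return proc.returncode  # Already exited, nothing to do
    proc.terminate()  # SIGTERM on POSIX, TerminateProcess on Windows
    try:
        return proc.wait(timeout=grace_period)
    except subprocess.TimeoutExpired:
        proc.kill()  # SIGKILL on POSIX; cannot be caught or ignored
        return proc.wait()  # Reap the child so it does not become a zombie


# Start a child that would otherwise sleep for a minute
child = subprocess.Popen([sys.executable, "-c", "import time; time.sleep(60)"])
exit_code = stop_gracefully(child, grace_period=5.0)
print(f"child exited with code {exit_code}")
```

Giving the process a chance to handle the termination request lets it flush files and release locks, and the final `wait()` makes sure the exit status is collected.
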

Process management mastery gives you complete control over your system's computational resources. You can orchestrate complex workflows, optimize resource usage, and ensure system stability. Whether you're managing a single server or a distributed system, these skills are essential for system automation! 🚀