
🔐 Session Management: Maintain State Like a Pro

Sessions are the memory of web interactions - they remember who you are, what you've done, and where you've been. Like a skilled diplomat who maintains relationships across multiple meetings, session management lets you maintain complex, stateful interactions with websites over time. Let's master the art of digital persistence! 🎭

The Session Management Architecture

Think of session management as conducting multiple conversations simultaneously. Each website is a different person you're talking to, and you need to remember the context of each conversation. Sessions keep track of your authentication, preferences, shopping carts, and navigation history. Master them, and you can automate the most complex web workflows!

graph TB
    A[Session Management] --> B[Authentication]
    A --> C[State Persistence]
    A --> D[Multi-Site Management]
    A --> E[Error Recovery]
    B --> F[Login Flows]
    B --> G[OAuth/SSO]
    B --> H[Token Management]
    B --> I[MFA Handling]
    C --> J[Cookie Storage]
    C --> K[Local Storage]
    C --> L[Session Storage]
    C --> M[Cache Management]
    D --> N[Session Pool]
    D --> O[Context Switching]
    D --> P[Concurrent Sessions]
    D --> Q[Session Isolation]
    E --> R[Retry Logic]
    E --> S[Session Recovery]
    E --> T[Failover]
    E --> U[Health Checks]
    style A fill:#ff6b6b
    style B fill:#51cf66
    style C fill:#339af0
    style D fill:#ffd43b
    style E fill:#ff6b6b

Real-World Scenario: The Multi-Platform Orchestrator 🌐

You're building an automation platform that manages accounts across dozens of services - social media, e-commerce, banking, and enterprise systems. Each has different session requirements, timeout policies, security measures, and state management needs. You need to handle OAuth flows, maintain multiple concurrent sessions, recover from failures, and ensure security. Let's build a robust session management system!

import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry  # requests.packages.urllib3 is a deprecated alias
from typing import Dict, List, Optional, Any, Callable, Union, Tuple
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from enum import Enum
import json
import pickle
import time
import threading
import queue
from pathlib import Path
import hashlib
import base64
import jwt
import logging
from urllib.parse import urlparse, parse_qs, urlencode
from bs4 import BeautifulSoup
import re
from cryptography.fernet import Fernet
import sqlite3
from contextlib import contextmanager
import os

class SessionState(Enum):
    """Session states."""
    ACTIVE = "active"
    EXPIRED = "expired"
    INVALID = "invalid"
    LOCKED = "locked"
    REFRESHING = "refreshing"

class AuthType(Enum):
    """Authentication types."""
    BASIC = "basic"
    BEARER = "bearer"
    OAUTH1 = "oauth1"
    OAUTH2 = "oauth2"
    API_KEY = "api_key"
    COOKIE = "cookie"
    JWT = "jwt"
    CUSTOM = "custom"

@dataclass
class SessionConfig:
    """Session configuration."""
    name: str
    base_url: str
    auth_type: AuthType
    timeout: int = 30
    max_retries: int = 3
    retry_backoff: float = 1.0
    verify_ssl: bool = True
    proxy: Optional[Dict[str, str]] = None
    headers: Dict[str, str] = field(default_factory=dict)
    metadata: Dict[str, Any] = field(default_factory=dict)

@dataclass
class SessionInfo:
    """Session information and metrics."""
    session_id: str
    created_at: datetime
    last_used: datetime
    state: SessionState
    request_count: int = 0
    error_count: int = 0
    data: Dict[str, Any] = field(default_factory=dict)

class AdvancedSession(requests.Session):
    """
    Enhanced session with advanced features.
    """
    
    def __init__(self, config: SessionConfig):
        super().__init__()
        self.config = config
        self.session_info = SessionInfo(
            session_id=self._generate_session_id(),
            created_at=datetime.now(),
            last_used=datetime.now(),
            state=SessionState.ACTIVE
        )
        
        # Setup session
        self._setup_session()
        self._setup_retries()
        
        # Thread safety
        self.lock = threading.Lock()
        
        # Logging
        self.logger = logging.getLogger(f"Session_{self.config.name}")
    
    def _generate_session_id(self) -> str:
        """Generate unique session ID."""
        data = f"{self.config.name}_{datetime.now().isoformat()}"
        return hashlib.sha256(data.encode()).hexdigest()[:16]
    
    def _setup_session(self):
        """Setup session configuration."""
        # Set default headers
        default_headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
            'Accept': 'text/html,application/json,application/xml;q=0.9,*/*;q=0.8',
            'Accept-Language': 'en-US,en;q=0.5',
            'Accept-Encoding': 'gzip, deflate, br',
            'DNT': '1',
            'Connection': 'keep-alive',
            'Upgrade-Insecure-Requests': '1'
        }
        default_headers.update(self.config.headers)
        self.headers.update(default_headers)
        
        # Set SSL verification
        self.verify = self.config.verify_ssl
        
        # Set proxy if provided
        if self.config.proxy:
            self.proxies = self.config.proxy
        
        # Set timeout
        self.timeout = self.config.timeout
    
    def _setup_retries(self):
        """Setup retry strategy."""
        retry_strategy = Retry(
            total=self.config.max_retries,
            backoff_factor=self.config.retry_backoff,
            status_forcelist=[429, 500, 502, 503, 504],
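            # `method_whitelist` was renamed to `allowed_methods` in urllib3 1.26
            # and removed in 2.0. POST is left out: retrying a non-idempotent
            # request can repeat its side effects.
            allowed_methods=["HEAD", "GET", "PUT", "DELETE", "OPTIONS", "TRACE"]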
        )
        
        adapter = HTTPAdapter(max_retries=retry_strategy)
        self.mount("http://", adapter)
        self.mount("https://", adapter)
    
    def request(self, method, url, **kwargs):
        """Override request to add session management."""
        with self.lock:
            # Update session info
            self.session_info.last_used = datetime.now()
            self.session_info.request_count += 1
            
            # Add base URL if relative
            if not url.startswith('http'):
                url = self.config.base_url.rstrip('/') + '/' + url.lstrip('/')
            
            # Set default timeout if not provided
            if 'timeout' not in kwargs:
                kwargs['timeout'] = self.timeout
            
            try:
                response = super().request(method, url, **kwargs)
                
                # Check for session expiration
                self._check_session_status(response)
                
                return response
                
            except Exception as e:
                self.session_info.error_count += 1
                self.logger.error(f"Request failed: {e}")
                raise
    
    def _check_session_status(self, response):
        """Check if session is still valid."""
        # Check for common session expiration indicators
        if response.status_code == 401:
            self.session_info.state = SessionState.EXPIRED
            self.logger.warning("Session expired (401 Unauthorized)")
        
        # Check for login page redirect
        if 'login' in response.url.lower() and 'login' not in response.request.url.lower():
            self.session_info.state = SessionState.EXPIRED
            self.logger.warning("Session expired (redirected to login)")
    
    def is_active(self) -> bool:
        """Check if session is active."""
        return self.session_info.state == SessionState.ACTIVE
    
    def get_info(self) -> SessionInfo:
        """Get session information."""
        return self.session_info

class SessionManager:
    """
    Comprehensive session management system.
    """
    
    def __init__(self, storage_path: str = "./sessions"):
        self.storage_path = Path(storage_path)
        self.storage_path.mkdir(parents=True, exist_ok=True)
        
        self.sessions: Dict[str, AdvancedSession] = {}
        self.session_configs: Dict[str, SessionConfig] = {}
        
        # Setup components
        self.setup_logging()
        self.setup_database()
        self.auth_manager = AuthenticationManager()
        
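        # Encryption for sensitive data. The key is persisted next to the
        # database so saved cookies can still be decrypted after a restart;
        # in production, keep it in a secrets manager instead.
        key_file = self.storage_path / "session.key"
        if key_file.exists():
            self.cipher_key = key_file.read_bytes()
        else:
            self.cipher_key = Fernet.generate_key()
            key_file.write_bytes(self.cipher_key)
        self.cipher = Fernet(self.cipher_key)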
        
        # Background tasks
        self.task_queue = queue.Queue()
        self.start_background_tasks()
    
    def setup_logging(self):
        """Setup logging configuration."""
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
        )
        self.logger = logging.getLogger(__name__)
    
    def setup_database(self):
        """Setup SQLite database for session storage."""
        self.db_path = self.storage_path / "sessions.db"
        
        with sqlite3.connect(self.db_path) as conn:
            conn.execute('''
                CREATE TABLE IF NOT EXISTS sessions (
                    session_id TEXT PRIMARY KEY,
                    name TEXT NOT NULL,
                    config TEXT,
                    cookies TEXT,
                    created_at TIMESTAMP,
                    last_used TIMESTAMP,
                    state TEXT,
                    metadata TEXT
                )
            ''')
            
            conn.execute('''
                CREATE TABLE IF NOT EXISTS session_events (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    session_id TEXT,
                    event_type TEXT,
                    event_data TEXT,
                    timestamp TIMESTAMP,
                    FOREIGN KEY (session_id) REFERENCES sessions (session_id)
                )
            ''')
    
    def create_session(self, config: SessionConfig) -> AdvancedSession:
        """Create a new managed session."""
        session = AdvancedSession(config)
        
        self.sessions[config.name] = session
        self.session_configs[config.name] = config
        
        # Log session creation
        self.log_event(session.session_info.session_id, "created", {"config": config.name})
        
        self.logger.info(f"Created session: {config.name}")
        return session
    
    def get_session(self, name: str) -> Optional[AdvancedSession]:
        """Get session by name."""
        return self.sessions.get(name)
    
    def authenticate_session(self, name: str, credentials: Dict[str, Any]) -> bool:
        """Authenticate a session."""
        session = self.get_session(name)
        if not session:
            self.logger.error(f"Session not found: {name}")
            return False
        
        config = self.session_configs[name]
        
        # Perform authentication based on type
        if config.auth_type == AuthType.BASIC:
            success = self.auth_manager.basic_auth(session, credentials)
        elif config.auth_type == AuthType.BEARER:
            success = self.auth_manager.bearer_auth(session, credentials)
        elif config.auth_type == AuthType.OAUTH2:
            success = self.auth_manager.oauth2_auth(session, credentials)
        elif config.auth_type == AuthType.JWT:
            success = self.auth_manager.jwt_auth(session, credentials)
        elif config.auth_type == AuthType.COOKIE:
            success = self.auth_manager.cookie_auth(session, credentials)
        else:
            self.logger.warning(f"Unsupported auth type: {config.auth_type}")
            return False
        
        if success:
            self.log_event(session.session_info.session_id, "authenticated", {"method": config.auth_type.value})
        
        return success
    
    def save_session(self, name: str) -> bool:
        """Save session to persistent storage."""
        session = self.get_session(name)
        if not session:
            return False
        
        try:
            # Serialize cookies
            cookies_data = []
            for cookie in session.cookies:
                cookies_data.append({
                    'name': cookie.name,
                    'value': cookie.value,
                    'domain': cookie.domain,
                    'path': cookie.path,
                    'expires': cookie.expires
                })
            
            # Encrypt sensitive data
            encrypted_cookies = self.cipher.encrypt(
                json.dumps(cookies_data).encode()
            ).decode()
            
            # Save to database
            with sqlite3.connect(self.db_path) as conn:
                conn.execute('''
                    INSERT OR REPLACE INTO sessions 
                    (session_id, name, config, cookies, created_at, last_used, state, metadata)
                    VALUES (?, ?, ?, ?, ?, ?, ?, ?)
                ''', (
                    session.session_info.session_id,
                    name,
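                    json.dumps(
                        # Store the enum's value so AuthType(...) can rebuild it on load
                        {**self.session_configs[name].__dict__,
                         'auth_type': self.session_configs[name].auth_type.value},
                        default=str
                    ),
                    encrypted_cookies,
                    session.session_info.created_at.isoformat(),
                    session.session_info.last_used.isoformat(),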
                    session.session_info.state.value,
                    json.dumps(session.session_info.data)
                ))
            
            # Save an encrypted file backup (a plaintext pickle of the cookie jar
            # would defeat the encryption applied above)
            session_file = self.storage_path / f"{name}.session"
            session_file.write_text(encrypted_cookies)
            
            self.logger.info(f"Saved session: {name}")
            return True
            
        except Exception as e:
            self.logger.error(f"Failed to save session: {e}")
            return False
    
    def load_session(self, name: str) -> Optional[AdvancedSession]:
        """Load session from persistent storage."""
        try:
            with sqlite3.connect(self.db_path) as conn:
                cursor = conn.execute(
                    'SELECT * FROM sessions WHERE name = ?',
                    (name,)
                )
                row = cursor.fetchone()
            
            if not row:
                self.logger.warning(f"Session not found in storage: {name}")
                return None
            
            # Recreate session config
            config_data = json.loads(row[2])
            config = SessionConfig(
                name=config_data['name'],
                base_url=config_data['base_url'],
                auth_type=AuthType(config_data['auth_type']),
                timeout=config_data.get('timeout', 30),
                max_retries=config_data.get('max_retries', 3)
            )
            
            # Create new session
            session = self.create_session(config)
            
            # Restore cookies
            encrypted_cookies = row[3].encode()
            cookies_data = json.loads(
                self.cipher.decrypt(encrypted_cookies).decode()
            )
            
            for cookie_data in cookies_data:
                session.cookies.set(
                    cookie_data['name'],
                    cookie_data['value'],
                    domain=cookie_data.get('domain'),
                    path=cookie_data.get('path')
                )
            
            # Restore session info
            session.session_info.created_at = datetime.fromisoformat(row[4])
            session.session_info.last_used = datetime.fromisoformat(row[5])
            session.session_info.state = SessionState(row[6])
            session.session_info.data = json.loads(row[7])
            
            self.logger.info(f"Loaded session: {name}")
            return session
            
        except Exception as e:
            self.logger.error(f"Failed to load session: {e}")
            return None
    
    def refresh_session(self, name: str) -> bool:
        """Refresh an expired session."""
        session = self.get_session(name)
        if not session:
            return False
        
        session.session_info.state = SessionState.REFRESHING
        
        try:
            # Re-authenticate if needed
            if getattr(session, '_refresh_token', None):
                # OAuth2 refresh
                success = self.auth_manager.refresh_oauth2_token(
                    session,
                    session._refresh_token
                )
            else:
                # Re-login or other refresh mechanism
                success = self._relogin_session(session)
            
            if success:
                session.session_info.state = SessionState.ACTIVE
                self.log_event(session.session_info.session_id, "refreshed", {})
                return True
            else:
                session.session_info.state = SessionState.INVALID
                return False
                
        except Exception as e:
            self.logger.error(f"Failed to refresh session: {e}")
            session.session_info.state = SessionState.INVALID
            return False
    
    def _relogin_session(self, session: AdvancedSession) -> bool:
        """Attempt to re-login for a session."""
        # This would be implemented based on specific login requirements
        # For now, return False to indicate manual intervention needed
        return False
    
    def clone_session(self, name: str, new_name: str) -> Optional[AdvancedSession]:
        """Clone an existing session."""
        original = self.get_session(name)
        if not original:
            return None
        
        # Create new session with same config
        config = self.session_configs[name]
        new_config = SessionConfig(
            name=new_name,
            base_url=config.base_url,
            auth_type=config.auth_type,
            timeout=config.timeout,
            max_retries=config.max_retries,
            verify_ssl=config.verify_ssl,
            proxy=config.proxy,
            headers=config.headers.copy()
        )
        
        new_session = self.create_session(new_config)
        
        # Copy cookies
        for cookie in original.cookies:
            new_session.cookies.set_cookie(cookie)
        
        self.logger.info(f"Cloned session {name} to {new_name}")
        return new_session
    
    def destroy_session(self, name: str):
        """Destroy a session and clean up resources."""
        session = self.get_session(name)
        if session:
            # Clear cookies
            session.cookies.clear()
            
            # Close connections
            session.close()
            
            # Remove from storage
            del self.sessions[name]
            del self.session_configs[name]
            
            # Remove from database
            with sqlite3.connect(self.db_path) as conn:
                conn.execute('DELETE FROM sessions WHERE name = ?', (name,))
            
            self.logger.info(f"Destroyed session: {name}")
    
    def log_event(self, session_id: str, event_type: str, event_data: Dict):
        """Log session event."""
        with sqlite3.connect(self.db_path) as conn:
            conn.execute('''
                INSERT INTO session_events (session_id, event_type, event_data, timestamp)
                VALUES (?, ?, ?, ?)
            ''', (
                session_id,
                event_type,
                json.dumps(event_data),
                datetime.now().isoformat()
            ))
    
    def get_session_history(self, name: str) -> List[Dict]:
        """Get session event history."""
        session = self.get_session(name)
        if not session:
            return []
        
        with sqlite3.connect(self.db_path) as conn:
            cursor = conn.execute('''
                SELECT event_type, event_data, timestamp
                FROM session_events
                WHERE session_id = ?
                ORDER BY timestamp DESC
            ''', (session.session_info.session_id,))
            
            events = []
            for row in cursor:
                events.append({
                    'type': row[0],
                    'data': json.loads(row[1]),
                    'timestamp': row[2]
                })
            
            return events
    
    def monitor_sessions(self) -> Dict[str, Dict]:
        """Get monitoring data for all sessions."""
        monitoring_data = {}
        
        for name, session in self.sessions.items():
            info = session.session_info
            monitoring_data[name] = {
                'state': info.state.value,
                'created_at': info.created_at.isoformat(),
                'last_used': info.last_used.isoformat(),
                'request_count': info.request_count,
                'error_count': info.error_count,
                'uptime': (datetime.now() - info.created_at).total_seconds(),
                'idle_time': (datetime.now() - info.last_used).total_seconds()
            }
        
        return monitoring_data
    
    def start_background_tasks(self):
        """Start background maintenance tasks."""
        def worker():
            while True:
                # Check for expired sessions
                for name, session in list(self.sessions.items()):
                    idle_time = (datetime.now() - session.session_info.last_used).total_seconds()
                    
                    # Auto-save after 5 minutes of idle
                    if idle_time > 300:
                        self.save_session(name)
                    
                    # Mark as expired after 30 minutes of idle
                    if idle_time > 1800 and session.session_info.state == SessionState.ACTIVE:
                        session.session_info.state = SessionState.EXPIRED
                        self.logger.info(f"Session {name} expired due to inactivity")
                
                time.sleep(60)  # Check every minute
        
        thread = threading.Thread(target=worker, daemon=True)
        thread.start()

class AuthenticationManager:
    """
    Handle various authentication methods.
    """
    
    def basic_auth(self, session: AdvancedSession, credentials: Dict) -> bool:
        """Setup basic authentication."""
        username = credentials.get('username')
        password = credentials.get('password')
        
        if not username or not password:
            return False
        
        session.auth = (username, password)
        return True
    
    def bearer_auth(self, session: AdvancedSession, credentials: Dict) -> bool:
        """Setup bearer token authentication."""
        token = credentials.get('token')
        
        if not token:
            return False
        
        session.headers['Authorization'] = f'Bearer {token}'
        return True
    
    def jwt_auth(self, session: AdvancedSession, credentials: Dict) -> bool:
        """Setup JWT authentication."""
        token = credentials.get('token')
        
        if not token:
            # Generate JWT if credentials provided
            secret = credentials.get('secret')
            payload = credentials.get('payload', {})
            
            if secret and payload:
                token = jwt.encode(payload, secret, algorithm='HS256')
            else:
                return False
        
        session.headers['Authorization'] = f'Bearer {token}'
        session.session_info.data['jwt_token'] = token
        
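        # Decode (without verifying the signature) to read the expiration
        try:
            decoded = jwt.decode(token, options={"verify_signature": False})
            if 'exp' in decoded:
                # Stored as an ISO string so session_info.data stays JSON-serializable
                session.session_info.data['token_expires'] = datetime.fromtimestamp(decoded['exp']).isoformat()
        except jwt.PyJWTError:
            pass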
        
        return True
    
    def oauth2_auth(self, session: AdvancedSession, credentials: Dict) -> bool:
        """Setup OAuth2 authentication."""
        client_id = credentials.get('client_id')
        client_secret = credentials.get('client_secret')
        token_url = credentials.get('token_url')
        
        if not all([client_id, client_secret, token_url]):
            return False
        
        # Request access token
        token_data = {
            'grant_type': 'client_credentials',
            'client_id': client_id,
            'client_secret': client_secret
        }
        
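        response = requests.post(token_url, data=token_data, timeout=session.config.timeout)
        
        if response.status_code == 200:
            token_info = response.json()
            access_token = token_info.get('access_token')
            refresh_token = token_info.get('refresh_token')
            
            if not access_token:
                return False
            
            session.headers['Authorization'] = f'Bearer {access_token}'
            # May be None: the client_credentials grant usually returns no refresh token
            session._refresh_token = refresh_token
            # Keep token_url with the token data so refresh_oauth2_token() can find it
            session.session_info.data['oauth_token'] = {**token_info, 'token_url': token_url}
            
            return True
        
        return False
    
    def refresh_oauth2_token(self, session: AdvancedSession, refresh_token: str) -> bool:
        """Refresh OAuth2 access token."""
        token_info = session.session_info.data.get('oauth_token', {})
        token_url = token_info.get('token_url')
        
        if not token_url or not refresh_token:
            return False
        
        # Note: many providers also require client credentials on this request
        refresh_data = {
            'grant_type': 'refresh_token',
            'refresh_token': refresh_token
        }
        
        response = requests.post(token_url, data=refresh_data, timeout=session.config.timeout)
        
        if response.status_code == 200:
            new_token_info = response.json()
            access_token = new_token_info.get('access_token')
            
            if not access_token:
                return False
            
            session.headers['Authorization'] = f'Bearer {access_token}'
            # Providers may rotate the refresh token; keep the old one otherwise
            session._refresh_token = new_token_info.get('refresh_token', refresh_token)
            session.session_info.data['oauth_token'] = {**new_token_info, 'token_url': token_url}
            
            return True
        
        return False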
    
    def cookie_auth(self, session: AdvancedSession, credentials: Dict) -> bool:
        """Setup cookie-based authentication."""
        login_url = credentials.get('login_url')
        username = credentials.get('username')
        password = credentials.get('password')
        
        if not all([login_url, username, password]):
            return False
        
        # Perform login
        login_data = {
            'username': username,
            'password': password
        }
        
        response = session.post(login_url, data=login_data)
        
        # Check if login successful (simple check)
        return response.status_code == 200 and len(session.cookies) > 0

class SessionPool:
    """
    Manage a pool of sessions for concurrent operations.
    """
    
    def __init__(self, config: SessionConfig, pool_size: int = 5):
        self.config = config
        self.pool_size = pool_size
        self.available = queue.Queue()
        self.in_use = set()
        self.lock = threading.Lock()
        
        # Initialize pool
        self.manager = SessionManager()
        self._initialize_pool()
    
    def _initialize_pool(self):
        """Initialize session pool."""
        for i in range(self.pool_size):
            config = SessionConfig(
                name=f"{self.config.name}_pool_{i}",
                base_url=self.config.base_url,
                auth_type=self.config.auth_type,
                timeout=self.config.timeout,
                max_retries=self.config.max_retries
            )
            
            session = self.manager.create_session(config)
            self.available.put(session)
    
    @contextmanager
    def get_session(self, timeout: int = 30):
        """Get a session from the pool."""
        session = None
        
        try:
            # Get available session
            session = self.available.get(timeout=timeout)
            
            with self.lock:
                self.in_use.add(session)
            
            yield session
            
        finally:
            # Return session to pool
            if session:
                with self.lock:
                    self.in_use.discard(session)
                
                self.available.put(session)
    
    def execute_parallel(self, func: Callable, items: List[Any]) -> List[Any]:
        """Execute function in parallel using session pool.
        
        Results come back in the same order as `items`. If a call raises,
        the exception object is stored in that item's slot.
        """
        results: List[Any] = [None] * len(items)
        threads = []
        
        def worker(index, item):
            try:
                with self.get_session() as session:
                    results[index] = func(session, item)
            except Exception as e:
                results[index] = e
        
        for index, item in enumerate(items):
            thread = threading.Thread(target=worker, args=(index, item))
            thread.start()
            threads.append(thread)
        
        for thread in threads:
            thread.join()
        
        return results

# Example usage
if __name__ == "__main__":
    print("🔐 Session Management Examples\n")
    
    # Initialize session manager
    manager = SessionManager()
    
    # Example 1: Create and configure session
    print("1️⃣ Creating Sessions:")
    
    config = SessionConfig(
        name="github_api",
        base_url="https://api.github.com",
        auth_type=AuthType.BEARER,
        timeout=30,
        max_retries=3
    )
    
    session = manager.create_session(config)
    print(f"   Created session: {config.name}")
    print(f"   Session ID: {session.session_info.session_id}")
    
    # Example 2: Authentication
    print("\n2️⃣ Authentication:")
    
    # Bearer token auth
    success = manager.authenticate_session(
        "github_api",
        {"token": "ghp_example_token_12345"}
    )
    print(f"   Authentication: {'✓ Success' if success else '✗ Failed'}")
    
    # Example 3: Making requests
    print("\n3️⃣ Making Requests:")
    
    try:
        # This would make a real request in production
        # response = session.get("/user")
        # print(f"   Response: {response.status_code}")
        print("   (Skipping actual request in example)")
    except requests.RequestException as e:
        print(f"   Request failed: {e}")
    
    # Example 4: Session persistence
    print("\n4️⃣ Session Persistence:")
    
    # Save session
    manager.save_session("github_api")
    print("   Session saved to storage")
    
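    # Close the in-memory session, then reload it from storage.
    # destroy_session() is not used here because it also deletes the saved row,
    # which would leave nothing to load.
    manager.sessions.pop("github_api").close()
    print("   In-memory session closed")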
    
    loaded_session = manager.load_session("github_api")
    if loaded_session:
        print("   Session loaded from storage")
    
    # Example 5: Multiple sessions
    print("\n5️⃣ Multiple Sessions:")
    
    # Create multiple sessions
    sites = [
        SessionConfig("site1", "https://example1.com", AuthType.COOKIE),
        SessionConfig("site2", "https://example2.com", AuthType.JWT),
        SessionConfig("site3", "https://example3.com", AuthType.OAUTH2)
    ]
    
    for site_config in sites:
        manager.create_session(site_config)
        print(f"   Created session: {site_config.name}")
    
    # Monitor all sessions
    monitoring = manager.monitor_sessions()
    for name, data in monitoring.items():
        print(f"   {name}: State={data['state']}, Requests={data['request_count']}")
    
    # Example 6: Session pool
    print("\n6️⃣ Session Pool:")
    
    pool_config = SessionConfig(
        name="api_pool",
        base_url="https://jsonplaceholder.typicode.com",
        auth_type=AuthType.BASIC
    )
    
    pool = SessionPool(pool_config, pool_size=3)
    print(f"   Created session pool with {pool.pool_size} sessions")
    
    # Use session from pool
    with pool.get_session() as pooled_session:
        print(f"   Got session from pool: {pooled_session.config.name}")
        # Use the session...
    
    print("   Session returned to pool")
    
    # Example 7: Session cloning
    print("\n7️⃣ Session Cloning:")
    
    # Create original session
    original_config = SessionConfig(
        name="original",
        base_url="https://httpbin.org",
        auth_type=AuthType.BASIC,
        headers={"X-Custom": "Header"}
    )
    
    original = manager.create_session(original_config)
    
    # Clone session
    cloned = manager.clone_session("original", "cloned")
    if cloned:
        print(f"   Cloned session: {cloned.config.name}")
        print(f"   Headers match: {original.headers == cloned.headers}")
    
    # Example 8: Session events
    print("\n8️⃣ Session Events:")
    
    # Get event history
    history = manager.get_session_history("original")
    for event in history[:5]:
        print(f"   {event['type']}: {event['timestamp']}")
    
    # Example 9: OAuth2 flow simulation
    print("\n9️⃣ OAuth2 Flow:")
    
    oauth_config = SessionConfig(
        name="oauth_service",
        base_url="https://oauth.example.com",
        auth_type=AuthType.OAUTH2
    )
    
    oauth_session = manager.create_session(oauth_config)
    
    # Simulate OAuth2 authentication
    oauth_creds = {
        'client_id': 'your_client_id',
        'client_secret': 'your_client_secret',
        'token_url': 'https://oauth.example.com/token'
    }
    
    # This would perform actual OAuth2 flow in production
    print("   OAuth2 authentication configured")
    
    # Example 10: Session health check
    print("\n🔟 Session Health:")
    
    for name, session in manager.sessions.items():
        info = session.get_info()
        health = "✓ Active" if session.is_active() else "✗ Inactive"
        print(f"   {name}: {health}")
        print(f"     Requests: {info.request_count}, Errors: {info.error_count}")
    
    print("\n✅ Session management demonstration complete!")
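
The retry settings above hand `retry_backoff` to urllib3, which computes its own delays for transport-level retries. When a retry loop has to live in application code instead (around a login step, for example), a capped exponential backoff with jitter is one common approach. The following is a minimal sketch under that assumption; `backoff_delays` and its parameters are hypothetical names that do not appear in the listing above.

```python
import random
from typing import Iterator, Optional


def backoff_delays(
    base: float = 1.0,
    factor: float = 2.0,
    cap: float = 60.0,
    attempts: int = 5,
    rng: Optional[random.Random] = None,
) -> Iterator[float]:
    """Yield one sleep duration per retry attempt ("full jitter" variant).

    The ceiling for attempt n is min(cap, base * factor**n). The actual
    delay is drawn uniformly from [0, ceiling] so that many clients
    retrying at once do not hit the server in lockstep.
    """
    rng = rng or random.Random()
    for attempt in range(attempts):
        ceiling = min(cap, base * factor ** attempt)
        yield rng.uniform(0.0, ceiling)


if __name__ == "__main__":
    # Seeded only so repeated runs of this demo print the same numbers.
    for n, delay in enumerate(backoff_delays(rng=random.Random(0))):
        print(f"retry {n}: sleep {delay:.2f}s")
```

A caller would typically `time.sleep(delay)` between attempts and stop as soon as one succeeds. Passing an explicit `rng` keeps the function easy to test.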

Key Takeaways and Best Practices 🎯

Session Management Best Practices 📋

Pro Tip: Think of sessions as relationships: they need to be maintained, renewed, and sometimes ended gracefully. Always use a session object for multiple requests to the same site, since connection pooling saves time and resources. Implement automatic refresh for OAuth2 and JWT tokens. Use session pools to cap concurrency, which also makes it easier to stay within rate limits. Store sessions securely: encrypt sensitive data and use secure storage. Monitor session health with metrics like request count, error rate, and idle time. Remember that sessions expire, so implement detection and recovery mechanisms. Most importantly, respect rate limits and use exponential backoff for retries. Good session management is invisible when it works and invaluable when things go wrong!

Session management mastery transforms you from a simple scraper to an enterprise-grade automation engineer. You can now maintain complex stateful interactions, handle authentication elegantly, recover from failures gracefully, and scale your operations efficiently. Whether you're building web scrapers, API clients, or automation platforms, these session management skills are your foundation for reliable web automation! 🚀