Session Management: Maintain State Like a Pro
Sessions are the memory of web interactions - they remember who you are, what you've done, and where you've been. Like a skilled diplomat who maintains relationships across multiple meetings, session management lets you maintain complex, stateful interactions with websites over time. Let's master the art of digital persistence!
The Session Management Architecture
Think of session management as conducting multiple conversations simultaneously. Each website is a different person you're talking to, and you need to remember the context of each conversation. Sessions keep track of your authentication, preferences, shopping carts, and navigation history. Master them, and you can automate the most complex web workflows!
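To make "remembering the context of each conversation" concrete, here is a minimal sketch of the cookie mechanics, using only Python's standard library so it runs without network access. `requests.Session` keeps the same kind of jar in its `cookies` attribute (its `RequestsCookieJar` subclasses `http.cookiejar.CookieJar`). The cookie name, value, host names, and the helpers `make_cookie` and `cookie_header_for` are made up for illustration.

```python
from http.cookiejar import Cookie, CookieJar
from typing import Optional
from urllib.request import Request


def make_cookie(name: str, value: str, domain: str) -> Cookie:
    """Build a simple session cookie (hypothetical values, no expiry)."""
    return Cookie(
        version=0, name=name, value=value,
        port=None, port_specified=False,
        domain=domain, domain_specified=True, domain_initial_dot=False,
        path="/", path_specified=True,
        secure=False, expires=None, discard=True,
        comment=None, comment_url=None, rest={},
    )


def cookie_header_for(jar: CookieJar, url: str) -> Optional[str]:
    """Return the Cookie header the jar would attach to a request for url."""
    request = Request(url)
    jar.add_cookie_header(request)  # adds a header only if a cookie matches
    return request.get_header("Cookie")


jar = CookieJar()
jar.set_cookie(make_cookie("sessionid", "abc123", "shop.example.com"))

# The jar replays the cookie to the site that set it and to nobody else.
same_site = cookie_header_for(jar, "https://shop.example.com/cart")
other_site = cookie_header_for(jar, "https://bank.example.org/")
print(same_site)
print(other_site)
```

Each "conversation" is scoped by domain and path, which is why one jar can safely hold state for many sites at once.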
Real-World Scenario: The Multi-Platform Orchestrator
You're building an automation platform that manages accounts across dozens of services - social media, e-commerce, banking, and enterprise systems. Each has different session requirements, timeout policies, security measures, and state management needs. You need to handle OAuth flows, maintain multiple concurrent sessions, recover from failures, and ensure security. Let's build a robust session management system!
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
from typing import Dict, List, Optional, Any, Callable, Union, Tuple
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from enum import Enum
import json
import pickle
import time
import threading
import queue
from pathlib import Path
import hashlib
import base64
import jwt
import logging
from urllib.parse import urlparse, parse_qs, urlencode
from bs4 import BeautifulSoup
import re
from cryptography.fernet import Fernet
import sqlite3
from contextlib import contextmanager
import os
class SessionState(Enum):
"""Session states."""
ACTIVE = "active"
EXPIRED = "expired"
INVALID = "invalid"
LOCKED = "locked"
REFRESHING = "refreshing"
class AuthType(Enum):
"""Authentication types."""
BASIC = "basic"
BEARER = "bearer"
OAUTH1 = "oauth1"
OAUTH2 = "oauth2"
API_KEY = "api_key"
COOKIE = "cookie"
JWT = "jwt"
CUSTOM = "custom"
@dataclass
class SessionConfig:
"""Session configuration."""
name: str
base_url: str
auth_type: AuthType
timeout: int = 30
max_retries: int = 3
retry_backoff: float = 1.0
verify_ssl: bool = True
proxy: Optional[Dict[str, str]] = None
headers: Dict[str, str] = field(default_factory=dict)
metadata: Dict[str, Any] = field(default_factory=dict)
@dataclass
class SessionInfo:
"""Session information and metrics."""
session_id: str
created_at: datetime
last_used: datetime
state: SessionState
request_count: int = 0
error_count: int = 0
data: Dict[str, Any] = field(default_factory=dict)
class AdvancedSession(requests.Session):
"""
Enhanced session with advanced features.
"""
def __init__(self, config: SessionConfig):
super().__init__()
self.config = config
self.session_info = SessionInfo(
session_id=self._generate_session_id(),
created_at=datetime.now(),
last_used=datetime.now(),
state=SessionState.ACTIVE
)
# Setup session
self._setup_session()
self._setup_retries()
# Thread safety
self.lock = threading.Lock()
# Logging
self.logger = logging.getLogger(f"Session_{self.config.name}")
def _generate_session_id(self) -> str:
"""Generate unique session ID."""
data = f"{self.config.name}_{datetime.now().isoformat()}"
return hashlib.sha256(data.encode()).hexdigest()[:16]
def _setup_session(self):
"""Setup session configuration."""
# Set default headers
default_headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
'Accept': 'text/html,application/json,application/xml;q=0.9,*/*;q=0.8',
'Accept-Language': 'en-US,en;q=0.5',
'Accept-Encoding': 'gzip, deflate',  # advertise 'br' only if a brotli decoder is installed
'DNT': '1',
'Connection': 'keep-alive',
'Upgrade-Insecure-Requests': '1'
}
default_headers.update(self.config.headers)
self.headers.update(default_headers)
# Set SSL verification
self.verify = self.config.verify_ssl
# Set proxy if provided
if self.config.proxy:
self.proxies = self.config.proxy
# Set timeout
self.timeout = self.config.timeout
def _setup_retries(self):
"""Setup retry strategy."""
retry_strategy = Retry(
total=self.config.max_retries,
backoff_factor=self.config.retry_backoff,
status_forcelist=[429, 500, 502, 503, 504],
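# urllib3 >= 1.26 names this parameter allowed_methods (formerly method_whitelist).
# POST is not idempotent; keep it in the list only if the endpoint tolerates duplicates.
allowed_methods=["HEAD", "GET", "PUT", "DELETE", "OPTIONS", "TRACE", "POST"]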
)
adapter = HTTPAdapter(max_retries=retry_strategy)
self.mount("http://", adapter)
self.mount("https://", adapter)
def request(self, method, url, **kwargs):
"""Override request to add session management."""
with self.lock:
# Update session info
self.session_info.last_used = datetime.now()
self.session_info.request_count += 1
# Add base URL if relative
if not url.startswith(('http://', 'https://')):
url = self.config.base_url.rstrip('/') + '/' + url.lstrip('/')
# Set default timeout if not provided
if 'timeout' not in kwargs:
kwargs['timeout'] = self.timeout
try:
response = super().request(method, url, **kwargs)
# Check for session expiration
self._check_session_status(response)
return response
except Exception as e:
self.session_info.error_count += 1
self.logger.error(f"Request failed: {e}")
raise
def _check_session_status(self, response):
"""Check if session is still valid."""
# Check for common session expiration indicators
if response.status_code == 401:
self.session_info.state = SessionState.EXPIRED
self.logger.warning("Session expired (401 Unauthorized)")
# Check for login page redirect
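# response.request is the final request, so compare against the first hop in response.history
if response.history and 'login' in response.url.lower() and 'login' not in response.history[0].url.lower():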
self.session_info.state = SessionState.EXPIRED
self.logger.warning("Session expired (redirected to login)")
def is_active(self) -> bool:
"""Check if session is active."""
return self.session_info.state == SessionState.ACTIVE
def get_info(self) -> SessionInfo:
"""Get session information."""
return self.session_info
class SessionManager:
"""
Comprehensive session management system.
"""
def __init__(self, storage_path: str = "./sessions"):
self.storage_path = Path(storage_path)
self.storage_path.mkdir(exist_ok=True)
self.sessions: Dict[str, AdvancedSession] = {}
self.session_configs: Dict[str, SessionConfig] = {}
# Setup components
self.setup_logging()
self.setup_database()
self.auth_manager = AuthenticationManager()
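# Encryption for sensitive data.
# NOTE: the key is generated per process and never stored, so cookies saved
# by one run cannot be decrypted by the next; load the key from secure
# storage if sessions must survive a restart.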
self.cipher_key = Fernet.generate_key()
self.cipher = Fernet(self.cipher_key)
# Background tasks
self.task_queue = queue.Queue()
self.start_background_tasks()
def setup_logging(self):
"""Setup logging configuration."""
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
self.logger = logging.getLogger(__name__)
def setup_database(self):
"""Setup SQLite database for session storage."""
self.db_path = self.storage_path / "sessions.db"
with sqlite3.connect(self.db_path) as conn:
conn.execute('''
CREATE TABLE IF NOT EXISTS sessions (
session_id TEXT PRIMARY KEY,
name TEXT NOT NULL,
config TEXT,
cookies TEXT,
created_at TIMESTAMP,
last_used TIMESTAMP,
state TEXT,
metadata TEXT
)
''')
conn.execute('''
CREATE TABLE IF NOT EXISTS session_events (
id INTEGER PRIMARY KEY AUTOINCREMENT,
session_id TEXT,
event_type TEXT,
event_data TEXT,
timestamp TIMESTAMP,
FOREIGN KEY (session_id) REFERENCES sessions (session_id)
)
''')
def create_session(self, config: SessionConfig) -> AdvancedSession:
"""Create a new managed session."""
session = AdvancedSession(config)
self.sessions[config.name] = session
self.session_configs[config.name] = config
# Log session creation
self.log_event(session.session_info.session_id, "created", {"config": config.name})
self.logger.info(f"Created session: {config.name}")
return session
def get_session(self, name: str) -> Optional[AdvancedSession]:
"""Get session by name."""
return self.sessions.get(name)
def authenticate_session(self, name: str, credentials: Dict[str, Any]) -> bool:
"""Authenticate a session."""
session = self.get_session(name)
if not session:
self.logger.error(f"Session not found: {name}")
return False
config = self.session_configs[name]
# Perform authentication based on type
if config.auth_type == AuthType.BASIC:
success = self.auth_manager.basic_auth(session, credentials)
elif config.auth_type == AuthType.BEARER:
success = self.auth_manager.bearer_auth(session, credentials)
elif config.auth_type == AuthType.OAUTH2:
success = self.auth_manager.oauth2_auth(session, credentials)
elif config.auth_type == AuthType.JWT:
success = self.auth_manager.jwt_auth(session, credentials)
elif config.auth_type == AuthType.COOKIE:
success = self.auth_manager.cookie_auth(session, credentials)
else:
self.logger.warning(f"Unsupported auth type: {config.auth_type}")
return False
if success:
self.log_event(session.session_info.session_id, "authenticated", {"method": config.auth_type.value})
return success
def save_session(self, name: str) -> bool:
"""Save session to persistent storage."""
session = self.get_session(name)
if not session:
return False
try:
# Serialize cookies
cookies_data = []
for cookie in session.cookies:
cookies_data.append({
'name': cookie.name,
'value': cookie.value,
'domain': cookie.domain,
'path': cookie.path,
'expires': cookie.expires
})
# Encrypt sensitive data
encrypted_cookies = self.cipher.encrypt(
json.dumps(cookies_data).encode()
).decode()
# Save to database
with sqlite3.connect(self.db_path) as conn:
conn.execute('''
INSERT OR REPLACE INTO sessions
(session_id, name, config, cookies, created_at, last_used, state, metadata)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
''', (
session.session_info.session_id,
name,
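# Store enums by value so AuthType(...) can rebuild them in load_session
json.dumps(self.session_configs[name].__dict__, default=lambda o: o.value if isinstance(o, Enum) else str(o)),
encrypted_cookies,
# Store timestamps as ISO 8601 strings so datetime.fromisoformat() can parse them
session.session_info.created_at.isoformat(),
session.session_info.last_used.isoformat(),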
session.session_info.state.value,
json.dumps(session.session_info.data)
))
# Save to file backup
session_file = self.storage_path / f"{name}.session"
with open(session_file, 'wb') as f:
pickle.dump(session.cookies, f)
self.logger.info(f"Saved session: {name}")
return True
except Exception as e:
self.logger.error(f"Failed to save session: {e}")
return False
def load_session(self, name: str) -> Optional[AdvancedSession]:
"""Load session from persistent storage."""
try:
with sqlite3.connect(self.db_path) as conn:
cursor = conn.execute(
'SELECT * FROM sessions WHERE name = ?',
(name,)
)
row = cursor.fetchone()
if not row:
self.logger.warning(f"Session not found in storage: {name}")
return None
# Recreate session config
config_data = json.loads(row[2])
config = SessionConfig(
name=config_data['name'],
base_url=config_data['base_url'],
auth_type=AuthType(config_data['auth_type']),
timeout=config_data.get('timeout', 30),
max_retries=config_data.get('max_retries', 3)
)
# Create new session
session = self.create_session(config)
# Restore cookies
encrypted_cookies = row[3].encode()
cookies_data = json.loads(
self.cipher.decrypt(encrypted_cookies).decode()
)
for cookie_data in cookies_data:
session.cookies.set(
cookie_data['name'],
cookie_data['value'],
domain=cookie_data.get('domain'),
path=cookie_data.get('path')
)
# Restore session info
session.session_info.created_at = datetime.fromisoformat(row[4])
session.session_info.last_used = datetime.fromisoformat(row[5])
session.session_info.state = SessionState(row[6])
session.session_info.data = json.loads(row[7])
self.logger.info(f"Loaded session: {name}")
return session
except Exception as e:
self.logger.error(f"Failed to load session: {e}")
return None
def refresh_session(self, name: str) -> bool:
"""Refresh an expired session."""
session = self.get_session(name)
if not session:
return False
session.session_info.state = SessionState.REFRESHING
try:
# Re-authenticate if needed
if hasattr(session, '_refresh_token'):
# OAuth2 refresh
success = self.auth_manager.refresh_oauth2_token(
session,
session._refresh_token
)
else:
# Re-login or other refresh mechanism
success = self._relogin_session(session)
if success:
session.session_info.state = SessionState.ACTIVE
self.log_event(session.session_info.session_id, "refreshed", {})
return True
else:
session.session_info.state = SessionState.INVALID
return False
except Exception as e:
self.logger.error(f"Failed to refresh session: {e}")
session.session_info.state = SessionState.INVALID
return False
def _relogin_session(self, session: AdvancedSession) -> bool:
"""Attempt to re-login for a session."""
# This would be implemented based on specific login requirements
# For now, return False to indicate manual intervention needed
return False
def clone_session(self, name: str, new_name: str) -> Optional[AdvancedSession]:
"""Clone an existing session."""
original = self.get_session(name)
if not original:
return None
# Create new session with same config
config = self.session_configs[name]
new_config = SessionConfig(
name=new_name,
base_url=config.base_url,
auth_type=config.auth_type,
timeout=config.timeout,
max_retries=config.max_retries,
verify_ssl=config.verify_ssl,
proxy=config.proxy,
headers=config.headers.copy()
)
new_session = self.create_session(new_config)
# Copy cookies
for cookie in original.cookies:
new_session.cookies.set_cookie(cookie)
self.logger.info(f"Cloned session {name} to {new_name}")
return new_session
def destroy_session(self, name: str):
"""Destroy a session and clean up resources."""
session = self.get_session(name)
if session:
# Clear cookies
session.cookies.clear()
# Close connections
session.close()
# Remove from storage
del self.sessions[name]
del self.session_configs[name]
# Remove from database
with sqlite3.connect(self.db_path) as conn:
conn.execute('DELETE FROM sessions WHERE name = ?', (name,))
self.logger.info(f"Destroyed session: {name}")
def log_event(self, session_id: str, event_type: str, event_data: Dict):
"""Log session event."""
with sqlite3.connect(self.db_path) as conn:
conn.execute('''
INSERT INTO session_events (session_id, event_type, event_data, timestamp)
VALUES (?, ?, ?, ?)
''', (
session_id,
event_type,
json.dumps(event_data),
datetime.now().isoformat()
))
def get_session_history(self, name: str) -> List[Dict]:
"""Get session event history."""
session = self.get_session(name)
if not session:
return []
with sqlite3.connect(self.db_path) as conn:
cursor = conn.execute('''
SELECT event_type, event_data, timestamp
FROM session_events
WHERE session_id = ?
ORDER BY timestamp DESC
''', (session.session_info.session_id,))
events = []
for row in cursor:
events.append({
'type': row[0],
'data': json.loads(row[1]),
'timestamp': row[2]
})
return events
def monitor_sessions(self) -> Dict[str, Dict]:
"""Get monitoring data for all sessions."""
monitoring_data = {}
for name, session in self.sessions.items():
info = session.session_info
monitoring_data[name] = {
'state': info.state.value,
'created_at': info.created_at.isoformat(),
'last_used': info.last_used.isoformat(),
'request_count': info.request_count,
'error_count': info.error_count,
'uptime': (datetime.now() - info.created_at).total_seconds(),
'idle_time': (datetime.now() - info.last_used).total_seconds()
}
return monitoring_data
def start_background_tasks(self):
"""Start background maintenance tasks."""
def worker():
while True:
# Check for expired sessions
for name, session in list(self.sessions.items()):
idle_time = (datetime.now() - session.session_info.last_used).total_seconds()
# Auto-save after 5 minutes of idle
if idle_time > 300:
self.save_session(name)
# Mark as expired after 30 minutes of idle
if idle_time > 1800 and session.session_info.state == SessionState.ACTIVE:
session.session_info.state = SessionState.EXPIRED
self.logger.info(f"Session {name} expired due to inactivity")
time.sleep(60) # Check every minute
thread = threading.Thread(target=worker, daemon=True)
thread.start()
class AuthenticationManager:
"""
Handle various authentication methods.
"""
def basic_auth(self, session: AdvancedSession, credentials: Dict) -> bool:
"""Setup basic authentication."""
username = credentials.get('username')
password = credentials.get('password')
if not username or not password:
return False
session.auth = (username, password)
return True
def bearer_auth(self, session: AdvancedSession, credentials: Dict) -> bool:
"""Setup bearer token authentication."""
token = credentials.get('token')
if not token:
return False
session.headers['Authorization'] = f'Bearer {token}'
return True
def jwt_auth(self, session: AdvancedSession, credentials: Dict) -> bool:
"""Setup JWT authentication."""
token = credentials.get('token')
if not token:
# Generate JWT if credentials provided
secret = credentials.get('secret')
payload = credentials.get('payload', {})
if secret and payload:
token = jwt.encode(payload, secret, algorithm='HS256')
else:
return False
session.headers['Authorization'] = f'Bearer {token}'
session.session_info.data['jwt_token'] = token
# Decode to get expiration
try:
decoded = jwt.decode(token, options={"verify_signature": False})
if 'exp' in decoded:
session.session_info.data['token_expires'] = datetime.fromtimestamp(decoded['exp'])
except Exception:
pass  # expiry is optional; an undecodable token simply has no recorded expiry
return True
def oauth2_auth(self, session: AdvancedSession, credentials: Dict) -> bool:
"""Setup OAuth2 authentication."""
client_id = credentials.get('client_id')
client_secret = credentials.get('client_secret')
token_url = credentials.get('token_url')
if not all([client_id, client_secret, token_url]):
return False
# Request access token
token_data = {
'grant_type': 'client_credentials',
'client_id': client_id,
'client_secret': client_secret
}
response = requests.post(token_url, data=token_data, timeout=session.config.timeout)
if response.status_code == 200:
token_info = response.json()
access_token = token_info.get('access_token')
refresh_token = token_info.get('refresh_token')
session.headers['Authorization'] = f'Bearer {access_token}'
session._refresh_token = refresh_token
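# Keep token_url alongside the token response; refresh_oauth2_token reads it from here
session.session_info.data['oauth_token'] = {**token_info, 'token_url': token_url}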
return True
return False
def refresh_oauth2_token(self, session: AdvancedSession, refresh_token: str) -> bool:
"""Refresh OAuth2 access token."""
token_info = session.session_info.data.get('oauth_token', {})
token_url = token_info.get('token_url')
if not token_url:
return False
refresh_data = {
'grant_type': 'refresh_token',
'refresh_token': refresh_token
}
response = requests.post(token_url, data=refresh_data, timeout=session.config.timeout)
if response.status_code == 200:
new_token_info = response.json()
access_token = new_token_info.get('access_token')
session.headers['Authorization'] = f'Bearer {access_token}'
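# Preserve token_url so that later refreshes can still find the endpoint
session.session_info.data['oauth_token'] = {**new_token_info, 'token_url': token_url}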
return True
return False
def cookie_auth(self, session: AdvancedSession, credentials: Dict) -> bool:
"""Setup cookie-based authentication."""
login_url = credentials.get('login_url')
username = credentials.get('username')
password = credentials.get('password')
if not all([login_url, username, password]):
return False
# Perform login
login_data = {
'username': username,
'password': password
}
response = session.post(login_url, data=login_data)
# Check if login successful (simple check)
return response.status_code == 200 and len(session.cookies) > 0
class SessionPool:
"""
Manage a pool of sessions for concurrent operations.
"""
def __init__(self, config: SessionConfig, pool_size: int = 5):
self.config = config
self.pool_size = pool_size
self.available = queue.Queue()
self.in_use = set()
self.lock = threading.Lock()
# Initialize pool
self.manager = SessionManager()
self._initialize_pool()
def _initialize_pool(self):
"""Initialize session pool."""
for i in range(self.pool_size):
config = SessionConfig(
name=f"{self.config.name}_pool_{i}",
base_url=self.config.base_url,
auth_type=self.config.auth_type,
timeout=self.config.timeout,
max_retries=self.config.max_retries
)
session = self.manager.create_session(config)
self.available.put(session)
@contextmanager
def get_session(self, timeout: int = 30):
"""Get a session from the pool."""
session = None
try:
# Get available session
session = self.available.get(timeout=timeout)
with self.lock:
self.in_use.add(session)
yield session
finally:
# Return session to pool
if session is not None:
with self.lock:
self.in_use.discard(session)
self.available.put(session)
def execute_parallel(self, func: Callable, items: List[Any]) -> List[Any]:
"""Execute function in parallel using session pool."""
results = []
threads = []
def worker(item):
with self.get_session() as session:
result = func(session, item)
results.append(result)
for item in items:
thread = threading.Thread(target=worker, args=(item,))
thread.start()
threads.append(thread)
for thread in threads:
thread.join()
return results
# Example usage
if __name__ == "__main__":
print("Session Management Examples\n")
# Initialize session manager
manager = SessionManager()
# Example 1: Create and configure session
print("1. Creating Sessions:")
config = SessionConfig(
name="github_api",
base_url="https://api.github.com",
auth_type=AuthType.BEARER,
timeout=30,
max_retries=3
)
session = manager.create_session(config)
print(f" Created session: {config.name}")
print(f" Session ID: {session.session_info.session_id}")
# Example 2: Authentication
print("\n2. Authentication:")
# Bearer token auth
success = manager.authenticate_session(
"github_api",
{"token": "ghp_example_token_12345"}
)
print(f" Authentication: {'Success' if success else 'Failed'}")
# Example 3: Making requests
print("\n3. Making Requests:")
try:
# This would make a real request in production
# response = session.get("/user")
# print(f" Response: {response.status_code}")
print(" (Skipping actual request in example)")
except requests.RequestException as e:
print(f" Request failed: {e}")
# Example 4: Session persistence
print("\n4. Session Persistence:")
# Save session
manager.save_session("github_api")
print(" Session saved to storage")
# Reload from storage. destroy_session() would also delete the saved row,
# so the session is reloaded here without destroying it first.
loaded_session = manager.load_session("github_api")
if loaded_session:
print(" Session loaded from storage")
# Example 5: Multiple sessions
print("\n5. Multiple Sessions:")
# Create multiple sessions
sites = [
SessionConfig("site1", "https://example1.com", AuthType.COOKIE),
SessionConfig("site2", "https://example2.com", AuthType.JWT),
SessionConfig("site3", "https://example3.com", AuthType.OAUTH2)
]
for site_config in sites:
manager.create_session(site_config)
print(f" Created session: {site_config.name}")
# Monitor all sessions
monitoring = manager.monitor_sessions()
for name, data in monitoring.items():
print(f" {name}: State={data['state']}, Requests={data['request_count']}")
# Example 6: Session pool
print("\n6. Session Pool:")
pool_config = SessionConfig(
name="api_pool",
base_url="https://jsonplaceholder.typicode.com",
auth_type=AuthType.BASIC
)
pool = SessionPool(pool_config, pool_size=3)
print(f" Created session pool with {pool.pool_size} sessions")
# Use session from pool
with pool.get_session() as pooled_session:
print(f" Got session from pool: {pooled_session.config.name}")
# Use the session...
print(" Session returned to pool")
# Example 7: Session cloning
print("\n7. Session Cloning:")
# Create original session
original_config = SessionConfig(
name="original",
base_url="https://httpbin.org",
auth_type=AuthType.BASIC,
headers={"X-Custom": "Header"}
)
original = manager.create_session(original_config)
# Clone session
cloned = manager.clone_session("original", "cloned")
if cloned:
print(f" Cloned session: {cloned.config.name}")
print(f" Headers match: {original.headers == cloned.headers}")
# Example 8: Session events
print("\n8. Session Events:")
# Get event history
history = manager.get_session_history("original")
for event in history[:5]:
print(f" {event['type']}: {event['timestamp']}")
# Example 9: OAuth2 flow simulation
print("\n9. OAuth2 Flow:")
oauth_config = SessionConfig(
name="oauth_service",
base_url="https://oauth.example.com",
auth_type=AuthType.OAUTH2
)
oauth_session = manager.create_session(oauth_config)
# Simulate OAuth2 authentication
oauth_creds = {
'client_id': 'your_client_id',
'client_secret': 'your_client_secret',
'token_url': 'https://oauth.example.com/token'
}
# This would perform actual OAuth2 flow in production
print(" OAuth2 authentication configured")
# Example 10: Session health check
print("\n10. Session Health:")
for name, session in manager.sessions.items():
info = session.get_info()
health = "Active" if session.is_active() else "Inactive"
print(f" {name}: {health}")
print(f" Requests: {info.request_count}, Errors: {info.error_count}")
print("\nSession management demonstration complete!")
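The persistence step in the listing stores the session configuration as JSON, and enum fields need care in that round trip: `str()` on a plain `Enum` member gives its qualified name (such as `AuthType.BEARER`), which the enum constructor will not accept back, whereas the member's `value` will. The sketch below shows one way to handle this, using a trimmed-down, hypothetical `MiniConfig` in place of `SessionConfig`; the helper names `config_to_json` and `config_from_json` are illustrative and not part of the listing.

```python
import json
from dataclasses import asdict, dataclass, field
from enum import Enum
from typing import Dict


class AuthType(Enum):
    BEARER = "bearer"
    COOKIE = "cookie"


@dataclass
class MiniConfig:
    """Hypothetical, reduced stand-in for SessionConfig."""
    name: str
    base_url: str
    auth_type: AuthType
    timeout: int = 30
    headers: Dict[str, str] = field(default_factory=dict)


def _encode(obj):
    """json.dumps fallback: store enums by value, reject anything else."""
    if isinstance(obj, Enum):
        return obj.value
    raise TypeError(f"Cannot serialize {type(obj).__name__}")


def config_to_json(config: MiniConfig) -> str:
    return json.dumps(asdict(config), default=_encode)


def config_from_json(text: str) -> MiniConfig:
    data = json.loads(text)
    data["auth_type"] = AuthType(data["auth_type"])  # value -> enum member
    return MiniConfig(**data)


original = MiniConfig(
    "github_api", "https://api.github.com", AuthType.BEARER,
    headers={"X-Custom": "Header"},
)
restored = config_from_json(config_to_json(original))
print(restored == original)
```

Raising `TypeError` for unknown types, rather than falling back to `str()`, makes a lossy conversion fail at save time instead of at load time.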
Key Takeaways and Best Practices
- Use Session Objects: Always use session objects instead of individual requests for efficiency.
- Implement Retry Logic: Network requests fail - always have retry strategies.
- Persist Sessions: Save and restore sessions to avoid re-authentication.
- Monitor Session Health: Track metrics and detect expired sessions proactively.
- Handle Authentication Properly: Support multiple auth methods and token refresh.
- Use Connection Pooling: Reuse connections for better performance.
- Implement Timeouts: Always set timeouts to prevent hanging requests.
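The retry advice can be sketched without any HTTP library. The example below is a simplified illustration of capped exponential backoff, not the exact schedule urllib3's `Retry` uses; `backoff_delays`, `call_with_retries`, and the `flaky` function are hypothetical names, and the `sleep` parameter is injectable only so the sketch can run instantly.

```python
import time
from typing import Callable, List, TypeVar

T = TypeVar("T")


def backoff_delays(retries: int, base: float = 1.0, cap: float = 30.0) -> List[float]:
    """Delay before each retry: base, 2*base, 4*base, ... limited to cap."""
    return [min(cap, base * (2 ** attempt)) for attempt in range(retries)]


def call_with_retries(
    func: Callable[[], T],
    retries: int = 3,
    base: float = 1.0,
    cap: float = 30.0,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call func once, then retry up to `retries` times on ConnectionError."""
    last_error = None
    for delay in [0.0] + backoff_delays(retries, base, cap):
        if delay:
            sleep(delay)
        try:
            return func()
        except ConnectionError as exc:
            last_error = exc
    raise last_error


# Demo: a function that fails twice, then succeeds. Sleeps are recorded, not waited.
attempts = {"count": 0}
recorded_sleeps: List[float] = []


def flaky() -> str:
    attempts["count"] += 1
    if attempts["count"] < 3:
        raise ConnectionError("temporary failure")
    return "ok"


result = call_with_retries(flaky, retries=3, sleep=recorded_sleeps.append)
print(result, recorded_sleeps)
```

In real use a small random jitter is usually added to each delay so that many clients do not retry in lockstep.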
Session Management Best Practices
Session management mastery transforms you from a simple scraper to an enterprise-grade automation engineer. You can now maintain complex stateful interactions, handle authentication elegantly, recover from failures gracefully, and scale your operations efficiently. Whether you're building web scrapers, API clients, or automation platforms, these session management skills are your foundation for reliable web automation!
Pro Tip: Think of sessions as relationships: they need to be maintained, renewed, and sometimes ended gracefully. Always use session objects for multiple requests to the same site, since connection reuse saves time and resources. Implement automatic refresh for OAuth2 and JWT tokens. Use session pools to bound concurrency and reuse connections; a pool does not by itself keep you under a server's rate limit, so throttle requests as well. Store sessions securely: encrypt sensitive data and keep the encryption key outside the session store. Monitor session health with metrics like request count, error rate, and idle time. Remember that sessions expire, so implement detection and recovery mechanisms. Most importantly, respect rate limits and use exponential backoff for retries. Good session management is invisible when it works and invaluable when things go wrong!
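As a sketch of what proactive token refresh might look like, the helper below decides whether a token should be renewed shortly before it expires rather than after a request has already failed. `needs_refresh` and the 60-second safety margin are assumptions for illustration; the sketch also uses timezone-aware datetimes, whereas the listing records `token_expires` as a naive local time.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


def needs_refresh(
    expires_at: Optional[datetime],
    now: Optional[datetime] = None,
    skew: timedelta = timedelta(seconds=60),
) -> bool:
    """Return True when the token expires within `skew` of `now`.

    A token with no known expiry is never refreshed proactively; in that
    case expiry has to be detected from the server's response (e.g. a 401).
    """
    if expires_at is None:
        return False
    if now is None:
        now = datetime.now(timezone.utc)
    return now >= expires_at - skew


# Fixed reference time so the example is deterministic.
now = datetime(2024, 1, 1, 12, 0, 0, tzinfo=timezone.utc)
expiring_soon = needs_refresh(now + timedelta(seconds=30), now=now)
still_fresh = needs_refresh(now + timedelta(minutes=10), now=now)
unknown_expiry = needs_refresh(None, now=now)
print(expiring_soon, still_fresh, unknown_expiry)
```

A check like this would typically run just before each request, with the refresh itself guarded by a lock so that concurrent callers do not all renew the same token.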