
⏱️ Rate Limiting: Respect API Boundaries

Rate limiting is the traffic control system of the API world - it ensures fair usage, prevents abuse, and keeps services running smoothly for everyone. Like a highway with speed limits and lane management, rate limiting helps you navigate API restrictions while maximizing throughput. Master these patterns to build respectful, efficient API clients that never get blocked! 🚦

The Rate Limiting Ecosystem

APIs implement rate limits to protect their infrastructure and ensure quality of service. Think of it as a restaurant that can only serve so many customers at once - respecting these limits ensures everyone gets served. Understanding different rate limiting strategies, implementing intelligent throttling, and handling limit violations gracefully are essential skills for robust API automation!

graph TB
    A[Rate Limiting] --> B[Limit Types]
    A --> C[Detection Methods]
    A --> D[Handling Strategies]
    A --> E[Implementation Patterns]

    B --> F[Requests/Second]
    B --> G[Requests/Minute]
    B --> H[Daily Quotas]
    B --> I[Concurrent Limits]
    B --> J[Burst Limits]

    C --> K[Response Headers]
    C --> L[Status Codes]
    C --> M[Error Messages]
    C --> N[Documentation]

    D --> O[Throttling]
    D --> P[Backoff]
    D --> Q[Queuing]
    D --> R[Circuit Breaker]

    E --> S[Token Bucket]
    E --> T[Sliding Window]
    E --> U[Fixed Window]
    E --> V[Leaky Bucket]

    W[Monitoring] --> X[Metrics]
    W --> Y[Alerts]
    W --> Z[Analytics]

    style A fill:#ff6b6b
    style B fill:#51cf66
    style C fill:#339af0
    style D fill:#ffd43b
    style E fill:#ff6b6b
    style W fill:#51cf66
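The diagram lists fixed window among the implementation patterns, and it is the simplest of the four: count requests in the current window and reset the count when the window rolls over. Here is a minimal sketch of that idea, not a production implementation. The class name `FixedWindowCounter` and the injectable `clock` parameter are assumptions made for illustration, and windows here are aligned to the moment the limiter is created rather than to wall-clock boundaries.

```python
import threading
import time
from typing import Callable


class FixedWindowCounter:
    """Illustrative fixed-window limiter (hypothetical name, not from a library)."""

    def __init__(self, window_size: float, max_requests: int,
                 clock: Callable[[], float] = time.monotonic):
        self.window_size = window_size
        self.max_requests = max_requests
        self._clock = clock              # injectable so the sketch is easy to test
        self._window_start = clock()     # windows are aligned to creation time
        self._count = 0
        self._lock = threading.Lock()

    def _roll_window(self, now: float) -> None:
        """Jump to the window that contains `now` and reset the counter."""
        if now - self._window_start >= self.window_size:
            windows_passed = int((now - self._window_start) // self.window_size)
            self._window_start += windows_passed * self.window_size
            self._count = 0

    def allow_request(self) -> bool:
        """Count the request if the current window still has room."""
        with self._lock:
            self._roll_window(self._clock())
            if self._count < self.max_requests:
                self._count += 1
                return True
            return False

    def wait_time(self) -> float:
        """Seconds until the next request would be allowed."""
        with self._lock:
            now = self._clock()
            self._roll_window(now)
            if self._count < self.max_requests:
                return 0.0
            return self._window_start + self.window_size - now
```

One trade-off worth knowing: because the count resets all at once, a client can send a full quota just before a boundary and another full quota just after it, so short bursts of up to twice the limit are possible. A sliding window avoids this at the cost of tracking individual timestamps.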

Real-World Scenario: The Multi-API Rate Manager 🎯

You're building a rate limiting system that manages multiple APIs with different limits - some allow 100 requests/minute, others 1000/hour, some have burst allowances, others strict limits. Your system must track usage across multiple API keys, implement various throttling algorithms, handle 429 responses gracefully, provide detailed metrics, and optimize throughput while never exceeding limits. Let's build a comprehensive rate limiting framework!
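Handling 429 responses gracefully usually starts with the `Retry-After` header, which HTTP allows to be either a number of seconds or an HTTP-date. The sketch below shows one way to normalize both forms into "seconds to wait". The helper name `parse_retry_after` and its `now` parameter are assumptions for illustration; it uses only the standard library.

```python
from datetime import datetime, timezone
from email.utils import parsedate_to_datetime
from typing import Optional


def parse_retry_after(value: str, now: Optional[datetime] = None) -> Optional[float]:
    """Return the number of seconds to wait, or None if the value is unparseable.

    `value` is the raw Retry-After header. `now` is injectable for testing
    and defaults to the current UTC time.
    """
    value = value.strip()

    # Form 1: delay in whole seconds, e.g. "120"
    if value.isdigit():
        return float(value)

    # Form 2: HTTP-date, e.g. "Wed, 21 Oct 2015 07:28:00 GMT"
    try:
        when = parsedate_to_datetime(value)
    except (TypeError, ValueError):
        # Older Python versions raise TypeError, newer ones ValueError
        return None

    if when.tzinfo is None:
        # Assumption: treat a date without a usable zone as UTC
        when = when.replace(tzinfo=timezone.utc)

    now = now or datetime.now(timezone.utc)
    # A date in the past means "retry now", so never return a negative wait
    return max(0.0, (when - now).total_seconds())
```

Returning `None` for garbage input lets the caller fall back to its own backoff schedule instead of crashing on a malformed header.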

# First, install required packages:
# pip install requests aiohttp asyncio-throttle ratelimit backoff redis

import time
import asyncio
import threading
import logging
from typing import Dict, List, Optional, Any, Callable, Union
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from enum import Enum
from collections import deque, defaultdict
import heapq
from functools import wraps
import json
import redis

import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

# ==================== Rate Limit Types ====================

class RateLimitType(Enum):
    """Types of rate limits."""
    FIXED_WINDOW = "fixed_window"
    SLIDING_WINDOW = "sliding_window"
    TOKEN_BUCKET = "token_bucket"
    LEAKY_BUCKET = "leaky_bucket"
    CONCURRENT = "concurrent"
    BURST = "burst"

@dataclass
class RateLimitConfig:
    """Configuration for rate limiting."""
    limit_type: RateLimitType = RateLimitType.SLIDING_WINDOW
    requests_per_second: Optional[float] = None
    requests_per_minute: Optional[int] = None
    requests_per_hour: Optional[int] = None
    requests_per_day: Optional[int] = None
    
    # Burst configuration
    burst_size: Optional[int] = None
    burst_duration: Optional[float] = None
    
    # Concurrent limits
    max_concurrent: Optional[int] = None
    
    # Backoff configuration
    enable_backoff: bool = True
    initial_backoff: float = 1.0
    max_backoff: float = 60.0
    backoff_factor: float = 2.0
    
    # Monitoring
    track_metrics: bool = True
    alert_threshold: float = 0.9  # Alert at 90% of limit

# ==================== Rate Limit Tracker ====================

class RateLimitTracker:
    """
    Track API rate limit usage from response headers.
    """
    
    def __init__(self):
        self.limits = {}
        self.logger = logging.getLogger(__name__)
    
    def update_from_headers(self, api_name: str, headers: Dict[str, str]):
        """
        Update rate limit info from response headers.
        
        Common header patterns:
        - X-RateLimit-Limit: Maximum requests
        - X-RateLimit-Remaining: Requests left
        - X-RateLimit-Reset: Reset timestamp
        - Retry-After: Seconds to wait
        """
        
        limit_info = {}
        
        # Standard headers
        if "X-RateLimit-Limit" in headers:
            limit_info["limit"] = int(headers["X-RateLimit-Limit"])
        
        if "X-RateLimit-Remaining" in headers:
            limit_info["remaining"] = int(headers["X-RateLimit-Remaining"])
        
        if "X-RateLimit-Reset" in headers:
            reset_time = int(headers["X-RateLimit-Reset"])
            limit_info["reset"] = datetime.fromtimestamp(reset_time)
        
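        if "Retry-After" in headers:
            retry_after = headers["Retry-After"].strip()
            if retry_after.isdigit():
                limit_info["retry_after"] = int(retry_after)
            else:
                # HTTP-date format: keep the raw value and, when it parses,
                # convert it to seconds so get_wait_time() can honor it
                limit_info["retry_after_date"] = retry_after
                from email.utils import parsedate_to_datetime  # stdlib
                try:
                    delta = parsedate_to_datetime(retry_after).timestamp() - time.time()
                    limit_info["retry_after"] = max(0, delta)
                except (TypeError, ValueError):
                    pass  # Unparseable date: leave only the raw value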
        
        # GitHub style
        if "X-RateLimit-Used" in headers:
            limit_info["used"] = int(headers["X-RateLimit-Used"])
        
        # Alternative patterns
        if "RateLimit-Limit" in headers:
            limit_info["limit"] = int(headers["RateLimit-Limit"])
        
        if "RateLimit-Remaining" in headers:
            limit_info["remaining"] = int(headers["RateLimit-Remaining"])
        
        # Store info
        if limit_info:
            self.limits[api_name] = {
                "info": limit_info,
                "timestamp": datetime.now()
            }
            
            self.logger.debug(f"Rate limit for {api_name}: {limit_info}")
            
            # Check if approaching limit (skipped when the limit is missing
            # or 0, which would otherwise divide by zero)
            if limit_info.get("limit") and "remaining" in limit_info:
                usage_percent = 1 - (limit_info["remaining"] / limit_info["limit"])
                
                if usage_percent > 0.8:
                    self.logger.warning(
                        f"Rate limit warning for {api_name}: "
                        f"{usage_percent:.1%} used"
                    )
    
    def get_wait_time(self, api_name: str) -> float:
        """Get wait time until rate limit resets."""
        if api_name not in self.limits:
            return 0
        
        limit_info = self.limits[api_name]["info"]
        
        # Check Retry-After
        if "retry_after" in limit_info:
            return limit_info["retry_after"]
        
        # Check reset time
        if "reset" in limit_info:
            reset_time = limit_info["reset"]
            wait_time = (reset_time - datetime.now()).total_seconds()
            return max(0, wait_time)
        
        return 0
    
    def is_limit_exceeded(self, api_name: str) -> bool:
        """Check if rate limit is exceeded."""
        if api_name not in self.limits:
            return False
        
        limit_info = self.limits[api_name]["info"]
        
        # Check remaining requests
        if "remaining" in limit_info and limit_info["remaining"] <= 0:
            return True
        
        return False

# ==================== Token Bucket Algorithm ====================

class TokenBucket:
    """
    Token bucket rate limiter.
    Allows burst traffic while maintaining average rate.
    """
    
    def __init__(self, capacity: int, refill_rate: float):
        """
        Args:
            capacity: Maximum number of tokens (burst size)
            refill_rate: Tokens added per second
        """
        self.capacity = capacity
        self.refill_rate = refill_rate
        self.tokens = capacity
        self.last_refill = time.time()
        self.lock = threading.Lock()
    
    def _refill(self):
        """Refill tokens based on elapsed time."""
        now = time.time()
        elapsed = now - self.last_refill
        
        # Add tokens based on refill rate
        tokens_to_add = elapsed * self.refill_rate
        self.tokens = min(self.capacity, self.tokens + tokens_to_add)
        self.last_refill = now
    
    def consume(self, tokens: int = 1) -> bool:
        """
        Try to consume tokens.
        Returns True if successful, False if not enough tokens.
        """
        with self.lock:
            self._refill()
            
            if self.tokens >= tokens:
                self.tokens -= tokens
                return True
            
            return False
    
    def wait_time(self, tokens: int = 1) -> float:
        """Get wait time until tokens are available."""
        with self.lock:
            self._refill()
            
            if self.tokens >= tokens:
                return 0
            
            tokens_needed = tokens - self.tokens
            wait_time = tokens_needed / self.refill_rate
            
            return wait_time
    
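    def wait_and_consume(self, tokens: int = 1) -> bool:
        """Wait for tokens and consume them."""
        if tokens > self.capacity:
            # The bucket can never hold this many tokens
            return False
        
        # Another thread may take the tokens while we sleep, so keep
        # retrying until consume() actually succeeds
        while not self.consume(tokens):
            time.sleep(self.wait_time(tokens))
        
        return True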

# ==================== Sliding Window Algorithm ====================

class SlidingWindow:
    """
    Sliding window rate limiter.
    Tracks requests over a moving time window.
    """
    
    def __init__(self, window_size: int, max_requests: int):
        """
        Args:
            window_size: Window size in seconds
            max_requests: Maximum requests in window
        """
        self.window_size = window_size
        self.max_requests = max_requests
        self.requests = deque()
        self.lock = threading.Lock()
    
    def _clean_old_requests(self):
        """Remove requests outside the window."""
        now = time.time()
        cutoff = now - self.window_size
        
        while self.requests and self.requests[0] < cutoff:
            self.requests.popleft()
    
    def allow_request(self) -> bool:
        """Check if request is allowed."""
        with self.lock:
            now = time.time()
            self._clean_old_requests()
            
            if len(self.requests) < self.max_requests:
                self.requests.append(now)
                return True
            
            return False
    
    def wait_time(self) -> float:
        """Get wait time until next request is allowed."""
        with self.lock:
            self._clean_old_requests()
            
            if len(self.requests) < self.max_requests:
                return 0
            
            # Wait until oldest request expires
            oldest = self.requests[0]
            wait_time = self.window_size - (time.time() - oldest)
            
            return max(0, wait_time)
    
    def get_usage(self) -> Dict[str, Any]:
        """Get current usage statistics."""
        with self.lock:
            self._clean_old_requests()
            
            return {
                "current_requests": len(self.requests),
                "max_requests": self.max_requests,
                "usage_percent": len(self.requests) / self.max_requests * 100,
                "window_size": self.window_size
            }

# ==================== Leaky Bucket Algorithm ====================

class LeakyBucket:
    """
    Leaky bucket rate limiter.
    Processes requests at a constant rate.
    """
    
    def __init__(self, capacity: int, leak_rate: float):
        """
        Args:
            capacity: Maximum bucket size
            leak_rate: Requests processed per second
        """
        self.capacity = capacity
        self.leak_rate = leak_rate
        self.queue = deque()
        self.last_leak = time.time()
        self.lock = threading.Lock()
        self.processing = False
    
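    def _leak(self):
        """Process queued requests."""
        now = time.time()
        elapsed = now - self.last_leak
        
        # Calculate how many requests to process
        to_process = int(elapsed * self.leak_rate)
        
        if to_process > 0:
            for _ in range(min(to_process, len(self.queue))):
                self.queue.popleft()
            
            # Advance only by the time actually accounted for; resetting to
            # `now` on every call would discard fractional progress and the
            # bucket would never drain under frequent calls
            self.last_leak += to_process / self.leak_rate
        
        if not self.queue:
            # Nothing is waiting, so the next request starts a fresh interval
            self.last_leak = now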
    
    def add_request(self) -> bool:
        """Add request to bucket."""
        with self.lock:
            self._leak()
            
            if len(self.queue) < self.capacity:
                self.queue.append(time.time())
                return True
            
            return False
    
    def wait_time(self) -> float:
        """Get wait time for next available slot."""
        with self.lock:
            self._leak()
            
            if len(self.queue) < self.capacity:
                return 0
            
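            # A slot frees up at the next leak, one leak interval after the
            # last one (not after the whole queue has drained)
            wait_time = (1.0 / self.leak_rate) - (time.time() - self.last_leak)
            
            return max(0, wait_time)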

# ==================== Concurrent Request Limiter ====================

class ConcurrentLimiter:
    """
    Limit concurrent requests.
    """
    
    def __init__(self, max_concurrent: int):
        self.max_concurrent = max_concurrent
        self.current = 0
        self.lock = threading.Lock()
        self.condition = threading.Condition(self.lock)
    
    def acquire(self, timeout: Optional[float] = None) -> bool:
        """Acquire a slot for concurrent execution."""
        with self.condition:
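            # Compare against None so that timeout=0 means "do not block"
            # instead of falling through to an unbounded wait
            end_time = time.time() + timeout if timeout is not None else None
            
            while self.current >= self.max_concurrent:
                if timeout is not None: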
                    remaining = end_time - time.time()
                    if remaining <= 0:
                        return False
                    
                    if not self.condition.wait(remaining):
                        return False
                else:
                    self.condition.wait()
            
            self.current += 1
            return True
    
    def release(self):
        """Release a concurrent execution slot."""
        with self.condition:
            self.current = max(0, self.current - 1)
            self.condition.notify()
    
    def __enter__(self):
        """Context manager entry."""
        self.acquire()
        return self
    
    def __exit__(self, exc_type, exc_val, exc_tb):
        """Context manager exit."""
        self.release()

# ==================== Distributed Rate Limiter ====================

class DistributedRateLimiter:
    """
    Distributed rate limiter using Redis.
    Supports multiple processes/servers.
    """
    
    def __init__(self, redis_client: redis.Redis, 
                 key_prefix: str = "rate_limit"):
        self.redis = redis_client
        self.key_prefix = key_prefix
        self.logger = logging.getLogger(__name__)
    
    def sliding_window_allow(self, identifier: str, 
                            window_seconds: int, 
                            max_requests: int) -> bool:
        """
        Check if request is allowed using sliding window in Redis.
        """
        key = f"{self.key_prefix}:{identifier}"
        now = time.time()
        pipeline = self.redis.pipeline()
        
        try:
            # Remove old entries
            pipeline.zremrangebyscore(key, 0, now - window_seconds)
            
            # Count current entries
            pipeline.zcard(key)
            
            # Add current request
            pipeline.zadd(key, {str(now): now})
            
            # Set expiry
            pipeline.expire(key, window_seconds + 1)
            
            results = pipeline.execute()
            
            current_requests = results[1]
            
            # ZCARD ran before ZADD in the pipeline, so current_requests
            # does not include the request we just added
            if current_requests < max_requests:
                return True
            else:
                # Remove the request we just added
                self.redis.zrem(key, str(now))
                return False
                
        except Exception as e:
            self.logger.error(f"Redis rate limit error: {e}")
            return True  # Fail open
    
    def get_usage(self, identifier: str, window_seconds: int) -> int:
        """Get current usage count."""
        key = f"{self.key_prefix}:{identifier}"
        now = time.time()
        
        # Remove old entries and count
        pipeline = self.redis.pipeline()
        pipeline.zremrangebyscore(key, 0, now - window_seconds)
        pipeline.zcard(key)
        
        results = pipeline.execute()
        return results[1]

# ==================== Rate Limit Manager ====================

class RateLimitManager:
    """
    Comprehensive rate limit management system.
    """
    
    def __init__(self):
        self.limiters = {}
        self.tracker = RateLimitTracker()
        self.metrics = defaultdict(lambda: {
            "requests": 0,
            "throttled": 0,
            "errors": 0,
            "total_wait_time": 0
        })
        self.logger = logging.getLogger(__name__)
    
    def register_api(self, api_name: str, config: RateLimitConfig):
        """Register API with rate limit configuration."""
        
        # Create appropriate limiter
        if config.limit_type == RateLimitType.TOKEN_BUCKET:
            if config.requests_per_second:
                limiter = TokenBucket(
                    capacity=config.burst_size or int(config.requests_per_second * 10),
                    refill_rate=config.requests_per_second
                )
            else:
                raise ValueError("Token bucket requires requests_per_second")
                
        elif config.limit_type == RateLimitType.SLIDING_WINDOW:
            if config.requests_per_minute:
                limiter = SlidingWindow(
                    window_size=60,
                    max_requests=config.requests_per_minute
                )
            elif config.requests_per_hour:
                limiter = SlidingWindow(
                    window_size=3600,
                    max_requests=config.requests_per_hour
                )
            else:
                raise ValueError("Sliding window requires time-based limit")
                
        elif config.limit_type == RateLimitType.LEAKY_BUCKET:
            if config.requests_per_second:
                limiter = LeakyBucket(
                    capacity=config.burst_size or 100,
                    leak_rate=config.requests_per_second
                )
            else:
                raise ValueError("Leaky bucket requires requests_per_second")
                
        elif config.limit_type == RateLimitType.CONCURRENT:
            if config.max_concurrent:
                limiter = ConcurrentLimiter(config.max_concurrent)
            else:
                raise ValueError("Concurrent limiter requires max_concurrent")
        else:
            raise ValueError(f"Unsupported limit type: {config.limit_type}")
        
        self.limiters[api_name] = {
            "limiter": limiter,
            "config": config
        }
        
        self.logger.info(f"Registered {api_name} with {config.limit_type.value} rate limiting")
    
    def check_limit(self, api_name: str) -> bool:
        """Check if request is allowed."""
        if api_name not in self.limiters:
            return True
        
        limiter_info = self.limiters[api_name]
        limiter = limiter_info["limiter"]
        
        # Check based on limiter type
        if isinstance(limiter, TokenBucket):
            return limiter.consume()
        elif isinstance(limiter, SlidingWindow):
            return limiter.allow_request()
        elif isinstance(limiter, LeakyBucket):
            return limiter.add_request()
        elif isinstance(limiter, ConcurrentLimiter):
            return limiter.acquire(timeout=0)
        
        return True
    
    def wait_if_needed(self, api_name: str) -> float:
        """Wait if rate limit requires it."""
        if api_name not in self.limiters:
            return 0
        
        limiter_info = self.limiters[api_name]
        limiter = limiter_info["limiter"]
        config = limiter_info["config"]
        
        wait_time = 0
        
        # Get wait time based on limiter type
        if isinstance(limiter, TokenBucket):
            wait_time = limiter.wait_time()
        elif isinstance(limiter, SlidingWindow):
            wait_time = limiter.wait_time()
        elif isinstance(limiter, LeakyBucket):
            wait_time = limiter.wait_time()
        
        # Apply backoff if configured
        if wait_time > 0 and config.enable_backoff:
            wait_time = min(wait_time * config.backoff_factor, config.max_backoff)
        
        if wait_time > 0:
            self.logger.info(f"Rate limit wait for {api_name}: {wait_time:.2f}s")
            self.metrics[api_name]["throttled"] += 1
            self.metrics[api_name]["total_wait_time"] += wait_time
            time.sleep(wait_time)
        
        return wait_time
    
    def handle_response(self, api_name: str, response: requests.Response):
        """Handle response and update rate limit tracking."""
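        # Update tracker from headers. Pass the headers object itself:
        # requests uses a case-insensitive mapping, and converting it with
        # dict() would make the header lookups case-sensitive
        self.tracker.update_from_headers(api_name, response.headers)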
        
        # Update metrics
        self.metrics[api_name]["requests"] += 1
        
        # Check for rate limit error
        if response.status_code == 429:
            self.metrics[api_name]["errors"] += 1
            
            # Get wait time from tracker
            wait_time = self.tracker.get_wait_time(api_name)
            
            if wait_time > 0:
                self.logger.warning(f"Rate limit hit for {api_name}, waiting {wait_time}s")
                time.sleep(wait_time)
    
    def get_metrics(self, api_name: Optional[str] = None) -> Dict:
        """Get rate limiting metrics."""
        if api_name:
            return dict(self.metrics[api_name])
        return dict(self.metrics)

# ==================== Rate Limit Decorators ====================

def rate_limit(requests_per_second: float):
    """
    Decorator to rate limit function calls.
    """
    min_interval = 1.0 / requests_per_second
    last_called = [0.0]
    
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            elapsed = time.time() - last_called[0]
            left_to_wait = min_interval - elapsed
            
            if left_to_wait > 0:
                time.sleep(left_to_wait)
            
            ret = func(*args, **kwargs)
            last_called[0] = time.time()
            
            return ret
        return wrapper
    return decorator

def retry_on_rate_limit(max_retries: int = 3, backoff_factor: float = 2.0):
    """
    Decorator to retry on rate limit errors.
    """
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            retries = 0
            backoff = 1.0
            
            while retries < max_retries:
                try:
                    response = func(*args, **kwargs)
                    
                    if hasattr(response, 'status_code') and response.status_code == 429:
                        # Rate limited
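                        retry_after = response.headers.get('Retry-After')
                        
                        # Retry-After may be missing or an HTTP-date; fall
                        # back to our own backoff when it is not a number
                        try:
                            wait_time = float(retry_after)
                        except (TypeError, ValueError):
                            wait_time = backoff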
                        
                        logging.warning(f"Rate limited, waiting {wait_time}s")
                        time.sleep(wait_time)
                        
                        retries += 1
                        backoff *= backoff_factor
                    else:
                        return response
                        
                except Exception as e:
                    logging.error(f"Error in rate limited call: {e}")
                    raise
            
            raise Exception(f"Max retries ({max_retries}) exceeded")
        return wrapper
    return decorator

# ==================== Async Rate Limiter ====================

class AsyncRateLimiter:
    """
    Asynchronous rate limiter for async/await code.
    """
    
    def __init__(self, rate: float, per: float = 1.0):
        """
        Args:
            rate: Number of requests
            per: Time period in seconds
        """
        self.rate = rate
        self.per = per
        self.allowance = rate
        self.last_check = time.time()
        self.lock = asyncio.Lock()
    
    async def acquire(self):
        """Acquire permission to make a request."""
        async with self.lock:
            current = time.time()
            time_passed = current - self.last_check
            self.last_check = current
            
            self.allowance += time_passed * (self.rate / self.per)
            
            if self.allowance > self.rate:
                self.allowance = self.rate
            
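            if self.allowance < 1.0:
                sleep_time = (1.0 - self.allowance) * (self.per / self.rate)
                await asyncio.sleep(sleep_time)
                # The sleep earned exactly the token being spent now; move
                # the clock forward so that time is not credited again
                self.last_check = time.time()
                self.allowance = 0.0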
            else:
                self.allowance -= 1.0

# Example usage
if __name__ == "__main__":
    print("⏱️ Rate Limiting Examples\n")
    
    # Example 1: Rate limit types
    print("1️⃣ Rate Limit Types:")
    
    limit_types = [
        ("Fixed Window", "Reset at fixed intervals", "100 req/hour from 2:00-3:00"),
        ("Sliding Window", "Rolling time window", "100 req in last 60 minutes"),
        ("Token Bucket", "Burst with average rate", "10 req/s with burst of 50"),
        ("Leaky Bucket", "Constant processing rate", "Process 5 req/s steadily"),
        ("Concurrent", "Parallel request limit", "Max 10 concurrent requests")
    ]
    
    for limit_type, description, example in limit_types:
        print(f"   {limit_type}:")
        print(f"     {description}")
        print(f"     Example: {example}\n")
    
    # Example 2: Token bucket demo
    print("2️⃣ Token Bucket Example:")
    
    bucket = TokenBucket(capacity=10, refill_rate=2)  # 2 tokens/second, burst of 10
    
    print("   Initial tokens: 10")
    print("   Refill rate: 2 tokens/second")
    print("   Making 5 rapid requests...")
    
    for i in range(5):
        if bucket.consume():
            print(f"     Request {i+1}: ✅ Allowed")
        else:
            print(f"     Request {i+1}: ❌ Denied")
    
    print(f"   Remaining tokens: {bucket.tokens:.1f}")
    
    # Example 3: Sliding window demo
    print("\n3️⃣ Sliding Window Example:")
    
    window = SlidingWindow(window_size=10, max_requests=5)  # 5 requests per 10 seconds
    
    print("   Window: 10 seconds")
    print("   Max requests: 5")
    
    for i in range(7):
        if window.allow_request():
            print(f"     Request {i+1}: ✅ Allowed")
        else:
            wait = window.wait_time()
            print(f"     Request {i+1}: ❌ Denied (wait {wait:.1f}s)")
    
    usage = window.get_usage()
    print(f"   Current usage: {usage['usage_percent']:.0f}%")
    
    # Example 4: Rate limit headers
    print("\n4️⃣ Common Rate Limit Headers:")
    
    headers = [
        ("X-RateLimit-Limit", "Maximum requests allowed"),
        ("X-RateLimit-Remaining", "Requests remaining in window"),
        ("X-RateLimit-Reset", "Unix timestamp when limit resets"),
        ("Retry-After", "Seconds to wait before retry"),
        ("X-RateLimit-Used", "Requests used in current window")
    ]
    
    for header, description in headers:
        print(f"   {header}:")
        print(f"     {description}")
    
    # Example 5: Rate limit manager
    print("\n5️⃣ Rate Limit Manager:")
    
    manager = RateLimitManager()
    
    # Register APIs with different limits
    manager.register_api("github", RateLimitConfig(
        limit_type=RateLimitType.SLIDING_WINDOW,
        requests_per_hour=5000
    ))
    
    manager.register_api("twitter", RateLimitConfig(
        limit_type=RateLimitType.TOKEN_BUCKET,
        requests_per_second=1,
        burst_size=10
    ))
    
    manager.register_api("stripe", RateLimitConfig(
        limit_type=RateLimitType.CONCURRENT,
        max_concurrent=25
    ))
    
    print("   Registered APIs:")
    print("     • GitHub: 5000 req/hour (sliding window)")
    print("     • Twitter: 1 req/s with burst of 10 (token bucket)")
    print("     • Stripe: Max 25 concurrent requests")
    
    # Example 6: Backoff strategies
    print("\n6️⃣ Backoff Strategies:")
    
    strategies = [
        ("Linear", "Increase wait by a fixed step", "1s, 2s, 3s..."),
        ("Exponential", "Double wait time", "1s, 2s, 4s, 8s..."),
        ("Fibonacci", "Fibonacci sequence", "1s, 1s, 2s, 3s, 5s..."),
        ("Jittered", "Random variation", "1-2s, 2-4s, 4-8s..."),
        ("Decorrelated", "Based on previous", "Random based on last")
    ]
    
    for strategy, description, example in strategies:
        print(f"   {strategy}:")
        print(f"     {description}")
        print(f"     Pattern: {example}")
    
    # Example 7: Handling 429 errors
    print("\n7️⃣ Handling 429 (Too Many Requests):")
    
    print("   @retry_on_rate_limit(max_retries=3)")
    print("   def make_api_call():")
    print("       response = requests.get(url)")
    print("       if response.status_code == 429:")
    print("           # Automatic retry with backoff")
    print("       return response")
    
    # Example 8: Best practices
    print("\n8️⃣ Rate Limiting Best Practices:")
    
    practices = [
        "📊 Monitor rate limit headers",
        "⏰ Implement proper backoff",
        "🔄 Use retry logic wisely",
        "💾 Cache responses when possible",
        "🎯 Batch requests when allowed",
        "📈 Track usage metrics",
        "🔔 Set up alerts for limits",
        "🌐 Use distributed limiting for scale",
        "📝 Log all rate limit events",
        "🛡️ Fail gracefully when limited"
    ]
    
    for practice in practices:
        print(f"   {practice}")
    
    # Example 9: Optimization tips
    print("\n9️⃣ Rate Limit Optimization:")
    
    tips = [
        "Prioritize critical requests",
        "Use webhooks instead of polling",
        "Implement request queuing",
        "Spread requests over time",
        "Use multiple API keys/tokens",
        "Cache frequently accessed data",
        "Batch operations when possible",
        "Use conditional requests (ETags)"
    ]
    
    for tip in tips:
        print(f"   • {tip}")
    
    # Example 10: Metrics and monitoring
    print("\n🔟 Rate Limit Metrics:")
    
    sample_metrics = {
        "github": {
            "requests": 1523,
            "throttled": 12,
            "errors": 2,
            "total_wait_time": 45.3,
            "avg_wait": 3.78
        }
    }
    
    print("   Sample metrics for GitHub API:")
    for metric, value in sample_metrics["github"].items():
        print(f"     {metric}: {value}")
    
    print("\n✅ Rate limiting demonstration complete!")
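Example 6 in the script names several backoff strategies but only prints their patterns. As a rough sketch of how those delay sequences could be generated, here are four small generators. All function names and parameters are hypothetical; the jittered variant follows the "1-2s, 2-4s, 4-8s" pattern from the table, and the decorrelated variant assumes the commonly cited formula of drawing each delay between the base and three times the previous delay.

```python
import random
from itertools import islice
from typing import Iterator, Optional


def exponential(base: float = 1.0, factor: float = 2.0,
                cap: float = 60.0) -> Iterator[float]:
    """Yield base, base*factor, base*factor^2, ... never exceeding `cap`."""
    delay = base
    while True:
        yield min(delay, cap)
        if delay < cap:          # stop growing once the cap is reached
            delay *= factor


def fibonacci(base: float = 1.0, cap: float = 60.0) -> Iterator[float]:
    """Yield base * (1, 1, 2, 3, 5, ...) never exceeding `cap`."""
    a, b = 1, 1
    while True:
        yield min(a * base, cap)
        if a * base < cap:       # stop growing once the cap is reached
            a, b = b, a + b


def jittered(base: float = 1.0, factor: float = 2.0, cap: float = 60.0,
             rng: Optional[random.Random] = None) -> Iterator[float]:
    """For each exponential step d, yield a random delay between d and 2*d."""
    rng = rng or random.Random()
    for delay in exponential(base, factor, cap):
        yield min(cap, rng.uniform(delay, 2 * delay))


def decorrelated(base: float = 1.0, cap: float = 60.0,
                 rng: Optional[random.Random] = None) -> Iterator[float]:
    """Draw each delay between `base` and 3x the previous delay."""
    rng = rng or random.Random()
    delay = base
    while True:
        delay = min(cap, rng.uniform(base, delay * 3))
        yield delay


if __name__ == "__main__":
    print(list(islice(exponential(), 5)))  # [1.0, 2.0, 4.0, 8.0, 16.0]
    print(list(islice(fibonacci(), 5)))    # [1.0, 1.0, 2.0, 3.0, 5.0]
```

The randomized variants matter most when many clients are throttled at the same moment: without jitter they all retry in lockstep and hit the limit together again.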

Key Takeaways and Best Practices 🎯

Rate Limiting Best Practices 📋

Pro Tip: Think of rate limiting as driving in traffic - you need to maintain safe distances, respect speed limits, and adjust to conditions. Always check response headers for rate limit information - APIs tell you their limits if you listen. Implement proper backoff strategies - hammering a rate-limited API is like honking in traffic, it doesn't help and annoys everyone. Choose the right algorithm: token bucket for burst traffic, sliding window for steady rates, concurrent limits for parallel requests. Cache aggressively to reduce API calls - why ask for the same data twice? Queue non-urgent requests to spread load over time. Monitor your usage patterns to identify optimization opportunities. When you hit limits, fail gracefully with user-friendly messages. Consider using multiple API keys or upgrading your plan if you consistently hit limits. Most importantly: rate limits exist to ensure fair usage and service stability - respect them as a good API citizen!
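To make the "cache aggressively" advice concrete, here is a minimal time-to-live cache sketch that skips the API call when a fresh copy of the data is already on hand. The `TTLCache` class, its `get_or_fetch` method, and the injectable `clock` are all hypothetical names chosen for illustration. The sketch is not thread-safe and never evicts old keys, so treat it as a starting point only.

```python
import time
from typing import Any, Callable, Dict, Hashable, Tuple


class TTLCache:
    """Illustrative time-to-live cache (hypothetical, single-threaded)."""

    def __init__(self, ttl_seconds: float,
                 clock: Callable[[], float] = time.monotonic):
        self._ttl = ttl_seconds
        self._clock = clock  # injectable so expiry is easy to test
        # key -> (expiry time, cached value)
        self._entries: Dict[Hashable, Tuple[float, Any]] = {}

    def get_or_fetch(self, key: Hashable, fetch: Callable[[], Any]) -> Any:
        """Return the cached value for `key`, calling `fetch` only on a miss."""
        now = self._clock()
        entry = self._entries.get(key)
        if entry is not None and entry[0] > now:
            return entry[1]  # still fresh: no API call needed

        value = fetch()      # miss or expired: spend one request
        self._entries[key] = (now + self._ttl, value)
        return value
```

In use, `fetch` would wrap the real API call, for example `cache.get_or_fetch(user_id, lambda: client.get_user(user_id))`, where `client.get_user` stands in for whatever request function you already have. Every cache hit is a request that never counts against your quota.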

Mastering rate limiting ensures your API integrations are reliable, respectful, and resilient. You can now build systems that maximize throughput while staying within limits, handle rate limit responses gracefully, and optimize API usage patterns. Whether you're building high-volume data pipelines or real-time applications, these rate limiting skills keep your API connections running smoothly! ⚔