Skip to main content

🌐 RESTful API Consumption: Master the Art of API Integration

RESTful APIs are the lingua franca of modern web services - they're how applications talk to each other across the internet. Like a skilled diplomat who speaks multiple languages, mastering API consumption allows your applications to integrate with any service, from social media platforms to payment gateways. Whether you're building integrations, automating workflows, or aggregating data, RESTful API consumption is your gateway to the connected world! šŸ”—

The RESTful API Architecture

REST (Representational State Transfer) is like a universal protocol for web services - it uses standard HTTP methods, status codes, and conventions that everyone understands. Think of it as a well-organized library where every resource has an address (URL), and you interact with it using standard verbs (GET, POST, PUT, DELETE). Master these patterns, and you can integrate with virtually any modern API!

graph TB A[RESTful API] --> B[HTTP Methods] A --> C[Resources & URLs] A --> D[Request Components] A --> E[Response Handling] B --> F[GET - Retrieve] B --> G[POST - Create] B --> H[PUT - Update] B --> I[DELETE - Remove] B --> J[PATCH - Partial Update] C --> K[Endpoints] C --> L[Path Parameters] C --> M[Query Parameters] C --> N[Resource Nesting] D --> O[Headers] D --> P[Body/Payload] D --> Q[Authentication] D --> R[Content Types] E --> S[Status Codes] E --> T[Response Body] E --> U[Error Handling] E --> V[Pagination] W[API Patterns] --> X[CRUD Operations] W --> Y[Filtering & Sorting] W --> Z[Versioning] style A fill:#ff6b6b style B fill:#51cf66 style C fill:#339af0 style D fill:#ffd43b style W fill:#ff6b6b

Real-World Scenario: The Universal API Client šŸš€

You're building a comprehensive API client that can integrate with multiple services - social media APIs for content posting, payment gateways for transactions, weather services for data, and internal microservices for business logic. Your client must handle authentication, rate limiting, retries, pagination, file uploads, webhooks, and provide detailed logging and monitoring. Let's build a production-ready API consumption framework!

# First, install required packages:
# pip install requests httpx aiohttp pydantic tenacity python-dotenv

import requests
import json
import time
import logging
from typing import Dict, List, Optional, Any, Union, Callable, TypeVar, Generic
from dataclasses import dataclass, field
from enum import Enum
from urllib.parse import urljoin, urlencode, urlparse, parse_qs
import hashlib
import hmac
from datetime import datetime, timedelta
from pathlib import Path
import asyncio
import aiohttp
from functools import wraps
import backoff
from tenacity import retry, stop_after_attempt, wait_exponential

# Type hints
T = TypeVar('T')

# ==================== HTTP Methods & Status Codes ====================

class HTTPMethod(Enum):
    """HTTP methods for RESTful APIs."""
    GET = "GET"
    POST = "POST"
    PUT = "PUT"
    DELETE = "DELETE"
    PATCH = "PATCH"
    HEAD = "HEAD"
    OPTIONS = "OPTIONS"

class HTTPStatus:
    """HTTP status codes with descriptions."""
    # Success
    OK = 200  # Success
    CREATED = 201  # Resource created
    ACCEPTED = 202  # Request accepted for processing
    NO_CONTENT = 204  # Success with no content
    
    # Redirection
    MOVED_PERMANENTLY = 301
    FOUND = 302
    NOT_MODIFIED = 304
    
    # Client errors
    BAD_REQUEST = 400  # Invalid request
    UNAUTHORIZED = 401  # Authentication required
    FORBIDDEN = 403  # Access denied
    NOT_FOUND = 404  # Resource not found
    METHOD_NOT_ALLOWED = 405
    CONFLICT = 409  # Resource conflict
    UNPROCESSABLE_ENTITY = 422  # Validation error
    TOO_MANY_REQUESTS = 429  # Rate limit exceeded
    
    # Server errors
    INTERNAL_SERVER_ERROR = 500
    BAD_GATEWAY = 502
    SERVICE_UNAVAILABLE = 503
    GATEWAY_TIMEOUT = 504

# ==================== API Configuration ====================

@dataclass
class APIConfig:
    """Configuration for API client."""
    base_url: str
    timeout: int = 30
    max_retries: int = 3
    retry_backoff: float = 1.0
    verify_ssl: bool = True
    
    # Headers
    headers: Dict[str, str] = field(default_factory=dict)
    user_agent: str = "Python API Client/1.0"
    
    # Authentication
    auth_type: Optional[str] = None  # 'bearer', 'api_key', 'basic', 'oauth2'
    auth_credentials: Dict[str, str] = field(default_factory=dict)
    
    # Rate limiting
    rate_limit: Optional[int] = None  # Requests per second
    rate_limit_window: int = 1  # Window in seconds
    
    # Logging
    enable_logging: bool = True
    log_level: str = "INFO"
    log_requests: bool = True
    log_responses: bool = True
    
    # Proxy
    proxy: Optional[str] = None
    
    # Pagination
    page_size: int = 100
    max_pages: Optional[int] = None

# ==================== API Response ====================

@dataclass
class APIResponse:
    """Structured API response."""
    status_code: int
    headers: Dict[str, str]
    body: Any
    elapsed_time: float
    request_url: str
    request_method: str
    
    @property
    def is_success(self) -> bool:
        """Check if response is successful."""
        return 200 <= self.status_code < 300
    
    @property
    def is_error(self) -> bool:
        """Check if response is an error."""
        return self.status_code >= 400
    
    @property
    def json(self) -> Optional[Dict]:
        """Get JSON body if available."""
        if isinstance(self.body, dict):
            return self.body
        try:
            return json.loads(self.body) if self.body else None
        except (json.JSONDecodeError, TypeError):
            return None
    
    @property
    def text(self) -> str:
        """Get body as text."""
        if isinstance(self.body, str):
            return self.body
        elif isinstance(self.body, bytes):
            return self.body.decode('utf-8', errors='ignore')
        else:
            return str(self.body)

# ==================== Base API Client ====================

class BaseAPIClient:
    """
    Base REST API client with common functionality.
    """
    
    def __init__(self, config: APIConfig):
        self.config = config
        self.session = self._create_session()
        self.logger = self._setup_logging()
        
        # Rate limiting
        self.last_request_time = 0
        self.request_count = 0
        
        # Metrics
        self.metrics = {
            "total_requests": 0,
            "successful_requests": 0,
            "failed_requests": 0,
            "total_time": 0,
            "errors": []
        }
    
    def _create_session(self) -> requests.Session:
        """Create configured requests session."""
        session = requests.Session()
        
        # Set headers
        session.headers.update({
            "User-Agent": self.config.user_agent,
            "Accept": "application/json",
            "Content-Type": "application/json"
        })
        
        if self.config.headers:
            session.headers.update(self.config.headers)
        
        # Set authentication
        self._setup_authentication(session)
        
        # Set proxy
        if self.config.proxy:
            session.proxies = {
                "http": self.config.proxy,
                "https": self.config.proxy
            }
        
        # SSL verification
        session.verify = self.config.verify_ssl
        
        return session
    
    def _setup_logging(self) -> logging.Logger:
        """Setup logging for API client."""
        logger = logging.getLogger(self.__class__.__name__)
        
        if self.config.enable_logging:
            logger.setLevel(getattr(logging, self.config.log_level))
            
            if not logger.handlers:
                handler = logging.StreamHandler()
                formatter = logging.Formatter(
                    '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
                )
                handler.setFormatter(formatter)
                logger.addHandler(handler)
        
        return logger
    
    def _setup_authentication(self, session: requests.Session):
        """Setup authentication for session."""
        auth_type = self.config.auth_type
        creds = self.config.auth_credentials
        
        if auth_type == "bearer":
            token = creds.get("token")
            if token:
                session.headers["Authorization"] = f"Bearer {token}"
        
        elif auth_type == "api_key":
            key_name = creds.get("key_name", "X-API-Key")
            key_value = creds.get("key_value")
            if key_value:
                session.headers[key_name] = key_value
        
        elif auth_type == "basic":
            username = creds.get("username")
            password = creds.get("password")
            if username and password:
                session.auth = (username, password)
        
        elif auth_type == "oauth2":
            # OAuth2 is more complex, simplified here
            access_token = creds.get("access_token")
            if access_token:
                session.headers["Authorization"] = f"Bearer {access_token}"
    
    def _apply_rate_limiting(self):
        """Apply rate limiting if configured."""
        if not self.config.rate_limit:
            return
        
        current_time = time.time()
        time_since_last = current_time - self.last_request_time
        
        if time_since_last < self.config.rate_limit_window:
            self.request_count += 1
            
            if self.request_count > self.config.rate_limit:
                sleep_time = self.config.rate_limit_window - time_since_last
                self.logger.debug(f"Rate limit reached, sleeping {sleep_time:.2f}s")
                time.sleep(sleep_time)
                self.request_count = 1
        else:
            self.request_count = 1
        
        self.last_request_time = time.time()
    
    def _build_url(self, endpoint: str, path_params: Optional[Dict] = None) -> str:
        """Build complete URL with path parameters."""
        # Format path parameters
        if path_params:
            for key, value in path_params.items():
                endpoint = endpoint.replace(f"{{{key}}}", str(value))
        
        # Join with base URL
        return urljoin(self.config.base_url, endpoint)
    
    def _log_request(self, method: str, url: str, **kwargs):
        """Log outgoing request."""
        if self.config.log_requests:
            self.logger.info(f"→ {method} {url}")
            
            if kwargs.get("params"):
                self.logger.debug(f"  Query params: {kwargs['params']}")
            
            if kwargs.get("json"):
                self.logger.debug(f"  Body: {json.dumps(kwargs['json'], indent=2)}")
    
    def _log_response(self, response: APIResponse):
        """Log incoming response."""
        if self.config.log_responses:
            status_emoji = "āœ…" if response.is_success else "āŒ"
            self.logger.info(
                f"← {status_emoji} {response.status_code} "
                f"({response.elapsed_time:.2f}s)"
            )
            
            if response.json:
                self.logger.debug(f"  Response: {json.dumps(response.json, indent=2)}")
    
    @retry(
        stop=stop_after_attempt(3),
        wait=wait_exponential(multiplier=1, min=1, max=10)
    )
    def _make_request(self, method: HTTPMethod, endpoint: str, 
                     path_params: Optional[Dict] = None,
                     query_params: Optional[Dict] = None,
                     body: Optional[Union[Dict, str]] = None,
                     headers: Optional[Dict] = None,
                     files: Optional[Dict] = None) -> APIResponse:
        """
        Make HTTP request with retries and error handling.
        """
        # Apply rate limiting
        self._apply_rate_limiting()
        
        # Build URL
        url = self._build_url(endpoint, path_params)
        
        # Prepare request
        request_kwargs = {
            "timeout": self.config.timeout,
            "params": query_params
        }
        
        if headers:
            request_kwargs["headers"] = {**self.session.headers, **headers}
        
        if files:
            request_kwargs["files"] = files
        elif body:
            if isinstance(body, dict):
                request_kwargs["json"] = body
            else:
                request_kwargs["data"] = body
        
        # Log request
        self._log_request(method.value, url, **request_kwargs)
        
        # Make request
        start_time = time.time()
        
        try:
            response = self.session.request(
                method.value,
                url,
                **request_kwargs
            )
            
            elapsed_time = time.time() - start_time
            
            # Parse response
            try:
                response_body = response.json()
            except:
                response_body = response.text
            
            api_response = APIResponse(
                status_code=response.status_code,
                headers=dict(response.headers),
                body=response_body,
                elapsed_time=elapsed_time,
                request_url=url,
                request_method=method.value
            )
            
            # Log response
            self._log_response(api_response)
            
            # Update metrics
            self.metrics["total_requests"] += 1
            self.metrics["total_time"] += elapsed_time
            
            if api_response.is_success:
                self.metrics["successful_requests"] += 1
            else:
                self.metrics["failed_requests"] += 1
                
                # Raise for status if error
                response.raise_for_status()
            
            return api_response
            
        except requests.exceptions.RequestException as e:
            self.metrics["failed_requests"] += 1
            self.metrics["errors"].append(str(e))
            
            self.logger.error(f"Request failed: {e}")
            raise
    
    # -------------------- HTTP Methods --------------------
    
    def get(self, endpoint: str, **kwargs) -> APIResponse:
        """Make GET request."""
        return self._make_request(HTTPMethod.GET, endpoint, **kwargs)
    
    def post(self, endpoint: str, **kwargs) -> APIResponse:
        """Make POST request."""
        return self._make_request(HTTPMethod.POST, endpoint, **kwargs)
    
    def put(self, endpoint: str, **kwargs) -> APIResponse:
        """Make PUT request."""
        return self._make_request(HTTPMethod.PUT, endpoint, **kwargs)
    
    def patch(self, endpoint: str, **kwargs) -> APIResponse:
        """Make PATCH request."""
        return self._make_request(HTTPMethod.PATCH, endpoint, **kwargs)
    
    def delete(self, endpoint: str, **kwargs) -> APIResponse:
        """Make DELETE request."""
        return self._make_request(HTTPMethod.DELETE, endpoint, **kwargs)

# ==================== Pagination Handler ====================

class PaginationHandler:
    """
    Handle various pagination strategies.
    """
    
    def __init__(self, client: BaseAPIClient):
        self.client = client
        self.logger = logging.getLogger(__name__)
    
    def paginate_offset(self, endpoint: str, 
                        offset_param: str = "offset",
                        limit_param: str = "limit",
                        data_key: str = "data",
                        **kwargs) -> List[Any]:
        """
        Handle offset-based pagination.
        
        Example: ?offset=0&limit=100
        """
        all_data = []
        offset = 0
        limit = self.client.config.page_size
        page_count = 0
        
        while True:
            # Check max pages
            if self.client.config.max_pages and page_count >= self.client.config.max_pages:
                break
            
            # Set pagination params
            params = kwargs.get("query_params", {})
            params[offset_param] = offset
            params[limit_param] = limit
            kwargs["query_params"] = params
            
            # Make request
            response = self.client.get(endpoint, **kwargs)
            
            if not response.is_success:
                break
            
            # Extract data
            data = response.json
            if data_key:
                page_data = data.get(data_key, [])
            else:
                page_data = data if isinstance(data, list) else [data]
            
            if not page_data:
                break
            
            all_data.extend(page_data)
            
            # Update offset
            offset += limit
            page_count += 1
            
            # Check if last page
            if len(page_data) < limit:
                break
            
            self.logger.debug(f"Fetched page {page_count}, total items: {len(all_data)}")
        
        return all_data
    
    def paginate_page_number(self, endpoint: str,
                            page_param: str = "page",
                            size_param: str = "page_size",
                            data_key: str = "data",
                            total_pages_key: str = "total_pages",
                            **kwargs) -> List[Any]:
        """
        Handle page number-based pagination.
        
        Example: ?page=1&page_size=100
        """
        all_data = []
        page = 1
        page_size = self.client.config.page_size
        total_pages = None
        
        while True:
            # Check max pages
            if self.client.config.max_pages and page > self.client.config.max_pages:
                break
            
            # Check total pages if known
            if total_pages and page > total_pages:
                break
            
            # Set pagination params
            params = kwargs.get("query_params", {})
            params[page_param] = page
            params[size_param] = page_size
            kwargs["query_params"] = params
            
            # Make request
            response = self.client.get(endpoint, **kwargs)
            
            if not response.is_success:
                break
            
            # Extract data
            data = response.json
            if data_key:
                page_data = data.get(data_key, [])
            else:
                page_data = data if isinstance(data, list) else [data]
            
            # Get total pages if available
            if total_pages_key in data:
                total_pages = data[total_pages_key]
            
            if not page_data:
                break
            
            all_data.extend(page_data)
            page += 1
            
            self.logger.debug(f"Fetched page {page-1}, total items: {len(all_data)}")
        
        return all_data
    
    def paginate_cursor(self, endpoint: str,
                       cursor_param: str = "cursor",
                       next_cursor_key: str = "next_cursor",
                       data_key: str = "data",
                       **kwargs) -> List[Any]:
        """
        Handle cursor-based pagination.
        
        Example: ?cursor=eyJpZCI6MTAwfQ==
        """
        all_data = []
        cursor = None
        page_count = 0
        
        while True:
            # Check max pages
            if self.client.config.max_pages and page_count >= self.client.config.max_pages:
                break
            
            # Set cursor param
            params = kwargs.get("query_params", {})
            if cursor:
                params[cursor_param] = cursor
            kwargs["query_params"] = params
            
            # Make request
            response = self.client.get(endpoint, **kwargs)
            
            if not response.is_success:
                break
            
            # Extract data
            data = response.json
            if data_key:
                page_data = data.get(data_key, [])
            else:
                page_data = data if isinstance(data, list) else [data]
            
            if not page_data:
                break
            
            all_data.extend(page_data)
            
            # Get next cursor
            cursor = data.get(next_cursor_key)
            if not cursor:
                break
            
            page_count += 1
            
            self.logger.debug(f"Fetched page {page_count}, total items: {len(all_data)}")
        
        return all_data
    
    def paginate_link_header(self, endpoint: str,
                            data_key: Optional[str] = None,
                            **kwargs) -> List[Any]:
        """
        Handle Link header-based pagination (GitHub style).
        
        Example: Link: ; rel="next"
        """
        all_data = []
        url = self.client._build_url(endpoint)
        page_count = 0
        
        while url:
            # Check max pages
            if self.client.config.max_pages and page_count >= self.client.config.max_pages:
                break
            
            # Make request directly with URL
            response = self.client.get(url, **kwargs)
            
            if not response.is_success:
                break
            
            # Extract data
            data = response.json
            if data_key:
                page_data = data.get(data_key, [])
            else:
                page_data = data if isinstance(data, list) else [data]
            
            if page_data:
                all_data.extend(page_data)
            
            # Parse Link header for next URL
            link_header = response.headers.get("Link", "")
            url = self._parse_link_header(link_header, "next")
            
            page_count += 1
            
            self.logger.debug(f"Fetched page {page_count}, total items: {len(all_data)}")
        
        return all_data
    
    def _parse_link_header(self, link_header: str, rel: str) -> Optional[str]:
        """Parse Link header for specific rel."""
        if not link_header:
            return None
        
        links = link_header.split(",")
        for link in links:
            parts = link.split(";")
            if len(parts) == 2:
                url = parts[0].strip()[1:-1]  # Remove < >
                rel_part = parts[1].strip()
                if f'rel="{rel}"' in rel_part:
                    return url
        
        return None

# ==================== Advanced API Client ====================

class AdvancedAPIClient(BaseAPIClient):
    """
    Advanced API client with additional features.
    """
    
    def __init__(self, config: APIConfig):
        super().__init__(config)
        self.pagination = PaginationHandler(self)
    
    def batch_request(self, requests: List[Dict]) -> List[APIResponse]:
        """
        Execute multiple requests in batch.
        
        Args:
            requests: List of request dictionaries with method, endpoint, etc.
        """
        responses = []
        
        for request in requests:
            method = HTTPMethod(request.get("method", "GET"))
            endpoint = request["endpoint"]
            
            response = self._make_request(
                method,
                endpoint,
                path_params=request.get("path_params"),
                query_params=request.get("query_params"),
                body=request.get("body"),
                headers=request.get("headers")
            )
            
            responses.append(response)
        
        return responses
    
    def upload_file(self, endpoint: str, file_path: str, 
                    field_name: str = "file",
                    additional_data: Optional[Dict] = None) -> APIResponse:
        """Upload file to API endpoint."""
        with open(file_path, "rb") as f:
            files = {field_name: f}
            
            return self.post(
                endpoint,
                files=files,
                body=additional_data
            )
    
    def download_file(self, endpoint: str, output_path: str, **kwargs) -> bool:
        """Download file from API endpoint."""
        response = self.get(endpoint, **kwargs)
        
        if response.is_success:
            # Check if response is file
            content_type = response.headers.get("Content-Type", "")
            
            if "application/json" not in content_type:
                # Save as file
                with open(output_path, "wb") as f:
                    if isinstance(response.body, bytes):
                        f.write(response.body)
                    else:
                        f.write(response.body.encode())
                
                self.logger.info(f"File downloaded to {output_path}")
                return True
        
        return False
    
    def stream_response(self, endpoint: str, chunk_size: int = 1024, **kwargs):
        """Stream response for large data."""
        url = self._build_url(endpoint)
        
        with self.session.get(url, stream=True, **kwargs) as response:
            response.raise_for_status()
            
            for chunk in response.iter_content(chunk_size=chunk_size):
                if chunk:
                    yield chunk

# ==================== Async API Client ====================

class AsyncAPIClient:
    """
    Asynchronous API client for high-performance operations.
    """
    
    def __init__(self, config: APIConfig):
        self.config = config
        self.logger = logging.getLogger(__name__)
    
    async def get(self, endpoint: str, **kwargs) -> Dict:
        """Async GET request."""
        async with aiohttp.ClientSession() as session:
            url = urljoin(self.config.base_url, endpoint)
            
            async with session.get(
                url,
                headers=self.config.headers,
                timeout=aiohttp.ClientTimeout(total=self.config.timeout),
                **kwargs
            ) as response:
                return await response.json()
    
    async def post(self, endpoint: str, data: Dict, **kwargs) -> Dict:
        """Async POST request."""
        async with aiohttp.ClientSession() as session:
            url = urljoin(self.config.base_url, endpoint)
            
            async with session.post(
                url,
                json=data,
                headers=self.config.headers,
                timeout=aiohttp.ClientTimeout(total=self.config.timeout),
                **kwargs
            ) as response:
                return await response.json()
    
    async def batch_requests(self, requests: List[Dict]) -> List[Dict]:
        """Execute multiple requests concurrently."""
        tasks = []
        
        async with aiohttp.ClientSession() as session:
            for request in requests:
                method = request.get("method", "GET").lower()
                endpoint = request["endpoint"]
                url = urljoin(self.config.base_url, endpoint)
                
                if method == "get":
                    task = session.get(url, **request.get("kwargs", {}))
                elif method == "post":
                    task = session.post(url, json=request.get("data"), **request.get("kwargs", {}))
                else:
                    continue
                
                tasks.append(task)
            
            responses = await asyncio.gather(*tasks)
            
            results = []
            for response in responses:
                results.append(await response.json())
            
            return results

# ==================== API Examples ====================

class GitHubAPIClient(AdvancedAPIClient):
    """Example: GitHub API client."""
    
    def __init__(self, token: str):
        config = APIConfig(
            base_url="https://api.github.com",
            auth_type="bearer",
            auth_credentials={"token": token},
            headers={"Accept": "application/vnd.github.v3+json"}
        )
        super().__init__(config)
    
    def get_user(self, username: str) -> APIResponse:
        """Get user information."""
        return self.get(f"/users/{username}")
    
    def get_user_repos(self, username: str) -> List[Dict]:
        """Get all user repositories with pagination."""
        return self.pagination.paginate_page_number(
            f"/users/{username}/repos",
            page_param="page",
            size_param="per_page",
            data_key=None  # Response is directly a list
        )
    
    def create_repo(self, name: str, description: str = "", private: bool = False) -> APIResponse:
        """Create a new repository."""
        return self.post("/user/repos", body={
            "name": name,
            "description": description,
            "private": private
        })

class JSONPlaceholderClient(AdvancedAPIClient):
    """Example: JSONPlaceholder API client."""
    
    def __init__(self):
        config = APIConfig(
            base_url="https://jsonplaceholder.typicode.com"
        )
        super().__init__(config)
    
    def get_posts(self) -> List[Dict]:
        """Get all posts."""
        response = self.get("/posts")
        return response.json if response.is_success else []
    
    def get_post(self, post_id: int) -> Optional[Dict]:
        """Get single post."""
        response = self.get(f"/posts/{post_id}")
        return response.json if response.is_success else None
    
    def create_post(self, title: str, body: str, user_id: int) -> APIResponse:
        """Create new post."""
        return self.post("/posts", body={
            "title": title,
            "body": body,
            "userId": user_id
        })
    
    def update_post(self, post_id: int, title: str, body: str, user_id: int) -> APIResponse:
        """Update existing post."""
        return self.put(f"/posts/{post_id}", body={
            "id": post_id,
            "title": title,
            "body": body,
            "userId": user_id
        })
    
    def delete_post(self, post_id: int) -> APIResponse:
        """Delete post."""
        return self.delete(f"/posts/{post_id}")

# Example usage
if __name__ == "__main__":
    print("🌐 RESTful API Consumption Examples\n")
    
    # Example 1: Basic API setup
    print("1ļøāƒ£ Basic API Configuration:")
    
    config = APIConfig(
        base_url="https://api.example.com",
        timeout=30,
        auth_type="bearer",
        auth_credentials={"token": "your-api-token"},
        rate_limit=10,  # 10 requests per second
        enable_logging=True
    )
    
    print(f"   Base URL: {config.base_url}")
    print(f"   Auth Type: {config.auth_type}")
    print(f"   Rate Limit: {config.rate_limit} req/s")
    print(f"   Timeout: {config.timeout}s")
    
    # Example 2: HTTP methods
    print("\n2ļøāƒ£ RESTful HTTP Methods:")
    
    methods = [
        ("GET", "Retrieve resource(s)", "/users/123"),
        ("POST", "Create new resource", "/users"),
        ("PUT", "Update entire resource", "/users/123"),
        ("PATCH", "Partial update", "/users/123"),
        ("DELETE", "Remove resource", "/users/123")
    ]
    
    for method, description, example in methods:
        print(f"   {method}: {description}")
        print(f"          Example: {method} {example}")
    
    # Example 3: Common status codes
    print("\n3ļøāƒ£ Common HTTP Status Codes:")
    
    status_codes = [
        (200, "OK", "Request successful"),
        (201, "Created", "Resource created"),
        (204, "No Content", "Success, no body"),
        (400, "Bad Request", "Invalid request"),
        (401, "Unauthorized", "Authentication required"),
        (403, "Forbidden", "Access denied"),
        (404, "Not Found", "Resource doesn't exist"),
        (429, "Too Many Requests", "Rate limit exceeded"),
        (500, "Server Error", "Internal error")
    ]
    
    for code, name, description in status_codes:
        print(f"   {code} {name}: {description}")
    
    # Example 4: JSONPlaceholder demo
    print("\n4ļøāƒ£ JSONPlaceholder API Demo:")
    
    client = JSONPlaceholderClient()
    
    # Get posts
    print("   Fetching posts...")
    posts = client.get_posts()
    print(f"   Found {len(posts)} posts")
    
    if posts:
        first_post = posts[0]
        print(f"   First post: '{first_post['title'][:50]}...'")
    
    # Example 5: Pagination strategies
    print("\n5ļøāƒ£ Pagination Strategies:")
    
    strategies = [
        ("Offset-based", "?offset=20&limit=10"),
        ("Page number", "?page=3&page_size=10"),
        ("Cursor-based", "?cursor=eyJpZCI6MTAwfQ=="),
        ("Link header", "Link: ; rel='next'"),
        ("Keyset", "?after_id=100&limit=10")
    ]
    
    for strategy, example in strategies:
        print(f"   {strategy}:")
        print(f"     Example: {example}")
    
    # Example 6: Authentication methods
    print("\n6ļøāƒ£ API Authentication Methods:")
    
    auth_methods = [
        ("API Key", "X-API-Key: your-api-key"),
        ("Bearer Token", "Authorization: Bearer token"),
        ("Basic Auth", "Authorization: Basic base64(user:pass)"),
        ("OAuth 2.0", "Complex flow with tokens"),
        ("JWT", "JSON Web Tokens"),
        ("HMAC", "Request signing with secret")
    ]
    
    for method, example in auth_methods:
        print(f"   {method}:")
        print(f"     {example}")
    
    # Example 7: Error handling
    print("\n7ļøāƒ£ Error Handling Patterns:")
    
    patterns = [
        "Retry with exponential backoff",
        "Circuit breaker for failing endpoints",
        "Fallback to cached data",
        "Graceful degradation",
        "Detailed error logging",
        "User-friendly error messages"
    ]
    
    for pattern in patterns:
        print(f"   • {pattern}")
    
    # Example 8: Performance optimization
    print("\n8ļøāƒ£ Performance Optimization:")
    
    optimizations = [
        "Connection pooling with sessions",
        "Async/concurrent requests",
        "Response caching",
        "Request batching",
        "Compression (gzip)",
        "Selective field retrieval"
    ]
    
    for optimization in optimizations:
        print(f"   • {optimization}")
    
    # Example 9: Best practices
    print("\n9ļøāƒ£ API Consumption Best Practices:")
    
    practices = [
        "āœ… Always handle errors gracefully",
        "ā±ļø Set appropriate timeouts",
        "šŸ”„ Implement retry logic",
        "šŸ“Š Monitor API metrics",
        "šŸ” Secure API credentials",
        "šŸ“ Log requests for debugging",
        "šŸ’¾ Cache responses when possible",
        "šŸŽÆ Use specific field selection",
        "šŸ“¦ Batch requests when available",
        "šŸ“– Follow API documentation"
    ]
    
    for practice in practices:
        print(f"   {practice}")
    
    # Example 10: Async example
    print("\nšŸ”Ÿ Async API Client Example:")
    
    print("   async def fetch_multiple_apis():")
    print("       tasks = [")
    print("           client.get('/endpoint1'),")
    print("           client.get('/endpoint2'),")
    print("           client.get('/endpoint3')")
    print("       ]")
    print("       results = await asyncio.gather(*tasks)")
    print("       return results")
    
    print("\nāœ… RESTful API consumption demonstration complete!")

Key Takeaways and Best Practices šŸŽÆ

RESTful API Best Practices šŸ“‹

Pro Tip: Think of API consumption as having a conversation with a remote service - you need to speak the same language, follow the protocol, and handle misunderstandings gracefully. Always use session objects to reuse connections - creating new connections for each request is like hanging up and calling back for every sentence. Implement exponential backoff for retries - if the API doesn't respond, don't keep knocking frantically. Respect rate limits like speed limits on a highway - going too fast gets you blocked. Handle pagination properly - APIs give you data in pages for a reason. Log requests and responses for debugging, but be careful with sensitive data. Use async operations when making multiple API calls - why wait in line when you can do things in parallel? Cache responses intelligently to reduce API calls and improve performance. Most importantly: read the API documentation thoroughly - every API has its quirks and special requirements!

Mastering RESTful API consumption opens up a world of possibilities for integration and automation. You can now connect to any web service, aggregate data from multiple sources, and build powerful integrations. Whether you're building microservices, automating workflows, or creating data pipelines, these API skills are your passport to the connected world! šŸŒ