š RESTful API Consumption: Master the Art of API Integration
RESTful APIs are the lingua franca of modern web services - they're how applications talk to each other across the internet. Like a skilled diplomat who speaks multiple languages, mastering API consumption allows your applications to integrate with any service, from social media platforms to payment gateways. Whether you're building integrations, automating workflows, or aggregating data, RESTful API consumption is your gateway to the connected world! š
The RESTful API Architecture
REST (Representational State Transfer) is like a universal protocol for web services - it uses standard HTTP methods, status codes, and conventions that everyone understands. Think of it as a well-organized library where every resource has an address (URL), and you interact with it using standard verbs (GET, POST, PUT, DELETE). Master these patterns, and you can integrate with virtually any modern API!
Real-World Scenario: The Universal API Client š
You're building a comprehensive API client that can integrate with multiple services - social media APIs for content posting, payment gateways for transactions, weather services for data, and internal microservices for business logic. Your client must handle authentication, rate limiting, retries, pagination, file uploads, webhooks, and provide detailed logging and monitoring. Let's build a production-ready API consumption framework!
# First, install required packages:
# pip install requests httpx aiohttp pydantic tenacity python-dotenv
import requests
import json
import time
import logging
from typing import Dict, List, Optional, Any, Union, Callable, TypeVar, Generic
from dataclasses import dataclass, field
from enum import Enum
from urllib.parse import urljoin, urlencode, urlparse, parse_qs
import hashlib
import hmac
from datetime import datetime, timedelta
from pathlib import Path
import asyncio
import aiohttp
from functools import wraps
import backoff
from tenacity import retry, stop_after_attempt, wait_exponential
# Type hints
T = TypeVar('T')
# ==================== HTTP Methods & Status Codes ====================
class HTTPMethod(Enum):
"""HTTP methods for RESTful APIs."""
GET = "GET"
POST = "POST"
PUT = "PUT"
DELETE = "DELETE"
PATCH = "PATCH"
HEAD = "HEAD"
OPTIONS = "OPTIONS"
class HTTPStatus:
"""HTTP status codes with descriptions."""
# Success
OK = 200 # Success
CREATED = 201 # Resource created
ACCEPTED = 202 # Request accepted for processing
NO_CONTENT = 204 # Success with no content
# Redirection
MOVED_PERMANENTLY = 301
FOUND = 302
NOT_MODIFIED = 304
# Client errors
BAD_REQUEST = 400 # Invalid request
UNAUTHORIZED = 401 # Authentication required
FORBIDDEN = 403 # Access denied
NOT_FOUND = 404 # Resource not found
METHOD_NOT_ALLOWED = 405
CONFLICT = 409 # Resource conflict
UNPROCESSABLE_ENTITY = 422 # Validation error
TOO_MANY_REQUESTS = 429 # Rate limit exceeded
# Server errors
INTERNAL_SERVER_ERROR = 500
BAD_GATEWAY = 502
SERVICE_UNAVAILABLE = 503
GATEWAY_TIMEOUT = 504
# ==================== API Configuration ====================
@dataclass
class APIConfig:
"""Configuration for API client."""
base_url: str
timeout: int = 30
max_retries: int = 3
retry_backoff: float = 1.0
verify_ssl: bool = True
# Headers
headers: Dict[str, str] = field(default_factory=dict)
user_agent: str = "Python API Client/1.0"
# Authentication
auth_type: Optional[str] = None # 'bearer', 'api_key', 'basic', 'oauth2'
auth_credentials: Dict[str, str] = field(default_factory=dict)
# Rate limiting
rate_limit: Optional[int] = None # Requests per second
rate_limit_window: int = 1 # Window in seconds
# Logging
enable_logging: bool = True
log_level: str = "INFO"
log_requests: bool = True
log_responses: bool = True
# Proxy
proxy: Optional[str] = None
# Pagination
page_size: int = 100
max_pages: Optional[int] = None
# ==================== API Response ====================
@dataclass
class APIResponse:
"""Structured API response."""
status_code: int
headers: Dict[str, str]
body: Any
elapsed_time: float
request_url: str
request_method: str
@property
def is_success(self) -> bool:
"""Check if response is successful."""
return 200 <= self.status_code < 300
@property
def is_error(self) -> bool:
"""Check if response is an error."""
return self.status_code >= 400
@property
def json(self) -> Optional[Dict]:
"""Get JSON body if available."""
if isinstance(self.body, dict):
return self.body
try:
return json.loads(self.body) if self.body else None
except (json.JSONDecodeError, TypeError):
return None
@property
def text(self) -> str:
"""Get body as text."""
if isinstance(self.body, str):
return self.body
elif isinstance(self.body, bytes):
return self.body.decode('utf-8', errors='ignore')
else:
return str(self.body)
# ==================== Base API Client ====================
class BaseAPIClient:
"""
Base REST API client with common functionality.
"""
def __init__(self, config: APIConfig):
self.config = config
self.session = self._create_session()
self.logger = self._setup_logging()
# Rate limiting
self.last_request_time = 0
self.request_count = 0
# Metrics
self.metrics = {
"total_requests": 0,
"successful_requests": 0,
"failed_requests": 0,
"total_time": 0,
"errors": []
}
def _create_session(self) -> requests.Session:
"""Create configured requests session."""
session = requests.Session()
# Set headers
session.headers.update({
"User-Agent": self.config.user_agent,
"Accept": "application/json",
"Content-Type": "application/json"
})
if self.config.headers:
session.headers.update(self.config.headers)
# Set authentication
self._setup_authentication(session)
# Set proxy
if self.config.proxy:
session.proxies = {
"http": self.config.proxy,
"https": self.config.proxy
}
# SSL verification
session.verify = self.config.verify_ssl
return session
def _setup_logging(self) -> logging.Logger:
"""Setup logging for API client."""
logger = logging.getLogger(self.__class__.__name__)
if self.config.enable_logging:
logger.setLevel(getattr(logging, self.config.log_level))
if not logger.handlers:
handler = logging.StreamHandler()
formatter = logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
handler.setFormatter(formatter)
logger.addHandler(handler)
return logger
def _setup_authentication(self, session: requests.Session):
"""Setup authentication for session."""
auth_type = self.config.auth_type
creds = self.config.auth_credentials
if auth_type == "bearer":
token = creds.get("token")
if token:
session.headers["Authorization"] = f"Bearer {token}"
elif auth_type == "api_key":
key_name = creds.get("key_name", "X-API-Key")
key_value = creds.get("key_value")
if key_value:
session.headers[key_name] = key_value
elif auth_type == "basic":
username = creds.get("username")
password = creds.get("password")
if username and password:
session.auth = (username, password)
elif auth_type == "oauth2":
# OAuth2 is more complex, simplified here
access_token = creds.get("access_token")
if access_token:
session.headers["Authorization"] = f"Bearer {access_token}"
def _apply_rate_limiting(self):
"""Apply rate limiting if configured."""
if not self.config.rate_limit:
return
current_time = time.time()
time_since_last = current_time - self.last_request_time
if time_since_last < self.config.rate_limit_window:
self.request_count += 1
if self.request_count > self.config.rate_limit:
sleep_time = self.config.rate_limit_window - time_since_last
self.logger.debug(f"Rate limit reached, sleeping {sleep_time:.2f}s")
time.sleep(sleep_time)
self.request_count = 1
else:
self.request_count = 1
self.last_request_time = time.time()
def _build_url(self, endpoint: str, path_params: Optional[Dict] = None) -> str:
"""Build complete URL with path parameters."""
# Format path parameters
if path_params:
for key, value in path_params.items():
endpoint = endpoint.replace(f"{{{key}}}", str(value))
# Join with base URL
return urljoin(self.config.base_url, endpoint)
def _log_request(self, method: str, url: str, **kwargs):
"""Log outgoing request."""
if self.config.log_requests:
self.logger.info(f"ā {method} {url}")
if kwargs.get("params"):
self.logger.debug(f" Query params: {kwargs['params']}")
if kwargs.get("json"):
self.logger.debug(f" Body: {json.dumps(kwargs['json'], indent=2)}")
def _log_response(self, response: APIResponse):
"""Log incoming response."""
if self.config.log_responses:
status_emoji = "ā
" if response.is_success else "ā"
self.logger.info(
f"ā {status_emoji} {response.status_code} "
f"({response.elapsed_time:.2f}s)"
)
if response.json:
self.logger.debug(f" Response: {json.dumps(response.json, indent=2)}")
@retry(
stop=stop_after_attempt(3),
wait=wait_exponential(multiplier=1, min=1, max=10)
)
def _make_request(self, method: HTTPMethod, endpoint: str,
path_params: Optional[Dict] = None,
query_params: Optional[Dict] = None,
body: Optional[Union[Dict, str]] = None,
headers: Optional[Dict] = None,
files: Optional[Dict] = None) -> APIResponse:
"""
Make HTTP request with retries and error handling.
"""
# Apply rate limiting
self._apply_rate_limiting()
# Build URL
url = self._build_url(endpoint, path_params)
# Prepare request
request_kwargs = {
"timeout": self.config.timeout,
"params": query_params
}
if headers:
request_kwargs["headers"] = {**self.session.headers, **headers}
if files:
request_kwargs["files"] = files
elif body:
if isinstance(body, dict):
request_kwargs["json"] = body
else:
request_kwargs["data"] = body
# Log request
self._log_request(method.value, url, **request_kwargs)
# Make request
start_time = time.time()
try:
response = self.session.request(
method.value,
url,
**request_kwargs
)
elapsed_time = time.time() - start_time
# Parse response
try:
response_body = response.json()
except:
response_body = response.text
api_response = APIResponse(
status_code=response.status_code,
headers=dict(response.headers),
body=response_body,
elapsed_time=elapsed_time,
request_url=url,
request_method=method.value
)
# Log response
self._log_response(api_response)
# Update metrics
self.metrics["total_requests"] += 1
self.metrics["total_time"] += elapsed_time
if api_response.is_success:
self.metrics["successful_requests"] += 1
else:
self.metrics["failed_requests"] += 1
# Raise for status if error
response.raise_for_status()
return api_response
except requests.exceptions.RequestException as e:
self.metrics["failed_requests"] += 1
self.metrics["errors"].append(str(e))
self.logger.error(f"Request failed: {e}")
raise
# -------------------- HTTP Methods --------------------
def get(self, endpoint: str, **kwargs) -> APIResponse:
"""Make GET request."""
return self._make_request(HTTPMethod.GET, endpoint, **kwargs)
def post(self, endpoint: str, **kwargs) -> APIResponse:
"""Make POST request."""
return self._make_request(HTTPMethod.POST, endpoint, **kwargs)
def put(self, endpoint: str, **kwargs) -> APIResponse:
"""Make PUT request."""
return self._make_request(HTTPMethod.PUT, endpoint, **kwargs)
def patch(self, endpoint: str, **kwargs) -> APIResponse:
"""Make PATCH request."""
return self._make_request(HTTPMethod.PATCH, endpoint, **kwargs)
def delete(self, endpoint: str, **kwargs) -> APIResponse:
"""Make DELETE request."""
return self._make_request(HTTPMethod.DELETE, endpoint, **kwargs)
# ==================== Pagination Handler ====================
class PaginationHandler:
"""
Handle various pagination strategies.
"""
def __init__(self, client: BaseAPIClient):
self.client = client
self.logger = logging.getLogger(__name__)
def paginate_offset(self, endpoint: str,
offset_param: str = "offset",
limit_param: str = "limit",
data_key: str = "data",
**kwargs) -> List[Any]:
"""
Handle offset-based pagination.
Example: ?offset=0&limit=100
"""
all_data = []
offset = 0
limit = self.client.config.page_size
page_count = 0
while True:
# Check max pages
if self.client.config.max_pages and page_count >= self.client.config.max_pages:
break
# Set pagination params
params = kwargs.get("query_params", {})
params[offset_param] = offset
params[limit_param] = limit
kwargs["query_params"] = params
# Make request
response = self.client.get(endpoint, **kwargs)
if not response.is_success:
break
# Extract data
data = response.json
if data_key:
page_data = data.get(data_key, [])
else:
page_data = data if isinstance(data, list) else [data]
if not page_data:
break
all_data.extend(page_data)
# Update offset
offset += limit
page_count += 1
# Check if last page
if len(page_data) < limit:
break
self.logger.debug(f"Fetched page {page_count}, total items: {len(all_data)}")
return all_data
def paginate_page_number(self, endpoint: str,
page_param: str = "page",
size_param: str = "page_size",
data_key: str = "data",
total_pages_key: str = "total_pages",
**kwargs) -> List[Any]:
"""
Handle page number-based pagination.
Example: ?page=1&page_size=100
"""
all_data = []
page = 1
page_size = self.client.config.page_size
total_pages = None
while True:
# Check max pages
if self.client.config.max_pages and page > self.client.config.max_pages:
break
# Check total pages if known
if total_pages and page > total_pages:
break
# Set pagination params
params = kwargs.get("query_params", {})
params[page_param] = page
params[size_param] = page_size
kwargs["query_params"] = params
# Make request
response = self.client.get(endpoint, **kwargs)
if not response.is_success:
break
# Extract data
data = response.json
if data_key:
page_data = data.get(data_key, [])
else:
page_data = data if isinstance(data, list) else [data]
# Get total pages if available
if total_pages_key in data:
total_pages = data[total_pages_key]
if not page_data:
break
all_data.extend(page_data)
page += 1
self.logger.debug(f"Fetched page {page-1}, total items: {len(all_data)}")
return all_data
def paginate_cursor(self, endpoint: str,
cursor_param: str = "cursor",
next_cursor_key: str = "next_cursor",
data_key: str = "data",
**kwargs) -> List[Any]:
"""
Handle cursor-based pagination.
Example: ?cursor=eyJpZCI6MTAwfQ==
"""
all_data = []
cursor = None
page_count = 0
while True:
# Check max pages
if self.client.config.max_pages and page_count >= self.client.config.max_pages:
break
# Set cursor param
params = kwargs.get("query_params", {})
if cursor:
params[cursor_param] = cursor
kwargs["query_params"] = params
# Make request
response = self.client.get(endpoint, **kwargs)
if not response.is_success:
break
# Extract data
data = response.json
if data_key:
page_data = data.get(data_key, [])
else:
page_data = data if isinstance(data, list) else [data]
if not page_data:
break
all_data.extend(page_data)
# Get next cursor
cursor = data.get(next_cursor_key)
if not cursor:
break
page_count += 1
self.logger.debug(f"Fetched page {page_count}, total items: {len(all_data)}")
return all_data
def paginate_link_header(self, endpoint: str,
data_key: Optional[str] = None,
**kwargs) -> List[Any]:
"""
Handle Link header-based pagination (GitHub style).
Example: Link: ; rel="next"
"""
all_data = []
url = self.client._build_url(endpoint)
page_count = 0
while url:
# Check max pages
if self.client.config.max_pages and page_count >= self.client.config.max_pages:
break
# Make request directly with URL
response = self.client.get(url, **kwargs)
if not response.is_success:
break
# Extract data
data = response.json
if data_key:
page_data = data.get(data_key, [])
else:
page_data = data if isinstance(data, list) else [data]
if page_data:
all_data.extend(page_data)
# Parse Link header for next URL
link_header = response.headers.get("Link", "")
url = self._parse_link_header(link_header, "next")
page_count += 1
self.logger.debug(f"Fetched page {page_count}, total items: {len(all_data)}")
return all_data
def _parse_link_header(self, link_header: str, rel: str) -> Optional[str]:
"""Parse Link header for specific rel."""
if not link_header:
return None
links = link_header.split(",")
for link in links:
parts = link.split(";")
if len(parts) == 2:
url = parts[0].strip()[1:-1] # Remove < >
rel_part = parts[1].strip()
if f'rel="{rel}"' in rel_part:
return url
return None
# ==================== Advanced API Client ====================
class AdvancedAPIClient(BaseAPIClient):
"""
Advanced API client with additional features.
"""
def __init__(self, config: APIConfig):
super().__init__(config)
self.pagination = PaginationHandler(self)
def batch_request(self, requests: List[Dict]) -> List[APIResponse]:
"""
Execute multiple requests in batch.
Args:
requests: List of request dictionaries with method, endpoint, etc.
"""
responses = []
for request in requests:
method = HTTPMethod(request.get("method", "GET"))
endpoint = request["endpoint"]
response = self._make_request(
method,
endpoint,
path_params=request.get("path_params"),
query_params=request.get("query_params"),
body=request.get("body"),
headers=request.get("headers")
)
responses.append(response)
return responses
def upload_file(self, endpoint: str, file_path: str,
field_name: str = "file",
additional_data: Optional[Dict] = None) -> APIResponse:
"""Upload file to API endpoint."""
with open(file_path, "rb") as f:
files = {field_name: f}
return self.post(
endpoint,
files=files,
body=additional_data
)
def download_file(self, endpoint: str, output_path: str, **kwargs) -> bool:
"""Download file from API endpoint."""
response = self.get(endpoint, **kwargs)
if response.is_success:
# Check if response is file
content_type = response.headers.get("Content-Type", "")
if "application/json" not in content_type:
# Save as file
with open(output_path, "wb") as f:
if isinstance(response.body, bytes):
f.write(response.body)
else:
f.write(response.body.encode())
self.logger.info(f"File downloaded to {output_path}")
return True
return False
def stream_response(self, endpoint: str, chunk_size: int = 1024, **kwargs):
"""Stream response for large data."""
url = self._build_url(endpoint)
with self.session.get(url, stream=True, **kwargs) as response:
response.raise_for_status()
for chunk in response.iter_content(chunk_size=chunk_size):
if chunk:
yield chunk
# ==================== Async API Client ====================
class AsyncAPIClient:
"""
Asynchronous API client for high-performance operations.
"""
def __init__(self, config: APIConfig):
self.config = config
self.logger = logging.getLogger(__name__)
async def get(self, endpoint: str, **kwargs) -> Dict:
"""Async GET request."""
async with aiohttp.ClientSession() as session:
url = urljoin(self.config.base_url, endpoint)
async with session.get(
url,
headers=self.config.headers,
timeout=aiohttp.ClientTimeout(total=self.config.timeout),
**kwargs
) as response:
return await response.json()
async def post(self, endpoint: str, data: Dict, **kwargs) -> Dict:
"""Async POST request."""
async with aiohttp.ClientSession() as session:
url = urljoin(self.config.base_url, endpoint)
async with session.post(
url,
json=data,
headers=self.config.headers,
timeout=aiohttp.ClientTimeout(total=self.config.timeout),
**kwargs
) as response:
return await response.json()
async def batch_requests(self, requests: List[Dict]) -> List[Dict]:
"""Execute multiple requests concurrently."""
tasks = []
async with aiohttp.ClientSession() as session:
for request in requests:
method = request.get("method", "GET").lower()
endpoint = request["endpoint"]
url = urljoin(self.config.base_url, endpoint)
if method == "get":
task = session.get(url, **request.get("kwargs", {}))
elif method == "post":
task = session.post(url, json=request.get("data"), **request.get("kwargs", {}))
else:
continue
tasks.append(task)
responses = await asyncio.gather(*tasks)
results = []
for response in responses:
results.append(await response.json())
return results
# ==================== API Examples ====================
class GitHubAPIClient(AdvancedAPIClient):
"""Example: GitHub API client."""
def __init__(self, token: str):
config = APIConfig(
base_url="https://api.github.com",
auth_type="bearer",
auth_credentials={"token": token},
headers={"Accept": "application/vnd.github.v3+json"}
)
super().__init__(config)
def get_user(self, username: str) -> APIResponse:
"""Get user information."""
return self.get(f"/users/{username}")
def get_user_repos(self, username: str) -> List[Dict]:
"""Get all user repositories with pagination."""
return self.pagination.paginate_page_number(
f"/users/{username}/repos",
page_param="page",
size_param="per_page",
data_key=None # Response is directly a list
)
def create_repo(self, name: str, description: str = "", private: bool = False) -> APIResponse:
"""Create a new repository."""
return self.post("/user/repos", body={
"name": name,
"description": description,
"private": private
})
class JSONPlaceholderClient(AdvancedAPIClient):
"""Example: JSONPlaceholder API client."""
def __init__(self):
config = APIConfig(
base_url="https://jsonplaceholder.typicode.com"
)
super().__init__(config)
def get_posts(self) -> List[Dict]:
"""Get all posts."""
response = self.get("/posts")
return response.json if response.is_success else []
def get_post(self, post_id: int) -> Optional[Dict]:
"""Get single post."""
response = self.get(f"/posts/{post_id}")
return response.json if response.is_success else None
def create_post(self, title: str, body: str, user_id: int) -> APIResponse:
"""Create new post."""
return self.post("/posts", body={
"title": title,
"body": body,
"userId": user_id
})
def update_post(self, post_id: int, title: str, body: str, user_id: int) -> APIResponse:
"""Update existing post."""
return self.put(f"/posts/{post_id}", body={
"id": post_id,
"title": title,
"body": body,
"userId": user_id
})
def delete_post(self, post_id: int) -> APIResponse:
"""Delete post."""
return self.delete(f"/posts/{post_id}")
# Example usage
if __name__ == "__main__":
print("š RESTful API Consumption Examples\n")
# Example 1: Basic API setup
print("1ļøā£ Basic API Configuration:")
config = APIConfig(
base_url="https://api.example.com",
timeout=30,
auth_type="bearer",
auth_credentials={"token": "your-api-token"},
rate_limit=10, # 10 requests per second
enable_logging=True
)
print(f" Base URL: {config.base_url}")
print(f" Auth Type: {config.auth_type}")
print(f" Rate Limit: {config.rate_limit} req/s")
print(f" Timeout: {config.timeout}s")
# Example 2: HTTP methods
print("\n2ļøā£ RESTful HTTP Methods:")
methods = [
("GET", "Retrieve resource(s)", "/users/123"),
("POST", "Create new resource", "/users"),
("PUT", "Update entire resource", "/users/123"),
("PATCH", "Partial update", "/users/123"),
("DELETE", "Remove resource", "/users/123")
]
for method, description, example in methods:
print(f" {method}: {description}")
print(f" Example: {method} {example}")
# Example 3: Common status codes
print("\n3ļøā£ Common HTTP Status Codes:")
status_codes = [
(200, "OK", "Request successful"),
(201, "Created", "Resource created"),
(204, "No Content", "Success, no body"),
(400, "Bad Request", "Invalid request"),
(401, "Unauthorized", "Authentication required"),
(403, "Forbidden", "Access denied"),
(404, "Not Found", "Resource doesn't exist"),
(429, "Too Many Requests", "Rate limit exceeded"),
(500, "Server Error", "Internal error")
]
for code, name, description in status_codes:
print(f" {code} {name}: {description}")
# Example 4: JSONPlaceholder demo
print("\n4ļøā£ JSONPlaceholder API Demo:")
client = JSONPlaceholderClient()
# Get posts
print(" Fetching posts...")
posts = client.get_posts()
print(f" Found {len(posts)} posts")
if posts:
first_post = posts[0]
print(f" First post: '{first_post['title'][:50]}...'")
# Example 5: Pagination strategies
print("\n5ļøā£ Pagination Strategies:")
strategies = [
("Offset-based", "?offset=20&limit=10"),
("Page number", "?page=3&page_size=10"),
("Cursor-based", "?cursor=eyJpZCI6MTAwfQ=="),
("Link header", "Link: ; rel='next'"),
("Keyset", "?after_id=100&limit=10")
]
for strategy, example in strategies:
print(f" {strategy}:")
print(f" Example: {example}")
# Example 6: Authentication methods
print("\n6ļøā£ API Authentication Methods:")
auth_methods = [
("API Key", "X-API-Key: your-api-key"),
("Bearer Token", "Authorization: Bearer token"),
("Basic Auth", "Authorization: Basic base64(user:pass)"),
("OAuth 2.0", "Complex flow with tokens"),
("JWT", "JSON Web Tokens"),
("HMAC", "Request signing with secret")
]
for method, example in auth_methods:
print(f" {method}:")
print(f" {example}")
# Example 7: Error handling
print("\n7ļøā£ Error Handling Patterns:")
patterns = [
"Retry with exponential backoff",
"Circuit breaker for failing endpoints",
"Fallback to cached data",
"Graceful degradation",
"Detailed error logging",
"User-friendly error messages"
]
for pattern in patterns:
print(f" ⢠{pattern}")
# Example 8: Performance optimization
print("\n8ļøā£ Performance Optimization:")
optimizations = [
"Connection pooling with sessions",
"Async/concurrent requests",
"Response caching",
"Request batching",
"Compression (gzip)",
"Selective field retrieval"
]
for optimization in optimizations:
print(f" ⢠{optimization}")
# Example 9: Best practices
print("\n9ļøā£ API Consumption Best Practices:")
practices = [
"ā
Always handle errors gracefully",
"ā±ļø Set appropriate timeouts",
"š Implement retry logic",
"š Monitor API metrics",
"š Secure API credentials",
"š Log requests for debugging",
"š¾ Cache responses when possible",
"šÆ Use specific field selection",
"š¦ Batch requests when available",
"š Follow API documentation"
]
for practice in practices:
print(f" {practice}")
# Example 10: Async example
print("\nš Async API Client Example:")
print(" async def fetch_multiple_apis():")
print(" tasks = [")
print(" client.get('/endpoint1'),")
print(" client.get('/endpoint2'),")
print(" client.get('/endpoint3')")
print(" ]")
print(" results = await asyncio.gather(*tasks)")
print(" return results")
print("\nā
RESTful API consumption demonstration complete!")
Key Takeaways and Best Practices šÆ
- Use Session Objects: Reuse connections for better performance.
- Implement Retry Logic: Handle transient failures gracefully.
- Respect Rate Limits: Implement proper rate limiting to avoid being blocked.
- Handle Pagination: Support multiple pagination strategies.
- Log Everything: Detailed logging helps debugging API issues.
- Use Async for Performance: Concurrent requests for better throughput.
- Cache Responses: Reduce API calls with intelligent caching.
- Validate Responses: Always check status codes and response structure.
RESTful API Best Practices š
Mastering RESTful API consumption opens up a world of possibilities for integration and automation. You can now connect to any web service, aggregate data from multiple sources, and build powerful integrations. Whether you're building microservices, automating workflows, or creating data pipelines, these API skills are your passport to the connected world! š
Pro Tip: Think of API consumption as having a conversation with a remote service - you need to speak the same language, follow the protocol, and handle misunderstandings gracefully. Always use session objects to reuse connections - creating new connections for each request is like hanging up and calling back for every sentence. Implement exponential backoff for retries - if the API doesn't respond, don't keep knocking frantically. Respect rate limits like speed limits on a highway - going too fast gets you blocked. Handle pagination properly - APIs give you data in pages for a reason. Log requests and responses for debugging, but be careful with sensitive data. Use async operations when making multiple API calls - why wait in line when you can do things in parallel? Cache responses intelligently to reduce API calls and improve performance. Most importantly: read the API documentation thoroughly - every API has its quirks and special requirements!