Webhooks are the push notifications of the API world - instead of constantly polling for updates, webhooks deliver events directly to you in real-time. Like having a personal assistant who taps you on the shoulder whenever something important happens, webhooks enable event-driven architectures that are efficient, scalable, and responsive. Master webhook implementation to build real-time integrations that react instantly to changes! ā”
The Webhook Architecture
Think of webhooks as a subscription service for events - you tell an API where to send notifications, and it pushes updates to you as they happen. This reversal of the traditional request-response pattern requires careful implementation of receivers, validators, processors, and reliability mechanisms. Understanding these patterns is crucial for building robust event-driven systems!
graph TB
A[Webhooks] --> B[Webhook Server]
A --> C[Event Processing]
A --> D[Security]
A --> E[Reliability]
B --> F[HTTP Endpoints]
B --> G[Request Handling]
B --> H[Response Codes]
B --> I[Async Processing]
C --> J[Event Parsing]
C --> K[Validation]
C --> L[Routing]
C --> M[Actions]
D --> N[Signature Verification]
D --> O[IP Whitelisting]
D --> P[Token Auth]
D --> Q[HTTPS/TLS]
E --> R[Retry Logic]
E --> S[Idempotency]
E --> T[Dead Letter Queue]
E --> U[Circuit Breaker]
V[Monitoring] --> W[Metrics]
V --> X[Logging]
V --> Y[Alerting]
V --> Z[Debugging]
style A fill:#ff6b6b
style B fill:#51cf66
style C fill:#339af0
style D fill:#ffd43b
style E fill:#ff6b6b
style V fill:#51cf66
Real-World Scenario: The Event Processing Hub šÆ
You're building a webhook processing system that handles events from multiple sources - payment notifications from Stripe, repository events from GitHub, messaging from Slack, and order updates from e-commerce platforms. Your system must validate signatures, handle retries, process events asynchronously, maintain delivery guarantees, scale to thousands of events per second, and provide detailed monitoring. Let's build a production-ready webhook framework!
# First, install required packages:
# pip install flask fastapi uvicorn celery redis pydantic cryptography ngrok
import os
import json
import hmac
import hashlib
import time
import logging
import asyncio
from typing import Dict, List, Optional, Any, Callable, Union
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from enum import Enum
import threading
from queue import Queue, Empty
import uuid
from functools import wraps
from flask import Flask, request, jsonify, abort
from fastapi import FastAPI, Request, HTTPException, BackgroundTasks, Depends
from fastapi.responses import JSONResponse
from pydantic import BaseModel, Field, validator
import uvicorn
from celery import Celery
import redis
import requests
# ==================== Webhook Event Models ====================
class EventType(Enum):
"""Common webhook event types."""
CREATED = "created"
UPDATED = "updated"
DELETED = "deleted"
PAYMENT_RECEIVED = "payment.received"
PAYMENT_FAILED = "payment.failed"
USER_SIGNUP = "user.signup"
ORDER_PLACED = "order.placed"
MESSAGE_RECEIVED = "message.received"
STATUS_CHANGED = "status.changed"
CUSTOM = "custom"
class EventStatus(Enum):
"""Event processing status."""
PENDING = "pending"
PROCESSING = "processing"
PROCESSED = "processed"
FAILED = "failed"
RETRYING = "retrying"
DEAD_LETTER = "dead_letter"
@dataclass
class WebhookEvent:
"""Webhook event data structure."""
id: str
source: str
type: EventType
payload: Dict[str, Any]
timestamp: datetime
signature: Optional[str] = None
headers: Dict[str, str] = field(default_factory=dict)
status: EventStatus = EventStatus.PENDING
attempts: int = 0
max_attempts: int = 3
metadata: Dict[str, Any] = field(default_factory=dict)
# ==================== Webhook Security ====================
class WebhookSecurity:
"""
Security utilities for webhook validation.
"""
def __init__(self):
self.logger = logging.getLogger(__name__)
self.secrets = {} # Store webhook secrets by source
def register_secret(self, source: str, secret: str):
"""Register webhook secret for a source."""
self.secrets[source] = secret
self.logger.info(f"Registered webhook secret for {source}")
def verify_signature(self, source: str, payload: bytes,
signature: str, algorithm: str = "sha256") -> bool:
"""
Verify webhook signature.
Common patterns:
- GitHub: sha256=hash
- Stripe: base64(hmac-sha256)
- Slack: v0=hash
"""
if source not in self.secrets:
self.logger.warning(f"No secret registered for {source}")
return False
secret = self.secrets[source]
try:
# GitHub style: sha256=hash
if signature.startswith("sha256=") or signature.startswith("sha1="):
algorithm = signature.split("=")[0]
expected_signature = signature.split("=")[1]
mac = hmac.new(
secret.encode(),
payload,
getattr(hashlib, algorithm)
)
return hmac.compare_digest(mac.hexdigest(), expected_signature)
# Stripe style: direct comparison
elif source == "stripe":
import base64
mac = hmac.new(
secret.encode(),
payload,
hashlib.sha256
)
expected = base64.b64encode(mac.digest()).decode()
return hmac.compare_digest(expected, signature)
# Slack style: v0=hash
elif signature.startswith("v0="):
timestamp = self._extract_timestamp(payload)
sig_basestring = f"v0:{timestamp}:{payload.decode()}"
mac = hmac.new(
secret.encode(),
sig_basestring.encode(),
hashlib.sha256
)
expected = f"v0={mac.hexdigest()}"
return hmac.compare_digest(expected, signature)
# Generic HMAC
else:
mac = hmac.new(
secret.encode(),
payload,
getattr(hashlib, algorithm)
)
return hmac.compare_digest(mac.hexdigest(), signature)
except Exception as e:
self.logger.error(f"Signature verification failed: {e}")
return False
def _extract_timestamp(self, payload: bytes) -> str:
"""Extract timestamp from payload for Slack verification."""
try:
data = json.loads(payload)
return str(data.get("timestamp", ""))
except:
return ""
def verify_ip_whitelist(self, ip: str, whitelist: List[str]) -> bool:
"""Verify request IP against whitelist."""
import ipaddress
try:
request_ip = ipaddress.ip_address(ip)
for allowed in whitelist:
# Check if CIDR notation
if "/" in allowed:
network = ipaddress.ip_network(allowed)
if request_ip in network:
return True
else:
if str(request_ip) == allowed:
return True
return False
except Exception as e:
self.logger.error(f"IP verification failed: {e}")
return False
def generate_webhook_secret(self, length: int = 32) -> str:
"""Generate a secure webhook secret."""
import secrets
return secrets.token_hex(length)
# ==================== Flask Webhook Server ====================
def create_flask_webhook_server(security: WebhookSecurity):
"""Create Flask webhook server."""
app = Flask(__name__)
app.config['SECRET_KEY'] = os.urandom(32)
# Event queue
event_queue = Queue()
@app.route('/webhook/', methods=['POST'])
def webhook_endpoint(source):
"""Generic webhook endpoint."""
try:
# Get request data
payload = request.get_data()
headers = dict(request.headers)
# Verify signature if present
signature_header = headers.get('X-Hub-Signature-256') or \
headers.get('X-Signature') or \
headers.get('Stripe-Signature')
if signature_header:
if not security.verify_signature(source, payload, signature_header):
app.logger.warning(f"Invalid signature from {source}")
abort(401, "Invalid signature")
# Parse event
try:
data = request.get_json()
except:
data = {"raw": payload.decode()}
# Create event
event = WebhookEvent(
id=str(uuid.uuid4()),
source=source,
type=EventType.CUSTOM,
payload=data,
timestamp=datetime.now(),
signature=signature_header,
headers=headers
)
# Queue for processing
event_queue.put(event)
app.logger.info(f"Received webhook from {source}: {event.id}")
# Return success immediately
return jsonify({"status": "accepted", "event_id": event.id}), 200
except Exception as e:
app.logger.error(f"Webhook processing error: {e}")
return jsonify({"error": str(e)}), 500
@app.route('/webhook/github', methods=['POST'])
def github_webhook():
"""GitHub-specific webhook endpoint."""
event_type = request.headers.get('X-GitHub-Event')
signature = request.headers.get('X-Hub-Signature-256')
if not signature:
abort(401, "No signature provided")
payload = request.get_data()
if not security.verify_signature('github', payload, signature):
abort(401, "Invalid signature")
data = request.get_json()
# Process based on event type
if event_type == 'push':
# Handle push event
commits = data.get('commits', [])
app.logger.info(f"GitHub push with {len(commits)} commits")
elif event_type == 'pull_request':
# Handle PR event
action = data.get('action')
app.logger.info(f"GitHub PR {action}")
return jsonify({"status": "processed"}), 200
@app.route('/webhook/stripe', methods=['POST'])
def stripe_webhook():
"""Stripe-specific webhook endpoint."""
signature = request.headers.get('Stripe-Signature')
if not signature:
abort(401, "No signature provided")
payload = request.get_data()
# Stripe signature verification
import stripe
try:
event = stripe.Webhook.construct_event(
payload, signature, security.secrets.get('stripe')
)
except ValueError:
abort(400, "Invalid payload")
except stripe.error.SignatureVerificationError:
abort(401, "Invalid signature")
# Process Stripe event
if event['type'] == 'payment_intent.succeeded':
payment_intent = event['data']['object']
app.logger.info(f"Payment succeeded: {payment_intent['id']}")
elif event['type'] == 'customer.subscription.deleted':
subscription = event['data']['object']
app.logger.info(f"Subscription cancelled: {subscription['id']}")
return jsonify({"status": "processed"}), 200
@app.route('/health', methods=['GET'])
def health_check():
"""Health check endpoint."""
return jsonify({"status": "healthy", "queue_size": event_queue.qsize()}), 200
return app
# ==================== FastAPI Webhook Server ====================
def create_fastapi_webhook_server(security: WebhookSecurity):
"""Create FastAPI webhook server."""
app = FastAPI(title="Webhook Server")
# Event store
event_store = []
class WebhookPayload(BaseModel):
"""Generic webhook payload."""
event_type: str
data: Dict[str, Any]
timestamp: Optional[datetime] = Field(default_factory=datetime.now)
@app.post("/webhook/{source}")
async def webhook_endpoint(
source: str,
request: Request,
background_tasks: BackgroundTasks
):
"""Generic webhook endpoint."""
# Get request data
body = await request.body()
headers = dict(request.headers)
# Verify signature
signature = headers.get('x-signature')
if signature and not security.verify_signature(source, body, signature):
raise HTTPException(status_code=401, detail="Invalid signature")
# Parse payload
try:
payload = await request.json()
except:
payload = {"raw": body.decode()}
# Create event
event = WebhookEvent(
id=str(uuid.uuid4()),
source=source,
type=EventType.CUSTOM,
payload=payload,
timestamp=datetime.now(),
signature=signature,
headers=headers
)
# Store event
event_store.append(event)
# Process async in background
background_tasks.add_task(process_webhook_async, event)
return {"status": "accepted", "event_id": event.id}
async def process_webhook_async(event: WebhookEvent):
"""Process webhook asynchronously."""
# Simulate processing
await asyncio.sleep(0.1)
logging.info(f"Processed event {event.id} from {event.source}")
event.status = EventStatus.PROCESSED
@app.get("/events")
async def get_events(limit: int = 100):
"""Get recent events."""
return event_store[-limit:]
@app.get("/health")
async def health_check():
"""Health check endpoint."""
return {
"status": "healthy",
"events_processed": len([e for e in event_store if e.status == EventStatus.PROCESSED]),
"events_pending": len([e for e in event_store if e.status == EventStatus.PENDING])
}
return app
# ==================== Event Processor ====================
class WebhookProcessor:
"""
Process webhook events with retries and error handling.
"""
def __init__(self, redis_client: Optional[redis.Redis] = None):
self.handlers = {}
self.logger = logging.getLogger(__name__)
self.redis = redis_client
# Metrics
self.metrics = {
"processed": 0,
"failed": 0,
"retried": 0
}
def register_handler(self, source: str, event_type: EventType,
handler: Callable):
"""Register event handler."""
key = f"{source}:{event_type.value}"
self.handlers[key] = handler
self.logger.info(f"Registered handler for {key}")
def process_event(self, event: WebhookEvent) -> bool:
"""
Process webhook event.
Returns True if successful, False otherwise.
"""
try:
# Find handler
key = f"{event.source}:{event.type.value}"
handler = self.handlers.get(key)
if not handler:
# Try generic handler for source
handler = self.handlers.get(f"{event.source}:*")
if not handler:
self.logger.warning(f"No handler for {key}")
return False
# Update status
event.status = EventStatus.PROCESSING
# Execute handler
result = handler(event)
# Update status
event.status = EventStatus.PROCESSED
self.metrics["processed"] += 1
self.logger.info(f"Processed event {event.id}")
return True
except Exception as e:
self.logger.error(f"Event processing failed: {e}")
event.status = EventStatus.FAILED
event.attempts += 1
# Check if should retry
if event.attempts < event.max_attempts:
event.status = EventStatus.RETRYING
self.metrics["retried"] += 1
# Queue for retry with exponential backoff
retry_delay = 2 ** event.attempts
if self.redis:
# Use Redis for distributed retry queue
self._queue_retry(event, retry_delay)
else:
# Simple in-memory retry
threading.Timer(retry_delay, self.process_event, [event]).start()
else:
# Move to dead letter queue
event.status = EventStatus.DEAD_LETTER
self.metrics["failed"] += 1
self._handle_dead_letter(event)
return False
def _queue_retry(self, event: WebhookEvent, delay: int):
"""Queue event for retry in Redis."""
if not self.redis:
return
# Serialize event
event_data = {
"id": event.id,
"source": event.source,
"type": event.type.value,
"payload": event.payload,
"timestamp": event.timestamp.isoformat(),
"attempts": event.attempts
}
# Add to delayed queue
score = time.time() + delay
self.redis.zadd("webhook:retry_queue", {json.dumps(event_data): score})
self.logger.info(f"Queued event {event.id} for retry in {delay}s")
def _handle_dead_letter(self, event: WebhookEvent):
"""Handle events that failed all retries."""
self.logger.error(f"Event {event.id} moved to dead letter queue")
if self.redis:
# Store in dead letter queue
self.redis.lpush("webhook:dead_letter", json.dumps({
"id": event.id,
"source": event.source,
"type": event.type.value,
"payload": event.payload,
"timestamp": event.timestamp.isoformat(),
"attempts": event.attempts,
"failed_at": datetime.now().isoformat()
}))
# Could also send alert, store in database, etc.
def process_retry_queue(self):
"""Process events from retry queue."""
if not self.redis:
return
while True:
# Get events ready for retry
now = time.time()
events = self.redis.zrangebyscore("webhook:retry_queue", 0, now, start=0, num=10)
if not events:
time.sleep(1)
continue
for event_data in events:
# Remove from queue
self.redis.zrem("webhook:retry_queue", event_data)
# Deserialize and process
data = json.loads(event_data)
event = WebhookEvent(
id=data["id"],
source=data["source"],
type=EventType(data["type"]),
payload=data["payload"],
timestamp=datetime.fromisoformat(data["timestamp"]),
attempts=data["attempts"]
)
self.process_event(event)
# ==================== Webhook Client (Sender) ====================
class WebhookClient:
"""
Client for sending webhooks.
"""
def __init__(self, secret: Optional[str] = None):
self.secret = secret
self.logger = logging.getLogger(__name__)
self.session = requests.Session()
def send_webhook(self, url: str, event_type: str,
payload: Dict[str, Any],
max_retries: int = 3) -> bool:
"""Send webhook with retries."""
event = {
"id": str(uuid.uuid4()),
"type": event_type,
"timestamp": datetime.now().isoformat(),
"payload": payload
}
# Create signature if secret provided
headers = {"Content-Type": "application/json"}
if self.secret:
body = json.dumps(event).encode()
signature = hmac.new(
self.secret.encode(),
body,
hashlib.sha256
).hexdigest()
headers["X-Webhook-Signature"] = f"sha256={signature}"
# Send with retries
for attempt in range(max_retries):
try:
response = self.session.post(
url,
json=event,
headers=headers,
timeout=30
)
if response.status_code in [200, 201, 202, 204]:
self.logger.info(f"Webhook sent successfully to {url}")
return True
elif response.status_code >= 500:
# Server error, retry
self.logger.warning(f"Server error {response.status_code}, retrying...")
time.sleep(2 ** attempt)
else:
# Client error, don't retry
self.logger.error(f"Client error {response.status_code}: {response.text}")
return False
except requests.exceptions.RequestException as e:
self.logger.error(f"Request failed: {e}")
if attempt < max_retries - 1:
time.sleep(2 ** attempt)
continue
return False
return False
def register_webhook(self, api_url: str, webhook_url: str,
events: List[str]) -> bool:
"""Register webhook with an API."""
registration = {
"url": webhook_url,
"events": events,
"secret": self.secret
}
try:
response = self.session.post(
f"{api_url}/webhooks",
json=registration
)
if response.status_code in [200, 201]:
webhook_id = response.json().get("id")
self.logger.info(f"Webhook registered: {webhook_id}")
return True
except Exception as e:
self.logger.error(f"Registration failed: {e}")
return False
# ==================== Webhook Testing ====================
class WebhookTester:
"""
Utilities for testing webhooks.
"""
def __init__(self):
self.logger = logging.getLogger(__name__)
def create_test_payload(self, event_type: str) -> Dict[str, Any]:
"""Create test webhook payload."""
payloads = {
"payment.success": {
"amount": 1000,
"currency": "USD",
"customer_id": "cust_123",
"payment_method": "card"
},
"order.created": {
"order_id": "ord_456",
"total": 150.00,
"items": [
{"sku": "ITEM1", "quantity": 2, "price": 75.00}
]
},
"user.signup": {
"user_id": "usr_789",
"email": "test@example.com",
"plan": "premium"
}
}
return payloads.get(event_type, {"test": True})
def simulate_webhook(self, url: str, event_type: str,
secret: Optional[str] = None):
"""Simulate webhook for testing."""
client = WebhookClient(secret)
payload = self.create_test_payload(event_type)
return client.send_webhook(url, event_type, payload)
def verify_endpoint(self, url: str) -> bool:
"""Verify webhook endpoint is reachable."""
try:
response = requests.get(url, timeout=5)
return response.status_code < 500
except:
return False
# ==================== Webhook Monitoring ====================
class WebhookMonitor:
"""
Monitor webhook health and performance.
"""
def __init__(self):
self.metrics = defaultdict(lambda: {
"received": 0,
"processed": 0,
"failed": 0,
"avg_processing_time": 0,
"last_received": None,
"last_error": None
})
self.logger = logging.getLogger(__name__)
def record_received(self, source: str):
"""Record webhook received."""
self.metrics[source]["received"] += 1
self.metrics[source]["last_received"] = datetime.now()
def record_processed(self, source: str, processing_time: float):
"""Record successful processing."""
metrics = self.metrics[source]
metrics["processed"] += 1
# Update average processing time
avg = metrics["avg_processing_time"]
count = metrics["processed"]
metrics["avg_processing_time"] = (avg * (count - 1) + processing_time) / count
def record_failure(self, source: str, error: str):
"""Record processing failure."""
self.metrics[source]["failed"] += 1
self.metrics[source]["last_error"] = {
"error": error,
"timestamp": datetime.now()
}
def get_metrics(self, source: Optional[str] = None) -> Dict:
"""Get webhook metrics."""
if source:
return dict(self.metrics[source])
return dict(self.metrics)
def check_health(self, source: str,
max_failure_rate: float = 0.1,
max_silence_minutes: int = 60) -> Dict[str, bool]:
"""Check webhook health."""
metrics = self.metrics[source]
health = {
"is_healthy": True,
"receiving_events": True,
"processing_successfully": True
}
# Check failure rate
total = metrics["received"]
if total > 0:
failure_rate = metrics["failed"] / total
if failure_rate > max_failure_rate:
health["is_healthy"] = False
health["processing_successfully"] = False
# Check silence period
last_received = metrics["last_received"]
if last_received:
silence_duration = (datetime.now() - last_received).total_seconds() / 60
if silence_duration > max_silence_minutes:
health["is_healthy"] = False
health["receiving_events"] = False
return health
# Example usage
if __name__ == "__main__":
print("šŖ Webhook Implementation Examples\n")
# Example 1: Webhook flow
print("1ļøā£ Webhook Flow:")
flow_steps = [
"1. External service triggers event",
"2. Service sends HTTP POST to your webhook URL",
"3. Your server receives and validates request",
"4. Quick response (200 OK) to acknowledge",
"5. Async processing of event",
"6. Retry on failure with backoff"
]
for step in flow_steps:
print(f" {step}")
# Example 2: Security setup
print("\n2ļøā£ Webhook Security:")
security = WebhookSecurity()
# Generate secret
secret = security.generate_webhook_secret()
print(f" Generated secret: {secret[:20]}...")
# Register secrets
security.register_secret("github", "webhook_secret_123")
security.register_secret("stripe", "whsec_test_456")
print(" Security measures:")
print(" ā HMAC signature verification")
print(" ā IP whitelist validation")
print(" ā HTTPS/TLS encryption")
print(" ā Timestamp validation")
# Example 3: Flask server setup
print("\n3ļøā£ Flask Webhook Server:")
print(" Endpoints:")
print(" POST /webhook/ - Generic endpoint")
print(" POST /webhook/github - GitHub events")
print(" POST /webhook/stripe - Stripe events")
print(" GET /health - Health check")
# Example 4: Event processing
print("\n4ļøā£ Event Processing:")
processor = WebhookProcessor()
# Register handlers
def handle_payment(event: WebhookEvent):
print(f" Processing payment: {event.payload}")
def handle_user_signup(event: WebhookEvent):
print(f" Processing signup: {event.payload}")
processor.register_handler("stripe", EventType.PAYMENT_RECEIVED, handle_payment)
processor.register_handler("app", EventType.USER_SIGNUP, handle_user_signup)
print(" Registered handlers:")
print(" ⢠stripe:payment.received ā handle_payment")
print(" ⢠app:user.signup ā handle_user_signup")
# Example 5: Retry strategy
print("\n5ļøā£ Retry Strategy:")
retry_config = {
"max_attempts": 3,
"backoff": "exponential",
"delays": [1, 2, 4],
"dead_letter_after": 3
}
print(" Configuration:")
for key, value in retry_config.items():
print(f" {key}: {value}")
# Example 6: Common webhook providers
print("\n6ļøā£ Common Webhook Providers:")
providers = [
("GitHub", "Repository events", "X-Hub-Signature-256"),
("Stripe", "Payment events", "Stripe-Signature"),
("Slack", "Workspace events", "X-Slack-Signature"),
("Twilio", "SMS/Call events", "X-Twilio-Signature"),
("SendGrid", "Email events", "X-Twilio-Email-Event-Webhook-Signature"),
("Shopify", "Store events", "X-Shopify-Hmac-Sha256")
]
for provider, description, header in providers:
print(f" {provider}:")
print(f" Events: {description}")
print(f" Signature: {header}")
# Example 7: Testing webhooks
print("\n7ļøā£ Testing Webhooks:")
tester = WebhookTester()
print(" Using ngrok for local testing:")
print(" 1. Install: pip install pyngrok")
print(" 2. Run: ngrok http 5000")
print(" 3. Use URL: https://abc123.ngrok.io/webhook")
print("\n Test tools:")
print(" ⢠RequestBin - Inspect webhooks")
print(" ⢠Webhook.site - Test endpoints")
print(" ⢠Postman - Send test webhooks")
# Example 8: Monitoring
print("\n8ļøā£ Webhook Monitoring:")
monitor = WebhookMonitor()
# Simulate metrics
monitor.record_received("github")
monitor.record_processed("github", 0.5)
monitor.record_received("stripe")
monitor.record_failure("stripe", "Signature validation failed")
metrics = monitor.get_metrics()
print(" Sample metrics:")
for source, data in metrics.items():
print(f" {source}:")
print(f" Received: {data['received']}")
print(f" Processed: {data['processed']}")
print(f" Failed: {data['failed']}")
# Example 9: Best practices
print("\n9ļøā£ Webhook Best Practices:")
practices = [
"š Always verify signatures",
"ā” Respond quickly (< 3s)",
"š Process asynchronously",
"š¾ Store events before processing",
"š Implement idempotency",
"š Log everything",
"šÆ Use specific endpoints",
"š”ļø Validate payloads",
"š Monitor performance",
"š Alert on failures"
]
for practice in practices:
print(f" {practice}")
# Example 10: Common pitfalls
print("\nš Common Pitfalls:")
pitfalls = [
("Timeout", "Processing takes too long", "Process async"),
("No retry", "Single failure loses event", "Implement retries"),
("No signature", "Security vulnerability", "Verify signatures"),
("Memory issues", "Storing all events", "Use queues/database"),
("Order dependency", "Events out of order", "Design for any order"),
("Duplicate events", "Processing twice", "Implement idempotency")
]
for issue, problem, solution in pitfalls:
print(f" {issue}:")
print(f" Problem: {problem}")
print(f" Solution: {solution}")
print("\nā Webhook implementation demonstration complete!")
Pro Tip: Think of webhooks as phone calls from APIs - you need to answer quickly, verify who's calling, and handle the message properly. Always verify signatures first - accepting unsigned webhooks is like answering calls from unknown numbers without checking. Respond immediately with a 200 OK and process asynchronously - holding up the response is like keeping someone on hold. Implement proper retry logic with exponential backoff - if processing fails, you want another chance. Design for idempotency - webhooks might be sent multiple times, so processing should be safe to repeat. Use message queues (Redis, RabbitMQ, SQS) for reliable processing at scale. Monitor everything: response times, success rates, and error patterns. Test locally with ngrok before deploying. Store raw events before processing for debugging and replay. Most importantly: webhooks are about real-time event processing - design your system to handle events as they come, in any order, at any scale!
Mastering webhook implementation enables you to build real-time, event-driven systems that react instantly to changes. You can now receive events from any service, process them reliably, handle failures gracefully, and scale to thousands of events per second. Whether you're building payment systems, CI/CD pipelines, or real-time notifications, these webhook skills power your event-driven architecture! š
Pro Tip: Think of webhooks as phone calls from APIs - you need to answer quickly, verify who's calling, and handle the message properly. Always verify signatures first - accepting unsigned webhooks is like answering calls from unknown numbers without checking. Respond immediately with a 200 OK and process asynchronously - holding up the response is like keeping someone on hold. Implement proper retry logic with exponential backoff - if processing fails, you want another chance. Design for idempotency - webhooks might be sent multiple times, so processing should be safe to repeat. Use message queues (Redis, RabbitMQ, SQS) for reliable processing at scale. Monitor everything: response times, success rates, and error patterns. Test locally with ngrok before deploying. Store raw events before processing for debugging and replay. Most importantly: webhooks are about real-time event processing - design your system to handle events as they come, in any order, at any scale!