Skip to main content

šŸŖ Webhook Implementation: Real-time Event Processing

Webhooks are the push notifications of the API world - instead of constantly polling for updates, webhooks deliver events directly to you in real-time. Like having a personal assistant who taps you on the shoulder whenever something important happens, webhooks enable event-driven architectures that are efficient, scalable, and responsive. Master webhook implementation to build real-time integrations that react instantly to changes! ⚔

The Webhook Architecture

Think of webhooks as a subscription service for events - you tell an API where to send notifications, and it pushes updates to you as they happen. This reversal of the traditional request-response pattern requires careful implementation of receivers, validators, processors, and reliability mechanisms. Understanding these patterns is crucial for building robust event-driven systems!

graph TB A[Webhooks] --> B[Webhook Server] A --> C[Event Processing] A --> D[Security] A --> E[Reliability] B --> F[HTTP Endpoints] B --> G[Request Handling] B --> H[Response Codes] B --> I[Async Processing] C --> J[Event Parsing] C --> K[Validation] C --> L[Routing] C --> M[Actions] D --> N[Signature Verification] D --> O[IP Whitelisting] D --> P[Token Auth] D --> Q[HTTPS/TLS] E --> R[Retry Logic] E --> S[Idempotency] E --> T[Dead Letter Queue] E --> U[Circuit Breaker] V[Monitoring] --> W[Metrics] V --> X[Logging] V --> Y[Alerting] V --> Z[Debugging] style A fill:#ff6b6b style B fill:#51cf66 style C fill:#339af0 style D fill:#ffd43b style E fill:#ff6b6b style V fill:#51cf66

Real-World Scenario: The Event Processing Hub šŸŽÆ

You're building a webhook processing system that handles events from multiple sources - payment notifications from Stripe, repository events from GitHub, messaging from Slack, and order updates from e-commerce platforms. Your system must validate signatures, handle retries, process events asynchronously, maintain delivery guarantees, scale to thousands of events per second, and provide detailed monitoring. Let's build a production-ready webhook framework!

# First, install required packages:
# pip install flask fastapi uvicorn celery redis pydantic cryptography ngrok

import os
import json
import hmac
import hashlib
import time
import logging
import asyncio
from typing import Dict, List, Optional, Any, Callable, Union
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from enum import Enum
import threading
from queue import Queue, Empty
import uuid
from functools import wraps

from flask import Flask, request, jsonify, abort
from fastapi import FastAPI, Request, HTTPException, BackgroundTasks, Depends
from fastapi.responses import JSONResponse
from pydantic import BaseModel, Field, validator
import uvicorn
from celery import Celery
import redis
import requests

# ==================== Webhook Event Models ====================

class EventType(Enum):
    """Common webhook event types."""
    CREATED = "created"
    UPDATED = "updated"
    DELETED = "deleted"
    PAYMENT_RECEIVED = "payment.received"
    PAYMENT_FAILED = "payment.failed"
    USER_SIGNUP = "user.signup"
    ORDER_PLACED = "order.placed"
    MESSAGE_RECEIVED = "message.received"
    STATUS_CHANGED = "status.changed"
    CUSTOM = "custom"

class EventStatus(Enum):
    """Event processing status."""
    PENDING = "pending"
    PROCESSING = "processing"
    PROCESSED = "processed"
    FAILED = "failed"
    RETRYING = "retrying"
    DEAD_LETTER = "dead_letter"

@dataclass
class WebhookEvent:
    """Webhook event data structure."""
    id: str
    source: str
    type: EventType
    payload: Dict[str, Any]
    timestamp: datetime
    signature: Optional[str] = None
    headers: Dict[str, str] = field(default_factory=dict)
    status: EventStatus = EventStatus.PENDING
    attempts: int = 0
    max_attempts: int = 3
    metadata: Dict[str, Any] = field(default_factory=dict)

# ==================== Webhook Security ====================

class WebhookSecurity:
    """
    Security utilities for webhook validation.
    """
    
    def __init__(self):
        self.logger = logging.getLogger(__name__)
        self.secrets = {}  # Store webhook secrets by source
    
    def register_secret(self, source: str, secret: str):
        """Register webhook secret for a source."""
        self.secrets[source] = secret
        self.logger.info(f"Registered webhook secret for {source}")
    
    def verify_signature(self, source: str, payload: bytes, 
                        signature: str, algorithm: str = "sha256") -> bool:
        """
        Verify webhook signature.
        
        Common patterns:
        - GitHub: sha256=hash
        - Stripe: base64(hmac-sha256)
        - Slack: v0=hash
        """
        if source not in self.secrets:
            self.logger.warning(f"No secret registered for {source}")
            return False
        
        secret = self.secrets[source]
        
        try:
            # GitHub style: sha256=hash
            if signature.startswith("sha256=") or signature.startswith("sha1="):
                algorithm = signature.split("=")[0]
                expected_signature = signature.split("=")[1]
                
                mac = hmac.new(
                    secret.encode(),
                    payload,
                    getattr(hashlib, algorithm)
                )
                
                return hmac.compare_digest(mac.hexdigest(), expected_signature)
            
            # Stripe style: direct comparison
            elif source == "stripe":
                import base64
                
                mac = hmac.new(
                    secret.encode(),
                    payload,
                    hashlib.sha256
                )
                
                expected = base64.b64encode(mac.digest()).decode()
                return hmac.compare_digest(expected, signature)
            
            # Slack style: v0=hash
            elif signature.startswith("v0="):
                timestamp = self._extract_timestamp(payload)
                sig_basestring = f"v0:{timestamp}:{payload.decode()}"
                
                mac = hmac.new(
                    secret.encode(),
                    sig_basestring.encode(),
                    hashlib.sha256
                )
                
                expected = f"v0={mac.hexdigest()}"
                return hmac.compare_digest(expected, signature)
            
            # Generic HMAC
            else:
                mac = hmac.new(
                    secret.encode(),
                    payload,
                    getattr(hashlib, algorithm)
                )
                
                return hmac.compare_digest(mac.hexdigest(), signature)
                
        except Exception as e:
            self.logger.error(f"Signature verification failed: {e}")
            return False
    
    def _extract_timestamp(self, payload: bytes) -> str:
        """Extract timestamp from payload for Slack verification."""
        try:
            data = json.loads(payload)
            return str(data.get("timestamp", ""))
        except:
            return ""
    
    def verify_ip_whitelist(self, ip: str, whitelist: List[str]) -> bool:
        """Verify request IP against whitelist."""
        import ipaddress
        
        try:
            request_ip = ipaddress.ip_address(ip)
            
            for allowed in whitelist:
                # Check if CIDR notation
                if "/" in allowed:
                    network = ipaddress.ip_network(allowed)
                    if request_ip in network:
                        return True
                else:
                    if str(request_ip) == allowed:
                        return True
            
            return False
            
        except Exception as e:
            self.logger.error(f"IP verification failed: {e}")
            return False
    
    def generate_webhook_secret(self, length: int = 32) -> str:
        """Generate a secure webhook secret."""
        import secrets
        return secrets.token_hex(length)

# ==================== Flask Webhook Server ====================

def create_flask_webhook_server(security: WebhookSecurity):
    """Create Flask webhook server."""
    app = Flask(__name__)
    app.config['SECRET_KEY'] = os.urandom(32)
    
    # Event queue
    event_queue = Queue()
    
    @app.route('/webhook/', methods=['POST'])
    def webhook_endpoint(source):
        """Generic webhook endpoint."""
        try:
            # Get request data
            payload = request.get_data()
            headers = dict(request.headers)
            
            # Verify signature if present
            signature_header = headers.get('X-Hub-Signature-256') or \
                             headers.get('X-Signature') or \
                             headers.get('Stripe-Signature')
            
            if signature_header:
                if not security.verify_signature(source, payload, signature_header):
                    app.logger.warning(f"Invalid signature from {source}")
                    abort(401, "Invalid signature")
            
            # Parse event
            try:
                data = request.get_json()
            except:
                data = {"raw": payload.decode()}
            
            # Create event
            event = WebhookEvent(
                id=str(uuid.uuid4()),
                source=source,
                type=EventType.CUSTOM,
                payload=data,
                timestamp=datetime.now(),
                signature=signature_header,
                headers=headers
            )
            
            # Queue for processing
            event_queue.put(event)
            
            app.logger.info(f"Received webhook from {source}: {event.id}")
            
            # Return success immediately
            return jsonify({"status": "accepted", "event_id": event.id}), 200
            
        except Exception as e:
            app.logger.error(f"Webhook processing error: {e}")
            return jsonify({"error": str(e)}), 500
    
    @app.route('/webhook/github', methods=['POST'])
    def github_webhook():
        """GitHub-specific webhook endpoint."""
        event_type = request.headers.get('X-GitHub-Event')
        signature = request.headers.get('X-Hub-Signature-256')
        
        if not signature:
            abort(401, "No signature provided")
        
        payload = request.get_data()
        
        if not security.verify_signature('github', payload, signature):
            abort(401, "Invalid signature")
        
        data = request.get_json()
        
        # Process based on event type
        if event_type == 'push':
            # Handle push event
            commits = data.get('commits', [])
            app.logger.info(f"GitHub push with {len(commits)} commits")
        elif event_type == 'pull_request':
            # Handle PR event
            action = data.get('action')
            app.logger.info(f"GitHub PR {action}")
        
        return jsonify({"status": "processed"}), 200
    
    @app.route('/webhook/stripe', methods=['POST'])
    def stripe_webhook():
        """Stripe-specific webhook endpoint."""
        signature = request.headers.get('Stripe-Signature')
        
        if not signature:
            abort(401, "No signature provided")
        
        payload = request.get_data()
        
        # Stripe signature verification
        import stripe
        
        try:
            event = stripe.Webhook.construct_event(
                payload, signature, security.secrets.get('stripe')
            )
        except ValueError:
            abort(400, "Invalid payload")
        except stripe.error.SignatureVerificationError:
            abort(401, "Invalid signature")
        
        # Process Stripe event
        if event['type'] == 'payment_intent.succeeded':
            payment_intent = event['data']['object']
            app.logger.info(f"Payment succeeded: {payment_intent['id']}")
        elif event['type'] == 'customer.subscription.deleted':
            subscription = event['data']['object']
            app.logger.info(f"Subscription cancelled: {subscription['id']}")
        
        return jsonify({"status": "processed"}), 200
    
    @app.route('/health', methods=['GET'])
    def health_check():
        """Health check endpoint."""
        return jsonify({"status": "healthy", "queue_size": event_queue.qsize()}), 200
    
    return app

# ==================== FastAPI Webhook Server ====================

def create_fastapi_webhook_server(security: WebhookSecurity):
    """Create FastAPI webhook server."""
    app = FastAPI(title="Webhook Server")
    
    # Event store
    event_store = []
    
    class WebhookPayload(BaseModel):
        """Generic webhook payload."""
        event_type: str
        data: Dict[str, Any]
        timestamp: Optional[datetime] = Field(default_factory=datetime.now)
    
    @app.post("/webhook/{source}")
    async def webhook_endpoint(
        source: str,
        request: Request,
        background_tasks: BackgroundTasks
    ):
        """Generic webhook endpoint."""
        # Get request data
        body = await request.body()
        headers = dict(request.headers)
        
        # Verify signature
        signature = headers.get('x-signature')
        
        if signature and not security.verify_signature(source, body, signature):
            raise HTTPException(status_code=401, detail="Invalid signature")
        
        # Parse payload
        try:
            payload = await request.json()
        except:
            payload = {"raw": body.decode()}
        
        # Create event
        event = WebhookEvent(
            id=str(uuid.uuid4()),
            source=source,
            type=EventType.CUSTOM,
            payload=payload,
            timestamp=datetime.now(),
            signature=signature,
            headers=headers
        )
        
        # Store event
        event_store.append(event)
        
        # Process async in background
        background_tasks.add_task(process_webhook_async, event)
        
        return {"status": "accepted", "event_id": event.id}
    
    async def process_webhook_async(event: WebhookEvent):
        """Process webhook asynchronously."""
        # Simulate processing
        await asyncio.sleep(0.1)
        
        logging.info(f"Processed event {event.id} from {event.source}")
        event.status = EventStatus.PROCESSED
    
    @app.get("/events")
    async def get_events(limit: int = 100):
        """Get recent events."""
        return event_store[-limit:]
    
    @app.get("/health")
    async def health_check():
        """Health check endpoint."""
        return {
            "status": "healthy",
            "events_processed": len([e for e in event_store if e.status == EventStatus.PROCESSED]),
            "events_pending": len([e for e in event_store if e.status == EventStatus.PENDING])
        }
    
    return app

# ==================== Event Processor ====================

class WebhookProcessor:
    """
    Process webhook events with retries and error handling.
    """
    
    def __init__(self, redis_client: Optional[redis.Redis] = None):
        self.handlers = {}
        self.logger = logging.getLogger(__name__)
        self.redis = redis_client
        
        # Metrics
        self.metrics = {
            "processed": 0,
            "failed": 0,
            "retried": 0
        }
    
    def register_handler(self, source: str, event_type: EventType, 
                        handler: Callable):
        """Register event handler."""
        key = f"{source}:{event_type.value}"
        self.handlers[key] = handler
        self.logger.info(f"Registered handler for {key}")
    
    def process_event(self, event: WebhookEvent) -> bool:
        """
        Process webhook event.
        Returns True if successful, False otherwise.
        """
        try:
            # Find handler
            key = f"{event.source}:{event.type.value}"
            handler = self.handlers.get(key)
            
            if not handler:
                # Try generic handler for source
                handler = self.handlers.get(f"{event.source}:*")
            
            if not handler:
                self.logger.warning(f"No handler for {key}")
                return False
            
            # Update status
            event.status = EventStatus.PROCESSING
            
            # Execute handler
            result = handler(event)
            
            # Update status
            event.status = EventStatus.PROCESSED
            self.metrics["processed"] += 1
            
            self.logger.info(f"Processed event {event.id}")
            return True
            
        except Exception as e:
            self.logger.error(f"Event processing failed: {e}")
            
            event.status = EventStatus.FAILED
            event.attempts += 1
            
            # Check if should retry
            if event.attempts < event.max_attempts:
                event.status = EventStatus.RETRYING
                self.metrics["retried"] += 1
                
                # Queue for retry with exponential backoff
                retry_delay = 2 ** event.attempts
                
                if self.redis:
                    # Use Redis for distributed retry queue
                    self._queue_retry(event, retry_delay)
                else:
                    # Simple in-memory retry
                    threading.Timer(retry_delay, self.process_event, [event]).start()
            else:
                # Move to dead letter queue
                event.status = EventStatus.DEAD_LETTER
                self.metrics["failed"] += 1
                self._handle_dead_letter(event)
            
            return False
    
    def _queue_retry(self, event: WebhookEvent, delay: int):
        """Queue event for retry in Redis."""
        if not self.redis:
            return
        
        # Serialize event
        event_data = {
            "id": event.id,
            "source": event.source,
            "type": event.type.value,
            "payload": event.payload,
            "timestamp": event.timestamp.isoformat(),
            "attempts": event.attempts
        }
        
        # Add to delayed queue
        score = time.time() + delay
        self.redis.zadd("webhook:retry_queue", {json.dumps(event_data): score})
        
        self.logger.info(f"Queued event {event.id} for retry in {delay}s")
    
    def _handle_dead_letter(self, event: WebhookEvent):
        """Handle events that failed all retries."""
        self.logger.error(f"Event {event.id} moved to dead letter queue")
        
        if self.redis:
            # Store in dead letter queue
            self.redis.lpush("webhook:dead_letter", json.dumps({
                "id": event.id,
                "source": event.source,
                "type": event.type.value,
                "payload": event.payload,
                "timestamp": event.timestamp.isoformat(),
                "attempts": event.attempts,
                "failed_at": datetime.now().isoformat()
            }))
        
        # Could also send alert, store in database, etc.
    
    def process_retry_queue(self):
        """Process events from retry queue."""
        if not self.redis:
            return
        
        while True:
            # Get events ready for retry
            now = time.time()
            events = self.redis.zrangebyscore("webhook:retry_queue", 0, now, start=0, num=10)
            
            if not events:
                time.sleep(1)
                continue
            
            for event_data in events:
                # Remove from queue
                self.redis.zrem("webhook:retry_queue", event_data)
                
                # Deserialize and process
                data = json.loads(event_data)
                event = WebhookEvent(
                    id=data["id"],
                    source=data["source"],
                    type=EventType(data["type"]),
                    payload=data["payload"],
                    timestamp=datetime.fromisoformat(data["timestamp"]),
                    attempts=data["attempts"]
                )
                
                self.process_event(event)

# ==================== Webhook Client (Sender) ====================

class WebhookClient:
    """
    Client for sending webhooks.
    """
    
    def __init__(self, secret: Optional[str] = None):
        self.secret = secret
        self.logger = logging.getLogger(__name__)
        self.session = requests.Session()
    
    def send_webhook(self, url: str, event_type: str, 
                     payload: Dict[str, Any], 
                     max_retries: int = 3) -> bool:
        """Send webhook with retries."""
        event = {
            "id": str(uuid.uuid4()),
            "type": event_type,
            "timestamp": datetime.now().isoformat(),
            "payload": payload
        }
        
        # Create signature if secret provided
        headers = {"Content-Type": "application/json"}
        
        if self.secret:
            body = json.dumps(event).encode()
            signature = hmac.new(
                self.secret.encode(),
                body,
                hashlib.sha256
            ).hexdigest()
            headers["X-Webhook-Signature"] = f"sha256={signature}"
        
        # Send with retries
        for attempt in range(max_retries):
            try:
                response = self.session.post(
                    url,
                    json=event,
                    headers=headers,
                    timeout=30
                )
                
                if response.status_code in [200, 201, 202, 204]:
                    self.logger.info(f"Webhook sent successfully to {url}")
                    return True
                elif response.status_code >= 500:
                    # Server error, retry
                    self.logger.warning(f"Server error {response.status_code}, retrying...")
                    time.sleep(2 ** attempt)
                else:
                    # Client error, don't retry
                    self.logger.error(f"Client error {response.status_code}: {response.text}")
                    return False
                    
            except requests.exceptions.RequestException as e:
                self.logger.error(f"Request failed: {e}")
                
                if attempt < max_retries - 1:
                    time.sleep(2 ** attempt)
                    continue
                    
                return False
        
        return False
    
    def register_webhook(self, api_url: str, webhook_url: str, 
                        events: List[str]) -> bool:
        """Register webhook with an API."""
        registration = {
            "url": webhook_url,
            "events": events,
            "secret": self.secret
        }
        
        try:
            response = self.session.post(
                f"{api_url}/webhooks",
                json=registration
            )
            
            if response.status_code in [200, 201]:
                webhook_id = response.json().get("id")
                self.logger.info(f"Webhook registered: {webhook_id}")
                return True
                
        except Exception as e:
            self.logger.error(f"Registration failed: {e}")
        
        return False

# ==================== Webhook Testing ====================

class WebhookTester:
    """
    Utilities for testing webhooks.
    """
    
    def __init__(self):
        self.logger = logging.getLogger(__name__)
    
    def create_test_payload(self, event_type: str) -> Dict[str, Any]:
        """Create test webhook payload."""
        payloads = {
            "payment.success": {
                "amount": 1000,
                "currency": "USD",
                "customer_id": "cust_123",
                "payment_method": "card"
            },
            "order.created": {
                "order_id": "ord_456",
                "total": 150.00,
                "items": [
                    {"sku": "ITEM1", "quantity": 2, "price": 75.00}
                ]
            },
            "user.signup": {
                "user_id": "usr_789",
                "email": "test@example.com",
                "plan": "premium"
            }
        }
        
        return payloads.get(event_type, {"test": True})
    
    def simulate_webhook(self, url: str, event_type: str, 
                        secret: Optional[str] = None):
        """Simulate webhook for testing."""
        client = WebhookClient(secret)
        payload = self.create_test_payload(event_type)
        
        return client.send_webhook(url, event_type, payload)
    
    def verify_endpoint(self, url: str) -> bool:
        """Verify webhook endpoint is reachable."""
        try:
            response = requests.get(url, timeout=5)
            return response.status_code < 500
        except:
            return False

# ==================== Webhook Monitoring ====================

class WebhookMonitor:
    """
    Monitor webhook health and performance.
    """
    
    def __init__(self):
        self.metrics = defaultdict(lambda: {
            "received": 0,
            "processed": 0,
            "failed": 0,
            "avg_processing_time": 0,
            "last_received": None,
            "last_error": None
        })
        self.logger = logging.getLogger(__name__)
    
    def record_received(self, source: str):
        """Record webhook received."""
        self.metrics[source]["received"] += 1
        self.metrics[source]["last_received"] = datetime.now()
    
    def record_processed(self, source: str, processing_time: float):
        """Record successful processing."""
        metrics = self.metrics[source]
        metrics["processed"] += 1
        
        # Update average processing time
        avg = metrics["avg_processing_time"]
        count = metrics["processed"]
        metrics["avg_processing_time"] = (avg * (count - 1) + processing_time) / count
    
    def record_failure(self, source: str, error: str):
        """Record processing failure."""
        self.metrics[source]["failed"] += 1
        self.metrics[source]["last_error"] = {
            "error": error,
            "timestamp": datetime.now()
        }
    
    def get_metrics(self, source: Optional[str] = None) -> Dict:
        """Get webhook metrics."""
        if source:
            return dict(self.metrics[source])
        return dict(self.metrics)
    
    def check_health(self, source: str, 
                    max_failure_rate: float = 0.1,
                    max_silence_minutes: int = 60) -> Dict[str, bool]:
        """Check webhook health."""
        metrics = self.metrics[source]
        
        health = {
            "is_healthy": True,
            "receiving_events": True,
            "processing_successfully": True
        }
        
        # Check failure rate
        total = metrics["received"]
        if total > 0:
            failure_rate = metrics["failed"] / total
            if failure_rate > max_failure_rate:
                health["is_healthy"] = False
                health["processing_successfully"] = False
        
        # Check silence period
        last_received = metrics["last_received"]
        if last_received:
            silence_duration = (datetime.now() - last_received).total_seconds() / 60
            if silence_duration > max_silence_minutes:
                health["is_healthy"] = False
                health["receiving_events"] = False
        
        return health

# Example usage
if __name__ == "__main__":
    print("šŸŖ Webhook Implementation Examples\n")
    
    # Example 1: Webhook flow
    print("1ļøāƒ£ Webhook Flow:")
    
    flow_steps = [
        "1. External service triggers event",
        "2. Service sends HTTP POST to your webhook URL",
        "3. Your server receives and validates request",
        "4. Quick response (200 OK) to acknowledge",
        "5. Async processing of event",
        "6. Retry on failure with backoff"
    ]
    
    for step in flow_steps:
        print(f"   {step}")
    
    # Example 2: Security setup
    print("\n2ļøāƒ£ Webhook Security:")
    
    security = WebhookSecurity()
    
    # Generate secret
    secret = security.generate_webhook_secret()
    print(f"   Generated secret: {secret[:20]}...")
    
    # Register secrets
    security.register_secret("github", "webhook_secret_123")
    security.register_secret("stripe", "whsec_test_456")
    
    print("   Security measures:")
    print("     āœ“ HMAC signature verification")
    print("     āœ“ IP whitelist validation")
    print("     āœ“ HTTPS/TLS encryption")
    print("     āœ“ Timestamp validation")
    
    # Example 3: Flask server setup
    print("\n3ļøāƒ£ Flask Webhook Server:")
    
    print("   Endpoints:")
    print("     POST /webhook/ - Generic endpoint")
    print("     POST /webhook/github - GitHub events")
    print("     POST /webhook/stripe - Stripe events")
    print("     GET /health - Health check")
    
    # Example 4: Event processing
    print("\n4ļøāƒ£ Event Processing:")
    
    processor = WebhookProcessor()
    
    # Register handlers
    def handle_payment(event: WebhookEvent):
        print(f"   Processing payment: {event.payload}")
    
    def handle_user_signup(event: WebhookEvent):
        print(f"   Processing signup: {event.payload}")
    
    processor.register_handler("stripe", EventType.PAYMENT_RECEIVED, handle_payment)
    processor.register_handler("app", EventType.USER_SIGNUP, handle_user_signup)
    
    print("   Registered handlers:")
    print("     • stripe:payment.received → handle_payment")
    print("     • app:user.signup → handle_user_signup")
    
    # Example 5: Retry strategy
    print("\n5ļøāƒ£ Retry Strategy:")
    
    retry_config = {
        "max_attempts": 3,
        "backoff": "exponential",
        "delays": [1, 2, 4],
        "dead_letter_after": 3
    }
    
    print("   Configuration:")
    for key, value in retry_config.items():
        print(f"     {key}: {value}")
    
    # Example 6: Common webhook providers
    print("\n6ļøāƒ£ Common Webhook Providers:")
    
    providers = [
        ("GitHub", "Repository events", "X-Hub-Signature-256"),
        ("Stripe", "Payment events", "Stripe-Signature"),
        ("Slack", "Workspace events", "X-Slack-Signature"),
        ("Twilio", "SMS/Call events", "X-Twilio-Signature"),
        ("SendGrid", "Email events", "X-Twilio-Email-Event-Webhook-Signature"),
        ("Shopify", "Store events", "X-Shopify-Hmac-Sha256")
    ]
    
    for provider, description, header in providers:
        print(f"   {provider}:")
        print(f"     Events: {description}")
        print(f"     Signature: {header}")
    
    # Example 7: Testing webhooks
    print("\n7ļøāƒ£ Testing Webhooks:")
    
    tester = WebhookTester()
    
    print("   Using ngrok for local testing:")
    print("     1. Install: pip install pyngrok")
    print("     2. Run: ngrok http 5000")
    print("     3. Use URL: https://abc123.ngrok.io/webhook")
    
    print("\n   Test tools:")
    print("     • RequestBin - Inspect webhooks")
    print("     • Webhook.site - Test endpoints")
    print("     • Postman - Send test webhooks")
    
    # Example 8: Monitoring
    print("\n8ļøāƒ£ Webhook Monitoring:")
    
    monitor = WebhookMonitor()
    
    # Simulate metrics
    monitor.record_received("github")
    monitor.record_processed("github", 0.5)
    monitor.record_received("stripe")
    monitor.record_failure("stripe", "Signature validation failed")
    
    metrics = monitor.get_metrics()
    print("   Sample metrics:")
    for source, data in metrics.items():
        print(f"     {source}:")
        print(f"       Received: {data['received']}")
        print(f"       Processed: {data['processed']}")
        print(f"       Failed: {data['failed']}")
    
    # Example 9: Best practices
    print("\n9ļøāƒ£ Webhook Best Practices:")
    
    practices = [
        "šŸ” Always verify signatures",
        "⚔ Respond quickly (< 3s)",
        "šŸ”„ Process asynchronously",
        "šŸ’¾ Store events before processing",
        "šŸ” Implement idempotency",
        "šŸ“ Log everything",
        "šŸŽÆ Use specific endpoints",
        "šŸ›”ļø Validate payloads",
        "šŸ“Š Monitor performance",
        "šŸ”” Alert on failures"
    ]
    
    for practice in practices:
        print(f"   {practice}")
    
    # Example 10: Common pitfalls
    print("\nšŸ”Ÿ Common Pitfalls:")
    
    pitfalls = [
        ("Timeout", "Processing takes too long", "Process async"),
        ("No retry", "Single failure loses event", "Implement retries"),
        ("No signature", "Security vulnerability", "Verify signatures"),
        ("Memory issues", "Storing all events", "Use queues/database"),
        ("Order dependency", "Events out of order", "Design for any order"),
        ("Duplicate events", "Processing twice", "Implement idempotency")
    ]
    
    for issue, problem, solution in pitfalls:
        print(f"   {issue}:")
        print(f"     Problem: {problem}")
        print(f"     Solution: {solution}")
    
    print("\nāœ… Webhook implementation demonstration complete!")

Key Takeaways and Best Practices šŸŽÆ

Webhook Implementation Best Practices šŸ“‹

Pro Tip: Think of webhooks as phone calls from APIs - you need to answer quickly, verify who's calling, and handle the message properly. Always verify signatures first - accepting unsigned webhooks is like answering calls from unknown numbers without checking. Respond immediately with a 200 OK and process asynchronously - holding up the response is like keeping someone on hold. Implement proper retry logic with exponential backoff - if processing fails, you want another chance. Design for idempotency - webhooks might be sent multiple times, so processing should be safe to repeat. Use message queues (Redis, RabbitMQ, SQS) for reliable processing at scale. Monitor everything: response times, success rates, and error patterns. Test locally with ngrok before deploying. Store raw events before processing for debugging and replay. Most importantly: webhooks are about real-time event processing - design your system to handle events as they come, in any order, at any scale!

Mastering webhook implementation enables you to build real-time, event-driven systems that react instantly to changes. You can now receive events from any service, process them reliably, handle failures gracefully, and scale to thousands of events per second. Whether you're building payment systems, CI/CD pipelines, or real-time notifications, these webhook skills power your event-driven architecture! šŸš€