🚀 Data Pipelines: Process and Store Data at Scale

Data pipelines are the highways of information - transforming raw scraped data into valuable insights. Like a sophisticated factory assembly line, they take messy input, clean it, enrich it, validate it, and deliver it exactly where it needs to go. Whether you're handling gigabytes or terabytes, streaming or batch processing, a well-designed pipeline is the difference between data chaos and data mastery. Let's build industrial-strength data pipelines! 🏭

The Data Pipeline Architecture

Think of a data pipeline as a river system where data flows from multiple sources, gets filtered through various processing stages, and eventually reaches its destination lakes and oceans (databases, warehouses, APIs). Each component must handle failures gracefully, scale elastically, and maintain data quality throughout the journey!

graph TB
    A[Data Sources] --> B[Ingestion]
    B --> C[Validation]
    C --> D[Transformation]
    D --> E[Enrichment]
    E --> F[Storage]

    A --> G[Web Scraping]
    A --> H[APIs]
    A --> I[Files]
    A --> J[Streams]

    C --> K[Schema Validation]
    C --> L[Data Quality]
    C --> M[Deduplication]

    D --> N[Cleaning]
    D --> O[Normalization]
    D --> P[Aggregation]
    D --> Q[Feature Engineering]

    E --> R[Geocoding]
    E --> S[Sentiment Analysis]
    E --> T[Classification]

    F --> U[Databases]
    F --> V[Data Lakes]
    F --> W[Message Queues]
    F --> X[APIs]

    Y[Monitoring] --> Z[Metrics]
    Y --> AA[Alerts]
    Y --> AB[Logging]

    style A fill:#ff6b6b
    style B fill:#51cf66
    style D fill:#339af0
    style F fill:#ffd43b
    style Y fill:#ff6b6b
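The main flow in the diagram (validate, transform, enrich, store) can be sketched as a chain of small functions. This is a minimal illustration, not the implementation used in the listing below: it assumes each stage is a plain function that returns the record or `None` to drop it, and the field names, the `50.0` threshold, and the `run_stages` helper are all hypothetical.

```python
from typing import Callable, Dict, Iterable, Iterator, List, Optional

Record = Dict[str, object]
Stage = Callable[[Record], Optional[Record]]


def validate(record: Record) -> Optional[Record]:
    """Drop records that lack an id or a positive numeric price."""
    price = record.get("price")
    if not record.get("id") or not isinstance(price, (int, float)) or price <= 0:
        return None
    return record


def transform(record: Record) -> Optional[Record]:
    """Normalize the name field by collapsing runs of whitespace."""
    name = str(record.get("name", ""))
    return {**record, "name": " ".join(name.split())}


def enrich(record: Record) -> Optional[Record]:
    """Attach a derived field; the 50.0 threshold is arbitrary."""
    return {**record, "is_expensive": record["price"] >= 50.0}


def run_stages(records: Iterable[Record], stages: List[Stage],
               rejected: List[Record]) -> Iterator[Record]:
    """Push each record through every stage, isolating per-record failures."""
    for record in records:
        current: Optional[Record] = record
        try:
            for stage in stages:
                current = stage(current)
                if current is None:  # a stage filtered the record out
                    break
        except Exception:
            current = None  # a stage crashed on this record only
        if current is None:
            rejected.append(record)
        else:
            yield current


if __name__ == "__main__":
    source = [
        {"id": "a1", "name": "  Blue   Mug ", "price": 12.5},
        {"id": "", "name": "No id", "price": 3.0},
        {"id": "a2", "name": "Desk Lamp", "price": 79.0},
    ]
    rejected: List[Record] = []
    # A list stands in for the storage stage here.
    storage = list(run_stages(source, [validate, transform, enrich], rejected))
    print(storage)
    print(f"rejected: {len(rejected)}")
```

Because `run_stages` is a generator, records move through one at a time and a bad record ends up in `rejected` without stopping the rest of the flow.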

Real-World Scenario: The Market Intelligence Platform 📈

You're building a market intelligence platform that processes millions of product listings, reviews, and price changes daily from hundreds of sources. The system must handle different data formats, validate quality, enrich with ML predictions, detect anomalies, and serve real-time analytics to thousands of users. Your pipeline must be bulletproof, scalable, and maintainable!

# First, install required packages:
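# pip install apache-beam pandas sqlalchemy psycopg2 redis kafka-python elasticsearch pymongo "pydantic<2" luigi apache-airflow
# Note: the models below use the Pydantic v1 API (validator, Field(regex=...), .dict()), hence the pin.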

import json
import logging
import time
from typing import List, Dict, Optional, Any, Iterator, Tuple, Union, Generator
from dataclasses import dataclass, field, asdict
from datetime import datetime, timedelta
from enum import Enum
from pathlib import Path
import hashlib
import uuid
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import multiprocessing as mp
from queue import Queue, Empty
import threading

import pandas as pd
import numpy as np
from pydantic import BaseModel, validator, Field
from sqlalchemy import create_engine, Column, String, Float, Integer, DateTime, Boolean, Text
# declarative_base is importable from sqlalchemy.orm since SQLAlchemy 1.4
from sqlalchemy.orm import declarative_base, sessionmaker
import redis
import pymongo
from elasticsearch import Elasticsearch
from kafka import KafkaProducer, KafkaConsumer
import luigi

# ==================== Data Models ====================

class DataQuality(Enum):
    """Data quality levels."""
    HIGH = "high"
    MEDIUM = "medium"
    LOW = "low"
    INVALID = "invalid"

class ProcessingStatus(Enum):
    """Processing status."""
    PENDING = "pending"
    PROCESSING = "processing"
    COMPLETED = "completed"
    FAILED = "failed"
    RETRY = "retry"

# Pydantic models for validation
class ProductData(BaseModel):
    """Product data model with validation."""
    id: str
    name: str
    price: float = Field(gt=0, le=1000000)
    currency: str = Field(regex="^[A-Z]{3}$")
    description: Optional[str] = None
    category: Optional[str] = None
    brand: Optional[str] = None
    in_stock: bool = True
    rating: Optional[float] = Field(None, ge=0, le=5)
    review_count: int = Field(default=0, ge=0)
    image_urls: List[str] = []
    scraped_at: datetime
    source_url: str
    
    @validator('price')
    def price_must_be_positive(cls, v):
        if v <= 0:
            raise ValueError('Price must be positive')
        return round(v, 2)
    
    @validator('name')
    def name_not_empty(cls, v):
        if not v or not v.strip():
            raise ValueError('Name cannot be empty')
        return v.strip()
    
    class Config:
        json_encoders = {
            datetime: lambda v: v.isoformat()
        }

@dataclass
class PipelineMetrics:
    """Pipeline performance metrics."""
    items_processed: int = 0
    items_failed: int = 0
    processing_time: float = 0
    throughput: float = 0
    error_rate: float = 0
    last_updated: datetime = field(default_factory=datetime.now)

# ==================== Data Pipeline Base ====================

class DataPipeline:
    """
    Base class for data pipelines.
    """
    
    def __init__(self, name: str, config: Dict[str, Any] = None):
        self.name = name
        self.config = config or {}
        self.metrics = PipelineMetrics()
        self.logger = self._setup_logging()
        
    def _setup_logging(self) -> logging.Logger:
        """Setup pipeline logging."""
        logger = logging.getLogger(f"pipeline.{self.name}")
        logger.setLevel(logging.INFO)
        
        if not logger.handlers:
            handler = logging.StreamHandler()
            formatter = logging.Formatter(
                '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
            )
            handler.setFormatter(formatter)
            logger.addHandler(handler)
        
        return logger
    
    def process(self, data: Any) -> Any:
        """Process data through pipeline."""
        raise NotImplementedError
    
    def validate(self, data: Any) -> bool:
        """Validate data."""
        raise NotImplementedError
    
    def transform(self, data: Any) -> Any:
        """Transform data."""
        raise NotImplementedError
    
    def store(self, data: Any) -> bool:
        """Store processed data."""
        raise NotImplementedError

# ==================== ETL Pipeline ====================

class ETLPipeline(DataPipeline):
    """
    Extract, Transform, Load pipeline.
    """
    
    def __init__(self, name: str, 
                 extractors: List[Any] = None,
                 transformers: List[Any] = None,
                 loaders: List[Any] = None):
        super().__init__(name)
        self.extractors = extractors or []
        self.transformers = transformers or []
        self.loaders = loaders or []
    
    def extract(self, source: str) -> Generator[Dict, None, None]:
        """Extract data from source."""
        for extractor in self.extractors:
            try:
                for item in extractor.extract(source):
                    yield item
            except Exception as e:
                self.logger.error(f"Extraction failed: {e}")
    
    def transform(self, data: Iterator[Dict]) -> Generator[Dict, None, None]:
        """Transform extracted data."""
        for item in data:
            try:
                # Apply transformations in sequence
                transformed = item
                for transformer in self.transformers:
                    transformed = transformer.transform(transformed)
                
                if transformed:
                    yield transformed
                    
            except Exception as e:
                self.logger.error(f"Transformation failed for item: {e}")
                self.metrics.items_failed += 1
    
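    def load(self, data: Iterator[Dict]) -> int:
        """Load transformed data; returns the number of items stored by every loader."""
        loaded_count = 0
        
        for item in data:
            stored_everywhere = True
            
            # Load to all configured destinations; one failing loader
            # must not prevent the others from receiving the item
            for loader in self.loaders:
                try:
                    # StorageBackend subclasses expose store(), not load()
                    if not loader.store(item):
                        stored_everywhere = False
                except Exception as e:
                    self.logger.error(f"Loading failed: {e}")
                    stored_everywhere = False
            
            # Count each item once, regardless of how many loaders exist
            if stored_everywhere:
                loaded_count += 1
            else:
                self.metrics.items_failed += 1
        
        return loaded_count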
    
    def run(self, source: str) -> PipelineMetrics:
        """Run complete ETL pipeline."""
        start_time = time.time()
        
        try:
            # Extract
            extracted = self.extract(source)
            
            # Transform
            transformed = self.transform(extracted)
            
            # Load
            loaded = self.load(transformed)
            
            # Update metrics
            self.metrics.items_processed = loaded
            self.metrics.processing_time = time.time() - start_time
            self.metrics.throughput = loaded / self.metrics.processing_time if self.metrics.processing_time > 0 else 0
            
        except Exception as e:
            self.logger.error(f"Pipeline failed: {e}")
        
        return self.metrics

# ==================== Stream Processing ====================

class StreamProcessor:
    """
    Real-time stream processing.
    """
    
    def __init__(self, name: str, 
                 batch_size: int = 100,
                 flush_interval: int = 10):
        self.name = name
        self.batch_size = batch_size
        self.flush_interval = flush_interval
        self.buffer = []
        self.last_flush = time.time()
        self.running = False
        self.logger = logging.getLogger(f"stream.{name}")
        
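        # Threading: process() calls flush() while holding the lock,
        # so the lock must be reentrant to avoid a self-deadlock
        self.lock = threading.RLock()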
        self.flush_thread = None
    
    def start(self):
        """Start stream processor."""
        self.running = True
        
        # Start flush thread
        self.flush_thread = threading.Thread(target=self._flush_worker)
        self.flush_thread.daemon = True
        self.flush_thread.start()
        
        self.logger.info("Stream processor started")
    
    def stop(self):
        """Stop stream processor."""
        self.running = False
        
        # Final flush
        self.flush()
        
        if self.flush_thread:
            self.flush_thread.join(timeout=5)
        
        self.logger.info("Stream processor stopped")
    
    def process(self, item: Dict) -> bool:
        """Process single item."""
        with self.lock:
            # Validate item
            if not self.validate(item):
                return False
            
            # Add to buffer
            self.buffer.append(item)
            
            # Check if batch is full
            if len(self.buffer) >= self.batch_size:
                self.flush()
            
            return True
    
    def validate(self, item: Dict) -> bool:
        """Validate stream item."""
        # Override in subclass
        return True
    
    def flush(self):
        """Flush buffer to storage."""
        with self.lock:
            if not self.buffer:
                return
            
            try:
                # Process batch
                self.process_batch(self.buffer)
                
                # Clear buffer
                self.buffer.clear()
                self.last_flush = time.time()
                
            except Exception as e:
                self.logger.error(f"Flush failed: {e}")
    
    def process_batch(self, batch: List[Dict]):
        """Process a batch of items."""
        # Override in subclass
        pass
    
    def _flush_worker(self):
        """Background worker for periodic flushing."""
        while self.running:
            time.sleep(1)
            
            # Check if flush interval reached
            if time.time() - self.last_flush >= self.flush_interval:
                self.flush()

# ==================== Data Validators ====================

class DataValidator:
    """
    Validate and clean data.
    """
    
    def __init__(self, schema: BaseModel = None):
        self.schema = schema
        self.logger = logging.getLogger(__name__)
        self.stats = {
            'total': 0,
            'valid': 0,
            'invalid': 0,
            'cleaned': 0
        }
    
    def validate(self, data: Dict) -> Tuple[bool, Optional[Dict]]:
        """
        Validate data against schema.
        
        Returns:
            (is_valid, cleaned_data)
        """
        self.stats['total'] += 1
        
        try:
            if self.schema:
                # Validate with Pydantic
                validated = self.schema(**data)
                self.stats['valid'] += 1
                return True, validated.dict()
            else:
                # Basic validation
                cleaned = self.clean_data(data)
                if cleaned:
                    self.stats['valid'] += 1
                    return True, cleaned
                    
        except Exception as e:
            self.logger.warning(f"Validation failed: {e}")
            self.stats['invalid'] += 1
        
        return False, None
    
    def clean_data(self, data: Dict) -> Optional[Dict]:
        """Clean and normalize data."""
        cleaned = {}
        
        for key, value in data.items():
            # Remove None values
            if value is None:
                continue
            
            # Clean strings
            if isinstance(value, str):
                value = value.strip()
                if not value:
                    continue
            
            # Validate numbers
            if isinstance(value, (int, float)):
                if np.isnan(value) or np.isinf(value):
                    continue
            
            cleaned[key] = value
        
        self.stats['cleaned'] += 1
        return cleaned if cleaned else None
    
    def check_quality(self, data: Dict) -> DataQuality:
        """Assess data quality."""
        if not data:
            return DataQuality.INVALID
        
        # Calculate completeness
        required_fields = ['id', 'name', 'price']
        missing = sum(1 for field in required_fields if field not in data)
        
        if missing == 0:
            return DataQuality.HIGH
        elif missing == 1:
            return DataQuality.MEDIUM
        elif missing == 2:
            return DataQuality.LOW
        else:
            return DataQuality.INVALID

# ==================== Data Transformers ====================

class DataTransformer:
    """
    Transform and enrich data.
    """
    
    def __init__(self):
        self.logger = logging.getLogger(__name__)
    
    def transform(self, data: Dict) -> Dict:
        """Apply transformations to data."""
        # Override in subclass
        return data

class PriceNormalizer(DataTransformer):
    """Normalize prices to common currency."""
    
    def __init__(self, target_currency: str = "USD"):
        super().__init__()
        self.target_currency = target_currency
        self.exchange_rates = self._load_exchange_rates()
    
    def _load_exchange_rates(self) -> Dict[str, float]:
        """Load exchange rates (simplified)."""
        return {
            "USD": 1.0,
            "EUR": 0.85,
            "GBP": 0.73,
            "JPY": 110.0,
            "CNY": 6.45
        }
    
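    def transform(self, data: Dict) -> Dict:
        """Normalize price to target currency."""
        if 'price' in data and 'currency' in data:
            currency = data['currency']
            if currency != self.target_currency:
                # Rates are quoted as units of each currency per 1 USD
                rate = self.exchange_rates.get(currency)
                target_rate = self.exchange_rates.get(self.target_currency)
                
                if rate is None or target_rate is None:
                    # Unknown currency: keep the price as-is instead of
                    # silently assuming a 1:1 rate
                    self.logger.warning(
                        f"No exchange rate for {currency} or {self.target_currency}"
                    )
                    return data
                
                # Convert price
                data['original_price'] = data['price']
                data['original_currency'] = currency
                data['price'] = round(data['price'] * (target_rate / rate), 2)
                data['currency'] = self.target_currency
        
        return data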

class TextCleaner(DataTransformer):
    """Clean and normalize text fields."""
    
    def transform(self, data: Dict) -> Dict:
        """Clean text fields."""
        # Imported once per call rather than once per field
        import re
        import unicodedata
        
        text_fields = ['name', 'description', 'category', 'brand']
        
        for field in text_fields:
            if field in data and data[field]:
                # Remove HTML tags first so they cannot leave double spaces behind
                text = re.sub(r'<[^>]+>', ' ', data[field])
                
                # Decompose accented characters (e.g. "é" becomes "e" plus a
                # combining accent), then drop whatever is still non-ASCII
                text = unicodedata.normalize('NFKD', text)
                text = text.encode('ascii', 'ignore').decode('ascii')
                
                # Collapse runs of whitespace
                data[field] = ' '.join(text.split())
        
        return data

class FeatureEnricher(DataTransformer):
    """Enrich data with additional features."""
    
    def transform(self, data: Dict) -> Dict:
        """Add computed features."""
        # Price tier
        if 'price' in data:
            price = data['price']
            if price < 10:
                data['price_tier'] = 'budget'
            elif price < 50:
                data['price_tier'] = 'mid-range'
            elif price < 200:
                data['price_tier'] = 'premium'
            else:
                data['price_tier'] = 'luxury'
        
        # Availability score
        if 'in_stock' in data:
            data['availability_score'] = 1.0 if data['in_stock'] else 0.0
        
        # Quality score
        if 'rating' in data and 'review_count' in data:
            # rating is optional upstream, so the key may be present with a None value
            rating = data.get('rating') or 0
            reviews = data.get('review_count') or 0
            
            # Weighted by review count
            if reviews > 0:
                weight = min(1.0, reviews / 100)
                data['quality_score'] = rating * weight
            else:
                data['quality_score'] = 0
        
        # Add processing timestamp
        data['processed_at'] = datetime.now().isoformat()
        
        return data

# ==================== Data Storage ====================

class StorageBackend:
    """Base class for storage backends."""
    
    def store(self, data: Dict) -> bool:
        """Store data."""
        raise NotImplementedError
    
    def retrieve(self, query: Dict) -> List[Dict]:
        """Retrieve data."""
        raise NotImplementedError

class PostgreSQLStorage(StorageBackend):
    """PostgreSQL storage backend."""
    
    def __init__(self, connection_string: str):
        self.engine = create_engine(connection_string)
        self.Session = sessionmaker(bind=self.engine)
        
        # Create tables
        Base = declarative_base()
        
        class Product(Base):
            __tablename__ = 'products'
            
            id = Column(String, primary_key=True)
            name = Column(String, nullable=False)
            price = Column(Float)
            currency = Column(String(3))
            description = Column(Text)
            category = Column(String)
            brand = Column(String)
            in_stock = Column(Boolean)
            rating = Column(Float)
            review_count = Column(Integer)
            scraped_at = Column(DateTime)
            processed_at = Column(DateTime, default=datetime.now)
        
        Base.metadata.create_all(self.engine)
        self.Product = Product
    
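    def store(self, data: Dict) -> bool:
        """Store data in PostgreSQL."""
        session = self.Session()
        
        try:
            # Keep only keys that map to table columns; enriched records carry
            # extra fields (price_tier, source_url, ...) that the model
            # constructor would reject with a TypeError
            columns = self.Product.__table__.columns.keys()
            row = {k: v for k, v in data.items() if k in columns}
            
            # merge() inserts or updates by primary key, so re-running the
            # pipeline does not fail on duplicate ids
            session.merge(self.Product(**row))
            session.commit()
            return True
            
        except Exception as e:
            session.rollback()
            logging.error(f"PostgreSQL storage failed: {e}")
            return False
            
        finally:
            session.close()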

class MongoDBStorage(StorageBackend):
    """MongoDB storage backend."""
    
    def __init__(self, connection_string: str, database: str, collection: str):
        self.client = pymongo.MongoClient(connection_string)
        self.db = self.client[database]
        self.collection = self.db[collection]
        
        # Create indexes
        self.collection.create_index([("id", 1)], unique=True)
        self.collection.create_index([("scraped_at", -1)])
    
    def store(self, data: Dict) -> bool:
        """Store data in MongoDB."""
        try:
            # Upsert document
            self.collection.update_one(
                {"id": data["id"]},
                {"$set": data},
                upsert=True
            )
            return True
            
        except Exception as e:
            logging.error(f"MongoDB storage failed: {e}")
            return False

class ElasticsearchStorage(StorageBackend):
    """Elasticsearch storage backend."""
    
    def __init__(self, hosts: List[str], index: str):
        self.es = Elasticsearch(hosts)
        self.index = index
        
        # Create index if not exists
        if not self.es.indices.exists(index=self.index):
            self.es.indices.create(index=self.index)
    
    def store(self, data: Dict) -> bool:
        """Store data in Elasticsearch."""
        try:
            # Index document
            self.es.index(
                index=self.index,
                id=data.get("id"),
                body=data
            )
            return True
            
        except Exception as e:
            logging.error(f"Elasticsearch storage failed: {e}")
            return False

class RedisCache(StorageBackend):
    """Redis cache backend."""
    
    def __init__(self, host: str = "localhost", port: int = 6379):
        self.redis = redis.Redis(host=host, port=port, decode_responses=True)
    
    def store(self, data: Dict, ttl: int = 3600) -> bool:
        """Store data in Redis with TTL."""
        try:
            key = f"product:{data.get('id')}"
            self.redis.setex(
                key,
                ttl,
                json.dumps(data, default=str)
            )
            return True
            
        except Exception as e:
            logging.error(f"Redis storage failed: {e}")
            return False
    
    def retrieve(self, product_id: str) -> Optional[Dict]:
        """Retrieve data from Redis."""
        try:
            key = f"product:{product_id}"
            data = self.redis.get(key)
            if data:
                return json.loads(data)
                
        except Exception as e:
            logging.error(f"Redis retrieval failed: {e}")
        
        return None

# ==================== Batch Processing ====================

class BatchProcessor:
    """
    Process data in batches for efficiency.
    """
    
    def __init__(self, batch_size: int = 1000, 
                 num_workers: int = 4):
        self.batch_size = batch_size
        self.num_workers = num_workers
        self.logger = logging.getLogger(__name__)
    
    def process_parallel(self, data: List[Dict], 
                        processor: callable) -> List[Dict]:
        """Process data in parallel."""
        results = []
        
        with ProcessPoolExecutor(max_workers=self.num_workers) as executor:
            # Split into batches
            batches = [
                data[i:i + self.batch_size]
                for i in range(0, len(data), self.batch_size)
            ]
            
            # Process batches in parallel
            futures = [
                executor.submit(self._process_batch, batch, processor)
                for batch in batches
            ]
            
            # Collect results
            for future in futures:
                try:
                    batch_results = future.result(timeout=300)
                    results.extend(batch_results)
                except Exception as e:
                    self.logger.error(f"Batch processing failed: {e}")
        
        return results
    
    def _process_batch(self, batch: List[Dict], 
                      processor: callable) -> List[Dict]:
        """Process single batch."""
        results = []
        
        for item in batch:
            try:
                processed = processor(item)
                if processed:
                    results.append(processed)
            except Exception as e:
                self.logger.error(f"Item processing failed: {e}")
        
        return results

# ==================== Pipeline Orchestration ====================

class PipelineOrchestrator:
    """
    Orchestrate multiple pipelines.
    """
    
    def __init__(self):
        self.pipelines = {}
        self.schedules = {}
        self.logger = logging.getLogger(__name__)
    
    def register_pipeline(self, name: str, pipeline: DataPipeline,
                         schedule: Optional[str] = None):
        """Register a pipeline."""
        self.pipelines[name] = pipeline
        
        if schedule:
            self.schedules[name] = schedule
        
        self.logger.info(f"Registered pipeline: {name}")
    
    def run_pipeline(self, name: str, **kwargs) -> PipelineMetrics:
        """Run a specific pipeline."""
        if name not in self.pipelines:
            raise ValueError(f"Pipeline not found: {name}")
        
        pipeline = self.pipelines[name]
        
        self.logger.info(f"Running pipeline: {name}")
        start_time = time.time()
        
        try:
            metrics = pipeline.run(**kwargs)
            
            self.logger.info(
                f"Pipeline completed: {name} - "
                f"Items: {metrics.items_processed}, "
                f"Time: {metrics.processing_time:.2f}s"
            )
            
            return metrics
            
        except Exception as e:
            self.logger.error(f"Pipeline failed: {name} - {e}")
            raise
    
    def run_all(self) -> Dict[str, PipelineMetrics]:
        """Run all registered pipelines."""
        results = {}
        
        for name in self.pipelines:
            try:
                results[name] = self.run_pipeline(name)
            except Exception as e:
                self.logger.error(f"Failed to run pipeline {name}: {e}")
        
        return results

# ==================== Luigi Task Example ====================

class ScrapingTask(luigi.Task):
    """Luigi task for web scraping."""
    
    date = luigi.DateParameter(default=datetime.today().date())
    source = luigi.Parameter()
    
    def output(self):
        """Task output."""
        return luigi.LocalTarget(
            f"data/raw/{self.source}_{self.date}.json"
        )
    
    def run(self):
        """Run scraping task."""
        # Scraping logic here
        data = []  # Scraped data
        
        with self.output().open('w') as f:
            json.dump(data, f)

class ProcessingTask(luigi.Task):
    """Luigi task for data processing."""
    
    date = luigi.DateParameter(default=datetime.today().date())
    source = luigi.Parameter()
    
    def requires(self):
        """Task dependencies."""
        return ScrapingTask(date=self.date, source=self.source)
    
    def output(self):
        """Task output."""
        return luigi.LocalTarget(
            f"data/processed/{self.source}_{self.date}.json"
        )
    
    def run(self):
        """Run processing task."""
        # Load scraped data
        with self.input().open('r') as f:
            data = json.load(f)
        
        # Process data
        processor = DataTransformer()
        processed = [processor.transform(item) for item in data]
        
        # Save processed data
        with self.output().open('w') as f:
            json.dump(processed, f)

# Example usage
if __name__ == "__main__":
    print("🚀 Data Pipeline Examples\n")
    
    # Example 1: Create ETL pipeline
    print("1️⃣ ETL Pipeline Setup:")
    
    etl_pipeline = ETLPipeline(
        name="product_pipeline",
        extractors=[],  # Add extractors
        transformers=[
            PriceNormalizer("USD"),
            TextCleaner(),
            FeatureEnricher()
        ],
        loaders=[]  # Add loaders
    )
    
    print(f"   Pipeline created: {etl_pipeline.name}")
    print(f"   Transformers: {len(etl_pipeline.transformers)}")
    
    # Example 2: Data validation
    print("\n2️⃣ Data Validation:")
    
    validator = DataValidator(schema=ProductData)
    
    test_data = {
        "id": "123",
        "name": "Test Product",
        "price": 99.99,
        "currency": "USD",
        "scraped_at": datetime.now(),
        "source_url": "https://example.com/product/123"
    }
    
    is_valid, cleaned = validator.validate(test_data)
    print(f"   Validation result: {is_valid}")
    print(f"   Data quality: {validator.check_quality(cleaned)}")
    
    # Example 3: Stream processing
    print("\n3️⃣ Stream Processing:")
    
    stream = StreamProcessor(
        name="product_stream",
        batch_size=10,
        flush_interval=5
    )
    
    print(f"   Stream processor: {stream.name}")
    print(f"   Batch size: {stream.batch_size}")
    print(f"   Flush interval: {stream.flush_interval}s")
    
    # Example 4: Data transformation
    print("\n4️⃣ Data Transformation:")
    
    # Price normalization
    price_normalizer = PriceNormalizer("USD")
    
    product = {
        "price": 85.50,
        "currency": "EUR"
    }
    
    normalized = price_normalizer.transform(product.copy())
    print(f"   Original: {product['price']} {product['currency']}")
    print(f"   Normalized: {normalized['price']} {normalized['currency']}")
    
    # Feature enrichment
    enricher = FeatureEnricher()
    
    product_data = {
        "price": 150,
        "rating": 4.5,
        "review_count": 250,
        "in_stock": True
    }
    
    enriched = enricher.transform(product_data.copy())
    print(f"   Price tier: {enriched.get('price_tier')}")
    print(f"   Quality score: {enriched.get('quality_score'):.2f}")
    
    # Example 5: Storage backends
    print("\n5️⃣ Storage Backends:")
    
    storage_options = [
        ("PostgreSQL", "Relational database for structured data"),
        ("MongoDB", "Document store for flexible schemas"),
        ("Elasticsearch", "Full-text search and analytics"),
        ("Redis", "In-memory cache for fast access"),
        ("S3/GCS", "Object storage for raw data"),
        ("Kafka", "Message queue for streaming")
    ]
    
    for storage, description in storage_options:
        print(f"   {storage}: {description}")
    
    # Example 6: Batch processing
    print("\n6️⃣ Batch Processing:")
    
    batch_processor = BatchProcessor(batch_size=100, num_workers=4)
    
    print(f"   Batch size: {batch_processor.batch_size}")
    print(f"   Workers: {batch_processor.num_workers}")
    print("   Processing modes:")
    print("     • Parallel processing with multiprocessing")
    print("     • Chunked processing for memory efficiency")
    print("     • Per-item error isolation (failed items are logged and skipped)")
    
    # Example 7: Pipeline metrics
    print("\n7️⃣ Pipeline Metrics:")
    
    metrics = PipelineMetrics(
        items_processed=10000,
        items_failed=50,
        processing_time=120.5
    )
    
    metrics.throughput = metrics.items_processed / metrics.processing_time
    metrics.error_rate = metrics.items_failed / metrics.items_processed
    
    print(f"   Items processed: {metrics.items_processed:,}")
    print(f"   Throughput: {metrics.throughput:.1f} items/sec")
    print(f"   Error rate: {metrics.error_rate:.2%}")
    
    # Example 8: Data quality levels
    print("\n8️⃣ Data Quality Assessment:")
    
    quality_checks = [
        ("Completeness", "All required fields present"),
        ("Accuracy", "Values within expected ranges"),
        ("Consistency", "Format and structure uniform"),
        ("Timeliness", "Data is recent and relevant"),
        ("Uniqueness", "No duplicate records"),
        ("Validity", "Conforms to business rules")
    ]
    
    for check, description in quality_checks:
        print(f"   {check}: {description}")
    
    # Example 9: Pipeline patterns
    print("\n9️⃣ Common Pipeline Patterns:")
    
    patterns = [
        ("Lambda Architecture", "Batch + Stream processing"),
        ("Kappa Architecture", "Stream-only processing"),
        ("Medallion Architecture", "Bronze → Silver → Gold layers"),
        ("Event Sourcing", "Store all state changes"),
        ("CQRS", "Separate read and write models"),
        ("Microservices", "Distributed pipeline components")
    ]
    
    for pattern, description in patterns:
        print(f"   {pattern}: {description}")
    
    # Example 10: Best practices
    print("\n🔟 Pipeline Best Practices:")
    
    best_practices = [
        "🔍 Validate data early and often",
        "📊 Monitor pipeline metrics continuously",
        "🔄 Implement idempotent operations",
        "💾 Use checkpointing for recovery",
        "⚡ Optimize for throughput vs latency",
        "🛡️ Handle failures gracefully",
        "📈 Scale horizontally when possible",
        "🏷️ Version your data schemas",
        "🔐 Secure sensitive data in transit/rest",
        "📝 Log everything for debugging"
    ]
    
    for practice in best_practices:
        print(f"   {practice}")
    
    print("\n✅ Data pipeline demonstration complete!")
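
The architecture diagram lists deduplication under validation, but the listing above imports `hashlib` without ever using it. One way that stage could look is a content fingerprint over a few fields. This is a sketch under assumptions: which fields identify "the same listing" (`KEY_FIELDS`) is a hypothetical choice, and the `fingerprint` and `deduplicate` helpers are not part of the original code.

```python
import hashlib
import json
from typing import Dict, Iterable, Iterator, Set, Tuple

# Hypothetical choice: these fields define "the same listing".
KEY_FIELDS: Tuple[str, ...] = ("name", "price", "currency", "source_url")


def fingerprint(record: Dict, fields: Tuple[str, ...] = KEY_FIELDS) -> str:
    """Return a stable SHA-256 hex digest over the selected fields."""
    subset = {name: record.get(name) for name in fields}
    # sort_keys makes the JSON (and so the hash) independent of dict order;
    # default=str lets datetimes and other non-JSON types serialize.
    payload = json.dumps(subset, sort_keys=True, default=str)
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()


def deduplicate(records: Iterable[Dict]) -> Iterator[Dict]:
    """Yield each distinct record once, keeping the first occurrence."""
    seen: Set[str] = set()
    for record in records:
        digest = fingerprint(record)
        if digest in seen:
            continue
        seen.add(digest)
        yield record


if __name__ == "__main__":
    first = {"name": "Mug", "price": 9.5, "currency": "USD",
             "source_url": "https://example.com/1", "scraped_at": "run-1"}
    # Same listing scraped again later: only scraped_at differs.
    again = {**first, "scraped_at": "run-2"}
    changed = {**first, "price": 10.0}
    for record in deduplicate([first, again, changed]):
        print(record["price"], record["scraped_at"])
```

The `seen` set lives in memory and grows with the number of distinct records, so this only deduplicates within one run. Deduplicating across runs would need the digests kept in a shared store, for which the Redis backend in the listing is one candidate.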

Key Takeaways and Best Practices 🎯

Data Pipeline Best Practices 📋

Pro Tip: Think of data pipelines as assembly lines in a factory - each station has a specific job, quality checks happen throughout, and the whole system should handle hiccups without stopping production. Start with validation - bad data is worse than no data. Use schemas (Pydantic, JSON Schema) to enforce structure. Transform data incrementally with clear, testable functions. Choose storage based on your needs: SQL for structured queries, NoSQL for flexibility, object storage for raw data, and caches for speed. Implement both batch and stream processing - batch for historical data, streams for real-time. Monitor everything: throughput, latency, error rates, data quality scores. Use orchestrators like Airflow or Luigi for complex workflows. Always design for failure - implement retries, dead letter queues, and circuit breakers. Most importantly: make your pipelines idempotent so they can be safely re-run without duplicating data!
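
The advice about retries and dead letter queues can be made concrete with a small loader wrapper. The sketch below is illustrative only: `load_with_retry`, its parameters, and the in-memory dead-letter list are assumptions, and a production pipeline would more likely send failed records to a durable queue or table.

```python
import time
from typing import Callable, Dict, Iterable, List, Tuple


def load_with_retry(
    records: Iterable[Dict],
    store: Callable[[Dict], None],
    max_attempts: int = 3,
    base_delay: float = 0.5,
    sleep: Callable[[float], None] = time.sleep,
) -> Tuple[int, List[Dict]]:
    """Store each record, retrying failures with exponential backoff.

    Returns (stored_count, dead_letters). Records that still fail after
    max_attempts are collected with their last error instead of being dropped.
    """
    stored = 0
    dead_letters: List[Dict] = []
    for record in records:
        for attempt in range(1, max_attempts + 1):
            try:
                store(record)
                stored += 1
                break
            except Exception as exc:
                if attempt == max_attempts:
                    # Out of attempts: park the record for later inspection.
                    dead_letters.append({"record": record, "error": repr(exc)})
                else:
                    # Delays of base_delay, 2 * base_delay, 4 * base_delay, ...
                    sleep(base_delay * 2 ** (attempt - 1))
    return stored, dead_letters


if __name__ == "__main__":
    attempts: Dict[str, int] = {}

    def flaky_store(record: Dict) -> None:
        """Stand-in for a real backend: one record never succeeds."""
        attempts[record["id"]] = attempts.get(record["id"], 0) + 1
        if record["id"] == "bad":
            raise ValueError("rejected by backend")

    stored, dead = load_with_retry(
        [{"id": "ok"}, {"id": "bad"}], flaky_store, base_delay=0.01
    )
    print(stored, [entry["record"]["id"] for entry in dead])
```

Retries are only safe when `store` is idempotent, for example an upsert keyed on the record id. Otherwise a write that succeeded but timed out on the way back would be applied twice.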

Mastering data pipelines transforms you from a data collector to a data engineer. You can now build robust, scalable systems that process millions of records reliably. Whether you're building ETL pipelines, real-time streams, or complex data workflows, these pipeline skills ensure your data flows smoothly from source to insight! 🌊