🤖 Scraping Ethics and robots.txt: Scrape Responsibly

Web scraping isn't just about technical skill; it's about being a responsible digital citizen. Like archaeologists who excavate a site without damaging it, ethical scrapers extract data while respecting websites and their owners. robots.txt is your guidebook, rate limiting is your speed limit, and ethics are your compass. Let's learn to scrape with integrity! ⚖️

The Ethics and Compliance Framework

Think of ethical scraping as visiting someone's home: you knock before entering, respect their rules, don't take more than is offered, and leave everything as you found it. robots.txt is the "house rules" posted at the door, telling you which rooms you may enter and, through directives such as Crawl-delay, how fast you may move through them. Follow these principles and site owners are far less likely to block you.
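To make the "house rules" idea concrete, here is a minimal sketch that reads a robots.txt with the standard library's `urllib.robotparser`. The robots.txt text, the bot name `ExampleBot`, and the example.com paths are all made up for illustration; a real crawler would fetch the file from the site itself.

```python
from urllib.robotparser import RobotFileParser

# Hypothetical robots.txt content (an assumption for this sketch).
SAMPLE_ROBOTS_TXT = """\
User-agent: *
Disallow: /private/
Crawl-delay: 5
"""

def load_rules(robots_txt: str) -> RobotFileParser:
    """Parse robots.txt text into a RobotFileParser."""
    parser = RobotFileParser()
    parser.parse(robots_txt.splitlines())
    return parser

rules = load_rules(SAMPLE_ROBOTS_TXT)

# "Which rooms can I enter?"
can_products = rules.can_fetch("ExampleBot", "https://example.com/products/1")
can_private = rules.can_fetch("ExampleBot", "https://example.com/private/data")

# "How fast can I move?" (None when no Crawl-delay applies)
delay = rules.crawl_delay("ExampleBot")

print(can_products, can_private, delay)
```

With these sample rules, the products URL is permitted, the `/private/` URL is not, and the requested delay between requests is 5 seconds.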

graph TB
    A[Ethical Web Scraping] --> B[Legal Compliance]
    A --> C[Technical Respect]
    A --> D[Business Ethics]
    A --> E[Data Privacy]
    B --> F[Terms of Service]
    B --> G[Copyright Law]
    B --> H[GDPR/CCPA]
    B --> I[Local Regulations]
    C --> J[robots.txt]
    C --> K[Rate Limiting]
    C --> L[User-Agent]
    C --> M[Crawl-Delay]
    D --> N[Fair Use]
    D --> O[Attribution]
    D --> P[Competition]
    D --> Q[Monetization]
    E --> R[PII Handling]
    E --> S[Data Storage]
    E --> T[Consent]
    E --> U[Anonymization]
    J --> V[Parser]
    K --> W[Throttling]
    L --> X[Identification]
    M --> Y[Politeness]
    style A fill:#ff6b6b
    style B fill:#51cf66
    style C fill:#339af0
    style D fill:#ffd43b
    style E fill:#ff6b6b

Real-World Scenario: The Ethical Data Platform 🌟

You're building a data aggregation platform that collects information from hundreds of websites. You need to respect each site's scraping policies, handle rate limits gracefully, protect user privacy, comply with regulations, and maintain good relationships with website owners. Let's build a comprehensive ethical scraping framework!
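Handling rate limits gracefully across hundreds of sites mostly comes down to remembering when each domain was last contacted. The sketch below shows one possible way to do that; `DomainThrottle`, its method names, and the injectable `clock` parameter are hypothetical names chosen for this illustration, not part of any library.

```python
import time
from typing import Callable, Dict, Optional
from urllib.parse import urlparse

class DomainThrottle:
    """Tracks the last request time per domain and reports how long to wait."""

    def __init__(self, default_delay: float = 1.0,
                 clock: Callable[[], float] = time.monotonic):
        self.default_delay = default_delay
        self.clock = clock  # injectable so the logic can be tested without sleeping
        self.last_request: Dict[str, float] = {}

    def wait_time(self, url: str, crawl_delay: Optional[float] = None) -> float:
        """Seconds to wait before requesting `url` (0.0 if no wait is needed)."""
        domain = urlparse(url).netloc
        # Use the stricter of our own delay and the site's Crawl-delay, if any.
        delay = max(self.default_delay, crawl_delay or 0.0)
        last = self.last_request.get(domain)
        if last is None:
            return 0.0
        return max(0.0, delay - (self.clock() - last))

    def mark_request(self, url: str) -> None:
        """Record that a request to the URL's domain was just made."""
        self.last_request[urlparse(url).netloc] = self.clock()

# Usage with a fake clock so the example runs instantly.
fake_now = [100.0]
throttle = DomainThrottle(default_delay=2.0, clock=lambda: fake_now[0])

first = throttle.wait_time("https://example.com/a")    # domain never seen
throttle.mark_request("https://example.com/a")
fake_now[0] = 100.5                                     # half a second later
second = throttle.wait_time("https://example.com/b")   # same domain
other = throttle.wait_time("https://example.org/")     # different domain
slow = throttle.wait_time("https://example.com/c", crawl_delay=5.0)
print(first, second, other, slow)
```

Keeping the wait calculation separate from the actual `time.sleep` call makes the policy easy to test, and lets a scheduler work on other domains instead of blocking.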

import requests
from urllib.robotparser import RobotFileParser
from urllib.parse import urlparse, urljoin
import time
import hashlib
import re
from typing import Dict, List, Optional, Tuple, Set, Any
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from enum import Enum
import logging
from pathlib import Path
import json
import pickle  # used by ResponseCache to serialize responses
import sqlite3
from bs4 import BeautifulSoup
import random
from collections import defaultdict, deque
import threading
from functools import wraps

class ComplianceLevel(Enum):
    """Compliance levels for scraping."""
    STRICT = "strict"      # Follow all rules strictly
    MODERATE = "moderate"  # Follow most rules with some flexibility
    RELAXED = "relaxed"    # Basic compliance only
    CUSTOM = "custom"      # Custom rules

class ScrapingPermission(Enum):
    """Permission levels for scraping."""
    ALLOWED = "allowed"
    DISALLOWED = "disallowed"
    CONDITIONAL = "conditional"
    UNKNOWN = "unknown"

@dataclass
class RobotRules:
    """Parsed robots.txt rules for a domain."""
    domain: str
    rules: Dict[str, Any]
    crawl_delay: Optional[float]
    request_rate: Optional[Tuple[int, int]]  # (requests, seconds)
    sitemap: Optional[str]
    disallowed_paths: List[str]
    allowed_paths: List[str]
    last_checked: datetime

@dataclass
class EthicalPolicy:
    """Ethical scraping policy configuration."""
    respect_robots_txt: bool = True
    identify_bot: bool = True
    rate_limit: float = 1.0  # Seconds between requests
    max_concurrent: int = 1
    retry_after_error: bool = True
    max_retries: int = 3
    handle_personal_data: bool = False
    cache_pages: bool = True
    cache_duration: int = 3600  # Seconds
    rotate_user_agents: bool = False
    use_proxy: bool = False
    respect_nofollow: bool = True
    respect_noindex: bool = True

class RobotsChecker:
    """
    Comprehensive robots.txt parser and compliance checker.
    """
    
    def __init__(self, user_agent: str = "Python-EthicalScraper/1.0"):
        self.user_agent = user_agent
        self.robots_cache = {}
        self.setup_logging()
        
    def setup_logging(self):
        """Setup logging configuration."""
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
        )
        self.logger = logging.getLogger(__name__)
    
    def fetch_robots_txt(self, url: str) -> Optional[str]:
        """Fetch robots.txt for a domain."""
        parsed_url = urlparse(url)
        robots_url = f"{parsed_url.scheme}://{parsed_url.netloc}/robots.txt"
        
        try:
            response = requests.get(
                robots_url,
                timeout=10,
                headers={'User-Agent': self.user_agent}
            )
            
            if response.status_code == 200:
                self.logger.info(f"Fetched robots.txt from {robots_url}")
                return response.text
            else:
                self.logger.warning(f"No robots.txt found at {robots_url}")
                return None
                
        except Exception as e:
            self.logger.error(f"Error fetching robots.txt: {e}")
            return None
    
    def parse_robots_txt(self, url: str, robots_content: str = None) -> RobotRules:
        """Parse robots.txt content."""
        parsed_url = urlparse(url)
        domain = parsed_url.netloc
        
        # Check cache first
        if domain in self.robots_cache:
            cached = self.robots_cache[domain]
            # total_seconds() is required: .seconds wraps at one day and never reaches 86400
            if (datetime.now() - cached.last_checked).total_seconds() < 86400:  # 24 hours
                return cached
        
        # Fetch if not provided
        if robots_content is None:
            robots_content = self.fetch_robots_txt(url)
        
        rules = RobotRules(
            domain=domain,
            rules={},
            crawl_delay=None,
            request_rate=None,
            sitemap=None,
            disallowed_paths=[],
            allowed_paths=[],
            last_checked=datetime.now()
        )
        
        if robots_content:
            # Parse using RobotFileParser
            rp = RobotFileParser()
            rp.parse(robots_content.splitlines())
            
            # Extract rules manually for detailed analysis
            current_user_agent = None
            
            for line in robots_content.splitlines():
                line = line.strip()
                
                # Skip comments and empty lines
                if not line or line.startswith('#'):
                    continue
                
                # Parse directives
                if ':' in line:
                    directive, value = line.split(':', 1)
                    directive = directive.strip().lower()
                    value = value.strip()
                    
                    if directive == 'user-agent':
                        current_user_agent = value
                        if current_user_agent not in rules.rules:
                            rules.rules[current_user_agent] = {
                                'disallow': [],
                                'allow': [],
                                'crawl-delay': None
                            }
                    
                    elif current_user_agent:
                        if directive == 'disallow':
                            rules.rules[current_user_agent]['disallow'].append(value)
                            if current_user_agent == '*' or current_user_agent.lower() in self.user_agent.lower():
                                rules.disallowed_paths.append(value)
                        
                        elif directive == 'allow':
                            rules.rules[current_user_agent]['allow'].append(value)
                            if current_user_agent == '*' or current_user_agent.lower() in self.user_agent.lower():
                                rules.allowed_paths.append(value)
                        
                        elif directive == 'crawl-delay':
                            try:
                                delay = float(value)
                                rules.rules[current_user_agent]['crawl-delay'] = delay
                                if current_user_agent == '*' or current_user_agent.lower() in self.user_agent.lower():
                                    rules.crawl_delay = delay
                            except ValueError:
                                # Ignore malformed Crawl-delay values
                                pass
                        
                        elif directive == 'request-rate':
                            # Format: 1/5 (1 request per 5 seconds)
                            if '/' in value:
                                try:
                                    # Named "count" so the local does not shadow the requests module
                                    count, seconds = value.split('/')
                                    rules.request_rate = (int(count), int(seconds))
                                except ValueError:
                                    # Ignore malformed Request-rate values
                                    pass
                    
                    # Global directives
                    if directive == 'sitemap':
                        rules.sitemap = value
        
        # Cache the results
        self.robots_cache[domain] = rules
        
        return rules
    
    def can_fetch(self, url: str) -> Tuple[bool, str]:
        """
        Check if URL can be fetched according to robots.txt.
        Returns (allowed, reason).
        """
        rules = self.parse_robots_txt(url)
        parsed_url = urlparse(url)
        path = parsed_url.path
        
        # Check disallowed paths
        for disallowed in rules.disallowed_paths:
            if disallowed and path.startswith(disallowed):
                # Check if there's an allow rule that overrides
                for allowed in rules.allowed_paths:
                    if allowed and path.startswith(allowed) and len(allowed) > len(disallowed):
                        return True, f"Allowed by rule: {allowed}"
                
                return False, f"Disallowed by robots.txt: {disallowed}"
        
        # If no specific rules, default to allowed
        return True, "No restrictions in robots.txt"
    
    def get_crawl_delay(self, url: str) -> Optional[float]:
        """Get crawl delay for a domain."""
        rules = self.parse_robots_txt(url)
        return rules.crawl_delay
    
    def get_sitemap(self, url: str) -> Optional[str]:
        """Get sitemap URL from robots.txt."""
        rules = self.parse_robots_txt(url)
        return rules.sitemap

class EthicalScraper:
    """
    Ethical web scraper with compliance and rate limiting.
    """
    
    def __init__(self, policy: EthicalPolicy = None):
        self.policy = policy or EthicalPolicy()
        self.robots_checker = RobotsChecker()
        self.session = self._create_session()
        
        # Rate limiting
        self.domain_last_request = {}
        self.request_queue = defaultdict(deque)
        self.rate_limiter = RateLimiter()
        
        # Caching
        self.cache = ResponseCache() if self.policy.cache_pages else None
        
        # Metrics
        self.metrics = ScrapingMetrics()
        
        self.setup_logging()
    
    def setup_logging(self):
        """Setup logging configuration."""
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
        )
        self.logger = logging.getLogger(__name__)
    
    def _create_session(self) -> requests.Session:
        """Create configured session."""
        session = requests.Session()
        
        if self.policy.identify_bot:
            session.headers.update({
                'User-Agent': 'Python-EthicalScraper/1.0 (+https://example.com/bot)'
            })
        elif self.policy.rotate_user_agents:
            # Rotate between common user agents
            user_agents = [
                'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
                'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36',
                'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36'
            ]
            session.headers['User-Agent'] = random.choice(user_agents)
        
        return session
    
    def scrape(self, url: str, **kwargs) -> Optional[requests.Response]:
        """
        Ethically scrape a URL with all compliance checks.
        """
        # Check robots.txt compliance
        if self.policy.respect_robots_txt:
            allowed, reason = self.robots_checker.can_fetch(url)
            if not allowed:
                self.logger.warning(f"Scraping not allowed for {url}: {reason}")
                self.metrics.record_blocked(url, reason)
                return None
        
        # Check cache
        if self.cache:
            cached_response = self.cache.get(url)
            if cached_response:
                self.logger.info(f"Using cached response for {url}")
                self.metrics.record_cache_hit(url)
                return cached_response
        
        # Apply rate limiting
        domain = urlparse(url).netloc
        self._apply_rate_limit(domain)
        
        # Make request
        try:
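            # Always set a timeout so a stalled server cannot hang the scraper
            kwargs.setdefault('timeout', 10)
            response = self.session.get(url, **kwargs)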
            
            # Check for rate limit headers
            self._check_rate_limit_headers(response)
            
            # Cache response if successful
            if self.cache and response.status_code == 200:
                self.cache.store(url, response, ttl=self.policy.cache_duration)
            
            # Record metrics
            self.metrics.record_request(url, response.status_code)
            
            # Check for personal data if configured
            if self.policy.handle_personal_data:
                self._check_personal_data(response)
            
            return response
            
        except Exception as e:
            self.logger.error(f"Error scraping {url}: {e}")
            self.metrics.record_error(url, str(e))
            
            if self.policy.retry_after_error:
                return self._retry_request(url, **kwargs)
            
            return None
    
    def _apply_rate_limit(self, domain: str):
        """Apply rate limiting for a domain."""
        # Check robots.txt crawl delay
        crawl_delay = self.robots_checker.get_crawl_delay(f"https://{domain}")
        if crawl_delay:
            delay = max(crawl_delay, self.policy.rate_limit)
        else:
            delay = self.policy.rate_limit
        
        # Check last request time
        if domain in self.domain_last_request:
            elapsed = time.time() - self.domain_last_request[domain]
            if elapsed < delay:
                sleep_time = delay - elapsed
                self.logger.info(f"Rate limiting: sleeping {sleep_time:.2f}s for {domain}")
                time.sleep(sleep_time)
        
        self.domain_last_request[domain] = time.time()
    
    def _check_rate_limit_headers(self, response: requests.Response):
        """Check and respect rate limit headers."""
        # Retry-After may be delta-seconds or an HTTP date; only the
        # numeric form is handled here, anything else is ignored.
        if 'Retry-After' in response.headers:
            retry_after = response.headers['Retry-After']
            try:
                sleep_time = int(retry_after)
            except ValueError:
                sleep_time = 0
            if sleep_time > 0:
                self.logger.warning(f"Server requested retry after {sleep_time} seconds")
                time.sleep(sleep_time)
        
        # X-RateLimit-* headers are a convention, not a standard;
        # treat malformed values as absent.
        if 'X-RateLimit-Remaining' in response.headers:
            try:
                remaining = int(response.headers['X-RateLimit-Remaining'])
                reset_time = float(response.headers.get('X-RateLimit-Reset', 0))
            except ValueError:
                return
            if remaining <= 0 and reset_time:
                # Assumes the reset value is a Unix timestamp
                sleep_until = reset_time - time.time()
                if sleep_until > 0:
                    self.logger.warning(f"Rate limit exceeded, sleeping {sleep_until:.0f}s")
                    time.sleep(sleep_until)
    
    def _check_personal_data(self, response: requests.Response):
        """Check for and handle personal data."""
        # Simple PII detection patterns
        pii_patterns = {
            'email': r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b',
            'phone': r'\b\d{3}[-.]?\d{3}[-.]?\d{4}\b',
            'ssn': r'\b\d{3}-\d{2}-\d{4}\b',
            'credit_card': r'\b\d{4}[\s-]?\d{4}[\s-]?\d{4}[\s-]?\d{4}\b'
        }
        
        text = response.text
        detected_pii = []
        
        for pii_type, pattern in pii_patterns.items():
            if re.search(pattern, text):
                detected_pii.append(pii_type)
        
        if detected_pii:
            self.logger.warning(f"Potential PII detected: {detected_pii}")
            self.metrics.record_pii_detection(response.url, detected_pii)
    
    def _retry_request(self, url: str, **kwargs) -> Optional[requests.Response]:
        """Retry failed request with exponential backoff."""
        for attempt in range(self.policy.max_retries):
            sleep_time = 2 ** attempt
            self.logger.info(f"Retrying {url} after {sleep_time}s (attempt {attempt + 1})")
            time.sleep(sleep_time)
            
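            try:
                response = self.session.get(url, **kwargs)
                if response.status_code == 200:
                    return response
            except requests.RequestException as e:
                # Catch only network errors; log and move on to the next attempt
                self.logger.warning(f"Retry {attempt + 1} for {url} failed: {e}")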
        
        return None
    
    def scrape_sitemap(self, url: str) -> List[str]:
        """Scrape URLs from sitemap (ethical way to discover URLs)."""
        sitemap_url = self.robots_checker.get_sitemap(url)
        
        if not sitemap_url:
            # Try common sitemap locations
            parsed_url = urlparse(url)
            base_url = f"{parsed_url.scheme}://{parsed_url.netloc}"
            sitemap_url = f"{base_url}/sitemap.xml"
        
        try:
            response = self.scrape(sitemap_url)
            if response and response.status_code == 200:
                # Parse sitemap
                soup = BeautifulSoup(response.content, 'xml')
                urls = []
                
                for loc in soup.find_all('loc'):
                    urls.append(loc.text)
                
                self.logger.info(f"Found {len(urls)} URLs in sitemap")
                return urls
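        except Exception as e:
            # The 'xml' parser needs lxml installed; log failures instead of hiding them
            self.logger.warning(f"Could not parse sitemap {sitemap_url}: {e}")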
        
        return []

class RateLimiter:
    """
    Advanced rate limiting with multiple strategies.
    """
    
    def __init__(self):
        self.domain_buckets = defaultdict(lambda: TokenBucket())
        self.global_bucket = TokenBucket(rate=10, capacity=100)
    
    def check_rate_limit(self, domain: str) -> bool:
        """Check if request is allowed."""
        # Check domain-specific limit
        if not self.domain_buckets[domain].consume():
            return False
        
        # Check global limit
        if not self.global_bucket.consume():
            return False
        
        return True
    
    def set_domain_rate(self, domain: str, requests_per_second: float):
        """Set rate limit for specific domain."""
        self.domain_buckets[domain] = TokenBucket(
            rate=requests_per_second,
            # At least 1, otherwise rates below 0.1/s would never allow a request
            capacity=max(1, int(requests_per_second * 10))
        )

class TokenBucket:
    """Token bucket for rate limiting."""
    
    def __init__(self, rate: float = 1, capacity: int = 10):
        self.rate = rate  # Tokens per second
        self.capacity = capacity
        self.tokens = capacity
        self.last_update = time.time()
        self.lock = threading.Lock()
    
    def consume(self, tokens: int = 1) -> bool:
        """Try to consume tokens."""
        with self.lock:
            # Refill bucket
            now = time.time()
            elapsed = now - self.last_update
            self.tokens = min(self.capacity, self.tokens + elapsed * self.rate)
            self.last_update = now
            
            # Check if enough tokens
            if self.tokens >= tokens:
                self.tokens -= tokens
                return True
            
            return False

class ResponseCache:
    """Cache for scraped responses."""
    
    def __init__(self, cache_dir: str = "./cache"):
        self.cache_dir = Path(cache_dir)
        self.cache_dir.mkdir(parents=True, exist_ok=True)
        self.index = {}
        self.load_index()
    
    def load_index(self):
        """Load cache index."""
        index_file = self.cache_dir / "index.json"
        if index_file.exists():
            with open(index_file, 'r') as f:
                self.index = json.load(f)
    
    def save_index(self):
        """Save cache index."""
        with open(self.cache_dir / "index.json", 'w') as f:
            json.dump(self.index, f)
    
    def get_cache_key(self, url: str) -> str:
        """Generate cache key for URL."""
        return hashlib.md5(url.encode()).hexdigest()
    
    def get(self, url: str) -> Optional[requests.Response]:
        """Get cached response."""
        cache_key = self.get_cache_key(url)
        
        if cache_key in self.index:
            cache_info = self.index[cache_key]
            
            # Check expiration
            if time.time() - cache_info['timestamp'] > cache_info['ttl']:
                return None
            
            # Load cached response (unpickling is only safe for files this scraper wrote itself)
            cache_file = self.cache_dir / f"{cache_key}.cache"
            if cache_file.exists():
                with open(cache_file, 'rb') as f:
                    response = pickle.load(f)
                    return response
        
        return None
    
    def store(self, url: str, response: requests.Response, ttl: int = 3600):
        """Store response in cache."""
        cache_key = self.get_cache_key(url)
        
        # Store response
        cache_file = self.cache_dir / f"{cache_key}.cache"
        with open(cache_file, 'wb') as f:
            pickle.dump(response, f)
        
        # Update index
        self.index[cache_key] = {
            'url': url,
            'timestamp': time.time(),
            'ttl': ttl
        }
        self.save_index()

class ScrapingMetrics:
    """Track scraping metrics for monitoring."""
    
    def __init__(self):
        self.total_requests = 0
        self.successful_requests = 0
        self.blocked_requests = 0
        self.errors = 0
        self.cache_hits = 0
        self.pii_detections = 0
        self.domain_stats = defaultdict(lambda: {'requests': 0, 'errors': 0})
        
        # Setup database
        self.setup_database()
    
    def setup_database(self):
        """Setup metrics database."""
        self.db = sqlite3.connect('scraping_metrics.db')
        self.db.execute('''
            CREATE TABLE IF NOT EXISTS metrics (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                timestamp DATETIME,
                url TEXT,
                domain TEXT,
                status_code INTEGER,
                response_time REAL,
                blocked BOOLEAN,
                cached BOOLEAN,
                error TEXT
            )
        ''')
        
        self.db.execute('''
            CREATE TABLE IF NOT EXISTS pii_detections (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                timestamp DATETIME,
                url TEXT,
                pii_types TEXT
            )
        ''')
    
    def record_request(self, url: str, status_code: int, response_time: float = 0):
        """Record a completed request; only 2xx responses count as successful."""
        self.total_requests += 1
        if 200 <= status_code < 300:
            self.successful_requests += 1
        
        domain = urlparse(url).netloc
        self.domain_stats[domain]['requests'] += 1
        
        self.db.execute('''
            INSERT INTO metrics (timestamp, url, domain, status_code, response_time, blocked, cached, error)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?)
        ''', (datetime.now(), url, domain, status_code, response_time, False, False, None))
        self.db.commit()
    
    def record_blocked(self, url: str, reason: str):
        """Record blocked request."""
        self.total_requests += 1
        self.blocked_requests += 1
        
        domain = urlparse(url).netloc
        
        self.db.execute('''
            INSERT INTO metrics (timestamp, url, domain, status_code, response_time, blocked, cached, error)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?)
        ''', (datetime.now(), url, domain, None, 0, True, False, reason))
        self.db.commit()
    
    def record_error(self, url: str, error: str):
        """Record error."""
        self.total_requests += 1
        self.errors += 1
        
        domain = urlparse(url).netloc
        self.domain_stats[domain]['errors'] += 1
        
        self.db.execute('''
            INSERT INTO metrics (timestamp, url, domain, status_code, response_time, blocked, cached, error)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?)
        ''', (datetime.now(), url, domain, None, 0, False, False, error))
        self.db.commit()
    
    def record_cache_hit(self, url: str):
        """Record cache hit."""
        self.cache_hits += 1
        
        domain = urlparse(url).netloc
        
        self.db.execute('''
            INSERT INTO metrics (timestamp, url, domain, status_code, response_time, blocked, cached, error)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?)
        ''', (datetime.now(), url, domain, 200, 0, False, True, None))
        self.db.commit()
    
    def record_pii_detection(self, url: str, pii_types: List[str]):
        """Record PII detection."""
        self.pii_detections += 1
        
        self.db.execute('''
            INSERT INTO pii_detections (timestamp, url, pii_types)
            VALUES (?, ?, ?)
        ''', (datetime.now(), url, json.dumps(pii_types)))
        self.db.commit()
    
    def get_summary(self) -> Dict:
        """Get metrics summary."""
        return {
            'total_requests': self.total_requests,
            'successful_requests': self.successful_requests,
            'blocked_requests': self.blocked_requests,
            'errors': self.errors,
            'cache_hits': self.cache_hits,
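            # Cache hits are not counted in total_requests, so divide by all lookups
            'cache_hit_rate': self.cache_hits / (self.total_requests + self.cache_hits) if (self.total_requests + self.cache_hits) > 0 else 0,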
            'success_rate': self.successful_requests / self.total_requests if self.total_requests > 0 else 0,
            'pii_detections': self.pii_detections,
            'top_domains': sorted(
                self.domain_stats.items(),
                key=lambda x: x[1]['requests'],
                reverse=True
            )[:10]
        }

class EthicalGuidelines:
    """
    Ethical guidelines and best practices for web scraping.
    """
    
    @staticmethod
    def get_guidelines() -> Dict[str, List[str]]:
        """Get comprehensive ethical guidelines."""
        return {
            'Legal Considerations': [
                '✅ Always check and comply with Terms of Service',
                '✅ Respect copyright and intellectual property',
                '✅ Comply with data protection laws (GDPR, CCPA)',
                '✅ Obtain consent when scraping personal data',
                '❌ Never scrape passwords or private information',
                '❌ Avoid scraping behind login walls without permission',
                '⚠️ Consider fair use doctrine for research/education'
            ],
            
            'Technical Respect': [
                '✅ Always check robots.txt',
                '✅ Implement rate limiting (1-2 seconds between requests)',
                '✅ Use descriptive User-Agent with contact info',
                '✅ Handle errors gracefully',
                '✅ Cache responses to minimize requests',
                '❌ Don\'t overwhelm servers with concurrent requests',
                '❌ Avoid scraping during peak hours',
                '⚠️ Monitor server response times and adjust'
            ],
            
            'Business Ethics': [
                '✅ Give attribution when using scraped data',
                '✅ Consider the website\'s business model',
                '✅ Use APIs when available instead of scraping',
                '❌ Don\'t resell scraped data without permission',
                '❌ Avoid undermining the website\'s revenue',
                '⚠️ Consider reaching out to website owners'
            ],
            
            'Data Privacy': [
                '✅ Anonymize personal information',
                '✅ Implement data retention policies',
                '✅ Secure stored data properly',
                '✅ Allow users to opt-out',
                '❌ Never store sensitive personal data',
                '❌ Don\'t combine data to identify individuals',
                '⚠️ Be transparent about data collection'
            ],
            
            'Best Practices': [
                '✅ Start small and scale gradually',
                '✅ Monitor your impact on the website',
                '✅ Implement exponential backoff for errors',
                '✅ Respect meta tags (noindex, nofollow)',
                '✅ Use sitemap.xml for URL discovery',
                '✅ Keep logs for accountability',
                '⚠️ Be prepared to stop if asked'
            ]
        }
    
    @staticmethod
    def check_compliance(url: str, policy: EthicalPolicy) -> Dict[str, Any]:
        """Check compliance for a URL."""
        compliance_report = {
            'url': url,
            'timestamp': datetime.now(),
            'checks': {},
            'recommendations': []
        }
        
        # Check robots.txt
        checker = RobotsChecker()
        allowed, reason = checker.can_fetch(url)
        compliance_report['checks']['robots_txt'] = {
            'allowed': allowed,
            'reason': reason
        }
        
        if not allowed and policy.respect_robots_txt:
            compliance_report['recommendations'].append(
                "This URL is disallowed by robots.txt. Consider using the sitemap or API instead."
            )
        
        # Check crawl delay
        crawl_delay = checker.get_crawl_delay(url)
        if crawl_delay:
            compliance_report['checks']['crawl_delay'] = crawl_delay
            if policy.rate_limit < crawl_delay:
                compliance_report['recommendations'].append(
                    f"Increase rate limit to at least {crawl_delay} seconds"
                )
        
        # Check for API availability
        # (This would require actual checking of common API endpoints)
        
        return compliance_report

# Example usage
if __name__ == "__main__":
    print("🤖 Ethical Web Scraping Examples\n")
    
    # Example 1: Check robots.txt
    print("1️⃣ Checking robots.txt:")
    
    checker = RobotsChecker()
    test_url = "https://example.com/products/item123"
    
    # Check if URL can be scraped
    allowed, reason = checker.can_fetch(test_url)
    print(f"   URL: {test_url}")
    print(f"   Allowed: {'✅' if allowed else '❌'} {reason}")
    
    # Get crawl delay
    crawl_delay = checker.get_crawl_delay(test_url)
    if crawl_delay:
        print(f"   Crawl delay: {crawl_delay} seconds")
    
    # Example 2: Ethical scraping with policy
    print("\n2️⃣ Ethical Scraping:")
    
    # Create strict ethical policy
    strict_policy = EthicalPolicy(
        respect_robots_txt=True,
        identify_bot=True,
        rate_limit=2.0,
        cache_pages=True,
        handle_personal_data=True
    )
    
    scraper = EthicalScraper(strict_policy)
    
    # Scrape ethically
    test_urls = [
        "https://httpbin.org/html",
        "https://httpbin.org/delay/1"
    ]
    
    for url in test_urls:
        print(f"\n   Scraping: {url}")
        response = scraper.scrape(url, timeout=10)
        if response:
            print(f"   Status: {response.status_code}")
            print(f"   Cached: {scraper.cache.get(url) is not None if scraper.cache else False}")
    
    # Example 3: Rate limiting
    print("\n3️⃣ Rate Limiting:")
    
    rate_limiter = RateLimiter()
    
    # Set custom rate for domain
    rate_limiter.set_domain_rate("api.example.com", 0.5)  # 0.5 requests per second
    
    # Test rate limiting
    domain = "api.example.com"
    for i in range(5):
        allowed = rate_limiter.check_rate_limit(domain)
        print(f"   Request {i+1}: {'✅ Allowed' if allowed else '⏳ Rate limited'}")
        if allowed:
            time.sleep(0.1)  # Simulate request
    
    # Example 4: Scraping metrics
    print("\n4️⃣ Scraping Metrics:")
    
    metrics = scraper.metrics.get_summary()
    print(f"   Total requests: {metrics['total_requests']}")
    print(f"   Success rate: {metrics['success_rate']:.1%}")
    print(f"   Cache hit rate: {metrics['cache_hit_rate']:.1%}")
    print(f"   Blocked requests: {metrics['blocked_requests']}")
    
    # Example 5: Ethical guidelines
    print("\n5️⃣ Ethical Guidelines:")
    
    guidelines = EthicalGuidelines.get_guidelines()
    
    for category, rules in guidelines.items():
        print(f"\n   {category}:")
        for rule in rules[:3]:  # Show first 3 rules
            print(f"     {rule}")
    
    # Example 6: Compliance check
    print("\n6️⃣ Compliance Check:")
    
    compliance = EthicalGuidelines.check_compliance(
        "https://example.com/data",
        strict_policy
    )
    
    print(f"   URL: {compliance['url']}")
    print(f"   robots.txt: {compliance['checks'].get('robots_txt', {}).get('allowed', 'Unknown')}")
    
    if compliance['recommendations']:
        print("   Recommendations:")
        for rec in compliance['recommendations']:
            print(f"     - {rec}")
    
    # Example 7: Sitemap parsing
    print("\n7️⃣ Sitemap Discovery:")
    
    # This would actually fetch and parse a sitemap
    print("   Checking for sitemap...")
    sitemap_urls = scraper.scrape_sitemap("https://example.com")
    print(f"   Found {len(sitemap_urls)} URLs in sitemap")
    
    # Example 8: Token bucket demonstration
    print("\n8️⃣ Token Bucket Rate Limiting:")
    
    bucket = TokenBucket(rate=2, capacity=5)  # 2 tokens/second, max 5
    
    print("   Consuming tokens rapidly:")
    for i in range(8):
        success = bucket.consume()
        print(f"     Token {i+1}: {'✅' if success else '❌'}")
        time.sleep(0.1)  # Consume faster than the 2 tokens/s refill so later requests are denied
    
    # Example 9: PII detection
    print("\n9️⃣ PII Detection:")
    
    # Simulate response with PII
    class MockResponse:
        def __init__(self):
            self.text = "Contact: john@example.com, Phone: 555-123-4567"
            self.url = "https://example.com/contact"
    
    mock_response = MockResponse()
    scraper._check_personal_data(mock_response)
    
    print(f"   PII detections: {scraper.metrics.pii_detections}")
    
    # Example 10: Best practices summary
    print("\n🔟 Best Practices Summary:")
    
    best_practices = [
        "📝 Always check robots.txt before scraping",
        "⏱️ Implement rate limiting (1-2 seconds minimum)",
        "🏷️ Use descriptive User-Agent with contact info",
        "💾 Cache responses to minimize server load",
        "🔒 Protect personal data and respect privacy",
        "📜 Comply with Terms of Service and laws",
        "🤝 Consider reaching out to website owners",
        "📊 Monitor your impact and adjust accordingly"
    ]
    
    for practice in best_practices:
        print(f"   {practice}")
    
    print("\n✅ Ethical web scraping demonstration complete!")
    print("\n⚖️ Remember: With great scraping power comes great responsibility!")
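
The token bucket demonstration depends on wall-clock timing, which makes its output hard to predict by reading the code. The following is a minimal sketch of the same idea with an injectable clock, so the behavior can be reasoned about exactly. `SimpleTokenBucket` and `simulate` are hypothetical names introduced here for illustration; they are not the `TokenBucket` class used in the script, and the sketch assumes the bucket starts full.

```python
class SimpleTokenBucket:
    """Illustrative token bucket (hypothetical; not the script's TokenBucket)."""

    def __init__(self, rate, capacity, clock):
        self.rate = rate              # tokens added per second
        self.capacity = capacity      # maximum tokens the bucket can hold
        self.clock = clock            # callable returning the current time in seconds
        self.tokens = float(capacity) # assumption: the bucket starts full
        self.last = clock()

    def consume(self, tokens=1):
        """Refill based on elapsed time, then try to take `tokens` from the bucket."""
        now = self.clock()
        elapsed = now - self.last
        self.last = now
        self.tokens = min(self.capacity, self.tokens + elapsed * self.rate)
        if self.tokens >= tokens:
            self.tokens -= tokens
            return True
        return False


def simulate(interval, attempts=8, rate=2, capacity=5):
    """Make `attempts` requests spaced `interval` seconds apart on a fake clock."""
    now = [0.0]  # mutable cell so the lambda below sees updates
    bucket = SimpleTokenBucket(rate, capacity, clock=lambda: now[0])
    results = []
    for _ in range(attempts):
        results.append(bucket.consume())
        now[0] += interval
    return results


if __name__ == "__main__":
    print("burst (0.0 s apart):", simulate(0.0))
    print("paced (0.3 s apart):", simulate(0.3))
```

Under these assumptions, a burst with no pause drains the five-token capacity and the remaining three requests are denied, while requests spaced 0.3 seconds apart all succeed because each pause refills 0.6 tokens. The capacity controls how large a burst is tolerated; the rate controls the sustained request frequency.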

Key Takeaways and Best Practices 🎯

Ethical Scraping Best Practices 📋

Pro Tip: Ethical scraping isn't just about following rules; it's about being a good citizen of the web. Think of websites as digital property: you wouldn't walk into someone's house just because the door is unlocked. Always start by checking robots.txt, apply reasonable rate limiting even when no Crawl-delay is specified, and identify yourself honestly in the User-Agent header. Cache aggressively to minimize requests, handle errors gracefully, and be prepared to stop if asked. Behind every website is a business or individual who pays for bandwidth and servers. If you're building a commercial scraper, consider reaching out to website owners; many are happy to work with you if you're transparent. Most importantly, just because you CAN scrape something doesn't mean you SHOULD. Ask yourself: "Would I be okay if someone scraped my website this way?" The golden rule applies to web scraping too!
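
The first two habits in the tip (check robots.txt, then pick a polite delay) can be sketched with the standard library's `urllib.robotparser`. This is a minimal illustration, not the `RobotsChecker` class from the script: the robots.txt content, the `ExampleResearchBot` User-Agent string, the `DEFAULT_DELAY` value, and the `plan_request` helper are all assumptions made for the example, and the rules are parsed from an in-memory string so no network access is needed.

```python
from urllib.robotparser import RobotFileParser

# Hypothetical robots.txt content, parsed in memory for illustration.
ROBOTS_TXT = """\
User-agent: *
Disallow: /private/
Crawl-delay: 5
"""

# Hypothetical bot identity; use your own name and contact address.
USER_AGENT = "ExampleResearchBot/1.0 (+mailto:bot-admin@example.com)"

# Assumed fallback delay (seconds) when robots.txt gives no Crawl-delay.
DEFAULT_DELAY = 2.0


def plan_request(url, robots_txt=ROBOTS_TXT, user_agent=USER_AGENT):
    """Return (allowed, delay_seconds) for a URL under the given robots.txt."""
    parser = RobotFileParser()
    parser.parse(robots_txt.splitlines())
    allowed = parser.can_fetch(user_agent, url)
    crawl_delay = parser.crawl_delay(user_agent)  # None when not specified
    # Never go faster than our own default, even if the site allows it.
    delay = max(DEFAULT_DELAY, float(crawl_delay or 0))
    return allowed, delay


if __name__ == "__main__":
    for url in ("https://example.com/products/item123",
                "https://example.com/private/data"):
        allowed, delay = plan_request(url)
        print(f"{url}: allowed={allowed}, wait {delay:.1f}s between requests")
```

Taking the larger of the site's Crawl-delay and a fixed default means the scraper stays polite even on sites that specify nothing. In a real scraper the same `USER_AGENT` string would also be sent in the request headers so that site owners can identify and contact you.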

Mastering ethical web scraping transforms you from a data taker to a responsible data gatherer. You now understand the legal, technical, and ethical considerations that separate professional scrapers from script kiddies. Whether you're conducting research, building products, or gathering competitive intelligence, these ethical principles ensure your scraping is sustainable, legal, and respectful. Remember: the goal isn't just to get data, but to get it in a way that maintains the health and openness of the web! 🌐