🤖 Scraping Ethics and robots.txt: Scrape Responsibly
Web scraping isn't just about technical skills - it's about being a responsible digital citizen. Like archaeologists who carefully excavate sites without damaging them, ethical scrapers extract data while respecting websites and their owners. robots.txt is your guidebook, rate limiting is your speed limit, and ethics are your compass. Let's learn to scrape with integrity! ⚖️
The Ethics and Compliance Framework
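Think of ethical scraping as visiting someone's home - you knock before entering, respect their rules, don't take more than offered, and leave everything as you found it. robots.txt is the "house rules" posted at the door: its Disallow and Allow lines tell you which rooms you can enter, and the non-standard Crawl-delay line, which some sites publish and some crawlers honor, tells you how fast you can move through them. The file is advisory rather than enforced, so following it is a matter of good faith. Master these principles, and site owners will have far less reason to block you!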
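To make the "house rules" idea concrete, here is a minimal sketch using the standard library's urllib.robotparser. The robots.txt content, the ExampleBot/1.0 user agent, and the example.com URLs are made up for illustration, and the file is inlined so the snippet needs no network access.

```python
from urllib.robotparser import RobotFileParser

# Hypothetical robots.txt content, inlined so the example runs offline.
ROBOTS_TXT = """\
User-agent: *
Disallow: /private/
Crawl-delay: 5
"""


def load_rules(robots_txt: str) -> RobotFileParser:
    """Parse robots.txt text into a RobotFileParser."""
    parser = RobotFileParser()
    parser.parse(robots_txt.splitlines())
    return parser


rules = load_rules(ROBOTS_TXT)
agent = "ExampleBot/1.0"  # hypothetical user agent

# Which "rooms" may we enter?
print(rules.can_fetch(agent, "https://example.com/products/1"))
print(rules.can_fetch(agent, "https://example.com/private/data"))

# How fast may we move? Returns None when no Crawl-delay applies.
print(rules.crawl_delay(agent))
```

In a real scraper you would typically call set_url() and read() on the parser to download the live file instead of inlining it.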
Real-World Scenario: The Ethical Data Platform
You're building a data aggregation platform that collects information from hundreds of websites. You need to respect each site's scraping policies, handle rate limits gracefully, protect user privacy, comply with regulations, and maintain good relationships with website owners. Let's build a comprehensive ethical scraping framework!
# Standard library
import hashlib
import json
import logging
import pickle  # used by ResponseCache to serialize responses
import random
import re
import sqlite3
import threading
import time
from collections import defaultdict, deque
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from enum import Enum
from functools import wraps
from pathlib import Path
from typing import Any, Dict, List, Optional, Set, Tuple
from urllib.parse import urljoin, urlparse
from urllib.robotparser import RobotFileParser

# Third-party
import requests
from bs4 import BeautifulSoup


class ComplianceLevel(Enum):
    """Compliance levels for scraping."""
    STRICT = "strict"      # Follow all rules strictly
    MODERATE = "moderate"  # Follow most rules with some flexibility
    RELAXED = "relaxed"    # Basic compliance only
    CUSTOM = "custom"      # Custom rules


class ScrapingPermission(Enum):
    """Permission levels for scraping."""
    ALLOWED = "allowed"
    DISALLOWED = "disallowed"
    CONDITIONAL = "conditional"
    UNKNOWN = "unknown"


@dataclass
class RobotRules:
    """Parsed robots.txt rules for a domain."""
    domain: str
    rules: Dict[str, Any]
    crawl_delay: Optional[float]
    request_rate: Optional[Tuple[int, int]]  # (requests, seconds)
    sitemap: Optional[str]
    disallowed_paths: List[str]
    allowed_paths: List[str]
    last_checked: datetime


@dataclass
class EthicalPolicy:
    """Ethical scraping policy configuration."""
    respect_robots_txt: bool = True
    identify_bot: bool = True
    rate_limit: float = 1.0  # Seconds between requests
    max_concurrent: int = 1
    retry_after_error: bool = True
    max_retries: int = 3
    handle_personal_data: bool = False
    cache_pages: bool = True
    cache_duration: int = 3600  # Seconds
    rotate_user_agents: bool = False
    use_proxy: bool = False
    respect_nofollow: bool = True
    respect_noindex: bool = True


class RobotsChecker:
    """Comprehensive robots.txt parser and compliance checker."""

    def __init__(self, user_agent: str = "Python-EthicalScraper/1.0"):
        self.user_agent = user_agent
        self.robots_cache = {}
        self.setup_logging()

    def setup_logging(self):
        """Setup logging configuration."""
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
        )
        self.logger = logging.getLogger(__name__)

    def fetch_robots_txt(self, url: str) -> Optional[str]:
        """Fetch robots.txt for a domain."""
        parsed_url = urlparse(url)
        robots_url = f"{parsed_url.scheme}://{parsed_url.netloc}/robots.txt"
        try:
            response = requests.get(
                robots_url,
                timeout=10,
                headers={'User-Agent': self.user_agent}
            )
            if response.status_code == 200:
                self.logger.info(f"Fetched robots.txt from {robots_url}")
                return response.text
            self.logger.warning(f"No robots.txt found at {robots_url}")
            return None
        except requests.RequestException as e:
            self.logger.error(f"Error fetching robots.txt: {e}")
            return None

    def parse_robots_txt(self, url: str, robots_content: Optional[str] = None) -> RobotRules:
        """Parse robots.txt content."""
        parsed_url = urlparse(url)
        domain = parsed_url.netloc

        # Check cache first
        if domain in self.robots_cache:
            cached = self.robots_cache[domain]
            # total_seconds() includes whole days; .seconds alone never reaches 86400
            if (datetime.now() - cached.last_checked).total_seconds() < 86400:  # 24 hours
                return cached

        # Fetch if not provided
        if robots_content is None:
            robots_content = self.fetch_robots_txt(url)

        rules = RobotRules(
            domain=domain,
            rules={},
            crawl_delay=None,
            request_rate=None,
            sitemap=None,
            disallowed_paths=[],
            allowed_paths=[],
            last_checked=datetime.now()
        )

        if robots_content:
            # Standard-library parse; the detailed rules below are extracted manually
            rp = RobotFileParser()
            rp.parse(robots_content.splitlines())

            # Extract rules manually for detailed analysis
            current_user_agent = None
            for line in robots_content.splitlines():
                line = line.strip()

                # Skip comments and empty lines
                if not line or line.startswith('#'):
                    continue

                # Parse directives
                if ':' in line:
                    directive, value = line.split(':', 1)
                    directive = directive.strip().lower()
                    value = value.strip()

                    if directive == 'user-agent':
                        current_user_agent = value
                        if current_user_agent not in rules.rules:
                            rules.rules[current_user_agent] = {
                                'disallow': [],
                                'allow': [],
                                'crawl-delay': None
                            }
                    elif current_user_agent:
                        # Does the current group apply to our user agent?
                        applies = (
                            current_user_agent == '*'
                            or current_user_agent.lower() in self.user_agent.lower()
                        )
                        if directive == 'disallow':
                            rules.rules[current_user_agent]['disallow'].append(value)
                            if applies:
                                rules.disallowed_paths.append(value)
                        elif directive == 'allow':
                            rules.rules[current_user_agent]['allow'].append(value)
                            if applies:
                                rules.allowed_paths.append(value)
                        elif directive == 'crawl-delay':
                            try:
                                delay = float(value)
                                rules.rules[current_user_agent]['crawl-delay'] = delay
                                if applies:
                                    rules.crawl_delay = delay
                            except ValueError:
                                pass  # Ignore malformed delay values
                        elif directive == 'request-rate':
                            # Format: 1/5 (1 request per 5 seconds)
                            if '/' in value:
                                try:
                                    # Named "count" so the requests module is not shadowed
                                    count, seconds = value.split('/')
                                    rules.request_rate = (int(count), int(seconds))
                                except ValueError:
                                    pass  # Ignore malformed rate values

                    # Global directives
                    if directive == 'sitemap':
                        rules.sitemap = value

        # Cache the results
        self.robots_cache[domain] = rules
        return rules

    def can_fetch(self, url: str) -> Tuple[bool, str]:
        """
        Check if URL can be fetched according to robots.txt.
        Returns (allowed, reason).

        Uses plain prefix matching; wildcard patterns (* and $) are not expanded.
        """
        rules = self.parse_robots_txt(url)
        parsed_url = urlparse(url)
        path = parsed_url.path

        # Check disallowed paths
        for disallowed in rules.disallowed_paths:
            if disallowed and path.startswith(disallowed):
                # Check if there's a longer (more specific) allow rule that overrides
                for allowed in rules.allowed_paths:
                    if allowed and path.startswith(allowed) and len(allowed) > len(disallowed):
                        return True, f"Allowed by rule: {allowed}"
                return False, f"Disallowed by robots.txt: {disallowed}"

        # If no specific rules, default to allowed
        return True, "No restrictions in robots.txt"

    def get_crawl_delay(self, url: str) -> Optional[float]:
        """Get crawl delay for a domain."""
        rules = self.parse_robots_txt(url)
        return rules.crawl_delay

    def get_sitemap(self, url: str) -> Optional[str]:
        """Get sitemap URL from robots.txt."""
        rules = self.parse_robots_txt(url)
        return rules.sitemap


class EthicalScraper:
    """Ethical web scraper with compliance and rate limiting."""

    def __init__(self, policy: Optional[EthicalPolicy] = None):
        self.policy = policy or EthicalPolicy()
        self.robots_checker = RobotsChecker()
        self.session = self._create_session()

        # Rate limiting
        self.domain_last_request = {}
        self.request_queue = defaultdict(deque)
        self.rate_limiter = RateLimiter()

        # Caching
        self.cache = ResponseCache() if self.policy.cache_pages else None

        # Metrics
        self.metrics = ScrapingMetrics()
        self.setup_logging()

    def setup_logging(self):
        """Setup logging configuration."""
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
        )
        self.logger = logging.getLogger(__name__)

    def _create_session(self) -> requests.Session:
        """Create configured session."""
        session = requests.Session()
        if self.policy.identify_bot:
            session.headers.update({
                'User-Agent': 'Python-EthicalScraper/1.0 (+https://example.com/bot)'
            })
        elif self.policy.rotate_user_agents:
            # Rotate between common user agents
            user_agents = [
                'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
                'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36',
                'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36'
            ]
            session.headers['User-Agent'] = random.choice(user_agents)
        return session

    def scrape(self, url: str, **kwargs) -> Optional[requests.Response]:
        """Ethically scrape a URL with all compliance checks."""
        # Check robots.txt compliance
        if self.policy.respect_robots_txt:
            allowed, reason = self.robots_checker.can_fetch(url)
            if not allowed:
                self.logger.warning(f"Scraping not allowed for {url}: {reason}")
                self.metrics.record_blocked(url, reason)
                return None

        # Check cache
        if self.cache:
            cached_response = self.cache.get(url)
            if cached_response:
                self.logger.info(f"Using cached response for {url}")
                self.metrics.record_cache_hit(url)
                return cached_response

        # Apply rate limiting
        domain = urlparse(url).netloc
        self._apply_rate_limit(domain)

        # Make request
        try:
            response = self.session.get(url, **kwargs)

            # Check for rate limit headers
            self._check_rate_limit_headers(response)

            # Cache response if successful, honoring the policy's cache duration
            if self.cache and response.status_code == 200:
                self.cache.store(url, response, ttl=self.policy.cache_duration)

            # Record metrics
            self.metrics.record_request(url, response.status_code)

            # Check for personal data if configured
            if self.policy.handle_personal_data:
                self._check_personal_data(response)

            return response
        except Exception as e:
            self.logger.error(f"Error scraping {url}: {e}")
            self.metrics.record_error(url, str(e))
            if self.policy.retry_after_error:
                return self._retry_request(url, **kwargs)
            return None

    def _apply_rate_limit(self, domain: str):
        """Apply rate limiting for a domain."""
        # Check robots.txt crawl delay
        crawl_delay = self.robots_checker.get_crawl_delay(f"https://{domain}")
        if crawl_delay:
            delay = max(crawl_delay, self.policy.rate_limit)
        else:
            delay = self.policy.rate_limit

        # Check last request time
        if domain in self.domain_last_request:
            elapsed = time.time() - self.domain_last_request[domain]
            if elapsed < delay:
                sleep_time = delay - elapsed
                self.logger.info(f"Rate limiting: sleeping {sleep_time:.2f}s for {domain}")
                time.sleep(sleep_time)
        self.domain_last_request[domain] = time.time()

    def _check_rate_limit_headers(self, response: requests.Response):
        """Check and respect rate limit headers."""
        # Check for Retry-After header
        if 'Retry-After' in response.headers:
            retry_after = response.headers['Retry-After']
            try:
                # Only the delay-in-seconds form is handled; an HTTP-date value is skipped
                sleep_time = int(retry_after)
                self.logger.warning(f"Server requested retry after {sleep_time} seconds")
                time.sleep(sleep_time)
            except ValueError:
                pass

        # Check for X-RateLimit headers (non-standard; semantics vary by API)
        if 'X-RateLimit-Remaining' in response.headers:
            try:
                remaining = int(response.headers['X-RateLimit-Remaining'])
                reset_time = response.headers.get('X-RateLimit-Reset')
                if remaining <= 0 and reset_time:
                    # Assumes the reset value is a Unix timestamp
                    sleep_until = int(reset_time) - time.time()
                    if sleep_until > 0:
                        self.logger.warning(f"Rate limit exceeded, sleeping {sleep_until:.0f}s")
                        time.sleep(sleep_until)
            except ValueError:
                pass  # Ignore header values that are not integers

    def _check_personal_data(self, response: requests.Response):
        """Check for and handle personal data."""
        # Simple PII detection patterns (heuristics; expect false positives)
        pii_patterns = {
            'email': r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b',
            'phone': r'\b\d{3}[-.]?\d{3}[-.]?\d{4}\b',
            'ssn': r'\b\d{3}-\d{2}-\d{4}\b',
            'credit_card': r'\b\d{4}[\s-]?\d{4}[\s-]?\d{4}[\s-]?\d{4}\b'
        }
        text = response.text
        detected_pii = []
        for pii_type, pattern in pii_patterns.items():
            if re.search(pattern, text):
                detected_pii.append(pii_type)
        if detected_pii:
            self.logger.warning(f"Potential PII detected: {detected_pii}")
            self.metrics.record_pii_detection(response.url, detected_pii)

    def _retry_request(self, url: str, **kwargs) -> Optional[requests.Response]:
        """Retry failed request with exponential backoff."""
        for attempt in range(self.policy.max_retries):
            sleep_time = 2 ** attempt
            self.logger.info(f"Retrying {url} after {sleep_time}s (attempt {attempt + 1})")
            time.sleep(sleep_time)
            try:
                response = self.session.get(url, **kwargs)
                if response.status_code == 200:
                    return response
            except requests.RequestException:
                continue
        return None

    def scrape_sitemap(self, url: str) -> List[str]:
        """Scrape URLs from sitemap (ethical way to discover URLs)."""
        sitemap_url = self.robots_checker.get_sitemap(url)
        if not sitemap_url:
            # Try common sitemap locations
            parsed_url = urlparse(url)
            base_url = f"{parsed_url.scheme}://{parsed_url.netloc}"
            sitemap_url = f"{base_url}/sitemap.xml"
        try:
            response = self.scrape(sitemap_url)
            if response and response.status_code == 200:
                # Parse sitemap (the 'xml' parser requires the lxml package)
                soup = BeautifulSoup(response.content, 'xml')
                urls = []
                for loc in soup.find_all('loc'):
                    urls.append(loc.text)
                self.logger.info(f"Found {len(urls)} URLs in sitemap")
                return urls
        except Exception as e:
            self.logger.error(f"Error parsing sitemap {sitemap_url}: {e}")
        return []


class RateLimiter:
    """Advanced rate limiting with multiple strategies."""

    def __init__(self):
        self.domain_buckets = defaultdict(lambda: TokenBucket())
        self.global_bucket = TokenBucket(rate=10, capacity=100)

    def check_rate_limit(self, domain: str) -> bool:
        """Check if request is allowed."""
        # Check domain-specific limit
        if not self.domain_buckets[domain].consume():
            return False
        # Check global limit
        if not self.global_bucket.consume():
            return False
        return True

    def set_domain_rate(self, domain: str, requests_per_second: float):
        """Set rate limit for specific domain."""
        self.domain_buckets[domain] = TokenBucket(
            rate=requests_per_second,
            # At least 1, so very low rates do not yield a bucket that never allows a request
            capacity=max(1, int(requests_per_second * 10))
        )


class TokenBucket:
    """Token bucket for rate limiting."""

    def __init__(self, rate: float = 1, capacity: int = 10):
        self.rate = rate  # Tokens per second
        self.capacity = capacity
        self.tokens = capacity
        self.last_update = time.time()
        self.lock = threading.Lock()

    def consume(self, tokens: int = 1) -> bool:
        """Try to consume tokens."""
        with self.lock:
            # Refill bucket
            now = time.time()
            elapsed = now - self.last_update
            self.tokens = min(self.capacity, self.tokens + elapsed * self.rate)
            self.last_update = now

            # Check if enough tokens
            if self.tokens >= tokens:
                self.tokens -= tokens
                return True
            return False


class ResponseCache:
    """Cache for scraped responses.

    Responses are stored with pickle, so only load cache files you wrote yourself.
    """

    def __init__(self, cache_dir: str = "./cache"):
        self.cache_dir = Path(cache_dir)
        self.cache_dir.mkdir(parents=True, exist_ok=True)
        self.index = {}
        self.load_index()

    def load_index(self):
        """Load cache index."""
        index_file = self.cache_dir / "index.json"
        if index_file.exists():
            with open(index_file, 'r') as f:
                self.index = json.load(f)

    def save_index(self):
        """Save cache index."""
        with open(self.cache_dir / "index.json", 'w') as f:
            json.dump(self.index, f)

    def get_cache_key(self, url: str) -> str:
        """Generate cache key for URL."""
        # MD5 is used only as a filename hash here, not for security
        return hashlib.md5(url.encode()).hexdigest()

    def get(self, url: str) -> Optional[requests.Response]:
        """Get cached response."""
        cache_key = self.get_cache_key(url)
        if cache_key in self.index:
            cache_info = self.index[cache_key]

            # Check expiration
            if time.time() - cache_info['timestamp'] > cache_info['ttl']:
                return None

            # Load cached response
            cache_file = self.cache_dir / f"{cache_key}.cache"
            if cache_file.exists():
                with open(cache_file, 'rb') as f:
                    response = pickle.load(f)
                return response
        return None

    def store(self, url: str, response: requests.Response, ttl: int = 3600):
        """Store response in cache."""
        cache_key = self.get_cache_key(url)

        # Store response
        cache_file = self.cache_dir / f"{cache_key}.cache"
        with open(cache_file, 'wb') as f:
            pickle.dump(response, f)

        # Update index
        self.index[cache_key] = {
            'url': url,
            'timestamp': time.time(),
            'ttl': ttl
        }
        self.save_index()


class ScrapingMetrics:
    """Track scraping metrics for monitoring."""

    def __init__(self):
        self.total_requests = 0
        self.successful_requests = 0
        self.blocked_requests = 0
        self.errors = 0
        self.cache_hits = 0
        self.pii_detections = 0
        self.domain_stats = defaultdict(lambda: {'requests': 0, 'errors': 0})

        # Setup database
        self.setup_database()

    def setup_database(self):
        """Setup metrics database."""
        self.db = sqlite3.connect('scraping_metrics.db')
        self.db.execute('''
            CREATE TABLE IF NOT EXISTS metrics (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                timestamp DATETIME,
                url TEXT,
                domain TEXT,
                status_code INTEGER,
                response_time REAL,
                blocked BOOLEAN,
                cached BOOLEAN,
                error TEXT
            )
        ''')
        self.db.execute('''
            CREATE TABLE IF NOT EXISTS pii_detections (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                timestamp DATETIME,
                url TEXT,
                pii_types TEXT
            )
        ''')

    def record_request(self, url: str, status_code: int, response_time: float = 0):
        """Record a completed request; only status codes below 400 count as successful."""
        self.total_requests += 1
        if status_code < 400:
            self.successful_requests += 1
        domain = urlparse(url).netloc
        self.domain_stats[domain]['requests'] += 1
        # Timestamps are stored as ISO 8601 strings rather than relying on sqlite3's datetime adapter
        self.db.execute('''
            INSERT INTO metrics (timestamp, url, domain, status_code, response_time, blocked, cached, error)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?)
        ''', (datetime.now().isoformat(), url, domain, status_code, response_time, False, False, None))
        self.db.commit()

    def record_blocked(self, url: str, reason: str):
        """Record blocked request."""
        self.total_requests += 1
        self.blocked_requests += 1
        domain = urlparse(url).netloc
        self.db.execute('''
            INSERT INTO metrics (timestamp, url, domain, status_code, response_time, blocked, cached, error)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?)
        ''', (datetime.now().isoformat(), url, domain, None, 0, True, False, reason))
        self.db.commit()

    def record_error(self, url: str, error: str):
        """Record error."""
        self.total_requests += 1
        self.errors += 1
        domain = urlparse(url).netloc
        self.domain_stats[domain]['errors'] += 1
        self.db.execute('''
            INSERT INTO metrics (timestamp, url, domain, status_code, response_time, blocked, cached, error)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?)
        ''', (datetime.now().isoformat(), url, domain, None, 0, False, False, error))
        self.db.commit()

    def record_cache_hit(self, url: str):
        """Record cache hit."""
        self.cache_hits += 1
        domain = urlparse(url).netloc
        self.db.execute('''
            INSERT INTO metrics (timestamp, url, domain, status_code, response_time, blocked, cached, error)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?)
        ''', (datetime.now().isoformat(), url, domain, 200, 0, False, True, None))
        self.db.commit()

    def record_pii_detection(self, url: str, pii_types: List[str]):
        """Record PII detection."""
        self.pii_detections += 1
        self.db.execute('''
            INSERT INTO pii_detections (timestamp, url, pii_types)
            VALUES (?, ?, ?)
        ''', (datetime.now().isoformat(), url, json.dumps(pii_types)))
        self.db.commit()

    def get_summary(self) -> Dict:
        """Get metrics summary."""
        # Cache hits are not counted in total_requests, so add them for the hit-rate denominator
        lookups = self.total_requests + self.cache_hits
        return {
            'total_requests': self.total_requests,
            'successful_requests': self.successful_requests,
            'blocked_requests': self.blocked_requests,
            'errors': self.errors,
            'cache_hits': self.cache_hits,
            'cache_hit_rate': self.cache_hits / lookups if lookups > 0 else 0,
            'success_rate': self.successful_requests / self.total_requests if self.total_requests > 0 else 0,
            'pii_detections': self.pii_detections,
            'top_domains': sorted(
                self.domain_stats.items(),
                key=lambda x: x[1]['requests'],
                reverse=True
            )[:10]
        }


class EthicalGuidelines:
    """Ethical guidelines and best practices for web scraping."""

    @staticmethod
    def get_guidelines() -> Dict[str, List[str]]:
        """Get comprehensive ethical guidelines."""
        return {
            'Legal Considerations': [
                '✅ Always check and comply with Terms of Service',
                '✅ Respect copyright and intellectual property',
                '✅ Comply with data protection laws (GDPR, CCPA)',
                '✅ Obtain consent when scraping personal data',
                '❌ Never scrape passwords or private information',
                '❌ Avoid scraping behind login walls without permission',
                '⚠️ Consider fair use doctrine for research/education'
            ],
            'Technical Respect': [
                '✅ Always check robots.txt',
                '✅ Implement rate limiting (1-2 seconds between requests)',
                '✅ Use descriptive User-Agent with contact info',
                '✅ Handle errors gracefully',
                '✅ Cache responses to minimize requests',
                '❌ Don\'t overwhelm servers with concurrent requests',
                '❌ Avoid scraping during peak hours',
                '⚠️ Monitor server response times and adjust'
            ],
            'Business Ethics': [
                '✅ Give attribution when using scraped data',
                '✅ Consider the website\'s business model',
                '✅ Use APIs when available instead of scraping',
                '❌ Don\'t resell scraped data without permission',
                '❌ Avoid undermining the website\'s revenue',
                '⚠️ Consider reaching out to website owners'
            ],
            'Data Privacy': [
                '✅ Anonymize personal information',
                '✅ Implement data retention policies',
                '✅ Secure stored data properly',
                '✅ Allow users to opt-out',
                '❌ Never store sensitive personal data',
                '❌ Don\'t combine data to identify individuals',
                '⚠️ Be transparent about data collection'
            ],
            'Best Practices': [
                '✅ Start small and scale gradually',
                '✅ Monitor your impact on the website',
                '✅ Implement exponential backoff for errors',
                '✅ Respect meta tags (noindex, nofollow)',
                '✅ Use sitemap.xml for URL discovery',
                '✅ Keep logs for accountability',
                '⚠️ Be prepared to stop if asked'
            ]
        }

    @staticmethod
    def check_compliance(url: str, policy: EthicalPolicy) -> Dict[str, Any]:
        """Check compliance for a URL."""
        compliance_report = {
            'url': url,
            'timestamp': datetime.now(),
            'checks': {},
            'recommendations': []
        }

        # Check robots.txt
        checker = RobotsChecker()
        allowed, reason = checker.can_fetch(url)
        compliance_report['checks']['robots_txt'] = {
            'allowed': allowed,
            'reason': reason
        }
        if not allowed and policy.respect_robots_txt:
            compliance_report['recommendations'].append(
                "This URL is disallowed by robots.txt. Consider using the sitemap or API instead."
            )

        # Check crawl delay
        crawl_delay = checker.get_crawl_delay(url)
        if crawl_delay:
            compliance_report['checks']['crawl_delay'] = crawl_delay
            if policy.rate_limit < crawl_delay:
                compliance_report['recommendations'].append(
                    f"Increase rate limit to at least {crawl_delay} seconds"
                )

        # Check for API availability
        # (This would require actual checking of common API endpoints)
        return compliance_report


# Example usage
if __name__ == "__main__":
    print("🤖 Ethical Web Scraping Examples\n")

    # Example 1: Check robots.txt
    print("1️⃣ Checking robots.txt:")
    checker = RobotsChecker()
    test_url = "https://example.com/products/item123"

    # Check if URL can be scraped
    allowed, reason = checker.can_fetch(test_url)
    print(f" URL: {test_url}")
    print(f" Allowed: {'✅' if allowed else '❌'} {reason}")

    # Get crawl delay
    crawl_delay = checker.get_crawl_delay(test_url)
    if crawl_delay:
        print(f" Crawl delay: {crawl_delay} seconds")

    # Example 2: Ethical scraping with policy
    print("\n2️⃣ Ethical Scraping:")

    # Create strict ethical policy
    strict_policy = EthicalPolicy(
        respect_robots_txt=True,
        identify_bot=True,
        rate_limit=2.0,
        cache_pages=True,
        handle_personal_data=True
    )
    scraper = EthicalScraper(strict_policy)

    # Scrape ethically
    test_urls = [
        "https://httpbin.org/html",
        "https://httpbin.org/delay/1"
    ]
    for url in test_urls:
        print(f"\n Scraping: {url}")
        response = scraper.scrape(url, timeout=10)
        if response:
            print(f" Status: {response.status_code}")
            print(f" Cached: {scraper.cache.get(url) is not None if scraper.cache else False}")

    # Example 3: Rate limiting
    print("\n3️⃣ Rate Limiting:")
    rate_limiter = RateLimiter()

    # Set custom rate for domain
    rate_limiter.set_domain_rate("api.example.com", 0.5)  # 0.5 requests per second

    # Test rate limiting
    domain = "api.example.com"
    for i in range(5):
        allowed = rate_limiter.check_rate_limit(domain)
        print(f" Request {i+1}: {'✅ Allowed' if allowed else '⏳ Rate limited'}")
        if allowed:
            time.sleep(0.1)  # Simulate request

    # Example 4: Scraping metrics
    print("\n4️⃣ Scraping Metrics:")
    metrics = scraper.metrics.get_summary()
    print(f" Total requests: {metrics['total_requests']}")
    print(f" Success rate: {metrics['success_rate']:.1%}")
    print(f" Cache hit rate: {metrics['cache_hit_rate']:.1%}")
    print(f" Blocked requests: {metrics['blocked_requests']}")

    # Example 5: Ethical guidelines
    print("\n5️⃣ Ethical Guidelines:")
    guidelines = EthicalGuidelines.get_guidelines()
    for category, rules in guidelines.items():
        print(f"\n {category}:")
        for rule in rules[:3]:  # Show first 3 rules
            print(f" {rule}")

    # Example 6: Compliance check
    print("\n6️⃣ Compliance Check:")
    compliance = EthicalGuidelines.check_compliance(
        "https://example.com/data",
        strict_policy
    )
    print(f" URL: {compliance['url']}")
    print(f" robots.txt: {compliance['checks'].get('robots_txt', {}).get('allowed', 'Unknown')}")
    if compliance['recommendations']:
        print(" Recommendations:")
        for rec in compliance['recommendations']:
            print(f" - {rec}")

    # Example 7: Sitemap parsing
    print("\n7️⃣ Sitemap Discovery:")
    # This fetches and parses the site's sitemap, if one is found
    print(" Checking for sitemap...")
    sitemap_urls = scraper.scrape_sitemap("https://example.com")
    print(f" Found {len(sitemap_urls)} URLs in sitemap")

    # Example 8: Token bucket demonstration
    print("\n8️⃣ Token Bucket Rate Limiting:")
    bucket = TokenBucket(rate=2, capacity=5)  # 2 tokens/second, max 5
    print(" Consuming tokens rapidly:")
    for i in range(8):
        success = bucket.consume()
        print(f" Token {i+1}: {'✅' if success else '❌'}")
        time.sleep(0.3)

    # Example 9: PII detection
    print("\n9️⃣ PII Detection:")

    # Simulate response with PII
    class MockResponse:
        def __init__(self):
            self.text = "Contact: john@example.com, Phone: 555-123-4567"
            self.url = "https://example.com/contact"

    mock_response = MockResponse()
    scraper._check_personal_data(mock_response)
    print(f" PII detections: {scraper.metrics.pii_detections}")

    # Example 10: Best practices summary
    print("\n🔟 Best Practices Summary:")
    best_practices = [
        "Always check robots.txt before scraping",
        "Implement rate limiting (1-2 seconds minimum)",
        "Use descriptive User-Agent with contact info",
        "Cache responses to minimize server load",
        "Protect personal data and respect privacy",
        "Comply with Terms of Service and laws",
        "Consider reaching out to website owners",
        "Monitor your impact and adjust accordingly"
    ]
    for practice in best_practices:
        print(f" {practice}")

    print("\n✅ Ethical web scraping demonstration complete!")
    print("\n⚖️ Remember: With great scraping power comes great responsibility!")
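The _check_rate_limit_headers method above only understands the seconds form of Retry-After, although HTTP also allows an HTTP-date there. As a rough sketch of how both forms could be handled, the helper below (retry_after_seconds is a hypothetical name, not part of the listing) converts either form into a number of seconds to wait, using only the standard library.

```python
from datetime import datetime, timezone
from email.utils import parsedate_to_datetime
from typing import Optional


def retry_after_seconds(value: str, now: Optional[datetime] = None) -> Optional[float]:
    """Convert a Retry-After header value into seconds to wait.

    Accepts either a non-negative integer number of seconds or an HTTP-date.
    Returns None when the value cannot be interpreted.
    """
    value = value.strip()
    if value.isdigit():
        return float(value)
    try:
        when = parsedate_to_datetime(value)
    except (TypeError, ValueError):
        # Older Python versions raise TypeError, newer ones ValueError
        return None
    if when.tzinfo is None:
        # Treat dates without an explicit offset as UTC
        when = when.replace(tzinfo=timezone.utc)
    now = now or datetime.now(timezone.utc)
    # A date in the past means there is nothing left to wait for
    return max(0.0, (when - now).total_seconds())


# Usage with a fixed "now" so the result does not depend on the clock
fixed_now = datetime(2015, 10, 21, 7, 27, 0, tzinfo=timezone.utc)
wait = retry_after_seconds("Wed, 21 Oct 2015 07:28:00 GMT", now=fixed_now)
print(wait)
```

Passing the current time in as a parameter is a design choice made here to keep the helper easy to test; a scraper would normally omit it and let the function read the clock.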
Key Takeaways and Best Practices
- Always Check robots.txt: It's the first and most important step.
- Respect Rate Limits: Use at least 1-2 seconds between requests.
- Identify Yourself: Use a descriptive User-Agent with contact information.
- Cache Responses: Reduce server load by caching when possible.
- Handle Errors Gracefully: Implement exponential backoff and retry logic.
- Protect Privacy: Be careful with personal data and comply with regulations.
- Use APIs When Available: Always prefer official APIs over scraping.
- Monitor Your Impact: Track metrics and adjust if causing issues.
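The "exponential backoff" takeaway can be sketched as a small delay schedule. This version adds random jitter, a common refinement that the listing above does not use; the function name backoff_delays and its parameters base and cap are illustrative assumptions rather than anything prescribed by a library.

```python
import random
from typing import List, Optional


def backoff_delays(max_retries: int = 3, base: float = 1.0, cap: float = 60.0,
                   rng: Optional[random.Random] = None) -> List[float]:
    """Return one wait time (in seconds) per retry attempt.

    The ceiling doubles on each attempt (base, 2*base, 4*base, ...) up to cap.
    Each delay is drawn uniformly from the upper half of that ceiling so that
    many clients retrying at once do not all hit the server at the same moment.
    """
    rng = rng or random.Random()
    delays = []
    for attempt in range(max_retries):
        ceiling = min(cap, base * 2 ** attempt)
        delays.append(rng.uniform(ceiling / 2, ceiling))
    return delays


# Usage: a seeded generator makes the schedule reproducible for inspection
delays = backoff_delays(max_retries=4, base=1.0, cap=5.0, rng=random.Random(0))
for attempt, delay in enumerate(delays, start=1):
    print(f"attempt {attempt}: wait {delay:.2f}s")
```

A retry loop would sleep for each delay in turn before re-issuing the request, and stop as soon as one attempt succeeds.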
Ethical Scraping Best Practices
Mastering ethical web scraping transforms you from a data taker to a responsible data gatherer. You now understand the legal, technical, and ethical considerations that separate professional scrapers from script kiddies. Whether you're conducting research, building products, or gathering competitive intelligence, these ethical principles ensure your scraping is sustainable, legal, and respectful. Remember: the goal isn't just to get data, but to get it in a way that maintains the health and openness of the web!
Pro Tip: Ethical scraping isn't just about following rules - it's about being a good citizen of the web. Think of websites as digital properties: you wouldn't break into someone's house just because the door is unlocked. Always start by checking robots.txt, implement reasonable rate limiting (even if not specified), and identify yourself honestly. Cache aggressively to minimize requests, handle errors gracefully, and be prepared to stop if asked. Remember that behind every website is a business or individual who pays for bandwidth and servers. If you're building a commercial scraper, consider reaching out to website owners - many are happy to work with you if you're transparent. Most importantly: just because you CAN scrape something doesn't mean you SHOULD. Always ask yourself: "Would I be okay if someone scraped my website this way?" The golden rule applies to web scraping too!