Skip to main content

šŸ”§ Email Filtering and Rules: Automate Inbox Management

Email filtering and rules are the traffic controllers of your digital communication - they automatically sort, categorize, respond to, and process emails based on intelligent criteria. Like having a personal assistant that never sleeps, mastering email rules allows you to build systems that manage thousands of emails, trigger workflows, maintain inbox zero, and ensure important messages never get lost. Let's explore the complete toolkit for automated email management! šŸŽÆ

The Email Filtering Architecture

Think of email filtering as building a smart postal sorting facility - each incoming message is analyzed, categorized, and routed to the right destination based on rules you define. From simple sender-based filters to complex content analysis with machine learning, understanding filtering patterns, rule engines, and action triggers is essential for building intelligent email automation systems!

graph TB A[Email Filtering System] --> B[Filter Criteria] A --> C[Rule Engine] A --> D[Actions] A --> E[Management] B --> F[Sender/Recipient] B --> G[Subject/Keywords] B --> H[Content Analysis] B --> I[Attachments] C --> J[Priority Rules] C --> K[Conditional Logic] C --> L[Rule Chains] C --> M[ML Classification] D --> N[Move/Label] D --> O[Auto-Reply] D --> P[Forward] D --> Q[Trigger Workflow] E --> R[Rule Testing] E --> S[Performance] E --> T[Monitoring] E --> U[Analytics] V[Advanced] --> W[Spam Detection] V --> X[Sentiment Analysis] V --> Y[Smart Categories] V --> Z[Auto-Learning] style A fill:#ff6b6b style B fill:#51cf66 style C fill:#339af0 style D fill:#ffd43b style E fill:#ff6b6b style V fill:#51cf66

Real-World Scenario: The Intelligent Email Management Platform šŸ¤–

You're building an intelligent email management platform that processes customer support tickets, sorts newsletters and promotions, prioritizes urgent messages, extracts and processes orders, triggers automated workflows, maintains team inboxes, filters spam and phishing, and learns from user behavior. Your system must handle complex rule chains, scale to millions of emails, provide real-time processing, and adapt to changing patterns. Let's build a comprehensive email filtering framework!

# First, install required packages:
# pip install scikit-learn nltk spacy textblob python-Levenshtein fuzzywuzzy

import re
import json
import logging
from typing import List, Dict, Optional, Any, Callable, Union, Tuple
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from enum import Enum
from pathlib import Path
import hashlib
from collections import defaultdict, Counter
import pickle

# Machine learning libraries
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.naive_bayes import MultinomialNB
from sklearn.pipeline import Pipeline
import nltk
from textblob import TextBlob

# Email modules (assuming previous email classes are available)
# from email_reader import EmailData, IMAPClient
# from email_sender import EmailMessage, SMTPClient

# ==================== Filter Configuration ====================

class FilterPriority(Enum):
    """Filter execution priority."""
    CRITICAL = 1
    HIGH = 2
    MEDIUM = 3
    LOW = 4
    BACKGROUND = 5

class ActionType(Enum):
    """Email action types."""
    MOVE_TO_FOLDER = "move_to_folder"
    APPLY_LABEL = "apply_label"
    MARK_AS_READ = "mark_as_read"
    MARK_AS_IMPORTANT = "mark_as_important"
    DELETE = "delete"
    FORWARD = "forward"
    AUTO_REPLY = "auto_reply"
    TRIGGER_WEBHOOK = "trigger_webhook"
    EXTRACT_DATA = "extract_data"
    ADD_TO_CRM = "add_to_crm"
    CREATE_TICKET = "create_ticket"
    ARCHIVE = "archive"
    BLOCK_SENDER = "block_sender"

@dataclass
class FilterCondition:
    """Single filter condition."""
    field: str  # from, to, subject, body, etc.
    operator: str  # contains, equals, starts_with, regex, etc.
    value: Any
    case_sensitive: bool = False
    negate: bool = False  # NOT condition

@dataclass
class EmailFilter:
    """Email filter rule."""
    name: str
    description: str
    conditions: List[FilterCondition]
    actions: List[Dict[str, Any]]
    
    # Rule settings
    enabled: bool = True
    priority: FilterPriority = FilterPriority.MEDIUM
    stop_processing: bool = False  # Stop after this rule matches
    
    # Advanced options
    condition_logic: str = "AND"  # AND or OR
    max_executions_per_day: Optional[int] = None
    valid_from: Optional[datetime] = None
    valid_until: Optional[datetime] = None
    
    # Statistics
    execution_count: int = 0
    last_executed: Optional[datetime] = None
    
    def __hash__(self):
        return hash(self.name)

# ==================== Rule Engine ====================

class EmailRuleEngine:
    """
    Core rule engine for email filtering.
    """
    
    def __init__(self):
        self.rules: List[EmailFilter] = []
        self.rule_groups: Dict[str, List[EmailFilter]] = defaultdict(list)
        self.logger = logging.getLogger(__name__)
        
        # Statistics
        self.stats = {
            "emails_processed": 0,
            "rules_matched": 0,
            "actions_executed": 0,
            "errors": 0
        }
        
        # Performance optimization
        self.compiled_regexes = {}
        
    def add_rule(self, rule: EmailFilter, group: str = "default"):
        """Add filter rule."""
        self.rules.append(rule)
        self.rule_groups[group].append(rule)
        
        # Pre-compile regex patterns
        for condition in rule.conditions:
            if condition.operator == "regex":
                pattern_key = f"{rule.name}_{condition.field}_{condition.value}"
                if pattern_key not in self.compiled_regexes:
                    flags = 0 if condition.case_sensitive else re.IGNORECASE
                    self.compiled_regexes[pattern_key] = re.compile(condition.value, flags)
        
        self.logger.info(f"Added rule: {rule.name}")
    
    def remove_rule(self, rule_name: str):
        """Remove filter rule."""
        self.rules = [r for r in self.rules if r.name != rule_name]
        
        for group in self.rule_groups.values():
            group[:] = [r for r in group if r.name != rule_name]
        
        self.logger.info(f"Removed rule: {rule_name}")
    
    def process_email(self, email: 'EmailData') -> List[Dict[str, Any]]:
        """
        Process email through all rules.
        
        Returns:
            List of actions to execute
        """
        self.stats["emails_processed"] += 1
        actions_to_execute = []
        
        # Sort rules by priority
        sorted_rules = sorted(
            [r for r in self.rules if r.enabled],
            key=lambda r: r.priority.value
        )
        
        for rule in sorted_rules:
            # Check if rule is valid
            if not self._is_rule_valid(rule):
                continue
            
            # Check if rule matches
            if self._evaluate_rule(rule, email):
                self.logger.info(f"Rule '{rule.name}' matched for email: {email.subject}")
                
                # Update rule statistics
                rule.execution_count += 1
                rule.last_executed = datetime.now()
                self.stats["rules_matched"] += 1
                
                # Add actions
                for action in rule.actions:
                    actions_to_execute.append({
                        "rule": rule.name,
                        "action": action,
                        "email": email
                    })
                    self.stats["actions_executed"] += 1
                
                # Stop processing if configured
                if rule.stop_processing:
                    break
        
        return actions_to_execute
    
    def _is_rule_valid(self, rule: EmailFilter) -> bool:
        """Check if rule is currently valid."""
        now = datetime.now()
        
        # Check date validity
        if rule.valid_from and now < rule.valid_from:
            return False
        
        if rule.valid_until and now > rule.valid_until:
            return False
        
        # Check execution limit
        if rule.max_executions_per_day:
            if rule.last_executed and rule.last_executed.date() == now.date():
                if rule.execution_count >= rule.max_executions_per_day:
                    return False
        
        return True
    
    def _evaluate_rule(self, rule: EmailFilter, email: 'EmailData') -> bool:
        """Evaluate if rule matches email."""
        condition_results = []
        
        for condition in rule.conditions:
            result = self._evaluate_condition(condition, email, rule.name)
            
            if condition.negate:
                result = not result
            
            condition_results.append(result)
        
        # Apply condition logic
        if rule.condition_logic == "AND":
            return all(condition_results) if condition_results else False
        elif rule.condition_logic == "OR":
            return any(condition_results) if condition_results else False
        else:
            return False
    
    def _evaluate_condition(self, condition: FilterCondition, 
                          email: 'EmailData', rule_name: str) -> bool:
        """Evaluate single condition."""
        # Get field value from email
        field_value = self._get_email_field(email, condition.field)
        
        if field_value is None:
            return False
        
        # Convert to string for comparison
        field_str = str(field_value)
        value_str = str(condition.value)
        
        # Apply case sensitivity
        if not condition.case_sensitive:
            field_str = field_str.lower()
            value_str = value_str.lower()
        
        # Evaluate based on operator
        if condition.operator == "equals":
            return field_str == value_str
        
        elif condition.operator == "contains":
            return value_str in field_str
        
        elif condition.operator == "starts_with":
            return field_str.startswith(value_str)
        
        elif condition.operator == "ends_with":
            return field_str.endswith(value_str)
        
        elif condition.operator == "regex":
            pattern_key = f"{rule_name}_{condition.field}_{condition.value}"
            if pattern_key in self.compiled_regexes:
                return bool(self.compiled_regexes[pattern_key].search(field_str))
            else:
                flags = 0 if condition.case_sensitive else re.IGNORECASE
                return bool(re.search(condition.value, field_str, flags))
        
        elif condition.operator == "greater_than":
            try:
                return float(field_str) > float(value_str)
            except:
                return False
        
        elif condition.operator == "less_than":
            try:
                return float(field_str) < float(value_str)
            except:
                return False
        
        elif condition.operator == "in_list":
            values = value_str.split(',') if isinstance(value_str, str) else condition.value
            return field_str in values
        
        elif condition.operator == "is_empty":
            return len(field_str.strip()) == 0
        
        elif condition.operator == "is_not_empty":
            return len(field_str.strip()) > 0
        
        return False
    
    def _get_email_field(self, email: 'EmailData', field: str) -> Any:
        """Get field value from email."""
        field_lower = field.lower()
        
        # Standard fields
        if field_lower == "from":
            return email.from_addr
        elif field_lower == "to":
            return ', '.join(email.to_addrs) if hasattr(email, 'to_addrs') else ""
        elif field_lower == "subject":
            return email.subject
        elif field_lower == "body":
            return email.get_plain_text() if hasattr(email, 'get_plain_text') else ""
        elif field_lower == "cc":
            return ', '.join(email.cc_addrs) if hasattr(email, 'cc_addrs') else ""
        elif field_lower == "date":
            return email.date
        elif field_lower == "size":
            return email.size if hasattr(email, 'size') else 0
        elif field_lower == "has_attachments":
            return len(email.attachments) > 0 if hasattr(email, 'attachments') else False
        elif field_lower == "attachment_count":
            return len(email.attachments) if hasattr(email, 'attachments') else 0
        elif field_lower == "is_read":
            return email.is_read if hasattr(email, 'is_read') else False
        elif field_lower == "is_flagged":
            return email.is_flagged if hasattr(email, 'is_flagged') else False
        
        # Custom headers
        if hasattr(email, 'headers') and field in email.headers:
            return email.headers[field]
        
        return None

# ==================== Action Executor ====================

class ActionExecutor:
    """
    Execute filter actions on emails.
    """
    
    def __init__(self, imap_client=None, smtp_client=None):
        self.imap_client = imap_client
        self.smtp_client = smtp_client
        self.logger = logging.getLogger(__name__)
        
        # Action handlers
        self.action_handlers = {
            ActionType.MOVE_TO_FOLDER: self._move_to_folder,
            ActionType.APPLY_LABEL: self._apply_label,
            ActionType.MARK_AS_READ: self._mark_as_read,
            ActionType.DELETE: self._delete_email,
            ActionType.FORWARD: self._forward_email,
            ActionType.AUTO_REPLY: self._auto_reply,
            ActionType.ARCHIVE: self._archive_email,
            ActionType.TRIGGER_WEBHOOK: self._trigger_webhook
        }
    
    def execute_action(self, action: Dict[str, Any]):
        """Execute single action."""
        action_type = ActionType(action["action"]["type"])
        
        if action_type in self.action_handlers:
            try:
                self.action_handlers[action_type](
                    action["email"],
                    action["action"].get("params", {})
                )
                
                self.logger.info(f"Executed action: {action_type.value}")
                return True
                
            except Exception as e:
                self.logger.error(f"Action execution failed: {e}")
                return False
        else:
            self.logger.warning(f"Unknown action type: {action_type}")
            return False
    
    def _move_to_folder(self, email: 'EmailData', params: Dict):
        """Move email to folder."""
        if self.imap_client:
            folder = params.get("folder", "Processed")
            self.imap_client.move_message(email.uid, folder)
    
    def _apply_label(self, email: 'EmailData', params: Dict):
        """Apply label to email."""
        label = params.get("label", "")
        # Implementation depends on email provider
        self.logger.info(f"Applied label '{label}' to email")
    
    def _mark_as_read(self, email: 'EmailData', params: Dict):
        """Mark email as read."""
        if self.imap_client:
            self.imap_client.mark_as_read(email.uid)
    
    def _delete_email(self, email: 'EmailData', params: Dict):
        """Delete email."""
        if self.imap_client:
            self.imap_client.delete_message(email.uid)
    
    def _forward_email(self, email: 'EmailData', params: Dict):
        """Forward email to recipients."""
        if self.smtp_client:
            recipients = params.get("recipients", [])
            
            # Create forwarded message
            # Implementation would create new EmailMessage with forwarded content
            self.logger.info(f"Forwarded email to {recipients}")
    
    def _auto_reply(self, email: 'EmailData', params: Dict):
        """Send automatic reply."""
        if self.smtp_client:
            template = params.get("template", "")
            
            # Create reply message
            # Implementation would use template to create reply
            self.logger.info(f"Sent auto-reply to {email.from_addr}")
    
    def _archive_email(self, email: 'EmailData', params: Dict):
        """Archive email."""
        if self.imap_client:
            archive_folder = params.get("folder", "Archive")
            self.imap_client.move_message(email.uid, archive_folder)
    
    def _trigger_webhook(self, email: 'EmailData', params: Dict):
        """Trigger webhook with email data."""
        import requests
        
        webhook_url = params.get("url", "")
        
        if webhook_url:
            payload = {
                "email_id": email.uid,
                "from": email.from_addr,
                "subject": email.subject,
                "date": email.date.isoformat(),
                "body": email.get_plain_text()[:500] if hasattr(email, 'get_plain_text') else ""
            }
            
            try:
                response = requests.post(webhook_url, json=payload, timeout=10)
                self.logger.info(f"Triggered webhook: {webhook_url}")
            except Exception as e:
                self.logger.error(f"Webhook failed: {e}")

# ==================== Spam Filter ====================

class SpamFilter:
    """
    Spam detection using machine learning.
    """
    
    def __init__(self, model_path: Optional[str] = None):
        self.logger = logging.getLogger(__name__)
        
        # Initialize ML model
        if model_path and Path(model_path).exists():
            self.load_model(model_path)
        else:
            self._initialize_model()
        
        # Spam indicators
        self.spam_keywords = [
            'viagra', 'cialis', 'casino', 'winner', 'congratulations',
            'click here', 'limited time', 'act now', 'free money',
            'make money fast', 'work from home', 'lose weight',
            'guaranteed', '100% free', 'no obligation'
        ]
        
        self.suspicious_patterns = [
            r'\b[A-Z]{5,}\b',  # All caps words
            r'[!]{3,}',  # Multiple exclamation marks
            r'\$\d+',  # Money amounts
            r'https?://bit\.ly/',  # Shortened URLs
        ]
    
    def _initialize_model(self):
        """Initialize spam detection model."""
        # Create pipeline with TF-IDF and Naive Bayes
        self.model = Pipeline([
            ('tfidf', TfidfVectorizer(max_features=1000, stop_words='english')),
            ('classifier', MultinomialNB())
        ])
        
        # Train with sample data (in production, use real training data)
        self._train_with_sample_data()
    
    def _train_with_sample_data(self):
        """Train model with sample data."""
        # Sample training data
        ham_samples = [
            "Meeting scheduled for tomorrow at 2 PM",
            "Please review the attached document",
            "Thanks for your help with the project",
            "Can we discuss the quarterly report?",
            "Here's the invoice for last month"
        ]
        
        spam_samples = [
            "Congratulations! You've won $1000000!!!",
            "Click here for free viagra now limited time",
            "Make money fast working from home guaranteed",
            "You are the winner! Claim your prize now!",
            "Lose weight fast with this one weird trick"
        ]
        
        X = ham_samples + spam_samples
        y = [0] * len(ham_samples) + [1] * len(spam_samples)
        
        self.model.fit(X, y)
    
    def is_spam(self, email: 'EmailData') -> Tuple[bool, float]:
        """
        Check if email is spam.
        
        Returns:
            Tuple of (is_spam, confidence_score)
        """
        # Get email content
        content = f"{email.subject} {email.get_plain_text()}" \
                  if hasattr(email, 'get_plain_text') else email.subject
        
        # Rule-based checks
        spam_score = self._calculate_spam_score(content, email)
        
        # ML prediction
        try:
            prediction = self.model.predict([content])[0]
            probability = self.model.predict_proba([content])[0][1]
            
            # Combine rule-based and ML scores
            combined_score = (spam_score + probability) / 2
            
            is_spam = combined_score > 0.5
            
            return is_spam, combined_score
            
        except Exception as e:
            self.logger.error(f"Spam detection failed: {e}")
            return spam_score > 0.5, spam_score
    
    def _calculate_spam_score(self, content: str, email: 'EmailData') -> float:
        """Calculate spam score based on rules."""
        score = 0.0
        content_lower = content.lower()
        
        # Check spam keywords
        keyword_count = sum(1 for keyword in self.spam_keywords 
                          if keyword in content_lower)
        score += min(keyword_count * 0.1, 0.5)
        
        # Check suspicious patterns
        for pattern in self.suspicious_patterns:
            if re.search(pattern, content):
                score += 0.1
        
        # Check sender
        if hasattr(email, 'from_addr'):
            # No sender name
            if not hasattr(email, 'from_name') or not email.from_name:
                score += 0.1
            
            # Suspicious domain
            if any(domain in email.from_addr for domain in ['.tk', '.ml', '.ga']):
                score += 0.2
        
        # Check for all caps subject
        if email.subject.isupper() and len(email.subject) > 10:
            score += 0.2
        
        # Too many links
        link_count = len(re.findall(r'https?://', content))
        if link_count > 5:
            score += 0.1
        
        return min(score, 1.0)
    
    def train(self, emails: List['EmailData'], labels: List[int]):
        """Train spam filter with labeled data."""
        X = []
        for email in emails:
            content = f"{email.subject} {email.get_plain_text()}" \
                     if hasattr(email, 'get_plain_text') else email.subject
            X.append(content)
        
        self.model.fit(X, labels)
        self.logger.info(f"Trained spam filter with {len(emails)} samples")
    
    def save_model(self, path: str):
        """Save trained model."""
        with open(path, 'wb') as f:
            pickle.dump(self.model, f)
        self.logger.info(f"Saved spam model to {path}")
    
    def load_model(self, path: str):
        """Load trained model."""
        with open(path, 'rb') as f:
            self.model = pickle.load(f)
        self.logger.info(f"Loaded spam model from {path}")

# ==================== Smart Categorizer ====================

class EmailCategorizer:
    """
    Categorize emails using NLP and patterns.
    """
    
    def __init__(self):
        self.logger = logging.getLogger(__name__)
        
        # Predefined categories
        self.categories = {
            "invoices": {
                "keywords": ["invoice", "bill", "payment", "due", "amount"],
                "patterns": [r"Invoice\s*#?\s*\d+", r"\$\d+\.\d{2}"]
            },
            "receipts": {
                "keywords": ["receipt", "purchase", "order", "confirmation"],
                "patterns": [r"Order\s*#?\s*\d+", r"Transaction\s*ID"]
            },
            "newsletters": {
                "keywords": ["newsletter", "unsubscribe", "update", "news"],
                "patterns": [r"unsubscribe", r"email preferences"]
            },
            "support": {
                "keywords": ["help", "support", "issue", "problem", "ticket"],
                "patterns": [r"Ticket\s*#?\s*\d+", r"Case\s*#?\s*\d+"]
            },
            "social": {
                "keywords": ["friend", "invite", "share", "like", "comment"],
                "patterns": [r"@\w+", r"#\w+"]
            },
            "promotions": {
                "keywords": ["sale", "discount", "offer", "deal", "save"],
                "patterns": [r"\d+%\s*off", r"limited time"]
            }
        }
    
    def categorize(self, email: 'EmailData') -> List[str]:
        """
        Categorize email into one or more categories.
        
        Returns:
            List of category names
        """
        content = f"{email.subject} {email.get_plain_text()}" \
                 if hasattr(email, 'get_plain_text') else email.subject
        
        content_lower = content.lower()
        matched_categories = []
        scores = {}
        
        for category, criteria in self.categories.items():
            score = 0
            
            # Check keywords
            for keyword in criteria["keywords"]:
                if keyword in content_lower:
                    score += 1
            
            # Check patterns
            for pattern in criteria["patterns"]:
                if re.search(pattern, content, re.IGNORECASE):
                    score += 2
            
            if score > 0:
                scores[category] = score
        
        # Return categories sorted by score
        sorted_categories = sorted(scores.items(), key=lambda x: x[1], reverse=True)
        
        # Return categories with significant scores
        threshold = max(scores.values()) * 0.5 if scores else 0
        matched_categories = [cat for cat, score in sorted_categories if score >= threshold]
        
        # Use sentiment analysis for additional categorization
        sentiment = self._analyze_sentiment(content)
        
        if sentiment == "urgent":
            matched_categories.insert(0, "urgent")
        
        return matched_categories if matched_categories else ["uncategorized"]
    
    def _analyze_sentiment(self, content: str) -> str:
        """Analyze email sentiment."""
        try:
            blob = TextBlob(content)
            polarity = blob.sentiment.polarity
            
            # Check for urgency indicators
            urgent_words = ["urgent", "asap", "immediately", "critical", "emergency"]
            if any(word in content.lower() for word in urgent_words):
                return "urgent"
            
            # Sentiment based on polarity
            if polarity > 0.3:
                return "positive"
            elif polarity < -0.3:
                return "negative"
            else:
                return "neutral"
                
        except Exception as e:
            self.logger.debug(f"Sentiment analysis failed: {e}")
            return "neutral"

# ==================== Auto Responder ====================

class AutoResponder:
    """
    Automatic email response system.
    """
    
    def __init__(self, smtp_client=None):
        self.smtp_client = smtp_client
        self.logger = logging.getLogger(__name__)
        
        # Response templates
        self.templates = {}
        
        # Response tracking
        self.response_log = defaultdict(list)
        
    def add_template(self, name: str, template: str, 
                     conditions: Dict[str, Any] = None):
        """Add auto-response template."""
        self.templates[name] = {
            "template": template,
            "conditions": conditions or {},
            "usage_count": 0
        }
        
        self.logger.info(f"Added auto-response template: {name}")
    
    def should_respond(self, email: 'EmailData') -> bool:
        """Check if email should receive auto-response."""
        # Don't respond to automated emails
        if self._is_automated_email(email):
            return False
        
        # Check if already responded recently
        if self._already_responded(email.from_addr):
            return False
        
        # Don't respond to mailing lists
        if self._is_mailing_list(email):
            return False
        
        return True
    
    def generate_response(self, email: 'EmailData', 
                         template_name: str) -> Optional[str]:
        """Generate auto-response for email."""
        if template_name not in self.templates:
            self.logger.warning(f"Template not found: {template_name}")
            return None
        
        template_data = self.templates[template_name]
        template = template_data["template"]
        
        # Replace variables
        response = template.format(
            sender_name=email.from_name or "there",
            sender_email=email.from_addr,
            subject=email.subject,
            date=datetime.now().strftime("%B %d, %Y"),
            time=datetime.now().strftime("%I:%M %p")
        )
        
        # Update usage
        template_data["usage_count"] += 1
        
        # Log response
        self.response_log[email.from_addr].append({
            "date": datetime.now(),
            "template": template_name,
            "subject": email.subject
        })
        
        return response
    
    def _is_automated_email(self, email: 'EmailData') -> bool:
        """Check if email is automated."""
        automated_indicators = [
            "noreply", "no-reply", "donotreply", "mailer-daemon",
            "postmaster", "automated", "notification"
        ]
        
        from_addr_lower = email.from_addr.lower()
        
        return any(indicator in from_addr_lower for indicator in automated_indicators)
    
    def _is_mailing_list(self, email: 'EmailData') -> bool:
        """Check if email is from mailing list."""
        if hasattr(email, 'headers'):
            # Check for list headers
            list_headers = ['List-Unsubscribe', 'List-Id', 'Mailing-List']
            
            for header in list_headers:
                if header in email.headers:
                    return True
        
        return False
    
    def _already_responded(self, email_address: str, 
                          hours: int = 24) -> bool:
        """Check if already responded to sender recently."""
        if email_address not in self.response_log:
            return False
        
        responses = self.response_log[email_address]
        
        if responses:
            last_response = responses[-1]["date"]
            time_diff = datetime.now() - last_response
            
            return time_diff.total_seconds() < (hours * 3600)
        
        return False

# ==================== Rule Builder ====================

class RuleBuilder:
    """
    Fluent interface for building email rules.
    """
    
    def __init__(self, name: str):
        self.filter = EmailFilter(
            name=name,
            description="",
            conditions=[],
            actions=[]
        )
    
    def description(self, desc: str) -> 'RuleBuilder':
        """Set rule description."""
        self.filter.description = desc
        return self
    
    def when_from(self, email: str) -> 'RuleBuilder':
        """Add from condition."""
        self.filter.conditions.append(FilterCondition(
            field="from",
            operator="contains",
            value=email
        ))
        return self
    
    def when_subject_contains(self, text: str) -> 'RuleBuilder':
        """Add subject contains condition."""
        self.filter.conditions.append(FilterCondition(
            field="subject",
            operator="contains",
            value=text
        ))
        return self
    
    def when_body_contains(self, text: str) -> 'RuleBuilder':
        """Add body contains condition."""
        self.filter.conditions.append(FilterCondition(
            field="body",
            operator="contains",
            value=text
        ))
        return self
    
    def when_has_attachments(self) -> 'RuleBuilder':
        """Add has attachments condition."""
        self.filter.conditions.append(FilterCondition(
            field="has_attachments",
            operator="equals",
            value=True
        ))
        return self
    
    def move_to(self, folder: str) -> 'RuleBuilder':
        """Add move to folder action."""
        self.filter.actions.append({
            "type": ActionType.MOVE_TO_FOLDER.value,
            "params": {"folder": folder}
        })
        return self
    
    def mark_as_read(self) -> 'RuleBuilder':
        """Add mark as read action."""
        self.filter.actions.append({
            "type": ActionType.MARK_AS_READ.value,
            "params": {}
        })
        return self
    
    def forward_to(self, *recipients: str) -> 'RuleBuilder':
        """Add forward action."""
        self.filter.actions.append({
            "type": ActionType.FORWARD.value,
            "params": {"recipients": list(recipients)}
        })
        return self
    
    def auto_reply(self, template: str) -> 'RuleBuilder':
        """Add auto-reply action."""
        self.filter.actions.append({
            "type": ActionType.AUTO_REPLY.value,
            "params": {"template": template}
        })
        return self
    
    def with_priority(self, priority: FilterPriority) -> 'RuleBuilder':
        """Set rule priority."""
        self.filter.priority = priority
        return self
    
    def stop_processing_after(self) -> 'RuleBuilder':
        """Stop processing after this rule."""
        self.filter.stop_processing = True
        return self
    
    def build(self) -> EmailFilter:
        """Build and return the filter."""
        return self.filter

# Example usage
if __name__ == "__main__":
    print("šŸ”§ Email Filtering and Rules Examples\n")
    
    # Example 1: Simple filter rule
    print("1ļøāƒ£ Creating Simple Filter:")
    
    rule = (RuleBuilder("Invoice Filter")
            .description("Move invoices to folder")
            .when_subject_contains("invoice")
            .move_to("Invoices")
            .mark_as_read()
            .build())
    
    print(f"   Rule: {rule.name}")
    print(f"   Condition: Subject contains 'invoice'")
    print(f"   Actions: Move to Invoices, Mark as read")
    
    # Example 2: Complex rule
    print("\n2ļøāƒ£ Complex Filter Rule:")
    
    complex_rule = EmailFilter(
        name="VIP Customer Support",
        description="Priority handling for VIP customers",
        conditions=[
            FilterCondition("from", "contains", "@vip.com"),
            FilterCondition("subject", "regex", r"(urgent|critical|emergency)", False)
        ],
        actions=[
            {"type": "move_to_folder", "params": {"folder": "VIP Support"}},
            {"type": "mark_as_important", "params": {}},
            {"type": "forward", "params": {"recipients": ["support-lead@company.com"]}}
        ],
        priority=FilterPriority.HIGH,
        stop_processing=True
    )
    
    print(f"   Rule: {complex_rule.name}")
    print(f"   Conditions: {len(complex_rule.conditions)}")
    print(f"   Actions: {len(complex_rule.actions)}")
    print(f"   Priority: {complex_rule.priority.name}")
    
    # Example 3: Spam detection
    print("\n3ļøāƒ£ Spam Detection:")
    
    spam_filter = SpamFilter()
    
    # Create mock email
    class MockEmail:
        def __init__(self, subject, body):
            self.subject = subject
            self.from_addr = "spammer@suspicious.tk"
            self.body = body
        def get_plain_text(self):
            return self.body
    
    spam_email = MockEmail(
        "YOU'VE WON $1000000!!!",
        "Click here now for free money guaranteed!"
    )
    
    is_spam, score = spam_filter.is_spam(spam_email)
    
    print(f"   Email: {spam_email.subject}")
    print(f"   Spam: {is_spam}")
    print(f"   Confidence: {score:.2%}")
    
    # Example 4: Email categorization
    print("\n4ļøāƒ£ Email Categories:")
    
    categorizer = EmailCategorizer()
    
    test_emails = [
        ("Invoice #12345 - Due March 15", ["invoices"]),
        ("Your order has been shipped!", ["receipts"]),
        ("Weekly Newsletter - Tech Updates", ["newsletters"]),
        ("50% OFF - Limited Time Sale!", ["promotions"]),
        ("Support Ticket #789 - Resolved", ["support"])
    ]
    
    for subject, expected in test_emails:
        email = MockEmail(subject, "")
        categories = categorizer.categorize(email)
        print(f"   {subject[:30]}... → {', '.join(categories)}")
    
    # Example 5: Auto-responder
    print("\n5ļøāƒ£ Auto-Response Templates:")
    
    responder = AutoResponder()
    
    responder.add_template(
        "out_of_office",
        "Hi {sender_name},\n\nI'm currently out of office and will respond when I return.\n\nBest regards"
    )
    
    responder.add_template(
        "received",
        "Thank you for your email regarding '{subject}'. We've received it and will respond within 24 hours."
    )
    
    print("   Templates added:")
    print("     • Out of Office")
    print("     • Message Received")
    
    # Example 6: Filter conditions
    print("\n6ļøāƒ£ Available Filter Conditions:")
    
    conditions = [
        ("Sender", "from contains 'example.com'"),
        ("Subject", "subject starts_with 'Re:'"),
        ("Body", "body contains 'invoice'"),
        ("Attachments", "has_attachments equals true"),
        ("Size", "size greater_than 1048576"),
        ("Date", "date less_than '2024-01-01'"),
        ("Read Status", "is_read equals false"),
        ("Regex", "subject regex '^Invoice #\\d+'")
    ]
    
    for condition, example in conditions:
        print(f"   {condition}: {example}")
    
    # Example 7: Filter actions
    print("\n7ļøāƒ£ Available Actions:")
    
    actions = [
        "šŸ“ Move to folder",
        "šŸ·ļø Apply label",
        "āœ“ Mark as read",
        "⭐ Mark as important",
        "šŸ—‘ļø Delete",
        "āž”ļø Forward",
        "šŸ’¬ Auto-reply",
        "šŸ”” Trigger webhook",
        "šŸ“Š Extract data",
        "šŸ“¦ Archive"
    ]
    
    for action in actions:
        print(f"   {action}")
    
    # Example 8: Rule priorities
    print("\n8ļøāƒ£ Rule Execution Order:")
    
    priorities = [
        ("CRITICAL", "Security & spam filters"),
        ("HIGH", "VIP & urgent messages"),
        ("MEDIUM", "Normal business rules"),
        ("LOW", "Newsletters & promotions"),
        ("BACKGROUND", "Cleanup & archiving")
    ]
    
    for priority, use_case in priorities:
        print(f"   {priority}: {use_case}")
    
    # Example 9: Performance tips
    print("\n9ļøāƒ£ Performance Optimization:")
    
    tips = [
        "Pre-compile regex patterns",
        "Use stop_processing to avoid unnecessary checks",
        "Order rules by frequency of matches",
        "Cache email field values",
        "Batch process similar actions",
        "Use indexes for large rule sets",
        "Monitor rule execution time"
    ]
    
    for tip in tips:
        print(f"   • {tip}")
    
    # Example 10: Best practices
    print("\nšŸ”Ÿ Filter Best Practices:")
    
    practices = [
        "šŸŽÆ Start specific, then generalize",
        "šŸ” Test rules before enabling",
        "šŸ“Š Monitor rule performance",
        "šŸ”„ Review and update regularly",
        "šŸ“ Document complex rules",
        "⚔ Optimize for speed",
        "šŸ›”ļø Security rules first",
        "šŸ“ˆ Track effectiveness",
        "šŸŽØ Use clear naming",
        "šŸ’¾ Backup rule configurations"
    ]
    
    for practice in practices:
        print(f"   {practice}")
    
    print("\nāœ… Email filtering demonstration complete!")

Key Takeaways and Best Practices šŸŽÆ

Email Filtering Best Practices šŸ“‹

Pro Tip: Think of email filtering as building a smart traffic control system - each rule is a traffic light that directs emails to the right destination. Start with security filters (spam, phishing) at the highest priority to protect your inbox. Build specific rules for known patterns (invoices, newsletters) before general catch-alls. Use the AND/OR logic carefully - AND is more restrictive, OR is more inclusive. Test rules with sample emails before enabling them in production. Monitor rule performance and adjust priorities based on actual usage. Combine rule-based filtering with machine learning for intelligent categorization. Use stop_processing flags to prevent rule conflicts. Implement auto-responses carefully to avoid email loops. Track which rules match most frequently and optimize their conditions. Document complex rule chains so others can understand them. Most importantly: regularly review and update your rules as email patterns change - a good filtering system evolves with your needs!

Mastering email filtering and rules enables you to build intelligent email management systems that handle any volume of messages. You can now create smart inboxes that automatically sort, respond to, and process emails based on sophisticated criteria. Whether you're managing customer support, processing orders, or maintaining team communications, these filtering skills keep your email organized and efficient! šŸŽÆ