📧 Email Filtering and Rules: Automate Inbox Management
Email filtering and rules are the traffic controllers of your digital communication - they automatically sort, categorize, respond to, and process emails based on intelligent criteria. Like having a personal assistant that never sleeps, mastering email rules allows you to build systems that manage thousands of emails, trigger workflows, maintain inbox zero, and ensure important messages never get lost. Let's explore the complete toolkit for automated email management! 🎯
The Email Filtering Architecture
Think of email filtering as building a smart postal sorting facility - each incoming message is analyzed, categorized, and routed to the right destination based on rules you define. From simple sender-based filters to complex content analysis with machine learning, understanding filtering patterns, rule engines, and action triggers is essential for building intelligent email automation systems!
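As a rough illustration of the sorting-facility idea, the sketch below models a rule as a predicate plus a destination folder and routes each message to the first rule that matches, in priority order. The `Rule` class, the `route` function, and the dictionary-based messages are assumptions made for this example only; the framework later in this section uses richer classes.

```python
from dataclasses import dataclass
from typing import Callable, Dict, List

# Hypothetical minimal message shape: {"from": ..., "subject": ...}
Message = Dict[str, str]


@dataclass
class Rule:
    name: str
    priority: int                       # lower number is evaluated first
    matches: Callable[[Message], bool]  # predicate deciding whether the rule applies
    folder: str                         # destination when the rule matches


def route(message: Message, rules: List[Rule], default: str = "Inbox") -> str:
    """Return the folder of the first matching rule, checked in priority order."""
    for rule in sorted(rules, key=lambda r: r.priority):
        if rule.matches(message):
            return rule.folder
    return default


rules = [
    Rule("newsletters", 4, lambda m: "newsletter" in m["subject"].lower(), "Newsletters"),
    Rule("vip", 2, lambda m: m["from"].lower().endswith("@vip.com"), "VIP"),
]

# Both rules match the first message; the VIP rule wins because of its priority.
print(route({"from": "ceo@vip.com", "subject": "Monthly newsletter"}, rules))
print(route({"from": "bob@example.com", "subject": "Lunch?"}, rules))
```

Returning on the first match is the simplest form of "stop processing"; the full engine makes that behavior optional per rule.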
Real-World Scenario: The Intelligent Email Management Platform 🤖
You're building an intelligent email management platform that processes customer support tickets, sorts newsletters and promotions, prioritizes urgent messages, extracts and processes orders, triggers automated workflows, maintains team inboxes, filters spam and phishing, and learns from user behavior. Your system must handle complex rule chains, scale to millions of emails, provide real-time processing, and adapt to changing patterns. Let's build a comprehensive email filtering framework!
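Before the full framework, it may help to see how small a declarative rule can be. The sketch below stores a rule as plain JSON and evaluates its conditions with AND/OR logic. The JSON layout and the `rule_matches` helper are illustrative assumptions, not the format used by the classes that follow.

```python
import json
from typing import Any, Dict

# Hypothetical rule format, chosen for this example only
RULE_JSON = """
{
  "name": "urgent-support",
  "logic": "AND",
  "conditions": [
    {"field": "to", "contains": "support@"},
    {"field": "subject", "contains": "urgent"}
  ]
}
"""


def rule_matches(rule: Dict[str, Any], message: Dict[str, str]) -> bool:
    """Case-insensitive substring checks combined with the rule's AND/OR logic."""
    results = [
        cond["contains"].lower() in message.get(cond["field"], "").lower()
        for cond in rule["conditions"]
    ]
    if not results:  # a rule without conditions never matches
        return False
    return all(results) if rule["logic"] == "AND" else any(results)


rule = json.loads(RULE_JSON)
message = {"to": "support@example.com", "subject": "URGENT: site down"}

print(rule_matches(rule, message))                                   # both conditions hold
print(rule_matches({**rule, "logic": "OR"}, {"subject": "urgent"}))  # one condition is enough
```

Keeping rules as data rather than code means they can be stored, reviewed, and backed up like any other configuration.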
# First, install required packages:
# pip install scikit-learn nltk spacy textblob python-Levenshtein fuzzywuzzy requests
# (requests is needed by the webhook action below)
import re
import json
import logging
from typing import List, Dict, Optional, Any, Callable, Union, Tuple
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from enum import Enum
from pathlib import Path
import hashlib
from collections import defaultdict, Counter
import pickle
# Machine learning libraries
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.naive_bayes import MultinomialNB
from sklearn.pipeline import Pipeline
import nltk
from textblob import TextBlob
# Email modules (assuming previous email classes are available)
# from email_reader import EmailData, IMAPClient
# from email_sender import EmailMessage, SMTPClient
# ==================== Filter Configuration ====================
class FilterPriority(Enum):
"""Filter execution priority."""
CRITICAL = 1
HIGH = 2
MEDIUM = 3
LOW = 4
BACKGROUND = 5
class ActionType(Enum):
"""Email action types."""
MOVE_TO_FOLDER = "move_to_folder"
APPLY_LABEL = "apply_label"
MARK_AS_READ = "mark_as_read"
MARK_AS_IMPORTANT = "mark_as_important"
DELETE = "delete"
FORWARD = "forward"
AUTO_REPLY = "auto_reply"
TRIGGER_WEBHOOK = "trigger_webhook"
EXTRACT_DATA = "extract_data"
ADD_TO_CRM = "add_to_crm"
CREATE_TICKET = "create_ticket"
ARCHIVE = "archive"
BLOCK_SENDER = "block_sender"
@dataclass
class FilterCondition:
"""Single filter condition."""
field: str # from, to, subject, body, etc.
operator: str # contains, equals, starts_with, regex, etc.
value: Any
case_sensitive: bool = False
negate: bool = False # NOT condition
@dataclass
class EmailFilter:
"""Email filter rule."""
name: str
description: str
conditions: List[FilterCondition]
actions: List[Dict[str, Any]]
# Rule settings
enabled: bool = True
priority: FilterPriority = FilterPriority.MEDIUM
stop_processing: bool = False # Stop after this rule matches
# Advanced options
condition_logic: str = "AND" # AND or OR
max_executions_per_day: Optional[int] = None
valid_from: Optional[datetime] = None
valid_until: Optional[datetime] = None
# Statistics
execution_count: int = 0
last_executed: Optional[datetime] = None
def __hash__(self):
return hash(self.name)
# ==================== Rule Engine ====================
class EmailRuleEngine:
"""
Core rule engine for email filtering.
"""
def __init__(self):
self.rules: List[EmailFilter] = []
self.rule_groups: Dict[str, List[EmailFilter]] = defaultdict(list)
self.logger = logging.getLogger(__name__)
# Statistics
self.stats = {
"emails_processed": 0,
"rules_matched": 0,
"actions_executed": 0,
"errors": 0
}
# Performance optimization
self.compiled_regexes = {}
def add_rule(self, rule: EmailFilter, group: str = "default"):
"""Add filter rule."""
self.rules.append(rule)
self.rule_groups[group].append(rule)
# Pre-compile regex patterns
for condition in rule.conditions:
if condition.operator == "regex":
pattern_key = f"{rule.name}_{condition.field}_{condition.value}"
if pattern_key not in self.compiled_regexes:
flags = 0 if condition.case_sensitive else re.IGNORECASE
self.compiled_regexes[pattern_key] = re.compile(condition.value, flags)
self.logger.info(f"Added rule: {rule.name}")
def remove_rule(self, rule_name: str):
"""Remove filter rule."""
self.rules = [r for r in self.rules if r.name != rule_name]
for group in self.rule_groups.values():
group[:] = [r for r in group if r.name != rule_name]
self.logger.info(f"Removed rule: {rule_name}")
def process_email(self, email: 'EmailData') -> List[Dict[str, Any]]:
"""
Process email through all rules.
Returns:
List of actions to execute
"""
self.stats["emails_processed"] += 1
actions_to_execute = []
# Sort rules by priority
sorted_rules = sorted(
[r for r in self.rules if r.enabled],
key=lambda r: r.priority.value
)
for rule in sorted_rules:
# Check if rule is valid
if not self._is_rule_valid(rule):
continue
# Check if rule matches
if self._evaluate_rule(rule, email):
self.logger.info(f"Rule '{rule.name}' matched for email: {email.subject}")
# Update rule statistics
rule.execution_count += 1
rule.last_executed = datetime.now()
self.stats["rules_matched"] += 1
# Add actions
for action in rule.actions:
actions_to_execute.append({
"rule": rule.name,
"action": action,
"email": email
})
self.stats["actions_executed"] += 1
# Stop processing if configured
if rule.stop_processing:
break
return actions_to_execute
def _is_rule_valid(self, rule: EmailFilter) -> bool:
"""Check if rule is currently valid."""
now = datetime.now()
# Check date validity
if rule.valid_from and now < rule.valid_from:
return False
if rule.valid_until and now > rule.valid_until:
return False
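# Check execution limit
# NOTE: execution_count is a lifetime total that is never reset, so this check
# only approximates a daily cap; an exact limit needs a per-day counter
if rule.max_executions_per_day: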
if rule.last_executed and rule.last_executed.date() == now.date():
if rule.execution_count >= rule.max_executions_per_day:
return False
return True
def _evaluate_rule(self, rule: EmailFilter, email: 'EmailData') -> bool:
"""Evaluate if rule matches email."""
condition_results = []
for condition in rule.conditions:
result = self._evaluate_condition(condition, email, rule.name)
if condition.negate:
result = not result
condition_results.append(result)
# Apply condition logic
if rule.condition_logic == "AND":
return all(condition_results) if condition_results else False
elif rule.condition_logic == "OR":
return any(condition_results) if condition_results else False
else:
return False
def _evaluate_condition(self, condition: FilterCondition,
email: 'EmailData', rule_name: str) -> bool:
"""Evaluate single condition."""
# Get field value from email
field_value = self._get_email_field(email, condition.field)
if field_value is None:
return False
# Convert to string for comparison
field_str = str(field_value)
value_str = str(condition.value)
# Apply case sensitivity
if not condition.case_sensitive:
field_str = field_str.lower()
value_str = value_str.lower()
# Evaluate based on operator
if condition.operator == "equals":
return field_str == value_str
elif condition.operator == "contains":
return value_str in field_str
elif condition.operator == "starts_with":
return field_str.startswith(value_str)
elif condition.operator == "ends_with":
return field_str.endswith(value_str)
elif condition.operator == "regex":
pattern_key = f"{rule_name}_{condition.field}_{condition.value}"
if pattern_key in self.compiled_regexes:
return bool(self.compiled_regexes[pattern_key].search(field_str))
else:
flags = 0 if condition.case_sensitive else re.IGNORECASE
return bool(re.search(condition.value, field_str, flags))
elif condition.operator == "greater_than":
try:
return float(field_str) > float(value_str)
except (TypeError, ValueError):
return False
elif condition.operator == "less_than":
try:
return float(field_str) < float(value_str)
except (TypeError, ValueError):
return False
elif condition.operator == "in_list":
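# condition.value may be a real sequence or a comma-separated string
raw_values = condition.value if isinstance(condition.value, (list, tuple, set)) else str(condition.value).split(',')
values = [str(v).strip() if condition.case_sensitive else str(v).strip().lower() for v in raw_values]
return field_str in values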
elif condition.operator == "is_empty":
return len(field_str.strip()) == 0
elif condition.operator == "is_not_empty":
return len(field_str.strip()) > 0
return False
def _get_email_field(self, email: 'EmailData', field: str) -> Any:
"""Get field value from email."""
field_lower = field.lower()
# Standard fields
if field_lower == "from":
return email.from_addr
elif field_lower == "to":
return ', '.join(email.to_addrs) if hasattr(email, 'to_addrs') else ""
elif field_lower == "subject":
return email.subject
elif field_lower == "body":
return email.get_plain_text() if hasattr(email, 'get_plain_text') else ""
elif field_lower == "cc":
return ', '.join(email.cc_addrs) if hasattr(email, 'cc_addrs') else ""
elif field_lower == "date":
return email.date
elif field_lower == "size":
return email.size if hasattr(email, 'size') else 0
elif field_lower == "has_attachments":
return len(email.attachments) > 0 if hasattr(email, 'attachments') else False
elif field_lower == "attachment_count":
return len(email.attachments) if hasattr(email, 'attachments') else 0
elif field_lower == "is_read":
return email.is_read if hasattr(email, 'is_read') else False
elif field_lower == "is_flagged":
return email.is_flagged if hasattr(email, 'is_flagged') else False
# Custom headers
if hasattr(email, 'headers') and field in email.headers:
return email.headers[field]
return None
# ==================== Action Executor ====================
class ActionExecutor:
"""
Execute filter actions on emails.
"""
def __init__(self, imap_client=None, smtp_client=None):
self.imap_client = imap_client
self.smtp_client = smtp_client
self.logger = logging.getLogger(__name__)
# Action handlers
self.action_handlers = {
ActionType.MOVE_TO_FOLDER: self._move_to_folder,
ActionType.APPLY_LABEL: self._apply_label,
ActionType.MARK_AS_READ: self._mark_as_read,
ActionType.DELETE: self._delete_email,
ActionType.FORWARD: self._forward_email,
ActionType.AUTO_REPLY: self._auto_reply,
ActionType.ARCHIVE: self._archive_email,
ActionType.TRIGGER_WEBHOOK: self._trigger_webhook
}
def execute_action(self, action: Dict[str, Any]):
"""Execute single action."""
action_type = ActionType(action["action"]["type"])
if action_type in self.action_handlers:
try:
self.action_handlers[action_type](
action["email"],
action["action"].get("params", {})
)
self.logger.info(f"Executed action: {action_type.value}")
return True
except Exception as e:
self.logger.error(f"Action execution failed: {e}")
return False
else:
self.logger.warning(f"Unknown action type: {action_type}")
return False
def _move_to_folder(self, email: 'EmailData', params: Dict):
"""Move email to folder."""
if self.imap_client:
folder = params.get("folder", "Processed")
self.imap_client.move_message(email.uid, folder)
def _apply_label(self, email: 'EmailData', params: Dict):
"""Apply label to email."""
label = params.get("label", "")
# Implementation depends on email provider
self.logger.info(f"Applied label '{label}' to email")
def _mark_as_read(self, email: 'EmailData', params: Dict):
"""Mark email as read."""
if self.imap_client:
self.imap_client.mark_as_read(email.uid)
def _delete_email(self, email: 'EmailData', params: Dict):
"""Delete email."""
if self.imap_client:
self.imap_client.delete_message(email.uid)
def _forward_email(self, email: 'EmailData', params: Dict):
"""Forward email to recipients."""
if self.smtp_client:
recipients = params.get("recipients", [])
# Create forwarded message
# Implementation would create new EmailMessage with forwarded content
self.logger.info(f"Forwarded email to {recipients}")
def _auto_reply(self, email: 'EmailData', params: Dict):
"""Send automatic reply."""
if self.smtp_client:
template = params.get("template", "")
# Create reply message
# Implementation would use template to create reply
self.logger.info(f"Sent auto-reply to {email.from_addr}")
def _archive_email(self, email: 'EmailData', params: Dict):
"""Archive email."""
if self.imap_client:
archive_folder = params.get("folder", "Archive")
self.imap_client.move_message(email.uid, archive_folder)
def _trigger_webhook(self, email: 'EmailData', params: Dict):
"""Trigger webhook with email data."""
import requests
webhook_url = params.get("url", "")
if webhook_url:
payload = {
"email_id": email.uid,
"from": email.from_addr,
"subject": email.subject,
"date": email.date.isoformat(),
"body": email.get_plain_text()[:500] if hasattr(email, 'get_plain_text') else ""
}
try:
response = requests.post(webhook_url, json=payload, timeout=10)
response.raise_for_status()  # treat HTTP 4xx/5xx responses as failures
self.logger.info(f"Triggered webhook: {webhook_url}")
except Exception as e:
self.logger.error(f"Webhook failed: {e}")
# ==================== Spam Filter ====================
class SpamFilter:
"""
Spam detection using machine learning.
"""
def __init__(self, model_path: Optional[str] = None):
self.logger = logging.getLogger(__name__)
# Initialize ML model
if model_path and Path(model_path).exists():
self.load_model(model_path)
else:
self._initialize_model()
# Spam indicators
self.spam_keywords = [
'viagra', 'cialis', 'casino', 'winner', 'congratulations',
'click here', 'limited time', 'act now', 'free money',
'make money fast', 'work from home', 'lose weight',
'guaranteed', '100% free', 'no obligation'
]
self.suspicious_patterns = [
r'\b[A-Z]{5,}\b', # All caps words
r'[!]{3,}', # Multiple exclamation marks
r'\$\d+', # Money amounts
r'https?://bit\.ly/', # Shortened URLs
]
def _initialize_model(self):
"""Initialize spam detection model."""
# Create pipeline with TF-IDF and Naive Bayes
self.model = Pipeline([
('tfidf', TfidfVectorizer(max_features=1000, stop_words='english')),
('classifier', MultinomialNB())
])
# Train with sample data (in production, use real training data)
self._train_with_sample_data()
def _train_with_sample_data(self):
"""Train model with sample data."""
# Sample training data
ham_samples = [
"Meeting scheduled for tomorrow at 2 PM",
"Please review the attached document",
"Thanks for your help with the project",
"Can we discuss the quarterly report?",
"Here's the invoice for last month"
]
spam_samples = [
"Congratulations! You've won $1000000!!!",
"Click here for free viagra now limited time",
"Make money fast working from home guaranteed",
"You are the winner! Claim your prize now!",
"Lose weight fast with this one weird trick"
]
X = ham_samples + spam_samples
y = [0] * len(ham_samples) + [1] * len(spam_samples)
self.model.fit(X, y)
def is_spam(self, email: 'EmailData') -> Tuple[bool, float]:
"""
Check if email is spam.
Returns:
Tuple of (is_spam, confidence_score)
"""
# Get email content
content = f"{email.subject} {email.get_plain_text()}" \
if hasattr(email, 'get_plain_text') else email.subject
# Rule-based checks
spam_score = self._calculate_spam_score(content, email)
# ML prediction
try:
# Probability assigned to the spam class (label 1)
probability = self.model.predict_proba([content])[0][1]
# Combine rule-based and ML scores
combined_score = (spam_score + probability) / 2
is_spam = combined_score > 0.5
return is_spam, combined_score
except Exception as e:
self.logger.error(f"Spam detection failed: {e}")
return spam_score > 0.5, spam_score
def _calculate_spam_score(self, content: str, email: 'EmailData') -> float:
"""Calculate spam score based on rules."""
score = 0.0
content_lower = content.lower()
# Check spam keywords
keyword_count = sum(1 for keyword in self.spam_keywords
if keyword in content_lower)
score += min(keyword_count * 0.1, 0.5)
# Check suspicious patterns
for pattern in self.suspicious_patterns:
if re.search(pattern, content):
score += 0.1
# Check sender
if hasattr(email, 'from_addr'):
# No sender name
if not hasattr(email, 'from_name') or not email.from_name:
score += 0.1
# Suspicious domain
if email.from_addr.lower().rstrip('>').endswith(('.tk', '.ml', '.ga')):
score += 0.2
# Check for all caps subject
if email.subject.isupper() and len(email.subject) > 10:
score += 0.2
# Too many links
link_count = len(re.findall(r'https?://', content))
if link_count > 5:
score += 0.1
return min(score, 1.0)
def train(self, emails: List['EmailData'], labels: List[int]):
"""Train spam filter with labeled data."""
X = []
for email in emails:
content = f"{email.subject} {email.get_plain_text()}" \
if hasattr(email, 'get_plain_text') else email.subject
X.append(content)
self.model.fit(X, labels)
self.logger.info(f"Trained spam filter with {len(emails)} samples")
def save_model(self, path: str):
"""Save trained model."""
with open(path, 'wb') as f:
pickle.dump(self.model, f)
self.logger.info(f"Saved spam model to {path}")
def load_model(self, path: str):
"""Load trained model. Only load files from a trusted source: unpickling can execute arbitrary code."""
with open(path, 'rb') as f:
self.model = pickle.load(f)
self.logger.info(f"Loaded spam model from {path}")
# ==================== Smart Categorizer ====================
class EmailCategorizer:
"""
Categorize emails using NLP and patterns.
"""
def __init__(self):
self.logger = logging.getLogger(__name__)
# Predefined categories
self.categories = {
"invoices": {
"keywords": ["invoice", "bill", "payment", "due", "amount"],
"patterns": [r"Invoice\s*#?\s*\d+", r"\$\d+\.\d{2}"]
},
"receipts": {
"keywords": ["receipt", "purchase", "order", "confirmation"],
"patterns": [r"Order\s*#?\s*\d+", r"Transaction\s*ID"]
},
"newsletters": {
"keywords": ["newsletter", "unsubscribe", "update", "news"],
"patterns": [r"unsubscribe", r"email preferences"]
},
"support": {
"keywords": ["help", "support", "issue", "problem", "ticket"],
"patterns": [r"Ticket\s*#?\s*\d+", r"Case\s*#?\s*\d+"]
},
"social": {
"keywords": ["friend", "invite", "share", "like", "comment"],
"patterns": [r"@\w+", r"#\w+"]
},
"promotions": {
"keywords": ["sale", "discount", "offer", "deal", "save"],
"patterns": [r"\d+%\s*off", r"limited time"]
}
}
def categorize(self, email: 'EmailData') -> List[str]:
"""
Categorize email into one or more categories.
Returns:
List of category names
"""
content = f"{email.subject} {email.get_plain_text()}" \
if hasattr(email, 'get_plain_text') else email.subject
content_lower = content.lower()
matched_categories = []
scores = {}
for category, criteria in self.categories.items():
score = 0
# Check keywords
for keyword in criteria["keywords"]:
if keyword in content_lower:
score += 1
# Check patterns
for pattern in criteria["patterns"]:
if re.search(pattern, content, re.IGNORECASE):
score += 2
if score > 0:
scores[category] = score
# Return categories sorted by score
sorted_categories = sorted(scores.items(), key=lambda x: x[1], reverse=True)
# Return categories with significant scores
threshold = max(scores.values()) * 0.5 if scores else 0
matched_categories = [cat for cat, score in sorted_categories if score >= threshold]
# Use sentiment analysis for additional categorization
sentiment = self._analyze_sentiment(content)
if sentiment == "urgent":
matched_categories.insert(0, "urgent")
return matched_categories if matched_categories else ["uncategorized"]
def _analyze_sentiment(self, content: str) -> str:
"""Analyze email sentiment."""
try:
blob = TextBlob(content)
polarity = blob.sentiment.polarity
# Check for urgency indicators
urgent_words = ["urgent", "asap", "immediately", "critical", "emergency"]
if any(word in content.lower() for word in urgent_words):
return "urgent"
# Sentiment based on polarity
if polarity > 0.3:
return "positive"
elif polarity < -0.3:
return "negative"
else:
return "neutral"
except Exception as e:
self.logger.debug(f"Sentiment analysis failed: {e}")
return "neutral"
# ==================== Auto Responder ====================
class AutoResponder:
"""
Automatic email response system.
"""
def __init__(self, smtp_client=None):
self.smtp_client = smtp_client
self.logger = logging.getLogger(__name__)
# Response templates
self.templates = {}
# Response tracking
self.response_log = defaultdict(list)
def add_template(self, name: str, template: str,
conditions: Optional[Dict[str, Any]] = None):
"""Add auto-response template."""
self.templates[name] = {
"template": template,
"conditions": conditions or {},
"usage_count": 0
}
self.logger.info(f"Added auto-response template: {name}")
def should_respond(self, email: 'EmailData') -> bool:
"""Check if email should receive auto-response."""
# Don't respond to automated emails
if self._is_automated_email(email):
return False
# Check if already responded recently
if self._already_responded(email.from_addr):
return False
# Don't respond to mailing lists
if self._is_mailing_list(email):
return False
return True
def generate_response(self, email: 'EmailData',
template_name: str) -> Optional[str]:
"""Generate auto-response for email."""
if template_name not in self.templates:
self.logger.warning(f"Template not found: {template_name}")
return None
template_data = self.templates[template_name]
template = template_data["template"]
# Replace variables
response = template.format(
sender_name=getattr(email, 'from_name', None) or "there",
sender_email=email.from_addr,
subject=email.subject,
date=datetime.now().strftime("%B %d, %Y"),
time=datetime.now().strftime("%I:%M %p")
)
# Update usage
template_data["usage_count"] += 1
# Log response
self.response_log[email.from_addr].append({
"date": datetime.now(),
"template": template_name,
"subject": email.subject
})
return response
def _is_automated_email(self, email: 'EmailData') -> bool:
"""Check if email is automated."""
automated_indicators = [
"noreply", "no-reply", "donotreply", "mailer-daemon",
"postmaster", "automated", "notification"
]
from_addr_lower = email.from_addr.lower()
return any(indicator in from_addr_lower for indicator in automated_indicators)
def _is_mailing_list(self, email: 'EmailData') -> bool:
"""Check if email is from mailing list."""
if hasattr(email, 'headers'):
# Check for list headers
list_headers = ['List-Unsubscribe', 'List-Id', 'Mailing-List']
for header in list_headers:
if header in email.headers:
return True
return False
def _already_responded(self, email_address: str,
hours: int = 24) -> bool:
"""Check if already responded to sender recently."""
if email_address not in self.response_log:
return False
responses = self.response_log[email_address]
if responses:
last_response = responses[-1]["date"]
time_diff = datetime.now() - last_response
return time_diff.total_seconds() < (hours * 3600)
return False
# ==================== Rule Builder ====================
class RuleBuilder:
"""
Fluent interface for building email rules.
"""
def __init__(self, name: str):
self.filter = EmailFilter(
name=name,
description="",
conditions=[],
actions=[]
)
def description(self, desc: str) -> 'RuleBuilder':
"""Set rule description."""
self.filter.description = desc
return self
def when_from(self, email: str) -> 'RuleBuilder':
"""Add from condition."""
self.filter.conditions.append(FilterCondition(
field="from",
operator="contains",
value=email
))
return self
def when_subject_contains(self, text: str) -> 'RuleBuilder':
"""Add subject contains condition."""
self.filter.conditions.append(FilterCondition(
field="subject",
operator="contains",
value=text
))
return self
def when_body_contains(self, text: str) -> 'RuleBuilder':
"""Add body contains condition."""
self.filter.conditions.append(FilterCondition(
field="body",
operator="contains",
value=text
))
return self
def when_has_attachments(self) -> 'RuleBuilder':
"""Add has attachments condition."""
self.filter.conditions.append(FilterCondition(
field="has_attachments",
operator="equals",
value=True
))
return self
def move_to(self, folder: str) -> 'RuleBuilder':
"""Add move to folder action."""
self.filter.actions.append({
"type": ActionType.MOVE_TO_FOLDER.value,
"params": {"folder": folder}
})
return self
def mark_as_read(self) -> 'RuleBuilder':
"""Add mark as read action."""
self.filter.actions.append({
"type": ActionType.MARK_AS_READ.value,
"params": {}
})
return self
def forward_to(self, *recipients: str) -> 'RuleBuilder':
"""Add forward action."""
self.filter.actions.append({
"type": ActionType.FORWARD.value,
"params": {"recipients": list(recipients)}
})
return self
def auto_reply(self, template: str) -> 'RuleBuilder':
"""Add auto-reply action."""
self.filter.actions.append({
"type": ActionType.AUTO_REPLY.value,
"params": {"template": template}
})
return self
def with_priority(self, priority: FilterPriority) -> 'RuleBuilder':
"""Set rule priority."""
self.filter.priority = priority
return self
def stop_processing_after(self) -> 'RuleBuilder':
"""Stop processing after this rule."""
self.filter.stop_processing = True
return self
def build(self) -> EmailFilter:
"""Build and return the filter."""
return self.filter
# Example usage
if __name__ == "__main__":
print("📧 Email Filtering and Rules Examples\n")
# Example 1: Simple filter rule
print("1️⃣ Creating Simple Filter:")
rule = (RuleBuilder("Invoice Filter")
.description("Move invoices to folder")
.when_subject_contains("invoice")
.move_to("Invoices")
.mark_as_read()
.build())
print(f" Rule: {rule.name}")
print(f" Condition: Subject contains 'invoice'")
print(f" Actions: Move to Invoices, Mark as read")
# Example 2: Complex rule
print("\n2️⃣ Complex Filter Rule:")
complex_rule = EmailFilter(
name="VIP Customer Support",
description="Priority handling for VIP customers",
conditions=[
FilterCondition("from", "contains", "@vip.com"),
FilterCondition("subject", "regex", r"(urgent|critical|emergency)", False)
],
actions=[
{"type": "move_to_folder", "params": {"folder": "VIP Support"}},
{"type": "mark_as_important", "params": {}},
{"type": "forward", "params": {"recipients": ["support-lead@company.com"]}}
],
priority=FilterPriority.HIGH,
stop_processing=True
)
print(f" Rule: {complex_rule.name}")
print(f" Conditions: {len(complex_rule.conditions)}")
print(f" Actions: {len(complex_rule.actions)}")
print(f" Priority: {complex_rule.priority.name}")
# Example 3: Spam detection
print("\n3️⃣ Spam Detection:")
spam_filter = SpamFilter()
# Create mock email
class MockEmail:
def __init__(self, subject, body):
self.subject = subject
self.from_addr = "spammer@suspicious.tk"
self.body = body
def get_plain_text(self):
return self.body
spam_email = MockEmail(
"YOU'VE WON $1000000!!!",
"Click here now for free money guaranteed!"
)
is_spam, score = spam_filter.is_spam(spam_email)
print(f" Email: {spam_email.subject}")
print(f" Spam: {is_spam}")
print(f" Confidence: {score:.2%}")
# Example 4: Email categorization
print("\n4️⃣ Email Categories:")
categorizer = EmailCategorizer()
test_emails = [
("Invoice #12345 - Due March 15", ["invoices"]),
("Your order has been shipped!", ["receipts"]),
("Weekly Newsletter - Tech Updates", ["newsletters"]),
("50% OFF - Limited Time Sale!", ["promotions"]),
("Support Ticket #789 - Resolved", ["support"])
]
for subject, expected in test_emails:
email = MockEmail(subject, "")
categories = categorizer.categorize(email)
print(f" {subject[:30]}... → {', '.join(categories)}")
# Example 5: Auto-responder
print("\n5️⃣ Auto-Response Templates:")
responder = AutoResponder()
responder.add_template(
"out_of_office",
"Hi {sender_name},\n\nI'm currently out of office and will respond when I return.\n\nBest regards"
)
responder.add_template(
"received",
"Thank you for your email regarding '{subject}'. We've received it and will respond within 24 hours."
)
print(" Templates added:")
print(" • Out of Office")
print(" • Message Received")
# Example 6: Filter conditions
print("\n6️⃣ Available Filter Conditions:")
conditions = [
("Sender", "from contains 'example.com'"),
("Subject", "subject starts_with 'Re:'"),
("Body", "body contains 'invoice'"),
("Attachments", "has_attachments equals true"),
("Size", "size greater_than 1048576"),
("Date", "date less_than '2024-01-01'"),  # caution: greater_than/less_than compare numerically, so date strings need parsing first
("Read Status", "is_read equals false"),
("Regex", "subject regex '^Invoice #\\d+'")
]
for condition, example in conditions:
print(f" {condition}: {example}")
# Example 7: Filter actions
print("\n7️⃣ Available Actions:")
actions = [
"Move to folder",
"Apply label",
"Mark as read",
"Mark as important",
"Delete",
"Forward",
"Auto-reply",
"Trigger webhook",
"Extract data",
"Archive"
]
for action in actions:
print(f" {action}")
# Example 8: Rule priorities
print("\n8️⃣ Rule Execution Order:")
priorities = [
("CRITICAL", "Security & spam filters"),
("HIGH", "VIP & urgent messages"),
("MEDIUM", "Normal business rules"),
("LOW", "Newsletters & promotions"),
("BACKGROUND", "Cleanup & archiving")
]
for priority, use_case in priorities:
print(f" {priority}: {use_case}")
# Example 9: Performance tips
print("\n9️⃣ Performance Optimization:")
tips = [
"Pre-compile regex patterns",
"Use stop_processing to avoid unnecessary checks",
"Order rules by frequency of matches",
"Cache email field values",
"Batch process similar actions",
"Use indexes for large rule sets",
"Monitor rule execution time"
]
for tip in tips:
print(f" • {tip}")
# Example 10: Best practices
print("\n🔟 Filter Best Practices:")
practices = [
"Start specific, then generalize",
"Test rules before enabling",
"Monitor rule performance",
"Review and update regularly",
"Document complex rules",
"Optimize for speed",
"Security rules first",
"Track effectiveness",
"Use clear naming",
"Backup rule configurations"
]
for practice in practices:
print(f" {practice}")
print("\n✅ Email filtering demonstration complete!")
Key Takeaways and Best Practices 🎯
- Design Rules Carefully: Start specific, then generalize as needed.
- Order Matters: Process critical rules first with priorities.
- Test Before Deploying: Always test rules with sample emails.
- Use Stop Processing: Prevent unnecessary rule evaluations.
- Monitor Performance: Track rule execution and effectiveness.
- Combine Techniques: Use ML with rule-based filtering.
- Handle Errors Gracefully: Rules should fail safely.
- Document Complex Logic: Make rules maintainable.
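Two of the points above, testing before deploying and failing safely, can be sketched together. The example below is a minimal dry-run harness under simplifying assumptions: rules are plain predicates over dictionaries, and `safe_match` and `dry_run` are hypothetical helper names, not part of the framework shown earlier.

```python
from typing import Callable, Dict, List, Tuple

Message = Dict[str, str]


def safe_match(predicate: Callable[[Message], bool], message: Message) -> bool:
    """Evaluate a rule predicate; any exception counts as 'no match' (fail safe)."""
    try:
        return bool(predicate(message))
    except Exception:
        return False


def dry_run(predicate: Callable[[Message], bool],
            samples: List[Tuple[Message, bool]]) -> List[Message]:
    """Return the sample messages whose result differs from the expected one."""
    return [msg for msg, expected in samples if safe_match(predicate, msg) != expected]


def invoice_rule(message: Message) -> bool:
    # Raises KeyError when the message has no "subject" key
    return "invoice" in message["subject"].lower()


samples = [
    ({"subject": "Invoice #12345 - Due March 15"}, True),
    ({"subject": "Lunch on Friday?"}, False),
    ({"body": "message without a subject"}, False),  # malformed input must not crash
]

failures = dry_run(invoice_rule, samples)
print(f"{len(samples) - len(failures)}/{len(samples)} samples behaved as expected")
```

Treating an exception as a non-match is a deliberately conservative choice: a broken rule then leaves the message untouched instead of moving or deleting it.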
Email Filtering Best Practices
Mastering email filtering and rules enables you to build intelligent email management systems that handle any volume of messages. You can now create smart inboxes that automatically sort, respond to, and process emails based on sophisticated criteria. Whether you're managing customer support, processing orders, or maintaining team communications, these filtering skills keep your email organized and efficient! 🎯
Pro Tip: Think of email filtering as building a smart traffic control system - each rule is a traffic light that directs emails to the right destination. Start with security filters (spam, phishing) at the highest priority to protect your inbox. Build specific rules for known patterns (invoices, newsletters) before general catch-alls. Use the AND/OR logic carefully - AND is more restrictive, OR is more inclusive. Test rules with sample emails before enabling them in production. Monitor rule performance and adjust priorities based on actual usage. Combine rule-based filtering with machine learning for intelligent categorization. Use stop_processing flags to prevent rule conflicts. Implement auto-responses carefully to avoid email loops. Track which rules match most frequently and optimize their conditions. Document complex rule chains so others can understand them. Most importantly: regularly review and update your rules as email patterns change - a good filtering system evolves with your needs!
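The warning about email loops can be made concrete with a header-based guard. The sketch below uses only the standard library's `email.message.EmailMessage`; it checks the `Auto-Submitted` header defined in RFC 3834, the conventional `Precedence` values, and common mailing-list headers, then marks its own reply as automatic. The helper names `is_safe_to_auto_reply` and `build_reply` are assumptions for illustration, and the checks are heuristics rather than a complete loop-prevention scheme.

```python
from email.message import EmailMessage


def is_safe_to_auto_reply(msg: EmailMessage) -> bool:
    """Heuristic loop guard: skip messages that look automated or list-delivered."""
    # RFC 3834: any value other than "no" marks an automatically generated message
    auto_submitted = (msg.get("Auto-Submitted") or "no").strip().lower()
    if auto_submitted != "no":
        return False
    # Conventional header used by bulk senders and list software
    precedence = (msg.get("Precedence") or "").strip().lower()
    if precedence in {"bulk", "junk", "list"}:
        return False
    # Mailing-list traffic should not receive personal auto-replies
    if msg.get("List-Id") or msg.get("List-Unsubscribe"):
        return False
    return True


def build_reply(original: EmailMessage, body: str) -> EmailMessage:
    """Create a reply that identifies itself as automatic."""
    reply = EmailMessage()
    reply["To"] = original["From"]
    reply["Subject"] = f"Re: {original['Subject']}"
    reply["Auto-Submitted"] = "auto-replied"  # lets other responders skip this message
    if original["Message-ID"]:
        reply["In-Reply-To"] = original["Message-ID"]
    reply.set_content(body)
    return reply


incoming = EmailMessage()
incoming["From"] = "alice@example.com"
incoming["Subject"] = "Question about my order"
incoming.set_content("Hi, where is my package?")

if is_safe_to_auto_reply(incoming):
    reply = build_reply(incoming, "Thanks, we received your message.")
    # The reply carries Auto-Submitted, so the same guard refuses to answer it
    print(is_safe_to_auto_reply(reply))
```

Combining this guard with a per-sender cooldown, as the `AutoResponder` class does with its response log, covers senders that omit these headers.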