๐ฅ Reading Emails with IMAP: Process Incoming Messages
IMAP (Internet Message Access Protocol) is your gateway to reading and managing emails programmatically - it's like having a digital assistant that can monitor your inbox, organize messages, extract data, and trigger actions based on incoming emails. From simple inbox monitoring to complex email processing pipelines, mastering IMAP allows you to build intelligent email automation systems that respond to messages in real-time! ๐ฌ
The IMAP Email Processing Architecture
Think of IMAP as a remote control for your email inbox - you can browse folders, search messages, read content, manage flags, and organize emails without downloading everything locally. Unlike POP3 which downloads and deletes, IMAP keeps emails on the server, allowing multiple clients to access the same mailbox. Understanding IMAP's folder structure, search capabilities, and message handling is essential for building robust email processing systems!
Real-World Scenario: The Intelligent Email Processor ๐ค
You're building an intelligent email processing system that monitors multiple inboxes, automatically categorizes messages, extracts attachments and data, responds to customer inquiries, processes orders from emails, generates reports from email data, and triggers workflows based on email content. Your system must handle thousands of emails daily, support multiple email providers, process attachments, and maintain inbox organization. Let's build a comprehensive IMAP email processing framework!
# First, install required packages:
# pip install imap-tools python-dateutil beautifulsoup4 python-magic chardet
import imaplib
import email
from email.header import decode_header
from email.utils import parsedate_to_datetime
import ssl
import os
import re
import logging
from typing import List, Dict, Optional, Any, Union, Tuple, Generator
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from enum import Enum
from pathlib import Path
import json
import base64
import hashlib
import chardet
from bs4 import BeautifulSoup
import time
from collections import defaultdict
import threading
from queue import Queue
import mimetypes
# ==================== IMAP Configuration ====================
class IMAPProvider(Enum):
"""Common IMAP providers."""
GMAIL = "gmail"
OUTLOOK = "outlook"
YAHOO = "yahoo"
ICLOUD = "icloud"
CUSTOM = "custom"
@dataclass
class IMAPConfig:
"""IMAP server configuration."""
host: str
port: int = 993
username: str
password: str
use_ssl: bool = True
# Provider settings
provider: IMAPProvider = IMAPProvider.CUSTOM
# Connection settings
timeout: int = 30
max_retries: int = 3
retry_delay: float = 5.0
# Processing settings
batch_size: int = 50
mark_as_read: bool = False
delete_after_processing: bool = False
# Folder settings
inbox_folder: str = "INBOX"
sent_folder: str = "Sent"
trash_folder: str = "Trash"
@classmethod
def from_provider(cls, provider: IMAPProvider, username: str, password: str):
"""Create config for common providers."""
configs = {
IMAPProvider.GMAIL: {
"host": "imap.gmail.com",
"port": 993,
"inbox_folder": "INBOX",
"sent_folder": "[Gmail]/Sent Mail",
"trash_folder": "[Gmail]/Trash"
},
IMAPProvider.OUTLOOK: {
"host": "outlook.office365.com",
"port": 993,
"inbox_folder": "INBOX",
"sent_folder": "Sent Items",
"trash_folder": "Deleted Items"
},
IMAPProvider.YAHOO: {
"host": "imap.mail.yahoo.com",
"port": 993,
"inbox_folder": "INBOX",
"sent_folder": "Sent",
"trash_folder": "Trash"
},
IMAPProvider.ICLOUD: {
"host": "imap.mail.me.com",
"port": 993,
"inbox_folder": "INBOX",
"sent_folder": "Sent Messages",
"trash_folder": "Deleted Messages"
}
}
if provider in configs:
config = configs[provider]
return cls(
host=config["host"],
port=config["port"],
username=username,
password=password,
provider=provider,
inbox_folder=config["inbox_folder"],
sent_folder=config["sent_folder"],
trash_folder=config["trash_folder"]
)
else:
raise ValueError(f"Unknown provider: {provider}")
# ==================== Email Message Structure ====================
@dataclass
class EmailData:
"""Parsed email message data."""
uid: str
message_id: str
subject: str
from_addr: str
from_name: Optional[str]
to_addrs: List[str]
cc_addrs: List[str]
bcc_addrs: List[str]
date: datetime
# Content
text_body: Optional[str] = None
html_body: Optional[str] = None
attachments: List[Dict[str, Any]] = field(default_factory=list)
# Headers
headers: Dict[str, str] = field(default_factory=dict)
in_reply_to: Optional[str] = None
references: Optional[str] = None
# Flags
is_read: bool = False
is_flagged: bool = False
is_answered: bool = False
is_draft: bool = False
# Metadata
size: int = 0
folder: str = "INBOX"
labels: List[str] = field(default_factory=list)
def get_plain_text(self) -> str:
"""Get plain text version of email."""
if self.text_body:
return self.text_body
elif self.html_body:
# Convert HTML to plain text
soup = BeautifulSoup(self.html_body, 'html.parser')
return soup.get_text()
return ""
# ==================== IMAP Client ====================
class IMAPClient:
"""
IMAP client for reading and managing emails.
"""
def __init__(self, config: IMAPConfig):
self.config = config
self.connection = None
self.current_folder = None
self.logger = logging.getLogger(__name__)
# Statistics
self.stats = {
"messages_read": 0,
"messages_processed": 0,
"attachments_downloaded": 0,
"errors": 0
}
def connect(self):
"""Establish IMAP connection."""
try:
if self.config.use_ssl:
context = ssl.create_default_context()
self.connection = imaplib.IMAP4_SSL(
self.config.host,
self.config.port,
ssl_context=context,
timeout=self.config.timeout
)
else:
self.connection = imaplib.IMAP4(
self.config.host,
self.config.port,
timeout=self.config.timeout
)
# Login
self.connection.login(self.config.username, self.config.password)
self.logger.info(f"Connected to IMAP server: {self.config.host}")
except imaplib.IMAP4.error as e:
self.logger.error(f"IMAP error: {e}")
raise
except Exception as e:
self.logger.error(f"Connection failed: {e}")
raise
def disconnect(self):
"""Close IMAP connection."""
if self.connection:
try:
self.connection.close()
self.connection.logout()
except:
pass
self.connection = None
self.logger.info("Disconnected from IMAP server")
def list_folders(self) -> List[str]:
"""List all folders in mailbox."""
if not self.connection:
self.connect()
try:
status, folders = self.connection.list()
if status == 'OK':
folder_list = []
for folder in folders:
# Parse folder string
if folder:
parts = folder.decode().split(' "." ')
if len(parts) > 0:
folder_name = parts[-1].strip('"')
folder_list.append(folder_name)
return folder_list
return []
except Exception as e:
self.logger.error(f"Failed to list folders: {e}")
return []
def select_folder(self, folder: str = None) -> Dict[str, int]:
"""
Select folder for operations.
Returns:
Dictionary with folder statistics
"""
if not self.connection:
self.connect()
folder = folder or self.config.inbox_folder
try:
status, data = self.connection.select(folder)
if status == 'OK':
self.current_folder = folder
# Get folder stats
total_messages = int(data[0].decode()) if data[0] else 0
# Get unread count
status, unread = self.connection.search(None, 'UNSEEN')
unread_count = len(unread[0].split()) if status == 'OK' else 0
stats = {
"folder": folder,
"total": total_messages,
"unread": unread_count
}
self.logger.info(f"Selected folder: {folder} ({total_messages} messages, {unread_count} unread)")
return stats
raise Exception(f"Failed to select folder: {folder}")
except Exception as e:
self.logger.error(f"Failed to select folder: {e}")
raise
def search_messages(self, criteria: str = "ALL",
limit: Optional[int] = None) -> List[str]:
"""
Search messages using IMAP search criteria.
Common criteria:
- ALL: All messages
- UNSEEN: Unread messages
- SEEN: Read messages
- FLAGGED: Flagged messages
- SINCE "date": Messages since date
- BEFORE "date": Messages before date
- FROM "email": From specific sender
- TO "email": To specific recipient
- SUBJECT "text": Subject contains text
- TEXT "text": Body contains text
Returns:
List of message IDs
"""
if not self.current_folder:
self.select_folder()
try:
status, data = self.connection.search(None, criteria)
if status == 'OK':
message_ids = data[0].split()
if limit and len(message_ids) > limit:
message_ids = message_ids[-limit:] # Get most recent
self.logger.info(f"Found {len(message_ids)} messages matching: {criteria}")
return [id.decode() for id in message_ids]
return []
except Exception as e:
self.logger.error(f"Search failed: {e}")
return []
def fetch_message(self, message_id: str) -> Optional[EmailData]:
"""Fetch and parse single message."""
try:
# Fetch message data
status, data = self.connection.fetch(message_id, '(RFC822 FLAGS)')
if status != 'OK':
return None
# Parse message
raw_email = data[0][1]
msg = email.message_from_bytes(raw_email)
# Parse flags
flags = imaplib.ParseFlags(data[0][0])
# Create EmailData object
email_data = self._parse_message(msg, message_id, flags)
self.stats["messages_read"] += 1
return email_data
except Exception as e:
self.logger.error(f"Failed to fetch message {message_id}: {e}")
self.stats["errors"] += 1
return None
def fetch_messages_batch(self, message_ids: List[str]) -> Generator[EmailData, None, None]:
"""Fetch multiple messages in batch."""
for message_id in message_ids:
email_data = self.fetch_message(message_id)
if email_data:
yield email_data
def _parse_message(self, msg: email.message.Message,
uid: str, flags: bytes) -> EmailData:
"""Parse email message into EmailData object."""
# Parse headers
subject = self._decode_header(msg.get("Subject", ""))
from_header = self._decode_header(msg.get("From", ""))
from_addr, from_name = self._parse_address(from_header)
# Parse recipients
to_addrs = self._parse_addresses(msg.get("To", ""))
cc_addrs = self._parse_addresses(msg.get("Cc", ""))
bcc_addrs = self._parse_addresses(msg.get("Bcc", ""))
# Parse date
date_str = msg.get("Date", "")
try:
date = parsedate_to_datetime(date_str)
except:
date = datetime.now()
# Parse flags
flags_str = str(flags)
is_read = "\\Seen" in flags_str
is_flagged = "\\Flagged" in flags_str
is_answered = "\\Answered" in flags_str
is_draft = "\\Draft" in flags_str
# Create EmailData
email_data = EmailData(
uid=uid,
message_id=msg.get("Message-ID", ""),
subject=subject,
from_addr=from_addr,
from_name=from_name,
to_addrs=to_addrs,
cc_addrs=cc_addrs,
bcc_addrs=bcc_addrs,
date=date,
in_reply_to=msg.get("In-Reply-To"),
references=msg.get("References"),
is_read=is_read,
is_flagged=is_flagged,
is_answered=is_answered,
is_draft=is_draft,
folder=self.current_folder or "INBOX"
)
# Extract all headers
for header, value in msg.items():
email_data.headers[header] = self._decode_header(value)
# Parse body and attachments
self._parse_body(msg, email_data)
return email_data
def _parse_body(self, msg: email.message.Message, email_data: EmailData):
"""Parse email body and attachments."""
if msg.is_multipart():
for part in msg.walk():
content_type = part.get_content_type()
content_disposition = str(part.get("Content-Disposition", ""))
# Skip multipart containers
if part.is_multipart():
continue
# Check if attachment
if "attachment" in content_disposition or \
(content_disposition and "inline" in content_disposition and
content_type.startswith("image/")):
# Process attachment
self._process_attachment(part, email_data)
else:
# Process body
if content_type == "text/plain":
body = self._decode_content(part)
if body and not email_data.text_body:
email_data.text_body = body
elif content_type == "text/html":
body = self._decode_content(part)
if body and not email_data.html_body:
email_data.html_body = body
else:
# Single part message
content_type = msg.get_content_type()
if content_type == "text/plain":
email_data.text_body = self._decode_content(msg)
elif content_type == "text/html":
email_data.html_body = self._decode_content(msg)
def _process_attachment(self, part: email.message.Message, email_data: EmailData):
"""Process email attachment."""
filename = part.get_filename()
if filename:
filename = self._decode_header(filename)
# Get attachment data
attachment_data = part.get_payload(decode=True)
if attachment_data:
attachment_info = {
"filename": filename,
"content_type": part.get_content_type(),
"size": len(attachment_data),
"data": attachment_data,
"content_id": part.get("Content-ID", "").strip("<>")
}
email_data.attachments.append(attachment_info)
self.logger.debug(f"Found attachment: {filename} ({len(attachment_data)} bytes)")
def _decode_header(self, header: str) -> str:
"""Decode email header."""
if not header:
return ""
decoded_parts = []
for part, encoding in decode_header(header):
if isinstance(part, bytes):
if encoding:
try:
decoded_parts.append(part.decode(encoding))
except:
decoded_parts.append(part.decode('utf-8', errors='ignore'))
else:
decoded_parts.append(part.decode('utf-8', errors='ignore'))
else:
decoded_parts.append(part)
return ' '.join(decoded_parts)
def _decode_content(self, part: email.message.Message) -> str:
"""Decode email content."""
payload = part.get_payload(decode=True)
if not payload:
return ""
# Try to detect encoding
charset = part.get_content_charset()
if not charset:
# Try to detect encoding
detected = chardet.detect(payload)
charset = detected.get('encoding', 'utf-8')
try:
return payload.decode(charset or 'utf-8')
except:
return payload.decode('utf-8', errors='ignore')
def _parse_address(self, address_str: str) -> Tuple[str, Optional[str]]:
"""Parse email address into email and name."""
import email.utils
name, addr = email.utils.parseaddr(address_str)
return addr, name if name else None
def _parse_addresses(self, addresses_str: str) -> List[str]:
"""Parse multiple email addresses."""
if not addresses_str:
return []
import email.utils
addresses = []
for name, addr in email.utils.getaddresses([addresses_str]):
if addr:
addresses.append(addr)
return addresses
def mark_as_read(self, message_id: str):
"""Mark message as read."""
try:
self.connection.store(message_id, '+FLAGS', '\\Seen')
self.logger.debug(f"Marked message {message_id} as read")
except Exception as e:
self.logger.error(f"Failed to mark as read: {e}")
def mark_as_unread(self, message_id: str):
"""Mark message as unread."""
try:
self.connection.store(message_id, '-FLAGS', '\\Seen')
self.logger.debug(f"Marked message {message_id} as unread")
except Exception as e:
self.logger.error(f"Failed to mark as unread: {e}")
def flag_message(self, message_id: str):
"""Flag message."""
try:
self.connection.store(message_id, '+FLAGS', '\\Flagged')
self.logger.debug(f"Flagged message {message_id}")
except Exception as e:
self.logger.error(f"Failed to flag message: {e}")
def move_message(self, message_id: str, target_folder: str):
"""Move message to another folder."""
try:
# Copy message to target folder
result = self.connection.copy(message_id, target_folder)
if result[0] == 'OK':
# Mark for deletion in current folder
self.connection.store(message_id, '+FLAGS', '\\Deleted')
# Expunge to permanently delete from current folder
self.connection.expunge()
self.logger.info(f"Moved message {message_id} to {target_folder}")
return True
except Exception as e:
self.logger.error(f"Failed to move message: {e}")
return False
def delete_message(self, message_id: str):
"""Delete message."""
try:
# Move to trash
if self.config.trash_folder:
return self.move_message(message_id, self.config.trash_folder)
else:
# Mark for deletion
self.connection.store(message_id, '+FLAGS', '\\Deleted')
self.connection.expunge()
return True
except Exception as e:
self.logger.error(f"Failed to delete message: {e}")
return False
def save_attachments(self, email_data: EmailData,
output_dir: str = "./attachments") -> List[str]:
"""Save email attachments to disk."""
saved_files = []
output_path = Path(output_dir)
output_path.mkdir(parents=True, exist_ok=True)
for attachment in email_data.attachments:
filename = attachment["filename"]
# Sanitize filename
filename = re.sub(r'[<>:"/\\|?*]', '_', filename)
# Handle duplicate filenames
file_path = output_path / filename
counter = 1
while file_path.exists():
name, ext = os.path.splitext(filename)
file_path = output_path / f"{name}_{counter}{ext}"
counter += 1
# Save file
with open(file_path, 'wb') as f:
f.write(attachment["data"])
saved_files.append(str(file_path))
self.stats["attachments_downloaded"] += 1
self.logger.info(f"Saved attachment: {file_path}")
return saved_files
# ==================== Email Monitor ====================
class EmailMonitor:
"""
Monitor inbox for new emails.
"""
def __init__(self, imap_client: IMAPClient):
self.client = imap_client
self.running = False
self.check_interval = 60 # seconds
self.last_check = None
self.processed_ids = set()
self.callbacks = []
self.logger = logging.getLogger(__name__)
def add_callback(self, callback: callable):
"""Add callback for new emails."""
self.callbacks.append(callback)
def start(self, check_interval: int = 60):
"""Start monitoring."""
self.check_interval = check_interval
self.running = True
monitor_thread = threading.Thread(target=self._monitor_loop)
monitor_thread.daemon = True
monitor_thread.start()
self.logger.info(f"Started email monitoring (interval: {check_interval}s)")
def stop(self):
"""Stop monitoring."""
self.running = False
self.logger.info("Stopped email monitoring")
def _monitor_loop(self):
"""Main monitoring loop."""
while self.running:
try:
# Check for new emails
self._check_new_emails()
# Wait for next check
time.sleep(self.check_interval)
except Exception as e:
self.logger.error(f"Monitor error: {e}")
time.sleep(self.check_interval)
def _check_new_emails(self):
"""Check for new emails."""
try:
# Select inbox
self.client.select_folder()
# Search for recent messages
if self.last_check:
# Get messages since last check
since_date = self.last_check.strftime("%d-%b-%Y")
criteria = f'SINCE "{since_date}"'
else:
# Get unread messages
criteria = "UNSEEN"
message_ids = self.client.search_messages(criteria)
# Process new messages
for message_id in message_ids:
if message_id not in self.processed_ids:
email_data = self.client.fetch_message(message_id)
if email_data:
# Call callbacks
for callback in self.callbacks:
try:
callback(email_data)
except Exception as e:
self.logger.error(f"Callback error: {e}")
self.processed_ids.add(message_id)
self.logger.info(f"Processed new email: {email_data.subject}")
self.last_check = datetime.now()
except Exception as e:
self.logger.error(f"Check failed: {e}")
# ==================== Email Processor ====================
class EmailProcessor:
"""
Process emails with rules and actions.
"""
def __init__(self, imap_client: IMAPClient):
self.client = imap_client
self.rules = []
self.logger = logging.getLogger(__name__)
def add_rule(self, condition: callable, action: callable):
"""Add processing rule."""
self.rules.append((condition, action))
def process_inbox(self, criteria: str = "UNSEEN", limit: Optional[int] = None):
"""Process emails in inbox."""
# Select inbox
self.client.select_folder()
# Search messages
message_ids = self.client.search_messages(criteria, limit)
processed = 0
for message_id in message_ids:
email_data = self.client.fetch_message(message_id)
if email_data:
self.process_email(email_data, message_id)
processed += 1
self.logger.info(f"Processed {processed} emails")
return processed
def process_email(self, email_data: EmailData, message_id: str):
"""Process single email with rules."""
for condition, action in self.rules:
try:
if condition(email_data):
action(email_data, message_id, self.client)
self.logger.debug(f"Applied rule to: {email_data.subject}")
except Exception as e:
self.logger.error(f"Rule processing error: {e}")
# Mark as processed if configured
if self.client.config.mark_as_read:
self.client.mark_as_read(message_id)
# ==================== Email Search Builder ====================
class IMAPSearchBuilder:
"""
Build IMAP search queries.
"""
def __init__(self):
self.criteria = []
def from_address(self, email: str) -> 'IMAPSearchBuilder':
"""Search by sender."""
self.criteria.append(f'FROM "{email}"')
return self
def to_address(self, email: str) -> 'IMAPSearchBuilder':
"""Search by recipient."""
self.criteria.append(f'TO "{email}"')
return self
def subject(self, text: str) -> 'IMAPSearchBuilder':
"""Search by subject."""
self.criteria.append(f'SUBJECT "{text}"')
return self
def body(self, text: str) -> 'IMAPSearchBuilder':
"""Search by body text."""
self.criteria.append(f'BODY "{text}"')
return self
def since(self, date: datetime) -> 'IMAPSearchBuilder':
"""Messages since date."""
date_str = date.strftime("%d-%b-%Y")
self.criteria.append(f'SINCE "{date_str}"')
return self
def before(self, date: datetime) -> 'IMAPSearchBuilder':
"""Messages before date."""
date_str = date.strftime("%d-%b-%Y")
self.criteria.append(f'BEFORE "{date_str}"')
return self
def unread(self) -> 'IMAPSearchBuilder':
"""Unread messages."""
self.criteria.append("UNSEEN")
return self
def read(self) -> 'IMAPSearchBuilder':
"""Read messages."""
self.criteria.append("SEEN")
return self
def flagged(self) -> 'IMAPSearchBuilder':
"""Flagged messages."""
self.criteria.append("FLAGGED")
return self
def with_attachments(self) -> 'IMAPSearchBuilder':
"""Messages with attachments (approximation)."""
self.criteria.append('HEADER Content-Type "multipart"')
return self
def build(self) -> str:
"""Build search query."""
if not self.criteria:
return "ALL"
# IMAP requires parentheses for multiple criteria
if len(self.criteria) == 1:
return self.criteria[0]
else:
return f"({' '.join(self.criteria)})"
# Example usage
if __name__ == "__main__":
print("๐ฅ IMAP Email Reading Examples\n")
# Example 1: IMAP configuration
print("1๏ธโฃ IMAP Server Configuration:")
gmail_config = IMAPConfig.from_provider(
IMAPProvider.GMAIL,
username="your_email@gmail.com",
password="your_app_password"
)
print(f" Provider: Gmail")
print(f" Host: {gmail_config.host}")
print(f" Port: {gmail_config.port}")
print(f" Folders:")
print(f" Inbox: {gmail_config.inbox_folder}")
print(f" Sent: {gmail_config.sent_folder}")
print(f" Trash: {gmail_config.trash_folder}")
# Example 2: Common IMAP providers
print("\n2๏ธโฃ Common IMAP Providers:")
providers = [
("Gmail", "imap.gmail.com", 993, "App Password required"),
("Outlook", "outlook.office365.com", 993, "Standard password"),
("Yahoo", "imap.mail.yahoo.com", 993, "App Password required"),
("iCloud", "imap.mail.me.com", 993, "App-specific password"),
("ProtonMail", "127.0.0.1", 1143, "Bridge required")
]
for provider, host, port, note in providers:
print(f" {provider}:")
print(f" Host: {host}")
print(f" Port: {port}")
print(f" Note: {note}")
# Example 3: Search criteria
print("\n3๏ธโฃ IMAP Search Examples:")
# Build search query
search = (IMAPSearchBuilder()
.from_address("important@example.com")
.since(datetime.now() - timedelta(days=7))
.unread()
.build())
print(f" Query: {search}")
print("\n Common searches:")
print(" UNSEEN - Unread messages")
print(" SINCE \"01-Jan-2024\" - Recent messages")
print(" FROM \"sender@example.com\" - From specific sender")
print(" SUBJECT \"Invoice\" - Subject contains text")
print(" FLAGGED - Important messages")
# Example 4: Email processing
print("\n4๏ธโฃ Email Processing Example:")
print(" Processing workflow:")
print(" 1. Connect to IMAP server")
print(" 2. Select folder (INBOX)")
print(" 3. Search messages")
print(" 4. Fetch message data")
print(" 5. Parse content and attachments")
print(" 6. Apply processing rules")
print(" 7. Update message flags/folders")
# Example 5: Email data structure
print("\n5๏ธโฃ Email Data Structure:")
sample_email = EmailData(
uid="123",
message_id="",
subject="Monthly Report",
from_addr="sender@example.com",
from_name="John Doe",
to_addrs=["recipient@example.com"],
cc_addrs=[],
bcc_addrs=[],
date=datetime.now(),
text_body="Please find the report attached.",
attachments=[{"filename": "report.pdf", "size": 524288}]
)
print(f" Subject: {sample_email.subject}")
print(f" From: {sample_email.from_name} <{sample_email.from_addr}>")
print(f" Attachments: {len(sample_email.attachments)}")
print(f" Plain text: {sample_email.get_plain_text()[:50]}...")
# Example 6: Processing rules
print("\n6๏ธโฃ Email Processing Rules:")
def invoice_condition(email: EmailData) -> bool:
return "invoice" in email.subject.lower()
def invoice_action(email: EmailData, msg_id: str, client: IMAPClient):
print(f" Processing invoice: {email.subject}")
# Save attachments
# Move to "Invoices" folder
# Mark as processed
print(" Example rules:")
print(" โข If subject contains 'invoice' โ Save attachments, move to folder")
print(" โข If from VIP sender โ Flag and forward")
print(" โข If spam keywords โ Move to trash")
print(" โข If attachment > 10MB โ Save locally and remove")
# Example 7: Monitoring inbox
print("\n7๏ธโฃ Inbox Monitoring:")
print(" Monitor configuration:")
print(" Check interval: 60 seconds")
print(" Process: Unread messages")
print(" Actions: Notify, process, mark as read")
# Example 8: Attachment handling
print("\n8๏ธโฃ Attachment Processing:")
print(" Attachment workflow:")
print(" 1. Detect attachments in email")
print(" 2. Extract metadata (name, size, type)")
print(" 3. Decode attachment data")
print(" 4. Save to disk with sanitized filename")
print(" 5. Process based on file type")
# Example 9: Common operations
print("\n9๏ธโฃ Common IMAP Operations:")
operations = [
"List folders - Get mailbox structure",
"Select folder - Choose working folder",
"Search messages - Find specific emails",
"Fetch message - Get full email data",
"Mark as read/unread - Update status",
"Flag/unflag - Mark important",
"Move to folder - Organize emails",
"Delete message - Move to trash",
"Save attachments - Extract files"
]
for operation in operations:
print(f" โข {operation}")
# Example 10: Best practices
print("\n๐ IMAP Best Practices:")
practices = [
"๐ Use OAuth2 or App Passwords",
"๐ Keep connections alive for efficiency",
"๐ Batch fetch operations",
"๐พ Cache message data locally",
"โก Use IDLE for real-time updates",
"๐ Optimize search criteria",
"๐ Organize with folders",
"๐งน Clean up old messages",
"๐ Monitor mailbox quota",
"๐ก๏ธ Handle connection failures gracefully"
]
for practice in practices:
print(f" {practice}")
print("\nโ
IMAP email reading demonstration complete!")
Key Takeaways and Best Practices ๐ฏ
- Use App Passwords: Modern providers require app-specific passwords for IMAP.
- Keep Connections Alive: Reuse IMAP connections for multiple operations.
- Handle Encoding Properly: Email content can have various encodings.
- Process Attachments Carefully: Sanitize filenames and check file types.
- Implement Search Efficiently: Use IMAP search criteria to filter server-side.
- Monitor Responsibly: Don't poll too frequently to avoid rate limits.
- Organize with Folders: Use IMAP folders to categorize emails.
- Cache When Possible: Store processed message IDs to avoid reprocessing.
IMAP Email Processing Best Practices ๐
Mastering IMAP email reading enables you to build intelligent email processing systems that automate inbox management, extract data from emails, handle customer communications, and trigger workflows based on email content. Whether you're building support ticket systems, order processors, or email analytics tools, these IMAP skills power your email automation! ๐จ
Pro Tip: Think of IMAP as having remote access to your email filing cabinet - you can browse, search, and organize without downloading everything. Always use SSL/TLS for secure connections and app passwords for authentication. Keep your IMAP connection alive when processing multiple emails to avoid reconnection overhead. Use server-side search with IMAP criteria instead of fetching all emails and filtering locally. Handle character encodings carefully - emails come in many languages and formats. Process attachments with caution - sanitize filenames and validate file types for security. Implement intelligent monitoring with reasonable check intervals to avoid being rate-limited. Use folders to organize processed emails rather than deleting them. Cache message IDs you've processed to avoid duplicate processing. Most importantly: IMAP keeps emails on the server, so be mindful of mailbox quotas and clean up regularly!