
📥 Reading Emails with IMAP: Process Incoming Messages

IMAP (Internet Message Access Protocol) is your gateway to reading and managing email programmatically. It's like having a digital assistant that can monitor your inbox, organize messages, extract data, and trigger actions based on incoming mail. From simple inbox monitoring to complex processing pipelines, mastering IMAP lets you build intelligent email automation systems that respond to messages in near real time! 📬

The IMAP Email Processing Architecture

Think of IMAP as a remote control for your email inbox: you can browse folders, search messages, read content, manage flags, and organize emails without downloading everything locally. Unlike POP3, which typically downloads messages and then deletes them from the server, IMAP keeps emails on the server, so multiple clients can access the same mailbox. Understanding IMAP's folder structure, search capabilities, and message handling is essential for building robust email processing systems!
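To make the folder structure a little more concrete: an IMAP server answers the LIST command with one line per folder, in the shape `(flags) "delimiter" name`. The sketch below parses such lines offline. The names `FolderInfo` and `parse_list_line` are hypothetical helpers for illustration, and the parser assumes plain quoted or unquoted folder names (it does not decode modified UTF-7 names or IMAP literals).

```python
import re
from typing import NamedTuple, Optional, Tuple

# One LIST response line looks like: (\HasNoChildren) "/" "INBOX"
_LIST_LINE = re.compile(
    rb'\((?P<flags>[^)]*)\) (?P<delim>"(?:[^"\\]|\\.)*"|NIL) (?P<name>.+)'
)


class FolderInfo(NamedTuple):
    """Hypothetical container for one parsed LIST line."""
    flags: Tuple[str, ...]
    delimiter: Optional[str]  # None when the server reports NIL
    name: str


def parse_list_line(line: bytes) -> Optional[FolderInfo]:
    """Parse one LIST response line; return None if it does not match."""
    match = _LIST_LINE.match(line)
    if match is None:
        return None

    flags = tuple(match.group("flags").decode("ascii", "replace").split())

    raw_delim = match.group("delim")
    delimiter = None if raw_delim == b"NIL" else raw_delim[1:-1].decode("ascii", "replace")

    name = match.group("name").decode("utf-8", "replace").strip()
    if len(name) >= 2 and name.startswith('"') and name.endswith('"'):
        # Drop the surrounding quotes and undo simple backslash escapes
        name = name[1:-1].replace('\\"', '"').replace("\\\\", "\\")

    return FolderInfo(flags, delimiter, name)


if __name__ == "__main__":
    for raw in (b'(\\HasNoChildren) "/" "INBOX"',
                b'(\\HasChildren \\Noselect) "/" "[Gmail]"',
                b'(\\HasNoChildren) "." INBOX.Sent'):
        print(parse_list_line(raw))
```

Note that the hierarchy delimiter differs between servers (`/` in the Gmail-style lines, `.` in the last one), which is why it is read from the response rather than hard-coded.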

graph TB
    A[IMAP Email Processing] --> B[Connection]
    A --> C[Folder Operations]
    A --> D[Message Retrieval]
    A --> E[Search & Filter]
    B --> F[Server Config]
    B --> G[Authentication]
    B --> H[SSL/TLS]
    B --> I[Session Management]
    C --> J[List Folders]
    C --> K[Select Folder]
    C --> L[Create/Delete]
    C --> M[Folder Stats]
    D --> N[Fetch Headers]
    D --> O[Fetch Body]
    D --> P[Attachments]
    D --> Q[Partial Fetch]
    E --> R[Search Criteria]
    E --> S[Date Range]
    E --> T[Flags/Status]
    E --> U[Content Search]
    V[Processing] --> W[Parse Content]
    V --> X[Extract Data]
    V --> Y[Trigger Actions]
    V --> Z[Update Flags]
    style A fill:#ff6b6b
    style B fill:#51cf66
    style C fill:#339af0
    style D fill:#ffd43b
    style E fill:#ff6b6b
    style V fill:#51cf66
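The "Search & Filter" branch of the diagram comes down to building a SEARCH criteria string. Here is a minimal sketch of how such a string might be assembled safely. The helper names `imap_date`, `imap_quote` and `build_criteria` are assumptions made for this example, not part of any library. It relies on two facts about IMAP search: dates use the `DD-Mon-YYYY` form with English month names, and space-separated criteria are combined with AND.

```python
from datetime import date
from typing import Optional

# English month abbreviations, spelled out so the result does not depend
# on the process locale (strftime("%b") does)
_MONTHS = ("Jan", "Feb", "Mar", "Apr", "May", "Jun",
           "Jul", "Aug", "Sep", "Oct", "Nov", "Dec")


def imap_date(day: date) -> str:
    """Format a date as DD-Mon-YYYY for SINCE/BEFORE criteria."""
    return f"{day.day:02d}-{_MONTHS[day.month - 1]}-{day.year:04d}"


def imap_quote(text: str) -> str:
    """Wrap text in double quotes, escaping backslashes and quotes."""
    escaped = text.replace("\\", "\\\\").replace('"', '\\"')
    return f'"{escaped}"'


def build_criteria(sender: Optional[str] = None,
                   subject: Optional[str] = None,
                   since: Optional[date] = None,
                   unseen: bool = False) -> str:
    """Join the requested criteria with spaces (an implicit AND)."""
    parts = []
    if sender:
        parts.append(f"FROM {imap_quote(sender)}")
    if subject:
        parts.append(f"SUBJECT {imap_quote(subject)}")
    if since:
        parts.append(f"SINCE {imap_date(since)}")
    if unseen:
        parts.append("UNSEEN")
    return " ".join(parts) if parts else "ALL"


if __name__ == "__main__":
    # prints: FROM "billing@example.com" SINCE 05-Jan-2024 UNSEEN
    print(build_criteria(sender="billing@example.com",
                         since=date(2024, 1, 5), unseen=True))
```

This sketch only covers ASCII search text. Searching for non-ASCII strings generally requires passing a charset to the SEARCH command, which is left out here.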

Real-World Scenario: The Intelligent Email Processor 🤖

You're building an intelligent email processing system that monitors multiple inboxes, automatically categorizes messages, extracts attachments and data, responds to customer inquiries, processes orders from emails, generates reports from email data, and triggers workflows based on email content. Your system must handle thousands of emails daily, support multiple email providers, process attachments, and maintain inbox organization. Let's build a comprehensive IMAP email processing framework!
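Before any network code, it may help to see the "categorize and extract" part of this scenario in isolation. The sketch below builds a sample message in memory with the standard library `email` package, then parses it back the way bytes fetched over IMAP would be parsed. The function names `build_sample_message` and `summarize`, the sample addresses, and the simple keyword rule for the category are all illustrative assumptions.

```python
from email import message_from_bytes, policy
from email.message import EmailMessage


def build_sample_message() -> bytes:
    """Create a small message with one attachment, as raw bytes."""
    msg = EmailMessage()
    msg["From"] = "Jane Doe <jane@example.com>"
    msg["To"] = "orders@example.com"
    msg["Subject"] = "Invoice 42"
    msg.set_content("Please find the invoice attached.")
    msg.add_attachment(b"%PDF-1.4 sample", maintype="application",
                       subtype="pdf", filename="invoice.pdf")
    return msg.as_bytes()


def summarize(raw: bytes) -> dict:
    """Parse raw message bytes and pull out the fields a processor needs."""
    # policy.default yields EmailMessage objects with decoded headers
    msg = message_from_bytes(raw, policy=policy.default)
    body = msg.get_body(preferencelist=("plain", "html"))
    subject = str(msg["Subject"])
    return {
        "subject": subject,
        "sender": msg["From"].addresses[0].addr_spec,
        "body": body.get_content().strip() if body is not None else "",
        "attachments": [(part.get_filename(), len(part.get_content()))
                        for part in msg.iter_attachments()],
        # Toy categorization rule, purely for illustration
        "category": "invoice" if "invoice" in subject.lower() else "other",
    }


if __name__ == "__main__":
    print(summarize(build_sample_message()))
```

The framework below parses messages with the older `Message` API and manual header decoding. The `policy.default` route shown here is an alternative that handles header decoding and body selection for you.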

# First, install the third-party packages imported below
# (imaplib and email ship with Python):
# pip install beautifulsoup4 chardet

import imaplib
import email
import email.message  # needed for the email.message.Message annotations below
import email.utils
from email.header import decode_header
from email.utils import parsedate_to_datetime
import ssl
import os
import re
import logging
from typing import List, Dict, Optional, Any, Union, Tuple, Generator
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from enum import Enum
from pathlib import Path
import json
import base64
import hashlib
import chardet
from bs4 import BeautifulSoup
import time
from collections import defaultdict
import threading
from queue import Queue
import mimetypes

# ==================== IMAP Configuration ====================

class IMAPProvider(Enum):
    """Common IMAP providers."""
    GMAIL = "gmail"
    OUTLOOK = "outlook"
    YAHOO = "yahoo"
    ICLOUD = "icloud"
    CUSTOM = "custom"

@dataclass
class IMAPConfig:
    """IMAP server configuration."""
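    # Fields without defaults must come before fields with defaults,
    # otherwise @dataclass raises TypeError at class creation
    host: str
    username: str
    password: str
    port: int = 993
    use_ssl: bool = True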
    
    # Provider settings
    provider: IMAPProvider = IMAPProvider.CUSTOM
    
    # Connection settings
    timeout: int = 30
    max_retries: int = 3
    retry_delay: float = 5.0
    
    # Processing settings
    batch_size: int = 50
    mark_as_read: bool = False
    delete_after_processing: bool = False
    
    # Folder settings
    inbox_folder: str = "INBOX"
    sent_folder: str = "Sent"
    trash_folder: str = "Trash"
    
    @classmethod
    def from_provider(cls, provider: IMAPProvider, username: str, password: str):
        """Create config for common providers."""
        configs = {
            IMAPProvider.GMAIL: {
                "host": "imap.gmail.com",
                "port": 993,
                "inbox_folder": "INBOX",
                "sent_folder": "[Gmail]/Sent Mail",
                "trash_folder": "[Gmail]/Trash"
            },
            IMAPProvider.OUTLOOK: {
                "host": "outlook.office365.com",
                "port": 993,
                "inbox_folder": "INBOX",
                "sent_folder": "Sent Items",
                "trash_folder": "Deleted Items"
            },
            IMAPProvider.YAHOO: {
                "host": "imap.mail.yahoo.com",
                "port": 993,
                "inbox_folder": "INBOX",
                "sent_folder": "Sent",
                "trash_folder": "Trash"
            },
            IMAPProvider.ICLOUD: {
                "host": "imap.mail.me.com",
                "port": 993,
                "inbox_folder": "INBOX",
                "sent_folder": "Sent Messages",
                "trash_folder": "Deleted Messages"
            }
        }
        
        if provider in configs:
            config = configs[provider]
            return cls(
                host=config["host"],
                port=config["port"],
                username=username,
                password=password,
                provider=provider,
                inbox_folder=config["inbox_folder"],
                sent_folder=config["sent_folder"],
                trash_folder=config["trash_folder"]
            )
        else:
            raise ValueError(f"Unknown provider: {provider}")

# ==================== Email Message Structure ====================

@dataclass
class EmailData:
    """Parsed email message data."""
    uid: str
    message_id: str
    subject: str
    from_addr: str
    from_name: Optional[str]
    to_addrs: List[str]
    cc_addrs: List[str]
    bcc_addrs: List[str]
    date: datetime
    
    # Content
    text_body: Optional[str] = None
    html_body: Optional[str] = None
    attachments: List[Dict[str, Any]] = field(default_factory=list)
    
    # Headers
    headers: Dict[str, str] = field(default_factory=dict)
    in_reply_to: Optional[str] = None
    references: Optional[str] = None
    
    # Flags
    is_read: bool = False
    is_flagged: bool = False
    is_answered: bool = False
    is_draft: bool = False
    
    # Metadata
    size: int = 0
    folder: str = "INBOX"
    labels: List[str] = field(default_factory=list)
    
    def get_plain_text(self) -> str:
        """Get plain text version of email."""
        if self.text_body:
            return self.text_body
        elif self.html_body:
            # Convert HTML to plain text
            soup = BeautifulSoup(self.html_body, 'html.parser')
            return soup.get_text()
        return ""

# ==================== IMAP Client ====================

class IMAPClient:
    """
    IMAP client for reading and managing emails.
    """
    
    def __init__(self, config: IMAPConfig):
        self.config = config
        self.connection = None
        self.current_folder = None
        self.logger = logging.getLogger(__name__)
        
        # Statistics
        self.stats = {
            "messages_read": 0,
            "messages_processed": 0,
            "attachments_downloaded": 0,
            "errors": 0
        }
    
    def connect(self):
        """Establish IMAP connection."""
        try:
            if self.config.use_ssl:
                context = ssl.create_default_context()
                self.connection = imaplib.IMAP4_SSL(
                    self.config.host,
                    self.config.port,
                    ssl_context=context,
                    timeout=self.config.timeout
                )
            else:
                self.connection = imaplib.IMAP4(
                    self.config.host,
                    self.config.port,
                    timeout=self.config.timeout
                )
            
            # Login
            self.connection.login(self.config.username, self.config.password)
            
            self.logger.info(f"Connected to IMAP server: {self.config.host}")
            
        except imaplib.IMAP4.error as e:
            self.logger.error(f"IMAP error: {e}")
            raise
        except Exception as e:
            self.logger.error(f"Connection failed: {e}")
            raise
    
    def disconnect(self):
        """Close IMAP connection."""
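        if self.connection:
            try:
                # CLOSE is only valid while a folder is selected
                if self.current_folder:
                    self.connection.close()
            except (imaplib.IMAP4.error, OSError):
                pass
            try:
                # Always attempt LOGOUT, even if CLOSE failed
                self.connection.logout()
            except (imaplib.IMAP4.error, OSError):
                pass
            self.connection = None
            self.current_folder = None
            self.logger.info("Disconnected from IMAP server")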
    
    def list_folders(self) -> List[str]:
        """List all folders in mailbox."""
        if not self.connection:
            self.connect()
        
        try:
            status, folders = self.connection.list()
            
            if status == 'OK':
                folder_list = []
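                for folder in folders:
                    # Each line looks like: (\HasNoChildren) "/" "INBOX".
                    # The hierarchy delimiter varies by server ("/" on Gmail,
                    # "." on many others), so do not hard-code it.
                    if not isinstance(folder, bytes):
                        continue
                    match = re.match(
                        r'\((?P<flags>[^)]*)\) (?P<delim>"[^"]*"|NIL) (?P<name>.+)',
                        folder.decode(errors='replace')
                    )
                    if match:
                        folder_name = match.group('name').strip().strip('"')
                        folder_list.append(folder_name)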
                
                return folder_list
            
            return []
            
        except Exception as e:
            self.logger.error(f"Failed to list folders: {e}")
            return []
    
    def select_folder(self, folder: Optional[str] = None) -> Dict[str, Any]:
        """
        Select folder for operations.
        
        Returns:
            Dictionary with folder statistics
        """
        if not self.connection:
            self.connect()
        
        folder = folder or self.config.inbox_folder
        
        try:
            status, data = self.connection.select(folder)
            
            if status == 'OK':
                self.current_folder = folder
                
                # Get folder stats
                total_messages = int(data[0].decode()) if data[0] else 0
                
                # Get unread count
                status, unread = self.connection.search(None, 'UNSEEN')
                unread_count = len(unread[0].split()) if status == 'OK' else 0
                
                stats = {
                    "folder": folder,
                    "total": total_messages,
                    "unread": unread_count
                }
                
                self.logger.info(f"Selected folder: {folder} ({total_messages} messages, {unread_count} unread)")
                return stats
            
            raise Exception(f"Failed to select folder: {folder}")
            
        except Exception as e:
            self.logger.error(f"Failed to select folder: {e}")
            raise
    
    def search_messages(self, criteria: str = "ALL", 
                       limit: Optional[int] = None) -> List[str]:
        """
        Search messages using IMAP search criteria.
        
        Common criteria:
        - ALL: All messages
        - UNSEEN: Unread messages
        - SEEN: Read messages
        - FLAGGED: Flagged messages
        - SINCE "date": Messages since date
        - BEFORE "date": Messages before date
        - FROM "email": From specific sender
        - TO "email": To specific recipient
        - SUBJECT "text": Subject contains text
        - TEXT "text": Body contains text
        
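        Returns:
            List of message sequence numbers (not UIDs). Sequence numbers
            shift after an EXPUNGE, so re-run the search after deleting.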
        """
        if not self.current_folder:
            self.select_folder()
        
        try:
            status, data = self.connection.search(None, criteria)
            
            if status == 'OK':
                message_ids = data[0].split()
                
                if limit and len(message_ids) > limit:
                    message_ids = message_ids[-limit:]  # Get most recent
                
                self.logger.info(f"Found {len(message_ids)} messages matching: {criteria}")
                return [id.decode() for id in message_ids]
            
            return []
            
        except Exception as e:
            self.logger.error(f"Search failed: {e}")
            return []
    
    def fetch_message(self, message_id: str) -> Optional[EmailData]:
        """Fetch and parse single message."""
        try:
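            # Fetch message data; BODY.PEEK[] returns the full message
            # without implicitly setting the \Seen flag (plain RFC822 would)
            status, data = self.connection.fetch(message_id, '(BODY.PEEK[] FLAGS)')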
            
            if status != 'OK':
                return None
            
            # Parse message
            raw_email = data[0][1]
            msg = email.message_from_bytes(raw_email)
            
            # Parse flags
            flags = imaplib.ParseFlags(data[0][0])
            
            # Create EmailData object
            email_data = self._parse_message(msg, message_id, flags)
            
            self.stats["messages_read"] += 1
            
            return email_data
            
        except Exception as e:
            self.logger.error(f"Failed to fetch message {message_id}: {e}")
            self.stats["errors"] += 1
            return None
    
    def fetch_messages_batch(self, message_ids: List[str]) -> Generator[EmailData, None, None]:
        """Fetch multiple messages in batch."""
        for message_id in message_ids:
            email_data = self.fetch_message(message_id)
            if email_data:
                yield email_data
    
    def _parse_message(self, msg: email.message.Message, 
                      uid: str, flags: Tuple[bytes, ...]) -> EmailData:
        """Parse email message into EmailData object."""
        # Parse headers
        subject = self._decode_header(msg.get("Subject", ""))
        from_header = self._decode_header(msg.get("From", ""))
        from_addr, from_name = self._parse_address(from_header)
        
        # Parse recipients
        to_addrs = self._parse_addresses(msg.get("To", ""))
        cc_addrs = self._parse_addresses(msg.get("Cc", ""))
        bcc_addrs = self._parse_addresses(msg.get("Bcc", ""))
        
        # Parse date
        date_str = msg.get("Date", "")
        try:
            date = parsedate_to_datetime(date_str)
        except (TypeError, ValueError):
            # Missing or malformed Date header: fall back to the current time
            date = datetime.now()
        
        # Parse flags (imaplib.ParseFlags returns a tuple of bytes)
        flag_set = {flag.decode('ascii', errors='replace') for flag in flags}
        is_read = "\\Seen" in flag_set
        is_flagged = "\\Flagged" in flag_set
        is_answered = "\\Answered" in flag_set
        is_draft = "\\Draft" in flag_set
        
        # Create EmailData
        email_data = EmailData(
            uid=uid,
            message_id=msg.get("Message-ID", ""),
            subject=subject,
            from_addr=from_addr,
            from_name=from_name,
            to_addrs=to_addrs,
            cc_addrs=cc_addrs,
            bcc_addrs=bcc_addrs,
            date=date,
            in_reply_to=msg.get("In-Reply-To"),
            references=msg.get("References"),
            is_read=is_read,
            is_flagged=is_flagged,
            is_answered=is_answered,
            is_draft=is_draft,
            folder=self.current_folder or "INBOX"
        )
        
        # Extract all headers
        for header, value in msg.items():
            email_data.headers[header] = self._decode_header(value)
        
        # Parse body and attachments
        self._parse_body(msg, email_data)
        
        return email_data
    
    def _parse_body(self, msg: email.message.Message, email_data: EmailData):
        """Parse email body and attachments."""
        if msg.is_multipart():
            for part in msg.walk():
                content_type = part.get_content_type()
                content_disposition = str(part.get("Content-Disposition", ""))
                
                # Skip multipart containers
                if part.is_multipart():
                    continue
                
                # Check if attachment
                if "attachment" in content_disposition or \
                   (content_disposition and "inline" in content_disposition and 
                    content_type.startswith("image/")):
                    # Process attachment
                    self._process_attachment(part, email_data)
                else:
                    # Process body
                    if content_type == "text/plain":
                        body = self._decode_content(part)
                        if body and not email_data.text_body:
                            email_data.text_body = body
                    elif content_type == "text/html":
                        body = self._decode_content(part)
                        if body and not email_data.html_body:
                            email_data.html_body = body
        else:
            # Single part message
            content_type = msg.get_content_type()
            
            if content_type == "text/plain":
                email_data.text_body = self._decode_content(msg)
            elif content_type == "text/html":
                email_data.html_body = self._decode_content(msg)
    
    def _process_attachment(self, part: email.message.Message, email_data: EmailData):
        """Process email attachment."""
        filename = part.get_filename()
        
        if filename:
            filename = self._decode_header(filename)
            
            # Get attachment data
            attachment_data = part.get_payload(decode=True)
            
            if attachment_data:
                attachment_info = {
                    "filename": filename,
                    "content_type": part.get_content_type(),
                    "size": len(attachment_data),
                    "data": attachment_data,
                    "content_id": part.get("Content-ID", "").strip("<>")
                }
                
                email_data.attachments.append(attachment_info)
                
                self.logger.debug(f"Found attachment: {filename} ({len(attachment_data)} bytes)")
    
    def _decode_header(self, header: str) -> str:
        """Decode email header."""
        if not header:
            return ""
        
        decoded_parts = []
        for part, encoding in decode_header(header):
            if isinstance(part, bytes):
                try:
                    decoded_parts.append(part.decode(encoding or 'utf-8'))
                except (LookupError, UnicodeDecodeError):
                    # Unknown or wrong charset: fall back to lenient UTF-8
                    decoded_parts.append(part.decode('utf-8', errors='ignore'))
            else:
                decoded_parts.append(part)
        
        # decode_header() keeps the original whitespace inside each part,
        # so join without inserting extra spaces
        return ''.join(decoded_parts)
    
    def _decode_content(self, part: email.message.Message) -> str:
        """Decode email content."""
        payload = part.get_payload(decode=True)
        
        if not payload:
            return ""
        
        # Try to detect encoding
        charset = part.get_content_charset()
        
        if not charset:
            # Try to detect encoding
            detected = chardet.detect(payload)
            charset = detected.get('encoding', 'utf-8')
        
        try:
            return payload.decode(charset or 'utf-8')
        except (LookupError, UnicodeDecodeError):
            # Unknown charset name or undecodable bytes
            return payload.decode('utf-8', errors='ignore')
    
    def _parse_address(self, address_str: str) -> Tuple[str, Optional[str]]:
        """Parse email address into email and name."""
        import email.utils
        name, addr = email.utils.parseaddr(address_str)
        return addr, name if name else None
    
    def _parse_addresses(self, addresses_str: str) -> List[str]:
        """Parse multiple email addresses."""
        if not addresses_str:
            return []
        
        import email.utils
        addresses = []
        for name, addr in email.utils.getaddresses([addresses_str]):
            if addr:
                addresses.append(addr)
        
        return addresses
    
    def mark_as_read(self, message_id: str):
        """Mark message as read."""
        try:
            self.connection.store(message_id, '+FLAGS', '\\Seen')
            self.logger.debug(f"Marked message {message_id} as read")
        except Exception as e:
            self.logger.error(f"Failed to mark as read: {e}")
    
    def mark_as_unread(self, message_id: str):
        """Mark message as unread."""
        try:
            self.connection.store(message_id, '-FLAGS', '\\Seen')
            self.logger.debug(f"Marked message {message_id} as unread")
        except Exception as e:
            self.logger.error(f"Failed to mark as unread: {e}")
    
    def flag_message(self, message_id: str):
        """Flag message."""
        try:
            self.connection.store(message_id, '+FLAGS', '\\Flagged')
            self.logger.debug(f"Flagged message {message_id}")
        except Exception as e:
            self.logger.error(f"Failed to flag message: {e}")
    
    def move_message(self, message_id: str, target_folder: str):
        """Move message to another folder."""
        try:
            # Copy message to target folder
            result = self.connection.copy(message_id, target_folder)
            
            if result[0] == 'OK':
                # Mark for deletion in current folder
                self.connection.store(message_id, '+FLAGS', '\\Deleted')
                # EXPUNGE removes every message flagged \Deleted in this folder,
                # not just this one, and renumbers the remaining messages
                self.connection.expunge()
                
                self.logger.info(f"Moved message {message_id} to {target_folder}")
                return True
                
        except Exception as e:
            self.logger.error(f"Failed to move message: {e}")
        
        return False
    
    def delete_message(self, message_id: str):
        """Delete message."""
        try:
            # Move to trash
            if self.config.trash_folder:
                return self.move_message(message_id, self.config.trash_folder)
            else:
                # Mark for deletion
                self.connection.store(message_id, '+FLAGS', '\\Deleted')
                self.connection.expunge()
                return True
                
        except Exception as e:
            self.logger.error(f"Failed to delete message: {e}")
            return False
    
    def save_attachments(self, email_data: EmailData, 
                        output_dir: str = "./attachments") -> List[str]:
        """Save email attachments to disk."""
        saved_files = []
        output_path = Path(output_dir)
        output_path.mkdir(parents=True, exist_ok=True)
        
        for attachment in email_data.attachments:
            filename = attachment["filename"]
            
            # Sanitize filename
            filename = re.sub(r'[<>:"/\\|?*]', '_', filename)
            
            # Handle duplicate filenames
            file_path = output_path / filename
            counter = 1
            while file_path.exists():
                name, ext = os.path.splitext(filename)
                file_path = output_path / f"{name}_{counter}{ext}"
                counter += 1
            
            # Save file
            with open(file_path, 'wb') as f:
                f.write(attachment["data"])
            
            saved_files.append(str(file_path))
            self.stats["attachments_downloaded"] += 1
            
            self.logger.info(f"Saved attachment: {file_path}")
        
        return saved_files

# ==================== Email Monitor ====================

class EmailMonitor:
    """
    Monitor inbox for new emails.
    """
    
    def __init__(self, imap_client: IMAPClient):
        self.client = imap_client
        self.running = False
        self.check_interval = 60  # seconds
        self.last_check = None
        self.processed_ids = set()
        self.callbacks = []
        self.logger = logging.getLogger(__name__)
    
    def add_callback(self, callback: callable):
        """Add callback for new emails."""
        self.callbacks.append(callback)
    
    def start(self, check_interval: int = 60):
        """Start monitoring."""
        self.check_interval = check_interval
        self.running = True
        
        monitor_thread = threading.Thread(target=self._monitor_loop)
        monitor_thread.daemon = True
        monitor_thread.start()
        
        self.logger.info(f"Started email monitoring (interval: {check_interval}s)")
    
    def stop(self):
        """Stop monitoring."""
        self.running = False
        self.logger.info("Stopped email monitoring")
    
    def _monitor_loop(self):
        """Main monitoring loop."""
        while self.running:
            try:
                # Check for new emails
                self._check_new_emails()
                
                # Wait for next check
                time.sleep(self.check_interval)
                
            except Exception as e:
                self.logger.error(f"Monitor error: {e}")
                time.sleep(self.check_interval)
    
    def _check_new_emails(self):
        """Check for new emails."""
        try:
            # Select inbox
            self.client.select_folder()
            
            # Search for recent messages
            if self.last_check:
                # SINCE compares dates only (no time of day), so this matches
                # everything from that day on; processed_ids filters out repeats
                since_date = self.last_check.strftime("%d-%b-%Y")
                criteria = f'SINCE "{since_date}"'
            else:
                # Get unread messages
                criteria = "UNSEEN"
            
            message_ids = self.client.search_messages(criteria)
            
            # Process new messages
            for message_id in message_ids:
                if message_id not in self.processed_ids:
                    email_data = self.client.fetch_message(message_id)
                    
                    if email_data:
                        # Call callbacks
                        for callback in self.callbacks:
                            try:
                                callback(email_data)
                            except Exception as e:
                                self.logger.error(f"Callback error: {e}")
                        
                        self.processed_ids.add(message_id)
                        self.logger.info(f"Processed new email: {email_data.subject}")
            
            self.last_check = datetime.now()
            
        except Exception as e:
            self.logger.error(f"Check failed: {e}")

# ==================== Email Processor ====================

class EmailProcessor:
    """
    Process emails with rules and actions.
    """
    
    def __init__(self, imap_client: IMAPClient):
        self.client = imap_client
        self.rules = []
        self.logger = logging.getLogger(__name__)
    
    def add_rule(self, condition: callable, action: callable):
        """Add processing rule."""
        self.rules.append((condition, action))
    
    def process_inbox(self, criteria: str = "UNSEEN", limit: Optional[int] = None):
        """Process emails in inbox."""
        # Select inbox
        self.client.select_folder()
        
        # Search messages
        message_ids = self.client.search_messages(criteria, limit)
        
        processed = 0
        for message_id in message_ids:
            email_data = self.client.fetch_message(message_id)
            
            if email_data:
                self.process_email(email_data, message_id)
                processed += 1
        
        self.logger.info(f"Processed {processed} emails")
        return processed
    
    def process_email(self, email_data: EmailData, message_id: str):
        """Process single email with rules."""
        for condition, action in self.rules:
            try:
                if condition(email_data):
                    action(email_data, message_id, self.client)
                    self.logger.debug(f"Applied rule to: {email_data.subject}")
            except Exception as e:
                self.logger.error(f"Rule processing error: {e}")
        
        # Mark as processed if configured
        if self.client.config.mark_as_read:
            self.client.mark_as_read(message_id)

# ==================== Email Search Builder ====================

class IMAPSearchBuilder:
    """
    Build IMAP search queries.
    """
    
    def __init__(self):
        self.criteria = []
    
    def from_address(self, email: str) -> 'IMAPSearchBuilder':
        """Search by sender."""
        self.criteria.append(f'FROM "{email}"')
        return self
    
    def to_address(self, email: str) -> 'IMAPSearchBuilder':
        """Search by recipient."""
        self.criteria.append(f'TO "{email}"')
        return self
    
    def subject(self, text: str) -> 'IMAPSearchBuilder':
        """Search by subject."""
        self.criteria.append(f'SUBJECT "{text}"')
        return self
    
    def body(self, text: str) -> 'IMAPSearchBuilder':
        """Search by body text."""
        self.criteria.append(f'BODY "{text}"')
        return self
    
    def since(self, date: datetime) -> 'IMAPSearchBuilder':
        """Messages since date."""
        date_str = date.strftime("%d-%b-%Y")
        self.criteria.append(f'SINCE "{date_str}"')
        return self
    
    def before(self, date: datetime) -> 'IMAPSearchBuilder':
        """Messages before date."""
        date_str = date.strftime("%d-%b-%Y")
        self.criteria.append(f'BEFORE "{date_str}"')
        return self
    
    def unread(self) -> 'IMAPSearchBuilder':
        """Unread messages."""
        self.criteria.append("UNSEEN")
        return self
    
    def read(self) -> 'IMAPSearchBuilder':
        """Read messages."""
        self.criteria.append("SEEN")
        return self
    
    def flagged(self) -> 'IMAPSearchBuilder':
        """Flagged messages."""
        self.criteria.append("FLAGGED")
        return self
    
    def with_attachments(self) -> 'IMAPSearchBuilder':
        """Messages with attachments (approximation)."""
        self.criteria.append('HEADER Content-Type "multipart"')
        return self
    
    def build(self) -> str:
        """Build search query."""
        if not self.criteria:
            return "ALL"
        
        # IMAP ANDs space-separated criteria; the parentheses are optional grouping
        if len(self.criteria) == 1:
            return self.criteria[0]
        else:
            return f"({' '.join(self.criteria)})"

# Example usage
if __name__ == "__main__":
    print("📥 IMAP Email Reading Examples\n")
    
    # Example 1: IMAP configuration
    print("1️⃣ IMAP Server Configuration:")
    
    gmail_config = IMAPConfig.from_provider(
        IMAPProvider.GMAIL,
        username="your_email@gmail.com",
        password="your_app_password"
    )
    
    print(f"   Provider: Gmail")
    print(f"   Host: {gmail_config.host}")
    print(f"   Port: {gmail_config.port}")
    print(f"   Folders:")
    print(f"     Inbox: {gmail_config.inbox_folder}")
    print(f"     Sent: {gmail_config.sent_folder}")
    print(f"     Trash: {gmail_config.trash_folder}")
    
    # Example 2: Common IMAP providers
    print("\n2️⃣ Common IMAP Providers:")
    
    providers = [
        ("Gmail", "imap.gmail.com", 993, "App Password required"),
        ("Outlook", "outlook.office365.com", 993, "OAuth2 (XOAUTH2); basic password auth is being retired"),
        ("Yahoo", "imap.mail.yahoo.com", 993, "App Password required"),
        ("iCloud", "imap.mail.me.com", 993, "App-specific password"),
        ("ProtonMail", "127.0.0.1", 1143, "Bridge required")
    ]
    
    for provider, host, port, note in providers:
        print(f"   {provider}:")
        print(f"     Host: {host}")
        print(f"     Port: {port}")
        print(f"     Note: {note}")
    
    # Example 3: Search criteria
    print("\n3️⃣ IMAP Search Examples:")
    
    # Build search query
    search = (IMAPSearchBuilder()
             .from_address("important@example.com")
             .since(datetime.now() - timedelta(days=7))
             .unread()
             .build())
    
    print(f"   Query: {search}")
    print("\n   Common searches:")
    print("     UNSEEN - Unread messages")
    print("     SINCE \"01-Jan-2024\" - Recent messages")
    print("     FROM \"sender@example.com\" - From specific sender")
    print("     SUBJECT \"Invoice\" - Subject contains text")
    print("     FLAGGED - Important messages")
    
    # Example 4: Email processing
    print("\n4️⃣ Email Processing Example:")
    
    print("   Processing workflow:")
    print("     1. Connect to IMAP server")
    print("     2. Select folder (INBOX)")
    print("     3. Search messages")
    print("     4. Fetch message data")
    print("     5. Parse content and attachments")
    print("     6. Apply processing rules")
    print("     7. Update message flags/folders")
    
    # Example 5: Email data structure
    print("\n5️⃣ Email Data Structure:")
    
    sample_email = EmailData(
        uid="123",
        message_id="",
        subject="Monthly Report",
        from_addr="sender@example.com",
        from_name="John Doe",
        to_addrs=["recipient@example.com"],
        cc_addrs=[],
        bcc_addrs=[],
        date=datetime.now(),
        text_body="Please find the report attached.",
        attachments=[{"filename": "report.pdf", "size": 524288}]
    )
    
    print(f"   Subject: {sample_email.subject}")
    print(f"   From: {sample_email.from_name} <{sample_email.from_addr}>")
    print(f"   Attachments: {len(sample_email.attachments)}")
    print(f"   Plain text: {sample_email.get_plain_text()[:50]}...")
    
    # Example 6: Processing rules
    print("\n6️⃣ Email Processing Rules:")
    
    def invoice_condition(email: EmailData) -> bool:
        return "invoice" in email.subject.lower()
    
    def invoice_action(email: EmailData, msg_id: str, client: IMAPClient):
        print(f"   Processing invoice: {email.subject}")
        # Save attachments
        # Move to "Invoices" folder
        # Mark as processed
    
    print("   Example rules:")
    print("     • If subject contains 'invoice' → Save attachments, move to folder")
    print("     • If from VIP sender → Flag and forward")
    print("     • If spam keywords → Move to trash")
    print("     • If attachment > 10MB → Save locally and remove")
    
    # Example 7: Monitoring inbox
    print("\n7️⃣ Inbox Monitoring:")
    
    print("   Monitor configuration:")
    print("     Check interval: 60 seconds")
    print("     Process: Unread messages")
    print("     Actions: Notify, process, mark as read")
    
    # Example 8: Attachment handling
    print("\n8️⃣ Attachment Processing:")
    
    print("   Attachment workflow:")
    print("     1. Detect attachments in email")
    print("     2. Extract metadata (name, size, type)")
    print("     3. Decode attachment data")
    print("     4. Save to disk with sanitized filename")
    print("     5. Process based on file type")
    
    # Example 9: Common operations
    print("\n9️⃣ Common IMAP Operations:")
    
    operations = [
        "List folders - Get mailbox structure",
        "Select folder - Choose working folder",
        "Search messages - Find specific emails",
        "Fetch message - Get full email data",
        "Mark as read/unread - Update status",
        "Flag/unflag - Mark important",
        "Move to folder - Organize emails",
        "Delete message - Move to trash",
        "Save attachments - Extract files"
    ]
    
    for operation in operations:
        print(f"   • {operation}")
    
    # Example 10: Best practices
    print("\n🔟 IMAP Best Practices:")
    
    practices = [
        "🔐 Use OAuth2 or App Passwords",
        "🔄 Keep connections alive for efficiency",
        "📊 Batch fetch operations",
        "💾 Cache message data locally",
        "⚡ Use IDLE for real-time updates",
        "🔍 Optimize search criteria",
        "📁 Organize with folders",
        "🧹 Clean up old messages",
        "📈 Monitor mailbox quota",
        "🛡️ Handle connection failures gracefully"
    ]
    
    for practice in practices:
        print(f"   {practice}")
    
    print("\n✅ IMAP email reading demonstration complete!")
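
The demo above only prints the processing workflow. As a minimal sketch of steps 4 and 5 (fetch and parse), here is one way to turn raw message bytes into a summary using only the standard library's `email` package. It assumes the bytes have already been fetched (with `imaplib`, they would typically be `data[0][1]` from `fetch(num, "(RFC822)")`). `summarize_message` is a hypothetical helper for illustration, not part of the framework above, and it returns a plain dict rather than the `EmailData` class.

```python
from email import message_from_bytes, policy
from email.message import EmailMessage


def summarize_message(raw: bytes) -> dict:
    """Parse raw RFC 822 bytes into subject, sender, text body and attachment metadata."""
    # policy.default returns EmailMessage objects and decodes encoded headers
    msg = message_from_bytes(raw, policy=policy.default)

    # Prefer the text/plain body; it may be missing in HTML-only messages
    body_part = msg.get_body(preferencelist=("plain",))
    text = body_part.get_content() if body_part is not None else ""

    attachments = []
    for part in msg.iter_attachments():
        payload = part.get_payload(decode=True) or b""
        attachments.append({
            "filename": part.get_filename() or "unnamed",
            "content_type": part.get_content_type(),
            "size": len(payload),
        })

    return {
        "subject": str(msg["Subject"] or ""),
        "from": str(msg["From"] or ""),
        "text": text.strip(),
        "attachments": attachments,
    }


# Build a sample message locally so the sketch runs without a mail server
sample = EmailMessage()
sample["Subject"] = "Monthly Report"
sample["From"] = "John Doe <sender@example.com>"
sample["To"] = "recipient@example.com"
sample.set_content("Please find the report attached.")
sample.add_attachment(b"%PDF-1.4 demo", maintype="application",
                      subtype="pdf", filename="report.pdf")

summary = summarize_message(sample.as_bytes())
print(summary["subject"], len(summary["attachments"]))
```

Parsing with `policy.default` means encoded headers (for example a subject containing non-ASCII characters) come back as ordinary strings, which covers much of the character-encoding handling without manual decoding.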

Key Takeaways and Best Practices 🎯

IMAP Email Processing Best Practices 📋

Pro Tip: Think of IMAP as remote access to your email filing cabinet - you can browse, search, and organize without downloading everything. Always connect over SSL/TLS and authenticate with OAuth2 or app passwords rather than the account password. Keep your IMAP connection alive when processing multiple emails to avoid reconnection overhead. Use server-side search with IMAP criteria instead of fetching every email and filtering locally. Handle character encodings carefully - emails arrive in many languages and formats. Treat attachments as untrusted input - sanitize filenames and validate file types before saving. Poll at a reasonable interval so the provider does not rate-limit you. Move processed emails into folders rather than deleting them. Record the IDs of messages you've processed to avoid handling the same message twice. Most importantly: IMAP keeps emails on the server, so watch your mailbox quota and clean up regularly!
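
Two of these tips, sanitizing attachment filenames and avoiding duplicate processing, can be sketched with the standard library alone. Both `sanitize_filename` and `ProcessedLog` are hypothetical helpers for illustration, and the allow-list of characters and the one-ID-per-line file format are assumptions rather than anything the framework prescribes.

```python
import re
import tempfile
from pathlib import Path


def sanitize_filename(name: str, fallback: str = "attachment.bin") -> str:
    """Reduce an untrusted attachment name to a safe base name."""
    # Keep only the last path component, whichever separator the sender used
    base = re.split(r"[\\/]", name)[-1]
    # Replace anything outside a conservative allow-list with underscores
    base = re.sub(r"[^A-Za-z0-9._-]", "_", base)
    # Avoid hidden files and names made only of dots; cap the length
    base = base.lstrip(".")[:255]
    return base or fallback


class ProcessedLog:
    """Remember handled message IDs in a text file, one ID per line."""

    def __init__(self, path: Path) -> None:
        self.path = Path(path)
        if self.path.exists():
            self.seen = set(self.path.read_text(encoding="utf-8").splitlines())
        else:
            self.seen = set()

    def mark_if_new(self, message_id: str) -> bool:
        """Record the ID and return True, or return False if already seen."""
        if message_id in self.seen:
            return False
        self.seen.add(message_id)
        with self.path.open("a", encoding="utf-8") as fh:
            fh.write(message_id + "\n")
        return True


safe_name = sanitize_filename("../../etc/passwd")

with tempfile.TemporaryDirectory() as tmp:
    log_path = Path(tmp) / "processed.txt"
    log = ProcessedLog(log_path)
    first = log.mark_if_new("<abc@example.com>")    # new message
    second = log.mark_if_new("<abc@example.com>")   # duplicate in same run
    # A fresh instance simulates restarting the processor
    third = ProcessedLog(log_path).mark_if_new("<abc@example.com>")

print(safe_name, first, second, third)
```

An allow-list is used instead of a block-list because it is easier to reason about: any character not explicitly permitted is replaced, so unexpected path or shell characters cannot slip through.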

Mastering IMAP email reading enables you to build intelligent email processing systems that automate inbox management, extract data from emails, handle customer communications, and trigger workflows based on email content. Whether you're building support ticket systems, order processors, or email analytics tools, these IMAP skills power your email automation! 📨