📧 Sending Emails with SMTP: Master Email Automation
SMTP (Simple Mail Transfer Protocol) is the backbone of email communication - it's how your Python scripts can send emails to anyone, anywhere in the world. Like having a digital postal service at your fingertips, mastering SMTP allows you to automate notifications, reports, alerts, and communications at scale. Whether you're sending simple text messages or complex HTML newsletters, SMTP is your gateway to automated email communication! 📮
The SMTP Email Architecture
Think of SMTP as the postal system of the internet - you compose your letter (email), address it (recipients), add stamps (authentication), and drop it in the mailbox (SMTP server). The postal service (email servers) then handles delivery to the recipient's mailbox. Understanding this flow is crucial for building reliable email automation systems!
Real-World Scenario: The Enterprise Communication System 📨
You're building an enterprise email automation system that handles everything from transactional emails (password resets, order confirmations) to marketing campaigns (newsletters, promotions) and internal communications (reports, alerts). Your system must handle multiple email providers, support templates, manage bounces, respect rate limits, track delivery, and scale to millions of emails. Let's build a comprehensive SMTP email framework!
# First, install required packages:
# pip install python-dotenv jinja2 premailer beautifulsoup4 python-dateutil
import smtplib
import ssl
import os
import time
import logging
from typing import List, Dict, Optional, Any, Union, Tuple
from dataclasses import dataclass, field
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from email.mime.base import MIMEBase
from email.mime.image import MIMEImage
from email.mime.application import MIMEApplication
from email import encoders
from email.utils import formataddr, make_msgid, formatdate
from email.header import Header
import mimetypes
from pathlib import Path
from datetime import datetime, timedelta
from enum import Enum
import re
import json
import base64
from jinja2 import Template, Environment, FileSystemLoader
import queue
import threading
from functools import wraps
import hashlib
# ==================== Email Configuration ====================
class EmailProvider(Enum):
    """Named mail services that the rest of the module can refer to.

    Each member's value is the provider's lowercase string identifier.
    """

    # Hosted mailbox services
    GMAIL = "gmail"
    OUTLOOK = "outlook"
    YAHOO = "yahoo"

    # Default used by SMTPConfig when no named provider applies
    CUSTOM = "custom"

    # Delivery services (declared here; no preset is visible for them in
    # SMTPConfig.from_provider)
    SENDGRID = "sendgrid"
    AWS_SES = "aws_ses"
    MAILGUN = "mailgun"
@dataclass
class SMTPConfig:
    """Connection, rate-limit and retry settings for one SMTP account.

    Attributes are grouped as: server/credentials, provider tag, low-level
    socket options, rate limits, and retry policy.
    """

    host: str
    port: int
    username: str
    # Kept out of the generated repr so that printing or logging the config
    # object does not expose the credential.
    password: str = field(repr=False)
    use_tls: bool = True   # upgrade a plain connection with STARTTLS
    use_ssl: bool = False  # connect over implicit TLS (SMTP_SSL) instead
    timeout: int = 30

    # Provider-specific settings
    provider: EmailProvider = EmailProvider.CUSTOM

    # Advanced settings
    local_hostname: Optional[str] = None
    source_address: Optional[Tuple[str, int]] = None

    # Rate limiting
    max_sends_per_minute: int = 60
    max_sends_per_hour: int = 1000
    max_sends_per_day: int = 10000

    # Retry configuration
    max_retries: int = 3
    retry_delay: float = 5.0

    @classmethod
    def from_provider(
        cls, provider: EmailProvider, username: str, password: str
    ) -> "SMTPConfig":
        """Build a config from the built-in presets for a named provider.

        Args:
            provider: Provider to look up. Presets exist for GMAIL, OUTLOOK
                and YAHOO only.
            username: Account name used for SMTP login.
            password: Password (or app password) used for SMTP login.

        Returns:
            A config with the preset host, port and TLS flag; every other
            field keeps its default.

        Raises:
            ValueError: If no preset exists for ``provider``.
        """
        presets = {
            EmailProvider.GMAIL: {
                "host": "smtp.gmail.com",
                "port": 587,
                "use_tls": True,
            },
            EmailProvider.OUTLOOK: {
                "host": "smtp-mail.outlook.com",
                "port": 587,
                "use_tls": True,
            },
            EmailProvider.YAHOO: {
                "host": "smtp.mail.yahoo.com",
                "port": 587,
                "use_tls": True,
            },
        }

        preset = presets.get(provider)
        if preset is None:
            raise ValueError(f"Unknown provider: {provider}")

        return cls(
            host=preset["host"],
            port=preset["port"],
            username=username,
            password=password,
            use_tls=preset["use_tls"],
            provider=provider,
        )
# ==================== Email Message ====================
@dataclass
class EmailMessage:
    """One outgoing email: recipients, content, headers and metadata.

    ``to``, ``cc`` and ``bcc`` accept either a single address or a list; a
    single string is wrapped in a list during initialisation. All addresses
    are validated when the object is created.

    Raises:
        ValueError: From ``__init__`` if any recipient address is malformed.
    """

    to: Union[str, List[str]]
    subject: str
    body: str
    from_email: Optional[str] = None
    from_name: Optional[str] = None

    # Additional recipients
    cc: Optional[Union[str, List[str]]] = None
    bcc: Optional[Union[str, List[str]]] = None
    reply_to: Optional[str] = None

    # Content
    html_body: Optional[str] = None
    attachments: List[str] = field(default_factory=list)
    embedded_images: Dict[str, str] = field(default_factory=dict)

    # Headers
    headers: Dict[str, str] = field(default_factory=dict)
    priority: int = 3  # 1=High, 3=Normal, 5=Low

    # Tracking
    message_id: Optional[str] = None
    references: Optional[str] = None
    in_reply_to: Optional[str] = None

    # Metadata
    tags: List[str] = field(default_factory=list)
    metadata: Dict[str, Any] = field(default_factory=dict)

    # Compiled once for the class. Unannotated on purpose so the dataclass
    # machinery does not treat it as a field.
    _EMAIL_PATTERN = re.compile(r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$')

    def __post_init__(self):
        """Normalise recipients to lists, assign a Message-ID, validate."""
        # Convert single recipients to lists
        if isinstance(self.to, str):
            self.to = [self.to]
        if isinstance(self.cc, str):
            self.cc = [self.cc]
        if isinstance(self.bcc, str):
            self.bcc = [self.bcc]

        # Generate message ID if not provided
        if not self.message_id:
            self.message_id = make_msgid()

        self._validate_emails()

    def _validate_emails(self):
        """Check every to/cc/bcc address against the address pattern.

        Raises:
            ValueError: On the first address that does not match.
        """
        # Build a fresh list: extending ``self.to`` in place would leak cc and
        # bcc addresses into the To recipients (and into the caller's list).
        every_address = [
            *(self.to or []),
            *(self.cc or []),
            *(self.bcc or []),
        ]
        for address in every_address:
            # fullmatch, not match: with match() the trailing ``$`` would also
            # accept an address followed by a newline.
            if not self._EMAIL_PATTERN.fullmatch(address):
                raise ValueError(f"Invalid email address: {address}")
# ==================== SMTP Client ====================
class SMTPClient:
    """Send EmailMessage objects over a single authenticated SMTP connection.

    The connection is opened lazily on the first ``send``. Each send is
    checked against the per-minute, per-hour and per-day limits from the
    config, retried according to ``max_retries`` / ``retry_delay``, and
    counted in ``stats``. ``send`` is serialised with a lock because one
    client (and therefore one SMTP connection) may be shared by several
    worker threads.
    """

    def __init__(self, config: "SMTPConfig"):
        self.config = config
        self.connection = None
        self.logger = logging.getLogger(__name__)

        # Guards the connection and the rate-limit bookkeeping in send().
        self._lock = threading.Lock()

        # Rate limiting: timestamps of recent successful sends plus a counter
        # that is reset when the calendar date changes.
        self.send_times = []
        self.daily_count = 0
        self.daily_reset = datetime.now().date()

        # Statistics
        self.stats = {
            "sent": 0,
            "failed": 0,
            "retried": 0,
        }

    @staticmethod
    def _close_quietly(connection):
        """Close a partially set-up connection, ignoring socket errors."""
        if connection is None:
            return
        try:
            connection.close()
        except OSError:
            pass

    def connect(self):
        """Open the connection, apply TLS as configured, and log in.

        ``self.connection`` is assigned only after login succeeds, so a failed
        attempt never leaves a half-initialised connection behind for a later
        ``send`` to pick up.

        Raises:
            smtplib.SMTPAuthenticationError: If the server rejects the login.
            smtplib.SMTPException: On any other SMTP-level failure.
            Exception: Any other error (e.g. socket errors), re-raised after
                being logged.
        """
        cfg = self.config
        connection = None
        try:
            if cfg.use_ssl:
                # Implicit TLS from the first byte
                connection = smtplib.SMTP_SSL(
                    cfg.host,
                    cfg.port,
                    local_hostname=cfg.local_hostname,
                    timeout=cfg.timeout,
                    source_address=cfg.source_address,
                    context=ssl.create_default_context(),
                )
            else:
                # Plain connection, optionally upgraded with STARTTLS
                connection = smtplib.SMTP(
                    cfg.host,
                    cfg.port,
                    local_hostname=cfg.local_hostname,
                    timeout=cfg.timeout,
                    source_address=cfg.source_address,
                )
                if cfg.use_tls:
                    connection.starttls(context=ssl.create_default_context())

            # Authenticate
            connection.login(cfg.username, cfg.password)
        except smtplib.SMTPAuthenticationError as e:
            self.logger.error(f"Authentication failed: {e}")
            self._close_quietly(connection)
            raise
        except smtplib.SMTPException as e:
            self.logger.error(f"SMTP error: {e}")
            self._close_quietly(connection)
            raise
        except Exception as e:
            self.logger.error(f"Connection failed: {e}")
            self._close_quietly(connection)
            raise

        self.connection = connection
        self.logger.info(f"Connected to SMTP server: {self.config.host}")

    def disconnect(self):
        """Close the SMTP connection if one is open (best effort)."""
        if not self.connection:
            return

        connection, self.connection = self.connection, None
        try:
            connection.quit()
        except (smtplib.SMTPException, OSError) as e:
            # The server may already be gone; make sure the socket is released.
            self.logger.debug(f"Ignoring error while closing SMTP connection: {e}")
            self._close_quietly(connection)
        self.logger.info("Disconnected from SMTP server")

    def send(self, message: "EmailMessage", retry: bool = True) -> bool:
        """Send one email message.

        Args:
            message: Email message to send.
            retry: Whether to retry on failure (up to ``config.max_retries``
                attempts); when False a single attempt is made.

        Returns:
            True if the message was handed to the server, False if the rate
            limit was hit or every attempt failed.

        Raises:
            Exception: Whatever ``connect`` raises when the connection cannot
                be (re-)established.
        """
        with self._lock:
            # Check rate limits
            if not self._check_rate_limit():
                self.logger.warning("Rate limit exceeded")
                return False

            # Ensure connection
            if not self.connection:
                self.connect()

            msg = self._build_message(message)

            # Envelope recipients. Bcc addresses are deliberately absent from
            # the headers, so they must be passed explicitly or they would
            # never receive the message.
            recipients = list(message.to)
            if message.cc:
                recipients.extend(message.cc)
            if message.bcc:
                recipients.extend(message.bcc)

            max_attempts = self.config.max_retries if retry else 1
            for attempt in range(max_attempts):
                attempts_left = attempt < max_attempts - 1
                try:
                    refused = self.connection.send_message(msg, to_addrs=recipients)
                except smtplib.SMTPServerDisconnected:
                    # Must be caught before SMTPException (its base class).
                    self.logger.warning("Server disconnected, reconnecting...")
                    self.disconnect()
                    self.connect()
                except smtplib.SMTPException as e:
                    self.logger.error(f"Failed to send email: {e}")
                    if not attempts_left:
                        break
                except Exception as e:
                    self.logger.error(f"Unexpected error: {e}")
                    break
                else:
                    if refused:
                        self.logger.warning(
                            f"Some recipients were refused: {', '.join(refused)}"
                        )
                    self.stats["sent"] += 1
                    self._update_rate_limit()
                    self.logger.info(f"Email sent to {', '.join(recipients)}")
                    return True

                if attempts_left:
                    self.stats["retried"] += 1
                    time.sleep(self.config.retry_delay)

            # Every path that leaves the loop without returning is a failure.
            self.stats["failed"] += 1
            return False

    def send_bulk(self, messages: List["EmailMessage"],
                  batch_size: int = 50) -> Dict[str, int]:
        """Send multiple emails in batches.

        Args:
            messages: List of email messages.
            batch_size: Number of emails per batch; must be at least 1.

        Returns:
            Dictionary with ``"sent"`` and ``"failed"`` counts for this call.

        Raises:
            ValueError: If ``batch_size`` is smaller than 1.
        """
        if batch_size < 1:
            raise ValueError("batch_size must be at least 1")

        results = {"sent": 0, "failed": 0}
        for start in range(0, len(messages), batch_size):
            for message in messages[start:start + batch_size]:
                outcome = "sent" if self.send(message) else "failed"
                results[outcome] += 1
                # Small delay between sends
                time.sleep(0.1)
        return results

    def _build_message(self, message: "EmailMessage") -> MIMEMultipart:
        """Build the MIME message (headers, body, attachments, images).

        No Bcc header is written; Bcc recipients are only part of the SMTP
        envelope assembled in ``send``.
        """
        msg = MIMEMultipart('mixed')

        # Standard headers
        from_addr = message.from_email or self.config.username
        from_name = message.from_name or from_addr
        msg['From'] = formataddr((from_name, from_addr))
        msg['To'] = ', '.join(message.to)
        msg['Subject'] = Header(message.subject, 'utf-8')
        msg['Date'] = formatdate(localtime=True)
        msg['Message-ID'] = message.message_id

        # Priority: all five levels are mapped; anything else is "Normal".
        priority_map = {
            1: "1 (Highest)",
            2: "2 (High)",
            3: "3 (Normal)",
            4: "4 (Low)",
            5: "5 (Lowest)",
        }
        msg['X-Priority'] = priority_map.get(message.priority, "3 (Normal)")

        # Optional headers
        if message.cc:
            msg['Cc'] = ', '.join(message.cc)
        if message.reply_to:
            msg['Reply-To'] = message.reply_to
        if message.references:
            msg['References'] = message.references
        if message.in_reply_to:
            msg['In-Reply-To'] = message.in_reply_to

        # Custom headers
        for key, value in message.headers.items():
            msg[key] = value

        # Body: plain text alone, or text + HTML as alternatives
        plain_part = MIMEText(message.body, 'plain', 'utf-8')
        if message.html_body:
            alternative = MIMEMultipart('alternative')
            alternative.attach(plain_part)
            alternative.attach(MIMEText(message.html_body, 'html', 'utf-8'))
            msg.attach(alternative)
        else:
            msg.attach(plain_part)

        for attachment_path in message.attachments:
            self._attach_file(msg, attachment_path)

        for cid, image_path in message.embedded_images.items():
            self._embed_image(msg, cid, image_path)

        return msg

    def _attach_file(self, msg: MIMEMultipart, file_path: str):
        """Attach the file at ``file_path``; missing files are logged and skipped."""
        path = Path(file_path)
        if not path.is_file():
            self.logger.warning(f"Attachment not found: {file_path}")
            return

        payload = path.read_bytes()

        # Guess MIME type; unknown types become a generic binary attachment.
        mime_type, _ = mimetypes.guess_type(str(path))
        main_type, _, sub_type = (mime_type or 'application/octet-stream').partition('/')

        attachment = None
        if main_type == 'text':
            try:
                attachment = MIMEText(payload.decode('utf-8'), _subtype=sub_type)
            except UnicodeDecodeError:
                # Not UTF-8 text: send the raw bytes instead of failing the
                # whole message.
                self.logger.warning(
                    f"Attachment is not valid UTF-8, sending as binary: {file_path}"
                )
                attachment = MIMEApplication(payload)
        elif main_type == 'image':
            attachment = MIMEImage(payload, _subtype=sub_type)
        elif main_type == 'application':
            attachment = MIMEApplication(payload, _subtype=sub_type)
        else:
            attachment = MIMEBase(main_type, sub_type)
            attachment.set_payload(payload)
            encoders.encode_base64(attachment)

        # Set filename
        attachment.add_header(
            'Content-Disposition',
            'attachment',
            filename=path.name,
        )
        msg.attach(attachment)

    def _embed_image(self, msg: MIMEMultipart, cid: str, image_path: str):
        """Attach an inline image addressable from HTML as ``cid:<cid>``."""
        path = Path(image_path)
        if not path.is_file():
            self.logger.warning(f"Embedded image not found: {image_path}")
            return

        img = MIMEImage(path.read_bytes())
        img.add_header('Content-ID', f'<{cid}>')
        img.add_header('Content-Disposition', 'inline')
        msg.attach(img)

    def _check_rate_limit(self) -> bool:
        """Return True if another send is allowed under all three limits."""
        now = datetime.now()
        today = now.date()

        # Reset daily counter
        if today > self.daily_reset:
            self.daily_count = 0
            self.daily_reset = today

        if self.daily_count >= self.config.max_sends_per_day:
            return False

        # Keep one hour of history. Pruning to the last minute here would
        # make the hourly check below count only the last minute's sends.
        hour_ago = now - timedelta(hours=1)
        self.send_times = [t for t in self.send_times if t > hour_ago]
        if len(self.send_times) >= self.config.max_sends_per_hour:
            return False

        minute_ago = now - timedelta(minutes=1)
        sends_last_minute = sum(1 for t in self.send_times if t > minute_ago)
        return sends_last_minute < self.config.max_sends_per_minute

    def _update_rate_limit(self):
        """Record one successful send for rate limiting."""
        self.send_times.append(datetime.now())
        self.daily_count += 1

    def get_stats(self) -> Dict[str, int]:
        """Return a copy of the sent / failed / retried counters."""
        return dict(self.stats)
# ==================== Email Templates ====================
class EmailTemplate:
    """Render email bodies with a Jinja2 environment.

    With ``template_dir`` the environment loads templates from that directory;
    without it only ``render_string`` has a template source to work from.

    NOTE(review): the environment is created with Jinja2's default settings;
    confirm whether HTML autoescaping should be enabled for html bodies.
    """

    def __init__(self, template_dir: Optional[str] = None):
        self.template_dir = template_dir
        # Attach a filesystem loader only when a directory was supplied.
        env_options = {"loader": FileSystemLoader(template_dir)} if template_dir else {}
        self.env = Environment(**env_options)
        self.logger = logging.getLogger(__name__)

    def render(self, template_name: str, **context) -> str:
        """Load ``template_name`` from the environment and render it.

        Any exception is logged and re-raised unchanged.
        """
        try:
            loaded = self.env.get_template(template_name)
            rendered = loaded.render(**context)
        except Exception as exc:
            self.logger.error(f"Template rendering failed: {exc}")
            raise
        return rendered

    def render_string(self, template_string: str, **context) -> str:
        """Compile ``template_string`` as a template and render it.

        Any exception is logged and re-raised unchanged.
        """
        try:
            compiled = self.env.from_string(template_string)
            rendered = compiled.render(**context)
        except Exception as exc:
            self.logger.error(f"Template rendering failed: {exc}")
            raise
        return rendered
# ==================== Email Queue ====================
class EmailQueue:
    """Background queue that sends emails through a shared SMTP client.

    Messages added with ``add`` are picked up by daemon worker threads started
    with ``start``; ``stop`` asks the workers to exit and waits briefly for
    them. Messages still queued when the workers stop remain in the queue.
    """

    def __init__(self, smtp_client: "SMTPClient", max_workers: int = 3):
        self.smtp_client = smtp_client
        self.queue = queue.Queue()
        self.max_workers = max_workers
        self.workers = []
        self.running = False
        self.logger = logging.getLogger(__name__)

    def start(self):
        """Start the worker threads; does nothing if already running."""
        if self.running:
            # A second start() would otherwise spawn a duplicate worker set.
            return

        self.running = True
        for i in range(self.max_workers):
            worker = threading.Thread(target=self._worker, name=f"EmailWorker-{i}")
            worker.daemon = True
            worker.start()
            self.workers.append(worker)
        self.logger.info(f"Started {self.max_workers} email workers")

    def stop(self):
        """Signal the workers to exit and wait up to 5 seconds for each."""
        self.running = False
        for worker in self.workers:
            worker.join(timeout=5)
        self.workers.clear()
        self.logger.info("Stopped email workers")

    def add(self, message: "EmailMessage"):
        """Add an email to the queue."""
        self.queue.put(message)
        self.logger.debug(f"Queued email to {message.to}")

    def _worker(self):
        """Worker loop: take messages off the queue and send them."""
        while self.running:
            try:
                # Time out periodically so a cleared ``running`` flag is seen.
                message = self.queue.get(timeout=1)
            except queue.Empty:
                continue

            try:
                if self.smtp_client.send(message):
                    self.logger.info(f"Sent queued email to {message.to}")
                else:
                    self.logger.error(f"Failed to send queued email to {message.to}")
            except Exception as e:
                # Keep the worker alive; one bad message must not kill it.
                self.logger.error(f"Worker error: {e}")
            finally:
                # Always balance the get(), even when send() raised, so that
                # queue.join() cannot block forever.
                self.queue.task_done()
# ==================== Email Builder ====================
class EmailBuilder:
    """Chainable helper for assembling an EmailMessage step by step.

    Every setter writes straight onto the wrapped message and returns the
    builder, so calls can be chained; ``build`` returns that message.

    NOTE: addresses assigned through ``to`` / ``cc`` / ``bcc`` are not checked
    here; EmailMessage validates only inside its ``__post_init__``.
    """

    def __init__(self):
        # Begin with an empty message whose fields the setters overwrite.
        self.message = EmailMessage(to=[], subject="", body="")

    def to(self, *recipients: str) -> 'EmailBuilder':
        """Replace the To recipients with ``recipients``."""
        self.message.to = [*recipients]
        return self

    def cc(self, *recipients: str) -> 'EmailBuilder':
        """Replace the CC recipients with ``recipients``."""
        self.message.cc = [*recipients]
        return self

    def bcc(self, *recipients: str) -> 'EmailBuilder':
        """Replace the BCC recipients with ``recipients``."""
        self.message.bcc = [*recipients]
        return self

    def subject(self, subject: str) -> 'EmailBuilder':
        """Set the subject line."""
        self.message.subject = subject
        return self

    def body(self, body: str) -> 'EmailBuilder':
        """Set the plain-text body."""
        self.message.body = body
        return self

    def html(self, html: str) -> 'EmailBuilder':
        """Set the HTML body."""
        self.message.html_body = html
        return self

    def attach(self, file_path: str) -> 'EmailBuilder':
        """Append one file path to the attachment list."""
        self.message.attachments.append(file_path)
        return self

    def embed_image(self, cid: str, image_path: str) -> 'EmailBuilder':
        """Register an inline image under content id ``cid``."""
        self.message.embedded_images[cid] = image_path
        return self

    def header(self, name: str, value: str) -> 'EmailBuilder':
        """Set (or overwrite) one custom header."""
        self.message.headers[name] = value
        return self

    def high_priority(self) -> 'EmailBuilder':
        """Mark the message as high priority (priority 1)."""
        self.message.priority = 1
        return self

    def build(self) -> EmailMessage:
        """Return the message assembled so far."""
        return self.message
# ==================== Common Email Templates ====================
class CommonTemplates:
    """Ready-made plain-text bodies for common transactional emails.

    Each method returns a string that starts and ends with a newline, with one
    line of text per template line.
    """

    @staticmethod
    def welcome_email(user_name: str, app_name: str) -> str:
        """Return the welcome message for a new user of ``app_name``."""
        lines = [
            "",
            f"Dear {user_name},",
            f"Welcome to {app_name}! We're thrilled to have you on board.",
            "Here are some things you can do to get started:",
            # Bullets were mis-encoded in the original text; restored to U+2022.
            "• Complete your profile",
            "• Explore our features",
            "• Connect with other users",
            "If you have any questions, feel free to reach out to our support team.",
            "Best regards,",
            f"The {app_name} Team",
            "",
        ]
        return "\n".join(lines)

    @staticmethod
    def password_reset(user_name: str, reset_link: str) -> str:
        """Return the password-reset message containing ``reset_link``."""
        lines = [
            "",
            f"Hi {user_name},",
            "You recently requested to reset your password. Click the link below to create a new password:",
            f"{reset_link}",
            "This link will expire in 1 hour for security reasons.",
            "If you didn't request this, please ignore this email or contact support if you have concerns.",
            "Thanks,",
            "The Security Team",
            "",
        ]
        return "\n".join(lines)

    @staticmethod
    def order_confirmation(order_id: str, items: List[Dict], total: float) -> str:
        """Return the order-confirmation message.

        Args:
            order_id: Identifier shown on the "Order ID" line.
            items: Mappings that are read with the keys ``'name'``,
                ``'quantity'`` and ``'price'``; one bullet line per item.
            total: Order total, rendered with two decimal places.
        """
        items_text = "\n".join(
            f" • {item['name']} x {item['quantity']} - ${item['price']}"
            for item in items
        )
        lines = [
            "",
            "Order Confirmation",
            "Thank you for your order!",
            f"Order ID: {order_id}",
            "Items:",
            items_text,
            f"Total: ${total:.2f}",
            "We'll send you a shipping notification once your order is on its way.",
            "Thank you for your business!",
            "",
        ]
        return "\n".join(lines)
# Example usage
if __name__ == "__main__":
print("š§ SMTP Email Sending Examples\n")
# Example 1: SMTP configuration
print("1ļøā£ SMTP Server Configuration:")
# Gmail configuration
gmail_config = SMTPConfig.from_provider(
EmailProvider.GMAIL,
username="your_email@gmail.com",
password="your_app_password" # Use App Password for Gmail
)
print(f" Provider: Gmail")
print(f" Host: {gmail_config.host}")
print(f" Port: {gmail_config.port}")
print(f" TLS: {gmail_config.use_tls}")
print("\n Note: Gmail requires App Password with 2FA enabled")
# Example 2: Simple email
print("\n2ļøā£ Simple Email Example:")
simple_email = EmailMessage(
to="recipient@example.com",
subject="Test Email",
body="This is a test email sent from Python!"
)
print(f" To: {simple_email.to[0]}")
print(f" Subject: {simple_email.subject}")
print(f" Body: {simple_email.body[:50]}...")
# Example 3: Email builder
print("\n3ļøā£ Email Builder Pattern:")
email = (EmailBuilder()
.to("user@example.com")
.cc("manager@example.com")
.subject("Monthly Report")
.body("Please find the monthly report attached.")
.attach("report.pdf")
.high_priority()
.build())
print(" Built email with:")
print(f" To: {email.to[0]}")
print(f" CC: {email.cc[0] if email.cc else 'None'}")
print(f" Attachments: {len(email.attachments)}")
print(f" Priority: {'High' if email.priority == 1 else 'Normal'}")
# Example 4: HTML email with embedded images
print("\n4ļøā£ HTML Email with Images:")
html_content = """
Welcome!
Thanks for signing up.
Best regards,
The Team
Pro Tip: Think of SMTP email sending as running a digital post office - you need proper credentials, correct addresses, and reliable delivery mechanisms. Always use app passwords or OAuth2 instead of regular passwords - most providers block less secure access. Keep your SMTP connection alive when sending multiple emails to avoid connection overhead. Implement proper rate limiting to avoid being flagged as spam. Use HTML templates for rich content but always include a plain text fallback. Handle attachments carefully - large files can cause delivery issues. Queue emails for asynchronous processing to avoid blocking your application. Monitor delivery rates and handle bounces properly. Test with different email providers as each has quirks. Most importantly: respect recipients' privacy with BCC for bulk sends and always provide unsubscribe options!