AWS SDK (boto3): Master Cloud Automation with Python
AWS SDK for Python (boto3) is your gateway to the Amazon Web Services ecosystem: it lets you programmatically control hundreds of AWS services, from EC2 instances and S3 storage to Lambda functions and DynamoDB tables. Like having the keys to the world's largest cloud infrastructure, mastering boto3 enables you to automate cloud deployments, manage resources at scale, and build sophisticated cloud-native applications. Let's explore AWS automation with Python!
The AWS Automation Architecture
Think of boto3 as your universal remote control for AWS: it provides Python interfaces to nearly every AWS service, allowing you to create, configure, monitor, and manage cloud resources programmatically. From simple S3 file operations to complex multi-service orchestrations, boto3 resolves credentials, signs requests, and retries transient failures for you, and it supplies paginators and waiters for large result sets and slow state changes. Failures still surface as exceptions such as ClientError that your code has to handle. Understanding boto3's client and resource models, session management, and service integrations is essential for effective cloud automation!
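List and describe calls return one page of results at a time, so code that reads only the first response can silently miss resources. The sketch below shows the paginator pattern for EC2. The helper name `collect_instance_ids` is hypothetical, and the client is passed in as an argument so the function can be exercised with a stub; `get_paginator` and `paginate` are the standard client methods.

```python
from typing import Any, List


def collect_instance_ids(ec2_client: Any) -> List[str]:
    """Return every EC2 instance ID, following pagination page by page."""
    # The paginator re-issues describe_instances with the continuation
    # token until the service reports that no pages remain.
    paginator = ec2_client.get_paginator("describe_instances")
    instance_ids: List[str] = []
    for page in paginator.paginate():
        for reservation in page.get("Reservations", []):
            for instance in reservation.get("Instances", []):
                instance_ids.append(instance["InstanceId"])
    return instance_ids
```

With real credentials configured, this would be called as `collect_instance_ids(boto3.client("ec2"))`.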
Real-World Scenario: The Cloud Infrastructure Automation Platform
You're building a comprehensive AWS automation platform that manages EC2 instances for auto-scaling applications, handles S3 storage for backups and static content, deploys Lambda functions for serverless processing, manages DynamoDB tables for application data, configures VPCs and security groups, monitors resources with CloudWatch, manages costs and billing, and orchestrates multi-service deployments. Your system must handle thousands of resources, implement security best practices, optimize costs, and provide disaster recovery capabilities. Let's build a production-ready AWS automation framework!
# First, install the required packages:
# pip install boto3 python-dotenv
import json
import logging
import os
import time
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple

import boto3
from botocore.exceptions import ClientError
from dotenv import load_dotenv

# Load environment variables from a local .env file, if present
load_dotenv()

# ==================== AWS Configuration ====================
@dataclass
class AWSConfig:
    """AWS configuration and credentials."""
    access_key_id: Optional[str] = None
    secret_access_key: Optional[str] = None
    session_token: Optional[str] = None
    region: str = "us-east-1"

    # Service endpoints (for local testing with LocalStack)
    endpoint_url: Optional[str] = None

    # Default settings
    default_tags: Dict[str, str] = field(default_factory=lambda: {
        "ManagedBy": "boto3-automation",
        "Environment": "production",
        "CreatedAt": datetime.now().isoformat()
    })

    # Retry configuration
    max_retries: int = 3
    retry_delay: float = 1.0

    # Cost management
    enable_cost_optimization: bool = True
    max_monthly_budget: float = 1000.0

    @classmethod
    def from_env(cls):
        """Load configuration from environment variables."""
        return cls(
            access_key_id=os.getenv("AWS_ACCESS_KEY_ID"),
            secret_access_key=os.getenv("AWS_SECRET_ACCESS_KEY"),
            session_token=os.getenv("AWS_SESSION_TOKEN"),
            region=os.getenv("AWS_DEFAULT_REGION", "us-east-1")
        )

# ==================== AWS Session Manager ====================
class AWSSessionManager:
    """Manage AWS sessions and clients."""

    def __init__(self, config: AWSConfig):
        self.config = config
        self.session = self._create_session()
        self.clients = {}
        self.resources = {}
        self.logger = logging.getLogger(__name__)

    def _create_session(self) -> boto3.Session:
        """Create boto3 session."""
        session_kwargs = {
            'region_name': self.config.region
        }
        # Explicit keys are optional; without them boto3 falls back to its
        # default credential chain (environment, shared config, IAM role).
        if self.config.access_key_id:
            session_kwargs['aws_access_key_id'] = self.config.access_key_id
        if self.config.secret_access_key:
            session_kwargs['aws_secret_access_key'] = self.config.secret_access_key
        if self.config.session_token:
            session_kwargs['aws_session_token'] = self.config.session_token
        return boto3.Session(**session_kwargs)

    def get_client(self, service_name: str) -> Any:
        """Get or create service client."""
        if service_name not in self.clients:
            client_kwargs = {}
            if self.config.endpoint_url:
                client_kwargs['endpoint_url'] = self.config.endpoint_url
            self.clients[service_name] = self.session.client(
                service_name,
                **client_kwargs
            )
        return self.clients[service_name]

    def get_resource(self, service_name: str) -> Any:
        """Get or create service resource."""
        if service_name not in self.resources:
            resource_kwargs = {}
            if self.config.endpoint_url:
                resource_kwargs['endpoint_url'] = self.config.endpoint_url
            self.resources[service_name] = self.session.resource(
                service_name,
                **resource_kwargs
            )
        return self.resources[service_name]

    def get_credentials(self) -> Dict[str, Optional[str]]:
        """Get current credentials. The result is secret: never log it."""
        credentials = self.session.get_credentials()
        if credentials is None:
            raise RuntimeError("No AWS credentials found")
        # Freeze first so all three values come from the same refresh cycle
        frozen = credentials.get_frozen_credentials()
        return {
            'access_key': frozen.access_key,
            'secret_key': frozen.secret_key,
            'token': frozen.token
        }

    def list_regions(self, service_name: str = 'ec2') -> List[str]:
        """List available regions for a service."""
        # describe_regions() exists only on the EC2 client; for any other
        # service, fall back to the endpoint data bundled with the SDK.
        if service_name != 'ec2':
            return self.session.get_available_regions(service_name)
        client = self.get_client('ec2')
        regions = client.describe_regions()
        return [region['RegionName'] for region in regions['Regions']]

# ==================== EC2 Manager ====================
class EC2Manager:
    """Manage EC2 instances and related resources."""

    def __init__(self, session_manager: AWSSessionManager):
        self.session_manager = session_manager
        self.ec2_client = session_manager.get_client('ec2')
        self.ec2_resource = session_manager.get_resource('ec2')
        self.config = session_manager.config
        self.logger = logging.getLogger(__name__)

    def create_instance(
        self,
        name: str,
        instance_type: str = "t2.micro",
        ami_id: Optional[str] = None,
        key_name: Optional[str] = None,
        security_group_ids: Optional[List[str]] = None,
        subnet_id: Optional[str] = None,
        user_data: Optional[str] = None,
        **kwargs
    ) -> Dict[str, Any]:
        """
        Create EC2 instance.

        Args:
            name: Instance name tag
            instance_type: EC2 instance type
            ami_id: AMI ID (uses latest Amazon Linux 2 if not specified)
            key_name: SSH key pair name
            security_group_ids: Security group IDs
            subnet_id: Subnet ID for VPC
            user_data: User data script

        Returns:
            Instance information dictionary
        """
        try:
            # Get latest Amazon Linux 2 AMI if not specified
            if not ami_id:
                ami_id = self._get_latest_amazon_linux_ami()

            # Prepare instance parameters
            run_params = {
                'ImageId': ami_id,
                'InstanceType': instance_type,
                'MinCount': 1,
                'MaxCount': 1,
                'TagSpecifications': [{
                    'ResourceType': 'instance',
                    'Tags': [
                        {'Key': 'Name', 'Value': name},
                        *[{'Key': k, 'Value': v} for k, v in self.config.default_tags.items()]
                    ]
                }]
            }
            if key_name:
                run_params['KeyName'] = key_name
            if security_group_ids:
                run_params['SecurityGroupIds'] = security_group_ids
            if subnet_id:
                run_params['SubnetId'] = subnet_id
            if user_data:
                run_params['UserData'] = user_data

            # Add additional parameters
            run_params.update(kwargs)

            # Create instance
            response = self.ec2_client.run_instances(**run_params)
            instance = response['Instances'][0]
            instance_id = instance['InstanceId']
            self.logger.info(f"Created EC2 instance: {instance_id} ({name})")

            # Wait for instance to be running
            waiter = self.ec2_client.get_waiter('instance_running')
            waiter.wait(InstanceIds=[instance_id])

            # Get updated instance info
            instance_info = self.get_instance_info(instance_id)
            return instance_info
        except ClientError as e:
            self.logger.error(f"Failed to create EC2 instance: {e}")
            raise

    def _get_latest_amazon_linux_ami(self) -> str:
        """Get the latest Amazon Linux 2 AMI ID."""
        response = self.ec2_client.describe_images(
            Owners=['amazon'],
            Filters=[
                {'Name': 'name', 'Values': ['amzn2-ami-hvm-*-x86_64-gp2']},
                {'Name': 'state', 'Values': ['available']}
            ]
        )
        # Sort by creation date and get the latest
        images = sorted(response['Images'], key=lambda x: x['CreationDate'], reverse=True)
        if images:
            return images[0]['ImageId']
        else:
            raise ValueError("No Amazon Linux 2 AMI found")

    def list_instances(self, filters: Optional[List[Dict]] = None) -> List[Dict[str, Any]]:
        """List EC2 instances in every state, across all result pages."""
        try:
            params = {}
            if filters:
                params['Filters'] = filters
            # A single describe_instances call returns only one page
            paginator = self.ec2_client.get_paginator('describe_instances')
            instances = []
            for page in paginator.paginate(**params):
                for reservation in page['Reservations']:
                    for instance in reservation['Instances']:
                        instances.append({
                            'InstanceId': instance['InstanceId'],
                            'Name': self._get_tag_value(instance.get('Tags', []), 'Name'),
                            'State': instance['State']['Name'],
                            'InstanceType': instance['InstanceType'],
                            'PublicIpAddress': instance.get('PublicIpAddress'),
                            'PrivateIpAddress': instance.get('PrivateIpAddress'),
                            'LaunchTime': instance['LaunchTime'].isoformat(),
                            'Tags': {tag['Key']: tag['Value'] for tag in instance.get('Tags', [])}
                        })
            return instances
        except ClientError as e:
            self.logger.error(f"Failed to list EC2 instances: {e}")
            raise

    def get_instance_info(self, instance_id: str) -> Dict[str, Any]:
        """Get detailed instance information."""
        try:
            response = self.ec2_client.describe_instances(InstanceIds=[instance_id])
            if response['Reservations']:
                instance = response['Reservations'][0]['Instances'][0]
                return {
                    'InstanceId': instance['InstanceId'],
                    'Name': self._get_tag_value(instance.get('Tags', []), 'Name'),
                    'State': instance['State']['Name'],
                    'InstanceType': instance['InstanceType'],
                    'PublicIpAddress': instance.get('PublicIpAddress'),
                    'PrivateIpAddress': instance.get('PrivateIpAddress'),
                    'PublicDnsName': instance.get('PublicDnsName'),
                    'PrivateDnsName': instance.get('PrivateDnsName'),
                    'LaunchTime': instance['LaunchTime'].isoformat(),
                    'SecurityGroups': instance.get('SecurityGroups', []),
                    'SubnetId': instance.get('SubnetId'),
                    'VpcId': instance.get('VpcId'),
                    'Tags': {tag['Key']: tag['Value'] for tag in instance.get('Tags', [])}
                }
            return {}
        except ClientError as e:
            self.logger.error(f"Failed to get instance info: {e}")
            raise

    def stop_instance(self, instance_id: str, force: bool = False) -> bool:
        """Stop EC2 instance."""
        try:
            self.ec2_client.stop_instances(InstanceIds=[instance_id], Force=force)
            # Wait for instance to stop
            waiter = self.ec2_client.get_waiter('instance_stopped')
            waiter.wait(InstanceIds=[instance_id])
            self.logger.info(f"Stopped EC2 instance: {instance_id}")
            return True
        except ClientError as e:
            self.logger.error(f"Failed to stop instance: {e}")
            return False

    def start_instance(self, instance_id: str) -> bool:
        """Start EC2 instance."""
        try:
            self.ec2_client.start_instances(InstanceIds=[instance_id])
            # Wait for instance to start
            waiter = self.ec2_client.get_waiter('instance_running')
            waiter.wait(InstanceIds=[instance_id])
            self.logger.info(f"Started EC2 instance: {instance_id}")
            return True
        except ClientError as e:
            self.logger.error(f"Failed to start instance: {e}")
            return False

    def terminate_instance(self, instance_id: str) -> bool:
        """Terminate EC2 instance."""
        try:
            self.ec2_client.terminate_instances(InstanceIds=[instance_id])
            self.logger.info(f"Terminated EC2 instance: {instance_id}")
            return True
        except ClientError as e:
            self.logger.error(f"Failed to terminate instance: {e}")
            return False

    def create_security_group(
        self,
        name: str,
        description: str,
        vpc_id: Optional[str] = None,
        ingress_rules: Optional[List[Dict]] = None
    ) -> str:
        """Create security group."""
        try:
            params = {
                'GroupName': name,
                'Description': description
            }
            if vpc_id:
                params['VpcId'] = vpc_id
            response = self.ec2_client.create_security_group(**params)
            group_id = response['GroupId']

            # Add ingress rules if specified
            if ingress_rules:
                self.ec2_client.authorize_security_group_ingress(
                    GroupId=group_id,
                    IpPermissions=ingress_rules
                )
            self.logger.info(f"Created security group: {group_id} ({name})")
            return group_id
        except ClientError as e:
            self.logger.error(f"Failed to create security group: {e}")
            raise

    def _get_tag_value(self, tags: List[Dict], key: str) -> Optional[str]:
        """Get tag value by key."""
        for tag in tags:
            if tag['Key'] == key:
                return tag['Value']
        return None

# ==================== S3 Manager ====================
class S3Manager:
    """Manage S3 buckets and objects."""

    def __init__(self, session_manager: AWSSessionManager):
        self.session_manager = session_manager
        self.s3_client = session_manager.get_client('s3')
        self.s3_resource = session_manager.get_resource('s3')
        self.config = session_manager.config
        self.logger = logging.getLogger(__name__)

    def create_bucket(
        self,
        bucket_name: str,
        region: Optional[str] = None,
        versioning: bool = False,
        encryption: bool = True,
        public_access_block: bool = True
    ) -> bool:
        """
        Create S3 bucket with security best practices.

        Args:
            bucket_name: Name of the bucket
            region: AWS region (uses default if not specified)
            versioning: Enable versioning
            encryption: Enable server-side encryption
            public_access_block: Block public access

        Returns:
            True if successful
        """
        try:
            # Create bucket
            create_params = {'Bucket': bucket_name}

            # Add location constraint for regions other than us-east-1
            region = region or self.config.region
            if region != 'us-east-1':
                create_params['CreateBucketConfiguration'] = {
                    'LocationConstraint': region
                }
            self.s3_client.create_bucket(**create_params)

            # Enable versioning if requested
            if versioning:
                self.s3_client.put_bucket_versioning(
                    Bucket=bucket_name,
                    VersioningConfiguration={'Status': 'Enabled'}
                )

            # Enable encryption if requested
            if encryption:
                self.s3_client.put_bucket_encryption(
                    Bucket=bucket_name,
                    ServerSideEncryptionConfiguration={
                        'Rules': [{
                            'ApplyServerSideEncryptionByDefault': {
                                'SSEAlgorithm': 'AES256'
                            }
                        }]
                    }
                )

            # Block public access if requested
            if public_access_block:
                self.s3_client.put_public_access_block(
                    Bucket=bucket_name,
                    PublicAccessBlockConfiguration={
                        'BlockPublicAcls': True,
                        'IgnorePublicAcls': True,
                        'BlockPublicPolicy': True,
                        'RestrictPublicBuckets': True
                    }
                )

            # Add tags
            self.s3_client.put_bucket_tagging(
                Bucket=bucket_name,
                Tagging={
                    'TagSet': [
                        {'Key': k, 'Value': v}
                        for k, v in self.config.default_tags.items()
                    ]
                }
            )
            self.logger.info(f"Created S3 bucket: {bucket_name}")
            return True
        except ClientError as e:
            # BucketAlreadyExists: the name is taken by another account.
            # BucketAlreadyOwnedByYou: this account already owns the bucket.
            if e.response['Error']['Code'] in ('BucketAlreadyExists', 'BucketAlreadyOwnedByYou'):
                self.logger.warning(f"Bucket already exists: {bucket_name}")
            else:
                self.logger.error(f"Failed to create bucket: {e}")
            return False

    def upload_file(
        self,
        file_path: str,
        bucket_name: str,
        object_key: Optional[str] = None,
        metadata: Optional[Dict] = None,
        content_type: Optional[str] = None
    ) -> bool:
        """Upload file to S3."""
        try:
            file_path = Path(file_path)
            if not file_path.exists():
                raise FileNotFoundError(f"File not found: {file_path}")

            # Use filename as key if not specified
            if not object_key:
                object_key = file_path.name

            # Prepare upload parameters
            extra_args = {}
            if metadata:
                extra_args['Metadata'] = metadata
            if content_type:
                extra_args['ContentType'] = content_type

            # Upload file
            self.s3_client.upload_file(
                str(file_path),
                bucket_name,
                object_key,
                ExtraArgs=extra_args if extra_args else None
            )
            self.logger.info(f"Uploaded {file_path} to s3://{bucket_name}/{object_key}")
            return True
        except Exception as e:
            # Broad on purpose: upload failures are not always ClientError
            self.logger.error(f"Failed to upload file: {e}")
            return False

    def download_file(
        self,
        bucket_name: str,
        object_key: str,
        file_path: str
    ) -> bool:
        """Download file from S3."""
        try:
            self.s3_client.download_file(bucket_name, object_key, file_path)
            self.logger.info(f"Downloaded s3://{bucket_name}/{object_key} to {file_path}")
            return True
        except ClientError as e:
            self.logger.error(f"Failed to download file: {e}")
            return False

    def list_objects(
        self,
        bucket_name: str,
        prefix: Optional[str] = None,
        max_keys: int = 1000
    ) -> List[Dict[str, Any]]:
        """List objects in bucket (first page only, at most max_keys entries)."""
        try:
            params = {
                'Bucket': bucket_name,
                'MaxKeys': max_keys
            }
            if prefix:
                params['Prefix'] = prefix
            response = self.s3_client.list_objects_v2(**params)
            objects = []
            for obj in response.get('Contents', []):
                objects.append({
                    'Key': obj['Key'],
                    'Size': obj['Size'],
                    'LastModified': obj['LastModified'].isoformat(),
                    'ETag': obj['ETag'].strip('"')
                })
            return objects
        except ClientError as e:
            self.logger.error(f"Failed to list objects: {e}")
            return []

    def delete_object(self, bucket_name: str, object_key: str) -> bool:
        """Delete object from S3."""
        try:
            self.s3_client.delete_object(Bucket=bucket_name, Key=object_key)
            self.logger.info(f"Deleted s3://{bucket_name}/{object_key}")
            return True
        except ClientError as e:
            self.logger.error(f"Failed to delete object: {e}")
            return False

    def generate_presigned_url(
        self,
        bucket_name: str,
        object_key: str,
        expiration: int = 3600
    ) -> Optional[str]:
        """Generate presigned URL for object."""
        try:
            url = self.s3_client.generate_presigned_url(
                'get_object',
                Params={'Bucket': bucket_name, 'Key': object_key},
                ExpiresIn=expiration
            )
            return url
        except ClientError as e:
            self.logger.error(f"Failed to generate presigned URL: {e}")
            return None

# ==================== Lambda Manager ====================
class LambdaManager:
    """Manage Lambda functions."""

    def __init__(self, session_manager: AWSSessionManager):
        self.session_manager = session_manager
        self.lambda_client = session_manager.get_client('lambda')
        self.iam_client = session_manager.get_client('iam')
        self.config = session_manager.config
        self.logger = logging.getLogger(__name__)

    def create_function(
        self,
        function_name: str,
        runtime: str,
        handler: str,
        code_zip_path: str,
        role_arn: Optional[str] = None,
        memory_size: int = 128,
        timeout: int = 30,
        environment_vars: Optional[Dict] = None
    ) -> Dict[str, Any]:
        """
        Create Lambda function.

        Args:
            function_name: Name of the function
            runtime: Runtime (e.g., 'python3.12')
            handler: Handler (e.g., 'lambda_function.lambda_handler')
            code_zip_path: Path to deployment package
            role_arn: IAM role ARN (creates basic role if not provided)
            memory_size: Memory in MB (128-10240)
            timeout: Timeout in seconds (1-900)
            environment_vars: Environment variables

        Returns:
            Function configuration
        """
        try:
            # Create basic execution role if not provided
            if not role_arn:
                role_arn = self._create_lambda_execution_role(function_name)
                # A new IAM role takes a few seconds to propagate; this fixed
                # pause is a simple stand-in for retrying create_function.
                time.sleep(10)

            # Read deployment package
            with open(code_zip_path, 'rb') as f:
                zip_content = f.read()

            # Prepare function configuration
            config = {
                'FunctionName': function_name,
                'Runtime': runtime,
                'Role': role_arn,
                'Handler': handler,
                'Code': {'ZipFile': zip_content},
                'MemorySize': memory_size,
                'Timeout': timeout,
                'Tags': self.config.default_tags
            }
            if environment_vars:
                config['Environment'] = {'Variables': environment_vars}

            # Create function
            response = self.lambda_client.create_function(**config)
            self.logger.info(f"Created Lambda function: {function_name}")
            return {
                'FunctionName': response['FunctionName'],
                'FunctionArn': response['FunctionArn'],
                'Runtime': response['Runtime'],
                'Handler': response['Handler'],
                'MemorySize': response['MemorySize'],
                'Timeout': response['Timeout'],
                'State': response.get('State')
            }
        except ClientError as e:
            self.logger.error(f"Failed to create Lambda function: {e}")
            raise

    def _create_lambda_execution_role(self, function_name: str) -> str:
        """Create basic Lambda execution role."""
        role_name = f"{function_name}-execution-role"
        trust_policy = {
            "Version": "2012-10-17",
            "Statement": [{
                "Effect": "Allow",
                "Principal": {"Service": "lambda.amazonaws.com"},
                "Action": "sts:AssumeRole"
            }]
        }
        try:
            # Create role
            response = self.iam_client.create_role(
                RoleName=role_name,
                AssumeRolePolicyDocument=json.dumps(trust_policy),
                Tags=[
                    {'Key': k, 'Value': v}
                    for k, v in self.config.default_tags.items()
                ]
            )
            # Attach basic execution policy
            self.iam_client.attach_role_policy(
                RoleName=role_name,
                PolicyArn='arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole'
            )
            return response['Role']['Arn']
        except ClientError as e:
            if e.response['Error']['Code'] == 'EntityAlreadyExists':
                # Role already exists, get its ARN
                response = self.iam_client.get_role(RoleName=role_name)
                return response['Role']['Arn']
            raise

    def invoke_function(
        self,
        function_name: str,
        payload: Optional[Dict] = None,
        invocation_type: str = 'RequestResponse'
    ) -> Dict[str, Any]:
        """
        Invoke Lambda function.

        Args:
            function_name: Name of the function
            payload: Input payload
            invocation_type: 'RequestResponse' (sync) or 'Event' (async)

        Returns:
            Response from function
        """
        try:
            params = {
                'FunctionName': function_name,
                'InvocationType': invocation_type
            }
            if payload is not None:
                params['Payload'] = json.dumps(payload)
            response = self.lambda_client.invoke(**params)
            result = {
                'StatusCode': response['StatusCode'],
                'ExecutedVersion': response.get('ExecutedVersion'),
                'FunctionError': response.get('FunctionError'),
                'LogResult': response.get('LogResult')
            }
            # Asynchronous ('Event') invocations return an empty body, and
            # json.loads would fail on it, so parse only when there is data.
            raw_payload = response['Payload'].read() if 'Payload' in response else b''
            if raw_payload:
                result['Payload'] = json.loads(raw_payload)
            return result
        except ClientError as e:
            self.logger.error(f"Failed to invoke function: {e}")
            raise

    def list_functions(self) -> List[Dict[str, Any]]:
        """List Lambda functions across all result pages."""
        try:
            paginator = self.lambda_client.get_paginator('list_functions')
            functions = []
            for page in paginator.paginate():
                for func in page['Functions']:
                    functions.append({
                        'FunctionName': func['FunctionName'],
                        'FunctionArn': func['FunctionArn'],
                        # Container-image functions have no Runtime or Handler
                        'Runtime': func.get('Runtime'),
                        'Handler': func.get('Handler'),
                        'CodeSize': func['CodeSize'],
                        'MemorySize': func['MemorySize'],
                        'Timeout': func['Timeout'],
                        'LastModified': func['LastModified']
                    })
            return functions
        except ClientError as e:
            self.logger.error(f"Failed to list functions: {e}")
            return []

    def update_function_code(
        self,
        function_name: str,
        code_zip_path: str
    ) -> bool:
        """Update function code."""
        try:
            with open(code_zip_path, 'rb') as f:
                zip_content = f.read()
            self.lambda_client.update_function_code(
                FunctionName=function_name,
                ZipFile=zip_content
            )
            self.logger.info(f"Updated code for function: {function_name}")
            return True
        except ClientError as e:
            self.logger.error(f"Failed to update function code: {e}")
            return False

    def delete_function(self, function_name: str) -> bool:
        """Delete Lambda function."""
        try:
            self.lambda_client.delete_function(FunctionName=function_name)
            self.logger.info(f"Deleted Lambda function: {function_name}")
            return True
        except ClientError as e:
            self.logger.error(f"Failed to delete function: {e}")
            return False

# ==================== DynamoDB Manager ====================
class DynamoDBManager:
    """Manage DynamoDB tables."""

    def __init__(self, session_manager: AWSSessionManager):
        self.session_manager = session_manager
        self.dynamodb_client = session_manager.get_client('dynamodb')
        self.dynamodb_resource = session_manager.get_resource('dynamodb')
        self.config = session_manager.config
        self.logger = logging.getLogger(__name__)

    def create_table(
        self,
        table_name: str,
        partition_key: Tuple[str, str],  # (name, type)
        sort_key: Optional[Tuple[str, str]] = None,
        read_capacity: int = 5,
        write_capacity: int = 5,
        billing_mode: str = 'PROVISIONED'
    ) -> bool:
        """
        Create DynamoDB table.

        Args:
            table_name: Name of the table
            partition_key: Tuple of (key_name, key_type) e.g., ('id', 'S')
            sort_key: Optional tuple of (key_name, key_type)
            read_capacity: Read capacity units
            write_capacity: Write capacity units
            billing_mode: 'PROVISIONED' or 'PAY_PER_REQUEST'

        Returns:
            True if successful
        """
        try:
            # Prepare key schema
            key_schema = [
                {
                    'AttributeName': partition_key[0],
                    'KeyType': 'HASH'
                }
            ]
            # Prepare attribute definitions
            attribute_definitions = [
                {
                    'AttributeName': partition_key[0],
                    'AttributeType': partition_key[1]
                }
            ]
            if sort_key:
                key_schema.append({
                    'AttributeName': sort_key[0],
                    'KeyType': 'RANGE'
                })
                attribute_definitions.append({
                    'AttributeName': sort_key[0],
                    'AttributeType': sort_key[1]
                })

            # Create table
            params = {
                'TableName': table_name,
                'KeySchema': key_schema,
                'AttributeDefinitions': attribute_definitions,
                'BillingMode': billing_mode,
                'Tags': [
                    {'Key': k, 'Value': v}
                    for k, v in self.config.default_tags.items()
                ]
            }
            if billing_mode == 'PROVISIONED':
                params['ProvisionedThroughput'] = {
                    'ReadCapacityUnits': read_capacity,
                    'WriteCapacityUnits': write_capacity
                }
            self.dynamodb_client.create_table(**params)

            # Wait for table to be active
            waiter = self.dynamodb_client.get_waiter('table_exists')
            waiter.wait(TableName=table_name)
            self.logger.info(f"Created DynamoDB table: {table_name}")
            return True
        except ClientError as e:
            self.logger.error(f"Failed to create table: {e}")
            return False

    def put_item(self, table_name: str, item: Dict[str, Any]) -> bool:
        """Put item in table."""
        try:
            table = self.dynamodb_resource.Table(table_name)
            table.put_item(Item=item)
            return True
        except ClientError as e:
            self.logger.error(f"Failed to put item: {e}")
            return False

    def get_item(
        self,
        table_name: str,
        key: Dict[str, Any]
    ) -> Optional[Dict[str, Any]]:
        """Get item from table."""
        try:
            table = self.dynamodb_resource.Table(table_name)
            response = table.get_item(Key=key)
            return response.get('Item')
        except ClientError as e:
            self.logger.error(f"Failed to get item: {e}")
            return None

    def query(
        self,
        table_name: str,
        key_condition: str,
        expression_values: Dict[str, Any],
        limit: Optional[int] = None
    ) -> List[Dict[str, Any]]:
        """Query table."""
        try:
            table = self.dynamodb_resource.Table(table_name)
            params = {
                'KeyConditionExpression': key_condition,
                'ExpressionAttributeValues': expression_values
            }
            if limit:
                params['Limit'] = limit
            response = table.query(**params)
            return response.get('Items', [])
        except ClientError as e:
            self.logger.error(f"Failed to query table: {e}")
            return []

# ==================== CloudWatch Manager ====================
class CloudWatchManager:
    """Manage CloudWatch metrics and alarms."""

    def __init__(self, session_manager: AWSSessionManager):
        self.session_manager = session_manager
        self.cloudwatch_client = session_manager.get_client('cloudwatch')
        self.logs_client = session_manager.get_client('logs')
        self.config = session_manager.config
        self.logger = logging.getLogger(__name__)

    def put_metric(
        self,
        namespace: str,
        metric_name: str,
        value: float,
        unit: str = 'None',
        dimensions: Optional[List[Dict]] = None
    ) -> bool:
        """Put custom metric to CloudWatch."""
        try:
            params = {
                'Namespace': namespace,
                'MetricData': [{
                    'MetricName': metric_name,
                    'Value': value,
                    'Unit': unit,
                    # Timezone-aware: a naive local time would be sent as if
                    # it were UTC and land at the wrong point on the graph.
                    'Timestamp': datetime.now().astimezone()
                }]
            }
            if dimensions:
                params['MetricData'][0]['Dimensions'] = dimensions
            self.cloudwatch_client.put_metric_data(**params)
            return True
        except ClientError as e:
            self.logger.error(f"Failed to put metric: {e}")
            return False

    def create_alarm(
        self,
        alarm_name: str,
        metric_name: str,
        namespace: str,
        threshold: float,
        comparison_operator: str = 'GreaterThanThreshold',
        evaluation_periods: int = 1,
        period: int = 300,
        statistic: str = 'Average'
    ) -> bool:
        """Create CloudWatch alarm."""
        try:
            self.cloudwatch_client.put_metric_alarm(
                AlarmName=alarm_name,
                ComparisonOperator=comparison_operator,
                EvaluationPeriods=evaluation_periods,
                MetricName=metric_name,
                Namespace=namespace,
                Period=period,
                Statistic=statistic,
                Threshold=threshold,
                ActionsEnabled=True,
                AlarmDescription=f'Alarm for {metric_name}',
                Tags=[
                    {'Key': k, 'Value': v}
                    for k, v in self.config.default_tags.items()
                ]
            )
            self.logger.info(f"Created CloudWatch alarm: {alarm_name}")
            return True
        except ClientError as e:
            self.logger.error(f"Failed to create alarm: {e}")
            return False

# ==================== Main AWS Automation Class ====================
class AWSAutomation:
    """High-level AWS automation interface."""

    def __init__(self, config: Optional[AWSConfig] = None):
        # Set up logging first so the managers' log output is formatted
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
        )
        self.logger = logging.getLogger(__name__)
        self.config = config or AWSConfig.from_env()
        self.session_manager = AWSSessionManager(self.config)

        # Initialize service managers
        self.ec2 = EC2Manager(self.session_manager)
        self.s3 = S3Manager(self.session_manager)
        self.lambda_mgr = LambdaManager(self.session_manager)
        self.dynamodb = DynamoDBManager(self.session_manager)
        self.cloudwatch = CloudWatchManager(self.session_manager)

    def get_account_info(self) -> Dict[str, Any]:
        """Get AWS account information."""
        try:
            sts_client = self.session_manager.get_client('sts')
            response = sts_client.get_caller_identity()
            return {
                'Account': response['Account'],
                'UserId': response['UserId'],
                'Arn': response['Arn'],
                'Region': self.config.region
            }
        except ClientError as e:
            self.logger.error(f"Failed to get account info: {e}")
            return {}

    def estimate_monthly_cost(self) -> float:
        """Return the month-to-date unblended cost from Cost Explorer."""
        try:
            ce_client = self.session_manager.get_client('ce')
            # Cost Explorer treats End as exclusive and rejects Start == End
            # (which the original hit on the 1st of each month), so the
            # window runs from the 1st of the month through tomorrow.
            today = datetime.now().date()
            start_date = today.replace(day=1).strftime('%Y-%m-%d')
            end_date = (today + timedelta(days=1)).strftime('%Y-%m-%d')
            response = ce_client.get_cost_and_usage(
                TimePeriod={
                    'Start': start_date,
                    'End': end_date
                },
                Granularity='MONTHLY',
                Metrics=['UnblendedCost']
            )
            if response['ResultsByTime']:
                cost = float(response['ResultsByTime'][0]['Total']['UnblendedCost']['Amount'])
                return cost
            return 0.0
        except ClientError as e:
            self.logger.error(f"Failed to estimate cost: {e}")
            return 0.0

# Example usage
if __name__ == "__main__":
    print("AWS SDK (boto3) Examples\n")

    # Example 1: Initialize AWS automation
    print("1. Initializing AWS Automation:")
    config = AWSConfig.from_env()
    aws = AWSAutomation(config)

    # Get account info
    account_info = aws.get_account_info()
    if account_info:
        print(f"   Account: {account_info.get('Account', 'N/A')}")
        print(f"   Region: {account_info.get('Region', 'N/A')}")
        print(f"   User: {account_info.get('UserId', 'N/A')}")

    # Example 2: EC2 operations (creation is printed, not executed)
    print("\n2. EC2 Instance Operations:")
    print("   Creating instance...")
    print("   instance = aws.ec2.create_instance(")
    print("       name='web-server',")
    print("       instance_type='t2.micro',")
    print("       key_name='my-key'")
    print("   )")

    # List instances (this call does reach AWS and needs credentials)
    instances = aws.ec2.list_instances()
    print(f"\n   Instances found: {len(instances)}")
    for instance in instances[:3]:  # Show first 3
        # 'Name' is always present but is None for untagged instances
        print(f"   • {instance.get('Name') or 'Unnamed'} ({instance['InstanceId']}) - {instance['State']}")

    # Example 3: S3 operations
    print("\n3. S3 Storage Operations:")
    print("   Creating bucket...")
    print("   aws.s3.create_bucket(")
    print("       bucket_name='my-app-data',")
    print("       versioning=True,")
    print("       encryption=True")
    print("   )")
    print("\n   Uploading file...")
    print("   aws.s3.upload_file(")
    print("       file_path='data.csv',")
    print("       bucket_name='my-app-data',")
    print("       object_key='data/2024/data.csv'")
    print("   )")

    # Example 4: Lambda operations
    print("\n4. Lambda Function Operations:")
    print("   Creating function...")
    print("   aws.lambda_mgr.create_function(")
    print("       function_name='data-processor',")
    print("       runtime='python3.12',")
    print("       handler='lambda_function.handler',")
    print("       code_zip_path='function.zip'")
    print("   )")

    # Example 5: DynamoDB operations
    print("\n5. DynamoDB Operations:")
    print("   Creating table...")
    print("   aws.dynamodb.create_table(")
    print("       table_name='users',")
    print("       partition_key=('user_id', 'S'),")
    print("       sort_key=('created_at', 'N')")
    print("   )")

    # Example 6: Common boto3 patterns
    print("\n6. Common boto3 Patterns:")
    patterns = [
        ("Client vs Resource", "Low-level vs High-level API"),
        ("Pagination", "Handle large result sets"),
        ("Waiters", "Wait for resource state changes"),
        ("Exception handling", "Handle ClientError properly"),
        ("Retry logic", "Implement exponential backoff"),
        ("Batch operations", "Process items in batches")
    ]
    for pattern, description in patterns:
        print(f"   {pattern}: {description}")

    # Example 7: Cost optimization
    print("\n7. Cost Optimization:")
    tips = [
        "Use appropriate instance types",
        "Enable S3 lifecycle policies",
        "Use Lambda for serverless workloads",
        "Implement auto-scaling",
        "Delete unused resources",
        "Use Reserved Instances for steady workloads",
        "Enable CloudWatch alarms for billing"
    ]
    for tip in tips:
        print(f"   • {tip}")

    # Example 8: Security best practices
    print("\n8. Security Best Practices:")
    practices = [
        "Never hardcode credentials",
        "Use IAM roles instead of keys",
        "Enable MFA for production",
        "Follow least privilege principle",
        "Encrypt data at rest and in transit",
        "Enable CloudTrail logging",
        "Set up security alerts",
        "Rotate credentials regularly"
    ]
    for practice in practices:
        print(f"   • {practice}")

    # Example 9: Error handling
    print("\n9. Error Handling:")
    print("   try:")
    print("       response = ec2_client.describe_instances()")
    print("   except ClientError as e:")
    print("       error_code = e.response['Error']['Code']")
    print("       if error_code == 'UnauthorizedOperation':")
    print("           print('Permission denied')")
    print("       else:")
    print("           raise")

    # Example 10: Resource tagging
    print("\n10. Resource Tagging Strategy:")
    tags = [
        ("Environment", "dev/staging/prod"),
        ("Project", "Project name"),
        ("Owner", "Team or person"),
        ("CostCenter", "Billing department"),
        ("CreatedBy", "Automation tool"),
        ("CreatedAt", "Timestamp")
    ]
    for tag, value in tags:
        print(f"   {tag}: {value}")

    print("\nAWS boto3 demonstration complete!")
    print("\nNote: To use this code, you need:")
    print("   1. AWS Account and credentials")
    print("   2. Configure AWS CLI: aws configure")
    print("   3. Set environment variables or use IAM roles")
    print("   4. Install boto3: pip install boto3")

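The error-handling pattern in Example 9 branches on `e.response['Error']['Code']`. A minimal sketch of turning that into a reusable decision is shown below. The function name `classify_client_error` and its category labels are hypothetical, and the code sets are illustrative rather than exhaustive; each service documents its own error codes.

```python
from typing import Any, Dict

# Illustrative, non-exhaustive sets of error codes returned by AWS services.
PERMISSION_CODES = {"AccessDenied", "AccessDeniedException", "UnauthorizedOperation"}
THROTTLING_CODES = {
    "Throttling",
    "ThrottlingException",
    "RequestLimitExceeded",
    "TooManyRequestsException",
}


def classify_client_error(error_response: Dict[str, Any]) -> str:
    """Map the `response` dict of a ClientError to a coarse category."""
    code = error_response.get("Error", {}).get("Code", "")
    status = error_response.get("ResponseMetadata", {}).get("HTTPStatusCode", 0)
    if code in PERMISSION_CODES:
        return "permission"   # fix the IAM policy; retrying will not help
    if code in THROTTLING_CODES:
        return "throttled"    # safe to retry after backing off
    if status >= 500:
        return "server"       # transient service-side failure
    return "other"
```

Inside an `except ClientError as e:` block, `classify_client_error(e.response)` would then decide whether to retry, alert, or re-raise.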
Key Takeaways and Best Practices
- Secure Credentials: Never hardcode AWS credentials in your code.
- Use IAM Roles: Prefer IAM roles over access keys when possible.
- Handle Errors: Always catch and handle ClientError exceptions.
- Tag Resources: Implement consistent tagging for cost tracking.
- Monitor Costs: Set up billing alerts and monitor usage.
- Use Pagination: Handle large result sets with pagination.
- Implement Retries: Use exponential backoff for transient failures.
- Clean Up Resources: Delete unused resources to avoid charges.
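The "Implement Retries" takeaway can be sketched with the standard library alone. The helper below is a hypothetical `call_with_backoff`, not a boto3 API: it retries a callable with capped exponential backoff and full jitter, and takes the sleep function as a parameter so it can be tested without waiting. For boto3 calls specifically, botocore's built-in retries, configured through `botocore.config.Config(retries={'max_attempts': ..., 'mode': 'standard'})`, are usually the first thing to reach for; a wrapper like this one suits multi-step operations that the SDK cannot retry as a unit.

```python
import random
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def call_with_backoff(
    operation: Callable[[], T],
    is_retryable: Callable[[Exception], bool],
    max_attempts: int = 5,
    base_delay: float = 0.5,
    max_delay: float = 20.0,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call `operation`, retrying retryable failures with jittered backoff."""
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    for attempt in range(1, max_attempts + 1):
        try:
            return operation()
        except Exception as exc:
            # Give up on the last attempt or on a non-retryable error.
            if attempt == max_attempts or not is_retryable(exc):
                raise
            # Full jitter: wait a random time up to the capped exponential
            # ceiling (base_delay, 2 * base_delay, 4 * base_delay, ...).
            ceiling = min(max_delay, base_delay * 2 ** (attempt - 1))
            sleep(random.uniform(0, ceiling))
    raise AssertionError("unreachable: the loop always returns or raises")
```

The jitter spreads retries from many concurrent workers over time, so they do not all hit a throttled API again at the same instant.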
AWS Automation Best Practices
Mastering AWS automation with boto3 enables you to harness the full power of cloud computing programmatically. You can now provision infrastructure, deploy applications, manage storage, process data, and orchestrate complex cloud workflows all through Python code. Whether you're building scalable web applications, data pipelines, or enterprise systems, these AWS automation skills empower you to leverage the cloud effectively!
Pro Tip: Think of boto3 as your Swiss Army knife for AWS - it can do everything, but you need to use it wisely. Always start with proper credential management: use IAM roles on EC2, environment variables locally, and never commit credentials to version control. Understand the difference between clients (low-level, full control) and resources (high-level, convenient) - use resources for simple operations and clients for advanced features. Implement comprehensive error handling - AWS operations can fail for many reasons (permissions, limits, network). Always tag your resources consistently for cost tracking and organization. Use waiters when creating resources to ensure they're ready before using them. Implement pagination for list operations to handle large result sets. Monitor your AWS costs regularly and set up billing alerts. Use CloudWatch for monitoring and logging. Test your automation in a development account before running in production. Most importantly: always follow the principle of least privilege - grant only the minimum permissions necessary for your automation to function!
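To make the least-privilege advice concrete, here is a minimal sketch that builds a read-only IAM policy document scoped to one bucket and, optionally, one key prefix. The function name `s3_read_only_policy` and the statement IDs are hypothetical; the policy grammar, the `s3:ListBucket` and `s3:GetObject` actions, and the `s3:prefix` condition key are standard IAM and S3 elements.

```python
import json
from typing import Any, Dict


def s3_read_only_policy(bucket_name: str, prefix: str = "") -> Dict[str, Any]:
    """Build a policy allowing list and read access to one bucket or prefix."""
    bucket_arn = f"arn:aws:s3:::{bucket_name}"
    list_statement: Dict[str, Any] = {
        "Sid": "ListBucket",
        "Effect": "Allow",
        "Action": "s3:ListBucket",
        "Resource": bucket_arn,   # ListBucket applies to the bucket itself
    }
    if prefix:
        # Restrict listing to keys under the prefix.
        list_statement["Condition"] = {"StringLike": {"s3:prefix": [f"{prefix}*"]}}
    read_statement = {
        "Sid": "ReadObjects",
        "Effect": "Allow",
        "Action": "s3:GetObject",
        "Resource": f"{bucket_arn}/{prefix}*",   # GetObject applies to objects
    }
    return {"Version": "2012-10-17", "Statement": [list_statement, read_statement]}


if __name__ == "__main__":
    print(json.dumps(s3_read_only_policy("my-app-data", "reports/"), indent=2))
```

The resulting dictionary can be serialized with `json.dumps` and attached to a role, for example through the IAM client's `put_role_policy` call. It grants no write or delete permissions, so automation that only reads backups cannot modify them.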