Skip to main content

ā˜ļø AWS SDK (boto3): Master Cloud Automation with Python

AWS SDK for Python (boto3) is your gateway to the entire Amazon Web Services ecosystem - it allows you to programmatically control hundreds of AWS services, from EC2 instances to S3 storage, Lambda functions to DynamoDB databases. Like having the keys to the world's largest cloud infrastructure, mastering boto3 enables you to automate cloud deployments, manage resources at scale, and build sophisticated cloud-native applications. Let's explore the comprehensive world of AWS automation with Python! šŸš€

The AWS Automation Architecture

Think of boto3 as your universal remote control for AWS - it provides Python interfaces to every AWS service, allowing you to create, configure, monitor, and manage cloud resources programmatically. From simple S3 file operations to complex multi-service orchestrations, boto3 handles authentication, request signing, error handling, and pagination automatically. Understanding boto3's client/resource models, session management, and service integrations is essential for effective cloud automation!

graph TB A[AWS Automation with boto3] --> B[Core Services] A --> C[Resource Management] A --> D[Security & IAM] A --> E[Monitoring & Logging] B --> F[EC2 Instances] B --> G[S3 Storage] B --> H[Lambda Functions] B --> I[DynamoDB] C --> J[Creation] C --> K[Configuration] C --> L[Scaling] C --> M[Deletion] D --> N[IAM Roles] D --> O[Security Groups] D --> P[KMS Encryption] D --> Q[Secrets Manager] E --> R[CloudWatch] E --> S[CloudTrail] E --> T[Cost Explorer] E --> U[Systems Manager] V[Operations] --> W[Automation] V --> X[Deployment] V --> Y[Backup] V --> Z[Disaster Recovery] style A fill:#ff6b6b style B fill:#51cf66 style C fill:#339af0 style D fill:#ffd43b style E fill:#ff6b6b style V fill:#51cf66

Real-World Scenario: The Cloud Infrastructure Automation Platform šŸ—ļø

You're building a comprehensive AWS automation platform that manages EC2 instances for auto-scaling applications, handles S3 storage for backups and static content, deploys Lambda functions for serverless processing, manages DynamoDB tables for application data, configures VPCs and security groups, monitors resources with CloudWatch, manages costs and billing, and orchestrates multi-service deployments. Your system must handle thousands of resources, implement security best practices, optimize costs, and provide disaster recovery capabilities. Let's build a production-ready AWS automation framework!

# First, install required packages:
# pip install boto3 awscli botocore python-dotenv pandas matplotlib

import boto3
from botocore.exceptions import ClientError, BotoCoreError
import json
import os
from typing import List, Dict, Optional, Any, Union, Tuple
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from enum import Enum
import logging
import time
from pathlib import Path
import hashlib
import base64
from concurrent.futures import ThreadPoolExecutor, as_completed
import pandas as pd

# Load environment variables
from dotenv import load_dotenv
load_dotenv()

# ==================== AWS Configuration ====================

@dataclass
class AWSConfig:
    """AWS configuration and credentials."""
    access_key_id: Optional[str] = None
    secret_access_key: Optional[str] = None
    session_token: Optional[str] = None
    region: str = "us-east-1"
    
    # Service endpoints (for local testing with LocalStack)
    endpoint_url: Optional[str] = None
    
    # Default settings
    default_tags: Dict[str, str] = field(default_factory=lambda: {
        "ManagedBy": "boto3-automation",
        "Environment": "production",
        "CreatedAt": datetime.now().isoformat()
    })
    
    # Retry configuration
    max_retries: int = 3
    retry_delay: float = 1.0
    
    # Cost management
    enable_cost_optimization: bool = True
    max_monthly_budget: float = 1000.0
    
    @classmethod
    def from_env(cls):
        """Load configuration from environment variables."""
        return cls(
            access_key_id=os.getenv("AWS_ACCESS_KEY_ID"),
            secret_access_key=os.getenv("AWS_SECRET_ACCESS_KEY"),
            session_token=os.getenv("AWS_SESSION_TOKEN"),
            region=os.getenv("AWS_DEFAULT_REGION", "us-east-1")
        )

# ==================== AWS Session Manager ====================

class AWSSessionManager:
    """Manage AWS sessions and clients."""
    
    def __init__(self, config: AWSConfig):
        self.config = config
        self.session = self._create_session()
        self.clients = {}
        self.resources = {}
        self.logger = logging.getLogger(__name__)
    
    def _create_session(self) -> boto3.Session:
        """Create boto3 session."""
        session_kwargs = {
            'region_name': self.config.region
        }
        
        if self.config.access_key_id:
            session_kwargs['aws_access_key_id'] = self.config.access_key_id
        if self.config.secret_access_key:
            session_kwargs['aws_secret_access_key'] = self.config.secret_access_key
        if self.config.session_token:
            session_kwargs['aws_session_token'] = self.config.session_token
        
        return boto3.Session(**session_kwargs)
    
    def get_client(self, service_name: str) -> Any:
        """Get or create service client."""
        if service_name not in self.clients:
            client_kwargs = {}
            if self.config.endpoint_url:
                client_kwargs['endpoint_url'] = self.config.endpoint_url
            
            self.clients[service_name] = self.session.client(
                service_name,
                **client_kwargs
            )
        
        return self.clients[service_name]
    
    def get_resource(self, service_name: str) -> Any:
        """Get or create service resource."""
        if service_name not in self.resources:
            resource_kwargs = {}
            if self.config.endpoint_url:
                resource_kwargs['endpoint_url'] = self.config.endpoint_url
            
            self.resources[service_name] = self.session.resource(
                service_name,
                **resource_kwargs
            )
        
        return self.resources[service_name]
    
    def get_credentials(self) -> Dict[str, str]:
        """Get current credentials."""
        credentials = self.session.get_credentials()
        return {
            'access_key': credentials.access_key,
            'secret_key': credentials.secret_key,
            'token': credentials.token
        }
    
    def list_regions(self, service_name: str = 'ec2') -> List[str]:
        """List available regions for a service."""
        client = self.get_client(service_name)
        regions = client.describe_regions()
        return [region['RegionName'] for region in regions['Regions']]

# ==================== EC2 Manager ====================

class EC2Manager:
    """Manage EC2 instances and related resources."""
    
    def __init__(self, session_manager: AWSSessionManager):
        self.session_manager = session_manager
        self.ec2_client = session_manager.get_client('ec2')
        self.ec2_resource = session_manager.get_resource('ec2')
        self.config = session_manager.config
        self.logger = logging.getLogger(__name__)
    
    def create_instance(
        self,
        name: str,
        instance_type: str = "t2.micro",
        ami_id: Optional[str] = None,
        key_name: Optional[str] = None,
        security_group_ids: Optional[List[str]] = None,
        subnet_id: Optional[str] = None,
        user_data: Optional[str] = None,
        **kwargs
    ) -> Dict[str, Any]:
        """
        Create EC2 instance.
        
        Args:
            name: Instance name tag
            instance_type: EC2 instance type
            ami_id: AMI ID (uses latest Amazon Linux 2 if not specified)
            key_name: SSH key pair name
            security_group_ids: Security group IDs
            subnet_id: Subnet ID for VPC
            user_data: User data script
            
        Returns:
            Instance information dictionary
        """
        try:
            # Get latest Amazon Linux 2 AMI if not specified
            if not ami_id:
                ami_id = self._get_latest_amazon_linux_ami()
            
            # Prepare instance parameters
            run_params = {
                'ImageId': ami_id,
                'InstanceType': instance_type,
                'MinCount': 1,
                'MaxCount': 1,
                'TagSpecifications': [{
                    'ResourceType': 'instance',
                    'Tags': [
                        {'Key': 'Name', 'Value': name},
                        *[{'Key': k, 'Value': v} for k, v in self.config.default_tags.items()]
                    ]
                }]
            }
            
            if key_name:
                run_params['KeyName'] = key_name
            if security_group_ids:
                run_params['SecurityGroupIds'] = security_group_ids
            if subnet_id:
                run_params['SubnetId'] = subnet_id
            if user_data:
                run_params['UserData'] = user_data
            
            # Add additional parameters
            run_params.update(kwargs)
            
            # Create instance
            response = self.ec2_client.run_instances(**run_params)
            instance = response['Instances'][0]
            instance_id = instance['InstanceId']
            
            self.logger.info(f"Created EC2 instance: {instance_id} ({name})")
            
            # Wait for instance to be running
            waiter = self.ec2_client.get_waiter('instance_running')
            waiter.wait(InstanceIds=[instance_id])
            
            # Get updated instance info
            instance_info = self.get_instance_info(instance_id)
            
            return instance_info
            
        except ClientError as e:
            self.logger.error(f"Failed to create EC2 instance: {e}")
            raise
    
    def _get_latest_amazon_linux_ami(self) -> str:
        """Get the latest Amazon Linux 2 AMI ID."""
        response = self.ec2_client.describe_images(
            Owners=['amazon'],
            Filters=[
                {'Name': 'name', 'Values': ['amzn2-ami-hvm-*-x86_64-gp2']},
                {'Name': 'state', 'Values': ['available']}
            ]
        )
        
        # Sort by creation date and get the latest
        images = sorted(response['Images'], key=lambda x: x['CreationDate'], reverse=True)
        
        if images:
            return images[0]['ImageId']
        else:
            raise ValueError("No Amazon Linux 2 AMI found")
    
    def list_instances(self, filters: Optional[List[Dict]] = None) -> List[Dict[str, Any]]:
        """List EC2 instances."""
        try:
            params = {}
            if filters:
                params['Filters'] = filters
            
            response = self.ec2_client.describe_instances(**params)
            
            instances = []
            for reservation in response['Reservations']:
                for instance in reservation['Instances']:
                    instances.append({
                        'InstanceId': instance['InstanceId'],
                        'Name': self._get_tag_value(instance.get('Tags', []), 'Name'),
                        'State': instance['State']['Name'],
                        'InstanceType': instance['InstanceType'],
                        'PublicIpAddress': instance.get('PublicIpAddress'),
                        'PrivateIpAddress': instance.get('PrivateIpAddress'),
                        'LaunchTime': instance['LaunchTime'].isoformat(),
                        'Tags': {tag['Key']: tag['Value'] for tag in instance.get('Tags', [])}
                    })
            
            return instances
            
        except ClientError as e:
            self.logger.error(f"Failed to list EC2 instances: {e}")
            raise
    
    def get_instance_info(self, instance_id: str) -> Dict[str, Any]:
        """Get detailed instance information."""
        try:
            response = self.ec2_client.describe_instances(InstanceIds=[instance_id])
            
            if response['Reservations']:
                instance = response['Reservations'][0]['Instances'][0]
                return {
                    'InstanceId': instance['InstanceId'],
                    'Name': self._get_tag_value(instance.get('Tags', []), 'Name'),
                    'State': instance['State']['Name'],
                    'InstanceType': instance['InstanceType'],
                    'PublicIpAddress': instance.get('PublicIpAddress'),
                    'PrivateIpAddress': instance.get('PrivateIpAddress'),
                    'PublicDnsName': instance.get('PublicDnsName'),
                    'PrivateDnsName': instance.get('PrivateDnsName'),
                    'LaunchTime': instance['LaunchTime'].isoformat(),
                    'SecurityGroups': instance.get('SecurityGroups', []),
                    'SubnetId': instance.get('SubnetId'),
                    'VpcId': instance.get('VpcId'),
                    'Tags': {tag['Key']: tag['Value'] for tag in instance.get('Tags', [])}
                }
            
            return {}
            
        except ClientError as e:
            self.logger.error(f"Failed to get instance info: {e}")
            raise
    
    def stop_instance(self, instance_id: str, force: bool = False) -> bool:
        """Stop EC2 instance."""
        try:
            self.ec2_client.stop_instances(InstanceIds=[instance_id], Force=force)
            
            # Wait for instance to stop
            waiter = self.ec2_client.get_waiter('instance_stopped')
            waiter.wait(InstanceIds=[instance_id])
            
            self.logger.info(f"Stopped EC2 instance: {instance_id}")
            return True
            
        except ClientError as e:
            self.logger.error(f"Failed to stop instance: {e}")
            return False
    
    def start_instance(self, instance_id: str) -> bool:
        """Start EC2 instance."""
        try:
            self.ec2_client.start_instances(InstanceIds=[instance_id])
            
            # Wait for instance to start
            waiter = self.ec2_client.get_waiter('instance_running')
            waiter.wait(InstanceIds=[instance_id])
            
            self.logger.info(f"Started EC2 instance: {instance_id}")
            return True
            
        except ClientError as e:
            self.logger.error(f"Failed to start instance: {e}")
            return False
    
    def terminate_instance(self, instance_id: str) -> bool:
        """Terminate EC2 instance."""
        try:
            self.ec2_client.terminate_instances(InstanceIds=[instance_id])
            
            self.logger.info(f"Terminated EC2 instance: {instance_id}")
            return True
            
        except ClientError as e:
            self.logger.error(f"Failed to terminate instance: {e}")
            return False
    
    def create_security_group(
        self,
        name: str,
        description: str,
        vpc_id: Optional[str] = None,
        ingress_rules: Optional[List[Dict]] = None
    ) -> str:
        """Create security group."""
        try:
            params = {
                'GroupName': name,
                'Description': description
            }
            
            if vpc_id:
                params['VpcId'] = vpc_id
            
            response = self.ec2_client.create_security_group(**params)
            group_id = response['GroupId']
            
            # Add ingress rules if specified
            if ingress_rules:
                self.ec2_client.authorize_security_group_ingress(
                    GroupId=group_id,
                    IpPermissions=ingress_rules
                )
            
            self.logger.info(f"Created security group: {group_id} ({name})")
            return group_id
            
        except ClientError as e:
            self.logger.error(f"Failed to create security group: {e}")
            raise
    
    def _get_tag_value(self, tags: List[Dict], key: str) -> Optional[str]:
        """Get tag value by key."""
        for tag in tags:
            if tag['Key'] == key:
                return tag['Value']
        return None

# ==================== S3 Manager ====================

class S3Manager:
    """Manage S3 buckets and objects."""
    
    def __init__(self, session_manager: AWSSessionManager):
        self.session_manager = session_manager
        self.s3_client = session_manager.get_client('s3')
        self.s3_resource = session_manager.get_resource('s3')
        self.config = session_manager.config
        self.logger = logging.getLogger(__name__)
    
    def create_bucket(
        self,
        bucket_name: str,
        region: Optional[str] = None,
        versioning: bool = False,
        encryption: bool = True,
        public_access_block: bool = True
    ) -> bool:
        """
        Create S3 bucket with security best practices.
        
        Args:
            bucket_name: Name of the bucket
            region: AWS region (uses default if not specified)
            versioning: Enable versioning
            encryption: Enable server-side encryption
            public_access_block: Block public access
            
        Returns:
            True if successful
        """
        try:
            # Create bucket
            create_params = {'Bucket': bucket_name}
            
            # Add location constraint for regions other than us-east-1
            region = region or self.config.region
            if region != 'us-east-1':
                create_params['CreateBucketConfiguration'] = {
                    'LocationConstraint': region
                }
            
            self.s3_client.create_bucket(**create_params)
            
            # Enable versioning if requested
            if versioning:
                self.s3_client.put_bucket_versioning(
                    Bucket=bucket_name,
                    VersioningConfiguration={'Status': 'Enabled'}
                )
            
            # Enable encryption if requested
            if encryption:
                self.s3_client.put_bucket_encryption(
                    Bucket=bucket_name,
                    ServerSideEncryptionConfiguration={
                        'Rules': [{
                            'ApplyServerSideEncryptionByDefault': {
                                'SSEAlgorithm': 'AES256'
                            }
                        }]
                    }
                )
            
            # Block public access if requested
            if public_access_block:
                self.s3_client.put_public_access_block(
                    Bucket=bucket_name,
                    PublicAccessBlockConfiguration={
                        'BlockPublicAcls': True,
                        'IgnorePublicAcls': True,
                        'BlockPublicPolicy': True,
                        'RestrictPublicBuckets': True
                    }
                )
            
            # Add tags
            self.s3_client.put_bucket_tagging(
                Bucket=bucket_name,
                Tagging={
                    'TagSet': [
                        {'Key': k, 'Value': v}
                        for k, v in self.config.default_tags.items()
                    ]
                }
            )
            
            self.logger.info(f"Created S3 bucket: {bucket_name}")
            return True
            
        except ClientError as e:
            if e.response['Error']['Code'] == 'BucketAlreadyExists':
                self.logger.warning(f"Bucket already exists: {bucket_name}")
            else:
                self.logger.error(f"Failed to create bucket: {e}")
            return False
    
    def upload_file(
        self,
        file_path: str,
        bucket_name: str,
        object_key: Optional[str] = None,
        metadata: Optional[Dict] = None,
        content_type: Optional[str] = None
    ) -> bool:
        """Upload file to S3."""
        try:
            file_path = Path(file_path)
            
            if not file_path.exists():
                raise FileNotFoundError(f"File not found: {file_path}")
            
            # Use filename as key if not specified
            if not object_key:
                object_key = file_path.name
            
            # Prepare upload parameters
            extra_args = {}
            
            if metadata:
                extra_args['Metadata'] = metadata
            
            if content_type:
                extra_args['ContentType'] = content_type
            
            # Upload file
            self.s3_client.upload_file(
                str(file_path),
                bucket_name,
                object_key,
                ExtraArgs=extra_args if extra_args else None
            )
            
            self.logger.info(f"Uploaded {file_path} to s3://{bucket_name}/{object_key}")
            return True
            
        except Exception as e:
            self.logger.error(f"Failed to upload file: {e}")
            return False
    
    def download_file(
        self,
        bucket_name: str,
        object_key: str,
        file_path: str
    ) -> bool:
        """Download file from S3."""
        try:
            self.s3_client.download_file(bucket_name, object_key, file_path)
            
            self.logger.info(f"Downloaded s3://{bucket_name}/{object_key} to {file_path}")
            return True
            
        except ClientError as e:
            self.logger.error(f"Failed to download file: {e}")
            return False
    
    def list_objects(
        self,
        bucket_name: str,
        prefix: Optional[str] = None,
        max_keys: int = 1000
    ) -> List[Dict[str, Any]]:
        """List objects in bucket."""
        try:
            params = {
                'Bucket': bucket_name,
                'MaxKeys': max_keys
            }
            
            if prefix:
                params['Prefix'] = prefix
            
            response = self.s3_client.list_objects_v2(**params)
            
            objects = []
            for obj in response.get('Contents', []):
                objects.append({
                    'Key': obj['Key'],
                    'Size': obj['Size'],
                    'LastModified': obj['LastModified'].isoformat(),
                    'ETag': obj['ETag'].strip('"')
                })
            
            return objects
            
        except ClientError as e:
            self.logger.error(f"Failed to list objects: {e}")
            return []
    
    def delete_object(self, bucket_name: str, object_key: str) -> bool:
        """Delete object from S3."""
        try:
            self.s3_client.delete_object(Bucket=bucket_name, Key=object_key)
            
            self.logger.info(f"Deleted s3://{bucket_name}/{object_key}")
            return True
            
        except ClientError as e:
            self.logger.error(f"Failed to delete object: {e}")
            return False
    
    def generate_presigned_url(
        self,
        bucket_name: str,
        object_key: str,
        expiration: int = 3600
    ) -> Optional[str]:
        """Generate presigned URL for object."""
        try:
            url = self.s3_client.generate_presigned_url(
                'get_object',
                Params={'Bucket': bucket_name, 'Key': object_key},
                ExpiresIn=expiration
            )
            
            return url
            
        except ClientError as e:
            self.logger.error(f"Failed to generate presigned URL: {e}")
            return None

# ==================== Lambda Manager ====================

class LambdaManager:
    """Manage Lambda functions."""
    
    def __init__(self, session_manager: AWSSessionManager):
        self.session_manager = session_manager
        self.lambda_client = session_manager.get_client('lambda')
        self.iam_client = session_manager.get_client('iam')
        self.config = session_manager.config
        self.logger = logging.getLogger(__name__)
    
    def create_function(
        self,
        function_name: str,
        runtime: str,
        handler: str,
        code_zip_path: str,
        role_arn: Optional[str] = None,
        memory_size: int = 128,
        timeout: int = 30,
        environment_vars: Optional[Dict] = None
    ) -> Dict[str, Any]:
        """
        Create Lambda function.
        
        Args:
            function_name: Name of the function
            runtime: Runtime (e.g., 'python3.9')
            handler: Handler (e.g., 'lambda_function.lambda_handler')
            code_zip_path: Path to deployment package
            role_arn: IAM role ARN (creates basic role if not provided)
            memory_size: Memory in MB (128-10240)
            timeout: Timeout in seconds (1-900)
            environment_vars: Environment variables
            
        Returns:
            Function configuration
        """
        try:
            # Create basic execution role if not provided
            if not role_arn:
                role_arn = self._create_lambda_execution_role(function_name)
                # Wait for role to be available
                time.sleep(10)
            
            # Read deployment package
            with open(code_zip_path, 'rb') as f:
                zip_content = f.read()
            
            # Prepare function configuration
            config = {
                'FunctionName': function_name,
                'Runtime': runtime,
                'Role': role_arn,
                'Handler': handler,
                'Code': {'ZipFile': zip_content},
                'MemorySize': memory_size,
                'Timeout': timeout,
                'Tags': self.config.default_tags
            }
            
            if environment_vars:
                config['Environment'] = {'Variables': environment_vars}
            
            # Create function
            response = self.lambda_client.create_function(**config)
            
            self.logger.info(f"Created Lambda function: {function_name}")
            
            return {
                'FunctionName': response['FunctionName'],
                'FunctionArn': response['FunctionArn'],
                'Runtime': response['Runtime'],
                'Handler': response['Handler'],
                'MemorySize': response['MemorySize'],
                'Timeout': response['Timeout'],
                'State': response['State']
            }
            
        except ClientError as e:
            self.logger.error(f"Failed to create Lambda function: {e}")
            raise
    
    def _create_lambda_execution_role(self, function_name: str) -> str:
        """Create basic Lambda execution role."""
        role_name = f"{function_name}-execution-role"
        
        trust_policy = {
            "Version": "2012-10-17",
            "Statement": [{
                "Effect": "Allow",
                "Principal": {"Service": "lambda.amazonaws.com"},
                "Action": "sts:AssumeRole"
            }]
        }
        
        try:
            # Create role
            response = self.iam_client.create_role(
                RoleName=role_name,
                AssumeRolePolicyDocument=json.dumps(trust_policy),
                Tags=[
                    {'Key': k, 'Value': v}
                    for k, v in self.config.default_tags.items()
                ]
            )
            
            # Attach basic execution policy
            self.iam_client.attach_role_policy(
                RoleName=role_name,
                PolicyArn='arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole'
            )
            
            return response['Role']['Arn']
            
        except ClientError as e:
            if e.response['Error']['Code'] == 'EntityAlreadyExists':
                # Role already exists, get its ARN
                response = self.iam_client.get_role(RoleName=role_name)
                return response['Role']['Arn']
            raise
    
    def invoke_function(
        self,
        function_name: str,
        payload: Optional[Dict] = None,
        invocation_type: str = 'RequestResponse'
    ) -> Dict[str, Any]:
        """
        Invoke Lambda function.
        
        Args:
            function_name: Name of the function
            payload: Input payload
            invocation_type: 'RequestResponse' (sync) or 'Event' (async)
            
        Returns:
            Response from function
        """
        try:
            params = {
                'FunctionName': function_name,
                'InvocationType': invocation_type
            }
            
            if payload:
                params['Payload'] = json.dumps(payload)
            
            response = self.lambda_client.invoke(**params)
            
            result = {
                'StatusCode': response['StatusCode'],
                'ExecutedVersion': response.get('ExecutedVersion'),
                'LogResult': response.get('LogResult')
            }
            
            if 'Payload' in response:
                result['Payload'] = json.loads(response['Payload'].read())
            
            return result
            
        except ClientError as e:
            self.logger.error(f"Failed to invoke function: {e}")
            raise
    
    def list_functions(self) -> List[Dict[str, Any]]:
        """List Lambda functions."""
        try:
            response = self.lambda_client.list_functions()
            
            functions = []
            for func in response['Functions']:
                functions.append({
                    'FunctionName': func['FunctionName'],
                    'FunctionArn': func['FunctionArn'],
                    'Runtime': func['Runtime'],
                    'Handler': func['Handler'],
                    'CodeSize': func['CodeSize'],
                    'MemorySize': func['MemorySize'],
                    'Timeout': func['Timeout'],
                    'LastModified': func['LastModified']
                })
            
            return functions
            
        except ClientError as e:
            self.logger.error(f"Failed to list functions: {e}")
            return []
    
    def update_function_code(
        self,
        function_name: str,
        code_zip_path: str
    ) -> bool:
        """Update function code."""
        try:
            with open(code_zip_path, 'rb') as f:
                zip_content = f.read()
            
            self.lambda_client.update_function_code(
                FunctionName=function_name,
                ZipFile=zip_content
            )
            
            self.logger.info(f"Updated code for function: {function_name}")
            return True
            
        except ClientError as e:
            self.logger.error(f"Failed to update function code: {e}")
            return False
    
    def delete_function(self, function_name: str) -> bool:
        """Delete Lambda function."""
        try:
            self.lambda_client.delete_function(FunctionName=function_name)
            
            self.logger.info(f"Deleted Lambda function: {function_name}")
            return True
            
        except ClientError as e:
            self.logger.error(f"Failed to delete function: {e}")
            return False

# ==================== DynamoDB Manager ====================

class DynamoDBManager:
    """Manage DynamoDB tables."""
    
    def __init__(self, session_manager: AWSSessionManager):
        self.session_manager = session_manager
        self.dynamodb_client = session_manager.get_client('dynamodb')
        self.dynamodb_resource = session_manager.get_resource('dynamodb')
        self.config = session_manager.config
        self.logger = logging.getLogger(__name__)
    
    def create_table(
        self,
        table_name: str,
        partition_key: Tuple[str, str],  # (name, type)
        sort_key: Optional[Tuple[str, str]] = None,
        read_capacity: int = 5,
        write_capacity: int = 5,
        billing_mode: str = 'PROVISIONED'
    ) -> bool:
        """
        Create DynamoDB table.
        
        Args:
            table_name: Name of the table
            partition_key: Tuple of (key_name, key_type) e.g., ('id', 'S')
            sort_key: Optional tuple of (key_name, key_type)
            read_capacity: Read capacity units
            write_capacity: Write capacity units
            billing_mode: 'PROVISIONED' or 'PAY_PER_REQUEST'
            
        Returns:
            True if successful
        """
        try:
            # Prepare key schema
            key_schema = [
                {
                    'AttributeName': partition_key[0],
                    'KeyType': 'HASH'
                }
            ]
            
            # Prepare attribute definitions
            attribute_definitions = [
                {
                    'AttributeName': partition_key[0],
                    'AttributeType': partition_key[1]
                }
            ]
            
            if sort_key:
                key_schema.append({
                    'AttributeName': sort_key[0],
                    'KeyType': 'RANGE'
                })
                attribute_definitions.append({
                    'AttributeName': sort_key[0],
                    'AttributeType': sort_key[1]
                })
            
            # Create table
            params = {
                'TableName': table_name,
                'KeySchema': key_schema,
                'AttributeDefinitions': attribute_definitions,
                'BillingMode': billing_mode,
                'Tags': [
                    {'Key': k, 'Value': v}
                    for k, v in self.config.default_tags.items()
                ]
            }
            
            if billing_mode == 'PROVISIONED':
                params['ProvisionedThroughput'] = {
                    'ReadCapacityUnits': read_capacity,
                    'WriteCapacityUnits': write_capacity
                }
            
            self.dynamodb_client.create_table(**params)
            
            # Wait for table to be active
            waiter = self.dynamodb_client.get_waiter('table_exists')
            waiter.wait(TableName=table_name)
            
            self.logger.info(f"Created DynamoDB table: {table_name}")
            return True
            
        except ClientError as e:
            self.logger.error(f"Failed to create table: {e}")
            return False
    
    def put_item(self, table_name: str, item: Dict[str, Any]) -> bool:
        """Put item in table."""
        try:
            table = self.dynamodb_resource.Table(table_name)
            table.put_item(Item=item)
            
            return True
            
        except ClientError as e:
            self.logger.error(f"Failed to put item: {e}")
            return False
    
    def get_item(
        self,
        table_name: str,
        key: Dict[str, Any]
    ) -> Optional[Dict[str, Any]]:
        """Get item from table."""
        try:
            table = self.dynamodb_resource.Table(table_name)
            response = table.get_item(Key=key)
            
            return response.get('Item')
            
        except ClientError as e:
            self.logger.error(f"Failed to get item: {e}")
            return None
    
    def query(
        self,
        table_name: str,
        key_condition: str,
        expression_values: Dict[str, Any],
        limit: Optional[int] = None
    ) -> List[Dict[str, Any]]:
        """Query table."""
        try:
            table = self.dynamodb_resource.Table(table_name)
            
            params = {
                'KeyConditionExpression': key_condition,
                'ExpressionAttributeValues': expression_values
            }
            
            if limit:
                params['Limit'] = limit
            
            response = table.query(**params)
            
            return response.get('Items', [])
            
        except ClientError as e:
            self.logger.error(f"Failed to query table: {e}")
            return []

# ==================== CloudWatch Manager ====================

class CloudWatchManager:
    """Manage CloudWatch metrics and alarms."""
    
    def __init__(self, session_manager: AWSSessionManager):
        self.session_manager = session_manager
        self.cloudwatch_client = session_manager.get_client('cloudwatch')
        self.logs_client = session_manager.get_client('logs')
        self.config = session_manager.config
        self.logger = logging.getLogger(__name__)
    
    def put_metric(
        self,
        namespace: str,
        metric_name: str,
        value: float,
        unit: str = 'None',
        dimensions: Optional[List[Dict]] = None
    ) -> bool:
        """Put custom metric to CloudWatch."""
        try:
            params = {
                'Namespace': namespace,
                'MetricData': [{
                    'MetricName': metric_name,
                    'Value': value,
                    'Unit': unit,
                    'Timestamp': datetime.now()
                }]
            }
            
            if dimensions:
                params['MetricData'][0]['Dimensions'] = dimensions
            
            self.cloudwatch_client.put_metric_data(**params)
            
            return True
            
        except ClientError as e:
            self.logger.error(f"Failed to put metric: {e}")
            return False
    
    def create_alarm(
        self,
        alarm_name: str,
        metric_name: str,
        namespace: str,
        threshold: float,
        comparison_operator: str = 'GreaterThanThreshold',
        evaluation_periods: int = 1,
        period: int = 300,
        statistic: str = 'Average'
    ) -> bool:
        """Create CloudWatch alarm."""
        try:
            self.cloudwatch_client.put_metric_alarm(
                AlarmName=alarm_name,
                ComparisonOperator=comparison_operator,
                EvaluationPeriods=evaluation_periods,
                MetricName=metric_name,
                Namespace=namespace,
                Period=period,
                Statistic=statistic,
                Threshold=threshold,
                ActionsEnabled=True,
                AlarmDescription=f'Alarm for {metric_name}',
                Tags=[
                    {'Key': k, 'Value': v}
                    for k, v in self.config.default_tags.items()
                ]
            )
            
            self.logger.info(f"Created CloudWatch alarm: {alarm_name}")
            return True
            
        except ClientError as e:
            self.logger.error(f"Failed to create alarm: {e}")
            return False

# ==================== Main AWS Automation Class ====================

class AWSAutomation:
    """High-level AWS automation interface."""
    
    def __init__(self, config: Optional[AWSConfig] = None):
        self.config = config or AWSConfig.from_env()
        self.session_manager = AWSSessionManager(self.config)
        
        # Initialize service managers
        self.ec2 = EC2Manager(self.session_manager)
        self.s3 = S3Manager(self.session_manager)
        self.lambda_mgr = LambdaManager(self.session_manager)
        self.dynamodb = DynamoDBManager(self.session_manager)
        self.cloudwatch = CloudWatchManager(self.session_manager)
        
        self.logger = logging.getLogger(__name__)
        
        # Set up logging
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
        )
    
    def get_account_info(self) -> Dict[str, Any]:
        """Get AWS account information."""
        try:
            sts_client = self.session_manager.get_client('sts')
            response = sts_client.get_caller_identity()
            
            return {
                'Account': response['Account'],
                'UserId': response['UserId'],
                'Arn': response['Arn'],
                'Region': self.config.region
            }
            
        except ClientError as e:
            self.logger.error(f"Failed to get account info: {e}")
            return {}
    
    def estimate_monthly_cost(self) -> float:
        """Estimate monthly AWS costs."""
        try:
            ce_client = self.session_manager.get_client('ce')
            
            # Get cost for current month
            start_date = datetime.now().replace(day=1).strftime('%Y-%m-%d')
            end_date = datetime.now().strftime('%Y-%m-%d')
            
            response = ce_client.get_cost_and_usage(
                TimePeriod={
                    'Start': start_date,
                    'End': end_date
                },
                Granularity='MONTHLY',
                Metrics=['UnblendedCost']
            )
            
            if response['ResultsByTime']:
                cost = float(response['ResultsByTime'][0]['Total']['UnblendedCost']['Amount'])
                return cost
            
            return 0.0
            
        except ClientError as e:
            self.logger.error(f"Failed to estimate cost: {e}")
            return 0.0

# Example usage
if __name__ == "__main__":
    print("ā˜ļø AWS SDK (boto3) Examples\n")
    
    # Example 1: Initialize AWS automation
    print("1ļøāƒ£ Initializing AWS Automation:")
    
    config = AWSConfig.from_env()
    aws = AWSAutomation(config)
    
    # Get account info
    account_info = aws.get_account_info()
    if account_info:
        print(f"   Account: {account_info.get('Account', 'N/A')}")
        print(f"   Region: {account_info.get('Region', 'N/A')}")
        print(f"   User: {account_info.get('UserId', 'N/A')}")
    
    # Example 2: EC2 operations
    print("\n2ļøāƒ£ EC2 Instance Operations:")
    
    print("   Creating instance...")
    print("   instance = aws.ec2.create_instance(")
    print("       name='web-server',")
    print("       instance_type='t2.micro',")
    print("       key_name='my-key'")
    print("   )")
    
    # List instances
    instances = aws.ec2.list_instances()
    print(f"\n   Active instances: {len(instances)}")
    for instance in instances[:3]:  # Show first 3
        print(f"     • {instance.get('Name', 'Unnamed')} ({instance['InstanceId']}) - {instance['State']}")
    
    # Example 3: S3 operations
    print("\n3ļøāƒ£ S3 Storage Operations:")
    
    print("   Creating bucket...")
    print("   aws.s3.create_bucket(")
    print("       bucket_name='my-app-data',")
    print("       versioning=True,")
    print("       encryption=True")
    print("   )")
    
    print("\n   Uploading file...")
    print("   aws.s3.upload_file(")
    print("       file_path='data.csv',")
    print("       bucket_name='my-app-data',")
    print("       object_key='data/2024/data.csv'")
    print("   )")
    
    # Example 4: Lambda operations
    print("\n4ļøāƒ£ Lambda Function Operations:")
    
    print("   Creating function...")
    print("   aws.lambda_mgr.create_function(")
    print("       function_name='data-processor',")
    print("       runtime='python3.9',")
    print("       handler='lambda_function.handler',")
    print("       code_zip_path='function.zip'")
    print("   )")
    
    # Example 5: DynamoDB operations
    print("\n5ļøāƒ£ DynamoDB Operations:")
    
    print("   Creating table...")
    print("   aws.dynamodb.create_table(")
    print("       table_name='users',")
    print("       partition_key=('user_id', 'S'),")
    print("       sort_key=('created_at', 'N')")
    print("   )")
    
    # Example 6: Common boto3 patterns
    print("\n6ļøāƒ£ Common boto3 Patterns:")
    
    patterns = [
        ("Client vs Resource", "Low-level vs High-level API"),
        ("Pagination", "Handle large result sets"),
        ("Waiters", "Wait for resource state changes"),
        ("Exception handling", "Handle ClientError properly"),
        ("Retry logic", "Implement exponential backoff"),
        ("Batch operations", "Process items in batches")
    ]
    
    for pattern, description in patterns:
        print(f"   {pattern}: {description}")
    
    # Example 7: Cost optimization
    print("\n7ļøāƒ£ Cost Optimization:")
    
    tips = [
        "Use appropriate instance types",
        "Enable S3 lifecycle policies",
        "Use Lambda for serverless workloads",
        "Implement auto-scaling",
        "Delete unused resources",
        "Use Reserved Instances for steady workloads",
        "Enable CloudWatch alarms for billing"
    ]
    
    for tip in tips:
        print(f"   • {tip}")
    
    # Example 8: Security best practices
    print("\n8ļøāƒ£ Security Best Practices:")
    
    practices = [
        "šŸ” Never hardcode credentials",
        "šŸ”‘ Use IAM roles instead of keys",
        "šŸ›”ļø Enable MFA for production",
        "šŸ“ Follow least privilege principle",
        "šŸ”’ Encrypt data at rest and in transit",
        "šŸ“Š Enable CloudTrail logging",
        "🚨 Set up security alerts",
        "šŸ”„ Rotate credentials regularly"
    ]
    
    for practice in practices:
        print(f"   {practice}")
    
    # Example 9: Error handling
    print("\n9ļøāƒ£ Error Handling:")
    
    print("   try:")
    print("       response = ec2_client.describe_instances()")
    print("   except ClientError as e:")
    print("       error_code = e.response['Error']['Code']")
    print("       if error_code == 'UnauthorizedOperation':")
    print("           print('Permission denied')")
    print("       else:")
    print("           raise")
    
    # Example 10: Resource tagging
    print("\nšŸ”Ÿ Resource Tagging Strategy:")
    
    tags = [
        ("Environment", "dev/staging/prod"),
        ("Project", "Project name"),
        ("Owner", "Team or person"),
        ("CostCenter", "Billing department"),
        ("CreatedBy", "Automation tool"),
        ("CreatedAt", "Timestamp")
    ]
    
    for tag, value in tags:
        print(f"   {tag}: {value}")
    
    print("\nāœ… AWS boto3 demonstration complete!")
    print("\nšŸ“ Note: To use this code, you need:")
    print("   1. AWS Account and credentials")
    print("   2. Configure AWS CLI: aws configure")
    print("   3. Set environment variables or use IAM roles")
    print("   4. Install boto3: pip install boto3")

Key Takeaways and Best Practices šŸŽÆ

AWS Automation Best Practices šŸ“‹

Pro Tip: Think of boto3 as your Swiss Army knife for AWS - it can do everything, but you need to use it wisely. Always start with proper credential management: use IAM roles on EC2, environment variables locally, and never commit credentials to version control. Understand the difference between clients (low-level, full control) and resources (high-level, convenient) - use resources for simple operations and clients for advanced features. Implement comprehensive error handling - AWS operations can fail for many reasons (permissions, limits, network). Always tag your resources consistently for cost tracking and organization. Use waiters when creating resources to ensure they're ready before using them. Implement pagination for list operations to handle large result sets. Monitor your AWS costs regularly and set up billing alerts. Use CloudWatch for monitoring and logging. Test your automation in a development account before running in production. Most importantly: always follow the principle of least privilege - grant only the minimum permissions necessary for your automation to function!

Mastering AWS automation with boto3 enables you to harness the full power of cloud computing programmatically. You can now provision infrastructure, deploy applications, manage storage, process data, and orchestrate complex cloud workflows all through Python code. Whether you're building scalable web applications, data pipelines, or enterprise systems, these AWS automation skills empower you to leverage the cloud effectively! šŸš€