💾 Cloud Storage Management: Master Data in the Cloud

Cloud storage is the backbone of modern applications: it provides elastically scalable, globally accessible, and highly durable storage for everything from static websites to massive data lakes. Like managing a vast digital warehouse that spans the globe, mastering cloud storage means understanding object storage, file systems, databases, caching strategies, and data lifecycle management. Whether you use AWS S3, Google Cloud Storage, or Azure Blob Storage, these skills let you store, organize, and retrieve data efficiently at any scale. Let's explore the comprehensive world of cloud storage management!

The Cloud Storage Architecture

Think of cloud storage as a hierarchy of solutions optimized for different use cases. From hot storage for frequently accessed data to cold archives for long-term retention, each tier offers different performance, availability, and cost characteristics. Using services like S3 for objects, EFS for file systems, DynamoDB for NoSQL, and RDS for relational data, you can build storage architectures that grow with your data while optimizing for both performance and cost. Understanding these storage patterns is essential for modern cloud applications!
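To make the tiering idea concrete, here is a minimal sketch of how an application might pick a tier from how long an object has sat idle. The thresholds (30, 90, and 365 days) and the name `choose_storage_class` are illustrative assumptions, not provider rules; the returned strings follow the S3 storage-class identifiers.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional

# Hypothetical idle-time thresholds, chosen only for illustration.
# Ordered from coldest to warmest so the first match wins.
TIER_THRESHOLDS = [
    (timedelta(days=365), "DEEP_ARCHIVE"),
    (timedelta(days=90), "GLACIER"),
    (timedelta(days=30), "STANDARD_IA"),
]


def choose_storage_class(last_accessed: datetime, now: Optional[datetime] = None) -> str:
    """Map the time since last access to a storage class name."""
    now = now or datetime.now(timezone.utc)
    idle = now - last_accessed
    for threshold, storage_class in TIER_THRESHOLDS:
        if idle >= threshold:
            return storage_class
    # Recently accessed data stays in the hot tier.
    return "STANDARD"
```

In practice the providers can apply rules like this for you through lifecycle policies, so a function of this kind is mainly useful for reasoning about where data should live.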

graph TB
    A[Cloud Storage Management] --> B[Storage Types]
    A --> C[Data Operations]
    A --> D[Security & Access]
    A --> E[Optimization]

    B --> F[Object Storage]
    B --> G[Block Storage]
    B --> H[File Storage]
    B --> I[Database Storage]

    C --> J[Upload/Download]
    C --> K[Sync & Backup]
    C --> L[Migration]
    C --> M[Replication]

    D --> N[Encryption]
    D --> O[Access Control]
    D --> P[Signed URLs]
    D --> Q[CORS]

    E --> R[Lifecycle Policies]
    E --> S[CDN Integration]
    E --> T[Cost Management]
    E --> U[Performance Tuning]

    V[Use Cases] --> W[Static Websites]
    V --> X[Data Lakes]
    V --> Y[Backup & Archive]
    V --> Z[Media Streaming]

    style A fill:#ff6b6b
    style B fill:#51cf66
    style C fill:#339af0
    style D fill:#ffd43b
    style E fill:#ff6b6b
    style V fill:#51cf66

Real-World Scenario: The Enterprise Cloud Storage Platform 🏢

You're building an enterprise storage platform that manages petabytes of data across multiple cloud providers, handles file uploads from millions of users, implements intelligent tiering for cost optimization, provides secure sharing with expiring links, syncs data across regions for disaster recovery, processes media files with automatic transcoding, maintains compliance with data retention policies, and integrates with CDNs for global content delivery. Your system must ensure 99.999999999% durability, optimize for both performance and cost, and provide comprehensive monitoring and analytics. Let's build a production-ready cloud storage framework!
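Before the framework itself, it helps to see what the eleven-nines durability target means in practice. The sketch below treats 99.999999999% as an annual per-object survival probability, which is how providers such as AWS describe the figure. The function name `expected_annual_losses` and the object count are illustrative assumptions.

```python
from decimal import Decimal


def expected_annual_losses(
    object_count: int,
    durability_percent: str = "99.999999999",
) -> Decimal:
    """Expected number of objects lost per year at a given durability."""
    # Decimal avoids the floating-point rounding that eleven nines would hit.
    loss_probability = (Decimal(100) - Decimal(durability_percent)) / Decimal(100)
    return Decimal(object_count) * loss_probability


losses = expected_annual_losses(10_000_000)
years_per_loss = 1 / losses
print(f"Expected losses per year: {losses}")
print(f"Roughly one object every {years_per_loss:,.0f} years")
```

For ten million stored objects this works out to an expected loss of about one object every 10,000 years. Note that this is a design target of the underlying service: application code cannot raise it, but it can lower it through accidental deletes, which is why versioning and replication still matter.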

# First, install required packages:
# pip install boto3 google-cloud-storage azure-storage-blob
# pip install minio watchdog pandas matplotlib tqdm
# pip install aiofiles aioboto3
# (asyncio and concurrent.futures ship with Python 3 and need no install)

import os
import json
import hashlib
import mimetypes
import asyncio
import threading
from typing import List, Dict, Optional, Any, Union, Tuple, BinaryIO
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from enum import Enum
from pathlib import Path
import logging
from concurrent.futures import ThreadPoolExecutor, as_completed
import io

# Cloud storage libraries
import boto3
from botocore.exceptions import ClientError
from google.cloud import storage as gcs
from azure.storage.blob import BlobServiceClient, BlobClient

# Async support
import aioboto3
import aiofiles

# Progress tracking
from tqdm import tqdm

# ==================== Storage Configuration ====================

@dataclass
class StorageConfig:
    """Cloud storage configuration."""
    provider: str = "aws"  # aws, gcp, azure, multi
    
    # AWS S3
    aws_region: str = "us-east-1"
    s3_bucket: Optional[str] = None
    
    # Google Cloud Storage
    gcp_project: Optional[str] = None
    gcs_bucket: Optional[str] = None
    
    # Azure Blob Storage
    azure_account: Optional[str] = None
    azure_container: Optional[str] = None
    
    # Storage classes
    default_storage_class: str = "STANDARD"
    archive_after_days: int = 90
    delete_after_days: int = 365
    
    # Performance
    multipart_threshold: int = 100 * 1024 * 1024  # 100MB
    multipart_chunksize: int = 10 * 1024 * 1024   # 10MB
    max_concurrent_uploads: int = 10
    
    # Security
    encryption: str = "AES256"
    enable_versioning: bool = True
    enable_mfa_delete: bool = False
    
    # CDN
    enable_cdn: bool = False
    cdn_domain: Optional[str] = None
    
    # Monitoring
    enable_metrics: bool = True
    enable_logging: bool = True

class StorageClass(Enum):
    """Storage class tiers."""
    STANDARD = "STANDARD"
    INFREQUENT_ACCESS = "STANDARD_IA"
    GLACIER = "GLACIER"
    DEEP_ARCHIVE = "DEEP_ARCHIVE"
    INTELLIGENT_TIERING = "INTELLIGENT_TIERING"

# ==================== Base Storage Manager ====================

class CloudStorageManager:
    """Base class for cloud storage management."""
    
    def __init__(self, config: StorageConfig):
        self.config = config
        self.logger = logging.getLogger(__name__)
        
    async def upload_file_async(
        self,
        file_path: str,
        key: str,
        metadata: Optional[Dict[str, str]] = None
    ) -> bool:
        """Upload file asynchronously."""
        raise NotImplementedError
    
    async def download_file_async(
        self,
        key: str,
        file_path: str
    ) -> bool:
        """Download file asynchronously."""
        raise NotImplementedError
    
    def list_objects(
        self,
        prefix: Optional[str] = None,
        max_keys: int = 1000
    ) -> List[Dict[str, Any]]:
        """List objects in storage."""
        raise NotImplementedError
    
    def delete_object(self, key: str) -> bool:
        """Delete object from storage."""
        raise NotImplementedError
    
    def generate_presigned_url(
        self,
        key: str,
        expiration: int = 3600
    ) -> str:
        """Generate presigned URL for object access."""
        raise NotImplementedError

# ==================== AWS S3 Storage Manager ====================

class S3StorageManager(CloudStorageManager):
    """AWS S3 storage management."""
    
    def __init__(self, config: StorageConfig):
        super().__init__(config)
        self.s3_client = boto3.client('s3', region_name=config.aws_region)
        self.s3_resource = boto3.resource('s3', region_name=config.aws_region)
        self.bucket_name = config.s3_bucket
        
    def create_bucket(
        self,
        bucket_name: Optional[str] = None,
        enable_versioning: bool = True,
        enable_encryption: bool = True,
        lifecycle_rules: Optional[List[Dict]] = None
    ) -> bool:
        """Create S3 bucket with best practices."""
        try:
            bucket_name = bucket_name or self.bucket_name
            
            # Create bucket
            if self.config.aws_region == 'us-east-1':
                self.s3_client.create_bucket(Bucket=bucket_name)
            else:
                self.s3_client.create_bucket(
                    Bucket=bucket_name,
                    CreateBucketConfiguration={'LocationConstraint': self.config.aws_region}
                )
            
            # Enable versioning
            if enable_versioning:
                self.s3_client.put_bucket_versioning(
                    Bucket=bucket_name,
                    VersioningConfiguration={'Status': 'Enabled'}
                )
            
            # Enable encryption
            if enable_encryption:
                self.s3_client.put_bucket_encryption(
                    Bucket=bucket_name,
                    ServerSideEncryptionConfiguration={
                        'Rules': [{
                            'ApplyServerSideEncryptionByDefault': {
                                'SSEAlgorithm': self.config.encryption
                            }
                        }]
                    }
                )
            
            # Block public access
            self.s3_client.put_public_access_block(
                Bucket=bucket_name,
                PublicAccessBlockConfiguration={
                    'BlockPublicAcls': True,
                    'IgnorePublicAcls': True,
                    'BlockPublicPolicy': True,
                    'RestrictPublicBuckets': True
                }
            )
            
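            # Add lifecycle rules: caller-supplied rules take precedence over config defaults
            if lifecycle_rules:
                self.s3_client.put_bucket_lifecycle_configuration(
                    Bucket=bucket_name,
                    LifecycleConfiguration={'Rules': lifecycle_rules}
                )
            elif self.config.archive_after_days or self.config.delete_after_days:
                self._setup_lifecycle_rules(bucket_name)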
            
            # Enable logging
            if self.config.enable_logging:
                self._enable_bucket_logging(bucket_name)
            
            # Enable metrics
            if self.config.enable_metrics:
                self._enable_bucket_metrics(bucket_name)
            
            self.logger.info(f"Created S3 bucket: {bucket_name}")
            return True
            
        except ClientError as e:
            self.logger.error(f"Failed to create bucket: {e}")
            return False
    
    def upload_file(
        self,
        file_path: str,
        key: Optional[str] = None,
        metadata: Optional[Dict[str, str]] = None,
        storage_class: StorageClass = StorageClass.STANDARD,
        progress_callback: Optional[callable] = None
    ) -> bool:
        """Upload file to S3 with progress tracking."""
        try:
            file_path = Path(file_path)
            if not file_path.exists():
                raise FileNotFoundError(f"File not found: {file_path}")
            
            # Use filename as key if not specified
            key = key or file_path.name
            
            # Get file size
            file_size = file_path.stat().st_size
            
            # Prepare upload parameters
            extra_args = {
                'StorageClass': storage_class.value,
                'ServerSideEncryption': self.config.encryption
            }
            
            if metadata:
                extra_args['Metadata'] = metadata
            
            # Set content type
            content_type, _ = mimetypes.guess_type(str(file_path))
            if content_type:
                extra_args['ContentType'] = content_type
            
            # Upload with progress
            if file_size > self.config.multipart_threshold:
                # Multipart upload for large files
                self._multipart_upload(file_path, key, extra_args, progress_callback)
            else:
                # Simple upload for small files
                with tqdm(total=file_size, unit='B', unit_scale=True, desc=f"Uploading {file_path.name}") as pbar:
                    def upload_callback(bytes_amount):
                        # boto3 passes the bytes sent since the previous call, not a running total
                        pbar.update(bytes_amount)
                        if progress_callback:
                            progress_callback(pbar.n, file_size)
                    
                    self.s3_client.upload_file(
                        str(file_path),
                        self.bucket_name,
                        key,
                        ExtraArgs=extra_args,
                        Callback=upload_callback
                    )
            
            self.logger.info(f"Uploaded {file_path} to s3://{self.bucket_name}/{key}")
            return True
            
        except Exception as e:
            self.logger.error(f"Upload failed: {e}")
            return False
    
    def _multipart_upload(
        self,
        file_path: Path,
        key: str,
        extra_args: Dict,
        progress_callback: Optional[callable] = None
    ):
        """Perform multipart upload for large files."""
        from boto3.s3.transfer import TransferConfig
        
        config = TransferConfig(
            multipart_threshold=self.config.multipart_threshold,
            multipart_chunksize=self.config.multipart_chunksize,
            max_concurrency=self.config.max_concurrent_uploads,
            use_threads=True
        )
        
        file_size = file_path.stat().st_size
        
        with tqdm(total=file_size, unit='B', unit_scale=True, desc=f"Uploading {file_path.name}") as pbar:
            def upload_callback(bytes_amount):
                # boto3 passes the bytes sent since the previous call, not a running total
                pbar.update(bytes_amount)
                if progress_callback:
                    progress_callback(pbar.n, file_size)
            
            self.s3_client.upload_file(
                str(file_path),
                self.bucket_name,
                key,
                ExtraArgs=extra_args,
                Config=config,
                Callback=upload_callback
            )
    
    async def upload_file_async(
        self,
        file_path: str,
        key: str,
        metadata: Optional[Dict[str, str]] = None
    ) -> bool:
        """Upload file asynchronously."""
        async with aioboto3.Session().client('s3', region_name=self.config.aws_region) as s3:
            try:
                async with aiofiles.open(file_path, 'rb') as f:
                    data = await f.read()
                
                extra_args = {'ServerSideEncryption': self.config.encryption}
                if metadata:
                    extra_args['Metadata'] = metadata
                
                await s3.put_object(
                    Bucket=self.bucket_name,
                    Key=key,
                    Body=data,
                    **extra_args
                )
                
                self.logger.info(f"Async uploaded {file_path} to s3://{self.bucket_name}/{key}")
                return True
                
            except Exception as e:
                self.logger.error(f"Async upload failed: {e}")
                return False
    
    def download_file(
        self,
        key: str,
        file_path: str,
        progress_callback: Optional[callable] = None
    ) -> bool:
        """Download file from S3 with progress tracking."""
        try:
            # Get object size
            response = self.s3_client.head_object(Bucket=self.bucket_name, Key=key)
            file_size = response['ContentLength']
            
            with tqdm(total=file_size, unit='B', unit_scale=True, desc=f"Downloading {key}") as pbar:
                def download_callback(bytes_amount):
                    # boto3 passes the bytes received since the previous call, not a running total
                    pbar.update(bytes_amount)
                    if progress_callback:
                        progress_callback(pbar.n, file_size)
                
                self.s3_client.download_file(
                    self.bucket_name,
                    key,
                    file_path,
                    Callback=download_callback
                )
            
            self.logger.info(f"Downloaded s3://{self.bucket_name}/{key} to {file_path}")
            return True
            
        except ClientError as e:
            self.logger.error(f"Download failed: {e}")
            return False
    
    def sync_directory(
        self,
        local_dir: str,
        s3_prefix: str,
        delete: bool = False,
        exclude: Optional[List[str]] = None
    ) -> Dict[str, int]:
        """Sync local directory with S3."""
        local_dir = Path(local_dir)
        stats = {'uploaded': 0, 'skipped': 0, 'deleted': 0}
        
        # Get local files
        local_files = {}
        for file_path in local_dir.rglob('*'):
            if file_path.is_file():
                relative_path = file_path.relative_to(local_dir)
                
                # Check exclusions
                if exclude and any(pattern in str(relative_path) for pattern in exclude):
                    continue
                
                # S3 keys always use forward slashes, so normalise Windows paths
                local_files[relative_path.as_posix()] = {
                    'path': file_path,
                    'size': file_path.stat().st_size,
                    'mtime': file_path.stat().st_mtime
                }
        
        # Get S3 objects (the trailing slash keeps sibling prefixes such as "data-old/" out)
        s3_objects = {}
        paginator = self.s3_client.get_paginator('list_objects_v2')
        prefix = f"{s3_prefix}/"
        
        for page in paginator.paginate(Bucket=self.bucket_name, Prefix=prefix):
            for obj in page.get('Contents', []):
                # Strip only the leading prefix, never a later occurrence inside the key
                key = obj['Key'][len(prefix):]
                s3_objects[key] = {
                    'size': obj['Size'],
                    'etag': obj['ETag'].strip('"')
                }
        
        # Upload new or modified files
        for relative_path, local_info in local_files.items():
            s3_key = f"{s3_prefix}/{relative_path}"
            
            if relative_path in s3_objects:
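                # Check if file needs update. The ETag equals the MD5 only for
                # single-part uploads without SSE-KMS; multipart objects never
                # match here and are simply uploaded again.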
                local_md5 = self._calculate_md5(local_info['path'])
                if local_md5 == s3_objects[relative_path]['etag']:
                    stats['skipped'] += 1
                    continue
            
            # Upload file
            if self.upload_file(str(local_info['path']), s3_key):
                stats['uploaded'] += 1
        
        # Delete removed files if requested
        if delete:
            for s3_key in s3_objects:
                if s3_key not in local_files:
                    if self.delete_object(f"{s3_prefix}/{s3_key}"):
                        stats['deleted'] += 1
        
        self.logger.info(f"Sync complete: {stats}")
        return stats
    
    def _calculate_md5(self, file_path: Path) -> str:
        """Calculate MD5 hash of file."""
        hash_md5 = hashlib.md5()
        with open(file_path, 'rb') as f:
            for chunk in iter(lambda: f.read(4096), b''):
                hash_md5.update(chunk)
        return hash_md5.hexdigest()
    
    def list_objects(
        self,
        prefix: Optional[str] = None,
        max_keys: int = 1000
    ) -> List[Dict[str, Any]]:
        """List objects in S3 bucket."""
        try:
            params = {'Bucket': self.bucket_name}
            
            if prefix:
                params['Prefix'] = prefix
            
            # A single list_objects_v2 call returns at most 1,000 keys,
            # so paginate and let MaxItems cap the total at max_keys.
            paginator = self.s3_client.get_paginator('list_objects_v2')
            pages = paginator.paginate(
                **params,
                PaginationConfig={'MaxItems': max_keys}
            )
            
            objects = []
            for page in pages:
                for obj in page.get('Contents', []):
                    objects.append({
                        'key': obj['Key'],
                        'size': obj['Size'],
                        'last_modified': obj['LastModified'].isoformat(),
                        'etag': obj['ETag'].strip('"'),
                        'storage_class': obj.get('StorageClass', 'STANDARD')
                    })
            
            return objects
            
        except ClientError as e:
            self.logger.error(f"Failed to list objects: {e}")
            return []
    
    def delete_object(self, key: str) -> bool:
        """Delete object from S3."""
        try:
            self.s3_client.delete_object(Bucket=self.bucket_name, Key=key)
            self.logger.info(f"Deleted s3://{self.bucket_name}/{key}")
            return True
            
        except ClientError as e:
            self.logger.error(f"Failed to delete object: {e}")
            return False
    
    def generate_presigned_url(
        self,
        key: str,
        expiration: int = 3600,
        http_method: str = 'GET'
    ) -> str:
        """Generate presigned URL for object access."""
        try:
            url = self.s3_client.generate_presigned_url(
                ClientMethod='get_object' if http_method == 'GET' else 'put_object',
                Params={'Bucket': self.bucket_name, 'Key': key},
                ExpiresIn=expiration
            )
            
            return url
            
        except ClientError as e:
            self.logger.error(f"Failed to generate presigned URL: {e}")
            return ""
    
    def _setup_lifecycle_rules(self, bucket_name: str):
        """Setup lifecycle rules for automatic archival and deletion."""
        rules = []
        
        # Archive to Glacier after specified days
        # (each rule needs a Filter; an empty prefix applies it to every object)
        if self.config.archive_after_days:
            rules.append({
                'ID': 'archive-old-objects',
                'Status': 'Enabled',
                'Filter': {'Prefix': ''},
                'Transitions': [{
                    'Days': self.config.archive_after_days,
                    'StorageClass': 'GLACIER'
                }]
            })
        
        # Delete after specified days
        if self.config.delete_after_days:
            rules.append({
                'ID': 'delete-expired-objects',
                'Status': 'Enabled',
                'Filter': {'Prefix': ''},
                'Expiration': {
                    'Days': self.config.delete_after_days
                }
            })
        
        if rules:
            self.s3_client.put_bucket_lifecycle_configuration(
                Bucket=bucket_name,
                LifecycleConfiguration={'Rules': rules}
            )
    
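    def _enable_bucket_logging(self, bucket_name: str):
        """Enable S3 server access logging."""
        # NOTE: logging a bucket into itself makes the logs generate more logs.
        # In production, point TargetBucket at a dedicated log bucket that
        # allows the S3 logging service to write to it.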
        self.s3_client.put_bucket_logging(
            Bucket=bucket_name,
            BucketLoggingStatus={
                'LoggingEnabled': {
                    'TargetBucket': bucket_name,
                    'TargetPrefix': 'logs/'
                }
            }
        )
    
    def _enable_bucket_metrics(self, bucket_name: str):
        """Enable S3 bucket metrics."""
        self.s3_client.put_bucket_metrics_configuration(
            Bucket=bucket_name,
            Id='EntireBucket',
            MetricsConfiguration={
                'Id': 'EntireBucket',
                'Filter': {'Prefix': ''}
            }
        )

# ==================== Google Cloud Storage Manager ====================

class GCSStorageManager(CloudStorageManager):
    """Google Cloud Storage management."""
    
    def __init__(self, config: StorageConfig):
        super().__init__(config)
        self.client = gcs.Client(project=config.gcp_project)
        self.bucket_name = config.gcs_bucket
        
    def create_bucket(
        self,
        bucket_name: Optional[str] = None,
        location: str = "us",
        storage_class: str = "STANDARD"
    ) -> bool:
        """Create GCS bucket."""
        try:
            bucket_name = bucket_name or self.bucket_name
            
            bucket = self.client.bucket(bucket_name)
            bucket.storage_class = storage_class
            
            # Enable versioning
            if self.config.enable_versioning:
                bucket.versioning_enabled = True
            
            # Create bucket (the location is fixed at creation time)
            bucket = self.client.create_bucket(bucket, location=location)
            
            # Set lifecycle rules
            if self.config.archive_after_days:
                bucket.add_lifecycle_set_storage_class_rule(
                    'NEARLINE',
                    age=self.config.archive_after_days
                )
            
            if self.config.delete_after_days:
                bucket.add_lifecycle_delete_rule(age=self.config.delete_after_days)
            
            # Persist the lifecycle changes made after creation
            bucket.patch()
            
            self.logger.info(f"Created GCS bucket: {bucket_name}")
            return True
            
        except Exception as e:
            self.logger.error(f"Failed to create GCS bucket: {e}")
            return False
    
    def upload_file(
        self,
        file_path: str,
        blob_name: Optional[str] = None,
        metadata: Optional[Dict[str, str]] = None
    ) -> bool:
        """Upload file to GCS."""
        try:
            file_path = Path(file_path)
            blob_name = blob_name or file_path.name
            
            bucket = self.client.bucket(self.bucket_name)
            blob = bucket.blob(blob_name)
            
            # Set metadata
            if metadata:
                blob.metadata = metadata
            
            # Set content type
            content_type, _ = mimetypes.guess_type(str(file_path))
            if content_type:
                blob.content_type = content_type
            
            # Upload with resumable upload for large files
            file_size = file_path.stat().st_size
            if file_size > self.config.multipart_threshold:
                blob.chunk_size = self.config.multipart_chunksize
            
            blob.upload_from_filename(str(file_path))
            
            self.logger.info(f"Uploaded {file_path} to gs://{self.bucket_name}/{blob_name}")
            return True
            
        except Exception as e:
            self.logger.error(f"GCS upload failed: {e}")
            return False
    
    def download_file(self, blob_name: str, file_path: str) -> bool:
        """Download file from GCS."""
        try:
            bucket = self.client.bucket(self.bucket_name)
            blob = bucket.blob(blob_name)
            
            blob.download_to_filename(file_path)
            
            self.logger.info(f"Downloaded gs://{self.bucket_name}/{blob_name} to {file_path}")
            return True
            
        except Exception as e:
            self.logger.error(f"GCS download failed: {e}")
            return False
    
    def list_objects(
        self,
        prefix: Optional[str] = None,
        max_keys: int = 1000
    ) -> List[Dict[str, Any]]:
        """List objects in GCS bucket."""
        try:
            bucket = self.client.bucket(self.bucket_name)
            blobs = bucket.list_blobs(prefix=prefix, max_results=max_keys)
            
            objects = []
            for blob in blobs:
                objects.append({
                    'key': blob.name,
                    'size': blob.size,
                    'last_modified': blob.updated.isoformat() if blob.updated else None,
                    'etag': blob.etag,
                    'storage_class': blob.storage_class
                })
            
            return objects
            
        except Exception as e:
            self.logger.error(f"Failed to list GCS objects: {e}")
            return []

# ==================== Azure Blob Storage Manager ====================

class AzureStorageManager(CloudStorageManager):
    """Azure Blob Storage management."""
    
    def __init__(self, config: StorageConfig):
        super().__init__(config)
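        connection_string = os.getenv('AZURE_STORAGE_CONNECTION_STRING')
        if not connection_string:
            # Fail early with a clear message instead of passing None to the SDK
            raise ValueError("AZURE_STORAGE_CONNECTION_STRING is not set")
        self.blob_service_client = BlobServiceClient.from_connection_string(
            connection_string
        )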
        self.container_name = config.azure_container
    
    def create_container(
        self,
        container_name: Optional[str] = None,
        public_access: Optional[str] = None  # None = private; otherwise 'container' or 'blob'
    ) -> bool:
        """Create Azure container."""
        try:
            container_name = container_name or self.container_name
            
            container_client = self.blob_service_client.create_container(
                container_name,
                public_access=public_access
            )
            
            self.logger.info(f"Created Azure container: {container_name}")
            return True
            
        except Exception as e:
            self.logger.error(f"Failed to create Azure container: {e}")
            return False
    
    def upload_file(
        self,
        file_path: str,
        blob_name: Optional[str] = None,
        metadata: Optional[Dict[str, str]] = None
    ) -> bool:
        """Upload file to Azure Blob Storage."""
        try:
            file_path = Path(file_path)
            blob_name = blob_name or file_path.name
            
            blob_client = self.blob_service_client.get_blob_client(
                container=self.container_name,
                blob=blob_name
            )
            
            with open(file_path, 'rb') as f:
                blob_client.upload_blob(
                    f,
                    overwrite=True,
                    metadata=metadata
                )
            
            self.logger.info(f"Uploaded {file_path} to Azure container {self.container_name}/{blob_name}")
            return True
            
        except Exception as e:
            self.logger.error(f"Azure upload failed: {e}")
            return False

# ==================== Multi-Cloud Storage Manager ====================

class MultiCloudStorageManager:
    """Manage storage across multiple cloud providers."""
    
    def __init__(self, config: StorageConfig):
        self.config = config
        self.providers = {}
        self.logger = logging.getLogger(__name__)
        
        # Initialize providers
        if config.s3_bucket:
            self.providers['aws'] = S3StorageManager(config)
        
        if config.gcs_bucket:
            self.providers['gcp'] = GCSStorageManager(config)
        
        if config.azure_container:
            self.providers['azure'] = AzureStorageManager(config)
    
    def upload_to_all(
        self,
        file_path: str,
        key: str,
        metadata: Optional[Dict[str, str]] = None
    ) -> Dict[str, bool]:
        """Upload file to all configured providers."""
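        results = {}
        
        # ThreadPoolExecutor rejects max_workers=0, so return early when nothing is configured
        if not self.providers:
            return results
        
        with ThreadPoolExecutor(max_workers=len(self.providers)) as executor: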
            futures = {}
            
            for provider_name, provider in self.providers.items():
                future = executor.submit(provider.upload_file, file_path, key, metadata)
                futures[future] = provider_name
            
            for future in as_completed(futures):
                provider_name = futures[future]
                try:
                    results[provider_name] = future.result()
                except Exception as e:
                    self.logger.error(f"Upload to {provider_name} failed: {e}")
                    results[provider_name] = False
        
        return results
    
    def replicate_across_clouds(
        self,
        source_provider: str,
        source_key: str,
        target_providers: List[str]
    ) -> Dict[str, bool]:
        """Replicate object across cloud providers."""
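        results = {}
        
        if source_provider not in self.providers:
            self.logger.error(f"Unknown source provider: {source_provider}")
            return results
        source = self.providers[source_provider]
        
        # Download from source into a private temp file. Keys may contain
        # slashes, so they cannot be used directly as a file name under /tmp.
        import tempfile
        fd, temp_file = tempfile.mkstemp(suffix=Path(source_key).suffix)
        os.close(fd)
        
        try:
            if not source.download_file(source_key, temp_file):
                self.logger.error("Failed to download from source")
                return results
            
            # Upload to targets
            for target_name in target_providers:
                if target_name in self.providers:
                    target = self.providers[target_name]
                    results[target_name] = target.upload_file(temp_file, source_key)
        finally:
            # Clean up temp file even if a transfer raised
            os.remove(temp_file)
        
        return results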

# ==================== Storage Analytics ====================

class StorageAnalytics:
    """Analyze storage usage and costs."""
    
    def __init__(self, storage_manager: CloudStorageManager):
        self.storage = storage_manager
        self.logger = logging.getLogger(__name__)
    
    def analyze_usage(self) -> Dict[str, Any]:
        """Analyze storage usage patterns."""
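        # list_objects() returns at most max_keys objects (default 1000),
        # so on larger buckets these totals describe only that sample
        objects = self.storage.list_objects()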
        
        if not objects:
            return {}
        
        total_size = sum(obj['size'] for obj in objects)
        total_count = len(objects)
        
        # Group by storage class
        by_class = {}
        for obj in objects:
            storage_class = obj.get('storage_class', 'STANDARD')
            if storage_class not in by_class:
                by_class[storage_class] = {'count': 0, 'size': 0}
            by_class[storage_class]['count'] += 1
            by_class[storage_class]['size'] += obj['size']
        
        # File type analysis
        by_type = {}
        for obj in objects:
            ext = Path(obj['key']).suffix.lower()
            if ext not in by_type:
                by_type[ext] = {'count': 0, 'size': 0}
            by_type[ext]['count'] += 1
            by_type[ext]['size'] += obj['size']
        
        return {
            'total_size': total_size,
            'total_count': total_count,
            'average_size': total_size / total_count if total_count > 0 else 0,
            'by_storage_class': by_class,
            'by_file_type': by_type
        }
    
    def estimate_costs(self) -> Dict[str, float]:
        """Estimate storage costs."""
        # Simplified cost estimation (actual costs vary by region and usage)
        pricing = {
            'STANDARD': 0.023,      # per GB per month
            'STANDARD_IA': 0.0125,
            'GLACIER': 0.004,
            'DEEP_ARCHIVE': 0.00099
        }
        
        usage = self.analyze_usage()
        costs = {}
        
        for storage_class, data in usage.get('by_storage_class', {}).items():
            size_gb = data['size'] / (1024 ** 3)
            rate = pricing.get(storage_class, 0.023)
            costs[storage_class] = size_gb * rate
        
        costs['total'] = sum(costs.values())
        
        return costs
    
    def generate_report(self) -> str:
        """Generate storage analytics report."""
        usage = self.analyze_usage()
        costs = self.estimate_costs()
        
        report = f"""
Storage Analytics Report
========================

Total Storage Usage:
- Objects: {usage.get('total_count', 0):,}
- Size: {self._format_bytes(usage.get('total_size', 0))}
- Average Size: {self._format_bytes(usage.get('average_size', 0))}

Storage Class Distribution:
"""
        
        for storage_class, data in usage.get('by_storage_class', {}).items():
            report += f"  {storage_class}:\n"
            report += f"    Count: {data['count']:,}\n"
            report += f"    Size: {self._format_bytes(data['size'])}\n"
            report += f"    Cost: ${costs.get(storage_class, 0):.2f}/month\n"
        
        report += f"\nEstimated Monthly Cost: ${costs.get('total', 0):.2f}\n"
        
        return report
    
    def _format_bytes(self, bytes_value: float) -> str:
        """Format bytes to human readable string."""
        for unit in ['B', 'KB', 'MB', 'GB', 'TB']:
            if bytes_value < 1024.0:
                return f"{bytes_value:.2f} {unit}"
            bytes_value /= 1024.0
        return f"{bytes_value:.2f} PB"

# ==================== CDN Integration ====================

class CDNIntegration:
    """Integrate storage with CDN."""
    
    def __init__(self, storage_manager: CloudStorageManager):
        self.storage = storage_manager
        self.logger = logging.getLogger(__name__)
    
    def setup_cloudfront_distribution(
        self,
        bucket_name: str,
        distribution_comment: str = "CDN Distribution"
    ):
        """Setup AWS CloudFront distribution."""
        cloudfront = boto3.client('cloudfront')
        
        origin_id = f"S3-{bucket_name}"
        
        distribution_config = {
            'CallerReference': str(datetime.now().timestamp()),
            'Comment': distribution_comment,
            'Enabled': True,
            'Origins': {
                'Quantity': 1,
                'Items': [{
                    'Id': origin_id,
                    'DomainName': f"{bucket_name}.s3.amazonaws.com",
                    'S3OriginConfig': {
                        'OriginAccessIdentity': ''
                    }
                }]
            },
            'DefaultCacheBehavior': {
                'TargetOriginId': origin_id,
                'ViewerProtocolPolicy': 'redirect-to-https',
                'TrustedSigners': {
                    'Enabled': False,
                    'Quantity': 0
                },
                'ForwardedValues': {
                    'QueryString': False,
                    'Cookies': {'Forward': 'none'}
                },
                'MinTTL': 0,
                'DefaultTTL': 86400,
                'MaxTTL': 31536000
            }
        }
        
        try:
            response = cloudfront.create_distribution(
                DistributionConfig=distribution_config
            )
            
            distribution_id = response['Distribution']['Id']
            domain_name = response['Distribution']['DomainName']
            
            self.logger.info(f"Created CloudFront distribution: {distribution_id}")
            self.logger.info(f"CDN Domain: {domain_name}")
            
            return domain_name
            
        except Exception as e:
            self.logger.error(f"Failed to create CloudFront distribution: {e}")
            return None

# Example usage
if __name__ == "__main__":
    print("💾 Cloud Storage Management Examples\n")
    
    # Example 1: Initialize storage
    print("1️⃣ Initializing Cloud Storage:")
    
    config = StorageConfig(
        provider="aws",
        s3_bucket="my-data-bucket",
        enable_versioning=True,
        archive_after_days=90
    )
    
    s3_storage = S3StorageManager(config)
    
    print("   Provider: AWS S3")
    print(f"   Bucket: {config.s3_bucket}")
    print(f"   Versioning: {config.enable_versioning}")
    print(f"   Archive after: {config.archive_after_days} days")
    
    # Example 2: Upload operations
    print("\n2️⃣ Upload Operations:")
    
    print("   # Simple upload")
    print("   s3_storage.upload_file('data.csv', 'data/2024/data.csv')")
    print("\n   # Large file sent straight to an archive storage class")
    print("   s3_storage.upload_file('video.mp4', storage_class=StorageClass.GLACIER)")
    print("\n   # Async upload")
    print("   await s3_storage.upload_file_async('image.jpg', 'images/image.jpg')")
    
    # Example 3: Storage classes
    print("\n3️⃣ Storage Classes (Cost vs Performance):")
    
    classes = [
        ("STANDARD", "Frequent access", "$0.023/GB/month"),
        ("STANDARD_IA", "Infrequent access", "$0.0125/GB/month"),
        ("GLACIER", "Archive", "$0.004/GB/month"),
        ("DEEP_ARCHIVE", "Long-term archive", "$0.00099/GB/month"),
        ("INTELLIGENT_TIERING", "Automatic tiering", "Variable")
    ]
    
    for storage_class, use_case, cost in classes:
        print(f"   {storage_class}: {use_case} ({cost})")
    
    # Example 4: Lifecycle policies
    print("\n4️⃣ Lifecycle Policy Example:")
    
    print("   Days 0-30: STANDARD (hot data)")
    print("   Days 31-90: STANDARD_IA (warm data)")
    print("   Days 91-365: GLACIER (cold data)")
    print("   After 365 days: DELETE")
    
    # Example 5: Sync operations
    print("\n5️⃣ Directory Sync:")
    
    print("   # Sync local to cloud")
    print("   s3_storage.sync_directory(")
    print("       local_dir='/data/backup',")
    print("       s3_prefix='backups/2024',")
    print("       delete=True  # Remove deleted files")
    print("   )")
    
    # Example 6: Presigned URLs
    print("\n6️⃣ Secure Sharing with Presigned URLs:")
    
    print("   # Generate temporary download link")
    print("   url = s3_storage.generate_presigned_url(")
    print("       key='private/document.pdf',")
    print("       expiration=3600  # 1 hour")
    print("   )")
    
    # Example 7: Multi-cloud replication
    print("\n7️⃣ Multi-Cloud Replication:")
    
    print("   multi_cloud = MultiCloudStorageManager(config)")
    print("   multi_cloud.replicate_across_clouds(")
    print("       source_provider='aws',")
    print("       source_key='important-data.zip',")
    print("       target_providers=['gcp', 'azure']")
    print("   )")
    
    # Example 8: Storage analytics
    print("\n8️⃣ Storage Analytics:")
    
    print("   analytics = StorageAnalytics(s3_storage)")
    print("   usage = analytics.analyze_usage()")
    print("   costs = analytics.estimate_costs()")
    print("   report = analytics.generate_report()")
    
    # Example 9: Best practices
    print("\n9️⃣ Cloud Storage Best Practices:")
    
    practices = [
        "🔒 Enable encryption at rest",
        "📝 Use versioning for critical data",
        "🏷️ Tag resources for organization",
        "📊 Monitor access logs",
        "💰 Implement lifecycle policies",
        "🌍 Use CDN for global distribution",
        "🔐 Use IAM for access control",
        "📦 Compress data before storage",
        "🔄 Regular backups to different regions",
        "⚡ Use appropriate storage classes"
    ]
    
    for practice in practices:
        print(f"   {practice}")
    
    # Example 10: Performance tips
    print("\n🔟 Performance Optimization:")
    
    tips = [
        "Use multipart uploads for files > 100MB",
        "Enable transfer acceleration for global uploads",
        "Use byte-range fetches for partial downloads",
        "Implement request coalescing",
        "Cache frequently accessed data",
        "Use parallel uploads/downloads",
        "Optimize file formats (Parquet for analytics)",
        "Enable S3 Select for query pushdown"
    ]
    
    for tip in tips:
        print(f"   • {tip}")
    
    print("\n✅ Cloud storage management demonstration complete!")
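
The multipart and byte-range tips in the performance list above come down to splitting a file into byte ranges. The sketch below is illustrative only and is not part of the listing above: `plan_multipart` and `part_ranges` are hypothetical helper names, and the 64 MiB preferred part size is an assumed default. The 5 MiB minimum part size and the 10,000-part cap are the limits S3 documents for multipart uploads.

```python
import math
from typing import Iterator, Tuple

MIB = 1024 ** 2
MIN_PART_SIZE = 5 * MIB   # S3 minimum for every part except the last
MAX_PARTS = 10_000        # S3 maximum number of parts in one upload


def plan_multipart(file_size: int, preferred_part_size: int = 64 * MIB) -> Tuple[int, int]:
    """Return (part_size, part_count) for uploading file_size bytes (hypothetical helper)."""
    if file_size <= 0:
        raise ValueError("file_size must be positive")
    # Grow the part size when the preferred size would need more than MAX_PARTS parts
    part_size = max(
        preferred_part_size,
        MIN_PART_SIZE,
        math.ceil(file_size / MAX_PARTS),
    )
    part_count = math.ceil(file_size / part_size)
    return part_size, part_count


def part_ranges(file_size: int, part_size: int) -> Iterator[Tuple[int, int, int]]:
    """Yield (part_number, first_byte, last_byte); part numbers start at 1, ranges are inclusive."""
    for number, start in enumerate(range(0, file_size, part_size), start=1):
        yield number, start, min(start + part_size, file_size) - 1
```

With the assumed 64 MiB preference, a 1 GiB file is planned as 16 parts. The same inclusive ranges can be reused for byte-range fetches, since an HTTP `Range: bytes=first-last` header is inclusive on both ends.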

Key Takeaways and Best Practices 🎯

Cloud Storage Best Practices 📋

Pro Tip: Think of cloud storage as a hierarchy of tiers, each with its own trade-off between cost, performance, and durability. Start from your data access patterns: hot data needs fast storage (STANDARD), warm data can sit in a cheaper tier (STANDARD_IA), and cold data should be archived (GLACIER). Always enable encryption at rest and use IAM for fine-grained access control. Implement lifecycle policies from day one so that data moves to the appropriate tier automatically and expired data is deleted. Use versioning for critical data to protect against accidental deletion or corruption. For large files, use multipart uploads to improve reliability and performance. Tag resources consistently for cost allocation and organization. Use presigned URLs for secure temporary access instead of making objects public. Monitor your storage costs regularly, because they can grow quickly. Consider multi-cloud replication for critical data to reduce vendor lock-in. Use CDN integration for frequently accessed public content. Most importantly, treat cloud storage as code: version control your bucket configurations, lifecycle policies, and access controls!
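
As a rough illustration of the lifecycle advice above, the sketch below builds a rule set in the shape boto3's `put_bucket_lifecycle_configuration` accepts, using the 30/90/365-day schedule from the lifecycle example in the listing. The function names, the rule ID, and the default day counts are assumptions chosen for illustration, not part of the tutorial's own classes.

```python
from typing import Any, Dict


def build_lifecycle_configuration(
    prefix: str = "",
    warm_after_days: int = 30,
    archive_after_days: int = 90,
    delete_after_days: int = 365,
) -> Dict[str, Any]:
    """Build an S3 lifecycle configuration dict (hypothetical helper)."""
    if not (0 < warm_after_days < archive_after_days < delete_after_days):
        raise ValueError("day thresholds must be positive and strictly increasing")
    return {
        "Rules": [
            {
                "ID": "tiered-retention",          # illustrative rule name
                "Filter": {"Prefix": prefix},      # empty prefix applies to every object
                "Status": "Enabled",
                "Transitions": [
                    {"Days": warm_after_days, "StorageClass": "STANDARD_IA"},
                    {"Days": archive_after_days, "StorageClass": "GLACIER"},
                ],
                "Expiration": {"Days": delete_after_days},
            }
        ]
    }


def apply_lifecycle(bucket: str, configuration: Dict[str, Any]) -> None:
    """Send the configuration to S3; needs boto3 and AWS credentials."""
    import boto3  # imported lazily so the builder above works without boto3

    boto3.client("s3").put_bucket_lifecycle_configuration(
        Bucket=bucket,
        LifecycleConfiguration=configuration,
    )
```

Keeping the builder separate from the API call means the rule set can be reviewed, unit tested, and committed to version control before it is applied to a bucket.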

Mastering cloud storage management enables you to build scalable, reliable, and cost-effective data solutions in the cloud. You can now handle everything from simple file uploads to petabyte-scale data lakes, implement intelligent tiering for cost optimization, ensure data durability with replication, and provide global access through CDN integration. Whether you're building backup systems, content delivery platforms, or big data solutions, these cloud storage skills are essential for modern applications! 🚀