Cloud Storage Management: Master Data in the Cloud
Cloud storage is the backbone of modern applications - it provides virtually unlimited, globally accessible, and highly durable storage for everything from static websites to massive data lakes. Like managing a vast digital warehouse that spans the globe, mastering cloud storage involves understanding object storage, file systems, databases, caching strategies, and data lifecycle management. Whether you use AWS S3, Google Cloud Storage, or Azure Blob Storage, these skills let you store, organize, and retrieve data efficiently at any scale. Let's explore cloud storage management!
The Cloud Storage Architecture
Think of cloud storage as a hierarchy of solutions optimized for different use cases - from hot storage for frequently accessed data to cold archives for long-term retention, each tier offers different performance, durability, and cost characteristics. Using services like S3 for objects, EFS for file systems, DynamoDB for NoSQL, and RDS for relational data, you can build storage architectures that scale with demand while optimizing for both performance and cost. Understanding these storage patterns is essential for modern cloud applications!
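The tiering idea above can be sketched as a small decision function. This is a minimal illustration rather than a pricing recommendation: the function name `choose_storage_class` and the day thresholds (30, 90, 365) are assumptions made for the example, while the returned strings are the S3 storage class identifiers used later in this tutorial.

```python
def choose_storage_class(days_since_last_access: int) -> str:
    """Map an object's idle time to a storage class (hypothetical thresholds)."""
    if days_since_last_access < 0:
        raise ValueError("days_since_last_access must be non-negative")
    if days_since_last_access < 30:
        return "STANDARD"      # hot: read within the last month
    if days_since_last_access < 90:
        return "STANDARD_IA"   # warm: read occasionally
    if days_since_last_access < 365:
        return "GLACIER"       # cold: rarely read
    return "DEEP_ARCHIVE"      # kept mainly for retention


if __name__ == "__main__":
    for idle_days in (3, 45, 200, 800):
        print(idle_days, choose_storage_class(idle_days))
```

In practice the thresholds would come from measured access patterns and the retrieval fees of each tier, not from fixed constants.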
Real-World Scenario: The Enterprise Cloud Storage Platform
You're building an enterprise storage platform that manages petabytes of data across multiple cloud providers, handles file uploads from millions of users, implements intelligent tiering for cost optimization, provides secure sharing with expiring links, syncs data across regions for disaster recovery, processes media files with automatic transcoding, maintains compliance with data retention policies, and integrates with CDNs for global content delivery. Your system must ensure 99.999999999% durability, optimize for both performance and cost, and provide comprehensive monitoring and analytics. Let's build a production-ready cloud storage framework!
# First, install required packages:
# pip install boto3 google-cloud-storage azure-storage-blob
# pip install minio watchdog pandas matplotlib tqdm
# pip install aiofiles aioboto3
# (asyncio and concurrent.futures are part of the Python standard library; do not pip install them)
import os
import json
import hashlib
import mimetypes
import asyncio
import threading
from typing import List, Dict, Optional, Any, Union, Tuple, BinaryIO
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from enum import Enum
from pathlib import Path
import logging
from concurrent.futures import ThreadPoolExecutor, as_completed
import io
# Cloud storage libraries
import boto3
from botocore.exceptions import ClientError
from google.cloud import storage as gcs
from azure.storage.blob import BlobServiceClient, BlobClient
# Async support
import aioboto3
import aiofiles
# Progress tracking
from tqdm import tqdm
# ==================== Storage Configuration ====================
@dataclass
class StorageConfig:
    """Cloud storage configuration."""
    provider: str = "aws"  # aws, gcp, azure, multi
    # AWS S3
    aws_region: str = "us-east-1"
    s3_bucket: Optional[str] = None
    # Google Cloud Storage
    gcp_project: Optional[str] = None
    gcs_bucket: Optional[str] = None
    # Azure Blob Storage
    azure_account: Optional[str] = None
    azure_container: Optional[str] = None
    # Storage classes
    default_storage_class: str = "STANDARD"
    archive_after_days: int = 90
    delete_after_days: int = 365
    # Performance
    multipart_threshold: int = 100 * 1024 * 1024  # 100MB
    multipart_chunksize: int = 10 * 1024 * 1024  # 10MB
    max_concurrent_uploads: int = 10
    # Security
    encryption: str = "AES256"
    enable_versioning: bool = True
    enable_mfa_delete: bool = False
    # CDN
    enable_cdn: bool = False
    cdn_domain: Optional[str] = None
    # Monitoring
    enable_metrics: bool = True
    enable_logging: bool = True


class StorageClass(Enum):
    """Storage class tiers."""
    STANDARD = "STANDARD"
    INFREQUENT_ACCESS = "STANDARD_IA"
    GLACIER = "GLACIER"
    DEEP_ARCHIVE = "DEEP_ARCHIVE"
    INTELLIGENT_TIERING = "INTELLIGENT_TIERING"
# ==================== Base Storage Manager ====================
class CloudStorageManager:
    """Base class for cloud storage management."""

    def __init__(self, config: StorageConfig):
        self.config = config
        self.logger = logging.getLogger(__name__)

    async def upload_file_async(
        self,
        file_path: str,
        key: str,
        metadata: Optional[Dict[str, str]] = None
    ) -> bool:
        """Upload file asynchronously."""
        raise NotImplementedError

    async def download_file_async(
        self,
        key: str,
        file_path: str
    ) -> bool:
        """Download file asynchronously."""
        raise NotImplementedError

    def list_objects(
        self,
        prefix: Optional[str] = None,
        max_keys: int = 1000
    ) -> List[Dict[str, Any]]:
        """List objects in storage."""
        raise NotImplementedError

    def delete_object(self, key: str) -> bool:
        """Delete object from storage."""
        raise NotImplementedError

    def generate_presigned_url(
        self,
        key: str,
        expiration: int = 3600
    ) -> str:
        """Generate presigned URL for object access."""
        raise NotImplementedError
# ==================== AWS S3 Storage Manager ====================
class S3StorageManager(CloudStorageManager):
    """AWS S3 storage management."""

    def __init__(self, config: StorageConfig):
        super().__init__(config)
        self.s3_client = boto3.client('s3', region_name=config.aws_region)
        self.s3_resource = boto3.resource('s3', region_name=config.aws_region)
        self.bucket_name = config.s3_bucket

    def create_bucket(
        self,
        bucket_name: Optional[str] = None,
        enable_versioning: bool = True,
        enable_encryption: bool = True,
        lifecycle_rules: Optional[List[Dict]] = None
    ) -> bool:
        """Create S3 bucket with best practices."""
        try:
            bucket_name = bucket_name or self.bucket_name
            # Create bucket (us-east-1 must not be given a LocationConstraint)
            if self.config.aws_region == 'us-east-1':
                self.s3_client.create_bucket(Bucket=bucket_name)
            else:
                self.s3_client.create_bucket(
                    Bucket=bucket_name,
                    CreateBucketConfiguration={'LocationConstraint': self.config.aws_region}
                )
            # Enable versioning
            if enable_versioning:
                self.s3_client.put_bucket_versioning(
                    Bucket=bucket_name,
                    VersioningConfiguration={'Status': 'Enabled'}
                )
            # Enable encryption
            if enable_encryption:
                self.s3_client.put_bucket_encryption(
                    Bucket=bucket_name,
                    ServerSideEncryptionConfiguration={
                        'Rules': [{
                            'ApplyServerSideEncryptionByDefault': {
                                'SSEAlgorithm': self.config.encryption
                            }
                        }]
                    }
                )
            # Block public access
            self.s3_client.put_public_access_block(
                Bucket=bucket_name,
                PublicAccessBlockConfiguration={
                    'BlockPublicAcls': True,
                    'IgnorePublicAcls': True,
                    'BlockPublicPolicy': True,
                    'RestrictPublicBuckets': True
                }
            )
            # Add lifecycle rules: caller-supplied rules take precedence over the config defaults
            if lifecycle_rules:
                self.s3_client.put_bucket_lifecycle_configuration(
                    Bucket=bucket_name,
                    LifecycleConfiguration={'Rules': lifecycle_rules}
                )
            elif self.config.archive_after_days or self.config.delete_after_days:
                self._setup_lifecycle_rules(bucket_name)
            # Enable logging
            if self.config.enable_logging:
                self._enable_bucket_logging(bucket_name)
            # Enable metrics
            if self.config.enable_metrics:
                self._enable_bucket_metrics(bucket_name)
            self.logger.info(f"Created S3 bucket: {bucket_name}")
            return True
        except ClientError as e:
            self.logger.error(f"Failed to create bucket: {e}")
            return False
    def upload_file(
        self,
        file_path: str,
        key: Optional[str] = None,
        metadata: Optional[Dict[str, str]] = None,
        storage_class: StorageClass = StorageClass.STANDARD,
        progress_callback: Optional[callable] = None
    ) -> bool:
        """Upload file to S3 with progress tracking."""
        try:
            file_path = Path(file_path)
            if not file_path.exists():
                raise FileNotFoundError(f"File not found: {file_path}")
            # Use filename as key if not specified
            key = key or file_path.name
            # Get file size
            file_size = file_path.stat().st_size
            # Prepare upload parameters
            extra_args = {
                'StorageClass': storage_class.value,
                'ServerSideEncryption': self.config.encryption
            }
            if metadata:
                extra_args['Metadata'] = metadata
            # Set content type
            content_type, _ = mimetypes.guess_type(str(file_path))
            if content_type:
                extra_args['ContentType'] = content_type
            # Upload with progress
            if file_size > self.config.multipart_threshold:
                # Multipart upload for large files
                self._multipart_upload(file_path, key, extra_args, progress_callback)
            else:
                # Simple upload for small files
                with tqdm(total=file_size, unit='B', unit_scale=True, desc=f"Uploading {file_path.name}") as pbar:
                    def upload_callback(bytes_amount):
                        # boto3 passes the bytes sent since the previous call, not a running total
                        pbar.update(bytes_amount)
                        if progress_callback:
                            progress_callback(pbar.n, file_size)
                    self.s3_client.upload_file(
                        str(file_path),
                        self.bucket_name,
                        key,
                        ExtraArgs=extra_args,
                        Callback=upload_callback
                    )
            self.logger.info(f"Uploaded {file_path} to s3://{self.bucket_name}/{key}")
            return True
        except Exception as e:
            self.logger.error(f"Upload failed: {e}")
            return False

    def _multipart_upload(
        self,
        file_path: Path,
        key: str,
        extra_args: Dict,
        progress_callback: Optional[callable] = None
    ):
        """Perform multipart upload for large files."""
        from boto3.s3.transfer import TransferConfig
        config = TransferConfig(
            multipart_threshold=self.config.multipart_threshold,
            multipart_chunksize=self.config.multipart_chunksize,
            max_concurrency=self.config.max_concurrent_uploads,
            use_threads=True
        )
        file_size = file_path.stat().st_size
        with tqdm(total=file_size, unit='B', unit_scale=True, desc=f"Uploading {file_path.name}") as pbar:
            def upload_callback(bytes_amount):
                # Called from worker threads with the size of each transferred chunk
                pbar.update(bytes_amount)
                if progress_callback:
                    progress_callback(pbar.n, file_size)
            self.s3_client.upload_file(
                str(file_path),
                self.bucket_name,
                key,
                ExtraArgs=extra_args,
                Config=config,
                Callback=upload_callback
            )
    async def upload_file_async(
        self,
        file_path: str,
        key: str,
        metadata: Optional[Dict[str, str]] = None
    ) -> bool:
        """Upload file asynchronously."""
        async with aioboto3.Session().client('s3', region_name=self.config.aws_region) as s3:
            try:
                # Reads the whole file into memory, so this path suits small files only
                async with aiofiles.open(file_path, 'rb') as f:
                    data = await f.read()
                extra_args = {'ServerSideEncryption': self.config.encryption}
                if metadata:
                    extra_args['Metadata'] = metadata
                await s3.put_object(
                    Bucket=self.bucket_name,
                    Key=key,
                    Body=data,
                    **extra_args
                )
                self.logger.info(f"Async uploaded {file_path} to s3://{self.bucket_name}/{key}")
                return True
            except Exception as e:
                self.logger.error(f"Async upload failed: {e}")
                return False
    def download_file(
        self,
        key: str,
        file_path: str,
        progress_callback: Optional[callable] = None
    ) -> bool:
        """Download file from S3 with progress tracking."""
        try:
            # Get object size
            response = self.s3_client.head_object(Bucket=self.bucket_name, Key=key)
            file_size = response['ContentLength']
            with tqdm(total=file_size, unit='B', unit_scale=True, desc=f"Downloading {key}") as pbar:
                def download_callback(bytes_amount):
                    # boto3 passes the bytes received since the previous call, not a running total
                    pbar.update(bytes_amount)
                    if progress_callback:
                        progress_callback(pbar.n, file_size)
                self.s3_client.download_file(
                    self.bucket_name,
                    key,
                    file_path,
                    Callback=download_callback
                )
            self.logger.info(f"Downloaded s3://{self.bucket_name}/{key} to {file_path}")
            return True
        except ClientError as e:
            self.logger.error(f"Download failed: {e}")
            return False
    def sync_directory(
        self,
        local_dir: str,
        s3_prefix: str,
        delete: bool = False,
        exclude: Optional[List[str]] = None
    ) -> Dict[str, int]:
        """Sync local directory with S3."""
        local_dir = Path(local_dir)
        # Normalize the prefix so "backups", "backups/" and "" all behave consistently
        s3_prefix = s3_prefix.strip('/')
        key_prefix = f"{s3_prefix}/" if s3_prefix else ""
        stats = {'uploaded': 0, 'skipped': 0, 'deleted': 0}
        # Get local files
        local_files = {}
        for file_path in local_dir.rglob('*'):
            if file_path.is_file():
                # POSIX separators keep keys identical on Windows and Unix
                relative_path = file_path.relative_to(local_dir).as_posix()
                # Check exclusions (simple substring match)
                if exclude and any(pattern in relative_path for pattern in exclude):
                    continue
                file_stat = file_path.stat()
                local_files[relative_path] = {
                    'path': file_path,
                    'size': file_stat.st_size,
                    'mtime': file_stat.st_mtime
                }
        # Get S3 objects
        s3_objects = {}
        paginator = self.s3_client.get_paginator('list_objects_v2')
        for page in paginator.paginate(Bucket=self.bucket_name, Prefix=key_prefix):
            for obj in page.get('Contents', []):
                key = obj['Key'][len(key_prefix):]
                s3_objects[key] = {
                    'size': obj['Size'],
                    'etag': obj['ETag'].strip('"')
                }
        # Upload new or modified files
        for relative_path, local_info in local_files.items():
            s3_key = f"{key_prefix}{relative_path}"
            if relative_path in s3_objects:
                # The ETag equals the MD5 only for single-part, non-KMS uploads;
                # any other object fails this comparison and is uploaded again
                local_md5 = self._calculate_md5(local_info['path'])
                if local_md5 == s3_objects[relative_path]['etag']:
                    stats['skipped'] += 1
                    continue
            # Upload file
            if self.upload_file(str(local_info['path']), s3_key):
                stats['uploaded'] += 1
        # Delete removed files if requested
        if delete:
            for s3_key in s3_objects:
                if s3_key not in local_files:
                    if self.delete_object(f"{key_prefix}{s3_key}"):
                        stats['deleted'] += 1
        self.logger.info(f"Sync complete: {stats}")
        return stats

    def _calculate_md5(self, file_path: Path) -> str:
        """Calculate MD5 hash of file."""
        hash_md5 = hashlib.md5()
        with open(file_path, 'rb') as f:
            for chunk in iter(lambda: f.read(4096), b''):
                hash_md5.update(chunk)
        return hash_md5.hexdigest()
    def list_objects(
        self,
        prefix: Optional[str] = None,
        max_keys: int = 1000
    ) -> List[Dict[str, Any]]:
        """List objects in S3 bucket."""
        try:
            params = {'Bucket': self.bucket_name}
            if prefix:
                params['Prefix'] = prefix
            # One list_objects_v2 call returns at most 1,000 keys, so paginate up to max_keys
            paginator = self.s3_client.get_paginator('list_objects_v2')
            pages = paginator.paginate(**params, PaginationConfig={'MaxItems': max_keys})
            objects = []
            for page in pages:
                for obj in page.get('Contents', []):
                    objects.append({
                        'key': obj['Key'],
                        'size': obj['Size'],
                        'last_modified': obj['LastModified'].isoformat(),
                        'etag': obj['ETag'].strip('"'),
                        'storage_class': obj.get('StorageClass', 'STANDARD')
                    })
            return objects
        except ClientError as e:
            self.logger.error(f"Failed to list objects: {e}")
            return []
    def delete_object(self, key: str) -> bool:
        """Delete object from S3."""
        try:
            self.s3_client.delete_object(Bucket=self.bucket_name, Key=key)
            self.logger.info(f"Deleted s3://{self.bucket_name}/{key}")
            return True
        except ClientError as e:
            self.logger.error(f"Failed to delete object: {e}")
            return False

    def generate_presigned_url(
        self,
        key: str,
        expiration: int = 3600,
        http_method: str = 'GET'
    ) -> str:
        """Generate presigned URL for object access."""
        try:
            url = self.s3_client.generate_presigned_url(
                ClientMethod='get_object' if http_method == 'GET' else 'put_object',
                Params={'Bucket': self.bucket_name, 'Key': key},
                ExpiresIn=expiration
            )
            return url
        except ClientError as e:
            self.logger.error(f"Failed to generate presigned URL: {e}")
            return ""
    def _setup_lifecycle_rules(self, bucket_name: str):
        """Setup lifecycle rules for automatic archival and deletion."""
        rules = []
        # Archive to Glacier after specified days
        if self.config.archive_after_days:
            rules.append({
                'ID': 'archive-old-objects',
                'Status': 'Enabled',
                'Filter': {'Prefix': ''},  # empty prefix applies the rule to every object
                'Transitions': [{
                    'Days': self.config.archive_after_days,
                    'StorageClass': 'GLACIER'
                }]
            })
        # Delete after specified days
        if self.config.delete_after_days:
            rules.append({
                'ID': 'delete-expired-objects',
                'Status': 'Enabled',
                'Filter': {'Prefix': ''},
                'Expiration': {
                    'Days': self.config.delete_after_days
                }
            })
        if rules:
            self.s3_client.put_bucket_lifecycle_configuration(
                Bucket=bucket_name,
                LifecycleConfiguration={'Rules': rules}
            )

    def _enable_bucket_logging(self, bucket_name: str):
        """Enable S3 bucket logging."""
        # The target bucket must allow the S3 logging service to write to it;
        # a separate, dedicated log bucket is the more common setup
        self.s3_client.put_bucket_logging(
            Bucket=bucket_name,
            BucketLoggingStatus={
                'LoggingEnabled': {
                    'TargetBucket': bucket_name,
                    'TargetPrefix': 'logs/'
                }
            }
        )

    def _enable_bucket_metrics(self, bucket_name: str):
        """Enable S3 bucket metrics."""
        # Omitting 'Filter' makes the configuration cover the entire bucket
        self.s3_client.put_bucket_metrics_configuration(
            Bucket=bucket_name,
            Id='EntireBucket',
            MetricsConfiguration={'Id': 'EntireBucket'}
        )
# ==================== Google Cloud Storage Manager ====================
class GCSStorageManager(CloudStorageManager):
    """Google Cloud Storage management."""

    def __init__(self, config: StorageConfig):
        super().__init__(config)
        self.client = gcs.Client(project=config.gcp_project)
        self.bucket_name = config.gcs_bucket

    def create_bucket(
        self,
        bucket_name: Optional[str] = None,
        location: str = "us",
        storage_class: str = "STANDARD"
    ) -> bool:
        """Create GCS bucket."""
        try:
            bucket_name = bucket_name or self.bucket_name
            bucket = self.client.bucket(bucket_name)
            bucket.storage_class = storage_class
            # Enable versioning
            if self.config.enable_versioning:
                bucket.versioning_enabled = True
            # Create bucket (the location is passed at creation time)
            bucket = self.client.create_bucket(bucket, location=location)
            # Set lifecycle rules
            if self.config.archive_after_days:
                bucket.add_lifecycle_set_storage_class_rule(
                    'NEARLINE', age=self.config.archive_after_days
                )
            if self.config.delete_after_days:
                bucket.add_lifecycle_delete_rule(age=self.config.delete_after_days)
            if self.config.archive_after_days or self.config.delete_after_days:
                bucket.patch()
            self.logger.info(f"Created GCS bucket: {bucket_name}")
            return True
        except Exception as e:
            self.logger.error(f"Failed to create GCS bucket: {e}")
            return False
    def upload_file(
        self,
        file_path: str,
        blob_name: Optional[str] = None,
        metadata: Optional[Dict[str, str]] = None
    ) -> bool:
        """Upload file to GCS."""
        try:
            file_path = Path(file_path)
            blob_name = blob_name or file_path.name
            bucket = self.client.bucket(self.bucket_name)
            blob = bucket.blob(blob_name)
            # Set metadata
            if metadata:
                blob.metadata = metadata
            # Set content type
            content_type, _ = mimetypes.guess_type(str(file_path))
            if content_type:
                blob.content_type = content_type
            # Upload with resumable upload for large files
            file_size = file_path.stat().st_size
            if file_size > self.config.multipart_threshold:
                blob.chunk_size = self.config.multipart_chunksize
            blob.upload_from_filename(str(file_path))
            self.logger.info(f"Uploaded {file_path} to gs://{self.bucket_name}/{blob_name}")
            return True
        except Exception as e:
            self.logger.error(f"GCS upload failed: {e}")
            return False

    def download_file(self, blob_name: str, file_path: str) -> bool:
        """Download file from GCS."""
        try:
            bucket = self.client.bucket(self.bucket_name)
            blob = bucket.blob(blob_name)
            blob.download_to_filename(file_path)
            self.logger.info(f"Downloaded gs://{self.bucket_name}/{blob_name} to {file_path}")
            return True
        except Exception as e:
            self.logger.error(f"GCS download failed: {e}")
            return False

    def list_objects(
        self,
        prefix: Optional[str] = None,
        max_keys: int = 1000
    ) -> List[Dict[str, Any]]:
        """List objects in GCS bucket."""
        try:
            bucket = self.client.bucket(self.bucket_name)
            blobs = bucket.list_blobs(prefix=prefix, max_results=max_keys)
            objects = []
            for blob in blobs:
                objects.append({
                    'key': blob.name,
                    'size': blob.size,
                    'last_modified': blob.updated.isoformat() if blob.updated else None,
                    'etag': blob.etag,
                    'storage_class': blob.storage_class
                })
            return objects
        except Exception as e:
            self.logger.error(f"Failed to list GCS objects: {e}")
            return []
# ==================== Azure Blob Storage Manager ====================
class AzureStorageManager(CloudStorageManager):
    """Azure Blob Storage management."""

    def __init__(self, config: StorageConfig):
        super().__init__(config)
        connection_string = os.getenv('AZURE_STORAGE_CONNECTION_STRING')
        if not connection_string:
            raise ValueError("AZURE_STORAGE_CONNECTION_STRING is not set")
        self.blob_service_client = BlobServiceClient.from_connection_string(connection_string)
        self.container_name = config.azure_container

    def create_container(
        self,
        container_name: Optional[str] = None,
        public_access: Optional[str] = None
    ) -> bool:
        """Create Azure container.

        public_access may be 'container', 'blob', or None (private, the default).
        """
        try:
            container_name = container_name or self.container_name
            self.blob_service_client.create_container(
                container_name,
                public_access=public_access
            )
            self.logger.info(f"Created Azure container: {container_name}")
            return True
        except Exception as e:
            self.logger.error(f"Failed to create Azure container: {e}")
            return False

    def upload_file(
        self,
        file_path: str,
        blob_name: Optional[str] = None,
        metadata: Optional[Dict[str, str]] = None
    ) -> bool:
        """Upload file to Azure Blob Storage."""
        try:
            file_path = Path(file_path)
            blob_name = blob_name or file_path.name
            blob_client = self.blob_service_client.get_blob_client(
                container=self.container_name,
                blob=blob_name
            )
            with open(file_path, 'rb') as f:
                blob_client.upload_blob(
                    f,
                    overwrite=True,
                    metadata=metadata
                )
            self.logger.info(f"Uploaded {file_path} to Azure container {self.container_name}/{blob_name}")
            return True
        except Exception as e:
            self.logger.error(f"Azure upload failed: {e}")
            return False
# ==================== Multi-Cloud Storage Manager ====================
class MultiCloudStorageManager:
    """Manage storage across multiple cloud providers."""

    def __init__(self, config: StorageConfig):
        self.config = config
        self.providers = {}
        self.logger = logging.getLogger(__name__)
        # Initialize providers
        if config.s3_bucket:
            self.providers['aws'] = S3StorageManager(config)
        if config.gcs_bucket:
            self.providers['gcp'] = GCSStorageManager(config)
        if config.azure_container:
            self.providers['azure'] = AzureStorageManager(config)

    def upload_to_all(
        self,
        file_path: str,
        key: str,
        metadata: Optional[Dict[str, str]] = None
    ) -> Dict[str, bool]:
        """Upload file to all configured providers."""
        results = {}
        # ThreadPoolExecutor rejects max_workers=0, so return early without providers
        if not self.providers:
            return results
        with ThreadPoolExecutor(max_workers=len(self.providers)) as executor:
            futures = {}
            for provider_name, provider in self.providers.items():
                future = executor.submit(provider.upload_file, file_path, key, metadata)
                futures[future] = provider_name
            for future in as_completed(futures):
                provider_name = futures[future]
                try:
                    results[provider_name] = future.result()
                except Exception as e:
                    self.logger.error(f"Upload to {provider_name} failed: {e}")
                    results[provider_name] = False
        return results

    def replicate_across_clouds(
        self,
        source_provider: str,
        source_key: str,
        target_providers: List[str]
    ) -> Dict[str, bool]:
        """Replicate object across cloud providers."""
        import tempfile  # local import keeps this method self-contained
        results = {}
        source = self.providers[source_provider]
        # Keys may contain "/", so stage the object under its base name in a fresh
        # temporary directory, which is removed automatically on exit
        with tempfile.TemporaryDirectory() as temp_dir:
            temp_file = os.path.join(temp_dir, Path(source_key).name or "object")
            # Download from source
            if not source.download_file(source_key, temp_file):
                self.logger.error("Failed to download from source")
                return results
            # Upload to targets
            for target_name in target_providers:
                if target_name in self.providers:
                    target = self.providers[target_name]
                    results[target_name] = target.upload_file(temp_file, source_key)
        return results
# ==================== Storage Analytics ====================
class StorageAnalytics:
    """Analyze storage usage and costs."""

    def __init__(self, storage_manager: CloudStorageManager):
        self.storage = storage_manager
        self.logger = logging.getLogger(__name__)

    def analyze_usage(self) -> Dict[str, Any]:
        """Analyze storage usage patterns."""
        # list_objects() returns at most max_keys objects (1000 by default),
        # so for larger buckets these figures describe a sample
        objects = self.storage.list_objects()
        if not objects:
            return {}
        total_size = sum(obj['size'] for obj in objects)
        total_count = len(objects)
        # Group by storage class
        by_class = {}
        for obj in objects:
            storage_class = obj.get('storage_class', 'STANDARD')
            if storage_class not in by_class:
                by_class[storage_class] = {'count': 0, 'size': 0}
            by_class[storage_class]['count'] += 1
            by_class[storage_class]['size'] += obj['size']
        # File type analysis
        by_type = {}
        for obj in objects:
            ext = Path(obj['key']).suffix.lower()
            if ext not in by_type:
                by_type[ext] = {'count': 0, 'size': 0}
            by_type[ext]['count'] += 1
            by_type[ext]['size'] += obj['size']
        return {
            'total_size': total_size,
            'total_count': total_count,
            'average_size': total_size / total_count if total_count > 0 else 0,
            'by_storage_class': by_class,
            'by_file_type': by_type
        }

    def estimate_costs(self) -> Dict[str, float]:
        """Estimate storage costs."""
        # Simplified cost estimation (actual costs vary by region and usage)
        pricing = {
            'STANDARD': 0.023,  # per GB per month
            'STANDARD_IA': 0.0125,
            'GLACIER': 0.004,
            'DEEP_ARCHIVE': 0.00099
        }
        usage = self.analyze_usage()
        costs = {}
        for storage_class, data in usage.get('by_storage_class', {}).items():
            size_gb = data['size'] / (1024 ** 3)
            rate = pricing.get(storage_class, 0.023)
            costs[storage_class] = size_gb * rate
        costs['total'] = sum(costs.values())
        return costs

    def generate_report(self) -> str:
        """Generate storage analytics report."""
        usage = self.analyze_usage()
        costs = self.estimate_costs()
        report = f"""
Storage Analytics Report
========================
Total Storage Usage:
  - Objects: {usage.get('total_count', 0):,}
  - Size: {self._format_bytes(usage.get('total_size', 0))}
  - Average Size: {self._format_bytes(usage.get('average_size', 0))}
Storage Class Distribution:
"""
        for storage_class, data in usage.get('by_storage_class', {}).items():
            report += f"  {storage_class}:\n"
            report += f"    Count: {data['count']:,}\n"
            report += f"    Size: {self._format_bytes(data['size'])}\n"
            report += f"    Cost: ${costs.get(storage_class, 0):.2f}/month\n"
        report += f"\nEstimated Monthly Cost: ${costs.get('total', 0):.2f}\n"
        return report

    def _format_bytes(self, bytes_value: float) -> str:
        """Format bytes to human readable string."""
        for unit in ['B', 'KB', 'MB', 'GB', 'TB']:
            if bytes_value < 1024.0:
                return f"{bytes_value:.2f} {unit}"
            bytes_value /= 1024.0
        return f"{bytes_value:.2f} PB"
# ==================== CDN Integration ====================
class CDNIntegration:
    """Integrate storage with CDN."""

    def __init__(self, storage_manager: CloudStorageManager):
        self.storage = storage_manager
        self.logger = logging.getLogger(__name__)

    def setup_cloudfront_distribution(
        self,
        bucket_name: str,
        distribution_comment: str = "CDN Distribution"
    ):
        """Setup AWS CloudFront distribution."""
        cloudfront = boto3.client('cloudfront')
        origin_id = f"S3-{bucket_name}"
        distribution_config = {
            'CallerReference': str(datetime.now().timestamp()),
            'Comment': distribution_comment,
            'Enabled': True,
            'Origins': {
                'Quantity': 1,
                'Items': [{
                    'Id': origin_id,
                    'DomainName': f"{bucket_name}.s3.amazonaws.com",
                    'S3OriginConfig': {
                        # An empty identity only works for publicly readable buckets; a bucket
                        # with public access blocked needs origin access configured
                        'OriginAccessIdentity': ''
                    }
                }]
            },
            'DefaultCacheBehavior': {
                'TargetOriginId': origin_id,
                'ViewerProtocolPolicy': 'redirect-to-https',
                'TrustedSigners': {
                    'Enabled': False,
                    'Quantity': 0
                },
                'ForwardedValues': {
                    'QueryString': False,
                    'Cookies': {'Forward': 'none'}
                },
                'MinTTL': 0,
                'DefaultTTL': 86400,
                'MaxTTL': 31536000
            }
        }
        try:
            response = cloudfront.create_distribution(
                DistributionConfig=distribution_config
            )
            distribution_id = response['Distribution']['Id']
            domain_name = response['Distribution']['DomainName']
            self.logger.info(f"Created CloudFront distribution: {distribution_id}")
            self.logger.info(f"CDN Domain: {domain_name}")
            return domain_name
        except Exception as e:
            self.logger.error(f"Failed to create CloudFront distribution: {e}")
            return None
# Example usage
if __name__ == "__main__":
    print("Cloud Storage Management Examples\n")
    # Example 1: Initialize storage
    print("1. Initializing Cloud Storage:")
    config = StorageConfig(
        provider="aws",
        s3_bucket="my-data-bucket",
        enable_versioning=True,
        archive_after_days=90
    )
    s3_storage = S3StorageManager(config)
    print("  Provider: AWS S3")
    print(f"  Bucket: {config.s3_bucket}")
    print(f"  Versioning: {config.enable_versioning}")
    print(f"  Archive after: {config.archive_after_days} days")
    # Example 2: Upload operations
    print("\n2. Upload Operations:")
    print("  # Simple upload")
    print("  s3_storage.upload_file('data.csv', 'data/2024/data.csv')")
    print("\n  # Multipart upload for large files")
    print("  s3_storage.upload_file('video.mp4', storage_class=StorageClass.GLACIER)")
    print("\n  # Async upload")
    print("  await s3_storage.upload_file_async('image.jpg', 'images/image.jpg')")
    # Example 3: Storage classes
    print("\n3. Storage Classes (Cost vs Performance):")
    classes = [
        ("STANDARD", "Frequent access", "$0.023/GB"),
        ("STANDARD_IA", "Infrequent access", "$0.0125/GB"),
        ("GLACIER", "Archive", "$0.004/GB"),
        ("DEEP_ARCHIVE", "Long-term archive", "$0.00099/GB"),
        ("INTELLIGENT_TIERING", "Automatic tiering", "Variable")
    ]
    for storage_class, use_case, cost in classes:
        print(f"  {storage_class}: {use_case} ({cost}/month)")
    # Example 4: Lifecycle policies
    print("\n4. Lifecycle Policy Example:")
    print("  Days 0-30: STANDARD (hot data)")
    print("  Days 31-90: STANDARD_IA (warm data)")
    print("  Days 91-365: GLACIER (cold data)")
    print("  After 365 days: DELETE")
    # Example 5: Sync operations
    print("\n5. Directory Sync:")
    print("  # Sync local to cloud")
    print("  s3_storage.sync_directory(")
    print("      local_dir='/data/backup',")
    print("      s3_prefix='backups/2024',")
    print("      delete=True  # Remove deleted files")
    print("  )")
    # Example 6: Presigned URLs
    print("\n6. Secure Sharing with Presigned URLs:")
    print("  # Generate temporary download link")
    print("  url = s3_storage.generate_presigned_url(")
    print("      key='private/document.pdf',")
    print("      expiration=3600  # 1 hour")
    print("  )")
    # Example 7: Multi-cloud replication
    print("\n7. Multi-Cloud Replication:")
    print("  multi_cloud = MultiCloudStorageManager(config)")
    print("  multi_cloud.replicate_across_clouds(")
    print("      source_provider='aws',")
    print("      source_key='important-data.zip',")
    print("      target_providers=['gcp', 'azure']")
    print("  )")
    # Example 8: Storage analytics
    print("\n8. Storage Analytics:")
    print("  analytics = StorageAnalytics(s3_storage)")
    print("  usage = analytics.analyze_usage()")
    print("  costs = analytics.estimate_costs()")
    print("  report = analytics.generate_report()")
    # Example 9: Best practices
    print("\n9. Cloud Storage Best Practices:")
    practices = [
        "Enable encryption at rest",
        "Use versioning for critical data",
        "Tag resources for organization",
        "Monitor access logs",
        "Implement lifecycle policies",
        "Use CDN for global distribution",
        "Use IAM for access control",
        "Compress data before storage",
        "Regular backups to different regions",
        "Use appropriate storage classes"
    ]
    for practice in practices:
        print(f"  {practice}")
    # Example 10: Performance tips
    print("\n10. Performance Optimization:")
    tips = [
        "Use multipart uploads for files > 100MB",
        "Enable transfer acceleration for global uploads",
        "Use byte-range fetches for partial downloads",
        "Implement request coalescing",
        "Cache frequently accessed data",
        "Use parallel uploads/downloads",
        "Optimize file formats (Parquet for analytics)",
        "Enable S3 Select for query pushdown"
    ]
    for tip in tips:
        print(f"  - {tip}")
    print("\nCloud storage management demonstration complete!")
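The lifecycle timeline printed in Example 4 (STANDARD, then STANDARD_IA after 30 days, GLACIER after 90, delete after 365) can be expressed as a single S3 lifecycle rule. The sketch below only builds the rule dictionary; the helper name `build_tiered_lifecycle_rule` and the rule ID `tiered-retention` are assumptions introduced for illustration and are not part of the framework above.

```python
from typing import Any, Dict


def build_tiered_lifecycle_rule(
    ia_after_days: int = 30,
    glacier_after_days: int = 90,
    delete_after_days: int = 365,
    prefix: str = "",
) -> Dict[str, Any]:
    """Build one lifecycle rule that tiers objects down and then expires them."""
    if not 0 < ia_after_days < glacier_after_days < delete_after_days:
        raise ValueError("day thresholds must be positive and strictly increasing")
    return {
        "ID": "tiered-retention",      # hypothetical rule name
        "Status": "Enabled",
        "Filter": {"Prefix": prefix},  # empty prefix matches every object
        "Transitions": [
            {"Days": ia_after_days, "StorageClass": "STANDARD_IA"},
            {"Days": glacier_after_days, "StorageClass": "GLACIER"},
        ],
        "Expiration": {"Days": delete_after_days},
    }


if __name__ == "__main__":
    import json
    print(json.dumps(build_tiered_lifecycle_rule(), indent=2))
```

With boto3, a rule like this would be passed as `LifecycleConfiguration={'Rules': [rule]}` to `put_bucket_lifecycle_configuration`, the same call the S3 manager uses.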
Key Takeaways and Best Practices
- Choose Appropriate Storage Classes: Match storage tier to access patterns.
- Implement Lifecycle Policies: Automate data archival and deletion.
- Enable Versioning: Protect against accidental deletion.
- Use Encryption: Always encrypt sensitive data at rest and in transit.
- Monitor Costs: Track storage usage and optimize regularly.
- Implement Multi-Cloud: Avoid vendor lock-in with replication.
- Use CDN: Improve global performance with content delivery networks.
- Regular Backups: Maintain backups in different regions.
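To make the "match the tier to the access pattern" and "monitor costs" points concrete, here is a small cost comparison. It reuses the simplified per-GB monthly rates from `estimate_costs` in this tutorial, which are illustrative and vary by region; it also ignores request, retrieval, and minimum-duration charges. The names `PRICE_PER_GB_MONTH` and `monthly_cost` are assumptions made for this sketch.

```python
# Illustrative per-GB monthly rates taken from this tutorial (not a price list)
PRICE_PER_GB_MONTH = {
    "STANDARD": 0.023,
    "STANDARD_IA": 0.0125,
    "GLACIER": 0.004,
    "DEEP_ARCHIVE": 0.00099,
}


def monthly_cost(size_bytes: int, storage_class: str) -> float:
    """Storage-only monthly cost in dollars for the given class."""
    size_gb = size_bytes / (1024 ** 3)
    return size_gb * PRICE_PER_GB_MONTH[storage_class]


if __name__ == "__main__":
    ten_tb = 10 * 1024 ** 4
    for storage_class in PRICE_PER_GB_MONTH:
        cost = monthly_cost(ten_tb, storage_class)
        print(f"{storage_class:<13} ${cost:,.2f}/month")
```

The spread between tiers is why lifecycle policies matter: the same data costs several times less once it moves out of the hot tier, provided it is rarely read back.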
Cloud Storage Best Practices
Mastering cloud storage management enables you to build scalable, reliable, and cost-effective data solutions in the cloud. You can now handle everything from simple file uploads to petabyte-scale data lakes, implement intelligent tiering for cost optimization, ensure data durability with replication, and provide global access through CDN integration. Whether you're building backup systems, content delivery platforms, or big data solutions, these cloud storage skills are essential for modern applications!
Pro Tip: Think of cloud storage as a hierarchy of solutions - each tier offers different trade-offs between cost, performance, and durability. Start by understanding your data access patterns: hot data needs fast storage (STANDARD), warm data can use cheaper tiers (INFREQUENT_ACCESS), and cold data should be archived (GLACIER). Always enable encryption at rest and use IAM for fine-grained access control. Implement lifecycle policies from day one to automatically move data to appropriate tiers and delete expired data. Use versioning for critical data to protect against accidental deletion or corruption. For large files, use multipart uploads to improve reliability and performance. Implement proper tagging for cost allocation and organization. Use presigned URLs for secure temporary access instead of making objects public. Monitor your storage costs regularly - they can grow quickly. Consider multi-cloud replication for critical data to avoid vendor lock-in. Use CDN integration for frequently accessed public content. Most importantly: treat cloud storage as code - version control your bucket configurations, lifecycle policies, and access controls!
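For the multipart advice, it helps to see how part size and part count interact. S3 requires every part except the last to be at least 5 MiB and allows at most 10,000 parts per upload, so a fixed 10 MB chunk size stops working for very large objects. The sketch below plans a part size under those two limits; `plan_multipart` is a hypothetical helper for illustration, not part of boto3.

```python
from typing import Tuple

MIN_PART_SIZE = 5 * 1024 * 1024  # S3 minimum size for all parts except the last
MAX_PARTS = 10_000               # S3 maximum number of parts per upload


def _ceil_div(a: int, b: int) -> int:
    """Integer ceiling division without floating point."""
    return -(-a // b)


def plan_multipart(file_size: int, chunk_size: int = 10 * 1024 * 1024) -> Tuple[int, int]:
    """Return (part_size, part_count) that respects the S3 multipart limits."""
    if file_size <= 0:
        raise ValueError("file_size must be positive")
    part_size = max(chunk_size, MIN_PART_SIZE)
    # Grow the part size when the file would otherwise need more than MAX_PARTS parts
    part_size = max(part_size, _ceil_div(file_size, MAX_PARTS))
    return part_size, _ceil_div(file_size, part_size)


if __name__ == "__main__":
    for size_gib in (1, 50, 200):
        part_size, part_count = plan_multipart(size_gib * 1024 ** 3)
        print(f"{size_gib} GiB -> {part_count} parts of {part_size} bytes")
```

A 1 GiB file fits comfortably with the default chunk size, while a 200 GiB file forces a larger part size to stay within the part limit.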