š Deploying to Cloud: Ship Your Applications to Production
Cloud deployment is the bridge between development and production - it transforms your local applications into scalable, globally-accessible services running on cloud infrastructure. Like launching a rocket into orbit, mastering cloud deployment involves understanding containerization, CI/CD pipelines, infrastructure as code, monitoring, and rollback strategies. Whether deploying to AWS, GCP, Azure, or hybrid clouds, these skills enable you to ship applications reliably and efficiently. Let's explore the comprehensive world of cloud deployment! š
The Cloud Deployment Architecture
Think of cloud deployment as orchestrating a symphony - you need to coordinate code packaging, environment configuration, infrastructure provisioning, security settings, monitoring setup, and traffic routing all in harmony. Using tools like Docker for containerization, Terraform for infrastructure as code, GitHub Actions for CI/CD, and Kubernetes for orchestration, you can create deployment pipelines that are reproducible, scalable, and resilient. Understanding these deployment patterns is essential for modern application delivery!
Real-World Scenario: The Multi-Cloud Application Deployment Platform šļø
You're building a deployment platform that packages applications in Docker containers, pushes to container registries, deploys to multiple cloud providers (AWS, GCP, Azure), manages Kubernetes clusters, implements blue-green deployments, sets up auto-scaling, configures monitoring and logging, manages secrets and configurations, handles database migrations, and provides instant rollback capabilities. Your system must support multiple environments, ensure zero-downtime deployments, maintain security compliance, and optimize costs. Let's build a production-ready deployment framework!
# First, install required packages:
# pip install docker kubernetes pyyaml jinja2 boto3 google-cloud-storage azure-storage-blob
# pip install paramiko fabric gitpython python-terraform prometheus-client
import os
import json
import yaml
import time
import subprocess
import shutil
from typing import List, Dict, Optional, Any, Union, Tuple
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from enum import Enum
from pathlib import Path
import logging
import tempfile
import hashlib
import base64
# Docker and container management
import docker
from docker.errors import BuildError, APIError
# Kubernetes
from kubernetes import client as k8s_client, config as k8s_config
from kubernetes.client.rest import ApiException
# Cloud providers
import boto3 # AWS
from google.cloud import storage as gcs # GCP
from azure.storage.blob import BlobServiceClient # Azure
# Git operations
from git import Repo
# Template engine
from jinja2 import Template, Environment, FileSystemLoader
# ==================== Deployment Configuration ====================
@dataclass
class DeploymentConfig:
"""Deployment configuration."""
app_name: str
environment: str # dev, staging, production
# Container settings
docker_registry: str = "docker.io"
docker_repository: str = ""
docker_tag: str = "latest"
# Cloud provider
cloud_provider: str = "aws" # aws, gcp, azure, kubernetes
region: str = "us-east-1"
# Deployment settings
deployment_strategy: str = "rolling" # rolling, blue-green, canary
replicas: int = 3
min_replicas: int = 2
max_replicas: int = 10
# Resources
cpu_request: str = "100m"
memory_request: str = "128Mi"
cpu_limit: str = "500m"
memory_limit: str = "512Mi"
# Health checks
health_check_path: str = "/health"
health_check_interval: int = 30
health_check_timeout: int = 10
# Monitoring
enable_monitoring: bool = True
enable_logging: bool = True
# Security
secrets: Dict[str, str] = field(default_factory=dict)
config_maps: Dict[str, str] = field(default_factory=dict)
# Rollback
max_rollback_revisions: int = 5
rollback_on_failure: bool = True
class DeploymentStatus(Enum):
"""Deployment status."""
PENDING = "pending"
BUILDING = "building"
PUSHING = "pushing"
DEPLOYING = "deploying"
RUNNING = "running"
FAILED = "failed"
ROLLED_BACK = "rolled_back"
# ==================== Docker Manager ====================
class DockerManager:
"""Manage Docker containers and images."""
def __init__(self, config: DeploymentConfig):
self.config = config
self.client = docker.from_env()
self.logger = logging.getLogger(__name__)
def build_image(
self,
dockerfile_path: str,
context_path: str,
build_args: Optional[Dict[str, str]] = None,
multi_stage_target: Optional[str] = None
) -> str:
"""
Build Docker image.
Args:
dockerfile_path: Path to Dockerfile
context_path: Build context path
build_args: Build arguments
multi_stage_target: Target stage for multi-stage build
Returns:
Image tag
"""
try:
# Generate image tag
image_tag = self._generate_image_tag()
# Build arguments
build_params = {
'path': context_path,
'dockerfile': dockerfile_path,
'tag': image_tag,
'rm': True, # Remove intermediate containers
'forcerm': True # Force removal
}
if build_args:
build_params['buildargs'] = build_args
if multi_stage_target:
build_params['target'] = multi_stage_target
self.logger.info(f"Building Docker image: {image_tag}")
# Build image with progress
image, build_logs = self.client.images.build(**build_params)
# Print build logs
for log in build_logs:
if 'stream' in log:
self.logger.debug(log['stream'].strip())
self.logger.info(f"Successfully built image: {image_tag}")
return image_tag
except BuildError as e:
self.logger.error(f"Docker build failed: {e}")
raise
def push_image(self, image_tag: str, registry_auth: Optional[Dict] = None) -> bool:
"""Push Docker image to registry."""
try:
self.logger.info(f"Pushing image to registry: {image_tag}")
# Push with authentication if provided
response = self.client.images.push(
image_tag,
auth_config=registry_auth,
stream=True,
decode=True
)
# Process push response
for line in response:
if 'error' in line:
raise Exception(f"Push failed: {line['error']}")
if 'status' in line:
self.logger.debug(f"Push status: {line['status']}")
self.logger.info(f"Successfully pushed image: {image_tag}")
return True
except Exception as e:
self.logger.error(f"Failed to push image: {e}")
return False
def scan_image(self, image_tag: str) -> Dict[str, Any]:
"""Scan Docker image for vulnerabilities."""
try:
# Run security scan (using trivy as example)
result = subprocess.run(
['trivy', 'image', '--format', 'json', image_tag],
capture_output=True,
text=True
)
if result.returncode == 0:
scan_results = json.loads(result.stdout)
# Parse vulnerabilities
vulnerabilities = {
'critical': 0,
'high': 0,
'medium': 0,
'low': 0
}
for target in scan_results.get('Results', []):
for vuln in target.get('Vulnerabilities', []):
severity = vuln.get('Severity', 'UNKNOWN').lower()
if severity in vulnerabilities:
vulnerabilities[severity] += 1
return {
'status': 'completed',
'vulnerabilities': vulnerabilities,
'scan_date': datetime.now().isoformat()
}
return {'status': 'failed', 'error': result.stderr}
except Exception as e:
self.logger.error(f"Image scan failed: {e}")
return {'status': 'error', 'error': str(e)}
def _generate_image_tag(self) -> str:
"""Generate unique image tag."""
timestamp = datetime.now().strftime("%Y%m%d%H%M%S")
short_hash = hashlib.sha256(timestamp.encode()).hexdigest()[:8]
if self.config.docker_repository:
return f"{self.config.docker_registry}/{self.config.docker_repository}/{self.config.app_name}:{timestamp}-{short_hash}"
else:
return f"{self.config.app_name}:{timestamp}-{short_hash}"
def create_dockerfile(
self,
base_image: str = "python:3.9-slim",
app_dir: str = "/app",
requirements_file: str = "requirements.txt",
entry_command: str = "python app.py"
) -> str:
"""
Create optimized Dockerfile.
Returns:
Dockerfile content
"""
dockerfile = f"""
# Multi-stage build for smaller image size
FROM {base_image} AS builder
# Install build dependencies
RUN apt-get update && apt-get install -y \\
gcc \\
&& rm -rf /var/lib/apt/lists/*
# Set working directory
WORKDIR {app_dir}
# Copy requirements and install dependencies
COPY {requirements_file} .
RUN pip install --no-cache-dir --user -r {requirements_file}
# Final stage
FROM {base_image}
# Create non-root user
RUN useradd -m -u 1000 appuser
# Set working directory
WORKDIR {app_dir}
# Copy installed packages from builder
COPY --from=builder /root/.local /home/appuser/.local
# Copy application code
COPY --chown=appuser:appuser . .
# Switch to non-root user
USER appuser
# Add local bin to PATH
ENV PATH=/home/appuser/.local/bin:$PATH
# Health check
HEALTHCHECK --interval=30s --timeout=3s --start-period=5s --retries=3 \\
CMD python -c "import requests; requests.get('http://localhost:8080/health')" || exit 1
# Expose port
EXPOSE 8080
# Run application
CMD [{' '.join([f'"{c}"' for c in entry_command.split()])}]
"""
return dockerfile
# ==================== Kubernetes Deployment Manager ====================
class KubernetesDeploymentManager:
"""Manage Kubernetes deployments."""
def __init__(self, config: DeploymentConfig):
self.config = config
self.logger = logging.getLogger(__name__)
# Load Kubernetes config
try:
k8s_config.load_incluster_config() # In-cluster config
except:
k8s_config.load_kube_config() # Local config
# Initialize API clients
self.apps_v1 = k8s_client.AppsV1Api()
self.core_v1 = k8s_client.CoreV1Api()
self.autoscaling_v1 = k8s_client.AutoscalingV1Api()
self.networking_v1 = k8s_client.NetworkingV1Api()
def deploy_application(
self,
image: str,
namespace: str = "default",
expose_service: bool = True,
ingress_host: Optional[str] = None
) -> Dict[str, Any]:
"""
Deploy application to Kubernetes.
Args:
image: Docker image to deploy
namespace: Kubernetes namespace
expose_service: Whether to create service
ingress_host: Hostname for ingress
Returns:
Deployment information
"""
try:
# Create namespace if it doesn't exist
self._create_namespace(namespace)
# Create ConfigMap
if self.config.config_maps:
self._create_config_map(namespace)
# Create Secret
if self.config.secrets:
self._create_secret(namespace)
# Create Deployment
deployment = self._create_deployment(image, namespace)
# Create Service
service = None
if expose_service:
service = self._create_service(namespace)
# Create Ingress
ingress = None
if ingress_host:
ingress = self._create_ingress(namespace, ingress_host)
# Create HPA (Horizontal Pod Autoscaler)
hpa = self._create_hpa(namespace)
return {
'deployment': deployment.metadata.name,
'service': service.metadata.name if service else None,
'ingress': ingress.metadata.name if ingress else None,
'hpa': hpa.metadata.name if hpa else None,
'namespace': namespace,
'status': 'deployed'
}
except ApiException as e:
self.logger.error(f"Kubernetes deployment failed: {e}")
raise
def _create_namespace(self, namespace: str):
"""Create Kubernetes namespace if it doesn't exist."""
try:
self.core_v1.read_namespace(namespace)
except ApiException as e:
if e.status == 404:
namespace_manifest = k8s_client.V1Namespace(
metadata=k8s_client.V1ObjectMeta(name=namespace)
)
self.core_v1.create_namespace(namespace_manifest)
self.logger.info(f"Created namespace: {namespace}")
def _create_deployment(self, image: str, namespace: str) -> k8s_client.V1Deployment:
"""Create Kubernetes deployment."""
# Container configuration
container = k8s_client.V1Container(
name=self.config.app_name,
image=image,
ports=[k8s_client.V1ContainerPort(container_port=8080)],
resources=k8s_client.V1ResourceRequirements(
requests={
'cpu': self.config.cpu_request,
'memory': self.config.memory_request
},
limits={
'cpu': self.config.cpu_limit,
'memory': self.config.memory_limit
}
),
liveness_probe=k8s_client.V1Probe(
http_get=k8s_client.V1HTTPGetAction(
path=self.config.health_check_path,
port=8080
),
initial_delay_seconds=30,
period_seconds=self.config.health_check_interval,
timeout_seconds=self.config.health_check_timeout
),
readiness_probe=k8s_client.V1Probe(
http_get=k8s_client.V1HTTPGetAction(
path=self.config.health_check_path,
port=8080
),
initial_delay_seconds=10,
period_seconds=10
)
)
# Add environment variables from ConfigMap
if self.config.config_maps:
container.env_from = [
k8s_client.V1EnvFromSource(
config_map_ref=k8s_client.V1ConfigMapEnvSource(
name=f"{self.config.app_name}-config"
)
)
]
# Add secrets as environment variables
if self.config.secrets:
if not container.env_from:
container.env_from = []
container.env_from.append(
k8s_client.V1EnvFromSource(
secret_ref=k8s_client.V1SecretEnvSource(
name=f"{self.config.app_name}-secret"
)
)
)
# Pod template
template = k8s_client.V1PodTemplateSpec(
metadata=k8s_client.V1ObjectMeta(
labels={'app': self.config.app_name}
),
spec=k8s_client.V1PodSpec(containers=[container])
)
# Deployment spec
spec = k8s_client.V1DeploymentSpec(
replicas=self.config.replicas,
selector=k8s_client.V1LabelSelector(
match_labels={'app': self.config.app_name}
),
template=template,
strategy=k8s_client.V1DeploymentStrategy(
type='RollingUpdate' if self.config.deployment_strategy == 'rolling' else 'Recreate',
rolling_update=k8s_client.V1RollingUpdateDeployment(
max_surge='25%',
max_unavailable='25%'
) if self.config.deployment_strategy == 'rolling' else None
)
)
# Deployment manifest
deployment = k8s_client.V1Deployment(
api_version='apps/v1',
kind='Deployment',
metadata=k8s_client.V1ObjectMeta(name=self.config.app_name),
spec=spec
)
# Create or update deployment
try:
existing = self.apps_v1.read_namespaced_deployment(
name=self.config.app_name,
namespace=namespace
)
# Update existing deployment
response = self.apps_v1.patch_namespaced_deployment(
name=self.config.app_name,
namespace=namespace,
body=deployment
)
self.logger.info(f"Updated deployment: {self.config.app_name}")
except ApiException as e:
if e.status == 404:
# Create new deployment
response = self.apps_v1.create_namespaced_deployment(
namespace=namespace,
body=deployment
)
self.logger.info(f"Created deployment: {self.config.app_name}")
else:
raise
return response
def _create_service(self, namespace: str) -> k8s_client.V1Service:
"""Create Kubernetes service."""
service = k8s_client.V1Service(
api_version='v1',
kind='Service',
metadata=k8s_client.V1ObjectMeta(name=self.config.app_name),
spec=k8s_client.V1ServiceSpec(
selector={'app': self.config.app_name},
ports=[
k8s_client.V1ServicePort(
port=80,
target_port=8080,
protocol='TCP'
)
],
type='ClusterIP'
)
)
try:
existing = self.core_v1.read_namespaced_service(
name=self.config.app_name,
namespace=namespace
)
# Update existing service
response = self.core_v1.patch_namespaced_service(
name=self.config.app_name,
namespace=namespace,
body=service
)
self.logger.info(f"Updated service: {self.config.app_name}")
except ApiException as e:
if e.status == 404:
# Create new service
response = self.core_v1.create_namespaced_service(
namespace=namespace,
body=service
)
self.logger.info(f"Created service: {self.config.app_name}")
else:
raise
return response
def _create_ingress(self, namespace: str, host: str) -> k8s_client.V1Ingress:
"""Create Kubernetes ingress."""
ingress = k8s_client.V1Ingress(
api_version='networking.k8s.io/v1',
kind='Ingress',
metadata=k8s_client.V1ObjectMeta(
name=self.config.app_name,
annotations={
'kubernetes.io/ingress.class': 'nginx',
'cert-manager.io/cluster-issuer': 'letsencrypt-prod'
}
),
spec=k8s_client.V1IngressSpec(
tls=[
k8s_client.V1IngressTLS(
hosts=[host],
secret_name=f"{self.config.app_name}-tls"
)
],
rules=[
k8s_client.V1IngressRule(
host=host,
http=k8s_client.V1HTTPIngressRuleValue(
paths=[
k8s_client.V1HTTPIngressPath(
path='/',
path_type='Prefix',
backend=k8s_client.V1IngressBackend(
service=k8s_client.V1IngressServiceBackend(
name=self.config.app_name,
port=k8s_client.V1ServiceBackendPort(number=80)
)
)
)
]
)
)
]
)
)
try:
existing = self.networking_v1.read_namespaced_ingress(
name=self.config.app_name,
namespace=namespace
)
# Update existing ingress
response = self.networking_v1.patch_namespaced_ingress(
name=self.config.app_name,
namespace=namespace,
body=ingress
)
self.logger.info(f"Updated ingress: {self.config.app_name}")
except ApiException as e:
if e.status == 404:
# Create new ingress
response = self.networking_v1.create_namespaced_ingress(
namespace=namespace,
body=ingress
)
self.logger.info(f"Created ingress: {self.config.app_name}")
else:
raise
return response
def _create_hpa(self, namespace: str) -> k8s_client.V1HorizontalPodAutoscaler:
"""Create Horizontal Pod Autoscaler."""
hpa = k8s_client.V1HorizontalPodAutoscaler(
api_version='autoscaling/v1',
kind='HorizontalPodAutoscaler',
metadata=k8s_client.V1ObjectMeta(name=self.config.app_name),
spec=k8s_client.V1HorizontalPodAutoscalerSpec(
scale_target_ref=k8s_client.V1CrossVersionObjectReference(
api_version='apps/v1',
kind='Deployment',
name=self.config.app_name
),
min_replicas=self.config.min_replicas,
max_replicas=self.config.max_replicas,
target_cpu_utilization_percentage=70
)
)
try:
existing = self.autoscaling_v1.read_namespaced_horizontal_pod_autoscaler(
name=self.config.app_name,
namespace=namespace
)
# Update existing HPA
response = self.autoscaling_v1.patch_namespaced_horizontal_pod_autoscaler(
name=self.config.app_name,
namespace=namespace,
body=hpa
)
self.logger.info(f"Updated HPA: {self.config.app_name}")
except ApiException as e:
if e.status == 404:
# Create new HPA
response = self.autoscaling_v1.create_namespaced_horizontal_pod_autoscaler(
namespace=namespace,
body=hpa
)
self.logger.info(f"Created HPA: {self.config.app_name}")
else:
raise
return response
def _create_config_map(self, namespace: str):
"""Create ConfigMap from configuration."""
config_map = k8s_client.V1ConfigMap(
api_version='v1',
kind='ConfigMap',
metadata=k8s_client.V1ObjectMeta(name=f"{self.config.app_name}-config"),
data=self.config.config_maps
)
try:
existing = self.core_v1.read_namespaced_config_map(
name=f"{self.config.app_name}-config",
namespace=namespace
)
# Update existing ConfigMap
self.core_v1.patch_namespaced_config_map(
name=f"{self.config.app_name}-config",
namespace=namespace,
body=config_map
)
except ApiException as e:
if e.status == 404:
# Create new ConfigMap
self.core_v1.create_namespaced_config_map(
namespace=namespace,
body=config_map
)
def _create_secret(self, namespace: str):
"""Create Secret from sensitive data."""
# Encode secret values
encoded_data = {
key: base64.b64encode(value.encode()).decode()
for key, value in self.config.secrets.items()
}
secret = k8s_client.V1Secret(
api_version='v1',
kind='Secret',
metadata=k8s_client.V1ObjectMeta(name=f"{self.config.app_name}-secret"),
type='Opaque',
data=encoded_data
)
try:
existing = self.core_v1.read_namespaced_secret(
name=f"{self.config.app_name}-secret",
namespace=namespace
)
# Update existing Secret
self.core_v1.patch_namespaced_secret(
name=f"{self.config.app_name}-secret",
namespace=namespace,
body=secret
)
except ApiException as e:
if e.status == 404:
# Create new Secret
self.core_v1.create_namespaced_secret(
namespace=namespace,
body=secret
)
def rollback_deployment(self, namespace: str = "default", revision: int = 0):
"""Rollback deployment to previous version."""
try:
# Get deployment
deployment = self.apps_v1.read_namespaced_deployment(
name=self.config.app_name,
namespace=namespace
)
# Rollback to previous version
deployment.spec.revision_history_limit = self.config.max_rollback_revisions
# Update deployment
self.apps_v1.patch_namespaced_deployment(
name=self.config.app_name,
namespace=namespace,
body=deployment
)
self.logger.info(f"Rolled back deployment: {self.config.app_name}")
except ApiException as e:
self.logger.error(f"Rollback failed: {e}")
raise
# ==================== Cloud Provider Deployers ====================
class AWSDeployer:
"""Deploy to AWS services."""
def __init__(self, config: DeploymentConfig):
self.config = config
self.logger = logging.getLogger(__name__)
# Initialize AWS clients
self.ecs_client = boto3.client('ecs', region_name=config.region)
self.ecr_client = boto3.client('ecr', region_name=config.region)
self.elb_client = boto3.client('elbv2', region_name=config.region)
def deploy_to_ecs(
self,
cluster_name: str,
task_definition: Dict[str, Any],
service_name: Optional[str] = None
):
"""Deploy container to ECS."""
try:
# Register task definition
response = self.ecs_client.register_task_definition(**task_definition)
task_def_arn = response['taskDefinition']['taskDefinitionArn']
if service_name:
# Update service
self.ecs_client.update_service(
cluster=cluster_name,
service=service_name,
taskDefinition=task_def_arn,
desiredCount=self.config.replicas
)
self.logger.info(f"Updated ECS service: {service_name}")
else:
# Run task
self.ecs_client.run_task(
cluster=cluster_name,
taskDefinition=task_def_arn,
count=1
)
self.logger.info(f"Started ECS task: {task_def_arn}")
except Exception as e:
self.logger.error(f"ECS deployment failed: {e}")
raise
def deploy_to_lambda(
self,
function_name: str,
zip_file_path: str,
handler: str = "lambda_function.lambda_handler",
runtime: str = "python3.9"
):
"""Deploy function to AWS Lambda."""
lambda_client = boto3.client('lambda', region_name=self.config.region)
try:
with open(zip_file_path, 'rb') as f:
zip_content = f.read()
try:
# Update existing function
lambda_client.update_function_code(
FunctionName=function_name,
ZipFile=zip_content
)
self.logger.info(f"Updated Lambda function: {function_name}")
except lambda_client.exceptions.ResourceNotFoundException:
# Create new function
lambda_client.create_function(
FunctionName=function_name,
Runtime=runtime,
Role=f"arn:aws:iam::ACCOUNT_ID:role/{function_name}-role",
Handler=handler,
Code={'ZipFile': zip_content},
Timeout=60,
MemorySize=512
)
self.logger.info(f"Created Lambda function: {function_name}")
except Exception as e:
self.logger.error(f"Lambda deployment failed: {e}")
raise
class GCPDeployer:
"""Deploy to Google Cloud Platform."""
def __init__(self, config: DeploymentConfig):
self.config = config
self.logger = logging.getLogger(__name__)
def deploy_to_cloud_run(
self,
service_name: str,
image: str,
region: Optional[str] = None
):
"""Deploy container to Cloud Run."""
try:
region = region or self.config.region
# Deploy using gcloud command
cmd = [
'gcloud', 'run', 'deploy', service_name,
'--image', image,
'--region', region,
'--platform', 'managed',
'--allow-unauthenticated',
'--min-instances', str(self.config.min_replicas),
'--max-instances', str(self.config.max_replicas)
]
result = subprocess.run(cmd, capture_output=True, text=True)
if result.returncode == 0:
self.logger.info(f"Deployed to Cloud Run: {service_name}")
else:
raise Exception(f"Cloud Run deployment failed: {result.stderr}")
except Exception as e:
self.logger.error(f"Cloud Run deployment failed: {e}")
raise
# ==================== CI/CD Pipeline ====================
class CICDPipeline:
"""CI/CD pipeline orchestrator."""
def __init__(self, config: DeploymentConfig):
self.config = config
self.docker_manager = DockerManager(config)
self.k8s_manager = KubernetesDeploymentManager(config)
self.logger = logging.getLogger(__name__)
# Deployment status
self.status = DeploymentStatus.PENDING
self.deployment_id = self._generate_deployment_id()
def _generate_deployment_id(self) -> str:
"""Generate unique deployment ID."""
return f"{self.config.app_name}-{datetime.now().strftime('%Y%m%d%H%M%S')}"
def run_pipeline(
self,
source_dir: str,
dockerfile_path: Optional[str] = None,
skip_tests: bool = False
) -> Dict[str, Any]:
"""
Run complete CI/CD pipeline.
Args:
source_dir: Source code directory
dockerfile_path: Path to Dockerfile
skip_tests: Skip running tests
Returns:
Deployment result
"""
try:
self.status = DeploymentStatus.BUILDING
# Step 1: Run tests
if not skip_tests:
self._run_tests(source_dir)
# Step 2: Build Docker image
if not dockerfile_path:
# Generate Dockerfile
dockerfile_content = self.docker_manager.create_dockerfile()
dockerfile_path = Path(source_dir) / "Dockerfile"
dockerfile_path.write_text(dockerfile_content)
image_tag = self.docker_manager.build_image(
dockerfile_path=str(dockerfile_path),
context_path=source_dir
)
# Step 3: Security scan
scan_results = self.docker_manager.scan_image(image_tag)
if scan_results['status'] == 'completed':
if scan_results['vulnerabilities']['critical'] > 0:
raise Exception("Critical vulnerabilities found in image")
self.status = DeploymentStatus.PUSHING
# Step 4: Push to registry
self.docker_manager.push_image(image_tag)
self.status = DeploymentStatus.DEPLOYING
# Step 5: Deploy to target platform
deployment_result = self._deploy_to_platform(image_tag)
self.status = DeploymentStatus.RUNNING
# Step 6: Health check
if not self._health_check(deployment_result):
if self.config.rollback_on_failure:
self._rollback()
self.status = DeploymentStatus.ROLLED_BACK
else:
self.status = DeploymentStatus.FAILED
return {
'deployment_id': self.deployment_id,
'status': self.status.value,
'image': image_tag,
'deployment': deployment_result,
'scan_results': scan_results,
'timestamp': datetime.now().isoformat()
}
except Exception as e:
self.status = DeploymentStatus.FAILED
self.logger.error(f"Pipeline failed: {e}")
if self.config.rollback_on_failure:
self._rollback()
raise
def _run_tests(self, source_dir: str):
"""Run application tests."""
self.logger.info("Running tests...")
# Run pytest
result = subprocess.run(
['pytest', source_dir, '-v'],
capture_output=True,
text=True
)
if result.returncode != 0:
raise Exception(f"Tests failed: {result.stdout}")
self.logger.info("All tests passed")
def _deploy_to_platform(self, image: str) -> Dict[str, Any]:
"""Deploy to configured platform."""
if self.config.cloud_provider == "kubernetes":
return self.k8s_manager.deploy_application(
image=image,
namespace=self.config.environment
)
elif self.config.cloud_provider == "aws":
deployer = AWSDeployer(self.config)
# Deploy to ECS or Lambda
return {"platform": "aws", "status": "deployed"}
elif self.config.cloud_provider == "gcp":
deployer = GCPDeployer(self.config)
# Deploy to Cloud Run
return {"platform": "gcp", "status": "deployed"}
else:
raise ValueError(f"Unsupported platform: {self.config.cloud_provider}")
def _health_check(self, deployment: Dict[str, Any]) -> bool:
"""Check deployment health."""
self.logger.info("Performing health check...")
# Wait for deployment to stabilize
time.sleep(30)
# Check health endpoint
# Implementation depends on platform
return True
def _rollback(self):
"""Rollback deployment."""
self.logger.info("Rolling back deployment...")
if self.config.cloud_provider == "kubernetes":
self.k8s_manager.rollback_deployment(
namespace=self.config.environment
)
# ==================== Infrastructure as Code ====================
class InfrastructureAsCode:
"""Manage infrastructure using Terraform."""
def __init__(self, config: DeploymentConfig):
self.config = config
self.logger = logging.getLogger(__name__)
def generate_terraform_config(self) -> str:
"""Generate Terraform configuration."""
if self.config.cloud_provider == "aws":
return self._generate_aws_terraform()
elif self.config.cloud_provider == "gcp":
return self._generate_gcp_terraform()
elif self.config.cloud_provider == "azure":
return self._generate_azure_terraform()
else:
raise ValueError(f"Unsupported provider: {self.config.cloud_provider}")
def _generate_aws_terraform(self) -> str:
"""Generate Terraform config for AWS."""
return f"""
terraform {{
required_providers {{
aws = {{
source = "hashicorp/aws"
version = "~> 4.0"
}}
}}
}}
provider "aws" {{
region = "{self.config.region}"
}}
# ECS Cluster
resource "aws_ecs_cluster" "main" {{
name = "{self.config.app_name}-cluster"
}}
# Task Definition
resource "aws_ecs_task_definition" "app" {{
family = "{self.config.app_name}"
network_mode = "awsvpc"
requires_compatibilities = ["FARGATE"]
cpu = "256"
memory = "512"
container_definitions = jsonencode([{{
name = "{self.config.app_name}"
image = "${{var.docker_image}}"
portMappings = [{{
containerPort = 8080
protocol = "tcp"
}}]
environment = [
{{
name = "ENVIRONMENT"
value = "{self.config.environment}"
}}
]
logConfiguration = {{
logDriver = "awslogs"
options = {{
"awslogs-group" = "/ecs/{self.config.app_name}"
"awslogs-region" = "{self.config.region}"
"awslogs-stream-prefix" = "ecs"
}}
}}
}}])
}}
# ECS Service
resource "aws_ecs_service" "app" {{
name = "{self.config.app_name}"
cluster = aws_ecs_cluster.main.id
task_definition = aws_ecs_task_definition.app.arn
desired_count = {self.config.replicas}
launch_type = "FARGATE"
network_configuration {{
subnets = var.subnet_ids
security_groups = [aws_security_group.app.id]
}}
}}
# Security Group
resource "aws_security_group" "app" {{
name = "{self.config.app_name}-sg"
description = "Security group for {self.config.app_name}"
vpc_id = var.vpc_id
ingress {{
from_port = 8080
to_port = 8080
protocol = "tcp"
cidr_blocks = ["0.0.0.0/0"]
}}
egress {{
from_port = 0
to_port = 0
protocol = "-1"
cidr_blocks = ["0.0.0.0/0"]
}}
}}
"""
def _generate_gcp_terraform(self) -> str:
"""Generate Terraform config for GCP."""
return f"""
terraform {{
required_providers {{
google = {{
source = "hashicorp/google"
version = "~> 4.0"
}}
}}
}}
provider "google" {{
project = var.project_id
region = "{self.config.region}"
}}
# Cloud Run Service
resource "google_cloud_run_service" "app" {{
name = "{self.config.app_name}"
location = "{self.config.region}"
template {{
spec {{
containers {{
image = var.docker_image
resources {{
limits = {{
cpu = "1"
memory = "512Mi"
}}
}}
env {{
name = "ENVIRONMENT"
value = "{self.config.environment}"
}}
}}
}}
metadata {{
annotations = {{
"autoscaling.knative.dev/minScale" = "{self.config.min_replicas}"
"autoscaling.knative.dev/maxScale" = "{self.config.max_replicas}"
}}
}}
}}
traffic {{
percent = 100
latest_revision = true
}}
}}
# IAM binding to allow public access
resource "google_cloud_run_service_iam_member" "public" {{
service = google_cloud_run_service.app.name
location = google_cloud_run_service.app.location
role = "roles/run.invoker"
member = "allUsers"
}}
"""
def _generate_azure_terraform(self) -> str:
"""Generate Terraform config for Azure."""
return f"""
terraform {{
required_providers {{
azurerm = {{
source = "hashicorp/azurerm"
version = "~> 3.0"
}}
}}
}}
provider "azurerm" {{
features {{}}
}}
# Resource Group
resource "azurerm_resource_group" "main" {{
name = "{self.config.app_name}-rg"
location = "{self.config.region}"
}}
# Container Instance
resource "azurerm_container_group" "app" {{
name = "{self.config.app_name}"
location = azurerm_resource_group.main.location
resource_group_name = azurerm_resource_group.main.name
os_type = "Linux"
container {{
name = "{self.config.app_name}"
image = var.docker_image
cpu = "0.5"
memory = "1"
ports {{
port = 8080
protocol = "TCP"
}}
environment_variables = {{
ENVIRONMENT = "{self.config.environment}"
}}
}}
}}
"""
# Example usage
if __name__ == "__main__":
print("š Cloud Deployment Examples\n")
# Example 1: Initialize deployment
print("1ļøā£ Initializing Deployment Configuration:")
config = DeploymentConfig(
app_name="my-app",
environment="production",
cloud_provider="kubernetes",
replicas=3
)
print(f" App: {config.app_name}")
print(f" Environment: {config.environment}")
print(f" Provider: {config.cloud_provider}")
print(f" Replicas: {config.replicas}")
# Example 2: Docker operations
print("\n2ļøā£ Docker Build & Push:")
print(" docker_manager = DockerManager(config)")
print(" image = docker_manager.build_image(")
print(" dockerfile_path='Dockerfile',")
print(" context_path='.'")
print(" )")
print(" docker_manager.push_image(image)")
# Example 3: Kubernetes deployment
print("\n3ļøā£ Kubernetes Deployment:")
print(" k8s_manager = KubernetesDeploymentManager(config)")
print(" result = k8s_manager.deploy_application(")
print(" image='myapp:latest',")
print(" namespace='production',")
print(" ingress_host='myapp.example.com'")
print(" )")
# Example 4: CI/CD pipeline
print("\n4ļøā£ Running CI/CD Pipeline:")
print(" pipeline = CICDPipeline(config)")
print(" result = pipeline.run_pipeline(")
print(" source_dir='./src',")
print(" skip_tests=False")
print(" )")
# Example 5: Deployment strategies
print("\n5ļøā£ Deployment Strategies:")
strategies = [
("Rolling Update", "Gradual replacement of instances"),
("Blue-Green", "Switch between two identical environments"),
("Canary", "Deploy to small percentage first"),
("Recreate", "Stop all, then start all"),
("A/B Testing", "Route traffic based on rules")
]
for strategy, description in strategies:
print(f" {strategy}: {description}")
# Example 6: Health checks
print("\n6ļøā£ Health Check Types:")
checks = [
("Liveness", "Is the container running?"),
("Readiness", "Can it handle requests?"),
("Startup", "Has it finished starting?")
]
for check_type, description in checks:
print(f" {check_type}: {description}")
# Example 7: Dockerfile best practices
print("\n7ļøā£ Dockerfile Best Practices:")
practices = [
"Use multi-stage builds",
"Run as non-root user",
"Minimize layers",
"Use specific base image tags",
"Add health checks",
"Copy requirements first",
"Use .dockerignore",
"Label your images"
]
for practice in practices:
print(f" ⢠{practice}")
# Example 8: Kubernetes manifest
print("\n8ļøā£ Kubernetes Resources Created:")
resources = [
"Deployment",
"Service",
"Ingress",
"ConfigMap",
"Secret",
"HorizontalPodAutoscaler"
]
for resource in resources:
print(f" ⢠{resource}")
# Example 9: Monitoring setup
print("\n9ļøā£ Monitoring & Observability:")
monitoring = [
"š Metrics (Prometheus)",
"š Logs (ELK/Fluentd)",
"š Traces (Jaeger/Zipkin)",
"šØ Alerts (Alertmanager)",
"š Dashboards (Grafana)"
]
for item in monitoring:
print(f" {item}")
# Example 10: Security checklist
print("\nš Deployment Security Checklist:")
security = [
"ā
Scan images for vulnerabilities",
"ā
Use secrets management",
"ā
Enable RBAC",
"ā
Use network policies",
"ā
Enable TLS/HTTPS",
"ā
Run as non-root",
"ā
Use read-only filesystems",
"ā
Implement pod security policies",
"ā
Regular security updates",
"ā
Audit logging enabled"
]
for item in security:
print(f" {item}")
print("\nā
Cloud deployment demonstration complete!")
print("\nš Note: For production deployments:")
print(" 1. Always use CI/CD pipelines")
print(" 2. Implement proper secret management")
print(" 3. Set up monitoring and alerting")
print(" 4. Have rollback procedures ready")
print(" 5. Test in staging first")
Key Takeaways and Best Practices šÆ
- Containerize Everything: Use Docker for consistent deployments across environments.
- Automate CI/CD: Implement pipelines for reliable, repeatable deployments.
- Use Infrastructure as Code: Define infrastructure in version-controlled files.
- Implement Health Checks: Ensure applications are running correctly.
- Plan for Rollbacks: Have quick rollback procedures for failed deployments.
- Monitor Everything: Set up comprehensive logging and monitoring.
- Secure by Default: Scan images, manage secrets, use least privilege.
- Test Thoroughly: Test in staging before production deployment.
Cloud Deployment Best Practices š
Mastering cloud deployment enables you to ship applications reliably to any cloud platform with confidence. You can now containerize applications, implement CI/CD pipelines, deploy to Kubernetes, manage infrastructure as code, and ensure zero-downtime deployments. Whether you're deploying microservices, monoliths, or serverless functions, these deployment skills ensure your applications reach production safely and efficiently! š
Pro Tip: Think of cloud deployment as shipping code to production safely and efficiently - it requires automation, monitoring, and the ability to roll back quickly. Start with containerization using Docker - it ensures consistency across environments. Implement a proper CI/CD pipeline that builds, tests, scans, and deploys automatically. Use Kubernetes for orchestration when you need advanced features like auto-scaling and self-healing. Choose the right deployment strategy: rolling updates for zero downtime, blue-green for instant rollback, canary for gradual rollout. Always implement comprehensive health checks - liveness, readiness, and startup probes. Use Infrastructure as Code (Terraform/CloudFormation) to make infrastructure reproducible. Implement proper secret management - never hardcode credentials. Set up monitoring and logging from day one - you can't fix what you can't see. Plan for failure with rollback procedures and disaster recovery. Use multi-stage Docker builds to minimize image size. Run containers as non-root users for security. Most importantly: treat deployment as code - version control everything, review changes, and test thoroughly before production!