
ā˜ļø Google Cloud Automation: Master GCP with Python

Google Cloud Platform (GCP) provides Python client libraries that let you automate everything from Compute Engine VMs and Kubernetes Engine clusters to Cloud Storage and BigQuery analytics. With them you can deploy applications, process large datasets, manage machine learning models, and orchestrate cloud-native workloads at scale, much like running a command center for Google's global infrastructure. Let's explore Google Cloud automation with Python! 🌐

The Google Cloud Automation Architecture

Think of GCP's Python client libraries as your toolkit for building on Google's infrastructure. They provide consistent interfaces to services such as Compute Engine for VMs, Cloud Storage for objects, BigQuery for analytics, Cloud Functions for serverless code, and Kubernetes Engine for containers. The libraries handle authentication and retry transient API errors for many calls, so your code can focus on the workflow itself. Understanding GCP's service ecosystem and these client libraries is essential for effective cloud automation!
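
To make the retry behaviour concrete, here is a minimal, library-free sketch of exponential backoff. It is not the client libraries' own implementation (they ship theirs in google.api_core.retry); the helper name call_with_backoff and all of its parameters are hypothetical and chosen only for illustration.

```python
import time
from typing import Callable, Tuple, Type, TypeVar

T = TypeVar("T")


def call_with_backoff(
    func: Callable[[], T],
    max_retries: int = 3,
    initial_delay: float = 1.0,
    multiplier: float = 2.0,
    retry_on: Tuple[Type[BaseException], ...] = (ConnectionError, TimeoutError),
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call func, retrying on the given errors with exponential backoff."""
    if max_retries < 0:
        raise ValueError("max_retries must be >= 0")

    delay = initial_delay
    attempt = 0
    while True:
        try:
            return func()
        except retry_on:
            # Out of retries: let the last error propagate to the caller.
            if attempt == max_retries:
                raise
            sleep(delay)
            delay *= multiplier
            attempt += 1
```

The defaults mirror the max_retries (3) and retry_delay (1.0) fields of the GCPConfig class in the listing that follows. The sleep parameter is injectable so the helper can be exercised without real waiting.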

graph TB
    A[GCP Automation with Python] --> B[Compute Services]
    A --> C[Storage & Data]
    A --> D[Serverless]
    A --> E[Containers & Orchestration]
    B --> F[Compute Engine]
    B --> G[App Engine]
    B --> H[Cloud Run]
    B --> I[Instance Groups]
    C --> J[Cloud Storage]
    C --> K[BigQuery]
    C --> L[Firestore]
    C --> M[Cloud SQL]
    D --> N[Cloud Functions]
    D --> O[Cloud Tasks]
    D --> P[Cloud Scheduler]
    D --> Q[Pub/Sub]
    E --> R[Kubernetes Engine]
    E --> S[Container Registry]
    E --> T[Cloud Build]
    E --> U[Anthos]
    V[Operations] --> W[Monitoring]
    V --> X[Logging]
    V --> Y[IAM & Security]
    V --> Z[Cost Management]
    style A fill:#ff6b6b
    style B fill:#51cf66
    style C fill:#339af0
    style D fill:#ffd43b
    style E fill:#ff6b6b
    style V fill:#51cf66

Real-World Scenario: The Multi-Service GCP Platform 🚀

You're building a comprehensive GCP automation platform that provisions Compute Engine instances for applications, manages Cloud Storage for data lakes, processes analytics with BigQuery, deploys serverless functions for event processing, orchestrates containers with Kubernetes Engine, implements Pub/Sub for messaging, monitors with Cloud Operations, and manages IAM security. Your system must handle petabyte-scale data, support global deployments, optimize costs, and maintain enterprise security. Let's build a production-ready GCP automation framework!

# First, install required packages:
# pip install google-cloud-compute google-cloud-storage google-cloud-bigquery
# pip install google-cloud-functions google-cloud-pubsub google-cloud-firestore
# pip install google-cloud-monitoring google-cloud-logging google-auth
# pip install google-cloud-secret-manager  # provides google.cloud.secretmanager

import os
import json
import time
from typing import List, Dict, Optional, Any, Union, Tuple
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from enum import Enum
import logging
from pathlib import Path
import tempfile
import uuid

# Google Cloud imports
from google.cloud import compute_v1
from google.cloud import storage
from google.cloud import bigquery
from google.cloud import firestore
from google.cloud import pubsub_v1
from google.cloud import functions_v1
from google.cloud import monitoring_v3
from google.cloud import logging as cloud_logging
from google.cloud import secretmanager
from google.oauth2 import service_account
from google.api_core import retry
from google.api_core.exceptions import GoogleAPIError, NotFound, AlreadyExists

# ==================== GCP Configuration ====================

@dataclass
class GCPConfig:
    """Google Cloud Platform configuration."""
    project_id: str
    region: str = "us-central1"
    zone: str = "us-central1-a"
    
    # Authentication
    credentials_path: Optional[str] = None
    credentials: Optional[service_account.Credentials] = None
    
    # Default settings
    default_labels: Dict[str, str] = field(default_factory=lambda: {
        "managed-by": "python-automation",
        "environment": "production",
        "created-at": datetime.now().strftime("%Y-%m-%d")
    })
    
    # Retry configuration
    max_retries: int = 3
    retry_delay: float = 1.0
    
    # Cost management
    enable_cost_optimization: bool = True
    max_monthly_budget: float = 1000.0
    
    # Network settings
    default_network: str = "default"
    default_subnetwork: str = "default"
    
    @classmethod
    def from_env(cls):
        """Load configuration from environment variables."""
        return cls(
            project_id=os.getenv("GCP_PROJECT_ID", ""),
            region=os.getenv("GCP_REGION", "us-central1"),
            zone=os.getenv("GCP_ZONE", "us-central1-a"),
            credentials_path=os.getenv("GOOGLE_APPLICATION_CREDENTIALS")
        )
    
    def get_credentials(self):
        """Get or create credentials."""
        if self.credentials:
            return self.credentials
        
        if self.credentials_path:
            self.credentials = service_account.Credentials.from_service_account_file(
                self.credentials_path
            )
        
        return self.credentials
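
# ==================== Label Helpers (illustrative sketch) ====================
# The default_labels above are attached to every resource. GCP label keys and
# values are limited to 63 characters and may contain lowercase letters,
# digits, underscores, and dashes. The two helpers below are hypothetical
# (they are not part of any Google library) and show one way to normalise
# free-form text before using it as a label. This sketch is stricter than
# GCP itself: it also replaces international characters, and it does not
# check that a key starts with a letter.

import re
from typing import Dict


def sanitize_label_value(raw: str) -> str:
    """Lowercase raw, replace disallowed characters with '-', cap at 63 chars."""
    cleaned = re.sub(r"[^a-z0-9_-]", "-", raw.lower())
    return cleaned[:63]


def build_labels(defaults: Dict[str, str], **extra: str) -> Dict[str, str]:
    """Return a new dict of defaults plus sanitized extra labels."""
    labels = dict(defaults)  # copy so the shared defaults are never mutated
    for key, value in extra.items():
        labels[sanitize_label_value(key)] = sanitize_label_value(str(value))
    return labels


# Example (hypothetical values):
#   build_labels({"managed-by": "python-automation"}, name="Web Server")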

# ==================== Compute Engine Manager ====================

class ComputeEngineManager:
    """Manage Google Compute Engine resources."""
    
    def __init__(self, config: GCPConfig):
        self.config = config
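        # Use explicit credentials when configured; None falls back to
        # Application Default Credentials
        credentials = config.get_credentials()
        self.instances_client = compute_v1.InstancesClient(credentials=credentials)
        self.images_client = compute_v1.ImagesClient(credentials=credentials)
        self.disks_client = compute_v1.DisksClient(credentials=credentials)
        self.firewalls_client = compute_v1.FirewallsClient(credentials=credentials)
        self.networks_client = compute_v1.NetworksClient(credentials=credentials)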
        self.logger = logging.getLogger(__name__)
    
    def create_instance(
        self,
        instance_name: str,
        machine_type: str = "e2-micro",
        source_image: str = "projects/debian-cloud/global/images/family/debian-11",
        disk_size_gb: int = 10,
        network: Optional[str] = None,
        external_ip: bool = True,
        startup_script: Optional[str] = None,
        metadata: Optional[Dict[str, str]] = None,
        tags: Optional[List[str]] = None
    ) -> compute_v1.Instance:
        """
        Create a Compute Engine instance.
        
        Args:
            instance_name: Name of the instance
            machine_type: Machine type (e.g., 'e2-micro', 'n1-standard-1')
            source_image: Source image for boot disk
            disk_size_gb: Size of boot disk in GB
            network: Network name (uses default if not specified)
            external_ip: Whether to assign external IP
            startup_script: Startup script to run on instance
            metadata: Additional metadata
            tags: Network tags for firewall rules
            
        Returns:
            Created instance object
        """
        try:
            # Machine type URL
            machine_type_url = (
                f"zones/{self.config.zone}/machineTypes/{machine_type}"
            )
            
            # Boot disk configuration
            boot_disk = compute_v1.AttachedDisk(
                auto_delete=True,
                boot=True,
                # The field is a string, so pass the enum member's name
                type_=compute_v1.AttachedDisk.Type.PERSISTENT.name,
                initialize_params=compute_v1.AttachedDiskInitializeParams(
                    source_image=source_image,
                    disk_size_gb=disk_size_gb
                )
            )
            
            # Network configuration
            network_name = network or self.config.default_network
            network_interface = compute_v1.NetworkInterface(
                network=f"global/networks/{network_name}"
            )
            
            # Add external IP if requested
            if external_ip:
                access_config = compute_v1.AccessConfig(
                    name="External NAT",
                    type_="ONE_TO_ONE_NAT"
                )
                network_interface.access_configs = [access_config]
            
            # Metadata items
            metadata_items = []
            
            # Add startup script if provided
            if startup_script:
                metadata_items.append(
                    compute_v1.Items(
                        key="startup-script",
                        value=startup_script
                    )
                )
            
            # Add additional metadata
            if metadata:
                for key, value in metadata.items():
                    metadata_items.append(
                        compute_v1.Items(key=key, value=value)
                    )
            
            # Instance configuration
            instance = compute_v1.Instance(
                name=instance_name,
                machine_type=machine_type_url,
                disks=[boot_disk],
                network_interfaces=[network_interface]
            )
            
            # Add metadata if any
            if metadata_items:
                instance.metadata = compute_v1.Metadata(items=metadata_items)
            
            # Add tags if provided
            if tags:
                instance.tags = compute_v1.Tags(items=tags)
            
            # Add labels
            instance.labels = self.config.default_labels.copy()
            instance.labels['name'] = instance_name
            
            # Create instance
            operation = self.instances_client.insert(
                project=self.config.project_id,
                zone=self.config.zone,
                instance_resource=instance
            )
            
            # Wait for operation to complete
            self._wait_for_operation(operation, self.config.zone)
            
            # Get the created instance
            created_instance = self.instances_client.get(
                project=self.config.project_id,
                zone=self.config.zone,
                instance=instance_name
            )
            
            self.logger.info(f"Created instance: {instance_name}")
            return created_instance
            
        except Exception as e:
            self.logger.error(f"Failed to create instance: {e}")
            raise
    
    def list_instances(
        self,
        zone: Optional[str] = None
    ) -> List[compute_v1.Instance]:
        """List all instances in a zone."""
        try:
            zone = zone or self.config.zone
            
            instances = self.instances_client.list(
                project=self.config.project_id,
                zone=zone
            )
            
            return list(instances)
            
        except Exception as e:
            self.logger.error(f"Failed to list instances: {e}")
            return []
    
    def get_instance(self, instance_name: str) -> Optional[compute_v1.Instance]:
        """Get instance details."""
        try:
            return self.instances_client.get(
                project=self.config.project_id,
                zone=self.config.zone,
                instance=instance_name
            )
        except NotFound:
            return None
        except Exception as e:
            self.logger.error(f"Failed to get instance: {e}")
            return None
    
    def stop_instance(self, instance_name: str) -> bool:
        """Stop an instance."""
        try:
            operation = self.instances_client.stop(
                project=self.config.project_id,
                zone=self.config.zone,
                instance=instance_name
            )
            
            self._wait_for_operation(operation, self.config.zone)
            
            self.logger.info(f"Stopped instance: {instance_name}")
            return True
            
        except Exception as e:
            self.logger.error(f"Failed to stop instance: {e}")
            return False
    
    def start_instance(self, instance_name: str) -> bool:
        """Start an instance."""
        try:
            operation = self.instances_client.start(
                project=self.config.project_id,
                zone=self.config.zone,
                instance=instance_name
            )
            
            self._wait_for_operation(operation, self.config.zone)
            
            self.logger.info(f"Started instance: {instance_name}")
            return True
            
        except Exception as e:
            self.logger.error(f"Failed to start instance: {e}")
            return False
    
    def delete_instance(self, instance_name: str) -> bool:
        """Delete an instance."""
        try:
            operation = self.instances_client.delete(
                project=self.config.project_id,
                zone=self.config.zone,
                instance=instance_name
            )
            
            self._wait_for_operation(operation, self.config.zone)
            
            self.logger.info(f"Deleted instance: {instance_name}")
            return True
            
        except Exception as e:
            self.logger.error(f"Failed to delete instance: {e}")
            return False
    
    def create_firewall_rule(
        self,
        rule_name: str,
        network: str,
        source_ranges: List[str],
        allowed_ports: List[Dict[str, Any]],
        target_tags: Optional[List[str]] = None
    ) -> bool:
        """
        Create a firewall rule.
        
        Args:
            rule_name: Name of the firewall rule
            network: Network name
            source_ranges: List of source IP ranges (e.g., ['0.0.0.0/0'])
            allowed_ports: List of allowed ports [{'protocol': 'tcp', 'ports': ['80', '443']}]
            target_tags: Tags for target instances
            
        Returns:
            True if successful
        """
        try:
            # Build allowed list
            allowed = []
            for rule in allowed_ports:
                allowed_item = compute_v1.Allowed(
                    I_p_protocol=rule['protocol']
                )
                if 'ports' in rule:
                    allowed_item.ports = rule['ports']
                allowed.append(allowed_item)
            
            # Create firewall rule
            firewall_rule = compute_v1.Firewall(
                name=rule_name,
                network=f"global/networks/{network}",
                source_ranges=source_ranges,
                allowed=allowed,
                direction="INGRESS"
            )
            
            # Add target tags if specified
            if target_tags:
                firewall_rule.target_tags = target_tags
            
            # Create the rule
            operation = self.firewalls_client.insert(
                project=self.config.project_id,
                firewall_resource=firewall_rule
            )
            
            self._wait_for_operation(operation)
            
            self.logger.info(f"Created firewall rule: {rule_name}")
            return True
            
        except Exception as e:
            self.logger.error(f"Failed to create firewall rule: {e}")
            return False
    
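    def _wait_for_operation(
        self,
        operation: Any,
        zone: Optional[str] = None,
        timeout: int = 300
    ):
        """Wait for an operation to complete and raise if it failed."""
        # google-cloud-compute 1.x returns an ExtendedOperation whose
        # result() blocks until the operation finishes.
        if hasattr(operation, "result"):
            operation.result(timeout=timeout)
            return
        
        # Older releases return a plain Operation that has to be waited on
        # through the matching operations client.
        if zone:
            compute_v1.ZoneOperationsClient().wait(
                project=self.config.project_id,
                zone=zone,
                operation=operation.name
            )
        else:
            # Global operation (e.g. firewall rules)
            compute_v1.GlobalOperationsClient().wait(
                project=self.config.project_id,
                operation=operation.name
            )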
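
# ==================== Polling Helper (illustrative sketch) ====================
# Waiting on a long-running operation comes down to polling a condition until
# it holds or a deadline passes. The helper below is a generic, library-free
# sketch of that pattern. The name wait_until and its parameters are
# hypothetical; it is not how the Compute client waits internally.

import time
from typing import Callable


def wait_until(
    is_done: Callable[[], bool],
    timeout: float = 300.0,
    interval: float = 2.0,
    clock: Callable[[], float] = time.monotonic,
    sleep: Callable[[float], None] = time.sleep,
) -> None:
    """Poll is_done until it returns True; raise TimeoutError after timeout seconds."""
    deadline = clock() + timeout
    while not is_done():
        # Check the deadline before sleeping so a stuck condition cannot
        # keep the caller waiting forever.
        if clock() >= deadline:
            raise TimeoutError(f"Condition not met within {timeout} seconds")
        sleep(interval)


# Example (hypothetical): wait until an instance reports RUNNING
#   wait_until(lambda: manager.get_instance("web-1").status == "RUNNING")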

# ==================== Cloud Storage Manager ====================

class CloudStorageManager:
    """Manage Google Cloud Storage resources."""
    
    def __init__(self, config: GCPConfig):
        self.config = config
        self.client = storage.Client(
            project=config.project_id,
            credentials=config.get_credentials()
        )
        self.logger = logging.getLogger(__name__)
    
    def create_bucket(
        self,
        bucket_name: str,
        location: Optional[str] = None,
        storage_class: str = "STANDARD",
        versioning: bool = False,
        lifecycle_rules: Optional[List[Dict]] = None
    ) -> storage.Bucket:
        """
        Create a Cloud Storage bucket.
        
        Args:
            bucket_name: Name of the bucket
            location: Location for bucket (uses default region if not specified)
            storage_class: Storage class (STANDARD, NEARLINE, COLDLINE, ARCHIVE)
            versioning: Enable versioning
            lifecycle_rules: Lifecycle management rules
            
        Returns:
            Created bucket object
        """
        try:
            # Configure the bucket before creating it
            bucket = self.client.bucket(bucket_name)
            bucket.storage_class = storage_class
            
            # Add labels
            bucket.labels = self.config.default_labels
            
            # Enable versioning if requested
            if versioning:
                bucket.versioning_enabled = True
            
            # Add lifecycle rules if provided
            if lifecycle_rules:
                bucket.lifecycle_rules = lifecycle_rules
            
            # Create the bucket; the location is passed here because
            # assigning bucket.location before creation is deprecated
            bucket = self.client.create_bucket(
                bucket,
                location=location or self.config.region
            )
            
            self.logger.info(f"Created bucket: {bucket_name}")
            return bucket
            
        except AlreadyExists:
            self.logger.warning(f"Bucket already exists: {bucket_name}")
            return self.client.bucket(bucket_name)
        except Exception as e:
            self.logger.error(f"Failed to create bucket: {e}")
            raise
    
    def upload_file(
        self,
        bucket_name: str,
        source_file_path: str,
        destination_blob_name: Optional[str] = None,
        content_type: Optional[str] = None,
        metadata: Optional[Dict[str, str]] = None
    ) -> storage.Blob:
        """Upload a file to Cloud Storage."""
        try:
            bucket = self.client.bucket(bucket_name)
            
            # Use filename as blob name if not specified
            if not destination_blob_name:
                destination_blob_name = Path(source_file_path).name
            
            blob = bucket.blob(destination_blob_name)
            
            # Set content type if provided
            if content_type:
                blob.content_type = content_type
            
            # Set metadata if provided
            if metadata:
                blob.metadata = metadata
            
            # Upload file
            blob.upload_from_filename(source_file_path)
            
            self.logger.info(
                f"Uploaded {source_file_path} to gs://{bucket_name}/{destination_blob_name}"
            )
            return blob
            
        except Exception as e:
            self.logger.error(f"Failed to upload file: {e}")
            raise
    
    def download_file(
        self,
        bucket_name: str,
        source_blob_name: str,
        destination_file_path: str
    ) -> bool:
        """Download a file from Cloud Storage."""
        try:
            bucket = self.client.bucket(bucket_name)
            blob = bucket.blob(source_blob_name)
            
            blob.download_to_filename(destination_file_path)
            
            self.logger.info(
                f"Downloaded gs://{bucket_name}/{source_blob_name} to {destination_file_path}"
            )
            return True
            
        except Exception as e:
            self.logger.error(f"Failed to download file: {e}")
            return False
    
    def list_blobs(
        self,
        bucket_name: str,
        prefix: Optional[str] = None,
        delimiter: Optional[str] = None
    ) -> List[storage.Blob]:
        """List blobs in a bucket."""
        try:
            bucket = self.client.bucket(bucket_name)
            blobs = bucket.list_blobs(prefix=prefix, delimiter=delimiter)
            
            return list(blobs)
            
        except Exception as e:
            self.logger.error(f"Failed to list blobs: {e}")
            return []
    
    def delete_blob(self, bucket_name: str, blob_name: str) -> bool:
        """Delete a blob from Cloud Storage."""
        try:
            bucket = self.client.bucket(bucket_name)
            blob = bucket.blob(blob_name)
            blob.delete()
            
            self.logger.info(f"Deleted gs://{bucket_name}/{blob_name}")
            return True
            
        except Exception as e:
            self.logger.error(f"Failed to delete blob: {e}")
            return False
    
    def generate_signed_url(
        self,
        bucket_name: str,
        blob_name: str,
        expiration: timedelta = timedelta(hours=1),
        method: str = "GET"
    ) -> str:
        """Generate a signed URL for a blob."""
        try:
            bucket = self.client.bucket(bucket_name)
            blob = bucket.blob(blob_name)
            
            url = blob.generate_signed_url(
                version="v4",
                expiration=expiration,
                method=method
            )
            
            return url
            
        except Exception as e:
            self.logger.error(f"Failed to generate signed URL: {e}")
            return ""
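
# ==================== Lifecycle Rule Helpers (illustrative sketch) ====================
# create_bucket() above accepts lifecycle_rules as a list of dicts. A minimal
# sketch of what those dicts can look like, following the Cloud Storage JSON
# API layout of an "action" plus a "condition". The helper names are
# hypothetical and exist only to keep the structure readable.

from typing import Any, Dict, List


def delete_after(days: int) -> Dict[str, Any]:
    """Rule that deletes objects older than the given number of days."""
    return {"action": {"type": "Delete"}, "condition": {"age": days}}


def move_to_class_after(days: int, storage_class: str) -> Dict[str, Any]:
    """Rule that moves objects to storage_class once they reach the given age."""
    return {
        "action": {"type": "SetStorageClass", "storageClass": storage_class},
        "condition": {"age": days},
    }


# Example rule set: move to NEARLINE after 30 days, delete after one year.
EXAMPLE_LIFECYCLE_RULES: List[Dict[str, Any]] = [
    move_to_class_after(30, "NEARLINE"),
    delete_after(365),
]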

# ==================== BigQuery Manager ====================

class BigQueryManager:
    """Manage BigQuery resources and queries."""
    
    def __init__(self, config: GCPConfig):
        self.config = config
        self.client = bigquery.Client(
            project=config.project_id,
            credentials=config.get_credentials()
        )
        self.logger = logging.getLogger(__name__)
    
    def create_dataset(
        self,
        dataset_id: str,
        location: Optional[str] = None,
        description: Optional[str] = None
    ) -> bigquery.Dataset:
        """Create a BigQuery dataset."""
        try:
            dataset_ref = f"{self.config.project_id}.{dataset_id}"
            dataset = bigquery.Dataset(dataset_ref)
            
            dataset.location = location or self.config.region
            
            if description:
                dataset.description = description
            
            # Add labels
            dataset.labels = self.config.default_labels
            
            # Create dataset
            dataset = self.client.create_dataset(dataset, timeout=30)
            
            self.logger.info(f"Created dataset: {dataset_id}")
            return dataset
            
        except AlreadyExists:
            self.logger.warning(f"Dataset already exists: {dataset_id}")
            return self.client.get_dataset(dataset_ref)
        except Exception as e:
            self.logger.error(f"Failed to create dataset: {e}")
            raise
    
    def create_table(
        self,
        dataset_id: str,
        table_id: str,
        schema: List[bigquery.SchemaField],
        partition_field: Optional[str] = None,
        clustering_fields: Optional[List[str]] = None
    ) -> bigquery.Table:
        """
        Create a BigQuery table.
        
        Args:
            dataset_id: Dataset ID
            table_id: Table ID
            schema: List of schema fields
            partition_field: Field to partition by
            clustering_fields: Fields to cluster by
            
        Returns:
            Created table object
        """
        try:
            table_ref = f"{self.config.project_id}.{dataset_id}.{table_id}"
            table = bigquery.Table(table_ref, schema=schema)
            
            # Add partitioning if specified
            if partition_field:
                table.time_partitioning = bigquery.TimePartitioning(
                    field=partition_field
                )
            
            # Add clustering if specified
            if clustering_fields:
                table.clustering_fields = clustering_fields
            
            # Add labels
            table.labels = self.config.default_labels
            
            # Create table
            table = self.client.create_table(table)
            
            self.logger.info(f"Created table: {dataset_id}.{table_id}")
            return table
            
        except AlreadyExists:
            self.logger.warning(f"Table already exists: {dataset_id}.{table_id}")
            return self.client.get_table(table_ref)
        except Exception as e:
            self.logger.error(f"Failed to create table: {e}")
            raise
    
    def query(
        self,
        query: str,
        job_config: Optional[bigquery.QueryJobConfig] = None
    ) -> bigquery.QueryJob:
        """
        Execute a BigQuery query.
        
        Args:
            query: SQL query string
            job_config: Query job configuration
            
        Returns:
            Query job object
        """
        try:
            # Run query
            query_job = self.client.query(query, job_config=job_config)
            
            # Wait for completion (blocks until the job finishes)
            query_job.result()
            
            self.logger.info(f"Query completed: {query_job.job_id}")
            return query_job
            
        except Exception as e:
            self.logger.error(f"Query failed: {e}")
            raise
    
    def load_data_from_gcs(
        self,
        dataset_id: str,
        table_id: str,
        source_uri: str,
        schema: Optional[List[bigquery.SchemaField]] = None,
        source_format: str = "CSV",
        skip_leading_rows: int = 0
    ) -> bigquery.LoadJob:
        """Load data from Cloud Storage into BigQuery."""
        try:
            table_ref = f"{self.config.project_id}.{dataset_id}.{table_id}"
            
            job_config = bigquery.LoadJobConfig(
                source_format=getattr(bigquery.SourceFormat, source_format),
                skip_leading_rows=skip_leading_rows
            )
            
            if schema:
                job_config.schema = schema
            else:
                job_config.autodetect = True
            
            # Load data
            load_job = self.client.load_table_from_uri(
                source_uri,
                table_ref,
                job_config=job_config
            )
            
            # Wait for completion
            load_job.result()
            
            self.logger.info(f"Loaded data into {dataset_id}.{table_id}")
            return load_job
            
        except Exception as e:
            self.logger.error(f"Failed to load data: {e}")
            raise
    
    def export_to_gcs(
        self,
        dataset_id: str,
        table_id: str,
        destination_uri: str,
        export_format: str = "CSV"
    ) -> bigquery.ExtractJob:
        """Export BigQuery table to Cloud Storage."""
        try:
            table_ref = f"{self.config.project_id}.{dataset_id}.{table_id}"
            
            job_config = bigquery.ExtractJobConfig(
                destination_format=getattr(bigquery.DestinationFormat, export_format)
            )
            
            # Export data
            extract_job = self.client.extract_table(
                table_ref,
                destination_uri,
                job_config=job_config
            )
            
            # Wait for completion
            extract_job.result()
            
            self.logger.info(f"Exported {dataset_id}.{table_id} to {destination_uri}")
            return extract_job
            
        except Exception as e:
            self.logger.error(f"Failed to export data: {e}")
            raise
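
# ==================== Export URI Helper (illustrative sketch) ====================
# export_to_gcs() above takes a destination_uri. BigQuery writes at most 1 GB
# to a single export file, so larger tables need a "*" wildcard in the URI to
# be split across several files. The helper below is a hypothetical
# convenience for building such URIs; its name and parameters are assumptions.


def build_export_uri(
    bucket_name: str,
    prefix: str,
    table_id: str,
    extension: str = "csv",
    sharded: bool = True,
) -> str:
    """Build a gs:// destination URI, with a wildcard when sharded is True."""
    filename = f"{table_id}-*.{extension}" if sharded else f"{table_id}.{extension}"
    # Drop an empty prefix so the URI never contains a double slash.
    parts = [part for part in (prefix.strip("/"), filename) if part]
    return f"gs://{bucket_name}/" + "/".join(parts)


# Example (hypothetical names):
#   build_export_uri("my-bucket", "exports/2024", "events")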

# ==================== Cloud Functions Manager ====================

class CloudFunctionsManager:
    """Manage Cloud Functions."""
    
    def __init__(self, config: GCPConfig):
        self.config = config
        self.client = functions_v1.CloudFunctionsServiceClient()
        self.logger = logging.getLogger(__name__)
    
    def deploy_function(
        self,
        function_name: str,
        source_archive_url: str,
        entry_point: str,
        runtime: str = "python39",
        trigger_type: str = "https",
        memory_mb: int = 256,
        timeout_seconds: int = 60,
        environment_variables: Optional[Dict[str, str]] = None
    ) -> functions_v1.CloudFunction:
        """
        Deploy a Cloud Function.
        
        Args:
            function_name: Name of the function
            source_archive_url: GCS URL of source code archive
            entry_point: Name of function to execute
            runtime: Runtime environment
            trigger_type: Trigger type ('https', 'pubsub', 'storage')
            memory_mb: Memory allocation
            timeout_seconds: Timeout in seconds
            environment_variables: Environment variables
            
        Returns:
            Deployed function object
        """
        try:
            parent = f"projects/{self.config.project_id}/locations/{self.config.region}"
            
            function = functions_v1.CloudFunction(
                name=f"{parent}/functions/{function_name}",
                source_archive_url=source_archive_url,
                entry_point=entry_point,
                runtime=runtime,
                available_memory_mb=memory_mb,
                # timeout is a Duration field, so pass a timedelta
                timeout=timedelta(seconds=timeout_seconds)
            )
            
            # Set trigger
            if trigger_type == "https":
                function.https_trigger = functions_v1.HttpsTrigger()
            elif trigger_type == "pubsub":
                # Configure Pub/Sub trigger
                pass
            elif trigger_type == "storage":
                # Configure Storage trigger
                pass
            
            # Add environment variables
            if environment_variables:
                function.environment_variables = environment_variables
            
            # Add labels
            function.labels = self.config.default_labels
            
            # Deploy function
            # The v1 API names the parent resource argument "location"
            operation = self.client.create_function(
                location=parent,
                function=function
            )
            
            # Wait for deployment
            result = operation.result()
            
            self.logger.info(f"Deployed function: {function_name}")
            return result
            
        except Exception as e:
            self.logger.error(f"Failed to deploy function: {e}")
            raise
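
# ==================== Event Trigger Helper (illustrative sketch) ====================
# The "pubsub" and "storage" branches in deploy_function() above are left as
# placeholders. For 1st gen functions an event trigger needs an event type and
# the resource that emits the event. The hypothetical helper below returns
# both as a plain dict whose keys mirror the event_type and resource fields of
# functions_v1.EventTrigger; wiring it into deploy_function() is left open.

from typing import Dict


def describe_event_trigger(
    trigger_type: str,
    project_id: str,
    resource_name: str,
) -> Dict[str, str]:
    """Return the event type and resource path for a Pub/Sub or Storage trigger."""
    if trigger_type == "pubsub":
        # resource_name is the topic name
        return {
            "event_type": "google.pubsub.topic.publish",
            "resource": f"projects/{project_id}/topics/{resource_name}",
        }
    if trigger_type == "storage":
        # resource_name is the bucket name; fires when an object is finalized
        return {
            "event_type": "google.storage.object.finalize",
            "resource": f"projects/_/buckets/{resource_name}",
        }
    raise ValueError(f"Unsupported trigger type: {trigger_type}")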

# ==================== Pub/Sub Manager ====================

class PubSubManager:
    """Manage Pub/Sub topics and subscriptions."""
    
    def __init__(self, config: GCPConfig):
        self.config = config
        self.publisher = pubsub_v1.PublisherClient()
        self.subscriber = pubsub_v1.SubscriberClient()
        self.logger = logging.getLogger(__name__)
    
    def create_topic(self, topic_name: str) -> str:
        """Create a Pub/Sub topic."""
        try:
            topic_path = self.publisher.topic_path(
                self.config.project_id,
                topic_name
            )
            
            topic = self.publisher.create_topic(request={"name": topic_path})
            
            self.logger.info(f"Created topic: {topic_name}")
            return topic.name
            
        except AlreadyExists:
            self.logger.warning(f"Topic already exists: {topic_name}")
            return topic_path
        except Exception as e:
            self.logger.error(f"Failed to create topic: {e}")
            raise
    
    def create_subscription(
        self,
        subscription_name: str,
        topic_name: str,
        ack_deadline_seconds: int = 10
    ) -> str:
        """Create a Pub/Sub subscription."""
        try:
            topic_path = self.publisher.topic_path(
                self.config.project_id,
                topic_name
            )
            
            subscription_path = self.subscriber.subscription_path(
                self.config.project_id,
                subscription_name
            )
            
            subscription = self.subscriber.create_subscription(
                request={
                    "name": subscription_path,
                    "topic": topic_path,
                    "ack_deadline_seconds": ack_deadline_seconds
                }
            )
            
            self.logger.info(f"Created subscription: {subscription_name}")
            return subscription.name
            
        except AlreadyExists:
            self.logger.warning(f"Subscription already exists: {subscription_name}")
            return subscription_path
        except Exception as e:
            self.logger.error(f"Failed to create subscription: {e}")
            raise
    
    def publish_message(
        self,
        topic_name: str,
        message: str,
        attributes: Optional[Dict[str, str]] = None
    ) -> str:
        """Publish a message to a topic."""
        try:
            topic_path = self.publisher.topic_path(
                self.config.project_id,
                topic_name
            )
            
            # Encode message
            data = message.encode("utf-8")
            
            # Publish message
            future = self.publisher.publish(
                topic_path,
                data,
                **(attributes or {})
            )
            
            # Get message ID
            message_id = future.result()
            
            self.logger.info(f"Published message to {topic_name}: {message_id}")
            return message_id
            
        except Exception as e:
            self.logger.error(f"Failed to publish message: {e}")
            raise
    
    def pull_messages(
        self,
        subscription_name: str,
        max_messages: int = 10,
        timeout: int = 30
    ) -> List[Dict[str, Any]]:
        """Pull messages from a subscription."""
        try:
            subscription_path = self.subscriber.subscription_path(
                self.config.project_id,
                subscription_name
            )
            
            # Pull messages
            response = self.subscriber.pull(
                request={
                    "subscription": subscription_path,
                    "max_messages": max_messages
                },
                timeout=timeout
            )
            
            messages = []
            ack_ids = []
            
            for msg in response.received_messages:
                messages.append({
                    'data': msg.message.data.decode('utf-8'),
                    'attributes': dict(msg.message.attributes),
                    'message_id': msg.message.message_id,
                    'publish_time': msg.message.publish_time
                })
                ack_ids.append(msg.ack_id)
            
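            # Acknowledge messages. Note: this happens before the caller has
            # processed them, so a downstream failure will not trigger redelivery.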
            if ack_ids:
                self.subscriber.acknowledge(
                    request={
                        "subscription": subscription_path,
                        "ack_ids": ack_ids
                    }
                )
            
            return messages
            
        except Exception as e:
            self.logger.error(f"Failed to pull messages: {e}")
            return []

# ==================== Firestore Manager ====================

class FirestoreManager:
    """Manage Firestore database."""
    
    def __init__(self, config: GCPConfig):
        self.config = config
        self.client = firestore.Client(
            project=config.project_id,
            credentials=config.get_credentials()
        )
        self.logger = logging.getLogger(__name__)
    
    def create_document(
        self,
        collection: str,
        document_id: Optional[str],
        data: Dict[str, Any]
    ) -> str:
        """Create a document in Firestore."""
        try:
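            # Copy so the caller's dict is not mutated, then add timestamps
            data = {
                **data,
                'created_at': firestore.SERVER_TIMESTAMP,
                'updated_at': firestore.SERVER_TIMESTAMP
            }
            
            if document_id:
                doc_ref = self.client.collection(collection).document(document_id)
                # Note: set() overwrites any existing document with this ID
                doc_ref.set(data)
            else:
                # add() returns a (update_time, document_reference) tuple
                doc_ref = self.client.collection(collection).add(data)[1]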
            
            self.logger.info(f"Created document in {collection}: {doc_ref.id}")
            return doc_ref.id
            
        except Exception as e:
            self.logger.error(f"Failed to create document: {e}")
            raise
    
    def get_document(
        self,
        collection: str,
        document_id: str
    ) -> Optional[Dict[str, Any]]:
        """Get a document from Firestore."""
        try:
            doc_ref = self.client.collection(collection).document(document_id)
            doc = doc_ref.get()
            
            if doc.exists:
                return doc.to_dict()
            
            return None
            
        except Exception as e:
            self.logger.error(f"Failed to get document: {e}")
            return None
    
    def query_documents(
        self,
        collection: str,
        filters: Optional[List[Tuple[str, str, Any]]] = None,
        order_by: Optional[str] = None,
        limit: Optional[int] = None
    ) -> List[Dict[str, Any]]:
        """
        Query documents from Firestore.
        
        Args:
            collection: Collection name
            filters: List of filters [(field, operator, value)]
            order_by: Field to order by
            limit: Maximum number of documents
            
        Returns:
            List of documents
        """
        try:
            query = self.client.collection(collection)
            
            # Apply filters
            if filters:
                for field, operator, value in filters:
                    query = query.where(field, operator, value)
            
            # Apply ordering
            if order_by:
                query = query.order_by(order_by)
            
            # Apply limit
            if limit:
                query = query.limit(limit)
            
            # Execute query
            docs = query.get()
            
            return [doc.to_dict() for doc in docs]
            
        except Exception as e:
            self.logger.error(f"Failed to query documents: {e}")
            return []

# ==================== Main GCP Automation Class ====================

class GCPAutomation:
    """High-level GCP automation interface."""
    
    def __init__(self, config: Optional[GCPConfig] = None):
        # Set up logging first so messages emitted while the managers
        # initialize already use this format
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
        )
        self.logger = logging.getLogger(__name__)
        
        self.config = config or GCPConfig.from_env()
        
        # Initialize service managers
        self.compute = ComputeEngineManager(self.config)
        self.storage = CloudStorageManager(self.config)
        self.bigquery = BigQueryManager(self.config)
        self.functions = CloudFunctionsManager(self.config)
        self.pubsub = PubSubManager(self.config)
        self.firestore = FirestoreManager(self.config)
    
    def get_project_info(self) -> Dict[str, Any]:
        """Get GCP project information."""
        return {
            'project_id': self.config.project_id,
            'region': self.config.region,
            'zone': self.config.zone
        }
    
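    def create_data_pipeline(
        self,
        pipeline_name: str,
        source_bucket: str,
        dataset_id: str,
        table_id: str
    ):
        """
        Create the resources for a data pipeline from GCS to BigQuery.
        
        This demonstrates orchestrating multiple services: it creates the
        bucket, dataset, topic, and subscription. It does not create the
        table (table_id is currently unused) or connect the bucket to the topic.
        """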
        try:
            # Create storage bucket
            self.storage.create_bucket(source_bucket)
            
            # Create BigQuery dataset
            self.bigquery.create_dataset(dataset_id)
            
            # Create Pub/Sub topic for notifications
            topic_name = f"{pipeline_name}-topic"
            self.pubsub.create_topic(topic_name)
            
            # Create subscription
            subscription_name = f"{pipeline_name}-subscription"
            self.pubsub.create_subscription(subscription_name, topic_name)
            
            self.logger.info(f"Created data pipeline: {pipeline_name}")
            
        except Exception as e:
            self.logger.error(f"Failed to create data pipeline: {e}")
            raise

# Example usage
if __name__ == "__main__":
    print("☁️ Google Cloud Automation Examples\n")
    
    # Example 1: Initialize GCP automation
    print("1️⃣ Initializing GCP Automation:")
    
    config = GCPConfig.from_env()
    if not config.project_id:
        print("   ⚠️  No GCP_PROJECT_ID found in environment")
        print("   Set with: export GCP_PROJECT_ID='your-project-id'")
    else:
        gcp = GCPAutomation(config)
        
        project_info = gcp.get_project_info()
        print(f"   Project: {project_info['project_id']}")
        print(f"   Region: {project_info['region']}")
        print(f"   Zone: {project_info['zone']}")
    
    # Example 2: Compute Engine operations
    print("\n2️⃣ Compute Engine Operations:")
    
    print("   Creating instance...")
    print("   instance = gcp.compute.create_instance(")
    print("       instance_name='web-server',")
    print("       machine_type='e2-micro',")
    print("       startup_script='apt-get update && apt-get install -y nginx'")
    print("   )")
    
    # Example 3: Cloud Storage operations
    print("\n3️⃣ Cloud Storage Operations:")
    
    print("   Creating bucket...")
    print("   gcp.storage.create_bucket(")
    print("       bucket_name='my-data-lake',")
    print("       storage_class='NEARLINE',")
    print("       versioning=True")
    print("   )")
    
    print("\n   Uploading file...")
    print("   gcp.storage.upload_file(")
    print("       bucket_name='my-data-lake',")
    print("       source_file_path='data.csv',")
    print("       destination_blob_name='2024/data.csv'")
    print("   )")
    
    # Example 4: BigQuery operations
    print("\n4️⃣ BigQuery Operations:")
    
    print("   Creating dataset and table...")
    print("   gcp.bigquery.create_dataset('analytics')")
    print("   ")
    print("   schema = [")
    print("       bigquery.SchemaField('id', 'STRING'),")
    print("       bigquery.SchemaField('name', 'STRING'),")
    print("       bigquery.SchemaField('created_at', 'TIMESTAMP')")
    print("   ]")
    print("   gcp.bigquery.create_table('analytics', 'events', schema)")
    
    # Example 5: Cloud Functions
    print("\n5️⃣ Cloud Functions Operations:")
    
    print("   Deploying function...")
    print("   gcp.functions.deploy_function(")
    print("       function_name='data-processor',")
    print("       source_archive_url='gs://my-bucket/function.zip',")
    print("       entry_point='main',")
    print("       runtime='python39'")
    print("   )")
    
    # Example 6: Pub/Sub messaging
    print("\n6️⃣ Pub/Sub Messaging:")
    
    print("   Creating topic and publishing...")
    print("   gcp.pubsub.create_topic('events')")
    print("   gcp.pubsub.publish_message(")
    print("       topic_name='events',")
    print("       message=json.dumps({'type': 'user_signup'})")
    print("   )")
    
    # Example 7: Firestore database
    print("\n7️⃣ Firestore Operations:")
    
    print("   Creating document...")
    print("   gcp.firestore.create_document(")
    print("       collection='users',")
    print("       document_id='user123',")
    print("       data={'name': 'John', 'email': 'john@example.com'}")
    print("   )")
    
    # Example 8: GCP service comparison
    print("\n8️⃣ GCP Service Comparison:")
    
    services = [
        ("Compute Engine", "Virtual machines", "IaaS"),
        ("App Engine", "Managed apps", "PaaS"),
        ("Cloud Run", "Containerized apps", "Serverless"),
        ("Cloud Functions", "Event-driven code", "FaaS"),
        ("GKE", "Kubernetes clusters", "Container orchestration"),
        ("BigQuery", "Data warehouse", "Analytics"),
        ("Cloud Storage", "Object storage", "Data lake")
    ]
    
    for service, description, category in services:
        print(f"   {service}: {description} ({category})")
    
    # Example 9: Best practices
    print("\n9️⃣ GCP Best Practices:")
    
    practices = [
        "🔐 Use service accounts for authentication",
        "🏷️ Label all resources consistently",
        "🌍 Choose appropriate regions for latency",
        "💾 Use appropriate storage classes",
        "⚡ Leverage serverless for scalability",
        "📊 Monitor with Cloud Operations",
        "🔒 Enable audit logging",
        "💰 Set up billing alerts",
        "🔄 Implement lifecycle policies",
        "🛡️ Use VPC for network isolation"
    ]
    
    for practice in practices:
        print(f"   {practice}")
    
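    print("\n✅ GCP automation demonstration complete!")
    print("\n📝 Note: To use this code, you need:")
    print("   1. A GCP account and project")
    print("   2. Install the Google Cloud SDK, then run: gcloud init")
    print("   3. Set up authentication: gcloud auth application-default login")
    print("   4. Install Python libraries (pip does not expand wildcards):")
    print("      pip install google-cloud-compute google-cloud-storage google-cloud-bigquery")
    print("      pip install google-cloud-functions google-cloud-pubsub google-cloud-firestore")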
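
One design point in the listing above: `pull_messages` acknowledges every message as soon as it is pulled, so a failure in later processing cannot trigger redelivery. If at-least-once handling matters, a common alternative is to acknowledge only after a handler succeeds. The sketch below illustrates that ordering with a hypothetical in-memory stand-in (`InMemorySubscription` and `process_then_ack` are illustrative names, not part of the Pub/Sub client library), so it runs without a GCP project.

```python
from typing import Callable, Dict, List


class InMemorySubscription:
    """Hypothetical stand-in for a Pub/Sub subscription (not a real client)."""

    def __init__(self, payloads: List[str]):
        # Map ack_id -> payload for every message that is still unacknowledged.
        self._pending: Dict[str, str] = {
            f"ack-{i}": payload for i, payload in enumerate(payloads)
        }

    def pull(self, max_messages: int) -> Dict[str, str]:
        """Return up to max_messages unacknowledged messages, keyed by ack_id."""
        return dict(list(self._pending.items())[:max_messages])

    def acknowledge(self, ack_ids: List[str]) -> None:
        """Drop acknowledged messages so they are not delivered again."""
        for ack_id in ack_ids:
            self._pending.pop(ack_id, None)

    def pending_count(self) -> int:
        """Number of messages that have not been acknowledged yet."""
        return len(self._pending)


def process_then_ack(
    subscription: InMemorySubscription,
    handler: Callable[[str], None],
    max_messages: int = 10,
) -> int:
    """Acknowledge only the messages that the handler processed successfully."""
    done: List[str] = []
    for ack_id, payload in subscription.pull(max_messages).items():
        try:
            handler(payload)
        except Exception:
            # Leave the message unacknowledged so it can be pulled again later.
            continue
        done.append(ack_id)
    if done:
        subscription.acknowledge(done)
    return len(done)
```

With the real subscriber client, the same idea would mean calling `acknowledge` only for the `ack_id`s whose messages were handled successfully; unacknowledged messages become eligible for redelivery once the ack deadline passes.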

Key Takeaways and Best Practices 🎯

GCP Automation Best Practices 📋

Pro Tip: Think of GCP as a toolkit where each service has a specific purpose - choose the right tool for each job. Start with proper authentication using service accounts, not personal credentials. Understand the service hierarchy: projects contain resources, resources have labels, and IAM controls access. Use Compute Engine for full VMs, Cloud Run for containers, and Cloud Functions for event-driven code. For data, use Cloud Storage for objects, Firestore for documents, Cloud SQL for relational data, and BigQuery for analytics. Always apply consistent labels to track costs and organize resources. Implement proper error handling - GCP operations can fail due to quotas, permissions, or network issues. Use Cloud Operations (formerly Stackdriver) for comprehensive monitoring and logging. Take advantage of GCP's global infrastructure by choosing appropriate regions for your workloads. Leverage serverless options when possible to reduce operational overhead. Set up billing alerts early to avoid surprise charges. Most importantly: design for scalability from the start - GCP makes it easy to scale, but your architecture needs to support it!
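
The error-handling advice above can be sketched with a small retry helper. This is a minimal, standard-library-only illustration of capped exponential backoff with jitter, not the client libraries' own retry machinery; `TransientError` and `call_with_backoff` are hypothetical names introduced for this example. Only failures that are likely to be temporary should be retried; a permission error will not fix itself and should surface immediately.

```python
import random
import time
from typing import Callable, Tuple, Type, TypeVar

T = TypeVar("T")


class TransientError(Exception):
    """Hypothetical marker for failures worth retrying (e.g. a network blip)."""


def call_with_backoff(
    operation: Callable[[], T],
    retry_on: Tuple[Type[BaseException], ...] = (TransientError,),
    max_attempts: int = 5,
    base_delay: float = 1.0,
    max_delay: float = 30.0,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Run operation, retrying the listed exceptions with capped backoff."""
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    for attempt in range(1, max_attempts + 1):
        try:
            return operation()
        except retry_on:
            if attempt == max_attempts:
                raise  # Out of attempts: surface the last error to the caller.
            # The delay doubles each attempt (1s, 2s, 4s, ...) up to max_delay,
            # plus up to 10% random jitter so parallel workers do not retry
            # in lockstep.
            delay = min(base_delay * 2 ** (attempt - 1), max_delay)
            sleep(delay + random.uniform(0, delay * 0.1))
    raise AssertionError("unreachable")
```

In real use you might pass exception classes such as `google.api_core.exceptions.ServiceUnavailable` via `retry_on` and wrap a manager call in a lambda, for example `call_with_backoff(lambda: gcp.storage.create_bucket('my-data-lake'), retry_on=(ServiceUnavailable,))`. Exceptions not listed in `retry_on` propagate on the first failure.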

Mastering Google Cloud automation with Python enables you to leverage Google's powerful infrastructure and services programmatically. You can now provision compute resources, manage storage, process big data with BigQuery, deploy serverless functions, and orchestrate complex cloud workflows. Whether you're building data pipelines, machine learning systems, or global applications, these GCP automation skills empower you to harness the cloud effectively! 🚀