☁️ Google Cloud Automation: Master GCP with Python
Google Cloud Platform (GCP) offers Python client libraries that let you automate everything from Compute Engine VMs and Cloud Storage to BigQuery analytics and Kubernetes Engine. Like having a command center for Google's global infrastructure, GCP automation lets you deploy applications, process big data, manage machine learning models, and orchestrate cloud-native solutions at scale. Let's explore Google Cloud automation with Python!
The Google Cloud Automation Architecture
Think of GCP's Python libraries as your toolkit for building on Google's infrastructure: they provide interfaces to services like Compute Engine for VMs, Cloud Storage for objects, BigQuery for analytics, Cloud Functions for serverless code, and Kubernetes Engine for containers. The libraries find credentials through Application Default Credentials, and many methods retry transient errors by default, so you can focus on the workflow rather than the plumbing. Understanding GCP's service ecosystem and its Python client libraries is essential for effective cloud automation!
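Retry behavior is easier to reason about with the numbers in front of you. The client libraries' `google.api_core.retry.Retry` class is configured with `initial`, `maximum`, and `multiplier` delays; the stdlib-only sketch below is not that class but a hypothetical `retry_with_backoff` helper that applies the same exponential-backoff idea, assuming you pass in the exception types you consider transient.

```python
import random
import time
from typing import Callable, Iterator, Tuple, Type, TypeVar

T = TypeVar("T")


def backoff_delays(initial: float = 1.0, multiplier: float = 2.0,
                   maximum: float = 60.0) -> Iterator[float]:
    """Yield exponentially growing delays, each capped at `maximum` seconds."""
    delay = initial
    while True:
        yield min(delay, maximum)
        delay *= multiplier


def retry_with_backoff(func: Callable[[], T],
                       transient: Tuple[Type[BaseException], ...],
                       attempts: int = 5,
                       initial: float = 1.0,
                       multiplier: float = 2.0,
                       maximum: float = 60.0,
                       jitter: bool = True,
                       sleep: Callable[[float], None] = time.sleep) -> T:
    """Call `func`, retrying only errors listed in `transient` (hypothetical helper)."""
    if attempts < 1:
        raise ValueError("attempts must be >= 1")
    delays = backoff_delays(initial, multiplier, maximum)
    for attempt in range(1, attempts + 1):
        try:
            return func()
        except transient:
            if attempt == attempts:
                raise  # out of attempts: surface the last transient error
            delay = next(delays)
            # "Full jitter" spreads many clients' retries across [0, delay]
            sleep(random.uniform(0, delay) if jitter else delay)
    raise AssertionError("unreachable")


# Usage: a stand-in call that fails twice with a transient error, then succeeds
calls = {"n": 0}


def flaky_call() -> str:
    calls["n"] += 1
    if calls["n"] < 3:
        raise ConnectionError("simulated transient failure")
    return "ok"


waits = []  # record the delays instead of actually sleeping
result = retry_with_backoff(flaky_call, (ConnectionError,), jitter=False,
                            sleep=waits.append)
```

Injecting `sleep` keeps the helper testable; in real code you would keep the default `time.sleep` and leave jitter on, so that many clients hitting the same quota do not retry in lockstep.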
Real-World Scenario: The Multi-Service GCP Platform
You're building a GCP automation platform that provisions Compute Engine instances for applications, manages Cloud Storage for data lakes, processes analytics with BigQuery, deploys serverless functions for event processing, orchestrates containers with Kubernetes Engine, uses Pub/Sub for messaging, monitors with Cloud Operations, and manages IAM security. Your system must handle petabyte-scale data, support global deployments, optimize costs, and maintain enterprise security. Let's build the core of that framework: managers for Compute Engine, Cloud Storage, BigQuery, Cloud Functions, Pub/Sub, and Firestore behind a single entry point!
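Supporting global deployments means keeping regions and zones straight, and the `GCPConfig` class below reads `GCP_REGION` and `GCP_ZONE` independently, so nothing stops a zone from pointing outside its region. As a minimal sketch, assuming the usual `<region>-<letter>` zone naming (for example `us-central1-a` in `us-central1`), a hypothetical `check_location` helper can catch that mismatch before any API call:

```python
import re

# Zone names follow "<region>-<letter>", e.g. "us-central1-a" is in "us-central1".
ZONE_PATTERN = re.compile(r"(?P<region>[a-z]+-[a-z]+[0-9]+)-[a-z]")


def region_of_zone(zone: str) -> str:
    """Return the region part of a zone name, or raise ValueError."""
    match = ZONE_PATTERN.fullmatch(zone)
    if match is None:
        raise ValueError(f"not a zone name: {zone!r}")
    return match.group("region")


def check_location(region: str, zone: str) -> None:
    """Raise ValueError if `zone` is not inside `region`."""
    if region_of_zone(zone) != region:
        raise ValueError(f"zone {zone!r} is not in region {region!r}")


# Usage: validate every (region, zone) pair of a multi-region rollout up front
rollout = [("us-central1", "us-central1-a"), ("europe-west1", "europe-west1-b")]
for region, zone in rollout:
    check_location(region, zone)
```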
# First, install the required packages:
# pip install google-cloud-compute google-cloud-storage google-cloud-bigquery
# pip install google-cloud-functions google-cloud-pubsub google-cloud-firestore
# pip install google-auth
import os
import logging
from typing import List, Dict, Optional, Any, Tuple
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from pathlib import Path

# Google Cloud imports
from google.cloud import compute_v1
from google.cloud import storage
from google.cloud import bigquery
from google.cloud import firestore
from google.cloud import pubsub_v1
from google.cloud import functions_v1
from google.cloud.firestore_v1.base_query import FieldFilter
from google.oauth2 import service_account
from google.api_core.exceptions import NotFound, AlreadyExists, Conflict
# ==================== GCP Configuration ====================
@dataclass
class GCPConfig:
    """Google Cloud Platform configuration."""
    project_id: str
    region: str = "us-central1"
    zone: str = "us-central1-a"

    # Authentication (None means: use Application Default Credentials)
    credentials_path: Optional[str] = None
    credentials: Optional[service_account.Credentials] = None

    # Default labels applied to every resource we create
    default_labels: Dict[str, str] = field(default_factory=lambda: {
        "managed-by": "python-automation",
        "environment": "production",
        "created-at": datetime.now().strftime("%Y-%m-%d")
    })

    # Retry configuration
    max_retries: int = 3
    retry_delay: float = 1.0

    # Cost management
    enable_cost_optimization: bool = True
    max_monthly_budget: float = 1000.0

    # Network settings
    default_network: str = "default"
    default_subnetwork: str = "default"

    @classmethod
    def from_env(cls) -> "GCPConfig":
        """Load configuration from environment variables."""
        return cls(
            project_id=os.getenv("GCP_PROJECT_ID", ""),
            region=os.getenv("GCP_REGION", "us-central1"),
            zone=os.getenv("GCP_ZONE", "us-central1-a"),
            credentials_path=os.getenv("GOOGLE_APPLICATION_CREDENTIALS")
        )

    def get_credentials(self) -> Optional[service_account.Credentials]:
        """Return explicit service-account credentials, or None to fall back to ADC."""
        if self.credentials:
            return self.credentials
        if self.credentials_path:
            self.credentials = service_account.Credentials.from_service_account_file(
                self.credentials_path
            )
        return self.credentials
# ==================== Compute Engine Manager ====================
class ComputeEngineManager:
    """Manage Google Compute Engine resources."""

    def __init__(self, config: GCPConfig):
        self.config = config
        credentials = config.get_credentials()  # None -> Application Default Credentials
        self.instances_client = compute_v1.InstancesClient(credentials=credentials)
        self.images_client = compute_v1.ImagesClient(credentials=credentials)
        self.disks_client = compute_v1.DisksClient(credentials=credentials)
        self.firewalls_client = compute_v1.FirewallsClient(credentials=credentials)
        self.networks_client = compute_v1.NetworksClient(credentials=credentials)
        self.logger = logging.getLogger(__name__)

    def create_instance(
        self,
        instance_name: str,
        machine_type: str = "e2-micro",
        source_image: str = "projects/debian-cloud/global/images/family/debian-11",
        disk_size_gb: int = 10,
        network: Optional[str] = None,
        external_ip: bool = True,
        startup_script: Optional[str] = None,
        metadata: Optional[Dict[str, str]] = None,
        tags: Optional[List[str]] = None
    ) -> compute_v1.Instance:
        """
        Create a Compute Engine instance.

        Args:
            instance_name: Name of the instance
            machine_type: Machine type (e.g., 'e2-micro', 'n1-standard-1')
            source_image: Source image for boot disk
            disk_size_gb: Size of boot disk in GB
            network: Network name (uses default if not specified)
            external_ip: Whether to assign an ephemeral external IP
            startup_script: Startup script to run on instance
            metadata: Additional metadata
            tags: Network tags for firewall rules

        Returns:
            Created instance object
        """
        try:
            # Machine type URL (relative to the project)
            machine_type_url = f"zones/{self.config.zone}/machineTypes/{machine_type}"

            # Boot disk configuration
            boot_disk = compute_v1.AttachedDisk(
                auto_delete=True,
                boot=True,
                type_="PERSISTENT",
                initialize_params=compute_v1.AttachedDiskInitializeParams(
                    source_image=source_image,
                    disk_size_gb=disk_size_gb
                )
            )

            # Network configuration
            network_name = network or self.config.default_network
            network_interface = compute_v1.NetworkInterface(
                network=f"global/networks/{network_name}"
            )

            # Add external IP if requested
            if external_ip:
                access_config = compute_v1.AccessConfig(
                    name="External NAT",
                    type_="ONE_TO_ONE_NAT"
                )
                network_interface.access_configs = [access_config]

            # Metadata items: startup script first, then any extra key/value pairs
            metadata_items = []
            if startup_script:
                metadata_items.append(
                    compute_v1.Items(key="startup-script", value=startup_script)
                )
            if metadata:
                for key, value in metadata.items():
                    metadata_items.append(compute_v1.Items(key=key, value=value))

            # Instance configuration; labels are the defaults plus the instance name
            instance = compute_v1.Instance(
                name=instance_name,
                machine_type=machine_type_url,
                disks=[boot_disk],
                network_interfaces=[network_interface],
                labels={**self.config.default_labels, "name": instance_name}
            )
            if metadata_items:
                instance.metadata = compute_v1.Metadata(items=metadata_items)
            if tags:
                instance.tags = compute_v1.Tags(items=tags)

            # Create the instance and block until the operation finishes
            operation = self.instances_client.insert(
                project=self.config.project_id,
                zone=self.config.zone,
                instance_resource=instance
            )
            self._wait_for_operation(operation)

            # Fetch the created instance to return its server-side state
            created_instance = self.instances_client.get(
                project=self.config.project_id,
                zone=self.config.zone,
                instance=instance_name
            )
            self.logger.info(f"Created instance: {instance_name}")
            return created_instance
        except Exception as e:
            self.logger.error(f"Failed to create instance: {e}")
            raise

    def list_instances(
        self,
        zone: Optional[str] = None
    ) -> List[compute_v1.Instance]:
        """List all instances in a zone."""
        try:
            zone = zone or self.config.zone
            instances = self.instances_client.list(
                project=self.config.project_id,
                zone=zone
            )
            return list(instances)
        except Exception as e:
            self.logger.error(f"Failed to list instances: {e}")
            return []

    def get_instance(self, instance_name: str) -> Optional[compute_v1.Instance]:
        """Get instance details, or None if it does not exist."""
        try:
            return self.instances_client.get(
                project=self.config.project_id,
                zone=self.config.zone,
                instance=instance_name
            )
        except NotFound:
            return None
        except Exception as e:
            self.logger.error(f"Failed to get instance: {e}")
            return None

    def stop_instance(self, instance_name: str) -> bool:
        """Stop an instance."""
        try:
            operation = self.instances_client.stop(
                project=self.config.project_id,
                zone=self.config.zone,
                instance=instance_name
            )
            self._wait_for_operation(operation)
            self.logger.info(f"Stopped instance: {instance_name}")
            return True
        except Exception as e:
            self.logger.error(f"Failed to stop instance: {e}")
            return False

    def start_instance(self, instance_name: str) -> bool:
        """Start an instance."""
        try:
            operation = self.instances_client.start(
                project=self.config.project_id,
                zone=self.config.zone,
                instance=instance_name
            )
            self._wait_for_operation(operation)
            self.logger.info(f"Started instance: {instance_name}")
            return True
        except Exception as e:
            self.logger.error(f"Failed to start instance: {e}")
            return False

    def delete_instance(self, instance_name: str) -> bool:
        """Delete an instance."""
        try:
            operation = self.instances_client.delete(
                project=self.config.project_id,
                zone=self.config.zone,
                instance=instance_name
            )
            self._wait_for_operation(operation)
            self.logger.info(f"Deleted instance: {instance_name}")
            return True
        except Exception as e:
            self.logger.error(f"Failed to delete instance: {e}")
            return False

    def create_firewall_rule(
        self,
        rule_name: str,
        network: str,
        source_ranges: List[str],
        allowed_ports: List[Dict[str, Any]],
        target_tags: Optional[List[str]] = None
    ) -> bool:
        """
        Create an ingress firewall rule.

        Args:
            rule_name: Name of the firewall rule
            network: Network name
            source_ranges: List of source IP ranges (e.g., ['0.0.0.0/0'])
            allowed_ports: List of allowed ports [{'protocol': 'tcp', 'ports': ['80', '443']}]
            target_tags: Tags for target instances

        Returns:
            True if successful
        """
        try:
            # Build the allowed list (the protocol field is named I_p_protocol)
            allowed = []
            for rule in allowed_ports:
                allowed_item = compute_v1.Allowed(I_p_protocol=rule['protocol'])
                if 'ports' in rule:
                    allowed_item.ports = rule['ports']
                allowed.append(allowed_item)

            firewall_rule = compute_v1.Firewall(
                name=rule_name,
                network=f"global/networks/{network}",
                source_ranges=source_ranges,
                allowed=allowed,
                direction="INGRESS"
            )
            if target_tags:
                firewall_rule.target_tags = target_tags

            # Firewall rules are global resources
            operation = self.firewalls_client.insert(
                project=self.config.project_id,
                firewall_resource=firewall_rule
            )
            self._wait_for_operation(operation)
            self.logger.info(f"Created firewall rule: {rule_name}")
            return True
        except Exception as e:
            self.logger.error(f"Failed to create firewall rule: {e}")
            return False

    def _wait_for_operation(self, operation: Any, timeout: int = 300) -> Any:
        """
        Block until a Compute Engine operation finishes.

        insert/start/stop/delete return an ExtendedOperation whose result()
        polls the matching zonal or global operation until it is done.
        """
        result = operation.result(timeout=timeout)
        if operation.error_code:
            raise operation.exception() or RuntimeError(operation.error_message)
        return result
# ==================== Cloud Storage Manager ====================
class CloudStorageManager:
    """Manage Google Cloud Storage resources."""

    def __init__(self, config: GCPConfig):
        self.config = config
        self.client = storage.Client(
            project=config.project_id,
            credentials=config.get_credentials()
        )
        self.logger = logging.getLogger(__name__)

    def create_bucket(
        self,
        bucket_name: str,
        location: Optional[str] = None,
        storage_class: str = "STANDARD",
        versioning: bool = False,
        lifecycle_rules: Optional[List[Dict]] = None
    ) -> storage.Bucket:
        """
        Create a Cloud Storage bucket.

        Args:
            bucket_name: Globally unique name of the bucket
            location: Location for bucket (uses default region if not specified)
            storage_class: Storage class (STANDARD, NEARLINE, COLDLINE, ARCHIVE)
            versioning: Enable object versioning
            lifecycle_rules: Lifecycle management rules

        Returns:
            Created (or already existing) bucket object
        """
        try:
            bucket = self.client.bucket(bucket_name)
            bucket.location = location or self.config.region
            bucket.storage_class = storage_class
            bucket.labels = dict(self.config.default_labels)
            if versioning:
                bucket.versioning_enabled = True
            if lifecycle_rules:
                bucket.lifecycle_rules = lifecycle_rules

            bucket = self.client.create_bucket(bucket)
            self.logger.info(f"Created bucket: {bucket_name}")
            return bucket
        except Conflict:
            # Cloud Storage reports an existing bucket as HTTP 409 Conflict.
            # Names are global, so the bucket may belong to another project.
            self.logger.warning(f"Bucket already exists: {bucket_name}")
            return self.client.bucket(bucket_name)
        except Exception as e:
            self.logger.error(f"Failed to create bucket: {e}")
            raise

    def upload_file(
        self,
        bucket_name: str,
        source_file_path: str,
        destination_blob_name: Optional[str] = None,
        content_type: Optional[str] = None,
        metadata: Optional[Dict[str, str]] = None
    ) -> storage.Blob:
        """Upload a file to Cloud Storage."""
        try:
            bucket = self.client.bucket(bucket_name)
            # Use the file name as the blob name if not specified
            if not destination_blob_name:
                destination_blob_name = Path(source_file_path).name
            blob = bucket.blob(destination_blob_name)
            if content_type:
                blob.content_type = content_type
            if metadata:
                blob.metadata = metadata

            blob.upload_from_filename(source_file_path)
            self.logger.info(
                f"Uploaded {source_file_path} to gs://{bucket_name}/{destination_blob_name}"
            )
            return blob
        except Exception as e:
            self.logger.error(f"Failed to upload file: {e}")
            raise

    def download_file(
        self,
        bucket_name: str,
        source_blob_name: str,
        destination_file_path: str
    ) -> bool:
        """Download a file from Cloud Storage."""
        try:
            bucket = self.client.bucket(bucket_name)
            blob = bucket.blob(source_blob_name)
            blob.download_to_filename(destination_file_path)
            self.logger.info(
                f"Downloaded gs://{bucket_name}/{source_blob_name} to {destination_file_path}"
            )
            return True
        except Exception as e:
            self.logger.error(f"Failed to download file: {e}")
            return False

    def list_blobs(
        self,
        bucket_name: str,
        prefix: Optional[str] = None,
        delimiter: Optional[str] = None
    ) -> List[storage.Blob]:
        """List blobs in a bucket."""
        try:
            bucket = self.client.bucket(bucket_name)
            blobs = bucket.list_blobs(prefix=prefix, delimiter=delimiter)
            return list(blobs)
        except Exception as e:
            self.logger.error(f"Failed to list blobs: {e}")
            return []

    def delete_blob(self, bucket_name: str, blob_name: str) -> bool:
        """Delete a blob from Cloud Storage."""
        try:
            bucket = self.client.bucket(bucket_name)
            blob = bucket.blob(blob_name)
            blob.delete()
            self.logger.info(f"Deleted gs://{bucket_name}/{blob_name}")
            return True
        except Exception as e:
            self.logger.error(f"Failed to delete blob: {e}")
            return False

    def generate_signed_url(
        self,
        bucket_name: str,
        blob_name: str,
        expiration: timedelta = timedelta(hours=1),
        method: str = "GET"
    ) -> str:
        """
        Generate a V4 signed URL for a blob.

        Signing requires credentials that can sign (e.g. a service-account key),
        and V4 signed URLs can be valid for at most 7 days.
        """
        try:
            bucket = self.client.bucket(bucket_name)
            blob = bucket.blob(blob_name)
            return blob.generate_signed_url(
                version="v4",
                expiration=expiration,
                method=method
            )
        except Exception as e:
            self.logger.error(f"Failed to generate signed URL: {e}")
            return ""
# ==================== BigQuery Manager ====================
class BigQueryManager:
    """Manage BigQuery resources and queries."""

    def __init__(self, config: GCPConfig):
        self.config = config
        self.client = bigquery.Client(
            project=config.project_id,
            credentials=config.get_credentials()
        )
        self.logger = logging.getLogger(__name__)

    def create_dataset(
        self,
        dataset_id: str,
        location: Optional[str] = None,
        description: Optional[str] = None
    ) -> bigquery.Dataset:
        """Create a BigQuery dataset, or return it if it already exists."""
        try:
            dataset = bigquery.Dataset(f"{self.config.project_id}.{dataset_id}")
            dataset.location = location or self.config.region
            if description:
                dataset.description = description
            dataset.labels = dict(self.config.default_labels)

            # exists_ok=True returns the existing dataset instead of raising Conflict
            dataset = self.client.create_dataset(dataset, exists_ok=True, timeout=30)
            self.logger.info(f"Dataset ready: {dataset_id}")
            return dataset
        except Exception as e:
            self.logger.error(f"Failed to create dataset: {e}")
            raise

    def create_table(
        self,
        dataset_id: str,
        table_id: str,
        schema: List[bigquery.SchemaField],
        partition_field: Optional[str] = None,
        clustering_fields: Optional[List[str]] = None
    ) -> bigquery.Table:
        """
        Create a BigQuery table, or return it if it already exists.

        Args:
            dataset_id: Dataset ID
            table_id: Table ID
            schema: List of schema fields
            partition_field: DATE/TIMESTAMP field to partition by (daily)
            clustering_fields: Fields to cluster by

        Returns:
            Table object
        """
        try:
            table_ref = f"{self.config.project_id}.{dataset_id}.{table_id}"
            table = bigquery.Table(table_ref, schema=schema)
            if partition_field:
                table.time_partitioning = bigquery.TimePartitioning(field=partition_field)
            if clustering_fields:
                table.clustering_fields = clustering_fields
            table.labels = dict(self.config.default_labels)

            table = self.client.create_table(table, exists_ok=True)
            self.logger.info(f"Table ready: {dataset_id}.{table_id}")
            return table
        except Exception as e:
            self.logger.error(f"Failed to create table: {e}")
            raise

    def query(
        self,
        query: str,
        job_config: Optional[bigquery.QueryJobConfig] = None
    ) -> bigquery.QueryJob:
        """
        Execute a BigQuery query and wait for it to finish.

        Args:
            query: SQL query string
            job_config: Query job configuration

        Returns:
            Completed query job (call .result() again to iterate rows)
        """
        try:
            query_job = self.client.query(query, job_config=job_config)
            # Block until the job finishes; raises if the query failed
            query_job.result()
            self.logger.info(f"Query completed: {query_job.job_id}")
            return query_job
        except Exception as e:
            self.logger.error(f"Query failed: {e}")
            raise

    def load_data_from_gcs(
        self,
        dataset_id: str,
        table_id: str,
        source_uri: str,
        schema: Optional[List[bigquery.SchemaField]] = None,
        source_format: str = "CSV",
        skip_leading_rows: int = 0
    ) -> bigquery.LoadJob:
        """Load data from Cloud Storage into BigQuery."""
        try:
            table_ref = f"{self.config.project_id}.{dataset_id}.{table_id}"
            # source_format is a SourceFormat attribute name, e.g. CSV or PARQUET
            job_config = bigquery.LoadJobConfig(
                source_format=getattr(bigquery.SourceFormat, source_format)
            )
            # skip_leading_rows only applies to CSV sources
            if source_format == "CSV":
                job_config.skip_leading_rows = skip_leading_rows
            if schema:
                job_config.schema = schema
            else:
                job_config.autodetect = True

            load_job = self.client.load_table_from_uri(
                source_uri,
                table_ref,
                job_config=job_config
            )
            load_job.result()  # wait for completion
            self.logger.info(f"Loaded data into {dataset_id}.{table_id}")
            return load_job
        except Exception as e:
            self.logger.error(f"Failed to load data: {e}")
            raise

    def export_to_gcs(
        self,
        dataset_id: str,
        table_id: str,
        destination_uri: str,
        export_format: str = "CSV"
    ) -> bigquery.ExtractJob:
        """Export a BigQuery table to Cloud Storage."""
        try:
            table_ref = f"{self.config.project_id}.{dataset_id}.{table_id}"
            job_config = bigquery.ExtractJobConfig(
                destination_format=getattr(bigquery.DestinationFormat, export_format)
            )
            extract_job = self.client.extract_table(
                table_ref,
                destination_uri,
                job_config=job_config
            )
            extract_job.result()  # wait for completion
            self.logger.info(f"Exported {dataset_id}.{table_id} to {destination_uri}")
            return extract_job
        except Exception as e:
            self.logger.error(f"Failed to export data: {e}")
            raise
# ==================== Cloud Functions Manager ====================
class CloudFunctionsManager:
    """Manage (1st gen) Cloud Functions."""

    def __init__(self, config: GCPConfig):
        self.config = config
        self.client = functions_v1.CloudFunctionsServiceClient(
            credentials=config.get_credentials()
        )
        self.logger = logging.getLogger(__name__)

    def deploy_function(
        self,
        function_name: str,
        source_archive_url: str,
        entry_point: str,
        runtime: str = "python39",
        trigger_type: str = "https",
        trigger_resource: Optional[str] = None,
        memory_mb: int = 256,
        timeout_seconds: int = 60,
        environment_variables: Optional[Dict[str, str]] = None
    ) -> functions_v1.CloudFunction:
        """
        Deploy a Cloud Function.

        Args:
            function_name: Name of the function
            source_archive_url: GCS URL of source code archive
            entry_point: Name of function to execute
            runtime: Runtime environment
            trigger_type: Trigger type ('https', 'pubsub', 'storage')
            trigger_resource: Topic name ('pubsub') or bucket name ('storage')
            memory_mb: Memory allocation
            timeout_seconds: Timeout in seconds
            environment_variables: Environment variables

        Returns:
            Deployed function object
        """
        try:
            parent = f"projects/{self.config.project_id}/locations/{self.config.region}"
            function = functions_v1.CloudFunction(
                name=f"{parent}/functions/{function_name}",
                source_archive_url=source_archive_url,
                entry_point=entry_point,
                runtime=runtime,
                available_memory_mb=memory_mb,
                # Duration field: proto-plus converts a timedelta
                timeout=timedelta(seconds=timeout_seconds),
                labels=dict(self.config.default_labels)
            )

            # Set exactly one trigger
            if trigger_type == "https":
                function.https_trigger = functions_v1.HttpsTrigger()
            elif trigger_type in ("pubsub", "storage"):
                if not trigger_resource:
                    raise ValueError(f"trigger_resource is required for '{trigger_type}'")
                if trigger_type == "pubsub":
                    event_type = "google.pubsub.topic.publish"
                    resource = f"projects/{self.config.project_id}/topics/{trigger_resource}"
                else:
                    # Fires when a new object is finalized in the bucket
                    event_type = "google.storage.object.finalize"
                    resource = f"projects/_/buckets/{trigger_resource}"
                function.event_trigger = functions_v1.EventTrigger(
                    event_type=event_type,
                    resource=resource
                )
            else:
                raise ValueError(f"Unsupported trigger type: {trigger_type}")

            if environment_variables:
                function.environment_variables = environment_variables

            # Deploy and wait for the long-running operation
            operation = self.client.create_function(
                location=parent,
                function=function
            )
            result = operation.result()
            self.logger.info(f"Deployed function: {function_name}")
            return result
        except Exception as e:
            self.logger.error(f"Failed to deploy function: {e}")
            raise
# ==================== Pub/Sub Manager ====================
class PubSubManager:
    """Manage Pub/Sub topics and subscriptions."""

    def __init__(self, config: GCPConfig):
        self.config = config
        credentials = config.get_credentials()
        self.publisher = pubsub_v1.PublisherClient(credentials=credentials)
        self.subscriber = pubsub_v1.SubscriberClient(credentials=credentials)
        self.logger = logging.getLogger(__name__)

    def create_topic(self, topic_name: str) -> str:
        """Create a Pub/Sub topic and return its full path."""
        topic_path = self.publisher.topic_path(self.config.project_id, topic_name)
        try:
            topic = self.publisher.create_topic(request={"name": topic_path})
            self.logger.info(f"Created topic: {topic_name}")
            return topic.name
        except AlreadyExists:
            self.logger.warning(f"Topic already exists: {topic_name}")
            return topic_path
        except Exception as e:
            self.logger.error(f"Failed to create topic: {e}")
            raise

    def create_subscription(
        self,
        subscription_name: str,
        topic_name: str,
        ack_deadline_seconds: int = 10
    ) -> str:
        """Create a pull subscription and return its full path."""
        topic_path = self.publisher.topic_path(self.config.project_id, topic_name)
        subscription_path = self.subscriber.subscription_path(
            self.config.project_id,
            subscription_name
        )
        try:
            subscription = self.subscriber.create_subscription(
                request={
                    "name": subscription_path,
                    "topic": topic_path,
                    "ack_deadline_seconds": ack_deadline_seconds
                }
            )
            self.logger.info(f"Created subscription: {subscription_name}")
            return subscription.name
        except AlreadyExists:
            self.logger.warning(f"Subscription already exists: {subscription_name}")
            return subscription_path
        except Exception as e:
            self.logger.error(f"Failed to create subscription: {e}")
            raise

    def publish_message(
        self,
        topic_name: str,
        message: str,
        attributes: Optional[Dict[str, str]] = None
    ) -> str:
        """Publish a message to a topic and return its message ID."""
        try:
            topic_path = self.publisher.topic_path(self.config.project_id, topic_name)
            # Payloads are bytes; attribute values must be strings
            data = message.encode("utf-8")
            future = self.publisher.publish(topic_path, data, **(attributes or {}))
            message_id = future.result()  # blocks until the server acknowledges
            self.logger.info(f"Published message to {topic_name}: {message_id}")
            return message_id
        except Exception as e:
            self.logger.error(f"Failed to publish message: {e}")
            raise

    def pull_messages(
        self,
        subscription_name: str,
        max_messages: int = 10,
        timeout: int = 30
    ) -> List[Dict[str, Any]]:
        """Synchronously pull messages and acknowledge them."""
        try:
            subscription_path = self.subscriber.subscription_path(
                self.config.project_id,
                subscription_name
            )
            response = self.subscriber.pull(
                request={
                    "subscription": subscription_path,
                    "max_messages": max_messages
                },
                timeout=timeout
            )

            messages = []
            ack_ids = []
            for msg in response.received_messages:
                messages.append({
                    'data': msg.message.data.decode('utf-8'),
                    'attributes': dict(msg.message.attributes),
                    'message_id': msg.message.message_id,
                    'publish_time': msg.message.publish_time
                })
                ack_ids.append(msg.ack_id)

            # Acknowledge so the messages are not redelivered
            if ack_ids:
                self.subscriber.acknowledge(
                    request={
                        "subscription": subscription_path,
                        "ack_ids": ack_ids
                    }
                )
            return messages
        except Exception as e:
            self.logger.error(f"Failed to pull messages: {e}")
            return []
# ==================== Firestore Manager ====================
class FirestoreManager:
    """Manage Firestore database."""

    def __init__(self, config: GCPConfig):
        self.config = config
        self.client = firestore.Client(
            project=config.project_id,
            credentials=config.get_credentials()
        )
        self.logger = logging.getLogger(__name__)

    def create_document(
        self,
        collection: str,
        document_id: Optional[str],
        data: Dict[str, Any]
    ) -> str:
        """Create a document in Firestore and return its ID."""
        try:
            # Copy so the caller's dict is not mutated, then add server timestamps
            doc_data = {
                **data,
                'created_at': firestore.SERVER_TIMESTAMP,
                'updated_at': firestore.SERVER_TIMESTAMP
            }
            if document_id:
                doc_ref = self.client.collection(collection).document(document_id)
                doc_ref.set(doc_data)
            else:
                # add() returns (update_time, document_reference)
                doc_ref = self.client.collection(collection).add(doc_data)[1]
            self.logger.info(f"Created document in {collection}: {doc_ref.id}")
            return doc_ref.id
        except Exception as e:
            self.logger.error(f"Failed to create document: {e}")
            raise

    def get_document(
        self,
        collection: str,
        document_id: str
    ) -> Optional[Dict[str, Any]]:
        """Get a document from Firestore, or None if it does not exist."""
        try:
            doc = self.client.collection(collection).document(document_id).get()
            if doc.exists:
                return doc.to_dict()
            return None
        except Exception as e:
            self.logger.error(f"Failed to get document: {e}")
            return None

    def query_documents(
        self,
        collection: str,
        filters: Optional[List[Tuple[str, str, Any]]] = None,
        order_by: Optional[str] = None,
        limit: Optional[int] = None
    ) -> List[Dict[str, Any]]:
        """
        Query documents from Firestore.

        Args:
            collection: Collection name
            filters: List of filters [(field, operator, value)], e.g. ('age', '>=', 18)
            order_by: Field to order by
            limit: Maximum number of documents

        Returns:
            List of documents
        """
        try:
            query = self.client.collection(collection)
            if filters:
                for field_path, operator, value in filters:
                    query = query.where(filter=FieldFilter(field_path, operator, value))
            if order_by:
                query = query.order_by(order_by)
            if limit:
                query = query.limit(limit)
            return [doc.to_dict() for doc in query.stream()]
        except Exception as e:
            self.logger.error(f"Failed to query documents: {e}")
            return []
# ==================== Main GCP Automation Class ====================
class GCPAutomation:
    """High-level GCP automation interface."""

    def __init__(self, config: Optional[GCPConfig] = None):
        # Configure logging before any manager starts emitting records
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
        )
        self.config = config or GCPConfig.from_env()
        self.logger = logging.getLogger(__name__)

        # Initialize service managers
        self.compute = ComputeEngineManager(self.config)
        self.storage = CloudStorageManager(self.config)
        self.bigquery = BigQueryManager(self.config)
        self.functions = CloudFunctionsManager(self.config)
        self.pubsub = PubSubManager(self.config)
        self.firestore = FirestoreManager(self.config)

    def get_project_info(self) -> Dict[str, Any]:
        """Get GCP project information."""
        return {
            'project_id': self.config.project_id,
            'region': self.config.region,
            'zone': self.config.zone
        }

    def create_data_pipeline(
        self,
        pipeline_name: str,
        source_bucket: str,
        dataset_id: str,
        table_id: str
    ) -> None:
        """
        Create the resources for a GCS-to-BigQuery data pipeline.

        This demonstrates orchestrating multiple services.
        """
        try:
            # Landing bucket for raw files
            self.storage.create_bucket(source_bucket)
            # Destination dataset for analytics
            self.bigquery.create_dataset(dataset_id)
            # Pub/Sub topic and subscription for pipeline notifications
            topic_name = f"{pipeline_name}-topic"
            self.pubsub.create_topic(topic_name)
            subscription_name = f"{pipeline_name}-subscription"
            self.pubsub.create_subscription(subscription_name, topic_name)
            self.logger.info(f"Created data pipeline: {pipeline_name}")
        except Exception as e:
            self.logger.error(f"Failed to create data pipeline: {e}")
            raise
# Example usage
if __name__ == "__main__":
    print("☁️ Google Cloud Automation Examples\n")

    # Example 1: Initialize GCP automation
    print("1️⃣ Initializing GCP Automation:")
    config = GCPConfig.from_env()
    if not config.project_id:
        print("   ⚠️ No GCP_PROJECT_ID found in environment")
        print("   Set with: export GCP_PROJECT_ID='your-project-id'")
    else:
        gcp = GCPAutomation(config)
        project_info = gcp.get_project_info()
        print(f"   Project: {project_info['project_id']}")
        print(f"   Region: {project_info['region']}")
        print(f"   Zone: {project_info['zone']}")

    # Examples 2-7 print the calls instead of running them (they create billable resources)

    # Example 2: Compute Engine operations
    print("\n2️⃣ Compute Engine Operations:")
    print("   Creating instance...")
    print("   instance = gcp.compute.create_instance(")
    print("       instance_name='web-server',")
    print("       machine_type='e2-micro',")
    print("       startup_script='apt-get update && apt-get install -y nginx'")
    print("   )")

    # Example 3: Cloud Storage operations
    print("\n3️⃣ Cloud Storage Operations:")
    print("   Creating bucket...")
    print("   gcp.storage.create_bucket(")
    print("       bucket_name='my-data-lake',")
    print("       storage_class='NEARLINE',")
    print("       versioning=True")
    print("   )")
    print("\n   Uploading file...")
    print("   gcp.storage.upload_file(")
    print("       bucket_name='my-data-lake',")
    print("       source_file_path='data.csv',")
    print("       destination_blob_name='2024/data.csv'")
    print("   )")

    # Example 4: BigQuery operations
    print("\n4️⃣ BigQuery Operations:")
    print("   Creating dataset and table...")
    print("   gcp.bigquery.create_dataset('analytics')")
    print("   ")
    print("   schema = [")
    print("       bigquery.SchemaField('id', 'STRING'),")
    print("       bigquery.SchemaField('name', 'STRING'),")
    print("       bigquery.SchemaField('created_at', 'TIMESTAMP')")
    print("   ]")
    print("   gcp.bigquery.create_table('analytics', 'events', schema)")

    # Example 5: Cloud Functions
    print("\n5️⃣ Cloud Functions Operations:")
    print("   Deploying function...")
    print("   gcp.functions.deploy_function(")
    print("       function_name='data-processor',")
    print("       source_archive_url='gs://my-bucket/function.zip',")
    print("       entry_point='main',")
    print("       runtime='python39'")
    print("   )")

    # Example 6: Pub/Sub messaging
    print("\n6️⃣ Pub/Sub Messaging:")
    print("   Creating topic and publishing...")
    print("   gcp.pubsub.create_topic('events')")
    print("   gcp.pubsub.publish_message(")
    print("       topic_name='events',")
    print("       message=json.dumps({'type': 'user_signup'})")
    print("   )")

    # Example 7: Firestore database
    print("\n7️⃣ Firestore Operations:")
    print("   Creating document...")
    print("   gcp.firestore.create_document(")
    print("       collection='users',")
    print("       document_id='user123',")
    print("       data={'name': 'John', 'email': 'john@example.com'}")
    print("   )")

    # Example 8: GCP service comparison
    print("\n8️⃣ GCP Service Comparison:")
    services = [
        ("Compute Engine", "Virtual machines", "IaaS"),
        ("App Engine", "Managed apps", "PaaS"),
        ("Cloud Run", "Containerized apps", "Serverless"),
        ("Cloud Functions", "Event-driven code", "FaaS"),
        ("GKE", "Kubernetes clusters", "Container orchestration"),
        ("BigQuery", "Data warehouse", "Analytics"),
        ("Cloud Storage", "Object storage", "Data lake")
    ]
    for service, description, category in services:
        print(f"   {service}: {description} ({category})")

    # Example 9: Best practices
    print("\n9️⃣ GCP Best Practices:")
    practices = [
        "Use service accounts for authentication",
        "Label all resources consistently",
        "Choose appropriate regions for latency",
        "Use appropriate storage classes",
        "Leverage serverless for scalability",
        "Monitor with Cloud Operations",
        "Enable audit logging",
        "Set up billing alerts",
        "Implement lifecycle policies",
        "Use VPC for network isolation"
    ]
    for practice in practices:
        print(f"   • {practice}")

    print("\n✅ GCP automation demonstration complete!")
    print("\nNote: To use this code, you need:")
    print("   1. A GCP account and project")
    print("   2. The Google Cloud SDK, initialized with: gcloud init")
    print("   3. Local credentials: gcloud auth application-default login")
    print("   4. The Python libraries: pip install google-cloud-compute google-cloud-storage "
          "google-cloud-bigquery google-cloud-functions google-cloud-pubsub google-cloud-firestore")
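One error the script above only reports after a round trip to the API is an invalid name passed to `create_bucket`. The sketch below is a hypothetical local pre-check, `bucket_name_problems`, that encodes a conservative subset of Cloud Storage's documented naming rules: 3 to 63 characters of lowercase letters, digits, dashes, and underscores, starting and ending with a letter or digit, with no `goog` prefix and no `google` substring. It deliberately rejects dotted, domain-style names, which carry additional rules.

```python
import re
from typing import List

# Conservative subset of Cloud Storage naming rules (dotted names not handled):
# 3-63 chars of a-z, 0-9, '-', '_', starting and ending with a letter or digit.
_BUCKET_RE = re.compile(r"[a-z0-9][a-z0-9_-]{1,61}[a-z0-9]")


def bucket_name_problems(name: str) -> List[str]:
    """Return the reasons `name` would be rejected (an empty list means it passes)."""
    problems = []
    if not _BUCKET_RE.fullmatch(name):
        problems.append("use 3-63 chars of a-z, 0-9, '-' or '_', "
                        "starting and ending with a letter or digit")
    if name.startswith("goog"):
        problems.append("must not start with 'goog'")
    if "google" in name:
        problems.append("must not contain 'google'")
    return problems


# Usage: check candidate names before calling CloudStorageManager.create_bucket
for candidate in ["my-data-lake", "My_Data_Lake", "google-exports"]:
    print(candidate, bucket_name_problems(candidate) or "ok")
```

Passing the check does not guarantee creation succeeds: bucket names are global, so a valid name can still be taken by another project.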
Key Takeaways and Best Practices 🎯
- Use Service Accounts: Prefer service accounts over user credentials.
- Handle Exceptions: Catch specific exceptions such as NotFound and Conflict first, then GoogleAPIError as a fallback.
- Label Resources: Apply consistent labels for organization and billing.
- Choose Regions Wisely: Consider latency and compliance requirements.
- Implement Retry Logic: Use built-in retry decorators for reliability.
- Monitor Costs: Set up billing alerts and track resource usage.
- Use Appropriate Services: Choose between IaaS, PaaS, and serverless options.
- Enable Audit Logging: Track all API calls for security and compliance.
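Consistent labels only help if they are valid. The sketch below is a hypothetical `sanitize_labels` helper that enforces an ASCII-only subset of GCP's documented label rules: keys and values of at most 63 characters using lowercase letters, digits, underscores, and dashes, keys starting with a letter, and at most 64 labels per resource. GCP also accepts some international characters, which this conservative version simply replaces with dashes; the `k-` prefix for keys that start with a digit is an arbitrary convention of this sketch.

```python
import re
from typing import Dict

MAX_LABELS = 64   # per-resource label limit
MAX_LENGTH = 63   # max characters for a key or a value
_DISALLOWED = re.compile(r"[^a-z0-9_-]")


def to_label_value(raw: str) -> str:
    """Lowercase, replace disallowed characters with '-', truncate to 63 chars."""
    return _DISALLOWED.sub("-", raw.strip().lower())[:MAX_LENGTH]


def to_label_key(raw: str) -> str:
    """Like to_label_value, but keys must be non-empty and start with a letter."""
    key = to_label_value(raw)
    if not key:
        raise ValueError(f"label key {raw!r} is empty after sanitizing")
    if not key[0].isalpha():
        key = ("k-" + key)[:MAX_LENGTH]  # arbitrary prefix chosen for this sketch
    return key


def sanitize_labels(labels: Dict[str, str]) -> Dict[str, str]:
    """Return a copy of `labels` that satisfies the ASCII subset of GCP label rules."""
    clean: Dict[str, str] = {}
    for key, value in labels.items():
        new_key = to_label_key(key)
        if new_key in clean:
            # e.g. "Team" and "team" both become "team"; refuse to drop one silently
            raise ValueError(f"label {key!r} collides with another key as {new_key!r}")
        clean[new_key] = to_label_value(value)
    if len(clean) > MAX_LABELS:
        raise ValueError(f"{len(clean)} labels exceeds the limit of {MAX_LABELS}")
    return clean


# Usage: clean user-supplied labels before merging them into default_labels
labels = sanitize_labels({"Team": "Data Platform", "2024 Release": "Q3"})
```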
GCP Automation Best Practices
Mastering Google Cloud automation with Python enables you to leverage Google's powerful infrastructure and services programmatically. You can now provision compute resources, manage storage, process big data with BigQuery, deploy serverless functions, and orchestrate complex cloud workflows. Whether you're building data pipelines, machine learning systems, or global applications, these GCP automation skills empower you to harness the cloud effectively!
Pro Tip: Think of GCP as a toolkit where each service has a specific purpose - choose the right tool for each job. Start with proper authentication using service accounts, not personal credentials. Understand the service hierarchy: projects contain resources, resources have labels, and IAM controls access. Use Compute Engine for full VMs, Cloud Run for containers, and Cloud Functions for event-driven code. For data, use Cloud Storage for objects, Firestore for documents, Cloud SQL for relational data, and BigQuery for analytics. Always apply consistent labels to track costs and organize resources. Implement proper error handling - GCP operations can fail due to quotas, permissions, or network issues. Use Cloud Operations (formerly Stackdriver) for comprehensive monitoring and logging. Take advantage of GCP's global infrastructure by choosing appropriate regions for your workloads. Leverage serverless options when possible to reduce operational overhead. Set up billing alerts early to avoid surprise charges. Most importantly: design for scalability from the start - GCP makes it easy to scale, but your architecture needs to support it!
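The service hierarchy in this tip shows up directly in resource names: the code above builds strings such as `projects/{project}/locations/{region}/functions/{name}`, and `topic_path` returns `projects/{project}/topics/{topic}`. These follow the alternating collection/ID pattern of Google's resource-oriented API design, although not every call uses full names (the Compute Engine methods pass shorter relative paths like `global/networks/default`). A minimal, hypothetical pair of helpers for building and parsing such names, plus a lookup of the owning project:

```python
from typing import List, Tuple


def build_resource_name(pairs: List[Tuple[str, str]]) -> str:
    """Join (collection, id) pairs into 'collection/id/collection/id/...'."""
    for collection, resource_id in pairs:
        if not collection or not resource_id or "/" in collection + resource_id:
            raise ValueError(f"invalid segment: {collection!r}/{resource_id!r}")
    return "/".join(f"{collection}/{resource_id}" for collection, resource_id in pairs)


def parse_resource_name(name: str) -> List[Tuple[str, str]]:
    """Split a relative resource name back into (collection, id) pairs."""
    parts = name.strip("/").split("/")
    if len(parts) % 2 != 0 or "" in parts:
        raise ValueError(f"malformed resource name: {name!r}")
    return list(zip(parts[0::2], parts[1::2]))


def owning_project(name: str) -> str:
    """Return the project at the top of a project-scoped resource name."""
    collection, project = parse_resource_name(name)[0]
    if collection != "projects":
        raise ValueError(f"{name!r} is not project-scoped")
    return project


# Usage: the same shape CloudFunctionsManager builds for a function name
function_name = build_resource_name([
    ("projects", "my-project"),
    ("locations", "us-central1"),
    ("functions", "data-processor"),
])
```

Keeping name construction in one place makes it easy to attribute any resource (and its labels and IAM policy) back to the project that owns it.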