🔗 UiPath/Python Integration: Bridge Enterprise RPA Platforms

Integrating Python with enterprise RPA platforms like UiPath unlocks the best of both worlds: the visual workflow design and enterprise features of UiPath combined with Python's powerful libraries for data science, machine learning, and custom processing. Like adding a turbocharger to an already powerful engine, Python integration extends UiPath to scenarios that need advanced algorithms, API integrations, or specialized processing. Whether you're enhancing workflows with ML models, processing complex data, or integrating with systems UiPath doesn't natively support, this integration is a core skill for enterprise automation. Let's explore UiPath and Python integration! 🌉

The Integration Architecture

Think of UiPath/Python integration as a hybrid automation ecosystem in which UiPath orchestrates the workflow and Python handles specialized processing: UiPath manages the process flow, UI automation, and enterprise features, while Python provides computational power, libraries, and custom logic. The two sides can be connected through Python activities, REST APIs, or shared databases. A successful implementation depends on choosing the right integration pattern and on getting data exchange and error handling right across both platforms.

graph TB
    A[UiPath/Python Integration] --> B[Integration Methods]
    A --> C[Architecture]
    A --> D[Use Cases]
    A --> E[Components]
    B --> F[Python Activity]
    B --> G[Python Script]
    B --> H[REST API]
    B --> I[Message Queue]
    C --> J[UiPath Orchestrator]
    C --> K[Python Services]
    C --> L[Data Exchange]
    C --> M[Error Handling]
    D --> N[ML Models]
    D --> O[Data Processing]
    D --> P[Custom Logic]
    D --> Q[System Integration]
    E --> R[UiPath Robot]
    E --> S[Python Runtime]
    E --> T[Libraries]
    E --> U[Monitoring]
    V[Implementation] --> W[Design]
    V --> X[Development]
    V --> Y[Testing]
    V --> Z[Deployment]
    style A fill:#ff6b6b
    style B fill:#51cf66
    style C fill:#339af0
    style D fill:#ffd43b
    style E fill:#ff6b6b
    style V fill:#51cf66
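
Of the integration methods above, the Python activity route is the simplest to picture: a UiPath workflow loads a script and calls one of its functions by name. The sketch below shows what such a script might look like, assuming the workflow passes a single JSON string in and expects a JSON string back. The function name `classify_text` and the keyword rule are hypothetical placeholders, not part of any UiPath API.

```python
import json


def classify_text(payload_json: str) -> str:
    """Entry point meant to be invoked by name from a UiPath workflow.

    Takes and returns JSON strings so only plain text crosses the
    UiPath/Python boundary (an assumption of this sketch).
    """
    try:
        payload = json.loads(payload_json)
        text = payload.get("text", "")
        # Placeholder rule standing in for a real model or business logic.
        label = "invoice" if "invoice" in text.lower() else "other"
        result = {"status": "completed", "label": label, "length": len(text)}
    except (json.JSONDecodeError, AttributeError, TypeError) as exc:
        # Report failures as data so the workflow can branch on "status"
        # instead of handling a raw Python exception.
        result = {"status": "failed", "error": str(exc)}
    return json.dumps(result)


if __name__ == "__main__":
    print(classify_text('{"text": "Invoice #12345"}'))
```

Returning errors inside the payload keeps the contract identical for success and failure, which tends to make the UiPath side easier to write: the workflow deserializes the string and checks one field.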

Real-World Scenario: The Intelligent Automation Platform 🏗️

You're building an intelligent automation platform that combines UiPath's enterprise RPA capabilities with Python's advanced processing power. It has to classify documents with ML models, extract data from complex unstructured documents, detect fraud on transactions in real time, integrate with legacy systems via custom APIs, process natural-language customer requests, generate dynamic reports with data visualization, optimize business processes using analytics, and scale processing with the workload. The platform must pass data seamlessly between UiPath and Python, handle errors gracefully across platforms, maintain performance under load, and provide unified monitoring. Let's build a comprehensive integration framework!

# Comprehensive UiPath/Python Integration Framework
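# pip install flask fastapi uvicorn "pydantic<2" requests
# (the models below use the Pydantic v1 API: validator, .dict(), .json(), which v2 deprecates)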
# pip install pandas numpy scikit-learn transformers
# pip install python-dotenv redis celery
# pip install prometheus-client structlog

import os
import sys
import json
import logging
import traceback
from typing import Dict, List, Any, Optional, Union, Callable
from dataclasses import dataclass, field, asdict
from datetime import datetime, timedelta
from pathlib import Path
from enum import Enum
import base64
import pickle
import asyncio
import threading
import queue

# Web frameworks for API endpoints
from flask import Flask, request, jsonify
from fastapi import FastAPI, HTTPException, BackgroundTasks
from pydantic import BaseModel, Field, validator
import uvicorn

# Data processing and ML
import pandas as pd
import numpy as np
from sklearn.ensemble import RandomForestClassifier
from sklearn.preprocessing import StandardScaler

# Message queuing and caching
import redis
from celery import Celery

# Monitoring and logging
from prometheus_client import Counter, Histogram, Gauge, generate_latest
import structlog

# ==================== Configuration ====================

@dataclass
class IntegrationConfig:
    """Configuration for UiPath/Python integration."""
    # UiPath Orchestrator settings
    orchestrator_url: str = "https://orchestrator.example.com"
    orchestrator_tenant: str = "Default"
    orchestrator_api_key: Optional[str] = None
    
    # Python service settings
    service_host: str = "0.0.0.0"
    service_port: int = 8000
    
    # Redis settings for caching
    redis_host: str = "localhost"
    redis_port: int = 6379
    redis_db: int = 0
    
    # Message queue settings
    celery_broker: str = "redis://localhost:6379/1"
    celery_backend: str = "redis://localhost:6379/2"
    
    # Performance settings
    max_workers: int = 4
    request_timeout: int = 300
    cache_ttl: int = 3600
    
    # Monitoring
    enable_metrics: bool = True
    log_level: str = "INFO"

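# Load configuration; environment variables (e.g. those set in docker-compose) override the defaults
_redis_host = os.getenv("REDIS_HOST", "localhost")
config = IntegrationConfig(
    orchestrator_url=os.getenv("ORCHESTRATOR_URL", "https://orchestrator.example.com"),
    orchestrator_api_key=os.getenv("ORCHESTRATOR_API_KEY"),
    redis_host=_redis_host,
    celery_broker=f"redis://{_redis_host}:6379/1",
    celery_backend=f"redis://{_redis_host}:6379/2",
)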

# ==================== Logging Setup ====================

# Configure structured logging
structlog.configure(
    processors=[
        structlog.stdlib.filter_by_level,
        structlog.stdlib.add_logger_name,
        structlog.stdlib.add_log_level,
        structlog.stdlib.PositionalArgumentsFormatter(),
        structlog.processors.TimeStamper(fmt="iso"),
        structlog.processors.StackInfoRenderer(),
        structlog.processors.format_exc_info,
        structlog.processors.UnicodeDecoder(),
        structlog.processors.JSONRenderer()
    ],
    context_class=dict,
    logger_factory=structlog.stdlib.LoggerFactory(),
    cache_logger_on_first_use=True,
)

logger = structlog.get_logger()

# ==================== Metrics Setup ====================

# Prometheus metrics
request_count = Counter('uipath_python_requests_total', 'Total requests', ['method', 'endpoint'])
request_duration = Histogram('uipath_python_request_duration_seconds', 'Request duration')
active_tasks = Gauge('uipath_python_active_tasks', 'Number of active tasks')
error_count = Counter('uipath_python_errors_total', 'Total errors', ['error_type'])

# ==================== Data Models ====================

class RequestStatus(str, Enum):
    """Status of a request."""
    PENDING = "pending"
    PROCESSING = "processing"
    COMPLETED = "completed"
    FAILED = "failed"

class UiPathRequest(BaseModel):
    """Request from UiPath."""
    request_id: str = Field(..., description="Unique request ID")
    process_name: str = Field(..., description="Process to execute")
    input_data: Dict[str, Any] = Field(..., description="Input parameters")
    callback_url: Optional[str] = Field(None, description="Webhook for async response")
    timeout: Optional[int] = Field(300, description="Timeout in seconds")
    
    @validator('timeout')
    def validate_timeout(cls, v):
        """Validate timeout range."""
        # timeout is Optional, so an explicit None must pass through untouched
        if v is not None and not 1 <= v <= 3600:
            raise ValueError("Timeout must be between 1 and 3600 seconds")
        return v

class UiPathResponse(BaseModel):
    """Response to UiPath."""
    request_id: str
    status: RequestStatus
    output_data: Optional[Dict[str, Any]] = None
    error: Optional[str] = None
    processing_time: Optional[float] = None
    
    class Config:
        use_enum_values = True

# ==================== Python Service Core ====================

class PythonService:
    """Core Python service for UiPath integration."""
    
    def __init__(self, config: IntegrationConfig):
        self.config = config
        self.processors = {}
        self.redis_client = self._init_redis()
        self.celery_app = self._init_celery()
        
    def _init_redis(self) -> redis.Redis:
        """Initialize Redis client."""
        return redis.Redis(
            host=self.config.redis_host,
            port=self.config.redis_port,
            db=self.config.redis_db,
            decode_responses=True
        )
    
    def _init_celery(self) -> Celery:
        """Initialize Celery for async processing."""
        app = Celery('uipath_python')
        app.config_from_object({
            'broker_url': self.config.celery_broker,
            'result_backend': self.config.celery_backend,
            'task_serializer': 'json',
            'accept_content': ['json'],
            'result_serializer': 'json',
            'timezone': 'UTC',
            'enable_utc': True,
        })
        return app
    
    def register_processor(self, name: str, processor: Callable):
        """Register a process processor."""
        self.processors[name] = processor
        logger.info("Registered processor", processor=name)
    
    def process_request(self, request: UiPathRequest) -> UiPathResponse:
        """Process a request from UiPath."""
        start_time = datetime.now()
        
        # Increment before the try block so the decrement in `finally` always has
        # a matching increment (including cache hits and unknown processes)
        active_tasks.inc()
        
        try:
            # Check cache
            cached_result = self._get_cached_result(request.request_id)
            if cached_result:
                return cached_result
            
            # Get processor
            processor = self.processors.get(request.process_name)
            if not processor:
                raise ValueError(f"Unknown process: {request.process_name}")
            
            # Process request
            logger.info("Processing request", 
                       request_id=request.request_id,
                       process=request.process_name)
            
            result = processor(request.input_data)
            
            # Create response
            processing_time = (datetime.now() - start_time).total_seconds()
            response = UiPathResponse(
                request_id=request.request_id,
                status=RequestStatus.COMPLETED,
                output_data=result,
                processing_time=processing_time
            )
            
            # Cache result
            self._cache_result(request.request_id, response)
            
            return response
            
        except Exception as e:
            error_count.labels(error_type=type(e).__name__).inc()
            logger.error("Processing failed", 
                        request_id=request.request_id,
                        error=str(e),
                        traceback=traceback.format_exc())
            
            return UiPathResponse(
                request_id=request.request_id,
                status=RequestStatus.FAILED,
                error=str(e),
                processing_time=(datetime.now() - start_time).total_seconds()
            )
        finally:
            active_tasks.dec()
    
    def _get_cached_result(self, request_id: str) -> Optional[UiPathResponse]:
        """Get cached result if available."""
        cached = self.redis_client.get(f"result:{request_id}")
        if cached:
            return UiPathResponse(**json.loads(cached))
        return None
    
    def _cache_result(self, request_id: str, response: UiPathResponse):
        """Cache result for future retrieval."""
        self.redis_client.setex(
            f"result:{request_id}",
            self.config.cache_ttl,
            response.json()
        )

# ==================== Process Implementations ====================

class DocumentProcessor:
    """Process documents with ML."""
    
    def __init__(self):
        # Create the scaler first: _load_classifier() fits it on the training data
        self.scaler = StandardScaler()
        self.classifier = self._load_classifier()
    
    def _load_classifier(self) -> RandomForestClassifier:
        """Load or train document classifier."""
        # In production, load the trained classifier and its fitted scaler from file
        classifier = RandomForestClassifier(n_estimators=100)
        
        # Dummy training for example (random data, so predictions are meaningless)
        X_train = np.random.rand(100, 10)
        y_train = np.random.randint(0, 3, 100)
        classifier.fit(self.scaler.fit_transform(X_train), y_train)
        
        return classifier
    
    def classify_document(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """Classify a document."""
        # Extract features from document
        document_text = data.get('text', '')
        features = self._extract_features(document_text)
        
        # Scale with the statistics learned at training time; refitting the
        # scaler on a single sample would zero out every feature
        features_scaled = self.scaler.transform([features])
        
        # Predict class
        prediction = self.classifier.predict(features_scaled)[0]
        confidence = self.classifier.predict_proba(features_scaled)[0].max()
        
        # Map to document type
        doc_types = {0: 'invoice', 1: 'contract', 2: 'report'}
        
        return {
            'document_type': doc_types.get(prediction, 'unknown'),
            'confidence': float(confidence),
            'features': features
        }
    
    def _extract_features(self, text: str) -> List[float]:
        """Extract features from text."""
        # Simplified feature extraction
        features = [
            len(text),
            text.count(' '),
            text.count('\n'),
            text.count('.'),
            text.count(','),
            len(text.split()),
            text.count('$'),
            text.count('%'),
            text.count('@'),
            1 if 'invoice' in text.lower() else 0
        ]
        return features

class DataProcessor:
    """Process and transform data."""
    
    def clean_data(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """Clean and validate data."""
        df = pd.DataFrame(data.get('records', []))
        
        # Remove duplicates
        df = df.drop_duplicates()
        
        # Handle missing values
        # (fillna(method='ffill') is deprecated in recent pandas releases)
        df = df.ffill()
        
        # Standardize columns
        df.columns = [col.lower().replace(' ', '_') for col in df.columns]
        
        return {
            'cleaned_records': df.to_dict('records'),
            'original_count': len(data.get('records', [])),
            'cleaned_count': len(df),
            'duplicates_removed': len(data.get('records', [])) - len(df)
        }
    
    def aggregate_data(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """Aggregate data with various statistics."""
        df = pd.DataFrame(data.get('records', []))
        group_by = data.get('group_by', [])
        
        if not group_by:
            # Overall aggregation
            return {
                'count': len(df),
                'numeric_columns': df.select_dtypes(include=[np.number]).columns.tolist(),
                'statistics': df.describe().to_dict()
            }
        
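        # Group aggregation
        if isinstance(group_by, str):
            group_by = [group_by]
        numeric_cols = [
            col for col in df.select_dtypes(include=[np.number]).columns
            if col not in group_by
        ]
        aggregated = df.groupby(group_by)[numeric_cols].agg(['mean', 'sum', 'count'])
        
        # Flatten the (column, statistic) header and convert to plain records,
        # since tuple keys and pandas objects are not JSON-serializable
        aggregated.columns = [f"{col}_{stat}" for col, stat in aggregated.columns]
        
        return {
            'aggregated_data': aggregated.reset_index().to_dict('records'),
            'groups': df[group_by].nunique().to_dict()
        }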

class MLProcessor:
    """Machine learning processing."""
    
    def predict(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """Make predictions using ML model."""
        model_name = data.get('model_name', 'default')
        input_features = data.get('features', [])
        
        # Load model (simplified)
        model = self._load_model(model_name)
        
        # Make prediction
        prediction = model.predict([input_features])[0]
        probability = model.predict_proba([input_features])[0].tolist()
        
        return {
            'prediction': int(prediction),
            'probability': probability,
            'model_name': model_name,
            'timestamp': datetime.now().isoformat()
        }
    
    def _load_model(self, model_name: str):
        """Load ML model."""
        # In production, load from model registry
        model = RandomForestClassifier()
        
        # Dummy training
        X = np.random.rand(100, 5)
        y = np.random.randint(0, 2, 100)
        model.fit(X, y)
        
        return model

# ==================== REST API Server ====================

# Flask application
flask_app = Flask(__name__)
service = PythonService(config)

# Register processors
doc_processor = DocumentProcessor()
data_processor = DataProcessor()
ml_processor = MLProcessor()

service.register_processor('classify_document', doc_processor.classify_document)
service.register_processor('clean_data', data_processor.clean_data)
service.register_processor('aggregate_data', data_processor.aggregate_data)
service.register_processor('predict', ml_processor.predict)

@flask_app.route('/health', methods=['GET'])
def health_check():
    """Health check endpoint."""
    return jsonify({
        'status': 'healthy',
        'timestamp': datetime.now().isoformat(),
        'processors': list(service.processors.keys())
    })

@flask_app.route('/process', methods=['POST'])
def process():
    """Process request from UiPath."""
    try:
        # Parse request
        request_data = request.get_json()
        uipath_request = UiPathRequest(**request_data)
        
        # Update metrics
        request_count.labels(method='POST', endpoint='/process').inc()
        
        # Process request
        with request_duration.time():
            response = service.process_request(uipath_request)
        
        return jsonify(response.dict())
        
    except Exception as e:
        logger.error("API error", error=str(e))
        return jsonify({
            'error': str(e),
            'status': 'failed'
        }), 400

@flask_app.route('/async/process', methods=['POST'])
def async_process():
    """Async process request from UiPath."""
    try:
        request_data = request.get_json()
        uipath_request = UiPathRequest(**request_data)
        
        # Queue for async processing
        task = process_async_task.delay(uipath_request.dict())
        
        return jsonify({
            'request_id': uipath_request.request_id,
            'task_id': task.id,
            'status': 'queued'
        })
        
    except Exception as e:
        return jsonify({'error': str(e)}), 400

@flask_app.route('/result/<request_id>', methods=['GET'])
def get_result(request_id: str):
    """Get processing result."""
    result = service._get_cached_result(request_id)
    
    if result:
        return jsonify(result.dict())
    
    return jsonify({
        'request_id': request_id,
        'status': 'not_found'
    }), 404

@flask_app.route('/metrics', methods=['GET'])
def metrics():
    """Prometheus metrics endpoint."""
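    from prometheus_client import CONTENT_TYPE_LATEST
    # Serve the Prometheus text exposition content type rather than Flask's HTML default
    return generate_latest(), 200, {'Content-Type': CONTENT_TYPE_LATEST}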

# ==================== Celery Tasks ====================

celery_app = Celery('uipath_tasks')
celery_app.config_from_object({
    'broker_url': config.celery_broker,
    'result_backend': config.celery_backend,
})

@celery_app.task
def process_async_task(request_dict: Dict):
    """Process task asynchronously."""
    request = UiPathRequest(**request_dict)
    response = service.process_request(request)
    
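    # Send callback if provided
    if request.callback_url:
        import requests
        try:
            requests.post(request.callback_url, json=response.dict(), timeout=30)
        except requests.RequestException as exc:
            # A failed callback should not fail the task itself
            logger.warning("Callback failed", request_id=request.request_id, error=str(exc))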
    
    return response.dict()

# ==================== UiPath Integration Client ====================

class UiPathClient:
    """Client for UiPath Orchestrator integration."""
    
    def __init__(self, orchestrator_url: str, api_key: str):
        self.orchestrator_url = orchestrator_url
        self.api_key = api_key
        self.session = self._init_session()
    
    def _init_session(self):
        """Initialize HTTP session."""
        import requests
        session = requests.Session()
        session.headers.update({
            'Authorization': f'Bearer {self.api_key}',
            'Content-Type': 'application/json'
        })
        return session
    
    def get_queue_item(self, queue_name: str) -> Optional[Dict]:
        """Get item from Orchestrator queue."""
        url = f"{self.orchestrator_url}/odata/QueueItems"
        params = {
            '$filter': f"QueueDefinitionId eq '{queue_name}' and Status eq 'New'",
            '$top': 1
        }
        
        response = self.session.get(url, params=params)
        
        if response.status_code == 200:
            items = response.json().get('value', [])
            return items[0] if items else None
        
        return None
    
    def update_queue_item(self, item_id: str, status: str, output: Dict):
        """Update queue item status."""
        url = f"{self.orchestrator_url}/odata/QueueItems({item_id})"
        
        data = {
            'Status': status,
            'OutputData': json.dumps(output)
        }
        
        response = self.session.patch(url, json=data)
        return response.status_code == 200
    
    def trigger_job(self, process_name: str, input_args: Dict) -> str:
        """Trigger UiPath job."""
        url = f"{self.orchestrator_url}/odata/Jobs/UiPath.Server.Configuration.OData.StartJobs"
        
        data = {
            'startInfo': {
                'ReleaseKey': process_name,
                'Strategy': 'Specific',
                'RobotIds': [],
                'InputArguments': json.dumps(input_args)
            }
        }
        
        response = self.session.post(url, json=data)
        
        if response.status_code == 201:
            return response.json().get('value', [{}])[0].get('Id')
        
        raise Exception(f"Failed to trigger job: {response.text}")

# ==================== Integration Patterns ====================

class IntegrationPatterns:
    """Common integration patterns between UiPath and Python."""
    
    @staticmethod
    def sync_processing_pattern():
        """Synchronous processing pattern."""
        return """
        UiPath Workflow:
        1. Prepare data in UiPath
        2. Call the Python service with the HTTP Request activity
        3. Wait for response
        4. Process response in UiPath
        
        Python Service:
        1. Receive request
        2. Process synchronously
        3. Return response immediately
        """
    
    @staticmethod
    def async_processing_pattern():
        """Asynchronous processing pattern."""
        return """
        UiPath Workflow:
        1. Submit request to Python
        2. Get task ID
        3. Continue with other activities
        4. Poll for result or wait for callback
        
        Python Service:
        1. Queue task
        2. Return task ID
        3. Process asynchronously
        4. Store result or send callback
        """
    
    @staticmethod
    def queue_based_pattern():
        """Queue-based integration pattern."""
        return """
        UiPath Workflow:
        1. Add items to Orchestrator queue
        2. Python polls queue
        3. Python processes items
        4. Python updates queue items
        
        Python Service:
        1. Poll Orchestrator queue
        2. Process queue items
        3. Update item status
        4. Add results to output queue
        """
    
    @staticmethod
    def file_based_pattern():
        """File-based integration pattern."""
        return """
        UiPath Workflow:
        1. Save data to shared location
        2. Trigger Python via HTTP/Queue
        3. Python processes file
        4. Python saves result file
        5. UiPath reads result file
        
        Python Service:
        1. Monitor file location
        2. Process new files
        3. Save results
        4. Notify completion
        """

# ==================== Error Handling ====================

class ErrorHandler:
    """Unified error handling for integration."""
    
    @staticmethod
    def handle_uipath_error(error: Exception) -> Dict:
        """Handle UiPath-related errors."""
        error_mapping = {
            ConnectionError: "Cannot connect to UiPath Orchestrator",
            TimeoutError: "Request to UiPath timed out",
            ValueError: "Invalid data from UiPath",
            KeyError: "Missing required field from UiPath"
        }
        
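        error_type = type(error)
        # Use isinstance so subclasses match too (e.g. ConnectionRefusedError)
        message = next(
            (msg for exc_type, msg in error_mapping.items() if isinstance(error, exc_type)),
            str(error)
        )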
        
        return {
            'error_type': error_type.__name__,
            'message': message,
            'details': str(error),
            'timestamp': datetime.now().isoformat()
        }
    
    @staticmethod
    def handle_python_error(error: Exception) -> Dict:
        """Handle Python processing errors."""
        return {
            'error_type': type(error).__name__,
            'message': str(error),
            'traceback': traceback.format_exc(),
            'timestamp': datetime.now().isoformat()
        }

# ==================== Deployment Configuration ====================

def create_docker_compose():
    """Create Docker Compose configuration for deployment."""
    docker_compose = """
version: '3.8'

services:
  python-service:
    build: .
    ports:
      - "8000:8000"
    environment:
      - REDIS_HOST=redis
      - ORCHESTRATOR_URL=${ORCHESTRATOR_URL}
      - ORCHESTRATOR_API_KEY=${ORCHESTRATOR_API_KEY}
    depends_on:
      - redis
    volumes:
      - ./models:/app/models
      - ./logs:/app/logs
    restart: unless-stopped
  
  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"
    volumes:
      - redis-data:/data
  
  celery-worker:
    build: .
    # Assumes this module is saved as main.py, matching the Dockerfile
    command: celery -A main.celery_app worker --loglevel=info
    environment:
      - REDIS_HOST=redis
    depends_on:
      - redis
    volumes:
      - ./models:/app/models
    restart: unless-stopped
  
  celery-beat:
    build: .
    # Assumes this module is saved as main.py, matching the Dockerfile
    command: celery -A main.celery_app beat --loglevel=info
    environment:
      - REDIS_HOST=redis
    depends_on:
      - redis
    restart: unless-stopped
  
  prometheus:
    image: prom/prometheus
    ports:
      - "9090:9090"
    volumes:
      - ./prometheus.yml:/etc/prometheus/prometheus.yml
      - prometheus-data:/prometheus
    command:
      - '--config.file=/etc/prometheus/prometheus.yml'
      - '--storage.tsdb.path=/prometheus'
  
  grafana:
    image: grafana/grafana
    ports:
      - "3000:3000"
    environment:
      - GF_SECURITY_ADMIN_PASSWORD=admin
    volumes:
      - grafana-data:/var/lib/grafana
      - ./grafana/dashboards:/etc/grafana/provisioning/dashboards
    depends_on:
      - prometheus

volumes:
  redis-data:
  prometheus-data:
  grafana-data:
"""
    
    with open('docker-compose.yml', 'w') as f:
        f.write(docker_compose)
    
    return docker_compose

def create_dockerfile():
    """Create Dockerfile for Python service."""
    dockerfile = """
FROM python:3.9-slim

WORKDIR /app

# Install system dependencies
RUN apt-get update && apt-get install -y \
    gcc \
    g++ \
    && rm -rf /var/lib/apt/lists/*

# Install Python dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy application code
COPY . .

# Create directories
RUN mkdir -p /app/logs /app/models

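# Run service. flask_app is a WSGI app, which uvicorn (an ASGI server) cannot serve directly.
# Assumes this module is saved as main.py and gunicorn is listed in requirements.txt.
CMD ["gunicorn", "--bind", "0.0.0.0:8000", "main:flask_app"]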
"""
    
    with open('Dockerfile', 'w') as f:
        f.write(dockerfile)
    
    return dockerfile

# Example usage
if __name__ == "__main__":
    print("🔗 UiPath/Python Integration Examples\n")
    
    # Example 1: Integration methods
    print("1️⃣ Integration Methods:")
    methods = [
        ("Python Activity", "Direct Python script execution in UiPath"),
        ("HTTP Request", "REST API calls between platforms"),
        ("Queue Based", "Orchestrator queues for communication"),
        ("File Based", "Shared file system for data exchange"),
        ("Database", "Shared database for data transfer"),
        ("Message Queue", "RabbitMQ/Kafka for async messaging")
    ]
    for method, description in methods:
        print(f"   {method}: {description}")
    
    # Example 2: Use cases
    print("\n2️⃣ Common Integration Use Cases:")
    use_cases = [
        "Document classification with ML models",
        "Natural language processing for text",
        "Complex data transformation and analysis",
        "Custom API integrations",
        "Computer vision for image processing",
        "Predictive analytics and forecasting",
        "Report generation with visualization",
        "Legacy system integration"
    ]
    for use_case in use_cases:
        print(f"   • {use_case}")
    
    # Example 3: Integration patterns
    print("\n3️⃣ Integration Patterns:")
    patterns = ["Synchronous", "Asynchronous", "Queue-based", "File-based"]
    for pattern in patterns:
        print(f"   • {pattern} Processing Pattern")
    
    # Example 4: Data exchange formats
    print("\n4️⃣ Data Exchange Formats:")
    formats = [
        ("JSON", "Lightweight, human-readable"),
        ("XML", "Structured, schema validation"),
        ("CSV", "Tabular data, Excel compatible"),
        ("Binary", "Efficient for large data"),
        ("Pickle", "Python-to-Python only; never unpickle untrusted data"),
        ("Parquet", "Columnar storage for analytics")
    ]
    for format_name, description in formats:
        print(f"   {format_name}: {description}")
    
    # Example 5: Create sample request
    print("\n5️⃣ Sample UiPath Request:")
    sample_request = UiPathRequest(
        request_id="REQ001",
        process_name="classify_document",
        input_data={
            "text": "Invoice #12345\nDate: 2024-01-15\nAmount: $1,500.00"
        }
    )
    print(f"   Request ID: {sample_request.request_id}")
    print(f"   Process: {sample_request.process_name}")
    print(f"   Timeout: {sample_request.timeout}s")
    
    # Example 6: Process the request
    print("\n6️⃣ Processing Request:")
    service = PythonService(config)
    doc_processor = DocumentProcessor()
    service.register_processor('classify_document', doc_processor.classify_document)
    
    response = service.process_request(sample_request)
    print(f"   Status: {response.status}")
    if response.output_data:
        print(f"   Document Type: {response.output_data.get('document_type')}")
        print(f"   Confidence: {response.output_data.get('confidence', 0):.2%}")
    
    # Example 7: Error handling
    print("\n7️⃣ Error Handling:")
    error_scenarios = [
        "Connection timeout to UiPath",
        "Invalid data format",
        "Python processing exception",
        "Queue item not found",
        "Model prediction failure"
    ]
    for scenario in error_scenarios:
        print(f"   • {scenario}")
    
    # Example 8: Performance optimization
    print("\n8️⃣ Performance Optimization:")
    optimizations = [
        "Use async processing for long tasks",
        "Implement caching for repeated requests",
        "Batch process multiple items",
        "Use connection pooling",
        "Compress large data transfers",
        "Implement circuit breakers",
        "Monitor and scale workers"
    ]
    for optimization in optimizations:
        print(f"   • {optimization}")
    
    # Example 9: Monitoring metrics
    print("\n9️⃣ Monitoring Metrics:")
    metrics_list = [
        "Request count and rate",
        "Processing duration",
        "Error rate by type",
        "Queue depth",
        "Cache hit rate",
        "Model prediction accuracy",
        "Resource utilization"
    ]
    for metric in metrics_list:
        print(f"   • {metric}")
    
    # Example 10: Best practices
    print("\n🔟 Integration Best Practices:")
    practices = [
        "🎯 Define clear interfaces between systems",
        "📝 Document data schemas thoroughly",
        "🔒 Implement proper authentication",
        "⚡ Use async for long-running processes",
        "💾 Cache frequently accessed data",
        "🔄 Implement retry logic with backoff",
        "📊 Monitor performance metrics",
        "🧪 Test error scenarios thoroughly",
        "📦 Version your APIs and models",
        "🚀 Design for horizontal scaling"
    ]
    for practice in practices:
        print(f"   {practice}")
    
    print("\n✅ UiPath/Python integration examples complete!")
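
The asynchronous pattern in the listing above ends with "poll for result or wait for callback". As a rough illustration of the polling half, here is a minimal sketch of a poller with exponential backoff. The helper name `poll_for_result` and its parameters are hypothetical; `fetch` stands for any callable that asks the Python service for a result (for instance a GET against its result endpoint) and returns a dict with a `status` field, or `None` if nothing is available yet.

```python
import time
from typing import Callable, Optional


def poll_for_result(
    fetch: Callable[[], Optional[dict]],
    timeout: float = 30.0,
    initial_delay: float = 0.5,
    max_delay: float = 8.0,
    sleep: Callable[[float], None] = time.sleep,
) -> dict:
    """Call fetch() until it reports a final status or the timeout elapses."""
    deadline = time.monotonic() + timeout
    delay = initial_delay
    while True:
        result = fetch()
        # "completed" and "failed" are treated as final; anything else means wait.
        if result is not None and result.get("status") in ("completed", "failed"):
            return result
        # Give up if the next wait would run past the deadline.
        if time.monotonic() + delay > deadline:
            raise TimeoutError("No result before the timeout elapsed")
        sleep(delay)
        # Double the wait each round, capped at max_delay.
        delay = min(delay * 2, max_delay)


if __name__ == "__main__":
    replies = iter([None, {"status": "processing"}, {"status": "completed"}])
    print(poll_for_result(lambda: next(replies), sleep=lambda _: None))
```

Passing `sleep` in as a parameter is a design choice made for testability: unit tests can record the delays instead of actually waiting.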

Key Takeaways and Best Practices 🎯

UiPath/Python Integration Best Practices 📋

Pro Tip: Think of UiPath/Python integration as building a bridge between visual workflow automation and programmatic processing power, where each platform does what it does best. Use UiPath for UI automation, workflow orchestration, and enterprise features like queues and scheduling. Use Python for complex data processing, machine learning, API integrations, and custom business logic.

Choose the right integration method: Python Activity for simple scripts, REST APIs for service-oriented architecture, queues for decoupled processing, and files for large data transfers. Design clear interfaces with well-defined request/response schemas, and validate them with Pydantic or a similar library. Handle errors at every integration point: errors in Python should be handled gracefully in UiPath and vice versa. Use caching strategically to avoid reprocessing; Redis works well for this. For long-running Python processes, use asynchronous patterns with callbacks or polling rather than blocking UiPath workflows.

Monitor performance across both platforms by tracking request rates, processing times, and error rates. Implement circuit breakers to prevent cascading failures. Version your Python APIs and maintain backward compatibility during updates. Use containerization (Docker) for Python services to ensure consistency across environments. Most importantly, test integration scenarios thoroughly, including timeouts, errors, and edge cases!
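
To make the circuit-breaker advice concrete, here is a minimal sketch of one that could wrap calls from the Python service to a downstream dependency. The class names, the default threshold, and the single-trial "half-open" behavior are illustrative assumptions rather than a reference design; production code would more likely use an established library.

```python
import time
from typing import Any, Callable, Optional


class CircuitOpenError(RuntimeError):
    """Raised when a call is rejected because the circuit is open."""


class CircuitBreaker:
    """Reject calls for reset_timeout seconds after repeated failures."""

    def __init__(self, failure_threshold: int = 3, reset_timeout: float = 30.0,
                 clock: Callable[[], float] = time.monotonic):
        self.failure_threshold = failure_threshold
        self.reset_timeout = reset_timeout
        self.clock = clock  # injectable so tests can control time
        self.failures = 0
        self.opened_at: Optional[float] = None

    def call(self, func: Callable[..., Any], *args: Any, **kwargs: Any) -> Any:
        if self.opened_at is not None:
            if self.clock() - self.opened_at < self.reset_timeout:
                raise CircuitOpenError("Circuit open; call rejected")
            # Reset timeout elapsed: let one trial call through (half-open).
        try:
            result = func(*args, **kwargs)
        except Exception:
            self.failures += 1
            # Open on reaching the threshold, or re-open after a failed trial.
            if self.opened_at is not None or self.failures >= self.failure_threshold:
                self.opened_at = self.clock()
            raise
        # Any success closes the circuit and clears the failure count.
        self.failures = 0
        self.opened_at = None
        return result
```

A caller would wrap each outbound request, for example `breaker.call(session.get, url)`, and treat `CircuitOpenError` as a signal to fail fast or fall back instead of waiting on a dependency that is already struggling.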

Mastering UiPath/Python integration enables you to build sophisticated automation solutions that leverage the best of both platforms. You can now create workflows that seamlessly combine UiPath's enterprise RPA capabilities with Python's computational power, implement robust error handling across platforms, and scale your automation initiatives effectively. Whether you're adding ML capabilities to UiPath workflows or orchestrating Python services, these integration skills are essential for enterprise automation! 🚀