Skip to main content

๐Ÿ”— Integration Testing: Ensure Components Work Together Flawlessly

Integration testing bridges the gap between unit tests and system tests - it verifies that independently developed components work correctly when combined, catching issues that arise from component interactions, data flow, and interface mismatches. Like testing the gears of a complex machine working together, integration testing ensures your application's parts mesh seamlessly. Whether you're testing database interactions, API integrations, or microservice communications, mastering integration testing is crucial for building reliable systems. Let's explore the comprehensive world of integration test automation! ๐Ÿ—๏ธ

The Integration Testing Architecture

Think of integration testing as quality assurance for assembled systems - while unit tests verify individual parts work, integration tests ensure they work together harmoniously. Using test containers, mock servers, and orchestration tools, you can create realistic test environments that validate component interactions without the complexity of full system testing. Understanding test boundaries, data management, and environment setup is essential for effective integration testing!

graph TB A[Integration Testing] --> B[Test Strategies] A --> C[Test Environments] A --> D[Data Management] A --> E[Test Types] B --> F[Big Bang] B --> G[Top-Down] B --> H[Bottom-Up] B --> I[Sandwich] C --> J[Containers] C --> K[Mock Services] C --> L[Test Databases] C --> M[Message Queues] D --> N[Fixtures] D --> O[Migrations] D --> P[Cleanup] D --> Q[Isolation] E --> R[Database] E --> S[API] E --> T[Messaging] E --> U[Service] V[Tools] --> W[TestContainers] V --> X[Docker] V --> Y[WireMock] V --> Z[LocalStack] style A fill:#ff6b6b style B fill:#51cf66 style C fill:#339af0 style D fill:#ffd43b style E fill:#ff6b6b style V fill:#51cf66

Real-World Scenario: The Microservices Testing Platform ๐ŸŒ

You're building a comprehensive integration testing platform for a microservices architecture that tests service-to-service communication, validates database transactions across services, ensures message queue reliability, verifies API contracts between teams, tests failover and circuit breaker behaviors, validates data consistency across systems, simulates network conditions and failures, and ensures backward compatibility during deployments. Your platform must support parallel test execution, provide isolated test environments, maintain test data consistency, and deliver clear diagnostics. Let's build a professional integration testing framework!

# Comprehensive Integration Testing Framework
# pip install pytest testcontainers docker requests sqlalchemy
# pip install redis pymongo kafka-python elasticsearch aiohttp
# pip install faker factory-boy pytest-asyncio pytest-docker

import os
import json
import time
import asyncio
from typing import Dict, List, Any, Optional, Generator, Type
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from pathlib import Path
import logging
import tempfile
from contextlib import contextmanager
from unittest.mock import Mock, patch

import pytest
import docker
import requests
from sqlalchemy import create_engine, text
from sqlalchemy.orm import sessionmaker, Session
from sqlalchemy.ext.declarative import declarative_base
import redis
import pymongo
from kafka import KafkaProducer, KafkaConsumer
from elasticsearch import Elasticsearch

# Test containers for isolated environments
from testcontainers.postgres import PostgresContainer
from testcontainers.mysql import MySqlContainer
from testcontainers.redis import RedisContainer
from testcontainers.kafka import KafkaContainer
from testcontainers.elasticsearch import ElasticSearchContainer
from testcontainers.compose import DockerCompose

# ==================== Test Configuration ====================

@dataclass
class IntegrationTestConfig:
    """Configuration for integration tests."""
    database_url: str = "postgresql://test:test@localhost/testdb"
    redis_url: str = "redis://localhost:6379"
    kafka_bootstrap_servers: str = "localhost:9092"
    elasticsearch_url: str = "http://localhost:9200"
    api_base_url: str = "http://localhost:8000"
    
    use_containers: bool = True
    parallel_execution: bool = False
    cleanup_after_test: bool = True
    test_data_dir: Path = Path("test_data")
    
    timeout: float = 30.0
    retry_count: int = 3
    retry_delay: float = 1.0

# ==================== Database Integration Testing ====================

class DatabaseTestBase:
    """Base class for database integration tests."""
    
    @classmethod
    def setup_class(cls):
        """Setup database for testing."""
        if cls.use_containers:
            cls.container = PostgresContainer("postgres:13")
            cls.container.start()
            cls.database_url = cls.container.get_connection_url()
        else:
            cls.database_url = "postgresql://test:test@localhost/testdb"
        
        # Create engine and session
        cls.engine = create_engine(cls.database_url)
        cls.SessionLocal = sessionmaker(bind=cls.engine)
        
        # Create tables
        cls.Base = declarative_base()
        cls.setup_models()
        cls.Base.metadata.create_all(bind=cls.engine)
        
    @classmethod
    def teardown_class(cls):
        """Cleanup database after tests."""
        cls.Base.metadata.drop_all(bind=cls.engine)
        cls.engine.dispose()
        
        if cls.use_containers:
            cls.container.stop()
    
    @classmethod
    def setup_models(cls):
        """Define test models."""
        Base = cls.Base
        
        class User(Base):
            __tablename__ = 'users'
            id = Column(Integer, primary_key=True)
            username = Column(String(50), unique=True)
            email = Column(String(100))
            created_at = Column(DateTime, default=datetime.utcnow)
        
        class Order(Base):
            __tablename__ = 'orders'
            id = Column(Integer, primary_key=True)
            user_id = Column(Integer, ForeignKey('users.id'))
            amount = Column(Decimal(10, 2))
            status = Column(String(20))
            created_at = Column(DateTime, default=datetime.utcnow)
        
        cls.User = User
        cls.Order = Order
    
    @contextmanager
    def get_session(self) -> Generator[Session, None, None]:
        """Get database session with automatic cleanup."""
        session = self.SessionLocal()
        try:
            yield session
            session.commit()
        except Exception:
            session.rollback()
            raise
        finally:
            session.close()
    
    def test_database_transaction(self):
        """Test database transaction integrity."""
        with self.get_session() as session:
            # Create user
            user = self.User(username="testuser", email="test@example.com")
            session.add(user)
            session.flush()
            
            # Create order
            order = self.Order(
                user_id=user.id,
                amount=99.99,
                status="pending"
            )
            session.add(order)
            session.commit()
            
            # Verify data
            saved_user = session.query(self.User).filter_by(username="testuser").first()
            assert saved_user is not None
            assert saved_user.email == "test@example.com"
            
            saved_order = session.query(self.Order).filter_by(user_id=saved_user.id).first()
            assert saved_order is not None
            assert saved_order.amount == 99.99
    
    def test_transaction_rollback(self):
        """Test transaction rollback on error."""
        with self.get_session() as session:
            user = self.User(username="rollback_test", email="rollback@example.com")
            session.add(user)
            
            # Simulate error
            try:
                # This should fail due to missing required field
                order = self.Order(user_id=999999, amount=None, status="invalid")
                session.add(order)
                session.commit()
            except Exception:
                session.rollback()
            
            # Verify rollback
            result = session.query(self.User).filter_by(username="rollback_test").first()
            assert result is None

# ==================== API Integration Testing ====================

class APITestClient:
    """Test client for API integration tests."""
    
    def __init__(self, base_url: str, timeout: float = 10.0):
        self.base_url = base_url
        self.timeout = timeout
        self.session = requests.Session()
        self.session.headers.update({'Content-Type': 'application/json'})
        
    def request(
        self,
        method: str,
        endpoint: str,
        **kwargs
    ) -> requests.Response:
        """Make API request with error handling."""
        url = f"{self.base_url}{endpoint}"
        kwargs.setdefault('timeout', self.timeout)
        
        response = self.session.request(method, url, **kwargs)
        return response
    
    def get(self, endpoint: str, **kwargs) -> requests.Response:
        """GET request."""
        return self.request('GET', endpoint, **kwargs)
    
    def post(self, endpoint: str, **kwargs) -> requests.Response:
        """POST request."""
        return self.request('POST', endpoint, **kwargs)
    
    def put(self, endpoint: str, **kwargs) -> requests.Response:
        """PUT request."""
        return self.request('PUT', endpoint, **kwargs)
    
    def delete(self, endpoint: str, **kwargs) -> requests.Response:
        """DELETE request."""
        return self.request('DELETE', endpoint, **kwargs)

class TestAPIIntegration:
    """Test API integrations."""
    
    @pytest.fixture(autouse=True)
    def setup(self):
        """Setup API test client."""
        self.client = APITestClient("http://localhost:8000")
        self.test_data = {}
        yield
        # Cleanup
        self.cleanup_test_data()
    
    def cleanup_test_data(self):
        """Clean up test data created during tests."""
        for resource_type, ids in self.test_data.items():
            for resource_id in ids:
                try:
                    self.client.delete(f"/{resource_type}/{resource_id}")
                except:
                    pass
    
    def test_user_crud_operations(self):
        """Test complete CRUD operations for user."""
        # Create
        user_data = {
            "username": "integration_test",
            "email": "integration@test.com",
            "password": "secure123"
        }
        
        response = self.client.post("/users", json=user_data)
        assert response.status_code == 201
        
        user = response.json()
        user_id = user['id']
        self.test_data.setdefault('users', []).append(user_id)
        
        # Read
        response = self.client.get(f"/users/{user_id}")
        assert response.status_code == 200
        assert response.json()['username'] == user_data['username']
        
        # Update
        update_data = {"email": "updated@test.com"}
        response = self.client.put(f"/users/{user_id}", json=update_data)
        assert response.status_code == 200
        
        # Verify update
        response = self.client.get(f"/users/{user_id}")
        assert response.json()['email'] == update_data['email']
        
        # Delete
        response = self.client.delete(f"/users/{user_id}")
        assert response.status_code == 204
        
        # Verify deletion
        response = self.client.get(f"/users/{user_id}")
        assert response.status_code == 404
    
    def test_api_pagination(self):
        """Test API pagination."""
        # Create multiple items
        for i in range(25):
            response = self.client.post("/items", json={"name": f"item_{i}"})
            if response.status_code == 201:
                self.test_data.setdefault('items', []).append(response.json()['id'])
        
        # Test pagination
        response = self.client.get("/items", params={"page": 1, "limit": 10})
        assert response.status_code == 200
        
        data = response.json()
        assert len(data['items']) == 10
        assert data['total'] == 25
        assert data['page'] == 1
        assert data['pages'] == 3
    
    def test_api_error_handling(self):
        """Test API error handling."""
        # Test 400 Bad Request
        response = self.client.post("/users", json={"invalid": "data"})
        assert response.status_code == 400
        assert 'error' in response.json()
        
        # Test 404 Not Found
        response = self.client.get("/users/999999")
        assert response.status_code == 404
        
        # Test 401 Unauthorized
        response = self.client.get("/admin/users")
        assert response.status_code == 401

# ==================== Message Queue Integration Testing ====================

class MessageQueueTestBase:
    """Base class for message queue integration tests."""
    
    @classmethod
    def setup_class(cls):
        """Setup message queue for testing."""
        if cls.use_containers:
            cls.kafka_container = KafkaContainer()
            cls.kafka_container.start()
            cls.bootstrap_servers = cls.kafka_container.get_bootstrap_server()
        else:
            cls.bootstrap_servers = "localhost:9092"
        
        # Setup producer and consumer
        cls.producer = KafkaProducer(
            bootstrap_servers=cls.bootstrap_servers,
            value_serializer=lambda v: json.dumps(v).encode('utf-8')
        )
        
        cls.consumer = KafkaConsumer(
            bootstrap_servers=cls.bootstrap_servers,
            value_deserializer=lambda m: json.loads(m.decode('utf-8')),
            auto_offset_reset='earliest',
            group_id='test_group'
        )
    
    @classmethod
    def teardown_class(cls):
        """Cleanup message queue after tests."""
        cls.producer.close()
        cls.consumer.close()
        
        if cls.use_containers:
            cls.kafka_container.stop()
    
    def test_message_publishing(self):
        """Test message publishing to queue."""
        topic = 'test_topic'
        message = {'id': 1, 'data': 'test message'}
        
        # Publish message
        future = self.producer.send(topic, message)
        record_metadata = future.get(timeout=10)
        
        assert record_metadata.topic == topic
        assert record_metadata.partition is not None
        assert record_metadata.offset is not None
    
    def test_message_consumption(self):
        """Test message consumption from queue."""
        topic = 'consume_test'
        messages = [
            {'id': i, 'data': f'message_{i}'}
            for i in range(5)
        ]
        
        # Publish messages
        for msg in messages:
            self.producer.send(topic, msg)
        self.producer.flush()
        
        # Subscribe and consume
        self.consumer.subscribe([topic])
        
        consumed = []
        timeout = time.time() + 10
        
        while len(consumed) < 5 and time.time() < timeout:
            msg_pack = self.consumer.poll(timeout_ms=1000)
            for tp, messages in msg_pack.items():
                for message in messages:
                    consumed.append(message.value)
        
        assert len(consumed) == 5
        for i, msg in enumerate(consumed):
            assert msg['id'] == i

# ==================== Service Integration Testing ====================

class ServiceIntegrationTest:
    """Test integration between multiple services."""
    
    @pytest.fixture(scope="class")
    def docker_compose(self):
        """Setup services using docker-compose."""
        compose = DockerCompose(
            filepath="docker-compose.test.yml",
            compose_file_name="docker-compose.yml"
        )
        compose.start()
        
        # Wait for services to be ready
        self.wait_for_services()
        
        yield compose
        
        compose.stop()
    
    def wait_for_services(self, timeout: int = 60):
        """Wait for all services to be ready."""
        services = [
            ("http://localhost:8000/health", "API"),
            ("http://localhost:5432", "Database"),
            ("http://localhost:6379", "Redis"),
            ("http://localhost:9092", "Kafka")
        ]
        
        start_time = time.time()
        
        for url, name in services:
            while time.time() - start_time < timeout:
                try:
                    if url.startswith("http"):
                        response = requests.get(url, timeout=1)
                        if response.status_code == 200:
                            break
                    else:
                        # For non-HTTP services, just try to connect
                        # Implementation depends on service type
                        break
                except:
                    time.sleep(1)
            else:
                raise TimeoutError(f"Service {name} failed to start")
    
    def test_order_processing_workflow(self, docker_compose):
        """Test complete order processing workflow across services."""
        # Create order via API
        order_data = {
            "user_id": 1,
            "items": [
                {"product_id": 1, "quantity": 2},
                {"product_id": 2, "quantity": 1}
            ],
            "payment_method": "credit_card"
        }
        
        api_client = APITestClient("http://localhost:8000")
        response = api_client.post("/orders", json=order_data)
        assert response.status_code == 201
        
        order = response.json()
        order_id = order['id']
        
        # Verify order in database
        db_engine = create_engine("postgresql://test:test@localhost/testdb")
        with db_engine.connect() as conn:
            result = conn.execute(
                text("SELECT * FROM orders WHERE id = :id"),
                {"id": order_id}
            ).fetchone()
            assert result is not None
        
        # Verify message in queue
        consumer = KafkaConsumer(
            'order_events',
            bootstrap_servers='localhost:9092',
            value_deserializer=lambda m: json.loads(m.decode('utf-8'))
        )
        
        for message in consumer:
            if message.value.get('order_id') == order_id:
                assert message.value['event'] == 'order_created'
                break
        
        # Verify cache update
        redis_client = redis.Redis(host='localhost', port=6379)
        cached_order = redis_client.get(f"order:{order_id}")
        assert cached_order is not None

# ==================== Contract Testing ====================

class ContractTest:
    """Test API contracts between services."""
    
    def __init__(self, contract_file: str):
        """Initialize with contract definition."""
        with open(contract_file, 'r') as f:
            self.contract = json.load(f)
    
    def validate_response(self, response: Dict, contract_name: str) -> bool:
        """Validate response against contract."""
        contract = self.contract.get(contract_name)
        if not contract:
            raise ValueError(f"Contract {contract_name} not found")
        
        # Validate required fields
        for field in contract.get('required', []):
            if field not in response:
                return False
        
        # Validate field types
        for field, expected_type in contract.get('properties', {}).items():
            if field in response:
                if not self._check_type(response[field], expected_type):
                    return False
        
        return True
    
    def _check_type(self, value: Any, expected_type: str) -> bool:
        """Check if value matches expected type."""
        type_map = {
            'string': str,
            'integer': int,
            'number': (int, float),
            'boolean': bool,
            'array': list,
            'object': dict
        }
        
        expected = type_map.get(expected_type)
        if expected:
            return isinstance(value, expected)
        return False

# ==================== Test Data Management ====================

class TestDataManager:
    """Manage test data for integration tests."""
    
    def __init__(self, database_url: str):
        self.engine = create_engine(database_url)
        self.created_records = []
    
    def create_test_user(self, **kwargs) -> Dict:
        """Create test user with defaults."""
        defaults = {
            'username': f'test_user_{datetime.now().timestamp()}',
            'email': 'test@example.com',
            'password': 'test123'
        }
        defaults.update(kwargs)
        
        with self.engine.connect() as conn:
            result = conn.execute(
                text("""
                    INSERT INTO users (username, email, password)
                    VALUES (:username, :email, :password)
                    RETURNING id, username, email
                """),
                defaults
            )
            user = dict(result.fetchone())
            self.created_records.append(('users', user['id']))
            return user
    
    def cleanup(self):
        """Clean up all created test data."""
        with self.engine.connect() as conn:
            for table, record_id in reversed(self.created_records):
                conn.execute(
                    text(f"DELETE FROM {table} WHERE id = :id"),
                    {"id": record_id}
                )
            conn.commit()

# ==================== Performance Integration Testing ====================

class PerformanceIntegrationTest:
    """Test performance across integrated components."""
    
    def test_database_connection_pool(self):
        """Test database connection pool performance."""
        engine = create_engine(
            "postgresql://test:test@localhost/testdb",
            pool_size=10,
            max_overflow=20
        )
        
        start_time = time.time()
        
        # Simulate concurrent connections
        import concurrent.futures
        
        def query_database():
            with engine.connect() as conn:
                conn.execute(text("SELECT 1"))
        
        with concurrent.futures.ThreadPoolExecutor(max_workers=30) as executor:
            futures = [executor.submit(query_database) for _ in range(100)]
            concurrent.futures.wait(futures)
        
        duration = time.time() - start_time
        
        # Should handle 100 queries in under 2 seconds with pooling
        assert duration < 2.0
    
    def test_api_throughput(self):
        """Test API throughput with database."""
        client = APITestClient("http://localhost:8000")
        
        start_time = time.time()
        successful_requests = 0
        
        for i in range(100):
            response = client.get(f"/users/{i % 10 + 1}")
            if response.status_code == 200:
                successful_requests += 1
        
        duration = time.time() - start_time
        throughput = successful_requests / duration
        
        # Should handle at least 50 requests per second
        assert throughput > 50

# ==================== Integration Test Fixtures ====================

@pytest.fixture(scope="session")
def database():
    """Provide database for integration tests."""
    container = PostgresContainer("postgres:13")
    container.start()
    
    engine = create_engine(container.get_connection_url())
    
    # Create schema
    Base = declarative_base()
    
    # Define models
    from sqlalchemy import Column, Integer, String, DateTime, ForeignKey, Decimal
    
    class User(Base):
        __tablename__ = 'users'
        id = Column(Integer, primary_key=True)
        username = Column(String(50), unique=True)
        email = Column(String(100))
    
    Base.metadata.create_all(bind=engine)
    
    yield engine
    
    engine.dispose()
    container.stop()

@pytest.fixture(scope="session")
def redis_client():
    """Provide Redis client for integration tests."""
    container = RedisContainer()
    container.start()
    
    client = redis.Redis(
        host=container.get_container_host_ip(),
        port=container.get_exposed_port(6379)
    )
    
    yield client
    
    client.close()
    container.stop()

@pytest.fixture
def api_client(api_server):
    """Provide API client for tests."""
    return APITestClient(api_server)

@pytest.fixture(scope="session")
def api_server():
    """Start API server for testing."""
    # Start your API server here
    # This could be a test server or a containerized version
    return "http://localhost:8000"

# ==================== Integration Test Orchestration ====================

class IntegrationTestRunner:
    """Orchestrate integration test execution."""
    
    def __init__(self, config: IntegrationTestConfig):
        self.config = config
        self.services = {}
        
    def setup_environment(self):
        """Setup test environment with all services."""
        if self.config.use_containers:
            self._start_containers()
        else:
            self._verify_local_services()
    
    def _start_containers(self):
        """Start all required containers."""
        # Start database
        self.services['postgres'] = PostgresContainer("postgres:13")
        self.services['postgres'].start()
        
        # Start Redis
        self.services['redis'] = RedisContainer()
        self.services['redis'].start()
        
        # Start Kafka
        self.services['kafka'] = KafkaContainer()
        self.services['kafka'].start()
        
        # Wait for services to be ready
        self._wait_for_services()
    
    def _wait_for_services(self):
        """Wait for all services to be ready."""
        import time
        time.sleep(5)  # Simple wait, could be more sophisticated
    
    def teardown_environment(self):
        """Teardown test environment."""
        for service in self.services.values():
            service.stop()
    
    def run_tests(self, test_suite: str = "integration"):
        """Run integration tests."""
        import subprocess
        
        cmd = [
            "pytest",
            f"tests/{test_suite}",
            "-v",
            "--tb=short"
        ]
        
        if self.config.parallel_execution:
            cmd.extend(["-n", "auto"])
        
        result = subprocess.run(cmd, capture_output=True, text=True)
        
        return {
            'success': result.returncode == 0,
            'stdout': result.stdout,
            'stderr': result.stderr
        }

# Fix missing import
from sqlalchemy import Column, Integer, String, DateTime, ForeignKey, Decimal

# Example usage
if __name__ == "__main__":
    print("๐Ÿ”— Integration Testing Examples\n")
    
    # Example 1: Integration test types
    print("1๏ธโƒฃ Types of Integration Tests:")
    test_types = [
        "Database Integration - Test ORM and SQL",
        "API Integration - Test HTTP endpoints",
        "Message Queue - Test pub/sub systems",
        "Service-to-Service - Test microservice communication",
        "Third-party APIs - Test external services",
        "Cache Integration - Test caching layers"
    ]
    for test_type in test_types:
        print(f"   โ€ข {test_type}")
    
    # Example 2: Test isolation strategies
    print("\n2๏ธโƒฃ Test Isolation Strategies:")
    strategies = [
        "Test Containers - Isolated Docker containers",
        "Transactions - Rollback after each test",
        "Test Databases - Separate database per test",
        "Mock Services - Replace external dependencies",
        "Fixture Data - Controlled test data"
    ]
    for strategy in strategies:
        print(f"   โ€ข {strategy}")
    
    # Example 3: Testing patterns
    print("\n3๏ธโƒฃ Integration Testing Patterns:")
    patterns = [
        ("Big Bang", "Test all components together"),
        ("Bottom-Up", "Start with data layer, move up"),
        ("Top-Down", "Start with UI/API, stub lower layers"),
        ("Sandwich", "Combine top-down and bottom-up"),
        ("Risk-Based", "Test high-risk integrations first")
    ]
    for pattern, description in patterns:
        print(f"   {pattern}: {description}")
    
    # Example 4: Common challenges
    print("\n4๏ธโƒฃ Common Integration Testing Challenges:")
    challenges = [
        "Test data management",
        "Environment setup complexity",
        "Test execution time",
        "Flaky tests due to timing",
        "External service dependencies",
        "Database state management"
    ]
    for challenge in challenges:
        print(f"   โ€ข {challenge}")
    
    # Example 5: Best practices
    print("\n5๏ธโƒฃ Integration Testing Best Practices:")
    practices = [
        "๐ŸŽฏ Use test containers for isolation",
        "๐Ÿ”„ Reset state between tests",
        "๐Ÿ“Š Test both happy and error paths",
        "โฑ๏ธ Set appropriate timeouts",
        "๐Ÿ” Verify data at multiple layers",
        "๐Ÿ“ Use clear test data",
        "๐Ÿš€ Run in parallel when possible",
        "๐Ÿงน Always clean up resources",
        "๐Ÿ“ˆ Monitor test performance",
        "๐Ÿ” Use test-specific credentials"
    ]
    for practice in practices:
        print(f"   {practice}")
    
    # Example 6: Test data setup
    print("\n6๏ธโƒฃ Test Data Management:")
    print("   # Using fixtures")
    print("   @pytest.fixture")
    print("   def test_user(database):")
    print("       user = create_user()")
    print("       yield user")
    print("       delete_user(user)")
    
    # Example 7: Container usage
    print("\n7๏ธโƒฃ Using Test Containers:")
    print("   from testcontainers.postgres import PostgresContainer")
    print("   ")
    print("   with PostgresContainer('postgres:13') as postgres:")
    print("       url = postgres.get_connection_url()")
    print("       # Run tests with isolated database")
    
    # Example 8: API testing
    print("\n8๏ธโƒฃ API Integration Testing:")
    print("   client = APITestClient('http://api.example.com')")
    print("   response = client.post('/users', json=data)")
    print("   assert response.status_code == 201")
    print("   assert response.json()['id'] is not None")
    
    # Example 9: Test organization
    print("\n9๏ธโƒฃ Integration Test Organization:")
    structure = """
    tests/
    โ”œโ”€โ”€ integration/
    โ”‚   โ”œโ”€โ”€ conftest.py       # Shared fixtures
    โ”‚   โ”œโ”€โ”€ test_database.py  # Database tests
    โ”‚   โ”œโ”€โ”€ test_api.py       # API tests
    โ”‚   โ”œโ”€โ”€ test_services.py  # Service tests
    โ”‚   โ””โ”€โ”€ test_messaging.py # Message queue tests
    โ”œโ”€โ”€ fixtures/
    โ”‚   โ””โ”€โ”€ test_data.json
    โ””โ”€โ”€ docker-compose.test.yml
    """
    print(structure)
    
    # Example 10: Running integration tests
    print("\n๐Ÿ”Ÿ Running Integration Tests:")
    
    config = IntegrationTestConfig(
        use_containers=True,
        parallel_execution=True
    )
    
    runner = IntegrationTestRunner(config)
    print(f"   Config: use_containers={config.use_containers}")
    print(f"   Config: parallel={config.parallel_execution}")
    
    print("\nโœ… Integration testing examples complete!")

Key Takeaways and Best Practices ๐ŸŽฏ

Integration Testing Best Practices ๐Ÿ“‹

Pro Tip: Think of integration testing as verifying the plumbing of your application - while unit tests check individual components work, integration tests ensure they work together. Use test containers (Docker) to create isolated, reproducible test environments - this eliminates "works on my machine" issues. Always test the boundaries where components meet: database queries, API calls, message passing, and cache interactions. Design tests to be independent - each test should set up its own data and clean up afterward. Use transactions for database tests when possible - roll back after each test for speed and isolation. Test both success and failure scenarios - what happens when the database is down, the API returns 500, or the message queue is full? Implement retry logic for flaky tests but investigate the root cause. Use factories or builders for test data creation - this makes tests more maintainable. Test at different levels: single integration (one database call), workflow (multiple components), and end-to-end (complete user journey). Monitor test execution time - integration tests are slower than unit tests but shouldn't take minutes. Use parallel execution when possible but be careful of shared resources. Mock external services that you don't control but test against real versions of services you do control. Most importantly: integration tests find the bugs that unit tests miss - invest in them!

Mastering integration testing ensures your components work together seamlessly in production. You can now test database interactions reliably, validate API integrations thoroughly, ensure message queue reliability, verify service-to-service communication, and maintain data consistency across systems. Whether you're building microservices, monoliths, or hybrid architectures, these integration testing skills are essential for delivering reliable software! ๐Ÿš€