Skip to main content

šŸ“Š CSV Processing: Master Tabular Data with Python

CSV files are the unsung heroes of data exchange. They're simple, universal, and everywhere - from Excel exports to database dumps, API responses to IoT sensor logs. But beneath their simple comma-separated surface lies a world of complexity. Let's turn Python into your CSV command center! šŸŽÆ

The Hidden Complexity of CSV Files

CSV files seem simple - just commas and data, right? Wrong! They can have different delimiters, encodings, quote characters, escape sequences, and more. It's like each CSV file speaks a slightly different dialect of the same language. Python helps you become a polyglot!

graph TB A[CSV Data Sources] --> B[Input Processing] B --> C[Encoding Detection] B --> D[Delimiter Detection] B --> E[Header Analysis] C --> F[Data Parsing] D --> F E --> F F --> G[Validation] F --> H[Transformation] F --> I[Analysis] G --> J[Clean Data] H --> J I --> J J --> K[Output Formats] K --> L[CSV] K --> M[JSON] K --> N[Database] K --> O[Excel] style A fill:#ff6b6b style J fill:#51cf66 style K fill:#339af0

Real-World Scenario: The Data Pipeline Orchestrator šŸ­

You're the data engineer for a retail company. Every day, you receive CSV files from 50 stores (sales data), 20 suppliers (inventory), 5 marketing platforms (campaigns), and 3 payment processors (transactions). Each has different formats, encodings, and quirks. Let's build a system that handles them all!

import csv
import pandas as pd
import numpy as np
from pathlib import Path
from typing import List, Dict, Optional, Any, Tuple, Union
import json
import chardet
from datetime import datetime, timedelta
import re
import logging
from io import StringIO, BytesIO
import sqlite3
from collections import defaultdict, Counter
import asyncio
import aiofiles

class CSVMasterProcessor:
    """
    Comprehensive CSV processing system with advanced features
    for handling any CSV file thrown at it.
    """
    
    def __init__(self, working_dir: str = None):
        self.working_dir = Path(working_dir) if working_dir else Path.cwd()
        self.working_dir.mkdir(parents=True, exist_ok=True)
        
        # Setup logging
        self.setup_logging()
        
        # Common CSV dialects
        self.dialects = {
            'excel': csv.excel,
            'excel_tab': csv.excel_tab,
            'unix': csv.unix_dialect
        }
        
        # Data type inference patterns
        self.type_patterns = {
            'integer': re.compile(r'^-?\d+$'),
            'float': re.compile(r'^-?\d*\.\d+$'),
            'date': re.compile(r'^\d{4}-\d{2}-\d{2}$'),
            'datetime': re.compile(r'^\d{4}-\d{2}-\d{2}\s+\d{2}:\d{2}:\d{2}$'),
            'boolean': re.compile(r'^(true|false|yes|no|1|0)$', re.IGNORECASE),
            'email': re.compile(r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'),
            'phone': re.compile(r'^[\d\s\-\(\)\+]+$'),
            'url': re.compile(r'^https?://'),
            'json': re.compile(r'^[\{\[].*[\}\]]$')
        }
    
    def setup_logging(self):
        """Setup comprehensive logging."""
        log_file = self.working_dir / 'csv_processing.log'
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(levelname)s - %(message)s',
            handlers=[
                logging.FileHandler(log_file),
                logging.StreamHandler()
            ]
        )
        self.logger = logging.getLogger(__name__)
    
    def detect_encoding(self, file_path: str, sample_size: int = 10000) -> str:
        """
        Detect file encoding using chardet.
        """
        try:
            with open(file_path, 'rb') as file:
                sample = file.read(sample_size)
                result = chardet.detect(sample)
                encoding = result['encoding']
                confidence = result['confidence']
                
                self.logger.info(f"Detected encoding: {encoding} (confidence: {confidence:.2%})")
                
                # Fallback for low confidence
                if confidence < 0.7:
                    # Try common encodings
                    for enc in ['utf-8', 'latin-1', 'cp1252', 'iso-8859-1']:
                        try:
                            sample.decode(enc)
                            return enc
                        except:
                            continue
                
                return encoding or 'utf-8'
                
        except Exception as e:
            self.logger.warning(f"Encoding detection failed: {e}, using utf-8")
            return 'utf-8'
    
    def detect_delimiter(self, file_path: str, encoding: str = 'utf-8') -> str:
        """
        Intelligently detect CSV delimiter.
        """
        try:
            with open(file_path, 'r', encoding=encoding) as file:
                # Read sample lines
                sample = []
                for _ in range(10):
                    line = file.readline()
                    if line:
                        sample.append(line)
                
                # Use csv.Sniffer
                sniffer = csv.Sniffer()
                dialect = sniffer.sniff(''.join(sample))
                delimiter = dialect.delimiter
                
                self.logger.info(f"Detected delimiter: {repr(delimiter)}")
                return delimiter
                
        except:
            # Fallback: count common delimiters
            delimiters = [',', '\t', ';', '|', ':']
            delimiter_counts = defaultdict(int)
            
            with open(file_path, 'r', encoding=encoding) as file:
                for _ in range(10):
                    line = file.readline()
                    if line:
                        for delim in delimiters:
                            delimiter_counts[delim] += line.count(delim)
            
            if delimiter_counts:
                delimiter = max(delimiter_counts, key=delimiter_counts.get)
                self.logger.info(f"Detected delimiter (fallback): {repr(delimiter)}")
                return delimiter
            
            return ','
    
    def analyze_csv_structure(self, file_path: str) -> Dict:
        """
        Comprehensive CSV structure analysis.
        """
        encoding = self.detect_encoding(file_path)
        delimiter = self.detect_delimiter(file_path, encoding)
        
        structure = {
            'encoding': encoding,
            'delimiter': delimiter,
            'has_header': False,
            'column_count': 0,
            'row_count': 0,
            'columns': [],
            'data_types': {},
            'null_counts': {},
            'unique_counts': {},
            'sample_data': []
        }
        
        try:
            # Read CSV with detected parameters
            df = pd.read_csv(
                file_path,
                encoding=encoding,
                delimiter=delimiter,
                nrows=1000,  # Sample for analysis
                low_memory=False
            )
            
            # Detect if first row is header
            first_row = df.iloc[0] if len(df) > 0 else []
            numeric_count = sum(1 for val in first_row if self._is_numeric(str(val)))
            structure['has_header'] = numeric_count < len(first_row) / 2
            
            if not structure['has_header']:
                # Re-read without header
                df = pd.read_csv(
                    file_path,
                    encoding=encoding,
                    delimiter=delimiter,
                    header=None,
                    nrows=1000
                )
                df.columns = [f'Column_{i}' for i in range(len(df.columns))]
            
            # Analyze structure
            structure['column_count'] = len(df.columns)
            structure['columns'] = df.columns.tolist()
            
            # Count total rows
            with open(file_path, 'r', encoding=encoding) as f:
                structure['row_count'] = sum(1 for _ in f) - (1 if structure['has_header'] else 0)
            
            # Analyze each column
            for col in df.columns:
                # Data type inference
                structure['data_types'][col] = self._infer_data_type(df[col])
                
                # Null counts
                structure['null_counts'][col] = df[col].isnull().sum()
                
                # Unique counts
                structure['unique_counts'][col] = df[col].nunique()
            
            # Sample data
            structure['sample_data'] = df.head(5).to_dict('records')
            
            self.logger.info(f"CSV structure analysis complete: {Path(file_path).name}")
            
        except Exception as e:
            self.logger.error(f"Structure analysis failed: {e}")
        
        return structure
    
    def _is_numeric(self, value: str) -> bool:
        """Check if a string value is numeric."""
        try:
            float(value)
            return True
        except:
            return False
    
    def _infer_data_type(self, series: pd.Series) -> str:
        """Infer the data type of a pandas series."""
        # Remove nulls for analysis
        non_null = series.dropna()
        
        if len(non_null) == 0:
            return 'empty'
        
        # Check patterns
        sample = non_null.astype(str).head(100)
        
        type_counts = defaultdict(int)
        for value in sample:
            for dtype, pattern in self.type_patterns.items():
                if pattern.match(value):
                    type_counts[dtype] += 1
                    break
            else:
                type_counts['string'] += 1
        
        # Return most common type
        if type_counts:
            return max(type_counts, key=type_counts.get)
        return 'string'
    
    def clean_csv(self, input_file: str, output_file: str, 
                  cleaning_options: Dict = None) -> Dict:
        """
        Clean and standardize CSV file.
        """
        options = cleaning_options or {
            'remove_duplicates': True,
            'handle_missing': 'drop',  # 'drop', 'fill', 'interpolate'
            'standardize_dates': True,
            'trim_whitespace': True,
            'standardize_case': None,  # 'upper', 'lower', 'title'
            'remove_special_chars': False,
            'validate_emails': False,
            'validate_phones': False
        }
        
        stats = {
            'original_rows': 0,
            'cleaned_rows': 0,
            'duplicates_removed': 0,
            'missing_handled': 0,
            'modifications': []
        }
        
        try:
            # Detect encoding and delimiter
            encoding = self.detect_encoding(input_file)
            delimiter = self.detect_delimiter(input_file, encoding)
            
            # Read CSV
            df = pd.read_csv(input_file, encoding=encoding, delimiter=delimiter)
            stats['original_rows'] = len(df)
            
            # Remove duplicates
            if options['remove_duplicates']:
                before = len(df)
                df = df.drop_duplicates()
                stats['duplicates_removed'] = before - len(df)
                if stats['duplicates_removed'] > 0:
                    stats['modifications'].append(f"Removed {stats['duplicates_removed']} duplicates")
            
            # Handle missing values
            if options['handle_missing'] == 'drop':
                before = len(df)
                df = df.dropna()
                stats['missing_handled'] = before - len(df)
                if stats['missing_handled'] > 0:
                    stats['modifications'].append(f"Dropped {stats['missing_handled']} rows with missing values")
            
            elif options['handle_missing'] == 'fill':
                # Fill with appropriate values based on data type
                for col in df.columns:
                    if df[col].dtype == 'object':
                        df[col] = df[col].fillna('')
                    else:
                        df[col] = df[col].fillna(df[col].median())
                stats['modifications'].append("Filled missing values")
            
            elif options['handle_missing'] == 'interpolate':
                df = df.interpolate()
                stats['modifications'].append("Interpolated missing values")
            
            # Trim whitespace
            if options['trim_whitespace']:
                string_cols = df.select_dtypes(include=['object']).columns
                for col in string_cols:
                    df[col] = df[col].str.strip()
                stats['modifications'].append("Trimmed whitespace")
            
            # Standardize case
            if options['standardize_case']:
                string_cols = df.select_dtypes(include=['object']).columns
                for col in string_cols:
                    if options['standardize_case'] == 'upper':
                        df[col] = df[col].str.upper()
                    elif options['standardize_case'] == 'lower':
                        df[col] = df[col].str.lower()
                    elif options['standardize_case'] == 'title':
                        df[col] = df[col].str.title()
                stats['modifications'].append(f"Standardized case to {options['standardize_case']}")
            
            # Standardize dates
            if options['standardize_dates']:
                for col in df.columns:
                    if 'date' in col.lower():
                        try:
                            df[col] = pd.to_datetime(df[col], errors='coerce').dt.strftime('%Y-%m-%d')
                            stats['modifications'].append(f"Standardized dates in {col}")
                        except:
                            pass
            
            # Validate emails
            if options['validate_emails']:
                email_pattern = re.compile(r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$')
                for col in df.columns:
                    if 'email' in col.lower():
                        df[col] = df[col].apply(
                            lambda x: x if pd.isna(x) or email_pattern.match(str(x)) else np.nan
                        )
                        stats['modifications'].append(f"Validated emails in {col}")
            
            # Remove special characters
            if options['remove_special_chars']:
                string_cols = df.select_dtypes(include=['object']).columns
                for col in string_cols:
                    df[col] = df[col].str.replace(r'[^a-zA-Z0-9\s]', '', regex=True)
                stats['modifications'].append("Removed special characters")
            
            stats['cleaned_rows'] = len(df)
            
            # Save cleaned CSV
            df.to_csv(output_file, index=False, encoding='utf-8')
            
            self.logger.info(f"CSV cleaned: {output_file}")
            self.logger.info(f"Cleaning stats: {stats}")
            
        except Exception as e:
            self.logger.error(f"CSV cleaning failed: {e}")
            stats['error'] = str(e)
        
        return stats
    
    def merge_csv_files(self, files: List[str], output_file: str,
                       merge_type: str = 'concat', **kwargs) -> bool:
        """
        Merge multiple CSV files using various strategies.
        
        Types:
        - 'concat': Simple concatenation
        - 'join': Database-style join
        - 'merge': Pandas merge with deduplication
        """
        try:
            dataframes = []
            
            # Read all CSV files
            for file in files:
                encoding = self.detect_encoding(file)
                delimiter = self.detect_delimiter(file, encoding)
                df = pd.read_csv(file, encoding=encoding, delimiter=delimiter)
                dataframes.append(df)
                self.logger.info(f"Loaded {Path(file).name}: {len(df)} rows")
            
            if merge_type == 'concat':
                # Simple concatenation
                result = pd.concat(dataframes, ignore_index=True)
                
                # Remove duplicates if requested
                if kwargs.get('remove_duplicates', True):
                    before = len(result)
                    result = result.drop_duplicates()
                    self.logger.info(f"Removed {before - len(result)} duplicate rows")
            
            elif merge_type == 'join':
                # Database-style join
                join_on = kwargs.get('on', None)
                how = kwargs.get('how', 'inner')
                
                result = dataframes[0]
                for df in dataframes[1:]:
                    result = result.merge(df, on=join_on, how=how, suffixes=('', '_dup'))
            
            elif merge_type == 'merge':
                # Smart merge with conflict resolution
                result = dataframes[0]
                
                for df in dataframes[1:]:
                    # Find common columns
                    common_cols = list(set(result.columns) & set(df.columns))
                    
                    if common_cols:
                        # Merge on common columns
                        result = pd.concat([result, df]).drop_duplicates(subset=common_cols, keep='last')
                    else:
                        # No common columns, just concatenate
                        result = pd.concat([result, df], axis=1)
            
            # Save merged result
            result.to_csv(output_file, index=False, encoding='utf-8')
            
            self.logger.info(f"Merged {len(files)} files into {output_file}: {len(result)} total rows")
            return True
            
        except Exception as e:
            self.logger.error(f"Merge failed: {e}")
            return False
    
    def split_csv(self, input_file: str, output_dir: str,
                 split_by: str = 'rows', **kwargs) -> List[str]:
        """
        Split CSV file into multiple files.
        
        Split strategies:
        - 'rows': Split by number of rows
        - 'column_value': Split by unique values in a column
        - 'size': Split by file size
        - 'random': Random sampling
        """
        output_path = Path(output_dir)
        output_path.mkdir(parents=True, exist_ok=True)
        output_files = []
        
        try:
            # Detect encoding and delimiter
            encoding = self.detect_encoding(input_file)
            delimiter = self.detect_delimiter(input_file, encoding)
            
            if split_by == 'rows':
                # Split by row count
                chunk_size = kwargs.get('chunk_size', 1000)
                
                for chunk_num, chunk in enumerate(pd.read_csv(
                    input_file, 
                    encoding=encoding, 
                    delimiter=delimiter,
                    chunksize=chunk_size
                )):
                    output_file = output_path / f"chunk_{chunk_num:04d}.csv"
                    chunk.to_csv(output_file, index=False)
                    output_files.append(str(output_file))
                
                self.logger.info(f"Split into {len(output_files)} chunks of {chunk_size} rows")
            
            elif split_by == 'column_value':
                # Split by unique column values
                column = kwargs.get('column')
                df = pd.read_csv(input_file, encoding=encoding, delimiter=delimiter)
                
                for value in df[column].unique():
                    subset = df[df[column] == value]
                    safe_value = re.sub(r'[^\w\s-]', '', str(value))[:50]
                    output_file = output_path / f"{column}_{safe_value}.csv"
                    subset.to_csv(output_file, index=False)
                    output_files.append(str(output_file))
                
                self.logger.info(f"Split into {len(output_files)} files by {column} values")
            
            elif split_by == 'size':
                # Split by file size
                max_size_mb = kwargs.get('max_size_mb', 10)
                max_size_bytes = max_size_mb * 1024 * 1024
                
                df = pd.read_csv(input_file, encoding=encoding, delimiter=delimiter)
                
                current_chunk = []
                current_size = 0
                chunk_num = 0
                
                for _, row in df.iterrows():
                    row_size = len(row.to_csv())
                    
                    if current_size + row_size > max_size_bytes and current_chunk:
                        # Save current chunk
                        output_file = output_path / f"chunk_{chunk_num:04d}.csv"
                        pd.DataFrame(current_chunk).to_csv(output_file, index=False)
                        output_files.append(str(output_file))
                        
                        # Reset
                        current_chunk = []
                        current_size = 0
                        chunk_num += 1
                    
                    current_chunk.append(row)
                    current_size += row_size
                
                # Save last chunk
                if current_chunk:
                    output_file = output_path / f"chunk_{chunk_num:04d}.csv"
                    pd.DataFrame(current_chunk).to_csv(output_file, index=False)
                    output_files.append(str(output_file))
                
                self.logger.info(f"Split into {len(output_files)} files by size ({max_size_mb}MB max)")
            
            elif split_by == 'random':
                # Random sampling
                sample_size = kwargs.get('sample_size', 1000)
                num_samples = kwargs.get('num_samples', 5)
                
                df = pd.read_csv(input_file, encoding=encoding, delimiter=delimiter)
                
                for i in range(num_samples):
                    sample = df.sample(n=min(sample_size, len(df)), replace=False)
                    output_file = output_path / f"sample_{i:03d}.csv"
                    sample.to_csv(output_file, index=False)
                    output_files.append(str(output_file))
                
                self.logger.info(f"Created {num_samples} random samples of {sample_size} rows")
            
            return output_files
            
        except Exception as e:
            self.logger.error(f"Split failed: {e}")
            return []
    
    def transform_csv(self, input_file: str, output_file: str,
                     transformations: List[Dict]) -> bool:
        """
        Apply transformations to CSV data.
        
        transformations = [
            {'type': 'rename', 'columns': {'old_name': 'new_name'}},
            {'type': 'drop', 'columns': ['col1', 'col2']},
            {'type': 'add', 'column': 'new_col', 'formula': 'col1 + col2'},
            {'type': 'filter', 'condition': 'col1 > 100'},
            {'type': 'sort', 'by': ['col1'], 'ascending': True}
        ]
        """
        try:
            # Read CSV
            encoding = self.detect_encoding(input_file)
            delimiter = self.detect_delimiter(input_file, encoding)
            df = pd.read_csv(input_file, encoding=encoding, delimiter=delimiter)
            
            self.logger.info(f"Loaded {input_file}: {len(df)} rows, {len(df.columns)} columns")
            
            # Apply transformations
            for transform in transformations:
                t_type = transform['type']
                
                if t_type == 'rename':
                    # Rename columns
                    df = df.rename(columns=transform['columns'])
                    self.logger.info(f"Renamed columns: {transform['columns']}")
                
                elif t_type == 'drop':
                    # Drop columns
                    df = df.drop(columns=transform['columns'], errors='ignore')
                    self.logger.info(f"Dropped columns: {transform['columns']}")
                
                elif t_type == 'add':
                    # Add calculated column
                    column_name = transform['column']
                    formula = transform['formula']
                    
                    # Safe evaluation of formula
                    try:
                        df[column_name] = eval(formula, {"__builtins__": {}}, df.to_dict('series'))
                        self.logger.info(f"Added column '{column_name}' with formula: {formula}")
                    except Exception as e:
                        self.logger.error(f"Formula evaluation failed: {e}")
                
                elif t_type == 'filter':
                    # Filter rows
                    condition = transform['condition']
                    before = len(df)
                    df = df.query(condition)
                    self.logger.info(f"Filtered {before - len(df)} rows with condition: {condition}")
                
                elif t_type == 'sort':
                    # Sort data
                    df = df.sort_values(
                        by=transform['by'],
                        ascending=transform.get('ascending', True)
                    )
                    self.logger.info(f"Sorted by: {transform['by']}")
                
                elif t_type == 'pivot':
                    # Create pivot table
                    df = df.pivot_table(
                        index=transform.get('index'),
                        columns=transform.get('columns'),
                        values=transform.get('values'),
                        aggfunc=transform.get('aggfunc', 'sum')
                    )
                    self.logger.info("Created pivot table")
                
                elif t_type == 'groupby':
                    # Group and aggregate
                    df = df.groupby(transform['by']).agg(transform.get('agg', 'sum'))
                    self.logger.info(f"Grouped by: {transform['by']}")
            
            # Save transformed data
            df.to_csv(output_file, index=False, encoding='utf-8')
            
            self.logger.info(f"Saved transformed data: {output_file} ({len(df)} rows, {len(df.columns)} columns)")
            return True
            
        except Exception as e:
            self.logger.error(f"Transformation failed: {e}")
            return False
    
    def validate_csv(self, file_path: str, schema: Dict) -> Dict:
        """
        Validate CSV against a schema.
        
        schema = {
            'columns': ['col1', 'col2', 'col3'],
            'required_columns': ['col1', 'col2'],
            'data_types': {'col1': 'int', 'col2': 'string', 'col3': 'float'},
            'constraints': {
                'col1': {'min': 0, 'max': 100},
                'col2': {'pattern': r'^[A-Z]+$'},
                'col3': {'not_null': True}
            }
        }
        """
        validation_results = {
            'valid': True,
            'errors': [],
            'warnings': [],
            'statistics': {}
        }
        
        try:
            # Read CSV
            encoding = self.detect_encoding(file_path)
            delimiter = self.detect_delimiter(file_path, encoding)
            df = pd.read_csv(file_path, encoding=encoding, delimiter=delimiter)
            
            # Validate columns
            if 'columns' in schema:
                expected = set(schema['columns'])
                actual = set(df.columns)
                
                missing = expected - actual
                extra = actual - expected
                
                if missing:
                    validation_results['errors'].append(f"Missing columns: {missing}")
                    validation_results['valid'] = False
                
                if extra:
                    validation_results['warnings'].append(f"Extra columns: {extra}")
            
            # Validate required columns
            if 'required_columns' in schema:
                for col in schema['required_columns']:
                    if col not in df.columns:
                        validation_results['errors'].append(f"Required column missing: {col}")
                        validation_results['valid'] = False
            
            # Validate data types
            if 'data_types' in schema:
                for col, expected_type in schema['data_types'].items():
                    if col in df.columns:
                        actual_type = self._infer_data_type(df[col])
                        
                        if not self._types_compatible(actual_type, expected_type):
                            validation_results['errors'].append(
                                f"Column '{col}' type mismatch: expected {expected_type}, got {actual_type}"
                            )
                            validation_results['valid'] = False
            
            # Validate constraints
            if 'constraints' in schema:
                for col, constraints in schema['constraints'].items():
                    if col not in df.columns:
                        continue
                    
                    # Not null constraint
                    if constraints.get('not_null'):
                        null_count = df[col].isnull().sum()
                        if null_count > 0:
                            validation_results['errors'].append(
                                f"Column '{col}' has {null_count} null values"
                            )
                            validation_results['valid'] = False
                    
                    # Min/max constraints
                    if 'min' in constraints:
                        min_val = df[col].min()
                        if min_val < constraints['min']:
                            validation_results['errors'].append(
                                f"Column '{col}' has values below minimum {constraints['min']}"
                            )
                            validation_results['valid'] = False
                    
                    if 'max' in constraints:
                        max_val = df[col].max()
                        if max_val > constraints['max']:
                            validation_results['errors'].append(
                                f"Column '{col}' has values above maximum {constraints['max']}"
                            )
                            validation_results['valid'] = False
                    
                    # Pattern constraint
                    if 'pattern' in constraints:
                        pattern = re.compile(constraints['pattern'])
                        non_matching = df[~df[col].astype(str).str.match(pattern)]
                        
                        if len(non_matching) > 0:
                            validation_results['errors'].append(
                                f"Column '{col}' has {len(non_matching)} values not matching pattern"
                            )
                            validation_results['valid'] = False
                    
                    # Unique constraint
                    if constraints.get('unique'):
                        duplicates = df[col].duplicated().sum()
                        if duplicates > 0:
                            validation_results['errors'].append(
                                f"Column '{col}' has {duplicates} duplicate values"
                            )
                            validation_results['valid'] = False
            
            # Collect statistics
            validation_results['statistics'] = {
                'row_count': len(df),
                'column_count': len(df.columns),
                'memory_usage': df.memory_usage(deep=True).sum() / 1024 / 1024,  # MB
                'null_percentage': (df.isnull().sum().sum() / (len(df) * len(df.columns))) * 100
            }
            
        except Exception as e:
            validation_results['errors'].append(f"Validation error: {e}")
            validation_results['valid'] = False
        
        return validation_results
    
    def _types_compatible(self, actual: str, expected: str) -> bool:
        """Check if actual type is compatible with expected type."""
        compatibility = {
            'integer': ['integer', 'float'],
            'float': ['integer', 'float'],
            'string': ['string', 'integer', 'float', 'date', 'datetime'],
            'date': ['date', 'datetime'],
            'datetime': ['datetime']
        }
        
        return actual in compatibility.get(expected, [expected])
    
    def csv_to_database(self, csv_file: str, db_path: str, 
                       table_name: str, if_exists: str = 'replace') -> bool:
        """
        Import CSV to SQLite database.
        """
        try:
            # Read CSV
            encoding = self.detect_encoding(csv_file)
            delimiter = self.detect_delimiter(csv_file, encoding)
            df = pd.read_csv(csv_file, encoding=encoding, delimiter=delimiter)
            
            # Connect to database
            conn = sqlite3.connect(db_path)
            
            # Import to database
            df.to_sql(table_name, conn, if_exists=if_exists, index=False)
            
            # Create indexes on likely key columns
            cursor = conn.cursor()
            for col in df.columns:
                if 'id' in col.lower() or 'key' in col.lower():
                    try:
                        cursor.execute(f"CREATE INDEX idx_{table_name}_{col} ON {table_name}({col})")
                    except:
                        pass
            
            conn.commit()
            conn.close()
            
            self.logger.info(f"Imported {len(df)} rows to database table '{table_name}'")
            return True
            
        except Exception as e:
            self.logger.error(f"Database import failed: {e}")
            return False

class CSVStreamProcessor:
    """
    Process large CSV files using streaming and chunking.
    """
    
    def __init__(self, chunk_size: int = 10000):
        self.chunk_size = chunk_size
        self.processed_rows = 0
    
    def process_large_csv(self, file_path: str, process_func, *args, **kwargs):
        """
        Process large CSV file in chunks.
        """
        for chunk in pd.read_csv(file_path, chunksize=self.chunk_size):
            result = process_func(chunk, *args, **kwargs)
            self.processed_rows += len(chunk)
            
            if self.processed_rows % 100000 == 0:
                print(f"  Processed {self.processed_rows:,} rows...")
            
            yield result
    
    async def async_process_csv(self, file_path: str, process_func):
        """
        Asynchronously process CSV file.
        """
        async with aiofiles.open(file_path, mode='r') as file:
            reader = csv.DictReader(await file.read().split('\n'))
            
            tasks = []
            for row in reader:
                task = asyncio.create_task(process_func(row))
                tasks.append(task)
            
            results = await asyncio.gather(*tasks)
            return results

class CSVAnalyzer:
    """
    Advanced CSV analysis and profiling.
    """
    
    def profile_csv(self, file_path: str) -> Dict:
        """
        Create comprehensive profile of CSV data.
        """
        profile = {
            'file_info': {},
            'structure': {},
            'quality': {},
            'statistics': {},
            'recommendations': []
        }
        
        # File information
        file_stats = os.stat(file_path)
        profile['file_info'] = {
            'path': file_path,
            'size_mb': file_stats.st_size / 1024 / 1024,
            'modified': datetime.fromtimestamp(file_stats.st_mtime),
            'created': datetime.fromtimestamp(file_stats.st_ctime)
        }
        
        # Read sample for analysis
        df = pd.read_csv(file_path, nrows=10000)
        
        # Structure analysis
        profile['structure'] = {
            'rows': len(df),
            'columns': len(df.columns),
            'column_names': df.columns.tolist(),
            'dtypes': df.dtypes.astype(str).to_dict()
        }
        
        # Quality metrics
        profile['quality'] = {
            'completeness': (1 - df.isnull().sum().sum() / (len(df) * len(df.columns))) * 100,
            'duplicate_rows': df.duplicated().sum(),
            'duplicate_ratio': (df.duplicated().sum() / len(df)) * 100
        }
        
        # Statistical analysis
        profile['statistics'] = {
            'numeric_columns': {},
            'categorical_columns': {}
        }
        
        for col in df.select_dtypes(include=[np.number]).columns:
            profile['statistics']['numeric_columns'][col] = {
                'mean': df[col].mean(),
                'median': df[col].median(),
                'std': df[col].std(),
                'min': df[col].min(),
                'max': df[col].max(),
                'nulls': df[col].isnull().sum()
            }
        
        for col in df.select_dtypes(include=['object']).columns:
            profile['statistics']['categorical_columns'][col] = {
                'unique': df[col].nunique(),
                'most_common': df[col].value_counts().head(5).to_dict(),
                'nulls': df[col].isnull().sum()
            }
        
        # Recommendations
        if profile['quality']['completeness'] < 90:
            profile['recommendations'].append("High number of missing values detected")
        
        if profile['quality']['duplicate_ratio'] > 5:
            profile['recommendations'].append("Significant number of duplicate rows found")
        
        for col, stats in profile['statistics']['numeric_columns'].items():
            if stats['std'] > stats['mean'] * 2:
                profile['recommendations'].append(f"High variance in column '{col}'")
        
        return profile

# Example usage
if __name__ == "__main__":
    # Initialize processor
    csv_processor = CSVMasterProcessor(working_dir="./csv_workspace")
    
    # Analyze CSV structure
    structure = csv_processor.analyze_csv_structure("sales_data.csv")
    print(json.dumps(structure, indent=2, default=str))
    
    # Clean CSV
    csv_processor.clean_csv(
        input_file="dirty_data.csv",
        output_file="clean_data.csv",
        cleaning_options={
            'remove_duplicates': True,
            'handle_missing': 'fill',
            'standardize_dates': True,
            'trim_whitespace': True
        }
    )
    
    # Merge multiple CSV files
    csv_processor.merge_csv_files(
        files=["data_jan.csv", "data_feb.csv", "data_mar.csv"],
        output_file="quarterly_data.csv",
        merge_type='concat',
        remove_duplicates=True
    )
    
    # Transform CSV
    csv_processor.transform_csv(
        input_file="raw_data.csv",
        output_file="transformed_data.csv",
        transformations=[
            {'type': 'rename', 'columns': {'old_col': 'new_col'}},
            {'type': 'add', 'column': 'total', 'formula': 'price * quantity'},
            {'type': 'filter', 'condition': 'total > 100'},
            {'type': 'sort', 'by': ['total'], 'ascending': False}
        ]
    )
    
    # Validate CSV
    schema = {
        'required_columns': ['id', 'name', 'price'],
        'data_types': {
            'id': 'integer',
            'name': 'string',
            'price': 'float'
        },
        'constraints': {
            'id': {'unique': True},
            'price': {'min': 0, 'not_null': True}
        }
    }
    
    validation = csv_processor.validate_csv("products.csv", schema)
    print(json.dumps(validation, indent=2))
    
    # Process large CSV in chunks
    stream_processor = CSVStreamProcessor(chunk_size=50000)
    
    def analyze_chunk(chunk):
        return {
            'row_count': len(chunk),
            'revenue_sum': chunk['revenue'].sum() if 'revenue' in chunk else 0
        }
    
    for result in stream_processor.process_large_csv("huge_file.csv", analyze_chunk):
        print(f"Chunk analysis: {result}")
    
    print("\nāœ… CSV processing complete!")

Key Takeaways and Best Practices šŸŽÆ

CSV Processing Best Practices šŸ“‹

Pro Tip: CSV files are deceptively complex. What looks like a simple comma-separated file can have nested quotes, multiline fields, different encodings, and various delimiters. Always profile your CSV files first, handle edge cases gracefully, and keep audit logs of all transformations. Remember: pandas is powerful but memory-hungry - for truly massive files, consider using Dask or processing in chunks!

CSV processing mastery turns you from a data janitor into a data architect. You can handle any CSV thrown at you - from messy exports to perfectly structured datasets. Whether you're doing ETL pipelines, data migration, or analysis, these skills will save you countless hours of manual data cleaning! šŸš€