CSV Processing: Master Tabular Data with Python
CSV files are the unsung heroes of data exchange. They're simple, universal, and everywhere: Excel exports and database dumps, API responses and IoT sensor logs. But beneath their simple comma-separated surface lies a world of complexity. Let's turn Python into your CSV command center!
The Hidden Complexity of CSV Files
CSV files seem simple - just commas and data, right? Wrong! They can have different delimiters, encodings, quote characters, escape sequences, and more. It's like each CSV file speaks a slightly different dialect of the same language. Python helps you become a polyglot!
Real-World Scenario: The Data Pipeline Orchestrator
You're the data engineer for a retail company. Every day, you receive CSV files from 50 stores (sales data), 20 suppliers (inventory), 5 marketing platforms (campaigns), and 3 payment processors (transactions). Each has different formats, encodings, and quirks. Let's build a system that handles them all!
import csv
import pandas as pd
import numpy as np
from pathlib import Path
from typing import List, Dict, Optional, Any, Tuple, Union
import json
import chardet
from datetime import datetime, timedelta
import re
import logging
from io import StringIO, BytesIO
import sqlite3
from collections import defaultdict, Counter
import asyncio
import aiofiles
class CSVMasterProcessor:
    """
    Comprehensive CSV processing system with advanced features
    for handling any CSV file thrown at it.

    Every public method detects the input file's encoding and delimiter
    automatically. Failures are logged and reported through the return value
    (``False``, ``[]``, or an ``'error'``/``'errors'`` entry) rather than
    raised, unless a method's docstring says otherwise.
    """

    # Delimiters detect_delimiter considers (Sniffer and fallback), in tie-break order.
    _CANDIDATE_DELIMITERS = ',\t;|:'

    # clean_csv defaults; caller-supplied options override them key by key.
    _DEFAULT_CLEANING_OPTIONS = {
        'remove_duplicates': True,
        'handle_missing': 'drop',  # 'drop', 'fill', 'interpolate'
        'standardize_dates': True,
        'trim_whitespace': True,
        'standardize_case': None,  # 'upper', 'lower', 'title'
        'remove_special_chars': False,
        'validate_emails': False,
        'validate_phones': False,
    }

    # Short type names accepted in validate_csv schemas (e.g. 'int').
    _TYPE_ALIASES = {'int': 'integer', 'str': 'string', 'bool': 'boolean'}

    def __init__(self, working_dir: Optional[Union[str, Path]] = None):
        """
        Create a processor that logs to ``<working_dir>/csv_processing.log``.

        Args:
            working_dir: Directory for the log file; created if missing.
                Defaults to the current working directory.
        """
        self.working_dir = Path(working_dir) if working_dir else Path.cwd()
        self.working_dir.mkdir(parents=True, exist_ok=True)

        # Setup logging
        self.setup_logging()

        # Common CSV dialects
        self.dialects = {
            'excel': csv.excel,
            'excel_tab': csv.excel_tab,
            'unix': csv.unix_dialect,
        }

        # Data type inference patterns. Order matters: _infer_data_type
        # assigns each value to the FIRST pattern that matches it.
        self.type_patterns = {
            'integer': re.compile(r'^-?\d+$'),
            # Decimals ("2.5", ".5", "3.") and scientific notation ("1e-05",
            # which is how pandas renders small floats as text).
            'float': re.compile(r'^-?(?:\d+\.\d*|\.\d+|\d+(?=[eE]))(?:[eE][-+]?\d+)?$'),
            'date': re.compile(r'^\d{4}-\d{2}-\d{2}$'),
            'datetime': re.compile(r'^\d{4}-\d{2}-\d{2}\s+\d{2}:\d{2}:\d{2}$'),
            'boolean': re.compile(r'^(true|false|yes|no|1|0)$', re.IGNORECASE),
            'email': re.compile(r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'),
            'phone': re.compile(r'^[\d\s\-\(\)\+]+$'),
            'url': re.compile(r'^https?://'),
            'json': re.compile(r'^[\{\[].*[\}\]]$'),
        }

    def setup_logging(self):
        """
        Attach a file handler and a console handler to a logger dedicated to
        this working directory, and store it as ``self.logger``.

        Unlike ``logging.basicConfig`` this leaves the root logger alone and
        works for every instance: each working directory gets its own log
        file, and handlers are attached only once per directory.
        """
        log_file = (self.working_dir / 'csv_processing.log').resolve()
        self.logger = logging.getLogger(f"{__name__}[{log_file}]")
        self.logger.setLevel(logging.INFO)
        if not self.logger.handlers:
            formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
            for handler in (logging.FileHandler(log_file, encoding='utf-8'),
                            logging.StreamHandler()):
                handler.setFormatter(formatter)
                self.logger.addHandler(handler)
            # Our handlers already emit every record; propagating would print
            # duplicates wherever the application has configured the root logger.
            self.logger.propagate = False

    def detect_encoding(self, file_path: str, sample_size: int = 10000) -> str:
        """
        Detect file encoding using chardet on the first ``sample_size`` bytes.

        Low-confidence guesses fall back to the first of utf-8, cp1252 and
        latin-1 that decodes the sample. An 'ascii' guess is widened to
        utf-8 (a superset), because bytes beyond the sample may be non-ASCII.
        Never raises: any failure is logged and 'utf-8' is returned.
        """
        import codecs

        try:
            with open(file_path, 'rb') as file:
                sample = file.read(sample_size)
            result = chardet.detect(sample)
            encoding = result['encoding']
            confidence = result['confidence'] or 0.0
            self.logger.info(f"Detected encoding: {encoding} (confidence: {confidence:.2%})")

            # Fallback for low confidence
            if confidence < 0.7:
                # latin-1 decodes any byte sequence, so it must be tried last.
                for enc in ('utf-8', 'cp1252', 'latin-1'):
                    try:
                        # final=False tolerates a multi-byte character cut off at
                        # the end of the sample instead of rejecting the encoding.
                        codecs.getincrementaldecoder(enc)().decode(sample, final=False)
                        return enc
                    except UnicodeDecodeError:
                        continue

            if not encoding or encoding.lower() == 'ascii':
                return 'utf-8'
            return encoding
        except Exception as e:  # best-effort detection: never block reading the file
            self.logger.warning(f"Encoding detection failed: {e}, using utf-8")
            return 'utf-8'

    def detect_delimiter(self, file_path: str, encoding: str = 'utf-8') -> str:
        """
        Intelligently detect CSV delimiter from the first 10 lines.

        Uses ``csv.Sniffer`` restricted to ``_CANDIDATE_DELIMITERS`` (so letters
        or spaces are never chosen); if sniffing fails, picks the candidate
        that occurs most often. Returns ',' for an empty file.

        Raises:
            OSError / UnicodeDecodeError: if the file cannot be opened or decoded.
        """
        with open(file_path, 'r', encoding=encoding, newline='') as file:
            lines = [line for line in (file.readline() for _ in range(10)) if line]

        try:
            dialect = csv.Sniffer().sniff(''.join(lines), delimiters=self._CANDIDATE_DELIMITERS)
            delimiter = dialect.delimiter
            self.logger.info(f"Detected delimiter: {repr(delimiter)}")
            return delimiter
        except csv.Error:
            pass

        # Fallback: count common delimiters
        if lines:
            counts = {d: sum(line.count(d) for line in lines) for d in self._CANDIDATE_DELIMITERS}
            delimiter = max(counts, key=counts.get)
            self.logger.info(f"Detected delimiter (fallback): {repr(delimiter)}")
            return delimiter
        return ','

    def _read_csv(self, file_path: str, **read_kwargs):
        """Read ``file_path`` with pandas using auto-detected encoding and delimiter."""
        encoding = self.detect_encoding(file_path)
        delimiter = self.detect_delimiter(file_path, encoding)
        return pd.read_csv(file_path, encoding=encoding, delimiter=delimiter, **read_kwargs)

    def analyze_csv_structure(self, file_path: str) -> Dict:
        """
        Comprehensive CSV structure analysis.

        Detects encoding, delimiter and header presence, counts data records,
        and profiles each column on the first 1000 data rows.

        Returns:
            Dict with 'encoding', 'delimiter', 'has_header', 'column_count',
            'row_count', 'columns', 'data_types', 'null_counts',
            'unique_counts' and 'sample_data'. On failure the partially filled
            dict is returned with an added 'error' message.
        """
        encoding = self.detect_encoding(file_path)  # never raises
        structure = {
            'encoding': encoding,
            'delimiter': None,
            'has_header': False,
            'column_count': 0,
            'row_count': 0,
            'columns': [],
            'data_types': {},
            'null_counts': {},
            'unique_counts': {},
            'sample_data': []
        }

        try:
            delimiter = self.detect_delimiter(file_path, encoding)
            structure['delimiter'] = delimiter
            read_kwargs = dict(encoding=encoding, delimiter=delimiter, nrows=1000, low_memory=False)

            # Read CSV with detected parameters (sample for analysis)
            df = pd.read_csv(file_path, **read_kwargs)

            # pandas consumed the first line as column labels. A real header is
            # mostly non-numeric; if most labels look numeric, that line is data.
            # (Checking the first DATA row instead misclassifies every headed
            # file whose data is numeric.)
            labels = [str(label) for label in df.columns]
            numeric_labels = sum(1 for label in labels if self._is_numeric(label))
            structure['has_header'] = numeric_labels < len(labels) / 2

            if not structure['has_header']:
                # Re-read without header
                df = pd.read_csv(file_path, header=None, **read_kwargs)
                df.columns = [f'Column_{i}' for i in range(len(df.columns))]

            # Analyze structure
            structure['column_count'] = len(df.columns)
            structure['columns'] = df.columns.tolist()

            # Count records with the csv module so quoted multi-line fields count
            # once and blank lines (which pandas skips) are ignored.
            with open(file_path, 'r', encoding=encoding, newline='') as f:
                records = sum(1 for row in csv.reader(f, delimiter=delimiter) if row)
            structure['row_count'] = max(records - (1 if structure['has_header'] else 0), 0)

            # Analyze each column
            for col in df.columns:
                structure['data_types'][col] = self._infer_data_type(df[col])
                structure['null_counts'][col] = int(df[col].isnull().sum())
                structure['unique_counts'][col] = int(df[col].nunique())

            # Sample data
            structure['sample_data'] = df.head(5).to_dict('records')
            self.logger.info(f"CSV structure analysis complete: {Path(file_path).name}")
        except Exception as e:
            self.logger.error(f"Structure analysis failed: {e}")
            structure['error'] = str(e)

        return structure

    def _is_numeric(self, value: str) -> bool:
        """Return True if ``value`` parses as a float."""
        try:
            float(value)
            return True
        except (TypeError, ValueError):
            return False

    def _infer_data_type(self, series: pd.Series) -> str:
        """
        Infer the data type of a pandas series.

        Each of up to 100 non-null values (as text) is assigned to the first
        matching entry of ``self.type_patterns`` (else 'string'); the most
        frequent label wins. Returns 'empty' for an all-null series.
        """
        non_null = series.dropna()
        if len(non_null) == 0:
            return 'empty'

        type_counts = defaultdict(int)
        for value in non_null.astype(str).head(100):
            for dtype, pattern in self.type_patterns.items():
                if pattern.match(value):
                    type_counts[dtype] += 1
                    break
            else:
                type_counts['string'] += 1

        # Return most common type (ties go to the label seen first)
        if type_counts:
            return max(type_counts, key=type_counts.get)
        return 'string'

    @staticmethod
    def _map_strings(series: pd.Series, func) -> pd.Series:
        """Apply ``func`` to the ``str`` cells of ``series``; other cells pass through unchanged."""
        return series.map(lambda value: func(value) if isinstance(value, str) else value)

    def clean_csv(self, input_file: str, output_file: str,
                  cleaning_options: Optional[Dict] = None) -> Dict:
        """
        Clean and standardize CSV file, writing the result as UTF-8.

        Args:
            input_file: CSV to clean (encoding and delimiter auto-detected).
            output_file: Destination for the cleaned, comma-separated CSV.
            cleaning_options: Overrides for ``_DEFAULT_CLEANING_OPTIONS``;
                keys that are not supplied keep their default value.

        Returns:
            Stats dict ('original_rows', 'cleaned_rows', 'duplicates_removed',
            'missing_handled', 'modifications'); contains 'error' on failure.

        String operations only touch ``str`` cells, so numbers stored in
        object columns are never turned into NaN.
        """
        options = {**self._DEFAULT_CLEANING_OPTIONS, **(cleaning_options or {})}

        stats = {
            'original_rows': 0,
            'cleaned_rows': 0,
            'duplicates_removed': 0,
            'missing_handled': 0,
            'modifications': []
        }

        try:
            df = self._read_csv(input_file)
            stats['original_rows'] = len(df)

            # Remove duplicates
            if options['remove_duplicates']:
                before = len(df)
                df = df.drop_duplicates()
                stats['duplicates_removed'] = before - len(df)
                if stats['duplicates_removed'] > 0:
                    stats['modifications'].append(f"Removed {stats['duplicates_removed']} duplicates")

            # Handle missing values
            handle_missing = options['handle_missing']
            if handle_missing == 'drop':
                before = len(df)
                df = df.dropna()
                stats['missing_handled'] = before - len(df)
                if stats['missing_handled'] > 0:
                    stats['modifications'].append(f"Dropped {stats['missing_handled']} rows with missing values")
            elif handle_missing == 'fill':
                missing_before = int(df.isna().sum().sum())
                # Fill with appropriate values based on data type
                for col in df.columns:
                    if df[col].dtype == 'object':
                        df[col] = df[col].fillna('')
                    else:
                        df[col] = df[col].fillna(df[col].median())
                stats['missing_handled'] = missing_before - int(df.isna().sum().sum())
                stats['modifications'].append("Filled missing values")
            elif handle_missing == 'interpolate':
                # Interpolation is only meaningful (and, in recent pandas, only
                # supported) for numeric columns.
                numeric_cols = df.select_dtypes(include='number').columns
                missing_before = int(df[numeric_cols].isna().sum().sum())
                if len(numeric_cols) > 0:
                    df[numeric_cols] = df[numeric_cols].interpolate()
                stats['missing_handled'] = missing_before - int(df[numeric_cols].isna().sum().sum())
                stats['modifications'].append("Interpolated missing values")

            # Trim whitespace
            if options['trim_whitespace']:
                for col in df.select_dtypes(include=['object']).columns:
                    df[col] = self._map_strings(df[col], str.strip)
                stats['modifications'].append("Trimmed whitespace")

            # Standardize case
            case = options['standardize_case']
            if case:
                case_funcs = {'upper': str.upper, 'lower': str.lower, 'title': str.title}
                func = case_funcs.get(case)
                if func is None:
                    self.logger.warning(f"Unknown standardize_case option: {case!r}")
                else:
                    for col in df.select_dtypes(include=['object']).columns:
                        df[col] = self._map_strings(df[col], func)
                    stats['modifications'].append(f"Standardized case to {case}")

            # Standardize dates (columns whose name contains 'date')
            if options['standardize_dates']:
                for col in df.columns:
                    if 'date' not in str(col).lower():
                        continue
                    try:
                        # Unparseable values become NaN (errors='coerce').
                        df[col] = pd.to_datetime(df[col], errors='coerce').dt.strftime('%Y-%m-%d')
                        stats['modifications'].append(f"Standardized dates in {col}")
                    except (ValueError, TypeError, AttributeError) as e:
                        self.logger.warning(f"Could not standardize dates in {col}: {e}")

            # Validate emails: invalid addresses become NaN
            if options['validate_emails']:
                email_pattern = self.type_patterns['email']
                for col in df.columns:
                    if 'email' in str(col).lower():
                        df[col] = df[col].apply(
                            lambda x: x if pd.isna(x) or email_pattern.match(str(x)) else np.nan
                        )
                        stats['modifications'].append(f"Validated emails in {col}")

            # Validate phones: invalid text values become NaN. Non-string cells
            # (numbers pandas parsed from digit-only phones) are kept as-is.
            if options['validate_phones']:
                phone_pattern = self.type_patterns['phone']
                for col in df.columns:
                    if 'phone' in str(col).lower():
                        df[col] = df[col].apply(
                            lambda x: x if not isinstance(x, str) or phone_pattern.match(x) else np.nan
                        )
                        stats['modifications'].append(f"Validated phones in {col}")

            # Remove special characters (anything but ASCII letters, digits, whitespace)
            if options['remove_special_chars']:
                special = re.compile(r'[^a-zA-Z0-9\s]')
                for col in df.select_dtypes(include=['object']).columns:
                    df[col] = self._map_strings(df[col], lambda s: special.sub('', s))
                stats['modifications'].append("Removed special characters")

            stats['cleaned_rows'] = len(df)

            # Save cleaned CSV
            df.to_csv(output_file, index=False, encoding='utf-8')
            self.logger.info(f"CSV cleaned: {output_file}")
            self.logger.info(f"Cleaning stats: {stats}")
        except Exception as e:
            self.logger.error(f"CSV cleaning failed: {e}")
            stats['error'] = str(e)

        return stats

    def merge_csv_files(self, files: List[str], output_file: str,
                        merge_type: str = 'concat', **kwargs) -> bool:
        """
        Merge multiple CSV files using various strategies.

        Types:
        - 'concat': Simple concatenation (kwarg ``remove_duplicates``, default True)
        - 'join': Database-style join (kwargs ``on``, ``how`` default 'inner')
        - 'merge': Stack frames and drop duplicates on the shared columns,
          keeping the last occurrence; frames with no shared columns are
          placed side by side.

        Returns:
            True on success; False (after logging) on any failure, including
            an empty ``files`` list or an unknown ``merge_type``.
        """
        if not files:
            self.logger.error("Merge failed: no input files given")
            return False
        if merge_type not in ('concat', 'join', 'merge'):
            self.logger.error(f"Merge failed: unknown merge_type {merge_type!r}")
            return False

        try:
            dataframes = []
            for file in files:
                df = self._read_csv(file)
                dataframes.append(df)
                self.logger.info(f"Loaded {Path(file).name}: {len(df)} rows")

            if merge_type == 'concat':
                result = pd.concat(dataframes, ignore_index=True)
                if kwargs.get('remove_duplicates', True):
                    before = len(result)
                    result = result.drop_duplicates()
                    self.logger.info(f"Removed {before - len(result)} duplicate rows")
            elif merge_type == 'join':
                join_on = kwargs.get('on', None)
                how = kwargs.get('how', 'inner')
                result = dataframes[0]
                for df in dataframes[1:]:
                    result = result.merge(df, on=join_on, how=how, suffixes=('', '_dup'))
            else:  # 'merge': smart merge with conflict resolution
                result = dataframes[0]
                for df in dataframes[1:]:
                    common_cols = [c for c in result.columns if c in df.columns]
                    if common_cols:
                        result = pd.concat([result, df]).drop_duplicates(subset=common_cols, keep='last')
                    else:
                        # No common columns, place side by side
                        result = pd.concat([result, df], axis=1)

            result.to_csv(output_file, index=False, encoding='utf-8')
            self.logger.info(f"Merged {len(files)} files into {output_file}: {len(result)} total rows")
            return True
        except Exception as e:
            self.logger.error(f"Merge failed: {e}")
            return False

    def split_csv(self, input_file: str, output_dir: str,
                  split_by: str = 'rows', **kwargs) -> List[str]:
        """
        Split CSV file into multiple files in ``output_dir`` (created if missing).

        Split strategies:
        - 'rows': Chunks of ``chunk_size`` rows (default 1000)
        - 'column_value': One file per unique value of ``column``; missing
          values get their own file, and colliding sanitized names get a
          numeric suffix instead of overwriting each other
        - 'size': Files of at most ``max_size_mb`` MB (default 10), estimated
          from the UTF-8 size of each rendered row plus the header; every
          file holds at least one row
        - 'random': ``num_samples`` (default 5) random samples of
          ``sample_size`` rows (default 1000); optional integer
          ``random_state`` makes them reproducible

        Returns:
            Paths of the written files; ``[]`` on failure or unknown strategy.
        """
        output_path = Path(output_dir)
        output_path.mkdir(parents=True, exist_ok=True)
        output_files = []

        try:
            if split_by == 'rows':
                chunk_size = kwargs.get('chunk_size', 1000)
                for chunk_num, chunk in enumerate(self._read_csv(input_file, chunksize=chunk_size)):
                    output_file = output_path / f"chunk_{chunk_num:04d}.csv"
                    chunk.to_csv(output_file, index=False)
                    output_files.append(str(output_file))
                self.logger.info(f"Split into {len(output_files)} chunks of {chunk_size} rows")

            elif split_by == 'column_value':
                column = kwargs.get('column')
                df = self._read_csv(input_file)
                used_names = set()
                for value in df[column].unique():
                    # NaN != NaN, so missing values need an explicit mask.
                    mask = df[column].isna() if pd.isna(value) else df[column] == value
                    safe_value = re.sub(r'[^\w\s-]', '', str(value))[:50]
                    stem = name = f"{column}_{safe_value}"
                    suffix = 1
                    while name in used_names:  # distinct values may sanitize identically
                        name = f"{stem}_{suffix}"
                        suffix += 1
                    used_names.add(name)
                    output_file = output_path / f"{name}.csv"
                    df[mask].to_csv(output_file, index=False)
                    output_files.append(str(output_file))
                self.logger.info(f"Split into {len(output_files)} files by {column} values")

            elif split_by == 'size':
                max_size_mb = kwargs.get('max_size_mb', 10)
                max_size_bytes = max_size_mb * 1024 * 1024
                df = self._read_csv(input_file)

                # Render cells the way to_csv writes them (NaN -> empty field).
                rendered = df.astype(str).where(df.notna(), '')
                buffer = StringIO()
                writer = csv.writer(buffer, lineterminator='\n')

                def _encoded_size(values) -> int:
                    """UTF-8 byte length of ``values`` written as one CSV record."""
                    buffer.seek(0)
                    buffer.truncate(0)
                    writer.writerow(values)
                    return len(buffer.getvalue().encode('utf-8'))

                header_size = _encoded_size([str(c) for c in df.columns])
                bounds = []  # (start, stop) row positions per output file
                start, current_size = 0, header_size
                for position, row in enumerate(rendered.itertuples(index=False, name=None)):
                    row_size = _encoded_size(row)
                    if current_size + row_size > max_size_bytes and position > start:
                        bounds.append((start, position))
                        start, current_size = position, header_size
                    current_size += row_size
                if start < len(df):
                    bounds.append((start, len(df)))

                for chunk_num, (lo, hi) in enumerate(bounds):
                    output_file = output_path / f"chunk_{chunk_num:04d}.csv"
                    df.iloc[lo:hi].to_csv(output_file, index=False)
                    output_files.append(str(output_file))
                self.logger.info(f"Split into {len(output_files)} files by size ({max_size_mb}MB max)")

            elif split_by == 'random':
                sample_size = kwargs.get('sample_size', 1000)
                num_samples = kwargs.get('num_samples', 5)
                random_state = kwargs.get('random_state')
                df = self._read_csv(input_file)
                n = min(sample_size, len(df))
                for i in range(num_samples):
                    # Offset the seed per sample so seeded samples still differ.
                    seed = None if random_state is None else random_state + i
                    sample = df.sample(n=n, replace=False, random_state=seed)
                    output_file = output_path / f"sample_{i:03d}.csv"
                    sample.to_csv(output_file, index=False)
                    output_files.append(str(output_file))
                self.logger.info(f"Created {num_samples} random samples of {n} rows")

            else:
                self.logger.error(f"Split failed: unknown split_by {split_by!r}")

            return output_files
        except Exception as e:
            self.logger.error(f"Split failed: {e}")
            return []

    def transform_csv(self, input_file: str, output_file: str,
                      transformations: List[Dict]) -> bool:
        """
        Apply transformations to CSV data, in order, and save the result.

        transformations = [
            {'type': 'rename', 'columns': {'old_name': 'new_name'}},
            {'type': 'drop', 'columns': ['col1', 'col2']},
            {'type': 'add', 'column': 'new_col', 'formula': 'col1 + col2'},
            {'type': 'filter', 'condition': 'col1 > 100'},
            {'type': 'sort', 'by': ['col1'], 'ascending': True},
            {'type': 'pivot', 'index': ..., 'columns': ..., 'values': ..., 'aggfunc': 'sum'},
            {'type': 'groupby', 'by': ..., 'agg': 'sum'}
        ]

        'pivot' and 'groupby' keys are moved back into regular columns so they
        survive being saved. A failing 'add' formula is logged and skipped;
        unknown types are logged and ignored.

        Returns:
            True on success, False (after logging) on any other failure.
        """
        try:
            df = self._read_csv(input_file)
            self.logger.info(f"Loaded {input_file}: {len(df)} rows, {len(df.columns)} columns")

            for transform in transformations:
                t_type = transform['type']

                if t_type == 'rename':
                    df = df.rename(columns=transform['columns'])
                    self.logger.info(f"Renamed columns: {transform['columns']}")

                elif t_type == 'drop':
                    df = df.drop(columns=transform['columns'], errors='ignore')
                    self.logger.info(f"Dropped columns: {transform['columns']}")

                elif t_type == 'add':
                    column_name = transform['column']
                    formula = transform['formula']
                    # SECURITY NOTE(review): removing __builtins__ is NOT a sandbox;
                    # attribute tricks like ().__class__.__base__.__subclasses__()
                    # still escape it. Only pass formulas from trusted sources.
                    try:
                        df[column_name] = eval(formula, {"__builtins__": {}}, df.to_dict('series'))
                        self.logger.info(f"Added column '{column_name}' with formula: {formula}")
                    except Exception as e:
                        self.logger.error(f"Formula evaluation failed: {e}")

                elif t_type == 'filter':
                    condition = transform['condition']
                    before = len(df)
                    df = df.query(condition)
                    self.logger.info(f"Filtered {before - len(df)} rows with condition: {condition}")

                elif t_type == 'sort':
                    df = df.sort_values(
                        by=transform['by'],
                        ascending=transform.get('ascending', True)
                    )
                    self.logger.info(f"Sorted by: {transform['by']}")

                elif t_type == 'pivot':
                    df = df.pivot_table(
                        index=transform.get('index'),
                        columns=transform.get('columns'),
                        values=transform.get('values'),
                        aggfunc=transform.get('aggfunc', 'sum')
                    ).reset_index()  # keep the pivot keys; to_csv(index=False) would drop them
                    self.logger.info("Created pivot table")

                elif t_type == 'groupby':
                    df = df.groupby(transform['by']).agg(transform.get('agg', 'sum')).reset_index()
                    self.logger.info(f"Grouped by: {transform['by']}")

                else:
                    self.logger.warning(f"Unknown transformation type ignored: {t_type!r}")

            df.to_csv(output_file, index=False, encoding='utf-8')
            self.logger.info(f"Saved transformed data: {output_file} ({len(df)} rows, {len(df.columns)} columns)")
            return True
        except Exception as e:
            self.logger.error(f"Transformation failed: {e}")
            return False

    def validate_csv(self, file_path: str, schema: Dict) -> Dict:
        """
        Validate CSV against a schema.

        schema = {
            'columns': ['col1', 'col2', 'col3'],
            'required_columns': ['col1', 'col2'],
            'data_types': {'col1': 'int', 'col2': 'string', 'col3': 'float'},
            'constraints': {
                'col1': {'min': 0, 'max': 100},
                'col2': {'pattern': r'^[A-Z]+$'},
                'col3': {'not_null': True, 'unique': True}
            }
        }

        Notes:
            * Numeric min/max bounds compare the numerically parseable values;
              unparseable values are reported as warnings instead of aborting.
            * 'pattern' checks non-null values only (use 'not_null' for nulls).

        Returns:
            Dict with 'valid', 'errors', 'warnings' and 'statistics'.
        """
        validation_results = {
            'valid': True,
            'errors': [],
            'warnings': [],
            'statistics': {}
        }

        try:
            df = self._read_csv(file_path)

            # Validate columns
            if 'columns' in schema:
                expected = set(schema['columns'])
                actual = set(df.columns)
                missing = expected - actual
                extra = actual - expected
                if missing:
                    validation_results['errors'].append(f"Missing columns: {missing}")
                    validation_results['valid'] = False
                if extra:
                    validation_results['warnings'].append(f"Extra columns: {extra}")

            # Validate required columns
            for col in schema.get('required_columns', []):
                if col not in df.columns:
                    validation_results['errors'].append(f"Required column missing: {col}")
                    validation_results['valid'] = False

            # Validate data types
            for col, expected_type in schema.get('data_types', {}).items():
                if col in df.columns:
                    actual_type = self._infer_data_type(df[col])
                    if not self._types_compatible(actual_type, expected_type):
                        validation_results['errors'].append(
                            f"Column '{col}' type mismatch: expected {expected_type}, got {actual_type}"
                        )
                        validation_results['valid'] = False

            # Validate constraints
            for col, constraints in schema.get('constraints', {}).items():
                if col not in df.columns:
                    continue
                present = df[col].dropna()

                # Not null constraint
                if constraints.get('not_null'):
                    null_count = int(df[col].isnull().sum())
                    if null_count > 0:
                        validation_results['errors'].append(
                            f"Column '{col}' has {null_count} null values"
                        )
                        validation_results['valid'] = False

                # Min/max constraints
                if 'min' in constraints or 'max' in constraints:
                    numeric = pd.to_numeric(present, errors='coerce')
                    unparseable = int(numeric.isna().sum())
                    for key, word in (('min', 'below minimum'), ('max', 'above maximum')):
                        if key not in constraints:
                            continue
                        bound = constraints[key]
                        if isinstance(bound, (int, float)) and not isinstance(bound, bool):
                            # Compare numerically so mixed columns don't raise TypeError.
                            values = numeric.dropna()
                            if unparseable:
                                validation_results['warnings'].append(
                                    f"Column '{col}' has {unparseable} non-numeric values ignored by the {key} check"
                                )
                        else:
                            values = present  # non-numeric bound (e.g. an ISO date string)
                        if len(values) == 0:
                            continue
                        violated = values.min() < bound if key == 'min' else values.max() > bound
                        if violated:
                            validation_results['errors'].append(
                                f"Column '{col}' has values {word} {bound}"
                            )
                            validation_results['valid'] = False

                # Pattern constraint
                if 'pattern' in constraints:
                    pattern = re.compile(constraints['pattern'])
                    non_matching = sum(1 for value in present.astype(str) if not pattern.match(value))
                    if non_matching > 0:
                        validation_results['errors'].append(
                            f"Column '{col}' has {non_matching} values not matching pattern"
                        )
                        validation_results['valid'] = False

                # Unique constraint
                if constraints.get('unique'):
                    duplicates = int(df[col].duplicated().sum())
                    if duplicates > 0:
                        validation_results['errors'].append(
                            f"Column '{col}' has {duplicates} duplicate values"
                        )
                        validation_results['valid'] = False

            # Collect statistics
            total_cells = len(df) * len(df.columns)
            validation_results['statistics'] = {
                'row_count': len(df),
                'column_count': len(df.columns),
                'memory_usage': float(df.memory_usage(deep=True).sum()) / 1024 / 1024,  # MB
                'null_percentage': (float(df.isnull().sum().sum()) / total_cells * 100) if total_cells else 0.0
            }
        except Exception as e:
            validation_results['errors'].append(f"Validation error: {e}")
            validation_results['valid'] = False

        return validation_results

    def _types_compatible(self, actual: str, expected: str) -> bool:
        """
        Check if an inferred type (from ``_infer_data_type``) satisfies an
        expected schema type. Short aliases ('int', 'str', 'bool') are
        accepted; 'string' accepts any inferred type (everything is text);
        an all-null ('empty') column is compatible with any type, since
        nulls are the 'not_null' constraint's concern.
        """
        expected = self._TYPE_ALIASES.get(expected, expected)
        if actual == 'empty' or expected == 'string':
            return True
        compatibility = {
            'integer': ['integer', 'float'],
            'float': ['integer', 'float'],
            'date': ['date', 'datetime'],
            'datetime': ['datetime']
        }
        return actual in compatibility.get(expected, [expected])

    @staticmethod
    def _quote_identifier(name: Any) -> str:
        """Quote ``name`` as an SQLite identifier, escaping embedded double quotes."""
        return '"' + str(name).replace('"', '""') + '"'

    def csv_to_database(self, csv_file: str, db_path: str,
                        table_name: str, if_exists: str = 'replace') -> bool:
        """
        Import CSV to SQLite database table ``table_name``.

        Columns whose name contains 'id' or 'key' (case-insensitive) get an
        index. This is a name heuristic and also matches e.g. 'width'.
        Identifiers are quoted, so names with spaces or quotes work.

        Returns:
            True on success, False (after logging) on failure.
        """
        try:
            df = self._read_csv(csv_file)

            conn = sqlite3.connect(db_path)
            try:
                df.to_sql(table_name, conn, if_exists=if_exists, index=False)

                # Create indexes on likely key columns
                cursor = conn.cursor()
                quoted_table = self._quote_identifier(table_name)
                for col in df.columns:
                    name = str(col)
                    if 'id' in name.lower() or 'key' in name.lower():
                        index_name = self._quote_identifier(f"idx_{table_name}_{name}")
                        try:
                            # IF NOT EXISTS: re-running with if_exists='append' is not an error.
                            cursor.execute(
                                f"CREATE INDEX IF NOT EXISTS {index_name} "
                                f"ON {quoted_table} ({self._quote_identifier(name)})"
                            )
                        except sqlite3.Error as e:
                            self.logger.warning(f"Could not index column '{name}': {e}")
                conn.commit()
            finally:
                conn.close()

            self.logger.info(f"Imported {len(df)} rows to database table '{table_name}'")
            return True
        except Exception as e:
            self.logger.error(f"Database import failed: {e}")
            return False
class CSVStreamProcessor:
    """
    Process large CSV files using streaming and chunking.

    ``processed_rows`` accumulates across all calls on the same instance.
    """

    def __init__(self, chunk_size: int = 10000, progress_every: int = 100000,
                 read_csv_kwargs: Optional[Dict[str, Any]] = None):
        """
        Args:
            chunk_size: Rows per chunk handed to the processing function.
            progress_every: Print a progress line each time the running row
                total crosses a multiple of this value (0 disables it).
            read_csv_kwargs: Extra keyword arguments for ``pd.read_csv``
                (e.g. ``encoding``, ``sep``).
        """
        self.chunk_size = chunk_size
        self.processed_rows = 0
        self.progress_every = progress_every
        self.read_csv_kwargs = dict(read_csv_kwargs or {})

    def process_large_csv(self, file_path: str, process_func, *args, **kwargs):
        """
        Process large CSV file in chunks.

        A generator: for each DataFrame chunk it yields
        ``process_func(chunk, *args, **kwargs)``. Work happens lazily, only
        as the caller iterates.
        """
        for chunk in pd.read_csv(file_path, chunksize=self.chunk_size, **self.read_csv_kwargs):
            result = process_func(chunk, *args, **kwargs)

            previous = self.processed_rows
            self.processed_rows += len(chunk)
            # Report when a multiple of progress_every is crossed, not only when
            # it is hit exactly (which chunk sizes like 30000 would never do).
            if self.progress_every and (
                    self.processed_rows // self.progress_every > previous // self.progress_every):
                print(f" Processed {self.processed_rows:,} rows...")

            yield result

    async def async_process_csv(self, file_path: str, process_func,
                                encoding: Optional[str] = None):
        """
        Asynchronously process CSV file.

        Reads the whole file, then schedules ``process_func(row)`` (a
        coroutine function receiving each row as a dict) concurrently for
        every row and returns the results in row order.
        """
        # newline='' keeps line endings inside quoted fields intact for the csv module.
        async with aiofiles.open(file_path, mode='r', encoding=encoding, newline='') as file:
            content = await file.read()

        reader = csv.DictReader(StringIO(content, newline=''))
        tasks = [asyncio.create_task(process_func(row)) for row in reader]
        return await asyncio.gather(*tasks)
class CSVAnalyzer:
    """
    Advanced CSV analysis and profiling.
    """

    def profile_csv(self, file_path: str, encoding: Optional[str] = None,
                    delimiter: Optional[str] = None, sample_rows: int = 10000) -> Dict:
        """
        Create comprehensive profile of CSV data.

        Args:
            file_path: CSV file to profile.
            encoding: Text encoding; pandas' default (UTF-8) when None.
            delimiter: Field delimiter; pandas' default (',') when None.
            sample_rows: Number of leading data rows analysed.

        Returns:
            Dict with 'file_info', 'structure', 'quality', 'statistics' and
            'recommendations'. Quality ratios are percentages; a file with no
            data rows reports 100% completeness and 0% duplicates.
        """
        profile = {
            'file_info': {},
            'structure': {},
            'quality': {},
            'statistics': {},
            'recommendations': []
        }

        # File information
        file_stats = Path(file_path).stat()
        profile['file_info'] = {
            'path': file_path,
            'size_mb': file_stats.st_size / 1024 / 1024,
            'modified': datetime.fromtimestamp(file_stats.st_mtime),
            # st_ctime is creation time on Windows but metadata-change time on Unix.
            'created': datetime.fromtimestamp(file_stats.st_ctime)
        }

        # Read sample for analysis
        read_kwargs = {'nrows': sample_rows}
        if encoding:
            read_kwargs['encoding'] = encoding
        if delimiter:
            read_kwargs['delimiter'] = delimiter
        df = pd.read_csv(file_path, **read_kwargs)

        # Structure analysis
        profile['structure'] = {
            'rows': len(df),
            'columns': len(df.columns),
            'column_names': df.columns.tolist(),
            'dtypes': df.dtypes.astype(str).to_dict()
        }

        # Quality metrics (guarded against empty samples to avoid 0/0)
        total_cells = len(df) * len(df.columns)
        null_cells = int(df.isnull().sum().sum())
        duplicate_rows = int(df.duplicated().sum())
        profile['quality'] = {
            'completeness': (1 - null_cells / total_cells) * 100 if total_cells else 100.0,
            'duplicate_rows': duplicate_rows,
            'duplicate_ratio': (duplicate_rows / len(df)) * 100 if len(df) else 0.0
        }

        # Statistical analysis
        profile['statistics'] = {
            'numeric_columns': {},
            'categorical_columns': {}
        }

        for col in df.select_dtypes(include=[np.number]).columns:
            profile['statistics']['numeric_columns'][col] = {
                'mean': df[col].mean(),
                'median': df[col].median(),
                'std': df[col].std(),
                'min': df[col].min(),
                'max': df[col].max(),
                'nulls': int(df[col].isnull().sum())
            }

        for col in df.select_dtypes(include=['object']).columns:
            profile['statistics']['categorical_columns'][col] = {
                'unique': int(df[col].nunique()),
                'most_common': df[col].value_counts().head(5).to_dict(),
                'nulls': int(df[col].isnull().sum())
            }

        # Recommendations
        if profile['quality']['completeness'] < 90:
            profile['recommendations'].append("High number of missing values detected")

        if profile['quality']['duplicate_ratio'] > 5:
            profile['recommendations'].append("Significant number of duplicate rows found")

        for col, stats in profile['statistics']['numeric_columns'].items():
            # Coefficient of variation > 2; abs() so negative-mean columns are
            # not flagged unconditionally. NaN std (single value) never flags.
            if pd.notna(stats['std']) and stats['std'] > abs(stats['mean']) * 2:
                profile['recommendations'].append(f"High variance in column '{col}'")

        return profile
# Example usage
if __name__ == "__main__":
    # Initialize processor
    csv_processor = CSVMasterProcessor(working_dir="./csv_workspace")

    # Analyze CSV structure
    structure = csv_processor.analyze_csv_structure("sales_data.csv")
    print(json.dumps(structure, indent=2, default=str))

    # Clean CSV (options not listed here keep their defaults)
    csv_processor.clean_csv(
        input_file="dirty_data.csv",
        output_file="clean_data.csv",
        cleaning_options={
            'remove_duplicates': True,
            'handle_missing': 'fill',
            'standardize_dates': True,
            'trim_whitespace': True
        }
    )

    # Merge multiple CSV files
    csv_processor.merge_csv_files(
        files=["data_jan.csv", "data_feb.csv", "data_mar.csv"],
        output_file="quarterly_data.csv",
        merge_type='concat',
        remove_duplicates=True
    )

    # Transform CSV
    csv_processor.transform_csv(
        input_file="raw_data.csv",
        output_file="transformed_data.csv",
        transformations=[
            {'type': 'rename', 'columns': {'old_col': 'new_col'}},
            {'type': 'add', 'column': 'total', 'formula': 'price * quantity'},
            {'type': 'filter', 'condition': 'total > 100'},
            {'type': 'sort', 'by': ['total'], 'ascending': False}
        ]
    )

    # Validate CSV
    schema = {
        'required_columns': ['id', 'name', 'price'],
        'data_types': {
            'id': 'integer',
            'name': 'string',
            'price': 'float'
        },
        'constraints': {
            'id': {'unique': True},
            'price': {'min': 0, 'not_null': True}
        }
    }
    validation = csv_processor.validate_csv("products.csv", schema)
    # default=str: statistics and error payloads may hold numpy scalars.
    print(json.dumps(validation, indent=2, default=str))

    # Process large CSV in chunks
    stream_processor = CSVStreamProcessor(chunk_size=50000)

    def analyze_chunk(chunk):
        """Summarize one chunk: its row count and the sum of 'revenue' if present."""
        return {
            'row_count': len(chunk),
            'revenue_sum': chunk['revenue'].sum() if 'revenue' in chunk else 0
        }

    for result in stream_processor.process_large_csv("huge_file.csv", analyze_chunk):
        print(f"Chunk analysis: {result}")

    # The original literal was split across two lines by a mis-encoded emoji
    # (an unterminated string, i.e. a SyntaxError); plain ASCII also avoids
    # UnicodeEncodeError on legacy Windows consoles.
    print("\nCSV processing complete!")
Key Takeaways and Best Practices
- Always Detect Encoding: Never assume UTF-8. Use chardet or try multiple encodings.
- Handle Large Files Efficiently: Use chunking for files that don't fit in memory.
- Validate Early and Often: Check data quality before processing to avoid garbage in, garbage out.
- Preserve Data Types: Be careful with automatic type inference - dates and numbers can be tricky.
- Clean Before Processing: Remove duplicates, handle missing values, and standardize formats.
- Use Appropriate Tools: csv module for simple operations, pandas for complex analysis.
- Document Your Schemas: Keep clear documentation of expected CSV formats.
CSV Processing Best Practices
CSV processing mastery turns you from a data janitor into a data architect. You can handle any CSV thrown at you - from messy exports to perfectly structured datasets. Whether you're doing ETL pipelines, data migration, or analysis, these skills will save you countless hours of manual data cleaning!
Pro Tip: CSV files are deceptively complex. What looks like a simple comma-separated file can have nested quotes, multiline fields, different encodings, and various delimiters. Always profile your CSV files first, handle edge cases gracefully, and keep audit logs of all transformations. Remember: pandas is powerful but memory-hungry - for truly massive files, consider using Dask or processing in chunks!