Log File Analysis: Extract Insights from System Logs
Logs are the black boxes of the digital world - they record everything that happens in your systems. But raw logs are like hieroglyphics; they need translation to reveal their secrets. Log analysis transforms mountains of text into actionable intelligence, helping you debug issues, detect security threats, and understand system behavior. Let's become digital archaeologists!
The Log Analysis Ecosystem
Think of logs as the heartbeat of your systems. Each line is a pulse telling you what's happening. Web servers log requests, applications log errors, security systems log access attempts, and databases log queries. Python gives you the power to listen to all these heartbeats simultaneously and detect when something's wrong!
Real-World Scenario: The Incident Response Center
You're managing a production environment with web servers, application servers, databases, and microservices. Logs are scattered across dozens of servers in different formats. When something goes wrong, you need to quickly correlate events across all systems, identify root causes, and detect security threats. Let's build a log analysis system that turns chaos into clarity!
import re
import os
import glob
import json
import csv
import sqlite3
import gzip
import datetime
from datetime import datetime, timedelta
from pathlib import Path
from typing import Dict, List, Optional, Tuple, Any, Generator, Pattern
from dataclasses import dataclass, field
from collections import defaultdict, Counter
from enum import Enum
import logging
import threading
import queue
import time
import hashlib
import ipaddress
import urllib.parse
import pandas as pd
import numpy as np
from abc import ABC, abstractmethod
class LogLevel(int, Enum):
    """Standard log levels, using the same numbers as the stdlib ``logging`` module.

    Members are also ``int`` instances, so severities can be ordered
    directly (``level >= LogLevel.ERROR``). A plain ``Enum`` raises
    ``TypeError`` on such comparisons. Lookup by name
    (``LogLevel['ERROR']``), ``.name`` and ``.value`` behave as before.
    """

    DEBUG = 10
    INFO = 20
    WARNING = 30
    ERROR = 40
    CRITICAL = 50
@dataclass
class LogEntry:
    """One log line after parsing, in a parser-independent shape."""

    timestamp: datetime
    level: "LogLevel"
    message: str
    source: str
    raw_line: str
    # Parser-specific extras; each instance gets its own empty dict by default.
    fields: Dict[str, Any] = field(default_factory=dict)

    def to_dict(self) -> Dict:
        """Return a plain-dict view of the entry.

        The timestamp is rendered with ``isoformat()`` and the level by its
        name; ``raw_line`` is not included.
        """
        serialized = dict(
            timestamp=self.timestamp.isoformat(),
            level=self.level.name,
            message=self.message,
            source=self.source,
            fields=self.fields,
        )
        return serialized
class LogParser(ABC):
    """Interface that every format-specific log parser implements."""

    @abstractmethod
    def parse(self, line: str) -> Optional["LogEntry"]:
        """Turn one raw log line into a ``LogEntry`` (or ``None``)."""
        ...

    @abstractmethod
    def can_parse(self, line: str) -> bool:
        """Tell whether this parser recognises the format of ``line``."""
        ...
class SyslogParser(LogParser):
    """Parse syslog-style lines.

    Expected shape: ``Mon DD HH:MM:SS hostname process[pid]: message``,
    where the ``[pid]`` part is optional.
    """

    def __init__(self):
        # Every group must be named: parse() reads the match through
        # groupdict() using exactly these keys.
        self.pattern = re.compile(
            r'^(?P<timestamp>\w{3}\s+\d{1,2}\s+\d{2}:\d{2}:\d{2})\s+'
            r'(?P<hostname>[\w\.-]+)\s+'
            r'(?P<process>[\w\-/]+)(?:\[(?P<pid>\d+)\])?\s*:\s*'
            r'(?P<message>.*?)$'
        )
        # Severity keywords searched for inside the message text. parse()
        # takes the first keyword, in this order, found as a substring.
        self.levels = {
            'emerg': LogLevel.CRITICAL,
            'alert': LogLevel.CRITICAL,
            'crit': LogLevel.CRITICAL,
            'err': LogLevel.ERROR,
            'error': LogLevel.ERROR,
            'warning': LogLevel.WARNING,
            'warn': LogLevel.WARNING,
            'notice': LogLevel.INFO,
            'info': LogLevel.INFO,
            'debug': LogLevel.DEBUG
        }

    def can_parse(self, line: str) -> bool:
        """Return True when ``line`` matches the syslog pattern."""
        return bool(self.pattern.match(line))

    def parse(self, line: str) -> Optional[LogEntry]:
        """Parse one syslog line.

        Returns ``None`` when the line does not match. Otherwise returns a
        ``LogEntry`` whose source is ``hostname:process`` and whose fields
        carry ``hostname``, ``process`` and ``pid`` (``None`` if absent).
        """
        match = self.pattern.match(line)
        if not match:
            return None
        data = match.groupdict()

        # The line carries no year, so the current year is assumed.
        timestamp_str = data['timestamp']
        current_year = datetime.now().year
        timestamp = datetime.strptime(
            f"{current_year} {timestamp_str}", "%Y %b %d %H:%M:%S"
        )

        # Infer the level from keywords in the message; default to INFO.
        level = LogLevel.INFO
        message_lower = data['message'].lower()
        for keyword, log_level in self.levels.items():
            if keyword in message_lower:
                level = log_level
                break

        return LogEntry(
            timestamp=timestamp,
            level=level,
            message=data['message'],
            source=f"{data['hostname']}:{data['process']}",
            raw_line=line,
            fields={
                'hostname': data['hostname'],
                'process': data['process'],
                'pid': data.get('pid')
            }
        )
class ApacheLogParser(LogParser):
    """Parse Apache/Nginx access logs.

    Handles Common Log Format lines, optionally followed by two quoted
    strings that are taken as the referer and the user agent.
    """

    def __init__(self):
        # Every group is named so parse() can read the match via groupdict().
        # ident, user and protocol are captured but not used by parse().
        self.pattern = re.compile(
            r'^(?P<ip>[\d\.]+)\s+'
            r'(?P<ident>[\w\-]+)\s+'
            r'(?P<user>[\w\-]+)\s+'
            r'\[(?P<timestamp>[^\]]+)\]\s+'
            r'"(?P<method>\w+)\s+(?P<path>[^\s]+)\s+(?P<protocol>[^"]+)"\s+'
            r'(?P<status>\d+)\s+'
            # Either a byte count or a lone '-', so int() below cannot fail.
            r'(?P<size>\d+|-)'
            r'(?:\s+"(?P<referer>[^"]*)")?'
            r'(?:\s+"(?P<user_agent>[^"]*)")?'
        )

    def can_parse(self, line: str) -> bool:
        """Return True when ``line`` matches the access-log pattern."""
        return bool(self.pattern.match(line))

    def parse(self, line: str) -> Optional[LogEntry]:
        """Parse one access-log line.

        Returns ``None`` when the line does not match. The level is derived
        from the HTTP status: 5xx is ERROR, 4xx is WARNING, anything else
        is INFO. A size of ``-`` is reported as 0.

        Raises:
            ValueError: if the bracketed timestamp is not in
                ``%d/%b/%Y:%H:%M:%S %z`` form.
        """
        match = self.pattern.match(line)
        if not match:
            return None
        data = match.groupdict()

        timestamp = datetime.strptime(
            data['timestamp'],
            "%d/%b/%Y:%H:%M:%S %z"
        )

        status = int(data['status'])
        if status >= 500:
            level = LogLevel.ERROR
        elif status >= 400:
            level = LogLevel.WARNING
        else:
            level = LogLevel.INFO

        message = f"{data['method']} {data['path']} - {status}"
        return LogEntry(
            timestamp=timestamp,
            level=level,
            message=message,
            source='apache',
            raw_line=line,
            fields={
                'ip': data['ip'],
                'method': data['method'],
                'path': data['path'],
                'status': status,
                'size': int(data['size']) if data['size'] != '-' else 0,
                'referer': data.get('referer'),
                'user_agent': data.get('user_agent')
            }
        )
class PythonLogParser(LogParser):
    """Parse lines shaped like ``YYYY-MM-DD HH:MM:SS,mmm LEVEL module message``."""

    def __init__(self):
        # Every group is named so parse() can read the match via groupdict().
        self.pattern = re.compile(
            r'^(?P<timestamp>\d{4}-\d{2}-\d{2}\s+\d{2}:\d{2}:\d{2},\d{3})\s+'
            r'(?P<level>\w+)\s+'
            r'(?P<module>[\w\.]+)\s+'
            r'(?P<message>.*?)$'
        )
        # Level names as they appear in the log text.
        self.levels = {
            'DEBUG': LogLevel.DEBUG,
            'INFO': LogLevel.INFO,
            'WARNING': LogLevel.WARNING,
            'ERROR': LogLevel.ERROR,
            'CRITICAL': LogLevel.CRITICAL
        }

    def can_parse(self, line: str) -> bool:
        """Return True when ``line`` matches the Python logging pattern."""
        return bool(self.pattern.match(line))

    def parse(self, line: str) -> Optional[LogEntry]:
        """Parse one Python-logging line.

        Returns ``None`` when the line does not match. A level word that is
        not one of the five known names falls back to INFO. The module name
        is used as the entry's source.
        """
        match = self.pattern.match(line)
        if not match:
            return None
        data = match.groupdict()

        timestamp = datetime.strptime(
            data['timestamp'],
            "%Y-%m-%d %H:%M:%S,%f"
        )
        level = self.levels.get(data['level'], LogLevel.INFO)

        return LogEntry(
            timestamp=timestamp,
            level=level,
            message=data['message'],
            source=data['module'],
            raw_line=line,
            fields={'module': data['module']}
        )
class LogAnalyzer:
    """Log ingestion, storage and analysis for multiple log formats.

    Lines are parsed by the registered ``LogParser`` objects, stored in a
    SQLite database at ``config['db_path']`` and partly mirrored in
    ``self.entries``.

    The in-memory statistics (``error_patterns``, ``ip_statistics``,
    ``timeline``) are only fed by :meth:`tail_file`. Batch ingestion via
    :meth:`process_file` writes to the database and ``self.entries`` only.
    """

    # Level names counted as errors; mirrors the SQL filters used below and
    # avoids relying on ordering comparisons between level objects.
    _ERROR_LEVEL_NAMES = frozenset({'ERROR', 'CRITICAL'})

    def __init__(self, config: Optional[Dict] = None):
        """Create the analyzer, its database schema and default parsers.

        Args:
            config: Optional settings. Keys that are missing are filled in
                from :meth:`get_default_config`.
        """
        # Overlay caller settings on the defaults so a partial dict works.
        self.config = {**self.get_default_config(), **(config or {})}
        self.parsers = []
        self.entries = []
        self.setup_logging()
        self.setup_database()
        self.setup_parsers()
        # Analysis caches. error_patterns is a Counter so that
        # most_common() is available when reporting.
        self.error_patterns = Counter()
        self.ip_statistics = defaultdict(lambda: {'count': 0, 'errors': 0})
        self.timeline = defaultdict(list)

    def get_default_config(self) -> Dict:
        """Return the default configuration dictionary."""
        return {
            'db_path': 'logs.db',
            'batch_size': 1000,
            'max_memory_entries': 100000,
            'enable_realtime': False,
            'alert_on_errors': True
        }

    @staticmethod
    def _db_time(moment: datetime) -> str:
        """Render a datetime as the text stored in and compared by SQLite.

        Uses ``isoformat(' ')``, the same text sqlite3's default datetime
        adapter produces, without depending on that adapter (deprecated
        since Python 3.12).
        """
        return moment.isoformat(' ')

    @classmethod
    def _is_error(cls, level) -> bool:
        """Return True for the ERROR and CRITICAL levels."""
        return level.name in cls._ERROR_LEVEL_NAMES

    def setup_logging(self):
        """Configure root logging and create ``self.logger``."""
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
        )
        self.logger = logging.getLogger(__name__)

    def setup_database(self):
        """Create the ``logs`` and ``processed_files`` tables if missing.

        Indexes are created with separate ``CREATE INDEX`` statements
        because SQLite does not accept ``INDEX`` clauses inside
        ``CREATE TABLE``.
        """
        conn = sqlite3.connect(self.config['db_path'])
        try:
            cursor = conn.cursor()
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS logs (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    timestamp DATETIME,
                    level TEXT,
                    source TEXT,
                    message TEXT,
                    fields TEXT,
                    raw_line TEXT,
                    file_hash TEXT
                )
            ''')
            for column in ('timestamp', 'level', 'source'):
                cursor.execute(
                    f'CREATE INDEX IF NOT EXISTS idx_{column} '
                    f'ON logs ({column})'
                )
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS processed_files (
                    file_path TEXT PRIMARY KEY,
                    file_hash TEXT,
                    processed_at DATETIME,
                    line_count INTEGER
                )
            ''')
            conn.commit()
        finally:
            conn.close()

    def setup_parsers(self):
        """Install the built-in parsers, tried in this order."""
        self.parsers = [
            SyslogParser(),
            ApacheLogParser(),
            PythonLogParser()
        ]

    def add_parser(self, parser: "LogParser"):
        """Append a custom parser; it is tried after the existing ones."""
        self.parsers.append(parser)
        self.logger.info(f"Added parser: {parser.__class__.__name__}")

    def parse_line(self, line: str, source: str = 'unknown') -> Optional["LogEntry"]:
        """Parse a single log line.

        Returns ``None`` for a blank line. Otherwise returns the entry from
        the first parser that accepts the line, or a generic INFO entry
        stamped with the current time when no parser does.
        """
        line = line.strip()
        if not line:
            return None

        for parser in self.parsers:
            if parser.can_parse(line):
                entry = parser.parse(line)
                if entry:
                    if entry.source == 'unknown':
                        entry.source = source
                    return entry

        # Fallback: keep the text even though no format matched.
        return LogEntry(
            timestamp=datetime.now(),
            level=LogLevel.INFO,
            message=line,
            source=source,
            raw_line=line,
            fields={}
        )

    def process_file(self, file_path: str, source: str = None) -> int:
        """Ingest one log file (plain or ``.gz``) into the database.

        A file whose path and MD5 hash are already recorded in
        ``processed_files`` is skipped.

        Args:
            file_path: Path of the file to read.
            source: Source label for entries; defaults to the file stem.

        Returns:
            The number of entries stored, or 0 when the file is missing,
            was already processed, or could not be read.
        """
        file_path = Path(file_path)
        if not file_path.exists():
            self.logger.error(f"File not found: {file_path}")
            return 0

        file_hash = self._get_file_hash(file_path)
        if self._is_file_processed(str(file_path), file_hash):
            self.logger.info(f"File already processed: {file_path}")
            return 0

        source = source or file_path.stem
        entries_count = 0
        batch = []
        open_func = gzip.open if file_path.suffix == '.gz' else open

        try:
            # errors='replace' keeps one undecodable byte from aborting
            # the whole file.
            with open_func(file_path, 'rt', encoding='utf-8',
                           errors='replace') as f:
                for line_num, line in enumerate(f, 1):
                    try:
                        entry = self.parse_line(line, source)
                        if entry:
                            batch.append(entry)
                            entries_count += 1
                            if len(batch) >= self.config['batch_size']:
                                self._store_entries(batch, file_hash)
                                batch = []
                    except Exception as e:
                        # Best effort: one bad line must not stop the file.
                        self.logger.error(f"Error parsing line {line_num}: {e}")

            if batch:
                self._store_entries(batch, file_hash)
            self._mark_file_processed(str(file_path), file_hash, entries_count)
        except Exception as e:
            self.logger.error(f"Error processing file {file_path}: {e}")
            return 0

        self.logger.info(f"Processed {entries_count} entries from {file_path}")
        return entries_count

    def process_directory(self, directory: str, pattern: str = "*.log*") -> int:
        """Ingest every regular file in ``directory`` matching ``pattern``.

        Returns the total number of entries stored.
        """
        directory = Path(directory)
        return sum(
            self.process_file(file_path)
            for file_path in directory.glob(pattern)
            if file_path.is_file()
        )

    def tail_file(self, file_path: str, callback=None):
        """Follow a log file and process lines as they are appended.

        Starts at the current end of the file and polls every 0.1 s. The
        loop has no exit condition, so this call does not return on its
        own.

        Args:
            file_path: File to follow.
            callback: Optional callable invoked with each parsed entry.
        """
        file_path = Path(file_path)
        max_entries = self.config['max_memory_entries']
        with open(file_path, 'r') as f:
            f.seek(0, 2)  # whence=2: jump to the end of the file
            while True:
                line = f.readline()
                if not line:
                    time.sleep(0.1)
                    continue

                entry = self.parse_line(line, file_path.stem)
                if not entry:
                    continue

                self.entries.append(entry)
                # Same cap as _store_entries, so a long-running tail does
                # not grow this list without bound.
                if len(self.entries) > max_entries:
                    self.entries = self.entries[-max_entries:]

                self._update_statistics(entry)
                if callback:
                    callback(entry)
                if self.config['alert_on_errors'] and self._is_error(entry.level):
                    self._trigger_alert(entry)

    def _store_entries(self, entries: List["LogEntry"],
                       file_hash: Optional[str] = None):
        """Insert entries into the ``logs`` table and the in-memory cache.

        Args:
            entries: Entries to persist.
            file_hash: Hash of the originating file, stored with each row
                (``NULL`` when not given).
        """
        rows = [
            (
                self._db_time(entry.timestamp),
                entry.level.name,
                entry.source,
                entry.message,
                json.dumps(entry.fields),
                entry.raw_line,
                file_hash
            )
            for entry in entries
        ]
        conn = sqlite3.connect(self.config['db_path'])
        try:
            conn.executemany(
                '''INSERT INTO logs
                (timestamp, level, source, message, fields, raw_line, file_hash)
                VALUES (?, ?, ?, ?, ?, ?, ?)''',
                rows
            )
            conn.commit()
        finally:
            conn.close()

        # Mirror in memory, keeping only the newest max_memory_entries.
        self.entries.extend(entries)
        max_entries = self.config['max_memory_entries']
        if len(self.entries) > max_entries:
            self.entries = self.entries[-max_entries:]

    def _get_file_hash(self, file_path: Path) -> str:
        """Return the MD5 hex digest of the file, read in 4 KiB chunks."""
        hash_md5 = hashlib.md5()
        with open(file_path, "rb") as f:
            for chunk in iter(lambda: f.read(4096), b""):
                hash_md5.update(chunk)
        return hash_md5.hexdigest()

    def _is_file_processed(self, file_path: str, file_hash: str) -> bool:
        """Return True when this path is recorded with this exact hash."""
        conn = sqlite3.connect(self.config['db_path'])
        try:
            row = conn.execute(
                "SELECT file_hash FROM processed_files WHERE file_path = ?",
                (file_path,)
            ).fetchone()
        finally:
            conn.close()
        # Always a real bool, including for a path never seen before.
        return row is not None and row[0] == file_hash

    def _mark_file_processed(self, file_path: str, file_hash: str, line_count: int):
        """Record (or overwrite) the processed-file row for ``file_path``."""
        conn = sqlite3.connect(self.config['db_path'])
        try:
            conn.execute(
                '''INSERT OR REPLACE INTO processed_files
                (file_path, file_hash, processed_at, line_count)
                VALUES (?, ?, ?, ?)''',
                (file_path, file_hash, self._db_time(datetime.now()), line_count)
            )
            conn.commit()
        finally:
            conn.close()

    def _update_statistics(self, entry: "LogEntry"):
        """Fold one entry into the in-memory statistics."""
        is_error = self._is_error(entry.level)

        if is_error:
            pattern = self._extract_error_pattern(entry.message)
            self.error_patterns[pattern] += 1

        # Per-IP counters exist only for entries carrying an 'ip' field.
        if 'ip' in entry.fields:
            ip = entry.fields['ip']
            self.ip_statistics[ip]['count'] += 1
            if is_error:
                self.ip_statistics[ip]['errors'] += 1

        hour_key = entry.timestamp.strftime("%Y-%m-%d %H:00")
        self.timeline[hour_key].append(entry)

    def _extract_error_pattern(self, message: str) -> str:
        """Generalise a message by masking IPs, numbers and hex IDs.

        IPv4 addresses are masked first. If plain numbers were replaced
        first, the octets would already be gone and the IP rule could
        never match. The result is cut to 100 characters.
        """
        pattern = re.sub(r'\b\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}\b', 'IP', message)
        pattern = re.sub(r'\b\d+\b', 'NUM', pattern)
        pattern = re.sub(r'\b[a-f0-9]{8,}\b', 'ID', pattern)
        return pattern[:100]

    def _trigger_alert(self, entry: "LogEntry"):
        """Emit an alert for an entry; currently a warning log line only."""
        alert_message = f"ALERT: {entry.level.name} - {entry.source} - {entry.message}"
        self.logger.warning(alert_message)
        # Implement additional alerting (email, webhook, etc.)

    def analyze_errors(self, hours: int = 24) -> Dict:
        """Summarise ERROR/CRITICAL rows from the last ``hours`` hours.

        Returns a dict with ``error_by_source`` (rows of source, level,
        count), ``top_errors`` (up to 10 rows of message, count) and
        ``error_patterns`` (the 10 most frequent in-memory patterns).
        """
        cutoff = self._db_time(datetime.now() - timedelta(hours=hours))
        conn = sqlite3.connect(self.config['db_path'])
        try:
            cursor = conn.cursor()
            cursor.execute('''
                SELECT source, level, COUNT(*) as count
                FROM logs
                WHERE timestamp > ? AND level IN ('ERROR', 'CRITICAL')
                GROUP BY source, level
                ORDER BY count DESC
            ''', (cutoff,))
            error_by_source = cursor.fetchall()

            cursor.execute('''
                SELECT message, COUNT(*) as count
                FROM logs
                WHERE timestamp > ? AND level IN ('ERROR', 'CRITICAL')
                GROUP BY message
                ORDER BY count DESC
                LIMIT 10
            ''', (cutoff,))
            top_errors = cursor.fetchall()
        finally:
            conn.close()

        return {
            'error_by_source': error_by_source,
            'top_errors': top_errors,
            # Counter(...) also accepts a plain mapping of counts.
            'error_patterns': dict(Counter(self.error_patterns).most_common(10))
        }

    def analyze_performance(self, hours: int = 24) -> Dict:
        """Report response-time aggregates and the status-code distribution.

        Both are read from the JSON ``fields`` column for rows newer than
        ``hours`` hours. ``response_times`` is an (avg, max, min) row and
        ``status_distribution`` is a list of (status, count) rows.
        """
        cutoff = self._db_time(datetime.now() - timedelta(hours=hours))
        conn = sqlite3.connect(self.config['db_path'])
        try:
            cursor = conn.cursor()
            cursor.execute('''
                SELECT
                AVG(CAST(json_extract(fields, '$.response_time') AS REAL)) as avg_response_time,
                MAX(CAST(json_extract(fields, '$.response_time') AS REAL)) as max_response_time,
                MIN(CAST(json_extract(fields, '$.response_time') AS REAL)) as min_response_time
                FROM logs
                WHERE timestamp > ?
                AND json_extract(fields, '$.response_time') IS NOT NULL
            ''', (cutoff,))
            response_times = cursor.fetchone()

            cursor.execute('''
                SELECT
                json_extract(fields, '$.status') as status,
                COUNT(*) as count
                FROM logs
                WHERE timestamp > ?
                AND json_extract(fields, '$.status') IS NOT NULL
                GROUP BY status
            ''', (cutoff,))
            status_distribution = cursor.fetchall()
        finally:
            conn.close()

        return {
            'response_times': response_times,
            'status_distribution': status_distribution
        }

    def find_anomalies(self) -> List[Dict]:
        """Scan the in-memory statistics for error spikes and noisy IPs.

        An ``error_spike`` is reported for any of the last 24 hourly
        buckets holding more than 100 error entries. A ``suspicious_ip``
        is reported for an IP with more than 100 requests of which more
        than half were errors.
        """
        anomalies = []

        current_hour = datetime.now().replace(minute=0, second=0, microsecond=0)
        for i in range(24):
            hour = current_hour - timedelta(hours=i)
            hour_key = hour.strftime("%Y-%m-%d %H:00")
            if hour_key not in self.timeline:
                continue
            error_count = sum(
                1 for e in self.timeline[hour_key] if self._is_error(e.level)
            )
            if error_count > 100:  # Threshold
                anomalies.append({
                    'type': 'error_spike',
                    'timestamp': hour,
                    'count': error_count,
                    'severity': 'high'
                })

        for ip, stats in self.ip_statistics.items():
            error_rate = stats['errors'] / stats['count'] if stats['count'] > 0 else 0
            if error_rate > 0.5 and stats['count'] > 100:
                anomalies.append({
                    'type': 'suspicious_ip',
                    'ip': ip,
                    'requests': stats['count'],
                    'errors': stats['errors'],
                    'error_rate': error_rate,
                    'severity': 'medium'
                })

        return anomalies

    def search_logs(self, query: str,
                    start_time: datetime = None,
                    end_time: datetime = None,
                    level: "LogLevel" = None,
                    source: str = None,
                    limit: int = 100) -> List["LogEntry"]:
        """Search stored logs, newest first.

        Args:
            query: Substring to look for in the message; falsy means any.
            start_time: Inclusive lower bound on the timestamp.
            end_time: Inclusive upper bound on the timestamp.
            level: Exact level to match.
            source: Exact source to match.
            limit: Maximum number of rows returned.

        Returns:
            Matching rows rebuilt as ``LogEntry`` objects.
        """
        # Only fixed SQL fragments are interpolated; every value is bound.
        conditions = []
        params = []
        if query:
            conditions.append("message LIKE ?")
            params.append(f"%{query}%")
        if start_time is not None:
            conditions.append("timestamp >= ?")
            params.append(self._db_time(start_time))
        if end_time is not None:
            conditions.append("timestamp <= ?")
            params.append(self._db_time(end_time))
        if level is not None:
            conditions.append("level = ?")
            params.append(level.name)
        if source:
            conditions.append("source = ?")
            params.append(source)
        where_clause = " AND ".join(conditions) if conditions else "1=1"

        conn = sqlite3.connect(self.config['db_path'])
        try:
            results = conn.execute(f'''
                SELECT timestamp, level, source, message, fields, raw_line
                FROM logs
                WHERE {where_clause}
                ORDER BY timestamp DESC
                LIMIT ?
            ''', params + [limit]).fetchall()
        finally:
            conn.close()

        return [
            LogEntry(
                timestamp=datetime.fromisoformat(row[0]),
                level=LogLevel[row[1]],
                source=row[2],
                message=row[3],
                fields=json.loads(row[4]) if row[4] else {},
                raw_line=row[5]
            )
            for row in results
        ]

    def generate_report(self, hours: int = 24) -> Dict:
        """Build a combined report for the last ``hours`` hours.

        Combines database totals (entry count, level distribution, top 10
        sources) with :meth:`analyze_errors`, :meth:`analyze_performance`,
        :meth:`find_anomalies` and the 10 busiest IPs held in memory.
        """
        cutoff = datetime.now() - timedelta(hours=hours)
        cutoff_text = self._db_time(cutoff)
        conn = sqlite3.connect(self.config['db_path'])
        try:
            cursor = conn.cursor()
            cursor.execute(
                "SELECT COUNT(*) FROM logs WHERE timestamp > ?",
                (cutoff_text,)
            )
            total_entries = cursor.fetchone()[0]

            cursor.execute('''
                SELECT level, COUNT(*) as count
                FROM logs
                WHERE timestamp > ?
                GROUP BY level
            ''', (cutoff_text,))
            level_distribution = dict(cursor.fetchall())

            cursor.execute('''
                SELECT source, COUNT(*) as count
                FROM logs
                WHERE timestamp > ?
                GROUP BY source
                ORDER BY count DESC
                LIMIT 10
            ''', (cutoff_text,))
            top_sources = cursor.fetchall()
        finally:
            conn.close()

        error_analysis = self.analyze_errors(hours)
        performance_analysis = self.analyze_performance(hours)
        anomalies = self.find_anomalies()

        busiest_ips = sorted(
            self.ip_statistics.items(),
            key=lambda item: item[1]['count'],
            reverse=True
        )[:10]

        return {
            'period': {
                'start': cutoff,
                'end': datetime.now(),
                'hours': hours
            },
            'summary': {
                'total_entries': total_entries,
                'level_distribution': level_distribution,
                'top_sources': top_sources
            },
            'errors': error_analysis,
            'performance': performance_analysis,
            'anomalies': anomalies,
            'top_ips': [
                {
                    'ip': ip,
                    'count': stats['count'],
                    'errors': stats['errors']
                }
                for ip, stats in busiest_ips
            ]
        }
class SecurityLogAnalyzer:
"""Specialized analyzer for security logs."""
def __init__(self, log_analyzer: LogAnalyzer):
self.analyzer = log_analyzer
self.threat_patterns = self.load_threat_patterns()
def load_threat_patterns(self) -> Dict[str, Pattern]:
"""Load security threat patterns."""
return {
'sql_injection': re.compile(
r'(\bunion\b.*\bselect\b|\bselect\b.*\bfrom\b|\bdrop\b.*\btable\b)',
re.IGNORECASE
),
'xss_attempt': re.compile(
r'(
Key Takeaways and Best Practices
- Use Multiple Parsers: Different log formats require different parsing strategies.
- Index Your Logs: Store logs in a database with proper indexes for fast searching.
- Pattern Recognition: Extract patterns from errors to identify recurring issues.
- Real-time Monitoring: Tail important logs for immediate alert on critical events.
- Correlation is Key: Correlate events across different log sources to understand issues.
- Security First: Always check for security threats and suspicious patterns.
- Manage Log Rotation: Handle compressed and rotated logs properly.
Log Analysis Best Practices
Log analysis mastery transforms you from a reactive debugger to a proactive system detective. You can identify issues before users complain, detect security threats before damage occurs, and understand system behavior at a deep level. Whether you're debugging applications, hunting security threats, or optimizing performance, these log analysis skills are invaluable!
Pro Tip: Logs are your system's diary - they tell you everything that happened, but only if you know how to read them. Start by understanding your log formats, build robust parsers, and create a centralized log storage system. Use pattern matching to identify issues, but don't forget about context. A single error might be noise, but 1000 errors in 5 minutes is a crisis. Always correlate logs from different sources - the web server log might show a 500 error, but the application log tells you why. And remember: log analysis isn't just about finding problems, it's about understanding your system's behavior and improving it!