Skip to main content

šŸ“ Log File Analysis: Extract Insights from System Logs

Logs are the black boxes of the digital world - they record everything that happens in your systems. But raw logs are like hieroglyphics; they need translation to reveal their secrets. Log analysis transforms mountains of text into actionable intelligence, helping you debug issues, detect security threats, and understand system behavior. Let's become digital archaeologists! šŸ”

The Log Analysis Ecosystem

Think of logs as the heartbeat of your systems. Each line is a pulse telling you what's happening. Web servers log requests, applications log errors, security systems log access attempts, and databases log queries. Python gives you the power to listen to all these heartbeats simultaneously and detect when something's wrong!

graph TB A[Log Sources] --> B[Log Collection] B --> C[Parsing & Extraction] C --> D[Analysis Engine] D --> E[Insights & Actions] A --> F[System Logs] A --> G[Application Logs] A --> H[Security Logs] A --> I[Web Server Logs] A --> J[Database Logs] C --> K[Pattern Matching] C --> L[Field Extraction] C --> M[Normalization] D --> N[Error Analysis] D --> O[Performance Analysis] D --> P[Security Analysis] D --> Q[Trend Detection] E --> R[Alerts] E --> S[Reports] E --> T[Visualizations] E --> U[Automated Responses] style A fill:#ff6b6b style C fill:#51cf66 style D fill:#339af0 style E fill:#ffd43b

Real-World Scenario: The Incident Response Center 🚨

You're managing a production environment with web servers, application servers, databases, and microservices. Logs are scattered across dozens of servers in different formats. When something goes wrong, you need to quickly correlate events across all systems, identify root causes, and detect security threats. Let's build a log analysis system that turns chaos into clarity!

import re
import os
import glob
import json
import csv
import sqlite3
import gzip
import datetime
from datetime import datetime, timedelta
from pathlib import Path
from typing import Dict, List, Optional, Tuple, Any, Generator, Pattern
from dataclasses import dataclass, field
from collections import defaultdict, Counter
from enum import Enum
import logging
import threading
import queue
import time
import hashlib
import ipaddress
import urllib.parse
import pandas as pd
import numpy as np
from abc import ABC, abstractmethod

class LogLevel(Enum):
    """Standard log levels."""
    DEBUG = 10
    INFO = 20
    WARNING = 30
    ERROR = 40
    CRITICAL = 50

@dataclass
class LogEntry:
    """Parsed log entry."""
    timestamp: datetime
    level: LogLevel
    message: str
    source: str
    raw_line: str
    fields: Dict[str, Any] = field(default_factory=dict)
    
    def to_dict(self) -> Dict:
        """Convert to dictionary."""
        return {
            'timestamp': self.timestamp.isoformat(),
            'level': self.level.name,
            'message': self.message,
            'source': self.source,
            'fields': self.fields
        }

class LogParser(ABC):
    """Abstract base class for log parsers."""
    
    @abstractmethod
    def parse(self, line: str) -> Optional[LogEntry]:
        """Parse a log line."""
        pass
    
    @abstractmethod
    def can_parse(self, line: str) -> bool:
        """Check if this parser can handle the line."""
        pass

class SyslogParser(LogParser):
    """Parse syslog format logs."""
    
    def __init__(self):
        # Syslog pattern: timestamp hostname process[pid]: message
        self.pattern = re.compile(
            r'^(?P\w{3}\s+\d{1,2}\s+\d{2}:\d{2}:\d{2})\s+'
            r'(?P[\w\.-]+)\s+'
            r'(?P[\w\-/]+)(?:\[(?P\d+)\])?\s*:\s*'
            r'(?P.*?)$'
        )
        
        self.levels = {
            'emerg': LogLevel.CRITICAL,
            'alert': LogLevel.CRITICAL,
            'crit': LogLevel.CRITICAL,
            'err': LogLevel.ERROR,
            'error': LogLevel.ERROR,
            'warning': LogLevel.WARNING,
            'warn': LogLevel.WARNING,
            'notice': LogLevel.INFO,
            'info': LogLevel.INFO,
            'debug': LogLevel.DEBUG
        }
    
    def can_parse(self, line: str) -> bool:
        """Check if line matches syslog format."""
        return bool(self.pattern.match(line))
    
    def parse(self, line: str) -> Optional[LogEntry]:
        """Parse syslog line."""
        match = self.pattern.match(line)
        if not match:
            return None
        
        data = match.groupdict()
        
        # Parse timestamp (add current year)
        timestamp_str = data['timestamp']
        current_year = datetime.now().year
        timestamp = datetime.strptime(f"{current_year} {timestamp_str}", "%Y %b %d %H:%M:%S")
        
        # Determine log level from message
        level = LogLevel.INFO
        message_lower = data['message'].lower()
        for keyword, log_level in self.levels.items():
            if keyword in message_lower:
                level = log_level
                break
        
        return LogEntry(
            timestamp=timestamp,
            level=level,
            message=data['message'],
            source=f"{data['hostname']}:{data['process']}",
            raw_line=line,
            fields={
                'hostname': data['hostname'],
                'process': data['process'],
                'pid': data.get('pid')
            }
        )

class ApacheLogParser(LogParser):
    """Parse Apache/Nginx access logs."""
    
    def __init__(self):
        # Common Log Format
        self.pattern = re.compile(
            r'^(?P[\d\.]+)\s+'
            r'(?P[\w\-]+)\s+'
            r'(?P[\w\-]+)\s+'
            r'\[(?P[^\]]+)\]\s+'
            r'"(?P\w+)\s+(?P[^\s]+)\s+(?P[^"]+)"\s+'
            r'(?P\d+)\s+'
            r'(?P[\d\-]+)'
            r'(?:\s+"(?P[^"]*)")?'
            r'(?:\s+"(?P[^"]*)")?'
        )
    
    def can_parse(self, line: str) -> bool:
        """Check if line matches Apache format."""
        return bool(self.pattern.match(line))
    
    def parse(self, line: str) -> Optional[LogEntry]:
        """Parse Apache log line."""
        match = self.pattern.match(line)
        if not match:
            return None
        
        data = match.groupdict()
        
        # Parse timestamp
        timestamp = datetime.strptime(
            data['timestamp'], 
            "%d/%b/%Y:%H:%M:%S %z"
        )
        
        # Determine log level based on status code
        status = int(data['status'])
        if status >= 500:
            level = LogLevel.ERROR
        elif status >= 400:
            level = LogLevel.WARNING
        else:
            level = LogLevel.INFO
        
        # Parse request
        message = f"{data['method']} {data['path']} - {status}"
        
        return LogEntry(
            timestamp=timestamp,
            level=level,
            message=message,
            source='apache',
            raw_line=line,
            fields={
                'ip': data['ip'],
                'method': data['method'],
                'path': data['path'],
                'status': status,
                'size': int(data['size']) if data['size'] != '-' else 0,
                'referer': data.get('referer'),
                'user_agent': data.get('user_agent')
            }
        )

class PythonLogParser(LogParser):
    """Parse Python logging format."""
    
    def __init__(self):
        self.pattern = re.compile(
            r'^(?P\d{4}-\d{2}-\d{2}\s+\d{2}:\d{2}:\d{2},\d{3})\s+'
            r'(?P\w+)\s+'
            r'(?P[\w\.]+)\s+'
            r'(?P.*?)$'
        )
        
        self.levels = {
            'DEBUG': LogLevel.DEBUG,
            'INFO': LogLevel.INFO,
            'WARNING': LogLevel.WARNING,
            'ERROR': LogLevel.ERROR,
            'CRITICAL': LogLevel.CRITICAL
        }
    
    def can_parse(self, line: str) -> bool:
        """Check if line matches Python logging format."""
        return bool(self.pattern.match(line))
    
    def parse(self, line: str) -> Optional[LogEntry]:
        """Parse Python log line."""
        match = self.pattern.match(line)
        if not match:
            return None
        
        data = match.groupdict()
        
        # Parse timestamp
        timestamp = datetime.strptime(
            data['timestamp'], 
            "%Y-%m-%d %H:%M:%S,%f"
        )
        
        # Get log level
        level = self.levels.get(data['level'], LogLevel.INFO)
        
        return LogEntry(
            timestamp=timestamp,
            level=level,
            message=data['message'],
            source=data['module'],
            raw_line=line,
            fields={'module': data['module']}
        )

class LogAnalyzer:
    """
    Comprehensive log analysis system for multiple log formats and sources.
    """
    
    def __init__(self, config: Dict = None):
        self.config = config or self.get_default_config()
        self.parsers = []
        self.entries = []
        self.setup_logging()
        self.setup_database()
        self.setup_parsers()
        
        # Analysis caches
        self.error_patterns = defaultdict(int)
        self.ip_statistics = defaultdict(lambda: {'count': 0, 'errors': 0})
        self.timeline = defaultdict(list)
        
    def get_default_config(self) -> Dict:
        """Get default configuration."""
        return {
            'db_path': 'logs.db',
            'batch_size': 1000,
            'max_memory_entries': 100000,
            'enable_realtime': False,
            'alert_on_errors': True
        }
    
    def setup_logging(self):
        """Setup logging."""
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
        )
        self.logger = logging.getLogger(__name__)
    
    def setup_database(self):
        """Setup SQLite database for log storage."""
        conn = sqlite3.connect(self.config['db_path'])
        cursor = conn.cursor()
        
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS logs (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                timestamp DATETIME,
                level TEXT,
                source TEXT,
                message TEXT,
                fields TEXT,
                raw_line TEXT,
                file_hash TEXT,
                INDEX idx_timestamp (timestamp),
                INDEX idx_level (level),
                INDEX idx_source (source)
            )
        ''')
        
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS processed_files (
                file_path TEXT PRIMARY KEY,
                file_hash TEXT,
                processed_at DATETIME,
                line_count INTEGER
            )
        ''')
        
        conn.commit()
        conn.close()
    
    def setup_parsers(self):
        """Setup log parsers."""
        self.parsers = [
            SyslogParser(),
            ApacheLogParser(),
            PythonLogParser()
        ]
    
    def add_parser(self, parser: LogParser):
        """Add custom log parser."""
        self.parsers.append(parser)
        self.logger.info(f"Added parser: {parser.__class__.__name__}")
    
    def parse_line(self, line: str, source: str = 'unknown') -> Optional[LogEntry]:
        """Parse a single log line."""
        line = line.strip()
        if not line:
            return None
        
        # Try each parser
        for parser in self.parsers:
            if parser.can_parse(line):
                entry = parser.parse(line)
                if entry:
                    if entry.source == 'unknown':
                        entry.source = source
                    return entry
        
        # Fallback - create generic entry
        return LogEntry(
            timestamp=datetime.now(),
            level=LogLevel.INFO,
            message=line,
            source=source,
            raw_line=line,
            fields={}
        )
    
    def process_file(self, file_path: str, source: str = None) -> int:
        """Process a single log file."""
        file_path = Path(file_path)
        
        if not file_path.exists():
            self.logger.error(f"File not found: {file_path}")
            return 0
        
        # Check if file was already processed
        file_hash = self._get_file_hash(file_path)
        if self._is_file_processed(str(file_path), file_hash):
            self.logger.info(f"File already processed: {file_path}")
            return 0
        
        source = source or file_path.stem
        entries_count = 0
        batch = []
        
        # Handle compressed files
        if file_path.suffix == '.gz':
            open_func = gzip.open
            mode = 'rt'
        else:
            open_func = open
            mode = 'r'
        
        try:
            with open_func(file_path, mode) as f:
                for line_num, line in enumerate(f, 1):
                    try:
                        entry = self.parse_line(line, source)
                        if entry:
                            batch.append(entry)
                            entries_count += 1
                            
                            # Store batch
                            if len(batch) >= self.config['batch_size']:
                                self._store_entries(batch)
                                batch = []
                                
                    except Exception as e:
                        self.logger.error(f"Error parsing line {line_num}: {e}")
                
                # Store remaining entries
                if batch:
                    self._store_entries(batch)
                
                # Mark file as processed
                self._mark_file_processed(str(file_path), file_hash, entries_count)
                
        except Exception as e:
            self.logger.error(f"Error processing file {file_path}: {e}")
            return 0
        
        self.logger.info(f"Processed {entries_count} entries from {file_path}")
        return entries_count
    
    def process_directory(self, directory: str, pattern: str = "*.log*") -> int:
        """Process all log files in a directory."""
        directory = Path(directory)
        total_entries = 0
        
        for file_path in directory.glob(pattern):
            if file_path.is_file():
                total_entries += self.process_file(file_path)
        
        return total_entries
    
    def tail_file(self, file_path: str, callback=None):
        """Tail a log file in real-time."""
        file_path = Path(file_path)
        
        with open(file_path, 'r') as f:
            # Go to end of file
            f.seek(0, 2)
            
            while True:
                line = f.readline()
                
                if not line:
                    time.sleep(0.1)
                    continue
                
                entry = self.parse_line(line, file_path.stem)
                if entry:
                    # Process entry
                    self.entries.append(entry)
                    
                    # Analyze in real-time
                    self._update_statistics(entry)
                    
                    # Call callback if provided
                    if callback:
                        callback(entry)
                    
                    # Check for alerts
                    if self.config['alert_on_errors'] and entry.level >= LogLevel.ERROR:
                        self._trigger_alert(entry)
    
    def _store_entries(self, entries: List[LogEntry]):
        """Store entries in database."""
        conn = sqlite3.connect(self.config['db_path'])
        cursor = conn.cursor()
        
        data = [
            (
                entry.timestamp,
                entry.level.name,
                entry.source,
                entry.message,
                json.dumps(entry.fields),
                entry.raw_line,
                None
            )
            for entry in entries
        ]
        
        cursor.executemany(
            '''INSERT INTO logs 
               (timestamp, level, source, message, fields, raw_line, file_hash)
               VALUES (?, ?, ?, ?, ?, ?, ?)''',
            data
        )
        
        conn.commit()
        conn.close()
        
        # Update in-memory cache
        self.entries.extend(entries)
        
        # Limit memory usage
        max_entries = self.config['max_memory_entries']
        if len(self.entries) > max_entries:
            self.entries = self.entries[-max_entries:]
    
    def _get_file_hash(self, file_path: Path) -> str:
        """Calculate file hash."""
        hash_md5 = hashlib.md5()
        
        with open(file_path, "rb") as f:
            for chunk in iter(lambda: f.read(4096), b""):
                hash_md5.update(chunk)
        
        return hash_md5.hexdigest()
    
    def _is_file_processed(self, file_path: str, file_hash: str) -> bool:
        """Check if file was already processed."""
        conn = sqlite3.connect(self.config['db_path'])
        cursor = conn.cursor()
        
        cursor.execute(
            "SELECT file_hash FROM processed_files WHERE file_path = ?",
            (file_path,)
        )
        
        result = cursor.fetchone()
        conn.close()
        
        return result and result[0] == file_hash
    
    def _mark_file_processed(self, file_path: str, file_hash: str, line_count: int):
        """Mark file as processed."""
        conn = sqlite3.connect(self.config['db_path'])
        cursor = conn.cursor()
        
        cursor.execute(
            '''INSERT OR REPLACE INTO processed_files 
               (file_path, file_hash, processed_at, line_count)
               VALUES (?, ?, ?, ?)''',
            (file_path, file_hash, datetime.now(), line_count)
        )
        
        conn.commit()
        conn.close()
    
    def _update_statistics(self, entry: LogEntry):
        """Update real-time statistics."""
        # Update error patterns
        if entry.level >= LogLevel.ERROR:
            # Extract error pattern
            pattern = self._extract_error_pattern(entry.message)
            self.error_patterns[pattern] += 1
        
        # Update IP statistics (for web logs)
        if 'ip' in entry.fields:
            ip = entry.fields['ip']
            self.ip_statistics[ip]['count'] += 1
            
            if entry.level >= LogLevel.ERROR:
                self.ip_statistics[ip]['errors'] += 1
        
        # Update timeline
        hour_key = entry.timestamp.strftime("%Y-%m-%d %H:00")
        self.timeline[hour_key].append(entry)
    
    def _extract_error_pattern(self, message: str) -> str:
        """Extract error pattern from message."""
        # Remove numbers, IDs, etc. to create generic pattern
        pattern = re.sub(r'\b\d+\b', 'NUM', message)
        pattern = re.sub(r'\b[a-f0-9]{8,}\b', 'ID', pattern)
        pattern = re.sub(r'\b\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}\b', 'IP', pattern)
        
        return pattern[:100]  # Limit length
    
    def _trigger_alert(self, entry: LogEntry):
        """Trigger alert for critical log entry."""
        alert_message = f"ALERT: {entry.level.name} - {entry.source} - {entry.message}"
        self.logger.warning(alert_message)
        
        # Implement additional alerting (email, webhook, etc.)
    
    def analyze_errors(self, hours: int = 24) -> Dict:
        """Analyze errors in recent logs."""
        cutoff = datetime.now() - timedelta(hours=hours)
        
        conn = sqlite3.connect(self.config['db_path'])
        cursor = conn.cursor()
        
        # Get error counts by source
        cursor.execute('''
            SELECT source, level, COUNT(*) as count
            FROM logs
            WHERE timestamp > ? AND level IN ('ERROR', 'CRITICAL')
            GROUP BY source, level
            ORDER BY count DESC
        ''', (cutoff,))
        
        error_by_source = cursor.fetchall()
        
        # Get top error messages
        cursor.execute('''
            SELECT message, COUNT(*) as count
            FROM logs
            WHERE timestamp > ? AND level IN ('ERROR', 'CRITICAL')
            GROUP BY message
            ORDER BY count DESC
            LIMIT 10
        ''', (cutoff,))
        
        top_errors = cursor.fetchall()
        
        conn.close()
        
        return {
            'error_by_source': error_by_source,
            'top_errors': top_errors,
            'error_patterns': dict(self.error_patterns.most_common(10))
        }
    
    def analyze_performance(self, hours: int = 24) -> Dict:
        """Analyze performance metrics from logs."""
        cutoff = datetime.now() - timedelta(hours=hours)
        
        conn = sqlite3.connect(self.config['db_path'])
        cursor = conn.cursor()
        
        # For web logs - response time analysis
        cursor.execute('''
            SELECT 
                AVG(CAST(json_extract(fields, '$.response_time') AS REAL)) as avg_response_time,
                MAX(CAST(json_extract(fields, '$.response_time') AS REAL)) as max_response_time,
                MIN(CAST(json_extract(fields, '$.response_time') AS REAL)) as min_response_time
            FROM logs
            WHERE timestamp > ? 
                AND json_extract(fields, '$.response_time') IS NOT NULL
        ''', (cutoff,))
        
        response_times = cursor.fetchone()
        
        # Status code distribution
        cursor.execute('''
            SELECT 
                json_extract(fields, '$.status') as status,
                COUNT(*) as count
            FROM logs
            WHERE timestamp > ? 
                AND json_extract(fields, '$.status') IS NOT NULL
            GROUP BY status
        ''', (cutoff,))
        
        status_distribution = cursor.fetchall()
        
        conn.close()
        
        return {
            'response_times': response_times,
            'status_distribution': status_distribution
        }
    
    def find_anomalies(self) -> List[Dict]:
        """Find anomalies in log patterns."""
        anomalies = []
        
        # Check for unusual error spikes
        current_hour = datetime.now().replace(minute=0, second=0, microsecond=0)
        
        for i in range(24):
            hour = current_hour - timedelta(hours=i)
            hour_key = hour.strftime("%Y-%m-%d %H:00")
            
            if hour_key in self.timeline:
                entries = self.timeline[hour_key]
                error_count = sum(1 for e in entries if e.level >= LogLevel.ERROR)
                
                # Check if error rate is unusually high
                if error_count > 100:  # Threshold
                    anomalies.append({
                        'type': 'error_spike',
                        'timestamp': hour,
                        'count': error_count,
                        'severity': 'high'
                    })
        
        # Check for suspicious IPs
        for ip, stats in self.ip_statistics.items():
            error_rate = stats['errors'] / stats['count'] if stats['count'] > 0 else 0
            
            # High error rate from single IP
            if error_rate > 0.5 and stats['count'] > 100:
                anomalies.append({
                    'type': 'suspicious_ip',
                    'ip': ip,
                    'requests': stats['count'],
                    'errors': stats['errors'],
                    'error_rate': error_rate,
                    'severity': 'medium'
                })
        
        return anomalies
    
    def search_logs(self, query: str, 
                   start_time: datetime = None,
                   end_time: datetime = None,
                   level: LogLevel = None,
                   source: str = None,
                   limit: int = 100) -> List[LogEntry]:
        """Search logs with filters."""
        conn = sqlite3.connect(self.config['db_path'])
        cursor = conn.cursor()
        
        # Build query
        conditions = []
        params = []
        
        if query:
            conditions.append("message LIKE ?")
            params.append(f"%{query}%")
        
        if start_time:
            conditions.append("timestamp >= ?")
            params.append(start_time)
        
        if end_time:
            conditions.append("timestamp <= ?")
            params.append(end_time)
        
        if level:
            conditions.append("level = ?")
            params.append(level.name)
        
        if source:
            conditions.append("source = ?")
            params.append(source)
        
        where_clause = " AND ".join(conditions) if conditions else "1=1"
        
        cursor.execute(f'''
            SELECT timestamp, level, source, message, fields, raw_line
            FROM logs
            WHERE {where_clause}
            ORDER BY timestamp DESC
            LIMIT ?
        ''', params + [limit])
        
        results = cursor.fetchall()
        conn.close()
        
        # Convert to LogEntry objects
        entries = []
        for row in results:
            entries.append(LogEntry(
                timestamp=datetime.fromisoformat(row[0]),
                level=LogLevel[row[1]],
                source=row[2],
                message=row[3],
                fields=json.loads(row[4]) if row[4] else {},
                raw_line=row[5]
            ))
        
        return entries
    
    def generate_report(self, hours: int = 24) -> Dict:
        """Generate comprehensive log analysis report."""
        cutoff = datetime.now() - timedelta(hours=hours)
        
        conn = sqlite3.connect(self.config['db_path'])
        cursor = conn.cursor()
        
        # Total log entries
        cursor.execute(
            "SELECT COUNT(*) FROM logs WHERE timestamp > ?",
            (cutoff,)
        )
        total_entries = cursor.fetchone()[0]
        
        # Log level distribution
        cursor.execute('''
            SELECT level, COUNT(*) as count
            FROM logs
            WHERE timestamp > ?
            GROUP BY level
        ''', (cutoff,))
        
        level_distribution = dict(cursor.fetchall())
        
        # Top sources
        cursor.execute('''
            SELECT source, COUNT(*) as count
            FROM logs
            WHERE timestamp > ?
            GROUP BY source
            ORDER BY count DESC
            LIMIT 10
        ''', (cutoff,))
        
        top_sources = cursor.fetchall()
        
        conn.close()
        
        # Get additional analysis
        error_analysis = self.analyze_errors(hours)
        performance_analysis = self.analyze_performance(hours)
        anomalies = self.find_anomalies()
        
        return {
            'period': {
                'start': cutoff,
                'end': datetime.now(),
                'hours': hours
            },
            'summary': {
                'total_entries': total_entries,
                'level_distribution': level_distribution,
                'top_sources': top_sources
            },
            'errors': error_analysis,
            'performance': performance_analysis,
            'anomalies': anomalies,
            'top_ips': [
                {
                    'ip': ip,
                    'count': stats['count'],
                    'errors': stats['errors']
                }
                for ip, stats in sorted(
                    self.ip_statistics.items(),
                    key=lambda x: x[1]['count'],
                    reverse=True
                )[:10]
            ]
        }

class SecurityLogAnalyzer:
    """Specialized analyzer for security logs."""
    
    def __init__(self, log_analyzer: LogAnalyzer):
        self.analyzer = log_analyzer
        self.threat_patterns = self.load_threat_patterns()
    
    def load_threat_patterns(self) -> Dict[str, Pattern]:
        """Load security threat patterns."""
        return {
            'sql_injection': re.compile(
                r'(\bunion\b.*\bselect\b|\bselect\b.*\bfrom\b|\bdrop\b.*\btable\b)',
                re.IGNORECASE
            ),
            'xss_attempt': re.compile(
                r'( List[Dict]:
        """Detect security threats in logs."""
        threats = []
        
        # Search recent logs
        cutoff = datetime.now() - timedelta(hours=hours)
        entries = self.analyzer.search_logs(
            query=None,
            start_time=cutoff
        )
        
        # Check each entry for threats
        threat_ips = defaultdict(list)
        
        for entry in entries:
            for threat_type, pattern in self.threat_patterns.items():
                if pattern.search(entry.message) or pattern.search(entry.raw_line):
                    threat = {
                        'type': threat_type,
                        'timestamp': entry.timestamp,
                        'source': entry.source,
                        'message': entry.message[:200],
                        'severity': 'high' if threat_type in ['sql_injection', 'command_injection'] else 'medium'
                    }
                    
                    # Extract IP if available
                    if 'ip' in entry.fields:
                        threat['ip'] = entry.fields['ip']
                        threat_ips[entry.fields['ip']].append(threat_type)
                    
                    threats.append(threat)
        
        # Identify high-risk IPs
        for ip, threat_types in threat_ips.items():
            if len(threat_types) > 5:
                threats.append({
                    'type': 'multi_threat_source',
                    'ip': ip,
                    'threat_count': len(threat_types),
                    'threat_types': list(set(threat_types)),
                    'severity': 'critical'
                })
        
        return threats
    
    def detect_brute_force(self, threshold: int = 10, 
                          window_minutes: int = 5) -> List[Dict]:
        """Detect brute force attempts."""
        attacks = []
        
        # Group failed login attempts by IP and time window
        failed_attempts = defaultdict(list)
        
        entries = self.analyzer.search_logs(
            query="failed",
            start_time=datetime.now() - timedelta(hours=1)
        )
        
        for entry in entries:
            if 'ip' in entry.fields:
                failed_attempts[entry.fields['ip']].append(entry.timestamp)
        
        # Check for rapid failed attempts
        for ip, timestamps in failed_attempts.items():
            timestamps.sort()
            
            # Sliding window check
            for i in range(len(timestamps)):
                window_end = timestamps[i] + timedelta(minutes=window_minutes)
                count = sum(1 for t in timestamps[i:] if t <= window_end)
                
                if count >= threshold:
                    attacks.append({
                        'type': 'brute_force',
                        'ip': ip,
                        'attempts': count,
                        'window': window_minutes,
                        'first_attempt': timestamps[i],
                        'severity': 'high'
                    })
                    break
        
        return attacks

class LogMonitor:
    """Real-time log monitoring with alerting."""
    
    def __init__(self, analyzer: LogAnalyzer):
        self.analyzer = analyzer
        self.monitoring = False
        self.threads = []
        self.alert_rules = []
        self.alert_queue = queue.Queue()
    
    def add_alert_rule(self, name: str, condition: callable, 
                       message: str, severity: str = 'medium'):
        """Add alert rule."""
        self.alert_rules.append({
            'name': name,
            'condition': condition,
            'message': message,
            'severity': severity
        })
    
    def start_monitoring(self, log_files: List[str]):
        """Start monitoring log files."""
        self.monitoring = True
        
        # Start monitoring threads for each file
        for file_path in log_files:
            thread = threading.Thread(
                target=self._monitor_file,
                args=(file_path,)
            )
            thread.daemon = True
            thread.start()
            self.threads.append(thread)
        
        # Start alert processing thread
        alert_thread = threading.Thread(target=self._process_alerts)
        alert_thread.daemon = True
        alert_thread.start()
        self.threads.append(alert_thread)
        
        logging.info(f"Started monitoring {len(log_files)} files")
    
    def _monitor_file(self, file_path: str):
        """Monitor a single file."""
        def process_entry(entry: LogEntry):
            # Check alert rules
            for rule in self.alert_rules:
                if rule['condition'](entry):
                    alert = {
                        'rule': rule['name'],
                        'message': rule['message'].format(entry=entry),
                        'severity': rule['severity'],
                        'timestamp': datetime.now(),
                        'log_entry': entry
                    }
                    self.alert_queue.put(alert)
        
        # Tail file
        self.analyzer.tail_file(file_path, callback=process_entry)
    
    def _process_alerts(self):
        """Process alerts from queue."""
        while self.monitoring:
            try:
                alert = self.alert_queue.get(timeout=1)
                self._send_alert(alert)
            except queue.Empty:
                continue
    
    def _send_alert(self, alert: Dict):
        """Send alert through configured channels."""
        logging.warning(f"ALERT: {alert['severity']} - {alert['message']}")
        
        # Implement additional alerting (email, Slack, etc.)

# Example usage
if __name__ == "__main__":
    # Initialize log analyzer
    analyzer = LogAnalyzer()
    
    # Add custom parser for application-specific logs
    class CustomAppParser(LogParser):
        def __init__(self):
            self.pattern = re.compile(r'^\[(?P\w+)\] (?P.*)')
        
        def can_parse(self, line: str) -> bool:
            return line.startswith('[')
        
        def parse(self, line: str) -> Optional[LogEntry]:
            match = self.pattern.match(line)
            if match:
                return LogEntry(
                    timestamp=datetime.now(),
                    level=LogLevel.INFO,
                    message=match.group('message'),
                    source='custom_app',
                    raw_line=line,
                    fields={}
                )
            return None
    
    analyzer.add_parser(CustomAppParser())
    
    # Process log files
    print("šŸ“ Processing log files...")
    analyzer.process_file('/var/log/syslog', 'syslog')
    analyzer.process_file('/var/log/apache2/access.log', 'apache')
    analyzer.process_directory('/var/log/myapp/', '*.log')
    
    # Generate report
    print("\nšŸ“Š Generating analysis report...")
    report = analyzer.generate_report(hours=24)
    
    print(f"\nšŸ“ˆ Log Analysis Report (Last 24 Hours)")
    print(f"   Total Entries: {report['summary']['total_entries']:,}")
    print(f"   Log Levels:")
    for level, count in report['summary']['level_distribution'].items():
        print(f"     {level}: {count:,}")
    
    print(f"\nšŸ” Error Analysis:")
    print(f"   Top Error Patterns:")
    for pattern, count in report['errors']['error_patterns'].items():
        print(f"     {pattern[:50]}...: {count} occurrences")
    
    print(f"\nāš ļø Anomalies Detected:")
    for anomaly in report['anomalies']:
        print(f"   {anomaly['type']}: {anomaly}")
    
    # Security analysis
    print("\nšŸ”’ Security Analysis:")
    security_analyzer = SecurityLogAnalyzer(analyzer)
    
    threats = security_analyzer.detect_threats()
    if threats:
        print(f"   Threats Detected: {len(threats)}")
        for threat in threats[:5]:
            print(f"     {threat['type']}: {threat.get('ip', 'N/A')} - {threat['severity']}")
    
    brute_force = security_analyzer.detect_brute_force()
    if brute_force:
        print(f"   Brute Force Attempts: {len(brute_force)}")
        for attack in brute_force:
            print(f"     {attack['ip']}: {attack['attempts']} attempts in {attack['window']} minutes")
    
    # Search specific logs
    print("\nšŸ”Ž Searching for errors...")
    error_entries = analyzer.search_logs(
        query="error",
        level=LogLevel.ERROR,
        limit=5
    )
    
    for entry in error_entries:
        print(f"   {entry.timestamp} - {entry.source}: {entry.message[:100]}")
    
    # Start real-time monitoring
    print("\nšŸ‘ļø Starting real-time monitoring...")
    monitor = LogMonitor(analyzer)
    
    # Add alert rules
    monitor.add_alert_rule(
        name="critical_error",
        condition=lambda e: e.level >= LogLevel.CRITICAL,
        message="Critical error detected: {entry.message}",
        severity="critical"
    )
    
    monitor.add_alert_rule(
        name="high_error_rate",
        condition=lambda e: e.level >= LogLevel.ERROR,
        message="Error detected in {entry.source}: {entry.message}",
        severity="high"
    )
    
    # Start monitoring
    # monitor.start_monitoring(['/var/log/syslog', '/var/log/apache2/error.log'])
    
    print("\nāœ… Log analysis complete!")

Key Takeaways and Best Practices šŸŽÆ

Log Analysis Best Practices šŸ“‹

Pro Tip: Logs are your system's diary - they tell you everything that happened, but only if you know how to read them. Start by understanding your log formats, build robust parsers, and create a centralized log storage system. Use pattern matching to identify issues, but don't forget about context. A single error might be noise, but 1000 errors in 5 minutes is a crisis. Always correlate logs from different sources - the web server log might show a 500 error, but the application log tells you why. And remember: log analysis isn't just about finding problems, it's about understanding your system's behavior and improving it!

Log analysis mastery transforms you from a reactive debugger to a proactive system detective. You can identify issues before users complain, detect security threats before damage occurs, and understand system behavior at a deep level. Whether you're debugging applications, hunting security threats, or optimizing performance, these log analysis skills are invaluable! šŸš€