Skip to main content

🌐 JSON/XML Parsing: Navigate Complex Data Structures

JSON and XML are the languages of the internet. APIs speak JSON, configuration files use XML, and every modern application exchanges data in these formats. They're like the DNA of digital communication - structured, hierarchical, and sometimes surprisingly complex. Let's master the art of parsing, transforming, and generating these ubiquitous data formats! šŸš€

The Architecture of Structured Data

Think of JSON as a modern skyscraper - clean lines, efficient structure, easy to navigate. XML is like a Gothic cathedral - ornate, detailed, with namespaces like flying buttresses supporting complex structures. Both have their place, and Python gives you the tools to be an architect of both!

graph TB A[Data Sources] --> B{Format Detection} B --> C[JSON] B --> D[XML] B --> E[YAML] C --> F[Parse & Validate] D --> F E --> F F --> G[Schema Validation] F --> H[Data Extraction] F --> I[Transformation] G --> J[Clean Data Structure] H --> J I --> J J --> K[Processing Engine] K --> L[Query & Filter] K --> M[Merge & Transform] K --> N[Convert Format] L --> O[Output] M --> O N --> O O --> P[JSON] O --> Q[XML] O --> R[Database] O --> S[API Response] style A fill:#ff6b6b style J fill:#51cf66 style O fill:#339af0

Real-World Scenario: The API Integration Hub šŸ”Œ

You're building an integration platform that connects 20 different services. Each API returns different JSON structures, legacy systems send XML, configuration comes in YAML, and you need to transform everything into a unified format for your data warehouse. Let's build a universal parser!

import json
import xml.etree.ElementTree as ET
import xml.dom.minidom as minidom
from lxml import etree
import xmltodict
import yaml
import jsonschema
from jsonschema import validate, ValidationError
import xmlschema
from typing import Dict, List, Any, Optional, Union
from pathlib import Path
import re
from datetime import datetime
from collections import defaultdict, OrderedDict
import logging
from json import JSONEncoder
import requests
from bs4 import BeautifulSoup

class UniversalDataParser:
    """
    Comprehensive parser for JSON, XML, YAML and other structured data formats.
    """
    
    def __init__(self, config: Dict = None):
        self.config = config or {}
        self.setup_logging()
        
        # JSON configuration
        self.json_config = {
            'strict': False,
            'ensure_ascii': False,
            'indent': 2,
            'sort_keys': False
        }
        
        # XML configuration
        self.xml_config = {
            'parser': 'lxml',  # 'lxml' or 'etree'
            'remove_namespaces': False,
            'preserve_attributes': True,
            'encoding': 'utf-8'
        }
        
        # Schema cache
        self.schema_cache = {}
        
    def setup_logging(self):
        """Setup logging for parser operations."""
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
        )
        self.logger = logging.getLogger(__name__)
    
    def detect_format(self, data: Union[str, bytes], file_path: str = None) -> str:
        """
        Intelligently detect data format.
        """
        # Check file extension if provided
        if file_path:
            ext = Path(file_path).suffix.lower()
            if ext in ['.json', '.jsonl']:
                return 'json'
            elif ext in ['.xml', '.xsd']:
                return 'xml'
            elif ext in ['.yaml', '.yml']:
                return 'yaml'
            elif ext == '.html':
                return 'html'
        
        # Content-based detection
        if isinstance(data, bytes):
            data = data.decode('utf-8', errors='ignore')
        
        data_stripped = data.strip()
        
        # JSON detection
        if (data_stripped.startswith('{') and data_stripped.endswith('}')) or \
           (data_stripped.startswith('[') and data_stripped.endswith(']')):
            try:
                json.loads(data_stripped)
                return 'json'
            except:
                pass
        
        # XML detection
        if data_stripped.startswith('<') and ('<?xml' in data_stripped or '<!' in data_stripped):
            try:
                ET.fromstring(data_stripped)
                return 'xml'
            except:
                pass
        
        # YAML detection
        if ':' in data_stripped and not data_stripped.startswith('<'):
            try:
                yaml.safe_load(data_stripped)
                return 'yaml'
            except:
                pass
        
        # HTML detection
        if '<html' in data_stripped.lower() or '<!doctype html' in data_stripped.lower():
            return 'html'
        
        return 'unknown'
    
    def parse_json(self, data: Union[str, bytes], schema: Dict = None, 
                   strict: bool = False) -> Dict:
        """
        Parse JSON with validation and error handling.
        """
        try:
            # Parse JSON
            if isinstance(data, bytes):
                data = data.decode('utf-8')
            
            if strict:
                # Strict parsing - no comments, trailing commas, etc.
                parsed = json.loads(data, strict=True)
            else:
                # Lenient parsing - try to fix common issues
                # Remove comments
                data = re.sub(r'//.*?\n|/\*.*?\*/', '', data, flags=re.DOTALL)
                # Remove trailing commas
                data = re.sub(r',\s*}', '}', data)
                data = re.sub(r',\s*\]', ']', data)
                
                parsed = json.loads(data)
            
            # Validate against schema if provided
            if schema:
                try:
                    validate(instance=parsed, schema=schema)
                    self.logger.info("JSON validated against schema successfully")
                except ValidationError as e:
                    self.logger.error(f"Schema validation failed: {e.message}")
                    raise
            
            return parsed
            
        except json.JSONDecodeError as e:
            self.logger.error(f"JSON parsing error: {e}")
            # Try to provide helpful error information
            lines = data.split('\n')
            error_line = lines[e.lineno - 1] if e.lineno <= len(lines) else ''
            self.logger.error(f"Error at line {e.lineno}: {error_line[:100]}")
            raise
    
    def parse_xml(self, data: Union[str, bytes], schema_path: str = None,
                  remove_namespaces: bool = False) -> Dict:
        """
        Parse XML with multiple strategies and options.
        """
        try:
            if isinstance(data, str):
                data = data.encode('utf-8')
            
            # Parse with lxml for better performance and features
            if self.xml_config['parser'] == 'lxml':
                parser = etree.XMLParser(remove_blank_text=True, recover=True)
                root = etree.fromstring(data, parser)
                
                # Validate against schema if provided
                if schema_path:
                    self.validate_xml_schema(root, schema_path)
                
                # Remove namespaces if requested
                if remove_namespaces:
                    root = self.remove_xml_namespaces(root)
                
                # Convert to dictionary
                result = self.xml_to_dict_lxml(root)
            else:
                # Use ElementTree
                root = ET.fromstring(data)
                result = self.xml_to_dict_etree(root)
            
            return result
            
        except Exception as e:
            self.logger.error(f"XML parsing error: {e}")
            raise
    
    def xml_to_dict_lxml(self, element) -> Dict:
        """
        Convert lxml element to dictionary.
        """
        result = {}
        
        # Add attributes
        if element.attrib and self.xml_config['preserve_attributes']:
            result['@attributes'] = dict(element.attrib)
        
        # Add text content
        if element.text and element.text.strip():
            if len(element) == 0:  # No children
                if element.attrib:
                    result['#text'] = element.text.strip()
                else:
                    return element.text.strip()
            else:
                result['#text'] = element.text.strip()
        
        # Process children
        children = defaultdict(list)
        for child in element:
            child_data = self.xml_to_dict_lxml(child)
            children[child.tag].append(child_data)
        
        # Add children to result
        for tag, values in children.items():
            if len(values) == 1:
                result[tag] = values[0]
            else:
                result[tag] = values
        
        return result if result else None
    
    def xml_to_dict_etree(self, element) -> Dict:
        """
        Convert ElementTree element to dictionary.
        """
        def parse_element(elem):
            result = {}
            
            # Attributes
            if elem.attrib:
                result['@attributes'] = elem.attrib
            
            # Text content
            if elem.text and elem.text.strip():
                result['#text'] = elem.text.strip()
            
            # Children
            for child in elem:
                child_result = parse_element(child)
                
                if child.tag in result:
                    # Multiple children with same tag
                    if not isinstance(result[child.tag], list):
                        result[child.tag] = [result[child.tag]]
                    result[child.tag].append(child_result)
                else:
                    result[child.tag] = child_result
            
            # Simplify if only text content
            if len(result) == 1 and '#text' in result:
                return result['#text']
            
            return result if result else None
        
        return {element.tag: parse_element(element)}
    
    def remove_xml_namespaces(self, root):
        """
        Remove namespaces from XML element tree.
        """
        for elem in root.iter():
            # Remove namespace from tag
            elem.tag = elem.tag.split('}')[-1]
            
            # Remove namespace from attributes
            new_attrib = {}
            for key, value in elem.attrib.items():
                new_key = key.split('}')[-1]
                new_attrib[new_key] = value
            elem.attrib = new_attrib
        
        return root
    
    def validate_xml_schema(self, root, schema_path: str):
        """
        Validate XML against XSD schema.
        """
        try:
            with open(schema_path, 'r') as schema_file:
                schema_doc = etree.parse(schema_file)
                schema = etree.XMLSchema(schema_doc)
                
            if not schema.validate(root):
                errors = schema.error_log
                for error in errors:
                    self.logger.error(f"XML validation error: {error}")
                raise ValueError("XML schema validation failed")
            
            self.logger.info("XML validated against schema successfully")
            
        except Exception as e:
            self.logger.error(f"Schema validation error: {e}")
            raise
    
    def parse_yaml(self, data: Union[str, bytes]) -> Dict:
        """
        Parse YAML data safely.
        """
        try:
            if isinstance(data, bytes):
                data = data.decode('utf-8')
            
            # Use safe_load to prevent code execution
            parsed = yaml.safe_load(data)
            
            self.logger.info("YAML parsed successfully")
            return parsed
            
        except yaml.YAMLError as e:
            self.logger.error(f"YAML parsing error: {e}")
            raise
    
    def convert_format(self, data: Any, from_format: str, to_format: str,
                      pretty: bool = True) -> str:
        """
        Convert between different data formats.
        """
        # Parse input if it's a string
        if isinstance(data, str):
            if from_format == 'json':
                data = self.parse_json(data)
            elif from_format == 'xml':
                data = self.parse_xml(data)
            elif from_format == 'yaml':
                data = self.parse_yaml(data)
        
        # Convert to target format
        if to_format == 'json':
            return self.to_json(data, pretty)
        elif to_format == 'xml':
            return self.to_xml(data, pretty)
        elif to_format == 'yaml':
            return self.to_yaml(data)
        else:
            raise ValueError(f"Unsupported output format: {to_format}")
    
    def to_json(self, data: Any, pretty: bool = True) -> str:
        """
        Convert data to JSON string.
        """
        if pretty:
            return json.dumps(data, indent=2, ensure_ascii=False, default=str)
        else:
            return json.dumps(data, ensure_ascii=False, default=str)
    
    def to_xml(self, data: Dict, pretty: bool = True, root_name: str = 'root') -> str:
        """
        Convert dictionary to XML string.
        """
        def dict_to_element(tag, d):
            elem = ET.Element(tag)
            
            if isinstance(d, dict):
                for key, val in d.items():
                    if key == '@attributes':
                        elem.attrib.update(val)
                    elif key == '#text':
                        elem.text = str(val)
                    elif isinstance(val, list):
                        for item in val:
                            elem.append(dict_to_element(key, item))
                    else:
                        elem.append(dict_to_element(key, val))
            else:
                elem.text = str(d)
            
            return elem
        
        root = dict_to_element(root_name, data)
        
        if pretty:
            return self.prettify_xml(ET.tostring(root, encoding='unicode'))
        else:
            return ET.tostring(root, encoding='unicode')
    
    def prettify_xml(self, xml_string: str) -> str:
        """
        Prettify XML string.
        """
        parsed = minidom.parseString(xml_string)
        return parsed.toprettyxml(indent="  ")
    
    def to_yaml(self, data: Any) -> str:
        """
        Convert data to YAML string.
        """
        return yaml.dump(data, default_flow_style=False, allow_unicode=True)
    
    def query_json(self, data: Dict, path: str) -> Any:
        """
        Query JSON data using JSONPath-like syntax.
        
        Examples:
        - "$.store.book[0].title"
        - "$.store.book[*].author"
        - "$.store.book[?(@.price < 10)]"
        """
        from jsonpath_ng import parse
        
        try:
            jsonpath_expr = parse(path)
            matches = jsonpath_expr.find(data)
            
            if not matches:
                return None
            elif len(matches) == 1:
                return matches[0].value
            else:
                return [match.value for match in matches]
                
        except Exception as e:
            self.logger.error(f"JSONPath query error: {e}")
            return None
    
    def query_xml(self, xml_data: Union[str, bytes, ET.Element], 
                  xpath: str) -> List:
        """
        Query XML data using XPath.
        """
        try:
            if isinstance(xml_data, (str, bytes)):
                if isinstance(xml_data, str):
                    xml_data = xml_data.encode('utf-8')
                root = etree.fromstring(xml_data)
            else:
                root = xml_data
            
            results = root.xpath(xpath)
            
            # Convert elements to strings or values
            processed_results = []
            for result in results:
                if isinstance(result, etree._Element):
                    processed_results.append(etree.tostring(result, encoding='unicode'))
                else:
                    processed_results.append(result)
            
            return processed_results
            
        except Exception as e:
            self.logger.error(f"XPath query error: {e}")
            return []
    
    def merge_json(self, *json_objects, strategy: str = 'deep') -> Dict:
        """
        Merge multiple JSON objects.
        
        Strategies:
        - 'deep': Deep merge, combining nested structures
        - 'shallow': Shallow merge, last one wins
        - 'append': Append arrays instead of replacing
        """
        def deep_merge(dict1, dict2):
            result = dict1.copy()
            
            for key, value in dict2.items():
                if key in result:
                    if isinstance(result[key], dict) and isinstance(value, dict):
                        result[key] = deep_merge(result[key], value)
                    elif isinstance(result[key], list) and isinstance(value, list):
                        if strategy == 'append':
                            result[key].extend(value)
                        else:
                            result[key] = value
                    else:
                        result[key] = value
                else:
                    result[key] = value
            
            return result
        
        if strategy == 'shallow':
            result = {}
            for obj in json_objects:
                result.update(obj)
            return result
        else:
            result = {}
            for obj in json_objects:
                result = deep_merge(result, obj)
            return result
    
    def flatten_json(self, data: Dict, separator: str = '.', 
                     prefix: str = '') -> Dict:
        """
        Flatten nested JSON structure.
        """
        items = []
        
        for key, value in data.items():
            new_key = f"{prefix}{separator}{key}" if prefix else key
            
            if isinstance(value, dict):
                items.extend(
                    self.flatten_json(value, separator, new_key).items()
                )
            elif isinstance(value, list):
                for i, item in enumerate(value):
                    if isinstance(item, dict):
                        items.extend(
                            self.flatten_json(item, separator, f"{new_key}[{i}]").items()
                        )
                    else:
                        items.append((f"{new_key}[{i}]", item))
            else:
                items.append((new_key, value))
        
        return dict(items)
    
    def unflatten_json(self, data: Dict, separator: str = '.') -> Dict:
        """
        Unflatten a flattened JSON structure.
        """
        result = {}
        
        for key, value in data.items():
            parts = key.split(separator)
            d = result
            
            for part in parts[:-1]:
                # Handle array indices
                if '[' in part and ']' in part:
                    base, index = part.split('[')
                    index = int(index.rstrip(']'))
                    
                    if base not in d:
                        d[base] = []
                    
                    # Extend array if needed
                    while len(d[base]) <= index:
                        d[base].append({})
                    
                    d = d[base][index]
                else:
                    if part not in d:
                        d[part] = {}
                    d = d[part]
            
            # Set the final value
            final_key = parts[-1]
            if '[' in final_key and ']' in final_key:
                base, index = final_key.split('[')
                index = int(index.rstrip(']'))
                
                if base not in d:
                    d[base] = []
                
                while len(d[base]) <= index:
                    d[base].append(None)
                
                d[base][index] = value
            else:
                d[final_key] = value
        
        return result

class APIDataProcessor:
    """
    Process data from various API responses.
    """
    
    def __init__(self, parser: UniversalDataParser = None):
        self.parser = parser or UniversalDataParser()
        self.session = requests.Session()
        
    def fetch_and_parse(self, url: str, headers: Dict = None, 
                       params: Dict = None) -> Any:
        """
        Fetch data from API and parse response.
        """
        try:
            response = self.session.get(url, headers=headers, params=params)
            response.raise_for_status()
            
            # Detect content type
            content_type = response.headers.get('content-type', '')
            
            if 'json' in content_type:
                return response.json()
            elif 'xml' in content_type:
                return self.parser.parse_xml(response.content)
            else:
                # Try to detect format
                format_type = self.parser.detect_format(response.text)
                
                if format_type == 'json':
                    return response.json()
                elif format_type == 'xml':
                    return self.parser.parse_xml(response.content)
                elif format_type == 'html':
                    return self.parse_html(response.text)
                else:
                    return response.text
                    
        except requests.exceptions.RequestException as e:
            self.parser.logger.error(f"API request failed: {e}")
            raise
    
    def parse_html(self, html: str) -> Dict:
        """
        Parse HTML and extract structured data.
        """
        soup = BeautifulSoup(html, 'html.parser')
        
        # Extract common structured data
        result = {
            'title': soup.title.string if soup.title else None,
            'meta': {},
            'links': [],
            'images': [],
            'text': soup.get_text(strip=True)[:1000]  # First 1000 chars
        }
        
        # Extract meta tags
        for meta in soup.find_all('meta'):
            name = meta.get('name') or meta.get('property')
            content = meta.get('content')
            if name and content:
                result['meta'][name] = content
        
        # Extract links
        for link in soup.find_all('a', href=True):
            result['links'].append({
                'href': link['href'],
                'text': link.get_text(strip=True)
            })
        
        # Extract images
        for img in soup.find_all('img', src=True):
            result['images'].append({
                'src': img['src'],
                'alt': img.get('alt', '')
            })
        
        return result
    
    def process_paginated_api(self, base_url: str, 
                             page_param: str = 'page',
                             max_pages: int = None) -> List:
        """
        Process paginated API responses.
        """
        all_results = []
        page = 1
        
        while True:
            if max_pages and page > max_pages:
                break
            
            try:
                # Fetch page
                params = {page_param: page}
                data = self.fetch_and_parse(base_url, params=params)
                
                # Check if we have results
                if not data or (isinstance(data, dict) and not data.get('results')):
                    break
                
                # Add results
                if isinstance(data, dict):
                    results = data.get('results', data.get('data', []))
                    if not results:
                        break
                    all_results.extend(results)
                else:
                    all_results.extend(data)
                
                page += 1
                
            except Exception as e:
                self.parser.logger.error(f"Error processing page {page}: {e}")
                break
        
        return all_results

class DataTransformer:
    """
    Transform and manipulate structured data.
    """
    
    def __init__(self):
        self.transformations = []
    
    def add_transformation(self, func, *args, **kwargs):
        """
        Add a transformation to the pipeline.
        """
        self.transformations.append((func, args, kwargs))
        return self
    
    def apply(self, data: Any) -> Any:
        """
        Apply all transformations in sequence.
        """
        result = data
        
        for func, args, kwargs in self.transformations:
            result = func(result, *args, **kwargs)
        
        return result
    
    @staticmethod
    def filter_keys(data: Dict, keep: List[str] = None, 
                   remove: List[str] = None) -> Dict:
        """
        Filter dictionary keys.
        """
        if keep:
            return {k: v for k, v in data.items() if k in keep}
        elif remove:
            return {k: v for k, v in data.items() if k not in remove}
        return data
    
    @staticmethod
    def rename_keys(data: Dict, mapping: Dict[str, str]) -> Dict:
        """
        Rename dictionary keys.
        """
        result = {}
        for key, value in data.items():
            new_key = mapping.get(key, key)
            result[new_key] = value
        return result
    
    @staticmethod
    def apply_to_nested(data: Any, path: str, func, *args, **kwargs) -> Any:
        """
        Apply function to nested element.
        """
        def get_nested(d, keys):
            for key in keys:
                if isinstance(d, dict):
                    d = d.get(key)
                elif isinstance(d, list) and key.isdigit():
                    d = d[int(key)]
                else:
                    return None
            return d
        
        def set_nested(d, keys, value):
            for key in keys[:-1]:
                if key not in d:
                    d[key] = {}
                d = d[key]
            d[keys[-1]] = value
        
        keys = path.split('.')
        nested_value = get_nested(data, keys)
        
        if nested_value is not None:
            transformed = func(nested_value, *args, **kwargs)
            result = data.copy() if isinstance(data, dict) else data[:]
            set_nested(result, keys, transformed)
            return result
        
        return data

class SchemaGenerator:
    """
    Generate schemas from sample data.
    """
    
    @staticmethod
    def generate_json_schema(data: Any, title: str = "Generated Schema") -> Dict:
        """
        Generate JSON Schema from sample data.
        """
        def infer_type(value):
            if value is None:
                return {"type": "null"}
            elif isinstance(value, bool):
                return {"type": "boolean"}
            elif isinstance(value, int):
                return {"type": "integer"}
            elif isinstance(value, float):
                return {"type": "number"}
            elif isinstance(value, str):
                # Check for specific string formats
                if re.match(r'^\d{4}-\d{2}-\d{2}$', value):
                    return {"type": "string", "format": "date"}
                elif re.match(r'^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}', value):
                    return {"type": "string", "format": "date-time"}
                elif re.match(r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$', value):
                    return {"type": "string", "format": "email"}
                elif re.match(r'^https?://', value):
                    return {"type": "string", "format": "uri"}
                else:
                    return {"type": "string"}
            elif isinstance(value, list):
                if not value:
                    return {"type": "array", "items": {}}
                
                # Infer items type from first element
                items_schema = infer_type(value[0])
                return {"type": "array", "items": items_schema}
            elif isinstance(value, dict):
                properties = {}
                required = []
                
                for key, val in value.items():
                    properties[key] = infer_type(val)
                    if val is not None:  # Consider non-null as required
                        required.append(key)
                
                schema = {"type": "object", "properties": properties}
                if required:
                    schema["required"] = required
                
                return schema
            else:
                return {"type": "string"}  # Default fallback
        
        schema = {
            "$schema": "http://json-schema.org/draft-07/schema#",
            "title": title
        }
        
        base_schema = infer_type(data)
        schema.update(base_schema)
        
        return schema
    
    @staticmethod
    def generate_xsd_schema(xml_data: str, root_element: str = "root") -> str:
        """
        Generate XSD schema from sample XML.
        """
        # Parse XML
        root = ET.fromstring(xml_data)
        
        # Build XSD
        xsd = ET.Element("{http://www.w3.org/2001/XMLSchema}schema")
        xsd.set("elementFormDefault", "qualified")
        
        def process_element(element, parent_xsd):
            # Create element definition
            elem_def = ET.SubElement(parent_xsd, "{http://www.w3.org/2001/XMLSchema}element")
            elem_def.set("name", element.tag)
            
            # Check if it has children
            if len(element) > 0:
                complex_type = ET.SubElement(elem_def, "{http://www.w3.org/2001/XMLSchema}complexType")
                sequence = ET.SubElement(complex_type, "{http://www.w3.org/2001/XMLSchema}sequence")
                
                # Process children
                processed = set()
                for child in element:
                    if child.tag not in processed:
                        process_element(child, sequence)
                        processed.add(child.tag)
            else:
                # Simple type
                elem_def.set("type", "xs:string")
            
            # Add attributes
            if element.attrib:
                if len(element) == 0:
                    complex_type = ET.SubElement(elem_def, "{http://www.w3.org/2001/XMLSchema}complexType")
                    simple_content = ET.SubElement(complex_type, "{http://www.w3.org/2001/XMLSchema}simpleContent")
                    extension = ET.SubElement(simple_content, "{http://www.w3.org/2001/XMLSchema}extension")
                    extension.set("base", "xs:string")
                    
                    for attr_name in element.attrib:
                        attribute = ET.SubElement(extension, "{http://www.w3.org/2001/XMLSchema}attribute")
                        attribute.set("name", attr_name)
                        attribute.set("type", "xs:string")
        
        process_element(root, xsd)
        
        return ET.tostring(xsd, encoding='unicode')

# Example usage
if __name__ == "__main__":
    # Initialize parser
    parser = UniversalDataParser()
    
    # Example 1: Parse and validate JSON
    json_data = '''
    {
        "user": {
            "id": 123,
            "name": "John Doe",
            "email": "john@example.com",
            "roles": ["admin", "user"]
        }
    }
    '''
    
    parsed_json = parser.parse_json(json_data)
    print("Parsed JSON:", json.dumps(parsed_json, indent=2))
    
    # Generate schema from data
    schema_gen = SchemaGenerator()
    schema = schema_gen.generate_json_schema(parsed_json)
    print("\nGenerated JSON Schema:", json.dumps(schema, indent=2))
    
    # Example 2: Parse XML and convert to JSON
    xml_data = '''
    <?xml version="1.0"?>
    <catalog>
        <book id="1">
            <author>Gambardella, Matthew</author>
            <title>XML Developer's Guide</title>
            <price>44.95</price>
        </book>
        <book id="2">
            <author>Ralls, Kim</author>
            <title>Midnight Rain</title>
            <price>5.95</price>
        </book>
    </catalog>
    '''
    
    parsed_xml = parser.parse_xml(xml_data)
    print("\nParsed XML as dict:", json.dumps(parsed_xml, indent=2))
    
    # Convert XML to JSON
    json_from_xml = parser.convert_format(xml_data, 'xml', 'json')
    print("\nXML converted to JSON:", json_from_xml)
    
    # Example 3: Query JSON data
    books_data = {
        "store": {
            "book": [
                {"title": "Book 1", "price": 10.99, "author": "Author A"},
                {"title": "Book 2", "price": 8.99, "author": "Author B"},
                {"title": "Book 3", "price": 12.99, "author": "Author C"}
            ]
        }
    }
    
    # Query using JSONPath
    titles = parser.query_json(books_data, "$.store.book[*].title")
    print("\nBook titles:", titles)
    
    # Example 4: Transform data
    transformer = DataTransformer()
    
    # Build transformation pipeline
    transformer.add_transformation(
        DataTransformer.filter_keys, 
        keep=['title', 'price']
    ).add_transformation(
        DataTransformer.rename_keys,
        mapping={'title': 'name', 'price': 'cost'}
    )
    
    # Apply transformations
    book = books_data['store']['book'][0]
    transformed = transformer.apply(book)
    print("\nTransformed book:", transformed)
    
    # Example 5: Merge multiple JSON objects
    config1 = {
        "database": {
            "host": "localhost",
            "port": 5432
        },
        "cache": {
            "enabled": True
        }
    }
    
    config2 = {
        "database": {
            "username": "admin",
            "password": "secret"
        },
        "api": {
            "timeout": 30
        }
    }
    
    merged = parser.merge_json(config1, config2)
    print("\nMerged configuration:", json.dumps(merged, indent=2))
    
    # Example 6: Flatten and unflatten JSON
    nested_data = {
        "user": {
            "profile": {
                "name": "John",
                "age": 30
            },
            "settings": {
                "notifications": True
            }
        }
    }
    
    flattened = parser.flatten_json(nested_data)
    print("\nFlattened:", json.dumps(flattened, indent=2))
    
    unflattened = parser.unflatten_json(flattened)
    print("\nUnflattened:", json.dumps(unflattened, indent=2))
    
    print("\nāœ… JSON/XML parsing complete!")

Advanced Parsing Techniques šŸ”

Let's explore more sophisticated techniques for handling complex, real-world data scenarios!

sequenceDiagram participant Client participant Parser participant Validator participant Transformer participant Output Client->>Parser: Raw Data Parser->>Parser: Detect Format Parser->>Validator: Parsed Data Validator->>Validator: Schema Check Validator->>Transformer: Valid Data Transformer->>Transformer: Apply Rules Transformer->>Output: Formatted Result Output-->>Client: Processed Data
class AdvancedDataHandler:
    """
    Advanced techniques for complex data scenarios.
    """
    
    @staticmethod
    def handle_large_json_stream(file_path: str, callback):
        """
        Process large JSON files as streams.
        """
        import ijson
        
        with open(file_path, 'rb') as file:
            parser = ijson.items(file, 'item')
            
            for item in parser:
                result = callback(item)
                if result is False:  # Allow early termination
                    break
    
    @staticmethod
    def parse_ndjson(file_path: str) -> List[Dict]:
        """
        Parse newline-delimited JSON (NDJSON/JSONL).
        """
        results = []
        
        with open(file_path, 'r') as file:
            for line in file:
                if line.strip():
                    try:
                        obj = json.loads(line)
                        results.append(obj)
                    except json.JSONDecodeError as e:
                        print(f"Error parsing line: {e}")
        
        return results
    
    @staticmethod
    def parse_xml_with_namespaces(xml_data: str) -> Dict:
        """
        Parse XML with namespace handling.
        """
        namespaces = {
            'ns': 'http://example.com/namespace',
            'xsi': 'http://www.w3.org/2001/XMLSchema-instance'
        }
        
        root = etree.fromstring(xml_data.encode())
        
        # Extract with namespace awareness
        result = {}
        
        for ns_prefix, ns_uri in namespaces.items():
            elements = root.xpath(f'//{{{ns_uri}}}*')
            
            for elem in elements:
                tag = elem.tag.split('}')[-1]
                result[tag] = elem.text
        
        return result
    
    @staticmethod
    def extract_json_from_text(text: str) -> List[Dict]:
        """
        Extract JSON objects from mixed text.
        """
        json_objects = []
        
        # Find JSON-like structures
        pattern = r'(\{[^{}]*\}|\[[^\[\]]*\])'
        potential_jsons = re.findall(pattern, text, re.DOTALL)
        
        for potential in potential_jsons:
            try:
                obj = json.loads(potential)
                json_objects.append(obj)
            except:
                # Try to fix common issues
                fixed = potential.replace("'", '"')
                try:
                    obj = json.loads(fixed)
                    json_objects.append(obj)
                except:
                    pass
        
        return json_objects
    
    @staticmethod
    def handle_circular_references(data: Any) -> str:
        """
        Serialize data with circular references.
        """
        seen = set()
        
        def serialize(obj, path="root"):
            if id(obj) in seen:
                return f""
            
            seen.add(id(obj))
            
            if isinstance(obj, dict):
                result = {}
                for key, value in obj.items():
                    result[key] = serialize(value, f"{path}.{key}")
                return result
            elif isinstance(obj, list):
                return [serialize(item, f"{path}[{i}]") 
                       for i, item in enumerate(obj)]
            else:
                return obj
        
        serialized = serialize(data)
        return json.dumps(serialized, indent=2, default=str)

class DataValidator:
    """
    Advanced data validation techniques.
    """
    
    @staticmethod
    def validate_json_structure(data: Dict, template: Dict) -> List[str]:
        """
        Validate JSON structure against a template.
        """
        errors = []
        
        def check_structure(actual, expected, path=""):
            if type(actual) != type(expected):
                errors.append(f"Type mismatch at {path}: expected {type(expected)}, got {type(actual)}")
                return
            
            if isinstance(expected, dict):
                for key in expected:
                    if key not in actual:
                        errors.append(f"Missing key at {path}.{key}")
                    else:
                        check_structure(actual[key], expected[key], f"{path}.{key}")
            
            elif isinstance(expected, list) and len(expected) > 0:
                if len(actual) == 0:
                    errors.append(f"Empty array at {path}")
                else:
                    # Check first element as template
                    for i, item in enumerate(actual):
                        check_structure(item, expected[0], f"{path}[{i}]")
        
        check_structure(data, template)
        return errors
    
    @staticmethod
    def validate_data_types(data: Dict, type_map: Dict) -> List[str]:
        """
        Validate data types in a structure.
        """
        errors = []
        
        for key, expected_type in type_map.items():
            if key in data:
                value = data[key]
                
                if expected_type == 'email':
                    if not re.match(r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$', str(value)):
                        errors.append(f"Invalid email format: {key}")
                
                elif expected_type == 'url':
                    if not re.match(r'^https?://', str(value)):
                        errors.append(f"Invalid URL format: {key}")
                
                elif expected_type == 'date':
                    try:
                        datetime.strptime(str(value), '%Y-%m-%d')
                    except:
                        errors.append(f"Invalid date format: {key}")
                
                elif expected_type == 'phone':
                    if not re.match(r'^[\d\s\-\(\)\+]+$', str(value)):
                        errors.append(f"Invalid phone format: {key}")
        
        return errors

Key Takeaways and Best Practices šŸŽÆ

JSON/XML Parsing Best Practices šŸ“‹

Pro Tip: The key to mastering data parsing is understanding that data is rarely perfect. Real-world JSON has comments and trailing commas, XML has namespaces and CDATA sections, and APIs return inconsistent structures. Build your parsers defensively - validate everything, handle edge cases, and always have a plan B. Remember: it's better to reject bad data early than to process it incorrectly!

JSON and XML parsing mastery transforms you from a data consumer to a data conductor. You can integrate any API, process any configuration, and transform any data structure. Whether you're building microservices, data pipelines, or automation systems, these skills are fundamental to modern software development! šŸš€