JSON/XML Parsing: Navigate Complex Data Structures
JSON and XML are the languages of the internet. APIs speak JSON, configuration files use XML, and every modern application exchanges data in these formats. They're like the DNA of digital communication - structured, hierarchical, and sometimes surprisingly complex. Let's master the art of parsing, transforming, and generating these ubiquitous data formats!
The Architecture of Structured Data
Think of JSON as a modern skyscraper - clean lines, efficient structure, easy to navigate. XML is like a Gothic cathedral - ornate, detailed, with namespaces like flying buttresses supporting complex structures. Both have their place, and Python gives you the tools to be an architect of both!
Real-World Scenario: The API Integration Hub
You're building an integration platform that connects 20 different services. Each API returns different JSON structures, legacy systems send XML, configuration comes in YAML, and you need to transform everything into a unified format for your data warehouse. Let's build a universal parser!
import json
import xml.etree.ElementTree as ET
import xml.dom.minidom as minidom
from lxml import etree
import xmltodict
import yaml
import jsonschema
from jsonschema import validate, ValidationError
import xmlschema
from typing import Dict, List, Any, Optional, Union
from pathlib import Path
import re
from datetime import datetime
from collections import defaultdict, OrderedDict
import logging
from json import JSONEncoder
import requests
from bs4 import BeautifulSoup
class UniversalDataParser:
    """
    Comprehensive parser for JSON, XML, YAML and other structured data formats.

    XML support uses ``lxml`` by default (``xml_config['parser'] == 'lxml'``)
    and falls back to ``xml.etree.ElementTree`` otherwise. YAML parsing uses
    PyYAML's ``safe_load`` and JSON schema validation uses ``jsonschema``.
    """

    # File extensions that identify a format without looking at the content.
    _EXTENSION_FORMATS = {
        '.json': 'json', '.jsonl': 'json',
        '.xml': 'xml', '.xsd': 'xml',
        '.yaml': 'yaml', '.yml': 'yaml',
        '.html': 'html', '.htm': 'html',
    }
    # Both patterns match a complete JSON string literal FIRST so that text
    # inside strings (URLs containing "//", values containing ", }") is never
    # mistaken for a comment or a trailing comma.
    _JSON_COMMENT_RE = re.compile(r'"(?:\\.|[^"\\])*"|//[^\n]*|/\*.*?\*/', re.DOTALL)
    _JSON_TRAILING_COMMA_RE = re.compile(r'"(?:\\.|[^"\\])*"|,(\s*[}\]])')
    # "name[3]" -> ("name", "3"); used by unflatten_json.
    _INDEXED_KEY_RE = re.compile(r'^(.*)\[(\d+)\]$')

    def __init__(self, config: Dict = None):
        """
        Create a parser.

        Args:
            config: Optional free-form settings, stored on ``self.config``.
        """
        self.config = config or {}
        self.setup_logging()
        # JSON configuration
        self.json_config = {
            'strict': False,
            'ensure_ascii': False,
            'indent': 2,
            'sort_keys': False
        }
        # XML configuration
        self.xml_config = {
            'parser': 'lxml',  # 'lxml' or 'etree'
            'remove_namespaces': False,
            'preserve_attributes': True,
            'encoding': 'utf-8'
        }
        # Compiled XSD schemas keyed by schema path (see validate_xml_schema)
        self.schema_cache = {}

    def setup_logging(self):
        """Setup logging for parser operations."""
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
        )
        self.logger = logging.getLogger(__name__)

    def detect_format(self, data: Union[str, bytes], file_path: str = None) -> str:
        """
        Intelligently detect data format.

        The file extension (when given and recognised) wins; otherwise the
        content is probed in the order JSON, XML, YAML, HTML.

        Returns:
            One of 'json', 'xml', 'yaml', 'html' or 'unknown'.
        """
        # Check file extension if provided
        if file_path:
            by_extension = self._EXTENSION_FORMATS.get(Path(file_path).suffix.lower())
            if by_extension:
                return by_extension
        if data is None:
            return 'unknown'
        # Content-based detection
        if isinstance(data, bytes):
            data = data.decode('utf-8', errors='ignore')
        text = data.strip()
        if not text:
            return 'unknown'
        lowered = text.lower()
        looks_like_html = '<html' in lowered or '<!doctype html' in lowered

        # JSON detection
        if (text.startswith('{') and text.endswith('}')) or \
                (text.startswith('[') and text.endswith(']')):
            try:
                json.loads(text)
                return 'json'
            except ValueError:
                pass  # not JSON after all; keep probing

        # XML detection. HTML markers take precedence unless the document
        # carries an explicit XML declaration, so a well-formed HTML page is
        # not reported as XML.
        if text.startswith('<') and (text.startswith('<?xml') or not looks_like_html):
            try:
                ET.fromstring(text)
                return 'xml'
            except (ET.ParseError, ValueError):
                pass

        # YAML detection. A bare scalar is valid YAML, so only mappings and
        # sequences count; otherwise any text containing ':' would match.
        if ':' in text and not text.startswith('<'):
            try:
                if isinstance(yaml.safe_load(text), (dict, list)):
                    return 'yaml'
            except yaml.YAMLError:
                pass

        # HTML detection
        if looks_like_html:
            return 'html'
        return 'unknown'

    @classmethod
    def _strip_json_extensions(cls, text: str) -> str:
        """Remove // and /* */ comments and trailing commas outside strings."""
        text = cls._JSON_COMMENT_RE.sub(
            lambda m: m.group(0) if m.group(0).startswith('"') else '', text)
        return cls._JSON_TRAILING_COMMA_RE.sub(
            lambda m: m.group(0) if m.group(1) is None else m.group(1), text)

    def parse_json(self, data: Union[str, bytes], schema: Dict = None,
                   strict: bool = False) -> Dict:
        """
        Parse JSON with validation and error handling.

        Args:
            data: JSON text (bytes are decoded as UTF-8, a BOM is tolerated).
            schema: Optional JSON Schema to validate the parsed value against.
            strict: When False (default), input that is not valid JSON is
                retried after removing comments and trailing commas.

        Returns:
            The parsed value (a dict for JSON objects; other JSON types are
            returned as their Python equivalents).

        Raises:
            json.JSONDecodeError: The input could not be parsed.
            jsonschema.ValidationError: The value does not match ``schema``.
        """
        if isinstance(data, bytes):
            data = data.decode('utf-8-sig')
        try:
            try:
                # Valid JSON is never rewritten, so its string values can
                # safely contain "//" or ", }".
                parsed = json.loads(data)
            except json.JSONDecodeError:
                if strict:
                    raise
                # Lenient parsing - try to fix common issues
                parsed = json.loads(self._strip_json_extensions(data))
        except json.JSONDecodeError as e:
            self.logger.error(f"JSON parsing error: {e}")
            # e.doc is the text that was actually handed to the decoder, so
            # the reported line matches e.lineno even after cleanup.
            lines = e.doc.split('\n')
            error_line = lines[e.lineno - 1] if e.lineno <= len(lines) else ''
            self.logger.error(f"Error at line {e.lineno}: {error_line[:100]}")
            raise

        # Validate against schema if provided
        if schema:
            try:
                validate(instance=parsed, schema=schema)
                self.logger.info("JSON validated against schema successfully")
            except ValidationError as e:
                self.logger.error(f"Schema validation failed: {e.message}")
                raise
        return parsed

    def parse_xml(self, data: Union[str, bytes], schema_path: str = None,
                  remove_namespaces: bool = False) -> Dict:
        """
        Parse XML with multiple strategies and options.

        Args:
            data: XML document as text or bytes.
            schema_path: Optional XSD file to validate against (lxml parser
                only).
            remove_namespaces: Strip namespaces from tags and attributes
                before conversion (also enabled by
                ``xml_config['remove_namespaces']``).

        Returns:
            With the lxml parser, the converted content of the root element;
            with the ElementTree parser, ``{root_tag: content}``.

        Raises:
            ValueError: No root element could be recovered, or schema
                validation failed.
        """
        try:
            if isinstance(data, str):
                data = data.encode('utf-8')
            # An XML declaration must be the very first bytes of a document.
            data = data.lstrip()
            strip_namespaces = remove_namespaces or self.xml_config.get('remove_namespaces', False)

            # Parse with lxml for better performance and features
            if self.xml_config['parser'] == 'lxml':
                # resolve_entities=False: input may be untrusted, so external
                # entities are not expanded (XXE hardening).
                parser = etree.XMLParser(remove_blank_text=True, recover=True,
                                         resolve_entities=False)
                root = etree.fromstring(data, parser)
                if root is None:
                    # recover=True yields None instead of raising on garbage
                    raise ValueError("XML document has no root element")
                # Validate against schema if provided
                if schema_path:
                    self.validate_xml_schema(root, schema_path)
                # Remove namespaces if requested
                if strip_namespaces:
                    root = self.remove_xml_namespaces(root)
                # Convert to dictionary
                return self.xml_to_dict_lxml(root)

            # Use ElementTree
            if schema_path:
                self.logger.warning("XSD validation requires the lxml parser; skipping")
            root = ET.fromstring(data)
            if strip_namespaces:
                root = self.remove_xml_namespaces(root)
            return self.xml_to_dict_etree(root)
        except Exception as e:
            self.logger.error(f"XML parsing error: {e}")
            raise

    def xml_to_dict_lxml(self, element) -> Dict:
        """
        Convert lxml element to dictionary.

        Attributes go under '@attributes', text under '#text', and repeated
        child tags are collected into lists. A leaf without preserved
        attributes collapses to its text; an empty element becomes None.
        """
        result = {}
        # Add attributes
        if element.attrib and self.xml_config['preserve_attributes']:
            result['@attributes'] = dict(element.attrib)

        # Comments, processing instructions and unresolved entities have a
        # non-string ``tag``; they are not data and must not become keys.
        child_elements = [child for child in element if isinstance(child.tag, str)]

        # Add text content
        text = (element.text or '').strip()
        if text:
            if not child_elements and not result:
                return text
            result['#text'] = text

        # Process children, grouping repeated tags in document order
        children = defaultdict(list)
        for child in child_elements:
            children[child.tag].append(self.xml_to_dict_lxml(child))
        for tag, values in children.items():
            result[tag] = values[0] if len(values) == 1 else values
        return result if result else None

    def xml_to_dict_etree(self, element) -> Dict:
        """
        Convert ElementTree element to dictionary.

        Returns:
            ``{element.tag: content}`` using the same '@attributes'/'#text'
            conventions as :meth:`xml_to_dict_lxml`.
        """
        def parse_element(elem):
            result = {}
            # Attributes (copied so the result does not alias the element)
            if elem.attrib:
                result['@attributes'] = dict(elem.attrib)
            # Text content
            if elem.text and elem.text.strip():
                result['#text'] = elem.text.strip()
            # Children: repeated tags are collected into a list
            grouped = defaultdict(list)
            for child in elem:
                grouped[child.tag].append(parse_element(child))
            for tag, values in grouped.items():
                result[tag] = values[0] if len(values) == 1 else values
            # Simplify if only text content
            if len(result) == 1 and '#text' in result:
                return result['#text']
            return result if result else None

        return {element.tag: parse_element(element)}

    def remove_xml_namespaces(self, root):
        """
        Remove namespaces from XML element tree.

        Works in place on both lxml and ElementTree elements and returns
        ``root`` for convenience.
        """
        for elem in root.iter():
            if not isinstance(elem.tag, str):
                continue  # comments / processing instructions have no tag name
            # Remove namespace from tag ("{uri}local" -> "local")
            elem.tag = elem.tag.split('}')[-1]
            # Remove namespace from attributes. The mapping is updated in
            # place because lxml does not allow ``elem.attrib`` to be
            # reassigned.
            renamed = {key.split('}')[-1]: value for key, value in elem.attrib.items()}
            elem.attrib.clear()
            elem.attrib.update(renamed)
        return root

    def validate_xml_schema(self, root, schema_path: str):
        """
        Validate XML against XSD schema.

        Compiled schemas are kept in ``self.schema_cache`` keyed by path, so
        a schema file edited while the process runs is not reloaded.

        Raises:
            ValueError: ``root`` does not conform to the schema.
        """
        try:
            cache_key = str(schema_path)
            schema = self.schema_cache.get(cache_key)
            if schema is None:
                # Let lxml open the file itself so the encoding declared in
                # the XSD is honoured.
                schema = etree.XMLSchema(etree.parse(cache_key))
                self.schema_cache[cache_key] = schema
            if not schema.validate(root):
                for error in schema.error_log:
                    self.logger.error(f"XML validation error: {error}")
                raise ValueError("XML schema validation failed")
            self.logger.info("XML validated against schema successfully")
        except Exception as e:
            self.logger.error(f"Schema validation error: {e}")
            raise

    def parse_yaml(self, data: Union[str, bytes]) -> Dict:
        """
        Parse YAML data safely.

        Raises:
            yaml.YAMLError: The input is not valid YAML.
        """
        try:
            if isinstance(data, bytes):
                data = data.decode('utf-8')
            # Use safe_load to prevent code execution
            parsed = yaml.safe_load(data)
            self.logger.info("YAML parsed successfully")
            return parsed
        except yaml.YAMLError as e:
            self.logger.error(f"YAML parsing error: {e}")
            raise

    def convert_format(self, data: Any, from_format: str, to_format: str,
                       pretty: bool = True) -> str:
        """
        Convert between different data formats.

        Args:
            data: Raw text/bytes in ``from_format``, or already-parsed data.
            from_format: 'json', 'xml' or 'yaml'; raw input in any other
                format is passed through unparsed.
            to_format: 'json', 'xml' or 'yaml'.
            pretty: Pretty-print JSON/XML output.

        Raises:
            ValueError: ``to_format`` is not supported.
        """
        # Parse raw input (text or bytes)
        if isinstance(data, (str, bytes)):
            if from_format == 'json':
                data = self.parse_json(data)
            elif from_format == 'xml':
                data = self.parse_xml(data)
            elif from_format == 'yaml':
                data = self.parse_yaml(data)

        # Convert to target format
        if to_format == 'json':
            return self.to_json(data, pretty)
        elif to_format == 'xml':
            return self.to_xml(data, pretty)
        elif to_format == 'yaml':
            return self.to_yaml(data)
        raise ValueError(f"Unsupported output format: {to_format}")

    def to_json(self, data: Any, pretty: bool = True) -> str:
        """
        Convert data to JSON string.

        Values that are not JSON-serialisable are written as ``str(value)``.
        """
        return json.dumps(data, indent=2 if pretty else None,
                          ensure_ascii=False, default=str)

    def to_xml(self, data: Dict, pretty: bool = True, root_name: str = 'root') -> str:
        """
        Convert dictionary to XML string.

        '@attributes' and '#text' keys are mapped back to attributes and
        text; list values become repeated elements; ``None`` becomes an
        empty element; items of a bare list are wrapped in ``<item>``.
        """
        def dict_to_element(tag, value):
            elem = ET.Element(str(tag))
            if isinstance(value, dict):
                for key, val in value.items():
                    if key == '@attributes':
                        # ElementTree can only serialise string attributes
                        for attr_name, attr_value in (val or {}).items():
                            elem.set(str(attr_name), str(attr_value))
                    elif key == '#text':
                        if val is not None:
                            elem.text = str(val)
                    elif isinstance(val, list):
                        for item in val:
                            elem.append(dict_to_element(key, item))
                    else:
                        elem.append(dict_to_element(key, val))
            elif isinstance(value, list):
                for item in value:
                    elem.append(dict_to_element('item', item))
            elif value is not None:
                elem.text = str(value)
            return elem

        serialized = ET.tostring(dict_to_element(root_name, data), encoding='unicode')
        return self.prettify_xml(serialized) if pretty else serialized

    def prettify_xml(self, xml_string: str) -> str:
        """
        Prettify XML string.
        """
        return minidom.parseString(xml_string).toprettyxml(indent=" ")

    def to_yaml(self, data: Any) -> str:
        """
        Convert data to YAML string.
        """
        return yaml.dump(data, default_flow_style=False, allow_unicode=True)

    def query_json(self, data: Dict, path: str) -> Any:
        """
        Query JSON data using JSONPath-like syntax.
        Examples:
        - "$.store.book[0].title"
        - "$.store.book[*].author"
        - "$.store.book[?(@.price < 10)]"

        Returns:
            None for no match (or an invalid expression), the value for a
            single match, otherwise a list of values.
        """
        # Filter expressions such as [?(@.price < 10)] are only understood by
        # the extended parser; fall back to the basic one if it is missing.
        try:
            from jsonpath_ng.ext import parse
        except ImportError:
            from jsonpath_ng import parse
        try:
            matches = parse(path).find(data)
            if not matches:
                return None
            if len(matches) == 1:
                return matches[0].value
            return [match.value for match in matches]
        except Exception as e:
            self.logger.error(f"JSONPath query error: {e}")
            return None

    def query_xml(self, xml_data: Union[str, bytes, ET.Element],
                  xpath: str) -> List:
        """
        Query XML data using XPath.

        Returns:
            Matching elements serialised to strings, other results (text,
            attribute values, numbers, booleans) as-is; ``[]`` on error.
        """
        try:
            if isinstance(xml_data, str):
                xml_data = xml_data.encode('utf-8')
            if isinstance(xml_data, bytes):
                root = etree.fromstring(xml_data)
            elif isinstance(xml_data, ET.Element):
                # Standard-library elements have no .xpath(); re-parse them
                root = etree.fromstring(ET.tostring(xml_data))
            else:
                root = xml_data
            results = root.xpath(xpath)
            # Expressions like count(...) or string(...) return a scalar
            if not isinstance(results, list):
                results = [results]
            # Convert elements to strings or values
            return [
                etree.tostring(result, encoding='unicode')
                if isinstance(result, etree._Element) else result
                for result in results
            ]
        except Exception as e:
            self.logger.error(f"XPath query error: {e}")
            return []

    def merge_json(self, *json_objects, strategy: str = 'deep') -> Dict:
        """
        Merge multiple JSON objects.
        Strategies:
        - 'deep': Deep merge, combining nested structures
        - 'shallow': Shallow merge, last one wins
        - 'append': Append arrays instead of replacing

        Any other strategy name behaves like 'deep'. The inputs are never
        modified.
        """
        def deep_merge(dict1, dict2):
            result = dict1.copy()
            for key, value in dict2.items():
                current = result.get(key)
                if key in result and isinstance(current, dict) and isinstance(value, dict):
                    result[key] = deep_merge(current, value)
                elif (key in result and strategy == 'append'
                      and isinstance(current, list) and isinstance(value, list)):
                    # Build a new list: extending in place would modify the
                    # list object still owned by one of the inputs.
                    result[key] = current + value
                else:
                    result[key] = value
            return result

        result = {}
        for obj in json_objects:
            if strategy == 'shallow':
                result.update(obj)
            else:
                result = deep_merge(result, obj)
        return result

    def flatten_json(self, data: Dict, separator: str = '.',
                     prefix: str = '') -> Dict:
        """
        Flatten nested JSON structure.

        Nested keys are joined with ``separator`` and list positions are
        written as ``key[index]``. Empty dicts/lists and lists nested
        directly inside lists are kept as values.
        """
        flat = {}
        for key, value in data.items():
            new_key = f"{prefix}{separator}{key}" if prefix else key
            if isinstance(value, dict) and value:
                flat.update(self.flatten_json(value, separator, new_key))
            elif isinstance(value, list) and value:
                for i, item in enumerate(value):
                    indexed_key = f"{new_key}[{i}]"
                    if isinstance(item, dict) and item:
                        flat.update(self.flatten_json(item, separator, indexed_key))
                    else:
                        flat[indexed_key] = item
            else:
                # Scalars, and empty containers that would otherwise vanish
                flat[new_key] = value
        return flat

    def unflatten_json(self, data: Dict, separator: str = '.') -> Dict:
        """
        Unflatten a flattened JSON structure.

        Inverse of :meth:`flatten_json` for keys using the same
        ``separator`` and ``key[index]`` notation.
        """
        def split_index(part):
            """Return (name, index) for 'name[3]', else (part, None)."""
            match = self._INDEXED_KEY_RE.match(part)
            if match:
                return match.group(1), int(match.group(2))
            return part, None

        result = {}
        for key, value in data.items():
            parts = key.split(separator)
            node = result
            for part in parts[:-1]:
                base, index = split_index(part)
                if index is None:
                    node = node.setdefault(base, {})
                    continue
                # Handle array indices: extend array if needed
                slots = node.setdefault(base, [])
                while len(slots) <= index:
                    slots.append({})
                if slots[index] is None:
                    # None placeholder left by an earlier "name[i]" assignment
                    slots[index] = {}
                node = slots[index]

            # Set the final value
            base, index = split_index(parts[-1])
            if index is None:
                node[base] = value
            else:
                slots = node.setdefault(base, [])
                while len(slots) <= index:
                    slots.append(None)
                slots[index] = value
        return result
class APIDataProcessor:
    """
    Process data from various API responses.

    Owns a ``requests.Session``; call :meth:`close` (or use the instance as a
    context manager) to release its connections.
    """

    def __init__(self, parser: "UniversalDataParser" = None):
        """
        Args:
            parser: Parser used for XML/format detection and logging; a new
                ``UniversalDataParser`` is created when omitted.
        """
        self.parser = parser or UniversalDataParser()
        self.session = requests.Session()

    def close(self):
        """Close the underlying HTTP session."""
        self.session.close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()

    def fetch_and_parse(self, url: str, headers: Dict = None,
                        params: Dict = None, timeout: Optional[float] = 30.0) -> Any:
        """
        Fetch data from API and parse response.

        Args:
            url: Endpoint to GET.
            headers: Optional request headers.
            params: Optional query parameters.
            timeout: Seconds to wait for the server; ``None`` waits forever.

        Returns:
            Parsed JSON, the dict form of XML, the summary produced by
            :meth:`parse_html`, or the raw response text when the format is
            not recognised.

        Raises:
            requests.exceptions.RequestException: The request failed or the
                server answered with an error status.
        """
        try:
            # Without a timeout a stalled server would block this call forever
            response = self.session.get(url, headers=headers, params=params,
                                        timeout=timeout)
            response.raise_for_status()

            # Detect content type (media types are case-insensitive)
            content_type = response.headers.get('content-type', '').lower()
            if 'json' in content_type:
                return response.json()
            if 'html' in content_type:
                return self.parse_html(response.text)
            if 'xml' in content_type:
                return self.parser.parse_xml(response.content)

            # Try to detect format
            format_type = self.parser.detect_format(response.text)
            if format_type == 'json':
                return response.json()
            if format_type == 'xml':
                return self.parser.parse_xml(response.content)
            if format_type == 'html':
                return self.parse_html(response.text)
            return response.text
        except requests.exceptions.RequestException as e:
            self.parser.logger.error(f"API request failed: {e}")
            raise

    def parse_html(self, html: str) -> Dict:
        """
        Parse HTML and extract structured data.

        Returns:
            Dict with 'title', 'meta' (name/property -> content), 'links',
            'images' and the first 1000 characters of the page text.
        """
        soup = BeautifulSoup(html, 'html.parser')
        # Extract common structured data
        result = {
            'title': soup.title.string if soup.title else None,
            'meta': {},
            'links': [],
            'images': [],
            'text': soup.get_text(strip=True)[:1000]  # First 1000 chars
        }
        # Extract meta tags
        for meta in soup.find_all('meta'):
            name = meta.get('name') or meta.get('property')
            content = meta.get('content')
            if name and content:
                result['meta'][name] = content
        # Extract links
        result['links'] = [
            {'href': link['href'], 'text': link.get_text(strip=True)}
            for link in soup.find_all('a', href=True)
        ]
        # Extract images
        result['images'] = [
            {'src': img['src'], 'alt': img.get('alt', '')}
            for img in soup.find_all('img', src=True)
        ]
        return result

    def process_paginated_api(self, base_url: str,
                              page_param: str = 'page',
                              max_pages: int = None,
                              headers: Dict = None) -> List:
        """
        Process paginated API responses.

        Pages are requested starting at 1 until a page is empty, a request
        fails, or ``max_pages`` pages have been fetched.

        Args:
            base_url: Endpoint to query.
            page_param: Name of the query parameter carrying the page number.
            max_pages: Maximum number of pages to fetch; ``None`` is
                unlimited and ``0`` fetches nothing.
            headers: Optional request headers sent with every page.

        Returns:
            All collected items. For dict responses the items are read from
            the 'results' key, or from 'data' when 'results' is absent.
        """
        all_results = []
        page = 1
        while max_pages is None or page <= max_pages:
            try:
                # Fetch page
                data = self.fetch_and_parse(base_url, headers=headers,
                                            params={page_param: page})
            except Exception as e:
                self.parser.logger.error(f"Error processing page {page}: {e}")
                break

            if isinstance(data, dict):
                # 'data' is only consulted when 'results' is missing entirely
                batch = data['results'] if 'results' in data else data.get('data', [])
            elif isinstance(data, list):
                batch = data
            else:
                # Plain text or other payload: there is nothing to paginate
                batch = []

            if not batch:
                break
            if isinstance(batch, list):
                all_results.extend(batch)
            else:
                all_results.append(batch)
            page += 1
        return all_results
class DataTransformer:
    """
    Transform and manipulate structured data.

    Transformations are queued with :meth:`add_transformation` and executed
    in insertion order by :meth:`apply`.
    """

    def __init__(self):
        # Queue of (callable, positional args, keyword args)
        self.transformations = []

    def add_transformation(self, func, *args, **kwargs):
        """
        Add a transformation to the pipeline.

        ``func`` is later called as ``func(data, *args, **kwargs)``.
        Returns ``self`` so calls can be chained.
        """
        self.transformations.append((func, args, kwargs))
        return self

    def apply(self, data: Any) -> Any:
        """
        Apply all transformations in sequence.

        Each step receives the result of the previous one.
        """
        result = data
        for func, args, kwargs in self.transformations:
            result = func(result, *args, **kwargs)
        return result

    @staticmethod
    def filter_keys(data: Dict, keep: List[str] = None,
                    remove: List[str] = None) -> Dict:
        """
        Filter dictionary keys.

        A non-empty ``keep`` selects only those keys; otherwise a non-empty
        ``remove`` drops those keys; with neither, ``data`` is returned
        unchanged.
        """
        if keep:
            return {k: v for k, v in data.items() if k in keep}
        if remove:
            return {k: v for k, v in data.items() if k not in remove}
        return data

    @staticmethod
    def rename_keys(data: Dict, mapping: Dict[str, str]) -> Dict:
        """
        Rename dictionary keys.

        Keys missing from ``mapping`` keep their name.
        """
        return {mapping.get(key, key): value for key, value in data.items()}

    @staticmethod
    def apply_to_nested(data: Any, path: str, func, *args, **kwargs) -> Any:
        """
        Apply function to nested element.

        Args:
            data: Nested dict/list structure.
            path: Dot-separated path; numeric segments index into lists
                (e.g. ``"items.0.price"``).
            func: Called as ``func(value, *args, **kwargs)``.

        Returns:
            A copy of ``data`` with the element replaced. Only the containers
            along ``path`` are copied; ``data`` itself is never modified.
            When the path does not resolve (or resolves to ``None``),
            ``data`` is returned as-is.
        """
        keys = path.split('.')

        def get_nested(node):
            for key in keys:
                if isinstance(node, dict):
                    node = node.get(key)
                elif isinstance(node, list) and key.isdigit() and int(key) < len(node):
                    node = node[int(key)]
                else:
                    return None
            return node

        def replace_at(node, remaining, value):
            # Copy just this level, then recurse along the path, so sibling
            # branches stay shared and the caller's objects stay untouched.
            key, rest = remaining[0], remaining[1:]
            slot = int(key) if isinstance(node, list) else key
            clone = node.copy()
            clone[slot] = replace_at(node[slot], rest, value) if rest else value
            return clone

        nested_value = get_nested(data)
        if nested_value is None:
            return data
        return replace_at(data, keys, func(nested_value, *args, **kwargs))
class SchemaGenerator:
    """
    Generate schemas from sample data.
    """

    @staticmethod
    def generate_json_schema(data: Any, title: str = "Generated Schema") -> Dict:
        """
        Generate JSON Schema from sample data.

        Types are inferred from the sample values: strings matching common
        date, date-time, email or URL patterns get a ``format``; array item
        types come from the first element; object properties whose sample
        value is not ``None`` are listed as required.
        """
        def infer_type(value):
            if value is None:
                return {"type": "null"}
            # bool must be tested before int: bool is a subclass of int
            if isinstance(value, bool):
                return {"type": "boolean"}
            if isinstance(value, int):
                return {"type": "integer"}
            if isinstance(value, float):
                return {"type": "number"}
            if isinstance(value, str):
                # Check for specific string formats
                if re.match(r'^\d{4}-\d{2}-\d{2}$', value):
                    return {"type": "string", "format": "date"}
                if re.match(r'^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}', value):
                    return {"type": "string", "format": "date-time"}
                if re.match(r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$', value):
                    return {"type": "string", "format": "email"}
                if re.match(r'^https?://', value):
                    return {"type": "string", "format": "uri"}
                return {"type": "string"}
            if isinstance(value, list):
                if not value:
                    return {"type": "array", "items": {}}
                # Infer items type from first element
                return {"type": "array", "items": infer_type(value[0])}
            if isinstance(value, dict):
                properties = {key: infer_type(val) for key, val in value.items()}
                # Consider non-null as required
                required = [key for key, val in value.items() if val is not None]
                schema = {"type": "object", "properties": properties}
                if required:
                    schema["required"] = required
                return schema
            return {"type": "string"}  # Default fallback

        schema = {
            "$schema": "http://json-schema.org/draft-07/schema#",
            "title": title
        }
        schema.update(infer_type(data))
        return schema

    @staticmethod
    def generate_xsd_schema(xml_data: str, root_element: str = "root") -> str:
        """
        Generate XSD schema from sample XML.

        Every element and attribute is typed ``xs:string``. Child elements
        that occur more than once under the same parent are declared once
        with ``maxOccurs="unbounded"``, using the first occurrence as the
        sample.

        Args:
            xml_data: Sample XML document.
            root_element: Unused; the root name is taken from ``xml_data``.
                Kept for backward compatibility.

        Returns:
            The schema serialised as a string.
        """
        xs_uri = "http://www.w3.org/2001/XMLSchema"
        # Bind the conventional "xs" prefix so the "xs:string" type
        # references below resolve; ElementTree would otherwise invent a
        # prefix such as "ns0". Note: the registration is process-wide.
        ET.register_namespace('xs', xs_uri)

        def xs(name):
            return f"{{{xs_uri}}}{name}"

        def local_name(qualified):
            # "{uri}local" -> "local"; XSD names cannot use Clark notation
            return qualified.split('}')[-1]

        def add_attributes(parent, element):
            for attr_name in element.attrib:
                attribute = ET.SubElement(parent, xs("attribute"))
                attribute.set("name", local_name(attr_name))
                attribute.set("type", "xs:string")

        def process_element(element, parent_xsd, repeated=False):
            # Create element definition
            elem_def = ET.SubElement(parent_xsd, xs("element"))
            elem_def.set("name", local_name(element.tag))
            if repeated:
                elem_def.set("maxOccurs", "unbounded")

            if len(element) > 0:
                # Element with children: sequence of distinct child tags
                complex_type = ET.SubElement(elem_def, xs("complexType"))
                sequence = ET.SubElement(complex_type, xs("sequence"))
                occurrences = {}
                for child in element:
                    occurrences[child.tag] = occurrences.get(child.tag, 0) + 1
                processed = set()
                for child in element:
                    if child.tag not in processed:
                        processed.add(child.tag)
                        process_element(child, sequence,
                                        repeated=occurrences[child.tag] > 1)
                # Attribute declarations must follow the content model
                add_attributes(complex_type, element)
            elif element.attrib:
                # Text-only element with attributes: extend xs:string. It
                # must not also carry type="..." next to an inline
                # complexType.
                complex_type = ET.SubElement(elem_def, xs("complexType"))
                simple_content = ET.SubElement(complex_type, xs("simpleContent"))
                extension = ET.SubElement(simple_content, xs("extension"))
                extension.set("base", "xs:string")
                add_attributes(extension, element)
            else:
                # Simple type
                elem_def.set("type", "xs:string")

        # Parse XML
        root = ET.fromstring(xml_data)
        # Build XSD
        xsd = ET.Element(xs("schema"))
        xsd.set("elementFormDefault", "qualified")
        process_element(root, xsd)
        return ET.tostring(xsd, encoding='unicode')
# Example usage: a walk-through of the parser, schema generator and
# transformer classes defined in this file.
if __name__ == "__main__":
    # Initialize parser
    parser = UniversalDataParser()

    # Example 1: Parse and validate JSON
    json_data = '''
{
"user": {
"id": 123,
"name": "John Doe",
"email": "john@example.com",
"roles": ["admin", "user"]
}
}
'''
    parsed_json = parser.parse_json(json_data)
    print("Parsed JSON:", json.dumps(parsed_json, indent=2))

    # Generate schema from data
    schema_gen = SchemaGenerator()
    schema = schema_gen.generate_json_schema(parsed_json)
    print("\nGenerated JSON Schema:", json.dumps(schema, indent=2))

    # Example 2: Parse XML and convert to JSON
    # .strip(): the XML declaration must be the very first thing in the
    # document, so the newline after the opening quotes has to go.
    xml_data = '''
<?xml version="1.0"?>
<catalog>
<book id="1">
<author>Gambardella, Matthew</author>
<title>XML Developer's Guide</title>
<price>44.95</price>
</book>
<book id="2">
<author>Ralls, Kim</author>
<title>Midnight Rain</title>
<price>5.95</price>
</book>
</catalog>
'''.strip()
    parsed_xml = parser.parse_xml(xml_data)
    print("\nParsed XML as dict:", json.dumps(parsed_xml, indent=2))

    # Convert XML to JSON
    json_from_xml = parser.convert_format(xml_data, 'xml', 'json')
    print("\nXML converted to JSON:", json_from_xml)

    # Example 3: Query JSON data
    books_data = {
        "store": {
            "book": [
                {"title": "Book 1", "price": 10.99, "author": "Author A"},
                {"title": "Book 2", "price": 8.99, "author": "Author B"},
                {"title": "Book 3", "price": 12.99, "author": "Author C"}
            ]
        }
    }
    # Query using JSONPath
    titles = parser.query_json(books_data, "$.store.book[*].title")
    print("\nBook titles:", titles)

    # Example 4: Transform data
    transformer = DataTransformer()
    # Build transformation pipeline
    transformer.add_transformation(
        DataTransformer.filter_keys,
        keep=['title', 'price']
    ).add_transformation(
        DataTransformer.rename_keys,
        mapping={'title': 'name', 'price': 'cost'}
    )
    # Apply transformations
    book = books_data['store']['book'][0]
    transformed = transformer.apply(book)
    print("\nTransformed book:", transformed)

    # Example 5: Merge multiple JSON objects
    config1 = {
        "database": {
            "host": "localhost",
            "port": 5432
        },
        "cache": {
            "enabled": True
        }
    }
    config2 = {
        "database": {
            "username": "admin",
            "password": "secret"
        },
        "api": {
            "timeout": 30
        }
    }
    merged = parser.merge_json(config1, config2)
    print("\nMerged configuration:", json.dumps(merged, indent=2))

    # Example 6: Flatten and unflatten JSON
    nested_data = {
        "user": {
            "profile": {
                "name": "John",
                "age": 30
            },
            "settings": {
                "notifications": True
            }
        }
    }
    flattened = parser.flatten_json(nested_data)
    print("\nFlattened:", json.dumps(flattened, indent=2))
    unflattened = parser.unflatten_json(flattened)
    print("\nUnflattened:", json.dumps(unflattened, indent=2))

    print("\n✅ JSON/XML parsing complete!")
Advanced Parsing Techniques
Let's explore more sophisticated techniques for handling complex, real-world data scenarios!
class AdvancedDataHandler:
    """
    Advanced techniques for complex data scenarios.
    """

    @staticmethod
    def handle_large_json_stream(file_path: str, callback, prefix: str = 'item'):
        """
        Process large JSON files as streams.

        Args:
            file_path: Path of the JSON file.
            callback: Called with each streamed item; returning ``False``
                stops processing early.
            prefix: ijson prefix selecting the items to stream. The default
                ``'item'`` selects the elements of a top-level array.
        """
        import ijson
        with open(file_path, 'rb') as file:
            for item in ijson.items(file, prefix):
                if callback(item) is False:  # Allow early termination
                    break

    @staticmethod
    def parse_ndjson(file_path: str) -> List[Dict]:
        """
        Parse newline-delimited JSON (NDJSON/JSONL).

        Blank lines are ignored. Lines that are not valid JSON are skipped
        and reported through the module logger together with their line
        number.
        """
        results = []
        # JSON text is UTF-8; do not depend on the platform default encoding
        with open(file_path, 'r', encoding='utf-8') as file:
            for line_number, line in enumerate(file, start=1):
                if not line.strip():
                    continue
                try:
                    results.append(json.loads(line))
                except json.JSONDecodeError as e:
                    logging.getLogger(__name__).warning(
                        "Error parsing line %d: %s", line_number, e)
        return results

    @staticmethod
    def parse_xml_with_namespaces(xml_data: str,
                                  namespaces: Dict[str, str] = None) -> Dict:
        """
        Parse XML with namespace handling.

        Args:
            xml_data: XML document text.
            namespaces: Mapping of prefix -> namespace URI whose elements
                should be collected. Defaults to the example namespaces
                'ns' and 'xsi'.

        Returns:
            Mapping of local tag name -> element text for every element in
            one of the namespaces (a later element with the same local name
            overwrites an earlier one).
        """
        if namespaces is None:
            namespaces = {
                'ns': 'http://example.com/namespace',
                'xsi': 'http://www.w3.org/2001/XMLSchema-instance'
            }
        root = etree.fromstring(xml_data.encode())
        # Extract with namespace awareness
        result = {}
        for ns_uri in namespaces.values():
            # "{uri}*" (Clark notation) is understood by iter(); it is not
            # valid XPath syntax, so .xpath() cannot be used with it.
            for elem in root.iter(f'{{{ns_uri}}}*'):
                result[elem.tag.split('}')[-1]] = elem.text
        return result

    @staticmethod
    def extract_json_from_text(text: str) -> List[Dict]:
        """
        Extract JSON objects from mixed text.

        The text is scanned for '{' and '['; at each candidate position a
        complete JSON value (including nested ones) is decoded if possible.
        As a fallback, a flat ``{...}``/``[...]`` span is retried with single
        quotes replaced by double quotes.

        Returns:
            Decoded values in order of appearance (dicts and lists).
        """
        decoder = json.JSONDecoder()
        opener = re.compile(r'[{\[]')
        flat_span = re.compile(r'\{[^{}]*\}|\[[^\[\]]*\]', re.DOTALL)
        json_objects = []
        position = 0
        while True:
            candidate = opener.search(text, position)
            if candidate is None:
                break
            start = candidate.start()
            try:
                obj, position = decoder.raw_decode(text, start)
                json_objects.append(obj)
                continue
            except ValueError:
                pass
            # Try to fix common issues (single-quoted keys/strings)
            span = flat_span.match(text, start)
            if span is not None:
                try:
                    json_objects.append(json.loads(span.group(0).replace("'", '"')))
                    position = span.end()
                    continue
                except ValueError:
                    pass
            # Not JSON here; resume the scan just past this opener
            position = start + 1
        return json_objects

    @staticmethod
    def handle_circular_references(data: Any) -> str:
        """
        Serialize data with circular references.

        A container that (directly or indirectly) contains itself is
        replaced, at the point where the cycle closes, by the string
        ``"<circular reference at PATH>"``. Objects that are merely shared
        between several places are serialised normally.

        Returns:
            Indented JSON text; values JSON cannot represent are written as
            ``str(value)``.
        """
        # ids of the containers currently being serialised, i.e. the
        # ancestors of the object under inspection. Scalars are never
        # tracked: equal ints/strings commonly share one id.
        active = set()

        def serialize(obj, path="root"):
            if not isinstance(obj, (dict, list)):
                return obj
            marker = id(obj)
            if marker in active:
                return f"<circular reference at {path}>"
            active.add(marker)
            try:
                if isinstance(obj, dict):
                    return {key: serialize(value, f"{path}.{key}")
                            for key, value in obj.items()}
                return [serialize(item, f"{path}[{i}]")
                        for i, item in enumerate(obj)]
            finally:
                # Leaving this container: it is no longer an ancestor
                active.discard(marker)

        return json.dumps(serialize(data), indent=2, default=str)
class DataValidator:
    """
    Advanced data validation techniques.
    """

    # Compiled once; shared by all validate_data_types calls.
    _EMAIL_RE = re.compile(r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$')
    _URL_RE = re.compile(r'^https?://')
    _PHONE_RE = re.compile(r'^[\d\s\-\(\)\+]+$')

    @staticmethod
    def validate_json_structure(data: Dict, template: Dict) -> List[str]:
        """
        Validate JSON structure against a template.

        The template is an example value: every key it contains must exist
        in ``data`` with a value of exactly the same Python type, and every
        item of a list must match the template list's first element. Keys
        that only appear in ``data`` are ignored.

        Returns:
            Human-readable error messages; empty when the structure matches.
        """
        errors = []

        def check_structure(actual, expected, path=""):
            if type(actual) is not type(expected):
                errors.append(f"Type mismatch at {path}: expected {type(expected)}, got {type(actual)}")
                return
            if isinstance(expected, dict):
                for key in expected:
                    if key not in actual:
                        errors.append(f"Missing key at {path}.{key}")
                    else:
                        check_structure(actual[key], expected[key], f"{path}.{key}")
            elif isinstance(expected, list) and expected:
                if not actual:
                    errors.append(f"Empty array at {path}")
                else:
                    # Check first element as template
                    for i, item in enumerate(actual):
                        check_structure(item, expected[0], f"{path}[{i}]")

        check_structure(data, template)
        return errors

    @staticmethod
    def validate_data_types(data: Dict, type_map: Dict) -> List[str]:
        """
        Validate data types in a structure.

        Args:
            data: Mapping to validate. Keys missing from ``data`` are
                skipped.
            type_map: key -> expected type. The names 'email', 'url',
                'date' (YYYY-MM-DD) and 'phone' select format checks; a
                Python type (or tuple of types) selects an ``isinstance``
                check. Any other value is ignored.

        Returns:
            Error messages, in ``type_map`` order.
        """
        errors = []
        for key, expected_type in type_map.items():
            if key not in data:
                continue
            value = data[key]
            if expected_type == 'email':
                if not DataValidator._EMAIL_RE.match(str(value)):
                    errors.append(f"Invalid email format: {key}")
            elif expected_type == 'url':
                if not DataValidator._URL_RE.match(str(value)):
                    errors.append(f"Invalid URL format: {key}")
            elif expected_type == 'date':
                try:
                    datetime.strptime(str(value), '%Y-%m-%d')
                except ValueError:
                    errors.append(f"Invalid date format: {key}")
            elif expected_type == 'phone':
                if not DataValidator._PHONE_RE.match(str(value)):
                    errors.append(f"Invalid phone format: {key}")
            elif isinstance(expected_type, (type, tuple)):
                if not isinstance(value, expected_type):
                    expected_name = getattr(expected_type, '__name__', str(expected_type))
                    errors.append(
                        f"Invalid type for {key}: expected {expected_name}, "
                        f"got {type(value).__name__}")
        return errors
Key Takeaways and Best Practices 🎯
- Always Validate: Never trust external data. Validate against schemas before processing.
- Handle Encodings: JSON should be UTF-8, but XML can have various encodings declared.
- Use Appropriate Parsers: json for simple JSON, lxml for complex XML, BeautifulSoup for HTML.
- Stream Large Files: Use streaming parsers (ijson, iterparse) for files too large for memory.
- Preserve Data Types: Be careful with number precision and date formats during conversions.
- Handle Namespaces: XML namespaces can be tricky - decide whether to preserve or strip them.
- Error Recovery: Implement graceful error handling for malformed data.
JSON/XML Parsing Best Practices
JSON and XML parsing mastery transforms you from a data consumer to a data conductor. You can integrate any API, process any configuration, and transform any data structure. Whether you're building microservices, data pipelines, or automation systems, these skills are fundamental to modern software development!
Pro Tip: The key to mastering data parsing is understanding that data is rarely perfect. Real-world JSON has comments and trailing commas, XML has namespaces and CDATA sections, and APIs return inconsistent structures. Build your parsers defensively - validate everything, handle edge cases, and always have a plan B. Remember: it's better to reject bad data early than to process it incorrectly!