Migrate to Gitea
scripts/transform_wiktionary.py (new file, 419 lines added)
@@ -0,0 +1,419 @@
#!/usr/bin/env python3
"""
Universal Wiktionary Format Transformer
========================================

Transforms any Wiktionary JSON format to a standardized universal schema.

Usage:
    python transform_wiktionary.py input.jsonl output.jsonl
    python transform_wiktionary.py input.jsonl output.jsonl --validate
"""

import argparse
import json
import sys
from pathlib import Path
from typing import Any, Dict, List, Optional

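# Illustrative input/output shape (hypothetical entry; real Wiktionary dump
# output varies by edition, and only the fields read below are assumed):
#
#   {"word": "Haus", "lang_code": "de", "pos": "noun", "senses": [...],
#    "sounds": [{"ipa": "[haʊ̯s]"}], "tags": ["neuter", "singular"]}
#
# becomes:
#
#   {"word": "Haus", "lang_code": "de", "pos": "noun", "senses": [...],
#    "phonetics": {"ipa_variations": [{"ipa": "[haʊ̯s]", "ipa_cleaned": "haʊ̯s"}]},
#    "grammatical_features": {"tags": ["neuter", "singular"],
#                             "gender": "neuter", "number": "singular"},
#    "metadata": {"source_lang_code": "de"}}
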
class WiktionaryTransformer:
    """Transforms Wiktionary entries to universal format."""

    def __init__(self, validate: bool = False):
        self.validate = validate
        self.stats = {
            "total": 0,
            "successful": 0,
            "errors": 0,
            "warnings": []
        }

    def transform_entry(self, raw_entry: Dict[str, Any]) -> Dict[str, Any]:
        """
        Transform a single Wiktionary entry to universal format.

        Args:
            raw_entry: Raw entry from any Wiktionary edition

        Returns:
            Transformed entry in universal format
        """
        # === REQUIRED CORE FIELDS ===
        try:
            universal = {
                "word": raw_entry["word"],
                "lang_code": raw_entry["lang_code"],
                "pos": raw_entry["pos"],
                "senses": raw_entry["senses"]
            }
        except KeyError as e:
            raise ValueError(f"Missing required field: {e}")

        # === PHONETICS ===
        phonetics = self._extract_phonetics(raw_entry)
        if phonetics:
            universal["phonetics"] = phonetics

        # === HYPHENATION ===
        hyphenation = self._extract_hyphenation(raw_entry)
        if hyphenation:
            universal["hyphenation"] = hyphenation

        # === FORMS ===
        if "forms" in raw_entry:
            universal["forms"] = raw_entry["forms"]

        # === GRAMMATICAL FEATURES ===
        grammatical = self._extract_grammatical_features(raw_entry)
        if grammatical:
            universal["grammatical_features"] = grammatical

        # === ETYMOLOGY ===
        etymology = self._extract_etymology(raw_entry)
        if etymology:
            universal["etymology"] = etymology

        # === RELATIONS ===
        relations = self._extract_relations(raw_entry)
        if relations:
            universal["relations"] = relations

        # === TRANSLATIONS ===
        if "translations" in raw_entry:
            universal["translations"] = raw_entry["translations"]

        # === DESCENDANTS ===
        if "descendants" in raw_entry:
            universal["descendants"] = raw_entry["descendants"]

        # === METADATA ===
        metadata = self._extract_metadata(raw_entry)
        universal["metadata"] = metadata

        return universal

    def _extract_phonetics(self, entry: Dict[str, Any]) -> Optional[Dict[str, Any]]:
        """Extract and normalize phonetic information."""
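        # Example (hypothetical values): a sounds list like
        #   [{"ipa": "[haʊ̯s]", "raw_tags": ["Deutschland"]}]
        # yields
        #   {"ipa_variations": [{"ipa": "[haʊ̯s]", "raw_tags": ["Deutschland"],
        #                        "ipa_cleaned": "haʊ̯s"}]}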
        phonetics = {}

        # Process sounds array
        if "sounds" in entry and entry["sounds"]:
            ipa_variations = []
            audio_list = []
            homophones = []

            for sound in entry["sounds"]:
                # IPA transcription with country information
                if "ipa" in sound:
                    ipa_entry = {"ipa": sound["ipa"]}

                    # Preserve country information from raw_tags
                    if "raw_tags" in sound:
                        ipa_entry["raw_tags"] = sound["raw_tags"]

                    # Clean IPA string by removing special characters at beginning/end
                    cleaned_ipa = self._clean_ipa_string(sound["ipa"])
                    ipa_entry["ipa_cleaned"] = cleaned_ipa

                    ipa_variations.append(ipa_entry)

                # Audio files (kept for now; removed in a later filter step)
                if "audio" in sound:
                    audio_obj = {"text": sound["audio"]}
                    # Try multiple URL formats
                    for url_key in ["ogg_url", "mp3_url", "url"]:
                        if url_key in sound:
                            audio_obj["url"] = sound[url_key]
                            break
                    audio_list.append(audio_obj)

                # Homophones
                if "homophone" in sound:
                    homophones.append(sound["homophone"])

            if ipa_variations:
                phonetics["ipa_variations"] = ipa_variations
            if audio_list:
                phonetics["audio"] = audio_list
            if homophones:
                phonetics["homophones"] = homophones

        # Handle extra_sounds (present in some editions, e.g. Spanish)
        if "extra_sounds" in entry:
            if "pronunciación" in entry["extra_sounds"]:
                phonetics["notes"] = entry["extra_sounds"]["pronunciación"]

        return phonetics if phonetics else None

    def _clean_ipa_string(self, ipa_string: str) -> str:
        """Clean IPA string by removing special characters at beginning/end."""
        if not ipa_string:
            return ipa_string

        # Remove leading/trailing special characters: [, ], \, :
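        # e.g. "[ˈhaʊ̯s]" -> "ˈhaʊ̯s". Note that str.strip removes any run of
        # these characters from both ends, so "]:a:[" -> "a" as well.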
        cleaned = ipa_string.strip("[]\\:")
        return cleaned

    def _extract_hyphenation(self, entry: Dict[str, Any]) -> Optional[List[str]]:
        """Extract and normalize hyphenation."""
        # Format 1: hyphenations array with parts
        if "hyphenations" in entry and entry["hyphenations"]:
            parts = []
            for h in entry["hyphenations"]:
                if isinstance(h, dict) and "parts" in h:
                    parts.extend(h["parts"])
                elif isinstance(h, str):
                    parts.append(h)
            if parts:
                return parts

        # Format 2: hyphenation string with separator
        if "hyphenation" in entry:
            # Split on common separators
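            # e.g. a hypothetical "Wör·ter·buch" -> ["Wör", "ter", "buch"]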
            hyph = entry["hyphenation"]
            for sep in ["‐", "-", "·", "•"]:
                if sep in hyph:
                    return hyph.split(sep)
            return [hyph]

        return None

    def _extract_grammatical_features(self, entry: Dict[str, Any]) -> Optional[Dict[str, Any]]:
        """Extract grammatical features and tags."""
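        # e.g. tags ["f", "pl"] -> {"tags": ["f", "pl"],
        #                           "gender": "feminine", "number": "plural"}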
if "tags" not in entry:
|
||||
return None
|
||||
|
||||
grammatical = {"tags": entry["tags"]}
|
||||
|
||||
# Extract gender from tags
|
||||
gender_map = {
|
||||
"masculine": "masculine",
|
||||
"feminine": "feminine",
|
||||
"neuter": "neuter",
|
||||
"common": "common",
|
||||
"m": "masculine",
|
||||
"f": "feminine",
|
||||
"n": "neuter",
|
||||
"c": "common"
|
||||
}
|
||||
|
||||
for tag in entry["tags"]:
|
||||
tag_lower = tag.lower()
|
||||
if tag_lower in gender_map:
|
||||
grammatical["gender"] = gender_map[tag_lower]
|
||||
break
|
||||
|
||||
# Extract number
|
||||
number_map = {
|
||||
"singular": "singular",
|
||||
"plural": "plural",
|
||||
"dual": "dual",
|
||||
"sg": "singular",
|
||||
"pl": "plural"
|
||||
}
|
||||
|
||||
for tag in entry["tags"]:
|
||||
tag_lower = tag.lower()
|
||||
if tag_lower in number_map:
|
||||
grammatical["number"] = number_map[tag_lower]
|
||||
break
|
||||
|
||||
return grammatical
|
||||
|
||||
    def _extract_etymology(self, entry: Dict[str, Any]) -> Optional[Dict[str, Any]]:
        """Extract etymology information."""
        etymology = {}

        if "etymology_text" in entry:
            etymology["text"] = entry["etymology_text"]

        if "etymology_texts" in entry:
            etymology["texts"] = entry["etymology_texts"]

        if "etymology_number" in entry:
            etymology["number"] = entry["etymology_number"]

        return etymology if etymology else None

    def _extract_relations(self, entry: Dict[str, Any]) -> Optional[Dict[str, Any]]:
        """Extract semantic and lexical relations."""
        relations = {}

        # Define all possible relation types
        relation_fields = [
            "synonyms", "antonyms", "hypernyms", "hyponyms",
            "meronyms", "holonyms", "related", "derived",
            "coordinate_terms", "troponyms", "compounds"
        ]

        for field in relation_fields:
            if field in entry and entry[field]:
                relations[field] = entry[field]

        return relations if relations else None

    def _extract_metadata(self, entry: Dict[str, Any]) -> Dict[str, Any]:
        """Extract metadata and source information."""
        metadata = {}

        # Source language
        if "lang" in entry:
            metadata["source_lang"] = entry["lang"]

        # Source language code, if present
        if "lang_code" in entry:
            metadata["source_lang_code"] = entry["lang_code"]

        # POS title (localized)
        if "pos_title" in entry:
            metadata["pos_title"] = entry["pos_title"]
        elif "pos_text" in entry:
            metadata["pos_title"] = entry["pos_text"]

        # Categories
        if "categories" in entry:
            metadata["categories"] = entry["categories"]

        # Templates
        templates = []
        if "head_templates" in entry:
            templates.extend(entry["head_templates"])
        if "inflection_templates" in entry:
            templates.extend(entry["inflection_templates"])
        if templates:
            metadata["templates"] = templates

        # Additional metadata
        if "attestations" in entry:
            metadata["attestations"] = entry["attestations"]

        return metadata

    def transform_file(self, input_path: str, output_path: str) -> None:
        """
        Transform an entire JSONL file.

        Args:
            input_path: Path to input JSONL file
            output_path: Path to output JSONL file
        """
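        # Input is JSONL: one JSON object per line, e.g.
        #   {"word": "Haus", "lang_code": "de", "pos": "noun", "senses": []}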
        input_file = Path(input_path)
        output_file = Path(output_path)

        if not input_file.exists():
            raise FileNotFoundError(f"Input file not found: {input_path}")

        print(f"Transforming: {input_path} → {output_path}")

        with open(input_file, 'r', encoding='utf-8') as infile, \
             open(output_file, 'w', encoding='utf-8') as outfile:

            for line_num, line in enumerate(infile, 1):
                line = line.strip()
                if not line:
                    continue

                self.stats["total"] += 1

                try:
                    # Parse input
                    raw_entry = json.loads(line)

                    # Transform
                    universal_entry = self.transform_entry(raw_entry)

                    # Validate if requested
                    if self.validate:
                        self._validate_entry(universal_entry)

                    # Write output
                    outfile.write(json.dumps(universal_entry, ensure_ascii=False) + '\n')
                    self.stats["successful"] += 1

                except json.JSONDecodeError as e:
                    self.stats["errors"] += 1
                    warning = f"Line {line_num}: JSON decode error - {e}"
                    self.stats["warnings"].append(warning)
                    print(f"⚠ {warning}", file=sys.stderr)

                except ValueError as e:
                    self.stats["errors"] += 1
                    warning = f"Line {line_num}: {e}"
                    self.stats["warnings"].append(warning)
                    print(f"⚠ {warning}", file=sys.stderr)

                except Exception as e:
                    self.stats["errors"] += 1
                    warning = f"Line {line_num}: Unexpected error - {e}"
                    self.stats["warnings"].append(warning)
                    print(f"⚠ {warning}", file=sys.stderr)

        self._print_summary()

    def _validate_entry(self, entry: Dict[str, Any]) -> None:
        """Validate a transformed entry."""
        required = ["word", "lang_code", "pos", "senses"]
        for field in required:
            if field not in entry:
                raise ValueError(f"Missing required field after transformation: {field}")

    def _print_summary(self) -> None:
        """Print transformation summary."""
        print("\n" + "=" * 60)
        print("TRANSFORMATION SUMMARY")
        print("=" * 60)
        print(f"Total entries: {self.stats['total']}")
        print(f"Successful:    {self.stats['successful']}")
        print(f"Errors:        {self.stats['errors']}")

        if self.stats['total'] > 0:
            success_rate = (self.stats['successful'] / self.stats['total']) * 100
            print(f"Success rate:  {success_rate:.1f}%")

        if self.stats['warnings']:
            print(f"\nWarnings: {len(self.stats['warnings'])}")
            if len(self.stats['warnings']) <= 10:
                for warning in self.stats['warnings']:
                    print(f"  - {warning}")
            else:
                print(f"  (showing first 10 of {len(self.stats['warnings'])})")
                for warning in self.stats['warnings'][:10]:
                    print(f"  - {warning}")

def main():
    """Main entry point."""
    parser = argparse.ArgumentParser(
        description="Transform Wiktionary JSONL to universal format",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  %(prog)s input.jsonl output.jsonl
  %(prog)s data/raw.jsonl data/transformed.jsonl --validate
"""
    )

    parser.add_argument("input", help="Input JSONL file")
    parser.add_argument("output", help="Output JSONL file")
    parser.add_argument("--validate", action="store_true",
                        help="Validate transformed entries")

    args = parser.parse_args()

    try:
        transformer = WiktionaryTransformer(validate=args.validate)
        transformer.transform_file(args.input, args.output)

        # Exit with a non-zero code if any entries failed
        if transformer.stats["errors"] > 0:
            sys.exit(1)

    except Exception as e:
        print(f"Error: {e}", file=sys.stderr)
        sys.exit(1)


if __name__ == "__main__":
    main()