Files
BatchVocabListGenerator/batch_generate.py
jonasgaudian eabe2e2969 welcome gitea
2026-02-19 17:18:23 +01:00

436 lines
18 KiB
Python

"""
VocabListGenerator — Batch runner
-----------------------------------
Reads batch.yaml and generates every vocabulary list defined there,
writing all output files into the configured output folder and
keeping the manifest up to date after each successful generation.
Multi-language expansion
------------------------
If a batch entry has more than 2 language IDs, all C(n, 2) unordered pairs
are automatically generated. E.g. languages: [15, 7, 1, 3] produces 6 lists:
DE-PT, DE-EN, DE-ES, PT-EN, PT-ES, EN-ES
Usage:
python batch_generate.py # process all batches (skips existing files)
python batch_generate.py --force # regenerate everything, even existing files
python batch_generate.py --dry-run # preview without calling the LLM
python batch_generate.py --list # list all batches (after expansion)
python batch_generate.py --prune # remove stale manifest entries and exit
python batch_generate.py --config FILE # use a different batch file
"""
import argparse
import os
import re
import sys
import time
import yaml
from datetime import date, timedelta
from itertools import combinations
from pathlib import Path
from typing import Any, Dict, List, Tuple
from config import Config
from llm_client import LLMClient
from generate import load_language_map, load_language_code_map, load_language_instructions, run_generation
from manifest_manager import print_manifest, prune_missing_files
from check_duplicates import check_file_for_true_duplicates, find_json_files
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def sanitize_for_filename(name: str) -> str:
    """
    Normalize *name* into a filename-safe slug.

    The result is lowercase, uses single underscores as the only
    separator, and contains nothing but ``[a-z0-9_]`` with no leading
    or trailing underscore.
    """
    # Lowercase first, then turn whitespace and dash variants
    # (ASCII hyphen, en-dash, em-dash) into underscores.
    slug = re.sub(r'[\s\-–—]+', '_', name.lower())
    # Drop every remaining character outside [a-z0-9_].
    slug = re.sub(r'[^a-z0-9_]', '', slug)
    # Collapse runs of underscores and trim them from both ends.
    return re.sub(r'_+', '_', slug).strip('_')
def generate_output_filename(
    entry: Dict[str, Any],
    code_map: Dict[int, str],
) -> str:
    """
    Build an output filename in the dated format
    ``YYYY_MM_DD_name_lang1_lang2_level.json``.

    Example: ``2026_02_19_verbs_beginners_en_de_A1.json``
    """
    date_part = date.today().strftime("%Y_%m_%d")
    # Prefer the explicit name; fall back to category, then a literal default.
    raw_name = entry.get("name") or entry.get("category") or "unknown"
    # Sanitizing can leave an empty string — use a safe fallback then.
    name_part = sanitize_for_filename(raw_name) or "vocab"
    # Map the first two language IDs to lowercase codes; an ID missing
    # from the map falls back to the ID itself as a string.
    first_id, second_id = entry["languages"][0], entry["languages"][1]
    code_a = code_map.get(first_id, str(first_id)).lower()
    code_b = code_map.get(second_id, str(second_id)).lower()
    # CEFR level defaults to A2 when the entry does not specify one.
    level_part = entry.get("level", "A2").strip().upper()
    return f"{date_part}_{name_part}_{code_a}_{code_b}_{level_part}.json"
# ---------------------------------------------------------------------------
# Batch config loader & validator
# ---------------------------------------------------------------------------
def load_batch_config(path: str = "batch.yaml") -> Dict[str, Any]:
    """
    Read and parse the YAML batch file at *path*.

    Exits the process with status 1 (after printing an error message)
    when the file is missing or contains invalid YAML.
    """
    try:
        with open(path, "r", encoding="utf-8") as fh:
            raw_text = fh.read()
    except FileNotFoundError:
        print(f"ERROR: '{path}' not found.")
        sys.exit(1)
    try:
        return yaml.safe_load(raw_text)
    except yaml.YAMLError as e:
        print(f"ERROR: Could not parse '{path}': {e}")
        sys.exit(1)
def validate_batch_entry(entry: Dict[str, Any]) -> List[str]:
    """Check one batch entry and return a list of error strings (empty = valid)."""
    # Required string fields must be present and truthy.
    problems: List[str] = [
        f"missing '{key}'"
        for key in ("name", "category", "output_filename")
        if not entry.get(key)
    ]
    languages = entry.get("languages")
    if not (isinstance(languages, list) and len(languages) >= 2):
        problems.append("'languages' must be a list of at least 2 IDs")
    amount = entry.get("amount")
    if not (isinstance(amount, int) and amount >= 1):
        problems.append("'amount' must be a positive integer")
    return problems
def expand_entry(
    entry: Dict[str, Any],
    code_map: Dict[int, str],
) -> List[Dict[str, Any]]:
    """
    Expand a batch entry into per-language-pair sub-entries.

    - Exactly 2 language IDs → returns a single entry whose
      ``output_filename`` is rebuilt in the dated format.
    - 3+ IDs → returns one entry per unordered C(n, 2) pair, each with
      its own languages list and generated ``output_filename``.
    """
    language_ids: List[int] = entry["languages"]
    if len(language_ids) == 2:
        # Single pair: only the filename needs (re)generating.
        single = dict(entry)
        single["output_filename"] = generate_output_filename(entry, code_map)
        return [single]
    base_name = entry.get("name", entry["category"])
    result: List[Dict[str, Any]] = []
    for first, second in combinations(language_ids, 2):
        pair_entry = {**entry, "languages": [first, second], "name": base_name}
        pair_entry["output_filename"] = generate_output_filename(pair_entry, code_map)
        result.append(pair_entry)
    return result
# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------
def main() -> None:
    """
    CLI entry point for the batch runner.

    Parses command-line flags, loads and validates the batch config,
    expands multi-language entries into pairs, then either lists,
    prunes, dry-runs, or generates every active list — finishing with
    a duplicate check that deletes files containing repeated pairs.
    """
    parser = argparse.ArgumentParser(description="VocabListGenerator — Batch runner")
    parser.add_argument("--dry-run", action="store_true",
                        help="Print what would be generated without calling the LLM")
    parser.add_argument("--list", action="store_true",
                        help="List all batches (after expansion) and exit")
    parser.add_argument("--prune", action="store_true",
                        help="Remove manifest entries whose output files no longer exist, then exit")
    parser.add_argument("--force", action="store_true",
                        help="Regenerate all lists, even those whose output file already exists")
    parser.add_argument("--config", default="batch.yaml", metavar="FILE",
                        help="Path to batch config file (default: batch.yaml)")
    args = parser.parse_args()
    # ── Load configs ─────────────────────────────────────────────────────────
    batch_cfg = load_batch_config(args.config)
    main_cfg = Config()  # project-level config (API settings etc.) — defined in config.py
    language_map = load_language_map()
    code_map = load_language_code_map()
    language_instructions = load_language_instructions()
    settings = batch_cfg.get("settings", {})
    output_dir = settings.get("output_dir", "output")
    manifest_file = settings.get("manifest_filename", "vocab_manifest.json")
    stop_on_error = settings.get("stop_on_error", False)
    # Resolve paths relative to this script's directory, not the CWD.
    script_dir = os.path.dirname(os.path.abspath(__file__))
    output_dir = os.path.join(script_dir, output_dir)
    manifest_path = os.path.join(output_dir, manifest_file)
    raw_batches: List[Dict[str, Any]] = batch_cfg.get("batches", [])
    # Separate skipped entries before expansion
    active_raw = [b for b in raw_batches if not b.get("skip", False)]
    skipped_raw = [b for b in raw_batches if b.get("skip", False)]
    # Validate raw entries before expanding (catches config mistakes early)
    invalid = []
    for i, entry in enumerate(active_raw, 1):
        errs = validate_batch_entry(entry)
        if errs:
            invalid.append((i, entry.get("name", f"entry #{i}"), errs))
    if invalid:
        print("ERROR: The following batch entries have validation problems:\n")
        for i, name, errs in invalid:
            print(f" [{i}] {name}")
            for e in errs:
                print(f"{e}")
        sys.exit(1)
    # Expand multi-language entries into individual pairs
    active: List[Dict[str, Any]] = []
    for entry in active_raw:
        active.extend(expand_entry(entry, code_map))
    skipped_expanded: List[Dict[str, Any]] = []
    for entry in skipped_raw:
        skipped_expanded.extend(expand_entry(entry, code_map))
    total_pairs = sum(b["amount"] for b in active)
    # ── --list mode ──────────────────────────────────────────────────────────
    if args.list:
        print(f"\nbatch.yaml — {len(raw_batches)} template(s) → "
              f"{len(active)} lists to generate ({len(skipped_expanded)} skipped)\n")
        for i, b in enumerate(active, 1):
            langs = b["languages"]
            l1 = language_map.get(langs[0], f"ID {langs[0]}")
            l2 = language_map.get(langs[1], f"ID {langs[1]}")
            print(f" {i:3}. [{b['output_filename']}]")
            print(f" {b['name']}")
            # NOTE(review): no separator between {l1} and {l2} — possibly lost
            # in a copy/paste; confirm the intended output format.
            print(f" {l1}{l2} | {b['amount']} pairs | {b['category']}")
        if skipped_expanded:
            print(f"\n Skipped ({len(skipped_expanded)}):")
            for b in skipped_expanded:
                print(f" - {b.get('name', '?')}")
        print(f"\n Total: {len(active)} lists ≈ {total_pairs:,} word pairs\n")
        return
    # ── --prune mode ─────────────────────────────────────────────────────────
    if args.prune:
        if not os.path.isfile(manifest_path):
            print(f" [prune] No manifest found at {manifest_path} — nothing to do.")
            return
        removed = prune_missing_files(manifest_path, output_dir)
        if removed == 0:
            print(" [prune] Manifest is clean — no stale entries found.")
        return
    # ── Banner ───────────────────────────────────────────────────────────────
    print("=" * 60)
    print(" VocabListGenerator — Batch Run")
    print("=" * 60)
    print(f" Templates : {len(raw_batches)} defined → {len(active)} lists after expansion")
    print(f" Skipped : {len(skipped_expanded)} lists")
    print(f" Total pairs: ≈ {total_pairs:,}")
    print(f" Output dir : {output_dir}")
    print(f" Manifest : {manifest_path}")
    if args.force:
        print(" Mode : FORCE (regenerate all, ignoring existing files)")
    elif args.dry_run:
        print(" Mode : DRY RUN (no API calls)")
    else:
        # Count output files that already exist so the user sees what will be skipped.
        already = sum(
            1 for b in active
            if os.path.isfile(os.path.join(output_dir, b["output_filename"]))
        )
        if already:
            print(f" Resuming : {already} existing file(s) will be skipped (use --force to override)")
    print()
    os.makedirs(output_dir, exist_ok=True)
    # ── Prune stale manifest entries before generating ────────────────────────
    if os.path.isfile(manifest_path):
        prune_missing_files(manifest_path, output_dir)
    # ── Dry-run preview ──────────────────────────────────────────────────────
    if args.dry_run:
        print("Lists that would be generated:\n")
        for i, b in enumerate(active, 1):
            langs = b["languages"]
            l1 = language_map.get(langs[0], f"ID {langs[0]}")
            l2 = language_map.get(langs[1], f"ID {langs[1]}")
            print(f" {i:3}. {b['name']}")
            print(f" {l1} ({langs[0]}) → {l2} ({langs[1]}) | "
                  f"{b['amount']} pairs → {b['output_filename']}")
        print(f"\n Total: {len(active)} lists ≈ {total_pairs:,} word pairs\n")
        return
    # ── Build LLM client once ────────────────────────────────────────────────
    llm = LLMClient(main_cfg)
    # ── Run batches ──────────────────────────────────────────────────────────
    ok, failed, skipped_existing = 0, 0, 0
    start_time = time.time()
    generated_count = 0  # Track only generated items for time estimation
    for i, entry in enumerate(active, 1):
        name = entry["name"]
        category = entry["category"]
        description = entry.get("description", "").strip()
        instructions = entry.get("instructions", "").strip()
        emoji = entry.get("emoji", "").strip()
        level = entry.get("level", "A2").strip().upper()
        amount = entry["amount"]
        lang_ids = entry["languages"]
        output_filename = entry["output_filename"]
        vocab_file_path = os.path.join(output_dir, output_filename)
        # Calculate time estimation based only on generated items
        current_time = time.time()
        elapsed = current_time - start_time
        avg_time_per_item = elapsed / generated_count if generated_count > 0 else 0
        # NOTE(review): subtracting skipped_existing here assumes past skip
        # counts predict remaining skips — ETA may be an underestimate; confirm.
        remaining = len(active) - i - skipped_existing
        eta_seconds = avg_time_per_item * remaining
        eta_str = str(timedelta(seconds=int(eta_seconds))) if remaining > 0 else "done"
        header = f"[{i}/{len(active)}] {emoji} {name}" if emoji else f"[{i}/{len(active)}] {name}"
        print(f"{header} [{level}]")
        print(f" File : {output_filename}")
        if generated_count > 0:
            print(f" ETA : {eta_str} ({int(avg_time_per_item)}s/item)")
        # Skip if already generated (unless --force)
        if not args.force and os.path.isfile(vocab_file_path):
            print(f" ✔ Already exists — skipping (use --force to regenerate)")
            print("-" * 60)
            skipped_existing += 1
            continue
        # Track time before generation
        item_start_time = time.time()
        success = run_generation(
            llm=llm,
            language_map=language_map,
            lang_first_id=lang_ids[0],
            lang_second_id=lang_ids[1],
            amount=amount,
            category=category,
            name=name,
            description=description,
            instructions=instructions,
            output_file_path=vocab_file_path,
            manifest_path=manifest_path,
            emoji=emoji,
            level=level,
            language_instructions=language_instructions,
        )
        if success:
            ok += 1
            generated_count += 1
        else:
            failed += 1
            print(f" ✗ FAILED: {name}\n")
            if stop_on_error:
                print("stop_on_error is set — aborting.")
                break
        print("-" * 60)
    # ── Summary ──────────────────────────────────────────────────────────────
    total_time = time.time() - start_time
    print(f"\n{'=' * 60}")
    print(f" Batch complete.")
    print(f" ✓ Success : {ok}")
    print(f" ✗ Failed : {failed}")
    print(f" ⏱ Total time: {str(timedelta(seconds=int(total_time)))}")
    if skipped_existing:
        print(f" ⏭ Existing : {skipped_existing} (already generated, skipped)")
    if skipped_expanded:
        print(f" - Disabled : {len(skipped_expanded)} (skip: true in batch.yaml)")
    print(f"{'=' * 60}\n")
    # ── Check for TRUE duplicates and delete bad files ─────────────────────
    print("Checking for TRUE duplicates (both wordFirst AND wordSecond identical)...\n")
    json_files = find_json_files(output_dir)
    files_with_dupes = 0
    files_deleted = 0
    for file_path in json_files:
        result = check_file_for_true_duplicates(file_path, threshold=3)
        if "error" in result:
            # Unreadable/invalid file — leave it alone rather than delete.
            continue
        true_dupes = result.get("true_dupes", {})
        if true_dupes:
            files_with_dupes += 1
            try:
                # Prefer a path relative to output_dir for cleaner output.
                rel_path = file_path.relative_to(Path(output_dir))
            except ValueError:
                rel_path = file_path.name
            print(f" ⚠️ Deleting {rel_path}")
            print(f" TRUE duplicates found: {len(true_dupes)} pairs appearing 3+ times")
            # Show at most three offending pairs as a sample.
            for pair, count in list(true_dupes.items())[:3]:
                wf, ws = pair
                print(f" - \"{wf}\"\"{ws}\" = {count} times")
            # Delete the file
            try:
                os.remove(file_path)
                files_deleted += 1
                print(f" ✅ DELETED\n")
            except Exception as e:
                print(f" ❌ Failed to delete: {e}\n")
    if files_with_dupes > 0:
        print(f"\n{'=' * 60}")
        print(f" 🗑️ Deleted {files_deleted} files with 3+ TRUE duplicates")
        print(f"{'=' * 60}\n")
    else:
        print(" ✅ No files with TRUE duplicates found\n")
    print_manifest(manifest_path)
    # Non-zero exit when anything failed, so CI/cron can detect it.
    if failed > 0:
        sys.exit(1)
# Script entry point: only run the batch when executed directly, not on import.
if __name__ == "__main__":
    main()