"""
VocabListGenerator — Batch runner
-----------------------------------
Reads batch.yaml and generates every vocabulary list defined there,
writing all output files into the configured output folder and keeping
the manifest up to date after each successful generation.

Multi-language expansion
------------------------
If a batch entry has more than 2 language IDs, all C(n, 2) unordered pairs
are automatically generated.  E.g. languages: [15, 7, 1, 3] produces 6 lists:
    DE-PT, DE-EN, DE-ES, PT-EN, PT-ES, EN-ES

Usage:
    python batch_generate.py                 # process all batches (skips existing files)
    python batch_generate.py --force         # regenerate everything, even existing files
    python batch_generate.py --dry-run       # preview without calling the LLM
    python batch_generate.py --list          # list all batches (after expansion)
    python batch_generate.py --prune         # remove stale manifest entries and exit
    python batch_generate.py --config FILE   # use a different batch file
"""

import argparse
import os
import re
import sys
import time
import yaml
from datetime import date, timedelta
from itertools import combinations
from pathlib import Path
from typing import Any, Dict, List, Tuple

from config import Config
from llm_client import LLMClient
from generate import load_language_map, load_language_code_map, load_language_instructions, run_generation
from manifest_manager import print_manifest, prune_missing_files
from check_duplicates import check_file_for_true_duplicates, find_json_files

# A generated file is deleted when at least this many identical
# (wordFirst, wordSecond) pairs are reported for it.
_TRUE_DUPLICATE_THRESHOLD = 3


# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------

def sanitize_for_filename(name: str) -> str:
    """
    Convert a name into a filename-safe string.

    - Lowercase
    - Replace whitespace and dashes (ASCII, en-dash, em-dash) with underscores
    - Remove every remaining character that is not a-z, 0-9 or underscore
    - Collapse runs of underscores and strip them from both ends

    The result may be empty (e.g. for a name made only of punctuation);
    callers must supply their own fallback.
    """
    name = name.lower()
    # Separators become underscores before the strict filter below would
    # otherwise drop them and glue adjacent words together.
    name = re.sub(r'[\s\-–—]+', '_', name)
    name = re.sub(r'[^a-z0-9_]', '', name)
    # Removing characters can leave "a__b"; collapse again.
    name = re.sub(r'_+', '_', name)
    return name.strip('_')


def _text_field(entry: Dict[str, Any], key: str, default: str = "") -> str:
    """
    Return entry[key] as a stripped string, or `default` when the key is
    missing or explicitly null (a bare `key:` in YAML loads as None).
    """
    value = entry.get(key, default)
    if value is None:
        return default
    return str(value).strip()


def _language_names(
    lang_ids: List[int],
    language_map: Dict[int, str],
) -> Tuple[str, str]:
    """Return display names for the first two language IDs ("ID n" if unknown)."""
    first, second = lang_ids[0], lang_ids[1]
    return (
        language_map.get(first, f"ID {first}"),
        language_map.get(second, f"ID {second}"),
    )


def generate_output_filename(
    entry: Dict[str, Any],
    code_map: Dict[int, str],
) -> str:
    """
    Generate the output filename with the format:
        YYYY_MM_DD_name_lang1_lang2_level.json

    Example: 2026_02_19_verbs_beginners_en_de_A1.json

    The name comes from entry['name'], then entry['category'], then
    "unknown"; if sanitizing leaves nothing, "vocab" is used.  Language codes
    come from `code_map`, falling back to the numeric ID.  The level defaults
    to "A2".  Only the first two entries of entry['languages'] are used.

    NOTE: the filename embeds today's date, so the "skip existing file" check
    in main() only matches files generated on the same calendar day.
    """
    today = date.today().strftime("%Y_%m_%d")

    # str() guards against YAML scalars that are not strings (e.g. `name: 2024`).
    name = entry.get("name") or entry.get("category") or "unknown"
    sanitized_name = sanitize_for_filename(str(name)) or "vocab"

    lang_ids = entry["languages"]
    code1 = code_map.get(lang_ids[0], str(lang_ids[0])).lower()
    code2 = code_map.get(lang_ids[1], str(lang_ids[1])).lower()

    level = _text_field(entry, "level", "A2").upper()

    return f"{today}_{sanitized_name}_{code1}_{code2}_{level}.json"


# ---------------------------------------------------------------------------
# Batch config loader & validator
# ---------------------------------------------------------------------------

def load_batch_config(path: str = "batch.yaml") -> Dict[str, Any]:
    """
    Load the batch YAML file and return its top-level mapping.

    Prints an error and exits with status 1 when the file is missing, cannot
    be parsed, or does not contain a mapping at the top level.  An empty file
    yields an empty dict.
    """
    try:
        with open(path, "r", encoding="utf-8") as f:
            data = yaml.safe_load(f)
    except FileNotFoundError:
        print(f"ERROR: '{path}' not found.")
        sys.exit(1)
    except yaml.YAMLError as e:
        print(f"ERROR: Could not parse '{path}': {e}")
        sys.exit(1)

    # safe_load returns None for an empty document; callers expect a dict.
    if data is None:
        return {}
    if not isinstance(data, dict):
        print(f"ERROR: '{path}' must contain a YAML mapping at the top level.")
        sys.exit(1)
    return data


def validate_batch_entry(entry: Dict[str, Any]) -> List[str]:
    """
    Return a list of validation error strings (empty = valid).

    Checks that 'name', 'category' and 'output_filename' are present and
    non-empty, that 'languages' is a list of at least 2 items, and that
    'amount' is a positive integer.
    """
    # NOTE(review): 'output_filename' is required here although expand_entry()
    # always overwrites it with a generated name — confirm whether this
    # requirement is still wanted.
    errors = [
        f"missing '{field}'"
        for field in ("name", "category", "output_filename")
        if not entry.get(field)
    ]

    langs = entry.get("languages")
    if not isinstance(langs, list) or len(langs) < 2:
        errors.append("'languages' must be a list of at least 2 IDs")

    amount = entry.get("amount")
    # bool is a subclass of int, so `amount: true` must be rejected explicitly.
    if isinstance(amount, bool) or not isinstance(amount, int) or amount < 1:
        errors.append("'amount' must be a positive integer")

    return errors


def expand_entry(
    entry: Dict[str, Any],
    code_map: Dict[int, str],
) -> List[Dict[str, Any]]:
    """
    Expand a batch entry into individual (lang1, lang2) sub-entries.

    - If languages has exactly 2 IDs → returns one copy of the entry with a
      freshly generated output_filename.
    - Otherwise → returns one copy per C(n, 2) combination, each with its own
      two-element 'languages' list and generated output_filename.

    The input entry is never mutated; sub-entries are shallow copies.
    Expects entry['languages'] to hold at least 2 IDs.
    """
    langs: List[int] = entry["languages"]

    if len(langs) == 2:
        sub = dict(entry)
        sub["output_filename"] = generate_output_filename(entry, code_map)
        return [sub]

    # Nested .get() avoids the KeyError that an eagerly evaluated
    # entry["category"] default raised for entries without a category.
    name_template = entry.get("name", entry.get("category"))

    expanded: List[Dict[str, Any]] = []
    for lang1, lang2 in combinations(langs, 2):
        sub = dict(entry)
        sub["languages"] = [lang1, lang2]
        if name_template is not None:
            sub["name"] = name_template
        sub["output_filename"] = generate_output_filename(sub, code_map)
        expanded.append(sub)

    return expanded


def _delete_files_with_true_duplicates(output_dir: str) -> Tuple[int, int]:
    """
    Delete every JSON file under `output_dir` for which the duplicate checker
    reports TRUE duplicates, printing a short report per file.

    Files whose check result contains an "error" key are left untouched.
    Returns (files_with_duplicates, files_actually_deleted).
    """
    files_with_dupes = 0
    files_deleted = 0

    for file_path in find_json_files(output_dir):
        result = check_file_for_true_duplicates(
            file_path, threshold=_TRUE_DUPLICATE_THRESHOLD
        )
        if "error" in result:
            continue

        true_dupes = result.get("true_dupes", {})
        if not true_dupes:
            continue

        files_with_dupes += 1
        try:
            rel_path = file_path.relative_to(Path(output_dir))
        except ValueError:
            # File lies outside output_dir; fall back to the bare name.
            rel_path = file_path.name

        print(f" ⚠️ Deleting {rel_path}")
        print(f" TRUE duplicates found: {len(true_dupes)} pairs appearing "
              f"{_TRUE_DUPLICATE_THRESHOLD}+ times")
        # Show at most three examples to keep the report short.
        for pair, count in list(true_dupes.items())[:3]:
            wf, ws = pair
            print(f" - \"{wf}\" → \"{ws}\" = {count} times")

        try:
            os.remove(file_path)
        except OSError as e:
            print(f" ❌ Failed to delete: {e}\n")
        else:
            files_deleted += 1
            print(" ✅ DELETED\n")

    return files_with_dupes, files_deleted


# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------

def main() -> None:
    """
    Command-line entry point.

    Loads the batch config, validates and expands the active entries, then
    depending on the flags either lists them, prunes the manifest, previews
    the run, or generates every list.  After a real run, files containing
    TRUE duplicates are deleted and the manifest is printed.  Exits with
    status 1 on invalid config or when at least one generation failed.
    """
    parser = argparse.ArgumentParser(description="VocabListGenerator — Batch runner")
    parser.add_argument("--dry-run", action="store_true",
                        help="Print what would be generated without calling the LLM")
    parser.add_argument("--list", action="store_true",
                        help="List all batches (after expansion) and exit")
    parser.add_argument("--prune", action="store_true",
                        help="Remove manifest entries whose output files no longer exist, then exit")
    parser.add_argument("--force", action="store_true",
                        help="Regenerate all lists, even those whose output file already exists")
    parser.add_argument("--config", default="batch.yaml", metavar="FILE",
                        help="Path to batch config file (default: batch.yaml)")
    args = parser.parse_args()

    # ── Load configs ────────────────────────────────────────────────────────
    batch_cfg = load_batch_config(args.config)
    main_cfg = Config()
    language_map = load_language_map()
    code_map = load_language_code_map()
    language_instructions = load_language_instructions()

    # `or {}` / `or []` also cover keys that are present but null in the YAML.
    settings = batch_cfg.get("settings") or {}
    output_dir = settings.get("output_dir", "output")
    manifest_file = settings.get("manifest_filename", "vocab_manifest.json")
    stop_on_error = settings.get("stop_on_error", False)

    # The output directory is resolved relative to this script, not the CWD.
    script_dir = os.path.dirname(os.path.abspath(__file__))
    output_dir = os.path.join(script_dir, output_dir)
    manifest_path = os.path.join(output_dir, manifest_file)

    raw_batches: List[Dict[str, Any]] = batch_cfg.get("batches") or []

    # Separate skipped entries before expansion
    active_raw = [b for b in raw_batches if not b.get("skip", False)]
    skipped_raw = [b for b in raw_batches if b.get("skip", False)]

    # Validate raw entries before expanding (catches config mistakes early)
    invalid = []
    for i, entry in enumerate(active_raw, 1):
        errs = validate_batch_entry(entry)
        if errs:
            invalid.append((i, entry.get("name", f"entry #{i}"), errs))

    if invalid:
        print("ERROR: The following batch entries have validation problems:\n")
        for i, name, errs in invalid:
            print(f" [{i}] {name}")
            for e in errs:
                print(f" • {e}")
        sys.exit(1)

    # Expand multi-language entries into individual pairs
    active: List[Dict[str, Any]] = []
    for entry in active_raw:
        active.extend(expand_entry(entry, code_map))

    # Disabled entries are not validated, so one without a usable
    # 'languages' list is counted once instead of crashing the expansion.
    skipped_expanded: List[Dict[str, Any]] = []
    for entry in skipped_raw:
        langs = entry.get("languages")
        if isinstance(langs, list) and len(langs) >= 2:
            skipped_expanded.extend(expand_entry(entry, code_map))
        else:
            skipped_expanded.append(entry)

    total_pairs = sum(b["amount"] for b in active)

    # ── --list mode ─────────────────────────────────────────────────────────
    if args.list:
        # Show the config file actually loaded rather than a fixed name.
        print(f"\n{os.path.basename(args.config)} — {len(raw_batches)} template(s) → "
              f"{len(active)} lists to generate ({len(skipped_expanded)} skipped)\n")
        for i, b in enumerate(active, 1):
            l1, l2 = _language_names(b["languages"], language_map)
            print(f" {i:3}. [{b['output_filename']}]")
            print(f" {b['name']}")
            print(f" {l1} → {l2} | {b['amount']} pairs | {b['category']}")
        if skipped_expanded:
            print(f"\n Skipped ({len(skipped_expanded)}):")
            for b in skipped_expanded:
                print(f" - {b.get('name', '?')}")
        print(f"\n Total: {len(active)} lists ≈ {total_pairs:,} word pairs\n")
        return

    # ── --prune mode ────────────────────────────────────────────────────────
    if args.prune:
        if not os.path.isfile(manifest_path):
            print(f" [prune] No manifest found at {manifest_path} — nothing to do.")
            return
        removed = prune_missing_files(manifest_path, output_dir)
        if removed == 0:
            print(" [prune] Manifest is clean — no stale entries found.")
        return

    # ── Banner ──────────────────────────────────────────────────────────────
    print("=" * 60)
    print(" VocabListGenerator — Batch Run")
    print("=" * 60)
    print(f" Templates : {len(raw_batches)} defined → {len(active)} lists after expansion")
    print(f" Skipped : {len(skipped_expanded)} lists")
    print(f" Total pairs: ≈ {total_pairs:,}")
    print(f" Output dir : {output_dir}")
    print(f" Manifest : {manifest_path}")
    if args.force:
        print(" Mode : FORCE (regenerate all, ignoring existing files)")
    elif args.dry_run:
        print(" Mode : DRY RUN (no API calls)")
    else:
        already = sum(
            1 for b in active
            if os.path.isfile(os.path.join(output_dir, b["output_filename"]))
        )
        if already:
            print(f" Resuming : {already} existing file(s) will be skipped (use --force to override)")
    print()

    os.makedirs(output_dir, exist_ok=True)

    # ── Prune stale manifest entries before generating ──────────────────────
    if os.path.isfile(manifest_path):
        prune_missing_files(manifest_path, output_dir)

    # ── Dry-run preview ─────────────────────────────────────────────────────
    if args.dry_run:
        print("Lists that would be generated:\n")
        for i, b in enumerate(active, 1):
            langs = b["languages"]
            l1, l2 = _language_names(langs, language_map)
            print(f" {i:3}. {b['name']}")
            print(f" {l1} ({langs[0]}) → {l2} ({langs[1]}) | "
                  f"{b['amount']} pairs → {b['output_filename']}")
        print(f"\n Total: {len(active)} lists ≈ {total_pairs:,} word pairs\n")
        return

    # ── Build LLM client once ───────────────────────────────────────────────
    llm = LLMClient(main_cfg)

    # ── Run batches ─────────────────────────────────────────────────────────
    ok, failed, skipped_existing = 0, 0, 0
    start_time = time.monotonic()
    generated_count = 0       # successful generations, used for the average
    generation_seconds = 0.0  # time spent inside successful generations only

    # Number of lists that will actually reach the LLM in this run; existing
    # files are excluded unless --force is given.
    pending_total = sum(
        1 for b in active
        if args.force
        or not os.path.isfile(os.path.join(output_dir, b["output_filename"]))
    )

    for i, entry in enumerate(active, 1):
        name = entry["name"]
        category = entry["category"]
        description = _text_field(entry, "description")
        instructions = _text_field(entry, "instructions")
        emoji = _text_field(entry, "emoji")
        level = _text_field(entry, "level", "A2").upper()
        amount = entry["amount"]
        lang_ids = entry["languages"]
        output_filename = entry["output_filename"]
        vocab_file_path = os.path.join(output_dir, output_filename)

        # ETA = average duration of a successful generation × lists that
        # still have to be attempted (already-attempted ones are ok + failed).
        avg_time_per_item = generation_seconds / generated_count if generated_count > 0 else 0
        remaining = max(0, pending_total - ok - failed)
        eta_seconds = avg_time_per_item * remaining
        eta_str = str(timedelta(seconds=int(eta_seconds))) if remaining > 0 else "done"

        header = f"[{i}/{len(active)}] {emoji} {name}" if emoji else f"[{i}/{len(active)}] {name}"
        print(f"{header} [{level}]")
        print(f" File : {output_filename}")
        if generated_count > 0:
            print(f" ETA : {eta_str} ({int(avg_time_per_item)}s/item)")

        # Skip if already generated (unless --force)
        if not args.force and os.path.isfile(vocab_file_path):
            print(" ✔ Already exists — skipping (use --force to regenerate)")
            print("-" * 60)
            skipped_existing += 1
            continue

        item_start_time = time.monotonic()

        success = run_generation(
            llm=llm,
            language_map=language_map,
            lang_first_id=lang_ids[0],
            lang_second_id=lang_ids[1],
            amount=amount,
            category=category,
            name=name,
            description=description,
            instructions=instructions,
            output_file_path=vocab_file_path,
            manifest_path=manifest_path,
            emoji=emoji,
            level=level,
            language_instructions=language_instructions,
        )

        if success:
            ok += 1
            generated_count += 1
            generation_seconds += time.monotonic() - item_start_time
        else:
            failed += 1
            print(f" ✗ FAILED: {name}\n")
            if stop_on_error:
                print("stop_on_error is set — aborting.")
                break

        print("-" * 60)

    # ── Summary ─────────────────────────────────────────────────────────────
    total_time = time.monotonic() - start_time
    print(f"\n{'=' * 60}")
    print(" Batch complete.")
    print(f" ✓ Success : {ok}")
    print(f" ✗ Failed : {failed}")
    print(f" ⏱ Total time: {str(timedelta(seconds=int(total_time)))}")
    if skipped_existing:
        print(f" ⏭ Existing : {skipped_existing} (already generated, skipped)")
    if skipped_expanded:
        print(f" - Disabled : {len(skipped_expanded)} (skip: true in batch.yaml)")
    print(f"{'=' * 60}\n")

    # ── Check for TRUE duplicates and delete bad files ──────────────────────
    print("Checking for TRUE duplicates (both wordFirst AND wordSecond identical)...\n")
    files_with_dupes, files_deleted = _delete_files_with_true_duplicates(output_dir)

    if files_with_dupes > 0:
        print(f"\n{'=' * 60}")
        print(f" 🗑️ Deleted {files_deleted} files with "
              f"{_TRUE_DUPLICATE_THRESHOLD}+ TRUE duplicates")
        print(f"{'=' * 60}\n")
    else:
        print(" ✅ No files with TRUE duplicates found\n")

    # Deleting files leaves their manifest entries dangling; drop them so the
    # manifest printed below matches what is on disk.
    if files_deleted and os.path.isfile(manifest_path):
        prune_missing_files(manifest_path, output_dir)

    print_manifest(manifest_path)

    if failed > 0:
        sys.exit(1)


if __name__ == "__main__":
    main()