476 lines
19 KiB
Python
476 lines
19 KiB
Python
"""
|
|
VocabListGenerator — Batch runner
|
|
-----------------------------------
|
|
Reads batch.yaml and generates every vocabulary list defined there,
|
|
writing all output files into the configured output folder and
|
|
keeping the manifest up to date after each successful generation.
|
|
|
|
Multi-language expansion
|
|
------------------------
|
|
If a batch entry has more than 2 language IDs, all C(n, 2) unordered pairs
|
|
are automatically generated. E.g. languages: [15, 7, 1, 3] produces 6 lists:
|
|
DE-PT, DE-EN, DE-ES, PT-EN, PT-ES, EN-ES
|
|
|
|
Usage:
|
|
python batch_generate.py # process all batches (skips existing files)
|
|
python batch_generate.py --force # regenerate everything, even existing files
|
|
python batch_generate.py --dry-run # preview without calling the LLM
|
|
python batch_generate.py --list # list all batches (after expansion)
|
|
python batch_generate.py --prune # remove stale manifest entries and exit
|
|
python batch_generate.py --config FILE # use a different batch file
|
|
"""
|
|
|
|
import argparse
|
|
import os
|
|
import re
|
|
import sys
|
|
import time
|
|
import yaml
|
|
from datetime import date, timedelta
|
|
from itertools import combinations
|
|
from pathlib import Path
|
|
from typing import Any, Dict, List, Tuple
|
|
|
|
from config import Config
|
|
from llm_client import LLMClient
|
|
from generate import load_language_map, load_language_code_map, load_language_instructions, run_generation
|
|
from manifest_manager import print_manifest, prune_missing_files
|
|
from check_duplicates import check_file_for_true_duplicates, find_json_files
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def sanitize_for_filename(name: str) -> str:
    """
    Turn an arbitrary display name into a filename-safe token.

    The result is lowercase, uses single underscores as the only
    separator, contains nothing but [a-z0-9_], and carries no leading
    or trailing underscore.
    """
    lowered = name.lower()
    # Collapse whitespace and dash-like separators (ASCII hyphen,
    # en dash, em dash) into a single underscore.
    separated = re.sub(r'[\s\-–—]+', '_', lowered)
    # Drop every remaining character outside [a-z0-9_].
    cleaned = re.sub(r'[^a-z0-9_]', '', separated)
    # The removal step can leave runs of underscores (e.g. "a & b"
    # becomes "a__b"); squash them, then trim the edges.
    collapsed = re.sub(r'_+', '_', cleaned)
    return collapsed.strip('_')
|
|
|
|
|
|
def strip_date_prefix(filename: str) -> str:
    """
    Remove a leading ``YYYY_MM_DD_`` date prefix from *filename*.

    Filenames without such a prefix are returned unchanged.

    Example: '2026_02_19_verbs_en_de_A1.json' -> 'verbs_en_de_A1.json'
    """
    # re.match is implicitly anchored at the start of the string.
    match = re.match(r'\d{4}_\d{2}_\d{2}_', filename)
    if match:
        return filename[match.end():]
    return filename
|
|
|
|
|
|
def find_existing_file_ignoring_date(output_dir: str, target_filename: str) -> str | None:
|
|
"""
|
|
Check if a file with the same name (ignoring date prefix) already exists.
|
|
|
|
Args:
|
|
output_dir: The directory to search in
|
|
target_filename: The target filename (may include date prefix)
|
|
|
|
Returns:
|
|
The path to the existing file if found, None otherwise.
|
|
"""
|
|
# Strip the date prefix from our target filename
|
|
target_without_date = strip_date_prefix(target_filename)
|
|
|
|
if not os.path.isdir(output_dir):
|
|
return None
|
|
|
|
# Search for files matching the pattern
|
|
for existing_file in os.listdir(output_dir):
|
|
existing_without_date = strip_date_prefix(existing_file)
|
|
if existing_without_date == target_without_date:
|
|
return os.path.join(output_dir, existing_file)
|
|
|
|
return None
|
|
|
|
|
|
def generate_output_filename(
    entry: Dict[str, Any],
    code_map: Dict[int, str],
) -> str:
    """
    Build the output filename in the format
    ``YYYY_MM_DD_name_lang1_lang2_level.json``.

    Example: 2026_02_19_verbs_beginners_en_de_A1.json
    """
    # Date prefix for today, e.g. "2026_02_19".
    date_part = date.today().strftime("%Y_%m_%d")

    # Prefer 'name', then 'category'; fall back to a generic token when
    # neither is set or sanitizing leaves nothing usable.
    raw_name = entry.get("name") or entry.get("category") or "unknown"
    name_part = sanitize_for_filename(raw_name) or "vocab"

    # Lowercase language codes; an ID missing from the map falls back
    # to its numeric string form.
    first_id, second_id = entry["languages"][0], entry["languages"][1]
    code1 = code_map.get(first_id, str(first_id)).lower()
    code2 = code_map.get(second_id, str(second_id)).lower()

    # Level normalized to uppercase; defaults to A2 when unspecified.
    level_part = entry.get("level", "A2").strip().upper()

    return f"{date_part}_{name_part}_{code1}_{code2}_{level_part}.json"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Batch config loader & validator
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def load_batch_config(path: str = "batch.yaml") -> Dict[str, Any]:
    """
    Load and parse the batch YAML config at *path*.

    Returns:
        The parsed top-level mapping. An empty or comment-only YAML
        file yields an empty dict (``yaml.safe_load`` returns ``None``
        for an empty document, which would otherwise crash callers that
        do ``batch_cfg.get(...)``).

    Exits the process with status 1 (after printing an error) when the
    file is missing or is not valid YAML.
    """
    try:
        with open(path, "r", encoding="utf-8") as f:
            data = yaml.safe_load(f)
    except FileNotFoundError:
        print(f"ERROR: '{path}' not found.")
        sys.exit(1)
    except yaml.YAMLError as e:
        print(f"ERROR: Could not parse '{path}': {e}")
        sys.exit(1)
    # Normalize the empty-document case (None) to an empty mapping.
    return data or {}
|
|
|
|
|
|
def validate_batch_entry(entry: Dict[str, Any]) -> List[str]:
    """
    Validate a single raw batch entry.

    Checks that the required string fields are present and non-empty,
    that 'languages' is a list with at least two IDs, and that 'amount'
    is a positive integer.

    Returns:
        A list of validation error strings (empty = valid).
    """
    errors: List[str] = []
    for field in ("name", "category", "output_filename"):
        if not entry.get(field):
            errors.append(f"missing '{field}'")
    langs = entry.get("languages")
    if not isinstance(langs, list) or len(langs) < 2:
        errors.append("'languages' must be a list of at least 2 IDs")
    amount = entry.get("amount")
    # bool is a subclass of int, so a YAML `amount: true` would pass a
    # plain isinstance(amount, int) check — reject it explicitly.
    if isinstance(amount, bool) or not isinstance(amount, int) or amount < 1:
        errors.append("'amount' must be a positive integer")
    return errors
|
|
|
|
|
|
def expand_entry(
    entry: Dict[str, Any],
    code_map: Dict[int, str],
) -> List[Dict[str, Any]]:
    """
    Expand a batch entry into per-language-pair sub-entries.

    - Exactly 2 language IDs → a single copy of the entry whose
      output_filename is rewritten to the dated filename format.
    - 3+ IDs → one sub-entry per unordered C(n, 2) pair, each with an
      auto-generated output_filename in the same format.
    """
    language_ids: List[int] = entry["languages"]

    # Two languages: no pair expansion needed — just refresh the filename.
    if len(language_ids) == 2:
        single = dict(entry)
        single["output_filename"] = generate_output_filename(entry, code_map)
        return [single]

    base_name = entry.get("name", entry["category"])
    pairs: List[Dict[str, Any]] = []
    for first, second in combinations(language_ids, 2):
        pair_entry = dict(entry)
        pair_entry["languages"] = [first, second]
        pair_entry["name"] = base_name
        # Use new filename format with date, name, languages, and level
        pair_entry["output_filename"] = generate_output_filename(pair_entry, code_map)
        pairs.append(pair_entry)
    return pairs
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Main
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def main() -> None:
    """
    Entry point for the batch runner.

    Parses CLI flags, loads batch.yaml plus the supporting language
    maps, validates and expands the batch entries into language pairs,
    then — depending on the flags — lists the batches, prunes the
    manifest, previews a dry run, or generates every list via the LLM.
    After a real run it deletes output files containing 3+ TRUE
    duplicate pairs, prints the manifest, and exits with status 1 if
    any generation failed.
    """
    parser = argparse.ArgumentParser(description="VocabListGenerator — Batch runner")
    parser.add_argument("--dry-run", action="store_true",
                        help="Print what would be generated without calling the LLM")
    parser.add_argument("--list", action="store_true",
                        help="List all batches (after expansion) and exit")
    parser.add_argument("--prune", action="store_true",
                        help="Remove manifest entries whose output files no longer exist, then exit")
    parser.add_argument("--force", action="store_true",
                        help="Regenerate all lists, even those whose output file already exists")
    parser.add_argument("--config", default="batch.yaml", metavar="FILE",
                        help="Path to batch config file (default: batch.yaml)")
    args = parser.parse_args()

    # ── Load configs ─────────────────────────────────────────────────────────
    batch_cfg = load_batch_config(args.config)
    main_cfg = Config()
    language_map = load_language_map()
    code_map = load_language_code_map()
    language_instructions = load_language_instructions()

    settings = batch_cfg.get("settings", {})
    output_dir = settings.get("output_dir", "output")
    manifest_file = settings.get("manifest_filename", "vocab_manifest.json")
    stop_on_error = settings.get("stop_on_error", False)

    # Resolve all paths relative to this script so the runner behaves
    # the same regardless of the caller's working directory.
    script_dir = os.path.dirname(os.path.abspath(__file__))
    output_dir = os.path.join(script_dir, output_dir)
    manifest_path = os.path.join(output_dir, manifest_file)

    raw_batches: List[Dict[str, Any]] = batch_cfg.get("batches", [])

    # Separate skipped entries before expansion
    active_raw = [b for b in raw_batches if not b.get("skip", False)]
    skipped_raw = [b for b in raw_batches if b.get("skip", False)]

    # Validate raw entries before expanding (catches config mistakes early)
    invalid = []
    for i, entry in enumerate(active_raw, 1):
        errs = validate_batch_entry(entry)
        if errs:
            invalid.append((i, entry.get("name", f"entry #{i}"), errs))
    if invalid:
        print("ERROR: The following batch entries have validation problems:\n")
        for i, name, errs in invalid:
            print(f" [{i}] {name}")
            for e in errs:
                print(f" • {e}")
        sys.exit(1)

    # Expand multi-language entries into individual pairs
    active: List[Dict[str, Any]] = []
    for entry in active_raw:
        active.extend(expand_entry(entry, code_map))

    skipped_expanded: List[Dict[str, Any]] = []
    for entry in skipped_raw:
        skipped_expanded.extend(expand_entry(entry, code_map))

    # Grand total of word pairs requested across all active lists.
    total_pairs = sum(b["amount"] for b in active)

    # ── --list mode ──────────────────────────────────────────────────────────
    if args.list:
        print(f"\nbatch.yaml — {len(raw_batches)} template(s) → "
              f"{len(active)} lists to generate ({len(skipped_expanded)} skipped)\n")
        for i, b in enumerate(active, 1):
            langs = b["languages"]
            # Fall back to "ID <n>" for language IDs missing from the map.
            l1 = language_map.get(langs[0], f"ID {langs[0]}")
            l2 = language_map.get(langs[1], f"ID {langs[1]}")
            print(f" {i:3}. [{b['output_filename']}]")
            print(f" {b['name']}")
            print(f" {l1} → {l2} | {b['amount']} pairs | {b['category']}")
        if skipped_expanded:
            print(f"\n Skipped ({len(skipped_expanded)}):")
            for b in skipped_expanded:
                print(f" - {b.get('name', '?')}")
        print(f"\n Total: {len(active)} lists ≈ {total_pairs:,} word pairs\n")
        return

    # ── --prune mode ─────────────────────────────────────────────────────────
    if args.prune:
        if not os.path.isfile(manifest_path):
            print(f" [prune] No manifest found at {manifest_path} — nothing to do.")
            return
        removed = prune_missing_files(manifest_path, output_dir)
        if removed == 0:
            print(" [prune] Manifest is clean — no stale entries found.")
        return

    # ── Banner ───────────────────────────────────────────────────────────────
    print("=" * 60)
    print(" VocabListGenerator — Batch Run")
    print("=" * 60)
    print(f" Templates : {len(raw_batches)} defined → {len(active)} lists after expansion")
    print(f" Skipped : {len(skipped_expanded)} lists")
    print(f" Total pairs: ≈ {total_pairs:,}")
    print(f" Output dir : {output_dir}")
    print(f" Manifest : {manifest_path}")
    if args.force:
        print(" Mode : FORCE (regenerate all, ignoring existing files)")
    elif args.dry_run:
        print(" Mode : DRY RUN (no API calls)")
    else:
        # Count how many lists already have an output file (ignoring the
        # date prefix) so the banner can announce what will be skipped.
        already = sum(
            1 for b in active
            if find_existing_file_ignoring_date(output_dir, b["output_filename"]) is not None
        )
        if already:
            print(f" Resuming : {already} existing file(s) will be skipped (use --force to override)")
    print()

    os.makedirs(output_dir, exist_ok=True)

    # ── Prune stale manifest entries before generating ────────────────────────
    if os.path.isfile(manifest_path):
        prune_missing_files(manifest_path, output_dir)

    # ── Dry-run preview ──────────────────────────────────────────────────────
    if args.dry_run:
        print("Lists that would be generated:\n")
        for i, b in enumerate(active, 1):
            langs = b["languages"]
            l1 = language_map.get(langs[0], f"ID {langs[0]}")
            l2 = language_map.get(langs[1], f"ID {langs[1]}")
            print(f" {i:3}. {b['name']}")
            print(f" {l1} ({langs[0]}) → {l2} ({langs[1]}) | "
                  f"{b['amount']} pairs → {b['output_filename']}")
        print(f"\n Total: {len(active)} lists ≈ {total_pairs:,} word pairs\n")
        return

    # ── Build LLM client once ────────────────────────────────────────────────
    llm = LLMClient(main_cfg)

    # ── Run batches ──────────────────────────────────────────────────────────
    ok, failed, skipped_existing = 0, 0, 0
    start_time = time.time()
    generated_count = 0  # Track only generated items for time estimation

    for i, entry in enumerate(active, 1):
        name = entry["name"]
        category = entry["category"]
        description = entry.get("description", "").strip()
        instructions = entry.get("instructions", "").strip()
        emoji = entry.get("emoji", "").strip()
        level = entry.get("level", "A2").strip().upper()
        amount = entry["amount"]
        lang_ids = entry["languages"]
        output_filename = entry["output_filename"]
        vocab_file_path = os.path.join(output_dir, output_filename)

        # Calculate time estimation based only on generated items
        current_time = time.time()
        elapsed = current_time - start_time
        avg_time_per_item = elapsed / generated_count if generated_count > 0 else 0
        # NOTE(review): this also subtracts PAST skips from the count of
        # FUTURE items, so the ETA can undershoot — confirm intent.
        remaining = len(active) - i - skipped_existing
        eta_seconds = avg_time_per_item * remaining
        eta_str = str(timedelta(seconds=int(eta_seconds))) if remaining > 0 else "done"

        header = f"[{i}/{len(active)}] {emoji} {name}" if emoji else f"[{i}/{len(active)}] {name}"
        print(f"{header} [{level}]")
        print(f" File : {output_filename}")
        if generated_count > 0:
            print(f" ETA : {eta_str} ({int(avg_time_per_item)}s/item)")

        # Skip if already generated (unless --force)
        existing_file = find_existing_file_ignoring_date(output_dir, output_filename)
        if not args.force and existing_file is not None:
            existing_filename = os.path.basename(existing_file)
            print(f" ✔ Already exists ({existing_filename}) — skipping (use --force to regenerate)")
            print("-" * 60)
            skipped_existing += 1
            continue

        # Track time before generation
        # NOTE(review): item_start_time is never read afterwards —
        # candidate for removal.
        item_start_time = time.time()

        success = run_generation(
            llm=llm,
            language_map=language_map,
            lang_first_id=lang_ids[0],
            lang_second_id=lang_ids[1],
            amount=amount,
            category=category,
            name=name,
            description=description,
            instructions=instructions,
            output_file_path=vocab_file_path,
            manifest_path=manifest_path,
            emoji=emoji,
            level=level,
            language_instructions=language_instructions,
        )

        if success:
            ok += 1
            generated_count += 1
        else:
            failed += 1
            print(f" ✗ FAILED: {name}\n")
            if stop_on_error:
                print("stop_on_error is set — aborting.")
                break

        print("-" * 60)

    # ── Summary ──────────────────────────────────────────────────────────────
    total_time = time.time() - start_time
    print(f"\n{'=' * 60}")
    print(f" Batch complete.")
    print(f" ✓ Success : {ok}")
    print(f" ✗ Failed : {failed}")
    print(f" ⏱ Total time: {str(timedelta(seconds=int(total_time)))}")
    if skipped_existing:
        print(f" ⏭ Existing : {skipped_existing} (already generated, skipped)")
    if skipped_expanded:
        print(f" - Disabled : {len(skipped_expanded)} (skip: true in batch.yaml)")
    print(f"{'=' * 60}\n")

    # ── Check for TRUE duplicates and delete bad files ─────────────────────
    print("Checking for TRUE duplicates (both wordFirst AND wordSecond identical)...\n")

    json_files = find_json_files(output_dir)
    files_with_dupes = 0
    files_deleted = 0

    for file_path in json_files:
        # threshold=3: only pairs repeated 3+ times count as a problem.
        result = check_file_for_true_duplicates(file_path, threshold=3)

        if "error" in result:
            continue

        true_dupes = result.get("true_dupes", {})

        if true_dupes:
            files_with_dupes += 1
            # Prefer a path relative to the output dir for display;
            # fall back to the bare name if the file lies elsewhere.
            try:
                rel_path = file_path.relative_to(Path(output_dir))
            except ValueError:
                rel_path = file_path.name

            print(f" ⚠️ Deleting {rel_path}")
            print(f" TRUE duplicates found: {len(true_dupes)} pairs appearing 3+ times")
            # Show at most 3 example duplicate pairs.
            for pair, count in list(true_dupes.items())[:3]:
                wf, ws = pair
                print(f" - \"{wf}\" → \"{ws}\" = {count} times")

            # Delete the file
            try:
                os.remove(file_path)
                files_deleted += 1
                print(f" ✅ DELETED\n")
            except Exception as e:
                print(f" ❌ Failed to delete: {e}\n")

    if files_with_dupes > 0:
        print(f"\n{'=' * 60}")
        print(f" 🗑️ Deleted {files_deleted} files with 3+ TRUE duplicates")
        print(f"{'=' * 60}\n")
    else:
        print(" ✅ No files with TRUE duplicates found\n")

    print_manifest(manifest_path)

    # Non-zero exit so CI / shell callers can detect partial failure.
    if failed > 0:
        sys.exit(1)
|
|
|
|
|
|
# Run the batch pipeline only when executed as a script, not on import.
if __name__ == "__main__":
    main()
|