#!/usr/bin/env python3
"""
Script to count all different "pos" values in JSONL files using parallel processing.

Analyzes all JSONL files in the raw_data directory and displays frequency counts.
"""

import glob
import json
import os
import time
from collections import Counter
from concurrent.futures import ProcessPoolExecutor, as_completed
from multiprocessing import cpu_count
from typing import Tuple


def process_jsonl_file(file_path: str) -> Tuple[str, Counter]:
    """
    Process a single JSONL file and count POS values.

    Args:
        file_path: Path to the JSONL file

    Returns:
        Tuple of (file path, Counter of POS values)
    """
    pos_counter = Counter()
    line_count = 0

    try:
        with open(file_path, 'r', encoding='utf-8') as f:
            for line_num, line in enumerate(f, 1):
                line = line.strip()
                if not line:
                    continue
                try:
                    data = json.loads(line)
                    if 'pos' in data and data['pos']:
                        pos_counter[data['pos']] += 1
                    line_count += 1
                except json.JSONDecodeError as e:
                    print(f"Warning: JSON decode error in {file_path} at line {line_num}: {e}")
                    continue
    except Exception as e:
        print(f"Error processing file {file_path}: {e}")
        return file_path, Counter()

    print(f"Processed {file_path}: {line_count} lines, {sum(pos_counter.values())} POS entries")
    return file_path, pos_counter


def main():
    """Main function to process all JSONL files and display POS statistics."""
    # Find all JSONL files in the raw_data directory
    raw_data_dir = "raw_data"
    jsonl_files = glob.glob(os.path.join(raw_data_dir, "*.jsonl"))

    if not jsonl_files:
        print(f"No JSONL files found in {raw_data_dir}")
        return

    print(f"Found {len(jsonl_files)} JSONL files to process")
    print(f"Using {cpu_count()} CPU cores for parallel processing")
    print("-" * 60)

    # Process files in parallel
    start_time = time.time()
    all_pos_counts = Counter()
    file_results = {}

    with ProcessPoolExecutor(max_workers=cpu_count()) as executor:
        # Submit all files for processing
        future_to_file = {
            executor.submit(process_jsonl_file, file_path): file_path
            for file_path in jsonl_files
        }

        # Collect results as they complete
        for future in as_completed(future_to_file):
            file_path = future_to_file[future]
            try:
                filename, pos_counter = future.result()
                file_results[filename] = pos_counter
                all_pos_counts.update(pos_counter)
            except Exception as e:
                print(f"Error processing {file_path}: {e}")

    processing_time = time.time() - start_time
    total_entries = sum(all_pos_counts.values())

    # Display aggregate results
    print("\n" + "=" * 80)
    print("POS VALUE COUNTS ACROSS ALL FILES")
    print("=" * 80)
    print(f"Total processing time: {processing_time:.2f} seconds")
    print(f"Total POS entries found: {total_entries:,}")
    print(f"Unique POS values: {len(all_pos_counts)}")
    print("\nTop 100 most common POS values:")
    print("-" * 80)

    # Sort by frequency (descending)
    sorted_pos = sorted(all_pos_counts.items(), key=lambda x: x[1], reverse=True)
    for pos, count in sorted_pos[:100]:
        percentage = (count / total_entries) * 100
        print(f"{pos:<20} {count:>10,} ({percentage:5.2f}%)")

    if len(sorted_pos) > 100:
        print(f"\n... and {len(sorted_pos) - 100} more POS values")

    # Show all unique POS values (alphabetical)
    print("\n" + "=" * 80)
    print("ALL UNIQUE POS VALUES (ALPHABETICAL)")
    print("=" * 80)
    for pos, count in sorted(all_pos_counts.items(), key=lambda x: x[0].lower()):
        print(f"{pos:<30} {count:>10,}")

    # Per-file breakdown
    print("\n" + "=" * 80)
    print("PER-FILE BREAKDOWN")
    print("=" * 80)
    for filename, pos_counter in sorted(file_results.items()):
        file_total = sum(pos_counter.values())
        if file_total > 0:
            print(f"\n{os.path.basename(filename)}:")
            print(f"  Total entries: {file_total:,}")
            print(f"  Unique POS values: {len(pos_counter)}")
            # All POS values for this file (sorted by frequency)
            for pos, count in sorted(pos_counter.items(), key=lambda x: x[1], reverse=True):
                print(f"    {pos:<15} {count:>8,}")

    print(f"\nProcessing completed in {processing_time:.2f} seconds")


if __name__ == "__main__":
    main()