#!/usr/bin/env python3
"""
Hybrid JSONL Schema Analyzer

Intelligently chooses between sequential and parallel processing based on
file size: small files use sequential processing, large files use parallel
processing.
"""

import argparse
import json
import sys
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from multiprocessing import cpu_count
from pathlib import Path
from typing import Any, Dict, Union

# Import the optimized analyzer for parallel processing
sys.path.insert(0, str(Path(__file__).parent))
try:
    from jsonl_schema_analyzer_optimized import OptimizedJSONLSchemaAnalyzer
except ImportError:
    print("Warning: Could not import optimized analyzer, using fallback")
    OptimizedJSONLSchemaAnalyzer = None


class HybridJSONLSchemaAnalyzer:
    """Hybrid analyzer that intelligently chooses a processing strategy."""

    def __init__(self, max_samples: int = 1000, max_workers: int = None,
                 parallel_threshold_mb: int = 100, chunk_size: int = 1000):
        """
        Initialize the hybrid analyzer.

        Args:
            max_samples: Maximum number of JSON objects to sample per file
            max_workers: Maximum number of worker processes (default: CPU count, capped at 8)
            parallel_threshold_mb: File size threshold in MB at or above which
                parallel processing is used
            chunk_size: Number of lines to process in each chunk
        """
        self.max_samples = max_samples
        self.max_workers = max_workers or min(cpu_count(), 8)
        self.parallel_threshold_mb = parallel_threshold_mb
        self.chunk_size = chunk_size

        # Import the original analyzer for small files (sys.path was already
        # extended at module level)
        try:
            from jsonl_schema_analyzer import JSONLSchemaAnalyzer
            self.sequential_analyzer = JSONLSchemaAnalyzer(max_samples=max_samples)
        except ImportError:
            print("Warning: Could not import sequential analyzer")
            self.sequential_analyzer = None

        # Initialize the optimized analyzer for large files
        if OptimizedJSONLSchemaAnalyzer:
            self.parallel_analyzer = OptimizedJSONLSchemaAnalyzer(
                max_samples=max_samples,
                max_workers=self.max_workers,
                chunk_size=chunk_size
            )
        else:
            self.parallel_analyzer = None

        print("Hybrid analyzer initialized:")
        print(f"  Parallel threshold: {parallel_threshold_mb} MB")
        print(f"  Max workers: {self.max_workers}")
        print(f"  Chunk size: {self.chunk_size}")
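    # NOTE: Both delegate analyzers are assumed to expose the same interface,
    # analyze_jsonl_file(path) -> dict. The directory summary below relies on
    # result keys such as "file_size_bytes", "total_lines", "valid_lines",
    # and "processing_time_seconds"; absent keys fall back to 0 via .get().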
    def analyze_jsonl_file(self, file_path: Union[str, Path]) -> Dict[str, Any]:
        """
        Analyze a JSONL file using the appropriate strategy.

        Args:
            file_path: Path to the JSONL file

        Returns:
            Dictionary containing schema analysis results
        """
        file_path = Path(file_path)
        if not file_path.exists():
            raise FileNotFoundError(f"File not found: {file_path}")

        # Get file size in MB
        file_size_mb = file_path.stat().st_size / (1024 * 1024)
        print(f"Analyzing {file_path.name} ({file_size_mb:.2f} MB)...")

        # Choose processing strategy
        if file_size_mb >= self.parallel_threshold_mb and self.parallel_analyzer:
            print(f"  Using parallel processing (file >= {self.parallel_threshold_mb} MB)")
            result = self.parallel_analyzer.analyze_jsonl_file(file_path)
            result["processing_strategy"] = "parallel"
        elif self.sequential_analyzer:
            print(f"  Using sequential processing (file < {self.parallel_threshold_mb} MB)")
            result = self.sequential_analyzer.analyze_jsonl_file(file_path)
            result["processing_strategy"] = "sequential"
        else:
            # Fall back to parallel if the sequential analyzer is unavailable
            print("  Using parallel processing (sequential analyzer unavailable)")
            if self.parallel_analyzer:
                result = self.parallel_analyzer.analyze_jsonl_file(file_path)
                result["processing_strategy"] = "parallel_fallback"
            else:
                raise RuntimeError("No analyzer available")

        # Add hybrid-specific metadata
        result["file_size_mb"] = file_size_mb
        result["parallel_threshold_mb"] = self.parallel_threshold_mb
        return result

    def analyze_directory(self, directory_path: Union[str, Path],
                          pattern: str = "*.jsonl") -> Dict[str, Any]:
        """
        Analyze all JSONL files in a directory using hybrid processing.

        Args:
            directory_path: Path to directory containing JSONL files
            pattern: File pattern to match (default: *.jsonl)

        Returns:
            Dictionary containing analysis results for all files
        """
        directory_path = Path(directory_path)
        if not directory_path.exists():
            raise FileNotFoundError(f"Directory not found: {directory_path}")

        # Find all JSONL files
        jsonl_files = list(directory_path.glob(pattern))
        if not jsonl_files:
            print(f"No JSONL files found in {directory_path} with pattern {pattern}")
            return {"files": [], "summary": {}}

        print(f"Found {len(jsonl_files)} JSONL files to analyze...")
        start_time = time.time()

        # Categorize files by size
        small_files = []
        large_files = []
        for file_path in jsonl_files:
            size_mb = file_path.stat().st_size / (1024 * 1024)
            if size_mb >= self.parallel_threshold_mb:
                large_files.append(file_path)
            else:
                small_files.append(file_path)

        print(f"  Small files (< {self.parallel_threshold_mb} MB): {len(small_files)}")
        print(f"  Large files (>= {self.parallel_threshold_mb} MB): {len(large_files)}")

        file_results = {}

        # Process small files sequentially (they're fast anyway)
        if small_files and self.sequential_analyzer:
            print(f"Processing {len(small_files)} small files sequentially...")
            for file_path in small_files:
                try:
                    result = self.analyze_jsonl_file(file_path)
                    file_results[file_path.name] = result
                except Exception as e:
                    print(f"Error analyzing {file_path.name}: {e}")
                    file_results[file_path.name] = {"error": str(e)}

        # Process large files in parallel
        if large_files and self.parallel_analyzer:
            print(f"Processing {len(large_files)} large files in parallel...")
            if len(large_files) == 1:
                # Single large file - just process it directly
                file_path = large_files[0]
                try:
                    result = self.analyze_jsonl_file(file_path)
                    file_results[file_path.name] = result
                except Exception as e:
                    print(f"Error analyzing {file_path.name}: {e}")
                    file_results[file_path.name] = {"error": str(e)}
            else:
                # Multiple large files: fan out with threads. The per-file
                # parallel analyzer is assumed to manage its own worker
                # processes, so using threads at this level avoids nesting
                # process pools while files are still analyzed concurrently.
                with ThreadPoolExecutor(
                    max_workers=min(len(large_files), self.max_workers)
                ) as executor:
                    future_to_file = {
                        executor.submit(self.analyze_jsonl_file, file_path): file_path
                        for file_path in large_files
                    }
                    for future in as_completed(future_to_file):
                        file_path = future_to_file[future]
                        try:
                            result = future.result()
                            file_results[file_path.name] = result
                        except Exception as e:
                            print(f"Error analyzing {file_path.name}: {e}")
                            file_results[file_path.name] = {"error": str(e)}

        # Create summary
        successful_results = [r for r in file_results.values() if "error" not in r]
        summary = {
            "total_files": len(jsonl_files),
            "small_files": len(small_files),
            "large_files": len(large_files),
            "successfully_analyzed": len(successful_results),
            "total_size_bytes": sum(r.get("file_size_bytes", 0) for r in successful_results),
            "total_lines": sum(r.get("total_lines", 0) for r in successful_results),
            "total_valid_lines": sum(r.get("valid_lines", 0) for r in successful_results),
            "total_processing_time": sum(
                r.get("processing_time_seconds", 0) for r in successful_results
            ),
            "parallel_threshold_mb": self.parallel_threshold_mb,
            "strategies_used": {
                "sequential": len([
                    r for r in successful_results
                    if r.get("processing_strategy") == "sequential"
                ]),
                "parallel": len([
                    r for r in successful_results
                    if r.get("processing_strategy") in ("parallel", "parallel_fallback")
                ]),
            },
        }

        # Calculate processing speed; default to 0.0 so the report below
        # never hits a missing key when total processing time is zero
        if summary["total_processing_time"] > 0:
            total_mb = summary["total_size_bytes"] / (1024 * 1024)
            summary["average_processing_speed_mb_per_sec"] = (
                total_mb / summary["total_processing_time"]
            )
        else:
            summary["average_processing_speed_mb_per_sec"] = 0.0

        elapsed_time = time.time() - start_time
        summary["total_elapsed_time"] = elapsed_time

        print(f"\nDirectory analysis completed in {elapsed_time:.2f}s")
        print(f"Processed {summary['total_valid_lines']:,} valid lines "
              f"from {summary['successfully_analyzed']} files")
        print(f"Sequential: {summary['strategies_used']['sequential']}, "
              f"Parallel: {summary['strategies_used']['parallel']}")
        print(f"Average speed: {summary['average_processing_speed_mb_per_sec']:.2f} MB/sec")

        return {
            "directory": str(directory_path),
            "pattern": pattern,
            "files": file_results,
            "summary": summary,
        }
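    # Shape of the directory-level result (a sketch; values illustrative):
    # {
    #     "directory": "data/",
    #     "pattern": "*.jsonl",
    #     "files": {"events.jsonl": {...per-file result...}},
    #     "summary": {"total_files": 3, "strategies_used": {...}, ...}
    # }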
    def save_results(self, results: Dict[str, Any], output_path: Union[str, Path]):
        """
        Save analysis results to a JSON file.

        Args:
            results: Analysis results to save
            output_path: Path to save the results
        """
        output_path = Path(output_path)
        try:
            start_time = time.time()
            with open(output_path, 'w', encoding='utf-8') as f:
                json.dump(results, f, indent=2, ensure_ascii=False)
            save_time = time.time() - start_time
            file_size = output_path.stat().st_size
            print(f"Results saved to {output_path} "
                  f"({file_size / (1024 * 1024):.2f} MB) in {save_time:.2f}s")
        except Exception as e:
            raise RuntimeError(f"Error saving results to {output_path}: {e}")
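# Programmatic usage sketch (paths are illustrative):
#
#   analyzer = HybridJSONLSchemaAnalyzer(parallel_threshold_mb=50)
#   report = analyzer.analyze_directory("data/", pattern="*.jsonl")
#   analyzer.save_results(report, "schema_report.json")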
def main():
    """Main function for command-line usage."""
    parser = argparse.ArgumentParser(
        description="Hybrid JSONL schema analyzer with intelligent processing strategy"
    )
    parser.add_argument(
        "path",
        help="Path to JSONL file or directory containing JSONL files"
    )
    parser.add_argument(
        "-o", "--output",
        help="Output file for analysis results (JSON format)"
    )
    parser.add_argument(
        "-p", "--pattern", default="*.jsonl",
        help="File pattern when analyzing directory (default: *.jsonl)"
    )
    parser.add_argument(
        "-s", "--max-samples", type=int, default=1000,
        help="Maximum number of JSON objects to sample per file (default: 1000)"
    )
    parser.add_argument(
        "-w", "--workers", type=int, default=None,
        help="Number of worker processes for parallel processing "
             "(default: CPU count, capped at 8)"
    )
    parser.add_argument(
        "-t", "--threshold", type=int, default=100,
        help="File size threshold in MB for parallel processing (default: 100)"
    )
    parser.add_argument(
        "-c", "--chunk-size", type=int, default=1000,
        help="Number of lines to process in each chunk (default: 1000)"
    )
    parser.add_argument(
        "--directory", action="store_true",
        help="Treat path as a directory instead of a single file"
    )
    args = parser.parse_args()

    # Initialize hybrid analyzer
    analyzer = HybridJSONLSchemaAnalyzer(
        max_samples=args.max_samples,
        max_workers=args.workers,
        parallel_threshold_mb=args.threshold,
        chunk_size=args.chunk_size
    )

    try:
        start_time = time.time()

        # Analyze file or directory
        if args.directory or Path(args.path).is_dir():
            results = analyzer.analyze_directory(args.path, args.pattern)
        else:
            results = analyzer.analyze_jsonl_file(args.path)

        total_time = time.time() - start_time

        # Save or print results
        if args.output:
            analyzer.save_results(results, args.output)
        else:
            print("\n" + "=" * 50)
            print("ANALYSIS RESULTS")
            print("=" * 50)
            print(json.dumps(results, indent=2, ensure_ascii=False))

        print(f"\nTotal analysis time: {total_time:.2f}s")
    except Exception as e:
        print(f"Error: {e}", file=sys.stderr)
        sys.exit(1)


if __name__ == "__main__":
    main()
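# Command-line usage examples (script name is illustrative):
#
#   python hybrid_jsonl_schema_analyzer.py data/events.jsonl
#   python hybrid_jsonl_schema_analyzer.py data/ --directory -o results.json
#   python hybrid_jsonl_schema_analyzer.py data/ -t 50 -w 4   # 50 MB threshold, 4 workers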