#!/usr/bin/env python3
"""
Tests for JSONL Schema Analyzer

Comprehensive tests for the JSONL schema analyzer functionality.
"""
|
||
|
||
import json
import os
import shutil
import sys
import tempfile
import unittest
from pathlib import Path

# Add the scripts directory to the path so we can import the analyzer
sys.path.insert(0, str(Path(__file__).parent.parent / "scripts"))

from jsonl_schema_analyzer import JSONLSchemaAnalyzer
|
||
|
||
|
||
class TestJSONLSchemaAnalyzer(unittest.TestCase):
    """Unit tests for the JSONLSchemaAnalyzer class.

    Each test works inside a private temporary directory that is created in
    setUp and removed in tearDown, so tests leave no files behind and cannot
    interfere with each other.
    """

    def setUp(self):
        """Create a fresh analyzer and a temporary working directory."""
        self.analyzer = JSONLSchemaAnalyzer(max_samples=100)
        self.temp_dir = tempfile.mkdtemp()
        self.temp_dir_path = Path(self.temp_dir)

    def tearDown(self):
        """Remove the temporary directory and everything created in it."""
        shutil.rmtree(self.temp_dir)

    def create_test_jsonl_file(self, filename: str, data: list) -> Path:
        """Write *data* to a JSONL file in the temp dir, one document per line.

        Args:
            filename: Name of the file to create inside the temp directory.
            data: JSON-serializable objects, written one per line.

        Returns:
            Path to the created file.
        """
        file_path = self.temp_dir_path / filename

        with open(file_path, 'w', encoding='utf-8') as f:
            for item in data:
                f.write(json.dumps(item, ensure_ascii=False) + '\n')

        return file_path

    def test_analyze_json_value_simple_types(self):
        """Scalar values map to the expected JSON-schema type names."""
        # Test null
        result = self.analyzer.analyze_json_value(None)
        self.assertEqual(result["type"], "null")

        # Test boolean
        result = self.analyzer.analyze_json_value(True)
        self.assertEqual(result["type"], "boolean")

        # Test integer
        result = self.analyzer.analyze_json_value(42)
        self.assertEqual(result["type"], "integer")

        # Test float
        result = self.analyzer.analyze_json_value(3.14)
        self.assertEqual(result["type"], "number")

        # Test string (also records the sample's length)
        result = self.analyzer.analyze_json_value("hello")
        self.assertEqual(result["type"], "string")
        self.assertEqual(result["sample_length"], 5)

    def test_analyze_json_value_array(self):
        """Arrays report item types, length range, and sampled items."""
        # Empty array
        result = self.analyzer.analyze_json_value([])
        self.assertEqual(result["type"], "array")
        self.assertEqual(result["item_types"], [])
        self.assertEqual(result["length_range"], [0, 0])

        # Array with mixed types
        result = self.analyzer.analyze_json_value([1, "hello", True, None])
        self.assertEqual(result["type"], "array")
        self.assertEqual(set(result["item_types"]), {"integer", "string", "boolean", "null"})
        self.assertEqual(result["length_range"], [4, 4])

        # Array of objects
        result = self.analyzer.analyze_json_value([{"a": 1}, {"b": 2}])
        self.assertEqual(result["type"], "array")
        self.assertEqual(result["item_types"], ["object"])
        self.assertEqual(len(result["sample_items"]), 2)

    def test_analyze_json_value_object(self):
        """Objects report per-property schemas and the keys seen."""
        # Empty object
        result = self.analyzer.analyze_json_value({})
        self.assertEqual(result["type"], "object")
        self.assertEqual(result["properties"], {})
        self.assertEqual(result["required_keys"], [])

        # Simple object
        result = self.analyzer.analyze_json_value({"name": "test", "age": 25})
        self.assertEqual(result["type"], "object")
        self.assertEqual(result["properties"]["name"]["type"], "string")
        self.assertEqual(result["properties"]["age"]["type"], "integer")
        self.assertEqual(set(result["required_keys"]), {"name", "age"})

        # Nested object
        result = self.analyzer.analyze_json_value({
            "user": {"name": "test", "age": 25},
            "tags": ["a", "b", "c"]
        })
        self.assertEqual(result["type"], "object")
        self.assertEqual(result["properties"]["user"]["type"], "object")
        self.assertEqual(result["properties"]["tags"]["type"], "array")

    def test_merge_schemas_same_type(self):
        """Merging two schemas of the same type keeps that type."""
        # Merge two integer schemas
        schema1 = {"type": "integer"}
        schema2 = {"type": "integer"}
        result = self.analyzer.merge_schemas(schema1, schema2)
        self.assertEqual(result["type"], "integer")

        # Merge two string schemas
        schema1 = {"type": "string", "sample_length": 5}
        schema2 = {"type": "string", "sample_length": 10}
        result = self.analyzer.merge_schemas(schema1, schema2)
        self.assertEqual(result["type"], "string")
        self.assertEqual(result["sample_length"], 5)  # Keeps first schema's value

    def test_merge_schemas_different_types(self):
        """Merging schemas of different types produces a union schema."""
        schema1 = {"type": "integer"}
        schema2 = {"type": "string"}
        result = self.analyzer.merge_schemas(schema1, schema2)
        self.assertEqual(result["type"], "union")
        self.assertEqual(set(result["possible_types"]), {"integer", "string"})

    def test_merge_schemas_arrays(self):
        """Merging array schemas unions item types and widens the length range."""
        schema1 = {
            "type": "array",
            "item_types": ["integer", "string"],
            "length_range": [2, 5]
        }
        schema2 = {
            "type": "array",
            "item_types": ["boolean"],
            "length_range": [1, 3]
        }
        result = self.analyzer.merge_schemas(schema1, schema2)
        self.assertEqual(result["type"], "array")
        self.assertEqual(set(result["item_types"]), {"integer", "string", "boolean"})
        self.assertEqual(result["length_range"], [1, 5])

    def test_merge_schemas_objects(self):
        """Merging object schemas unions properties and required keys."""
        schema1 = {
            "type": "object",
            "properties": {
                "name": {"type": "string"},
                "age": {"type": "integer"}
            },
            "required_keys": ["name", "age"]
        }
        schema2 = {
            "type": "object",
            "properties": {
                "name": {"type": "string"},
                "email": {"type": "string"}
            },
            "required_keys": ["name", "email"]
        }
        result = self.analyzer.merge_schemas(schema1, schema2)
        self.assertEqual(result["type"], "object")
        self.assertEqual(set(result["required_keys"]), {"name", "age", "email"})
        self.assertEqual(result["properties"]["name"]["type"], "string")
        self.assertEqual(result["properties"]["age"]["type"], "integer")
        self.assertEqual(result["properties"]["email"]["type"], "string")

    def test_extract_all_keys(self):
        """Key extraction flattens nested objects with dotted/indexed paths."""
        # Simple object
        obj = {"name": "test", "age": 25}
        keys = self.analyzer._extract_all_keys(obj)
        self.assertEqual(set(keys), {"name", "age"})

        # Nested object
        obj = {
            "user": {"name": "test", "age": 25},
            "tags": ["a", "b", "c"]
        }
        keys = self.analyzer._extract_all_keys(obj)
        # The current implementation only extracts object keys, not array indices
        expected_keys = {"user", "user.name", "user.age", "tags"}
        self.assertEqual(set(keys), expected_keys)

        # Array of objects
        obj = [{"name": "test1"}, {"name": "test2", "age": 25}]
        keys = self.analyzer._extract_all_keys(obj)
        # For arrays of objects, we should get the object properties with indices
        expected_keys = {"[0].name", "[1].name", "[1].age"}
        self.assertEqual(set(keys), expected_keys)

    def test_analyze_jsonl_file_simple(self):
        """A small valid file yields correct counts, keys, and schema."""
        data = [
            {"name": "Alice", "age": 30},
            {"name": "Bob", "age": 25, "city": "NYC"},
            {"name": "Charlie", "age": 35, "city": "LA", "hobbies": ["reading", "coding"]}
        ]

        file_path = self.create_test_jsonl_file("test.jsonl", data)
        result = self.analyzer.analyze_jsonl_file(file_path)

        # Check basic statistics
        self.assertEqual(result["total_lines"], 3)
        self.assertEqual(result["valid_lines"], 3)
        self.assertEqual(result["error_lines"], 0)
        self.assertEqual(result["sample_count"], 3)

        # Check keys
        self.assertIn("name", result["all_keys"])
        self.assertIn("age", result["all_keys"])
        self.assertIn("city", result["all_keys"])
        self.assertIn("hobbies", result["all_keys"])

        # Check schema
        self.assertEqual(result["schema"]["type"], "object")
        self.assertIn("name", result["schema"]["properties"])
        self.assertIn("age", result["schema"]["properties"])
        self.assertIn("city", result["schema"]["properties"])
        self.assertIn("hobbies", result["schema"]["properties"])

    def test_analyze_jsonl_file_with_errors(self):
        """Invalid JSON lines are counted as errors, valid ones still parsed.

        The file is written directly (not via create_test_jsonl_file) because
        json.dumps would turn the "invalid" lines into perfectly valid JSON
        string documents.
        """
        file_path = self.temp_dir_path / "test_errors.jsonl"
        with open(file_path, 'w', encoding='utf-8') as f:
            f.write('{"name": "Alice", "age": 30}\n')
            f.write('invalid json line\n')
            f.write('{"name": "Bob", "age": 25}\n')
            f.write('another invalid line\n')

        result = self.analyzer.analyze_jsonl_file(file_path)

        self.assertEqual(result["total_lines"], 4)
        self.assertEqual(result["valid_lines"], 2)
        self.assertEqual(result["error_lines"], 2)

    def test_analyze_jsonl_file_empty(self):
        """An empty file produces all-zero statistics."""
        file_path = self.create_test_jsonl_file("empty.jsonl", [])
        result = self.analyzer.analyze_jsonl_file(file_path)

        self.assertEqual(result["total_lines"], 0)
        self.assertEqual(result["valid_lines"], 0)
        self.assertEqual(result["sample_count"], 0)
        self.assertEqual(result["unique_key_count"], 0)

    def test_analyze_jsonl_file_nonexistent(self):
        """A missing input file raises FileNotFoundError."""
        with self.assertRaises(FileNotFoundError):
            self.analyzer.analyze_jsonl_file("nonexistent.jsonl")

    def test_analyze_directory(self):
        """Directory analysis picks up every .jsonl file and nothing else."""
        # Create multiple test files
        data1 = [{"name": "Alice", "age": 30}, {"name": "Bob", "age": 25}]
        data2 = [{"city": "NYC", "population": 8000000}, {"city": "LA", "population": 4000000}]
        data3 = [{"product": "laptop", "price": 999.99}]

        self.create_test_jsonl_file("file1.jsonl", data1)
        self.create_test_jsonl_file("file2.jsonl", data2)
        self.create_test_jsonl_file("file3.jsonl", data3)

        # Create a non-JSONL file to test filtering
        (self.temp_dir_path / "not_jsonl.txt").write_text("not a jsonl file")

        result = self.analyzer.analyze_directory(self.temp_dir_path)

        self.assertEqual(result["summary"]["total_files"], 3)
        self.assertEqual(result["summary"]["successfully_analyzed"], 3)

        # Check that all files were analyzed
        self.assertIn("file1.jsonl", result["files"])
        self.assertIn("file2.jsonl", result["files"])
        self.assertIn("file3.jsonl", result["files"])

    def test_analyze_directory_no_files(self):
        """A directory with no JSONL files yields empty results."""
        empty_dir = self.temp_dir_path / "empty"
        empty_dir.mkdir()

        result = self.analyzer.analyze_directory(empty_dir)

        self.assertEqual(result["files"], [])
        self.assertEqual(result["summary"], {})

    def test_save_results(self):
        """save_results writes a JSON file that round-trips the analysis."""
        data = [{"name": "Alice", "age": 30}]
        file_path = self.create_test_jsonl_file("test.jsonl", data)
        result = self.analyzer.analyze_jsonl_file(file_path)

        output_path = self.temp_dir_path / "results.json"
        self.analyzer.save_results(result, output_path)

        # Verify the file was created and contains valid JSON
        self.assertTrue(output_path.exists())

        with open(output_path, 'r', encoding='utf-8') as f:
            saved_data = json.load(f)

        self.assertEqual(saved_data["file_path"], str(file_path))
        self.assertEqual(saved_data["valid_lines"], 1)

    def test_complex_nested_structure(self):
        """Deeply nested objects/arrays are analyzed and keys discovered."""
        data = [
            {
                "word": "test",
                "lang": "en",
                "pos": "noun",
                "senses": [
                    {
                        "glosses": ["a test"],
                        "examples": [{"text": "This is a test"}],
                        "tags": ["main"]
                    }
                ],
                "translations": [
                    {"lang_code": "es", "word": "prueba"},
                    {"lang_code": "fr", "word": "test"}
                ],
                "metadata": {"created": "2023-01-01", "version": 1}
            }
        ]

        file_path = self.create_test_jsonl_file("complex.jsonl", data)
        result = self.analyzer.analyze_jsonl_file(file_path)

        # Check that complex structure is properly analyzed
        schema = result["schema"]
        self.assertEqual(schema["type"], "object")

        # Check nested structures
        self.assertEqual(schema["properties"]["senses"]["type"], "array")
        self.assertEqual(schema["properties"]["translations"]["type"], "array")
        self.assertEqual(schema["properties"]["metadata"]["type"], "object")

        # Check that all expected keys are found
        # Adjust expectations based on actual key extraction behavior
        expected_core_keys = [
            "word", "lang", "pos", "senses", "translations", "metadata"
        ]
        expected_nested_keys = [
            "senses[0].glosses", "senses[0].examples", "senses[0].examples[0].text",
            "senses[0].tags", "translations[0].lang_code", "translations[0].word",
            "translations[1].lang_code", "translations[1].word", "metadata.created", "metadata.version"
        ]

        found_keys = set(result["all_keys"].keys())

        # Check core keys are present
        for key in expected_core_keys:
            self.assertIn(key, found_keys, f"Core key '{key}' not found in analysis")

        # Check that we have some nested keys (the exact indices may vary)
        nested_found = any(key in found_keys for key in expected_nested_keys)
        self.assertTrue(nested_found, "No nested keys found in analysis")

    def test_max_samples_limit(self):
        """Sampling stops at max_samples while line counting continues."""
        # Create a file with many records
        data = [{"id": i, "value": f"item_{i}"} for i in range(100)]
        file_path = self.create_test_jsonl_file("large.jsonl", data)

        # Create analyzer with small sample limit
        analyzer = JSONLSchemaAnalyzer(max_samples=10)
        result = analyzer.analyze_jsonl_file(file_path)

        self.assertEqual(result["sample_count"], 10)
        self.assertEqual(result["valid_lines"], 100)  # All lines should be counted
||
|
||
class TestIntegration(unittest.TestCase):
    """End-to-end tests running the analyzer on realistic dictionary data."""

    def setUp(self):
        """Create a scratch directory for the integration fixtures."""
        self.temp_dir = tempfile.mkdtemp()
        self.temp_dir_path = Path(self.temp_dir)

    def tearDown(self):
        """Delete the scratch directory together with its contents."""
        import shutil
        shutil.rmtree(self.temp_dir)

    def test_real_world_like_data(self):
        """Analyze records shaped like real wiktionary-style entries."""
        records = [
            {
                "word": "dictionary",
                "lang_code": "en",
                "lang": "English",
                "pos": "noun",
                "pos_title": "noun",
                "senses": [
                    {
                        "glosses": ["a reference work"],
                        "examples": [{"text": "I looked it up in the dictionary"}],
                        "tags": ["main"]
                    }
                ],
                "sounds": [{"ipa": "/ˈdɪk.ʃə.nə.ɹi/"}],
                "translations": [
                    {"lang_code": "es", "lang": "Spanish", "word": "diccionario"},
                    {"lang_code": "fr", "lang": "French", "word": "dictionnaire"}
                ]
            },
            {
                "word": "test",
                "lang_code": "en",
                "lang": "English",
                "pos": "noun",
                "pos_title": "noun",
                "senses": [
                    {
                        "glosses": ["a procedure"],
                        "examples": [{"text": "We ran a test"}]
                    }
                ],
                "forms": [{"form": "tests", "tags": ["plural"]}],
                "etymology_text": "From Latin testum"
            }
        ]

        # Serialize each record to its own line and write the file in one go.
        file_path = self.temp_dir_path / "dictionary.jsonl"
        serialized = [json.dumps(rec, ensure_ascii=False) for rec in records]
        with open(file_path, 'w', encoding='utf-8') as out:
            out.write('\n'.join(serialized) + '\n')

        analyzer = JSONLSchemaAnalyzer()
        result = analyzer.analyze_jsonl_file(file_path)

        # Both records parse; the union of their top-level keys is discovered.
        self.assertEqual(result["valid_lines"], 2)
        for key in ("word", "lang_code", "senses", "translations", "forms"):
            self.assertIn(key, result["all_keys"])

        # The merged schema is an object exposing shared properties ...
        schema = result["schema"]
        self.assertEqual(schema["type"], "object")
        self.assertIn("word", schema["properties"])
        self.assertIn("senses", schema["properties"])

        # ... as well as fields present in only one of the two records.
        self.assertIn("translations", schema["properties"])
        self.assertIn("forms", schema["properties"])
||
|
||
# Allow running this test module directly from the command line.
if __name__ == "__main__":
    unittest.main()