"""
|
|
Excel Filter Module
|
|
Main functionality for filtering Excel files based on regex patterns
|
|
"""

import json
import logging
import os
import re
import time
from pathlib import Path
from typing import Any, Dict, List, Optional

import pandas as pd

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


# Simple translation system for backend messages
class BackendTranslations:
    """
    Simple translation system for backend modules.
    """

    def __init__(self, language: str = "de"):
        self.current_language = language
        self.translations = {}

        # Load translations from JSON files
        self.load_translations()

    def load_translations(self):
        """
        Load the translation file from the locales directory.
        """
        # Get the directory where this script is located
        script_dir = Path(__file__).parent
        locales_dir = script_dir / "locales"

        # Load the language file
        lang_file = locales_dir / f"{self.current_language}.json"
        if lang_file.exists():
            try:
                with open(lang_file, 'r', encoding='utf-8') as f:
                    self.translations = json.load(f)
            except Exception as e:
                # Use the module logger rather than print for consistency
                logger.error(f"Error loading {lang_file}: {e}")
                self.translations = {}
        else:
            self.translations = {}

    def get(self, key, default=None):
        """
        Get the translation for a key, falling back to the key itself.
        """
        return self.translations.get(key, default if default is not None else key)

    def __getitem__(self, key):
        """
        Get a translation using dictionary-style access.
        """
        return self.get(key)


# Global backend translations instance
_backend_translations = BackendTranslations()


def get_backend_translation(key, **kwargs):
    """
    Get a backend translation and format it with the provided arguments.
    """
    message = _backend_translations.get(key, key)
    if kwargs:
        try:
            message = message.format(**kwargs)
        except (KeyError, ValueError):
            pass  # Keep the unformatted message if formatting fails
    return message
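
# Illustrative lookup (the key "input_file_loaded" is used by read_excel below;
# the resulting text depends on the entries in locales/<language>.json):
#   get_backend_translation("input_file_loaded", rows=100, columns=5)
#   -> the formatted translation if the key exists, otherwise the key itself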


class ExcelFilter:
    """
    Class for filtering Excel files based on regex patterns and numeric filters.
    """

    def __init__(self, input_file: str, output_file: str, pattern: Optional[str] = None,
                 sheet_name: Optional[str] = None, columns: Optional[List[str]] = None,
                 numeric_filter: Optional[Dict[str, Any]] = None, language: str = "de"):
        """
        Initializes the ExcelFilter.

        Args:
            input_file: Path to the input file
            output_file: Path to the output file
            pattern: Regex pattern for filtering (optional)
            sheet_name: Name of the worksheet (optional)
            columns: List of column names to search (optional)
            numeric_filter: Dictionary with numeric filter settings (optional)
                Format: {'column': str or None, 'operator': str, 'value': float}
                If 'column' is None, the filter applies to all columns
            language: Language code for translated backend log messages
        """
        self.input_file = input_file
        self.output_file = output_file
        self.pattern = pattern
        self.sheet_name = sheet_name
        self.columns = columns
        self.numeric_filter = numeric_filter

        # Select the language used for translated backend log messages
        self.language = language
        if language != _backend_translations.current_language:
            _backend_translations.current_language = language
            _backend_translations.load_translations()

        # Statistics collection
        self.stats = {
            'start_time': None,
            'end_time': None,
            'input_file_size': 0,
            'output_file_size': 0,
            'input_rows': 0,
            'input_columns': 0,
            'output_rows': 0,
            'output_columns': 0,
            'memory_usage_mb': 0,
            'filters_applied': [],
            'processing_time_seconds': 0,
            'compression_ratio': 0.0,
            'rows_filtered': 0,
            'rows_removed': 0
        }

        # Log initialization with all parameters
        logger.info(f"ExcelFilter initialized: input_file='{input_file}', output_file='{output_file}', "
                    f"pattern='{pattern}', sheet_name='{sheet_name}', columns={columns}, "
                    f"numeric_filter={numeric_filter}")

    def read_excel(self) -> pd.DataFrame:
        """
        Reads the Excel file and returns a DataFrame.
        """
        try:
            # Record the input file size
            if os.path.exists(self.input_file):
                self.stats['input_file_size'] = os.path.getsize(self.input_file)

            if self.sheet_name:
                df = pd.read_excel(self.input_file, sheet_name=self.sheet_name)
            else:
                df = pd.read_excel(self.input_file)

            # Collect input statistics
            self.stats['input_rows'] = len(df)
            self.stats['input_columns'] = len(df.columns)
            self.stats['memory_usage_mb'] = df.memory_usage(deep=True).sum() / (1024 * 1024)

            logger.info(get_backend_translation("input_file_loaded", rows=len(df), columns=len(df.columns)))
            logger.info(get_backend_translation("file_size_info", size=self.stats['input_file_size'] / (1024 * 1024)))
            logger.info(get_backend_translation("memory_usage_info", size=self.stats['memory_usage_mb']))

            return df
        except FileNotFoundError:
            logger.error(get_backend_translation("file_not_found_error", input_file=self.input_file))
            raise FileNotFoundError(f"The file {self.input_file} was not found")
        except Exception as e:
            logger.error(get_backend_translation("error_reading_excel_file", error=str(e)))
            raise Exception(f"Error reading the Excel file: {e}") from e

    def filter_dataframe(self, df: pd.DataFrame) -> pd.DataFrame:
        """
        Filters the DataFrame based on regex patterns and/or numeric filters.
        """
        try:
            filtered_df = df
            applied_filters = []

            # Apply regex filtering if a pattern is provided
            if self.pattern and self.pattern.strip():
                filtered_df = self._apply_regex_filter(filtered_df)
                applied_filters.append("Regex")

            # Apply numeric filtering if enabled
            if self.numeric_filter:
                filtered_df = self._apply_numeric_filter(filtered_df)
                applied_filters.append("Numeric")

            # Update statistics
            self.stats['filters_applied'] = applied_filters
            self.stats['rows_filtered'] = len(filtered_df)
            self.stats['rows_removed'] = len(df) - len(filtered_df)

            if not applied_filters:
                logger.warning(get_backend_translation("no_filter_criteria_specified"))
                logger.info(get_backend_translation("no_filters_applied_rows_remain", rows=len(df)))
                return df

            # Calculate filtering efficiency
            retention_rate = (len(filtered_df) / len(df)) * 100 if len(df) > 0 else 0
            removal_rate = (self.stats['rows_removed'] / len(df)) * 100 if len(df) > 0 else 0

            logger.info(get_backend_translation("filters_applied_list", filters=', '.join(applied_filters)))
            logger.info(get_backend_translation("filter_results_summary", retained=len(filtered_df), removed=self.stats['rows_removed']))
            logger.info(get_backend_translation("retention_removal_rates", retention=retention_rate, removal=removal_rate))

            return filtered_df

        except (re.error, ValueError):
            # Let specific errors propagate so process() can report them precisely
            raise
        except Exception as e:
            logger.error(f"Error filtering: {e}")
            raise Exception(f"Error filtering: {e}") from e

    def _apply_regex_filter(self, df: pd.DataFrame) -> pd.DataFrame:
        """
        Applies regex filtering to the DataFrame.
        """
        try:
            # Compile the regex pattern using a heuristic:
            # - If the pattern contains spaces, search for it as an exact phrase
            # - If the pattern looks like a complete word, use word boundaries
            # - Otherwise allow substring matching
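            #
            # Illustrative examples:
            #   "foo bar" -> r"\bfoo bar\b"  (escaped phrase with word boundaries)
            #   "ab+"     -> "ab+"           (4 chars or fewer: used as raw regex)
            #   "invoice" -> r"\binvoice\b"  (longer alphabetic word)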

            if ' ' in self.pattern:
                # Exact phrase with word boundaries
                regex_pattern = rf"\b{re.escape(self.pattern)}\b"
            elif len(self.pattern) <= 4:
                # Short patterns (4 or fewer characters) - allow substring matching
                regex_pattern = self.pattern
            elif self.pattern.isalpha():
                # Probably a complete word (longer than 4 characters)
                regex_pattern = rf"\b{re.escape(self.pattern)}\b"
            else:
                # Substring matching for other cases
                regex_pattern = self.pattern

            regex = re.compile(regex_pattern, re.IGNORECASE)
            logger.info(get_backend_translation("regex_pattern_compiled", original=self.pattern, compiled=regex_pattern))

            # Determine the columns to search
            if self.columns:
                columns_to_search = self.columns
                logger.info(get_backend_translation("regex_filter_searching_columns", columns=columns_to_search))
            else:
                columns_to_search = df.columns
                logger.info(get_backend_translation("regex_filter_searching_all_columns", columns=list(columns_to_search)))

            # Row predicate: a row matches if any searched cell matches the pattern
            def regex_filter_row(row):
                for col in columns_to_search:
                    if col in row and pd.notna(row[col]):
                        cell_value = str(row[col])
                        if regex.search(cell_value):
                            logger.debug(get_backend_translation("regex_match_found", row=row.name, column=col, value=cell_value))
                            return True
                return False

            # Apply filter
            filtered_df = df[df.apply(regex_filter_row, axis=1)]
            logger.info(get_backend_translation("regex_filter_results", rows=len(filtered_df)))

            return filtered_df

        except re.error as e:
            # Re-raise so process() can handle invalid patterns specifically
            logger.error(get_backend_translation("invalid_regex_pattern", error=str(e)))
            raise

    def _apply_numeric_filter(self, df: pd.DataFrame) -> pd.DataFrame:
        """
        Applies numeric filtering to the DataFrame.
        """
        column = self.numeric_filter['column']
        operator = self.numeric_filter['operator']
        value = self.numeric_filter['value']

        logger.info(get_backend_translation("numeric_filter_applied", column=column, operator=operator, value=value))

        if column is None:
            # Apply the filter across all columns - a row matches if ANY column meets the criteria
            return self._apply_numeric_filter_all_columns(df, operator, value)
        else:
            # Apply the filter to a specific column
            return self._apply_numeric_filter_single_column(df, column, operator, value)

    @staticmethod
    def _build_comparison_mask(numeric_series: pd.Series, operator: str, value: float) -> pd.Series:
        """
        Builds a boolean mask for the given comparison operator.
        NaN entries (non-numeric cells) never match.
        """
        if operator == '>':
            return numeric_series > value
        elif operator == '<':
            return numeric_series < value
        elif operator == '>=':
            return numeric_series >= value
        elif operator == '<=':
            return numeric_series <= value
        elif operator == '=':
            return numeric_series == value
        logger.error(get_backend_translation("unknown_operator", operator=operator))
        raise ValueError(f"Unknown operator: {operator}")

    def _apply_numeric_filter_single_column(self, df: pd.DataFrame, column: str,
                                            operator: str, value: float) -> pd.DataFrame:
        """
        Apply the numeric filter to a single column.
        """
        # Check that the column exists
        if column not in df.columns:
            logger.error(get_backend_translation("column_does_not_exist", column=column))
            raise ValueError(f"Column '{column}' does not exist in the DataFrame")

        # Convert the column to numeric values (non-numeric values become NaN)
        numeric_series = pd.to_numeric(df[column], errors='coerce')
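        # e.g. pd.to_numeric(pd.Series(["5", "abc", 7.5]), errors='coerce')
        #      -> [5.0, NaN, 7.5]; the NaN row can never satisfy the comparison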

        # Apply the comparison operator
        mask = self._build_comparison_mask(numeric_series, operator, value)

        # Apply filter
        filtered_df = df[mask]
        logger.info(get_backend_translation("numeric_filter_single_column_results", matches=mask.sum(), total=len(df), column=column, operator=operator, value=value))

        # Log a few examples of the filtered values
        if len(filtered_df) > 0:
            sample_values = filtered_df[column].head(3).tolist()
            logger.debug(get_backend_translation("sample_filtered_values", values=sample_values))

        return filtered_df

    def _apply_numeric_filter_all_columns(self, df: pd.DataFrame, operator: str, value: float) -> pd.DataFrame:
        """
        Apply the numeric filter across all columns - a row matches if ANY column meets the criteria.
        """
        logger.info(get_backend_translation("numeric_filter_all_columns", operator=operator, value=value))

        # Mask that becomes True for rows where ANY column meets the criteria
        combined_mask = pd.Series([False] * len(df), index=df.index)

        # Check each column
        for col in df.columns:
            # Convert the column to numeric values (non-numeric values become NaN)
            numeric_series = pd.to_numeric(df[col], errors='coerce')

            # Apply the comparison operator for this column
            col_mask = self._build_comparison_mask(numeric_series, operator, value)

            # Combine with OR logic (any matching column makes the row match)
            combined_mask = combined_mask | col_mask

            # Log matches for this column
            matches = col_mask.sum()
            if matches > 0:
                logger.debug(get_backend_translation("column_matches_found", column=col, matches=matches))

        # Apply filter
        filtered_df = df[combined_mask]
        logger.info(get_backend_translation("numeric_filter_all_columns_results", matches=combined_mask.sum(), total=len(df), operator=operator, value=value))

        return filtered_df

    def write_excel(self, df: pd.DataFrame):
        """
        Writes the filtered DataFrame to a new Excel file.
        """
        try:
            # If specific columns were selected, only write those
            if self.columns:
                # Only keep the selected columns (if they exist in the DataFrame)
                columns_to_keep = [col for col in self.columns if col in df.columns]
                df_filtered = df[columns_to_keep]
                logger.info(get_backend_translation("writing_selected_columns", columns=columns_to_keep))
            else:
                # Write all columns
                df_filtered = df
                logger.info(get_backend_translation("writing_all_columns", columns=list(df.columns)))

            # Collect output statistics
            self.stats['output_rows'] = len(df_filtered)
            self.stats['output_columns'] = len(df_filtered.columns)

            df_filtered.to_excel(self.output_file, index=False)

            # Get the output file size and calculate the output/input size ratio
            if os.path.exists(self.output_file):
                self.stats['output_file_size'] = os.path.getsize(self.output_file)
                if self.stats['input_file_size'] > 0:
                    self.stats['compression_ratio'] = self.stats['output_file_size'] / self.stats['input_file_size']

            logger.info(get_backend_translation("output_file_written", file=self.output_file))
            logger.info(get_backend_translation("output_dimensions", rows=self.stats['output_rows'], columns=self.stats['output_columns']))
            logger.info(get_backend_translation("output_file_size", size=self.stats['output_file_size'] / (1024 * 1024)))

            if self.stats['input_file_size'] > 0:
                compression_pct = (self.stats['compression_ratio'] - 1) * 100
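                # Worked example: input 2.0 MiB, output 1.5 MiB
                # -> compression_ratio = 0.75, compression_pct = -25 (25% smaller)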
                if compression_pct > 0:
                    logger.info(get_backend_translation("compression_larger", percent=compression_pct))
                else:
                    logger.info(get_backend_translation("compression_smaller", percent=abs(compression_pct)))

        except PermissionError:
            logger.error(get_backend_translation("no_write_permission", file=self.output_file))
            raise PermissionError(f"No write permission for the file {self.output_file}")
        except Exception as e:
            logger.error(get_backend_translation("error_writing_excel_file", error=str(e)))
            raise Exception(f"Error writing the Excel file: {e}") from e

    def process(self):
        """
        Main method for processing the Excel file.

        Returns:
            Tuple (success, error_message); error_message is None on success.
        """
        # Start timing
        self.stats['start_time'] = time.time()

        try:
            logger.info(get_backend_translation("starting_excel_filter_processing"))
            df = self.read_excel()
            filtered_df = self.filter_dataframe(df)
            self.write_excel(filtered_df)

            # End timing and calculate final statistics
            self.stats['end_time'] = time.time()
            self.stats['processing_time_seconds'] = self.stats['end_time'] - self.stats['start_time']

            self._log_final_statistics()

            logger.info(get_backend_translation("excel_filter_processing_completed"))
            return True, None

        except FileNotFoundError as e:
            error_msg = get_backend_translation("error_file_not_found", error=str(e))
            logger.error(error_msg)
            return False, error_msg
        except PermissionError as e:
            error_msg = get_backend_translation("error_permission", error=str(e))
            logger.error(error_msg)
            return False, error_msg
        except pd.errors.EmptyDataError as e:
            error_msg = get_backend_translation("error_empty_excel", error=str(e))
            logger.error(error_msg)
            return False, error_msg
        except pd.errors.ParserError as e:
            error_msg = get_backend_translation("error_parser", error=str(e))
            logger.error(error_msg)
            return False, error_msg
        except re.error as e:
            error_msg = get_backend_translation("error_invalid_regex", error=str(e))
            logger.error(error_msg)
            return False, error_msg
        except ValueError as e:
            error_msg = get_backend_translation("error_invalid_input", error=str(e))
            logger.error(error_msg)
            return False, error_msg
        except Exception as e:
            error_msg = get_backend_translation("error_unexpected", type=type(e).__name__, error=str(e))
            logger.error(error_msg)
            return False, error_msg

    def get_statistics(self) -> Dict[str, Any]:
        """
        Returns the collected statistics.

        Returns:
            Dictionary with all collected statistics
        """
        return self.stats.copy()

    def _log_final_statistics(self):
        """
        Logs the final comprehensive statistics of the processing run.
        """
        logger.info(get_backend_translation("processing_statistics"))
        logger.info(get_backend_translation("processing_time", time=self.stats['processing_time_seconds']))

        # File statistics
        logger.info(get_backend_translation("file_statistics"))
        logger.info(get_backend_translation("input_file_size", size=self.stats['input_file_size'] / (1024 * 1024)))
        logger.info(get_backend_translation("output_file_size", size=self.stats['output_file_size'] / (1024 * 1024)))
        if self.stats['compression_ratio'] > 0:
            compression_pct = (self.stats['compression_ratio'] - 1) * 100
            logger.info(get_backend_translation("compression_rate", rate=compression_pct))

        # Data dimensions
        logger.info(get_backend_translation("data_dimensions"))
        logger.info(get_backend_translation("input_dimensions", rows=self.stats['input_rows'], columns=self.stats['input_columns']))
        logger.info(get_backend_translation("output_dimensions", rows=self.stats['output_rows'], columns=self.stats['output_columns']))

        # Filtering results
        if self.stats['filters_applied']:
            logger.info(get_backend_translation("filter_results"))
            logger.info(get_backend_translation("applied_filters", filters=', '.join(self.stats['filters_applied'])))
            if self.stats['input_rows'] > 0:
                retention_rate = (self.stats['rows_filtered'] / self.stats['input_rows']) * 100
                removal_rate = (self.stats['rows_removed'] / self.stats['input_rows']) * 100
                logger.info(get_backend_translation("rows_retained", rows=self.stats['rows_filtered'], rate=retention_rate))
                logger.info(get_backend_translation("rows_removed", rows=self.stats['rows_removed'], rate=removal_rate))

        # Performance metrics
        logger.info(get_backend_translation("performance_metrics"))
        logger.info(get_backend_translation("memory_usage", size=self.stats['memory_usage_mb']))
        if self.stats['processing_time_seconds'] > 0 and self.stats['input_rows'] > 0:
            rows_per_second = self.stats['input_rows'] / self.stats['processing_time_seconds']
            logger.info(get_backend_translation("processing_speed", speed=rows_per_second))

        logger.info(get_backend_translation("end_statistics"))
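

# Minimal usage sketch (file names and the 'Price' column are illustrative,
# not part of this module; adjust them to your own data):
if __name__ == "__main__":
    excel_filter = ExcelFilter(
        input_file="input.xlsx",
        output_file="filtered.xlsx",
        pattern="invoice",
        numeric_filter={'column': 'Price', 'operator': '>', 'value': 100.0},
    )
    success, error = excel_filter.process()
    if success:
        print(excel_filter.get_statistics())
    else:
        print(f"Filtering failed: {error}")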