"""
Excel Filter Module
Main functionality for filtering Excel files based on regex patterns
"""
import re
import pandas as pd
from typing import List, Dict, Any, Optional
import logging
import time
import os
import json
from pathlib import Path
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


# Simple translation system for backend messages
class BackendTranslations:
    """
    Simple translation system for backend modules
    """

    def __init__(self, language="de"):
        self.current_language = language
        self.translations = {}
        # Load translations from JSON files
        self.load_translations()

    def load_translations(self):
        """
        Load translation files from the locales directory
        """
        # Get the directory where this script is located
        script_dir = Path(__file__).parent
        locales_dir = script_dir / "locales"
        # Load the language file
        lang_file = locales_dir / f"{self.current_language}.json"
        if lang_file.exists():
            try:
                with open(lang_file, 'r', encoding='utf-8') as f:
                    self.translations = json.load(f)
            except Exception as e:
                logger.error(f"Error loading {lang_file}: {e}")
                self.translations = {}
        else:
            self.translations = {}

    def get(self, key, default=None):
        """
        Get a translation for a key
        """
        return self.translations.get(key, default if default is not None else key)

    def __getitem__(self, key):
        """
        Get a translation using dictionary-style access
        """
        return self.get(key)


# Global backend translations instance
_backend_translations = BackendTranslations()


def get_backend_translation(key, **kwargs):
    """
    Get a backend translation and format it with provided arguments
    """
    message = _backend_translations.get(key, key)
    if kwargs:
        try:
            message = message.format(**kwargs)
        except (KeyError, ValueError):
            pass  # Keep original message if formatting fails
    return message
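

# Illustrative example: "input_file_loaded" is one of the keys used below; the
# German text shown is an *assumed* locales/de.json entry, not a verified one.
#
#     msg = get_backend_translation("input_file_loaded", rows=100, columns=5)
#     # With de.json containing
#     # {"input_file_loaded": "Datei geladen: {rows} Zeilen, {columns} Spalten"}
#     # this returns "Datei geladen: 100 Zeilen, 5 Spalten". If the key is
#     # missing, the key itself is returned; if a placeholder cannot be filled,
#     # the raw template is returned unchanged.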


class ExcelFilter:
    """
    Class for filtering Excel files based on regex patterns and numeric filters
    """

    def __init__(self, input_file: str, output_file: str, pattern: Optional[str] = None,
                 sheet_name: Optional[str] = None, columns: Optional[List[str]] = None,
                 numeric_filter: Optional[Dict[str, Any]] = None, language: str = "de"):
        """
        Initializes the ExcelFilter

        Args:
            input_file: Path to the input file
            output_file: Path to the output file
            pattern: Regex pattern for filtering (optional)
            sheet_name: Name of the worksheet (optional)
            columns: List of column names to search (optional)
            numeric_filter: Dictionary with numeric filter settings (optional)
                Format: {'column': str or None, 'operator': str, 'value': float}
                If 'column' is None, the filter applies to all columns
            language: Language code for backend log messages (optional, default "de")
        """
        self.input_file = input_file
        self.output_file = output_file
        self.pattern = pattern
        self.sheet_name = sheet_name
        self.columns = columns
        self.numeric_filter = numeric_filter
        # Switch the backend translations to the requested language
        global _backend_translations
        if language != _backend_translations.current_language:
            _backend_translations = BackendTranslations(language)
        # Statistics collection
        self.stats = {
            'start_time': None,
            'end_time': None,
            'input_file_size': 0,
            'output_file_size': 0,
            'input_rows': 0,
            'input_columns': 0,
            'output_rows': 0,
            'output_columns': 0,
            'memory_usage_mb': 0,
            'filters_applied': [],
            'processing_time_seconds': 0,
            'compression_ratio': 0.0,
            'rows_filtered': 0,
            'rows_removed': 0
        }
        # Log initialization with all parameters
        logger.info(f"ExcelFilter initialized: input_file='{input_file}', output_file='{output_file}', "
                    f"pattern='{pattern}', sheet_name='{sheet_name}', columns={columns}, "
                    f"numeric_filter={numeric_filter}")

    def read_excel(self) -> pd.DataFrame:
        """
        Reads the Excel file and returns a DataFrame
        """
        try:
            # Get input file size
            if os.path.exists(self.input_file):
                self.stats['input_file_size'] = os.path.getsize(self.input_file)
            if self.sheet_name:
                df = pd.read_excel(self.input_file, sheet_name=self.sheet_name)
            else:
                df = pd.read_excel(self.input_file)
            # Collect input statistics
            self.stats['input_rows'] = len(df)
            self.stats['input_columns'] = len(df.columns)
            self.stats['memory_usage_mb'] = df.memory_usage(deep=True).sum() / (1024 * 1024)
            logger.info(get_backend_translation("input_file_loaded", rows=len(df), columns=len(df.columns)))
            logger.info(get_backend_translation("file_size_info", size=self.stats['input_file_size'] / (1024 * 1024)))
            logger.info(get_backend_translation("memory_usage_info", size=self.stats['memory_usage_mb']))
            return df
        except FileNotFoundError:
            logger.error(get_backend_translation("file_not_found_error", input_file=self.input_file))
            raise FileNotFoundError(f"The file {self.input_file} was not found")
        except Exception as e:
            logger.error(get_backend_translation("error_reading_excel_file", error=str(e)))
            raise Exception(f"Error reading the Excel file: {e}")

    def filter_dataframe(self, df: pd.DataFrame) -> pd.DataFrame:
        """
        Filters the DataFrame based on regex patterns and/or numeric filters
        """
        try:
            filtered_df = df
            applied_filters = []
            # Apply regex filtering if a pattern is provided
            if self.pattern and self.pattern.strip():
                filtered_df = self._apply_regex_filter(filtered_df)
                applied_filters.append("Regex")
            # Apply numeric filtering if enabled
            if self.numeric_filter:
                filtered_df = self._apply_numeric_filter(filtered_df)
                applied_filters.append("Numeric")
            # Update statistics
            self.stats['filters_applied'] = applied_filters
            self.stats['rows_filtered'] = len(filtered_df)
            self.stats['rows_removed'] = len(df) - len(filtered_df)
            if not applied_filters:
                logger.warning(get_backend_translation("no_filter_criteria_specified"))
                logger.info(get_backend_translation("no_filters_applied_rows_remain", rows=len(df)))
                return df
            # Calculate filtering efficiency
            retention_rate = (len(filtered_df) / len(df)) * 100 if len(df) > 0 else 0
            removal_rate = (self.stats['rows_removed'] / len(df)) * 100 if len(df) > 0 else 0
            logger.info(get_backend_translation("filters_applied_list", filters=', '.join(applied_filters)))
            logger.info(get_backend_translation("filter_results_summary", retained=len(filtered_df), removed=self.stats['rows_removed']))
            logger.info(get_backend_translation("retention_removal_rates", retention=retention_rate, removal=removal_rate))
            return filtered_df
        except Exception as e:
            logger.error(f"Error filtering: {e}")
            raise Exception(f"Error filtering: {e}")

    def _apply_regex_filter(self, df: pd.DataFrame) -> pd.DataFrame:
        """
        Applies regex filtering to the DataFrame
        """
        try:
            # Compile the regex pattern
            # Intelligent pattern recognition:
            # - If the pattern contains spaces, search for it as an exact phrase
            # - If the pattern looks like a complete word, use word boundaries
            # - Otherwise allow substring matching
            if ' ' in self.pattern:
                # Exact phrase with word boundaries
                regex_pattern = rf"\b{re.escape(self.pattern)}\b"
            elif len(self.pattern) <= 4:
                # Short patterns (4 or fewer characters) - allow substring matching
                regex_pattern = self.pattern
            elif self.pattern.isalpha():
                # Longer, purely alphabetic pattern - probably a complete word
                regex_pattern = rf"\b{re.escape(self.pattern)}\b"
            else:
                # Substring matching for other cases
                regex_pattern = self.pattern
            regex = re.compile(regex_pattern, re.IGNORECASE)
            logger.info(get_backend_translation("regex_pattern_compiled", original=self.pattern, compiled=regex_pattern))
            # Determine the columns to search
            if self.columns:
                columns_to_search = self.columns
                logger.info(get_backend_translation("regex_filter_searching_columns", columns=columns_to_search))
            else:
                columns_to_search = df.columns
                logger.info(get_backend_translation("regex_filter_searching_all_columns", columns=list(columns_to_search)))

            # Filter function with detailed logging
            def regex_filter_row(row):
                row_matches = False
                for col in columns_to_search:
                    if col in row and pd.notna(row[col]):
                        cell_value = str(row[col])
                        if regex.search(cell_value):
                            logger.debug(get_backend_translation("regex_match_found", row=row.name, column=col, value=cell_value))
                            row_matches = True
                            break
                return row_matches

            # Apply filter
            filtered_df = df[df.apply(regex_filter_row, axis=1)]
            logger.info(get_backend_translation("regex_filter_results", rows=len(filtered_df)))
            return filtered_df
        except re.error as e:
            logger.error(get_backend_translation("invalid_regex_pattern", error=str(e)))
            raise Exception(f"Invalid regex pattern: {e}")

    def _apply_numeric_filter(self, df: pd.DataFrame) -> pd.DataFrame:
        """
        Applies numeric filtering to the DataFrame
        """
        column = self.numeric_filter['column']
        operator = self.numeric_filter['operator']
        value = self.numeric_filter['value']
        logger.info(get_backend_translation("numeric_filter_applied", column=column, operator=operator, value=value))
        if column is None:
            # Apply the filter across all columns - a row matches if ANY column meets the criteria
            return self._apply_numeric_filter_all_columns(df, operator, value)
        else:
            # Apply the filter to a specific column
            return self._apply_numeric_filter_single_column(df, column, operator, value)

    def _apply_numeric_filter_single_column(self, df: pd.DataFrame, column: str,
                                            operator: str, value: float) -> pd.DataFrame:
        """
        Apply the numeric filter to a single column
        """
        # Check that the column exists
        if column not in df.columns:
            logger.error(get_backend_translation("column_does_not_exist", column=column))
            raise ValueError(f"Column '{column}' does not exist in the DataFrame")
        # Convert the column to numeric values (non-numeric values become NaN)
        numeric_series = pd.to_numeric(df[column], errors='coerce')
        # Apply the comparison operator
        if operator == '>':
            mask = numeric_series > value
        elif operator == '<':
            mask = numeric_series < value
        elif operator == '>=':
            mask = numeric_series >= value
        elif operator == '<=':
            mask = numeric_series <= value
        elif operator == '=':
            mask = numeric_series == value
        else:
            logger.error(get_backend_translation("unknown_operator", operator=operator))
            raise ValueError(f"Unknown operator: {operator}")
        # Apply filter
        filtered_df = df[mask]
        logger.info(get_backend_translation("numeric_filter_single_column_results", matches=mask.sum(), total=len(df), column=column, operator=operator, value=value))
        # Log some examples of the filtered values
        if len(filtered_df) > 0:
            sample_values = filtered_df[column].head(3).tolist()
            logger.debug(get_backend_translation("sample_filtered_values", values=sample_values))
        return filtered_df

    def _apply_numeric_filter_all_columns(self, df: pd.DataFrame, operator: str, value: float) -> pd.DataFrame:
        """
        Apply the numeric filter across all columns - a row matches if ANY column meets the criteria
        """
        logger.info(get_backend_translation("numeric_filter_all_columns", operator=operator, value=value))
        # Create a mask that will be True for rows where ANY column meets the criteria
        combined_mask = pd.Series([False] * len(df), index=df.index)
        # Check each column
        for col in df.columns:
            # Convert the column to numeric values (non-numeric values become NaN)
            numeric_series = pd.to_numeric(df[col], errors='coerce')
            # Apply the comparison operator
            if operator == '>':
                col_mask = numeric_series > value
            elif operator == '<':
                col_mask = numeric_series < value
            elif operator == '>=':
                col_mask = numeric_series >= value
            elif operator == '<=':
                col_mask = numeric_series <= value
            elif operator == '=':
                col_mask = numeric_series == value
            else:
                logger.error(get_backend_translation("unknown_operator", operator=operator))
                raise ValueError(f"Unknown operator: {operator}")
            # Combine with OR logic (any column matching makes the row match)
            combined_mask = combined_mask | col_mask
            # Log matches for this column
            matches = col_mask.sum()
            if matches > 0:
                logger.debug(get_backend_translation("column_matches_found", column=col, matches=matches))
        # Apply filter
        filtered_df = df[combined_mask]
        logger.info(get_backend_translation("numeric_filter_all_columns_results", matches=combined_mask.sum(), total=len(df), operator=operator, value=value))
        return filtered_df
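
    # Illustrative example of the OR semantics above: with operator '>' and
    # value 100, a row with cell values [50, 150, "text"] matches because one
    # column (150) satisfies the condition; "text" is coerced to NaN by
    # pd.to_numeric and NaN comparisons always evaluate to False.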

    def write_excel(self, df: pd.DataFrame):
        """
        Writes the filtered DataFrame to a new Excel file
        """
        try:
            # If specific columns were selected, only write those
            if self.columns:
                # Only keep the selected columns (if they exist in the DataFrame)
                columns_to_keep = [col for col in self.columns if col in df.columns]
                df_filtered = df[columns_to_keep]
                logger.info(get_backend_translation("writing_selected_columns", columns=columns_to_keep))
            else:
                # Write all columns
                df_filtered = df
                logger.info(get_backend_translation("writing_all_columns", columns=list(df.columns)))
            # Collect output statistics
            self.stats['output_rows'] = len(df_filtered)
            self.stats['output_columns'] = len(df_filtered.columns)
            df_filtered.to_excel(self.output_file, index=False)
            # Get output file size and calculate compression ratio
            if os.path.exists(self.output_file):
                self.stats['output_file_size'] = os.path.getsize(self.output_file)
                if self.stats['input_file_size'] > 0:
                    self.stats['compression_ratio'] = self.stats['output_file_size'] / self.stats['input_file_size']
            logger.info(get_backend_translation("output_file_written", file=self.output_file))
            logger.info(get_backend_translation("output_dimensions", rows=self.stats['output_rows'], columns=self.stats['output_columns']))
            logger.info(get_backend_translation("output_file_size", size=self.stats['output_file_size'] / (1024 * 1024)))
            if self.stats['input_file_size'] > 0:
                compression_pct = (self.stats['compression_ratio'] - 1) * 100
                if compression_pct > 0:
                    logger.info(get_backend_translation("compression_larger", percent=compression_pct))
                else:
                    logger.info(get_backend_translation("compression_smaller", percent=compression_pct))
        except PermissionError:
            logger.error(get_backend_translation("no_write_permission", file=self.output_file))
            raise PermissionError(f"No write permission for the file {self.output_file}")
        except Exception as e:
            logger.error(get_backend_translation("error_writing_excel_file", error=str(e)))
            raise Exception(f"Error writing the Excel file: {e}")

    def process(self):
        """
        Main method for processing the Excel file
        """
        # Start timing
        self.stats['start_time'] = time.time()
        try:
            logger.info(get_backend_translation("starting_excel_filter_processing"))
            df = self.read_excel()
            filtered_df = self.filter_dataframe(df)
            self.write_excel(filtered_df)
            # End timing and calculate final statistics
            self.stats['end_time'] = time.time()
            self.stats['processing_time_seconds'] = self.stats['end_time'] - self.stats['start_time']
            self._log_final_statistics()
            logger.info(get_backend_translation("excel_filter_processing_completed"))
            return True, None
        except FileNotFoundError as e:
            error_msg = get_backend_translation("error_file_not_found", error=str(e))
            logger.error(error_msg)
            return False, error_msg
        except PermissionError as e:
            error_msg = get_backend_translation("error_permission", error=str(e))
            logger.error(error_msg)
            return False, error_msg
        except pd.errors.EmptyDataError as e:
            error_msg = get_backend_translation("error_empty_excel", error=str(e))
            logger.error(error_msg)
            return False, error_msg
        except pd.errors.ParserError as e:
            error_msg = get_backend_translation("error_parser", error=str(e))
            logger.error(error_msg)
            return False, error_msg
        except re.error as e:
            error_msg = get_backend_translation("error_invalid_regex", error=str(e))
            logger.error(error_msg)
            return False, error_msg
        except ValueError as e:
            error_msg = get_backend_translation("error_invalid_input", error=str(e))
            logger.error(error_msg)
            return False, error_msg
        except Exception as e:
            error_msg = get_backend_translation("error_unexpected", type=type(e).__name__, error=str(e))
            logger.error(error_msg)
            return False, error_msg

    def get_statistics(self) -> Dict[str, Any]:
        """
        Returns the collected statistics

        Returns:
            Dictionary with all collected statistics
        """
        return self.stats.copy()

    def _log_final_statistics(self):
        """
        Logs the final comprehensive statistics of the processing
        """
        logger.info(get_backend_translation("processing_statistics"))
        logger.info(get_backend_translation("processing_time", time=self.stats['processing_time_seconds']))
        # File statistics
        logger.info(get_backend_translation("file_statistics"))
        logger.info(get_backend_translation("input_file_size", size=self.stats['input_file_size'] / (1024 * 1024)))
        logger.info(get_backend_translation("output_file_size", size=self.stats['output_file_size'] / (1024 * 1024)))
        if self.stats['compression_ratio'] > 0:
            compression_pct = (self.stats['compression_ratio'] - 1) * 100
            logger.info(get_backend_translation("compression_rate", rate=compression_pct))
        # Data dimensions
        logger.info(get_backend_translation("data_dimensions"))
        logger.info(get_backend_translation("input_dimensions", rows=self.stats['input_rows'], columns=self.stats['input_columns']))
        logger.info(get_backend_translation("output_dimensions", rows=self.stats['output_rows'], columns=self.stats['output_columns']))
        # Filtering results
        if self.stats['filters_applied']:
            logger.info(get_backend_translation("filter_results"))
            logger.info(get_backend_translation("applied_filters", filters=', '.join(self.stats['filters_applied'])))
            if self.stats['input_rows'] > 0:
                retention_rate = (self.stats['rows_filtered'] / self.stats['input_rows']) * 100
                removal_rate = (self.stats['rows_removed'] / self.stats['input_rows']) * 100
                logger.info(get_backend_translation("rows_retained", rows=self.stats['rows_filtered'], rate=retention_rate))
                logger.info(get_backend_translation("rows_removed", rows=self.stats['rows_removed'], rate=removal_rate))
        # Performance metrics
        logger.info(get_backend_translation("performance_metrics"))
        logger.info(get_backend_translation("memory_usage", size=self.stats['memory_usage_mb']))
        if self.stats['processing_time_seconds'] > 0 and self.stats['input_rows'] > 0:
            rows_per_second = self.stats['input_rows'] / self.stats['processing_time_seconds']
            logger.info(get_backend_translation("processing_speed", speed=rows_per_second))
        logger.info(get_backend_translation("end_statistics"))