""" Excel Filter Module Main functionality for filtering Excel files based on regex patterns """ import re import pandas as pd from typing import List, Dict, Any, Optional import logging import time import os import json from pathlib import Path # Configure logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) # Simple translation system for backend messages class BackendTranslations: """ Simple translation system for backend modules """ def __init__(self, language="de"): self.current_language = language self.translations = {} # Load translations from JSON files self.load_translations() def load_translations(self): """ Load translation files from the locales directory """ # Get the directory where this script is located script_dir = Path(__file__).parent locales_dir = script_dir / "locales" # Load the language file lang_file = locales_dir / f"{self.current_language}.json" if lang_file.exists(): try: with open(lang_file, 'r', encoding='utf-8') as f: self.translations = json.load(f) except Exception as e: print(f"Error loading {lang_file}: {e}") self.translations = {} else: self.translations = {} def get(self, key, default=None): """ Get a translation for a key """ return self.translations.get(key, default if default is not None else key) def __getitem__(self, key): """ Get a translation using dictionary-style access """ return self.get(key) # Global backend translations instance _backend_translations = BackendTranslations() def get_backend_translation(key, **kwargs): """ Get a backend translation and format it with provided arguments """ message = _backend_translations.get(key, key) if kwargs: try: message = message.format(**kwargs) except (KeyError, ValueError): pass # Keep original message if formatting fails return message class ExcelFilter: """ Class for filtering Excel files based on regex patterns and numeric filters """ def __init__(self, input_file: str, output_file: str, pattern: str = None, sheet_name: str = None, columns: List[str] = None, numeric_filter: Dict[str, Any] = None, language: str = "de"): """ Initializes the ExcelFilter Args: input_file: Path to the input file output_file: Path to the output file pattern: Regex pattern for filtering (optional) sheet_name: Name of the worksheet (optional) columns: List of column names to search (optional) numeric_filter: Dictionary with numeric filter settings (optional) Format: {'column': str or None, 'operator': str, 'value': float} If 'column' is None, the filter applies to all columns """ self.input_file = input_file self.output_file = output_file self.pattern = pattern self.sheet_name = sheet_name self.columns = columns self.numeric_filter = numeric_filter # Statistics collection self.stats = { 'start_time': None, 'end_time': None, 'input_file_size': 0, 'output_file_size': 0, 'input_rows': 0, 'input_columns': 0, 'output_rows': 0, 'output_columns': 0, 'memory_usage_mb': 0, 'filters_applied': [], 'processing_time_seconds': 0, 'compression_ratio': 0.0, 'rows_filtered': 0, 'rows_removed': 0 } # Log initialization with all parameters logger.info(f"ExcelFilter initialized: input_file='{input_file}', output_file='{output_file}', " f"pattern='{pattern}', sheet_name='{sheet_name}', columns={columns}, " f"numeric_filter={numeric_filter}") def read_excel(self) -> pd.DataFrame: """ Reads the Excel file and returns a DataFrame """ try: # Get input file size if os.path.exists(self.input_file): self.stats['input_file_size'] = os.path.getsize(self.input_file) if self.sheet_name: df = pd.read_excel(self.input_file, sheet_name=self.sheet_name) else: df = pd.read_excel(self.input_file) # Collect input statistics self.stats['input_rows'] = len(df) self.stats['input_columns'] = len(df.columns) self.stats['memory_usage_mb'] = df.memory_usage(deep=True).sum() / (1024 * 1024) logger.info(get_backend_translation("input_file_loaded", rows=len(df), columns=len(df.columns))) logger.info(get_backend_translation("file_size_info", size=self.stats['input_file_size'] / (1024*1024))) logger.info(get_backend_translation("memory_usage_info", size=self.stats['memory_usage_mb'])) return df except FileNotFoundError: logger.error(get_backend_translation("file_not_found_error", input_file=self.input_file)) raise FileNotFoundError(f"The file {self.input_file} was not found") except Exception as e: logger.error(get_backend_translation("error_reading_excel_file", error=str(e))) raise Exception(f"Error reading the Excel file: {e}") def filter_dataframe(self, df: pd.DataFrame) -> pd.DataFrame: """ Filters the DataFrame based on regex patterns and/or numeric filters """ try: filtered_df = df applied_filters = [] # Apply regex filtering if pattern is provided if self.pattern and self.pattern.strip(): filtered_df = self._apply_regex_filter(filtered_df) applied_filters.append("Regex") # Apply numeric filtering if enabled if self.numeric_filter: filtered_df = self._apply_numeric_filter(filtered_df) applied_filters.append("Numeric") # Update statistics self.stats['filters_applied'] = applied_filters self.stats['rows_filtered'] = len(filtered_df) self.stats['rows_removed'] = len(df) - len(filtered_df) if not applied_filters: logger.warning(get_backend_translation("no_filter_criteria_specified")) logger.info(get_backend_translation("no_filters_applied_rows_remain", rows=len(df))) return df # Calculate filtering efficiency retention_rate = (len(filtered_df) / len(df)) * 100 if len(df) > 0 else 0 removal_rate = (self.stats['rows_removed'] / len(df)) * 100 if len(df) > 0 else 0 logger.info(get_backend_translation("filters_applied_list", filters=', '.join(applied_filters))) logger.info(get_backend_translation("filter_results_summary", retained=len(filtered_df), removed=self.stats['rows_removed'])) logger.info(get_backend_translation("retention_removal_rates", retention=retention_rate, removal=removal_rate)) return filtered_df except Exception as e: logger.error(f"Error filtering: {e}") raise Exception(f"Error filtering: {e}") def _apply_regex_filter(self, df: pd.DataFrame) -> pd.DataFrame: """ Applies regex filtering to the DataFrame """ try: # Compile the regex pattern # Intelligent pattern recognition: # - If the pattern contains spaces, search as exact phrase # - If the pattern seems to be a complete word, use word boundaries # - Otherwise allow substring matching if ' ' in self.pattern: # Exact phrase with word boundaries regex_pattern = rf"\b{re.escape(self.pattern)}\b" elif len(self.pattern) <= 4: # Short patterns (4 or fewer characters) - allow substring matching regex_pattern = self.pattern elif len(self.pattern) > 2 and self.pattern.isalpha(): # Probably a complete word regex_pattern = rf"\b{re.escape(self.pattern)}\b" else: # Substring matching for other cases regex_pattern = self.pattern regex = re.compile(regex_pattern, re.IGNORECASE) logger.info(get_backend_translation("regex_pattern_compiled", original=self.pattern, compiled=regex_pattern)) # Determine the columns to search if self.columns: columns_to_search = self.columns logger.info(get_backend_translation("regex_filter_searching_columns", columns=columns_to_search)) else: columns_to_search = df.columns logger.info(get_backend_translation("regex_filter_searching_all_columns", columns=list(columns_to_search))) # Filter function with detailed logging def regex_filter_row(row): row_matches = False for col in columns_to_search: if col in row and pd.notna(row[col]): cell_value = str(row[col]) if regex.search(cell_value): logger.debug(get_backend_translation("regex_match_found", row=row.name, column=col, value=cell_value)) row_matches = True break return row_matches # Apply filter filtered_df = df[df.apply(regex_filter_row, axis=1)] logger.info(get_backend_translation("regex_filter_results", rows=len(filtered_df))) return filtered_df except re.error as e: logger.error(get_backend_translation("invalid_regex_pattern", error=str(e))) raise Exception(f"Invalid regex pattern: {e}") def _apply_numeric_filter(self, df: pd.DataFrame) -> pd.DataFrame: """ Applies numeric filtering to the DataFrame """ column = self.numeric_filter['column'] operator = self.numeric_filter['operator'] value = self.numeric_filter['value'] logger.info(get_backend_translation("numeric_filter_applied", column=column, operator=operator, value=value)) if column is None: # Apply filter across all columns - a row matches if ANY column meets the criteria return self._apply_numeric_filter_all_columns(df, operator, value) else: # Apply filter to specific column return self._apply_numeric_filter_single_column(df, column, operator, value) def _apply_numeric_filter_single_column(self, df: pd.DataFrame, column: str, operator: str, value: float) -> pd.DataFrame: """ Apply numeric filter to a single column """ # Check if the column exists if column not in df.columns: logger.error(get_backend_translation("column_does_not_exist", column=column)) raise ValueError(f"Column '{column}' does not exist in the DataFrame") # Convert the column to numeric values (ignore errors for non-numeric values) numeric_series = pd.to_numeric(df[column], errors='coerce') # Apply the comparison operator if operator == '>': mask = numeric_series > value elif operator == '<': mask = numeric_series < value elif operator == '>=': mask = numeric_series >= value elif operator == '<=': mask = numeric_series <= value elif operator == '=': mask = numeric_series == value else: logger.error(get_backend_translation("unknown_operator", operator=operator)) raise ValueError(f"Unknown operator: {operator}") # Apply filter filtered_df = df[mask] logger.info(get_backend_translation("numeric_filter_single_column_results", matches=mask.sum(), total=len(df), column=column, operator=operator, value=value)) # Log some examples of the filtered values if len(filtered_df) > 0: sample_values = filtered_df[column].head(3).tolist() logger.debug(get_backend_translation("sample_filtered_values", values=sample_values)) return filtered_df def _apply_numeric_filter_all_columns(self, df: pd.DataFrame, operator: str, value: float) -> pd.DataFrame: """ Apply numeric filter across all columns - a row matches if ANY column meets the criteria """ logger.info(get_backend_translation("numeric_filter_all_columns", operator=operator, value=value)) # Create a mask that will be True for rows where ANY column meets the criteria combined_mask = pd.Series([False] * len(df), index=df.index) # Check each column for col in df.columns: # Convert the column to numeric values numeric_series = pd.to_numeric(df[col], errors='coerce') # Apply the comparison operator if operator == '>': col_mask = numeric_series > value elif operator == '<': col_mask = numeric_series < value elif operator == '>=': col_mask = numeric_series >= value elif operator == '<=': col_mask = numeric_series <= value elif operator == '=': col_mask = numeric_series == value else: logger.error(get_backend_translation("unknown_operator", operator=operator)) raise ValueError(f"Unknown operator: {operator}") # Combine with OR logic (any column matching makes the row match) combined_mask = combined_mask | col_mask # Log matches for this column matches = col_mask.sum() if matches > 0: logger.debug(get_backend_translation("column_matches_found", column=col, matches=matches)) # Apply filter filtered_df = df[combined_mask] logger.info(get_backend_translation("numeric_filter_all_columns_results", matches=combined_mask.sum(), total=len(df), operator=operator, value=value)) return filtered_df def write_excel(self, df: pd.DataFrame): """ Writes the filtered DataFrame to a new Excel file """ try: # If specific columns were selected, only write those if self.columns: # Only keep the selected columns (if they exist in the DataFrame) columns_to_keep = [col for col in self.columns if col in df.columns] df_filtered = df[columns_to_keep] logger.info(get_backend_translation("writing_selected_columns", columns=columns_to_keep)) else: # Write all columns df_filtered = df logger.info(get_backend_translation("writing_all_columns", columns=list(df.columns))) # Collect output statistics self.stats['output_rows'] = len(df_filtered) self.stats['output_columns'] = len(df_filtered.columns) df_filtered.to_excel(self.output_file, index=False) # Get output file size and calculate compression ratio if os.path.exists(self.output_file): self.stats['output_file_size'] = os.path.getsize(self.output_file) if self.stats['input_file_size'] > 0: self.stats['compression_ratio'] = self.stats['output_file_size'] / self.stats['input_file_size'] logger.info(get_backend_translation("output_file_written", file=self.output_file)) logger.info(get_backend_translation("output_dimensions", rows=self.stats['output_rows'], columns=self.stats['output_columns'])) logger.info(get_backend_translation("output_file_size", size=self.stats['output_file_size'] / (1024*1024))) if self.stats['input_file_size'] > 0: compression_pct = (self.stats['compression_ratio'] - 1) * 100 if compression_pct > 0: logger.info(get_backend_translation("compression_larger", percent=compression_pct)) else: logger.info(get_backend_translation("compression_smaller", percent=compression_pct)) except PermissionError: logger.error(get_backend_translation("no_write_permission", file=self.output_file)) raise PermissionError(f"No write permission for the file {self.output_file}") except Exception as e: logger.error(get_backend_translation("error_writing_excel_file", error=str(e))) raise Exception(f"Error writing the Excel file: {e}") def process(self): """ Main method for processing the Excel file """ # Start timing self.stats['start_time'] = time.time() try: logger.info(get_backend_translation("starting_excel_filter_processing")) df = self.read_excel() filtered_df = self.filter_dataframe(df) self.write_excel(filtered_df) # End timing and calculate final statistics self.stats['end_time'] = time.time() self.stats['processing_time_seconds'] = self.stats['end_time'] - self.stats['start_time'] self._log_final_statistics() logger.info(get_backend_translation("excel_filter_processing_completed")) return True, None except FileNotFoundError as e: error_msg = get_backend_translation("error_file_not_found", error=str(e)) logger.error(error_msg) return False, error_msg except PermissionError as e: error_msg = get_backend_translation("error_permission", error=str(e)) logger.error(error_msg) return False, error_msg except pd.errors.EmptyDataError as e: error_msg = get_backend_translation("error_empty_excel", error=str(e)) logger.error(error_msg) return False, error_msg except pd.errors.ParserError as e: error_msg = get_backend_translation("error_parser", error=str(e)) logger.error(error_msg) return False, error_msg except re.error as e: error_msg = get_backend_translation("error_invalid_regex", error=str(e)) logger.error(error_msg) return False, error_msg except ValueError as e: error_msg = get_backend_translation("error_invalid_input", error=str(e)) logger.error(error_msg) return False, error_msg except Exception as e: error_msg = get_backend_translation("error_unexpected", type=type(e).__name__, error=str(e)) logger.error(error_msg) return False, error_msg def get_statistics(self) -> Dict[str, Any]: """ Returns the collected statistics Returns: Dictionary with all collected statistics """ return self.stats.copy() def _log_final_statistics(self): """ Logs the final comprehensive statistics of the processing """ logger.info(get_backend_translation("processing_statistics")) logger.info(get_backend_translation("processing_time", time=self.stats['processing_time_seconds'])) # File statistics logger.info(get_backend_translation("file_statistics")) logger.info(get_backend_translation("input_file_size", size=self.stats['input_file_size'] / (1024*1024))) logger.info(get_backend_translation("output_file_size", size=self.stats['output_file_size'] / (1024*1024))) if self.stats['compression_ratio'] > 0: compression_pct = (self.stats['compression_ratio'] - 1) * 100 logger.info(get_backend_translation("compression_rate", rate=compression_pct)) # Data dimensions logger.info(get_backend_translation("data_dimensions")) logger.info(get_backend_translation("input_dimensions", rows=self.stats['input_rows'], columns=self.stats['input_columns'])) logger.info(get_backend_translation("output_dimensions", rows=self.stats['output_rows'], columns=self.stats['output_columns'])) # Filtering results if self.stats['filters_applied']: logger.info(get_backend_translation("filter_results")) logger.info(get_backend_translation("applied_filters", filters=', '.join(self.stats['filters_applied']))) if self.stats['input_rows'] > 0: retention_rate = (self.stats['rows_filtered'] / self.stats['input_rows']) * 100 removal_rate = (self.stats['rows_removed'] / self.stats['input_rows']) * 100 logger.info(get_backend_translation("rows_retained", rows=self.stats['rows_filtered'], rate=retention_rate)) logger.info(get_backend_translation("rows_removed", rows=self.stats['rows_removed'], rate=removal_rate)) # Performance metrics logger.info(get_backend_translation("performance_metrics")) logger.info(get_backend_translation("memory_usage", size=self.stats['memory_usage_mb'])) if self.stats['processing_time_seconds'] > 0 and self.stats['input_rows'] > 0: rows_per_second = self.stats['input_rows'] / self.stats['processing_time_seconds'] logger.info(get_backend_translation("processing_speed", speed=rows_per_second)) logger.info(get_backend_translation("end_statistics"))