Migrate to GitLab
excel_filter/filter.py (504 lines, new file)
@@ -0,0 +1,504 @@
"""
Excel Filter Module

Main functionality for filtering Excel files based on regex patterns.
"""

import json
import logging
import os
import re
import time
from pathlib import Path
from typing import Any, Dict, List, Optional

import pandas as pd

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


# Simple translation system for backend messages
class BackendTranslations:
    """
    Simple translation system for backend modules
    """

    def __init__(self, language="de"):
        self.current_language = language
        self.translations = {}

        # Load translations from JSON files
        self.load_translations()

    def load_translations(self):
        """
        Load translation files from the locales directory
        """
        # Get the directory where this script is located
        script_dir = Path(__file__).parent
        locales_dir = script_dir / "locales"

        # Load the language file
        lang_file = locales_dir / f"{self.current_language}.json"
        if lang_file.exists():
            try:
                with open(lang_file, 'r', encoding='utf-8') as f:
                    self.translations = json.load(f)
            except Exception as e:
                logger.error(f"Error loading {lang_file}: {e}")
                self.translations = {}
        else:
            self.translations = {}

    def get(self, key, default=None):
        """
        Get a translation for a key
        """
        return self.translations.get(key, default if default is not None else key)

    def __getitem__(self, key):
        """
        Get a translation using dictionary-style access
        """
        return self.get(key)


# Global backend translations instance
_backend_translations = BackendTranslations()


def get_backend_translation(key, **kwargs):
    """
    Get a backend translation and format it with provided arguments
    """
    message = _backend_translations.get(key, key)
    if kwargs:
        try:
            message = message.format(**kwargs)
        except (KeyError, ValueError):
            pass  # Keep original message if formatting fails
    return message
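
# Usage sketch (illustrative; the locale content below is an assumption, not
# shipped with this module). Unknown keys fall back to the key string itself,
# so the call is safe even when no locale file exists:
#
#     # locales/de.json: {"file_not_found_error": "Datei {input_file} wurde nicht gefunden"}
#     get_backend_translation("file_not_found_error", input_file="data.xlsx")
#     # -> "Datei data.xlsx wurde nicht gefunden"
#     # -> "file_not_found_error" when the key is missing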


class ExcelFilter:
    """
    Class for filtering Excel files based on regex patterns and numeric filters
    """

    def __init__(self, input_file: str, output_file: str, pattern: Optional[str] = None,
                 sheet_name: Optional[str] = None, columns: Optional[List[str]] = None,
                 numeric_filter: Optional[Dict[str, Any]] = None, language: str = "de"):
        """
        Initializes the ExcelFilter

        Args:
            input_file: Path to the input file
            output_file: Path to the output file
            pattern: Regex pattern for filtering (optional)
            sheet_name: Name of the worksheet (optional)
            columns: List of column names to search (optional)
            numeric_filter: Dictionary with numeric filter settings (optional)
                Format: {'column': str or None, 'operator': str, 'value': float}
                If 'column' is None, the filter applies to all columns
            language: Language code for translated log messages (optional)
        """
        self.input_file = input_file
        self.output_file = output_file
        self.pattern = pattern
        self.sheet_name = sheet_name
        self.columns = columns
        self.numeric_filter = numeric_filter
        self.language = language

        # Statistics collection
        self.stats = {
            'start_time': None,
            'end_time': None,
            'input_file_size': 0,
            'output_file_size': 0,
            'input_rows': 0,
            'input_columns': 0,
            'output_rows': 0,
            'output_columns': 0,
            'memory_usage_mb': 0,
            'filters_applied': [],
            'processing_time_seconds': 0,
            'compression_ratio': 0.0,
            'rows_filtered': 0,
            'rows_removed': 0
        }

        # Log initialization with all parameters
        logger.info(f"ExcelFilter initialized: input_file='{input_file}', output_file='{output_file}', "
                    f"pattern='{pattern}', sheet_name='{sheet_name}', columns={columns}, "
                    f"numeric_filter={numeric_filter}")

    def read_excel(self) -> pd.DataFrame:
        """
        Reads the Excel file and returns a DataFrame
        """
        try:
            # Get input file size
            if os.path.exists(self.input_file):
                self.stats['input_file_size'] = os.path.getsize(self.input_file)

            if self.sheet_name:
                df = pd.read_excel(self.input_file, sheet_name=self.sheet_name)
            else:
                df = pd.read_excel(self.input_file)

            # Collect input statistics
            self.stats['input_rows'] = len(df)
            self.stats['input_columns'] = len(df.columns)
            self.stats['memory_usage_mb'] = df.memory_usage(deep=True).sum() / (1024 * 1024)

            logger.info(get_backend_translation("input_file_loaded", rows=len(df), columns=len(df.columns)))
            logger.info(get_backend_translation("file_size_info", size=self.stats['input_file_size'] / (1024 * 1024)))
            logger.info(get_backend_translation("memory_usage_info", size=self.stats['memory_usage_mb']))

            return df
        except FileNotFoundError:
            logger.error(get_backend_translation("file_not_found_error", input_file=self.input_file))
            raise FileNotFoundError(f"The file {self.input_file} was not found")
        except Exception as e:
            logger.error(get_backend_translation("error_reading_excel_file", error=str(e)))
            raise Exception(f"Error reading the Excel file: {e}")

    def filter_dataframe(self, df: pd.DataFrame) -> pd.DataFrame:
        """
        Filters the DataFrame based on regex patterns and/or numeric filters
        """
        try:
            filtered_df = df
            applied_filters = []

            # Apply regex filtering if a pattern is provided
            if self.pattern and self.pattern.strip():
                filtered_df = self._apply_regex_filter(filtered_df)
                applied_filters.append("Regex")

            # Apply numeric filtering if enabled
            if self.numeric_filter:
                filtered_df = self._apply_numeric_filter(filtered_df)
                applied_filters.append("Numeric")

            # Update statistics
            self.stats['filters_applied'] = applied_filters
            self.stats['rows_filtered'] = len(filtered_df)
            self.stats['rows_removed'] = len(df) - len(filtered_df)

            if not applied_filters:
                logger.warning(get_backend_translation("no_filter_criteria_specified"))
                logger.info(get_backend_translation("no_filters_applied_rows_remain", rows=len(df)))
                return df

            # Calculate filtering efficiency
            retention_rate = (len(filtered_df) / len(df)) * 100 if len(df) > 0 else 0
            removal_rate = (self.stats['rows_removed'] / len(df)) * 100 if len(df) > 0 else 0

            logger.info(get_backend_translation("filters_applied_list", filters=', '.join(applied_filters)))
            logger.info(get_backend_translation("filter_results_summary", retained=len(filtered_df), removed=self.stats['rows_removed']))
            logger.info(get_backend_translation("retention_removal_rates", retention=retention_rate, removal=removal_rate))

            return filtered_df

        except Exception as e:
            logger.error(f"Error filtering: {e}")
            raise Exception(f"Error filtering: {e}")

    def _apply_regex_filter(self, df: pd.DataFrame) -> pd.DataFrame:
        """
        Applies regex filtering to the DataFrame
        """
        try:
            # Compile the regex pattern
            # Intelligent pattern recognition:
            # - If the pattern contains spaces, search for it as an exact phrase
            # - If the pattern looks like a complete word, use word boundaries
            # - Otherwise allow substring matching

            if ' ' in self.pattern:
                # Exact phrase with word boundaries
                regex_pattern = rf"\b{re.escape(self.pattern)}\b"
            elif len(self.pattern) <= 4:
                # Short patterns (4 or fewer characters) - allow substring matching
                regex_pattern = self.pattern
            elif self.pattern.isalpha():
                # Patterns reaching this branch are longer than 4 characters,
                # so purely alphabetic input is probably a complete word
                regex_pattern = rf"\b{re.escape(self.pattern)}\b"
            else:
                # Substring matching for other cases
                regex_pattern = self.pattern

            regex = re.compile(regex_pattern, re.IGNORECASE)
            logger.info(get_backend_translation("regex_pattern_compiled", original=self.pattern, compiled=regex_pattern))

            # Determine the columns to search
            if self.columns:
                columns_to_search = self.columns
                logger.info(get_backend_translation("regex_filter_searching_columns", columns=columns_to_search))
            else:
                columns_to_search = df.columns
                logger.info(get_backend_translation("regex_filter_searching_all_columns", columns=list(columns_to_search)))

            # Filter function with detailed logging
            def regex_filter_row(row):
                row_matches = False
                for col in columns_to_search:
                    if col in row and pd.notna(row[col]):
                        cell_value = str(row[col])
                        if regex.search(cell_value):
                            logger.debug(get_backend_translation("regex_match_found", row=row.name, column=col, value=cell_value))
                            row_matches = True
                            break

                return row_matches

            # Apply filter
            filtered_df = df[df.apply(regex_filter_row, axis=1)]
            logger.info(get_backend_translation("regex_filter_results", rows=len(filtered_df)))

            return filtered_df

        except re.error as e:
            logger.error(get_backend_translation("invalid_regex_pattern", error=str(e)))
            raise Exception(f"Invalid regex pattern: {e}")

    def _apply_numeric_filter(self, df: pd.DataFrame) -> pd.DataFrame:
        """
        Applies numeric filtering to the DataFrame
        """
        column = self.numeric_filter['column']
        operator = self.numeric_filter['operator']
        value = self.numeric_filter['value']

        logger.info(get_backend_translation("numeric_filter_applied", column=column, operator=operator, value=value))

        if column is None:
            # Apply filter across all columns - a row matches if ANY column meets the criteria
            return self._apply_numeric_filter_all_columns(df, operator, value)
        else:
            # Apply filter to a specific column
            return self._apply_numeric_filter_single_column(df, column, operator, value)

    def _apply_numeric_filter_single_column(self, df: pd.DataFrame, column: str,
                                            operator: str, value: float) -> pd.DataFrame:
        """
        Apply numeric filter to a single column
        """
        # Check if the column exists
        if column not in df.columns:
            logger.error(get_backend_translation("column_does_not_exist", column=column))
            raise ValueError(f"Column '{column}' does not exist in the DataFrame")

        # Convert the column to numeric values (non-numeric values become NaN)
        numeric_series = pd.to_numeric(df[column], errors='coerce')

        # Apply the comparison operator
        if operator == '>':
            mask = numeric_series > value
        elif operator == '<':
            mask = numeric_series < value
        elif operator == '>=':
            mask = numeric_series >= value
        elif operator == '<=':
            mask = numeric_series <= value
        elif operator == '=':
            mask = numeric_series == value
        else:
            logger.error(get_backend_translation("unknown_operator", operator=operator))
            raise ValueError(f"Unknown operator: {operator}")

        # Apply filter
        filtered_df = df[mask]
        logger.info(get_backend_translation("numeric_filter_single_column_results", matches=mask.sum(), total=len(df), column=column, operator=operator, value=value))

        # Log some examples of the filtered values
        if len(filtered_df) > 0:
            sample_values = filtered_df[column].head(3).tolist()
            logger.debug(get_backend_translation("sample_filtered_values", values=sample_values))

        return filtered_df
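
    # Coercion behavior worth noting (illustrative): errors='coerce' turns
    # non-numeric cells into NaN, and every comparison against NaN is False,
    # so such rows simply never match:
    #
    #     pd.to_numeric(pd.Series(['5', 'abc']), errors='coerce')      # -> [5.0, NaN]
    #     pd.to_numeric(pd.Series(['5', 'abc']), errors='coerce') > 3  # -> [True, False]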

    def _apply_numeric_filter_all_columns(self, df: pd.DataFrame, operator: str, value: float) -> pd.DataFrame:
        """
        Apply numeric filter across all columns - a row matches if ANY column meets the criteria
        """
        logger.info(get_backend_translation("numeric_filter_all_columns", operator=operator, value=value))

        # Create a mask that will be True for rows where ANY column meets the criteria
        combined_mask = pd.Series([False] * len(df), index=df.index)

        # Check each column
        for col in df.columns:
            # Convert the column to numeric values
            numeric_series = pd.to_numeric(df[col], errors='coerce')

            # Apply the comparison operator
            if operator == '>':
                col_mask = numeric_series > value
            elif operator == '<':
                col_mask = numeric_series < value
            elif operator == '>=':
                col_mask = numeric_series >= value
            elif operator == '<=':
                col_mask = numeric_series <= value
            elif operator == '=':
                col_mask = numeric_series == value
            else:
                logger.error(get_backend_translation("unknown_operator", operator=operator))
                raise ValueError(f"Unknown operator: {operator}")

            # Combine with OR logic (any column matching makes the row match)
            combined_mask = combined_mask | col_mask

            # Log matches for this column
            matches = col_mask.sum()
            if matches > 0:
                logger.debug(get_backend_translation("column_matches_found", column=col, matches=matches))

        # Apply filter
        filtered_df = df[combined_mask]
        logger.info(get_backend_translation("numeric_filter_all_columns_results", matches=combined_mask.sum(), total=len(df), operator=operator, value=value))

        return filtered_df
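
    # The operator ladder above duplicates the one in
    # _apply_numeric_filter_single_column; a shared dispatch table would remove
    # the duplication (sketch only, using the stdlib operator module):
    #
    #     import operator as op
    #     _OPS = {'>': op.gt, '<': op.lt, '>=': op.ge, '<=': op.le, '=': op.eq}
    #     col_mask = _OPS[operator](numeric_series, value)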

    def write_excel(self, df: pd.DataFrame):
        """
        Writes the filtered DataFrame to a new Excel file
        """
        try:
            # If specific columns were selected, only write those
            if self.columns:
                # Only keep the selected columns (if they exist in the DataFrame)
                columns_to_keep = [col for col in self.columns if col in df.columns]
                df_filtered = df[columns_to_keep]
                logger.info(get_backend_translation("writing_selected_columns", columns=columns_to_keep))
            else:
                # Write all columns
                df_filtered = df
                logger.info(get_backend_translation("writing_all_columns", columns=list(df.columns)))

            # Collect output statistics
            self.stats['output_rows'] = len(df_filtered)
            self.stats['output_columns'] = len(df_filtered.columns)

            df_filtered.to_excel(self.output_file, index=False)

            # Get output file size and calculate compression ratio
            if os.path.exists(self.output_file):
                self.stats['output_file_size'] = os.path.getsize(self.output_file)
                if self.stats['input_file_size'] > 0:
                    self.stats['compression_ratio'] = self.stats['output_file_size'] / self.stats['input_file_size']

            logger.info(get_backend_translation("output_file_written", file=self.output_file))
            logger.info(get_backend_translation("output_dimensions", rows=self.stats['output_rows'], columns=self.stats['output_columns']))
            logger.info(get_backend_translation("output_file_size", size=self.stats['output_file_size'] / (1024 * 1024)))

            if self.stats['input_file_size'] > 0:
                compression_pct = (self.stats['compression_ratio'] - 1) * 100
                if compression_pct > 0:
                    logger.info(get_backend_translation("compression_larger", percent=compression_pct))
                else:
                    logger.info(get_backend_translation("compression_smaller", percent=compression_pct))

        except PermissionError:
            logger.error(get_backend_translation("no_write_permission", file=self.output_file))
            raise PermissionError(f"No write permission for the file {self.output_file}")
        except Exception as e:
            logger.error(get_backend_translation("error_writing_excel_file", error=str(e)))
            raise Exception(f"Error writing the Excel file: {e}")

    def process(self):
        """
        Main method for processing the Excel file
        """
        # Start timing
        self.stats['start_time'] = time.time()

        try:
            logger.info(get_backend_translation("starting_excel_filter_processing"))
            df = self.read_excel()
            filtered_df = self.filter_dataframe(df)
            self.write_excel(filtered_df)

            # End timing and calculate final statistics
            self.stats['end_time'] = time.time()
            self.stats['processing_time_seconds'] = self.stats['end_time'] - self.stats['start_time']

            self._log_final_statistics()

            logger.info(get_backend_translation("excel_filter_processing_completed"))
            return True, None

        except FileNotFoundError as e:
            error_msg = get_backend_translation("error_file_not_found", error=str(e))
            logger.error(error_msg)
            return False, error_msg
        except PermissionError as e:
            error_msg = get_backend_translation("error_permission", error=str(e))
            logger.error(error_msg)
            return False, error_msg
        except pd.errors.EmptyDataError as e:
            error_msg = get_backend_translation("error_empty_excel", error=str(e))
            logger.error(error_msg)
            return False, error_msg
        except pd.errors.ParserError as e:
            error_msg = get_backend_translation("error_parser", error=str(e))
            logger.error(error_msg)
            return False, error_msg
        except re.error as e:
            error_msg = get_backend_translation("error_invalid_regex", error=str(e))
            logger.error(error_msg)
            return False, error_msg
        except ValueError as e:
            error_msg = get_backend_translation("error_invalid_input", error=str(e))
            logger.error(error_msg)
            return False, error_msg
        except Exception as e:
            error_msg = get_backend_translation("error_unexpected", type=type(e).__name__, error=str(e))
            logger.error(error_msg)
            return False, error_msg

    def get_statistics(self) -> Dict[str, Any]:
        """
        Returns the collected statistics

        Returns:
            Dictionary with all collected statistics
        """
        return self.stats.copy()

    def _log_final_statistics(self):
        """
        Logs the final comprehensive statistics of the processing
        """
        logger.info(get_backend_translation("processing_statistics"))
        logger.info(get_backend_translation("processing_time", time=self.stats['processing_time_seconds']))

        # File statistics
        logger.info(get_backend_translation("file_statistics"))
        logger.info(get_backend_translation("input_file_size", size=self.stats['input_file_size'] / (1024 * 1024)))
        logger.info(get_backend_translation("output_file_size", size=self.stats['output_file_size'] / (1024 * 1024)))
        if self.stats['compression_ratio'] > 0:
            compression_pct = (self.stats['compression_ratio'] - 1) * 100
            logger.info(get_backend_translation("compression_rate", rate=compression_pct))

        # Data dimensions
        logger.info(get_backend_translation("data_dimensions"))
        logger.info(get_backend_translation("input_dimensions", rows=self.stats['input_rows'], columns=self.stats['input_columns']))
        logger.info(get_backend_translation("output_dimensions", rows=self.stats['output_rows'], columns=self.stats['output_columns']))

        # Filtering results
        if self.stats['filters_applied']:
            logger.info(get_backend_translation("filter_results"))
            logger.info(get_backend_translation("applied_filters", filters=', '.join(self.stats['filters_applied'])))
            if self.stats['input_rows'] > 0:
                retention_rate = (self.stats['rows_filtered'] / self.stats['input_rows']) * 100
                removal_rate = (self.stats['rows_removed'] / self.stats['input_rows']) * 100
                logger.info(get_backend_translation("rows_retained", rows=self.stats['rows_filtered'], rate=retention_rate))
                logger.info(get_backend_translation("rows_removed", rows=self.stats['rows_removed'], rate=removal_rate))

        # Performance metrics
        logger.info(get_backend_translation("performance_metrics"))
        logger.info(get_backend_translation("memory_usage", size=self.stats['memory_usage_mb']))
        if self.stats['processing_time_seconds'] > 0 and self.stats['input_rows'] > 0:
            rows_per_second = self.stats['input_rows'] / self.stats['processing_time_seconds']
            logger.info(get_backend_translation("processing_speed", speed=rows_per_second))

        logger.info(get_backend_translation("end_statistics"))