import warnings
from typing import Union
from datetime import datetime
import sys
from io import StringIO

import numpy as np
import pandas as pd
from statsmodels.formula.api import rlm
from scipy.stats import iqr

from .data_summary import summary_dataframe


class DataCleaner:
    """Fluent row-filtering helper for a DataFrame.

    Each ``rm_*`` method computes a boolean row mask and ORs it into the
    accumulated ``self.drop_index``; nothing is removed until ``get_data``
    applies the mask to the untouched ``raw_data``.  Every ``rm_*`` method
    returns ``self`` so calls can be chained.
    """

    def __init__(self, data: pd.DataFrame, print_process: bool = True) -> None:
        self.raw_data = data
        self.data = data.copy()
        # Accumulated mask: True means "row scheduled for removal".
        self.drop_index = np.array([False] * len(self.raw_data))
        self.print_process = print_process
        self.log_info = []
        # Capture the summary of the raw data into the log; print it only
        # when print_process is requested.
        raw_info = capture_print_output(
            summary_dataframe,
            interfere=not print_process,
            df=self.raw_data,
            df_name='原始数据'
        )
        self.log_info.append(raw_info)

    def rm_na_and_inf(self):
        """Mark rows containing NaN or +/-inf in any column.

        NOTE(review): ``np.isinf`` requires numeric values; a non-numeric
        column would raise TypeError here — confirm callers only pass
        numeric frames.
        """
        is_na_data = self.data.isna().any(axis=1).values
        is_inf_data = np.any(np.isinf(self.data.values), axis=1)
        drop_index = is_na_data | is_inf_data
        self.drop_index = self.drop_index | drop_index
        self._count_removed_data(index=drop_index, method='rm_na_and_inf')
        return self

    def rm_constant(
        self,
        window: int = 10,
        exclude_value: list = None,
        include_cols: list = '__ALL__',
        include_by_re: bool = False,
        exclude_cols: list = None
    ):
        """Mark rows where a column is constant over a rolling window.

        A rolling standard deviation of exactly zero flags a flat-lined
        signal.  ``exclude_value`` lists values (e.g. a known sentinel)
        that are allowed to repeat without being flagged.
        """
        data = self._get_data_by_cols(include_cols, include_by_re, exclude_cols)
        # std == 0 over the window <=> all `window` values identical.
        # The first window-1 rows are NaN and compare False, i.e. kept.
        drop_index_matrix = (data.rolling(window=window).std() == 0)
        if exclude_value is not None:
            for each_value in exclude_value:
                keep_index_matrix = data.values == each_value
                drop_index_matrix[keep_index_matrix] = False
        drop_index = drop_index_matrix.any(axis=1)
        self.drop_index = self.drop_index | drop_index
        self._count_removed_data(
            index=drop_index, method='rm_constant',
            index_matrix=drop_index_matrix, var_name=data.columns
        )
        return self

    def rm_rolling_fluct(
        self,
        window: int = 10,
        unit: Union[str, None] = 'min',
        fun: str = 'ptp',
        thre: float = 0,
        include_cols: list = '__ALL__',
        include_by_re: bool = False,
        exclude_cols: list = None
    ):
        """Mark rows whose rolling fluctuation exceeds ``thre``.

        Parameters
        ----------
        window : int
            Window size; combined with ``unit`` into a time-based window
            (e.g. ``'10min'``) unless ``unit`` is None (count-based).
        fun : str
            'ptp' -> rolling max - min; 'pct' -> (max - min) / min.
        thre : float
            Rows where the chosen statistic exceeds this are dropped.

        Raises
        ------
        ValueError
            If ``fun`` is not 'ptp' or 'pct'.
        """
        data = self._get_data_by_cols(include_cols, include_by_re, exclude_cols)
        if unit is None:
            roll_window = window
        else:
            roll_window = str(window) + unit
        roll_data = data.rolling(window=roll_window, min_periods=1, center=True)
        if fun == 'ptp':
            res = roll_data.max() - roll_data.min()
        elif fun == 'pct':
            res = (roll_data.max() - roll_data.min()) / roll_data.min()
        else:
            # Fix: previously an unknown `fun` fell through and `res` was
            # unbound, raising an opaque NameError below.
            raise ValueError('fun must be one of "ptp","pct"')
        drop_index_matrix = res > thre
        drop_index = drop_index_matrix.any(axis=1)
        self.drop_index = self.drop_index | drop_index
        self._count_removed_data(
            index=drop_index, method='rm_rolling_fluct',
            index_matrix=drop_index_matrix, var_name=data.columns
        )
        return self

    def rm_outlier_rolling_mean(
        self,
        window: int = 10,
        thre: float = 0.02,
        include_cols: list = '__ALL__',
        include_by_re: bool = False,
        exclude_cols: list = None
    ):
        """Mark rows deviating from their rolling mean by more than ``thre``.

        Deviation is relative: |(x - rolling_mean) / x| > thre.
        """
        data = self._get_data_by_cols(include_cols, include_by_re, exclude_cols)
        data = data.reset_index(drop=True)
        windows_mean = data.rolling(window=window, min_periods=1).mean()
        drop_index = (((data - windows_mean) / data).abs() > thre).any(axis=1).values
        self.drop_index = drop_index | self.drop_index
        # NOTE(review): the log label 'rm_outlier_mean' differs from the
        # method name; kept as-is since logs may be parsed downstream.
        self._count_removed_data(index=drop_index, method='rm_outlier_mean')
        return self

    def rm_diff(
        self,
        thre: float,
        shift: int = 1,
        include_cols: list = '__ALL__',
        include_by_re: bool = False,
        exclude_cols: list = None
    ):
        """Mark rows whose absolute difference from the row ``shift`` steps
        earlier exceeds ``thre`` (shift=1 means current minus previous)."""
        data = self._get_data_by_cols(include_cols, include_by_re, exclude_cols)
        data_diff = data.diff(periods=shift, axis=0)
        drop_index_matrix = data_diff.abs() > thre
        drop_index = drop_index_matrix.any(axis=1).values
        self.drop_index = drop_index | self.drop_index
        self._count_removed_data(
            index=drop_index, method='rm_diff',
            index_matrix=drop_index_matrix, var_name=data.columns
        )
        return self

    def rm_zero(
        self,
        include_cols: list = '__ALL__',
        include_by_re: bool = False,
        exclude_cols: list = None
    ):
        """Mark rows with a zero in any selected column."""
        data = self._get_data_by_cols(include_cols, include_by_re, exclude_cols)
        drop_index = (data == 0).any(axis=1).values
        self.drop_index = drop_index | self.drop_index
        self._count_removed_data(index=drop_index, method='rm_zero')
        return self

    def rm_negative(
        self,
        keep_zero: bool = False,
        include_cols: list = '__ALL__',
        include_by_re: bool = False,
        exclude_cols: list = None
    ):
        """Mark rows with negative values (and zeros unless ``keep_zero``)."""
        data = self._get_data_by_cols(include_cols, include_by_re, exclude_cols)
        if keep_zero is True:
            drop_index = (data < 0).any(axis=1).values
        else:
            drop_index = (data <= 0).any(axis=1).values
        self.drop_index = drop_index | self.drop_index
        self._count_removed_data(index=drop_index, method='rm_negative')
        return self

    def rm_rule(self, remove_rule: str):
        """Mark rows matching a ``DataFrame.eval`` boolean expression."""
        data = self.data.copy()
        drop_index = np.array(data.eval(remove_rule))
        self.drop_index = drop_index | self.drop_index
        self._count_removed_data(index=drop_index, method=f'rm_rule({remove_rule})')
        return self

    def rm_regression_outlier(
        self,
        formula: str,
        rm_resid_IQR: float = 1.5,
        rm_dir: str = 'both',
        exclude_rule: Union[str, list, None] = None,
        min_sample: int = 30,
    ):
        """Mark rows whose robust-regression residual is an IQR outlier.

        Fits ``rlm(formula)`` on the rows not already dropped and not
        matched by ``exclude_rule``, then flags residuals beyond
        Q1 - k*IQR / Q3 + k*IQR (k = ``rm_resid_IQR``) per ``rm_dir``.

        Order-sensitive: the fit only sees rows surviving earlier ``rm_*``
        calls.  If fewer than ``min_sample`` rows remain, does nothing.

        Raises
        ------
        ValueError
            If ``rm_dir`` is not one of 'both', 'lower', 'upper'.
        """
        RAW_INDEX = np.arange(len(self.data))
        # Rows matching exclude_rule take no part in the fit; their
        # existing drop status is preserved unchanged.
        if exclude_rule is None:
            exclude_rule = []
        if isinstance(exclude_rule, str):
            exclude_rule = [exclude_rule]
        exclued_index = np.array([False] * len(self.raw_data))
        for rule in exclude_rule:
            exclued_index = exclued_index | np.array(self.data.eval(rule))
        exclued_index = pd.Series(data=exclued_index, index=RAW_INDEX)
        exclude_index_drop = pd.Series(self.drop_index, index=RAW_INDEX).loc[exclued_index.values]
        # Data entering the regression: neither dropped nor excluded.
        data_clean = self.data.assign(RAW_INDEX_=RAW_INDEX).loc[~(self.drop_index | exclued_index.values)]
        filter_index = data_clean.RAW_INDEX_.values
        if len(data_clean) < min_sample:
            return self
        with warnings.catch_warnings():
            warnings.simplefilter('ignore')
            mod = rlm(formula, data=data_clean).fit(maxiter=500)
        resid = np.array(mod.resid)
        IQR = iqr(resid)
        if rm_dir == 'both':
            drop_index = (resid < (np.quantile(resid, q=0.25) - rm_resid_IQR * IQR)) | (resid > (np.quantile(resid, q=0.75) + rm_resid_IQR * IQR))
        elif rm_dir == 'lower':
            drop_index = resid < (np.quantile(resid, q=0.25) - rm_resid_IQR * IQR)
        elif rm_dir == 'upper':
            drop_index = resid > (np.quantile(resid, q=0.75) + rm_resid_IQR * IQR)
        else:
            raise ValueError('rm_dir must be one of "both","lower","upper"')
        # Re-expand the fitted-subset mask to full length: fitted rows get
        # their new flag, excluded rows keep their prior flag, the rest False.
        drop_index_incomplete = pd.Series(data=drop_index, index=filter_index).combine_first(exclude_index_drop)
        drop_index_complete = drop_index_incomplete.reindex(RAW_INDEX).fillna(False).values
        self.drop_index = drop_index_complete | self.drop_index
        self._count_removed_data(index=drop_index, method=f'rm_reg({formula})')
        return self

    def rm_date_range(self, start: datetime, end: datetime, col=None):
        """Mark rows whose timestamp falls in [start, end] (inclusive).

        Timestamps come from the index when ``col`` is None, otherwise
        from column ``col``.
        """
        start = pd.Timestamp(start)
        end = pd.Timestamp(end)
        if col is None:
            ts = pd.to_datetime(self.raw_data.index)
        else:
            ts = pd.to_datetime(self.raw_data.loc[:, col])
        drop_index = (ts >= start) & (ts <= end)
        self.drop_index = drop_index | self.drop_index
        self._count_removed_data(index=drop_index, method=f'rm_date_range({start}~{end})')
        return self

    def rm_outrange(
        self,
        method: str = 'quantile',
        upper: float = 0.99,
        lower: float = 0.01,
        include_cols: list = '__ALL__',
        include_by_re: bool = False,
        exclude_cols: list = None
    ):
        """Mark rows outside per-column bounds.

        ``method='quantile'`` treats ``upper``/``lower`` as quantile
        levels computed per column; ``method='raw'`` treats them as
        absolute thresholds applied to every column.
        """
        data = self._get_data_by_cols(include_cols, include_by_re, exclude_cols)
        if method == 'quantile':
            q_upper = np.quantile(data.values, q=upper, axis=0)
            q_lower = np.quantile(data.values, q=lower, axis=0)
        elif method == 'raw':
            q_upper = upper
            q_lower = lower
        else:
            raise Exception('WRONG method')
        drop_index_matrix = (data > q_upper) | (data < q_lower)
        drop_index = drop_index_matrix.any(axis=1)
        self.drop_index = self.drop_index | drop_index
        self._count_removed_data(
            index=drop_index, method='rm_outrange',
            index_matrix=drop_index_matrix, var_name=data.columns
        )
        return self

    def save_log(self, path: str = './log.txt'):
        """Write the accumulated log lines to ``path`` (UTF-8, overwrite)."""
        with open(path, "w", encoding="utf-8") as f:
            for line in self.log_info:
                f.write(line + "\n")
        return self

    def get_data(self, fill=None, get_drop=False, save_log=None) -> pd.DataFrame:
        """Return the cleaned data (built from the untouched raw data).

        Parameters
        ----------
        fill :
            None -> dropped rows are removed; otherwise dropped rows are
            kept and overwritten with this value.
        get_drop : bool
            If True, invert the mask and return the dropped rows instead.
        save_log :
            Optional path; when given, the log is written there.
        """
        index = self.drop_index if not get_drop else ~self.drop_index
        if fill is None:
            # Keep only the rows not marked for removal.
            result_data = self.raw_data.loc[~index, :]
        else:
            # Keep every row; overwrite the marked ones with `fill`.
            result_data = self.raw_data.copy()
            result_data.loc[index, :] = fill
        res_info = capture_print_output(
            summary_dataframe,
            interfere=not self.print_process,
            df=result_data,
            df_name='结果数据'
        )
        self.log_info.append(res_info)
        if save_log is not None:
            self.save_log(save_log)
        return result_data

    def _get_data_by_cols(
        self,
        include_cols: list = '__ALL__',
        include_by_re: bool = False,
        exclude_cols: list = None,
    ) -> pd.DataFrame:
        """Select the working columns of ``self.data``.

        ``include_cols`` may be the sentinel '__ALL__', a column name, a
        list of names, or (when ``include_by_re``) a regex matched against
        column names.  ``exclude_cols`` is removed from the selection.
        """
        data = self.data.copy()
        if include_by_re is True:
            if isinstance(include_cols, str):
                cols = data.loc[:, data.columns.str.contains(include_cols, regex=True)].columns
            else:
                raise Exception('WRONG')
        elif include_by_re is False:
            if include_cols == '__ALL__':
                cols = data.columns
            elif isinstance(include_cols, str):
                cols = [include_cols]
            elif isinstance(include_cols, list):
                cols = data.loc[:, include_cols].columns
            else:
                raise Exception('WRONG')
        else:
            # Fix: a non-bool include_by_re previously left `cols` unbound,
            # raising an opaque NameError on the return line.
            raise Exception('WRONG')
        if exclude_cols is not None:
            cols = cols.difference(other=exclude_cols)
        return data.loc[:, cols]

    def _count_removed_data(self, index, method, index_matrix=None, var_name=None):
        """Log (and optionally print) how many rows ``method`` removed,
        plus a per-column breakdown when a mask matrix is supplied."""
        count = index.sum()
        pct = round(count / len(index) * 100, 2)
        info = f'remove {count}({pct}%) by {method}'
        self.log_info.append(info)
        if self.print_process:
            print(info)
        if index_matrix is not None and var_name is not None:
            var_drop_count = np.sum(index_matrix, axis=0)
            for var, drop_count in zip(var_name, var_drop_count):
                if drop_count == 0:
                    continue
                info = f'{var}:{drop_count}'
                self.log_info.append(info)
                if self.print_process:
                    print(info)


def capture_print_output(func, *args, interfere=False, **kwargs):
    """Run ``func`` and capture everything it prints as a string.

    Parameters
    ----------
    func : callable
        Function to execute.
    *args, **kwargs :
        Forwarded to ``func``.
    interfere : bool
        True  -> redirect stdout entirely (suppresses normal printing).
        False -> tee stdout, so output is both printed and captured.

    Returns
    -------
    str
        All text ``func`` wrote to stdout.
    """
    new_stdout = StringIO()
    old_stdout = sys.stdout
    if interfere:
        # Plain redirection: func's output goes only to the buffer.
        sys.stdout = new_stdout
    else:
        # Tee mode: write to both the real stdout and the buffer.
        class Tee:
            def __init__(self, old, new):
                self.old = old
                self.new = new

            def write(self, text):
                self.old.write(text)   # keep normal printing
                self.new.write(text)   # capture a copy

            def flush(self):
                self.old.flush()
                self.new.flush()

        sys.stdout = Tee(old_stdout, new_stdout)
    try:
        func(*args, **kwargs)
        output = new_stdout.getvalue()
    finally:
        # Always restore the real stdout, even if func raised.
        sys.stdout = old_stdout
    return output