import warnings
from typing import Union
from datetime import datetime

import numpy as np
import pandas as pd
from statsmodels.formula.api import rlm
from scipy.stats import iqr

from .data_summary import summary_dataframe


class DataCleaner:
    """Accumulate row-removal rules over a DataFrame and apply them on demand.

    Every ``rm_*`` method evaluates its rule against the full working copy
    (``self.data``), ORs the resulting row mask into ``self.drop_index`` and
    returns ``self`` so calls can be chained. No rows are physically removed
    until :meth:`get_data` is called.

    Attributes are documented inline in ``__init__``.
    """

    def __init__(self, data: pd.DataFrame, print_process=True) -> None:
        """Store the input frame and start with an all-False drop mask.

        Args:
            data: Frame to clean. A reference is kept as ``raw_data`` and a
                copy as ``data``.
            print_process: When true, print a summary of the raw data now and
                per-rule removal counts later.
        """
        # Untouched reference to the caller's frame; get_data() slices this.
        self.raw_data = data
        # Working copy that every rule is evaluated against.
        self.data = data.copy()
        # Positional boolean mask, True = row is marked for removal.
        # np.zeros keeps dtype=bool even for an empty frame, whereas
        # np.array([]) would silently become float64 and break later `|`.
        self.drop_index = np.zeros(len(self.raw_data), dtype=bool)
        self.print_process = print_process
        if self.print_process:
            summary_dataframe(df=self.raw_data, df_name='原始数据')

    def _mark_dropped(self, mask) -> np.ndarray:
        """OR ``mask`` into ``self.drop_index`` and return it as a bool ndarray.

        Masks are always stored positionally as a plain ndarray. Storing a
        label-indexed Series instead would make later positional
        re-wrapping of the mask misalign by label.
        """
        mask = np.asarray(mask, dtype=bool)
        self.drop_index = np.asarray(self.drop_index, dtype=bool) | mask
        return mask

    def rm_na_and_inf(self) -> 'DataCleaner':
        """Mark rows that contain a missing value or an infinite value."""
        is_na_data = self.data.isna().any(axis=1).values
        # np.isinf raises TypeError on object arrays, which is what
        # `.values` yields as soon as one column is non-numeric; restrict the
        # infinity check to numeric columns.
        numeric = self.data.select_dtypes(include=[np.number])
        is_inf_data = np.isinf(
            numeric.to_numpy(dtype=float, na_value=np.nan)
        ).any(axis=1)
        drop_index = self._mark_dropped(is_na_data | is_inf_data)
        self._count_removed_data(index=drop_index, method='rm_na_and_inf')
        return self

    def rm_constant(
        self,
        window: int = 10,
        exclude_value: Union[list, None] = None,
        include_cols: Union[str, list] = '__ALL__',
        include_by_re: bool = False,
        exclude_cols: Union[list, None] = None,
    ) -> 'DataCleaner':
        """Mark rows where a column has been constant over a rolling window.

        A row is flagged when the standard deviation of the trailing
        ``window`` rows (ending at that row) is exactly 0 in any selected
        column.

        Args:
            window: Number of rows in the rolling window.
            exclude_value: Values that are allowed to be constant; cells equal
                to one of them are never flagged.
            include_cols, include_by_re, exclude_cols: Column selection, see
                :meth:`_get_data_by_cols`.
        """
        data = self._get_data_by_cols(include_cols, include_by_re, exclude_cols)
        drop_index_matrix = data.rolling(window=window).std() == 0
        if exclude_value is not None:
            for each_value in exclude_value:
                keep_index_matrix = data.values == each_value
                drop_index_matrix[keep_index_matrix] = False
        drop_index = self._mark_dropped(drop_index_matrix.any(axis=1))
        self._count_removed_data(
            index=drop_index,
            method='rm_constant',
            index_matrix=drop_index_matrix,
            var_name=data.columns,
        )
        return self

    def rm_rolling_fluct(
        self,
        window: int = 10,
        unit: Union[str, None] = 'min',
        fun: str = 'ptp',
        thre: float = 0,
        include_cols: Union[str, list] = '__ALL__',
        include_by_re: bool = False,
        exclude_cols: Union[list, None] = None,
    ) -> 'DataCleaner':
        """Mark rows whose centered rolling fluctuation exceeds ``thre``.

        Args:
            window: Window size.
            unit: ``None`` makes ``window`` a row count; otherwise the window
                is the offset string ``str(window) + unit`` (e.g. ``'10min'``),
                which pandas only accepts on a datetime-like index.
            fun: ``'ptp'`` for ``max - min``; ``'pct'`` for
                ``(max - min) / min``.
            thre: Rows with a fluctuation strictly greater than this are
                flagged.
            include_cols, include_by_re, exclude_cols: Column selection, see
                :meth:`_get_data_by_cols`.

        Raises:
            ValueError: If ``fun`` is not ``'ptp'`` or ``'pct'``.
        """
        data = self._get_data_by_cols(include_cols, include_by_re, exclude_cols)
        roll_window = window if unit is None else str(window) + unit
        roll_data = data.rolling(window=roll_window, min_periods=1, center=True)
        if fun == 'ptp':
            res = roll_data.max() - roll_data.min()
        elif fun == 'pct':
            res = (roll_data.max() - roll_data.min()) / roll_data.min()
        else:
            # Previously fell through and failed with a NameError on `res`.
            raise ValueError(f"fun must be 'ptp' or 'pct', got {fun!r}")
        drop_index_matrix = res > thre
        drop_index = self._mark_dropped(drop_index_matrix.any(axis=1))
        self._count_removed_data(
            index=drop_index,
            method='rm_rolling_fluct',
            index_matrix=drop_index_matrix,
            var_name=data.columns,
        )
        return self

    def rm_outlier_rolling_mean(
        self,
        window: int = 10,
        thre: float = 0.02,
        include_cols: Union[str, list] = '__ALL__',
        include_by_re: bool = False,
        exclude_cols: Union[list, None] = None,
    ) -> 'DataCleaner':
        """Mark time-series outliers relative to a trailing rolling mean.

        A row is flagged when ``abs((value - rolling_mean) / value)`` exceeds
        ``thre`` in any selected column. The rolling mean is taken over the
        trailing ``window`` rows with ``min_periods=1``.
        """
        data = self._get_data_by_cols(include_cols, include_by_re, exclude_cols)
        # Positional index so the row-count window works on any index type.
        data = data.reset_index(drop=True)
        windows_mean = data.rolling(window=window, min_periods=1).mean()
        relative_dev = ((data - windows_mean) / data).abs()
        drop_index = self._mark_dropped((relative_dev > thre).any(axis=1))
        self._count_removed_data(index=drop_index, method='rm_outlier_mean')
        return self

    def rm_diff(
        self,
        thre: float,
        shift: int = 1,
        include_cols: Union[str, list] = '__ALL__',
        include_by_re: bool = False,
        exclude_cols: Union[list, None] = None,
    ) -> 'DataCleaner':
        """Mark rows whose absolute difference to a shifted row exceeds ``thre``.

        Args:
            thre: Threshold on ``abs(diff)``; strictly greater is flagged.
            shift: Passed to ``DataFrame.diff(periods=shift)``. With
                ``shift=1`` each row is compared with the row before it.
            include_cols, include_by_re, exclude_cols: Column selection, see
                :meth:`_get_data_by_cols`.
        """
        data = self._get_data_by_cols(include_cols, include_by_re, exclude_cols)
        data_diff = data.diff(periods=shift, axis=0)
        drop_index_matrix = data_diff.abs() > thre
        drop_index = self._mark_dropped(drop_index_matrix.any(axis=1))
        self._count_removed_data(
            index=drop_index,
            method='rm_diff',
            index_matrix=drop_index_matrix,
            var_name=data.columns,
        )
        return self

    def rm_zero(
        self,
        include_cols: Union[str, list] = '__ALL__',
        include_by_re: bool = False,
        exclude_cols: Union[list, None] = None,
    ) -> 'DataCleaner':
        """Mark rows that contain a 0 in any selected column."""
        data = self._get_data_by_cols(include_cols, include_by_re, exclude_cols)
        drop_index = self._mark_dropped((data == 0).any(axis=1))
        self._count_removed_data(index=drop_index, method='rm_zero')
        return self

    def rm_negative(
        self,
        keep_zero: bool = False,
        include_cols: Union[str, list] = '__ALL__',
        include_by_re: bool = False,
        exclude_cols: Union[list, None] = None,
    ) -> 'DataCleaner':
        """Mark rows that contain a negative value in any selected column.

        Args:
            keep_zero: When true only values ``< 0`` are flagged; otherwise
                values ``<= 0`` are flagged (zeros are removed as well).
            include_cols, include_by_re, exclude_cols: Column selection, see
                :meth:`_get_data_by_cols`.
        """
        data = self._get_data_by_cols(include_cols, include_by_re, exclude_cols)
        # Truthiness instead of `is True` so np.bool_ flags behave as expected.
        flagged = (data < 0) if keep_zero else (data <= 0)
        drop_index = self._mark_dropped(flagged.any(axis=1))
        self._count_removed_data(index=drop_index, method='rm_negative')
        return self

    def rm_rule(self, remove_rule: str) -> 'DataCleaner':
        """Mark rows for which the expression ``remove_rule`` is true.

        The expression is evaluated with ``DataFrame.eval`` on the working
        data.
        """
        # NOTE(review): DataFrame.eval executes the given expression; only
        # pass rules that come from trusted sources.
        drop_index = self._mark_dropped(self.data.eval(remove_rule))
        self._count_removed_data(
            index=drop_index, method=f'rm_rule({remove_rule})'
        )
        return self

    def rm_regression_outlier(
        self,
        formula: str,
        rm_resid_IQR: float = 1.5,
        rm_dir: str = 'both',
        exclude_rule: Union[str, list, None] = None,
        min_sample: int = 30,
    ) -> 'DataCleaner':
        """Mark rows whose robust-regression residual is an IQR outlier.

        Order sensitive: the model is fitted only on rows that are not yet
        marked in ``self.drop_index``, so the result depends on which ``rm_*``
        rules ran before this one.

        Args:
            formula: Model formula passed to ``rlm``.
            rm_resid_IQR: Fence multiplier. Residuals below
                ``Q1 - k * IQR`` or above ``Q3 + k * IQR`` are outliers.
            rm_dir: ``'both'``, ``'lower'`` or ``'upper'`` - which fence(s)
                to apply.
            exclude_rule: One ``DataFrame.eval`` expression or a list of
                them. Matching rows take no part in the fit and are never
                flagged by this method.
            min_sample: If fewer rows than this remain for fitting, nothing is
                done and ``self`` is returned unchanged.

        Raises:
            ValueError: If ``rm_dir`` is not one of the accepted values and
                enough rows remain for a fit.
        """
        n_rows = len(self.data)
        raw_positions = np.arange(n_rows)

        if exclude_rule is None:
            exclude_rule = []
        elif isinstance(exclude_rule, str):
            exclude_rule = [exclude_rule]

        # Rows matched by any exclude rule are left out of the computation.
        excluded = np.zeros(n_rows, dtype=bool)
        for rule in exclude_rule:
            excluded |= np.asarray(self.data.eval(rule), dtype=bool)

        # Rows that actually take part in the fit. The helper column
        # RAW_INDEX_ (kept from the original implementation) records each
        # row's position in the full frame.
        already_dropped = np.asarray(self.drop_index, dtype=bool)
        candidate = ~(already_dropped | excluded)
        data_clean = self.data.assign(RAW_INDEX_=raw_positions).loc[candidate]
        if len(data_clean) < min_sample:
            return self
        if rm_dir not in ('both', 'lower', 'upper'):
            raise ValueError('rm_dir must be one of "both","lower","upper"')

        filter_index = data_clean['RAW_INDEX_'].to_numpy()
        # Label the rows by raw position so residual row labels, if the
        # fitting machinery drops rows, can be mapped straight back.
        data_clean.index = filter_index

        with warnings.catch_warnings():
            warnings.simplefilter('ignore')
            mod = rlm(formula, data=data_clean).fit(maxiter=500)

        resid_labels = getattr(mod.resid, 'index', None)
        if resid_labels is not None and len(resid_labels) != len(filter_index):
            # Fewer residuals than input rows - presumably rows with missing
            # values were dropped by the formula handling (TODO confirm).
            # Use the residuals' own row labels, which are raw positions.
            fitted_positions = np.asarray(resid_labels)
        else:
            fitted_positions = filter_index
        resid = np.asarray(mod.resid, dtype=float)

        fence_width = rm_resid_IQR * iqr(resid)
        below = resid < (np.quantile(resid, q=0.25) - fence_width)
        above = resid > (np.quantile(resid, q=0.75) + fence_width)
        if rm_dir == 'both':
            drop_index = below | above
        elif rm_dir == 'lower':
            drop_index = below
        else:
            drop_index = above

        # Scatter the subset result back to a full-length positional mask.
        newly_dropped = np.zeros(n_rows, dtype=bool)
        newly_dropped[fitted_positions[drop_index]] = True
        self._mark_dropped(newly_dropped)
        # The reported percentage is relative to the fitted rows only.
        self._count_removed_data(index=drop_index, method=f'rm_reg({formula})')
        return self

    def rm_date_range(
        self, start: datetime, end: datetime, col=None
    ) -> 'DataCleaner':
        """Mark rows whose timestamp lies in ``[start, end]`` (inclusive).

        Args:
            start: Start of the range; converted with ``pd.Timestamp``.
            end: End of the range; converted with ``pd.Timestamp``.
            col: Column of ``raw_data`` holding the timestamps. ``None`` uses
                the index of ``raw_data``.
        """
        start = pd.Timestamp(start)
        end = pd.Timestamp(end)
        if col is None:
            ts = pd.to_datetime(self.raw_data.index)
        else:
            ts = pd.to_datetime(self.raw_data.loc[:, col])
        drop_index = self._mark_dropped((ts >= start) & (ts <= end))
        self._count_removed_data(
            index=drop_index, method=f'rm_date_range({start}~{end})'
        )
        return self

    def rm_outrange(
        self,
        method: str = 'quantile',
        upper: float = 0.99,
        lower: float = 0.01,
        include_cols: Union[str, list] = '__ALL__',
        include_by_re: bool = False,
        exclude_cols: Union[list, None] = None,
    ) -> 'DataCleaner':
        """Mark rows with a value outside ``[lower, upper]`` bounds.

        Args:
            method: ``'quantile'`` treats ``upper``/``lower`` as quantile
                levels computed per column; ``'raw'`` uses them directly as
                bounds.
            upper: Upper quantile level or upper bound.
            lower: Lower quantile level or lower bound.
            include_cols, include_by_re, exclude_cols: Column selection, see
                :meth:`_get_data_by_cols`.

        Raises:
            ValueError: If ``method`` is neither ``'quantile'`` nor ``'raw'``.
        """
        data = self._get_data_by_cols(include_cols, include_by_re, exclude_cols)
        if method == 'quantile':
            # nanquantile: a single NaN would otherwise turn the column's
            # bound into NaN and silently disable the check for that column
            # (the working data is never physically filtered).
            q_upper = np.nanquantile(data.values, q=upper, axis=0)
            q_lower = np.nanquantile(data.values, q=lower, axis=0)
        elif method == 'raw':
            q_upper = upper
            q_lower = lower
        else:
            raise ValueError(
                f"method must be 'quantile' or 'raw', got {method!r}"
            )
        drop_index_matrix = (data > q_upper) | (data < q_lower)
        drop_index = self._mark_dropped(drop_index_matrix.any(axis=1))
        self._count_removed_data(
            index=drop_index,
            method='rm_outrange',
            index_matrix=drop_index_matrix,
            var_name=data.columns,
        )
        return self

    def get_data(self, fill=None, get_drop=False) -> pd.DataFrame:
        """Apply the accumulated mask to ``raw_data`` and return the result.

        Args:
            fill: ``None`` returns only the selected rows. Any other value
                returns a full-length copy in which the non-selected rows are
                overwritten with ``fill``.
            get_drop: ``False`` selects the rows that were kept; ``True``
                selects the rows that were marked for removal.
        """
        marked = np.asarray(self.drop_index, dtype=bool)
        # `index` is the set of rows to leave out / overwrite.
        index = ~marked if get_drop else marked
        if fill is None:
            # Keep only the selected rows.
            result_data = self.raw_data.loc[~index, :]
        else:
            # Keep every row but overwrite the non-selected ones.
            result_data = self.raw_data.copy()
            result_data.loc[index, :] = fill
        if self.print_process:
            summary_dataframe(result_data, df_name='结果数据')
        return result_data

    def _get_data_by_cols(
        self,
        include_cols: Union[str, list] = '__ALL__',
        include_by_re: bool = False,
        exclude_cols: Union[str, list, None] = None,
    ) -> pd.DataFrame:
        """Return the sub-frame of the working data for the chosen columns.

        Args:
            include_cols: ``'__ALL__'`` for every column, a single column
                name, or a list of names. With ``include_by_re`` it must be a
                regular expression string.
            include_by_re: When true, select the columns whose name matches
                the regex ``include_cols``.
            exclude_cols: Column name(s) removed from the selection. Note
                that ``Index.difference`` is used, which may return the
                remaining columns in sorted order.

        Raises:
            TypeError: If ``include_cols`` has an unsupported type.
        """
        data = self.data
        if include_by_re:
            if not isinstance(include_cols, str):
                raise TypeError(
                    'include_cols must be a regex string when '
                    'include_by_re is true'
                )
            matched = data.columns.str.contains(
                include_cols, regex=True, na=False
            )
            cols = data.columns[matched]
        elif isinstance(include_cols, str):
            if include_cols == '__ALL__':
                cols = data.columns
            else:
                # An Index (not a list) so `.difference` below works.
                cols = pd.Index([include_cols])
        elif isinstance(include_cols, (list, tuple, pd.Index, np.ndarray)):
            cols = data.loc[:, list(include_cols)].columns
        else:
            raise TypeError(
                'include_cols must be a str or a list of column names, got '
                f'{type(include_cols).__name__}'
            )
        if exclude_cols is not None:
            if isinstance(exclude_cols, str):
                exclude_cols = [exclude_cols]
            cols = cols.difference(other=exclude_cols)
        return data.loc[:, cols].copy()

    def _count_removed_data(
        self, index, method, index_matrix=None, var_name=None
    ) -> None:
        """Print how many rows a rule flagged, optionally per variable.

        Does nothing when ``print_process`` is false.

        Args:
            index: Boolean row mask produced by the rule.
            method: Label printed for the rule.
            index_matrix: Optional row-by-variable boolean matrix.
            var_name: Variable names matching the columns of ``index_matrix``.
        """
        if not self.print_process:
            return
        total = len(index)
        count = index.sum()
        # Guard the empty-frame case instead of dividing by zero.
        pct = round(count / total * 100, 2) if total else 0.0
        print(f'remove {count}({pct}%) by {method}')
        if index_matrix is not None and var_name is not None:
            var_drop_count = np.sum(index_matrix, axis=0)
            for var, drop_count in zip(var_name, var_drop_count):
                if drop_count == 0:
                    continue
                print(f'{var}:{drop_count}')