from typing import List, Optional, Union

import numpy as np
import pandas as pd

from .data_summary import summary_dataframe


class DataCleaner:
    """Build up a row-removal mask for a DataFrame through chainable rules.

    Each ``rm_*`` method computes a boolean row mask, ORs it into
    ``self.drop_index`` and returns ``self`` so that calls can be chained.
    No rows are actually removed until :meth:`get_data` is called.

    Attributes are documented inline in ``__init__``.
    """

    def __init__(self, data: pd.DataFrame, print_process=True) -> None:
        """Store the input frame and initialise an all-False drop mask.

        Args:
            data: Frame to clean. A reference is kept as ``raw_data`` and a
                copy as ``data``.
            print_process: When truthy, a summary of the data and the number
                of rows flagged by each rule are printed.
        """
        # Reference to the caller's frame; get_data() selects from this.
        self.raw_data = data
        # Private working copy that the rules are evaluated on.
        self.data = data.copy()
        # Positional boolean mask (one entry per row); True means "drop".
        self.drop_index = np.zeros(len(self.raw_data), dtype=bool)
        self.print_process = print_process
        if self.print_process:
            summary_dataframe(df=self.raw_data, df_name='原始数据')

    def rm_na_and_inf(self) -> "DataCleaner":
        """Flag rows that contain a missing value or an infinite value.

        Missing values are checked in every column; infinities are checked
        in numeric columns only, because ``np.isinf`` raises ``TypeError``
        on object/string data.
        """
        is_na_data = self.data.isna().any(axis=1).values
        numeric = self.data.select_dtypes(include=[np.number])
        if numeric.shape[1] == 0:
            # No numeric columns: nothing can be infinite.
            is_inf_data = np.zeros(len(self.data), dtype=bool)
        else:
            is_inf_data = np.isinf(numeric.values).any(axis=1)
        return self._register_drop(is_na_data | is_inf_data, 'rm_na_and_inf')

    def rm_constant(
        self,
        window: int = 10,
        include_cols: Union[str, List[str]] = '__ALL__',
        include_by_re: bool = False,
        exclude_cols: Optional[List[str]] = None,
    ) -> "DataCleaner":
        """Flag rows where a rolling standard deviation is exactly zero.

        Args:
            window: Window passed to ``DataFrame.rolling``.
            include_cols: Columns to check; see :meth:`_get_data_by_cols`.
            include_by_re: Treat ``include_cols`` as a regular expression.
            exclude_cols: Columns to leave out of the check.
        """
        data = self._get_data_by_cols(include_cols, include_by_re, exclude_cols)
        drop_index_matrix = data.rolling(window=window).std() == 0
        return self._register_drop(
            drop_index_matrix.any(axis=1).values,
            'rm_constant',
            index_matrix=drop_index_matrix,
            var_name=data.columns,
        )

    def rm_rolling_fluct(
        self,
        window: int = 10,
        unit: Optional[str] = 'min',
        fun: str = 'ptp',
        thre: float = 0,
        include_cols: Union[str, List[str]] = '__ALL__',
        include_by_re: bool = False,
        exclude_cols: Optional[List[str]] = None,
    ) -> "DataCleaner":
        """Flag rows whose centred rolling fluctuation exceeds ``thre``.

        Args:
            window: Window size. Combined with ``unit`` into an offset string
                (e.g. ``'10min'``) unless ``unit`` is None, in which case it
                is passed to ``rolling`` unchanged.
            unit: Offset unit appended to ``window``, or None.
            fun: ``'ptp'`` for ``max - min`` or ``'pct'`` for
                ``(max - min) / min`` over the window.
            thre: Rows with a fluctuation strictly greater than this value
                are flagged.
            include_cols: Columns to check; see :meth:`_get_data_by_cols`.
            include_by_re: Treat ``include_cols`` as a regular expression.
            exclude_cols: Columns to leave out of the check.

        Raises:
            ValueError: If ``fun`` is neither ``'ptp'`` nor ``'pct'``.
        """
        # Validate up front instead of failing later on an unbound variable.
        if fun not in ('ptp', 'pct'):
            raise ValueError(f"fun must be 'ptp' or 'pct', got {fun!r}")

        data = self._get_data_by_cols(include_cols, include_by_re, exclude_cols)
        roll_window = window if unit is None else str(window) + unit
        roll_data = data.rolling(window=roll_window, min_periods=1, center=True)

        low = roll_data.min()
        spread = roll_data.max() - low
        res = spread if fun == 'ptp' else spread / low

        drop_index_matrix = res > thre
        return self._register_drop(
            drop_index_matrix.any(axis=1).values,
            'rm_rolling_fluct',
            index_matrix=drop_index_matrix,
            var_name=data.columns,
        )

    def rm_outlier_rolling_mean(
        self,
        window: int = 10,
        thre: float = 0.02,
        include_cols: Union[str, List[str]] = '__ALL__',
        include_by_re: bool = False,
        exclude_cols: Optional[List[str]] = None,
    ) -> "DataCleaner":
        """Flag rows that deviate too far from their trailing rolling mean.

        A row is flagged when ``abs((value - rolling_mean) / value)`` is
        strictly greater than ``thre`` in any selected column.

        Args:
            window: Number of rows in the rolling window.
            thre: Relative-deviation threshold.
            include_cols: Columns to check; see :meth:`_get_data_by_cols`.
            include_by_re: Treat ``include_cols`` as a regular expression.
            exclude_cols: Columns to leave out of the check.
        """
        data = self._get_data_by_cols(include_cols, include_by_re, exclude_cols)
        # Use a plain positional index so the integer window counts rows.
        data = data.reset_index(drop=True)
        windows_mean = data.rolling(window=window, min_periods=1).mean()
        relative_dev = ((data - windows_mean) / data).abs()
        drop_index = (relative_dev > thre).any(axis=1).values
        # The reported label is kept as-is because it is runtime output.
        return self._register_drop(drop_index, 'rm_outlier_mean')

    def rm_negative(
        self,
        keep_zero: bool = False,
        include_cols: Union[str, List[str]] = '__ALL__',
        include_by_re: bool = False,
        exclude_cols: Optional[List[str]] = None,
    ) -> "DataCleaner":
        """Flag rows containing negative values (and zeros unless kept).

        Args:
            keep_zero: When truthy only values ``< 0`` are flagged; otherwise
                values ``<= 0`` are flagged.
            include_cols: Columns to check; see :meth:`_get_data_by_cols`.
            include_by_re: Treat ``include_cols`` as a regular expression.
            exclude_cols: Columns to leave out of the check.
        """
        data = self._get_data_by_cols(include_cols, include_by_re, exclude_cols)
        # Truthiness (not ``is True``) so that numpy booleans are honoured.
        flagged = (data < 0) if keep_zero else (data <= 0)
        return self._register_drop(flagged.any(axis=1).values, 'rm_negative')

    def rm_rule(self, remove_rule: str) -> "DataCleaner":
        """Flag rows for which the expression ``remove_rule`` is True.

        Args:
            remove_rule: Expression evaluated with ``DataFrame.eval`` against
                the working copy; it must yield one boolean per row.

        Raises:
            ValueError: If the expression does not evaluate to a boolean
                value per row.
        """
        # NOTE(security): DataFrame.eval evaluates an arbitrary expression
        # string; only pass rules that come from a trusted source.
        drop_index = np.asarray(self.data.eval(remove_rule))
        if drop_index.dtype != np.bool_ or drop_index.shape != (len(self.data),):
            raise ValueError(
                f'remove_rule must evaluate to one boolean per row: {remove_rule!r}'
            )
        return self._register_drop(drop_index, f'rm_rule({remove_rule})')

    def get_data(self, fill=None) -> pd.DataFrame:
        """Return the data with the accumulated drop mask applied.

        Args:
            fill: When None, flagged rows are removed. Otherwise a copy of
                the raw data is returned with flagged rows set to ``fill``.

        Returns:
            The resulting DataFrame.
        """
        mask = np.asarray(self.drop_index, dtype=bool)
        if fill is None:
            # Keep only the rows that were not flagged.
            result_data = self.raw_data.loc[~mask, :]
        else:
            # Keep every row but overwrite the flagged ones.
            result_data = self.raw_data.copy()
            result_data.loc[mask, :] = fill
        if self.print_process:
            summary_dataframe(result_data, df_name='结果数据')
        return result_data

    def _get_data_by_cols(
        self,
        include_cols: Union[str, List[str]] = '__ALL__',
        include_by_re: bool = False,
        exclude_cols: Optional[List[str]] = None,
    ) -> pd.DataFrame:
        """Return a copy of the working data restricted to selected columns.

        Args:
            include_cols: ``'__ALL__'`` for every column, a single column
                name, a list/Index of column names, or (with
                ``include_by_re``) a regular expression matched against the
                column names.
            include_by_re: Treat ``include_cols`` as a regular expression.
            exclude_cols: Column name or list of names removed from the
                selection.

        Raises:
            TypeError: If ``include_cols`` has an unsupported type for the
                chosen selection mode.
        """
        columns = self.data.columns
        if include_by_re:
            if not isinstance(include_cols, str):
                raise TypeError(
                    'include_cols must be a regex string when include_by_re is True'
                )
            cols = columns[columns.str.contains(include_cols, regex=True)]
        elif isinstance(include_cols, str):
            # Always produce an Index so that exclusion below works for a
            # single column name as well.
            cols = columns if include_cols == '__ALL__' else pd.Index([include_cols])
        elif isinstance(include_cols, (list, pd.Index)):
            # The lookup validates that every requested column exists.
            cols = self.data.loc[:, include_cols].columns
        else:
            raise TypeError(
                'include_cols must be a str or a list of column names, '
                f'got {type(include_cols).__name__}'
            )

        if exclude_cols is not None:
            if isinstance(exclude_cols, str):
                exclude_cols = [exclude_cols]
            cols = cols.difference(exclude_cols)
        return self.data.loc[:, cols].copy()

    def _register_drop(self, mask, method, index_matrix=None, var_name=None):
        """OR ``mask`` into the accumulated drop mask, report it, return self."""
        # Keep the accumulated mask a positional ndarray so that combining
        # rules never depends on pandas index alignment.
        self.drop_index = np.asarray(self.drop_index, dtype=bool) | mask
        self._count_removed_data(
            index=mask, method=method, index_matrix=index_matrix, var_name=var_name
        )
        return self

    def _count_removed_data(self, index, method, index_matrix=None, var_name=None):
        """Print how many rows a rule flagged, optionally per variable.

        Args:
            index: Boolean row mask produced by the rule.
            method: Label printed for the rule.
            index_matrix: Optional per-cell boolean matrix; its column sums
                are printed as per-variable counts.
            var_name: Names paired with the columns of ``index_matrix``.
        """
        if not self.print_process:
            return
        total = len(index)
        count = index.sum()
        # Avoid a division by zero on empty data.
        pct = round(count / total * 100, 2) if total else 0.0
        print(f'remove {count}({pct}%) by {method}')
        if index_matrix is None or var_name is None:
            return
        var_drop_count = np.sum(index_matrix, axis=0)
        for var, drop_count in zip(var_name, var_drop_count):
            if drop_count == 0:
                continue
            print(f'{var}:{drop_count}')