| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166 |
- import numpy as np
- import pandas as pd
- from .data_summary import summary_dataframe
class DataCleaner:
    """Accumulate row-removal rules over a DataFrame and apply them at the end.

    Every ``rm_*`` method computes a boolean row mask, ORs it into
    ``self.drop_index`` and returns ``self`` so that calls can be chained.
    Nothing is removed until :meth:`get_data` is called.

    Attributes:
        raw_data: The DataFrame passed in by the caller (not copied).
        data: A copy of ``raw_data`` that the rules are evaluated against.
        drop_index: 1-D boolean ``numpy`` array, one entry per row; ``True``
            marks a row that has been flagged for removal.
        print_process: When true, progress information is printed.
    """

    def __init__(self, data: pd.DataFrame, print_process=True) -> None:
        """Store the input frame and start with no row flagged.

        Args:
            data: Input DataFrame.
            print_process: When true, ``summary_dataframe`` is called on the
                input and every rule prints how many rows it flagged.
        """
        self.raw_data = data
        self.data = data.copy()
        self.drop_index = np.zeros(len(self.raw_data), dtype=bool)
        self.print_process = print_process

        if self.print_process:
            summary_dataframe(df=self.raw_data, df_name='原始数据')

    def rm_na_and_inf(self):
        """Flag rows that contain a missing value or an infinite value.

        Missing values are looked for in every column; infinities only in
        numeric columns, so frames with text or datetime columns are accepted.

        Returns:
            ``self``, for chaining.
        """
        has_na = self.data.isna().any(axis=1).to_numpy()
        numeric = self.data.select_dtypes(include='number')
        has_inf = numeric.isin([np.inf, -np.inf]).any(axis=1).to_numpy()
        self._register_drop(has_na | has_inf, method='rm_na_and_inf')
        return self

    def rm_constant(
        self,
        window: int = 10,
        include_cols: list = '__ALL__',
        include_by_re: bool = False,
        exclude_cols: list = None,
    ):
        """Flag rows where a trailing window of a column has zero std.

        Only the last row of each constant window is flagged, because the
        rolling window is right-aligned.

        Args:
            window: Rolling window size passed to ``DataFrame.rolling``.
            include_cols: Columns to check (see :meth:`_get_data_by_cols`).
            include_by_re: Treat ``include_cols`` as a regular expression.
            exclude_cols: Columns to leave out of the check.

        Returns:
            ``self``, for chaining.
        """
        data = self._get_data_by_cols(include_cols, include_by_re, exclude_cols)
        drop_index_matrix = data.rolling(window=window).std() == 0
        self._register_drop(
            drop_index_matrix.any(axis=1),
            method='rm_constant',
            index_matrix=drop_index_matrix,
            var_name=data.columns,
        )
        return self

    def rm_rolling_fluct(
        self,
        window: int = 10,
        unit: str = 'min',
        fun: str = 'ptp',
        thre: float = 0,
        include_cols: list = '__ALL__',
        include_by_re: bool = False,
        exclude_cols: list = None,
    ):
        """Flag rows whose centered rolling fluctuation exceeds ``thre``.

        Args:
            window: Window length.
            unit: Suffix appended to ``window`` to build an offset string
                (for example ``'10min'``); pass ``None`` to use ``window``
                as a plain row count.
            fun: ``'ptp'`` for ``max - min`` or ``'pct'`` for
                ``(max - min) / min`` over the window.
            thre: Rows with a fluctuation strictly greater than this are
                flagged.
            include_cols: Columns to check (see :meth:`_get_data_by_cols`).
            include_by_re: Treat ``include_cols`` as a regular expression.
            exclude_cols: Columns to leave out of the check.

        Returns:
            ``self``, for chaining.

        Raises:
            ValueError: If ``fun`` is neither ``'ptp'`` nor ``'pct'``.
        """
        # Validate before doing any work so a typo fails with a clear message.
        if fun not in ('ptp', 'pct'):
            raise ValueError(f"fun must be 'ptp' or 'pct', got {fun!r}")

        data = self._get_data_by_cols(include_cols, include_by_re, exclude_cols)
        roll_window = window if unit is None else str(window) + unit
        roll_data = data.rolling(window=roll_window, min_periods=1, center=True)

        lowest = roll_data.min()
        res = roll_data.max() - lowest
        if fun == 'pct':
            res = res / lowest

        drop_index_matrix = res > thre
        self._register_drop(
            drop_index_matrix.any(axis=1),
            method='rm_rolling_fluct',
            index_matrix=drop_index_matrix,
            var_name=data.columns,
        )
        return self

    def rm_outlier_rolling_mean(
        self,
        window: int = 10,
        thre: float = 0.02,
        include_cols: list = '__ALL__',
        include_by_re: bool = False,
        exclude_cols: list = None,
    ):
        """Flag rows that deviate too far from their trailing rolling mean.

        A row is flagged when ``abs((value - rolling_mean) / value)`` is
        strictly greater than ``thre`` in any selected column.

        Args:
            window: Number of rows in the trailing window.
            thre: Relative deviation threshold.
            include_cols: Columns to check (see :meth:`_get_data_by_cols`).
            include_by_re: Treat ``include_cols`` as a regular expression.
            exclude_cols: Columns to leave out of the check.

        Returns:
            ``self``, for chaining.
        """
        data = self._get_data_by_cols(include_cols, include_by_re, exclude_cols)
        # A positional index lets the integer window work whatever the
        # original index looks like.
        data = data.reset_index(drop=True)
        windows_mean = data.rolling(window=window, min_periods=1).mean()
        relative_dev = ((data - windows_mean) / data).abs()
        self._register_drop(
            (relative_dev > thre).any(axis=1),
            method='rm_outlier_mean',
        )
        return self

    def rm_negative(
        self,
        keep_zero: bool = False,
        include_cols: list = '__ALL__',
        include_by_re: bool = False,
        exclude_cols: list = None,
    ):
        """Flag rows that hold a negative (and optionally zero) value.

        Args:
            keep_zero: When exactly ``True``, only values ``< 0`` are flagged;
                otherwise values ``<= 0`` are flagged.
            include_cols: Columns to check (see :meth:`_get_data_by_cols`).
            include_by_re: Treat ``include_cols`` as a regular expression.
            exclude_cols: Columns to leave out of the check.

        Returns:
            ``self``, for chaining.
        """
        data = self._get_data_by_cols(include_cols, include_by_re, exclude_cols)
        offending = (data < 0) if keep_zero is True else (data <= 0)
        self._register_drop(offending.any(axis=1), method='rm_negative')
        return self

    def rm_rule(self, remove_rule: str):
        """Flag rows for which a ``DataFrame.eval`` expression is true.

        Args:
            remove_rule: Expression evaluated against ``self.data``; it must
                produce one boolean per row.

        Returns:
            ``self``, for chaining.

        Raises:
            ValueError: If the expression does not yield a boolean value for
                every row.
        """
        # NOTE: remove_rule is evaluated by DataFrame.eval; never pass
        # untrusted input here.
        rule_mask = np.asarray(self.data.eval(remove_rule))
        if rule_mask.dtype != np.bool_ or rule_mask.shape != (len(self.data),):
            raise ValueError(
                f'remove_rule must evaluate to one boolean per row: {remove_rule!r}'
            )
        self._register_drop(rule_mask, method=f'rm_rule({remove_rule})')
        return self

    def get_data(self, fill=None) -> pd.DataFrame:
        """Return the input data with the flagged rows removed or overwritten.

        Args:
            fill: When ``None`` the flagged rows are dropped. Otherwise a copy
                of the input is returned in which every flagged row is set to
                ``fill``.

        Returns:
            The resulting DataFrame.
        """
        drop_mask = np.asarray(self.drop_index, dtype=bool)
        if fill is None:
            # Keep only the rows that were never flagged.
            result_data = self.raw_data.loc[~drop_mask, :]
        else:
            # Keep every row but overwrite the flagged ones.
            result_data = self.raw_data.copy()
            result_data.loc[drop_mask, :] = fill

        if self.print_process:
            summary_dataframe(result_data, df_name='结果数据')
        return result_data

    def _get_data_by_cols(
        self,
        include_cols: list = '__ALL__',
        include_by_re: bool = False,
        exclude_cols: list = None,
    ) -> pd.DataFrame:
        """Select the columns of ``self.data`` that a rule should look at.

        Args:
            include_cols: ``'__ALL__'`` for every column, a single column
                name, a list of column names, or (with ``include_by_re``) a
                regular expression matched against the column names.
            include_by_re: Treat ``include_cols`` as a regular expression.
            exclude_cols: Column name or list of column names removed from
                the selection. The remaining columns come back in the order
                produced by ``Index.difference``.

        Returns:
            A new DataFrame holding only the selected columns.

        Raises:
            TypeError: If ``include_cols`` has an unsupported type.
        """
        all_cols = self.data.columns

        if include_by_re:
            if not isinstance(include_cols, str):
                raise TypeError(
                    'include_cols must be a regular-expression string when '
                    f'include_by_re is set, got {type(include_cols).__name__}'
                )
            cols = all_cols[all_cols.str.contains(include_cols, regex=True)]
        elif isinstance(include_cols, str):
            # Wrap a single name in an Index so .difference() is available.
            cols = all_cols if include_cols == '__ALL__' else pd.Index([include_cols])
        elif isinstance(include_cols, (list, tuple, pd.Index)):
            cols = pd.Index(include_cols)
        else:
            raise TypeError(
                'include_cols must be a column name or a list of column '
                f'names, got {type(include_cols).__name__}'
            )

        if exclude_cols is not None:
            if isinstance(exclude_cols, str):
                exclude_cols = [exclude_cols]
            cols = cols.difference(exclude_cols)

        return self.data.loc[:, cols]

    def _register_drop(self, mask, method, index_matrix=None, var_name=None):
        """Merge one rule's row mask into ``drop_index`` and report it."""
        # Always store a plain ndarray so drop_index keeps one type no matter
        # which rule produced the mask.
        mask = np.asarray(mask, dtype=bool)
        self.drop_index = np.asarray(self.drop_index, dtype=bool) | mask
        self._count_removed_data(
            index=mask,
            method=method,
            index_matrix=index_matrix,
            var_name=var_name,
        )

    def _count_removed_data(self, index, method, index_matrix=None, var_name=None):
        """Print how many rows a rule flagged, optionally per column.

        Args:
            index: Boolean row mask produced by the rule.
            method: Label printed for the rule.
            index_matrix: Optional boolean rows-by-columns matrix of hits.
            var_name: Column labels matching ``index_matrix``'s columns.
        """
        if not self.print_process:
            return

        total = len(index)
        count = int(np.sum(index))
        # An empty frame has nothing to remove; avoid dividing by zero.
        pct = round(count / total * 100, 2) if total else 0.0
        print(f'remove {count}({pct}%) by {method}')

        if index_matrix is None or var_name is None:
            return
        var_drop_count = np.asarray(index_matrix).sum(axis=0)
        for var, drop_count in zip(var_name, var_drop_count):
            if drop_count:
                print(f'{var}:{drop_count}')
|