import sys
import warnings
from datetime import datetime
from io import StringIO
from typing import Union

import numpy as np
import pandas as pd
from scipy.stats import iqr
from statsmodels.formula.api import rlm

from .data_summary import summary_dataframe
class DataCleaner:
    """Chainable cleaner that flags rows of a DataFrame for removal.

    Each ``rm_*`` method computes a boolean row mask, ORs it into
    ``self.drop_index`` and logs how many rows it flagged.  ``raw_data``
    is never modified; call :meth:`get_data` to obtain the cleaned (or
    the dropped) rows.  All ``rm_*`` methods return ``self`` so calls
    can be chained.
    """

    def __init__(self, data: pd.DataFrame, print_process: bool = True) -> None:
        """
        Parameters
        ----------
        data : pd.DataFrame
            Data to clean; kept untouched in ``self.raw_data``.
        print_process : bool
            If True, every cleaning step also prints its summary line.
        """
        self.raw_data = data
        self.data = data.copy()
        # Row mask: True means the row is scheduled for removal.
        self.drop_index = np.array([False] * len(self.raw_data))
        self.print_process = print_process
        self.log_info = []

        raw_info = capture_print_output(
            summary_dataframe,
            interfere=not print_process,
            df=self.raw_data,
            df_name='原始数据'
        )
        self.log_info.append(raw_info)

    def rm_na_and_inf(self):
        """Flag rows containing any NaN or +/-inf value.

        NOTE(review): ``np.isinf`` assumes every column is numeric —
        non-numeric columns would raise here; confirm against callers.
        """
        is_na_data = self.data.isna().any(axis=1).values
        is_inf_data = np.any(np.isinf(self.data.values), axis=1)
        drop_index = is_na_data | is_inf_data
        self.drop_index = self.drop_index | drop_index
        self._count_removed_data(index=drop_index, method='rm_na_and_inf')
        return self

    def rm_constant(
        self,
        window: int = 10,
        exclude_value: list = None,
        include_cols: list = '__ALL__',
        include_by_re: bool = False,
        exclude_cols: list = None
    ):
        """Flag rows where a selected column is constant over a rolling window.

        A cell is "constant" when the rolling standard deviation over
        ``window`` samples is exactly zero.  Cells holding a value listed
        in ``exclude_value`` are never flagged (e.g. a legitimate resting
        value such as an instrument baseline).
        """
        data = self._get_data_by_cols(include_cols, include_by_re, exclude_cols)
        drop_index_matrix = (data.rolling(window=window).std() == 0)
        if exclude_value is not None:
            for each_value in exclude_value:
                # Un-flag cells holding an explicitly allowed value.
                keep_index_matrix = data.values == each_value
                drop_index_matrix[keep_index_matrix] = False
        # FIX: take .values so self.drop_index stays a plain ndarray
        # instead of silently becoming an index-aligned Series.
        drop_index = drop_index_matrix.any(axis=1).values
        self.drop_index = self.drop_index | drop_index
        self._count_removed_data(index=drop_index, method='rm_constant',
                                 index_matrix=drop_index_matrix, var_name=data.columns)
        return self

    def rm_rolling_fluct(
        self,
        window: int = 10,
        unit: Union[str, None] = 'min',
        fun: str = 'ptp',
        thre: float = 0,
        include_cols: list = '__ALL__',
        include_by_re: bool = False,
        exclude_cols: list = None
    ):
        """Flag rows whose rolling fluctuation exceeds ``thre``.

        ``fun='ptp'`` measures max - min inside the window; ``fun='pct'``
        measures (max - min) / min.  ``unit`` builds a time-based window
        (e.g. ``'10min'``, requires a datetime index); pass ``None`` for a
        count-based window.
        """
        # FIX: validate up front — the original fell through and raised
        # NameError (unbound ``res``) on an unknown ``fun``.
        if fun not in ('ptp', 'pct'):
            raise ValueError('fun must be one of "ptp","pct"')
        data = self._get_data_by_cols(include_cols, include_by_re, exclude_cols)

        roll_window = window if unit is None else str(window) + unit
        roll_data = data.rolling(window=roll_window, min_periods=1, center=True)

        if fun == 'ptp':
            res = roll_data.max() - roll_data.min()
        else:  # 'pct'
            res = (roll_data.max() - roll_data.min()) / roll_data.min()
        drop_index_matrix = res > thre
        # FIX: .values — keep self.drop_index an ndarray (see rm_constant).
        drop_index = drop_index_matrix.any(axis=1).values
        self.drop_index = self.drop_index | drop_index
        self._count_removed_data(index=drop_index, method='rm_rolling_fluct',
                                 index_matrix=drop_index_matrix, var_name=data.columns)
        return self

    def rm_outlier_rolling_mean(
        self,
        window: int = 10,
        thre: float = 0.02,
        include_cols: list = '__ALL__',
        include_by_re: bool = False,
        exclude_cols: list = None
    ):
        """Flag time-series outliers: rows deviating from the rolling mean.

        A row is flagged when |value - rolling_mean| / value exceeds
        ``thre`` in any selected column.
        """
        data = self._get_data_by_cols(include_cols, include_by_re, exclude_cols)
        data = data.reset_index(drop=True)
        windows_mean = data.rolling(window=window, min_periods=1).mean()
        drop_index = (((data - windows_mean) / data).abs() > thre).any(axis=1).values
        self.drop_index = drop_index | self.drop_index
        # NOTE: the logged method name intentionally stays 'rm_outlier_mean'
        # to keep existing log formats stable.
        self._count_removed_data(index=drop_index, method='rm_outlier_mean')
        return self

    def rm_diff(
        self,
        thre: float,
        shift: int = 1,
        include_cols: list = '__ALL__',
        include_by_re: bool = False,
        exclude_cols: list = None
    ):
        """Flag rows where any column jumps by more than ``thre``.

        ``shift=1`` compares each row with the previous one (current
        minus previous).
        """
        data = self._get_data_by_cols(include_cols, include_by_re, exclude_cols)
        data_diff = data.diff(periods=shift, axis=0)
        drop_index_matrix = data_diff.abs() > thre
        drop_index = drop_index_matrix.any(axis=1).values
        self.drop_index = drop_index | self.drop_index
        self._count_removed_data(index=drop_index, method='rm_diff',
                                 index_matrix=drop_index_matrix, var_name=data.columns)
        return self

    def rm_zero(
        self,
        include_cols: list = '__ALL__',
        include_by_re: bool = False,
        exclude_cols: list = None
    ):
        """Flag rows where any selected column equals zero."""
        data = self._get_data_by_cols(include_cols, include_by_re, exclude_cols)
        drop_index = (data == 0).any(axis=1).values
        self.drop_index = drop_index | self.drop_index
        self._count_removed_data(index=drop_index, method='rm_zero')
        return self

    def rm_negative(
        self,
        keep_zero: bool = False,
        include_cols: list = '__ALL__',
        include_by_re: bool = False,
        exclude_cols: list = None
    ):
        """Flag rows with negative values (and zeros unless ``keep_zero``)."""
        data = self._get_data_by_cols(include_cols, include_by_re, exclude_cols)
        if keep_zero is True:
            drop_index = (data < 0).any(axis=1).values
        else:
            drop_index = (data <= 0).any(axis=1).values
        self.drop_index = drop_index | self.drop_index
        self._count_removed_data(index=drop_index, method='rm_negative')
        return self

    def rm_rule(self, remove_rule: str):
        """Flag rows matching a ``DataFrame.eval`` boolean expression."""
        data = self.data.copy()
        drop_index = np.array(data.eval(remove_rule))
        self.drop_index = drop_index | self.drop_index
        self._count_removed_data(index=drop_index, method=f'rm_rule({remove_rule})')
        return self

    def rm_regression_outlier(
        self,
        formula: str,
        rm_resid_IQR: float = 1.5,
        rm_dir: str = 'both',
        exclude_rule: Union[str, list, None] = None,
        min_sample: int = 30,
    ):
        """Flag regression outliers via robust-regression residuals.

        Fits ``formula`` with a robust linear model (RLM) on the rows that
        are still alive and not matched by ``exclude_rule``, then flags
        rows whose residual lies beyond ``rm_resid_IQR`` * IQR outside the
        residual quartiles.  ``rm_dir`` selects which tail(s) to remove.

        NOTE: order-sensitive — the fit only sees rows surviving previous
        cleaning steps, so call order changes the result.
        """
        RAW_INDEX = np.arange(len(self.data))

        # Rows matched by exclude_rule take no part in the fit and keep
        # their current drop status unchanged.
        if exclude_rule is None:
            exclude_rule = []
        if isinstance(exclude_rule, str):
            exclude_rule = [exclude_rule]
        excluded_index = np.array([False] * len(self.raw_data))
        for rule in exclude_rule:
            excluded_index = excluded_index | np.array(self.data.eval(rule))
        excluded_index = pd.Series(data=excluded_index, index=RAW_INDEX)
        exclude_index_drop = pd.Series(self.drop_index, index=RAW_INDEX).loc[excluded_index.values]

        # Rows that actually enter the regression.
        data_clean = self.data.assign(RAW_INDEX_=RAW_INDEX).loc[~(self.drop_index | excluded_index.values)]
        filter_index = data_clean.RAW_INDEX_.values

        if len(data_clean) < min_sample:
            # Too few rows for a trustworthy fit; skip silently.
            return self

        with warnings.catch_warnings():
            warnings.simplefilter('ignore')
            mod = rlm(formula, data=data_clean).fit(maxiter=500)
        resid = np.array(mod.resid)
        IQR = iqr(resid)
        lower_bound = np.quantile(resid, q=0.25) - rm_resid_IQR * IQR
        upper_bound = np.quantile(resid, q=0.75) + rm_resid_IQR * IQR
        if rm_dir == 'both':
            drop_index = (resid < lower_bound) | (resid > upper_bound)
        elif rm_dir == 'lower':
            drop_index = resid < lower_bound
        elif rm_dir == 'upper':
            drop_index = resid > upper_bound
        else:
            raise ValueError('rm_dir must be one of "both","lower","upper"')

        # Map sample-level flags back onto the full row range; excluded
        # rows keep their prior status, everything else defaults to keep.
        drop_index_incomplete = pd.Series(data=drop_index, index=filter_index).combine_first(exclude_index_drop)
        drop_index_complete = drop_index_incomplete.reindex(RAW_INDEX).fillna(False).values
        self.drop_index = drop_index_complete | self.drop_index
        # The logged percentage is relative to the regression sample,
        # not to the raw data.
        self._count_removed_data(index=drop_index, method=f'rm_reg({formula})')
        return self

    def rm_date_range(self, start: datetime, end: datetime, col=None):
        """Flag rows whose timestamp falls inside [start, end] (inclusive).

        Uses the index when ``col`` is None, otherwise the given column.
        """
        start = pd.Timestamp(start)
        end = pd.Timestamp(end)
        if col is None:
            ts = pd.to_datetime(self.raw_data.index)
        else:
            ts = pd.to_datetime(self.raw_data.loc[:, col])
        # FIX: force ndarray — with ``col`` given the comparison yields a
        # Series, and ORing it into self.drop_index would align by label
        # instead of position.
        drop_index = np.asarray((ts >= start) & (ts <= end))
        self.drop_index = drop_index | self.drop_index
        self._count_removed_data(index=drop_index, method=f'rm_date_range({start}~{end})')
        return self

    def rm_outrange(
        self,
        method: str = 'quantile',
        upper: float = 0.99,
        lower: float = 0.01,
        include_cols: list = '__ALL__',
        include_by_re: bool = False,
        exclude_cols: list = None
    ):
        """Flag rows outside per-column bounds.

        ``method='quantile'`` treats ``upper``/``lower`` as quantile levels
        computed per column; ``method='raw'`` treats them as absolute values.
        """
        data = self._get_data_by_cols(include_cols, include_by_re, exclude_cols)
        if method == 'quantile':
            q_upper = np.quantile(data.values, q=upper, axis=0)
            q_lower = np.quantile(data.values, q=lower, axis=0)
        elif method == 'raw':
            q_upper = upper
            q_lower = lower
        else:
            raise Exception('WRONG method')

        drop_index_matrix = (data > q_upper) | (data < q_lower)
        # FIX: .values — keep self.drop_index an ndarray (see rm_constant).
        drop_index = drop_index_matrix.any(axis=1).values
        self.drop_index = self.drop_index | drop_index
        self._count_removed_data(index=drop_index, method='rm_outrange',
                                 index_matrix=drop_index_matrix, var_name=data.columns)

        return self

    def save_log(self, path: str = './log.txt'):
        """Write the accumulated log lines to ``path`` (UTF-8, one per line)."""
        with open(path, "w", encoding="utf-8") as f:
            for line in self.log_info:
                f.write(line + "\n")
        return self

    def get_data(self, fill=None, get_drop=False, save_log=None) -> pd.DataFrame:
        """Return the cleaned data, or the dropped rows with ``get_drop=True``.

        Parameters
        ----------
        fill
            When given, flagged rows are kept but every cell in them is
            overwritten with this value instead of the row being removed.
        get_drop : bool
            Invert the selection: return the rows that were flagged.
        save_log
            Optional path; when given the accumulated log is written there.
        """
        index = self.drop_index if not get_drop else ~self.drop_index
        if fill is None:
            # Keep only the surviving rows.
            result_data = self.raw_data.loc[~index, :]
        else:
            # Keep all rows; overwrite the flagged ones.
            result_data = self.raw_data.copy()
            result_data.loc[index, :] = fill
        res_info = capture_print_output(
            summary_dataframe,
            interfere=not self.print_process,
            df=result_data,
            df_name='结果数据'
        )
        self.log_info.append(res_info)
        if save_log is not None:
            self.save_log(save_log)
        return result_data

    def _get_data_by_cols(
        self,
        include_cols: list = '__ALL__',
        include_by_re: bool = False,
        exclude_cols: list = None,
    ) -> pd.DataFrame:
        """Return a working copy of ``self.data`` restricted to selected columns.

        ``include_cols`` may be ``'__ALL__'``, one column name, a list of
        names, or (with ``include_by_re=True``) a regex matched against the
        column names.  ``exclude_cols`` are removed afterwards.
        """
        data = self.data.copy()

        if include_by_re:
            if isinstance(include_cols, str):
                cols = data.loc[:, data.columns.str.contains(include_cols, regex=True)].columns
            else:
                raise Exception('WRONG')
        else:
            if include_cols == '__ALL__':
                cols = data.columns
            elif isinstance(include_cols, str):
                # FIX: keep an Index (not a bare list) so .difference
                # below works when exclude_cols is also given.
                cols = pd.Index([include_cols])
            elif isinstance(include_cols, list):
                cols = data.loc[:, include_cols].columns
            else:
                raise Exception('WRONG')

        if exclude_cols is not None:
            cols = cols.difference(other=exclude_cols)

        return data.loc[:, cols]

    def _count_removed_data(self, index, method, index_matrix=None, var_name=None):
        """Log (and optionally print) how many rows ``method`` flagged.

        When ``index_matrix``/``var_name`` are given, also logs per-column
        hit counts (columns with zero hits are skipped).
        """
        count = index.sum()
        pct = round(count / len(index) * 100, 2)
        info = f'remove {count}({pct}%) by {method}'
        self.log_info.append(info)
        if self.print_process:
            print(info)

        if index_matrix is not None and var_name is not None:
            var_drop_count = np.sum(index_matrix, axis=0)
            for var, drop_count in zip(var_name, var_drop_count):
                if drop_count == 0:
                    continue
                info = f'{var}:{drop_count}'
                self.log_info.append(info)
                if self.print_process:
                    print(info)
def capture_print_output(func, *args, interfere=False, **kwargs):
    """Run *func* and return everything it printed as a string.

    Parameters
    ----------
    func : callable
        Function to execute.
    *args, **kwargs
        Forwarded to ``func``.
    interfere : bool
        When True, printing is fully redirected (nothing reaches the real
        stdout).  When False, output is duplicated: it still appears on
        the real stdout and is captured at the same time.

    Returns
    -------
    str
        All text written to stdout while ``func`` ran.
    """
    buffer = StringIO()
    real_stdout = sys.stdout

    class _Tee:
        # Write-through proxy: mirrors every write to the real stdout
        # while also collecting it in the capture buffer.
        def write(self, text):
            real_stdout.write(text)
            buffer.write(text)

        def flush(self):
            real_stdout.flush()
            buffer.flush()

    # interfere=True: silence the original printing; otherwise tee it.
    sys.stdout = buffer if interfere else _Tee()
    try:
        func(*args, **kwargs)
        captured = buffer.getvalue()
    finally:
        # Always restore the real stdout, even if func raised.
        sys.stdout = real_stdout

    return captured