# data_cleaner.py
import warnings
from datetime import datetime
from typing import Union

import numpy as np
import pandas as pd
from scipy.stats import iqr
from statsmodels.formula.api import rlm

from .data_summary import summary_dataframe
  9. class DataCleaner:
  10. def __init__(self,data:pd.DataFrame,print_process=True) -> None:
  11. self.raw_data = data
  12. self.data = data.copy()
  13. self.drop_index = np.array([False]*len(self.raw_data))
  14. self.print_process = print_process
  15. if self.print_process:
  16. summary_dataframe(df=self.raw_data,df_name='原始数据')
  17. def rm_na_and_inf(self):
  18. # 删除缺失数据
  19. is_na_data = self.data.isna().any(axis=1).values
  20. is_inf_data = np.any(np.isinf(self.data.values),axis=1)
  21. drop_index = is_na_data | is_inf_data
  22. self.drop_index = self.drop_index | drop_index
  23. self._count_removed_data(index=drop_index,method='rm_na_and_inf')
  24. return self
  25. def rm_constant(
  26. self,
  27. window :int = 10,
  28. exclude_value :list = None,
  29. include_cols :list = '__ALL__',
  30. include_by_re :bool = False,
  31. exclude_cols :list = None
  32. ):
  33. # 删除常数
  34. data = self._get_data_by_cols(include_cols,include_by_re,exclude_cols)
  35. drop_index_matrix = (data.rolling(window=window).std()==0)
  36. if exclude_value is not None:
  37. for each_value in exclude_value:
  38. keep_index_matrix = data.values == each_value
  39. drop_index_matrix[keep_index_matrix] = False
  40. drop_index = drop_index_matrix.any(axis=1)
  41. self.drop_index = self.drop_index | drop_index
  42. self._count_removed_data(index=drop_index,method='rm_constant',index_matrix=drop_index_matrix,var_name=data.columns)
  43. return self
  44. def rm_rolling_fluct(
  45. self,
  46. window :int = 10,
  47. unit :Union[str,None] = 'min',
  48. fun :str = 'ptp',
  49. thre :float = 0,
  50. include_cols :list = '__ALL__',
  51. include_by_re :bool = False,
  52. exclude_cols :list = None
  53. ):
  54. data = self._get_data_by_cols(include_cols,include_by_re,exclude_cols)
  55. if unit is None:
  56. roll_window = window
  57. else:
  58. roll_window = str(window) + unit
  59. roll_data = data.rolling(window=roll_window,min_periods=1,center=True)
  60. if fun == 'ptp':
  61. res = roll_data.max() - roll_data.min()
  62. elif fun == 'pct':
  63. res = (roll_data.max() - roll_data.min())/roll_data.min()
  64. drop_index_matrix = res>thre
  65. drop_index = drop_index_matrix.any(axis=1)
  66. self.drop_index = self.drop_index | drop_index
  67. self._count_removed_data(index=drop_index,method='rm_rolling_fluct',index_matrix=drop_index_matrix,var_name=data.columns)
  68. return self
  69. def rm_outlier_rolling_mean(
  70. self,
  71. window :int = 10,
  72. thre :float = 0.02,
  73. include_cols :list = '__ALL__',
  74. include_by_re:bool = False,
  75. exclude_cols :list = None
  76. ):
  77. # 删除时序异常
  78. data = self._get_data_by_cols(include_cols,include_by_re,exclude_cols)
  79. data = data.reset_index(drop=True)
  80. windows_mean = data.rolling(window=window,min_periods=1).mean()
  81. drop_index = (((data - windows_mean)/data).abs()>thre).any(axis=1).values
  82. self.drop_index = drop_index | self.drop_index
  83. self._count_removed_data(index=drop_index,method='rm_outlier_mean')
  84. return self
  85. def rm_diff(
  86. self,
  87. thre : float,
  88. shift : int = 1,
  89. include_cols : list = '__ALL__',
  90. include_by_re: bool = False,
  91. exclude_cols : list = None
  92. ):
  93. # shift 等于1时为后一项减前一项
  94. data = self._get_data_by_cols(include_cols,include_by_re,exclude_cols)
  95. data_diff = data.diff(periods=shift,axis=0)
  96. drop_index_matrix = data_diff.abs() > thre
  97. drop_index = drop_index_matrix.any(axis=1).values
  98. self.drop_index = drop_index | self.drop_index
  99. self._count_removed_data(index=drop_index,method='rm_diff',index_matrix=drop_index_matrix,var_name=data.columns)
  100. return self
  101. def rm_zero(
  102. self,
  103. include_cols :list = '__ALL__',
  104. include_by_re:bool = False,
  105. exclude_cols :list = None
  106. ):
  107. data = self._get_data_by_cols(include_cols,include_by_re,exclude_cols)
  108. drop_index = (data==0).any(axis=1).values
  109. self.drop_index = drop_index | self.drop_index
  110. self._count_removed_data(index=drop_index,method='rm_zero')
  111. return self
  112. def rm_negative(
  113. self,
  114. keep_zero :bool = False,
  115. include_cols :list = '__ALL__',
  116. include_by_re:bool = False,
  117. exclude_cols :list = None
  118. ):
  119. # 删除负数
  120. data = self._get_data_by_cols(include_cols,include_by_re,exclude_cols)
  121. if keep_zero is True:
  122. drop_index = (data<0).any(axis=1).values
  123. else:
  124. drop_index = (data<=0).any(axis=1).values
  125. self.drop_index = drop_index | self.drop_index
  126. self._count_removed_data(index=drop_index,method='rm_negative')
  127. return self
  128. def rm_rule(self,remove_rule:str):
  129. # 基于规则删除数据
  130. data = self.data.copy()
  131. drop_index = np.array(data.eval(remove_rule))
  132. self.drop_index = drop_index | self.drop_index
  133. self._count_removed_data(index=drop_index,method=f'rm_rule({remove_rule})')
  134. return self
  135. def rm_regression_outlier(
  136. self,
  137. formula : str,
  138. rm_resid_IQR: float = 1.5,
  139. rm_dir : str = 'both',
  140. exclude_rule: Union[str,list,None] = None,
  141. min_sample : int = 30,
  142. ):
  143. #! 顺序敏感
  144. RAW_INDEX = np.arange(len(self.data))
  145. # 排除以外的数据,不参与计算
  146. if exclude_rule is None:
  147. exclude_rule = []
  148. if isinstance(exclude_rule,str):
  149. exclude_rule = [exclude_rule]
  150. exclued_index = np.array([False]*len(self.raw_data))
  151. for rule in exclude_rule:
  152. exclued_index = exclued_index | np.array(self.data.eval(rule))
  153. exclued_index = pd.Series(data=exclued_index,index=RAW_INDEX)
  154. exclude_index_drop = pd.Series(self.drop_index,index=RAW_INDEX).loc[exclued_index.values]
  155. # 待清洗的数据
  156. data_clean = self.data.assign(RAW_INDEX_=RAW_INDEX).loc[~(self.drop_index|exclued_index.values)]
  157. filter_index = data_clean.RAW_INDEX_.values
  158. if len(data_clean) < min_sample:
  159. return self
  160. with warnings.catch_warnings():
  161. warnings.simplefilter('ignore')
  162. mod = rlm(formula,data=data_clean).fit(maxiter=500)
  163. resid = np.array(mod.resid)
  164. IQR = iqr(resid)
  165. if rm_dir == 'both':
  166. drop_index = (resid < (np.quantile(resid,q=0.25)-rm_resid_IQR*IQR)) | (resid > (np.quantile(resid,q=0.75)+rm_resid_IQR*IQR))
  167. elif rm_dir == 'lower':
  168. drop_index = resid < (np.quantile(resid,q=0.25)-rm_resid_IQR*IQR)
  169. elif rm_dir == 'upper':
  170. drop_index = resid > (np.quantile(resid,q=0.75)+rm_resid_IQR*IQR)
  171. else:
  172. raise ValueError('rm_dir must be one of "both","lower","upper"')
  173. drop_index_incomplete = pd.Series(data=drop_index,index=filter_index).combine_first(exclude_index_drop)
  174. drop_index_complete = drop_index_incomplete.reindex(RAW_INDEX).fillna(False).values
  175. self.drop_index = drop_index_complete | self.drop_index
  176. self._count_removed_data(index=drop_index,method=f'rm_reg({formula})')
  177. return self
  178. def rm_date_range(self,start:datetime,end:datetime,col=None):
  179. start = pd.Timestamp(start)
  180. end = pd.Timestamp(end)
  181. if col is None:
  182. ts = pd.to_datetime(self.raw_data.index)
  183. else:
  184. ts = pd.to_datetime(self.raw_data.loc[:,col])
  185. drop_index = (ts>=start) & (ts<=end)
  186. self.drop_index = drop_index | self.drop_index
  187. self._count_removed_data(index=drop_index,method=f'rm_date_range({start}~{end})')
  188. return self
  189. def rm_outrange(
  190. self,
  191. method :str = 'quantile',
  192. upper :float = 0.99,
  193. lower :float = 0.01,
  194. include_cols :list = '__ALL__',
  195. include_by_re :bool = False,
  196. exclude_cols :list = None
  197. ):
  198. data = self._get_data_by_cols(include_cols,include_by_re,exclude_cols)
  199. if method == 'quantile':
  200. q_upper = np.quantile(data.values,q=upper,axis=0)
  201. q_lower = np.quantile(data.values,q=lower,axis=0)
  202. elif method == 'raw':
  203. q_upper = upper
  204. q_lower = lower
  205. else:
  206. raise Exception('WRONG method')
  207. drop_index_matrix = (data > q_upper) | (data < q_lower)
  208. drop_index = drop_index_matrix.any(axis=1)
  209. self.drop_index = self.drop_index | drop_index
  210. self._count_removed_data(index=drop_index,method='rm_outrange',index_matrix=drop_index_matrix,var_name=data.columns)
  211. return self
  212. def get_data(self,fill=None,get_drop=False) -> pd.DataFrame:
  213. index = self.drop_index if not get_drop else ~self.drop_index
  214. if fill is None:
  215. # 保留非删除数据
  216. result_data = self.raw_data.loc[~index,:]
  217. else:
  218. # 填充非删除数据
  219. result_data = self.raw_data.copy()
  220. result_data.loc[index,:] = fill
  221. if self.print_process:
  222. summary_dataframe(result_data,df_name='结果数据')
  223. return result_data
  224. def _get_data_by_cols(
  225. self,
  226. include_cols :list = '__ALL__',
  227. include_by_re:bool = False,
  228. exclude_cols :list = None,
  229. ) -> pd.DataFrame:
  230. data = self.data.copy()
  231. if include_by_re is True:
  232. if isinstance(include_cols,str):
  233. cols = data.loc[:,data.columns.str.contains(include_cols,regex=True)].columns
  234. else:
  235. raise Exception('WRONG')
  236. elif include_by_re is False:
  237. if include_cols == '__ALL__':
  238. cols = data.columns
  239. elif isinstance(include_cols,str):
  240. cols = [include_cols]
  241. elif isinstance(include_cols,list):
  242. cols = data.loc[:,include_cols].columns
  243. else:
  244. raise Exception('WRONG')
  245. if exclude_cols is not None:
  246. cols = cols.difference(other=exclude_cols)
  247. return data.loc[:,cols]
  248. def _count_removed_data(self,index,method,index_matrix=None,var_name=None):
  249. count = index.sum()
  250. pct = round(count / len(index) * 100,2)
  251. if self.print_process:
  252. print(f'remove {count}({pct}%) by {method}')
  253. if index_matrix is not None and var_name is not None:
  254. var_drop_count = np.sum(index_matrix,axis=0)
  255. for var,drop_count in zip(var_name,var_drop_count):
  256. if drop_count == 0:
  257. continue
  258. if self.print_process:
  259. print(f'{var}:{drop_count}')