data_cleaner.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287
  1. import warnings
  2. from typing import Union
  3. from datetime import datetime
  4. import numpy as np
  5. import pandas as pd
  6. from statsmodels.formula.api import rlm
  7. from scipy.stats import iqr
  8. from .data_summary import summary_dataframe
  9. class DataCleaner:
  10. def __init__(self,data:pd.DataFrame,print_process=True) -> None:
  11. self.raw_data = data
  12. self.data = data.copy()
  13. self.drop_index = np.array([False]*len(self.raw_data))
  14. self.print_process = print_process
  15. if self.print_process:
  16. summary_dataframe(df=self.raw_data,df_name='原始数据')
  17. def rm_na_and_inf(self):
  18. # 删除缺失数据
  19. is_na_data = self.data.isna().any(axis=1).values
  20. is_inf_data = np.any(np.isinf(self.data.values),axis=1)
  21. drop_index = is_na_data | is_inf_data
  22. self.drop_index = self.drop_index | drop_index
  23. self._count_removed_data(index=drop_index,method='rm_na_and_inf')
  24. return self
  25. def rm_constant(
  26. self,
  27. window :int = 10,
  28. exclude_value :list = None,
  29. include_cols :list = '__ALL__',
  30. include_by_re :bool = False,
  31. exclude_cols :list = None
  32. ):
  33. # 删除常数
  34. data = self._get_data_by_cols(include_cols,include_by_re,exclude_cols)
  35. drop_index_matrix = (data.rolling(window=window).std()==0)
  36. if exclude_value is not None:
  37. for each_value in exclude_value:
  38. keep_index_matrix = data.values == each_value
  39. drop_index_matrix[keep_index_matrix] = False
  40. drop_index = drop_index_matrix.any(axis=1)
  41. self.drop_index = self.drop_index | drop_index
  42. self._count_removed_data(index=drop_index,method='rm_constant',index_matrix=drop_index_matrix,var_name=data.columns)
  43. return self
  44. def rm_rolling_fluct(
  45. self,
  46. window :int = 10,
  47. unit :Union[str,None] = 'min',
  48. fun :str = 'ptp',
  49. thre :float = 0,
  50. include_cols :list = '__ALL__',
  51. include_by_re :bool = False,
  52. exclude_cols :list = None
  53. ):
  54. data = self._get_data_by_cols(include_cols,include_by_re,exclude_cols)
  55. if unit is None:
  56. roll_window = window
  57. else:
  58. roll_window = str(window) + unit
  59. roll_data = data.rolling(window=roll_window,min_periods=1,center=True)
  60. if fun == 'ptp':
  61. res = roll_data.max() - roll_data.min()
  62. elif fun == 'pct':
  63. res = (roll_data.max() - roll_data.min())/roll_data.min()
  64. drop_index_matrix = res>thre
  65. drop_index = drop_index_matrix.any(axis=1)
  66. self.drop_index = self.drop_index | drop_index
  67. self._count_removed_data(index=drop_index,method='rm_rolling_fluct',index_matrix=drop_index_matrix,var_name=data.columns)
  68. return self
  69. def rm_outlier_rolling_mean(
  70. self,
  71. window :int = 10,
  72. thre :float = 0.02,
  73. include_cols :list = '__ALL__',
  74. include_by_re:bool = False,
  75. exclude_cols :list = None
  76. ):
  77. # 删除时序异常
  78. data = self._get_data_by_cols(include_cols,include_by_re,exclude_cols)
  79. data = data.reset_index(drop=True)
  80. windows_mean = data.rolling(window=window,min_periods=1).mean()
  81. drop_index = (((data - windows_mean)/data).abs()>thre).any(axis=1).values
  82. self.drop_index = drop_index | self.drop_index
  83. self._count_removed_data(index=drop_index,method='rm_outlier_mean')
  84. return self
  85. def rm_diff(
  86. self,
  87. thre : float,
  88. shift : int = 1,
  89. include_cols : list = '__ALL__',
  90. include_by_re: bool = False,
  91. exclude_cols : list = None
  92. ):
  93. # shift 等于1时为后一项减前一项
  94. data = self._get_data_by_cols(include_cols,include_by_re,exclude_cols)
  95. data_diff = data.diff(periods=shift,axis=0)
  96. drop_index_matrix = data_diff.abs() > thre
  97. drop_index = drop_index_matrix.any(axis=1).values
  98. self.drop_index = drop_index | self.drop_index
  99. self._count_removed_data(index=drop_index,method='rm_diff',index_matrix=drop_index_matrix,var_name=data.columns)
  100. return self
  101. def rm_zero(
  102. self,
  103. include_cols :list = '__ALL__',
  104. include_by_re:bool = False,
  105. exclude_cols :list = None
  106. ):
  107. data = self._get_data_by_cols(include_cols,include_by_re,exclude_cols)
  108. drop_index = (data==0).any(axis=1).values
  109. self.drop_index = drop_index | self.drop_index
  110. self._count_removed_data(index=drop_index,method='rm_zero')
  111. return self
  112. def rm_negative(
  113. self,
  114. keep_zero :bool = False,
  115. include_cols :list = '__ALL__',
  116. include_by_re:bool = False,
  117. exclude_cols :list = None
  118. ):
  119. # 删除负数
  120. data = self._get_data_by_cols(include_cols,include_by_re,exclude_cols)
  121. if keep_zero is True:
  122. drop_index = (data<0).any(axis=1).values
  123. else:
  124. drop_index = (data<=0).any(axis=1).values
  125. self.drop_index = drop_index | self.drop_index
  126. self._count_removed_data(index=drop_index,method='rm_negative')
  127. return self
  128. def rm_rule(self,remove_rule:str):
  129. # 基于规则删除数据
  130. data = self.data.copy()
  131. drop_index = np.array(data.eval(remove_rule))
  132. self.drop_index = drop_index | self.drop_index
  133. self._count_removed_data(index=drop_index,method=f'rm_rule({remove_rule})')
  134. return self
  135. def rm_regression_outlier(
  136. self,
  137. formula : str,
  138. rm_resid_IQR: float = 1.5,
  139. exclude_rule: Union[str,list,None] = None,
  140. min_sample : int = 30,
  141. ):
  142. #! 顺序敏感
  143. RAW_INDEX = np.arange(len(self.data))
  144. # 排除以外的数据,不参与计算
  145. if exclude_rule is None:
  146. exclude_rule = []
  147. if isinstance(exclude_rule,str):
  148. exclude_rule = [exclude_rule]
  149. exclued_index = np.array([False]*len(self.raw_data))
  150. for rule in exclude_rule:
  151. exclued_index = exclued_index | np.array(self.data.eval(rule))
  152. exclued_index = pd.Series(data=exclued_index,index=RAW_INDEX)
  153. exclude_index_drop = pd.Series(self.drop_index,index=RAW_INDEX).loc[exclued_index.values]
  154. # 待清洗的数据
  155. data_clean = self.data.assign(RAW_INDEX_=RAW_INDEX).loc[~(self.drop_index|exclued_index.values)]
  156. filter_index = data_clean.RAW_INDEX_.values
  157. if len(data_clean) < min_sample:
  158. return self
  159. with warnings.catch_warnings():
  160. warnings.simplefilter('ignore')
  161. mod = rlm(formula,data=data_clean).fit(maxiter=500)
  162. resid = np.array(mod.resid)
  163. IQR = iqr(resid)
  164. drop_index = (resid < (np.quantile(resid,q=0.25)-rm_resid_IQR*IQR)) | (resid > (np.quantile(resid,q=0.75)+rm_resid_IQR*IQR))
  165. drop_index_incomplete = pd.Series(data=drop_index,index=filter_index).combine_first(exclude_index_drop)
  166. drop_index_complete = drop_index_incomplete.reindex(RAW_INDEX).fillna(False).values
  167. self.drop_index = drop_index_complete | self.drop_index
  168. self._count_removed_data(index=drop_index,method=f'rm_reg({formula})')
  169. return self
  170. def rm_date_range(self,start:datetime,end:datetime,col=None):
  171. start = pd.Timestamp(start)
  172. end = pd.Timestamp(end)
  173. if col is None:
  174. ts = pd.to_datetime(self.raw_data.index)
  175. else:
  176. ts = pd.to_datetime(self.raw_data.loc[:,col])
  177. drop_index = (ts>=start) & (ts<=end)
  178. self.drop_index = drop_index | self.drop_index
  179. self._count_removed_data(index=drop_index,method=f'rm_date_range({start}~{end})')
  180. return self
  181. def rm_outrange(
  182. self,
  183. method :str = 'quantile',
  184. upper :float = 0.99,
  185. lower :float = 0.01,
  186. include_cols :list = '__ALL__',
  187. include_by_re :bool = False,
  188. exclude_cols :list = None
  189. ):
  190. data = self._get_data_by_cols(include_cols,include_by_re,exclude_cols)
  191. if method == 'quantile':
  192. q_upper = np.quantile(data.values,q=upper,axis=0)
  193. q_lower = np.quantile(data.values,q=lower,axis=0)
  194. elif method == 'raw':
  195. q_upper = upper
  196. q_lower = lower
  197. else:
  198. raise Exception('WRONG method')
  199. drop_index_matrix = (data > q_upper) | (data < q_lower)
  200. drop_index = drop_index_matrix.any(axis=1)
  201. self.drop_index = self.drop_index | drop_index
  202. self._count_removed_data(index=drop_index,method='rm_outrange',index_matrix=drop_index_matrix,var_name=data.columns)
  203. return self
  204. def get_data(self,fill=None,get_drop=False) -> pd.DataFrame:
  205. index = self.drop_index if not get_drop else ~self.drop_index
  206. if fill is None:
  207. # 保留非删除数据
  208. result_data = self.raw_data.loc[~index,:]
  209. else:
  210. # 填充非删除数据
  211. result_data = self.raw_data.copy()
  212. result_data.loc[index,:] = fill
  213. if self.print_process:
  214. summary_dataframe(result_data,df_name='结果数据')
  215. return result_data
  216. def _get_data_by_cols(
  217. self,
  218. include_cols :list = '__ALL__',
  219. include_by_re:bool = False,
  220. exclude_cols :list = None,
  221. ) -> pd.DataFrame:
  222. data = self.data.copy()
  223. if include_by_re is True:
  224. if isinstance(include_cols,str):
  225. cols = data.loc[:,data.columns.str.contains(include_cols,regex=True)].columns
  226. else:
  227. raise Exception('WRONG')
  228. elif include_by_re is False:
  229. if include_cols == '__ALL__':
  230. cols = data.columns
  231. elif isinstance(include_cols,str):
  232. cols = [include_cols]
  233. elif isinstance(include_cols,list):
  234. cols = data.loc[:,include_cols].columns
  235. else:
  236. raise Exception('WRONG')
  237. if exclude_cols is not None:
  238. cols = cols.difference(other=exclude_cols)
  239. return data.loc[:,cols]
    def _count_removed_data(self,index,method,index_matrix=None,var_name=None):
        """Report how many rows a single cleaning step flagged.

        Args:
            index: boolean row mask produced by this step alone (callers pass
                their local mask, not the accumulated ``self.drop_index``).
            method: label printed after "by" in the summary line.
            index_matrix: optional rows x columns boolean mask; its column
                sums give the per-column counts.
            var_name: column names, in the same order as ``index_matrix``'s
                columns.

        Output is printed only when ``self.print_process`` is true; columns
        with a zero count are not printed.
        """
        count = index.sum()
        # NOTE(review): an empty ``index`` makes this 0/0 (NaN with a warning
        # for numpy values) — confirm callers never pass an empty mask.
        pct = round(count / len(index) * 100,2)
        if self.print_process:
            print(f'remove {count}({pct}%) by {method}')
        if index_matrix is not None and var_name is not None:
            # Per-column flagged counts (column-wise sum of the mask).
            var_drop_count = np.sum(index_matrix,axis=0)
            for var,drop_count in zip(var_name,var_drop_count):
                if drop_count == 0:
                    continue
                if self.print_process:
                    print(f'{var}:{drop_count}')