# data_cleaner.py
from typing import List, Optional, Union

import numpy as np
import pandas as pd

from .data_summary import summary_dataframe
  4. class DataCleaner:
  5. def __init__(self,data:pd.DataFrame,print_process=True) -> None:
  6. self.raw_data = data
  7. self.data = data.copy()
  8. self.drop_index = np.array([False]*len(self.raw_data))
  9. self.print_process = print_process
  10. if self.print_process:
  11. summary_dataframe(df=self.raw_data,df_name='原始数据')
  12. def rm_na_and_inf(self):
  13. # 删除缺失数据
  14. is_na_data = self.data.isna().any(axis=1).values
  15. is_inf_data = np.any(np.isinf(self.data.values),axis=1)
  16. drop_index = is_na_data | is_inf_data
  17. self.drop_index = self.drop_index | drop_index
  18. self._count_removed_data(index=drop_index,method='rm_na_and_inf')
  19. return self
  20. def rm_constant(
  21. self,
  22. window :int = 10,
  23. include_cols :list = '__ALL__',
  24. include_by_re:bool = False,
  25. exclude_cols :list = None
  26. ):
  27. # 删除常数
  28. data = self._get_data_by_cols(include_cols,include_by_re,exclude_cols)
  29. drop_index_matrix = (data.rolling(window=window).std()==0)
  30. drop_index = drop_index_matrix.any(axis=1)
  31. self.drop_index = self.drop_index | drop_index
  32. self._count_removed_data(index=drop_index,method='rm_constant',index_matrix=drop_index_matrix,var_name=data.columns)
  33. return self
  34. def rm_rolling_fluct(
  35. self,
  36. window :int = 10,
  37. unit :str = 'min',
  38. fun :str = 'ptp',
  39. thre :float = 0,
  40. include_cols :list = '__ALL__',
  41. include_by_re :bool = False,
  42. exclude_cols :list = None
  43. ):
  44. data = self._get_data_by_cols(include_cols,include_by_re,exclude_cols)
  45. if unit is None:
  46. roll_window = window
  47. else:
  48. roll_window = str(window) + unit
  49. roll_data = data.rolling(window=roll_window,min_periods=1,center=True)
  50. if fun == 'ptp':
  51. res = roll_data.max() - roll_data.min()
  52. elif fun == 'pct':
  53. res = (roll_data.max() - roll_data.min())/roll_data.min()
  54. drop_index_matrix = res>thre
  55. drop_index = drop_index_matrix.any(axis=1)
  56. self.drop_index = self.drop_index | drop_index
  57. self._count_removed_data(index=drop_index,method='rm_rolling_fluct',index_matrix=drop_index_matrix,var_name=data.columns)
  58. return self
  59. def rm_outlier_rolling_mean(
  60. self,
  61. window :int = 10,
  62. thre :float = 0.02,
  63. include_cols :list = '__ALL__',
  64. include_by_re:bool = False,
  65. exclude_cols :list = None
  66. ):
  67. # 删除时序异常
  68. data = self._get_data_by_cols(include_cols,include_by_re,exclude_cols)
  69. data = data.reset_index(drop=True)
  70. windows_mean = data.rolling(window=window,min_periods=1).mean()
  71. drop_index = (((data - windows_mean)/data).abs()>thre).any(axis=1).values
  72. self.drop_index = drop_index | self.drop_index
  73. self._count_removed_data(index=drop_index,method='rm_outlier_mean')
  74. return self
  75. def rm_negative(
  76. self,
  77. keep_zero :bool = False,
  78. include_cols :list = '__ALL__',
  79. include_by_re:bool = False,
  80. exclude_cols :list = None
  81. ):
  82. # 删除负数
  83. data = self._get_data_by_cols(include_cols,include_by_re,exclude_cols)
  84. if keep_zero is True:
  85. drop_index = (data<0).any(axis=1).values
  86. else:
  87. drop_index = (data<=0).any(axis=1).values
  88. self.drop_index = drop_index | self.drop_index
  89. self._count_removed_data(index=drop_index,method='rm_negative')
  90. return self
  91. def rm_rule(self,remove_rule:str):
  92. # 基于规则删除数据
  93. data = self.data.copy()
  94. drop_index = np.array(data.eval(remove_rule))
  95. self.drop_index = drop_index | self.drop_index
  96. self._count_removed_data(index=drop_index,method=f'rm_rule({remove_rule})')
  97. return self
  98. def get_data(self,fill=None) -> pd.DataFrame:
  99. if fill is None:
  100. # 保留非删除数据
  101. result_data = self.raw_data.loc[~self.drop_index,:]
  102. else:
  103. # 填充非删除数据
  104. result_data = self.raw_data.copy()
  105. result_data.loc[self.drop_index,:] = fill
  106. if self.print_process:
  107. summary_dataframe(result_data,df_name='结果数据')
  108. return result_data
  109. def _get_data_by_cols(
  110. self,
  111. include_cols :list = '__ALL__',
  112. include_by_re:bool = False,
  113. exclude_cols :list = None,
  114. ) -> pd.DataFrame:
  115. data = self.data.copy()
  116. if include_by_re is True:
  117. if isinstance(include_cols,str):
  118. cols = data.loc[:,data.columns.str.contains(include_cols,regex=True)].columns
  119. else:
  120. raise Exception('WRONG')
  121. elif include_by_re is False:
  122. if include_cols == '__ALL__':
  123. cols = data.columns
  124. elif isinstance(include_cols,str):
  125. cols = [include_cols]
  126. elif isinstance(include_cols,list):
  127. cols = data.loc[:,include_cols].columns
  128. else:
  129. raise Exception('WRONG')
  130. if exclude_cols is not None:
  131. cols = cols.difference(other=exclude_cols)
  132. return data.loc[:,cols]
  133. def _count_removed_data(self,index,method,index_matrix=None,var_name=None):
  134. count = index.sum()
  135. pct = round(count / len(index) * 100,2)
  136. if self.print_process:
  137. print(f'remove {count}({pct}%) by {method}')
  138. if index_matrix is not None and var_name is not None:
  139. var_drop_count = np.sum(index_matrix,axis=0)
  140. for var,drop_count in zip(var_name,var_drop_count):
  141. if drop_count == 0:
  142. continue
  143. if self.print_process:
  144. print(f'{var}:{drop_count}')