# cache_data.py
import os
from datetime import datetime
from typing import Optional, Union

import pandas as pd
  5. class CacheData:
  6. def __init__(
  7. self,
  8. PATH,
  9. point_ids : list,
  10. time_start: datetime,
  11. time_end : datetime,
  12. int_time : str,
  13. ) -> None:
  14. self.PATH = PATH
  15. if self.PATH.exists():
  16. self.all_cache_data:pd.DataFrame = pd.read_pickle(self.PATH)
  17. else:
  18. self.all_cache_data = None
  19. self.point_ids = point_ids
  20. self.time_start = time_start
  21. self.time_end = time_end
  22. self.int_time = 'min' if int_time == 'M' else int_time
  23. self.dt_range = pd.date_range(start=self.time_start,end=self.time_end,freq=self.int_time)
  24. self.need_cache_data = self._read_cache(drop_na=False)
  25. self.exist_cache_data = self._read_cache(drop_na=True)
  26. def _read_cache(self,drop_na:bool) -> Union[pd.DataFrame,None]:
  27. # 包含all_cache_data中的所有列
  28. if self.all_cache_data is None or self.all_cache_data.shape[0] == 1:
  29. return None
  30. dt_range = pd.date_range(start=self.time_start,end=self.time_end,freq=self.int_time)
  31. dt_range_index = pd.Index(dt_range,name='ts')
  32. cache_data = self.all_cache_data.reindex(index=dt_range_index)
  33. if drop_na:
  34. cache_data = cache_data.dropna()
  35. return cache_data
  36. def is_missing_point(self) -> bool:
  37. if self.all_cache_data is None:
  38. return True
  39. missing_point = pd.Index(self.point_ids).difference(self.exist_cache_data.columns)
  40. if len(missing_point) > 0:
  41. return True
  42. else:
  43. return False
  44. def is_missing_time(self) -> bool:
  45. if self.all_cache_data is None:
  46. return True
  47. missing_time = self.dt_range.difference(self.exist_cache_data.index)
  48. if len(missing_time) > 0:
  49. return True
  50. else:
  51. return False
  52. def get_missing_dt(self) -> tuple:
  53. if not (self.is_missing_time() and (not self.is_missing_point())):
  54. return None
  55. dt_range = (
  56. self.need_cache_data
  57. .loc[:,self.point_ids]
  58. .loc[lambda dt:dt.isna().all(axis=1),:]
  59. .index
  60. )
  61. dt_max = dt_range.max().to_pydatetime()
  62. dt_min = dt_range.min().to_pydatetime()
  63. return dt_max,dt_min
  64. def update_cache(self,new_data:pd.DataFrame) -> None:
  65. if self.all_cache_data is not None:
  66. old_total_cache_data = self.all_cache_data.copy(deep=True)
  67. new_index = old_total_cache_data.index.append(new_data.index).drop_duplicates()
  68. new_columns = old_total_cache_data.columns.append(new_data.columns).drop_duplicates()
  69. new_total_cache_data = old_total_cache_data.reindex(index=new_index,columns=new_columns).sort_index()
  70. new_total_cache_data.update(new_data)
  71. pd.to_pickle(new_total_cache_data,self.PATH)
  72. else:
  73. pd.to_pickle(new_data,self.PATH)
  74. @classmethod
  75. def clean_cache(self,PATH) -> None:
  76. try:
  77. os.remove(PATH)
  78. except:
  79. pass