| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100 |
- from datetime import datetime
- from typing import Union
- import os
- import pandas as pd
-
class CacheData:
    """Pickle-backed cache of a time-indexed ``DataFrame`` (one column per point).

    On construction the pickle at ``PATH`` is loaded (when it exists) and two
    views of it, reindexed to the requested time window, are prepared:

    * ``need_cache_data``  -- every timestamp of the window, NaN where absent.
    * ``exist_cache_data`` -- the same frame with rows containing any NaN
      dropped.

    Both views are ``None`` when there is no cache file or when the cache
    holds exactly one row.
    """

    def __init__(
        self,
        PATH,
        point_ids: list,
        time_start: datetime,
        time_end: datetime,
        int_time: str,
    ) -> None:
        """Load the cache and pre-compute the views for the requested window.

        Args:
            PATH: Location of the pickle file. Either an object exposing
                ``exists()`` (e.g. ``pathlib.Path``) or a plain path string.
            point_ids: Column labels the caller is interested in.
            time_start: First timestamp of the requested window.
            time_end: Last timestamp of the requested window.
            int_time: Frequency alias handed to ``pd.date_range``; ``'M'`` is
                rewritten to ``'min'``.
        """
        self.PATH = PATH

        # Path-like objects keep using their own ``exists()``; plain strings
        # (accepted by ``pd.read_pickle`` / ``os.remove`` anyway) fall back
        # to ``os.path.exists``.
        if hasattr(PATH, "exists"):
            cache_file_exists = PATH.exists()
        else:
            cache_file_exists = os.path.exists(PATH)

        if cache_file_exists:
            # NOTE(review): unpickling can execute arbitrary code -- PATH must
            # only ever point at a file written by this application.
            self.all_cache_data: Union[pd.DataFrame, None] = pd.read_pickle(self.PATH)
        else:
            self.all_cache_data = None

        self.point_ids = point_ids
        self.time_start = time_start
        self.time_end = time_end
        self.int_time = 'min' if int_time == 'M' else int_time
        self.dt_range = pd.date_range(
            start=self.time_start, end=self.time_end, freq=self.int_time
        )

        self.need_cache_data = self._read_cache(drop_na=False)
        self.exist_cache_data = self._read_cache(drop_na=True)

    def _read_cache(self, drop_na: bool) -> Union[pd.DataFrame, None]:
        """Return the cache reindexed to the requested window.

        The result includes all columns of ``all_cache_data``, not only
        ``point_ids``.

        Args:
            drop_na: When true, rows containing any NaN are removed.

        Returns:
            The reindexed frame, or ``None`` when there is no cache or the
            cache holds exactly one row.
        """
        if self.all_cache_data is None or self.all_cache_data.shape[0] == 1:
            return None

        window_index = pd.Index(self.dt_range, name='ts')
        cache_data = self.all_cache_data.reindex(index=window_index)

        if drop_na:
            cache_data = cache_data.dropna()

        return cache_data

    def is_missing_point(self) -> bool:
        """Return True when a requested point has no column in the cache.

        Also returns True when no usable cache view exists (no cache file,
        or a single-row cache for which ``_read_cache`` yields ``None``).
        """
        # Guard on the derived view rather than only on all_cache_data: a
        # single-row cache leaves exist_cache_data as None.
        if self.all_cache_data is None or self.exist_cache_data is None:
            return True

        missing_points = pd.Index(self.point_ids).difference(
            self.exist_cache_data.columns
        )
        return len(missing_points) > 0

    def is_missing_time(self) -> bool:
        """Return True when a timestamp of the window has no complete row.

        Also returns True when no usable cache view exists (no cache file,
        or a single-row cache for which ``_read_cache`` yields ``None``).
        """
        if self.all_cache_data is None or self.exist_cache_data is None:
            return True

        missing_times = self.dt_range.difference(self.exist_cache_data.index)
        return len(missing_times) > 0

    def get_missing_dt(self) -> Union[tuple, None]:
        """Return the span of timestamps lacking data for all requested points.

        Returns:
            ``(dt_max, dt_min)`` -- latest first, earliest second -- as
            ``datetime`` objects, covering the rows of the window in which
            every requested point is NaN. ``None`` when a requested point is
            missing from the cache, when no timestamp is missing, or when no
            row is entirely NaN for the requested points.
        """
        if self.is_missing_point() or not self.is_missing_time():
            return None

        requested = self.need_cache_data.loc[:, self.point_ids]
        fully_missing = requested.isna().all(axis=1).to_numpy()
        missing_index = requested.index[fully_missing]

        # is_missing_time() looks at every cached column, so it can be True
        # while the requested points are present everywhere; max()/min() of
        # an empty index would then produce NaT instead of usable datetimes.
        if len(missing_index) == 0:
            return None

        dt_max = missing_index.max().to_pydatetime()
        dt_min = missing_index.min().to_pydatetime()

        return dt_max, dt_min

    def update_cache(self, new_data: pd.DataFrame) -> None:
        """Merge ``new_data`` into the cached frame and write it to ``PATH``.

        Only the pickle file is rewritten; the attributes of this instance
        (``all_cache_data`` and the derived views) are left unchanged.

        Args:
            new_data: Frame whose index/columns are unioned with the cached
                ones; its non-NaN values overwrite the cached values.
        """
        if self.all_cache_data is None:
            pd.to_pickle(new_data, self.PATH)
            return

        # Work on a copy so the in-place update() below cannot touch the
        # frame held by this instance.
        old_total_cache_data = self.all_cache_data.copy(deep=True)
        new_index = old_total_cache_data.index.append(new_data.index).drop_duplicates()
        new_columns = old_total_cache_data.columns.append(new_data.columns).drop_duplicates()
        new_total_cache_data = old_total_cache_data.reindex(
            index=new_index, columns=new_columns
        ).sort_index()
        new_total_cache_data.update(new_data)
        pd.to_pickle(new_total_cache_data, self.PATH)

    @classmethod
    def clean_cache(cls, PATH) -> None:
        """Delete the cache file at ``PATH``; filesystem errors are ignored."""
        try:
            os.remove(PATH)
        except OSError:
            # Best-effort removal: a missing or undeletable file is not fatal.
            pass
|