from datetime import datetime
from typing import Union
import os

import pandas as pd


class CacheData:
    """Pickle-backed cache of a time-indexed DataFrame for a requested window.

    On construction the pickle at ``PATH`` (if it exists) is loaded, and two
    views of it are prepared for the requested time window:

    * ``need_cache_data``  - the cache reindexed onto the full requested
      time range (rows absent from the cache appear as all-NaN rows).
    * ``exist_cache_data`` - the same view with every row containing any NaN
      dropped.

    Both views are ``None`` when there is no cache file, or when the cached
    frame has exactly one row (see ``_read_cache``).
    """

    def __init__(
        self,
        PATH,
        point_ids: list,
        time_start: datetime,
        time_end: datetime,
        int_time: str,
    ) -> None:
        """Load the cache and build the views for the requested window.

        Args:
            PATH: Location of the pickle file (``str`` or ``os.PathLike``).
            point_ids: Column labels the caller needs.
            time_start: Start of the requested window.
            time_end: End of the requested window.
            int_time: Frequency string passed to ``pd.date_range``; the
                value ``'M'`` is translated to ``'min'``.
        """
        self.PATH = PATH
        # os.path.exists accepts both str and path-like objects, so callers
        # are not forced to pass a pathlib.Path.
        # NOTE(review): pd.read_pickle executes arbitrary code embedded in the
        # file - only point PATH at cache files this application wrote itself.
        if os.path.exists(self.PATH):
            self.all_cache_data: Union[pd.DataFrame, None] = pd.read_pickle(self.PATH)
        else:
            self.all_cache_data = None

        self.point_ids = point_ids
        self.time_start = time_start
        self.time_end = time_end
        self.int_time = 'min' if int_time == 'M' else int_time
        self.dt_range = pd.date_range(
            start=self.time_start,
            end=self.time_end,
            freq=self.int_time,
        )

        self.need_cache_data = self._read_cache(drop_na=False)
        self.exist_cache_data = self._read_cache(drop_na=True)

    def _read_cache(self, drop_na: bool) -> Union[pd.DataFrame, None]:
        """Return the cache reindexed onto the requested time range.

        The result contains all columns of ``all_cache_data``.

        Args:
            drop_na: When true, rows containing any NaN are dropped.

        Returns:
            The reindexed frame, or ``None`` when no cache is loaded or the
            cached frame has exactly one row.
        """
        if self.all_cache_data is None or self.all_cache_data.shape[0] == 1:
            return None

        # Reuse the range computed in __init__; the reindex target index is
        # named 'ts'.
        window_index = pd.Index(self.dt_range, name='ts')
        cache_data = self.all_cache_data.reindex(index=window_index)
        return cache_data.dropna() if drop_na else cache_data

    def is_missing_point(self) -> bool:
        """Return True when any requested point id is not a cached column.

        Also returns True when no usable cache view exists.
        """
        # exist_cache_data can be None even when all_cache_data is not
        # (single-row cache), so guard on the view that is actually used.
        if self.exist_cache_data is None:
            return True
        missing_points = pd.Index(self.point_ids).difference(
            self.exist_cache_data.columns
        )
        return len(missing_points) > 0

    def is_missing_time(self) -> bool:
        """Return True when any requested timestamp lacks a complete row.

        Also returns True when no usable cache view exists.
        """
        if self.exist_cache_data is None:
            return True
        missing_times = self.dt_range.difference(self.exist_cache_data.index)
        return len(missing_times) > 0

    def get_missing_dt(self) -> Union[tuple, None]:
        """Return the span of timestamps whose requested points are all NaN.

        Returns:
            ``(latest, earliest)`` as ``datetime`` objects - note the order,
            maximum first - or ``None`` unless time is missing while every
            requested point id is present as a column.
        """
        if not self.is_missing_time() or self.is_missing_point():
            return None

        requested = self.need_cache_data.loc[:, self.point_ids]
        # Keep only timestamps where every requested point is NaN.
        empty_rows = requested.loc[requested.isna().all(axis=1), :].index

        dt_max = empty_rows.max().to_pydatetime()
        dt_min = empty_rows.min().to_pydatetime()
        return dt_max, dt_min

    def update_cache(self, new_data: pd.DataFrame) -> None:
        """Merge ``new_data`` into the cached frame and write it to ``PATH``.

        Only the pickle file is rewritten; the attributes loaded in
        ``__init__`` are not refreshed.

        Args:
            new_data: Frame whose non-NA values overwrite / extend the cache.
        """
        if self.all_cache_data is None:
            pd.to_pickle(new_data, self.PATH)
            return

        cached = self.all_cache_data
        merged_index = cached.index.append(new_data.index).drop_duplicates()
        merged_columns = cached.columns.append(new_data.columns).drop_duplicates()
        # reindex/sort_index build a new frame, so the in-place update below
        # does not touch self.all_cache_data.
        merged = cached.reindex(
            index=merged_index,
            columns=merged_columns,
        ).sort_index()
        merged.update(new_data)
        pd.to_pickle(merged, self.PATH)

    @classmethod
    def clean_cache(cls, PATH) -> None:
        """Delete the cache file at ``PATH``, ignoring filesystem errors."""
        try:
            os.remove(PATH)
        except OSError:
            # Best-effort removal: a missing or undeletable file is not fatal.
            pass