import os
import re
from pathlib import Path
import shutil
from typing import Union

import numpy as np
import pandas as pd
import psychrolib

psychrolib.SetUnitSystem(psychrolib.SI)
# Element-wise wrappers so the scalar psychrolib functions accept arrays.
get_Dew = np.vectorize(psychrolib.GetTDewPointFromRelHum)
get_Hr = np.vectorize(psychrolib.GetHumRatioFromTDewPoint)

# Imported after the psychrolib setup above; statement order kept deliberately.
from .._data.main import get_data

# Plain number, e.g. "12", "-3.5", ".5": the point is a constant.
_NUMERIC_RE = re.compile(r'^[-+]?(\d+(\.\d*)?|\.\d+)$')
# "Dew(<dry-bulb point>,<relative-humidity point>)"; group 1 is the argument list.
_DEW_RE = re.compile(r'^Dew\((.*?)\)$')
# "[<point id>]": a fully qualified point id that is not prefixed with the equipment name.
_EXTERNAL_RE = re.compile(r'^\[.*\]$')

_PKL_SUFFIX = '.pkl'


class DataLoader:
    """Download per-equipment point data into pickle files and read it back.

    Each equipment gets a sub-directory of ``path``; each point is stored as
    ``<point_name>.pkl`` holding a single-column DataFrame.
    """

    def __init__(self, path: Union[str, 'os.PathLike[str]'], start_time, end_time):
        """Store the cache root and the time window.

        Args:
            path: Root directory of the on-disk cache.
            start_time: Start of the window; passed to ``pd.date_range`` and
                to ``get_data``.
            end_time: End of the window; passed to ``pd.date_range`` and to
                ``get_data``.
        """
        self.path = path
        self.start_time = start_time
        self.end_time = end_time
        # Frequency of the index used for constant and derived series.
        self.int_time = 'min'
        self.date_range = pd.date_range(
            start=self.start_time,
            end=self.end_time,
            freq=self.int_time,
        )

    def _fetch_point(self, point_id: str, target_path: str, url: str) -> None:
        """Request one point through ``get_data``, targeting ``target_path``."""
        get_data(
            points_id=[point_id],
            time_start=self.start_time,
            time_end=self.end_time,
            int_time='M',
            url=url,
            from_cache=True,
            PATH=Path(target_path),
        )

    def _fill_derived_points(self, equp_path: str) -> None:
        """Compute dew point and humidity ratio files from what is on disk.

        Runs in two passes so that dew-point files produced by the first pass
        are visible to the second one within the same call.
        """
        # Pass 1: dew point from dry-bulb temperature (_T) and relative
        # humidity (_R) when both files are present.
        file_names = os.listdir(equp_path)
        present = set(file_names)
        for file in file_names:
            rh_file = file.replace('_T', '_R')
            if '_T' not in file or rh_file not in present:
                continue
            dew_file = file.replace('_T', '_D')
            Tdb = pd.read_pickle(os.path.join(equp_path, file)).iloc[:, 0].values
            RH = pd.read_pickle(os.path.join(equp_path, rh_file)).iloc[:, 0].values
            # RH is clipped to [0, 100] and divided by 100 before being
            # handed to psychrolib.
            Dew = pd.DataFrame(
                {dew_file: get_Dew(Tdb, np.clip(RH, 0, 100) / 100)},
                index=self.date_range,
            )
            pd.to_pickle(Dew, os.path.join(equp_path, dew_file))

        # Pass 2: humidity ratio from dew point (_D). The directory is listed
        # again so dew-point files written in pass 1 are included.
        for file in os.listdir(equp_path):
            if '_D' not in file:
                continue
            hr_file = file.replace('_D', '_H')
            Dew = pd.read_pickle(os.path.join(equp_path, file)).iloc[:, 0].values
            # 101325 is the pressure argument passed to psychrolib (SI units).
            Hr = pd.DataFrame(
                {hr_file: get_Hr(Dew, 101325)},
                index=self.date_range,
            )
            pd.to_pickle(Hr, os.path.join(equp_path, hr_file))

    def dowload_equp_data(
        self,
        equp_name: str,
        point: dict,
        url: str,
        clean_cache: bool,
    ) -> 'DataLoader':
        """Download (or synthesize) every point of one equipment.

        ``point`` maps a point name to a specification, interpreted after
        ``str()`` conversion as one of:

        * ``'/'``: skipped.
        * a plain number: a constant series over ``self.date_range``.
        * ``'Dew(<T>,<RH>)'``: the two named points are fetched as
          ``<equp_name>_<T>`` and ``<equp_name>_<RH>`` and stored under the
          point name with ``_D`` replaced by ``_T`` / ``_R``.
        * ``'[<id>]'``: ``<id>`` is fetched as-is, without the equipment prefix.
        * anything else: fetched as ``<equp_name>_<spec>``.

        Afterwards dew-point (``_D``) and humidity-ratio (``_H``) files are
        derived from the files present in the equipment directory.

        Args:
            equp_name: Equipment name; also the cache sub-directory name.
            point: Mapping of point name to specification (see above).
            url: Forwarded to ``get_data``.
            clean_cache: When true, an existing equipment directory is removed
                before downloading.

        Returns:
            ``self``, so calls can be chained.

        Raises:
            ValueError: If a ``Dew(...)`` specification does not contain
                exactly two comma-separated point names.
        """
        equp_path = os.path.join(self.path, equp_name)
        if clean_cache and os.path.exists(equp_path):
            shutil.rmtree(equp_path)
        os.makedirs(equp_path, exist_ok=True)

        for point_name, point_class in point.items():
            point_path = os.path.join(equp_path, f'{point_name}.pkl')
            point_class = str(point_class)

            if point_class == '/':
                continue

            # Plain number: constant series.
            if _NUMERIC_RE.match(point_class):
                data = pd.DataFrame(
                    {point_name: float(point_class)},
                    index=self.date_range,
                )
                pd.to_pickle(data, point_path)
                continue

            # Formula: dew point from dry-bulb temperature and relative humidity.
            dew_match = _DEW_RE.match(point_class)
            if dew_match:
                # Use the captured argument list rather than str.strip, which
                # removes a *set* of characters and would eat leading/trailing
                # 'D', 'e', 'w' from the point names themselves.
                parts = [part.strip() for part in dew_match.group(1).split(',')]
                if len(parts) != 2:
                    raise ValueError(
                        f'Dew() expects exactly two points, got {point_class!r}'
                    )
                # Substitute in the point name only, never in the directory part.
                # NOTE(review): if the point name contains no '_D', both source
                # files resolve to the same path - confirm naming convention.
                base_name = str(point_name)
                for source_point, suffix in zip(parts, ('_T', '_R')):
                    source_path = os.path.join(
                        equp_path,
                        f"{base_name.replace('_D', suffix)}.pkl",
                    )
                    self._fetch_point(f'{equp_name}_{source_point}', source_path, url)
                continue

            # Point that does not belong to this equipment.
            if _EXTERNAL_RE.match(point_class):
                external_id = point_class.replace('[', '').replace(']', '')
                self._fetch_point(external_id, point_path, url)
                continue

            # Regular point of this equipment.
            self._fetch_point(f'{equp_name}_{point_class}', point_path, url)

        # Fill in data that was not specified explicitly.
        self._fill_derived_points(equp_path)
        return self

    def get_equp_data(self, equp_name: str) -> pd.DataFrame:
        """Load all cached points of one equipment into a single DataFrame.

        Every ``*.pkl`` file in the equipment directory becomes one column
        named after the file (without the suffix), restricted to the
        ``start_time`` .. ``end_time`` window.

        Args:
            equp_name: Equipment name (cache sub-directory).

        Returns:
            Column-wise concatenation of all point series.

        Raises:
            Exception: If the directory contains no pickle files.
        """
        equp_path = os.path.join(self.path, equp_name)
        all_file_path = os.listdir(equp_path)

        all_data = []
        for file in all_file_path:
            if not file.endswith(_PKL_SUFFIX):
                continue
            column = file[:-len(_PKL_SUFFIX)]
            # NOTE: pickle files are only safe to load from a trusted cache.
            data = (
                pd.read_pickle(os.path.join(equp_path, file))
                .set_axis([column], axis=1)
                .loc[self.start_time:self.end_time, :]
            )
            all_data.append(data)

        if not all_data:
            raise Exception(f'没有找到指定数据{all_file_path}')

        return pd.concat(all_data, axis=1)