import contextlib
import io
import os
import re
import shutil
import sys
from pathlib import Path
from typing import Union

import numpy as np
import pandas as pd
import psychrolib

from .._data.main import get_data

psychrolib.SetUnitSystem(psychrolib.SI)
# Element-wise wrappers around the scalar psychrolib functions.
get_Dew = np.vectorize(psychrolib.GetTDewPointFromRelHum)
get_Hr = np.vectorize(psychrolib.GetHumRatioFromTDewPoint)
get_RH = np.vectorize(psychrolib.GetRelHumFromTDewPoint)

# Point-class patterns recognised by DataLoader.download_equp_data.
_NUMERIC_RE = re.compile(r'^[-+]?(\d+(\.\d*)?|\.\d+)$')
_DEW_RE = re.compile(r'^Dew\(.*?\)$')
_FOREIGN_RE = re.compile(r'^\[.*\]$')

_DEW_PREFIX = 'Dew('
_PKL_SUFFIX = '.pkl'
# Pressure passed to GetHumRatioFromTDewPoint (unit system is set to SI above).
_PRESSURE = 101325


class DataLoader:
    """Download per-equipment point data into pickle files and load it back.

    Every point is stored as ``<path>/<equp_name>/<point_name>.pkl``.
    """

    def __init__(self, path, start_time, end_time, print_process=True):
        """Store the cache root and the time window.

        :param path: root directory under which one sub-directory per
            equipment is created
        :param start_time: window start; seconds and microseconds are zeroed
        :param end_time: window end; seconds and microseconds are zeroed
        :param print_process: when False, stdout of ``get_data`` is suppressed
        """
        self.path = path
        self.start_time = start_time.replace(second=0, microsecond=0)
        self.end_time = end_time.replace(second=0, microsecond=0)
        self.int_time = 'min'
        self.date_range = pd.date_range(
            start=self.start_time,
            end=self.end_time,
            freq=self.int_time,
        )
        self.print_process = print_process

    def _fetch_point(self, point_id: str, url: str, point_path: str) -> None:
        """Call ``get_data`` for one point id, targeting ``point_path``.

        NOTE(review): ``get_data`` is defined outside this file; presumably it
        writes the fetched series to ``PATH`` -- confirm against its
        implementation.
        """
        run_function_with_print_control(
            get_data,
            self.print_process,
            points_id=[point_id],
            time_start=self.start_time,
            time_end=self.end_time,
            int_time='M',
            url=url,
            from_cache=True,
            PATH=Path(point_path),
        )

    def download_equp_data(
        self,
        equp_name: str,
        point: dict,
        url: str,
        clean_cache: bool,
        rm_point_name: Union[list, tuple, set, frozenset, None] = None,
    ):
        """Download (or synthesise) every point of one piece of equipment.

        Each value of ``point`` is converted to ``str`` and interpreted as:

        * ``'/'``            -- point is skipped;
        * a plain number     -- a constant series over ``self.date_range``;
        * ``'Dew(T,RH)'``    -- the dry-bulb and relative-humidity points
          ``<equp_name>_T`` / ``<equp_name>_RH`` are fetched and stored under
          the point name with ``_D`` replaced by ``_T`` / ``_R``; the dew
          point itself is derived afterwards;
        * ``'[id]'``         -- ``id`` is fetched verbatim (a point that does
          not belong to this equipment);
        * anything else      -- ``<equp_name>_<value>`` is fetched.

        Afterwards missing dew point / relative humidity series are derived
        from the other two of (``_T``, ``_R``, ``_D``), and a humidity-ratio
        file (``_H``) is written for every ``_D`` pickle in the directory.

        :param equp_name: equipment name; also the cache sub-directory name
        :param point: mapping of point name -> point class (see above)
        :param url: forwarded to ``get_data``
        :param clean_cache: when True, the equipment directory is removed first
        :param rm_point_name: point names to ignore
        :return: ``self``
        :raises ValueError: if a ``Dew(...)`` formula does not contain exactly
            two comma-separated arguments
        """
        equp_path = os.path.join(self.path, equp_name)
        if clean_cache and os.path.exists(equp_path):
            shutil.rmtree(equp_path)
        os.makedirs(equp_path, exist_ok=True)

        excluded = (
            rm_point_name
            if isinstance(rm_point_name, (list, tuple, set, frozenset))
            else ()
        )

        all_download_file = {}
        for point_name, point_class in point.items():
            # Drop explicitly excluded point names.
            if point_name in excluded:
                continue

            point_path = os.path.join(equp_path, f'{point_name}.pkl')
            point_class = str(point_class)

            if point_class == '/':
                continue

            if _NUMERIC_RE.match(point_class):
                # Plain number: constant series over the whole window.
                data = pd.DataFrame(
                    {point_name: float(point_class)},
                    index=self.date_range,
                )
                pd.to_pickle(data, point_path)
                all_download_file[point_name] = point_path

            elif _DEW_RE.match(point_class):
                # Formula: dew point from dry-bulb temperature and RH.
                # Slice the wrapper off instead of str.strip('Dew('), which
                # removes a *set of characters* and would eat leading
                # 'D'/'e'/'w' characters of the first point id.
                arguments = [
                    part.strip()
                    for part in point_class[len(_DEW_PREFIX):-1].split(',')
                ]
                if len(arguments) != 2:
                    raise ValueError(
                        f'Dew formula needs exactly two arguments: {point_class}'
                    )
                tdb_id, rh_id = arguments
                tdb_name = point_name.replace('_D', '_T')
                rh_name = point_name.replace('_D', '_R')
                targets = (
                    (f'{equp_name}_{tdb_id}', tdb_name),
                    (f'{equp_name}_{rh_id}', rh_name),
                )
                for _, target_name in targets:
                    all_download_file[target_name] = os.path.join(
                        equp_path, f'{target_name}.pkl'
                    )
                for source_id, target_name in targets:
                    self._fetch_point(
                        source_id, url, all_download_file[target_name]
                    )

            elif _FOREIGN_RE.match(point_class):
                # Point that does not belong to this equipment: use id as-is.
                foreign_id = point_class.replace('[', '').replace(']', '')
                self._fetch_point(foreign_id, url, point_path)
                all_download_file[point_name] = point_path

            else:
                # Regular point of this equipment.
                self._fetch_point(f'{equp_name}_{point_class}', url, point_path)
                all_download_file[point_name] = point_path

        # Fill in series that were not specified explicitly.
        slice_time = slice(self.start_time, self.end_time)

        def read_series(name):
            """Load one cached point, restricted to the window, as a Series."""
            frame = pd.read_pickle(os.path.join(equp_path, f'{name}.pkl'))
            return frame.loc[slice_time].iloc[:, 0]

        for point_name in all_download_file:
            if '_T' not in point_name:
                continue
            rh_name = point_name.replace('_T', '_R')
            dew_name = point_name.replace('_T', '_D')
            exist_R = rh_name in all_download_file
            exist_D = dew_name in all_download_file

            if exist_R and not exist_D:
                # Dew point from dry-bulb temperature and relative humidity.
                # RH is clipped to [0, 100] and scaled to a fraction.
                # NOTE(review): values are combined positionally; assumes both
                # series share the same index -- confirm.
                Tdb = read_series(point_name)
                RH = read_series(rh_name)
                Dew = pd.DataFrame(
                    {dew_name: get_Dew(Tdb, np.clip(RH, 0, 100) / 100)},
                    index=Tdb.index,
                )
                pd.to_pickle(Dew, os.path.join(equp_path, f'{dew_name}.pkl'))

            if exist_D and not exist_R:
                # Relative humidity from dry-bulb temperature and dew point.
                Tdb = read_series(point_name)
                Dew = read_series(dew_name)
                # Cap the dew point at the dry-bulb temperature.
                Dew = np.where(Dew > Tdb, Tdb, Dew)
                RH = pd.DataFrame(
                    {rh_name: get_RH(Tdb, Dew)},
                    index=Tdb.index,
                )
                pd.to_pickle(RH, os.path.join(equp_path, f'{rh_name}.pkl'))

        # Humidity ratio from every dew-point file present in the directory.
        for file in os.listdir(equp_path):
            if '_D' not in file or not file.endswith(_PKL_SUFFIX):
                continue
            Dew = (
                pd.read_pickle(os.path.join(equp_path, file))
                .loc[slice_time]
                .iloc[:, 0]
            )
            hr_file = file.replace('_D', '_H')
            # Column is named after the point, without the file extension.
            hr_name = hr_file[:-len(_PKL_SUFFIX)]
            Hr = pd.DataFrame({hr_name: get_Hr(Dew, _PRESSURE)}, index=Dew.index)
            pd.to_pickle(Hr, os.path.join(equp_path, hr_file))

        return self

    def get_equp_data(self, equp_name: str) -> pd.DataFrame:
        """Load every cached point of one equipment into a single DataFrame.

        Each ``.pkl`` file must hold a single-column frame; the column is
        renamed to the file name without extension and the rows are limited to
        ``[start_time, end_time]``.

        :param equp_name: equipment name (cache sub-directory)
        :return: all points concatenated column-wise
        :raises ValueError: if a pickle does not have exactly one column
        :raises FileNotFoundError: if the directory contains no pickle file
        """
        equp_path = os.path.join(self.path, equp_name)
        all_file_path = os.listdir(equp_path)

        frames = []
        for file in all_file_path:
            if not file.endswith(_PKL_SUFFIX):
                continue
            data = pd.read_pickle(os.path.join(equp_path, file))
            if data.shape[1] != 1:
                print(data)
                raise ValueError(f'data shape error:{equp_name}')
            column = file[:-len(_PKL_SUFFIX)]
            frames.append(
                data
                .set_axis([column], axis=1)
                .loc[self.start_time:self.end_time, :]
            )

        if not frames:
            raise FileNotFoundError(f'没有找到指定数据{all_file_path}')

        return pd.concat(frames, axis=1)


def run_function_with_print_control(func, enable_print=True, *args, **kwargs):
    """
    Run a function and control whether its internal ``print`` output is shown.

    :param func: the function to run
    :param enable_print: whether to let the output through
    :param args: positional arguments passed to ``func``
    :param kwargs: keyword arguments passed to ``func``
    :return: the return value of ``func``
    """
    if enable_print:
        return func(*args, **kwargs)

    # Redirect sys.stdout to a throw-away buffer for the duration of the call;
    # the context manager restores it even if func raises.
    with contextlib.redirect_stdout(io.StringIO()):
        return func(*args, **kwargs)