| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157 |
- import os
- import sys
- import io
- import re
- from pathlib import Path
- import shutil
- from typing import Union
- import numpy as np
- import pandas as pd
- import psychrolib
# Select SI units for every psychrolib call made from this module.
psychrolib.SetUnitSystem(psychrolib.SI)

# Wrap the psychrolib functions with np.vectorize so they can be called
# on NumPy arrays element by element.
get_Dew, get_Hr, get_RH = (
    np.vectorize(scalar_fn)
    for scalar_fn in (
        psychrolib.GetTDewPointFromRelHum,
        psychrolib.GetHumRatioFromTDewPoint,
        psychrolib.GetRelHumFromTDewPoint,
    )
)
- from .._data.main import get_data
class DataLoader:
    """Download, cache and assemble per-equipment point data as pickle files.

    Each point of an equipment is stored as one pickle file at
    ``<path>/<equp_name>/<point_name>.pkl``. ``download_equp_data`` fills
    that directory and ``get_equp_data`` reads it back as one DataFrame.
    """

    # Pressure handed to ``get_Hr`` when deriving humidity ratio from dew
    # point (presumably standard atmospheric pressure in Pa, since the module
    # selects psychrolib's SI unit system).
    _PRESSURE = 101325

    # Point specifications are classified with these patterns.
    _NUMBER_RE = re.compile(r'^[-+]?(\d+(\.\d*)?|\.\d+)$')
    _DEW_RE = re.compile(r'^Dew\((.*?)\)$')
    _EXTERNAL_RE = re.compile(r'^\[.*\]$')

    def __init__(self, path, start_time, end_time, print_process=True):
        """Store the cache root and build the time index for the period.

        Args:
            path: Root directory under which one sub-directory per
                equipment is created.
            start_time: Start of the period; passed to ``pd.date_range``
                and to ``get_data``.
            end_time: End of the period; passed to ``pd.date_range`` and
                to ``get_data``.
            print_process: Stored on the instance; not read by any method
                of this class.
        """
        self.path = path
        self.start_time = start_time
        self.end_time = end_time
        self.int_time = 'min'
        self.date_range = pd.date_range(
            start=self.start_time,
            end=self.end_time,
            freq=self.int_time,
        )
        self.print_process = print_process

    @classmethod
    def _parse_dew_formula(cls, point_class):
        """Return the two argument names of a ``Dew(<Tdb>,<RH>)`` formula.

        The arguments are taken from a regex capture group.  ``str.strip``
        must not be used for this: its argument is a *set of characters*,
        so ``'Dew(Dry_T,RH)'.strip('Dew(')`` also eats the leading ``D`` of
        the first argument.

        Raises:
            ValueError: If the text is not a ``Dew(...)`` formula with
                exactly two comma-separated arguments.
        """
        matched = cls._DEW_RE.match(point_class)
        if matched is None:
            raise ValueError(f'invalid Dew formula: {point_class!r}')
        # Surrounding whitespace is trimmed so "Dew(T, RH)" works too.
        parts = [part.strip() for part in matched.group(1).split(',')]
        if len(parts) != 2:
            raise ValueError(f'invalid Dew formula: {point_class!r}')
        return parts[0], parts[1]

    def _fetch_point(self, point_id, url, target_path):
        """Request one point from ``get_data`` for the configured period."""
        get_data(
            points_id=[point_id],
            time_start=self.start_time,
            time_end=self.end_time,
            int_time='M',
            url=url,
            from_cache=True,
            PATH=Path(target_path),
        )

    def _derive_humidity_points(self, equp_path):
        """Create the humidity files that can be computed from existing ones.

        Pass 1: for each ``*_T*`` file, compute the missing ``_D`` file from
        ``_T`` + ``_R``, or the missing ``_R`` file from ``_T`` + ``_D``.
        Pass 2: for each ``*_D*`` file without an ``_H`` sibling, compute
        the ``_H`` file via ``get_Hr``.

        Derived frames are indexed with ``self.date_range``, so pandas
        raises ``ValueError`` if a cached input has a different length.
        """

        def list_pickles():
            # Only pickle files take part; other directory entries cannot
            # be read with read_pickle.
            return {
                name for name in os.listdir(equp_path)
                if name.endswith('.pkl')
            }

        def read_values(file_name):
            # NOTE(review): read_pickle executes arbitrary pickled code;
            # the cache directory must only contain trusted files.
            frame = pd.read_pickle(os.path.join(equp_path, file_name))
            return frame.iloc[:, 0].values

        def write_values(file_name, values):
            # The column is named after the file (including ".pkl"), as
            # before; get_equp_data renames it on read.
            frame = pd.DataFrame({file_name: values}, index=self.date_range)
            pd.to_pickle(frame, os.path.join(equp_path, file_name))

        # Pass 1: dew point <-> relative humidity.  The listing is a
        # snapshot, so files written in this pass do not affect it.
        existing = list_pickles()
        for file in sorted(existing):
            if '_T' not in file:
                continue
            rh_file = file.replace('_T', '_R')
            dew_file = file.replace('_T', '_D')
            has_rh = rh_file in existing
            has_dew = dew_file in existing
            if has_rh and not has_dew:
                Tdb = read_values(file)
                RH = read_values(rh_file)
                # Stored RH is treated as percent here: clipped to
                # [0, 100] and scaled to a fraction before the call.
                write_values(dew_file, get_Dew(Tdb, np.clip(RH, 0, 100) / 100))
            elif has_dew and not has_rh:
                Tdb = read_values(file)
                Dew = read_values(dew_file)
                # Cap the dew point at the dry-bulb temperature.
                Dew = np.where(Dew > Tdb, Tdb, Dew)
                # NOTE(review): psychrolib documents the result of
                # GetRelHumFromTDewPoint as a fraction in [0, 1], while the
                # branch above treats stored RH as percent.  Derived and
                # downloaded ``_R`` files may therefore use different
                # units -- confirm with consumers before changing.
                write_values(rh_file, get_RH(Tdb, Dew))

        # Pass 2: humidity ratio from dew point (re-list to include the
        # dew-point files created in pass 1).
        existing = list_pickles()
        for file in sorted(existing):
            if '_D' not in file:
                continue
            hr_file = file.replace('_D', '_H')
            if hr_file not in existing:
                Dew = read_values(file)
                write_values(hr_file, get_Hr(Dew, self._PRESSURE))

    def download_equp_data(
        self,
        equp_name: str,
        point: dict,
        url: str,
        clean_cache: bool,
        rm_point_name: list = None
    ):
        """Populate the cache directory of one equipment.

        Each ``point`` entry maps a point name to a specification, which is
        converted with ``str()`` and handled by its form:

        * ``'/'``                -- skipped.
        * a plain number         -- written as a constant series over
          ``self.date_range``.
        * ``'Dew(<Tdb>,<RH>)'``  -- the two inputs ``<equp_name>_<Tdb>`` and
          ``<equp_name>_<RH>`` are fetched and stored under the point name
          with ``_D`` replaced by ``_T`` / ``_R``.
        * ``'[<id>]'``           -- ``<id>`` is fetched as-is (a point that
          does not belong to this equipment).
        * anything else          -- ``<equp_name>_<spec>`` is fetched.

        Afterwards missing dew point, relative humidity and humidity ratio
        files are derived from the ones that exist.

        Args:
            equp_name: Equipment name; used as sub-directory name and as
                point-id prefix.
            point: Mapping of point name to point specification.
            url: Passed through to ``get_data``.
            clean_cache: If true, an existing equipment directory is
                deleted first.
            rm_point_name: Optional list (tuple/set also accepted) of point
                names to leave out.

        Returns:
            ``self``, so calls can be chained.

        Raises:
            ValueError: If a ``Dew(...)`` specification does not have
                exactly two arguments.
        """
        equp_path = os.path.join(self.path, equp_name)
        if clean_cache and os.path.exists(equp_path):
            shutil.rmtree(equp_path)
        os.makedirs(equp_path, exist_ok=True)

        if isinstance(rm_point_name, (list, tuple, set, frozenset)):
            excluded = tuple(rm_point_name)
        else:
            excluded = ()

        for point_name, point_class in point.items():
            # Leave out explicitly excluded point names.
            if point_name in excluded:
                continue

            point_path = os.path.join(equp_path, f'{point_name}.pkl')
            point_class = str(point_class)

            if point_class == '/':
                continue

            if self._NUMBER_RE.match(point_class):
                # Plain number: constant series.
                data = pd.DataFrame(
                    {point_name: float(point_class)},
                    index=self.date_range,
                )
                pd.to_pickle(data, point_path)

            elif self._DEW_RE.match(point_class):
                # Formula: dew point from dry-bulb temperature and relative
                # humidity.  Only the inputs are fetched here; the dew
                # point itself is derived afterwards.
                Tdb, RH = self._parse_dew_formula(point_class)
                Tdb_name = point_name.replace('_D', '_T')
                RH_name = point_name.replace('_D', '_R')
                self._fetch_point(
                    f'{equp_name}_{Tdb}',
                    url,
                    os.path.join(equp_path, f'{Tdb_name}.pkl'),
                )
                self._fetch_point(
                    f'{equp_name}_{RH}',
                    url,
                    os.path.join(equp_path, f'{RH_name}.pkl'),
                )

            elif self._EXTERNAL_RE.match(point_class):
                # Point that does not belong to this equipment: use the
                # bracketed id without the equipment prefix.
                external_id = point_class.replace('[', '').replace(']', '')
                self._fetch_point(external_id, url, point_path)

            else:
                # Regular point of this equipment.
                self._fetch_point(f'{equp_name}_{point_class}', url, point_path)

        # Fill in the data that was not specified explicitly.
        self._derive_humidity_points(equp_path)
        return self

    def get_equp_data(self, equp_name: str) -> pd.DataFrame:
        """Load all cached points of one equipment into a single DataFrame.

        Every ``*.pkl`` file in the equipment directory must hold a
        one-column DataFrame.  The column is renamed to the file name
        without the ``.pkl`` suffix and the rows are limited to
        ``start_time`` .. ``end_time``.  Files are read in sorted order so
        the column order does not depend on the file system.

        Args:
            equp_name: Equipment name (sub-directory of ``self.path``).

        Returns:
            DataFrame with one column per cached point.

        Raises:
            ValueError: If a pickle does not hold exactly one column.
            FileNotFoundError: If the directory contains no ``.pkl`` file.
        """
        equp_path = os.path.join(self.path, equp_name)
        all_file_path = sorted(os.listdir(equp_path))
        suffix = '.pkl'
        all_data = []
        for file in all_file_path:
            if not file.endswith(suffix):
                continue
            # NOTE(review): read_pickle executes arbitrary pickled code;
            # the cache directory must only contain trusted files.
            data = pd.read_pickle(os.path.join(equp_path, file))
            if data.shape[1] != 1:
                print(data)
                raise ValueError(f'data shape error:{equp_name}')
            data = (
                data
                .set_axis([file[:-len(suffix)]], axis=1)
                .loc[self.start_time:self.end_time, :]
            )
            all_data.append(data)
        if not all_data:
            raise FileNotFoundError(f'没有找到指定数据{all_file_path}')
        return pd.concat(all_data, axis=1)
|