data_loader.py 7.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195
  1. import os
  2. import sys
  3. import io
  4. import re
  5. from pathlib import Path
  6. import shutil
  7. from typing import Union
  8. import numpy as np
  9. import pandas as pd
  10. import psychrolib
  # Select the SI unit system in psychrolib before any of its functions are used below.
  11. psychrolib.SetUnitSystem(psychrolib.SI)
  # Wrap the psychrolib functions with np.vectorize so they can be called on
  # array-like inputs (they are applied to whole pandas Series further down).
  # get_Dew: dew point from dry-bulb temperature and relative humidity.
  12. get_Dew = np.vectorize(psychrolib.GetTDewPointFromRelHum)
  # get_Hr: humidity ratio from dew point and pressure.
  13. get_Hr = np.vectorize(psychrolib.GetHumRatioFromTDewPoint)
  # get_RH: relative humidity from dry-bulb temperature and dew point.
  14. get_RH = np.vectorize(psychrolib.GetRelHumFromTDewPoint)
  15. from .._data.main import get_data
  16. class DataLoader:
  17. def __init__(self,path,start_time,end_time,print_process=True):
  18. self.path = path
  19. self.start_time = start_time.replace(second=0,microsecond=0)
  20. self.end_time = end_time.replace(second=0,microsecond=0)
  21. self.int_time = 'min'
  22. self.date_range = pd.date_range(start=self.start_time,end=self.end_time,freq=self.int_time)
  23. self.print_process = print_process
  24. def download_equp_data(
  25. self,
  26. equp_name : str,
  27. point : dict,
  28. url : str,
  29. clean_cache : bool,
  30. rm_point_name: list = None
  31. ):
  32. equp_path = os.path.join(self.path,equp_name)
  33. if clean_cache and os.path.exists(equp_path):
  34. shutil.rmtree(equp_path)
  35. if not os.path.exists(equp_path):
  36. os.makedirs(equp_path)
  37. all_download_file = {}
  38. for point_name,point_class in point.items():
  39. # 剔除一些point name
  40. if isinstance(rm_point_name,list) and point_name in rm_point_name:
  41. continue
  42. point_path = os.path.join(equp_path,f'{point_name}.pkl')
  43. point_class = str(point_class)
  44. if point_class in ['/']:
  45. continue
  46. # 纯数字
  47. elif bool(re.match(r'^[-+]?(\d+(\.\d*)?|\.\d+)$', point_class)):
  48. point_value = float(point_class)
  49. data = pd.DataFrame({point_name:point_value},index=self.date_range)
  50. pd.to_pickle(data,point_path)
  51. all_download_file[point_name] = point_path
  52. # 公式:干球温度和相对湿度计算露点
  53. elif bool(re.match(r'^Dew\(.*?\)$',point_class)):
  54. Tdb, RH = point_class.strip('Dew(').strip(')').split(',')
  55. points_id = [f'{equp_name}_{Tdb}',f'{equp_name}_{RH}']
  56. Tdb_name = point_name.replace('_D','_T')
  57. RH_name = point_name.replace('_D','_R')
  58. points_path = [
  59. os.path.join(equp_path,f'{Tdb_name}.pkl'),
  60. os.path.join(equp_path,f'{RH_name}.pkl'),
  61. ]
  62. all_download_file[Tdb_name] = points_path[0]
  63. all_download_file[RH_name] = points_path[1]
  64. for point_id,point_path in zip(points_id,points_path):
  65. run_function_with_print_control(
  66. get_data,
  67. self.print_process,
  68. points_id = [point_id],
  69. time_start= self.start_time,
  70. time_end = self.end_time,
  71. int_time = 'M',
  72. url = url,
  73. from_cache= True,
  74. PATH = Path(point_path)
  75. )
  76. # 非该设备的点位
  77. elif bool(re.match(r'^\[.*\]$',point_class)):
  78. run_function_with_print_control(
  79. get_data,
  80. self.print_process,
  81. points_id = [point_class.replace('[','').replace(']','')],
  82. time_start= self.start_time,
  83. time_end = self.end_time,
  84. int_time = 'M',
  85. url = url,
  86. from_cache= True,
  87. PATH = Path(point_path)
  88. )
  89. all_download_file[point_name] = point_path
  90. # 正常点位
  91. else:
  92. run_function_with_print_control(
  93. get_data,
  94. self.print_process,
  95. points_id = [f'{equp_name}_{point_class}'],
  96. time_start= self.start_time,
  97. time_end = self.end_time,
  98. int_time = 'M',
  99. url = url,
  100. from_cache= True,
  101. PATH = Path(point_path)
  102. )
  103. all_download_file[point_name] = point_path
  104. # 补齐未指定的数据
  105. slice_time = slice(self.start_time,self.end_time)
  106. for point_name in all_download_file:
  107. file = f'{point_name}.pkl'
  108. # 通过干球温度和相对湿度计算露点
  109. exist_T = '_T' in file
  110. exist_R = point_name.replace('_T','_R') in all_download_file
  111. exist_D = point_name.replace('_T','_D') in all_download_file
  112. if exist_T and exist_R and not exist_D:
  113. Tdb = pd.read_pickle(os.path.join(equp_path,file)).loc[slice_time].iloc[:,0]
  114. RH = pd.read_pickle(os.path.join(equp_path,file.replace('_T','_R'))).loc[slice_time].iloc[:,0]
  115. Dew = pd.DataFrame({point_name.replace('_T','_D'):get_Dew(Tdb,np.clip(RH,0,100)/100)},index=Tdb.index)
  116. pd.to_pickle(Dew,os.path.join(equp_path,file.replace('_T','_D')))
  117. if exist_T and exist_D and not exist_R:
  118. Tdb = pd.read_pickle(os.path.join(equp_path,file)).loc[slice_time].iloc[:,0]
  119. Dew = pd.read_pickle(os.path.join(equp_path,file.replace('_T','_D'))).loc[slice_time].iloc[:,0]
  120. Dew = np.where(Dew>Tdb,Tdb,Dew)
  121. RH = pd.DataFrame({point_name.replace('_T','_R'):get_RH(Tdb,Dew)},index=Tdb.index)
  122. pd.to_pickle(RH,os.path.join(equp_path,file.replace('_T','_R')))
  123. all_file_path = os.listdir(equp_path)
  124. for file in all_file_path:
  125. # 通过露点计算绝对湿度
  126. if '_D' in file:
  127. Dew = pd.read_pickle(os.path.join(equp_path,file)).loc[slice_time].iloc[:,0]
  128. Hr = pd.DataFrame({file.replace('_D','_H'):get_Hr(Dew,101325)},index=Dew.index)
  129. pd.to_pickle(Hr,os.path.join(equp_path,file.replace('_D','_H')))
  130. return self
  131. def get_equp_data(self,equp_name:str) -> pd.DataFrame:
  132. equp_path = os.path.join(self.path,equp_name)
  133. all_file_path = os.listdir(equp_path)
  134. all_data = []
  135. for file in all_file_path:
  136. if '.pkl' not in file:
  137. continue
  138. data = pd.read_pickle(os.path.join(equp_path,file))
  139. if data.shape[1] != 1:
  140. print(data)
  141. raise Exception(f'data shape error:{equp_name}')
  142. data = (
  143. data
  144. .set_axis([file.replace('.pkl','')],axis=1)
  145. .loc[self.start_time:self.end_time,:]
  146. )
  147. all_data.append(data)
  148. if len(all_data) == 0:
  149. raise Exception(f'没有找到指定数据{all_file_path}')
  150. all_data = pd.concat(all_data,axis=1)
  151. return all_data
  152. import sys
  153. import io
  154. def run_function_with_print_control(func, enable_print=True, *args, **kwargs):
  155. """
  156. 运行函数并控制是否打印其内部的 print 语句
  157. :param func: 要运行的函数
  158. :param enable_print: 是否打印输出
  159. :param args: 传递给 func 的位置参数
  160. :param kwargs: 传递给 func 的关键字参数
  161. :return: 函数的返回值
  162. """
  163. if enable_print:
  164. return func(*args, **kwargs) # 正常执行,打印输出
  165. else:
  166. # 临时替换 sys.stdout 以抑制输出
  167. old_stdout = sys.stdout
  168. sys.stdout = io.StringIO()
  169. try:
  170. result = func(*args, **kwargs)
  171. finally:
  172. sys.stdout = old_stdout # 恢复 stdout
  173. return result