# data_loader.py (6.4 KB)
  1. import os
  2. import sys
  3. import io
  4. import re
  5. from pathlib import Path
  6. import shutil
  7. from typing import Union
  8. import numpy as np
  9. import pandas as pd
  10. import psychrolib
  11. psychrolib.SetUnitSystem(psychrolib.SI)
  12. get_Dew = np.vectorize(psychrolib.GetTDewPointFromRelHum)
  13. get_Hr = np.vectorize(psychrolib.GetHumRatioFromTDewPoint)
  14. get_RH = np.vectorize(psychrolib.GetRelHumFromTDewPoint)
  15. from .._data.main import get_data
  16. class DataLoader:
  17. def __init__(self,path,start_time,end_time,print_process=True):
  18. self.path = path
  19. self.start_time = start_time
  20. self.end_time = end_time
  21. self.int_time = 'min'
  22. self.date_range = pd.date_range(start=self.start_time,end=self.end_time,freq=self.int_time)
  23. self.print_process = print_process
  24. def download_equp_data(
  25. self,
  26. equp_name : str,
  27. point : dict,
  28. url : str,
  29. clean_cache : bool,
  30. rm_point_name: list = None
  31. ):
  32. equp_path = os.path.join(self.path,equp_name)
  33. if clean_cache and os.path.exists(equp_path):
  34. shutil.rmtree(equp_path)
  35. if not os.path.exists(equp_path):
  36. os.makedirs(equp_path)
  37. for point_name,point_class in point.items():
  38. # 剔除一些point name
  39. if isinstance(rm_point_name,list) and point_name in rm_point_name:
  40. continue
  41. point_path = os.path.join(equp_path,f'{point_name}.pkl')
  42. point_class = str(point_class)
  43. if point_class in ['/']:
  44. continue
  45. # 纯数字
  46. elif bool(re.match(r'^[-+]?(\d+(\.\d*)?|\.\d+)$', point_class)):
  47. point_value = float(point_class)
  48. data = pd.DataFrame({point_name:point_value},index=self.date_range)
  49. pd.to_pickle(data,point_path)
  50. # 公式:干球温度和相对湿度计算露点
  51. elif bool(re.match(r'^Dew\(.*?\)$',point_class)):
  52. Tdb, RH = point_class.strip('Dew(').strip(')').split(',')
  53. points_id = [f'{equp_name}_{Tdb}',f'{equp_name}_{RH}']
  54. Tdb_name = point_name.replace('_D','_T')
  55. RH_name = point_name.replace('_D','_R')
  56. points_path = [
  57. os.path.join(equp_path,f'{Tdb_name}.pkl'),
  58. os.path.join(equp_path,f'{RH_name}.pkl'),
  59. ]
  60. for point_id,point_path in zip(points_id,points_path):
  61. get_data(
  62. points_id = [point_id],
  63. time_start= self.start_time,
  64. time_end = self.end_time,
  65. int_time = 'M',
  66. url = url,
  67. from_cache= True,
  68. PATH = Path(point_path)
  69. )
  70. # 非该设备的点位
  71. elif bool(re.match(r'^\[.*\]$',point_class)):
  72. get_data(
  73. points_id = [point_class.replace('[','').replace(']','')],
  74. time_start= self.start_time,
  75. time_end = self.end_time,
  76. int_time = 'M',
  77. url = url,
  78. from_cache= True,
  79. PATH = Path(point_path)
  80. )
  81. # 正常点位
  82. else:
  83. get_data(
  84. points_id = [f'{equp_name}_{point_class}'],
  85. time_start= self.start_time,
  86. time_end = self.end_time,
  87. int_time = 'M',
  88. url = url,
  89. from_cache= True,
  90. PATH = Path(point_path)
  91. )
  92. # 补齐未指定的数据
  93. all_file_path = os.listdir(equp_path)
  94. for file in all_file_path:
  95. # 通过干球温度和相对湿度计算露点
  96. exist_T = '_T' in file
  97. exist_R = file.replace('_T','_R') in all_file_path
  98. exist_D = file.replace('_T','_D') in all_file_path
  99. if exist_T and exist_R and not exist_D:
  100. Tdb = pd.read_pickle(os.path.join(equp_path,file)).iloc[:,0].values
  101. RH = pd.read_pickle(os.path.join(equp_path,file.replace('_T','_R'))).iloc[:,0].values
  102. Dew = pd.DataFrame({file.replace('_T','_D'):get_Dew(Tdb,np.clip(RH,0,100)/100)},index=self.date_range)
  103. pd.to_pickle(Dew,os.path.join(equp_path,file.replace('_T','_D')))
  104. if exist_T and exist_D and not exist_R:
  105. Tdb = pd.read_pickle(os.path.join(equp_path,file)).iloc[:,0].values
  106. Dew = pd.read_pickle(os.path.join(equp_path,file.replace('_T','_D'))).iloc[:,0].values
  107. RH = pd.DataFrame({file.replace('_T','_R'):get_RH(Tdb,Dew)},index=self.date_range)
  108. pd.to_pickle(RH,os.path.join(equp_path,file.replace('_T','_R')))
  109. all_file_path = os.listdir(equp_path)
  110. for file in all_file_path:
  111. # 通过露点计算绝对湿度
  112. exist_D = '_D' in file
  113. exist_H = file.replace('_D','_H') in all_file_path
  114. if exist_D and not exist_H:
  115. Dew = pd.read_pickle(os.path.join(equp_path,file)).iloc[:,0].values
  116. Hr = pd.DataFrame({file.replace('_D','_H'):get_Hr(Dew,101325)},index=self.date_range)
  117. pd.to_pickle(Hr,os.path.join(equp_path,file.replace('_D','_H')))
  118. return self
  119. def get_equp_data(self,equp_name:str) -> pd.DataFrame:
  120. equp_path = os.path.join(self.path,equp_name)
  121. all_file_path = os.listdir(equp_path)
  122. all_data = []
  123. for file in all_file_path:
  124. if '.pkl' not in file:
  125. continue
  126. data = pd.read_pickle(os.path.join(equp_path,file))
  127. if data.shape[1] != 1:
  128. print(data)
  129. raise Exception(f'data shape error:{equp_name}')
  130. data = (
  131. data
  132. .set_axis([file.replace('.pkl','')],axis=1)
  133. .loc[self.start_time:self.end_time,:]
  134. )
  135. all_data.append(data)
  136. if len(all_data) == 0:
  137. raise Exception(f'没有找到指定数据{all_file_path}')
  138. all_data = pd.concat(all_data,axis=1)
  139. return all_data