main.py 8.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241
  1. import os
  2. from datetime import datetime,timedelta
  3. from pathlib import Path
  4. import numpy as np
  5. import pandas as pd
  6. from .._utils.point_io import PointReader
  7. from .._utils.config_info import ConfigInfo
  8. from .._utils.data_summary import summary_dataframe
  9. from .cache_data import CacheData
  def main(set_time_start=None,set_time_end=None,config=None):
      """Entry point: load point data for the component's output ports.

      Settings are read from ``config`` through ConfigInfo: ``int_time``
      (default 'M'), ``from_cache`` (default False), ``clean_cache`` (default
      False), ``na_behave`` (default 'Exception') and, outside 'S' mode, ``url``.
      The time window is resolved by get_time. Data is loaded by get_data, which
      may use the pickle cache at /mnt/workflow_data/<_CODE>_File.pkl. The result
      is then split per point by get_result.

      Args:
          set_time_start: optional start time, forwarded to get_time.
          set_time_end: optional end time, forwarded to get_time.
          config: component config. It must provide ``_PORTS_OUT`` (a list of
              dicts with a ``point_id`` key) and may provide ``_CODE``.

      Returns:
          The only element when get_result yields exactly one entry, otherwise
          the whole list.
      """
      cfg = ConfigInfo(config)
      int_time = cfg.get_property('int_time',default='M')
      from_cache = cfg.get_property('from_cache',default=False)
      clean_cache = cfg.get_property('clean_cache',default=False)
      na_behave = cfg.get_property('na_behave',default='Exception')
      output_id = cfg.get_io_id(io='out')
      time_start, time_end = get_time(cfg, set_time_start, set_time_end, int_time)

      point_ids = [port['point_id'] for port in config['_PORTS_OUT']]

      # _CODE in the component config occasionally comes back as None.
      file_name = config.get('_CODE', 'DEFAULT')
      if file_name is None:
          file_name = 'DEFAULT'
      cache_path = Path(f'/mnt/workflow_data/{file_name}_File.pkl')

      # Real-time ('S') mode always uses the current-value endpoint; the other
      # modes use the (configurable) history endpoint.
      url = ('http://basedataportal-svc:8080/data/get_points_real_value'
             if int_time == 'S'
             else cfg.get_property('url','http://basedataportal-svc:8080/data/getpointsdata'))

      # Clear the cached data first when requested.
      if clean_cache:
          CacheData.clean_cache(PATH=cache_path)

      data = get_data(point_ids, time_start, time_end, int_time, url, from_cache, cache_path)
      result = get_result(data, point_ids, output_id, na_behave)
      if len(result) == 1:
          return result[0]
      return result
  def parse_str_time(str_time):
      """Parse a 'YYYY-MM-DD HH:MM:SS' string into a naive datetime.

      Raises:
          ValueError: if ``str_time`` does not match the format.
      """
      fmt = '%Y-%m-%d %H:%M:%S'
      parsed = datetime.strptime(str_time, fmt)
      return parsed
  def get_result(data,point_ids,output_id,na_behave):
      """Split ``data`` into one single-column DataFrame per requested point.

      Args:
          data: DataFrame of loaded point values. Its index is reused to build
              an all-NaN column for points missing from ``data.columns``.
          point_ids: point ids (column names) to extract, in output order.
          output_id: output io ids of the component. When it is empty, nothing
              is extracted and ``[None]`` is returned.
          na_behave: how to treat a column that contains NaN. ``'Exception'``
              raises and ``'Zero'`` fills NaN with 0. Any other value raises,
              but only if some column actually contains NaN.

      Returns:
          List with one single-column DataFrame per entry of ``point_ids``.

      Raises:
          Exception: a column contains NaN and ``na_behave`` is ``'Exception'``
              or an unrecognised value.
      """
      if len(output_id) == 0:
          return [None]
      result = []
      for col in point_ids:
          if col in data.columns:
              df = data.loc[:, [col]]
          else:
              # The point was not returned at all: build an all-NaN column on the
              # same index so it goes through the na_behave handling below.
              # np.full takes the integer length; an Index is not a valid shape.
              df = pd.DataFrame({col: np.full(len(data.index), np.nan)}, index=data.index)
          if not df.isna().any().any():
              result.append(df)
          elif na_behave == 'Exception':
              raise Exception(f'{col}未获取到对应数据')
          elif na_behave == 'Zero':
              result.append(df.fillna(0))
          else:
              raise Exception('WRONG')
      # Print the data unless running an optimization back-calculation job.
      if 'OPTIM_CALC_LOG_ID' not in os.environ:
          print(result)
      return result
  def get_data(
      points_id : list,
      time_start: datetime,
      time_end : datetime,
      int_time : str,
      url : str,
      from_cache: bool,
      PATH
  ) -> pd.DataFrame:
      """Load point data for [time_start, time_end] from the DB, the cache, or both.

      The data source is chosen in this order:
        * ``from_cache`` is falsy -> query the DB for the whole window.
        * ``cache_data.is_missing_point()`` -> query the DB for the whole window.
        * ``cache_data.is_missing_time()`` -> query the DB only for the window
          returned by ``cache_data.get_missing_dt()`` and merge it into the
          cached data.
        * otherwise -> use ``cache_data.need_cache_data`` as-is.

      NaN in DB results is replaced by -9999. In the merge branch, only the DB
      part is filled; NaN coming from the cache is left as-is.

      Args:
          points_id: point ids to load (passed to CacheData and get_data_from_db).
          time_start: start of the requested window.
          time_end: end of the requested window.
          int_time: interval code forwarded to CacheData and get_data_from_db.
          url: data-portal endpoint forwarded to get_data_from_db.
          from_cache: whether the cache may be consulted at all.
          PATH: cache file location passed to CacheData.

      Returns:
          The loaded DataFrame, which is also passed to ``cache_data.update_cache``.
      """
      cache_data = CacheData(PATH,points_id,time_start,time_end,int_time)
      if not from_cache:
          # Cache disabled by config: always query the DB for the full window.
          print('Data Source : DB (Set)')
          data = get_data_from_db(points_id,time_start,time_end,int_time,url)
          data = data.fillna(-9999)
      elif cache_data.is_missing_point():
          # Cache lacks at least one requested point: query the DB for the full window.
          print('Data Source : DB (Missing Point)')
          data = get_data_from_db(points_id,time_start,time_end,int_time,url)
          data = data.fillna(-9999)
      elif cache_data.is_missing_time():
          # NOTE(review): unpacked as (end, start) -- confirm that get_missing_dt()
          # really returns the end time first.
          dt_end,dt_start = cache_data.get_missing_dt()
          print(f'Data Source : DB+CACHE (Missing Time {dt_start}~{dt_end})')
          # Fetch only the missing sub-window from the DB.
          data_db = get_data_from_db(points_id,dt_start,dt_end,int_time,url)
          # Drop duplicate timestamps, keeping the last occurrence.
          data_db = data_db.loc[lambda dt:~dt.index.duplicated(keep='last'),data_db.columns.to_list()]
          data_cache = cache_data.need_cache_data
          # Same de-duplication for the cached data, restricted to the DB's columns.
          # Selecting a label the cache does not have raises KeyError.
          data_cache = data_cache.loc[lambda dt:~dt.index.duplicated(keep='last'),data_db.columns.to_list()]
          data = data_cache.copy()
          # NOTE(review): DataFrame.update is a left join on the index. It only
          # overwrites rows already present in data_cache and never appends rows
          # that exist only in data_db. This is correct only if need_cache_data
          # already contains the missing timestamps (e.g. as NaN rows); otherwise
          # the freshly fetched rows are silently dropped. Confirm against
          # CacheData, or consider combine_first/concat.
          data.update(data_db.fillna(value=-9999))
          # The index is already unique here (update() adds no rows); this mainly
          # sorts the result by time.
          data = data.loc[lambda dt:~dt.index.duplicated(keep='last'),:].sort_index()
      else:
          # Cache fully covers the request.
          print('Data Source : CACHE')
          data = cache_data.need_cache_data
      # NOTE(review): indentation was lost in the copy this was reconstructed
      # from; this call is assumed to run after every branch -- confirm.
      cache_data.update_cache(data)
      print(f'URL : {url}')
      print(f'Int : {int_time}')
      summary_dataframe(data,'点位加载数据')
      return data
  def get_data_from_db(
      points_id : list,
      time_start: datetime,
      time_end : datetime,
      int_time : str,
      url : str
  ) -> pd.DataFrame:
      """Query the data portal for ``points_id`` over [time_start, time_end].

      The PointReader method used depends on ``int_time``:
        * 'M'      -> ``read_interval()``
        * 'H', 'D' -> ``read_int_interval(freq=int_time)``
        * 'S'      -> ``read_current()``

      Raises:
          Exception: 'WRONG PARAM' for any other ``int_time``. The reader is
              constructed before this check.
      """
      reader = PointReader(point_ids=points_id, dt_begin=time_start, dt_end=time_end, url=url)
      if int_time == 'M':
          return reader.read_interval()
      if int_time in ('H', 'D'):
          return reader.read_int_interval(freq=int_time)
      if int_time == 'S':
          return reader.read_current()
      raise Exception('WRONG PARAM')
  def _get_now():
      """Return the reference "now" for default windows, stepped back one minute."""
      if 'JOB_SCHEDULE_CALC_TIME_NOW' in os.environ:
          # Scheduled job: take the current time from the scheduler.
          base = parse_str_time(os.environ.get('JOB_SCHEDULE_CALC_TIME_NOW'))
      else:
          base = datetime.now()
      # Data for the current minute may be missing during its first few seconds,
      # so step back one minute.
      return base - timedelta(minutes=1)


  def _get_configured_window(config_info, set_time_start, set_time_end, now):
      """Return (time_type, start, end) as requested by the component itself."""
      if isinstance(set_time_start, datetime) and isinstance(set_time_end, datetime):
          # Both bounds supplied as component inputs: use them verbatim.
          return '组件输入', set_time_start, set_time_end
      # Otherwise derive the window from the component configuration.
      raw_end = config_info.get_property('time_end', None)
      time_end = parse_str_time(raw_end) if raw_end is not None else now
      # time_end_shift is in minutes and moves the end time earlier.
      time_end -= timedelta(minutes=config_info.get_property('time_end_shift', 0))
      start_abs = config_info.get_property('time_start1', None)
      if start_abs is not None:
          # Absolute start time string.
          return '组件配置', parse_str_time(start_abs), time_end
      start_minutes = config_info.get_property('time_start2', None)
      if start_minutes is not None:
          # Look-back in minutes from the (shifted) end time.
          return '组件配置', time_end - timedelta(minutes=start_minutes), time_end
      # No start configured: single-instant window at the end time.
      return '组件配置', time_end, time_end


  def _apply_environment(time_type, config_start, config_end, now):
      """Override the configured window based on the runtime environment.

      Returns (time_type prefixed with the environment label, start, end).
      """
      if 'SIMULATOR_JOB_ID' in os.environ:
          # Simulator: always a single instant at "now".
          return '仿真环境:' + time_type, now, now
      if 'MONITOR_TYPE' in os.environ:
          # Monitoring: [MONITOR_BEGIN - 1 hour, MONITOR_END].
          # NOTE(review): a missing MONITOR_BEGIN/MONITOR_END reaches strptime as
          # None and raises TypeError.
          monitor_begin = os.environ.get('MONITOR_BEGIN', None)
          monitor_end = os.environ.get('MONITOR_END', None)
          time_end = parse_str_time(monitor_end)
          time_start = parse_str_time(monitor_begin) - timedelta(hours=1)
          return '监控环境:' + time_type, time_start, time_end
      if 'OPTIM_CALC_LOG_ID' in os.environ:
          # Optimization back-calculation: the 10 minutes ending at OPTIM_TIME.
          time_end = parse_str_time(os.environ['OPTIM_TIME'])
          time_start = time_end - timedelta(minutes=10)
          print(f"os.environ['OPTIM_TIME']: {os.environ['OPTIM_TIME']}")
          return '优化回算:' + time_type, time_start, time_end
      # Real-time optimization, scheduled job and debug (WORKFLOW_DEBUG) all keep
      # the configured window; only the label differs.
      if 'OPTIM_JOB_ID' in os.environ:
          label = '实时优化:'
      elif 'JOB_SCHEDULE_CALC_TIME_NOW' in os.environ:
          label = '任务计划:'
      else:
          label = '调试环境:'
      return label + time_type, config_start, config_end


  def _align_to_interval(time_start, time_end, int_time):
      """Align the window to ``int_time`` and truncate both bounds to whole minutes."""
      if int_time == 'H':
          time_start = time_start.replace(minute=0)
          time_end = time_end.replace(minute=0)
      elif int_time == 'D':
          time_start = time_start.replace(hour=0, minute=0)
          time_end = time_end.replace(hour=0, minute=0)
      elif int_time == 'S':
          # Real-time values: both bounds are the current wall-clock minute.
          # Read the clock once so start and end cannot straddle a minute boundary.
          time_start = time_end = datetime.now()
      time_start = time_start.replace(second=0, microsecond=0)
      time_end = time_end.replace(second=0, microsecond=0)
      return time_start, time_end


  def get_time(
      config_info:ConfigInfo,
      set_time_start,
      set_time_end,
      int_time:str
  ) -> tuple:
      """Resolve the (time_start, time_end) window to load data for.

      The window is resolved in four steps:
        1. Reference "now": JOB_SCHEDULE_CALC_TIME_NOW if set, else the wall
           clock, minus one minute.
        2. Configured window: ``set_time_start``/``set_time_end`` when both are
           datetimes. Otherwise it comes from the config properties
           ``time_end`` / ``time_end_shift`` (minutes) / ``time_start1``
           (absolute string) / ``time_start2`` (minutes before the end).
        3. Environment overrides: SIMULATOR_JOB_ID, MONITOR_TYPE and
           OPTIM_CALC_LOG_ID replace the window. OPTIM_JOB_ID,
           JOB_SCHEDULE_CALC_TIME_NOW and debug keep the configured window.
        4. Alignment: 'H' to the hour, 'D' to midnight, 'S' to the current
           minute, then seconds/microseconds are zeroed in every mode.

      The resolved type and window are printed.

      Returns:
          (time_start, time_end) as datetimes.
      """
      now = _get_now()
      time_type, config_start, config_end = _get_configured_window(
          config_info, set_time_start, set_time_end, now)
      time_type, time_start, time_end = _apply_environment(
          time_type, config_start, config_end, now)
      time_start, time_end = _align_to_interval(time_start, time_end, int_time)
      print(f'time_type : {time_type}')
      print(f'time_start : {time_start}')
      print(f'time_end : {time_end}')
      return time_start,time_end