monitor.py 5.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148
  1. import os
  2. from datetime import datetime,timedelta
  3. import pandas as pd
  4. from workflow_utils import send_back_monitor_data
  5. from workflow_utils import get_model_version_file_v2
  6. from .config_reader import ConfigReader
  7. from ...tools.data_loader import DataLoader
  8. from ..._utils.data_summary import print_dataframe,summary_dataframe
  9. from ...model._base._update_utils import update
  10. from ..DHU.optimize import load_model
  11. NOW = datetime.now().replace(minute=0,second=0,microsecond=0)
  12. PATH = os.path.dirname(os.path.realpath(__file__)).replace('\\','/')
  13. MODEL_FUNC_PATH = f'{PATH}/model_func.py'
  14. MODEL_FILE_PATH = f'./model.pkl'
  15. def monitor(*inputs,config=None):
  16. config = {} if config is None else config
  17. if '__LOCAL' in config.keys():
  18. config_reader_path = config['__LOCAL']
  19. data_URL = config['__URL']
  20. else:
  21. config_reader_path = '/mnt/workflow_data'
  22. data_URL = 'http://basedataportal-svc:8080/data/getpointsdata'
  23. config_reader = ConfigReader(path=f'{config_reader_path}/DHU配置.xlsx')
  24. for each_eaup_name,each_eaup_name_short in zip(
  25. config_reader.all_equp_names,
  26. config_reader.all_equp_names_short
  27. ):
  28. try:
  29. each_equp_type = config_reader.get_equp_info(each_eaup_name,key='设备类型',info_type='str')
  30. data = load_data(each_eaup_name,config_reader,config_reader_path,data_URL)
  31. model = load_model(each_eaup_name,each_equp_type,config_reader,config_reader_path,False)
  32. if each_equp_type in ['DHU_A','DHU_B']:
  33. model.find_F_air_val_rw(data,data,plot=False)
  34. predict = model.predict_system(data)
  35. data.loc[data.State.values==0,:] = 0
  36. need_init = config_reader.get_app_info(each_eaup_name,'模型监控','初始化监控','bool')
  37. if need_init:
  38. init_monitor(each_eaup_name,config_reader,predict,model)
  39. _,model_version_id = get_model_version_file_v2(
  40. model_id = config_reader.get_equp_info(each_eaup_name,'模型编号','int'),
  41. filename = 'model_func.py'
  42. )
  43. push_list = []
  44. for monitor_point_name in config_reader.get_equp_point(each_eaup_name,'B'):
  45. if monitor_point_name not in predict.columns:
  46. continue
  47. pred_value = predict.loc[:,monitor_point_name].values
  48. real_value = data.loc[:,monitor_point_name].values
  49. index = data.index
  50. index = [ts.to_pydatetime().strftime("%Y-%m-%d %H:%M:%S") for ts in index]
  51. monitor_info = {
  52. 'md_version_id': model_version_id,
  53. 'ports_out' : []
  54. }
  55. for idx,ts in enumerate(index):
  56. monitor_info['ports_out'].append(
  57. {
  58. 'point_id': monitor_point_name,
  59. 'ts' : ts,
  60. 'wf_value': float(pred_value[idx]),
  61. 'ac_value': float(real_value[idx])
  62. }
  63. )
  64. push_list.append(monitor_info)
  65. success = send_back_monitor_data(push_list)
  66. if success:
  67. print('推送成功')
  68. else:
  69. print('push_list',push_list)
  70. raise Exception('推送失败')
  71. except Exception as e:
  72. print(e)
  73. raise
  74. def load_data(
  75. each_eaup_name,
  76. config_reader,
  77. config_reader_path,
  78. data_URL,
  79. ):
  80. data_input_point = config_reader.get_equp_point(
  81. each_eaup_name,equp_class=['A','B'])
  82. data = (
  83. DataLoader(
  84. path = f'{config_reader_path}/data/monitor/data_cur/',
  85. start_time = NOW - timedelta(minutes=60),
  86. end_time = NOW,
  87. print_process = False
  88. )
  89. .download_equp_data(
  90. equp_name = each_eaup_name,
  91. point = data_input_point,
  92. url = data_URL,
  93. clean_cache = True
  94. )
  95. .get_equp_data(
  96. equp_name = each_eaup_name,
  97. )
  98. )
  99. summary_dataframe(data,f'{each_eaup_name}除湿机数据')
  100. return data
  101. def init_monitor(
  102. each_eaup_name,
  103. config_reader:ConfigReader,
  104. predict,
  105. model
  106. ):
  107. pd.to_pickle({},MODEL_FILE_PATH)
  108. model_info = {}
  109. point_monitor = config_reader.get_equp_point(each_eaup_name,'B')
  110. for each_point_name in point_monitor:
  111. if each_point_name not in predict.columns:
  112. continue
  113. if each_point_name not in model.model_observe_data_columns:
  114. continue
  115. each_point_info = config_reader.get_point_info(each_eaup_name,each_point_name)
  116. model_info[each_point_name] = {
  117. 'metric' : {'MAE':10,'MAPE':1},
  118. 'point_id' : each_point_name,
  119. 'point_class': each_point_name,
  120. 'point_name' : f'{each_point_info["部件"]}_{each_point_info["名称"]}',
  121. 'thre_mae' : 10,
  122. 'thre_mape' : 1,
  123. 'thre_days' : 1,
  124. }
  125. update(
  126. version_id = 1,
  127. model_id = config_reader.get_equp_info(each_eaup_name,'模型编号','str'),
  128. model_info = model_info,
  129. update_method = 'update',
  130. MODEL_FUNC_PATH = MODEL_FUNC_PATH,
  131. MODEL_FILE_PATH = MODEL_FILE_PATH,
  132. )