| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148 |
- import os
- from datetime import datetime,timedelta
- import pandas as pd
- from workflow_utils import send_back_monitor_data
- from workflow_utils import get_model_version_file_v2
- from .config_reader import ConfigReader
- from ...tools.data_loader import DataLoader
- from ..._utils.data_summary import print_dataframe,summary_dataframe
- from ...model._base._update_utils import update
- from ..DHU.optimize import load_model
# Reference time for the monitoring window: the wall clock at import time,
# truncated to the whole hour.
NOW = datetime.now().replace(microsecond=0, second=0, minute=0)

# Directory that contains this file; backslashes are replaced by forward
# slashes so the derived paths look the same on every platform.
PATH = os.path.dirname(os.path.realpath(__file__)).replace('\\', '/')

# Model function script that sits next to this file.
MODEL_FUNC_PATH = PATH + '/model_func.py'

# Pickle file used by init_monitor, relative to the current working directory.
MODEL_FILE_PATH = './model.pkl'
def monitor(*inputs, config=None):
    """Push predicted and measured values of every configured unit to the monitor.

    For each equipment name exposed by the ``ConfigReader`` built from
    ``DHU配置.xlsx`` the function loads the unit's recent data (``load_data``),
    loads its model, runs ``predict_system`` and sends one record per
    monitored (class ``'B'``) point and timestamp through
    ``send_back_monitor_data``.

    Args:
        *inputs: Accepted for call compatibility; not used by this function.
        config: Optional dict. When it contains the key ``'__LOCAL'``, that
            value is used as the base directory and ``config['__URL']`` as the
            data endpoint; otherwise the built-in defaults are used.

    Raises:
        Exception: With message ``'推送失败'`` when ``send_back_monitor_data``
            returns a falsy value. Any exception raised while processing a
            unit is printed and re-raised, so the run stops at the first
            failing unit.
    """
    config = {} if config is None else config

    if '__LOCAL' in config:
        config_reader_path = config['__LOCAL']
        data_URL = config['__URL']
    else:
        config_reader_path = '/mnt/workflow_data'
        data_URL = 'http://basedataportal-svc:8080/data/getpointsdata'

    config_reader = ConfigReader(path=f'{config_reader_path}/DHU配置.xlsx')

    # The short names are not needed here; zip() is kept so the iteration
    # still stops at the shorter of the two lists.
    for each_equp_name, _short_name in zip(
        config_reader.all_equp_names,
        config_reader.all_equp_names_short,
    ):
        try:
            each_equp_type = config_reader.get_equp_info(
                each_equp_name, key='设备类型', info_type='str')
            data = load_data(
                each_equp_name, config_reader, config_reader_path, data_URL)
            model = load_model(
                each_equp_name, each_equp_type, config_reader,
                config_reader_path, False)

            # NOTE(review): the original indentation was lost; only the
            # find_F_air_val_rw call is assumed to be conditional, while the
            # prediction runs for every equipment type - confirm.
            if each_equp_type in ['DHU_A', 'DHU_B']:
                model.find_F_air_val_rw(data, data, plot=False)
            predict = model.predict_system(data)

            # After predicting, every column of the rows whose State is 0 is
            # overwritten with 0, so the reported actual values are 0 there.
            # NOTE(review): presumably State == 0 means the unit is off - confirm.
            data.loc[data.State.values == 0, :] = 0

            need_init = config_reader.get_app_info(
                each_equp_name, '模型监控', '初始化监控', 'bool')
            if need_init:
                init_monitor(each_equp_name, config_reader, predict, model)

            _, model_version_id = get_model_version_file_v2(
                model_id=config_reader.get_equp_info(
                    each_equp_name, '模型编号', 'int'),
                filename='model_func.py',
            )

            # The timestamps are identical for every monitored point, so they
            # are formatted once instead of once per point.
            timestamps = [
                ts.to_pydatetime().strftime("%Y-%m-%d %H:%M:%S")
                for ts in data.index
            ]

            push_list = []
            for monitor_point_name in config_reader.get_equp_point(each_equp_name, 'B'):
                if monitor_point_name not in predict.columns:
                    continue
                pred_value = predict.loc[:, monitor_point_name].values
                real_value = data.loc[:, monitor_point_name].values

                # Index-based access (rather than zip) keeps a length mismatch
                # between prediction and data loud instead of truncating.
                push_list.append({
                    'md_version_id': model_version_id,
                    'ports_out': [
                        {
                            'point_id': monitor_point_name,
                            'ts': ts,
                            'wf_value': float(pred_value[idx]),
                            'ac_value': float(real_value[idx]),
                        }
                        for idx, ts in enumerate(timestamps)
                    ],
                })

            success = send_back_monitor_data(push_list)
            if success:
                print('推送成功')
            else:
                # Dump the payload so a failed push can be inspected.
                print('push_list', push_list)
                raise Exception('推送失败')

        except Exception as e:
            # Surface the error in the job output, then let it propagate.
            print(e)
            raise
def load_data(
    each_eaup_name,
    config_reader,
    config_reader_path,
    data_URL,
):
    """Download and return the data of one unit for the hour ending at ``NOW``.

    Args:
        each_eaup_name: Equipment name used for the point lookup, the
            download and the summary title.
        config_reader: Object providing ``get_equp_point``; queried for the
            unit's class ``'A'`` and ``'B'`` points.
        config_reader_path: Base directory; data is cached under
            ``<config_reader_path>/data/monitor/data_cur/``.
        data_URL: Endpoint passed to the loader's download step.

    Returns:
        Whatever ``DataLoader.get_equp_data`` returns for the unit
        (presumably a pandas DataFrame - confirm against ``DataLoader``).
    """
    points = config_reader.get_equp_point(
        each_eaup_name, equp_class=['A', 'B'])

    # Fixed window: the 60 minutes that end at the module-level NOW.
    loader = DataLoader(
        path=f'{config_reader_path}/data/monitor/data_cur/',
        start_time=NOW - timedelta(minutes=60),
        end_time=NOW,
        print_process=False,
    )
    downloaded = loader.download_equp_data(
        equp_name=each_eaup_name,
        point=points,
        url=data_URL,
        clean_cache=True,
    )
    data = downloaded.get_equp_data(equp_name=each_eaup_name)

    summary_dataframe(data, f'{each_eaup_name}除湿机数据')
    return data
def init_monitor(
    each_eaup_name,
    config_reader:ConfigReader,
    predict,
    model
):
    """Write an empty model pickle and register default monitor settings.

    An empty dict is pickled to ``MODEL_FILE_PATH`` first. Then, for every
    class ``'B'`` point of the unit that is both a column of ``predict`` and
    listed in ``model.model_observe_data_columns``, an entry with fixed
    metric and threshold values is collected, and the collection is passed
    to ``update`` with ``update_method='update'``.

    Args:
        each_eaup_name: Equipment name used for all config lookups.
        config_reader: Source of the unit's points, point info and model id.
        predict: Prediction result; only its ``columns`` are read.
        model: Loaded model; only ``model_observe_data_columns`` is read.
    """
    pd.to_pickle({}, MODEL_FILE_PATH)

    model_info = {}
    for point in config_reader.get_equp_point(each_eaup_name, 'B'):
        # Only points that are both predicted and observed by the model.
        is_monitored = (
            point in predict.columns
            and point in model.model_observe_data_columns
        )
        if not is_monitored:
            continue

        info = config_reader.get_point_info(each_eaup_name, point)
        part = info["部件"]
        label = info["名称"]
        model_info[point] = {
            'metric'     : {'MAE': 10, 'MAPE': 1},
            'point_id'   : point,
            'point_class': point,
            'point_name' : f'{part}_{label}',
            'thre_mae'   : 10,
            'thre_mape'  : 1,
            'thre_days'  : 1,
        }

    # NOTE(review): the original indentation was lost; update() is assumed to
    # run once after all points have been collected - confirm.
    model_id = config_reader.get_equp_info(each_eaup_name, '模型编号', 'str')
    update(
        version_id=1,
        model_id=model_id,
        model_info=model_info,
        update_method='update',
        MODEL_FUNC_PATH=MODEL_FUNC_PATH,
        MODEL_FILE_PATH=MODEL_FILE_PATH,
    )
|