import os from datetime import datetime,timedelta import pandas as pd from workflow_utils import send_back_monitor_data from workflow_utils import get_model_version_file_v2 from .config_reader import ConfigReader from ...tools.data_loader import DataLoader from ..._utils.data_summary import print_dataframe,summary_dataframe from ...model._base._update_utils import update from ..DHU.optimize import load_model NOW = datetime.now().replace(minute=0,second=0,microsecond=0) PATH = os.path.dirname(os.path.realpath(__file__)).replace('\\','/') MODEL_FUNC_PATH = f'{PATH}/model_func.py' MODEL_FILE_PATH = f'./model.pkl' def monitor(*inputs,config=None): config = {} if config is None else config if '__LOCAL' in config.keys(): config_reader_path = config['__LOCAL'] data_URL = config['__URL'] else: config_reader_path = '/mnt/workflow_data' data_URL = 'http://basedataportal-svc:8080/data/getpointsdata' config_reader = ConfigReader(path=f'{config_reader_path}/DHU配置.xlsx') for each_eaup_name,each_eaup_name_short in zip( config_reader.all_equp_names, config_reader.all_equp_names_short ): try: each_equp_type = config_reader.get_equp_info(each_eaup_name,key='设备类型',info_type='str') data = load_data(each_eaup_name,config_reader,config_reader_path,data_URL) model = load_model(each_eaup_name,each_equp_type,config_reader,config_reader_path,False) if each_equp_type in ['DHU_A','DHU_B']: model.find_F_air_val_rw(data,data,plot=False) predict = model.predict_system(data) data.loc[data.State.values==0,:] = 0 need_init = config_reader.get_app_info(each_eaup_name,'模型监控','初始化监控','bool') if need_init: init_monitor(each_eaup_name,config_reader,predict,model) _,model_version_id = get_model_version_file_v2( model_id = config_reader.get_equp_info(each_eaup_name,'模型编号','int'), filename = 'model_func.py' ) push_list = [] for monitor_point_name in config_reader.get_equp_point(each_eaup_name,'B'): if monitor_point_name not in predict.columns: continue pred_value = predict.loc[:,monitor_point_name].values real_value = data.loc[:,monitor_point_name].values index = data.index index = [ts.to_pydatetime().strftime("%Y-%m-%d %H:%M:%S") for ts in index] monitor_info = { 'md_version_id': model_version_id, 'ports_out' : [] } for idx,ts in enumerate(index): monitor_info['ports_out'].append( { 'point_id': monitor_point_name, 'ts' : ts, 'wf_value': float(pred_value[idx]), 'ac_value': float(real_value[idx]) } ) push_list.append(monitor_info) success = send_back_monitor_data(push_list) if success: print('推送成功') else: print('push_list',push_list) raise Exception('推送失败') except Exception as e: print(e) raise def load_data( each_eaup_name, config_reader, config_reader_path, data_URL, ): data_input_point = config_reader.get_equp_point( each_eaup_name,equp_class=['A','B']) data = ( DataLoader( path = f'{config_reader_path}/data/monitor/data_cur/', start_time = NOW - timedelta(minutes=60), end_time = NOW, print_process = False ) .download_equp_data( equp_name = each_eaup_name, point = data_input_point, url = data_URL, clean_cache = True ) .get_equp_data( equp_name = each_eaup_name, ) ) summary_dataframe(data,f'{each_eaup_name}除湿机数据') return data def init_monitor( each_eaup_name, config_reader:ConfigReader, predict, model ): pd.to_pickle({},MODEL_FILE_PATH) model_info = {} point_monitor = config_reader.get_equp_point(each_eaup_name,'B') for each_point_name in point_monitor: if each_point_name not in predict.columns: continue if each_point_name not in model.model_observe_data_columns: continue each_point_info = config_reader.get_point_info(each_eaup_name,each_point_name) model_info[each_point_name] = { 'metric' : {'MAE':10,'MAPE':1}, 'point_id' : each_point_name, 'point_class': each_point_name, 'point_name' : f'{each_point_info["部件"]}_{each_point_info["名称"]}', 'thre_mae' : 10, 'thre_mape' : 1, 'thre_days' : 1, } update( version_id = 1, model_id = config_reader.get_equp_info(each_eaup_name,'模型编号','str'), model_info = model_info, update_method = 'update', MODEL_FUNC_PATH = MODEL_FUNC_PATH, MODEL_FILE_PATH = MODEL_FILE_PATH, )