"""Real-time optimization driver for DHU units.

For every unit enabled in the Excel configuration the driver loads a model,
reads recent room and unit data, runs the model's optimizer under a
supply-air dew-point constraint and pushes (or prints) the recommended
regeneration heating temperatures.
"""
import os
from datetime import datetime, timedelta
from pathlib import Path
from pprint import pprint

import pandas as pd

from ...model.DHU.DHU_AB import DHU_AB
from .config_reader import ConfigReader
from ...tools.data_loader import DataLoader

# End of every data window. Truncated to the minute and evaluated once, when
# the module is imported (not on each call to optimize()).
NOW = datetime.now().replace(second=0, microsecond=0)


def optimize(*inputs, config=None):
    """Run one optimization pass over all configured units.

    For each unit whose '开始实时优化' flag is set, the steps are: load the
    model, load room data and choose the operating mode, load the unit's own
    data, optimize, push the result. If a step raises, the exception is stored
    under ``ALL_RESULT['EXCEPTION'][<step>][<unit>]`` and the loop continues
    with the next unit. The collected result is pretty-printed at the end.

    Args:
        *inputs: Accepted but not used by this function.
        config: Optional dict. If it contains ``'__LOCAL'``, that value is
            used as the base directory and ``config['__URL']`` as the data
            endpoint; otherwise built-in defaults are used.

    Returns:
        None.
    """
    config = {} if config is None else config

    if '__LOCAL' in config:
        config_reader_path = config['__LOCAL']
        data_URL = config['__URL']
    else:
        config_reader_path = '/mnt/workflow_data'
        data_URL = 'http://basedataportal-svc:8080/data/getpointsdata'

    config_reader = ConfigReader(path=f'{config_reader_path}/DHU配置.xlsx')

    ALL_RESULT = {
        # Per-step exceptions, keyed by unit name.
        'EXCEPTION': {
            'Mod': {},
            'Data_Room': {},
            'Data_ATD': {},
            'Opt': {},
            'Push': {},
        },
        # Unit names grouped by the operating mode chosen for them.
        'STATUS': {
            'Mode_Steady': [],
            'Mode_Low': [],
            'Mode_stop': [],
        },
    }

    for each_eaup_name, each_eaup_name_short in zip(
        config_reader.all_equp_names,
        config_reader.all_equp_names_short
    ):
        # Outline of the per-unit workflow:
        #   - load the model
        #   - load the data
        #   - decide whether to run
        #   - steady-state check: is the room dew-point feedback close to
        #     its setpoint?
        #   - model adjustment
        #   - model check: does the model accuracy meet the requirement?
        #   - mode decision:
        #       1. optimize around the current dew point: based on the room
        #          dew-point setpoint minus an offset
        #       2. quickly raise the supply-air dew point: constrain the
        #          supply-air dew point to stay unchanged
        if not config_reader.get_app_info(each_eaup_name, '实时优化', '开始实时优化', 'bool'):
            continue

        each_equp_type = config_reader.get_equp_info(each_eaup_name, key='设备类型', info_type='str')

        # Load the model.
        try:
            MODEL = load_model(
                each_eaup_name=each_eaup_name,
                config_reader=config_reader,
                config_reader_path=config_reader_path
            )
        except Exception as e:
            ALL_RESULT['EXCEPTION']['Mod'][each_eaup_name] = e
            continue

        # Load room data and derive the dew-point constraint for this run.
        try:
            data_room = load_data_room(
                each_eaup_name=each_eaup_name,
                each_equp_type=each_equp_type,
                config_reader=config_reader,
                config_reader_path=config_reader_path,
                data_URL=data_URL
            )
            if data_room['is_room_Dew_steady']:
                # NOTE(review): the constraint string is interpreted by
                # MODEL.optimize; presumably '[coil_3_DoutA]' stands for the
                # current measured value - confirm against DHU_AB.
                DewOut_constrain = 'coil_3_DoutA-[coil_3_DoutA]<0'
                ALL_RESULT['STATUS']['Mode_Steady'].append(each_eaup_name)
            elif data_room['is_room_Dew_low']:
                room_Dew_SP = data_room['room_Dew_SP']
                # TODO: the fixed 4-degree margin is crude and should be improved.
                DewOut_constrain = f'coil_3_DoutA-{room_Dew_SP-4}<0'
                ALL_RESULT['STATUS']['Mode_Low'].append(each_eaup_name)
            else:
                # Neither steady nor low: do not optimize this unit.
                ALL_RESULT['STATUS']['Mode_stop'].append(each_eaup_name)
                continue
        except Exception as e:
            # Must go under 'EXCEPTION': 'STATUS' has no 'Data_Room' entry, so
            # writing there would raise KeyError from inside the handler and
            # abort the whole loop.
            ALL_RESULT['EXCEPTION']['Data_Room'][each_eaup_name] = e
            continue

        # Load the unit's own operating data.
        try:
            data_dhu = load_data_dhu(
                each_eaup_name=each_eaup_name,
                each_equp_type=each_equp_type,
                config_reader=config_reader,
                config_reader_path=config_reader_path,
                data_URL=data_URL
            )
        except Exception as e:
            ALL_RESULT['EXCEPTION']['Data_ATD'][each_eaup_name] = e
            continue

        # Optimize.
        try:
            opt_result = optimize_dhu(
                MODEL=MODEL,
                data_cur=data_dhu,
                DewOut_constrain=DewOut_constrain,
                config_reader=config_reader,
                each_eaup_name=each_eaup_name,
                each_equp_type=each_equp_type
            )
        except Exception as e:
            ALL_RESULT['EXCEPTION']['Opt'][each_eaup_name] = e
            continue

        # Push (or print) the recommendation.
        try:
            push_result(
                each_eaup_name=each_eaup_name,
                each_eaup_name_short=each_eaup_name_short,
                each_equp_type=each_equp_type,
                config_reader=config_reader,
                opt_result=opt_result,
                data_cur=data_dhu
            )
        except Exception as e:
            ALL_RESULT['EXCEPTION']['Push'][each_eaup_name] = e
            continue

    pprint(ALL_RESULT)


def load_model(
    each_eaup_name,
    config_reader: ConfigReader,
    config_reader_path
):
    """Load the DHU_AB model to use for a unit.

    If the unit's '替代模型' (substitute model) setting names an entry of
    ``config_reader.all_equp_names_total``, that unit's settings and model are
    used instead of the unit's own. The model is then loaded either from a
    local pickle (when '使用临时模型' is set) or from the platform by its
    '模型编号' id.

    Args:
        each_eaup_name: Name of the unit being optimized.
        config_reader: Configuration reader.
        config_reader_path: Base directory containing the ``model`` folder.

    Returns:
        The loaded model object.
    """
    adjust_equp_name = config_reader.get_equp_info(each_eaup_name, '替代模型', 'str')
    print('abc', adjust_equp_name)  # debug trace of the configured substitute

    if adjust_equp_name in config_reader.all_equp_names_total:
        print(f'{each_eaup_name}使用替代模型{adjust_equp_name}')
        # Everything below is looked up under the substitute unit's name.
        each_eaup_name = adjust_equp_name

    use_temp_model = config_reader.get_app_info(
        each_eaup_name,
        app_type='实时优化',
        key='使用临时模型',
        info_type='bool'
    )
    if use_temp_model:
        # Take the temporary model from the local model folder.
        MODEL = DHU_AB.load(path=f'{config_reader_path}/model/{each_eaup_name}.pkl')
    else:
        MODEL = DHU_AB.load_from_platform(
            source='id',
            model_id=config_reader.get_equp_info(
                equp_name=each_eaup_name,
                key='模型编号',
                info_type='str'
            )
        )
    return MODEL


def load_data_room(
    each_eaup_name,
    each_equp_type,
    config_reader,
    config_reader_path,
    data_URL
):
    """Download recent room data and classify the room dew-point state.

    The window ends at ``NOW`` and spans '房间稳态判断时长' minutes. The
    effective setpoint is the window mean of ``room_DSP`` plus the configured
    '房间露点设定值偏差'; the feedback is the window mean of ``room_DPV``.

    Args:
        each_eaup_name: Name of the unit.
        each_equp_type: Equipment type, used to select the class 'C' points.
        config_reader: Configuration reader.
        config_reader_path: Base directory; data is cached under
            ``data/optimize/data_cur/`` inside it.
        data_URL: Endpoint the data loader downloads from.

    Returns:
        dict with keys ``'is_room_Dew_steady'`` (feedback strictly within
        0.5 of the effective setpoint), ``'is_room_Dew_low'`` (feedback minus
        setpoint below '房间露点过低阈值') and ``'room_Dew_SP'``.
    """
    room_steady_len = config_reader.get_app_info(each_eaup_name, '实时优化', '房间稳态判断时长', 'float')

    room_data_loader = DataLoader(
        path=f'{config_reader_path}/data/optimize/data_cur/',
        start_time=NOW - timedelta(minutes=room_steady_len),
        end_time=NOW
    )
    room_data_loader.download_equp_data(
        equp_name=each_eaup_name,
        point=config_reader.get_equp_point(each_eaup_name, each_equp_type, equp_class=['C']),
        url=data_URL,
        clean_cache=True
    )
    room_data = room_data_loader.get_equp_data(each_eaup_name)

    room_Dew_SP_adj = config_reader.get_app_info(each_eaup_name, '实时优化', '房间露点设定值偏差', 'float')
    room_Dew_SP = room_data.room_DSP.mean() + room_Dew_SP_adj
    room_Dew_PV = room_data.room_DPV.mean()
    room_Dew_diff_dwlim = config_reader.get_app_info(each_eaup_name, '实时优化', '房间露点过低阈值', 'float')

    # Mode decision.
    # Steady means the feedback lies inside the +/-0.5 band around the
    # setpoint, so BOTH bounds must hold. Joining the two bounds with `or`
    # would be true for every value and make the other modes unreachable.
    is_room_Dew_steady = (room_Dew_PV < room_Dew_SP + 0.5) and (room_Dew_PV > room_Dew_SP - 0.5)
    is_room_Dew_low = room_Dew_PV - room_Dew_SP < room_Dew_diff_dwlim

    data_room = {
        'is_room_Dew_steady': is_room_Dew_steady,
        'is_room_Dew_low': is_room_Dew_low,
        'room_Dew_SP': room_Dew_SP
    }
    return data_room


def load_data_dhu(
    each_eaup_name,
    each_equp_type,
    config_reader,
    config_reader_path,
    data_URL
):
    """Download the unit's recent operating data and average it to one row.

    The window ends at ``NOW`` and spans '除湿机工况均值时长' minutes; the
    class 'A' and 'B' points of the unit are downloaded.

    Args:
        each_eaup_name: Name of the unit.
        each_equp_type: Equipment type, used to select the points.
        config_reader: Configuration reader.
        config_reader_path: Base directory; data is cached under
            ``data/optimize/data_cur/`` inside it.
        data_URL: Endpoint the data loader downloads from.

    Returns:
        A single-row DataFrame holding the column-wise mean of the window.
    """
    dhu_steady_len = config_reader.get_app_info(each_eaup_name, '实时优化', '除湿机工况均值时长', 'float')
    data_input_point = config_reader.get_equp_point(each_eaup_name, each_equp_type, equp_class=['A', 'B'])

    data_cur = (
        DataLoader(
            path=f'{config_reader_path}/data/optimize/data_cur/',
            start_time=NOW - timedelta(minutes=dhu_steady_len),
            end_time=NOW
        )
        .download_equp_data(
            equp_name=each_eaup_name,
            point=data_input_point,
            url=data_URL,
            clean_cache=True
        )
        .get_equp_data(
            equp_name=each_eaup_name,
        )
        .mean(axis=0)   # column-wise mean -> Series
        .to_frame().T   # back to a one-row frame
    )
    return data_cur


def optimize_dhu(
    MODEL,
    data_cur,
    DewOut_constrain,
    config_reader,
    each_eaup_name,
    each_equp_type
):
    """Print a prediction-vs-measurement table and run the model optimizer.

    Args:
        MODEL: Model object providing ``predict_system`` and ``optimize``.
        data_cur: One-row DataFrame of current operating data.
        DewOut_constrain: Constraint string passed to the optimizer.
        config_reader: Configuration reader supplying the lower/upper limits
            of the front and rear regeneration coil temperatures.
        each_eaup_name: Name of the unit.
        each_equp_type: Equipment type (not used by this function).

    Returns:
        dict with the optimizer's ``'opt_summary'`` and ``'opt_var'``.
    """
    # Model accuracy check: shown for inspection only, it does not gate the
    # optimization below.
    predict = MODEL.predict_system(input_data=data_cur)
    TVP_data = (
        predict.T.set_axis(['pred'], axis=1)
        .join(
            data_cur.T.set_axis(['real'], axis=1),
            how='left'
        )
        .dropna(axis=0)
        .assign(diff=lambda x: x['pred'] - x['real'])
    )
    print(TVP_data)

    # Real-time optimization.
    constrains = [DewOut_constrain]
    opt_res = MODEL.optimize(
        cur_input_data=data_cur,
        wheel_1_TinR=(
            config_reader.get_app_info(each_eaup_name, '实时优化', '前再生盘管温度下限', 'float'),
            config_reader.get_app_info(each_eaup_name, '实时优化', '前再生盘管温度上限', 'float')
        ),
        wheel_2_TinR=(
            config_reader.get_app_info(each_eaup_name, '实时优化', '后再生盘管温度下限', 'float'),
            config_reader.get_app_info(each_eaup_name, '实时优化', '后再生盘管温度上限', 'float')
        ),
        fan_2_Hz=None,
        constrains=constrains,
        logging=False,
        target='summary_waste',
        target_min=True
    )

    opt_summary = opt_res['opt_summary']
    opt_var = opt_res['opt_var']
    print(opt_summary)
    print(opt_var)

    return {
        'opt_summary': opt_summary,
        'opt_var': opt_var,
    }


def push_result(
    each_eaup_name,
    each_eaup_name_short,
    each_equp_type,
    config_reader: ConfigReader,
    opt_result: dict,
    data_cur: pd.DataFrame
):
    """Build the recommendation payload and push it, or print it.

    Two recommendations are built (front and rear regeneration heating
    temperature), each carrying the raw optimized value plus one child with
    the value rounded to one decimal, the current value and the target point
    id. The payload is sent only when the unit's '推送策略' flag is set;
    otherwise it is pretty-printed.

    Args:
        each_eaup_name: Unit name, used as prefix of the point ids.
        each_eaup_name_short: Short unit name, sent as ``code``.
        each_equp_type: Equipment type; only 'DHU_A' and 'DHU_B' are handled.
        config_reader: Configuration reader.
        opt_result: Result of ``optimize_dhu``; ``opt_result['opt_var'][0]``
            and ``[1]`` are read with ``.iat[0, 0]``.
        data_cur: One-row DataFrame of current operating data.

    Raises:
        Exception: ``'MODEL_TYPE_ERROR'`` for any other equipment type.
    """
    equp_point = config_reader.get_equp_point(each_eaup_name, each_equp_type, equp_class=['D'])

    if each_equp_type in ['DHU_A', 'DHU_B']:
        rcmd = [
            {
                'name': '前再生加热温度',
                'rcmd_value': opt_result['opt_var'][0].iat[0, 0],
                'children': [
                    {
                        'name': '前再生加热温度',
                        'rcmd_value': opt_result['opt_var'][0].round(1).iat[0, 0],
                        'curr_value': data_cur.loc[:, ['wheel_1_TinR']].round(1).iat[0, 0],
                        'point_id': f'{each_eaup_name}_{equp_point["wheel_1_TinR_AISP"]}'
                    }
                ]
            },
            {
                'name': '后再生加热温度',
                'rcmd_value': opt_result['opt_var'][1].iat[0, 0],
                'children': [
                    {
                        'name': '后再生加热温度',
                        'rcmd_value': opt_result['opt_var'][1].round(1).iat[0, 0],
                        'curr_value': data_cur.loc[:, ['wheel_2_TinR']].round(1).iat[0, 0],
                        'point_id': f'{each_eaup_name}_{equp_point["wheel_2_TinR_AISP"]}'
                    }
                ]
            },
        ]
    else:
        raise Exception('MODEL_TYPE_ERROR')

    if config_reader.get_app_info(each_eaup_name, '实时优化', '推送策略', 'bool'):
        add_ai_rcmd_operation(
            code=each_eaup_name_short,
            job_id=os.environ.get('JOB_ID', None),
            rcmd=rcmd,
            custom_details=[]
        )
    else:
        pprint(rcmd)


def add_ai_rcmd_operation(code, job_id, rcmd, custom_details):
    """POST a recommendation to the AI recommendation service.

    The call is best-effort: any exception raised while posting or decoding
    the response is printed and not propagated.

    Args:
        code: Unit code placed in the payload.
        job_id: Job identifier placed in the payload (may be None).
        rcmd: List of recommendation entries.
        custom_details: Extra details placed next to ``rcmd``.

    Returns:
        None.
    """
    # Imported here so the module can be imported without workflowlib.
    from workflowlib import requests

    json_info = {
        "job_id": job_id,
        "code": code,
        "data": {
            "rcmd": rcmd,
            "custom_details": custom_details
        },
    }
    print('ai_rcmd', json_info)

    try:
        # Issue the step-by-step instruction.
        url = 'http://m2-backend-svc:8000/api/ai/ai_rcmd_operation/add_ai_rcmd_operation_v3'
        r = requests.post(url, json=json_info, headers={'Content-Type': "application/json", "Connection": "close"})
        jsonResp = r.json()
        print('下发指令 jsonResp', jsonResp)
    except Exception as e:
        print(e)