"""Real-time optimisation workflow for DHU / SDHU equipment.

For every equipment listed in the Excel configuration the workflow checks the
running state, loads a model and recent point data, runs the model optimiser
and pushes (or prints) the recommended set-points.
"""
import os
from datetime import datetime, timedelta
from pathlib import Path
from pprint import pprint
from typing import Union

import pandas as pd

from ...model.DHU.DHU_AB import DHU_AB
from ...model.DHU.SDHU_AB import SDHU_AB
from .config_reader import ConfigReader
from ...tools.data_loader import DataLoader
from ..._utils.data_summary import print_dataframe, summary_dataframe


def _current_run_time():
    """Return the module-level ``NOW`` set by :func:`optimize`.

    Raises:
        NameError: if ``optimize`` has not assigned ``NOW`` yet.
    """
    return NOW


def optimize(*inputs, config=None):
    """Run the real-time optimisation for every configured equipment.

    Args:
        *inputs: Accepted but not used by this function.
        config: Optional dict. When it contains ``'__LOCAL'`` that value is
            used as the configuration/data root and ``config['__URL']`` as the
            data endpoint; otherwise the built-in defaults are used.

    Returns:
        None. Results are reported through ``pprint`` / ``print_dataframe``.

    Raises:
        Exception: failures while reading the equipment state or while
            optimising are recorded in the result dict and then re-raised,
            which aborts the remaining equipment. Failures while loading the
            model, loading data or pushing are recorded and the equipment is
            skipped.
    """
    # The run time is also published as a module global (kept for backward
    # compatibility with code that reads ``NOW`` implicitly).
    global NOW

    config = {} if config is None else config

    if '__LOCAL' in config:
        config_reader_path = config['__LOCAL']
        data_URL = config['__URL']
    else:
        config_reader_path = '/mnt/workflow_data'
        data_URL = 'http://basedataportal-svc:8080/data/getpointsdata'

    config_reader = ConfigReader(path=f'{config_reader_path}/DHU配置.xlsx')

    ALL_RESULT = {
        'EXCEPTION': {
            'State': {},
            'Mod': {},
            'Data_ATD': {},
            'Opt': {},
            'Push': {},
        },
        'STATUS': {
            'Mode_Steady': [],
            'Mode_Low': [],
            'Mode_Off': [],
        },
    }
    ALL_OPT_RES = []

    for each_eaup_name, each_eaup_name_short in zip(
        config_reader.all_equp_names,
        config_reader.all_equp_names_short,
    ):
        # Original author's outline of the intended pipeline (not every step
        # is implemented in this loop):
        #   load model
        #   load data
        #   running check
        #     steady-state check: is the room dew-point set-point close to
        #     the feedback value
        #   model adjustment
        #     model check: does the model accuracy meet the requirement
        #   mode decision:
        #     1. optimise on the current dew point: room dew-point set-point
        #        minus the deviation
        #     2. fast supply-air dew-point raise: constrain the supply-air
        #        dew point to stay unchanged
        if not config_reader.get_app_info(each_eaup_name, '实时优化', '开始实时优化', 'bool'):
            continue

        each_equp_type = config_reader.get_equp_info(
            each_eaup_name, key='设备类型', info_type='str'
        )
        NOW = config_reader.get_app_info(
            each_eaup_name, '实时优化', '运行时间', 'datetime'
        ).replace(second=0, microsecond=0)
        print(f'{each_eaup_name}开始实时优化,设备类型为{each_equp_type},运行时间为{NOW}')

        # Running-state check: skip equipment that is not running.
        try:
            dhu_State = load_dhu_State(
                each_eaup_name=each_eaup_name,
                config_reader=config_reader,
                config_reader_path=config_reader_path,
                data_URL=data_URL,
                NOW=NOW,
            )
            print(f'{each_eaup_name}设备状态为{dhu_State}')
            if dhu_State < 1:
                print('设备处于非运行状态,跳过')
                ALL_RESULT['STATUS']['Mode_Off'].append(each_eaup_name)
                continue
        except Exception as e:
            ALL_RESULT['EXCEPTION']['State'][each_eaup_name] = e
            # NOTE(review): the original had an unreachable ``continue`` after
            # this raise; the effective behaviour (abort the whole run) is
            # kept. Confirm whether skipping the equipment was intended.
            raise

        # Load the model.
        try:
            MODEL = load_model(
                each_eaup_name=each_eaup_name,
                each_equp_type=each_equp_type,
                config_reader=config_reader,
                config_reader_path=config_reader_path,
            )
        except Exception as e:
            ALL_RESULT['EXCEPTION']['Mod'][each_eaup_name] = e
            continue

        # Load the equipment operating data.
        try:
            data_dhu = load_data_dhu(
                each_eaup_name=each_eaup_name,
                each_equp_type=each_equp_type,
                config_reader=config_reader,
                config_reader_path=config_reader_path,
                data_URL=data_URL,
                NOW=NOW,
            )
        except Exception as e:
            ALL_RESULT['EXCEPTION']['Data_ATD'][each_eaup_name] = e
            continue

        # Optimise.
        try:
            ALL_RESULT['STATUS']['Mode_Steady'].append(each_eaup_name)
            opt_result = optimize_dhu(
                MODEL=MODEL,
                data_cur=data_dhu,
                config_reader=config_reader,
                each_eaup_name=each_eaup_name,
                each_equp_type=each_equp_type,
            )
            ALL_OPT_RES.append(
                opt_result['opt_summary'][1].assign(equp_name=each_eaup_name)
            )
        except Exception as e:
            ALL_RESULT['EXCEPTION']['Opt'][each_eaup_name] = e
            # NOTE(review): same as the state check above - the dead
            # ``continue`` was dropped, the re-raise is preserved.
            raise

        # Push (or print) the recommendation.
        try:
            push_result(
                each_eaup_name=each_eaup_name,
                each_eaup_name_short=each_eaup_name_short,
                each_equp_type=each_equp_type,
                config_reader=config_reader,
                opt_result=opt_result,
                data_cur=data_dhu,
            )
        except Exception as e:
            ALL_RESULT['EXCEPTION']['Push'][each_eaup_name] = e
            continue

    pprint(ALL_RESULT)
    # pd.concat raises ValueError on an empty list, which happens whenever no
    # equipment produced an optimisation result (all disabled, off or failed).
    if ALL_OPT_RES:
        print_dataframe(pd.concat(ALL_OPT_RES, axis=0), '优化结果')


def load_model(
    each_eaup_name,
    each_equp_type,
    config_reader: ConfigReader,
    config_reader_path,
    use_adj_name=True,
):
    """Load the model object for one equipment.

    Args:
        each_eaup_name: Equipment name used for configuration look-ups.
        each_equp_type: One of ``'DHU_A'``, ``'DHU_B'``, ``'SDHU_A'``,
            ``'SDHU_B'``.
        config_reader: Configuration accessor.
        config_reader_path: Root folder that contains ``model/<name>.pkl``.
        use_adj_name: When true and the configured substitute-model name is a
            known equipment, that equipment's model is loaded instead.

    Returns:
        Whatever ``DHU_AB`` / ``SDHU_AB`` ``load`` or ``load_from_platform``
        returns.

    Raises:
        NotImplementedError: for any other equipment type.
    """
    adjust_equp_name = config_reader.get_equp_info(each_eaup_name, '替代模型', 'str')
    if (adjust_equp_name in config_reader.all_equp_names_total) and use_adj_name:
        print(f'{each_eaup_name}使用替代模型{adjust_equp_name}')
        each_eaup_name = adjust_equp_name

    if each_equp_type in ['DHU_A', 'DHU_B']:
        MODEL = DHU_AB
    elif each_equp_type in ['SDHU_A', 'SDHU_B']:
        MODEL = SDHU_AB
    else:
        raise NotImplementedError(f'unsupported equipment type: {each_equp_type!r}')

    use_temporary_model = config_reader.get_app_info(
        each_eaup_name,
        app_type='实时优化',
        key='使用临时模型',
        info_type='bool',
    )
    if use_temporary_model:
        # Take the temporary model from the local model folder.
        return MODEL.load(path=f'{config_reader_path}/model/{each_eaup_name}.pkl')

    return MODEL.load_from_platform(
        source='id',
        model_id=config_reader.get_equp_info(
            equp_name=each_eaup_name,
            key='模型编号',
            info_type='str',
        ),
    )


def load_dhu_State(
    each_eaup_name,
    config_reader,
    config_reader_path,
    data_URL,
    NOW,
):
    """Return the most recent ``State`` point value of one equipment.

    Downloads the ``State`` point (equipment class ``'A'``) for the minute
    ending at ``NOW`` and returns the last row of the first column.

    Raises:
        IndexError: if the downloaded frame is empty (raised by ``.iat``).
    """
    state_point = config_reader.get_equp_point(each_eaup_name, equp_class=['A'])['State']
    data_last = (
        DataLoader(
            path=f'{config_reader_path}/data/optimize/data_cur/',
            start_time=NOW - timedelta(minutes=1),
            end_time=NOW,
            print_process=False,
        )
        .download_equp_data(
            equp_name=each_eaup_name,
            point={'State': state_point},
            url=data_URL,
            clean_cache=True,
        )
        .get_equp_data(
            equp_name=each_eaup_name,
        )
    )
    return data_last.iat[-1, 0]


def load_data_room(
    each_eaup_name,
    each_equp_type,
    config_reader,
    config_reader_path,
    data_URL,
    NOW=None,
):
    """Load room dew-point data and classify the room state.

    Args:
        each_eaup_name: Equipment name.
        each_equp_type: Not used by this function.
        config_reader: Configuration accessor.
        config_reader_path: Root folder for the data cache.
        data_URL: Endpoint the point data is downloaded from.
        NOW: End of the data window. Defaults to the module-level ``NOW``
            set by :func:`optimize`.

    Returns:
        dict with ``is_room_Dew_steady``, ``is_room_Dew_low``,
        ``room_Dew_SP`` and ``room_Dew_PV``.
    """
    if NOW is None:
        NOW = _current_run_time()

    room_steady_len = config_reader.get_app_info(
        each_eaup_name, '实时优化', '房间稳态判断时长', 'float'
    )
    room_data_loader = DataLoader(
        path=f'{config_reader_path}/data/optimize/data_cur/',
        start_time=NOW - timedelta(minutes=room_steady_len),
        end_time=NOW,
        print_process=False,
    )
    room_data_loader.download_equp_data(
        equp_name=each_eaup_name,
        point=config_reader.get_equp_point(each_eaup_name, equp_class=['C']),
        url=data_URL,
        clean_cache=True,
    )
    room_data = room_data_loader.get_equp_data(each_eaup_name)
    summary_dataframe(room_data, f'{each_eaup_name}房间数据')

    room_Dew_SP_adj = config_reader.get_app_info(
        each_eaup_name, '实时优化', '房间露点设定值偏差', 'float'
    )
    # Set-point is the window mean shifted by the configured deviation.
    room_Dew_SP = room_data.room_DSP.mean() + room_Dew_SP_adj
    room_Dew_PV = room_data.room_DPV.mean()
    room_Dew_diff_dwlim = config_reader.get_app_info(
        each_eaup_name, '实时优化', '房间露点过低阈值', 'float'
    )

    # Mode decision: "steady" means the measured value lies strictly within
    # +/-0.5 of the adjusted set-point; "low" means it is below the set-point
    # by more than the configured threshold.
    is_room_Dew_steady = (room_Dew_PV < (room_Dew_SP + 0.5)) and (
        room_Dew_PV > (room_Dew_SP - 0.5)
    )
    is_room_Dew_low = room_Dew_PV < (room_Dew_SP - room_Dew_diff_dwlim)

    return {
        'is_room_Dew_steady': is_room_Dew_steady,
        'is_room_Dew_low': is_room_Dew_low,
        'room_Dew_SP': room_Dew_SP,
        'room_Dew_PV': room_Dew_PV,
    }


def load_data_dhu(
    each_eaup_name,
    each_equp_type,
    config_reader,
    config_reader_path,
    data_URL,
    NOW=None,
):
    """Load recent equipment data and return its column means as one row.

    Args:
        each_eaup_name: Equipment name.
        each_equp_type: Not used by this function.
        config_reader: Configuration accessor.
        config_reader_path: Root folder for the data cache.
        data_URL: Endpoint the point data is downloaded from.
        NOW: End of the data window. Defaults to the module-level ``NOW``
            set by :func:`optimize`.

    Returns:
        A one-row ``DataFrame`` holding the mean of every downloaded column
        (equipment classes ``'A'`` and ``'B'``) over the configured window.
    """
    if NOW is None:
        NOW = _current_run_time()

    dhu_steady_len = config_reader.get_app_info(
        each_eaup_name, '实时优化', '除湿机工况均值时长', 'float'
    )
    data_input_point = config_reader.get_equp_point(each_eaup_name, equp_class=['A', 'B'])
    data_last = (
        DataLoader(
            path=f'{config_reader_path}/data/optimize/data_cur/',
            start_time=NOW - timedelta(minutes=dhu_steady_len),
            end_time=NOW,
            print_process=False,
        )
        .download_equp_data(
            equp_name=each_eaup_name,
            point=data_input_point,
            url=data_URL,
            clean_cache=True,
        )
        .get_equp_data(
            equp_name=each_eaup_name,
        )
    )
    data_cur = data_last.mean(axis=0).to_frame().T
    summary_dataframe(data_last, f'{each_eaup_name}除湿机数据')
    return data_cur


def optimize_dhu(
    MODEL: Union[DHU_AB, SDHU_AB],
    data_cur: pd.DataFrame,
    config_reader: ConfigReader,
    each_eaup_name: str,
    each_equp_type: str,
):
    """Print a prediction-vs-actual table and run the model optimiser.

    Args:
        MODEL: Loaded model object.
        data_cur: One-row frame of current operating data.
        config_reader: Configuration accessor (regeneration-temperature
            limits are read from it).
        each_eaup_name: Equipment name.
        each_equp_type: Equipment type selecting the optimiser arguments.

    Returns:
        dict with the optimiser's ``opt_summary`` and ``opt_var``.

    Raises:
        NotImplementedError: for an equipment type other than
            ``DHU_A/DHU_B/SDHU_A/SDHU_B`` (previously an
            ``UnboundLocalError``).
    """
    # Model accuracy check: predicted vs. real values, printed for inspection.
    predict = MODEL.predict_system(input_data=data_cur)
    TVP_data = (
        predict.T.set_axis(['pred'], axis=1)
        .join(
            data_cur.T.set_axis(['real'], axis=1),
            how='left',
        )
        .dropna(axis=0)
        .assign(diff=lambda x: x['pred'] - x['real'])
    )
    print(TVP_data)

    # Real-time optimisation.
    # NOTE(review): the limit tuples are passed as (upper, lower) exactly as
    # in the original - confirm this is the order MODEL.optimize expects.
    if each_equp_type in ['DHU_A', 'DHU_B']:
        opt_res = MODEL.optimize(
            cur_input_data=data_cur,
            wheel_1_TinR=(
                config_reader.get_equp_info(each_eaup_name, key='前再生温度上限', info_type='float'),
                config_reader.get_equp_info(each_eaup_name, key='前再生温度下限', info_type='float'),
            ),
            wheel_2_TinR=(
                config_reader.get_equp_info(each_eaup_name, key='后再生温度上限', info_type='float'),
                config_reader.get_equp_info(each_eaup_name, key='后再生温度下限', info_type='float'),
            ),
            fan_2_Hz=None,
            constrains=['coil_3_DoutA-[coil_3_DoutA]<0'],
            logging=False,
            target='summary_waste',
            target_min=True,
        )
    elif each_equp_type in ['SDHU_A', 'SDHU_B']:
        opt_res = MODEL.optimize(
            cur_input_data=data_cur,
            wheel_1_TinR=(
                config_reader.get_equp_info(each_eaup_name, key='前再生温度上限', info_type='float'),
                config_reader.get_equp_info(each_eaup_name, key='前再生温度下限', info_type='float'),
            ),
            fan_2_Hz=(30, 45),
            constrains=['wheel_1_DoutP-[wheel_1_DoutP]<0'],
            logging=False,
            target='summary_waste',
            target_min=True,
        )
    else:
        raise NotImplementedError(f'unsupported equipment type: {each_equp_type!r}')

    opt_summary = opt_res['opt_summary']
    opt_var = opt_res['opt_var']
    print_dataframe(opt_summary, '优化结果')
    print_dataframe(opt_var, '优化变量')

    return {
        'opt_summary': opt_summary,
        'opt_var': opt_var,
    }


def _build_rcmd_entry(name, opt_var, data_cur, column, point_id):
    """Build one recommendation entry (parent plus a single child)."""
    return {
        'name': name,
        'rcmd_value': opt_var.iat[0, 0],
        'children': [
            {
                'name': name,
                'rcmd_value': opt_var.round(1).iat[0, 0],
                'curr_value': data_cur.loc[:, [column]].round(1).iat[0, 0],
                'point_id': point_id,
            }
        ],
    }


def push_result(
    each_eaup_name,
    each_eaup_name_short,
    each_equp_type,
    config_reader: ConfigReader,
    opt_result: dict,
    data_cur: pd.DataFrame,
):
    """Build the recommendation payload and push or print it.

    Args:
        each_eaup_name: Equipment name (prefix of every ``point_id``).
        each_eaup_name_short: Short code sent as ``code`` when pushing.
        each_equp_type: Equipment type selecting which variables are reported.
        config_reader: Configuration accessor.
        opt_result: Result of :func:`optimize_dhu`; ``opt_var[i]`` is read
            for the i-th reported variable.
        data_cur: One-row frame providing the current values.

    Raises:
        Exception: ``'MODEL_TYPE_ERROR'`` for an unsupported equipment type.
    """
    equp_point = config_reader.get_equp_point(each_eaup_name, equp_class=['D'])

    # (display name, data column) per reported variable, in opt_var order.
    if each_equp_type in ['DHU_A', 'DHU_B']:
        targets = (
            ('前再生加热温度', 'wheel_1_TinR'),
            ('后再生加热温度', 'wheel_2_TinR'),
        )
    elif each_equp_type in ['SDHU_A', 'SDHU_B']:
        targets = (
            ('再生加热温度', 'wheel_1_TinR'),
            ('排风机频率', 'fan_2_Hz'),
        )
    else:
        raise Exception('MODEL_TYPE_ERROR')

    rcmd = [
        _build_rcmd_entry(
            name=name,
            opt_var=opt_result['opt_var'][index],
            data_cur=data_cur,
            column=column,
            point_id=f'{each_eaup_name}_{equp_point[column + "_AISP"]}',
        )
        for index, (name, column) in enumerate(targets)
    ]

    if config_reader.get_app_info(each_eaup_name, '实时优化', '推送策略', 'bool'):
        add_ai_rcmd_operation(
            code=each_eaup_name_short,
            job_id=os.environ.get('JOB_ID', None),
            rcmd=rcmd,
            custom_details=[],
        )
    else:
        pprint(rcmd)


def add_ai_rcmd_operation(code, job_id, rcmd, custom_details):
    """POST a recommendation to the backend; failures are printed, not raised.

    Args:
        code: Equipment short code.
        job_id: Job identifier (may be ``None``).
        rcmd: List of recommendation entries.
        custom_details: Extra details forwarded unchanged.

    Returns:
        None.
    """
    from workflowlib import requests

    json_info = {
        "job_id": job_id,
        "code": code,
        "data": {
            "rcmd": rcmd,
            "custom_details": custom_details,
        },
    }
    print('ai_rcmd', json_info)

    try:
        # Dispatch the step-by-step command.
        url = 'http://m2-backend-svc:8000/api/ai/ai_rcmd_operation/add_ai_rcmd_operation_v3'
        r = requests.post(
            url,
            json=json_info,
            headers={'Content-Type': "application/json", "Connection": "close"},
        )
        jsonResp = r.json()
        print('下发指令 jsonResp', jsonResp)
    except Exception as e:
        # Deliberately best-effort: a failed push must not abort the caller.
        print(e)
    return