| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438 |
- import os
- from datetime import datetime,timedelta
- from pathlib import Path
- from pprint import pprint
- from typing import Union
- import numpy as np
- import pandas as pd
- from ...model.DHU.DHU_AB import DHU_AB
- from ...model.DHU.SDHU_AB import SDHU_AB
- from .config_reader import ConfigReader
- from ...tools.data_loader import DataLoader
- from ..._utils.data_summary import print_dataframe,summary_dataframe
def optimize(*inputs, config=None):
    """Run one real-time optimization pass over every configured device.

    For each device whose '开始实时优化' switch is on, the stages are run in
    order: read the device state, load the model, load the averaged
    operating data, run the optimizer and push (or print) the
    recommendation.  An exception in any stage is stored under that stage's
    key in the exception report and the loop moves on to the next device.

    Args:
        *inputs: Accepted but not used by this function.
        config: Optional dict.  When it contains ``'__LOCAL'`` that value is
            used as the base path and ``config['__URL']`` as the data URL;
            otherwise the built-in path and URL are used.

    Returns:
        None.  The per-stage report and the combined optimization summary
        are printed.

    Side effects:
        Rebinds the module-level ``NOW`` for each processed device;
        ``load_data_dhu`` reads it.
    """
    # Declared once up front; the assignment happens inside the loop.
    global NOW

    config = {} if config is None else config

    if '__LOCAL' in config:
        config_reader_path = config['__LOCAL']
        data_URL = config['__URL']
    else:
        config_reader_path = '/mnt/workflow_data'
        data_URL = 'http://basedataportal-svc:8080/data/getpointsdata'

    config_reader = ConfigReader(path=f'{config_reader_path}/DHU配置.xlsx')

    ALL_RESULT = {
        'EXCEPTION': {
            'State': {},
            'Mod': {},
            'Data_ATD': {},
            'Opt': {},
            'Push': {},
        },
        'STATUS': {
            'Mode_Steady': [],
            'Mode_Low': [],
            'Mode_Off': [],
        },
    }
    ALL_OPT_RES = []

    for each_eaup_name, each_eaup_name_short in zip(
        config_reader.all_equp_names,
        config_reader.all_equp_names_short
    ):
        if not config_reader.get_app_info(each_eaup_name, '实时优化', '开始实时优化', 'bool'):
            continue
        each_equp_type = config_reader.get_equp_info(each_eaup_name, key='设备类型', info_type='str')

        # Truncate the configured run time to the minute.
        NOW = config_reader.get_app_info(
            each_eaup_name, '实时优化', '运行时间', 'datetime'
        ).replace(second=0, microsecond=0)
        print(f'{each_eaup_name}开始实时优化,设备类型为{each_equp_type},运行时间为{NOW}')

        # Stage 1: device state; devices reporting a state below 1 are skipped.
        try:
            dhu_State = load_dhu_State(
                each_eaup_name=each_eaup_name,
                config_reader=config_reader,
                config_reader_path=config_reader_path,
                data_URL=data_URL,
                NOW=NOW
            )
            print(f'{each_eaup_name}设备状态为{dhu_State}')
            if dhu_State < 1:
                print('设备处于非运行状态,跳过')
                ALL_RESULT['STATUS']['Mode_Off'].append(each_eaup_name)
                continue
        except Exception as e:
            ALL_RESULT['EXCEPTION']['State'][each_eaup_name] = e
            continue

        # Stage 2: load the model.
        try:
            MODEL = load_model(
                each_eaup_name=each_eaup_name,
                each_equp_type=each_equp_type,
                config_reader=config_reader,
                config_reader_path=config_reader_path
            )
        except Exception as e:
            ALL_RESULT['EXCEPTION']['Mod'][each_eaup_name] = e
            continue

        # Stage 3: load the averaged operating data.
        try:
            data_dhu = load_data_dhu(
                each_eaup_name=each_eaup_name,
                each_equp_type=each_equp_type,
                config_reader=config_reader,
                config_reader_path=config_reader_path,
                data_URL=data_URL
            )
        except Exception as e:
            ALL_RESULT['EXCEPTION']['Data_ATD'][each_eaup_name] = e
            continue

        # Stage 4: optimize.  The device is listed under 'Mode_Steady' before
        # the optimizer runs, so it stays listed even if the optimizer fails.
        try:
            ALL_RESULT['STATUS']['Mode_Steady'].append(each_eaup_name)

            opt_result = optimize_dhu(
                MODEL=MODEL,
                data_cur=data_dhu,
                config_reader=config_reader,
                each_eaup_name=each_eaup_name,
                each_equp_type=each_equp_type
            )
            ALL_OPT_RES.append(opt_result['opt_summary'][1].assign(equp_name=each_eaup_name))
        except Exception as e:
            ALL_RESULT['EXCEPTION']['Opt'][each_eaup_name] = e
            continue

        # Stage 5: push (or print) the recommendation.
        try:
            push_result(
                each_eaup_name=each_eaup_name,
                each_eaup_name_short=each_eaup_name_short,
                each_equp_type=each_equp_type,
                config_reader=config_reader,
                opt_result=opt_result,
                data_cur=data_dhu
            )
        except Exception as e:
            ALL_RESULT['EXCEPTION']['Push'][each_eaup_name] = e

    pprint(ALL_RESULT)
    if ALL_OPT_RES:
        print_dataframe(pd.concat(ALL_OPT_RES, axis=0), '优化结果')
    else:
        # pd.concat raises ValueError on an empty list, which happens when
        # every device was disabled, off, or failed before producing a result.
        print('本次运行没有可汇总的优化结果')
-
def load_model(
    each_eaup_name,
    each_equp_type,
    config_reader: ConfigReader,
    config_reader_path,
    use_adj_name=True
):
    """Load the model object for one device.

    Args:
        each_eaup_name: Device name used for the configuration lookups.
        each_equp_type: Device type; 'DHU_A'/'DHU_B' select ``DHU_AB`` and
            'SDHU_A'/'SDHU_B' select ``SDHU_AB``.
        config_reader: Configuration reader.
        config_reader_path: Base path; the temporary model is read from
            ``{config_reader_path}/model/{name}.pkl``.
        use_adj_name: When true and the configured '替代模型' value is one of
            ``config_reader.all_equp_names_total``, that name replaces
            ``each_eaup_name`` for every lookup that follows.

    Returns:
        Whatever ``load`` / ``load_from_platform`` of the selected class
        returns.

    Raises:
        NotImplementedError: If ``each_equp_type`` is not a supported type.
    """
    substitute_name = config_reader.get_equp_info(each_eaup_name, '替代模型', 'str')
    if (substitute_name in config_reader.all_equp_names_total) and use_adj_name:
        print(f'{each_eaup_name}使用替代模型{substitute_name}')
        each_eaup_name = substitute_name

    # Pick the model class for this device type.
    if each_equp_type in ['DHU_A', 'DHU_B']:
        model_cls = DHU_AB
    elif each_equp_type in ['SDHU_A', 'SDHU_B']:
        model_cls = SDHU_AB
    else:
        raise NotImplementedError

    use_temporary_model = config_reader.get_app_info(
        each_eaup_name,
        app_type='实时优化',
        key='使用临时模型',
        info_type='bool'
    )
    if use_temporary_model:
        # Temporary model: read from the local model folder.
        return model_cls.load(path=f'{config_reader_path}/model/{each_eaup_name}.pkl')

    # Otherwise fetch the model by its configured id.
    return model_cls.load_from_platform(
        source='id',
        model_id=config_reader.get_equp_info(
            equp_name=each_eaup_name,
            key='模型编号',
            info_type='str'
        )
    )
def load_dhu_State(
    each_eaup_name,
    config_reader,
    config_reader_path,
    data_URL,
    NOW,
):
    """Return the most recent 'State' value of a device.

    Downloads the device's 'State' point (taken from the class 'A' points)
    for the one-minute window ending at ``NOW`` and returns the value in the
    last row, first column of the resulting data.

    Args:
        each_eaup_name: Device name.
        config_reader: Configuration reader providing ``get_equp_point``.
        config_reader_path: Base path; data is handled under
            ``{config_reader_path}/data/optimize/data_cur/``.
        data_URL: URL passed to the data download.
        NOW: End of the query window.
    """
    loader = DataLoader(
        path=f'{config_reader_path}/data/optimize/data_cur/',
        start_time=NOW - timedelta(minutes=1),
        end_time=NOW,
        print_process=False
    )
    class_a_points = config_reader.get_equp_point(each_eaup_name, equp_class=['A'])
    downloaded = loader.download_equp_data(
        equp_name=each_eaup_name,
        point={'State': class_a_points['State']},
        url=data_URL,
        clean_cache=True
    )
    # Keep working on the object returned by the download call, as the
    # original chained expression did.
    state_frame = downloaded.get_equp_data(equp_name=each_eaup_name)
    return state_frame.iat[-1, 0]
def load_data_room(
    each_eaup_name,
    each_equp_type,
    config_reader,
    config_reader_path,
    data_URL
):
    """Load room dew-point data and derive the steady / low flags.

    The class 'C' points of the device are downloaded for the window of
    '房间稳态判断时长' minutes ending at the module-level ``NOW``.  The mean of
    ``room_DSP`` plus the configured '房间露点设定值偏差' is the set point and
    the mean of ``room_DPV`` is the process value.

    Args:
        each_eaup_name: Device name.
        each_equp_type: Not used by this function.
        config_reader: Configuration reader.
        config_reader_path: Base path for the data folder.
        data_URL: URL passed to the data download.

    Returns:
        dict with keys 'is_room_Dew_steady' (process value strictly within
        0.5 of the set point), 'is_room_Dew_low' (process value below the set
        point minus '房间露点过低阈值'), 'room_Dew_SP' and 'room_Dew_PV'.
    """
    window_minutes = config_reader.get_app_info(each_eaup_name, '实时优化', '房间稳态判断时长', 'float')
    loader = DataLoader(
        path=f'{config_reader_path}/data/optimize/data_cur/',
        start_time=NOW - timedelta(minutes=window_minutes),
        end_time=NOW,
        print_process=False
    )
    loader.download_equp_data(
        equp_name=each_eaup_name,
        point=config_reader.get_equp_point(each_eaup_name, equp_class=['C']),
        url=data_URL,
        clean_cache=True
    )
    room_frame = loader.get_equp_data(each_eaup_name)
    summary_dataframe(room_frame, f'{each_eaup_name}房间数据')

    set_point_offset = config_reader.get_app_info(each_eaup_name, '实时优化', '房间露点设定值偏差', 'float')
    dew_set_point = room_frame.room_DSP.mean() + set_point_offset
    dew_measured = room_frame.room_DPV.mean()
    low_margin = config_reader.get_app_info(each_eaup_name, '实时优化', '房间露点过低阈值', 'float')

    # Mode classification.
    return {
        'is_room_Dew_steady': (dew_measured < (dew_set_point + 0.5)) and (dew_measured > (dew_set_point - 0.5)),
        'is_room_Dew_low': dew_measured < (dew_set_point - low_margin),
        'room_Dew_SP': dew_set_point,
        'room_Dew_PV': dew_measured,
    }
def load_data_dhu(
    each_eaup_name,
    each_equp_type,
    config_reader,
    config_reader_path,
    data_URL,
):
    """Load the device's recent operating data, averaged to a single row.

    The class 'A' and 'B' points are downloaded for the window of
    '除湿机工况均值时长' minutes ending at the module-level ``NOW``.

    Args:
        each_eaup_name: Device name.
        each_equp_type: Not used by this function.
        config_reader: Configuration reader.
        config_reader_path: Base path for the data folder.
        data_URL: URL passed to the data download.

    Returns:
        One-row frame holding the column-wise mean of the downloaded data.
    """
    window_minutes = config_reader.get_app_info(each_eaup_name, '实时优化', '除湿机工况均值时长', 'float')
    input_points = config_reader.get_equp_point(each_eaup_name, equp_class=['A', 'B'])

    loader = DataLoader(
        path=f'{config_reader_path}/data/optimize/data_cur/',
        start_time=NOW - timedelta(minutes=window_minutes),
        end_time=NOW,
        print_process=False
    )
    downloaded = loader.download_equp_data(
        equp_name=each_eaup_name,
        point=input_points,
        url=data_URL,
        clean_cache=True
    )
    raw_frame = downloaded.get_equp_data(equp_name=each_eaup_name)

    # Collapse the window to its mean, then transpose back to one row.
    column_means = raw_frame.mean(axis=0)
    data_cur = column_means.to_frame().T
    summary_dataframe(raw_frame, f'{each_eaup_name}除湿机数据')
    return data_cur
def optimize_dhu(
    MODEL: Union[DHU_AB, SDHU_AB],
    data_cur: pd.DataFrame,
    config_reader: ConfigReader,
    each_eaup_name: str,
    each_equp_type: str
):
    """Compare the model's prediction with current data, then optimize.

    Args:
        MODEL: Loaded model exposing ``predict_system`` and ``optimize``.
        data_cur: Current operating data of the device.
        config_reader: Configuration reader for limits and bounds.
        each_eaup_name: Device name used for the configuration lookups.
        each_equp_type: 'DHU_A'/'DHU_B' or 'SDHU_A'/'SDHU_B'.

    Returns:
        dict with 'opt_summary' and 'opt_var', taken from the result of
        ``MODEL.optimize``.

    Raises:
        NotImplementedError: If ``each_equp_type`` is not a supported type.
            (Previously this surfaced as an UnboundLocalError on ``opt_res``.)
    """
    is_dhu = each_equp_type in ['DHU_A', 'DHU_B']
    is_sdhu = each_equp_type in ['SDHU_A', 'SDHU_B']

    # Model accuracy check: predicted vs. real value for each shared variable.
    predict = MODEL.predict_system(input_data=data_cur)
    TVP_data = (
        predict.T.set_axis(['pred'], axis=1)
        .join(
            data_cur.T.set_axis(['real'], axis=1),
            how='left'
        )
        .dropna(axis=0)
        .assign(diff=lambda x: x['pred'] - x['real'])
    )
    print(TVP_data)

    # Constraints ("constrains" is the keyword name MODEL.optimize is called with).
    constrains = []
    if is_dhu:
        constrains.append('#送风露点约束# coil_3_DoutA-[coil_3_DoutA]<0')
        Tdup = config_reader.get_equp_info(each_eaup_name, key='前后再生差值上限', info_type='float')
        Tddw = config_reader.get_equp_info(each_eaup_name, key='前后再生差值下限', info_type='float')
        # A NaN limit means the limit is not applied.
        if not np.isnan(Tdup):
            constrains.append(f'#前后再生差值上限# (wheel_1_TinR-wheel_2_TinR)-{Tdup}<0')
        if not np.isnan(Tddw):
            constrains.append(f'#前后再生差值下限# {Tddw}-(wheel_1_TinR-wheel_2_TinR)<0')
    elif is_sdhu:
        constrains.append('#送风露点约束# wheel_1_DoutP-[wheel_1_DoutP]<0')
    print('约束条件')
    pprint(constrains)

    # Real-time optimization.
    # NOTE(review): the regeneration-temperature tuples are built as
    # ('...上限', '...下限') while fan_2_Hz is (30, 45); confirm which element
    # order MODEL.optimize expects.
    if is_dhu:
        opt_res = MODEL.optimize(
            cur_input_data=data_cur,
            wheel_1_TinR=(
                config_reader.get_equp_info(each_eaup_name, key='前再生温度上限', info_type='float'),
                config_reader.get_equp_info(each_eaup_name, key='前再生温度下限', info_type='float')
            ),
            wheel_2_TinR=(
                config_reader.get_equp_info(each_eaup_name, key='后再生温度上限', info_type='float'),
                config_reader.get_equp_info(each_eaup_name, key='后再生温度下限', info_type='float')
            ),
            fan_2_Hz=None,
            constrains=constrains,
            logging=False,
            target='summary_waste',
            target_min=True
        )
    elif is_sdhu:
        opt_res = MODEL.optimize(
            cur_input_data=data_cur,
            wheel_1_TinR=(
                config_reader.get_equp_info(each_eaup_name, key='前再生温度上限', info_type='float'),
                config_reader.get_equp_info(each_eaup_name, key='前再生温度下限', info_type='float')
            ),
            fan_2_Hz=(30, 45),
            constrains=constrains,
            logging=False,
            target='summary_waste',
            target_min=True
        )
    else:
        # Without this branch opt_res would be unbound below.
        raise NotImplementedError(f'不支持的设备类型: {each_equp_type}')

    opt_summary = opt_res['opt_summary']
    opt_var = opt_res['opt_var']
    print_dataframe(opt_summary, '优化结果')
    print_dataframe(opt_var, '优化变量')
    return {
        'opt_summary': opt_summary,
        'opt_var': opt_var,
    }
def push_result(
    each_eaup_name,
    each_eaup_name_short,
    each_equp_type,
    config_reader: ConfigReader,
    opt_result: dict,
    data_cur: pd.DataFrame
):
    """Build the recommendation payload for one device and push or print it.

    Two recommendation entries are built, one per optimized variable:
    front/rear regeneration heating temperature for 'DHU_A'/'DHU_B', and
    regeneration heating temperature plus exhaust fan frequency for
    'SDHU_A'/'SDHU_B'.  When the '推送策略' switch is on the payload is sent
    through ``add_ai_rcmd_operation``; otherwise it is only printed.

    Args:
        each_eaup_name: Device name; also the prefix of every point id.
        each_eaup_name_short: Short device name sent as the ``code``.
        each_equp_type: Device type deciding which variables are reported.
        config_reader: Configuration reader.
        opt_result: dict whose 'opt_var' item is indexed by variable position.
        data_cur: Current data; supplies the current value of each variable.

    Raises:
        Exception: 'MODEL_TYPE_ERROR' for an unsupported device type.
    """
    equp_point = config_reader.get_equp_point(each_eaup_name, equp_class=['D'])

    def build_entry(position, label, column):
        # One recommendation: raw optimum at the top level, and a child with
        # the rounded optimum, the rounded current value and the point id.
        return {
            'name': label,
            'rcmd_value': opt_result['opt_var'][position].iat[0, 0],
            'children': [
                {
                    'name': label,
                    'rcmd_value': opt_result['opt_var'][position].round(1).iat[0, 0],
                    'curr_value': data_cur.loc[:, [column]].round(1).iat[0, 0],
                    'point_id': f'{each_eaup_name}_{equp_point[column + "_AISP"]}'
                }
            ]
        }

    # (display name, data column) per optimized variable, in opt_var order.
    if each_equp_type in ['DHU_A', 'DHU_B']:
        variables = [
            ('前再生加热温度', 'wheel_1_TinR'),
            ('后再生加热温度', 'wheel_2_TinR'),
        ]
    elif each_equp_type in ['SDHU_A', 'SDHU_B']:
        variables = [
            ('再生加热温度', 'wheel_1_TinR'),
            ('排风机频率', 'fan_2_Hz'),
        ]
    else:
        raise Exception('MODEL_TYPE_ERROR')

    rcmd = [
        build_entry(position, label, column)
        for position, (label, column) in enumerate(variables)
    ]

    should_push = config_reader.get_app_info(each_eaup_name, '实时优化', '推送策略', 'bool')
    if not should_push:
        pprint(rcmd)
        return

    add_ai_rcmd_operation(
        code=each_eaup_name_short,
        job_id=os.environ.get('JOB_ID', None),
        rcmd=rcmd,
        custom_details=[]
    )
-
-
def add_ai_rcmd_operation(code, job_id, rcmd, custom_details):
    """POST a recommendation payload to the AI recommendation endpoint.

    The payload is printed before sending.  Any exception raised while
    posting or decoding the response is printed and not re-raised.

    Args:
        code: Value sent as ``code``.
        job_id: Value sent as ``job_id``.
        rcmd: Recommendation entries, sent as ``data.rcmd``.
        custom_details: Sent as ``data.custom_details``.

    Returns:
        None.
    """
    from workflowlib import requests

    payload = {
        "job_id": job_id,
        "code": code,
        "data": {
            "rcmd": rcmd,
            "custom_details": custom_details
        },
    }
    print('ai_rcmd', payload)

    try:
        # Send the step-by-step instruction.
        endpoint = 'http://m2-backend-svc:8000/api/ai/ai_rcmd_operation/add_ai_rcmd_operation_v3'
        request_headers = {'Content-Type': "application/json", "Connection": "close"}
        response = requests.post(endpoint, json=payload, headers=request_headers)
        decoded = response.json()
        print('下发指令 jsonResp', decoded)
    except Exception as e:
        print(e)
|