import numpy as np
import pandas as pd
import pymc as pm
import pytensor.tensor as pt

from .._base._base_device import BaseDevice
from ...components.coil_water import CoolingCoil2
from ...components.coil_steam import SteamCoilFs,SteamCoilFs2,SteamCoil
from ...components.wheel import WheelS3
from ...components.mixed import Mixed
from ..utils.fit_utils import (
    observe,record,reorder_posterior,get_fitted_result
)
from ...tools.optimizer import optimizer


class DHU_A(BaseDevice):
    """Dehumidification unit built from two wheels, cooling coils, steam coils and mixers.

    The physical chain is described by :meth:`model`; :meth:`fit` estimates the
    component parameters with PyMC (MAP estimate) and :meth:`predict` replays the
    same chain with the fitted parameters using the ``'numpy'`` engine.
    """

    # Maps the argument names of ``model`` (keys) to the column names expected
    # in the ``input_data`` frame (values).
    model_input_data_columns = {
        'Tin_F'       : 'coil_1_ToutA',
        'Hin_F'       : 'coil_1_HoutA',
        'fan_1_Hz'    : 'fan_1_Hz',
        'fan_2_Hz'    : 'fan_2_Hz',
        'coil_1_TinW' : 'coil_1_TinW',
        'coil_2_TinW' : 'coil_2_TinW',
        'coil_3_TinW' : 'coil_3_TinW',
        'coil_1_Val'  : 'coil_1_Val',
        'coil_2_Val'  : 'coil_2_Val',
        'coil_3_Val'  : 'coil_3_Val',
        'wheel_1_TinR': 'wheel_1_TinR',
        'wheel_2_TinR': 'wheel_2_TinR',
        'mixed_1_TinM': 'mixed_1_TinM',
        'mixed_2_TinM': 'mixed_2_TinM',
        'mixed_1_HinM': 'mixed_1_HinM',
        'mixed_2_HinM': 'mixed_2_HinM',
    }

    # Maps the standard observation names (keys) to the column names that may
    # appear in the ``observed_data`` frame (values); ``fit`` renames the
    # latter to the former.
    model_observe_data_columns = {
        'mixed_1_ToutA'  : 'mixed_1_ToutA',
        'mixed_1_DoutA'  : 'mixed_1_DoutA',
        'wheel_1_ToutC'  : 'wheel_1_ToutC',
        'coil_2_ToutA'   : 'coil_2_ToutA',
        'coil_2_DoutA'   : 'coil_2_DoutA',
        'wheel_2_ToutP'  : 'wheel_2_ToutP',
        'wheel_2_DoutP'  : 'wheel_2_DoutP',
        'wheel_2_ToutR'  : 'wheel_2_ToutR',
        'steamcoil_1_FP' : 'steamcoil_1_FP',
        'steamcoil_2_FP' : 'steamcoil_2_FP',
        'steamcoil_1_Fs' : 'steamcoil_1_Fs',
        'steamcoil_2_Fs' : 'steamcoil_2_Fs',
        'steamcoil_1_Val': 'steamcoil_1_Val',
        'steamcoil_2_Val': 'steamcoil_2_Val',
    }

    def __init__(self) -> None:
        """Create the component objects and index them by their ``name``."""
        super().__init__()
        # NOTE: the steam coils use the SteamCoilFs2 / SteamCoilFs variants;
        # the plain SteamCoil class is imported but not instantiated here.
        components = [
            WheelS3('wheel_1'),
            WheelS3('wheel_2'),
            CoolingCoil2('coil_2'),
            CoolingCoil2('coil_3'),
            SteamCoilFs2('steamcoil_1'),
            SteamCoilFs('steamcoil_2'),
            Mixed('mixed_1'),
            Mixed('mixed_2'),
        ]
        # Insertion order is kept: ``fit`` creates the priors in this order.
        self.components = {comp.name:comp for comp in components}

    def fit(
        self,
        input_data   : pd.DataFrame,
        observed_data: pd.DataFrame,
        rw_FA_val    : bool = False,
        plot_TVP     : bool = True
    ):
        """Estimate the model parameters by MAP and store the result.

        Args:
            input_data: Frame holding the columns listed as values of
                ``model_input_data_columns``.
            observed_data: Frame with measured outputs. Columns named as in the
                values of ``model_observe_data_columns`` are renamed to the
                corresponding keys; absent columns are skipped by the renaming.
            rw_FA_val: Forwarded to ``AirFlow.prior``; enables the random-walk
                air-flow offset.
            plot_TVP: Forwarded to ``get_fitted_result``.

        Returns:
            ``self``, after setting ``MODEL_PYMC``, ``param_posterior``,
            ``TVP_data`` and ``TVP_metric``.
        """
        with pm.Model() as self.MODEL_PYMC:
            # One prior per component, plus the shared air-flow prior.
            param_prior = {name:comp.prior() for name,comp in self.components.items()}
            param_prior['F_air'] = AirFlow.prior(rw_FA_val=rw_FA_val,N=len(input_data))

            res = DHU_A.model(
                **{k:input_data.loc[:,v].values for k,v in self.model_input_data_columns.items()},
                engine     = 'pymc',
                components = self.components,
                param      = param_prior
            )

            # Bring the observation columns to their standard names.
            for std_name,name in self.model_observe_data_columns.items():
                if name not in observed_data.columns:
                    continue
                observed_data = observed_data.rename(columns={name:std_name})

            # Model outputs that are tied to measurements.
            observe('mixed_1_ToutA',res['mixed_1']['ToutA'],observed=observed_data)
            observe('mixed_1_DoutA',res['mixed_1']['DoutA'],observed=observed_data)
            observe('wheel_1_ToutC',res['wheel_1']['ToutC'],observed=observed_data)
            observe('coil_2_ToutA',res['coil_2']['ToutA'],observed=observed_data)
            observe('coil_2_DoutA',res['coil_2']['DoutA'],observed=observed_data)
            observe('wheel_2_ToutP',res['wheel_2']['ToutP'],observed=observed_data)
            observe('wheel_2_DoutP',res['wheel_2']['DoutP'],observed=observed_data)
            observe('wheel_2_ToutR',res['wheel_2']['ToutR'],observed=observed_data)
            observe('steamcoil_1_FP',res['steamcoil_1']['FP'],observed=observed_data,sigma=1000)
            observe('steamcoil_1_Fs',res['steamcoil_1']['Fs'],observed=observed_data,sigma=20)
            observe('steamcoil_2_Fs',res['steamcoil_2']['Fs'],observed=observed_data,sigma=20)

            # Model outputs that are only recorded (the steam flows are
            # observed above, so they are not recorded a second time).
            record('wheel_2_ToutC',res['wheel_2']['ToutC'])
            record('mixed_2_ToutA',res['mixed_2']['ToutA'])
            record('wheel_1_FaP',res['wheel_1']['FP'])
            record('wheel_1_FaR',res['wheel_1']['FR'])
            record('wheel_1_FaC',res['wheel_1']['FC'])
            record('mixed_1_FaA',res['mixed_1']['FA'])
            record('mixed_1_FaM',res['mixed_1']['FM'])
            record('F_air_S',res['Fa']['Fa_S'])
            record('F_air_H',res['Fa']['Fa_H'])
            record('F_air_X',res['Fa']['Fa_X'])

            self.param_posterior = pm.find_MAP(maxeval=50000,include_transformed=False)

            # The training data / metrics passed here are fixed placeholders.
            self.record_model(
                model_name   = 'DHU',
                model        = reorder_posterior(param_prior,self.param_posterior),
                train_data   = {'x':np.array([1])},
                train_metric = {'R2':1,'MAE':1,'MAPE':1}
            )
            self.TVP_data,self.TVP_metric = get_fitted_result(
                self.param_posterior,observed_data,plot_TVP
            )
        return self

    def predict(self,input_data:pd.DataFrame) -> dict:
        """Run the model with the stored parameters (``model_info['model_DHU']``).

        Returns the nested dict produced by :meth:`model`, evaluated with the
        ``'numpy'`` engine.
        """
        param_posterior = self.model_info['model_DHU']
        res = DHU_A.model(
            **{k:input_data.loc[:,v].values for k,v in self.model_input_data_columns.items()},
            engine     = 'numpy',
            components = self.components,
            param      = param_posterior
        )
        return res

    def predict_system(self,input_data:pd.DataFrame) -> pd.DataFrame:
        """Flatten :meth:`predict` into one frame with ``<equipment>_<output>`` columns.

        An extra column ``Fs`` holds the sum of ``steamcoil_1_Fs`` and
        ``steamcoil_2_Fs``.
        """
        pred_res = self.predict(input_data)
        system_output = {}
        for equp_name,output_info in pred_res.items():
            for output_name,output_value in output_info.items():
                system_output[f'{equp_name}_{output_name}'] = output_value
        system_output = pd.DataFrame(system_output)
        system_output['Fs'] = system_output.steamcoil_1_Fs + system_output.steamcoil_2_Fs
        return system_output

    def optimize(
        self,
        cur_input_data : pd.DataFrame,
        wheel_1_TinR_ub: float = 120,
        wheel_1_TinR_lb: float = 70,
        wheel_2_TinR_ub: float = 120,
        wheel_2_TinR_lb: float = 70,
        constrains     : list = None
    ) -> list:
        """Optimise the two wheel regeneration temperatures for the current state.

        Only the first row of ``cur_input_data`` is used. ``wheel_1_TinR`` and
        ``wheel_2_TinR`` are the decision variables, bounded by the ``*_lb`` /
        ``*_ub`` arguments; every other model input column is passed to the
        optimizer as a fixed value.

        Args:
            cur_input_data: Frame holding the model input columns.
            wheel_1_TinR_ub: Upper bound for ``wheel_1_TinR``.
            wheel_1_TinR_lb: Lower bound for ``wheel_1_TinR``.
            wheel_2_TinR_ub: Upper bound for ``wheel_2_TinR``.
            wheel_2_TinR_lb: Lower bound for ``wheel_2_TinR``.
            constrains: Extra constraints forwarded to ``optimizer``; an empty
                list is used when ``None``.

        Returns:
            Whatever ``optimizer`` returns.
        """
        constrains = [] if constrains is None else constrains
        # Keep a one-row frame (not a Series) for the optimizer.
        cur_input_data = cur_input_data.iloc[[0],:]

        opt_var_boundary = {
            'wheel_1_TinR':{'lb':wheel_1_TinR_lb,'ub':wheel_1_TinR_ub},
            'wheel_2_TinR':{'lb':wheel_2_TinR_lb,'ub':wheel_2_TinR_ub},
        }
        opt_var_value = cur_input_data.loc[:,list(opt_var_boundary.keys())]
        oth_var_value = (
            cur_input_data
            .loc[:,list(self.model_input_data_columns.values())]
            .drop(opt_var_value.columns,axis=1)
        )
        opt_res = optimizer(
            model            = self,
            opt_var_boundary = opt_var_boundary,
            opt_var_value    = opt_var_value,
            oth_var_value    = oth_var_value,
            constrains       = constrains
        )
        return opt_res

    @classmethod
    def model(
        cls,
        Tin_F,         # air temperature after the front cooling coil
        Hin_F,         # air humidity after the front cooling coil
        fan_1_Hz,      # process-side fan frequency
        fan_2_Hz,      # regeneration-side fan frequency
        coil_1_TinW,   # front cooling coil inlet water temperature (unused here)
        coil_2_TinW,   # middle cooling coil inlet water temperature
        coil_3_TinW,   # rear cooling coil inlet water temperature
        coil_1_Val,    # front cooling coil valve opening (unused here)
        coil_2_Val,    # middle cooling coil valve opening
        coil_3_Val,    # rear cooling coil valve opening
        wheel_1_TinR,  # front wheel regeneration-side temperature
        wheel_2_TinR,  # rear wheel regeneration-side temperature
        mixed_1_TinM,  # return-air temperature (process side)
        mixed_1_HinM,  # return-air humidity (process side)
        mixed_2_TinM,  # make-up air temperature (regeneration side)
        mixed_2_HinM,  # make-up air humidity (regeneration side)
        engine    : str,
        components: dict,
        param     : dict,
    ) -> dict:
        """Evaluate the whole unit once and return the per-component results.

        Each wheel is evaluated twice: first with a regeneration inlet
        humidity of 0, then again ("humidity correction") with the humidity
        obtained from the first pass. The corrected wheel results are the ones
        returned.

        Args:
            engine: Backend name forwarded to every component
                (``'pymc'`` in ``fit``, ``'numpy'`` in ``predict``).
            components: Component objects keyed by name.
            param: Parameters keyed by component name, plus ``'F_air'``.

        Returns:
            Dict with keys ``coil_2``, ``coil_3``, ``wheel_1``, ``wheel_2``,
            ``mixed_1``, ``mixed_2``, ``steamcoil_1``, ``steamcoil_2``, ``Fa``
            (air flows) and an empty ``summary``.
        """
        # Water mass flow, taken as valve opening / 100
        # (presumably the opening is given in percent -- TODO confirm).
        coil_2_FW = coil_2_Val / 100
        coil_3_FW = coil_3_Val / 100

        # Air mass flows
        air_flow = AirFlow.model(fan_1_Hz=fan_1_Hz,fan_2_Hz=fan_2_Hz,param=param)

        # Front wheel (first pass, regeneration inlet humidity set to 0)
        wheel_1_res = components['wheel_1'].model(
            TinP   = Tin_F,
            HinP   = Hin_F,
            FP     = air_flow['wheel_1_FaP'],
            TinR   = wheel_1_TinR,
            HinR   = 0,
            FR     = air_flow['wheel_1_FaR'],
            TinC   = Tin_F,
            HinC   = Hin_F,
            FC     = air_flow['wheel_1_FaC'],
            engine = engine,
            param  = param['wheel_1']
        )

        # Process-side mixing (return air)
        mixed_1_res = components['mixed_1'].model(
            TinA   = wheel_1_res['ToutP'],
            HinA   = wheel_1_res['HoutP'],
            FA     = air_flow['mixed_1_FaA'],
            TinM   = mixed_1_TinM,
            HinM   = mixed_1_HinM,
            FM     = air_flow['mixed_1_FaM'],
            engine = engine
        )

        # Middle cooling coil
        coil_2_res = components['coil_2'].model(
            TinA   = mixed_1_res['ToutA'],
            HinA   = mixed_1_res['HoutA'],
            FA     = air_flow['coil_2_FaA'],
            TinW   = coil_2_TinW,
            FW     = coil_2_FW,
            engine = engine,
            param  = param['coil_2']
        )

        # Rear wheel (first pass, regeneration inlet humidity set to 0)
        wheel_2_res = components['wheel_2'].model(
            TinP   = coil_2_res['ToutA'],
            HinP   = coil_2_res['HoutA'],
            FP     = air_flow['wheel_2_FaP'],
            TinC   = wheel_1_res['ToutC'],
            HinC   = wheel_1_res['HoutC'],
            FC     = air_flow['wheel_2_FaC'],
            TinR   = wheel_2_TinR,
            HinR   = 0,
            FR     = air_flow['wheel_2_FaR'],
            engine = engine,
            param  = param['wheel_2'],
        )

        # Rear cooling coil
        coil_3_res = components['coil_3'].model(
            TinA   = wheel_2_res['ToutP'],
            HinA   = wheel_2_res['HoutP'],
            FA     = air_flow['coil_3_FaA'],
            TinW   = coil_3_TinW,
            FW     = coil_3_FW,
            engine = engine,
            param  = param['coil_3']
        )

        # Rear wheel humidity correction: regeneration inlet humidity is now
        # the cooling-side outlet humidity from the first pass.
        wheel_2_res_adj = components['wheel_2'].model(
            TinP   = coil_2_res['ToutA'],
            HinP   = coil_2_res['HoutA'],
            FP     = air_flow['wheel_2_FaP'],
            TinC   = wheel_1_res['ToutC'],
            HinC   = wheel_1_res['HoutC'],
            FC     = air_flow['wheel_2_FaC'],
            TinR   = wheel_2_TinR,
            HinR   = wheel_2_res['HoutC'],
            FR     = air_flow['wheel_2_FaR'],
            engine = engine,
            param  = param['wheel_2'],
        )

        # Regeneration-side mixing (exhaust air)
        mixed_2_res = components['mixed_2'].model(
            TinA   = wheel_2_res_adj['ToutR'],
            HinA   = wheel_2_res_adj['HoutR'],
            FA     = air_flow['mixed_2_FaA'],
            TinM   = mixed_2_TinM,
            HinM   = mixed_2_HinM,
            FM     = air_flow['mixed_2_FaM'],
            engine = engine
        )

        # Front wheel humidity correction: regeneration inlet humidity is now
        # the outlet humidity of the regeneration-side mixer.
        wheel_1_res_adj = components['wheel_1'].model(
            TinP   = Tin_F,
            HinP   = Hin_F,
            FP     = air_flow['wheel_1_FaP'],
            TinR   = wheel_1_TinR,
            HinR   = mixed_2_res['HoutA'],
            FR     = air_flow['wheel_1_FaR'],
            TinC   = Tin_F,
            HinC   = Hin_F,
            FC     = air_flow['wheel_1_FaC'],
            engine = engine,
            param  = param['wheel_1']
        )

        # Front steam coil: takes the mixer outlet to wheel_1_TinR
        steamcoil_1_res = components['steamcoil_1'].model(
            TinA   = mixed_2_res['ToutA'],
            ToutA  = wheel_1_TinR,
            FA     = air_flow['steamcoil_1_Fa'],
            param  = param['steamcoil_1'],
            engine = engine
        )

        # Rear steam coil: takes the rear wheel cooling-side outlet to wheel_2_TinR
        steamcoil_2_res = components['steamcoil_2'].model(
            TinA   = wheel_2_res_adj['ToutC'],
            ToutA  = wheel_2_TinR,
            FA     = air_flow['steamcoil_2_Fa'],
            param  = param['steamcoil_2'],
            engine = engine
        )

        return {
            'coil_2'     : coil_2_res,
            'coil_3'     : coil_3_res,
            'wheel_1'    : wheel_1_res_adj,
            'wheel_2'    : wheel_2_res_adj,
            'mixed_1'    : mixed_1_res,
            'mixed_2'    : mixed_2_res,
            'steamcoil_1': steamcoil_1_res,
            'steamcoil_2': steamcoil_2_res,
            'Fa'         : air_flow,
            'summary'    : {}
        }


class AirFlow:
    """Air-flow sub-model: flows as affine functions of the fan frequencies."""

    @classmethod
    def model(cls,fan_1_Hz,fan_2_Hz,param):
        """Compute the air flows of every branch from the two fan frequencies.

        Each base flow is ``base + slope * (fan_Hz / 50)``, with the base and
        slope read from ``param['F_air']``. The optional ``val_rw`` /
        ``val_pct`` entries (default 0) shift the base flows: ``val_rw`` is
        added to the X base and removed from the H and B bases in the
        proportions ``val_pct`` and ``1 - val_pct``.

        Returns:
            Dict with the base flows (``Fa_S``, ``Fa_H``, ``Fa_X``, ``Fa_B``,
            ``Fa_P``) and the flow assigned to every component port.
        """
        # Air mass flow coefficients
        F_air_HzP_H = param['F_air']['HzP_H']
        F_air_HzP_X = param['F_air']['HzP_X']
        F_air_HzP_S = F_air_HzP_H + F_air_HzP_X
        F_air_HzR_B = param['F_air']['HzR_B']

        # The S base flow is fixed to 1; the others are parameters.
        F_air_S_base = 1
        F_air_X_base = param['F_air']['X_base']
        F_air_H_base = param['F_air']['H_base']
        F_air_B_base = param['F_air']['B_base']

        F_air_val_rw  = param['F_air'].get('val_rw',0)
        F_air_val_pct = param['F_air'].get('val_pct',0)

        F_air_X_base_adj = F_air_X_base + F_air_val_rw
        F_air_H_base_adj = F_air_H_base - F_air_val_rw * F_air_val_pct
        F_air_B_base_adj = F_air_B_base - F_air_val_rw * (1 - F_air_val_pct)

        Fa_S = F_air_S_base + F_air_HzP_S * (fan_1_Hz / 50)
        Fa_H = F_air_H_base_adj + F_air_HzP_H * (fan_1_Hz / 50)
        Fa_X = F_air_X_base_adj + F_air_HzP_X * (fan_1_Hz / 50)
        Fa_B = F_air_B_base_adj + F_air_HzR_B * (fan_2_Hz / 50)
        Fa_P = Fa_B + Fa_X + Fa_H - Fa_S

        # Flow assignment per component port
        wheel_1_FaP = Fa_S - Fa_H
        wheel_1_FaC = Fa_X - wheel_1_FaP
        wheel_1_FaR = Fa_P

        wheel_2_FaP = Fa_S
        wheel_2_FaC = wheel_1_FaC
        wheel_2_FaR = wheel_1_FaC

        mixed_1_FaM = Fa_H
        mixed_1_FaA = wheel_1_FaP

        mixed_2_FaM = Fa_B
        mixed_2_FaA = wheel_1_FaC

        coil_2_FaA = Fa_S
        coil_3_FaA = Fa_S

        steamcoil_1_Fa = Fa_P
        steamcoil_2_Fa = wheel_1_FaC

        return {
            'Fa_S':Fa_S,'Fa_H':Fa_H,'Fa_X':Fa_X,'Fa_B':Fa_B,'Fa_P':Fa_P,
            'wheel_1_FaP':wheel_1_FaP,'wheel_1_FaC':wheel_1_FaC,'wheel_1_FaR':wheel_1_FaR,
            'wheel_2_FaP':wheel_2_FaP,'wheel_2_FaC':wheel_2_FaC,'wheel_2_FaR':wheel_2_FaR,
            'mixed_1_FaM':mixed_1_FaM,'mixed_1_FaA':mixed_1_FaA,
            'mixed_2_FaM':mixed_2_FaM,'mixed_2_FaA':mixed_2_FaA,
            'coil_2_FaA':coil_2_FaA,'coil_3_FaA':coil_3_FaA,
            'steamcoil_1_Fa':steamcoil_1_Fa,'steamcoil_2_Fa':steamcoil_2_Fa
        }

    @staticmethod
    def _segment_lengths(N,period) -> list:
        """Split ``N`` samples into consecutive segments of at most ``period``.

        The result has ``ceil(N / period)`` entries that sum to ``N``: all
        segments are ``period`` long except a shorter last one when ``N`` is
        not a multiple of ``period``.
        """
        n_full,remainder = divmod(N,period)
        return [period] * n_full + ([remainder] if remainder else [])

    @classmethod
    def prior(cls,rw_FA_val,N) -> dict:
        """Declare the PyMC priors of the air-flow parameters.

        Must be called inside a ``pm.Model`` context.

        Args:
            rw_FA_val: When true, add a Gaussian random walk offset that is
                held constant over blocks of 30 samples and expanded to
                length ``N``, plus a Beta-distributed split ratio. When false
                both are the constant 0.
            N: Number of samples in the training data.

        Returns:
            Dict with keys ``HzP_X``, ``HzP_H``, ``HzR_B``, ``X_base``,
            ``H_base``, ``B_base``, ``val_rw`` and ``val_pct``.
        """
        HzP_X = pm.HalfNormal('F_air_HzP_X',sigma=1,initval=1)
        HzP_H = pm.HalfNormal('F_air_HzP_H',sigma=1,initval=0.1)
        HzR_B = pm.HalfNormal('F_air_HzR_B',sigma=1,initval=0.5)
        X_base = pm.TruncatedNormal('F_air_X_base',mu=0.5,sigma=0.2,lower=0,initval=0.5)
        H_base = pm.TruncatedNormal('F_air_H_base',mu=0.6,sigma=0.2,lower=0,upper=0.999,initval=0.6)
        B_base = pm.TruncatedNormal('F_air_B_base',mu=0.2,sigma=0.1,lower=0,initval=0.1)

        if rw_FA_val:
            period = 30
            # One repeat count per random-walk value. Deriving the walk length
            # from this list keeps both in step even when N is an exact
            # multiple of ``period`` (the last block is then a full period).
            repeat     = cls._segment_lengths(N,period)
            n_segments = len(repeat)
            rw = pm.GaussianRandomWalk(
                'rw',sigma=0.1,init_dist=pm.Normal.dist(mu=0,sigma=0.3),shape=n_segments)
            val_rw  = pm.Deterministic('F_air_val_rw',pt.repeat(rw,repeat))
            val_pct = pm.Beta('F_air_val_pct',alpha=8,beta=1,initval=0.9)
        else:
            val_rw  = 0
            val_pct = 0

        return {
            'HzP_X':HzP_X,'HzP_H':HzP_H,'HzR_B':HzR_B,
            'X_base':X_base,'H_base':H_base,'B_base':B_base,
            'val_rw':val_rw,'val_pct':val_pct
        }