import numpy as np import pandas as pd import pymc as pm import pytensor.tensor as pt from sklearn.metrics import r2_score,mean_absolute_error,mean_absolute_percentage_error try: import plotnine as gg except: pass from .._model._base import BaseModel from ..components.coil import CoolingCoil2,SteamCoilFs,SteamCoilFs2,SteamCoilFs3 from ..components.wheel import WheelS3 from ..components.mixed import Mixed def record(name,var): pm.Deterministic(f'{name}_mu',var) def observe(name,var,observed,sigma=1): mu = pm.Deterministic(f'{name}_mu',var) sigma = pm.HalfNormal(f'{name}_sigma',sigma=sigma) pm.Normal(name,mu=mu,sigma=sigma,observed=observed) class DHU_3(BaseModel): def __init__(self) -> None: super().__init__() def fit( self, input_data : pd.DataFrame, observed_data: pd.DataFrame, rw_FA_val : bool = False, plot_TVP : bool = True ): with pm.Model() as self.MODEL_PYMC: param_prior = { 'wheel_1' : WheelS3.prior('wheel_1'), 'wheel_2' : WheelS3.prior('wheel_2'), 'coil_2' : CoolingCoil2.prior('coil_2'), 'coil_3' : CoolingCoil2.prior('coil_3'), 'steamcoil_1': SteamCoilFs2.prior('steamcoil_1'), 'steamcoil_2': SteamCoilFs.prior('steamcoil_2'), 'F_air' : { 'HzP_X' : pm.HalfNormal('F_air_HzP_X',sigma=1,initval=1), 'HzP_H' : pm.HalfNormal('F_air_HzP_H',sigma=1,initval=0.1), 'HzR_B' : pm.HalfNormal('F_air_HzR_B',sigma=1,initval=0.5), 'X_base': pm.TruncatedNormal('F_air_X_base',mu=0.5,sigma=0.2,lower=0,initval=0.5), 'H_base': pm.TruncatedNormal('F_air_H_base',mu=0.6,sigma=0.2,lower=0,upper=0.999,initval=0.6), 'B_base': pm.TruncatedNormal('F_air_B_base',mu=0.2,sigma=0.1,lower=0,initval=0.1), }, 'mixed_1':{}, 'mixed_2':{} } if rw_FA_val: N = len(input_data) period = 30 n_segments = int(np.ceil(N/period)) remainder = N % period repeat = [period] * (n_segments - 1) + ([remainder] if remainder != 0 else []) rw = pm.GaussianRandomWalk( 'rw',sigma=0.1,init_dist=pm.Normal.dist(mu=0,sigma=0.3),shape=n_segments) param_prior['F_air']['val_rw'] = pm.Deterministic('F_air_val_rw',pt.repeat(rw,repeat)) param_prior['F_air']['val_pct'] = pm.Beta('F_air_val_pct',alpha=8,beta=1,initval=0.9) res = DHU_3.model( Tin_F = input_data.coil_1_ToutA.values, Hin_F = input_data.coil_1_HoutA.values, HzP = input_data.HzP.values, HzR = input_data.HzR.values, coil_1_TinW = input_data.coil_1_TinW.values, coil_2_TinW = input_data.coil_2_TinW.values, coil_3_TinW = input_data.coil_3_TinW.values, coil_1_Val = input_data.coil_1_Val.values, coil_2_Val = input_data.coil_2_Val.values, coil_3_Val = input_data.coil_3_Val.values, wheel_1_TinR = input_data.wheel_1_TinR.values, wheel_2_TinR = input_data.wheel_2_TinR.values, mixed_1_TinM = input_data.mixed_1_TinM.values, mixed_2_TinM = input_data.mixed_2_TinM.values, mixed_1_HinM = input_data.mixed_1_HinM.values, mixed_2_HinM = input_data.mixed_2_HinM.values, engine = 'pymc', param = param_prior ) observe('mixed_1_ToutA',res['mixed_1']['ToutA'],observed=observed_data.mixed_1_ToutA.values) observe('mixed_1_DoutA',res['mixed_1']['DoutA'],observed=observed_data.mixed_1_DoutA.values) observe('wheel_1_ToutC',res['wheel_1']['ToutC'],observed=observed_data.wheel_1_ToutC.values) observe('coil_2_ToutA',res['coil_2']['ToutA'],observed=observed_data.coil_2_ToutA.values) observe('coil_2_DoutA',res['coil_2']['DoutA'],observed=observed_data.coil_2_DoutA.values) observe('wheel_2_ToutP',res['wheel_2']['ToutP'],observed=observed_data.wheel_2_ToutP.values) observe('wheel_2_DoutP',res['wheel_2']['DoutP'],observed=observed_data.wheel_2_DoutP.values) observe('wheel_2_ToutR',res['wheel_2']['ToutR'],observed=observed_data.wheel_2_ToutR.values) observe('steamcoil_1_FP',res['steamcoil_1']['FP'],observed=observed_data.steamcoil_1_FP.values,sigma=1000) observe('steamcoil_1_Fs',res['steamcoil_1']['Fs'],observed=observed_data.steamcoil_1_Fs.values,sigma=20) observe('steamcoil_2_Fs',res['steamcoil_2']['Fs'],observed=observed_data.steamcoil_2_Fs.values,sigma=20) record('wheel_2_ToutC',res['wheel_2']['ToutC']) record('mixed_2_ToutA',res['mixed_2']['ToutA']) record('wheel_1_FaP',res['wheel_1']['FP']) record('wheel_1_FaR',res['wheel_1']['FR']) record('wheel_1_FaC',res['wheel_1']['FC']) record('mixed_1_FaA',res['mixed_1']['FA']) record('mixed_1_FaM',res['mixed_1']['FM']) record('F_air_S',res['Fa']['Fa_S']) record('F_air_H',res['Fa']['Fa_H']) record('F_air_X',res['Fa']['Fa_X']) self.param_posterior = pm.find_MAP(maxeval=50000,include_transformed=False) param_posterior_reorder = {'F_air':{}} for equp_name in param_prior.keys(): param_posterior_reorder.setdefault(equp_name,{}) for param_name,param_value in self.param_posterior.items(): if '__' in param_name: continue if param_name == 'F_air_val_rw': param_value = np.median(param_value[-5:]) if param_name.startswith(equp_name): param_name_adj = param_name.replace(f'{equp_name}_','') param_posterior_reorder[equp_name][param_name_adj] = param_value self.record_model( model_name = 'DHU', model = param_posterior_reorder, train_data = {'x':np.array([1])}, train_metric = {'R2':1,'MAE':1,'MAPE':1} ) # 样本内预测数据 TVP_data = [] for param_name in self.param_posterior.keys(): if param_name.replace('_mu','') not in observed_data.columns: continue TVP_data.append( pd.DataFrame( { 'param_name': param_name.replace('_mu',''), 'real' : observed_data.loc[:,param_name.replace('_mu','')].values, 'pred' : self.param_posterior[param_name] } ) ) self.TVP_data = pd.concat(TVP_data,axis=0) group_by_data = self.TVP_data.groupby(['param_name'])[['pred','real']] self.TVP_metric = ( pd.concat( [ group_by_data.apply(lambda dt:r2_score(dt.real,dt.pred)), group_by_data.apply(lambda dt:mean_absolute_error(dt.real,dt.pred)), group_by_data.apply(lambda dt:mean_absolute_percentage_error(dt.real,dt.pred)), ], axis=1 ) .set_axis(['R2','MAE','MAPE'],axis=1) .sort_values(by='R2',ascending=True) ) if plot_TVP: gg.options.figure_size = (10,10) plot = ( self.TVP_data .pipe(gg.ggplot) + gg.aes(x='real',y='pred') + gg.geom_point() + gg.facet_wrap(facets='param_name',scales='free') + gg.geom_abline(intercept=0,slope=1,color='red') ) plot.show() return self def predict(self,input_data:pd.DataFrame) -> dict: param_posterior = self.model_info['model_DHU'] res = DHU_3.model( Tin_F = input_data.coil_1_ToutA.values, Hin_F = input_data.coil_1_HoutA.values, HzP = input_data.HzP.values, HzR = input_data.HzR.values, coil_1_TinW = input_data.coil_1_TinW.values, coil_2_TinW = input_data.coil_2_TinW.values, coil_3_TinW = input_data.coil_3_TinW.values, coil_1_Val = input_data.coil_1_Val.values, coil_2_Val = input_data.coil_2_Val.values, coil_3_Val = input_data.coil_3_Val.values, wheel_1_TinR = input_data.wheel_1_TinR.values, wheel_2_TinR = input_data.wheel_2_TinR.values, mixed_1_TinM = input_data.mixed_1_TinM.values, mixed_2_TinM = input_data.mixed_2_TinM.values, mixed_1_HinM = input_data.mixed_1_HinM.values, mixed_2_HinM = input_data.mixed_2_HinM.values, engine = 'numpy', param = param_posterior ) return res def predict_system(self,input_data:pd.DataFrame) -> pd.DataFrame: pred_res = self.predict(input_data) system_output = {} for equp_name,output_info in pred_res.items(): for output_name,output_value in output_info.items(): system_output[f'{equp_name}_{output_name}'] = output_value system_output = pd.DataFrame(system_output) system_output['Fs'] = system_output.steamcoil_1_Fs + system_output.steamcoil_2_Fs return system_output @classmethod def model( cls, Tin_F, # 前表冷后温度 Hin_F, # 前表冷后湿度 HzP, # 处理侧风机频率 HzR, # 再生侧风机频率 coil_1_TinW, # 前表冷进水温度 coil_2_TinW, # 中表冷进水温度 coil_3_TinW, # 后表冷进水温度 coil_1_Val, # 前表冷阀门开度 coil_2_Val, # 中表冷阀门开度 coil_3_Val, # 后表冷阀门开度 wheel_1_TinR, # 前转轮再生侧温度 wheel_2_TinR, # 后转轮再生侧温度 mixed_1_TinM, # 回风温度(处理侧) mixed_1_HinM, # 回风湿度(处理侧) mixed_2_TinM, # 补风温度(再生侧) mixed_2_HinM, # 补风湿度(再生侧) engine, param ) -> dict: # 水的质量流量 coil_2_FW = coil_2_Val / 100 coil_3_FW = coil_3_Val / 100 # 空气的质量流量 F_air_HzP_H = param['F_air']['HzP_H'] F_air_HzP_X = param['F_air']['HzP_X'] F_air_HzP_S = F_air_HzP_H + F_air_HzP_X F_air_HzR_B = param['F_air']['HzR_B'] F_air_S_base = 1 F_air_X_base = param['F_air']['X_base'] F_air_H_base = param['F_air']['H_base'] F_air_B_base = param['F_air']['B_base'] F_air_val_rw = param['F_air'].get('val_rw',0) F_air_val_pct = param['F_air'].get('val_pct',0) F_air_X_base_adj = F_air_X_base + F_air_val_rw F_air_H_base_adj = F_air_H_base - F_air_val_rw * F_air_val_pct F_air_B_base_adj = F_air_B_base - F_air_val_rw * (1 - F_air_val_pct) Fa_S = F_air_S_base + F_air_HzP_S * (HzP / 50) Fa_H = F_air_H_base_adj + F_air_HzP_H * (HzP / 50) Fa_X = F_air_X_base_adj + F_air_HzP_X * (HzP / 50) Fa_B = F_air_B_base_adj + F_air_HzR_B * (HzR / 50) Fa_P = Fa_B + Fa_X + Fa_H - Fa_S wheel_1_FaP = Fa_S - Fa_H wheel_1_FaC = Fa_X - wheel_1_FaP wheel_1_FaR = Fa_P wheel_2_FaP = Fa_S wheel_2_FaC = wheel_1_FaC wheel_2_FaR = wheel_1_FaC mixed_1_FaM = Fa_H mixed_1_FaA = wheel_1_FaP mixed_2_FaM = Fa_B mixed_2_FaA = wheel_1_FaC coil_2_FaA = Fa_S coil_3_FaA = Fa_S steamcoil_1_Fa = Fa_P steamcoil_2_Fa = wheel_1_FaC # 前转轮 wheel_1_res = WheelS3.model( TinP = Tin_F, HinP = Hin_F, FP = wheel_1_FaP, TinR = wheel_1_TinR, HinR = 0, FR = wheel_1_FaR, TinC = Tin_F, HinC = Hin_F, FC = wheel_1_FaC, engine = engine, param = param['wheel_1'] ) # 处理侧混风(回风) mixed_1_res = Mixed.model( TinA = wheel_1_res['ToutP'], HinA = wheel_1_res['HoutP'], FA = mixed_1_FaA, TinM = mixed_1_TinM, HinM = mixed_1_HinM, FM = mixed_1_FaM, engine = engine ) # 中表冷 coil_2_res = CoolingCoil2.model( TinA = mixed_1_res['ToutA'], HinA = mixed_1_res['HoutA'], FA = coil_2_FaA, TinW = coil_2_TinW, FW = coil_2_FW, engine = engine, param = param['coil_2'] ) # 后转轮 wheel_2_res = WheelS3.model( TinP = coil_2_res['ToutA'], HinP = coil_2_res['HoutA'], FP = wheel_2_FaP, TinC = wheel_1_res['ToutC'], HinC = wheel_1_res['HoutC'], FC = wheel_2_FaC, TinR = wheel_2_TinR, HinR = 0, FR = wheel_2_FaR, engine = engine, param = param['wheel_2'], ) # 后表冷 coil_3_res = CoolingCoil2.model( TinA = wheel_2_res['ToutP'], HinA = wheel_2_res['HoutP'], FA = coil_3_FaA, TinW = coil_3_TinW, FW = coil_3_FW, engine = engine, param = param['coil_3'] ) # 后转轮湿度修正 wheel_2_res_adj = WheelS3.model( TinP = coil_2_res['ToutA'], HinP = coil_2_res['HoutA'], FP = wheel_2_FaP, TinC = wheel_1_res['ToutC'], HinC = wheel_1_res['HoutC'], FC = wheel_2_FaC, TinR = wheel_2_TinR, HinR = wheel_2_res['HoutC'], FR = wheel_2_FaR, engine = engine, param = param['wheel_2'], ) # 再生侧混风(排风) mixed_2_res = Mixed.model( TinA = wheel_2_res_adj['ToutR'], HinA = wheel_2_res_adj['HoutR'], FA = mixed_2_FaA, TinM = mixed_2_TinM, HinM = mixed_2_HinM, FM = mixed_2_FaM, engine = engine ) # 前转轮湿度修正 wheel_1_res_adj = WheelS3.model( TinP = Tin_F, HinP = Hin_F, FP = wheel_1_FaP, TinR = wheel_1_TinR, HinR = mixed_2_res['HoutA'], FR = wheel_1_FaR, TinC = Tin_F, HinC = Hin_F, FC = wheel_1_FaC, engine = engine, param = param['wheel_1'] ) # 前蒸气盘管 steamcoil_1_res = SteamCoilFs2.model( TinA = mixed_2_res['ToutA'], ToutA = wheel_1_TinR, FA = steamcoil_1_Fa, param = param['steamcoil_1'], engine = engine ) # steamcoil_1_res = SteamCoilFs3.model( # TinA = mixed_2_res['ToutA'], # ToutA = wheel_1_TinR, # HinA = mixed_2_res['DoutA'], # HoutA = mixed_2_res['HoutA'], # FA = steamcoil_1_Fa, # param = param['steamcoil_1'], # engine = engine # ) # 后蒸气盘管 steamcoil_2_res = SteamCoilFs.model( TinA = wheel_2_res_adj['ToutC'], ToutA = wheel_2_TinR, FA = steamcoil_2_Fa, param = param['steamcoil_2'], engine = engine ) return { 'coil_2' : coil_2_res, 'coil_3' : coil_3_res, 'wheel_1' : wheel_1_res_adj, 'wheel_2' : wheel_2_res_adj, 'mixed_1' : mixed_1_res, 'mixed_2' : mixed_2_res, 'steamcoil_1': steamcoil_1_res, 'steamcoil_2': steamcoil_2_res, 'Fa':{ 'Fa_S': Fa_S, 'Fa_H': Fa_H, 'Fa_X': Fa_X, 'Fa_P': Fa_P, 'Fa_B': Fa_B, }, 'summary' : {} }