from copy import deepcopy import numpy as np import pandas as pd import pymc as pm try: import plotnine as gg except: pass import psychrolib psychrolib.SetUnitSystem(psychrolib.SI) get_Enthalpy = np.vectorize(psychrolib.GetMoistAirEnthalpy) from .._model._base import BaseModel from ..components.coil import CoolingCoil,SteamCoilFs,SteamCoilFs2 from ..components.wheel import WheelS3 from ..components.mixed import Mixed def record(name,var): pm.Deterministic(f'{name}_mu',var) def observe(name,var,observed,sigma=1): mu = pm.Deterministic(f'{name}_mu',var) sigma = pm.HalfNormal(f'{name}_sigma',sigma=sigma) pm.Normal(name,mu=mu,sigma=sigma,observed=observed) class DHU_2(BaseModel): def __init__(self) -> None: super().__init__() def fit( self, input_data : pd.DataFrame, observed_data: pd.DataFrame, plot_TVP : bool = True ): with pm.Model() as self.MODEL_PYMC: param_prior = { 'wheel_1' : WheelS3.prior('wheel_1'), 'wheel_2' : WheelS3.prior('wheel_2'), 'coil_2' : CoolingCoil.prior('coil_2'), 'coil_3' : CoolingCoil.prior('coil_3'), 'steamcoil_1': SteamCoilFs2.prior('steamcoil_1'), 'steamcoil_2': SteamCoilFs.prior('steamcoil_2'), 'F_air' : { 'F_air_F': 12, # 新风 'F_air_B': pm.TruncatedNormal('F_air_B',mu=25,sigma=1,initval=25,lower=0.1), # 回风 'F_air_R': pm.TruncatedNormal('F_air_R',mu=36,sigma=1,initval=36,lower=0.1), # 送风 'F_air_S': pm.TruncatedNormal('F_air_S',mu=3,sigma=1,initval=3,lower=0.1), # 补风 } } res = DHU_2.model( Tin_F = input_data.coil_1_ToutA.values, Hin_F = input_data.coil_1_HoutA.values, HzP = None, HzR = None, FF_air = param_prior['F_air']['F_air_F'], FB_air = param_prior['F_air']['F_air_B'], FR_air = param_prior['F_air']['F_air_R'], FS_air = param_prior['F_air']['F_air_S'], coil_1_TinW = input_data.coil_1_TinW.values, coil_2_TinW = input_data.coil_2_TinW.values, coil_3_TinW = input_data.coil_3_TinW.values, coil_1_Val = input_data.coil_1_Val.values / 100, coil_2_Val = input_data.coil_2_Val.values / 100, coil_3_Val = input_data.coil_3_Val.values / 100, wheel_1_TinR = input_data.wheel_1_TinR.values, wheel_2_TinR = input_data.wheel_2_TinR.values, mixed_1_TinM = input_data.mixed_1_TinM.values, mixed_2_TinM = input_data.mixed_2_TinM.values, mixed_1_HinM = input_data.mixed_1_HinM.values, mixed_2_HinM = input_data.mixed_2_HinM.values, engine = 'pymc', param = param_prior ) observe('mixed_1_ToutA',res['mixed_1']['ToutA'],observed=observed_data.mixed_1_ToutA.values) observe('mixed_1_DoutA',res['mixed_1']['DoutA'],observed=observed_data.mixed_1_DoutA.values) observe('wheel_1_ToutC',res['wheel_1']['ToutC'],observed=observed_data.wheel_1_ToutC.values) observe('coil_2_ToutA',res['coil_2']['ToutA'],observed=observed_data.coil_2_ToutA.values) observe('coil_2_DoutA',res['coil_2']['DoutA'],observed=observed_data.coil_2_DoutA.values) observe('wheel_2_ToutP',res['wheel_2']['ToutP'],observed=observed_data.wheel_2_ToutP.values) observe('wheel_2_DoutP',res['wheel_2']['DoutP'],observed=observed_data.wheel_2_DoutP.values) observe('wheel_2_ToutR',res['wheel_2']['ToutR'],observed=observed_data.wheel_2_ToutR.values) observe('steamcoil_1_FP',res['steamcoil_1']['FP'],observed=observed_data.steamcoil_1_FP.values,sigma=1000) observe('steamcoil_1_Fs',res['steamcoil_1']['Fs'],observed=observed_data.steamcoil_1_Fs.values,sigma=20) observe('steamcoil_2_Fs',res['steamcoil_2']['Fs'],observed=observed_data.steamcoil_2_Fs.values,sigma=20) record('wheel_2_ToutC',res['wheel_2']['ToutC']) record('mixed_2_ToutA',res['mixed_2']['ToutA']) param_posterior = pm.find_MAP(maxeval=50000) param_posterior_reorder = {'F_air':{'F':12}} for equp_name in param_prior.keys(): param_posterior_reorder.setdefault(equp_name,{}) for param_name,param_value in param_posterior.items(): if '__' in param_name: continue if param_name.startswith(equp_name): param_name_adj = param_name.replace(f'{equp_name}_','') param_posterior_reorder[equp_name][param_name_adj] = param_value if plot_TVP: TVP_data = [] for param_name in param_posterior.keys(): if param_name.replace('_mu','') not in observed_data.columns: continue TVP_data.append( pd.DataFrame( { 'param_name': param_name.replace('_mu',''), 'real' : observed_data.loc[:,param_name.replace('_mu','')].values, 'pred' : param_posterior[param_name] } ) ) gg.options.figure_size = (10,10) plot = ( pd.concat(TVP_data,axis=0) .pipe(gg.ggplot) + gg.aes(x='real',y='pred') + gg.geom_point() + gg.facet_wrap(facets='param_name',scales='free') + gg.geom_abline(intercept=0,slope=1,color='red') ) plot.show() self.record_model( model_name = 'DHU', model = param_posterior_reorder, train_data = {'x':np.array([1])}, train_metric = {'R2':1,'MAE':1,'MAPE':1} ) return self def predict(self,input_data:pd.DataFrame) -> dict: param_posterior = self.model_info['model_DHU'] res = DHU_2.model( Tin_F = input_data.coil_1_ToutA.values, Hin_F = input_data.coil_1_HoutA.values, HzP = None, HzR = None, FF_air = param_posterior['F_air']['F'], FB_air = param_posterior['F_air']['B'], FR_air = param_posterior['F_air']['R'], FS_air = param_posterior['F_air']['S'], coil_1_TinW = input_data.coil_1_TinW.values, coil_2_TinW = input_data.coil_2_TinW.values, coil_3_TinW = input_data.coil_3_TinW.values, coil_1_Val = input_data.coil_1_Val.values / 100, coil_2_Val = input_data.coil_2_Val.values / 100, coil_3_Val = input_data.coil_3_Val.values / 100, wheel_1_TinR = input_data.wheel_1_TinR.values, wheel_2_TinR = input_data.wheel_2_TinR.values, mixed_1_TinM = input_data.mixed_1_TinM.values, mixed_2_TinM = input_data.mixed_2_TinM.values, mixed_1_HinM = input_data.mixed_1_HinM.values, mixed_2_HinM = input_data.mixed_2_HinM.values, engine = 'numpy', param = param_posterior ) return res def predict_system(self,input_data:pd.DataFrame) -> pd.DataFrame: pred_res = self.predict(input_data) system_output = {} for equp_name,output_info in pred_res.items(): for output_name,output_value in output_info.items(): system_output[f'{equp_name}_{output_name}'] = output_value system_output = pd.DataFrame(system_output) system_output['Fs'] = system_output.steamcoil_1_Fs + system_output.steamcoil_2_Fs return system_output @classmethod def model( cls, Tin_F, # 前表冷后温度 Hin_F, # 前表冷后湿度 HzP, # 处理侧风机频率 HzR, # 再生侧风机频率 FF_air, FB_air, FR_air, FS_air, coil_1_TinW, # 前表冷进水温度 coil_2_TinW, # 中表冷进水温度 coil_3_TinW, # 后表冷进水温度 coil_1_Val, # 前表冷阀门开度 coil_2_Val, # 中表冷阀门开度 coil_3_Val, # 后表冷阀门开度 wheel_1_TinR, # 前转轮再生侧温度 wheel_2_TinR, # 后转轮再生侧温度 mixed_1_TinM, # 回风温度(处理侧) mixed_1_HinM, # 回风湿度(处理侧) mixed_2_TinM, # 补风温度(再生侧) mixed_2_HinM, # 补风湿度(再生侧) engine, param ) -> dict: # 空气的质量流量 # FF_air = 12 # 新风 # FB_air = pm.TruncatedNormal('FB_air',mu=25,sigma=1,initval=25,lower=0.1) # 回风 # FR_air = pm.TruncatedNormal('FR_air',mu=36,sigma=1,initval=36,lower=0.1) # 送风 # FS_air = pm.TruncatedNormal('FS_air',mu=3,sigma=1,initval=3,lower=0.1) # 补风 FO_air = FF_air + FB_air + FS_air - FR_air # 排风 # 水的质量流量 coil_2_FW = coil_2_Val coil_3_FW = coil_3_Val # 前转轮 wheel_1_res = WheelS3.model( TinP = Tin_F, HinP = Hin_F, FP = FR_air - FB_air, TinR = wheel_1_TinR, HinR = 0, FR = FO_air, TinC = Tin_F, HinC = Hin_F, FC = FF_air+FB_air-FR_air, engine = engine, param = param['wheel_1'] ) # 处理侧混风(回风) mixed_1_res = Mixed.model( TinA = wheel_1_res['ToutP'], HinA = wheel_1_res['HoutP'], FA = FR_air - FB_air, TinM = mixed_1_TinM, HinM = mixed_1_HinM, FM = FB_air, engine = engine ) # 中表冷 coil_2_res = CoolingCoil.model( TinA = mixed_1_res['ToutA'], HinA = mixed_1_res['HoutA'], FA = FR_air, TinW = coil_2_TinW, FW = coil_2_FW, engine = engine, param = param['coil_2'] ) # 后转轮 wheel_2_res = WheelS3.model( TinP = coil_2_res['ToutA'], HinP = coil_2_res['HoutA'], FP = FR_air, TinC = wheel_1_res['ToutC'], HinC = wheel_1_res['HoutC'], FC = FF_air+FB_air-FR_air, TinR = wheel_2_TinR, HinR = 0, FR = FO_air-FS_air, engine = engine, param = param['wheel_2'], ) # 后表冷 coil_3_res = CoolingCoil.model( TinA = wheel_2_res['ToutP'], HinA = wheel_2_res['HoutP'], FA = FR_air, TinW = coil_3_TinW, FW = coil_3_FW, engine = engine, param = param['coil_3'] ) # 后转轮湿度修正 wheel_2_res_adj = WheelS3.model( TinP = coil_2_res['ToutA'], HinP = coil_2_res['HoutA'], FP = FR_air, TinC = wheel_1_res['ToutC'], HinC = wheel_1_res['HoutC'], FC = FF_air+FB_air-FR_air, TinR = wheel_2_TinR, HinR = wheel_2_res['HoutC'], FR = FO_air-FS_air, engine = engine, param = param['wheel_2'], ) # 再生侧混风(排风) mixed_2_res = Mixed.model( TinA = wheel_2_res_adj['ToutR'], HinA = wheel_2_res_adj['HoutR'], FA = FO_air-FS_air, TinM = mixed_2_TinM, HinM = mixed_2_HinM, FM = FS_air, engine = engine ) # 前转轮湿度修正 wheel_1_res_adj = WheelS3.model( TinP = Tin_F, HinP = Hin_F, FP = FR_air - FB_air, TinR = wheel_1_TinR, HinR = mixed_2_res['HoutA'], FR = FO_air, TinC = Tin_F, HinC = Hin_F, FC = FF_air+FB_air-FR_air, engine = engine, param = param['wheel_1'] ) # 前蒸气盘管 steamcoil_1_res = SteamCoilFs2.model( TinA = mixed_2_res['ToutA'], ToutA = wheel_1_TinR, FA = FO_air, param = param['steamcoil_1'], engine = engine ) # 后蒸气盘管 steamcoil_2_res = SteamCoilFs.model( TinA = wheel_2_res_adj['ToutC'], ToutA = wheel_2_TinR, FA = FO_air-FS_air, param = param['steamcoil_2'], engine = engine ) return { 'coil_2' : coil_2_res, 'coil_3' : coil_3_res, 'wheel_1' : wheel_1_res_adj, 'wheel_2' : wheel_2_res_adj, 'mixed_1' : mixed_1_res, 'mixed_2' : mixed_2_res, 'steamcoil_1': steamcoil_1_res, 'steamcoil_2': steamcoil_2_res, 'F_air' : { 'FF_air': FF_air, 'FB_air': FB_air, 'FR_air': FR_air, 'FS_air': FS_air, 'FO_air': FO_air }, 'summary':{} }