from typing import Union import numpy as np import pandas as pd import pymc as pm import pytensor.tensor as pt from .._base._base_device import BaseDevice from ...components import ( coil_water,coil_steam,wheel2,wheel3,mixed ) from ..utils.fit_utils import ( observe,reorder_posterior ) from ...tools.optimizer import optimizer from ...tools.data_cleaner import DataCleaner class DHU_AB(BaseDevice): val_rw_adj_target = ('coil_2_DoutA','coil_3_DoutA') def __init__( self, DHU_type = 'A', exist_Fa_H = True, exist_Fa_B = True, wheel_1 = None, wheel_2 = None, coolingcoil_2 = 'CoolingCoil2', coolingcoil_3 = 'CoolingCoil2', heatingcoil_1 = 'SteamCoil', heatingcoil_2 = 'SteamCoil', mixed_1 = 'Mixed', mixed_2 = 'Mixed', other_info = None ) -> None: super().__init__() self.DHU_type = DHU_type.replace('DHU_','') if self.DHU_type == 'A': wheel_1 = wheel_1 if wheel_1 is not None else 'WheelS3V3' wheel_2 = wheel_2 if wheel_2 is not None else 'WheelS3V2' elif self.DHU_type == 'B': wheel_1 = wheel_1 if wheel_1 is not None else 'WheelS2V2' wheel_2 = wheel_2 if wheel_2 is not None else 'WheelS3V3' else: raise Exception('DHU_type must be A or B') self.components_str = { 'wheel_1' : wheel_1, 'wheel_2' : wheel_2, 'coil_2' : coolingcoil_2, 'coil_3' : coolingcoil_3, 'heatingcoil_1': heatingcoil_1, 'heatingcoil_2': heatingcoil_2, 'mixed_1' : mixed_1, 'mixed_2' : mixed_2 } self.exist_Fa_H = exist_Fa_H self.exist_Fa_B = exist_Fa_B self.other_info = other_info if other_info is not None else {} self.record_load_info( components_str = self.components_str, DHU_type = self.DHU_type, exist_Fa_H = self.exist_Fa_H, exist_Fa_B = self.exist_Fa_B, other_info = self.other_info ) @property def components(self): comp_map = { 'WheelS2':wheel2,'WheelS3':wheel3,'CoolingCoil':coil_water, 'SteamCoil':coil_steam,'Mixed':mixed } output ={} for comp_name,comp_model in self.components_str.items(): if comp_model == 'SteamCoilVal': output[comp_name] = coil_steam.SteamCoilVal( name = comp_name, Fs_rated = self.other_info[f'{comp_name}_Fs_rated'] ) continue for comp_map_k,comp_map_v in comp_map.items(): if comp_model.startswith(comp_map_k): output[comp_name] = getattr(comp_map_v,comp_model)(name = comp_name) return output @property def model_input_data_columns(self): columns = { 'Tin_F' : 'coil_1_ToutA', 'Hin_F' : 'coil_1_HoutA', 'fan_1_Hz' : 'fan_1_Hz', 'fan_2_Hz' : 'fan_2_Hz', 'coil_1_TinW' : 'coil_1_TinW', 'coil_2_TinW' : 'coil_2_TinW', 'coil_3_TinW' : 'coil_3_TinW', 'coil_1_Val' : 'coil_1_Val', 'coil_2_Val' : 'coil_2_Val', 'coil_3_Val' : 'coil_3_Val', 'wheel_1_TinR': 'wheel_1_TinR', 'wheel_2_TinR': 'wheel_2_TinR', } if self.exist_Fa_H: columns['mixed_1_TinM'] = 'mixed_1_TinM' columns['mixed_1_HinM'] = 'mixed_1_HinM' if self.exist_Fa_B: columns['mixed_2_TinM'] = 'mixed_2_TinM' columns['mixed_2_HinM'] = 'mixed_2_HinM' return columns @property def model_observe_data_columns(self): columns = { 'mixed_1_ToutA': 'mixed_1_ToutA', 'mixed_1_DoutA': 'mixed_1_DoutA', 'coil_2_ToutA' : 'coil_2_ToutA', 'coil_2_DoutA' : 'coil_2_DoutA', 'wheel_2_ToutP': 'wheel_2_ToutP', 'wheel_2_DoutP': 'wheel_2_DoutP', 'mixed_2_ToutA': 'mixed_2_ToutA', # 涉及前再生加热盘管的热量 'wheel_2_ToutC': 'wheel_2_ToutC', # 涉及后再生加热盘管的热量 } if self.DHU_type == 'A': columns['wheel_1_ToutC'] = 'wheel_1_ToutC' # A类除湿机前转轮是三分转轮 for idx in [1,2]: heatingcoil_idx = f'heatingcoil_{idx}' if isinstance(self.components[heatingcoil_idx],coil_steam.SteamCoilFs2): columns[f'{heatingcoil_idx}_FP'] = f'{heatingcoil_idx}_FP' columns[f'{heatingcoil_idx}_Fs'] = f'{heatingcoil_idx}_Fs' elif isinstance(self.components[heatingcoil_idx],coil_steam.SteamCoilFs): columns[f'{heatingcoil_idx}_Fs'] = f'{heatingcoil_idx}_Fs' elif isinstance(self.components[heatingcoil_idx],coil_steam.SteamCoil): # columns['wheel_2_ToutC'] = 'wheel_2_ToutC' # columns['wheel_1_ToutR'] = 'wheel_1_ToutR' # columns['mixed_2_ToutA'] = 'mixed_2_ToutA' pass elif isinstance(self.components[heatingcoil_idx],coil_steam.SteamCoilVal): columns[f'{heatingcoil_idx}_Val'] = f'{heatingcoil_idx}_Val' else: raise Exception('WRONG') exclude_obs = self.other_info.get('exclude_obs',[]) for col in exclude_obs: if col in columns: del columns[col] return columns def fit( self, input_data : pd.DataFrame, observed_data: pd.DataFrame, rw_FA_val : bool = False, plot_TVP : bool = True, ): if len(input_data) < 30: raise Exception('数据量过少') with pm.Model() as self.MODEL_PYMC: param_prior = {name:comp.prior() for name,comp in self.components.items()} param_prior['F_air'] = AirFlow_DHU_AB.prior( rw_FA_val = rw_FA_val, N = len(input_data), exist_Fa_H = self.exist_Fa_H, exist_Fa_B = self.exist_Fa_B ) res = self.model( **{k:input_data.loc[:,v].values for k,v in self.model_input_data_columns.items()}, engine = 'pymc', components = self.components, param = param_prior ) for std_name,name in self.model_observe_data_columns.items(): if name not in observed_data.columns: raise Exception(f'Missing column: {name}') observed_data = observed_data.rename(columns={name:std_name}) std_name_equp,std_name_point = std_name.rsplit('_',1) sigma = { 'wheel_2_DoutP' : 0.3, 'heatingcoil_1_Fs': 20, 'heatingcoil_2_Fs': 20, 'heatingcoil_1_FP': 10000, 'heatingcoil_2_FP': 10000, } if std_name in ['heatingcoil_1_Val','heatingcoil_2_Val']: sigma = res[std_name_equp]['sigma'] else: sigma = { 'wheel_2_DoutP' : 0.3, 'heatingcoil_1_Fs': 20, 'heatingcoil_2_Fs': 20, 'heatingcoil_1_FP': 10000, 'heatingcoil_2_FP': 10000, }.get(std_name,1) observe( name = std_name, var = res[std_name_equp][std_name_point], observed = observed_data, sigma = sigma ) self.param_posterior = pm.find_MAP(maxeval=50000,include_transformed=False) self.record_load_info( param_posterior = self.param_posterior ) self.record_model( model_name = 'ATD', model = reorder_posterior(param_prior,self.param_posterior), train_data = {'x':np.array([1])}, train_metric = {'R2':1,'MAE':1,'MAPE':1} ) self.TVP_data = self.get_TVP(self.param_posterior,observed_data) self.TVP_metric = self.get_metric(self.TVP_data) if plot_TVP: self.plot_TVP(self.TVP_data).show() return self @property def F_air_val_rw(self): return self.model_info['model_ATD']['F_air']['val_rw'] def set_F_air_val_rw(self,value:float): self.model_info['model_ATD']['F_air']['val_rw'] = value return self def clean_data( self, data : pd.DataFrame, data_type : list=['input','observed'], print_process: bool = True, fill_zero : bool = False, save_log : Union[str,None] = None ) -> pd.DataFrame: data = data.replace(-9999,np.nan) clean_data = DataCleaner(data,print_process=print_process) if 'input' in data_type: clean_data = ( clean_data .rm_rolling_fluct(window=60,fun='ptp',thre=0.1,include_cols=['State']) .rm_rule('State != 1') .rm_outrange(method='raw',upper=140,lower=20,include_cols=['wheel_1_TinR','wheel_2_TinR']) ) if self.DHU_type == 'A': clean_data = ( clean_data .rm_rule('wheel_1_ToutC<=coil_1_ToutA') ) if 'observed' in data_type: pass clean_data = clean_data.get_data( fill = 0 if fill_zero else None, save_log = save_log ) return clean_data def optimize( self, cur_input_data: pd.DataFrame, wheel_1_TinR : tuple = (70,120), wheel_2_TinR : tuple = (70,120), fan_2_Hz : tuple = (30,50), constrains : list = None, logging : bool = True, target : str = 'summary_Fs', target_min : bool = True ) -> list: constrains = [] if constrains is None else constrains cur_input_data = cur_input_data.iloc[[0],:] opt_var_boundary = {} if wheel_1_TinR is not None: opt_var_boundary['wheel_1_TinR'] = {'lb':min(wheel_1_TinR),'ub':max(wheel_1_TinR)} if wheel_2_TinR is not None: opt_var_boundary['wheel_2_TinR'] = {'lb':min(wheel_2_TinR),'ub':max(wheel_2_TinR)} if fan_2_Hz is not None: opt_var_boundary['fan_2_Hz'] = {'lb':min(fan_2_Hz),'ub':max(fan_2_Hz)} opt_var_value = cur_input_data.loc[:,list(opt_var_boundary.keys())] oth_var_value = ( cur_input_data .loc[:,list(self.model_input_data_columns.values())] .drop(opt_var_value.columns,axis=1) ) opt_res = optimizer( model = self, opt_var_boundary = opt_var_boundary, opt_var_value = opt_var_value, oth_var_value = oth_var_value, target = target, target_min = target_min, constrains = constrains, logging = logging, other_kwargs = {'NIND':2000,'MAXGEN':50} ) return opt_res def model(self,*args,**kwargs): if self.DHU_type == 'A': return model_A(*args,**kwargs) elif self.DHU_type == 'B': return model_B(*args,**kwargs) else: raise ValueError('DHU_type must be A or B') def plot_opt( self, cur_input_data: pd.DataFrame, target_min : str = 'summary_waste', ): data_input = ( pd.MultiIndex.from_product( [ np.linspace(70,120,1000), np.linspace(70,120,1000), ], names=['wheel_1_TinR','wheel_2_TinR'] ) .to_frame(index=False) ) for col in cur_input_data.columns: if col in data_input.columns: continue data_input[col] = cur_input_data.loc[:,col].iat[0] data_output = self.predict_system(data_input) data = ( data_output .assign( wheel_1_TinR = data_input.loc[:,'wheel_1_TinR'], wheel_2_TinR = data_input.loc[:,'wheel_2_TinR'], ) .assign(coil_3_DoutA=lambda dt:dt.coil_3_DoutA.round(1)) .loc[lambda dt:dt.groupby('coil_3_DoutA')[target_min].idxmin()] .loc[lambda dt:dt.coil_3_DoutA.mod(1)==0] ) import plotnine as gg plot = ( data .pipe(gg.ggplot) + gg.aes(x='wheel_1_TinR',y='wheel_2_TinR') + gg.geom_path(size=1) + gg.geom_point() + gg.geom_label(gg.aes(label='coil_3_DoutA')) + gg.geom_abline(slope=1,intercept=0,color='red',linetype='--') ) return plot def plot_check(self,cur_input_data:pd.DataFrame) -> dict: pa1=self.curve(input_data=cur_input_data,x='wheel_1_TinR',y='wheel_1_DoutP') pa2=self.curve(input_data=cur_input_data,x='wheel_1_TinR',y='wheel_1_ToutP') pa3=self.curve(input_data=cur_input_data,x='wheel_1_TinR',y='wheel_1_EFF') pb1=self.curve(input_data=cur_input_data,x='wheel_2_TinR',y='wheel_2_DoutP') pb2=self.curve(input_data=cur_input_data,x='wheel_2_TinR',y='wheel_2_ToutP') pb3=self.curve(input_data=cur_input_data,x='wheel_2_TinR',y='wheel_2_EFF') plot_EFF = (pa1|pa2|pa3)/(pb1|pb2|pb3) p1=self.curve(x='wheel_1_TinR',y='summary_waste_cond1',input_data=cur_input_data) p2=self.curve(x='wheel_2_TinR',y='summary_waste_cond2',input_data=cur_input_data) p3=self.curve(x='wheel_1_TinR',y='summary_waste_Qsen1',input_data=cur_input_data) p4=self.curve(x='wheel_2_TinR',y='summary_waste_Qsen2',input_data=cur_input_data) p5=self.curve(x='wheel_1_TinR',y='summary_waste_out',input_data=cur_input_data) p6=self.curve(x='wheel_2_TinR',y='summary_waste_out',input_data=cur_input_data) plot_waste = (p1|p3|p5)/(p2|p4|p6) return {'plot_EFF':plot_EFF,'plot_waste':plot_waste} def model_A( Tin_F, # 前表冷后温度 Hin_F, # 前表冷后湿度 fan_1_Hz, # 处理侧风机频率 fan_2_Hz, # 再生侧风机频率 coil_1_TinW, # 前表冷进水温度 coil_2_TinW, # 中表冷进水温度 coil_3_TinW, # 后表冷进水温度 coil_1_Val, # 前表冷阀门开度 coil_2_Val, # 中表冷阀门开度 coil_3_Val, # 后表冷阀门开度 wheel_1_TinR, # 前转轮再生侧温度 wheel_2_TinR, # 后转轮再生侧温度 engine : str, components: dict, param : dict, mixed_1_TinM = 0, # 回风温度(处理侧) mixed_1_HinM = 0, # 回风湿度(处理侧) mixed_2_TinM = 0, # 补风温度(再生侧) mixed_2_HinM = 0, # 补风湿度(再生侧) ) -> dict: # 水的质量流量 coil_2_FW = coil_2_Val / 100 coil_3_FW = coil_3_Val / 100 # 空气的质量流量 air_flow = AirFlow_DHU_AB.model(fan_1_Hz=fan_1_Hz,fan_2_Hz=fan_2_Hz,param=param,type='DHU_A') # 前转轮 wheel_1_res = components['wheel_1'].model( TinP = Tin_F, HinP = Hin_F, FP = air_flow['wheel_1_FaP'], TinR = wheel_1_TinR, HinR = 0, FR = air_flow['wheel_1_FaR'], TinC = Tin_F, HinC = Hin_F, FC = air_flow['wheel_1_FaC'], engine = engine, param = param['wheel_1'] ) # 处理侧混风(回风) mixed_1_res = components['mixed_1'].model( TinA = wheel_1_res['ToutP'], HinA = wheel_1_res['HoutP'], FA = air_flow['mixed_1_FaA'], TinM = mixed_1_TinM, HinM = mixed_1_HinM, FM = air_flow['mixed_1_FaM'], engine = engine ) # 中表冷 coil_2_res = components['coil_2'].model( TinA = mixed_1_res['ToutA'], HinA = mixed_1_res['HoutA'], FA = air_flow['coil_2_FaA'], TinW = coil_2_TinW, FW = coil_2_FW, engine = engine, param = param['coil_2'] ) # 后转轮 wheel_2_res = components['wheel_2'].model( TinP = coil_2_res['ToutA'], HinP = coil_2_res['HoutA'], FP = air_flow['wheel_2_FaP'], TinC = wheel_1_res['ToutC'], HinC = wheel_1_res['HoutC'], FC = air_flow['wheel_2_FaC'], TinR = wheel_2_TinR, HinR = 0, FR = air_flow['wheel_2_FaR'], engine = engine, param = param['wheel_2'], ) # 后表冷 coil_3_res = components['coil_3'].model( TinA = wheel_2_res['ToutP'], HinA = wheel_2_res['HoutP'], FA = air_flow['coil_3_FaA'], TinW = coil_3_TinW, FW = coil_3_FW, engine = engine, param = param['coil_3'] ) # 后转轮湿度修正 wheel_2_res_adj = components['wheel_2'].model( TinP = coil_2_res['ToutA'], HinP = coil_2_res['HoutA'], FP = air_flow['wheel_2_FaP'], TinC = wheel_1_res['ToutC'], HinC = wheel_1_res['HoutC'], FC = air_flow['wheel_2_FaC'], TinR = wheel_2_TinR, HinR = wheel_2_res['HoutC'], FR = air_flow['wheel_2_FaR'], engine = engine, param = param['wheel_2'], ) # 再生侧混风(排风) mixed_2_res = components['mixed_2'].model( TinA = wheel_2_res_adj['ToutR'], HinA = wheel_2_res_adj['HoutR'], FA = air_flow['mixed_2_FaA'], TinM = mixed_2_TinM, HinM = mixed_2_HinM, FM = air_flow['mixed_2_FaM'], engine = engine ) # 前转轮湿度修正 wheel_1_res_adj = components['wheel_1'].model( TinP = Tin_F, HinP = Hin_F, FP = air_flow['wheel_1_FaP'], TinR = wheel_1_TinR, HinR = mixed_2_res['HoutA'], FR = air_flow['wheel_1_FaR'], TinC = Tin_F, HinC = Hin_F, FC = air_flow['wheel_1_FaC'], engine = engine, param = param['wheel_1'] ) # 前再生加热盘管 heatingcoil_1_res = components['heatingcoil_1'].model( TinA = mixed_2_res['ToutA'], ToutA = wheel_1_TinR, FA = air_flow['heatingcoil_1_Fa'], param = param['heatingcoil_1'], engine = engine ) # 后再生加热盘管 heatingcoil_2_res = components['heatingcoil_2'].model( TinA = wheel_2_res_adj['ToutC'], ToutA = wheel_2_TinR, FA = air_flow['heatingcoil_2_Fa'], param = param['heatingcoil_2'], engine = engine ) waste = cal_Q_waste( wheel_1_res = wheel_1_res_adj, wheel_2_res = wheel_2_res_adj, heatingcoil_1_res = heatingcoil_1_res, heatingcoil_2_res = heatingcoil_2_res, wheel_1_TinR = wheel_1_TinR, wheel_2_TinR = wheel_2_TinR ) return { 'coil_2' : coil_2_res, 'coil_3' : coil_3_res, 'wheel_1' : wheel_1_res_adj, 'wheel_2' : wheel_2_res_adj, 'mixed_1' : mixed_1_res, 'mixed_2' : mixed_2_res, 'heatingcoil_1': heatingcoil_1_res, 'heatingcoil_2': heatingcoil_2_res, 'Fa' : air_flow, 'summary' : { 'Fs' : heatingcoil_1_res['Fs'] + heatingcoil_2_res['Fs'], **waste, } } def model_B( Tin_F, # 前表冷后温度 Hin_F, # 前表冷后湿度 fan_1_Hz, # 处理侧风机频率 fan_2_Hz, # 再生侧风机频率 coil_1_TinW, # 前表冷进水温度 coil_2_TinW, # 中表冷进水温度 coil_3_TinW, # 后表冷进水温度 coil_1_Val, # 前表冷阀门开度 coil_2_Val, # 中表冷阀门开度 coil_3_Val, # 后表冷阀门开度 wheel_1_TinR, # 前转轮再生侧温度 wheel_2_TinR, # 后转轮再生侧温度 engine : str, components: dict, param : dict, mixed_1_TinM = 0, # 回风温度(处理侧) mixed_1_HinM = 0, # 回风湿度(处理侧) mixed_2_TinM = 0, # 补风温度(再生侧) mixed_2_HinM = 0, # 补风湿度(再生侧) ) -> dict: # 水的质量流量 coil_2_FW = coil_2_Val / 100 coil_3_FW = coil_3_Val / 100 # 空气的质量流量 air_flow = AirFlow_DHU_AB.model(fan_1_Hz=fan_1_Hz,fan_2_Hz=fan_2_Hz,param=param,type='DHU_B') # 前转轮 wheel_1_res = components['wheel_1'].model( TinP = Tin_F, HinP = Hin_F, FP = air_flow['wheel_1_FaP'], TinR = wheel_1_TinR, HinR = 0, FR = air_flow['wheel_1_FaR'], engine = engine, param = param['wheel_1'], ) # 处理侧混风(回风) mixed_1_res = components['mixed_1'].model( TinA = wheel_1_res['ToutP'], HinA = wheel_1_res['HoutP'], FA = air_flow['mixed_1_FaA'], TinM = mixed_1_TinM, HinM = mixed_1_HinM, FM = air_flow['mixed_1_FaM'], engine = engine ) # 中表冷 coil_2_res = components['coil_2'].model( TinA = mixed_1_res['ToutA'], HinA = mixed_1_res['HoutA'], FA = air_flow['coil_2_FaA'], TinW = coil_2_TinW, FW = coil_2_FW, engine = engine, param = param['coil_2'] ) # 后转轮 wheel_2_res = components['wheel_2'].model( TinP = coil_2_res['ToutA'], HinP = coil_2_res['HoutA'], FP = air_flow['wheel_2_FaP'], TinC = mixed_1_res['ToutA'], HinC = mixed_1_res['HoutA'], FC = air_flow['wheel_2_FaC'], TinR = wheel_2_TinR, HinR = 0, FR = air_flow['wheel_2_FaR'], engine = engine, param = param['wheel_2'], ) # 后表冷 coil_3_res = components['coil_3'].model( TinA = wheel_2_res['ToutP'], HinA = wheel_2_res['HoutP'], FA = air_flow['coil_3_FaA'], TinW = coil_3_TinW, FW = coil_3_FW, engine = engine, param = param['coil_3'] ) # 后转轮湿度修正 wheel_2_res_adj = components['wheel_2'].model( TinP = coil_2_res['ToutA'], HinP = coil_2_res['HoutA'], FP = air_flow['wheel_2_FaP'], TinC = mixed_1_res['ToutA'], HinC = mixed_1_res['HoutA'], FC = air_flow['wheel_2_FaC'], TinR = wheel_2_TinR, HinR = wheel_2_res['HoutC'], FR = air_flow['wheel_2_FaR'], engine = engine, param = param['wheel_2'], ) # 再生侧混风(排风) mixed_2_res = components['mixed_2'].model( TinA = wheel_2_res_adj['ToutR'], HinA = wheel_2_res_adj['HoutR'], FA = air_flow['mixed_2_FaA'], TinM = mixed_2_TinM, HinM = mixed_2_HinM, FM = air_flow['mixed_2_FaM'], engine = engine ) # 前转轮湿度修正 wheel_1_res_adj = components['wheel_1'].model( TinP = Tin_F, HinP = Hin_F, FP = air_flow['wheel_1_FaP'], TinR = wheel_1_TinR, HinR = mixed_2_res['HoutA'], FR = air_flow['wheel_1_FaR'], engine = engine, param = param['wheel_1'], ) # 前蒸气盘管 heatingcoil_1_res = components['heatingcoil_1'].model( TinA = mixed_2_res['ToutA'], ToutA = wheel_1_TinR, FA = air_flow['heatingcoil_1_Fa'], param = param['heatingcoil_1'], engine = engine ) # 后蒸气盘管 heatingcoil_2_res = components['heatingcoil_2'].model( TinA = wheel_2_res_adj['ToutC'], ToutA = wheel_2_TinR, FA = air_flow['heatingcoil_2_Fa'], param = param['heatingcoil_2'], engine = engine ) waste = cal_Q_waste( wheel_1_res = wheel_1_res_adj, wheel_2_res = wheel_2_res_adj, heatingcoil_1_res = heatingcoil_1_res, heatingcoil_2_res = heatingcoil_2_res, wheel_1_TinR = wheel_1_TinR, wheel_2_TinR = wheel_2_TinR ) return { 'coil_2' : coil_2_res, 'coil_3' : coil_3_res, 'wheel_1' : wheel_1_res_adj, 'wheel_2' : wheel_2_res_adj, 'mixed_1' : mixed_1_res, 'mixed_2' : mixed_2_res, 'heatingcoil_1': heatingcoil_1_res, 'heatingcoil_2': heatingcoil_2_res, 'Fa' : air_flow, 'summary' : { 'Fs' : heatingcoil_1_res['Fs'] + heatingcoil_2_res['Fs'], **waste } } class AirFlow_DHU_AB: @classmethod def model(cls,fan_1_Hz,fan_2_Hz,param,type): # 当定频风机固定的时候,各出入口处的基准的风量 F_air_S_base = 1 F_air_X_base = param['F_air']['X_base'] F_air_H_base = param['F_air'].get('H_base',0) F_air_B_base = param['F_air'].get('B_base',0) F_air_val_rw = param['F_air'].get('val_rw',0) F_air_val_pct = param['F_air'].get('val_pct',0) # 新风阀的变化造成的基准风量变化 F_air_S_base_adj = F_air_S_base F_air_X_base_adj = F_air_X_base + F_air_val_rw F_air_H_base_adj = F_air_H_base - F_air_val_rw * F_air_val_pct if 'H_base' in param['F_air'] else 0 F_air_B_base_adj = F_air_B_base - F_air_val_rw * (1 - F_air_val_pct) if 'B_base' in param['F_air'] else 0 # 考虑风机频率变化对风量的影响,得到最终风量 F_air_HzP_X = param['F_air']['HzP_X'] F_air_HzP_H = param['F_air'].get('HzP_H',0) F_air_HzP_S = F_air_HzP_X + F_air_HzP_H F_air_HzR_B = param['F_air'].get('HzR_B',0) Fa_S = F_air_S_base_adj + F_air_HzP_S * (fan_1_Hz / 50) Fa_H = F_air_H_base_adj + F_air_HzP_H * (fan_1_Hz / 50) Fa_X = F_air_X_base_adj + F_air_HzP_X * (fan_1_Hz / 50) Fa_B = F_air_B_base_adj + F_air_HzR_B * (fan_2_Hz / 50) Fa_P = Fa_B + Fa_X + Fa_H - Fa_S if type == 'DHU_A': wheel_1_FaP = Fa_S - Fa_H wheel_1_FaC = Fa_X - wheel_1_FaP wheel_1_FaR = Fa_P wheel_2_FaP = Fa_S wheel_2_FaC = wheel_1_FaC wheel_2_FaR = wheel_1_FaC mixed_1_FaM = Fa_H mixed_1_FaA = wheel_1_FaP mixed_2_FaM = Fa_B mixed_2_FaA = wheel_1_FaC coil_2_FaA = Fa_S coil_3_FaA = Fa_S heatingcoil_1_Fa = Fa_P heatingcoil_2_Fa = wheel_1_FaC elif type == 'DHU_B': wheel_1_FaP = Fa_X wheel_1_FaC = np.nan wheel_1_FaR = Fa_P wheel_2_FaP = Fa_S wheel_2_FaC = Fa_X + Fa_H - Fa_S wheel_2_FaR = wheel_2_FaC mixed_1_FaM = Fa_H mixed_1_FaA = Fa_X mixed_2_FaM = Fa_B mixed_2_FaA = wheel_2_FaC coil_2_FaA = Fa_S coil_3_FaA = Fa_S heatingcoil_1_Fa = Fa_P heatingcoil_2_Fa = wheel_2_FaC else: raise Exception('type error') return { 'Fa_S':Fa_S,'Fa_H':Fa_H,'Fa_X':Fa_X,'Fa_B':Fa_B,'Fa_P':Fa_P, 'wheel_1_FaP':wheel_1_FaP,'wheel_1_FaC':wheel_1_FaC,'wheel_1_FaR':wheel_1_FaR, 'wheel_2_FaP':wheel_2_FaP,'wheel_2_FaC':wheel_2_FaC,'wheel_2_FaR':wheel_2_FaR, 'mixed_1_FaM':mixed_1_FaM,'mixed_1_FaA':mixed_1_FaA, 'mixed_2_FaM':mixed_2_FaM,'mixed_2_FaA':mixed_2_FaA, 'coil_2_FaA':coil_2_FaA,'coil_3_FaA':coil_3_FaA, 'heatingcoil_1_Fa':heatingcoil_1_Fa,'heatingcoil_2_Fa':heatingcoil_2_Fa } @classmethod def prior( cls, rw_FA_val : bool, N : int, exist_Fa_H: bool, exist_Fa_B: bool ) -> dict: param = {} # 新风参数 param['HzP_X'] = pm.HalfNormal('F_air_HzP_X',sigma=1,initval=1) param['X_base'] = pm.TruncatedNormal('F_air_X_base',mu=0.5,sigma=0.2,lower=0,initval=0.5) if exist_Fa_H: param['HzP_H'] = pm.HalfNormal('F_air_HzP_H',sigma=1,initval=0.1) param['H_base'] = pm.TruncatedNormal('F_air_H_base',mu=0.6,sigma=0.2,lower=0,upper=0.999,initval=0.6) if exist_Fa_B: param['HzR_B'] = pm.HalfNormal('F_air_HzR_B',sigma=1,initval=0.5) param['B_base'] = pm.TruncatedNormal('F_air_B_base',mu=0.2,sigma=0.1,lower=0,initval=0.1) if rw_FA_val: period = 48 n_segments = int(np.ceil(N/period)) remainder = N % period repeat = [period] * (n_segments - 1) + ([remainder] if remainder != 0 else []) rw = pm.GaussianRandomWalk( 'rw',sigma=0.1,init_dist=pm.Normal.dist(mu=0,sigma=0.3),shape=n_segments) param['val_rw'] = pm.Deterministic('F_air_val_rw',pt.repeat(rw,repeat)) param['val_pct'] = pm.Beta('F_air_val_pct',alpha=8,beta=1,initval=0.9) # param['val_pct'] = pm.Dirichlet('F_air_val_pct',np.array([0.1,0.1,0.8]),initval=np.array([0.1,0.1,0.8])) else: param['val_rw'] = 0 param['val_pct'] = 0 return param def cal_Q_waste( wheel_1_res, wheel_2_res, heatingcoil_1_res, heatingcoil_2_res, wheel_1_TinR, wheel_2_TinR ): waste_Qsen1 = wheel_1_res['Qsen'] waste_Qsen2 = wheel_2_res['Qsen'] waste_cond1 = heatingcoil_1_res['Q'] * (0.15 + 0.0001 * (wheel_1_TinR-70)**2) waste_cond2 = heatingcoil_2_res['Q'] * (0.15 + 0.0001 * (wheel_2_TinR-70)**2) waste_out = ( heatingcoil_1_res['Q'] + heatingcoil_2_res['Q'] - wheel_1_res['Qsen'] - wheel_1_res['Qlat'] - wheel_2_res['Qsen'] - wheel_2_res['Qlat'] ) return { 'waste_Qsen1': waste_Qsen1, 'waste_Qsen2': waste_Qsen2, 'waste_Qout' : waste_out, 'waste_cond1': waste_cond1, 'waste_cond2': waste_cond2, 'waste_out' : waste_out, 'waste' : waste_Qsen1+waste_cond2+waste_cond1+waste_cond2+waste_out, }