from typing import Union import numpy as np import pandas as pd import pymc as pm import pytensor.tensor as pt from .._base._base_device import BaseDevice from ...components import ( coil_water,coil_steam,wheel2,wheel3,mixed ) from ..utils.fit_utils import ( observe,reorder_posterior ) from ...tools.optimizer import optimizer from ...tools.data_cleaner import DataCleaner class SDHU_AB(BaseDevice): val_rw_adj_target = ('coil_3_ToutA','coil_3_DoutA') def __init__( self, DHU_type = 'A', exist_Fa_H = True, wheel_1 = None, coolingcoil_2 = 'CoolingCoil2', heatingcoil_1 = 'SteamCoil', mixed_1 = 'Mixed', mixed_2 = 'Mixed', other_info = None ) -> None: super().__init__() self.DHU_type = DHU_type.replace('SDHU_','') if self.DHU_type == 'A': wheel_1 = wheel_1 if wheel_1 is not None else 'WheelS3V3' elif self.DHU_type == 'B': wheel_1 = wheel_1 if wheel_1 is not None else 'WheelS2V2' else: raise Exception('SDHU_type must be A or B') self.components_str = { 'wheel_1' : wheel_1, 'coil_2' : coolingcoil_2, 'heatingcoil_1': heatingcoil_1, 'mixed_1' : mixed_1, 'mixed_2' : mixed_2 } self.exist_Fa_H = exist_Fa_H self.other_info = other_info if other_info is not None else {} self.record_load_info( components_str = self.components_str, DHU_type = self.DHU_type, exist_Fa_H = self.exist_Fa_H, other_info = self.other_info ) @property def components(self): comp_map = { 'WheelS2':wheel2,'WheelS3':wheel3,'CoolingCoil':coil_water, 'SteamCoil':coil_steam,'Mixed':mixed } output ={} for comp_name,comp_model in self.components_str.items(): if comp_model == 'SteamCoilVal': output[comp_name] = coil_steam.SteamCoilVal( name = comp_name, Fs_rated = self.other_info[f'{comp_name}_Fs_rated'] ) continue for comp_map_k,comp_map_v in comp_map.items(): if comp_model.startswith(comp_map_k): output[comp_name] = getattr(comp_map_v,comp_model)(name = comp_name) return output @property def model_input_data_columns(self): columns = { 'Tin_F' : 'coil_1_ToutA', 'Hin_F' : 'coil_1_HoutA', 'fan_1_Hz' : 'fan_1_Hz', 'fan_2_Hz' : 'fan_2_Hz', 'coil_2_TinW' : 'coil_2_TinW', 'coil_2_Val' : 'coil_2_Val', 'wheel_1_TinR': 'wheel_1_TinR', } if self.exist_Fa_H: columns['mixed_1_TinM'] = 'mixed_1_TinM' columns['mixed_1_HinM'] = 'mixed_1_HinM' if self.DHU_type == 'A': columns['coil_1_ToutA'] = 'coil_1_ToutA' columns['coil_1_HoutA'] = 'coil_1_HoutA' return columns @property def model_observe_data_columns(self): columns = { 'mixed_1_ToutA': 'mixed_1_ToutA', 'mixed_1_DoutA': 'mixed_1_DoutA', 'coil_2_ToutA' : 'coil_2_ToutA', 'coil_2_DoutA' : 'coil_2_DoutA', } if self.DHU_type == 'A': columns['wheel_1_ToutC'] = 'wheel_1_ToutC' # A类除湿机前转轮是三分转轮 exclude_obs = self.other_info.get('exclude_obs',[]) for col in exclude_obs: if col in columns: del columns[col] return columns def model(self,*args,**kwargs): if self.DHU_type == 'A': return model_A(*args,**kwargs) elif self.DHU_type == 'B': # return model_B(*args,**kwargs) pass else: raise ValueError('DHU_type must be A or B') def fit( self, input_data : pd.DataFrame, observed_data: pd.DataFrame, plot_TVP : bool = True, ): if len(input_data) < 30: raise Exception('数据量过少') with pm.Model() as self.MODEL_PYMC: param_prior = {name:comp.prior() for name,comp in self.components.items()} param_prior['F_air'] = AirFlow_SDHU_A.prior(exist_Fa_H = self.exist_Fa_H) res = self.model( **{k:input_data.loc[:,v].values for k,v in self.model_input_data_columns.items()}, engine = 'pymc', components = self.components, param = param_prior ) for std_name,name in self.model_observe_data_columns.items(): if name not in observed_data.columns: raise Exception(f'Missing column: {name}') observed_data = observed_data.rename(columns={name:std_name}) std_name_equp,std_name_point = std_name.rsplit('_',1) sigma = 1 observe( name = std_name, var = res[std_name_equp][std_name_point], observed = observed_data, sigma = sigma ) self.param_posterior = pm.find_MAP(maxeval=50000,include_transformed=False) self.record_load_info(param_posterior = self.param_posterior) self.record_model( model_name = 'ATD', model = reorder_posterior(param_prior,self.param_posterior), train_data = { 'wheel_1_TinR': observed_data.loc[:,'wheel_1_TinR'].values, 'fan_2_Hz' : observed_data.loc[:,'wheel_2_TinR'].values, 'coil_2_DoutA': observed_data.loc[:,'coil_2_DoutA'].values, }, train_metric = {'R2':1,'MAE':1,'MAPE':1} ) self.TVP_data = self.get_TVP(self.param_posterior,observed_data) self.TVP_metric = self.get_metric(self.TVP_data) if plot_TVP: self.plot_TVP(self.TVP_data).show() return self @property def F_air_val_rw(self): return None def set_F_air_val_rw(self,value:float): return self def clean_data( self, data : pd.DataFrame, data_type : list=['input','observed'], print_process: bool = True, fill_zero : bool = False, save_log : Union[str,None] = None ) -> pd.DataFrame: data = data.replace(-9999,np.nan) clean_data = DataCleaner(data,print_process=print_process) if 'input' in data_type: clean_data = ( clean_data .rm_rolling_fluct(window=60,fun='ptp',thre=0.1,include_cols=['State']) .rm_rule('State != 1') .rm_rule('fan_1_Hz < 10').rm_rule('fan_2_Hz < 10') .rm_outrange(method='raw',upper=140,lower=20,include_cols=['wheel_1_TinR']) ) if 'observed' in data_type: pass clean_data = clean_data.get_data( fill = 0 if fill_zero else None, save_log = save_log ) return clean_data def optimize( self, cur_input_data: pd.DataFrame, wheel_1_TinR : tuple = (70,120), fan_2_Hz : tuple = (30,50), constrains : list = None, logging : bool = True, target : str = 'summary_Fs', target_min : bool = True ) -> list: constrains = [] if constrains is None else constrains cur_input_data = cur_input_data.iloc[[0],:] opt_var_boundary = {} if wheel_1_TinR is not None: opt_var_boundary['wheel_1_TinR'] = {'lb':min(wheel_1_TinR),'ub':max(wheel_1_TinR)} if fan_2_Hz is not None: opt_var_boundary['fan_2_Hz'] = {'lb':min(fan_2_Hz),'ub':max(fan_2_Hz)} opt_var_value = cur_input_data.loc[:,list(opt_var_boundary.keys())] oth_var_value = ( cur_input_data .loc[:,list(set(self.model_input_data_columns.values()))] .drop(opt_var_value.columns,axis=1) ) opt_res = optimizer( model = self, opt_var_boundary = opt_var_boundary, opt_var_value = opt_var_value, oth_var_value = oth_var_value, target = target, target_min = target_min, constrains = constrains, logging = logging, other_kwargs = {'NIND':2000,'MAXGEN':50} ) return opt_res def plot_opt( self, cur_input_data: pd.DataFrame, target_min : str = 'summary_waste', coil_2_DoutA : tuple = None ): if coil_2_DoutA is None: coil_2_DoutA = ( self.model_info['model_train_info_ATD']['coil_2_DoutA_min'], self.model_info['model_train_info_ATD']['coil_2_DoutA_max'] ) data_input = ( pd.MultiIndex.from_product( [ np.linspace( self.model_info['model_train_info_ATD']['wheel_1_TinR_min']-5, self.model_info['model_train_info_ATD']['wheel_1_TinR_max']+5, 1000 ), np.linspace( self.model_info['model_train_info_ATD']['fan_2_Hz_min']-5, self.model_info['model_train_info_ATD']['fan_2_Hz_max']+5, 1000 ), ], names=['wheel_1_TinR','fan_2_Hz'] ) .to_frame(index=False) ) for col in cur_input_data.columns: if col in data_input.columns: continue data_input[col] = cur_input_data.loc[:,col].iat[0] data_output = self.predict_system(data_input) data = ( data_output .assign( wheel_1_TinR = data_input.loc[:,'wheel_1_TinR'], fan_2_Hz = data_input.loc[:,'fan_2_Hz'], ) .assign(coil_2_DoutA=lambda dt:dt.coil_2_DoutA.round(1)) .loc[lambda dt:dt.coil_2_DoutA.between(*(min(coil_2_DoutA),max(coil_2_DoutA)))] .loc[lambda dt:dt.groupby('coil_2_DoutA')[target_min].idxmin()] .loc[lambda dt:dt.coil_2_DoutA.mod(0.5)==0] ) import plotnine as gg plot = ( data .pipe(gg.ggplot) + gg.aes(x='wheel_1_TinR',y='fan_2_Hz') + gg.geom_path(size=1) + gg.geom_point() + gg.geom_label(gg.aes(label='coil_2_DoutA')) + gg.geom_abline(slope=1,intercept=0,color='red',linetype='--') ) return plot def plot_check(self,cur_input_data:pd.DataFrame) -> dict: pa1=self.curve(input_data=cur_input_data,x='wheel_1_TinR',y='wheel_1_DoutP') pa2=self.curve(input_data=cur_input_data,x='wheel_1_TinR',y='wheel_1_ToutP') pa3=self.curve(input_data=cur_input_data,x='wheel_1_TinR',y='wheel_1_EFF') pb1=self.curve(input_data=cur_input_data,x='fan_2_Hz',y='wheel_1_DoutP') pb2=self.curve(input_data=cur_input_data,x='fan_2_Hz',y='wheel_1_ToutP') pb3=self.curve(input_data=cur_input_data,x='fan_2_Hz',y='wheel_1_EFF') plot1 = (pa1|pa2|pa3)/(pb1|pb2|pb3) return {'plot1':plot1} def model_A( Tin_F, # 前表冷后温度 Hin_F, # 前表冷后湿度 coil_1_ToutA, coil_1_HoutA, fan_1_Hz, # 处理侧风机频率 fan_2_Hz, # 再生侧风机频率 coil_2_TinW, # 中表冷进水温度 coil_2_Val, # 中表冷阀门开度 wheel_1_TinR, # 前转轮再生侧温度 engine : str, components: dict, param : dict, mixed_1_TinM = 0, # 回风温度(处理侧) mixed_1_HinM = 0, # 回风湿度(处理侧) ) -> dict: # 水的质量流量 coil_2_FW = coil_2_Val / 100 # 空气的质量流量 air_flow = AirFlow_SDHU_A.model(fan_1_Hz=fan_1_Hz,fan_2_Hz=fan_2_Hz,param=param) # 前转轮 wheel_1_res = components['wheel_1'].model( TinP = coil_1_ToutA, HinP = coil_1_HoutA, FP = air_flow['wheel_1_FaP'], TinR = wheel_1_TinR, HinR = 0, FR = air_flow['wheel_1_FaR'], TinC = Tin_F, HinC = Hin_F, FC = air_flow['wheel_1_FaC'], engine = engine, param = param['wheel_1'] ) # 处理侧混风(回风) mixed_1_res = components['mixed_1'].model( TinA = wheel_1_res['ToutP'], HinA = wheel_1_res['HoutP'], FA = air_flow['mixed_1_FaA'], TinM = mixed_1_TinM, HinM = mixed_1_HinM, FM = air_flow['mixed_1_FaM'], engine = engine ) # 中表冷 coil_2_res = components['coil_2'].model( TinA = mixed_1_res['ToutA'], HinA = mixed_1_res['HoutA'], FA = air_flow['coil_2_FaA'], TinW = coil_2_TinW, FW = coil_2_FW, engine = engine, param = param['coil_2'] ) # 后转轮湿度修正 wheel_1_res_adj = components['wheel_1'].model( TinP = coil_1_ToutA, HinP = coil_1_HoutA, FP = air_flow['wheel_1_FaP'], TinR = wheel_1_TinR, HinR = wheel_1_res['HoutC'], FR = air_flow['wheel_1_FaR'], TinC = Tin_F, HinC = Hin_F, FC = air_flow['wheel_1_FaC'], engine = engine, param = param['wheel_1'] ) # 前再生加热盘管 heatingcoil_1_res = components['heatingcoil_1'].model( TinA = wheel_1_res_adj['ToutC'], ToutA = wheel_1_TinR, FA = air_flow['heatingcoil_1_Fa'], param = param['heatingcoil_1'], engine = engine ) waste = cal_Q_waste( wheel_1_res = wheel_1_res_adj, heatingcoil_1_res = heatingcoil_1_res, wheel_1_TinR = wheel_1_TinR, fan_2_Hz = fan_2_Hz ) return { 'coil_2' : coil_2_res, 'wheel_1' : wheel_1_res_adj, 'mixed_1' : mixed_1_res, 'heatingcoil_1': heatingcoil_1_res, 'Fa' : air_flow, 'summary' : { 'Fs' : heatingcoil_1_res['Fs'], **waste, } } class AirFlow_SDHU_A: @classmethod def model(cls,fan_1_Hz,fan_2_Hz,param): F_air_X2_base = 1 # 进入转轮处理侧的新风量 F_air_H_base = param['F_air'].get('H_base',0) F_air_P_base = param['F_air']['P_base'] Fa_H = F_air_H_base + (fan_1_Hz/50) * param['F_air'].get('HzP_H',0) Fa_X2 = F_air_X2_base + (fan_1_Hz/50) * param['F_air']['HzP_X2'] Fa_S = Fa_H + Fa_X2 Fa_P = F_air_P_base + (fan_2_Hz/50) * param['F_air']['HzR_P'] return { 'wheel_1_FaP' : Fa_X2, 'wheel_1_FaC' : Fa_P, 'wheel_1_FaR' : Fa_P, 'coil_2_FaA' : Fa_S, 'mixed_1_FaM' : Fa_H, 'mixed_1_FaA' : Fa_X2, 'heatingcoil_1_Fa': Fa_P } @classmethod def prior(cls,exist_Fa_H): param = {} param['HzP_X2'] = pm.HalfNormal('F_air_HzP_X2',sigma=1,initval=0.5) param['HzR_P'] = pm.HalfNormal('F_air_HzR_P',sigma=1,initval=0.5) param['P_base'] = pm.TruncatedNormal('F_air_P_base',mu=1,sigma=0.2,lower=0,initval=1) if exist_Fa_H: param['H_base'] = pm.TruncatedNormal('F_air_H_base',mu=1,sigma=0.2,lower=0,initval=1) param['HzP_H'] = pm.HalfNormal('F_air_HzP_H',sigma=1,initval=0.5) return param def cal_Q_waste( wheel_1_res, heatingcoil_1_res, wheel_1_TinR, fan_2_Hz ) -> dict: def waste_cond_func1(TinR): waste = 0.15 + 0.0001 * (TinR-70)**3 return np.where(waste>0,waste,0) def waste_cond_func2(TinR): waste = 0.25 * (1 - np.exp(-0.04 * (TinR - 70))) return np.where(waste>0,waste,0) heatingcoil_1_Q = heatingcoil_1_res['Q'] heatingcoil_1_Q = np.where(heatingcoil_1_Q>0,heatingcoil_1_Q,0) waste_Qsen1 = wheel_1_res['Qsen'] waste_cond1 = heatingcoil_1_Q * waste_cond_func1(wheel_1_TinR) res = { 'waste_Qsen1': waste_Qsen1, 'waste_cond1': waste_cond1, } waste_out = heatingcoil_1_Q - wheel_1_res['Qsen'] - wheel_1_res['Qlat'] waste_out = np.where(waste_out>0,waste_out,0) waste = waste_Qsen1 + waste_cond1 + waste_out + fan_2_Hz/100 res['waste_out'] = waste_out res['waste'] = waste return res