from typing import Union import numpy as np import pandas as pd import pymc as pm import pytensor.tensor as pt from .._base._base_device import BaseDevice from ...components import ( coil_water,coil_steam,wheel2,wheel3,mixed ) from ..utils.fit_utils import ( observe,reorder_posterior ) from ...tools.optimizer import optimizer from ...tools.data_cleaner import DataCleaner class SDHU_AB(BaseDevice): val_rw_adj_target = ('coil_3_ToutA','coil_3_DoutA') def __init__( self, DHU_type = 'A', exist_Fa_H = True, wheel_1 = None, coolingcoil_2 = 'CoolingCoil2', heatingcoil_1 = 'SteamCoil', mixed_1 = 'Mixed', mixed_2 = 'Mixed', other_info = None ) -> None: super().__init__() self.DHU_type = DHU_type.replace('SDHU_','') if self.DHU_type == 'A': wheel_1 = wheel_1 if wheel_1 is not None else 'WheelS3V3' elif self.DHU_type == 'B': wheel_1 = wheel_1 if wheel_1 is not None else 'WheelS2V2' else: raise Exception('SDHU_type must be A or B') self.components_str = { 'wheel_1' : wheel_1, 'coil_2' : coolingcoil_2, 'heatingcoil_1': heatingcoil_1, 'mixed_1' : mixed_1, 'mixed_2' : mixed_2 } self.exist_Fa_H = exist_Fa_H self.other_info = other_info if other_info is not None else {} self.record_load_info( components_str = self.components_str, DHU_type = self.DHU_type, exist_Fa_H = self.exist_Fa_H, other_info = self.other_info ) @property def components(self): comp_map = { 'WheelS2':wheel2,'WheelS3':wheel3,'CoolingCoil':coil_water, 'SteamCoil':coil_steam,'Mixed':mixed } output ={} for comp_name,comp_model in self.components_str.items(): if comp_model == 'SteamCoilVal': output[comp_name] = coil_steam.SteamCoilVal( name = comp_name, Fs_rated = self.other_info[f'{comp_name}_Fs_rated'] ) continue for comp_map_k,comp_map_v in comp_map.items(): if comp_model.startswith(comp_map_k): output[comp_name] = getattr(comp_map_v,comp_model)(name = comp_name) return output @property def model_input_data_columns(self): columns = { 'Tin_F' : 'coil_1_ToutA', 'Hin_F' : 'coil_1_HoutA', 'fan_1_Hz' : 'fan_1_Hz', 'fan_2_Hz' : 'fan_2_Hz', 'coil_2_TinW' : 'coil_2_TinW', 'coil_2_Val' : 'coil_2_Val', 'wheel_1_TinR': 'wheel_1_TinR', } if self.exist_Fa_H: columns['mixed_1_TinM'] = 'mixed_1_TinM' columns['mixed_1_HinM'] = 'mixed_1_HinM' if self.DHU_type == 'A': columns['coil_1_ToutA'] = 'coil_1_ToutA' columns['coil_1_HoutA'] = 'coil_1_HoutA' return columns @property def model_observe_data_columns(self): columns = { 'mixed_1_ToutA': 'mixed_1_ToutA', 'mixed_1_DoutA': 'mixed_1_DoutA', 'coil_2_ToutA' : 'coil_2_ToutA', 'coil_2_DoutA' : 'coil_2_DoutA', } if self.DHU_type == 'A': columns['wheel_1_ToutC'] = 'wheel_1_ToutC' # A类除湿机前转轮是三分转轮 exclude_obs = self.other_info.get('exclude_obs',[]) for col in exclude_obs: if col in columns: del columns[col] return columns def model(self,*args,**kwargs): if self.DHU_type == 'A': return model_A(*args,**kwargs) elif self.DHU_type == 'B': # return model_B(*args,**kwargs) pass else: raise ValueError('DHU_type must be A or B') def fit( self, input_data : pd.DataFrame, observed_data: pd.DataFrame, plot_TVP : bool = True, ): if len(input_data) < 30: raise Exception('数据量过少') with pm.Model() as self.MODEL_PYMC: param_prior = {name:comp.prior() for name,comp in self.components.items()} param_prior['F_air'] = AirFlow_SDHU_A.prior(exist_Fa_H = self.exist_Fa_H) res = self.model( **{k:input_data.loc[:,v].values for k,v in self.model_input_data_columns.items()}, engine = 'pymc', components = self.components, param = param_prior ) for std_name,name in self.model_observe_data_columns.items(): if name not in observed_data.columns: raise Exception(f'Missing column: {name}') observed_data = observed_data.rename(columns={name:std_name}) std_name_equp,std_name_point = std_name.rsplit('_',1) sigma = 1 observe( name = std_name, var = res[std_name_equp][std_name_point], observed = observed_data, sigma = sigma ) self.param_posterior = pm.find_MAP(maxeval=50000,include_transformed=False) self.record_load_info(param_posterior = self.param_posterior) self.record_model( model_name = 'ATD', model = reorder_posterior(param_prior,self.param_posterior), train_data = {'x':np.array([1])}, train_metric = {'R2':1,'MAE':1,'MAPE':1} ) self.TVP_data = self.get_TVP(self.param_posterior,observed_data) self.TVP_metric = self.get_metric(self.TVP_data) if plot_TVP: self.plot_TVP(self.TVP_data).show() return self @property def F_air_val_rw(self): return None def set_F_air_val_rw(self,value:float): return self def clean_data( self, data : pd.DataFrame, data_type : list=['input','observed'], print_process: bool = True, fill_zero : bool = False, save_log : Union[str,None] = None ) -> pd.DataFrame: data = data.replace(-9999,np.nan) clean_data = DataCleaner(data,print_process=print_process) if 'input' in data_type: clean_data = ( clean_data .rm_rolling_fluct(window=60,fun='ptp',thre=0.1,include_cols=['State']) .rm_rule('State != 1') .rm_rule('fan_1_Hz < 10').rm_rule('fan_2_Hz < 10') .rm_outrange(method='raw',upper=140,lower=20,include_cols=['wheel_1_TinR']) ) if 'observed' in data_type: pass clean_data = clean_data.get_data( fill = 0 if fill_zero else None, save_log = save_log ) return clean_data # def optimize( # self, # cur_input_data: pd.DataFrame, # wheel_1_TinR : tuple = (70,120), # wheel_2_TinR : tuple = (70,120), # fan_2_Hz : tuple = (30,50), # constrains : list = None, # logging : bool = True, # target : str = 'summary_Fs', # target_min : bool = True # ) -> list: # constrains = [] if constrains is None else constrains # cur_input_data = cur_input_data.iloc[[0],:] # opt_var_boundary = {} # if wheel_1_TinR is not None: # opt_var_boundary['wheel_1_TinR'] = {'lb':min(wheel_1_TinR),'ub':max(wheel_1_TinR)} # if wheel_2_TinR is not None: # opt_var_boundary['wheel_2_TinR'] = {'lb':min(wheel_2_TinR),'ub':max(wheel_2_TinR)} # if fan_2_Hz is not None: # opt_var_boundary['fan_2_Hz'] = {'lb':min(fan_2_Hz),'ub':max(fan_2_Hz)} # opt_var_value = cur_input_data.loc[:,list(opt_var_boundary.keys())] # oth_var_value = ( # cur_input_data # .loc[:,list(self.model_input_data_columns.values())] # .drop(opt_var_value.columns,axis=1) # ) # opt_res = optimizer( # model = self, # opt_var_boundary = opt_var_boundary, # opt_var_value = opt_var_value, # oth_var_value = oth_var_value, # target = target, # target_min = target_min, # constrains = constrains, # logging = logging, # other_kwargs = {'NIND':2000,'MAXGEN':50} # ) # return opt_res # def plot_opt( # self, # cur_input_data: pd.DataFrame, # target_min : str = 'summary_waste', # ): # data_input = ( # pd.MultiIndex.from_product( # [ # np.linspace(70,120,1000), # np.linspace(70,120,1000), # ], # names=['wheel_1_TinR','wheel_2_TinR'] # ) # .to_frame(index=False) # ) # for col in cur_input_data.columns: # if col in data_input.columns: # continue # data_input[col] = cur_input_data.loc[:,col].iat[0] # data_output = self.predict_system(data_input) # data = ( # data_output # .assign( # wheel_1_TinR = data_input.loc[:,'wheel_1_TinR'], # wheel_2_TinR = data_input.loc[:,'wheel_2_TinR'], # ) # .assign(coil_3_DoutA=lambda dt:dt.coil_3_DoutA.round(1)) # .loc[lambda dt:dt.groupby('coil_3_DoutA')[target_min].idxmin()] # .loc[lambda dt:dt.coil_3_DoutA.mod(1)==0] # ) # import plotnine as gg # plot = ( # data # .pipe(gg.ggplot) # + gg.aes(x='wheel_1_TinR',y='wheel_2_TinR') # + gg.geom_path(size=1) # + gg.geom_point() # + gg.geom_label(gg.aes(label='coil_3_DoutA')) # + gg.geom_abline(slope=1,intercept=0,color='red',linetype='--') # ) # return plot # def plot_check(self,cur_input_data:pd.DataFrame) -> dict: # pa1=self.curve(input_data=cur_input_data,x='wheel_1_TinR',y='wheel_1_DoutP') # pa2=self.curve(input_data=cur_input_data,x='wheel_1_TinR',y='wheel_1_ToutP') # pa3=self.curve(input_data=cur_input_data,x='wheel_1_TinR',y='wheel_1_EFF') # pb1=self.curve(input_data=cur_input_data,x='wheel_2_TinR',y='wheel_2_DoutP') # pb2=self.curve(input_data=cur_input_data,x='wheel_2_TinR',y='wheel_2_ToutP') # pb3=self.curve(input_data=cur_input_data,x='wheel_2_TinR',y='wheel_2_EFF') # plot1 = (pa1|pa2|pa3)/(pb1|pb2|pb3) # return {'plot1':plot1} def model_A( Tin_F, # 前表冷后温度 Hin_F, # 前表冷后湿度 coil_1_ToutA, coil_1_HoutA, fan_1_Hz, # 处理侧风机频率 fan_2_Hz, # 再生侧风机频率 coil_2_TinW, # 中表冷进水温度 coil_2_Val, # 中表冷阀门开度 wheel_1_TinR, # 前转轮再生侧温度 engine : str, components: dict, param : dict, mixed_1_TinM = 0, # 回风温度(处理侧) mixed_1_HinM = 0, # 回风湿度(处理侧) ) -> dict: # 水的质量流量 coil_2_FW = coil_2_Val / 100 # 空气的质量流量 air_flow = AirFlow_SDHU_A.model(fan_1_Hz=fan_1_Hz,fan_2_Hz=fan_2_Hz,param=param) # 前转轮 wheel_1_res = components['wheel_1'].model( TinP = coil_1_ToutA, HinP = coil_1_HoutA, FP = air_flow['wheel_1_FaP'], TinR = wheel_1_TinR, HinR = 0, FR = air_flow['wheel_1_FaR'], TinC = Tin_F, HinC = Hin_F, FC = air_flow['wheel_1_FaC'], engine = engine, param = param['wheel_1'] ) # 处理侧混风(回风) mixed_1_res = components['mixed_1'].model( TinA = wheel_1_res['ToutP'], HinA = wheel_1_res['HoutP'], FA = air_flow['mixed_1_FaA'], TinM = mixed_1_TinM, HinM = mixed_1_HinM, FM = air_flow['mixed_1_FaM'], engine = engine ) # 中表冷 coil_2_res = components['coil_2'].model( TinA = mixed_1_res['ToutA'], HinA = mixed_1_res['HoutA'], FA = air_flow['coil_2_FaA'], TinW = coil_2_TinW, FW = coil_2_FW, engine = engine, param = param['coil_2'] ) # 后转轮湿度修正 wheel_1_res_adj = components['wheel_1'].model( TinP = coil_1_ToutA, HinP = coil_1_HoutA, FP = air_flow['wheel_1_FaP'], TinR = wheel_1_TinR, HinR = wheel_1_res['HoutC'], FR = air_flow['wheel_1_FaR'], TinC = Tin_F, HinC = Hin_F, FC = air_flow['wheel_1_FaC'], engine = engine, param = param['wheel_1'] ) # 前再生加热盘管 heatingcoil_1_res = components['heatingcoil_1'].model( TinA = wheel_1_res_adj['ToutC'], ToutA = wheel_1_TinR, FA = air_flow['heatingcoil_1_Fa'], param = param['heatingcoil_1'], engine = engine ) # waste = cal_Q_waste( # wheel_1_res = wheel_1_res_adj, # wheel_2_res = wheel_2_res_adj, # heatingcoil_1_res = heatingcoil_1_res, # heatingcoil_2_res = heatingcoil_2_res, # wheel_1_TinR = wheel_1_TinR, # wheel_2_TinR = wheel_2_TinR # ) return { 'coil_2' : coil_2_res, 'wheel_1' : wheel_1_res_adj, 'mixed_1' : mixed_1_res, 'heatingcoil_1': heatingcoil_1_res, 'Fa' : air_flow, 'summary' : { 'Fs' : heatingcoil_1_res['Fs'], # **waste, } } class AirFlow_SDHU_A: @classmethod def model(cls,fan_1_Hz,fan_2_Hz,param): F_air_X2_base = 1 # 进入转轮处理侧的新风量 F_air_H_base = param['F_air'].get('H_base',0) F_air_P_base = param['F_air']['P_base'] Fa_H = F_air_H_base + (fan_1_Hz/50) * param['F_air'].get('HzP_H',0) Fa_X2 = F_air_X2_base + (fan_1_Hz/50) * param['F_air']['HzP_X2'] Fa_S = Fa_H + Fa_X2 Fa_P = F_air_P_base + (fan_2_Hz/50) * param['F_air']['HzR_P'] return { 'wheel_1_FaP' : Fa_X2, 'wheel_1_FaC' : Fa_P, 'wheel_1_FaR' : Fa_P, 'coil_2_FaA' : Fa_S, 'mixed_1_FaM' : Fa_H, 'mixed_1_FaA' : Fa_X2, 'heatingcoil_1_Fa': Fa_P } @classmethod def prior(cls,exist_Fa_H): param = {} param['HzP_X2'] = pm.HalfNormal('F_air_HzP_X2',sigma=1,initval=0.5) param['HzR_P'] = pm.HalfNormal('F_air_HzR_P',sigma=1,initval=0.5) param['P_base'] = pm.TruncatedNormal('F_air_P_base',mu=1,sigma=0.2,lower=0,initval=1) if exist_Fa_H: param['H_base'] = pm.TruncatedNormal('F_air_H_base',mu=1,sigma=0.2,lower=0,initval=1) param['HzP_H'] = pm.HalfNormal('F_air_HzP_H',sigma=1,initval=0.5) return param def cal_Q_waste( wheel_1_res, wheel_2_res, heatingcoil_1_res, heatingcoil_2_res, wheel_1_TinR, wheel_2_TinR ): waste_Qsen1 = wheel_1_res['Qsen'] waste_Qsen2 = wheel_2_res['Qsen'] waste_cond1 = heatingcoil_1_res['Q'] * (0.15 + 0.0001 * (wheel_1_TinR-70)**2) waste_cond2 = heatingcoil_2_res['Q'] * (0.15 + 0.0001 * (wheel_2_TinR-70)**2) waste_out = ( heatingcoil_1_res['Q'] + heatingcoil_2_res['Q'] - wheel_1_res['Qsen'] - wheel_1_res['Qlat'] - wheel_2_res['Qsen'] - wheel_2_res['Qlat'] ) return { 'waste_Qsen1': waste_Qsen1, 'waste_Qsen2': waste_Qsen2, 'waste_Qout' : waste_out, 'waste_cond1': waste_cond1, 'waste_cond2': waste_cond2, 'waste_out' : waste_out, 'waste' : waste_Qsen1+waste_cond2+waste_cond1+waste_cond2+waste_out, }