| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907 |
- from typing import Union
- import numpy as np
- import pandas as pd
- import pymc as pm
- import pytensor.tensor as pt
- from .._base._base_device import BaseDevice
- from ...components import (
- coil_water,coil_steam,wheel2,wheel3,mixed
- )
- from ..utils.fit_utils import (
- observe,reorder_posterior
- )
- from ...tools.optimizer import optimizer
- from ...tools.data_cleaner import DataCleaner
class DHU_AB(BaseDevice):
    """Two-wheel dehumidification unit model, variants ``A`` and ``B``.

    The unit is assembled from named components (two wheels, two cooling
    coils, two regeneration heating coils and two mixing sections).  The
    component model for each slot is chosen by a string and instantiated on
    demand by :attr:`components`.  The physical forward model itself lives in
    the module-level functions ``model_A`` / ``model_B``.
    """

    val_rw_adj_target = ('coil_2_DoutA', 'wheel_2_DoutP')

    def __init__(
        self,
        DHU_type='A',
        exist_Fa_H=True,
        exist_Fa_B=True,
        wheel_1=None,
        wheel_2=None,
        coolingcoil_2='CoolingCoil2',
        coolingcoil_3='CoolingCoil2',
        heatingcoil_1='SteamCoil',
        heatingcoil_2='SteamCoil',
        mixed_1='Mixed',
        mixed_2='Mixed',
        other_info=None
    ) -> None:
        """Store the unit configuration.

        Args:
            DHU_type: ``'A'`` or ``'B'``; a leading ``'DHU_'`` is stripped.
            exist_Fa_H: whether the ``mixed_1`` (process-side) mixing inputs
                are part of the model inputs.
            exist_Fa_B: whether the ``mixed_2`` (regeneration-side) mixing
                inputs are part of the model inputs.
            wheel_1, wheel_2: wheel model names; ``None`` selects the default
                for the unit type.
            coolingcoil_2, coolingcoil_3: cooling coil model names.
            heatingcoil_1, heatingcoil_2: heating coil model names.
            mixed_1, mixed_2: mixing section model names.
            other_info: optional extra settings (e.g. ``exclude_obs`` or
                ``<component>_Fs_rated``); defaults to an empty dict.

        Raises:
            ValueError: if ``DHU_type`` is neither ``A`` nor ``B``.
        """
        super().__init__()
        self.DHU_type = DHU_type.replace('DHU_', '')

        # Default (wheel_1, wheel_2) model names for each unit type.
        default_wheels = {
            'A': ('WheelS3V3', 'WheelS3V2'),
            'B': ('WheelS2V2', 'WheelS3V2'),
        }
        if self.DHU_type not in default_wheels:
            raise ValueError('DHU_type must be A or B')
        default_wheel_1, default_wheel_2 = default_wheels[self.DHU_type]
        wheel_1 = wheel_1 if wheel_1 is not None else default_wheel_1
        wheel_2 = wheel_2 if wheel_2 is not None else default_wheel_2

        self.components_str = {
            'wheel_1'      : wheel_1,
            'wheel_2'      : wheel_2,
            'coil_2'       : coolingcoil_2,
            'coil_3'       : coolingcoil_3,
            'heatingcoil_1': heatingcoil_1,
            'heatingcoil_2': heatingcoil_2,
            'mixed_1'      : mixed_1,
            'mixed_2'      : mixed_2
        }
        self.exist_Fa_H = exist_Fa_H
        self.exist_Fa_B = exist_Fa_B
        self.other_info = other_info if other_info is not None else {}
        self.record_load_info(
            components_str = self.components_str,
            DHU_type       = self.DHU_type,
            exist_Fa_H     = self.exist_Fa_H,
            exist_Fa_B     = self.exist_Fa_B,
            other_info     = self.other_info
        )

    @property
    def components(self):
        """Instantiate and return the component objects keyed by slot name.

        New objects are created on every access.  The model name is matched
        by prefix against the known component modules and the class of that
        exact name is looked up in the matching module.

        Raises:
            ValueError: if a model name matches none of the known prefixes.
        """
        comp_map = {
            'WheelS2':wheel2,'WheelS3':wheel3,'CoolingCoil':coil_water,
            'SteamCoil':coil_steam,'Mixed':mixed
        }
        output = {}
        for comp_name, comp_model in self.components_str.items():
            # SteamCoilVal needs the rated steam flow from ``other_info``.
            if comp_model == 'SteamCoilVal':
                output[comp_name] = coil_steam.SteamCoilVal(
                    name     = comp_name,
                    Fs_rated = self.other_info[f'{comp_name}_Fs_rated']
                )
                continue
            for prefix, module in comp_map.items():
                if comp_model.startswith(prefix):
                    output[comp_name] = getattr(module, comp_model)(name=comp_name)
                    break
            else:
                # Previously an unknown name was skipped silently and only
                # surfaced later as a KeyError on the missing component.
                raise ValueError(
                    f'Unknown component model {comp_model!r} for {comp_name!r}'
                )
        return output

    @property
    def model_input_data_columns(self):
        """Map model argument names to input data column names."""
        columns = {
            'Tin_F'       : 'coil_1_ToutA',
            'Hin_F'       : 'coil_1_HoutA',
            'fan_1_Hz'    : 'fan_1_Hz',
            'fan_2_Hz'    : 'fan_2_Hz',
            'coil_1_TinW' : 'coil_1_TinW',
            'coil_2_TinW' : 'coil_2_TinW',
            'coil_3_TinW' : 'coil_3_TinW',
            'coil_1_Val'  : 'coil_1_Val',
            'coil_2_Val'  : 'coil_2_Val',
            'coil_3_Val'  : 'coil_3_Val',
            'wheel_1_TinR': 'wheel_1_TinR',
            'wheel_2_TinR': 'wheel_2_TinR',
        }
        if self.exist_Fa_H:
            columns['mixed_1_TinM'] = 'mixed_1_TinM'
            columns['mixed_1_HinM'] = 'mixed_1_HinM'
        if self.exist_Fa_B:
            columns['mixed_2_TinM'] = 'mixed_2_TinM'
            columns['mixed_2_HinM'] = 'mixed_2_HinM'
        return columns

    @property
    def model_observe_data_columns(self):
        """Map observed quantity names to observed data column names.

        The set depends on the unit type, on ``exist_Fa_B`` and on the class
        of each heating coil; entries listed in
        ``other_info['exclude_obs']`` are removed.
        """
        columns = {
            'mixed_1_ToutA': 'mixed_1_ToutA',
            'mixed_1_DoutA': 'mixed_1_DoutA',
            'coil_2_ToutA' : 'coil_2_ToutA',
            'coil_2_DoutA' : 'coil_2_DoutA',
            'wheel_2_ToutP': 'wheel_2_ToutP',
            'wheel_2_DoutP': 'wheel_2_DoutP',
            'wheel_2_ToutC': 'wheel_2_ToutC', # relates to the heat of the rear regeneration heating coil
        }
        if self.DHU_type == 'A':
            columns['wheel_1_ToutC'] = 'wheel_1_ToutC' # the front wheel of a type-A unit is a three-sector wheel
        if self.exist_Fa_B:
            columns['mixed_2_ToutA'] = 'mixed_2_ToutA'

        # Build the components once; the property creates new objects on
        # every access.
        components = self.components
        for idx in [1, 2]:
            heatingcoil_idx = f'heatingcoil_{idx}'
            heatingcoil = components[heatingcoil_idx]
            # The order of the checks is kept as in the original chain.
            if isinstance(heatingcoil, coil_steam.SteamCoilFs2):
                columns[f'{heatingcoil_idx}_FP'] = f'{heatingcoil_idx}_FP'
                columns[f'{heatingcoil_idx}_Fs'] = f'{heatingcoil_idx}_Fs'
            elif isinstance(heatingcoil, coil_steam.SteamCoilFs):
                columns[f'{heatingcoil_idx}_Fs'] = f'{heatingcoil_idx}_Fs'
            elif isinstance(heatingcoil, coil_steam.SteamCoil):
                # A plain SteamCoil adds no observed column.
                pass
            elif isinstance(heatingcoil, coil_steam.SteamCoilVal):
                columns[f'{heatingcoil_idx}_Val'] = f'{heatingcoil_idx}_Val'
            else:
                raise Exception('WRONG')

        for col in self.other_info.get('exclude_obs', []):
            columns.pop(col, None)
        return columns

    def fit(
        self,
        input_data   : pd.DataFrame,
        observed_data: pd.DataFrame,
        rw_FA_val    : bool = False,
        plot_TVP     : bool = True,
    ):
        """Fit the model parameters with a MAP estimate.

        Args:
            input_data: frame holding the columns listed in
                :attr:`model_input_data_columns`.
            observed_data: frame holding the columns listed in
                :attr:`model_observe_data_columns`; ``wheel_1_TinR`` and
                ``wheel_2_TinR`` are also read from it for the training record.
            rw_FA_val: forwarded to ``AirFlow_DHU_AB.prior``.
            plot_TVP: show the plot returned by ``plot_TVP`` after fitting.

        Returns:
            ``self``.

        Raises:
            Exception: if fewer than 30 rows are supplied or an observed
                column is missing.
        """
        if len(input_data) < 30:
            raise Exception('数据量过少')

        # Fixed observation noise per quantity; anything not listed uses 10.
        fixed_sigma = {
            'wheel_2_DoutP'   : 0.3,
            'heatingcoil_1_Fs': 20,
            'heatingcoil_2_Fs': 20,
            'heatingcoil_1_FP': 10000,
            'heatingcoil_2_FP': 10000,
        }

        with pm.Model() as self.MODEL_PYMC:
            # One set of component objects serves both priors and the model.
            components = self.components
            param_prior = {name:comp.prior() for name,comp in components.items()}
            param_prior['F_air'] = AirFlow_DHU_AB.prior(
                rw_FA_val  = rw_FA_val,
                N          = len(input_data),
                exist_Fa_H = self.exist_Fa_H,
                exist_Fa_B = self.exist_Fa_B
            )

            res = self.model(
                **{k:input_data.loc[:,v].values for k,v in self.model_input_data_columns.items()},
                engine     = 'pymc',
                components = components,
                param      = param_prior
            )
            for std_name, name in self.model_observe_data_columns.items():
                if name not in observed_data.columns:
                    raise Exception(f'Missing column: {name}')
                observed_data = observed_data.rename(columns={name:std_name})

                # '<component>_<point>' -> res['<component>']['<point>']
                std_name_equp, std_name_point = std_name.rsplit('_', 1)
                if std_name in ['heatingcoil_1_Val','heatingcoil_2_Val']:
                    # Valve observations take their noise from the model output.
                    sigma = res[std_name_equp]['sigma']
                else:
                    sigma = fixed_sigma.get(std_name, 10)

                observe(
                    name     = std_name,
                    var      = res[std_name_equp][std_name_point],
                    observed = observed_data,
                    sigma    = sigma
                )

            self.param_posterior = pm.find_MAP(maxeval=50000,include_transformed=False)

        self.record_load_info(
            param_posterior = self.param_posterior
        )
        self.record_model(
            model_name   = 'ATD',
            model        = reorder_posterior(param_prior,self.param_posterior),
            train_data   = {
                'wheel_1_TinR' : observed_data.loc[:,'wheel_1_TinR'].values,
                'wheel_2_TinR' : observed_data.loc[:,'wheel_2_TinR'].values,
                'wheel_2_DoutP': observed_data.loc[:,'wheel_2_DoutP'].values,
            },
            train_metric = {'R2':1,'MAE':1,'MAPE':1}
        )
        self.TVP_data   = self.get_TVP(self.param_posterior,observed_data)
        self.TVP_metric = self.get_metric(self.TVP_data)
        if plot_TVP:
            self.plot_TVP(self.TVP_data).show()
        return self

    @property
    def F_air_val_rw(self):
        """Return the stored ``val_rw`` entry of the fitted ``F_air`` parameters."""
        return self.model_info['model_ATD']['F_air']['val_rw']

    def set_F_air_val_rw(self, value: float):
        """Overwrite the stored ``val_rw`` entry and return ``self``."""
        self.model_info['model_ATD']['F_air']['val_rw'] = value
        return self

    def clean_data(
        self,
        data         : pd.DataFrame,
        data_type    : Union[list, tuple] = ('input', 'observed'),
        print_process: bool = True,
        fill_zero    : bool = False,
        save_log     : Union[str, None] = None
    ) -> pd.DataFrame:
        """Filter raw data and keep only the columns the model needs.

        Args:
            data: raw frame; ``-9999`` is treated as missing.
            data_type: any of ``'input'`` and ``'observed'``; selects which
                column groups are kept and whether the input filters run.
            print_process: forwarded to ``DataCleaner``.
            fill_zero: pass ``fill=0`` to ``DataCleaner.get_data`` instead of
                ``None``.
            save_log: forwarded to ``DataCleaner.get_data``.

        Returns:
            The cleaned frame restricted to the selected columns.
        """
        data    = data.replace(-9999, np.nan)
        cleaner = DataCleaner(data, print_process=print_process)

        filter_columns = []
        if 'input' in data_type:
            filter_columns += list(self.model_input_data_columns.values())
            cleaner = (
                cleaner
                .rm_rolling_fluct(window=60,fun='ptp',thre=0.1,include_cols=['State'])
                .rm_rule('State != 1')
                .rm_outrange(method='raw',upper=140,lower=20,include_cols=['wheel_1_TinR','wheel_2_TinR'])
                .rm_outrange(method='quantile',upper=0.95,lower=0.05,include_cols=filter_columns)
            )
            if self.DHU_type == 'A':
                # NOTE(review): this rule needs both an observed column
                # (wheel_1_ToutC) and an input column (coil_1_ToutA) in `data`.
                cleaner = cleaner.rm_rule('wheel_1_ToutC<=coil_1_ToutA')
        if 'observed' in data_type:
            filter_columns += list(self.model_observe_data_columns.values())

        cleaned = cleaner.get_data(
            fill     = 0 if fill_zero else None,
            save_log = save_log
        )
        return cleaned.loc[:, filter_columns]

    def optimize(
        self,
        cur_input_data: pd.DataFrame,
        wheel_1_TinR  : tuple = (70,120),
        wheel_2_TinR  : tuple = (70,120),
        fan_2_Hz      : tuple = (30,50),
        constrains    : list  = None,
        logging       : bool  = True,
        target        : str   = 'summary_Fs',
        target_min    : bool  = True
    ) -> list:
        """Search the bounded decision variables for the best ``target``.

        Only the first row of ``cur_input_data`` is used.  Passing ``None``
        for a bound tuple removes that variable from the search and keeps it
        as a fixed input.

        Returns:
            Whatever ``optimizer`` returns.
        """
        constrains     = [] if constrains is None else constrains
        cur_input_data = cur_input_data.iloc[[0],:]

        bounds = {
            'wheel_1_TinR': wheel_1_TinR,
            'wheel_2_TinR': wheel_2_TinR,
            'fan_2_Hz'    : fan_2_Hz,
        }
        opt_var_boundary = {
            var: {'lb':min(limits),'ub':max(limits)}
            for var, limits in bounds.items()
            if limits is not None
        }

        opt_var_value = cur_input_data.loc[:,list(opt_var_boundary.keys())]
        oth_var_value = (
            cur_input_data
            .loc[:,list(self.model_input_data_columns.values())]
            .drop(opt_var_value.columns,axis=1)
        )
        opt_res = optimizer(
            model            = self,
            opt_var_boundary = opt_var_boundary,
            opt_var_value    = opt_var_value,
            oth_var_value    = oth_var_value,
            target           = target,
            target_min       = target_min,
            constrains       = constrains,
            logging          = logging,
            other_kwargs     = {'NIND':2000,'MAXGEN':50}
        )
        return opt_res

    def model(self, *args, **kwargs):
        """Dispatch to ``model_A`` or ``model_B`` according to ``DHU_type``.

        Raises:
            ValueError: if ``DHU_type`` is neither ``A`` nor ``B``.
        """
        if self.DHU_type == 'A':
            return model_A(*args,**kwargs)
        elif self.DHU_type == 'B':
            return model_B(*args,**kwargs)
        else:
            raise ValueError('DHU_type must be A or B')

    def plot_opt(
        self,
        cur_input_data: pd.DataFrame,
        target_min    : str   = 'summary_waste',
        coil_3_DoutA  : tuple = None
    ):
        """Plot, per integer ``coil_3_DoutA`` level, the regeneration
        temperature pair that minimises ``target_min``.

        A 1000 x 1000 grid over both regeneration temperatures (training
        range widened by 5 on each side) is evaluated with
        ``predict_system``; all other inputs come from the first row of
        ``cur_input_data``.

        Args:
            cur_input_data: current operating point.
            target_min: name of the predicted column to minimise.
            coil_3_DoutA: range of ``coil_3_DoutA`` values to show; defaults
                to the recorded training range of ``wheel_2_DoutP``.

        Returns:
            A plotnine plot object.
        """
        train_info = self.model_info['model_train_info_ATD']
        if coil_3_DoutA is None:
            coil_3_DoutA = (
                train_info['wheel_2_DoutP_min'],
                train_info['wheel_2_DoutP_max']
            )
        data_input = (
            pd.MultiIndex.from_product(
                [
                    np.linspace(
                        train_info['wheel_1_TinR_min']-5,
                        train_info['wheel_1_TinR_max']+5,
                        1000
                    ),
                    np.linspace(
                        train_info['wheel_2_TinR_min']-5,
                        train_info['wheel_2_TinR_max']+5,
                        1000
                    ),
                ],
                names=['wheel_1_TinR','wheel_2_TinR']
            )
            .to_frame(index=False)
        )
        # Every other input is held at the value of the first current row.
        for col in cur_input_data.columns:
            if col in data_input.columns:
                continue
            data_input[col] = cur_input_data.loc[:,col].iat[0]
        data_output = self.predict_system(data_input)
        data = (
            data_output
            .assign(
                wheel_1_TinR = data_input.loc[:,'wheel_1_TinR'],
                wheel_2_TinR = data_input.loc[:,'wheel_2_TinR'],
            )
            .assign(coil_3_DoutA=lambda dt:dt.coil_3_DoutA.round(1))
            .loc[lambda dt:dt.coil_3_DoutA.between(*(min(coil_3_DoutA),max(coil_3_DoutA)))]
            .loc[lambda dt:dt.groupby('coil_3_DoutA')[target_min].idxmin()]
            .loc[lambda dt:dt.coil_3_DoutA.mod(1)==0]
        )
        import plotnine as gg
        plot = (
            data
            .pipe(gg.ggplot)
            + gg.aes(x='wheel_1_TinR',y='wheel_2_TinR')
            + gg.geom_path(size=1)
            + gg.geom_point()
            + gg.geom_label(gg.aes(label='coil_3_DoutA'))
            + gg.geom_abline(slope=1,intercept=0,color='red',linetype='--')
        )
        return plot

    def plot_check(self, cur_input_data: pd.DataFrame) -> dict:
        """Build the diagnostic plots for the current operating point.

        Returns:
            A dict with keys ``plot_EFF``, ``plot_waste`` and ``plot_opt``.
        """
        def curve(x, y):
            return self.curve(input_data=cur_input_data, x=x, y=y)

        # Wheel response curves: top row wheel 1, bottom row wheel 2.
        pa1 = curve('wheel_1_TinR', 'wheel_1_DoutP')
        pa2 = curve('wheel_1_TinR', 'wheel_1_ToutP')
        pa3 = curve('wheel_1_TinR', 'wheel_1_EFF')
        pb1 = curve('wheel_2_TinR', 'wheel_2_DoutP')
        pb2 = curve('wheel_2_TinR', 'wheel_2_ToutP')
        pb3 = curve('wheel_2_TinR', 'wheel_2_EFF')
        plot_EFF = (pa1|pa2|pa3)/(pb1|pb2|pb3)

        # Waste terms against each regeneration temperature.
        p1 = curve('wheel_1_TinR', 'summary_waste_cond1')
        p2 = curve('wheel_2_TinR', 'summary_waste_cond2')
        p3 = curve('wheel_1_TinR', 'summary_waste_Qsen1')
        p4 = curve('wheel_2_TinR', 'summary_waste_Qsen2')
        p5 = curve('wheel_1_TinR', 'summary_waste_out')
        p6 = curve('wheel_2_TinR', 'summary_waste_out')
        plot_waste = (p1|p3|p5)/(p2|p4|p6)

        plot_opt = self.plot_opt(cur_input_data)

        return {'plot_EFF':plot_EFF,'plot_waste':plot_waste,'plot_opt':plot_opt}
-
-
-
-
def model_A(
    Tin_F,        # temperature after the front cooling coil
    Hin_F,        # humidity after the front cooling coil
    fan_1_Hz,     # process-side fan frequency
    fan_2_Hz,     # regeneration-side fan frequency
    coil_1_TinW,  # front cooling coil inlet water temperature
    coil_2_TinW,  # middle cooling coil inlet water temperature
    coil_3_TinW,  # rear cooling coil inlet water temperature
    coil_1_Val,   # front cooling coil valve opening
    coil_2_Val,   # middle cooling coil valve opening
    coil_3_Val,   # rear cooling coil valve opening
    wheel_1_TinR, # front wheel regeneration-side temperature
    wheel_2_TinR, # rear wheel regeneration-side temperature
    engine    : str,
    components: dict,
    param     : dict,
    mixed_1_TinM = 0, # return-air temperature (process side)
    mixed_1_HinM = 0, # return-air humidity (process side)
    mixed_2_TinM = 0, # make-up air temperature (regeneration side)
    mixed_2_HinM = 0, # make-up air humidity (regeneration side)
) -> dict:
    """Forward model of a type-A unit.

    Each wheel is evaluated twice: a first pass with a regeneration inlet
    humidity of 0, and a corrected pass whose regeneration inlet humidity is
    taken from upstream results.  The corrected wheel results are the ones
    returned.  ``coil_1_TinW`` and ``coil_1_Val`` are accepted but not used
    in this function.

    Returns:
        A dict with one result dict per component, the air flows under
        ``'Fa'`` and a ``'summary'`` dict holding the total steam flow
        ``'Fs'`` plus the entries from ``cal_Q_waste``.
    """
    # Water mass flow of the middle and rear cooling coils
    water_flow_2 = coil_2_Val / 100
    water_flow_3 = coil_3_Val / 100
    # Air mass flow at every component
    flows = AirFlow_DHU_AB.model(fan_1_Hz=fan_1_Hz,fan_2_Hz=fan_2_Hz,param=param,type='DHU_A')

    # Keyword arguments shared by both front-wheel passes (all but HinR)
    front_wheel_args = dict(
        TinP   = Tin_F,
        HinP   = Hin_F,
        FP     = flows['wheel_1_FaP'],
        TinR   = wheel_1_TinR,
        FR     = flows['wheel_1_FaR'],
        TinC   = Tin_F,
        HinC   = Hin_F,
        FC     = flows['wheel_1_FaC'],
        engine = engine,
        param  = param['wheel_1'],
    )

    # Front wheel, first pass
    front_wheel_first = components['wheel_1'].model(HinR=0, **front_wheel_args)

    # Process-side mixing (return air)
    mixed_1_res = components['mixed_1'].model(
        TinA   = front_wheel_first['ToutP'],
        HinA   = front_wheel_first['HoutP'],
        FA     = flows['mixed_1_FaA'],
        TinM   = mixed_1_TinM,
        HinM   = mixed_1_HinM,
        FM     = flows['mixed_1_FaM'],
        engine = engine
    )

    # Middle cooling coil
    coil_2_res = components['coil_2'].model(
        TinA   = mixed_1_res['ToutA'],
        HinA   = mixed_1_res['HoutA'],
        FA     = flows['coil_2_FaA'],
        TinW   = coil_2_TinW,
        FW     = water_flow_2,
        engine = engine,
        param  = param['coil_2']
    )

    # Keyword arguments shared by both rear-wheel passes (all but HinR);
    # the cooling sector is fed from the front wheel's cooling-sector outlet.
    rear_wheel_args = dict(
        TinP   = coil_2_res['ToutA'],
        HinP   = coil_2_res['HoutA'],
        FP     = flows['wheel_2_FaP'],
        TinC   = front_wheel_first['ToutC'],
        HinC   = front_wheel_first['HoutC'],
        FC     = flows['wheel_2_FaC'],
        TinR   = wheel_2_TinR,
        FR     = flows['wheel_2_FaR'],
        engine = engine,
        param  = param['wheel_2'],
    )

    # Rear wheel, first pass
    rear_wheel_first = components['wheel_2'].model(HinR=0, **rear_wheel_args)

    # Rear cooling coil (fed from the first rear-wheel pass)
    coil_3_res = components['coil_3'].model(
        TinA   = rear_wheel_first['ToutP'],
        HinA   = rear_wheel_first['HoutP'],
        FA     = flows['coil_3_FaA'],
        TinW   = coil_3_TinW,
        FW     = water_flow_3,
        engine = engine,
        param  = param['coil_3']
    )

    # Rear wheel humidity correction
    rear_wheel = components['wheel_2'].model(
        HinR = rear_wheel_first['HoutC'],
        **rear_wheel_args
    )

    # Regeneration-side mixing (exhaust air)
    mixed_2_res = components['mixed_2'].model(
        TinA   = rear_wheel['ToutR'],
        HinA   = rear_wheel['HoutR'],
        FA     = flows['mixed_2_FaA'],
        TinM   = mixed_2_TinM,
        HinM   = mixed_2_HinM,
        FM     = flows['mixed_2_FaM'],
        engine = engine
    )

    # Front wheel humidity correction
    front_wheel = components['wheel_1'].model(
        HinR = mixed_2_res['HoutA'],
        **front_wheel_args
    )

    # Front regeneration heating coil
    heatingcoil_1_res = components['heatingcoil_1'].model(
        TinA   = mixed_2_res['ToutA'],
        ToutA  = wheel_1_TinR,
        FA     = flows['heatingcoil_1_Fa'],
        param  = param['heatingcoil_1'],
        engine = engine
    )

    # Rear regeneration heating coil
    heatingcoil_2_res = components['heatingcoil_2'].model(
        TinA   = rear_wheel['ToutC'],
        ToutA  = wheel_2_TinR,
        FA     = flows['heatingcoil_2_Fa'],
        param  = param['heatingcoil_2'],
        engine = engine
    )

    waste = cal_Q_waste(
        wheel_1_res       = front_wheel,
        wheel_2_res       = rear_wheel,
        heatingcoil_1_res = heatingcoil_1_res,
        heatingcoil_2_res = heatingcoil_2_res,
        wheel_1_TinR      = wheel_1_TinR,
        wheel_2_TinR      = wheel_2_TinR,
        coil_3_Val        = coil_3_Val
    )

    summary = {'Fs': heatingcoil_1_res['Fs'] + heatingcoil_2_res['Fs']}
    summary.update(waste)
    return {
        'coil_2'       : coil_2_res,
        'coil_3'       : coil_3_res,
        'wheel_1'      : front_wheel,
        'wheel_2'      : rear_wheel,
        'mixed_1'      : mixed_1_res,
        'mixed_2'      : mixed_2_res,
        'heatingcoil_1': heatingcoil_1_res,
        'heatingcoil_2': heatingcoil_2_res,
        'Fa'           : flows,
        'summary'      : summary,
    }
-
def model_B(
    Tin_F,        # temperature after the front cooling coil
    Hin_F,        # humidity after the front cooling coil
    fan_1_Hz,     # process-side fan frequency
    fan_2_Hz,     # regeneration-side fan frequency
    coil_1_TinW,  # front cooling coil inlet water temperature
    coil_2_TinW,  # middle cooling coil inlet water temperature
    coil_3_TinW,  # rear cooling coil inlet water temperature
    coil_1_Val,   # front cooling coil valve opening
    coil_2_Val,   # middle cooling coil valve opening
    coil_3_Val,   # rear cooling coil valve opening
    wheel_1_TinR, # front wheel regeneration-side temperature
    wheel_2_TinR, # rear wheel regeneration-side temperature
    engine    : str,
    components: dict,
    param     : dict,
    mixed_1_TinM = 0, # return-air temperature (process side)
    mixed_1_HinM = 0, # return-air humidity (process side)
    mixed_2_TinM = 0, # make-up air temperature (regeneration side)
    mixed_2_HinM = 0, # make-up air humidity (regeneration side)
) -> dict:
    """Forward model of a type-B unit.

    Each wheel is evaluated twice: a first pass with a regeneration inlet
    humidity of 0, and a corrected pass whose regeneration inlet humidity is
    taken from upstream results.  The corrected wheel results are the ones
    returned.  The front wheel is called without cooling-sector arguments and
    the rear wheel's cooling sector is fed from the process-side mixing
    outlet.  ``coil_1_TinW`` and ``coil_1_Val`` are accepted but not used in
    this function.

    Returns:
        A dict with one result dict per component, the air flows under
        ``'Fa'`` and a ``'summary'`` dict holding the total steam flow
        ``'Fs'`` plus the entries from ``cal_Q_waste``.
    """
    # Water mass flow of the middle and rear cooling coils
    water_flow_2 = coil_2_Val / 100
    water_flow_3 = coil_3_Val / 100
    # Air mass flow at every component
    flows = AirFlow_DHU_AB.model(fan_1_Hz=fan_1_Hz,fan_2_Hz=fan_2_Hz,param=param,type='DHU_B')

    # Keyword arguments shared by both front-wheel passes (all but HinR)
    front_wheel_args = dict(
        TinP   = Tin_F,
        HinP   = Hin_F,
        FP     = flows['wheel_1_FaP'],
        TinR   = wheel_1_TinR,
        FR     = flows['wheel_1_FaR'],
        engine = engine,
        param  = param['wheel_1'],
    )

    # Front wheel, first pass
    front_wheel_first = components['wheel_1'].model(HinR=0, **front_wheel_args)

    # Process-side mixing (return air)
    mixed_1_res = components['mixed_1'].model(
        TinA   = front_wheel_first['ToutP'],
        HinA   = front_wheel_first['HoutP'],
        FA     = flows['mixed_1_FaA'],
        TinM   = mixed_1_TinM,
        HinM   = mixed_1_HinM,
        FM     = flows['mixed_1_FaM'],
        engine = engine
    )

    # Middle cooling coil
    coil_2_res = components['coil_2'].model(
        TinA   = mixed_1_res['ToutA'],
        HinA   = mixed_1_res['HoutA'],
        FA     = flows['coil_2_FaA'],
        TinW   = coil_2_TinW,
        FW     = water_flow_2,
        engine = engine,
        param  = param['coil_2']
    )

    # Keyword arguments shared by both rear-wheel passes (all but HinR)
    rear_wheel_args = dict(
        TinP   = coil_2_res['ToutA'],
        HinP   = coil_2_res['HoutA'],
        FP     = flows['wheel_2_FaP'],
        TinC   = mixed_1_res['ToutA'],
        HinC   = mixed_1_res['HoutA'],
        FC     = flows['wheel_2_FaC'],
        TinR   = wheel_2_TinR,
        FR     = flows['wheel_2_FaR'],
        engine = engine,
        param  = param['wheel_2'],
    )

    # Rear wheel, first pass
    rear_wheel_first = components['wheel_2'].model(HinR=0, **rear_wheel_args)

    # Rear cooling coil (fed from the first rear-wheel pass)
    coil_3_res = components['coil_3'].model(
        TinA   = rear_wheel_first['ToutP'],
        HinA   = rear_wheel_first['HoutP'],
        FA     = flows['coil_3_FaA'],
        TinW   = coil_3_TinW,
        FW     = water_flow_3,
        engine = engine,
        param  = param['coil_3']
    )

    # Rear wheel humidity correction
    rear_wheel = components['wheel_2'].model(
        HinR = rear_wheel_first['HoutC'],
        **rear_wheel_args
    )

    # Regeneration-side mixing (exhaust air)
    mixed_2_res = components['mixed_2'].model(
        TinA   = rear_wheel['ToutR'],
        HinA   = rear_wheel['HoutR'],
        FA     = flows['mixed_2_FaA'],
        TinM   = mixed_2_TinM,
        HinM   = mixed_2_HinM,
        FM     = flows['mixed_2_FaM'],
        engine = engine
    )

    # Front wheel humidity correction
    front_wheel = components['wheel_1'].model(
        HinR = mixed_2_res['HoutA'],
        **front_wheel_args
    )

    # Front steam coil
    heatingcoil_1_res = components['heatingcoil_1'].model(
        TinA   = mixed_2_res['ToutA'],
        ToutA  = wheel_1_TinR,
        FA     = flows['heatingcoil_1_Fa'],
        param  = param['heatingcoil_1'],
        engine = engine
    )

    # Rear steam coil
    heatingcoil_2_res = components['heatingcoil_2'].model(
        TinA   = rear_wheel['ToutC'],
        ToutA  = wheel_2_TinR,
        FA     = flows['heatingcoil_2_Fa'],
        param  = param['heatingcoil_2'],
        engine = engine
    )

    waste = cal_Q_waste(
        wheel_1_res       = front_wheel,
        wheel_2_res       = rear_wheel,
        heatingcoil_1_res = heatingcoil_1_res,
        heatingcoil_2_res = heatingcoil_2_res,
        wheel_1_TinR      = wheel_1_TinR,
        wheel_2_TinR      = wheel_2_TinR,
        coil_3_Val        = coil_3_Val
    )

    summary = {'Fs': heatingcoil_1_res['Fs'] + heatingcoil_2_res['Fs']}
    summary.update(waste)
    return {
        'coil_2'       : coil_2_res,
        'coil_3'       : coil_3_res,
        'wheel_1'      : front_wheel,
        'wheel_2'      : rear_wheel,
        'mixed_1'      : mixed_1_res,
        'mixed_2'      : mixed_2_res,
        'heatingcoil_1': heatingcoil_1_res,
        'heatingcoil_2': heatingcoil_2_res,
        'Fa'           : flows,
        'summary'      : summary,
    }
-
-
class AirFlow_DHU_AB:
    """Air-flow balance shared by the type-A and type-B unit models.

    ``model`` turns fan frequencies and the ``F_air`` parameters into the air
    flow at every component; ``prior`` declares those parameters as PyMC
    random variables.
    """

    @staticmethod
    def _segment_lengths(N, period):
        """Split ``N`` samples into consecutive segments of at most ``period``.

        Every segment has ``period`` samples except possibly the last one,
        so the lengths always sum to ``N``.
        """
        full_segments, remainder = divmod(N, period)
        return [period] * full_segments + ([remainder] if remainder else [])

    @classmethod
    def model(cls, fan_1_Hz, fan_2_Hz, param, type):
        """Compute the air flow at every component.

        Args:
            fan_1_Hz: frequency used to scale the ``S``, ``H`` and ``X`` flows.
            fan_2_Hz: frequency used to scale the ``B`` flow.
            param: parameter dict; ``param['F_air']`` must hold ``X_base``
                and ``HzP_X``; the other entries are optional.
            type: ``'DHU_A'`` or ``'DHU_B'``.

        Returns:
            A dict of the five main flows (``Fa_S``, ``Fa_H``, ``Fa_X``,
            ``Fa_B``, ``Fa_P``) and the per-component flows.

        Raises:
            ValueError: if ``type`` is not one of the two supported values.
        """
        f_air = param['F_air']

        # Baseline flow at each inlet/outlet when the fixed-frequency fan
        # is held constant
        F_air_S_base  = 1
        F_air_X_base  = f_air['X_base']
        F_air_H_base  = f_air.get('H_base',0)
        F_air_B_base  = f_air.get('B_base',0)
        F_air_val_rw  = f_air.get('val_rw',0)
        F_air_val_pct = f_air.get('val_pct',0)

        # Baseline change caused by the fresh-air valve; a branch whose base
        # parameter is absent gets a baseline of 0.
        F_air_S_base_adj = F_air_S_base
        F_air_X_base_adj = F_air_X_base + F_air_val_rw
        if 'H_base' in f_air:
            F_air_H_base_adj = F_air_H_base - F_air_val_rw * F_air_val_pct
        else:
            F_air_H_base_adj = 0
        if 'B_base' in f_air:
            F_air_B_base_adj = F_air_B_base - F_air_val_rw * (1 - F_air_val_pct)
        else:
            F_air_B_base_adj = 0

        # Add the effect of the fan frequencies to obtain the final flows
        F_air_HzP_X = f_air['HzP_X']
        F_air_HzP_H = f_air.get('HzP_H',0)
        F_air_HzP_S = F_air_HzP_X + F_air_HzP_H
        F_air_HzR_B = f_air.get('HzR_B',0)
        Fa_S = F_air_S_base_adj + F_air_HzP_S * (fan_1_Hz / 50)
        Fa_H = F_air_H_base_adj + F_air_HzP_H * (fan_1_Hz / 50)
        Fa_X = F_air_X_base_adj + F_air_HzP_X * (fan_1_Hz / 50)
        Fa_B = F_air_B_base_adj + F_air_HzR_B * (fan_2_Hz / 50)
        Fa_P = Fa_B + Fa_X + Fa_H - Fa_S

        if type == 'DHU_A':
            wheel_1_FaP      = Fa_S - Fa_H
            wheel_1_FaC      = Fa_X - wheel_1_FaP
            wheel_1_FaR      = Fa_P
            wheel_2_FaP      = Fa_S
            wheel_2_FaC      = wheel_1_FaC
            wheel_2_FaR      = wheel_1_FaC
            mixed_1_FaM      = Fa_H
            mixed_1_FaA      = wheel_1_FaP
            mixed_2_FaM      = Fa_B
            mixed_2_FaA      = wheel_1_FaC
            coil_2_FaA       = Fa_S
            coil_3_FaA       = Fa_S
            heatingcoil_1_Fa = Fa_P
            heatingcoil_2_Fa = wheel_1_FaC

        elif type == 'DHU_B':
            wheel_1_FaP      = Fa_X
            wheel_1_FaC      = np.nan  # no cooling-sector flow is defined here
            wheel_1_FaR      = Fa_P
            wheel_2_FaP      = Fa_S
            wheel_2_FaC      = Fa_X + Fa_H - Fa_S
            wheel_2_FaR      = wheel_2_FaC
            mixed_1_FaM      = Fa_H
            mixed_1_FaA      = Fa_X
            mixed_2_FaM      = Fa_B
            mixed_2_FaA      = wheel_2_FaC
            coil_2_FaA       = Fa_S
            coil_3_FaA       = Fa_S
            heatingcoil_1_Fa = Fa_P
            heatingcoil_2_Fa = wheel_2_FaC

        else:
            raise ValueError('type error')
        return {
            'Fa_S':Fa_S,'Fa_H':Fa_H,'Fa_X':Fa_X,'Fa_B':Fa_B,'Fa_P':Fa_P,
            'wheel_1_FaP':wheel_1_FaP,'wheel_1_FaC':wheel_1_FaC,'wheel_1_FaR':wheel_1_FaR,
            'wheel_2_FaP':wheel_2_FaP,'wheel_2_FaC':wheel_2_FaC,'wheel_2_FaR':wheel_2_FaR,
            'mixed_1_FaM':mixed_1_FaM,'mixed_1_FaA':mixed_1_FaA,
            'mixed_2_FaM':mixed_2_FaM,'mixed_2_FaA':mixed_2_FaA,
            'coil_2_FaA':coil_2_FaA,'coil_3_FaA':coil_3_FaA,
            'heatingcoil_1_Fa':heatingcoil_1_Fa,'heatingcoil_2_Fa':heatingcoil_2_Fa
        }

    @classmethod
    def prior(
        cls,
        rw_FA_val : bool,
        N         : int,
        exist_Fa_H: bool,
        exist_Fa_B: bool
    ) -> dict:
        """Declare the ``F_air`` priors; must be called inside a PyMC model.

        Args:
            rw_FA_val: when true, ``val_rw`` is a clipped Gaussian random
                walk held constant over segments of 48 samples and
                ``val_pct`` gets a Beta prior; otherwise both are 0.
            N: number of samples, i.e. the length of ``val_rw``.
            exist_Fa_H: add the ``HzP_H`` / ``H_base`` parameters.
            exist_Fa_B: add the ``HzR_B`` / ``B_base`` parameters.

        Returns:
            A dict of parameters usable as ``param['F_air']`` in ``model``.
        """
        param = {}

        # Fresh-air parameters
        param['HzP_X']  = pm.HalfNormal('F_air_HzP_X',sigma=1,initval=1)
        X_base_initval  = 0.5 if exist_Fa_H else 1.5
        param['X_base'] = pm.TruncatedNormal(
            'F_air_X_base',mu=0.5,sigma=0.2,lower=0,initval=X_base_initval)

        if exist_Fa_H:
            param['HzP_H']  = pm.HalfNormal('F_air_HzP_H',sigma=1,initval=0.1)
            param['H_base'] = pm.TruncatedNormal('F_air_H_base',mu=0.6,sigma=0.2,lower=0,upper=0.999,initval=0.6)

        if exist_Fa_B:
            param['HzR_B']  = pm.HalfNormal('F_air_HzR_B',sigma=1,initval=0.5)
            param['B_base'] = pm.TruncatedNormal('F_air_B_base',mu=0.2,sigma=0.1,lower=0,initval=0.1)

        if rw_FA_val:
            period = 48
            # One repeat count per random-walk step.  The previous
            # construction dropped the last full segment whenever N was an
            # exact multiple of the period.
            repeat     = cls._segment_lengths(N, period)
            n_segments = len(repeat)
            rw = pm.GaussianRandomWalk(
                'rw',sigma=0.1,init_dist=pm.Normal.dist(mu=0,sigma=0.3),shape=n_segments)
            rw = pm.math.switch(rw<0,0,rw)  # negative steps are clipped to 0
            param['val_rw']  = pm.Deterministic('F_air_val_rw',pt.repeat(rw,repeat))
            param['val_pct'] = pm.Beta('F_air_val_pct',alpha=8,beta=1,initval=0.9)
        else:
            param['val_rw']  = 0
            param['val_pct'] = 0

        return param
-
def cal_Q_waste(
    wheel_1_res,
    wheel_2_res,
    heatingcoil_1_res,
    heatingcoil_2_res,
    wheel_1_TinR,
    wheel_2_TinR,
    coil_3_Val
) -> dict:
    """Break the regeneration heat down into waste terms.

    Args:
        wheel_1_res, wheel_2_res: wheel results; ``'Qsen'`` and ``'Qlat'``
            are read.
        heatingcoil_1_res, heatingcoil_2_res: heating coil results; ``'Q'``
            is read.
        wheel_1_TinR, wheel_2_TinR: regeneration temperatures used by the
            condensate waste fraction.
        coil_3_Val: rear cooling coil valve opening.

    Returns:
        A dict with the individual waste terms and their total under
        ``'waste'``; ``'waste_Qout'`` and ``'waste_out'`` hold the same value.
    """
    def cubic_fraction(TinR):
        # 0.15 at 70, cubic in the distance from 70, floored at 0
        fraction = 0.15 + 0.0001 * (TinR-70)**3
        return np.where(fraction>0,fraction,0)

    def saturating_fraction(TinR):
        # NOTE(review): never called.  Both condensate terms below use the
        # cubic fraction; confirm whether the second one was meant to use
        # this curve instead.
        fraction = 0.25 * (1 - np.exp(-0.04 * (TinR - 70)))
        return np.where(fraction>0,fraction,0)

    q_coil_1 = heatingcoil_1_res['Q']
    q_coil_2 = heatingcoil_2_res['Q']

    sensible_1 = wheel_1_res['Qsen']
    # With the valve closed the heat is not counted as waste (it is used
    # for heating).
    valve_open = np.where(coil_3_Val>0.01,1,0)
    sensible_2 = wheel_2_res['Qsen'] * valve_open

    condensate_1 = q_coil_1 * cubic_fraction(wheel_1_TinR)
    condensate_2 = q_coil_2 * cubic_fraction(wheel_2_TinR)

    # Coil heat not accounted for by the wheels' sensible and latent heat
    unaccounted = (
        q_coil_1 + q_coil_2
        - wheel_1_res['Qsen'] - wheel_1_res['Qlat']
        - wheel_2_res['Qsen'] - wheel_2_res['Qlat']
    )

    total = sensible_1+sensible_2+condensate_1+condensate_2+unaccounted
    return {
        'waste_Qsen1': sensible_1,
        'waste_Qsen2': sensible_2,
        'waste_Qout' : unaccounted,
        'waste_cond1': condensate_1,
        'waste_cond2': condensate_2,
        'waste_out'  : unaccounted,
        'waste'      : total,
    }
|