| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472 |
- from typing import Union
- import numpy as np
- import pandas as pd
- import pymc as pm
- import pytensor.tensor as pt
- from .._base._base_device import BaseDevice
- from ...components import (
- coil_water,coil_steam,wheel2,wheel3,mixed
- )
- from ..utils.fit_utils import (
- observe,reorder_posterior
- )
- from ...tools.optimizer import optimizer
- from ...tools.data_cleaner import DataCleaner
- class SDHU_AB(BaseDevice):
-
- val_rw_adj_target = ('coil_3_ToutA','coil_3_DoutA')
-
- def __init__(
- self,
- DHU_type = 'A',
- exist_Fa_H = True,
- wheel_1 = None,
- coolingcoil_2 = 'CoolingCoil2',
- heatingcoil_1 = 'SteamCoil',
- mixed_1 = 'Mixed',
- mixed_2 = 'Mixed',
- other_info = None
- ) -> None:
- super().__init__()
- self.DHU_type = DHU_type.replace('SDHU_','')
- if self.DHU_type == 'A':
- wheel_1 = wheel_1 if wheel_1 is not None else 'WheelS3V3'
- elif self.DHU_type == 'B':
- wheel_1 = wheel_1 if wheel_1 is not None else 'WheelS2V2'
- else:
- raise Exception('SDHU_type must be A or B')
-
- self.components_str = {
- 'wheel_1' : wheel_1,
- 'coil_2' : coolingcoil_2,
- 'heatingcoil_1': heatingcoil_1,
- 'mixed_1' : mixed_1,
- 'mixed_2' : mixed_2
- }
- self.exist_Fa_H = exist_Fa_H
- self.other_info = other_info if other_info is not None else {}
- self.record_load_info(
- components_str = self.components_str,
- DHU_type = self.DHU_type,
- exist_Fa_H = self.exist_Fa_H,
- other_info = self.other_info
- )
-
- @property
- def components(self):
- comp_map = {
- 'WheelS2':wheel2,'WheelS3':wheel3,'CoolingCoil':coil_water,
- 'SteamCoil':coil_steam,'Mixed':mixed
- }
- output ={}
- for comp_name,comp_model in self.components_str.items():
- if comp_model == 'SteamCoilVal':
- output[comp_name] = coil_steam.SteamCoilVal(
- name = comp_name,
- Fs_rated = self.other_info[f'{comp_name}_Fs_rated']
- )
- continue
- for comp_map_k,comp_map_v in comp_map.items():
- if comp_model.startswith(comp_map_k):
- output[comp_name] = getattr(comp_map_v,comp_model)(name = comp_name)
- return output
-
- @property
- def model_input_data_columns(self):
- columns = {
- 'Tin_F' : 'coil_1_ToutA',
- 'Hin_F' : 'coil_1_HoutA',
- 'fan_1_Hz' : 'fan_1_Hz',
- 'fan_2_Hz' : 'fan_2_Hz',
- 'coil_2_TinW' : 'coil_2_TinW',
- 'coil_2_Val' : 'coil_2_Val',
- 'wheel_1_TinR': 'wheel_1_TinR',
- }
- if self.exist_Fa_H:
- columns['mixed_1_TinM'] = 'mixed_1_TinM'
- columns['mixed_1_HinM'] = 'mixed_1_HinM'
- if self.DHU_type == 'A':
- columns['coil_1_ToutA'] = 'coil_1_ToutA'
- columns['coil_1_HoutA'] = 'coil_1_HoutA'
- return columns
-
- @property
- def model_observe_data_columns(self):
- columns = {
- 'mixed_1_ToutA': 'mixed_1_ToutA',
- 'mixed_1_DoutA': 'mixed_1_DoutA',
- 'coil_2_ToutA' : 'coil_2_ToutA',
- 'coil_2_DoutA' : 'coil_2_DoutA',
- }
- if self.DHU_type == 'A':
- columns['wheel_1_ToutC'] = 'wheel_1_ToutC' # A类除湿机前转轮是三分转轮
-
- exclude_obs = self.other_info.get('exclude_obs',[])
- for col in exclude_obs:
- if col in columns:
- del columns[col]
- return columns
-
- def model(self,*args,**kwargs):
- if self.DHU_type == 'A':
- return model_A(*args,**kwargs)
- elif self.DHU_type == 'B':
- # return model_B(*args,**kwargs)
- pass
- else:
- raise ValueError('DHU_type must be A or B')
-
- def fit(
- self,
- input_data : pd.DataFrame,
- observed_data: pd.DataFrame,
- plot_TVP : bool = True,
- ):
- if len(input_data) < 30:
- raise Exception('数据量过少')
-
- with pm.Model() as self.MODEL_PYMC:
- param_prior = {name:comp.prior() for name,comp in self.components.items()}
- param_prior['F_air'] = AirFlow_SDHU_A.prior(exist_Fa_H = self.exist_Fa_H)
-
- res = self.model(
- **{k:input_data.loc[:,v].values for k,v in self.model_input_data_columns.items()},
- engine = 'pymc',
- components = self.components,
- param = param_prior
- )
- for std_name,name in self.model_observe_data_columns.items():
- if name not in observed_data.columns:
- raise Exception(f'Missing column: {name}')
- observed_data = observed_data.rename(columns={name:std_name})
-
- std_name_equp,std_name_point = std_name.rsplit('_',1)
- sigma = 1
- observe(
- name = std_name,
- var = res[std_name_equp][std_name_point],
- observed = observed_data,
- sigma = sigma
- )
-
- self.param_posterior = pm.find_MAP(maxeval=50000,include_transformed=False)
-
- self.record_load_info(param_posterior = self.param_posterior)
- self.record_model(
- model_name = 'ATD',
- model = reorder_posterior(param_prior,self.param_posterior),
- train_data = {'x':np.array([1])},
- train_metric = {'R2':1,'MAE':1,'MAPE':1}
- )
- self.TVP_data = self.get_TVP(self.param_posterior,observed_data)
- self.TVP_metric = self.get_metric(self.TVP_data)
- if plot_TVP:
- self.plot_TVP(self.TVP_data).show()
- return self
-
- @property
- def F_air_val_rw(self):
- return None
-
- def set_F_air_val_rw(self,value:float):
- return self
-
- def clean_data(
- self,
- data : pd.DataFrame,
- data_type : list=['input','observed'],
- print_process: bool = True,
- fill_zero : bool = False,
- save_log : Union[str,None] = None
- ) -> pd.DataFrame:
- data = data.replace(-9999,np.nan)
- clean_data = DataCleaner(data,print_process=print_process)
-
- if 'input' in data_type:
- clean_data = (
- clean_data
- .rm_rolling_fluct(window=60,fun='ptp',thre=0.1,include_cols=['State'])
- .rm_rule('State != 1')
- .rm_rule('fan_1_Hz < 10').rm_rule('fan_2_Hz < 10')
- .rm_outrange(method='raw',upper=140,lower=20,include_cols=['wheel_1_TinR'])
- )
- if 'observed' in data_type:
- pass
- clean_data = clean_data.get_data(
- fill = 0 if fill_zero else None,
- save_log = save_log
- )
- return clean_data
-
- # def optimize(
- # self,
- # cur_input_data: pd.DataFrame,
- # wheel_1_TinR : tuple = (70,120),
- # wheel_2_TinR : tuple = (70,120),
- # fan_2_Hz : tuple = (30,50),
- # constrains : list = None,
- # logging : bool = True,
- # target : str = 'summary_Fs',
- # target_min : bool = True
- # ) -> list:
- # constrains = [] if constrains is None else constrains
- # cur_input_data = cur_input_data.iloc[[0],:]
-
- # opt_var_boundary = {}
- # if wheel_1_TinR is not None:
- # opt_var_boundary['wheel_1_TinR'] = {'lb':min(wheel_1_TinR),'ub':max(wheel_1_TinR)}
- # if wheel_2_TinR is not None:
- # opt_var_boundary['wheel_2_TinR'] = {'lb':min(wheel_2_TinR),'ub':max(wheel_2_TinR)}
- # if fan_2_Hz is not None:
- # opt_var_boundary['fan_2_Hz'] = {'lb':min(fan_2_Hz),'ub':max(fan_2_Hz)}
-
- # opt_var_value = cur_input_data.loc[:,list(opt_var_boundary.keys())]
- # oth_var_value = (
- # cur_input_data
- # .loc[:,list(self.model_input_data_columns.values())]
- # .drop(opt_var_value.columns,axis=1)
- # )
- # opt_res = optimizer(
- # model = self,
- # opt_var_boundary = opt_var_boundary,
- # opt_var_value = opt_var_value,
- # oth_var_value = oth_var_value,
- # target = target,
- # target_min = target_min,
- # constrains = constrains,
- # logging = logging,
- # other_kwargs = {'NIND':2000,'MAXGEN':50}
- # )
- # return opt_res
-
-
- # def plot_opt(
- # self,
- # cur_input_data: pd.DataFrame,
- # target_min : str = 'summary_waste',
- # ):
- # data_input = (
- # pd.MultiIndex.from_product(
- # [
- # np.linspace(70,120,1000),
- # np.linspace(70,120,1000),
- # ],
- # names=['wheel_1_TinR','wheel_2_TinR']
- # )
- # .to_frame(index=False)
- # )
- # for col in cur_input_data.columns:
- # if col in data_input.columns:
- # continue
- # data_input[col] = cur_input_data.loc[:,col].iat[0]
- # data_output = self.predict_system(data_input)
- # data = (
- # data_output
- # .assign(
- # wheel_1_TinR = data_input.loc[:,'wheel_1_TinR'],
- # wheel_2_TinR = data_input.loc[:,'wheel_2_TinR'],
- # )
- # .assign(coil_3_DoutA=lambda dt:dt.coil_3_DoutA.round(1))
- # .loc[lambda dt:dt.groupby('coil_3_DoutA')[target_min].idxmin()]
- # .loc[lambda dt:dt.coil_3_DoutA.mod(1)==0]
- # )
- # import plotnine as gg
- # plot = (
- # data
- # .pipe(gg.ggplot)
- # + gg.aes(x='wheel_1_TinR',y='wheel_2_TinR')
- # + gg.geom_path(size=1)
- # + gg.geom_point()
- # + gg.geom_label(gg.aes(label='coil_3_DoutA'))
- # + gg.geom_abline(slope=1,intercept=0,color='red',linetype='--')
- # )
- # return plot
-
- # def plot_check(self,cur_input_data:pd.DataFrame) -> dict:
- # pa1=self.curve(input_data=cur_input_data,x='wheel_1_TinR',y='wheel_1_DoutP')
- # pa2=self.curve(input_data=cur_input_data,x='wheel_1_TinR',y='wheel_1_ToutP')
- # pa3=self.curve(input_data=cur_input_data,x='wheel_1_TinR',y='wheel_1_EFF')
- # pb1=self.curve(input_data=cur_input_data,x='wheel_2_TinR',y='wheel_2_DoutP')
- # pb2=self.curve(input_data=cur_input_data,x='wheel_2_TinR',y='wheel_2_ToutP')
- # pb3=self.curve(input_data=cur_input_data,x='wheel_2_TinR',y='wheel_2_EFF')
- # plot1 = (pa1|pa2|pa3)/(pb1|pb2|pb3)
- # return {'plot1':plot1}
-
-
-
-
- def model_A(
- Tin_F, # 前表冷后温度
- Hin_F, # 前表冷后湿度
- coil_1_ToutA,
- coil_1_HoutA,
- fan_1_Hz, # 处理侧风机频率
- fan_2_Hz, # 再生侧风机频率
- coil_2_TinW, # 中表冷进水温度
- coil_2_Val, # 中表冷阀门开度
- wheel_1_TinR, # 前转轮再生侧温度
- engine : str,
- components: dict,
- param : dict,
- mixed_1_TinM = 0, # 回风温度(处理侧)
- mixed_1_HinM = 0, # 回风湿度(处理侧)
- ) -> dict:
-
- # 水的质量流量
- coil_2_FW = coil_2_Val / 100
- # 空气的质量流量
- air_flow = AirFlow_SDHU_A.model(fan_1_Hz=fan_1_Hz,fan_2_Hz=fan_2_Hz,param=param)
-
- # 前转轮
- wheel_1_res = components['wheel_1'].model(
- TinP = coil_1_ToutA,
- HinP = coil_1_HoutA,
- FP = air_flow['wheel_1_FaP'],
- TinR = wheel_1_TinR,
- HinR = 0,
- FR = air_flow['wheel_1_FaR'],
- TinC = Tin_F,
- HinC = Hin_F,
- FC = air_flow['wheel_1_FaC'],
- engine = engine,
- param = param['wheel_1']
- )
-
- # 处理侧混风(回风)
- mixed_1_res = components['mixed_1'].model(
- TinA = wheel_1_res['ToutP'],
- HinA = wheel_1_res['HoutP'],
- FA = air_flow['mixed_1_FaA'],
- TinM = mixed_1_TinM,
- HinM = mixed_1_HinM,
- FM = air_flow['mixed_1_FaM'],
- engine = engine
- )
-
- # 中表冷
- coil_2_res = components['coil_2'].model(
- TinA = mixed_1_res['ToutA'],
- HinA = mixed_1_res['HoutA'],
- FA = air_flow['coil_2_FaA'],
- TinW = coil_2_TinW,
- FW = coil_2_FW,
- engine = engine,
- param = param['coil_2']
- )
-
- # 后转轮湿度修正
- wheel_1_res_adj = components['wheel_1'].model(
- TinP = coil_1_ToutA,
- HinP = coil_1_HoutA,
- FP = air_flow['wheel_1_FaP'],
- TinR = wheel_1_TinR,
- HinR = wheel_1_res['HoutC'],
- FR = air_flow['wheel_1_FaR'],
- TinC = Tin_F,
- HinC = Hin_F,
- FC = air_flow['wheel_1_FaC'],
- engine = engine,
- param = param['wheel_1']
- )
-
- # 前再生加热盘管
- heatingcoil_1_res = components['heatingcoil_1'].model(
- TinA = wheel_1_res_adj['ToutC'],
- ToutA = wheel_1_TinR,
- FA = air_flow['heatingcoil_1_Fa'],
- param = param['heatingcoil_1'],
- engine = engine
- )
-
-
- # waste = cal_Q_waste(
- # wheel_1_res = wheel_1_res_adj,
- # wheel_2_res = wheel_2_res_adj,
- # heatingcoil_1_res = heatingcoil_1_res,
- # heatingcoil_2_res = heatingcoil_2_res,
- # wheel_1_TinR = wheel_1_TinR,
- # wheel_2_TinR = wheel_2_TinR
- # )
- return {
- 'coil_2' : coil_2_res,
- 'wheel_1' : wheel_1_res_adj,
- 'mixed_1' : mixed_1_res,
- 'heatingcoil_1': heatingcoil_1_res,
- 'Fa' : air_flow,
- 'summary' : {
- 'Fs' : heatingcoil_1_res['Fs'],
- # **waste,
- }
- }
-
-
- class AirFlow_SDHU_A:
-
- @classmethod
- def model(cls,fan_1_Hz,fan_2_Hz,param):
- F_air_X2_base = 1 # 进入转轮处理侧的新风量
- F_air_H_base = param['F_air'].get('H_base',0)
- F_air_P_base = param['F_air']['P_base']
-
- Fa_H = F_air_H_base + (fan_1_Hz/50) * param['F_air'].get('HzP_H',0)
- Fa_X2 = F_air_X2_base + (fan_1_Hz/50) * param['F_air']['HzP_X2']
- Fa_S = Fa_H + Fa_X2
- Fa_P = F_air_P_base + (fan_2_Hz/50) * param['F_air']['HzR_P']
-
- return {
- 'wheel_1_FaP' : Fa_X2,
- 'wheel_1_FaC' : Fa_P,
- 'wheel_1_FaR' : Fa_P,
- 'coil_2_FaA' : Fa_S,
- 'mixed_1_FaM' : Fa_H,
- 'mixed_1_FaA' : Fa_X2,
- 'heatingcoil_1_Fa': Fa_P
- }
-
- @classmethod
- def prior(cls,exist_Fa_H):
- param = {}
- param['HzP_X2'] = pm.HalfNormal('F_air_HzP_X2',sigma=1,initval=0.5)
- param['HzR_P'] = pm.HalfNormal('F_air_HzR_P',sigma=1,initval=0.5)
- param['P_base'] = pm.TruncatedNormal('F_air_P_base',mu=1,sigma=0.2,lower=0,initval=1)
- if exist_Fa_H:
- param['H_base'] = pm.TruncatedNormal('F_air_H_base',mu=1,sigma=0.2,lower=0,initval=1)
- param['HzP_H'] = pm.HalfNormal('F_air_HzP_H',sigma=1,initval=0.5)
- return param
-
-
-
- def cal_Q_waste(
- wheel_1_res,
- wheel_2_res,
- heatingcoil_1_res,
- heatingcoil_2_res,
- wheel_1_TinR,
- wheel_2_TinR
- ):
- waste_Qsen1 = wheel_1_res['Qsen']
- waste_Qsen2 = wheel_2_res['Qsen']
- waste_cond1 = heatingcoil_1_res['Q'] * (0.15 + 0.0001 * (wheel_1_TinR-70)**2)
- waste_cond2 = heatingcoil_2_res['Q'] * (0.15 + 0.0001 * (wheel_2_TinR-70)**2)
- waste_out = (
- heatingcoil_1_res['Q'] + heatingcoil_2_res['Q']
- - wheel_1_res['Qsen'] - wheel_1_res['Qlat']
- - wheel_2_res['Qsen'] - wheel_2_res['Qlat']
- )
- return {
- 'waste_Qsen1': waste_Qsen1,
- 'waste_Qsen2': waste_Qsen2,
- 'waste_Qout' : waste_out,
- 'waste_cond1': waste_cond1,
- 'waste_cond2': waste_cond2,
- 'waste_out' : waste_out,
- 'waste' : waste_Qsen1+waste_cond2+waste_cond1+waste_cond2+waste_out,
- }
|