from typing import Union

import numpy as np
import pandas as pd
from sklearn.linear_model import LinearRegression

try:
    import plotnine as gg
except Exception:
    # plotnine is only needed by the plotting helpers; keep the module
    # importable without it, but leave a defined sentinel so those helpers
    # can fail with a clear message instead of a NameError.
    gg = None

from .._base._base import BaseModel


def _require_plotnine():
    """Raise ImportError when the optional plotnine dependency is unavailable."""
    if gg is None:
        raise ImportError(
            'plotnine is required for the plotting helpers of RoomDewPredictor'
        )


class RoomDewPredictor(BaseModel):
    """Lagged linear model relating changes in ``Dout`` to changes in ``Droom``.

    Fitting searches lags ``1..MAX_LAG`` for the lag whose no-intercept
    regression of the smoothed ``Droom`` differences on the lagged, smoothed
    ``Dout`` differences has the highest R2, and stores that lag and slope.

    NOTE(review): ``Dout`` / ``Droom`` are presumably outdoor and room dew
    point series sampled on the same regular grid -- confirm with callers.
    """

    # Largest lag (in samples) examined when searching for the best lag.
    MAX_LAG = 29
    # Window (in samples) of the trailing rolling means used in to_diffdata.
    ROLLING_WINDOW = 15

    def __init__(self):
        super().__init__()

    def fit_Droom(self, Dout: np.ndarray, Droom: np.ndarray):
        """Select the best lag/coefficient and record them as model 'Droom'.

        Returns:
            ``self``, so calls can be chained.
        """
        _, _, best_lag, best_coef = self.get_lag_coef(Dout=Dout, Droom=Droom)
        self.record_model(
            model_name='Droom',
            model={'coef': best_coef, 'lag': best_lag},
            train_data={'Dout': Dout, 'Droom': Droom},
            # NOTE(review): these metrics are hard-coded placeholders, not
            # values computed from the fit.
            train_metric={'R2': 1, 'MAE': 1, 'MAPE': 1},
        )
        return self

    @property
    def lag(self):
        """Lag stored under ``model_info['model_Droom']``."""
        return self.model_info['model_Droom']['lag']

    def predict_Droom(self, Dout: np.ndarray, Droom_cur: float, sm_frac=0) -> np.ndarray:
        """Project ``Droom`` forward from the most recent ``Dout`` changes.

        Args:
            Dout: ``Dout`` history; only the last ``lag + 1`` samples are used.
            Droom_cur: Current ``Droom`` value. If a sequence is given, its
                last element is used.
            sm_frac: LOWESS fraction passed to ``smooth``; values ``<= 0``
                leave the projection unsmoothed.

        Returns:
            Array of projected ``Droom`` values (one per ``Dout`` difference,
            i.e. ``lag`` values when ``Dout`` holds at least ``lag + 1``
            samples).
        """
        # Accept any sequence (ndarray, list, Series), not just ndarray.
        if np.ndim(Droom_cur) > 0:
            Droom_cur = np.asarray(Droom_cur)[-1]

        model = self.model_info['model_Droom']
        lag = model['lag']
        coef = model['coef']

        recent_dout = np.asarray(Dout)[-lag - 1:]
        # Scale each Dout step by the fitted slope and accumulate the steps
        # on top of the current room value.
        droom_steps = np.diff(recent_dout) * coef
        return smooth(droom_steps.cumsum() + Droom_cur, sm_frac)

    @classmethod
    def to_diffdata(
        cls,
        Dout: np.ndarray,
        Droom: Union[np.ndarray, None] = None
    ) -> pd.DataFrame:
        """Build the smoothed-difference table used for lag regression.

        Each input series goes through: trailing rolling mean, first
        difference, trailing rolling mean again (both with window
        ``ROLLING_WINDOW``), then rows containing NaN are dropped. Lagged
        copies of the resulting ``Dout`` column are added as
        ``Dout_lag01`` .. ``Dout_lag{MAX_LAG}`` and rows made incomplete by
        the shifting are dropped as well.

        Args:
            Dout: ``Dout`` series.
            Droom: Optional ``Droom`` series of the same length; when
                ``None`` the result has no ``Droom`` column.

        Returns:
            DataFrame indexed by ``ts`` (the sample position in the input).
        """
        columns = {'Dout': Dout}
        if Droom is not None:
            columns['Droom'] = Droom

        window = cls.ROLLING_WINDOW
        data = (
            pd.DataFrame(columns)
            .assign(ts=np.arange(len(Dout))).set_index('ts')
            .rolling(window, center=False).mean()
            .diff()
            .rolling(window, center=False).mean()
            .dropna()
        )
        lagged = {
            f'Dout_lag{lag:02d}': data['Dout'].shift(lag)
            for lag in range(1, cls.MAX_LAG + 1)
        }
        return data.assign(**lagged).dropna()

    def plot_diffdata(
        self,
        Dout: np.ndarray,
        Droom: np.ndarray
    ):
        """Plot the raw series above their smoothed differences.

        ``Droom`` is drawn in red and ``Dout`` in blue in both panels.

        Raises:
            ImportError: If plotnine is not available.
        """
        _require_plotnine()
        plot_raw = (
            pd.DataFrame({'Dout': Dout, 'Droom': Droom})
            .assign(ts=np.arange(len(Dout)))
            .pipe(gg.ggplot)
            + gg.aes(x='ts')
            + gg.geom_line(gg.aes(y='Droom'), color='red')
            + gg.geom_line(gg.aes(y='Dout'), color='blue')
        )
        plot_diff = (
            self.to_diffdata(Dout=Dout, Droom=Droom)
            .reset_index()
            .pipe(gg.ggplot)
            + gg.aes(x='ts')
            + gg.geom_line(gg.aes(y='Droom'), color='red')
            + gg.geom_line(gg.aes(y='Dout'), color='blue')
        )
        # `/` stacks the two plots vertically.
        plot = (plot_raw / plot_diff) + gg.theme(figure_size=[8, 6])
        return plot

    def plot_diffdata_lagcorr(self, Dout: np.ndarray, Droom: np.ndarray):
        """Plot per-lag scatter facets next to coefficient and R2 curves.

        The selected (highest-R2) lag is marked with a red dashed line on
        the coefficient and R2 panels.

        Raises:
            ImportError: If plotnine is not available.
        """
        _require_plotnine()
        all_coef, all_r2, best_lag, best_coef = self.get_lag_coef(Dout=Dout, Droom=Droom)
        lags = np.arange(1, self.MAX_LAG + 1)
        data = pd.DataFrame({'coef': all_coef, 'r2': all_r2, 'lag': lags})
        p1 = (
            data
            .pipe(gg.ggplot)
            + gg.aes(x='lag', y='coef')
            + gg.geom_point()
            + gg.geom_line()
            + gg.geom_vline(xintercept=best_lag, color='red', linetype='--')
            + gg.scale_x_continuous(breaks=lags)
        )
        p2 = (
            data
            .pipe(gg.ggplot)
            + gg.aes(x='lag', y='r2')
            + gg.geom_point()
            + gg.geom_line()
            + gg.geom_vline(xintercept=best_lag, color='red', linetype='--')
            + gg.scale_x_continuous(breaks=lags)
            + gg.scale_y_continuous(breaks=np.arange(0, 1, 0.2))
        )
        plot_line = (p1 / p2)
        plot_scatter = (
            self.to_diffdata(Dout=Dout, Droom=Droom)
            .reset_index()
            # Long format: one facet per Dout column (unlagged and lagged).
            .melt(id_vars=['ts', 'Droom'], value_name='Dout_lag')
            .pipe(gg.ggplot)
            + gg.aes(x='Dout_lag', y='Droom')
            + gg.geom_point()
            + gg.facet_wrap('~variable', ncol=8)
            + gg.geom_smooth(method='lm', color='red', se=False)
            + gg.geom_vline(xintercept=0, color='grey', linetype='--')
            + gg.geom_hline(yintercept=0, color='grey', linetype='--')
            + gg.geom_abline(color='grey', linetype='--')
        )
        plot = (plot_scatter | plot_line) + gg.theme(figure_size=[25, 8])
        return plot

    def get_lag_coef(self, Dout: np.ndarray, Droom: np.ndarray):
        """Fit a no-intercept regression for every lag and pick the best.

        Returns:
            Tuple ``(all_coef, all_r2, best_lag, best_coef)`` where the first
            two are arrays over lags ``1..MAX_LAG`` and the last two belong
            to the lag with the highest R2.

        Raises:
            ValueError: If the inputs are too short to leave any rows after
                smoothing, differencing and lagging.
        """
        diffdata = self.to_diffdata(Dout=Dout, Droom=Droom)
        if diffdata.empty:
            raise ValueError(
                'Not enough samples to build the lagged difference table'
            )

        all_lag = np.arange(1, self.MAX_LAG + 1)
        # The target is the same for every lag, so extract it once.
        y = diffdata['Droom'].values
        all_coef = []
        all_r2 = []
        for i in all_lag:
            x = diffdata[[f'Dout_lag{i:02d}']].values
            lm = LinearRegression(fit_intercept=False).fit(x, y)
            all_coef.append(lm.coef_[0])
            all_r2.append(lm.score(x, y))
        all_coef = np.array(all_coef)
        all_r2 = np.array(all_r2)

        best_idx = all_r2.argmax()
        return all_coef, all_r2, all_lag[best_idx], all_coef[best_idx]


def smooth(y: Union[np.ndarray, pd.Series], frac=0.1):
    """LOWESS-smooth ``y`` against its sample positions ``0..len(y)-1``.

    Args:
        y: Values to smooth.
        frac: Fraction of the data used for each local fit; controls the
            degree of smoothing. Values ``<= 0`` are treated as "no
            smoothing" and return ``y`` unchanged as a float array.

    Returns:
        Array of smoothed values.
    """
    if frac <= 0 or len(y) == 0:
        # Nothing to smooth: avoid handing a zero-width window or an empty
        # series to LOWESS (and avoid importing statsmodels needlessly).
        return np.asarray(y, dtype=float)

    import statsmodels.api as sm

    x = np.arange(len(y))
    lowess_result = sm.nonparametric.lowess(y, x, frac=frac)  # frac controls the degree of smoothing
    return lowess_result[:, 1]