| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156 |
- from typing import Union
- import numpy as np
- import pandas as pd
- from sklearn.linear_model import LinearRegression
- try:
- import plotnine as gg
- except:
- pass
- from .._base._base import BaseModel
class RoomDewPredictor(BaseModel):
    """Predict the ``Droom`` series from lagged changes of the ``Dout`` series.

    The model is a single ``(lag, coef)`` pair: a change in ``Dout`` is assumed
    to show up in ``Droom`` ``lag`` samples later, scaled by ``coef``.  Both
    values are chosen by regressing smoothed ``Droom`` differences on smoothed,
    lagged ``Dout`` differences and keeping the lag with the highest R2.

    NOTE(review): ``Dout`` / ``Droom`` are presumably outdoor and room dew
    point readings on a common, evenly spaced time grid -- confirm with callers.
    """

    # Largest lag (in samples) evaluated; lags 1..MAX_LAG are tried.
    MAX_LAG = 29
    # Window length of both rolling means applied in ``to_diffdata``.
    SMOOTH_WINDOW = 15

    def __init__(self):
        super().__init__()

    def fit_Droom(self, Dout: np.ndarray, Droom: np.ndarray):
        """Select the best lag/coefficient and store them via ``record_model``.

        Parameters
        ----------
        Dout, Droom : np.ndarray
            Equal-length training series.

        Returns
        -------
        RoomDewPredictor
            ``self``, so calls can be chained.
        """
        _, _, best_lag, best_coef = self.get_lag_coef(Dout=Dout, Droom=Droom)
        self.record_model(
            model_name='Droom',
            model={'coef': best_coef, 'lag': best_lag},
            train_data={'Dout': Dout, 'Droom': Droom},
            # NOTE(review): constant placeholder values, not computed metrics.
            train_metric={'R2': 1, 'MAE': 1, 'MAPE': 1},
        )
        return self

    @property
    def lag(self):
        """Lag stored under ``model_info['model_Droom']``."""
        return self.model_info['model_Droom']['lag']

    def predict_Droom(
        self,
        Dout: np.ndarray,
        Droom_cur: Union[float, np.ndarray],
        sm_frac=0
    ) -> np.ndarray:
        """Project ``Droom`` forward from the most recent ``Dout`` history.

        Parameters
        ----------
        Dout : np.ndarray
            ``Dout`` history; only the last ``lag + 1`` samples are used.
        Droom_cur : float or np.ndarray
            Current ``Droom`` value.  When an array is given, its last element
            is used.
        sm_frac : float
            Passed to ``smooth`` as ``frac``.
            NOTE(review): the default of 0 relies on how LOWESS treats a zero
            fraction -- confirm this is intended.

        Returns
        -------
        np.ndarray
            The smoothed predicted ``Droom`` values.
        """
        if isinstance(Droom_cur, np.ndarray):
            Droom_cur = Droom_cur[-1]
        model = self.model_info['model_Droom']
        lag = model['lag']
        coef = model['coef']
        # lag + 1 samples give exactly ``lag`` first differences.
        Dout = Dout[-lag - 1:]
        Dout_diff = np.diff(Dout)
        Droom_diff = Dout_diff * coef
        # Accumulate the scaled changes on top of the current room value.
        Droom = smooth(Droom_diff.cumsum() + Droom_cur, sm_frac)
        return Droom

    @classmethod
    def to_diffdata(
        cls,
        Dout: np.ndarray,
        Droom: Union[np.ndarray, None] = None
    ) -> pd.DataFrame:
        """Build the smoothed first-difference table with lagged ``Dout`` columns.

        The series are rolling-averaged, differenced, and rolling-averaged
        again (window ``SMOOTH_WINDOW`` each time).  Columns
        ``Dout_lag01`` .. ``Dout_lag{MAX_LAG}`` hold the differenced ``Dout``
        shifted by that many rows.  Rows containing NaN are dropped, so the
        result is empty when the input is too short.

        Parameters
        ----------
        Dout : np.ndarray
            Input series; its length defines the ``ts`` index ``0..n-1``.
        Droom : np.ndarray or None
            Optional second series of the same length; the ``Droom`` column is
            omitted when ``None``.

        Returns
        -------
        pd.DataFrame
            Indexed by ``ts`` with ``Dout``, optionally ``Droom``, and the
            lagged ``Dout`` columns.
        """
        data = {'Dout': Dout}
        if Droom is not None:
            data['Droom'] = Droom
        data = (
            pd.DataFrame(data)
            .assign(ts=np.arange(len(Dout))).set_index('ts')
            .rolling(cls.SMOOTH_WINDOW, center=False).mean()
            .diff()
            .rolling(cls.SMOOTH_WINDOW, center=False).mean()
            .dropna()
        )
        for lag in range(1, cls.MAX_LAG + 1):
            data[f'Dout_lag{lag:02d}'] = data['Dout'].shift(lag)
        # Shifting introduces leading NaNs; keep only complete rows.
        data = data.dropna()
        return data

    def plot_diffdata(
        self,
        Dout: np.ndarray,
        Droom: np.ndarray
    ):
        """Plot the raw series above their smoothed differences.

        ``Droom`` is drawn in red and ``Dout`` in blue in both panels.  Uses
        the module-level ``gg`` (plotnine) name, which is only bound when the
        plotnine import at the top of the file succeeded.

        Returns
        -------
        The two plots combined with ``/`` plus a ``figure_size`` theme.
        """
        plot_raw = (
            pd.DataFrame({'Dout': Dout, 'Droom': Droom})
            .assign(ts=np.arange(len(Dout)))
            .pipe(gg.ggplot)
            + gg.aes(x='ts')
            + gg.geom_line(gg.aes(y='Droom'), color='red')
            + gg.geom_line(gg.aes(y='Dout'), color='blue')
        )
        plot_diff = (
            self.to_diffdata(Dout=Dout, Droom=Droom)
            .reset_index()
            .pipe(gg.ggplot)
            + gg.aes(x='ts')
            + gg.geom_line(gg.aes(y='Droom'), color='red')
            + gg.geom_line(gg.aes(y='Dout'), color='blue')
        )
        plot = (plot_raw / plot_diff) + gg.theme(figure_size=[8, 6])
        return plot

    def plot_diffdata_lagcorr(self, Dout: np.ndarray, Droom: np.ndarray):
        """Plot per-lag scatter panels next to coefficient and R2 curves.

        The scatter panels (one facet per lagged ``Dout`` column) show the
        differenced ``Droom`` against the lagged differenced ``Dout`` with a
        linear fit.  The line plots show the regression coefficient and R2
        for every lag, with the best lag marked by a red vertical line.  Uses
        the module-level ``gg`` (plotnine) name.

        Returns
        -------
        The scatter plot and the stacked line plots combined with ``|`` plus
        a ``figure_size`` theme.
        """
        all_coef, all_r2, best_lag, best_coef = self.get_lag_coef(Dout=Dout, Droom=Droom)
        lags = np.arange(1, self.MAX_LAG + 1)
        data = pd.DataFrame({'coef': all_coef, 'r2': all_r2, 'lag': lags})
        p1 = (
            data
            .pipe(gg.ggplot)
            + gg.aes(x='lag', y='coef')
            + gg.geom_point()
            + gg.geom_line()
            + gg.geom_vline(xintercept=best_lag, color='red', linetype='--')
            + gg.scale_x_continuous(breaks=lags)
        )
        p2 = (
            data
            .pipe(gg.ggplot)
            + gg.aes(x='lag', y='r2')
            + gg.geom_point()
            + gg.geom_line()
            + gg.geom_vline(xintercept=best_lag, color='red', linetype='--')
            + gg.scale_x_continuous(breaks=lags)
            + gg.scale_y_continuous(breaks=np.arange(0, 1, 0.2))
        )
        plot_line = (p1 / p2)

        plot_scatter = (
            self.to_diffdata(Dout=Dout, Droom=Droom)
            .reset_index()
            .melt(id_vars=['ts', 'Droom'], value_name='Dout_lag')
            .pipe(gg.ggplot)
            + gg.aes(x='Dout_lag', y='Droom')
            + gg.geom_point()
            + gg.facet_wrap('~variable', ncol=8)
            + gg.geom_smooth(method='lm', color='red', se=False)
            + gg.geom_vline(xintercept=0, color='grey', linetype='--')
            + gg.geom_hline(yintercept=0, color='grey', linetype='--')
            + gg.geom_abline(color='grey', linetype='--')
        )
        plot = (plot_scatter | plot_line) + gg.theme(figure_size=[25, 8])

        return plot

    def get_lag_coef(self, Dout: np.ndarray, Droom: np.ndarray):
        """Regress differenced ``Droom`` on each lagged ``Dout`` column.

        One no-intercept linear regression is fitted per lag in
        ``1..MAX_LAG``; the best lag is the one with the highest R2.

        Parameters
        ----------
        Dout, Droom : np.ndarray
            Equal-length training series.

        Returns
        -------
        tuple
            ``(all_coef, all_r2, best_lag, best_coef)``: arrays of the
            coefficient and R2 per lag, then the lag with the highest R2 and
            its coefficient.

        Raises
        ------
        ValueError
            If the series are too short to leave any rows after smoothing,
            differencing and lagging.
        """
        diffdata = self.to_diffdata(Dout=Dout, Droom=Droom)
        if diffdata.empty:
            # Without this the failure surfaces inside the regression fit.
            min_len = 2 * self.SMOOTH_WINDOW + self.MAX_LAG
            raise ValueError(
                'Not enough data to estimate the lag: need at least '
                f'{min_len} NaN-free samples, got {len(Dout)}.'
            )
        all_lag = np.arange(1, self.MAX_LAG + 1)
        # The regression target is the same for every lag.
        y = diffdata.Droom.values
        all_coef = []
        all_r2 = []
        for i in all_lag:
            x = diffdata.loc[:, [f'Dout_lag{i:02d}']].values
            lm = LinearRegression(fit_intercept=False).fit(x, y)
            all_coef.append(lm.coef_[0])
            all_r2.append(lm.score(x, y))
        all_coef = np.array(all_coef)
        all_r2 = np.array(all_r2)
        best_idx = all_r2.argmax()
        best_lag = all_lag[best_idx]
        best_coef = all_coef[best_idx]
        return all_coef, all_r2, best_lag, best_coef
-
def smooth(y: pd.Series, frac=0.1):
    """Return the LOWESS-smoothed values of ``y``.

    ``y`` is smoothed against its positional index ``0..len(y)-1`` with
    ``statsmodels``' ``lowess``; only the fitted-value column of the result
    is returned.

    Parameters
    ----------
    y : pd.Series
        Values to smooth.
    frac : float
        Passed to ``lowess`` as ``frac``; controls the degree of smoothing.

    Returns
    -------
    np.ndarray
        Second column of the ``lowess`` result (the fitted values).
    """
    import statsmodels.api as sm

    positions = np.arange(len(y))
    # frac controls the degree of smoothing
    fitted = sm.nonparametric.lowess(y, positions, frac=frac)
    return fitted[:, 1]
|