| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778 |
- import numpy as np
- import pandas as pd
- import pymc as pm
- from sklearn.metrics import r2_score,mean_absolute_error,mean_absolute_percentage_error
- def record(name,var):
- pm.Deterministic(f'{name}_mu',var)
- def observe(name,var,observed,sigma=1):
- if isinstance(observed,pd.DataFrame):
- observed = observed.loc[:,name].values
- mu = pm.Deterministic(f'{name}_mu',var)
- sigma = pm.HalfNormal(f'{name}_sigma',sigma=sigma)
- pm.Normal(name,mu=mu,sigma=sigma,observed=observed)
- def reorder_posterior(prior:dict,posterior:dict):
- param_posterior_reorder = {'F_air':{}}
- for equp_name in prior.keys():
- param_posterior_reorder.setdefault(equp_name,{})
- for param_name,param_value in posterior.items():
- if '__' in param_name:
- continue
- if param_name == 'F_air_val_rw':
- param_value = np.median(param_value[-5:])
- if param_name.startswith(equp_name):
- param_name_adj = param_name.replace(f'{equp_name}_','')
- param_posterior_reorder[equp_name][param_name_adj] = param_value
- return param_posterior_reorder
- def get_fitted_result(
- posterior : dict,
- observed_data: pd.DataFrame,
- plot_TVP : bool
- ) -> tuple:
- # 样本内预测数据
- TVP_data = []
- for param_name in posterior.keys():
- if param_name.replace('_mu','') not in observed_data.columns:
- continue
- TVP_data.append(
- pd.DataFrame(
- {
- 'param_name': param_name.replace('_mu',''),
- 'real' : observed_data.loc[:,param_name.replace('_mu','')].values,
- 'pred' : posterior[param_name]
- }
- )
- )
- TVP_data = pd.concat(TVP_data,axis=0)
-
- group_by_data = TVP_data.groupby(['param_name'])[['pred','real']]
- TVP_metric = (
- pd.concat(
- [
- group_by_data.apply(lambda dt:r2_score(dt.real,dt.pred)),
- group_by_data.apply(lambda dt:mean_absolute_error(dt.real,dt.pred)),
- group_by_data.apply(lambda dt:mean_absolute_percentage_error(dt.real,dt.pred)),
- ],
- axis=1
- )
- .set_axis(['R2','MAE','MAPE'],axis=1)
- .sort_values(by='R2',ascending=True)
- )
-
- if plot_TVP:
- import plotnine as gg
- plot = (
- TVP_data
- .pipe(gg.ggplot)
- + gg.aes(x='real',y='pred')
- + gg.geom_point()
- + gg.facet_wrap(facets='param_name',scales='free')
- + gg.geom_abline(intercept=0,slope=1,color='red')
- + gg.theme(figure_size=[10,10])
- )
- plot.show()
-
- return TVP_data,TVP_metric
|