| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145 |
- import time
- import numpy as np
- import pandas as pd
- from .._utils.data_summary import summary_dataframe
- try:
- from executor import exec_workflow
- from workflowlib.utils import queryset
- except:
- pass
- def main(config=None):
- model_name = config['model_name']
- model_code = config['model_code']
- input_code = config['input_code']
- output_code = config['output_code']
- get_mod_io = config.get('get_mod_io',True)
- model = SystemModel(model_name=model_name,model_code=model_code,input_code=input_code,output_code=output_code,get_mod_io=get_mod_io)
- return model
class SystemModel:
    """Callable wrapper around a workflow executed through ``exec_workflow``.

    Attributes:
        MODEL_NAME, MODEL_CODE, INPUT_CODE, OUTPUT_CODE: Identifiers given to
            the constructor; ``MODEL_NAME``, ``INPUT_CODE`` and ``OUTPUT_CODE``
            are forwarded to ``exec_workflow`` and ``MODEL_CODE`` is used to
            look up the model node.
        GET_MOD_IO: Whether the node's port ids were requested.
        MODEL_INPUT_ID, MODEL_OUTPUT_ID: Lists of ``point_id`` values read
            from the node's ``ports_in`` / ``ports_out``, or ``None`` when the
            lookup was not requested.
        PAST_TIME: Duration in seconds (rounded to 2 decimals) of every
            successful ``predict`` call, in call order.
    """

    def __init__(
        self,
        model_name: str = None,
        model_code: str = None,
        input_code: str = None,
        output_code: str = None,
        get_mod_io: bool = None,
    ) -> None:
        """Store the identifiers and optionally resolve the node's port ids.

        Raises:
            Exception: If ``get_mod_io`` is true and the port ids cannot be
                read from the node found for ``model_code``.
        """
        self.MODEL_NAME = model_name
        self.MODEL_CODE = model_code
        self.INPUT_CODE = input_code
        self.OUTPUT_CODE = output_code
        self.GET_MOD_IO = get_mod_io
        self.PAST_TIME = []

        # Fetch the input/output port names of the system-model component on
        # the canvas. Kept as an equality test on purpose: only a value equal
        # to True enables the lookup (a truthy string, for example, does not).
        if self.GET_MOD_IO == True:  # noqa: E712
            # point_id of every input and output of the system model.
            node = queryset.nodes().filter(code=self.MODEL_CODE).first()
            try:
                self.MODEL_INPUT_ID = [port.get('point_id') for port in node.ports_in]
                self.MODEL_OUTPUT_ID = [port.get('point_id') for port in node.ports_out]
            except Exception as err:
                # Chain the cause so the underlying problem (e.g. no node
                # found) remains visible in the traceback.
                raise Exception('未获取到系统模型的输入输出ID, 需检查优化算法组件的配置是否正确') from err
        else:
            self.MODEL_INPUT_ID = None
            self.MODEL_OUTPUT_ID = None

    def predict(self, data: pd.DataFrame) -> pd.DataFrame:
        """Run the workflow on ``data`` and return its outputs as one frame.

        Every column of ``data`` is cast to ``float64`` and handed to
        ``exec_workflow`` as its own single-column frame, in column order.
        The elapsed time of the call is appended to ``PAST_TIME``.

        Args:
            data: Input frame; every column must be convertible to float.

        Returns:
            The workflow output(s) concatenated column-wise.

        Raises:
            Exception: If a column cannot be cast to float or the workflow
                call fails. A summary of ``data`` is produced first to help
                diagnose the input.
        """
        columns = []
        for position in range(data.shape[1]):
            column = data.iloc[:, [position]]
            try:
                columns.append(column.astype('float64'))
            except Exception as err:
                print(f'{column}数据有误,不可转换为float')
                summary_dataframe(data, df_name='系统模型输入排查')
                raise Exception(err) from err

        # perf_counter is monotonic, so the measured duration cannot be
        # distorted by wall-clock adjustments.
        started = time.perf_counter()
        try:
            output_data = exec_workflow(self.MODEL_NAME, self.INPUT_CODE, self.OUTPUT_CODE, *columns)
        except Exception as err:
            summary_dataframe(data, df_name='系统模型输入排查')
            raise Exception(err) from err
        past_time = round(time.perf_counter() - started, 2)
        self.PAST_TIME.append(past_time)

        print(f'第{len(self.PAST_TIME)}次调用系统模型,本次调用时长为:{past_time}秒')
        # A single output is wrapped so it can be concatenated like a list.
        if not isinstance(output_data, list):
            output_data = [output_data]
        return pd.concat(output_data, axis=1)

    def summary_past_time(self):
        """Print summary statistics of the recorded ``predict`` durations.

        Prints nothing when no call has been recorded yet. Reporting is
        best-effort: it never raises for unusable timing data.
        """
        if not self.PAST_TIME:
            return
        try:
            report = (
                f'SUM :{np.sum(self.PAST_TIME).round(2)} \n'
                f'Mean:{np.mean(self.PAST_TIME).round(2)} \n'
                f'SD :{np.std(self.PAST_TIME).round(2)} \n'
                f'MIN :{np.min(self.PAST_TIME)} \n'
                f'MAX :{np.max(self.PAST_TIME)} \n'
                f'Q25 :{np.quantile(self.PAST_TIME,0.25)} \n'
                f'Q75 :{np.quantile(self.PAST_TIME,0.75)} \n'
            )
        except (TypeError, ValueError):
            # Diagnostics only: non-numeric timing data must not break callers.
            return
        print(report)

    def _check_vars(self, data: pd.DataFrame, names) -> list:
        """Return ``names`` as a list after checking uniqueness and presence in ``data``."""
        if pd.Index(data=names).has_duplicates:
            raise Exception(f'{names}中存在重复值')
        names = list(names)
        for name in names:
            if name not in data.columns:
                raise Exception(f'数据中不包含给定的列{name}')
        return names

    def _predict_shifted(self, data: pd.DataFrame, names: list, eps, n_steps: int):
        """Predict the raw rows and every shifted copy in a single model call.

        The stacked input consists of ``1 + n_steps * len(names)`` blocks of
        ``len(data)`` rows each: block 0 is the unmodified data, followed for
        each step ``s = 1..n_steps`` by one block per variable in which only
        that variable has ``eps`` added ``s`` times.

        Returns:
            ``(pred, n_row)``: the prediction frame re-indexed with the
            stacked ``'__<block>__<sample>'`` labels, and the row count of
            one block.
        """
        n_var = len(names)
        n_row = data.shape[0]

        block_names = ['raw'] + names
        for step in range(2, n_steps + 1):
            block_names += [f'{name}__step{step}' for name in names]
        n_block = len(block_names)

        sample_index = pd.Index(np.tile(np.arange(n_row), n_block), dtype='str')
        var_index = pd.Index(np.repeat(block_names, repeats=n_row), dtype='str')
        index = '__' + var_index + '__' + sample_index
        index.name = '__VAR__SAMPLE'

        values = data.values
        if values.dtype.kind in 'biu':
            # Integer/bool storage cannot hold ``x + eps``; predict() casts
            # its input to float64 anyway.
            values = values.astype('float64')
        stacked = np.tile(values, [n_block, 1])

        # Blocks are addressed by position. Matching the index labels by
        # pattern would treat variable names as regular expressions and
        # substrings, which breaks for names such as 'a+b' or 'temp(C)'.
        for step in range(1, n_steps + 1):
            for k, name in enumerate(names):
                column = data.columns.get_loc(name)
                start = (1 + (step - 1) * n_var + k) * n_row
                # Repeated addition, so step 2 is exactly (x + eps) + eps.
                for _ in range(step):
                    stacked[start:start + n_row, column] += eps

        frame = pd.DataFrame(data=stacked, columns=data.columns, index=index)
        pred = self.predict(data=frame)
        # Also verifies that the model returned one row per input row.
        pred.index = frame.index
        return pred, n_row

    def predict_derivate_1(self, data: pd.DataFrame, vars: list, eps=1e-4) -> pd.DataFrame:
        """Estimate first partial derivatives of the model by forward differences.

        For each variable ``v`` and each row ``x`` of ``data`` the result is
        ``(f(x + eps * e_v) - f(x)) / eps`` where ``f`` is ``predict``.

        Args:
            data: Points at which to differentiate.
            vars: Unique column names of ``data`` to differentiate against.
            eps: Step size of the finite difference.

        Returns:
            Frame with one row per (variable, sample), ordered by variable
            then sample, indexed by ``'__<var>__<sample>'`` (index name
            ``'__VAR__SAMPLE'``) and with the model's output columns.

        Raises:
            Exception: If ``vars`` contains duplicates or a name that is not
                a column of ``data``.
        """
        names = self._check_vars(data, vars)
        pred, n_row = self._predict_shifted(data, names, eps, n_steps=1)

        outputs = pred.values
        baseline = np.tile(outputs[:n_row], [len(names), 1])
        slopes = (outputs[n_row:] - baseline) / eps
        return pd.DataFrame(slopes, index=pred.index[n_row:], columns=pred.columns)

    def predict_derivate_2(self, data: pd.DataFrame, vars: list, eps=1e-4) -> pd.DataFrame:
        """Estimate pure second partial derivatives by forward differences.

        For each variable ``v`` the forward-difference slope is evaluated at
        ``x`` and at ``x + eps * e_v`` and the two are differenced again, i.e.
        ``(f(x + 2*eps*e_v) - 2*f(x + eps*e_v) + f(x)) / eps**2``. Only the
        variable being differentiated is shifted; shifting every column would
        mix cross-partial terms into the result.

        Args:
            data: Points at which to differentiate.
            vars: Unique column names of ``data`` to differentiate against.
            eps: Step size of the finite difference.

        Returns:
            Frame with the same layout as ``predict_derivate_1``.

        Raises:
            Exception: If ``vars`` contains duplicates or a name that is not
                a column of ``data``.
        """
        names = self._check_vars(data, vars)
        n_var = len(names)
        pred, n_row = self._predict_shifted(data, names, eps, n_steps=2)

        outputs = pred.values
        split = (1 + n_var) * n_row
        f_0 = np.tile(outputs[:n_row], [n_var, 1])
        f_1 = outputs[n_row:split]
        f_2 = outputs[split:]

        slope_at_x = (f_1 - f_0) / eps
        slope_at_dx = (f_2 - f_1) / eps
        curvature = (slope_at_dx - slope_at_x) / eps
        return pd.DataFrame(curvature, index=pred.index[n_row:split], columns=pred.columns)
|