import time

import numpy as np
import pandas as pd

from .._utils.data_summary import summary_dataframe

try:
    from executor import exec_workflow
    from workflowlib.utils import queryset
except Exception:
    # Best-effort import: when these modules cannot be imported the names
    # `exec_workflow` / `queryset` stay undefined and any code path that needs
    # them fails with NameError at call time. `Exception` (rather than a bare
    # `except`) lets KeyboardInterrupt / SystemExit propagate.
    pass


def main(config=None):
    """Build a :class:`SystemModel` from a configuration mapping.

    Args:
        config: Mapping with the required keys ``'model_name'``,
            ``'model_code'``, ``'input_code'`` and ``'output_code'``, and the
            optional key ``'get_mod_io'`` (defaults to ``True`` when absent).

    Returns:
        The constructed ``SystemModel`` instance.

    Raises:
        TypeError: If ``config`` is ``None``.
        KeyError: If a required key is missing from ``config``.
    """
    if config is None:
        raise TypeError(
            "config must be a mapping with the keys 'model_name', "
            "'model_code', 'input_code' and 'output_code'"
        )
    return SystemModel(
        model_name=config['model_name'],
        model_code=config['model_code'],
        input_code=config['input_code'],
        output_code=config['output_code'],
        get_mod_io=config.get('get_mod_io', True),
    )


class SystemModel:
    """Wrapper that evaluates a system model through ``exec_workflow``.

    Besides plain prediction it offers forward-difference derivative estimates
    and keeps a history of the time spent in every model call.
    """

    def __init__(
        self,
        model_name: str = None,
        model_code: str = None,
        input_code: str = None,
        output_code: str = None,
        get_mod_io: bool = None,
    ) -> None:
        """Store the model identifiers and optionally look up its port ids.

        Args:
            model_name: Passed as the first argument to ``exec_workflow``.
            model_code: Code used to look the model node up via ``queryset``.
            input_code: Passed as the second argument to ``exec_workflow``.
            output_code: Passed as the third argument to ``exec_workflow``.
            get_mod_io: When equal to ``True`` the node's ``ports_in`` /
                ``ports_out`` are read to collect their ``point_id`` values;
                otherwise both id lists are set to ``None``.

        Raises:
            Exception: If the ``point_id`` values cannot be read from the node
                (for example when no node matches ``model_code``).
        """
        self.MODEL_NAME = model_name
        self.MODEL_CODE = model_code
        self.INPUT_CODE = input_code
        self.OUTPUT_CODE = output_code
        self.GET_MOD_IO = get_mod_io
        # Duration in seconds of every exec_workflow call made by predict().
        self.PAST_TIME = []

        # Fetch the input/output port names of the system-model component on
        # the canvas.
        if self.GET_MOD_IO == True:  # noqa: E712 - equality (not truthiness) is the existing contract
            # point_id values of the system model's inputs and outputs.
            node = queryset.nodes().filter(code=self.MODEL_CODE).first()
            try:
                self.MODEL_INPUT_ID = [port.get('point_id') for port in node.ports_in]
                self.MODEL_OUTPUT_ID = [port.get('point_id') for port in node.ports_out]
            except Exception as err:
                # Same exception type and message as before; the original
                # failure is chained so the root cause stays visible.
                raise Exception('未获取到系统模型的输入输出ID, 需检查优化算法组件的配置是否正确') from err
        else:
            self.MODEL_INPUT_ID = None
            self.MODEL_OUTPUT_ID = None

    def predict(self, data: pd.DataFrame) -> pd.DataFrame:
        """Run the system model on ``data`` and return its outputs.

        Every column of ``data`` is converted to ``float64`` and handed to
        ``exec_workflow`` as a separate single-column DataFrame, in column
        order. The elapsed time of the call is appended to ``self.PAST_TIME``.

        Args:
            data: Input samples, one model input per column.

        Returns:
            The ``exec_workflow`` result; a list result is concatenated
            column-wise into one DataFrame.

        Raises:
            Exception: If a column cannot be converted to ``float64`` or if
                ``exec_workflow`` fails. In both cases a summary of ``data``
                is produced through ``summary_dataframe`` first.
        """
        all_data = []
        for position in range(data.shape[1]):
            column = data.iloc[:, [position]]
            try:
                all_data.append(column.astype('float64'))
            except Exception as E:
                print(f'{column}数据有误,不可转换为float')
                summary_dataframe(data, df_name='系统模型输入排查')
                raise Exception(E) from E

        # perf_counter is monotonic, so the measured duration cannot be
        # distorted by wall-clock adjustments.
        time_start = time.perf_counter()
        try:
            output_data = exec_workflow(self.MODEL_NAME, self.INPUT_CODE, self.OUTPUT_CODE, *all_data)
        except Exception as E:
            summary_dataframe(data, df_name='系统模型输入排查')
            raise Exception(E) from E
        past_time = round(time.perf_counter() - time_start, 2)

        self.PAST_TIME.append(past_time)
        print(f'第{len(self.PAST_TIME)}次调用系统模型,本次调用时长为:{past_time}秒')

        if not isinstance(output_data, list):
            output_data = [output_data]
        return pd.concat(output_data, axis=1)

    def summary_past_time(self):
        """Print summary statistics of the recorded model-call durations.

        Prints nothing when no call has been recorded yet. Printing is
        best-effort: a failure while computing the statistics is suppressed.
        """
        if len(self.PAST_TIME) == 0:
            # Nothing to summarise; also avoids NumPy's empty-input warnings.
            return
        try:
            print(
                f'SUM :{np.sum(self.PAST_TIME).round(2)} \n'
                f'Mean:{np.mean(self.PAST_TIME).round(2)} \n'
                f'SD :{np.std(self.PAST_TIME).round(2)} \n'
                f'MIN :{np.min(self.PAST_TIME)} \n'
                f'MAX :{np.max(self.PAST_TIME)} \n'
                f'Q25 :{np.quantile(self.PAST_TIME,0.25)} \n'
                f'Q75 :{np.quantile(self.PAST_TIME,0.75)} \n'
            )
        except Exception:
            # Reporting must never break the caller.
            pass

    def predict_derivate_1(self, data: pd.DataFrame, vars: list, eps=1e-4) -> pd.DataFrame:
        """Estimate first derivatives of the outputs by forward differences.

        The rows of ``data`` are stacked ``len(vars) + 1`` times: one
        unmodified copy followed by one copy per variable in which only that
        variable's column is increased by ``eps``. All copies are evaluated in
        a single ``predict`` call.

        Args:
            data: Input samples.
            vars: Column names to differentiate with respect to; must be
                unique and present in ``data.columns``.
            eps: Step added to the perturbed column.

        Returns:
            DataFrame with ``len(vars) * len(data)`` rows, indexed by
            ``'__<var>__<row position>'`` (index name ``'__VAR__SAMPLE'``),
            one column per model output, holding
            ``(f(x + eps * e_var) - f(x)) / eps``.

        Raises:
            Exception: If ``vars`` contains duplicates or a name that is not a
                column of ``data``.
        """
        if pd.Index(data=vars).has_duplicates is True:
            raise Exception(f'{vars}中存在重复值')
        vars = list(vars)
        # Validate up front, before any copying is done.
        for var in vars:
            if var not in data.columns:
                raise Exception(f'数据中不包含给定的列{var}')

        n_var = len(vars)
        n_row = data.shape[0]

        sample_index = pd.Index(np.tile(np.arange(n_row), n_var+1), dtype='str')
        var_index = pd.Index(np.repeat(['raw']+vars, repeats=n_row), dtype='str')
        index = '__'+var_index+'__'+sample_index
        index.name = '__VAR__SAMPLE'

        base_values = data.values
        if base_values.dtype.kind in 'iu':
            # Adding a float step to an integer column would rely on implicit
            # upcasting during assignment, which pandas has deprecated.
            base_values = base_values.astype('float64')
        stacked = pd.DataFrame(
            data=np.tile(base_values, [n_var+1, 1]),
            columns=data.columns,
            index=index,
        )

        # Block 0 is the unperturbed copy, block k (k >= 1) belongs to
        # vars[k - 1]. Rows are selected by block position rather than by
        # pattern-matching the index labels: a regex substring match breaks
        # for names containing regex metacharacters, for names that contain
        # another variable name wrapped in '__', and for a column named 'raw'.
        block_id = np.repeat(np.arange(n_var+1), n_row)
        for block, var in enumerate(vars, start=1):
            stacked.loc[block_id == block, var] += eps

        pred = self.predict(data=stacked)
        pred.index = stacked.index

        pred_values = pred.values
        f_x = pred_values[:n_row]
        all_derivate_1 = []
        for block in range(1, n_var+1):
            rows = slice(block*n_row, (block+1)*n_row)
            all_derivate_1.append(
                pd.DataFrame(
                    (pred_values[rows]-f_x)/eps,
                    index=pred.index[rows],
                    columns=pred.columns,
                )
            )
        return pd.concat(all_derivate_1, axis=0)

    def predict_derivate_2(self, data: pd.DataFrame, vars: list, eps=1e-4) -> pd.DataFrame:
        """Estimate second-order differences from two first-derivative passes.

        Computes ``predict_derivate_1`` at ``data`` and at ``data + eps`` and
        returns the difference of the two results divided by ``eps``.

        NOTE(review): ``data + eps`` shifts *every* column of ``data`` (not
        only the columns in ``vars``), so the result is a difference of the
        first derivatives along the all-ones direction rather than a pure
        per-variable second derivative. Confirm this is intended.

        Args:
            data: Input samples; must support ``data + eps``.
            vars: Column names to differentiate with respect to; must be
                unique.
            eps: Step used for both difference levels.

        Returns:
            DataFrame with the same index and columns as the result of
            ``predict_derivate_1``.

        Raises:
            Exception: If ``vars`` contains duplicates, or propagated from
                ``predict_derivate_1``.
        """
        if pd.Index(data=vars).has_duplicates is True:
            raise Exception(f'{vars}中存在重复值')
        f_x = self.predict_derivate_1(data=data, vars=vars, eps=eps)
        f_dx = self.predict_derivate_1(data=data+eps, vars=vars, eps=eps)
        return (f_dx-f_x)/eps