import numpy as np import pandas as pd import geatpy as ea from .model.model import SystemModel from .constrains.parse import parse_constrains from .constrains.execute import execute_constrains class Opt(ea.Problem): def __init__( self, lb : list, ub : list, varTypes : list, var_precis : list, maxormins : int, collection_map : dict, var_map : dict, boundary_insert: dict, opt_var : pd.DataFrame, sys_var : pd.DataFrame, system_model : SystemModel, oth_var : pd.DataFrame, cst_var : pd.DataFrame, opt_target : str, constrains : list=[] ): name = 'Opt' M = 1 maxormins = [maxormins] # 1:最小化该目标;-1:最大化该目标 Dim = len(lb) lb = lb ub = ub varTypes = varTypes # 变量的类型 0为连续变量,1为离散变量 ea.Problem.__init__( self, name = name, M = M, maxormins = maxormins, Dim = Dim, varTypes = varTypes, lb = lb, ub = ub ) self.lb = lb self.up = ub self.opt_var = opt_var #DF self.sys_var = sys_var #DF self.oth_var = oth_var #DF self.cst_var = cst_var #DF self.name_opt_vars = opt_var.columns.to_list() self.system_model = system_model self.opt_target = opt_target # 优化目标 self.constrains = constrains # 约束 self.collection_map = collection_map # 集合类型/映射类型的边界映射 {'u1':{0:1.5,1:2.3},'u2':{0:1.1,1:3.2}} self.var_map = var_map # 映射类型的边界的优化变量与系统变量之间的映射关系 {'u1':{'x1':{1:0.1,2:0.4},'x2':{2:0.1,3:0.4}}} self.var_precis = var_precis self.boundary_insert = boundary_insert self.name_opt_vars_c = self.get_name_opt_vars_c(varTypes) # 属于连续变量的优化变量 self.index = opt_var.index # 用于给优化算法组件的输出的index赋值 self.current_system_x = self.get_current_system_x() self.current_system_y = self.get_current_system_y() # 基于当前值的预测,因此并不能够反映真实值,会受模型误差的影响 self.current_system_x_y = pd.concat([self.current_system_y,self.current_system_x],axis=1) # 系统模型中的x和y可能会出现重复的名称,这种情况只允许出现在x的【其他变量】中 # 约束的时候会将这部分x从输入中剔除,避免重复索引,所以约束变量不允许使用【其他变量】中的变量 oth_var_col = self.oth_var.columns if isinstance(oth_var,pd.DataFrame) else [] self.current_system_x_y_cst = pd.concat([self.current_system_y,self.current_system_x.drop(oth_var_col,axis=1)],axis=1) self.constrains_var = pd.concat([self.current_system_x_y_cst, self.cst_var],axis=1) self.constrains = parse_constrains(self.constrains,self.constrains_var) # 校验当前值是否违反边界约束,若违反则可能会使得结果出现负优化的现象 # 仅用于参考,因为模拟量不在约束范围内可能是由于设备未开启导致的,因此即使出现该现象,也不一定导致负优化 # 该负优化现象并不是由算法失效导致的,而是由于不合理的边界或可行性约束导致的 self.cur_sys_constrains_cfl = self.get_cur_sys_constrains_cfl() def get_name_opt_vars_c(self,varTypes): # 从变量名称中提取属于连续变量的名称 complete_var_types = np.array(self.insert_sync_var(varTypes,flatten=True)) complete_var_names = np.array(self.name_opt_vars) is_var_continus = complete_var_types == 0 name_opt_vars_c = complete_var_names[is_var_continus] return name_opt_vars_c def check_data_and_system_input(self,data_input:list) -> None: # 检查系统模型和优化组件的输入 system_model_input = self.system_model.MODEL_INPUT_ID if system_model_input is None: return None #TODO 当不校验系统输入的名称时,也需要校验长度 # 检查是否有多或少 input_diff = set(data_input).symmetric_difference(system_model_input) if len(input_diff) !=0: raise Exception(f'系统模型的输入与优化组件的输入不匹配,差异的输入为{input_diff},其中系统模型的输入为{system_model_input},优化组件的输入为{data_input}') # 检查顺序,若顺序有差异,输出系统模型的输入 if data_input != system_model_input: raise Exception(f'系统模型和优化组件输入的顺序有差异,其中系统模型的输入为{system_model_input},优化组件的输入为{data_input}') def get_current_system_x(self): data_x_current = pd.concat([self.opt_var,self.sys_var,self.oth_var],axis=1) self.check_data_and_system_input(data_x_current.columns.to_list()) # 数据校验 if data_x_current.shape[0] != 1: print('系统模型的初始输入数据') print(data_x_current) raise Exception('系统模型的初始输入数据的行维度不等于1,检查输入数据') return data_x_current def get_current_system_y(self): data_y_current = self.system_model.predict(self.current_system_x) print(f'系统模型的输出变量:{data_y_current.columns.to_list()}') ## 校验系统模型的输出是否符合预期 # 目标变量存在性校验 if self.opt_target not in data_y_current.columns: raise ValueError(f'未在系统模型的输出中找到优化目标{self.opt_target},需检查系统模型优化的输出{data_y_current.columns.to_list()}') # 目标变量唯一性校验 if isinstance(data_y_current.loc[:,self.opt_target],pd.DataFrame): raise ValueError(f'在系统模型的输出中找到多个相同名称的优化目标{self.opt_target},需检查系统模型的输出') return data_y_current def get_cur_sys_constrains_cfl(self) -> dict: """ 获取与当前值违背的约束条件及边界条件 """ conflicts = { 'boundary_int': {}, # 区间边界 'boundary_col': {}, # 集合及映射边界 'constrains' : [] } # 边界检查 for idx,name in enumerate(self.name_opt_vars): cur_value = self.current_system_x.loc[:,[name]].iat[0,0] # 集合与映射边界的优化变量 if name in self.collection_map.keys(): collection = self.collection_map[name].values() opt_ub = max(collection) opt_lb = min(collection) if cur_value not in collection: conflicts['boundary_col'][name] = '优化变量不在集合内,但未超出边界范围' if (cur_value > opt_ub) or (cur_value < opt_lb): message = f'优化变量不在集合内,且超出边界范围,当前值={cur_value},上边界={opt_ub},下边界={opt_lb}' conflicts['boundary_col'][name] = message # 区间边界的优化变量 else: opt_ub = self.insert_sync_var(self.ub,flatten=True)[idx] opt_lb = self.insert_sync_var(self.lb,flatten=True)[idx] if (cur_value > opt_ub) or (cur_value < opt_lb): message = f'超出边界范围,当前值={round(cur_value,2)},上边界={opt_ub},下边界={opt_lb}' conflicts['boundary_int'][name] = message # 可行性检查 for cst in self.constrains: cv = execute_constrains(self.current_system_x_y_cst,cst).flatten()[0] if cv > 0: conflicts['constrains'].append(cst) return conflicts def restore_data(self,data_x,step=['var_map','col_map']): # 处理映射类型边界的系统变量(由于依赖于离散变量的序号,因此该步必须先于离散变量还原以前进行) # 根据优化变量, 还原系统变量 if (len(self.var_map) > 0) and ('var_map' in step): for var_opt_name,var_map_dict in self.var_map.items(): for var_sys_name,var_sys_map in var_map_dict.items(): data_x[var_sys_name] = data_x[var_opt_name].astype(int).map(var_sys_map,na_action='ignore') # 处理边界(集合型/映射型),在优化算法组件中需将这部分变量还原 # 还原优化变量 if (len(self.collection_map) > 0) and ('col_map' in step): for var_name,map_dict in self.collection_map.items(): data_x[var_name] = data_x[var_name].astype('int').map(map_dict,na_action='ignore') return data_x def insert_sync_var(self,data:np.ndarray,flatten=False) -> np.ndarray: cum_num = 0 for pos,num in self.boundary_insert.items(): if num > 0: cur_pos = cum_num + pos if flatten == False: insert_array = np.repeat(data[:,[cur_pos]],repeats=num,axis=1) data = np.insert(data,[cur_pos],insert_array,axis=1) elif flatten == True: insert_array = np.repeat(data[cur_pos],repeats=num) data = np.insert(data,cur_pos,insert_array) cum_num += num return data def aimFunc(self, pop): Vars = pop.Phen Vars = self.insert_sync_var(Vars) N = Vars.shape[0] # 汇总系统模型的输入 data_opt = pd.DataFrame(data=Vars,columns=self.name_opt_vars) data_sys = data_copy_row(self.sys_var,N_row=N) data_oth = data_copy_row(self.oth_var,N_row=N) data_x = pd.concat([data_opt,data_sys,data_oth],axis=1) # 待解决问题 # 输入给系统模型的Index可能会包含时间信息参与计算,因此需要输入 # 但是输入相同的时间index可能会导致系统模型报错(例如出现reindex的操作) # data_x.index = self.index.repeat(N) # 根据边界条件将数据还原 data_x = self.restore_data(data_x) self.data_y = self.system_model.predict(data_x) pop.ObjV = self.data_y.loc[:,[self.opt_target]].to_numpy().reshape(-1,1) data = pd.concat([data_x,self.data_y],axis=1) # 设定约束条件 if self.constrains is None: return if len(self.constrains)==0: return cv = [] for cst in self.constrains: cv.append(execute_constrains(data,cst)) pop.CV = np.hstack(cv) report_cv_info(cv=pop.CV,constrains=self.constrains) def data_copy_row(data:pd.DataFrame,N_row:int): if data is None: return None result = np.ones([N_row,len(data.columns)]) * data.to_numpy() result = pd.DataFrame(data=result,columns=data.columns) return result def report_cv_info(cv:np.ndarray,constrains:list): suit_cv = cv<=0 suit_pop = suit_cv.all(axis=1) not_suit_cst = suit_cv.mean(axis=0)<0.7 not_suit_cst_idx = np.arange(cv.shape[1])[not_suit_cst] not_suit_cst_pct = suit_cv.mean(axis=0)[not_suit_cst].round(2) print(f'本次迭代可行种群数为{suit_pop.sum()},占比{(suit_pop.mean()*100).round(2)}%') if len(not_suit_cst_idx) > 0: print(f'满足以下约束条件的种群数量小于70%, 共{not_suit_cst.sum()}条约束, 可能会影响优化性能') for idx,pct in dict(zip(not_suit_cst_idx,not_suit_cst_pct)).items(): cst = constrains[idx] print(f'序号:{idx} \t 约束满足比例:{round(pct*100,2)}% \t 约束:{cst}')