import numpy as np import pandas as pd import geatpy as ea from ._utils.config_info import ConfigInfo from .opt_obj import Opt All_Algorithm = { 'soea_DE_rand_1_L_templet':{ 'templet': ea.soea_DE_rand_1_L_templet, 'param' : {'F':0.6,'Cr':0.85,'L':15} }, 'soea_DE_currentToBest_1_L_templet':{ 'templet': ea.soea_DE_currentToBest_1_L_templet, 'param' : {'F':0.6,'Cr':0.85,'L':15} }, 'soea_DE_best_1_L_templet':{ 'templet': ea.soea_DE_best_1_L_templet, 'param' : {'F':0.5,'Cr':0.8,'L':15} }, } def main(opt_obj:Opt,allow_neg_opt:bool,config=None) -> list: config_info = ConfigInfo(config) # 组件配置参数检查 config_info.check_property_exist({'NIND':'种群数量','MAXGEN':'迭代次数'}) # 种群参数 Encoding = 'RI' NIND = config['NIND'] Field = ea.crtfld( Encoding = Encoding, varTypes = opt_obj.varTypes, ranges = opt_obj.ranges, borders = opt_obj.borders, precisions = opt_obj.var_precis ) population = ea.Population(Encoding,Field,NIND) # 算法参数 templete = All_Algorithm[config['algorithm']]['templet'] param = All_Algorithm[config['algorithm']]['param'] Algorithm = templete(opt_obj,population) Algorithm.MAXGEN = config['MAXGEN'] # 复杂问题需增加代数。 Algorithm.F = param['F'] # 0.5~0.8 增大 F 增强全局搜索,但可能降低收敛速度。 Algorithm.Cr = param['Cr'] # 0.8~1.0 高 Cr 适合强相关变量,低 Cr 增强多样性。 Algorithm.L = param['L'] # 5~20 变量相关性越强,L 应越大。高维问题可设为变量维度的 10%~20%。 Algorithm.logTras = int(config['MAXGEN']/10) Algorithm.verbose = False Algorithm.drawing = 0 [bestindi,pop] = Algorithm.run() # 检查是否有可行解 #TODO 设定没有可行解时取当前解 if len(bestindi) == 0: print_cv(pop.CV,opt_obj.constrains,opt_obj.cur_sys_constrains_cfl) raise Exception('没有可行解') opt_system_x = get_opt_system_x(opt_obj,bestindi) # 完整的系统模型输入 opt_var = opt_system_x.loc[:,opt_obj.name_opt_vars] # 优化变量 sys_before_after_output = get_sys_output_before_after(opt_obj,opt_system_x) # 优化前后系统模型输出 opt_var_before_after = get_opt_var_before_after(opt_obj,opt_var) # 优化前后系统模型输入(优化变量) # 打印优化后的结果对比 alg_log = print_opt_result(Algorithm,bestindi,opt_obj,sys_before_after_output,opt_var_before_after) # 检查本次优化是否为负优化 check_neg_opt(opt_obj,sys_before_after_output,allow_neg_opt) # 组件输出并校验 process_list = [alg_log,opt_var_before_after,sys_before_after_output] result_list = process_list + config_info.split_df(method='id',df=opt_var,by_group='优化变量') return result_list def print_opt_result( Algorithm, bestindi, opt_obj : Opt, sys_before_after_output: pd.DataFrame, opt_var_before_after : pd.DataFrame ) -> pd.DataFrame: print('+'*20+' 优化结果 '+'+'*20) print('\n算法迭代过程:') with pd.option_context('display.max_rows', None): with pd.option_context('display.max_columns', None): alg_log = pd.DataFrame(Algorithm.log).round(2) print(alg_log) print(f'\n评价次数 : {Algorithm.evalsNum}') print(f'所用时间 : {round(Algorithm.passTime,2)}秒') print(f'最优的目标函数值({opt_obj.opt_target}) : {round(bestindi.ObjV[0][0],4)}') # print('\n最后一次迭代的系统模型输出:') # print(opt_obj.data_y.round(4).drop_duplicates().T) print('\n优化前后控制变量的对比:') print(opt_var_before_after) print('\n优化前后系统模型输出的结果对比:') # 优化后系统模型的结果不唯一,取离优化目标最接近的结果 print(sys_before_after_output) print('\n系统模型的初始输入违背约束条件的情况:') print_sys_init_constrains(opt_obj.cur_sys_constrains_cfl) print(' \n系统模型调用时长统计:') opt_obj.system_model.summary_past_time() print('\n'+'+'*50) return alg_log def print_cv( cv : np.ndarray, constrains : list, cur_sys_constrains_cfl: dict ) -> None: #TODO 增加每一次迭代的CV结果 # 最后一次迭代的CV print('每一个约束条件最后一次迭代的CV(大于0表明不满足约束条件):') last_cv = [ cv.mean(axis=0), cv.max(axis=0), cv.min(axis=0), (cv<0).sum(axis=0), (cv<0).mean(axis=0), ] last_cv = pd.DataFrame( data=np.vstack(last_cv).T, columns=['Mean_CV','Max_CV','Min_CV','Count_CV<0','Pct_CV<0'] ) print(last_cv) #TODO 联合各个约束条件检查CV print('\n对应以下约束条件:') for idx,cst in enumerate(constrains): print(f'{idx} : {cst}<0') print('\n系统模型的初始输入违背约束条件的情况:') print_sys_init_constrains(cur_sys_constrains_cfl) def get_opt_system_x(opt_obj:Opt,bestindi) -> pd.DataFrame: # 优化变量的优化结果 data = opt_obj.insert_sync_var(bestindi.Phen.reshape(1,-1)) opt_var = pd.DataFrame( data = data, columns = opt_obj.name_opt_vars, index = opt_obj.index ) system_x = pd.concat([opt_var,opt_obj.sys_var,opt_obj.oth_var],axis=1) # 将优化结果中映射边界的对应的变量进行还原 system_x = opt_obj.restore_data(system_x) return system_x def get_sys_output_before_after(opt_obj:Opt,system_x): # 优化前后,系统模型输出的变化 opt_system_y = opt_obj.system_model.predict(system_x) sys_before_after_output = ( pd.concat([ opt_obj.current_system_y, opt_system_y ],axis=0) .transpose() .set_axis(['Before','After'],axis=1) .assign( Diff = lambda dt:dt.After - dt.Before, Diff_pct = lambda dt:((dt.After - dt.Before)/dt.Before * 100).round(2).astype('str') + '%' ) .round(2) ) return sys_before_after_output def get_opt_var_before_after(opt_obj,opt_var): # 优化前后,优化变量的变化 result_opt = ( pd.concat([opt_obj.opt_var,opt_var],axis=0) .reset_index(drop=True) .transpose() .set_axis(['Before','After'],axis=1) .assign( Diff = lambda dt:dt.After - dt.Before, Diff_pct = lambda dt:((dt.After - dt.Before)/dt.Before * 100).round(2).astype('str') + '%' ) .round(2) ) return result_opt def check_neg_opt(opt_obj,sys_before_after_output,allow_neg_opt): if allow_neg_opt is None: allow_neg_opt = False if allow_neg_opt is True: return None if not isinstance(allow_neg_opt,bool): raise Exception(f'判断是否需要校验负优化的输入不是布尔值({type(allow_neg_opt)})') # 检查本次优化是否为负优化 raw_pred = sys_before_after_output.at[opt_obj.opt_target,'Before'] opt_pred = sys_before_after_output.at[opt_obj.opt_target,'After'] neg_min_opt = (opt_obj.maxormins[0] == 1) and (opt_pred > raw_pred) neg_max_opt = (opt_obj.maxormins[0] == -1) and (opt_pred < raw_pred) if neg_min_opt or neg_max_opt: raise Exception(f'出现负优化,当前值为{raw_pred},优化后为{opt_pred}') return None def print_sys_init_constrains(cur_sys_constrains_cfl:dict): # 系统模型的初始输入违背约束条件的情况 for cst_type,var_message in cur_sys_constrains_cfl.items(): # 约束类型(边界/可行性) print(cst_type.upper()) if isinstance(var_message,dict): for var_name,message in var_message.items(): print(var_name+' : '+message) if isinstance(var_message,list): for message in var_message: print(message)