| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265 |
- import numpy as np
- import pandas as pd
- import geatpy as ea
- from .model.model import SystemModel
- from .constrains.parse import parse_constrains
- from .constrains.execute import execute_constrains
class Opt(ea.Problem):
    """Single-objective geatpy problem evaluated through a system model.

    Each candidate produced by the optimiser is expanded with its synchronised
    variables, combined with the fixed system / other variables, mapped back
    from ordinal encodings to real values, and passed to
    ``system_model.predict``. The column named ``opt_target`` of the
    prediction becomes the objective value; the parsed constraints become the
    constraint-violation matrix.
    """

    def __init__(
        self,
        lb: list,
        ub: list,
        varTypes: list,
        var_precis: list,
        maxormins: int,
        collection_map: dict,
        var_map: dict,
        boundary_insert: dict,
        opt_var: pd.DataFrame,
        sys_var: pd.DataFrame,
        system_model: SystemModel,
        oth_var: pd.DataFrame,
        cst_var: pd.DataFrame,
        opt_target: str,
        constrains: list = None,
    ):
        """Build the problem and evaluate the current operating point.

        Args:
            lb: Lower bound of every decision variable handed to geatpy.
            ub: Upper bound of every decision variable handed to geatpy.
            varTypes: Variable types, 0 for continuous and 1 for discrete.
            var_precis: Stored on the instance; not used inside this class.
            maxormins: 1 to minimise the objective, -1 to maximise it.
            collection_map: Ordinal-to-value mapping of set / mapping type
                bounds, e.g. ``{'u1': {0: 1.5, 1: 2.3}, 'u2': {0: 1.1, 1: 3.2}}``.
            var_map: For mapping type bounds, the relation between an
                optimised variable and the system variables derived from it,
                e.g. ``{'u1': {'x1': {1: 0.1, 2: 0.4}, 'x2': {2: 0.1, 3: 0.4}}}``.
            boundary_insert: ``{position: count}``; ``count`` copies of the
                entry at ``position`` are inserted by ``insert_sync_var``.
            opt_var: Current values of the optimised variables (one row).
            sys_var: Current values of the system variables (one row).
            system_model: Model exposing ``predict`` and ``MODEL_INPUT_ID``.
            oth_var: Current values of the other variables, or ``None``.
            cst_var: Extra variables made available when parsing constraints.
            opt_target: Name of the model output column to optimise.
            constrains: Raw constraint definitions; ``None`` means none.

        Raises:
            Exception: If the assembled model input does not match
                ``MODEL_INPUT_ID`` or does not contain exactly one row.
            ValueError: If ``opt_target`` is missing from, or duplicated in,
                the model output.
        """
        # A shared ``[]`` default would be reused across instances.
        if constrains is None:
            constrains = []

        ea.Problem.__init__(
            self,
            name='Opt',
            M=1,
            maxormins=[maxormins],  # 1: minimise the objective; -1: maximise it
            Dim=len(lb),
            varTypes=varTypes,  # 0: continuous variable, 1: discrete variable
            lb=lb,
            ub=ub,
        )

        self.lb = lb
        # Misspelled attribute kept so existing readers of ``up`` keep working.
        self.up = ub
        # get_cur_sys_constrains_cfl reads ``self.ub``, which this class never
        # assigned itself. Guarantee it exists without overriding a value the
        # base class may already have stored.
        if not hasattr(self, 'ub'):
            self.ub = ub

        self.opt_var = opt_var  # DataFrame
        self.sys_var = sys_var  # DataFrame
        self.oth_var = oth_var  # DataFrame
        self.cst_var = cst_var  # DataFrame
        self.name_opt_vars = opt_var.columns.to_list()
        self.system_model = system_model
        self.opt_target = opt_target  # optimisation target
        self.constrains = constrains  # constraints (replaced by the parsed form below)
        self.collection_map = collection_map
        self.var_map = var_map
        self.var_precis = var_precis
        self.boundary_insert = boundary_insert
        # Names of the optimised variables that are continuous.
        self.name_opt_vars_c = self.get_name_opt_vars_c(varTypes)

        # Used to assign the index of the optimiser component's output.
        self.index = opt_var.index

        self.current_system_x = self.get_current_system_x()
        # A prediction from the current values: it does not reflect measured
        # values and is affected by model error.
        self.current_system_y = self.get_current_system_y()
        self.current_system_x_y = pd.concat(
            [self.current_system_y, self.current_system_x], axis=1
        )

        # Model inputs and outputs may share a name, but only for inputs that
        # come from ``oth_var``. Those inputs are dropped from the constraint
        # frame to avoid duplicate labels, so constraints must not reference
        # variables that belong to ``oth_var``.
        oth_var_col = self.oth_var.columns if isinstance(oth_var, pd.DataFrame) else []
        self.current_system_x_y_cst = pd.concat(
            [self.current_system_y, self.current_system_x.drop(oth_var_col, axis=1)],
            axis=1,
        )

        self.constrains_var = pd.concat(
            [self.current_system_x_y_cst, self.cst_var], axis=1
        )
        self.constrains = parse_constrains(self.constrains, self.constrains_var)

        # Check whether the current values already violate bounds or
        # constraints; if they do, the result may look worse than the current
        # state. This is informational only: a value outside its range may
        # simply mean the equipment is off, and such a result is caused by
        # unreasonable bounds / feasibility constraints, not by the algorithm.
        self.cur_sys_constrains_cfl = self.get_cur_sys_constrains_cfl()

    def get_name_opt_vars_c(self, varTypes):
        """Return the names of the optimised variables whose type is 0.

        ``varTypes`` is expanded with ``insert_sync_var`` first so that it
        lines up with ``self.name_opt_vars``.
        """
        complete_var_types = np.array(self.insert_sync_var(varTypes, flatten=True))
        complete_var_names = np.array(self.name_opt_vars)
        return complete_var_names[complete_var_types == 0]

    def check_data_and_system_input(self, data_input: list) -> None:
        """Validate the assembled input columns against ``MODEL_INPUT_ID``.

        Nothing is checked when the model declares no input ids.

        Raises:
            Exception: If the two name collections differ, or contain the same
                names in a different order.
        """
        system_model_input = self.system_model.MODEL_INPUT_ID
        if system_model_input is None:
            return None
        # TODO: the length should still be validated when names are not checked.

        # Names present on one side only.
        input_diff = set(data_input).symmetric_difference(system_model_input)
        if input_diff:
            raise Exception(f'系统模型的输入与优化组件的输入不匹配,差异的输入为{input_diff},其中系统模型的输入为{system_model_input},优化组件的输入为{data_input}')

        # Same names, different order. Both sides are normalised to lists so a
        # tuple or array of ids is compared element by element.
        if list(data_input) != list(system_model_input):
            raise Exception(f'系统模型和优化组件输入的顺序有差异,其中系统模型的输入为{system_model_input},优化组件的输入为{data_input}')

    def get_current_system_x(self):
        """Assemble and validate the one-row model input for the current state.

        Raises:
            Exception: If the column check fails or the frame does not have
                exactly one row.
        """
        data_x_current = pd.concat([self.opt_var, self.sys_var, self.oth_var], axis=1)
        self.check_data_and_system_input(data_x_current.columns.to_list())

        if data_x_current.shape[0] != 1:
            print('系统模型的初始输入数据')
            print(data_x_current)
            raise Exception('系统模型的初始输入数据的行维度不等于1,检查输入数据')

        return data_x_current

    def get_current_system_y(self):
        """Predict the model output for the current input and validate it.

        Raises:
            ValueError: If ``opt_target`` is not an output column, or if more
                than one output column carries that name.
        """
        data_y_current = self.system_model.predict(self.current_system_x)

        print(f'系统模型的输出变量:{data_y_current.columns.to_list()}')

        # The target must exist in the model output.
        if self.opt_target not in data_y_current.columns:
            raise ValueError(f'未在系统模型的输出中找到优化目标{self.opt_target},需检查系统模型优化的输出{data_y_current.columns.to_list()}')

        # Selecting a duplicated label returns a DataFrame instead of a Series.
        if isinstance(data_y_current.loc[:, self.opt_target], pd.DataFrame):
            raise ValueError(f'在系统模型的输出中找到多个相同名称的优化目标{self.opt_target},需检查系统模型的输出')

        return data_y_current

    def get_cur_sys_constrains_cfl(self) -> dict:
        """Collect the bounds and constraints violated by the current values.

        Returns:
            A dict with ``'boundary_int'`` (interval bounds, name -> message),
            ``'boundary_col'`` (set / mapping bounds, name -> message) and
            ``'constrains'`` (list of violated constraints).
        """
        conflicts = {
            'boundary_int': {},  # interval bounds
            'boundary_col': {},  # set and mapping bounds
            'constrains': [],
        }

        # Expand the bounds once; they do not change while looping.
        full_ub = self.insert_sync_var(self.ub, flatten=True)
        full_lb = self.insert_sync_var(self.lb, flatten=True)

        # Bound check
        for idx, name in enumerate(self.name_opt_vars):
            cur_value = self.current_system_x.loc[:, [name]].iat[0, 0]

            if name in self.collection_map:
                # Variable with a set / mapping bound.
                collection = self.collection_map[name].values()
                opt_ub = max(collection)
                opt_lb = min(collection)
                if (cur_value > opt_ub) or (cur_value < opt_lb):
                    conflicts['boundary_col'][name] = f'优化变量不在集合内,且超出边界范围,当前值={cur_value},上边界={opt_ub},下边界={opt_lb}'
                elif cur_value not in collection:
                    conflicts['boundary_col'][name] = '优化变量不在集合内,但未超出边界范围'
            else:
                # Variable with an interval bound.
                opt_ub = full_ub[idx]
                opt_lb = full_lb[idx]
                if (cur_value > opt_ub) or (cur_value < opt_lb):
                    conflicts['boundary_int'][name] = f'超出边界范围,当前值={round(cur_value,2)},上边界={opt_ub},下边界={opt_lb}'

        # Feasibility check: a positive value marks a violated constraint.
        for cst in self.constrains:
            cv = execute_constrains(self.current_system_x_y_cst, cst).flatten()[0]
            if cv > 0:
                conflicts['constrains'].append(cst)

        return conflicts

    def restore_data(self, data_x, step=('var_map', 'col_map')):
        """Map ordinal-encoded columns of ``data_x`` back to real values.

        ``data_x`` is modified in place and also returned.

        Args:
            data_x: Frame holding the optimised (and system) variables.
            step: Which restorations to run; any container supporting ``in``
                with the entries ``'var_map'`` and / or ``'col_map'``.
        """
        # System variables of mapping-type bounds are derived from the ordinal
        # of the optimised variable, so this must run before those ordinals
        # are replaced by real values below.
        if self.var_map and ('var_map' in step):
            for var_opt_name, var_map_dict in self.var_map.items():
                for var_sys_name, var_sys_map in var_map_dict.items():
                    data_x[var_sys_name] = data_x[var_opt_name].astype(int).map(var_sys_map, na_action='ignore')

        # Restore the optimised variables that use set / mapping bounds.
        if self.collection_map and ('col_map' in step):
            for var_name, map_dict in self.collection_map.items():
                data_x[var_name] = data_x[var_name].astype('int').map(map_dict, na_action='ignore')

        return data_x

    def insert_sync_var(self, data: np.ndarray, flatten=False) -> np.ndarray:
        """Duplicate entries of ``data`` as described by ``boundary_insert``.

        For every ``position: count`` pair with ``count > 0``, ``count`` copies
        of the entry at ``position`` are inserted at that position.

        Args:
            data: A 2-D array (columns are duplicated) or, with
                ``flatten=True``, a 1-D sequence (elements are duplicated).
            flatten: Select the 1-D behaviour.

        Returns:
            The expanded data; the input object itself when nothing is inserted.
        """
        cum_num = 0
        # Positions refer to the un-expanded layout, so they have to be handled
        # in ascending order for the running offset to be valid.
        for pos, num in sorted(self.boundary_insert.items()):
            if num <= 0:
                continue
            cur_pos = cum_num + pos

            if flatten:
                insert_array = np.repeat(data[cur_pos], repeats=num)
                data = np.insert(data, cur_pos, insert_array)
            else:
                insert_array = np.repeat(data[:, [cur_pos]], repeats=num, axis=1)
                data = np.insert(data, [cur_pos], insert_array, axis=1)
            cum_num += num

        return data

    def aimFunc(self, pop):
        """Evaluate a population: set ``pop.ObjV`` and, with constraints, ``pop.CV``.

        The latest model output is kept on ``self.data_y``.
        """
        Vars = self.insert_sync_var(pop.Phen)
        N = Vars.shape[0]

        # Assemble the system-model input.
        data_opt = pd.DataFrame(data=Vars, columns=self.name_opt_vars)
        data_sys = data_copy_row(self.sys_var, N_row=N)
        data_oth = data_copy_row(self.oth_var, N_row=N)
        data_x = pd.concat([data_opt, data_sys, data_oth], axis=1)

        # Open issue: the index given to the system model may carry time
        # information that takes part in the computation, so it should be
        # passed on (repeating ``self.index`` N times). Feeding identical
        # index labels can however make the model fail (e.g. on a reindex),
        # which is why the index is left untouched for now.

        # Restore real values according to the boundary definitions.
        data_x = self.restore_data(data_x)

        self.data_y = self.system_model.predict(data_x)
        pop.ObjV = self.data_y.loc[:, [self.opt_target]].to_numpy().reshape(-1, 1)

        # Constraint handling
        if self.constrains is None or len(self.constrains) == 0:
            return

        data = pd.concat([data_x, self.data_y], axis=1)
        pop.CV = np.hstack([execute_constrains(data, cst) for cst in self.constrains])

        report_cv_info(cv=pop.CV, constrains=self.constrains)
-
def data_copy_row(data: pd.DataFrame, N_row: int):
    """Broadcast ``data`` to ``N_row`` rows and return it as a new DataFrame.

    The values are multiplied into an ``N_row x n_columns`` array of ones, so
    the result carries the original column labels and a fresh default index.
    ``None`` is passed through unchanged.
    """
    if data is None:
        return None

    columns = data.columns
    ones = np.ones([N_row, len(columns)])
    return pd.DataFrame(data=ones * data.to_numpy(), columns=columns)
def report_cv_info(cv: np.ndarray, constrains: list):
    """Print a feasibility summary for a constraint-violation matrix.

    An entry of ``cv`` counts as satisfied when it is ``<= 0``. The number and
    percentage of rows satisfying every column are printed first; then every
    column satisfied by fewer than 70% of the rows is listed together with the
    entry of ``constrains`` at the same position.
    """
    satisfied = cv <= 0
    feasible_rows = satisfied.all(axis=1)
    feasible_pct = (feasible_rows.mean() * 100).round(2)
    print(f'本次迭代可行种群数为{feasible_rows.sum()},占比{feasible_pct}%')

    # Share of rows satisfying each constraint column.
    satisfied_ratio = satisfied.mean(axis=0)
    rarely_met = satisfied_ratio < 0.7
    rarely_met_idx = np.arange(cv.shape[1])[rarely_met]
    if len(rarely_met_idx) == 0:
        return

    rarely_met_pct = satisfied_ratio[rarely_met].round(2)
    print(f'满足以下约束条件的种群数量小于70%, 共{rarely_met.sum()}条约束, 可能会影响优化性能')
    for idx, pct in zip(rarely_met_idx, rarely_met_pct):
        cst = constrains[idx]
        print(f'序号:{idx} \t 约束满足比例:{round(pct*100,2)}% \t 约束:{cst}')
|