from typing import Union

import numpy as np
import pandas as pd

from .opt_obj import Opt
from .opt_alg import main as opt_alg_main
from .constrains.build import main as build_all_constrains
from ._utils.config_info import ConfigInfo
from .model.diagnosis import ModelDiag


def main(*args, system_model=None, config=None):
    """Run the optimisation component end to end.

    The positional ``args`` are handed to ``ConfigInfo.get_io_group_info`` as
    the raw input data and are grouped by the names used in the configuration.
    The function then builds the boundaries, the variable frames, the
    constraints and the ``Opt`` object, optionally runs the model diagnosis,
    runs the optimisation algorithm and finally appends the system model's
    initial outputs to the result list.

    Args:
        *args: Raw input data of the component.
        system_model: Pre-built system model. When ``None`` the model is
            created from ``config`` via ``.model.model.main``.
        config: Component configuration. It is indexed with ``'dir_min'`` and
            ``'target'`` and queried for ``'diag_model'``.

    Returns:
        The result list produced by the optimisation algorithm, extended by
        ``result_append_init_sys_y``.
    """
    config_info = ConfigInfo(config=config)
    group_data = config_info.get_io_group_info(io='in', type='data', data=args)
    group_point = config_info.get_io_group_info(io='in', type='point_id')
    check_var_data(group_data)  # validate the dimensions of the data

    # Component input 1: model variables
    group_opt_var = group_data.get('优化变量')
    group_sys_var = group_data.get('系统变量')
    group_oth_var = group_data.get('其他变量')
    group_opt_col = group_point.get('优化变量')
    group_sys_col = group_point.get('系统变量')
    group_oth_col = group_point.get('其他变量')

    # Component input 2: external variables
    group_cst_var = group_data.get('外部变量')
    group_cst_col = group_point.get('外部变量')

    # Component input 3: dynamic configuration
    group_dyn_cfg = group_data.get('动态配置', [None])
    allow_neg_opt = group_dyn_cfg[0]  # input-side flag for negative optimisation

    # Initialise the boundary conditions
    boundary, boundary_insert = get_boundary(group_opt_var)
    lb, ub, var_type, var_precis, collection_map, var_map = init_boundary(boundary)

    opt_var = get_opt_var(group_opt_var, group_opt_col)
    sys_var = get_sys_var(group_sys_var, group_sys_col)
    oth_var = get_oth_var(group_oth_var, group_oth_col, index=opt_var.index)
    cst_var = get_cst_var(group_cst_var, group_cst_col, index=opt_var.index)

    # Initialise the feasibility constraints; dynamic feasibility constraints
    # have to be parsed in advance.
    constrains = build_all_constrains(config)

    # System model
    if system_model is None:
        from .model.model import main as get_system_model
        system_model = get_system_model(config)

    # Optimisation object
    opt_object = Opt(
        lb=lb,
        ub=ub,
        varTypes=var_type,
        var_precis=var_precis,
        maxormins=1 if config['dir_min'] is True else -1,
        collection_map=collection_map,
        var_map=var_map,
        boundary_insert=boundary_insert,
        opt_var=opt_var,
        sys_var=sys_var,
        system_model=system_model,
        oth_var=oth_var,
        cst_var=cst_var,
        opt_target=config['target'],
        constrains=constrains,
    )

    # System model diagnosis (enabled unless 'diag_model' is configured
    # otherwise). The equality comparison is kept on purpose so that the
    # accepted values stay exactly the same as before.
    diag = config.get('diag_model', True)
    if diag == True:  # noqa: E712
        model_diag = ModelDiag(
            model=system_model,
            data=opt_object.current_system_x,
            c_var=opt_object.name_opt_vars_c,
            d_var=1,
        )
        model_diag.summary_diag()

    # Optimisation algorithm.
    # Negative optimisation is allowed as soon as either the component
    # configuration or the component input allows it; in that case it is not
    # checked.
    allow_neg_opt_config = config_info.get_property('allow_neg_opt', default=False)
    allow_neg_opt = any([allow_neg_opt, allow_neg_opt_config])
    result_list = opt_alg_main(
        opt_obj=opt_object,
        allow_neg_opt=allow_neg_opt,
        config=config,
    )

    # Add the initial output of the system model (the one matching the
    # initial input of the system model) to the output.
    output_groups = config_info.get_io_group_info(io='out', type='point_id')
    result_list = result_append_init_sys_y(result_list, output_groups, opt_object)

    return result_list


def check_var_data(group_data) -> None:
    """Check that every DataFrame-valued group holds exactly one row.

    Values that are not a ``pandas.DataFrame`` are skipped.

    Args:
        group_data: Mapping of group name to group data.

    Raises:
        Exception: If a DataFrame value does not have exactly one row.
    """
    for name, data in group_data.items():
        if not isinstance(data, pd.DataFrame):
            continue
        # The original condition was inverted (it raised for the valid
        # single-row case while the message reports "not equal to 1").
        if data.shape[0] != 1:
            raise Exception(f'{name}的数据行数不等于1,不满足要求,数据为{data}')


def get_opt_var(group_opt_var: list, group_opt_col: list) -> pd.DataFrame:
    """Build the frame of current values of the optimisation variables.

    Args:
        group_opt_var: Boundary-info mappings; the ``'cur_val'`` entry of each
            one is concatenated column-wise.
        group_opt_col: Column names assigned to the concatenated frame.

    Returns:
        The concatenated frame with ``group_opt_col`` as its columns.
    """
    current_values = [bound_info['cur_val'] for bound_info in group_opt_var]
    opt_var = pd.concat(current_values, axis=1)
    opt_var.columns = group_opt_col
    return opt_var


def get_sys_var(group_sys_var: list, group_sys_col: list) -> Union[pd.DataFrame, None]:
    """Build the frame of system variables.

    Args:
        group_sys_var: DataFrames of which only the first column is used.
        group_sys_col: Column names assigned to the concatenated frame.

    Returns:
        The concatenated frame, or ``None`` (after printing a warning) when no
        system variable was supplied.
    """
    if group_sys_var is None or len(group_sys_var) == 0:
        print('【警告】未获取到"系统变量",确保实际不存在该变量 ')
        return None

    first_columns = [df.iloc[:, [0]] for df in group_sys_var]
    sys_var = pd.concat(first_columns, axis=1)
    sys_var.columns = group_sys_col
    return sys_var


def get_oth_var(group_oth_var: list, group_oth_col: list, index) -> Union[pd.DataFrame, None]:
    """Build the frame of the "other" variables.

    Args:
        group_oth_var: Data of the other variables.
        group_oth_col: Column names of the other variables.
        index: Index used for the zero-filled fallback frame.

    Returns:
        ``None`` when the column names or the data are missing or the data is
        empty. Otherwise the first columns of the supplied DataFrames
        concatenated column-wise, or - when at least one item is not a
        DataFrame - a single zero-filled row with ``group_oth_col`` as columns.
    """
    # The data itself may be absent too (the original only guarded the column
    # names and then failed on ``len(None)``).
    if group_oth_col is None or group_oth_var is None or len(group_oth_var) == 0:
        return None

    if all(isinstance(df, pd.DataFrame) for df in group_oth_var):
        first_columns = [df.iloc[:, [0]] for df in group_oth_var]
        oth_var = pd.concat(first_columns, axis=1)
        oth_var.columns = group_oth_col
        return oth_var

    print('【警告】未获取到有效的"其他变量",用0值代替所有该类变量 ')
    return pd.DataFrame(
        data=np.zeros_like(group_oth_col, dtype='float').reshape(1, -1),
        columns=group_oth_col,
        index=index,
    )


def get_cst_var(group_cst_var: list, group_cst_col: list, index) -> Union[pd.DataFrame, None]:
    """Build the frame of external variables.

    Args:
        group_cst_var: DataFrames with exactly one row each; only the first
            column of every frame is used.
        group_cst_col: Column names assigned to the concatenated frame.
        index: Index given to every column so the result lines up with the
            other variable frames.

    Returns:
        The concatenated frame, or ``None`` when no external variable was
        supplied.

    Raises:
        Exception: If an item is not a DataFrame or does not have exactly one
            row.
    """
    if group_cst_var is None or len(group_cst_var) == 0:
        return None
    # An empty column list is treated like a missing one, as before.
    if group_cst_col is not None and len(group_cst_col) == 0:
        group_cst_col = None

    # Validate the data types, then reset the index (for alignment) and the
    # columns (for easier referencing).
    aligned = []
    for var in group_cst_var:
        if not isinstance(var, pd.DataFrame):
            raise Exception(f'外部变量中存在错误的数据类型{type(var)}({var})')
        if var.shape[0] != 1:
            # The message used to print the literal text "(var)".
            raise Exception(f'外部变量的数据形状有误{var.shape}({var})')
        # Work on a copy so the caller's frame keeps its own index.
        first_column = var.iloc[:, [0]].copy()
        first_column.index = index
        aligned.append(first_column)

    cst_var = pd.concat(aligned, axis=1)
    cst_var.columns = group_cst_col
    return cst_var


def get_boundary(group_opt_var: list) -> tuple:
    """Collect one boundary definition per id and count the grouped variables.

    Every boundary-info mapping is printed for inspection. The first mapping
    seen for an id is kept as the boundary of that id.

    Args:
        group_opt_var: Boundary-info mappings, each with an ``'id'`` entry.

    Returns:
        A tuple ``(boundary, boundary_insert)``. ``boundary`` maps id to
        boundary info, e.g. ``{'id1': info1, 'id2': info2}``.
        ``boundary_insert`` maps group position to the number of additional
        entries, e.g. ``{0: 0, 1: 2, 3: 1}`` means none at position 0, two at
        position 1 and one at position 3.
    """
    boundary = {}
    boundary_insert = {}
    print('-'*20+'边界条件'+'-'*20)

    pos_group_id = -1
    for idx, bound_info in enumerate(group_opt_var):
        print('-'*20 + str(idx) + '-'*20)
        for k, v in bound_info.items():
            v = v.iat[0, 0] if isinstance(v, pd.DataFrame) else v
            print(f'{k} : {v} \n')

        if bound_info['id'] not in boundary:
            pos_group_id += 1
            # When several optimisation variables belong to one group (same
            # id), the boundary information of the first one is used for all
            # of them, so only unique ids are kept.
            boundary[bound_info['id']] = bound_info
            boundary_insert[pos_group_id] = 0  # position -> number to insert
        else:
            # NOTE(review): the count goes to the most recently opened group,
            # which presumably relies on equal ids being adjacent - confirm.
            boundary_insert[pos_group_id] += 1

    return boundary, boundary_insert


def init_boundary(all_boundary) -> tuple:
    """Translate the boundary definitions into the optimiser's inputs.

    Args:
        all_boundary: Mapping of boundary id to boundary info. Each info is
            queried for ``'boundary_type'`` (``'interval'``, ``'collection'``
            or ``'map'``) and ``'var_name'`` plus the type specific entries.

    Returns:
        A tuple ``(lb, ub, var_type, var_precis, collection_map, var_map)``.
        The four lists hold one entry per boundary. ``collection_map`` holds,
        for collection and map boundaries, ``{var_name: {0: a, 1: b, ...}}``.
        ``var_map`` holds, for map boundaries, the ``'var_map'`` entry per
        variable name.

    Raises:
        ValueError: If a boundary type is not one of the three known types.
    """
    lb = []              # all boundary types
    ub = []              # all boundary types
    var_type = []        # all boundary types
    var_precis = []      # interval boundaries
    collection_map = {}  # collection / map boundaries
    var_map = {}         # map boundaries

    for boundary_info in all_boundary.values():
        boundary_type = boundary_info.get('boundary_type')
        var_name = boundary_info.get('var_name')

        if boundary_type == 'interval':
            lb.append(boundary_info.get('lb'))
            ub.append(boundary_info.get('ub'))
            var_type.append(boundary_info.get('var_type'))
            var_precis.append(boundary_info.get('var_precis'))
        elif boundary_type in ('collection', 'map'):
            # The variable is optimised as a position in the collection; the
            # values behind the positions should be sorted in ascending order
            # as well.
            collection = boundary_info.get('collection')
            lb.append(0)
            ub.append(len(collection) - 1)
            var_type.append(1)
            var_precis.append(0)
            collection_map[var_name] = dict(enumerate(collection))
            if boundary_type == 'map':
                var_map[var_name] = boundary_info.get('var_map')
        else:
            raise ValueError('边界类型有误')

    return lb, ub, var_type, var_precis, collection_map, var_map


def result_append_init_sys_y(result: list, output_groups: dict, opt_obj: Opt):
    """Append the initial system outputs to the optimisation result.

    Args:
        result: Result list of the optimisation algorithm.
        output_groups: Output groups; the columns listed under ``'初始输出'``
            are taken from ``opt_obj.current_system_y``.
        opt_obj: Optimisation object providing ``current_system_y``.

    Returns:
        ``result`` unchanged when the group is missing or empty, otherwise a
        new list made of ``result`` followed by one single-column frame per
        listed column.
    """
    init_cols = output_groups.get('初始输出')
    if init_cols is None or len(init_cols) == 0:
        return result

    # TODO: give a helpful message when a column is missing.
    init_sys_y = [opt_obj.current_system_y.loc[:, [y_col]] for y_col in init_cols]

    # TODO: appending at the end may not be ideal; the position should
    # preferably be configurable.
    return result + init_sys_y