| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256 |
- from typing import Union
- import numpy as np
- import pandas as pd
- from .opt_obj import Opt
- from .opt_alg import main as opt_alg_main
- from .constrains.build import main as build_all_constrains
- from ._utils.config_info import ConfigInfo
- from .model.diagnosis import ModelDiag
def main(*args, system_model=None, config=None):
    """Run one optimisation pass and return its list of results.

    The positional inputs are grouped through ``ConfigInfo``, turned into
    boundaries and variable frames, wrapped in an ``Opt`` object and handed
    to the optimisation algorithm.

    Args:
        *args: Raw input data; passed as ``data`` to
            ``ConfigInfo.get_io_group_info``.
        system_model: Ready-made system model. When ``None`` it is built
            from ``config`` by ``.model.model.main``.
        config: Component configuration. It is indexed directly
            (``config['dir_min']``, ``config['target']``), so despite the
            ``None`` default a mapping is effectively required.

    Returns:
        The result list from ``opt_alg_main``, passed through
        ``result_append_init_sys_y``.
    """
    config_info = ConfigInfo(config=config)

    group_data = config_info.get_io_group_info(io='in', type='data', data=args)
    group_point = config_info.get_io_group_info(io='in', type='point_id')
    check_var_data(group_data)  # validate the dimensions of the data

    # Component input 1: model variables
    group_opt_var = group_data.get('优化变量')
    group_sys_var = group_data.get('系统变量')
    group_oth_var = group_data.get('其他变量')

    group_opt_col = group_point.get('优化变量')
    group_sys_col = group_point.get('系统变量')
    group_oth_col = group_point.get('其他变量')

    # Component input 2: external variables
    group_cst_var = group_data.get('外部变量')
    group_cst_col = group_point.get('外部变量')

    # Component input 3: dynamic configuration.
    # ``dict.get`` only falls back to its default when the key is missing, so
    # a key that is present but None/empty is handled explicitly here instead
    # of failing on ``[0]``.
    group_dyn_cfg = group_data.get('动态配置')
    if group_dyn_cfg is None or len(group_dyn_cfg) == 0:
        allow_neg_opt = None
    else:
        allow_neg_opt = group_dyn_cfg[0]  # whether negative optimisation must be checked

    # Initialise the boundary conditions
    boundary, boundary_insert = get_boundary(group_opt_var)
    lb, ub, var_type, var_precis, collection_map, var_map = init_boundary(boundary)

    opt_var = get_opt_var(group_opt_var, group_opt_col)
    sys_var = get_sys_var(group_sys_var, group_sys_col)
    oth_var = get_oth_var(group_oth_var, group_oth_col, index=opt_var.index)
    cst_var = get_cst_var(group_cst_var, group_cst_col, index=opt_var.index)

    # Initialise the feasibility constraints; dynamic feasibility constraints
    # have to be parsed ahead of time
    constrains = build_all_constrains(config)

    # System model
    if system_model is None:
        from .model.model import main as get_system_model
        system_model = get_system_model(config)

    # Optimisation object
    opt_object = Opt(
        lb=lb,
        ub=ub,
        varTypes=var_type,
        var_precis=var_precis,
        maxormins=1 if config['dir_min'] is True else -1,
        collection_map=collection_map,
        var_map=var_map,
        boundary_insert=boundary_insert,
        opt_var=opt_var,
        sys_var=sys_var,
        system_model=system_model,
        oth_var=oth_var,
        cst_var=cst_var,
        opt_target=config['target'],
        constrains=constrains,
    )

    # System model diagnosis (enabled unless the config switches it off).
    # The ``== True`` comparison is kept on purpose so that the accepted
    # values stay exactly the same as before.
    diag = config.get('diag_model', True)
    if diag == True:  # noqa: E712
        model_diag = ModelDiag(
            model=system_model,
            data=opt_object.current_system_x,
            c_var=opt_object.name_opt_vars_c,
            d_var=1,
        )
        model_diag.summary_diag()

    # Optimisation algorithm.
    # Negative optimisation is allowed, and therefore not checked, as soon as
    # either the component configuration or the component input allows it.
    allow_neg_opt_config = config_info.get_property('allow_neg_opt', default=False)
    allow_neg_opt = any([allow_neg_opt, allow_neg_opt_config])
    result_list = opt_alg_main(opt_obj=opt_object, allow_neg_opt=allow_neg_opt, config=config)

    # Append the system model's initial output (the one matching its initial
    # input) to the results
    output_groups = config_info.get_io_group_info(io='out', type='point_id')
    result_list = result_append_init_sys_y(result_list, output_groups, opt_object)

    return result_list
def check_var_data(group_data) -> None:
    """Validate that every DataFrame-valued group holds exactly one row.

    Values that are not ``pd.DataFrame`` instances are skipped.

    Args:
        group_data: Mapping of group name to group data.

    Raises:
        ValueError: If a DataFrame value does not have exactly one row.
    """
    for name, data in group_data.items():
        if not isinstance(data, pd.DataFrame):
            continue
        # The message states "row count is not 1", so the failure condition
        # has to be ``!= 1``; the previous ``== 1`` rejected valid input and
        # let invalid input through.
        if data.shape[0] != 1:
            raise ValueError(f'{name}的数据行数不等于1,不满足要求,数据为{data}')
def get_opt_var(group_opt_var:list,group_opt_col:list) -> pd.DataFrame:
    """Combine the current values of all optimisation variables.

    Args:
        group_opt_var: Sequence of mappings; the ``'cur_val'`` entry of each
            one is concatenated column-wise.
        group_opt_col: Column labels assigned to the combined frame.

    Returns:
        The column-wise concatenation, relabelled with ``group_opt_col``.
    """
    current_values = [info['cur_val'] for info in group_opt_var]
    combined = pd.concat(current_values, axis=1)
    combined.columns = group_opt_col
    return combined
def get_sys_var(group_sys_var:list,group_sys_col:list) -> Union[pd.DataFrame,None]:
    """Combine the first column of every system-variable frame.

    Args:
        group_sys_var: Sequence of DataFrames, or ``None`` / empty when there
            are no system variables.
        group_sys_col: Column labels assigned to the combined frame.

    Returns:
        ``None`` (after printing a warning) when no system variables were
        supplied, otherwise the column-wise concatenation of each frame's
        first column, relabelled with ``group_sys_col``.
    """
    # Guard clause: nothing to combine
    if group_sys_var is None or len(group_sys_var) == 0:
        print('【警告】未获取到"系统变量",确保实际不存在该变量 ')
        return None

    first_columns = [frame.iloc[:, [0]] for frame in group_sys_var]
    combined = pd.concat(first_columns, axis=1)
    combined.columns = group_sys_col
    return combined
def get_oth_var(group_oth_var:list,group_oth_col:list,index) -> Union[pd.DataFrame,None]:
    """Build the frame of "other" variables.

    Args:
        group_oth_var: Sequence of values for the other variables, or
            ``None`` / empty when there are none.
        group_oth_col: Column labels for the result, or ``None``.
        index: Index used for the zero-filled fallback frame.

    Returns:
        * ``None`` when the labels are ``None`` or no values were supplied.
        * When every value is a DataFrame: the column-wise concatenation of
          each frame's first column, relabelled with ``group_oth_col``.
        * Otherwise (after printing a warning): a single-row frame of zeros
          with columns ``group_oth_col`` and the given ``index``.
    """
    # ``group_oth_var`` may itself be None even when labels exist; test it
    # before calling len() so that case yields None instead of a TypeError.
    has_values = group_oth_var is not None and len(group_oth_var) > 0
    if group_oth_col is None or not has_values:
        return None

    if all(isinstance(df, pd.DataFrame) for df in group_oth_var):
        first_columns = [df.iloc[:, [0]] for df in group_oth_var]
        oth_var = pd.concat(first_columns, axis=1)
        oth_var.columns = group_oth_col
        return oth_var

    # At least one value is not a DataFrame: substitute zeros for the group
    print('【警告】未获取到有效的"其他变量",用0值代替所有该类变量 ')
    return pd.DataFrame(
        data=np.zeros((1, len(group_oth_col)), dtype='float'),
        columns=group_oth_col,
        index=index,
    )
def get_cst_var(group_cst_var:list,group_cst_col:list,index) -> Union[pd.DataFrame,None]:
    """Build the frame of external variables.

    Args:
        group_cst_var: Sequence of single-row DataFrames, or ``None`` / empty
            when there are no external variables.
        group_cst_col: Column labels assigned to the result.
        index: Index applied to every variable so the columns line up.

    Returns:
        ``None`` when no external variables were supplied, otherwise the
        column-wise concatenation of each frame's first column, re-indexed
        with ``index`` and relabelled with ``group_cst_col``.

    Raises:
        TypeError: If an entry is not a DataFrame.
        ValueError: If an entry does not have exactly one row.
    """
    if group_cst_var is None or len(group_cst_var) == 0:
        return None

    # An empty label list is treated the same as a missing one
    if group_cst_col is not None and len(group_cst_col) == 0:
        group_cst_col = None

    aligned = []
    for var in group_cst_var:
        if not isinstance(var, pd.DataFrame):
            raise TypeError(f'外部变量中存在错误的数据类型{type(var)}({var})')
        if var.shape[0] != 1:
            # The offending data is now interpolated; the message previously
            # contained the literal text "(var)".
            raise ValueError(f'外部变量的数据形状有误{var.shape}({var})')
        # Re-index the extracted column rather than the caller's frame, so
        # the input is left unmodified.
        first_column = var.iloc[:, [0]]
        first_column.index = index
        aligned.append(first_column)

    cst_var = pd.concat(aligned, axis=1)
    cst_var.columns = group_cst_col
    return cst_var
def get_boundary(group_opt_var:list) -> tuple:
    """Collect the unique boundary definitions of the optimisation variables.

    Every entry is printed for inspection. Entries sharing an ``'id'`` form
    one group, and only the first entry of a group is kept as its boundary.

    Args:
        group_opt_var: Sequence of mappings, each with at least an ``'id'``
            key.

    Returns:
        A 2-tuple ``(boundary, boundary_insert)``:

        * ``boundary`` maps each distinct id to the first entry carrying it,
          e.g. ``{'id1': boundary_info1, 'id2': boundary_info2}``.
        * ``boundary_insert`` maps the running group position (0, 1, ...) to
          the number of further entries seen while that group was the most
          recent one.
    """
    boundary = {}
    boundary_insert = {}

    print('-'*20+'边界条件'+'-'*20)

    pos_group_id = -1
    for idx, bound_info in enumerate(group_opt_var):

        print('-'*20+ str(idx) +'-'*20)
        for key, value in bound_info.items():
            # Show single-cell frames as their scalar content
            shown = value.iat[0, 0] if isinstance(value, pd.DataFrame) else value
            print(f'{key} : {shown} \n')

        group_id = bound_info['id']
        if group_id not in boundary:
            pos_group_id += 1
            # When several optimisation variables share one group (same id),
            # all boundary information follows the group's first entry.
            boundary[group_id] = bound_info
            boundary_insert[pos_group_id] = 0  # position -> number to insert
        else:
            # NOTE(review): the count goes to the most recently opened group,
            # not necessarily the group owning this id. This presumably
            # relies on same-id entries being adjacent - confirm with callers.
            boundary_insert[pos_group_id] += 1

    return boundary, boundary_insert
def init_boundary(all_boundary) -> tuple:
    """Translate boundary definitions into the optimiser's bound lists.

    Args:
        all_boundary: Mapping of boundary id to a boundary-info mapping whose
            ``'boundary_type'`` is ``'interval'``, ``'collection'`` or
            ``'map'``.

    Returns:
        ``(lb, ub, var_type, var_precis, collection_map, var_map)`` where the
        first four are lists with one entry per boundary, ``collection_map``
        maps a variable name to ``{position: value}`` for collection and map
        boundaries, and ``var_map`` maps a variable name to its ``'var_map'``
        entry for map boundaries.

    Raises:
        ValueError: If a boundary type is not one of the three above.
    """
    lb, ub, var_type, var_precis = [], [], [], []
    collection_map, var_map = {}, {}

    for info in all_boundary.values():
        kind = info.get('boundary_type')
        name = info.get('var_name')

        # Interval boundary: every field is taken straight from the info
        if kind == 'interval':
            for target, field in (
                (lb, 'lb'),
                (ub, 'ub'),
                (var_type, 'var_type'),
                (var_precis, 'var_precis'),
            ):
                target.append(info.get(field))
            continue

        if kind not in ('collection', 'map'):
            raise ValueError('边界类型有误')

        # Collection and map boundaries are optimised over the position
        # inside the collection (0 .. len - 1); the values at those positions
        # are expected to be sorted ascending as well.
        choices = info.get('collection')
        lb.append(0)
        ub.append(len(choices) - 1)
        var_type.append(1)
        var_precis.append(0)
        collection_map[name] = dict(enumerate(choices))

        # Map boundaries additionally carry their variable mapping
        if kind == 'map':
            var_map[name] = info.get('var_map')

    return lb, ub, var_type, var_precis, collection_map, var_map
def result_append_init_sys_y(result:list,output_groups:dict,opt_obj:"Opt"):
    """Append the system model's initial outputs to the result list.

    Args:
        result: Result list produced by the optimisation algorithm.
        output_groups: Mapping of output group name to column labels; only
            the ``'初始输出'`` group is used.
        opt_obj: Object whose ``current_system_y`` supports
            ``.loc[:, [label]]`` selection.

    Returns:
        ``result`` itself when the ``'初始输出'`` group is missing, ``None``
        or empty; otherwise a new list made of ``result`` followed by one
        single-column selection per requested label.

    Raises:
        KeyError: If a requested label is not found in
            ``opt_obj.current_system_y``.
    """
    # A missing, None or empty group all mean "nothing to append"
    requested = output_groups.get('初始输出')
    if requested is None or len(requested) == 0:
        return result

    init_sys_y = []
    for y_col in requested:
        try:
            sys_y = opt_obj.current_system_y.loc[:, [y_col]]
        except KeyError as err:
            # Say which configured output is missing instead of surfacing
            # only the bare lookup failure.
            raise KeyError(f'"初始输出"中的变量{y_col}不存在于系统模型的输出中') from err
        init_sys_y.append(sys_y)

    # TODO: appending at the end may not be ideal; a configurable position
    # would be better.
    return result + init_sys_y
|