from typing import Union import pandas as pd from ._utils.config_info import ConfigInfo # 三种类型的边界 # 1. 静态边界: # - 内在的属性,应该是范围最宽的 # - 必定存在 # 2. 动态边界: # - 外在限制下的额外边界,边界两端已知 # - 非必要 # 3. 动态目标:外在限制下的额外边界,边界中心已知 # - 通常目标值会使用当前值 # - 非必要 # - 当多个变量统一优化时,这些变量应该具有相同的边界,因此不适合将当前值作为目标值 # 最终的边界由上述三种边界的交集决定,因此某个类型的边界越宽则约束越小 # 对于非必要的边界,需要存在一种动态的方法能够使其失效 # 动态边界:组件外部实现 # 动态目标:与静态范围不重叠 / 组件输入的动态目标为False def main(*args,config = None) -> list: config_info = ConfigInfo(config) syn_opt = config_info.get_property('syn_opt',default=False) # 是否统一优化 var_type = config_info.get_property('var_type',default=1) # 变量的类型 0为连续变量,1为离散变量 var_type = 0 if var_type is True else 1 var_precis = config_info.get_property(key='var_precis',default=1) # 变量的精度(精确到小数点后n位) ub_static = config['ub_static'] # 静态上边界 lb_static = config['lb_static'] # 静态下边界 check_boundary(ub_static, lb_static) ub_dyn_drift = config_info.get_property('dynamic_ub_drift',default=0) # 动态上边界偏移 lb_dyn_drift = config_info.get_property('dynamic_lb_drift',default=0) # 动态下边界偏移 adj_val = config_info.get_property('adj_val') # 基于目标边界的调整半径 cur_as_target = config_info.get_property('cur_as_target',default=False) group_info = config_info.get_io_group_info(io='in',type='data',data=args) group_id_info = config_info.get_io_group_info(io='in',type='point_id') output_id = config_info.get_io_id('out') bound_id = output_id[0] cur_val = get_cur_val( cur_val_df = group_info['当前值(必填)'], cur_val_col = group_id_info['当前值(必填)'], config_info = config_info ) ub_dynamic, lb_dynamic = get_dynamic_boundary( ub_dyn_drift, lb_dyn_drift, group_info['动态边界(选填)'][0:2] ) boundary = [] # boundary = [{'var1':{...}}, {'var2':{...}}] for cur_var_name in output_id: if syn_opt == False: bound_id = cur_var_name if cur_var_name not in cur_val.columns: raise Exception(f'组件输出的参数编号{cur_var_name}未在输入的数据中找到,输入的数据包含以下列{cur_val.columns}') # 获取动态目标边界 ub_trg, lb_trg = get_target_boundary( adj_val = adj_val, cur_as_target = cur_as_target, cur_var_value = cur_val.loc[:,cur_var_name].iloc[0], ub_static = ub_static, lb_static = lb_static, syn_opt = syn_opt, input_trg = group_info['动态边界(选填)'][2] ) # 多种类型的边界取交集 ub = min(ub_static,ub_dynamic,ub_trg) lb = max(lb_static,lb_dynamic,lb_trg) check_boundary(ub,lb) boundary_info = { 'id' : bound_id, # 标记"组" 来源于哪个边界组件 'var_name' : cur_var_name, # 标记组内名称(优化变量名称) 'ub' : ub, # 组内相同 'lb' : lb, # 组内相同 'var_type' : var_type, # 组内相同 'var_precis' : var_precis, # 组内相同 'boundary_type': 'interval', # 组内相同 'cur_val' : cur_val.loc[:,[cur_var_name]] # 组内不同 } boundary.append(boundary_info) return boundary if len(boundary)>1 else boundary[0] def get_cur_val( cur_val_df:list, cur_val_col:list, config_info ) -> pd.DataFrame: # 检验组件输入的数据类型是否为DataFrame for cur_id,cur_df in zip(cur_val_col,cur_val_df): if not isinstance(cur_df,pd.DataFrame): cur_df_type = type(cur_df) raise Exception(f'组件的输入({cur_id})不是DataFrame,是{cur_df_type}({cur_df})') cur_val = pd.concat(cur_val_df,axis=1) cur_val.columns = cur_val_col # 校验组件输入的当前值与组件的输出是否一致 if cur_val.columns.to_list() != config_info.get_io_id(io='out'): raise Exception(f'组件输入的当前值与组件输出不匹配,组件的当前值为:{cur_val.columns.to_list()},组件的输出为:{config_info.get_io_id(io="out")}') return cur_val def get_dynamic_boundary(ub_dynamic_drift,lb_dynamic_drift,dyn_input): for df in dyn_input: if (not isinstance(df,pd.DataFrame)) and (df is not None): raise Exception(f'动态边界的输入必须是DataFrame或None,获取到的输入是{type(df)}') ub_input = dyn_input[0].iat[0,0] if isinstance(dyn_input[0],pd.DataFrame) else None lb_input = dyn_input[1].iat[0,0] if isinstance(dyn_input[1],pd.DataFrame) else None ub_dynamic = ub_input + ub_dynamic_drift if ub_input is not None else 1e100 lb_dynamic = lb_input + lb_dynamic_drift if lb_input is not None else -1e100 check_boundary(ub_dynamic,lb_dynamic) return ub_dynamic,lb_dynamic def get_target_boundary( adj_val : float, cur_as_target: bool, cur_var_value: float, ub_static : float, lb_static : float, syn_opt : bool, input_trg : Union[pd.DataFrame,bool] ): # 基于目标值调整的边界调整 default_ub_trg = 1e100 default_lb_trg = -1e100 # 该边界失效的情况 # 1. 没有输入adj_val # 2. 目标边界与静态边界完全不重叠 # 3. 输入的目标值为False if adj_val is None: return default_ub_trg, default_lb_trg if isinstance(input_trg,bool) and input_trg == False: return default_ub_trg, default_lb_trg if cur_as_target == True and syn_opt == True: raise Exception('当开启统一优化时,不能将当前变量设为目标变量,需要额外指定目标变量') # 获取目标值 if cur_as_target == True: target_val = cur_var_value elif cur_as_target == False and isinstance(input_trg,pd.DataFrame) and len(input_trg)==1: target_val = input_trg.iat[0,0] else: raise Exception(f'输入的动态目标值有误:{input_trg}') ub_trg = target_val + adj_val lb_trg = target_val - adj_val check_boundary(ub_trg,lb_trg) if (ub_trg < lb_static) or (lb_trg > ub_static): return default_ub_trg, default_lb_trg return ub_trg,lb_trg def check_boundary(ub,lb): if ub < lb: raise ValueError(f'变量的上限({ub})<下限({lb}),需检查设定的边界值范围')