| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182 |
- from typing import Union
- import pandas as pd
- from ._utils.config_info import ConfigInfo
- # 三种类型的边界
- # 1. 静态边界:
- # - 内在的属性,应该是范围最宽的
- # - 必定存在
- # 2. 动态边界:
- # - 外在限制下的额外边界,边界两端已知
- # - 非必要
- # 3. 动态目标:外在限制下的额外边界,边界中心已知
- # - 通常目标值会使用当前值
- # - 非必要
- # - 当多个变量统一优化时,这些变量应该具有相同的边界,因此不适合将当前值作为目标值
- # 最终的边界由上述三种边界的交集决定,因此某个类型的边界越宽则约束越小
- # 对于非必要的边界,需要存在一种动态的方法能够使其失效
- # 动态边界:组件外部实现
- # 动态目标:与静态范围不重叠 / 组件输入的动态目标为False
- def main(*args,config = None) -> list:
-
- config_info = ConfigInfo(config)
- syn_opt = config_info.get_property('syn_opt',default=False) # 是否统一优化
- var_type = config_info.get_property('var_type',default=1) # 变量的类型 0为连续变量,1为离散变量
- var_type = 0 if var_type is True else 1
- var_precis = config_info.get_property(key='var_precis',default=1) # 变量的精度(精确到小数点后n位)
- ub_static = config['ub_static'] # 静态上边界
- lb_static = config['lb_static'] # 静态下边界
- check_boundary(ub_static, lb_static)
- ub_dyn_drift = config_info.get_property('dynamic_ub_drift',default=0) # 动态上边界偏移
- lb_dyn_drift = config_info.get_property('dynamic_lb_drift',default=0) # 动态下边界偏移
- adj_val = config_info.get_property('adj_val') # 基于目标边界的调整半径
- cur_as_target = config_info.get_property('cur_as_target',default=False)
-
- group_info = config_info.get_io_group_info(io='in',type='data',data=args)
- group_id_info = config_info.get_io_group_info(io='in',type='point_id')
- output_id = config_info.get_io_id('out')
-
- bound_id = output_id[0]
-
- cur_val = get_cur_val(
- cur_val_df = group_info['当前值(必填)'],
- cur_val_col = group_id_info['当前值(必填)'],
- config_info = config_info
- )
-
- ub_dynamic, lb_dynamic = get_dynamic_boundary(
- ub_dyn_drift,
- lb_dyn_drift,
- group_info['动态边界(选填)'][0:2]
- )
-
- boundary = [] # boundary = [{'var1':{...}}, {'var2':{...}}]
- for cur_var_name in output_id:
-
- if syn_opt == False:
- bound_id = cur_var_name
-
- if cur_var_name not in cur_val.columns:
- raise Exception(f'组件输出的参数编号{cur_var_name}未在输入的数据中找到,输入的数据包含以下列{cur_val.columns}')
-
- # 获取动态目标边界
- ub_trg, lb_trg = get_target_boundary(
- adj_val = adj_val,
- cur_as_target = cur_as_target,
- cur_var_value = cur_val.loc[:,cur_var_name].iloc[0],
- ub_static = ub_static,
- lb_static = lb_static,
- syn_opt = syn_opt,
- input_trg = group_info['动态边界(选填)'][2]
- )
-
- # 多种类型的边界取交集
- ub = min(ub_static,ub_dynamic,ub_trg)
- lb = max(lb_static,lb_dynamic,lb_trg)
- check_boundary(ub,lb)
- boundary_info = {
- 'id' : bound_id, # 标记"组" 来源于哪个边界组件
- 'var_name' : cur_var_name, # 标记组内名称(优化变量名称)
- 'ub' : ub, # 组内相同
- 'lb' : lb, # 组内相同
- 'var_type' : var_type, # 组内相同
- 'var_precis' : var_precis, # 组内相同
- 'boundary_type': 'interval', # 组内相同
- 'cur_val' : cur_val.loc[:,[cur_var_name]] # 组内不同
- }
-
- boundary.append(boundary_info)
-
- return boundary if len(boundary)>1 else boundary[0]
- def get_cur_val(
- cur_val_df:list,
- cur_val_col:list,
- config_info
- ) -> pd.DataFrame:
- # 检验组件输入的数据类型是否为DataFrame
-
- for cur_id,cur_df in zip(cur_val_col,cur_val_df):
- if not isinstance(cur_df,pd.DataFrame):
- cur_df_type = type(cur_df)
- raise Exception(f'组件的输入({cur_id})不是DataFrame,是{cur_df_type}({cur_df})')
-
- cur_val = pd.concat(cur_val_df,axis=1)
- cur_val.columns = cur_val_col
-
- # 校验组件输入的当前值与组件的输出是否一致
- if cur_val.columns.to_list() != config_info.get_io_id(io='out'):
- raise Exception(f'组件输入的当前值与组件输出不匹配,组件的当前值为:{cur_val.columns.to_list()},组件的输出为:{config_info.get_io_id(io="out")}')
-
- return cur_val
- def get_dynamic_boundary(ub_dynamic_drift,lb_dynamic_drift,dyn_input):
- for df in dyn_input:
- if (not isinstance(df,pd.DataFrame)) and (df is not None):
- raise Exception(f'动态边界的输入必须是DataFrame或None,获取到的输入是{type(df)}')
-
- ub_input = dyn_input[0].iat[0,0] if isinstance(dyn_input[0],pd.DataFrame) else None
- lb_input = dyn_input[1].iat[0,0] if isinstance(dyn_input[1],pd.DataFrame) else None
-
- ub_dynamic = ub_input + ub_dynamic_drift if ub_input is not None else 1e100
- lb_dynamic = lb_input + lb_dynamic_drift if lb_input is not None else -1e100
-
- check_boundary(ub_dynamic,lb_dynamic)
-
- return ub_dynamic,lb_dynamic
- def get_target_boundary(
- adj_val : float,
- cur_as_target: bool,
- cur_var_value: float,
- ub_static : float,
- lb_static : float,
- syn_opt : bool,
- input_trg : Union[pd.DataFrame,bool]
- ):
- # 基于目标值调整的边界调整
-
- default_ub_trg = 1e100
- default_lb_trg = -1e100
-
- # 该边界失效的情况
- # 1. 没有输入adj_val
- # 2. 目标边界与静态边界完全不重叠
- # 3. 输入的目标值为False
-
- if adj_val is None:
- return default_ub_trg, default_lb_trg
-
- if isinstance(input_trg,bool) and input_trg == False:
- return default_ub_trg, default_lb_trg
-
- if cur_as_target == True and syn_opt == True:
- raise Exception('当开启统一优化时,不能将当前变量设为目标变量,需要额外指定目标变量')
-
- # 获取目标值
- if cur_as_target == True:
- target_val = cur_var_value
- elif cur_as_target == False and isinstance(input_trg,pd.DataFrame) and len(input_trg)==1:
- target_val = input_trg.iat[0,0]
- else:
- raise Exception(f'输入的动态目标值有误:{input_trg}')
-
- ub_trg = target_val + adj_val
- lb_trg = target_val - adj_val
- check_boundary(ub_trg,lb_trg)
-
- if (ub_trg < lb_static) or (lb_trg > ub_static):
- return default_ub_trg, default_lb_trg
-
- return ub_trg,lb_trg
- def check_boundary(ub,lb):
- if ub < lb:
- raise ValueError(f'变量的上限({ub})<下限({lb}),需检查设定的边界值范围')
-
|