optimizer.py 3.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111
  1. import os
  2. import sys
  3. import time
  4. import warnings
  5. from contextlib import contextmanager
  6. import pandas as pd
  7. from ..model._base._base_device import BaseDevice
  8. from .._opt.algorithm.sim_config import simulate_config as sim_opt_config
  9. from .._opt.algorithm.main import main as main_opt
  10. from .._opt.boundary.sim_config import simulate_config as sim_config_bound
  11. from .._opt.boundary.main import main as main_bound
  12. from .._opt.algorithm.model.model import SystemModel
  13. def optimizer(
  14. model : BaseDevice,
  15. opt_var_boundary: dict,
  16. opt_var_value : pd.DataFrame,
  17. oth_var_value : pd.DataFrame,
  18. target : str,
  19. target_min : bool,
  20. constrains : list,
  21. other_kwargs : dict = None,
  22. logging : bool = True
  23. ):
  24. if other_kwargs is None:
  25. other_kwargs = {}
  26. var_name_opt = []
  27. boundary_info = {}
  28. for var_name,var_info in opt_var_boundary.items():
  29. var_name_opt.append(var_name)
  30. boundary_info[var_name] = main_bound(
  31. None,None,None,
  32. opt_var_value.iloc[[0],:].loc[:,[var_name]],
  33. config = sim_config_bound(
  34. opt_var = [var_name],
  35. syn_opt = False,
  36. var_type = True,
  37. lb_static = var_info['lb'],
  38. ub_static = var_info['ub'],
  39. var_precis = var_info.get('precis',1)
  40. )
  41. )
  42. var_name_oth = []
  43. oth_var_info = {}
  44. for var_name in oth_var_value.columns:
  45. var_name_oth.append(var_name)
  46. oth_var_info[var_name] = oth_var_value.iloc[[0],:].loc[:,[var_name]]
  47. opt_config = sim_opt_config(
  48. target = target,
  49. dir_min = target_min,
  50. var_opt = var_name_opt,
  51. var_sys = var_name_oth,
  52. diag_model = False,
  53. algorithm = 'soea_DE_best_1_L_templet',
  54. NIND = other_kwargs.get('NIND',1000),
  55. MAXGEN = other_kwargs.get('MAXGEN',100),
  56. constrains = constrains,
  57. allow_neg_opt = False,
  58. )
  59. with suppress_output(show=logging):
  60. opt_output = main_opt(
  61. *list(boundary_info.values()),
  62. *list(oth_var_info.values()),
  63. system_model = AirSystem(model=model),
  64. config = opt_config
  65. )
  66. return {
  67. 'opt_summary': opt_output[:3],
  68. 'opt_var' : opt_output[3:],
  69. }
  70. class AirSystem(SystemModel):
  71. def __init__(self,model):
  72. super().__init__()
  73. self.model = model
  74. def predict(self,data:pd.DataFrame) -> pd.DataFrame:
  75. time_start = time.time()
  76. self.PENALTY = {}
  77. self.index = data.index
  78. with warnings.catch_warnings():
  79. warnings.simplefilter("ignore",category=RuntimeWarning)
  80. sys_out = self.model.predict_system(data)
  81. time_end = time.time()
  82. past_time = round(time_end-time_start,2)
  83. self.PAST_TIME.append(past_time)
  84. # print(f'第{len(self.PAST_TIME)}次调用系统模型,本次调用时长为:{past_time}秒 \n')
  85. return sys_out
  86. @contextmanager
  87. def suppress_output(show=True):
  88. """
  89. 上下文管理器,根据参数控制是否显示输出
  90. 参数:
  91. show (bool): 如果为True则显示输出,False则隐藏输出
  92. """
  93. if not show:
  94. original_stdout = sys.stdout
  95. sys.stdout = open(os.devnull, 'w')
  96. try:
  97. yield
  98. finally:
  99. if not show:
  100. sys.stdout = original_stdout