import json
from datetime import datetime, timedelta

from workflowlib import requests
from workflow_utils import update_model_version_v3


def update(
        version_id: int,
        model_id: str,
        model_info: dict,
        update_method: str,
        MODEL_FUNC_PATH: str,
        MODEL_FILE_PATH: str
):
    """Register the model's points and upload a new model version if required.

    Args:
        version_id: ID of the new version to upload. Also used as the version
            to read monitoring metrics from when the model's version list is
            empty.
        model_id: ID of the model being updated.
        model_info: Per-target configuration keyed by target name, e.g.::

                {
                    'E': {
                        'metric': {'MAE': ..., 'MAPE': ...},
                        'point_id': 'abc',
                        'point_name': 'abc',
                        'point_class': 'abc',
                        'thre_mae': '123',
                        'thre_mape': '123',
                        'thre_days': '123',
                    },
                    'AP': {...same keys...},
                }

        update_method: ``'any_metric'`` uploads only when a monitored metric
            exceeded its threshold within the configured number of days;
            ``'update'`` uploads unconditionally.
        MODEL_FUNC_PATH: Path of the model function file to upload.
        MODEL_FILE_PATH: Path of the model file to upload.

    Returns:
        None.

    Raises:
        ValueError: If ``update_method`` is neither ``'any_metric'`` nor
            ``'update'``.
    """
    factors = ['MAE', 'MAPE']  # metric names that are monitored

    # Register the points of every target on the model.
    points_set_dict = {
        key: {
            'point_id': value['point_id'],
            'point_name': value['point_name'],
            'point_class': value['point_class'],
        }
        for key, value in model_info.items()
    }
    update_model_points(points_set_dict, model_id)

    # Fetch the model details; the device name goes into the upload log.
    model_info_res = get_model_info(model_id=model_id)
    device_name = model_info_res['device_name']

    # Metrics obtained from training, written to the upload log.
    metrics_log = {key: value['metric'] for key, value in model_info.items()}

    # Per-target point ID and metric thresholds.
    points_metrics = {
        key: {
            'point_id': value['point_id'],
            'MAE': value['thre_mae'],
            'MAPE': value['thre_mape'],
        }
        for key, value in model_info.items()
    }
    metric_json = get_metrics_json(points_metrics)

    # Per-target look-back window in days, e.g. {'E': '123', 'AP': '123'}.
    thre_days_dict = {key: value['thre_days'] for key, value in model_info.items()}

    if update_method == 'any_metric':
        print("【更新模式:基于监控指标进行更新】")
        # Monitoring metrics are read from the first version in the model's
        # version list; fall back to version_id when the list is empty/None.
        all_old_version_id = model_info_res['version_list'] or []
        old_version_id = next((item['id'] for item in all_old_version_id), version_id)

        update_needed = False
        for key, days in thre_days_dict.items():
            print("{:=^50s}".format(f"目标变量 {key} "))
            print(f"处理目标变量 {key} 的过去时间:")
            past_times_list = get_past_times(days)

            # Only this target's point is evaluated below, so only request
            # that point instead of every point for every target.
            target_point_id = points_metrics[key]['point_id']
            monitor_results = get_monitor_metric(
                past_times_list, old_version_id, factors, [target_point_id]
            )

            is_update_needed = check_threshold_update_model(monitor_results, points_metrics[key])
            print(f"目标 {key} 是否需要更新模型:{is_update_needed}")

            # Every target is still evaluated (and logged) even after one
            # has already triggered the update.
            if is_update_needed:
                update_needed = True

        if not update_needed:
            print("【所有目标变量的所有指标均未超出阈值,不需要更新模型】")
            return None

        print("【存在目标变量指标超出阈值,需要更新模型】")
        version_update(model_id, MODEL_FUNC_PATH, MODEL_FILE_PATH, metric_json, version_id,
                       device_name, metrics_log)

    elif update_method == 'update':
        print("【更新模式:强制更新】")
        version_update(model_id, MODEL_FUNC_PATH, MODEL_FILE_PATH, metric_json, version_id,
                       device_name, metrics_log)
    else:
        raise ValueError(
            f"Unsupported update_method {update_method!r}; expected 'any_metric' or 'update'"
        )


def get_model_info(model_id):
    """Return the ``result`` payload of the model details endpoint.

    Args:
        model_id: ID of the model to look up.

    Returns:
        The value stored under ``'result'`` in the JSON response.
    """
    res = requests.get(url=f"http://m2-backend-svc:8000/api/ai/model/get_details/{model_id}")
    return res.json()['result']


def get_past_times(thre_days: int) -> list:
    """Return midnight timestamps for each of the past ``thre_days`` days.

    Args:
        thre_days: Number of days to look back. It is passed through
            ``int()``, so a numeric string is accepted as well.

    Returns:
        list: Naive ``datetime`` objects at 00:00:00, ordered from yesterday
        backwards. Empty when ``thre_days`` is zero or negative.
    """
    thre_days = int(thre_days)
    today_midnight = datetime.now().replace(hour=0, minute=0, second=0, microsecond=0)
    past_times = [today_midnight - timedelta(days=day) for day in range(1, thre_days + 1)]
    print(f"根据配置 {thre_days} 天计算的时间:", past_times)
    return past_times


def get_monitor_metric(past_times: list, version_id: str, factors: list, point_ids: list) -> dict:
    """Fetch daily monitoring metrics for the given points and factors.

    One request is sent per (time, point, factor) combination.

    Args:
        past_times: ``datetime`` objects; each is used as both the begin and
            the end of the requested range.
        version_id: ID of the model version whose metrics are read.
        factors: Metric names, e.g. ``['MAE', 'MAPE']``.
        point_ids: Point IDs of the target variables, e.g. ``['_E', '_AP']``.

    Returns:
        dict: Values grouped by point and factor, in ``past_times`` order::

            {point_id: {'MAE': [v1, v2, ...], 'MAPE': [v1, v2, ...]}}

        A 0 is recorded whenever a request fails or returns no results.
    """
    url = "http://m2-backend-svc:8000/api/ai/monitor/get_single_factor_sequence"
    monitor_metric = {}
    for time_point in past_times:
        formatted_time = time_point.strftime("%Y-%m-%d %H:%M:%S")
        for point_id in point_ids:
            point_metrics = monitor_metric.setdefault(point_id, {})
            for factor in factors:
                values = point_metrics.setdefault(factor, [])
                data = {
                    "factor": factor,
                    "point_id": point_id,
                    "time_begin": formatted_time,
                    "time_end": formatted_time,
                    "version_id": version_id,
                    "type": "DAILY"
                }
                try:
                    response = requests.post(url=url, data=json.dumps(data))
                    response.raise_for_status()
                    result = response.json().get('results', [])
                    # The metric value is taken from result[0][1];
                    # an empty result is recorded as 0.
                    values.append(result[0][1] if result else 0)
                except Exception as e:
                    # Best effort: a failed request must not abort the whole
                    # run, so the failure is logged and recorded as 0.
                    print(f"Error fetching data for {factor}, {point_id}, {formatted_time}: {e}")
                    values.append(0)
    return dict(monitor_metric)


def check_threshold_update_model(monitor_metric, points_metrics):
    """Tell whether any monitored value of a point exceeds its threshold.

    Args:
        monitor_metric (dict): Monitoring data as returned by
            ``get_monitor_metric``: ``{point_id: {factor: [values]}}``.
        points_metrics (dict): Point ID and thresholds of one target, e.g.
            ``{'point_id': 'abc', 'MAE': '123', 'MAPE': '123'}``.

    Returns:
        bool: True if at least one non-None value of any factor is greater
        than that factor's threshold; False otherwise, including when the
        point has no monitoring data.
    """
    point_id = points_metrics['point_id']
    any_greater = {}
    if point_id in monitor_metric:
        print(f"目标点位 {point_id} 的监控指标结果:{monitor_metric[point_id]}")
        for factor, values in monitor_metric[point_id].items():
            # A factor without a configured threshold can never be exceeded.
            threshold = float(points_metrics.get(factor, float('inf')))
            any_greater[factor] = any(
                value > threshold for value in values if value is not None
            )
    any_true_greater = any(any_greater.values())
    print("监控模型指标是否超出阈值: ", any_greater)
    return any_true_greater


def get_metrics_json(points_metrics):
    """Build the per-point factor configuration list for a version upload.

    Args:
        points_metrics (dict): Mapping of target name to a dict holding
            ``'point_id'`` and optional ``'MAE'`` / ``'MAPE'`` thresholds.

    Returns:
        list: One ``{"factors": [...], "point_id": ...}`` entry per target.
        Only the MAE and MAPE factors receive an ``upr_limit`` (the threshold
        converted to float); every other limit and trained value is None.
    """
    def _upper_limit(raw):
        # Thresholds may be configured as strings; None means "no limit".
        return float(raw) if raw is not None else None

    metric_json = []
    for metrics in points_metrics.values():
        thre_mae = metrics.get('MAE', None)
        thre_mape = metrics.get('MAPE', None)
        factors = [
            {"factor": "MAE", "lwr_limit": None, "upr_limit": _upper_limit(thre_mae),
             "trained_value": None},
            {"factor": "MBE", "lwr_limit": None, "upr_limit": None, "trained_value": None},
            {"factor": "MSE", "lwr_limit": None, "upr_limit": None, "trained_value": None},
            {"factor": "MdAE", "lwr_limit": None, "upr_limit": None, "trained_value": None},
            {"factor": "std_MAE", "lwr_limit": None, "upr_limit": None, "trained_value": None},
            {"factor": "MAPE", "lwr_limit": None, "upr_limit": _upper_limit(thre_mape),
             "trained_value": None},
            {"factor": "std_MAPE", "lwr_limit": None, "upr_limit": None, "trained_value": None},
        ]
        metric_json.append({"factors": factors, "point_id": metrics.get("point_id")})
    return metric_json


def version_update(model_id, mod_func_path, mod_file_path, metric_json, new_version_id,
                   device_name, metrics_log):
    """Upload a new model version and write an operation log entry.

    Args:
        model_id: ID of the model.
        mod_func_path: Path of the model function file.
        mod_file_path: Path of the model file.
        metric_json: Factor configuration as built by ``get_metrics_json``.
        new_version_id: ID of the new version.
        device_name: Device name, included in the log text.
        metrics_log: Trained metrics, included in the log text.

    Returns:
        None.
    """
    filename_list = [
        {"filename": mod_file_path},
        {"filename": mod_func_path}
    ]
    # Upload the new model version.
    update_model_version_v3(model_id, new_version_id, filename_list, workflow_id=None,
                            factors=metric_json)

    # Record the upload in the operation log.
    device_name_value = f"设备名称:{device_name}, 模型文件:{model_id}, 更新后的指标:{metrics_log}"
    requests.post(
        "http://m2-backend-svc:8000/api/ai/sys_opt_log/create_one",
        json={
            "上传日志": "上传日志",
            "user_id": 10,
            "type": "模型自动迭代操作",
            "log": device_name_value
        }
    )
    return


def update_model_points(points_set_dict, model_id):
    """Merge the given points into the model's point list and save it.

    The current model details are fetched, the point list under
    ``device_data`` is updated and the whole details payload is posted back.
    A point whose ``point_class`` already exists only has its ``point_id``
    replaced; other points are appended.

    Any exception is caught and printed, so a failure here does not stop
    the caller.

    Args:
        points_set_dict: Mapping of target name to a dict with
            ``'point_id'``, ``'point_name'`` and ``'point_class'``.
        model_id: ID of the model.

    Returns:
        None.
    """
    url = f"http://m2-backend-svc:8000/api/ai/model/get_details/{model_id}"
    update_url = f"http://m2-backend-svc:8000/api/ai/model/update_info/{model_id}"

    try:
        print("{:=^50s}".format("设置模型文件点位"))
        r = requests.get(url=url)
        print(f"上传模型点位请求响应:{r}。")
        r.raise_for_status()

        result = r.json().get('result', {})
        device_data = result.get('device_data', {})
        device_id = result.get('device_id', {})

        points = [
            {
                "point_id": value['point_id'],
                "point_class": value['point_class'],
                "name": value['point_name'],
                "device_id": device_id
            }
            for value in points_set_dict.values()
        ]

        if device_data['point_list'] is None:
            device_data['point_list'] = points
        else:
            for point in points:
                existing_point = next(
                    (p for p in device_data['point_list']
                     if p['point_class'] == point['point_class']),
                    None
                )
                if existing_point:
                    existing_point['point_id'] = point['point_id']
                    print(f"点位 {point['point_class']} 已存在,更新 point_id 为 {point['point_id']}。")
                else:
                    device_data['point_list'].append(point)
                    print(f"添加新点位 {point}。")

        # Post the full (modified) details payload back.
        update_r = requests.post(update_url, json=result)
        if update_r.status_code == 200:
            print('模型点位保存成功!')
        else:
            print('保存模型点位时出错:', update_r.status_code)

    except Exception as e:
        # Deliberate best effort: report the problem and carry on.
        print(f"请求错误:{e}")
    return