| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359 |
- import json
- from datetime import datetime, timedelta
- from workflowlib import requests
- from workflow_utils import update_model_version_v3
def update(
        version_id: int,
        model_id: str,
        model_info: dict,
        update_method: str,
        MODEL_FUNC_PATH: str,
        MODEL_FILE_PATH: str
):
    """Register the model's target points and, if required, upload a new version.

    :param version_id: id of the new version to upload; also used as the
        fallback "old" version id when the model has no version list entries.
    :param model_id: id of the model being updated.
    :param model_info: per-target configuration keyed by target name, e.g.
        {
            'E': {
                'metric': {'MAE': ..., 'MAPE': ...},   # metrics after training
                'point_id': 'abc',
                'point_name': 'abc',
                'point_class': 'abc',
                'thre_mae': '123',    # MAE threshold
                'thre_mape': '123',   # MAPE threshold
                'thre_days': '123',   # number of past days to inspect
            },
            'AP': {...same keys...},
        }
    :param update_method: 'any_metric' uploads only when a monitored metric
        exceeded its threshold in the configured window; 'update' always uploads.
    :param MODEL_FUNC_PATH: path passed to version_update as the function file.
    :param MODEL_FILE_PATH: path passed to version_update as the model file.
    :return: None
    :raises ValueError: if update_method is neither 'any_metric' nor 'update'.
    """
    # Fail fast: reject an unknown mode before any remote state is touched.
    if update_method not in ('any_metric', 'update'):
        raise ValueError(
            f"Unsupported update_method {update_method!r}; "
            "expected 'any_metric' or 'update'"
        )

    factors = ['MAE', 'MAPE']  # metric names read from monitoring

    # Point settings to register on the model.
    points_set_dict = {
        key: {
            'point_id': value['point_id'],
            'point_name': value['point_name'],
            'point_class': value['point_class'],
        }
        for key, value in model_info.items()
    }
    update_model_points(points_set_dict, model_id)

    # Model details; the device name is only used in the upload log.
    model_info_res = get_model_info(model_id=model_id)
    device_name = model_info_res['device_name']

    # Post-training metrics, written to the upload log.
    metrics_log = {key: value['metric'] for key, value in model_info.items()}

    # Per target: its point id and the configured thresholds.
    points_metrics = {
        key: {
            'point_id': value['point_id'],
            'MAE': value['thre_mae'],
            'MAPE': value['thre_mape'],
        }
        for key, value in model_info.items()
    }
    metric_json = get_metrics_json(points_metrics)

    # Per target: how many past days of monitoring data to inspect.
    thre_days_dict = {key: value['thre_days'] for key, value in model_info.items()}

    if update_method == 'any_metric':
        print("【更新模式:基于监控指标进行更新】")
        # The first entry of the version list supplies the version whose
        # monitoring data is read; fall back to version_id when there is none.
        version_list = model_info_res['version_list'] or []
        old_version_id = next((version['id'] for version in version_list), version_id)

        any_threshold_exceeded = False
        for key, days in thre_days_dict.items():
            print("{:=^50s}".format(f"目标变量 {key} "))
            print(f"处理目标变量 {key} 的过去时间:")
            past_times_list = get_past_times(days)
            # Only this target's point is evaluated below, so only it is fetched.
            monitor_results = get_monitor_metric(
                past_times_list, old_version_id, factors, [points_metrics[key]['point_id']]
            )
            is_update_needed = check_threshold_update_model(monitor_results, points_metrics[key])
            print(f"目标 {key} 是否需要更新模型:{is_update_needed}")
            if is_update_needed:
                # Keep looping so every target's status is still reported.
                any_threshold_exceeded = True

        if not any_threshold_exceeded:
            print("【所有目标变量的所有指标均未超出阈值,不需要更新模型】")
            return None
        print("【存在目标变量指标超出阈值,需要更新模型】")
    else:
        print("【更新模式:强制更新】")

    version_update(model_id, MODEL_FUNC_PATH, MODEL_FILE_PATH, metric_json, version_id, device_name,
                   metrics_log)
def get_model_info(model_id):
    """Fetch the model details for ``model_id`` from the backend.

    Issues a GET to the ``get_details`` endpoint and returns the value stored
    under the ``'result'`` key of the JSON response.
    """
    details_url = f"http://m2-backend-svc:8000/api/ai/model/get_details/{model_id}"
    payload = requests.get(url=details_url).json()
    return payload['result']
def get_past_times(thre_days: int) -> list:
    """Return the midnight timestamps of the previous ``thre_days`` days.

    Args:
        thre_days (int): number of past days to cover; converted with ``int()``,
            so a numeric string is accepted as well.

    Returns:
        list: ``datetime`` objects at 00:00:00, starting with yesterday and
        going further back one day per element.
    """
    day_count = int(thre_days)
    reference = datetime.now()
    midnights = [
        (reference - timedelta(days=offset)).replace(hour=0, minute=0, second=0, microsecond=0)
        for offset in range(1, day_count + 1)
    ]
    print(f"根据配置 {day_count} 天计算的时间:", midnights)
    return midnights
def get_monitor_metric(past_times: list, version_id: str, factors: list, point_ids: list) -> dict:
    """Collect daily monitoring values for each point and factor.

    One POST request is sent per (time, point, factor) combination, with
    ``time_begin`` and ``time_end`` both set to the formatted time and
    ``type`` set to ``"DAILY"``.

    Args:
        past_times (list): datetime objects, one per day to query.
        version_id (str): model version id sent with every request.
        factors (list): metric names, e.g. ['MAE', 'MAPE'].
        point_ids (list): target point ids, e.g. ['_E', '_AP'].

    Returns:
        dict: values grouped by point and then by factor, one entry per
        element of ``past_times`` and in the same order:
            {
                point_id: {
                    'MAE': [value1, value2, ...],
                    'MAPE': [value1, value2, ...],
                },
            }
        An empty ``past_times`` yields an empty dict.

    Note:
        When a request fails, or the response carries no ``'results'``, the
        value 0 is stored as a placeholder so the lists stay aligned with
        ``past_times``.
        NOTE(review): a 0 placeholder can never exceed a positive threshold,
        so a failed fetch reads as "within limits" to a ``value > threshold``
        check — confirm this is intended.
    """
    url = "http://m2-backend-svc:8000/api/ai/monitor/get_single_factor_sequence"
    monitor_metric = {}
    for time_point in past_times:
        formatted_time = time_point.strftime("%Y-%m-%d %H:%M:%S")
        for point_id in point_ids:
            per_point = monitor_metric.setdefault(point_id, {})
            for factor in factors:
                series = per_point.setdefault(factor, [])
                data = {
                    "factor": factor,
                    "point_id": point_id,
                    "time_begin": formatted_time,
                    "time_end": formatted_time,
                    "version_id": version_id,
                    "type": "DAILY"
                }
                try:
                    response = requests.post(url=url, data=json.dumps(data))
                    response.raise_for_status()  # surface HTTP errors
                    result = response.json().get('results', [])
                    # The value is taken from result[0][1]; empty results become 0.
                    value = result[0][1] if result else 0
                except Exception as e:
                    # Best effort per data point: report and fall back to 0.
                    print(f"Error fetching data for {factor}, {point_id}, {formatted_time}: {e}")
                    value = 0
                series.append(value)
    return monitor_metric
def check_threshold_update_model(monitor_metric, points_metrics):
    """Tell whether any monitored metric of one point exceeded its threshold.

    Args:
        monitor_metric (dict): monitoring values as
            ``{point_id: {factor: [value, ...]}}``.
        points_metrics (dict): the point and its thresholds, e.g.
            ``{'point_id': 'abc', 'MAE': '123', 'MAPE': '123'}``. A factor
            whose threshold is missing or ``None`` has no limit.

    Returns:
        bool: True if at least one non-None value of any factor is strictly
        greater than that factor's threshold; False otherwise, including when
        the point has no monitoring data.
    """
    point_id = points_metrics['point_id']
    any_greater = {}
    if point_id in monitor_metric:
        print(f"目标点位 {point_id} 的监控指标结果:{monitor_metric[point_id]}")
        for factor, values in monitor_metric[point_id].items():
            raw_threshold = points_metrics.get(factor)
            # A missing or None threshold means "no limit" rather than a crash.
            threshold = float('inf') if raw_threshold is None else float(raw_threshold)
            any_greater[factor] = any(
                value > threshold for value in values if value is not None
            )
    print("监控模型指标是否超出阈值: ", any_greater)
    return any(any_greater.values())
def get_metrics_json(points_metrics):
    """Build the per-point factor list passed on as the version's ``factors``.

    For every entry of ``points_metrics`` one dict is produced holding the
    entry's ``point_id`` and seven factor rows. Only the MAE and MAPE rows get
    an ``upr_limit`` (the configured threshold converted to float, or None
    when it is absent); every other limit and all ``trained_value`` fields
    are None.
    """
    def _as_limit(raw):
        # None stays None; anything else must be convertible to float.
        return None if raw is None else float(raw)

    factor_order = ("MAE", "MBE", "MSE", "MdAE", "std_MAE", "MAPE", "std_MAPE")
    payload = []
    for point_cfg in points_metrics.values():
        upper_limits = {
            "MAE": _as_limit(point_cfg.get('MAE', None)),
            "MAPE": _as_limit(point_cfg.get('MAPE', None)),
        }
        rows = [
            {
                "factor": name,
                "lwr_limit": None,
                "upr_limit": upper_limits.get(name),
                "trained_value": None,
            }
            for name in factor_order
        ]
        payload.append({"factors": rows, "point_id": point_cfg.get("point_id")})
    return payload
def version_update(model_id, mod_func_path, mod_file_path, metric_json, new_version_id, device_name, metrics_log):
    """Upload a new model version and write an operation-log entry for it.

    :param model_id: id of the model.
    :param mod_func_path: path of the model function file.
    :param mod_file_path: path of the model file.
    :param metric_json: factor list passed as ``factors`` to the version upload.
    :param new_version_id: id of the new version.
    :param device_name: device name, included in the log text.
    :param metrics_log: post-training metrics, included in the log text.
    :return: None
    """
    # The model file is listed first, then the function file.
    filename_list = [
        {"filename": mod_file_path},
        {"filename": mod_func_path}
    ]
    # Upload the new model version.
    update_model_version_v3(model_id, new_version_id, filename_list, workflow_id=None, factors=metric_json)

    # Write the operation log.
    device_name_value = f"设备名称:{device_name}, 模型文件:{model_id}, 更新后的指标:{metrics_log}"
    log_response = requests.post(
        "http://m2-backend-svc:8000/api/ai/sys_opt_log/create_one",
        json={
            "上传日志": "上传日志",
            "user_id": 10,
            "type": "模型自动迭代操作",
            "log": device_name_value
        }
    )
    # The version is already uploaded at this point, so a failed log write is
    # reported rather than raised.
    if log_response.status_code != 200:
        print('上传操作日志时出错:', log_response.status_code)
    return
def update_model_points(points_set_dict, model_id):
    """Merge the given points into the model's point list and save it.

    The model details are fetched, the points are merged into
    ``result['device_data']['point_list']`` and the whole ``result`` object is
    posted back to the ``update_info`` endpoint. Any error is printed and
    swallowed, so this function never raises and always returns None.

    points_set_dict: point settings keyed by target name; every value holds
        'point_id', 'point_name' and 'point_class'.
    model_id: id of the model.
    """
    url = f"http://m2-backend-svc:8000/api/ai/model/get_details/{model_id}"
    update_url = f"http://m2-backend-svc:8000/api/ai/model/update_info/{model_id}"
    try:
        print("{:=^50s}".format("设置模型文件点位"))
        r = requests.get(url=url)
        print(f"上传模型点位请求响应:{r}。")
        r.raise_for_status()
        result = r.json().get('result') or {}
        if not result:
            # Posting an empty details object back could overwrite the model.
            print("请求错误:模型详情为空,未更新模型点位。")
            return

        # Keep device_data attached to result so the merged list is what gets
        # posted; a detached default dict would silently drop the changes.
        device_data = result.get('device_data')
        if device_data is None:
            device_data = {}
            result['device_data'] = device_data
        device_id = result.get('device_id', {})

        points = [
            {
                "point_id": value['point_id'],
                "point_class": value['point_class'],
                "name": value['point_name'],
                "device_id": device_id
            }
            for value in points_set_dict.values()
        ]

        point_list = device_data.get('point_list')
        if point_list is None:
            device_data['point_list'] = points
        else:
            for point in points:
                # Points are matched by class; a match only has its id replaced.
                existing_point = next(
                    (p for p in point_list if p.get('point_class') == point['point_class']), None)
                if existing_point is not None:
                    existing_point['point_id'] = point['point_id']
                    print(f"点位 {point['point_class']} 已存在,更新 point_id 为 {point['point_id']}。")
                else:
                    point_list.append(point)
                    print(f"添加新点位 {point}。")

        update_r = requests.post(update_url, json=result)  # save the merged details
        if update_r.status_code == 200:
            print('模型点位保存成功!')
        else:
            print('保存模型点位时出错:', update_r.status_code)
    except Exception as e:
        # Best effort: report the failure and let the caller continue.
        print(f"请求错误:{e}")
    return
|