"""Training entry point for DHU equipment models.

For every equipment listed in the configuration workbook this module downloads
historical point data, resamples it to hourly means, fits the DHU model class
selected by the workbook, writes the fitted model and its diagnostics to disk
and, when the workbook's save flag is set, calls the model's
``save_to_platform``.
"""
import os
from datetime import datetime
from pathlib import Path
from pprint import pprint

import pandas as pd

from ...model.DHU.DHU_A import DHU_A
from ...model.DHU.DHU_B import DHU_B
from ...tools.config_reader import ConfigReader
from ...tools.data_loader import DataLoader

# Current time truncated to the minute (not referenced elsewhere in this file).
NOW = datetime.now().replace(second=0, microsecond=0)
# Directory containing this file, with Windows separators normalised to '/'.
PATH = os.path.dirname(os.path.realpath(__file__)).replace('\\', '/')
# Passed to `save_to_platform` alongside the model.
MODEL_FUNC_PATH = f'{PATH}/model_func.py'
# NOTE(review): fitted models are saved to `<root>/model/<equp>.pkl` in train(),
# yet this relative path is what `save_to_platform` receives — confirm intended.
MODEL_FILE_PATH = './model.pkl'

# Defaults used when `config` carries no '__LOCAL' override.
_DEFAULT_ROOT = '/mnt/workflow_data'
_DEFAULT_DATA_URL = 'http://basedataportal-svc:8080/data/getpointsdata'

# Workbook meta field 设备类型 (device type) -> model class to train.
_MODELS = {'DHU_A': DHU_A, 'DHU_B': DHU_B}


def train(*inputs, config=None):
    """Train, save and optionally publish a model for every configured equipment.

    Args:
        *inputs: Accepted for interface compatibility; not used.
        config: Optional overrides. When it contains ``'__LOCAL'``, that value
            is used as the workflow data root and ``config['__URL']`` as the
            data service URL; otherwise the in-cluster defaults are used.

    Raises:
        NotImplementedError: If the workbook's ``设备类型`` is neither
            ``'DHU_A'`` nor ``'DHU_B'``.

    Per-equipment failures do not abort the run. Each exception is recorded
    under ``EXCEPTION`` -> ``'Data'`` / ``'Fit'`` / ``'Save'`` -> equipment
    name, that equipment is skipped, and the collected results are printed at
    the end.
    """
    config = {} if config is None else config
    if '__LOCAL' in config:
        root = config['__LOCAL']
        data_url = config['__URL']
    else:
        root = _DEFAULT_ROOT
        data_url = _DEFAULT_DATA_URL

    # The workbook name is fixed even when it describes a DHU_B device.
    config_reader = ConfigReader(path=f'{root}/DHU_A配置.xlsx')
    device_type = config_reader.meta['设备类型']
    model_cls = _MODELS.get(device_type)
    if model_cls is None:
        raise NotImplementedError(device_type)

    all_result = {'EXCEPTION': {'Data': {}, 'Fit': {}, 'Save': {}}}
    errors = all_result['EXCEPTION']

    for equp_name in config_reader.all_equp_names:
        # 1. Fetch history for the configured training window (模型训练:
        #    开始时间 / 结束时间 = start / end time), resample it, cache the result.
        try:
            data_loader = DataLoader(
                path=f'{root}/data/train/data_his/',
                start_time=config_reader.get_app_info(
                    equp_name, app_type='模型训练', key='开始时间', info_type='datetime'),
                end_time=config_reader.get_app_info(
                    equp_name, app_type='模型训练', key='结束时间', info_type='datetime'),
            )
            data_loader.dowload_equp_data(
                equp_name=equp_name,
                point=config_reader.get_equp_point(equp_name, equp_class=['A', 'B']),
                url=data_url,
                clean_cache=False,
            )
            equp_data = clean_data(data_loader.get_equp_data(equp_name))
            save_data(f'{root}/data/train/data_his_clean', f'{equp_name}.pkl', equp_data)
        except Exception as exc:
            errors['Data'][equp_name] = exc
            continue

        # 2. Fit the model and persist it together with its TVP diagnostics.
        try:
            equp_model = model_cls()
            equp_model.fit(
                input_data=equp_data,
                observed_data=equp_data,
                plot_TVP=False,
                rw_FA_val=True,  # TODO
                # Workbook flags: 存在回风口 / 存在补风口 (has return-air / make-up-air inlet).
                exist_Fa_H=config_reader.get_equp_info(equp_name, '存在回风口', 'bool'),
                exist_Fa_B=config_reader.get_equp_info(equp_name, '存在补风口', 'bool'),
            )
            Path(f'{root}/model').mkdir(parents=True, exist_ok=True)
            equp_model.save(f'{root}/model/{equp_name}.pkl')
            save_data(f'{root}/data/train/data_TVP', f'{equp_name}.csv', equp_model.TVP_data)
            save_data(f'{root}/data/train/data_metric', f'{equp_name}.csv', equp_model.TVP_metric)
        except Exception as exc:
            errors['Fit'][equp_name] = exc
            continue

        # 3. Publish only when the 保存模型 (save model) flag is set.
        # This lookup is outside any try: a failure here aborts the whole run.
        if not config_reader.get_app_info(equp_name, '模型训练', '保存模型', 'bool'):
            continue
        try:
            model_update_info = _build_model_update_info(config_reader.point)
            equp_model.save_to_platform(
                version_id=datetime.now().strftime('%Y%m'),
                model_id=config_reader.get_equp_info(equp_name, '模型编号', 'str'),
                update_method='update',
                model_info=model_update_info,
                MODEL_FILE_PATH=MODEL_FILE_PATH,
                MODEL_FUNC_PATH=MODEL_FUNC_PATH,
            )
        except Exception as exc:
            errors['Save'][equp_name] = exc

    pprint(all_result)


def _build_model_update_info(point_table: pd.DataFrame) -> dict:
    """Build the per-point threshold dict passed to ``save_to_platform``.

    Only rows whose ``类型`` (type) column equals ``'B'`` are used. Entries are
    keyed by ``编号`` (point id); a later row with the same id overwrites an
    earlier one.
    """
    # NOTE(review): this filters the whole point table, not just the current
    # equipment's points — confirm that is intended.
    monitor_point = point_table[point_table['类型'] == 'B']
    model_update_info = {}
    # .to_numpy() yields the same scalar objects the former `.iat[i]` access did.
    for point_id, point_name, mae in zip(
        monitor_point.loc[:, '编号'].to_numpy(),
        monitor_point.loc[:, '名称'].to_numpy(),
        monitor_point.loc[:, '指标MAE'].to_numpy(),
    ):
        model_update_info[point_id] = {
            'point_id': point_id,
            'point_name': point_name,
            # NOTE(review): point_class reuses the point id — confirm intended.
            'point_class': point_id,
            'thre_mae': mae,
            'thre_mape': 1,
            'thre_days': 7,
        }
    return model_update_info


def clean_data(data) -> pd.DataFrame:
    """Resample *data* into 60-minute bins, averaging the values in each bin.

    *data* must have a datetime-like index, as ``resample`` requires. Bins
    with no samples become rows of NaN.
    """
    return data.resample('60min').mean()


def save_data(dir, file: str, data: pd.DataFrame):
    """Write *data* to ``dir/file``, creating *dir* (with parents) if needed.

    The format follows *file*'s extension: ``.csv`` (index included) or
    ``.pkl``.

    Raises:
        ValueError: If *file* has any other extension. It is a subclass of
            ``Exception``, so existing ``except Exception`` handlers still
            catch it. The check runs before any directory is created.
    """
    # `dir` shadows the builtin but is kept for keyword-argument compatibility.
    if not file.endswith(('.csv', '.pkl')):
        raise ValueError(f'file type error: {file}')
    Path(dir).mkdir(parents=True, exist_ok=True)
    target = Path(dir) / file
    if file.endswith('.csv'):
        data.to_csv(target, index=True)
    else:
        data.to_pickle(target)