"""Training entry point for the DHU_AB equipment models.

For every equipment listed in the Excel configuration this module downloads
historical data, cleans it, optionally fits and saves a ``DHU_AB`` model, and
optionally pushes the model to the platform.
"""
import os
from datetime import datetime
from pathlib import Path
from pprint import pprint

import pandas as pd

from ...model.DHU.DHU_AB import DHU_AB
from .config_reader import ConfigReader
from ...tools.data_loader import DataLoader

# Import-time timestamp truncated to the minute.
NOW = datetime.now().replace(second=0, microsecond=0)
# Directory containing this file, with forward slashes on every platform.
PATH = os.path.dirname(os.path.realpath(__file__)).replace('\\', '/')
MODEL_FUNC_PATH = f'{PATH}/model_func.py'
MODEL_FILE_PATH = './model.pkl'


def _build_model_update_info(monitor_point):
    """Build the per-point info dict passed to ``save_to_platform``.

    Args:
        monitor_point: Table with the columns ``编号``, ``名称`` and
            ``指标MAE``; one row per monitored point.

    Returns:
        dict keyed by the ``编号`` value of each row. If the same ``编号``
        appears more than once, the last row wins.
    """
    point_ids = monitor_point['编号'].to_numpy()
    point_names = monitor_point['名称'].to_numpy()
    mae_limits = monitor_point['指标MAE'].to_numpy()
    return {
        point_id: {
            'point_id': point_id,
            'point_name': point_name,
            # NOTE(review): 'point_class' is filled with the point id, exactly
            # as the original code did -- confirm this is intended.
            'point_class': point_id,
            'thre_mae': mae_limit,
            'thre_mape': 1,
            'thre_days': 7,
        }
        for point_id, point_name, mae_limit
        in zip(point_ids, point_names, mae_limits)
    }


def train(*inputs, config=None):
    """Download data, clean it, fit and publish a model for each equipment.

    Failures are recorded per equipment and per stage (``Data``, ``Fit``,
    ``Save``) instead of aborting the run; the collected exceptions are
    pretty-printed once all equipment has been processed.

    Args:
        *inputs: Accepted but not used by this function.
        config: Optional mapping. When it contains the key ``'__LOCAL'``,
            ``config['__LOCAL']`` is used as the working directory,
            ``config['__URL']`` as the data URL, and TVP plots are written.
            Otherwise the built-in directory and URL are used and no plots
            are produced.

    Returns:
        None.
    """
    config = {} if config is None else config

    if '__LOCAL' in config:
        config_reader_path = config['__LOCAL']
        data_URL = config['__URL']
        plot_metric = True
    else:
        config_reader_path = '/mnt/workflow_data'
        data_URL = 'http://basedataportal-svc:8080/data/getpointsdata'
        plot_metric = False

    config_reader = ConfigReader(path=f'{config_reader_path}/DHU_AB配置.xlsx')

    # Exceptions collected per stage, keyed by equipment name.
    ALL_RESULT = {
        'EXCEPTION': {
            'Data': {},
            'Fit': {},
            'Save': {},
        }
    }

    for each_eaup_name in config_reader.all_equp_names:
        equp_type = config_reader.get_equp_info(
            each_eaup_name, key='设备类型', info_type='str'
        )

        # ---- Fetch data ----
        try:
            # Some equipment does not need every point listed in the point
            # table: drop the points of air inlets that do not exist.
            rm_point_name = []
            if not config_reader.get_equp_info(each_eaup_name, '存在回风口', 'bool'):
                rm_point_name += ['mixed_1_TinM', 'mixed_1_DinM']
            if not config_reader.get_equp_info(each_eaup_name, '存在补风口', 'bool'):
                rm_point_name += ['mixed_2_TinM', 'mixed_2_DinM']

            # Fetch historical data.
            data_loader = DataLoader(
                path=f'{config_reader_path}/data/train/data_his/',
                start_time=config_reader.get_app_info(
                    each_eaup_name, app_type='模型训练', key='开始时间',
                    info_type='datetime',
                ),
                end_time=config_reader.get_app_info(
                    each_eaup_name, app_type='模型训练', key='结束时间',
                    info_type='datetime',
                ),
                print_process=config_reader.get_app_info(
                    each_eaup_name, app_type='模型训练', key='打印取数日志',
                    info_type='bool',
                ),
            )
            data_loader.download_equp_data(
                equp_name=each_eaup_name,
                point=config_reader.get_equp_point(
                    each_eaup_name, equp_type, equp_class=['A', 'B']
                ),
                url=data_URL,
                clean_cache=False,
                rm_point_name=rm_point_name,
            )
            equp_data = data_loader.get_equp_data(each_eaup_name)
            save_data(
                f'{config_reader_path}/data/train/data_his_raw',
                f'{each_eaup_name}.pkl',
                equp_data,
            )
        except Exception as E:
            # Record the failure and move on to the next equipment.
            ALL_RESULT['EXCEPTION']['Data'][each_eaup_name] = E
            continue

        # ---- Train the model ----
        try:
            equp_model = DHU_AB(
                DHU_type=equp_type,
                exist_Fa_H=config_reader.get_equp_info(
                    each_eaup_name, '存在回风口', 'bool'
                ),
                exist_Fa_B=config_reader.get_equp_info(
                    each_eaup_name, '存在补风口', 'bool'
                ),
                other_info={
                    'heatingcoil_1_Fs_rated': config_reader.get_equp_info(
                        each_eaup_name, '前蒸汽盘管额定流量', 'float'
                    ),
                    'heatingcoil_2_Fs_rated': config_reader.get_equp_info(
                        each_eaup_name, '后蒸汽盘管额定流量', 'float'
                    ),
                },
            )

            # Clean the data.
            Path(f'{config_reader_path}/data/train/clean_log/').mkdir(
                parents=True, exist_ok=True
            )
            equp_data = equp_model.clean_data(
                data=equp_data,
                data_type=['input', 'observed'],
                print_process=True,
                fill_zero=False,
                save_log=f'{config_reader_path}/data/train/clean_log/{each_eaup_name}.txt',
            )
            # Hourly means; rows with missing values are dropped.
            equp_data = equp_data.resample('60min').mean().dropna()
            save_data(
                f'{config_reader_path}/data/train/data_his_clean',
                f'{each_eaup_name}.pkl',
                equp_data,
            )

            # Cleaning always runs; fitting is skipped when disabled in config.
            if not config_reader.get_app_info(
                each_eaup_name, '模型训练', '训练模型', 'bool'
            ):
                continue

            equp_model.fit(
                input_data=equp_data,
                observed_data=equp_data,
                plot_TVP=False,
                rw_FA_val=config_reader.get_app_info(
                    each_eaup_name, '模型训练', '新风阀门开度参数', 'bool'
                ),
            )

            Path(f'{config_reader_path}/model').mkdir(parents=True, exist_ok=True)
            equp_model.save(f'{config_reader_path}/model/{each_eaup_name}.pkl')
            save_data(
                f'{config_reader_path}/data/train/data_TVP',
                f'{each_eaup_name}.csv',
                equp_model.TVP_data,
            )
            save_data(
                f'{config_reader_path}/data/train/data_metric',
                f'{each_eaup_name}.csv',
                equp_model.TVP_metric.round(2),
            )

            if plot_metric:
                path = f'{config_reader_path}/plot/TVP'
                Path(path).mkdir(parents=True, exist_ok=True)
                equp_model.plot_TVP(
                    equp_model.TVP_data,
                    save_path=f'{path}/{each_eaup_name}.png',
                )
        except Exception as E:
            ALL_RESULT['EXCEPTION']['Fit'][each_eaup_name] = E
            continue

        # ---- Model iteration (publish to the platform) ----
        if not config_reader.get_app_info(
            each_eaup_name, '模型训练', '迭代模型', 'bool'
        ):
            continue

        try:
            # Only points whose 类型 column equals 'B' are monitored.
            monitor_point = config_reader.point.loc[lambda dt: dt.类型 == 'B']
            model_update_info = _build_model_update_info(monitor_point)

            equp_model.save_to_platform(
                version_id=datetime.now().strftime('%Y%m'),
                model_id=config_reader.get_equp_info(
                    each_eaup_name, '模型编号', 'str'
                ),
                update_method='update',
                model_info=model_update_info,
                MODEL_FILE_PATH=MODEL_FILE_PATH,
                MODEL_FUNC_PATH=MODEL_FUNC_PATH,
            )
        except Exception as E:
            ALL_RESULT['EXCEPTION']['Save'][each_eaup_name] = E
            continue

    # NOTE(review): the original layout was flattened; this summary is assumed
    # to run once after the loop rather than per equipment -- confirm.
    pprint(ALL_RESULT)


def save_data(dir, file: str, data: pd.DataFrame) -> None:
    """Write *data* to ``dir/file``, creating *dir* if necessary.

    The format is chosen from the file extension: ``.csv`` is written with
    the index included, ``.pkl`` is written as a pickle.

    Args:
        dir: Target directory (created with parents if it does not exist).
            The name shadows the builtin but is kept for keyword callers.
        file: File name ending in ``.csv`` or ``.pkl``.
        data: Frame to persist.

    Raises:
        ValueError: If *file* has any other extension. The directory has
            already been created at that point.
    """
    Path(dir).mkdir(parents=True, exist_ok=True)
    target = os.path.join(dir, file)

    if file.endswith('.csv'):
        data.to_csv(target, index=True)
    elif file.endswith('.pkl'):
        data.to_pickle(target)
    else:
        raise ValueError('file type error')