"""Training entry point for DHU / SDHU equipment models and room models.

For every equipment listed in the Excel configuration this module downloads
history data, cleans it, fits the equipment model and the per-room models,
optionally saves diagnostic plots and optionally uploads the fitted model.
"""
import os
from datetime import datetime
from pathlib import Path
from pprint import pprint

import pandas as pd

from ...model.DHU.DHU_AB import DHU_AB
from ...model.DHU.SDHU_AB import SDHU_AB
from ...model.Room.room import RoomDewPredictor
from ...tools.data_loader import DataLoader
from .config_reader import ConfigReader

NOW = datetime.now().replace(second=0, microsecond=0)
PATH = os.path.dirname(os.path.realpath(__file__)).replace('\\', '/')
MODEL_FUNC_PATH = f'{PATH}/model_func.py'
MODEL_FILE_PATH = './model.pkl'

# Equipment type names handled by each model class.
_DHU_TYPES = ('DHU_A', 'DHU_B')
_SDHU_TYPES = ('SDHU_A', 'SDHU_B')


def _build_model_update_info(monitor_point: pd.DataFrame) -> dict:
    """Build the per-point metadata dict passed to ``save_to_platform``.

    One entry is produced per row of *monitor_point*, keyed by the value of
    its '编号' column. 'thre_mape' and 'thre_days' are the fixed values 1 and 7.
    """
    rows = zip(
        monitor_point['编号'],
        monitor_point['名称'],
        monitor_point['指标MAE'],
    )
    return {
        point_id: {
            'point_id': point_id,
            'point_name': point_name,
            'point_class': point_id,
            'thre_mae': thre_mae,
            'thre_mape': 1,
            'thre_days': 7,
        }
        for point_id, point_name, thre_mae in rows
    }


def train(*inputs, config=None):
    """Train, plot and optionally upload the models of every configured equipment.

    Args:
        *inputs: Ignored by this function.
        config: Optional dict. When it contains the key '__LOCAL', that value
            is used as the working directory and ``config['__URL']`` as the
            data URL; otherwise built-in defaults are used.

    Failures of the data / fit / plot / save stages are collected per
    equipment and pretty-printed at the end instead of aborting the run.
    """
    config = {} if config is None else config
    if '__LOCAL' in config:
        config_reader_path = config['__LOCAL']
        data_URL = config['__URL']
    else:
        config_reader_path = '/mnt/workflow_data'
        data_URL = 'http://basedataportal-svc:8080/data/getpointsdata'

    config_reader = ConfigReader(path=f'{config_reader_path}/DHU配置.xlsx')
    all_result = {
        'EXCEPTION': {
            'Data': {},
            'Fit': {},
            'Save': {},
            'Plot': {},
        }
    }
    errors = all_result['EXCEPTION']

    for each_eaup_name in config_reader.all_equp_names:
        equp_type = config_reader.get_equp_info(each_eaup_name, key='设备类型', info_type='str')
        print(f'{each_eaup_name}开始训练,设备类型为{equp_type}')

        # Fetch the data.
        try:
            equp_data = load_data(
                each_eaup_name=each_eaup_name,
                each_equp_type=equp_type,
                config_reader=config_reader,
                config_reader_path=config_reader_path,
                data_URL=data_URL,
            )
        except Exception as E:
            errors['Data'][each_eaup_name] = E
            continue

        # Train the models.
        try:
            equp_model, equp_data_clean = train_equp_model(
                each_eaup_name=each_eaup_name,
                each_equp_type=equp_type,
                equp_data=equp_data,
                config_reader=config_reader,
                config_reader_path=config_reader_path,
            )
            train_room_model(
                each_eaup_name=each_eaup_name,
                each_equp_type=equp_type,
                equp_data=equp_data,
                config_reader=config_reader,
                config_reader_path=config_reader_path,
            )
        except Exception as E:
            errors['Fit'][each_eaup_name] = E
            continue

        # Save the visualisation results (best effort: a plot failure is
        # recorded but does not prevent the upload step below).
        save_plots = config_reader.get_app_info(each_eaup_name, '模型训练', '保存可视化结果', 'bool')
        if save_plots and equp_model is not None:
            try:
                save_train_info(
                    equp_model=equp_model,
                    equp_data=equp_data_clean,
                    config_reader_path=config_reader_path,
                    each_eaup_name=each_eaup_name,
                )
            except Exception as E:
                errors['Plot'][each_eaup_name] = E

        # Model iteration (upload of the freshly trained model).
        if not config_reader.get_app_info(each_eaup_name, '模型训练', '迭代模型', 'bool'):
            continue
        if equp_model is None:
            # train_equp_model returns None when equipment training is
            # disabled; report that clearly instead of failing later with an
            # AttributeError on None.
            errors['Save'][each_eaup_name] = RuntimeError(
                'model iteration requested but no equipment model was trained'
            )
            continue
        try:
            monitor_point = config_reader.point.loc[lambda dt: dt.类型 == 'B']
            equp_model.save_to_platform(
                version_id=datetime.now().strftime('%Y%m'),
                model_id=config_reader.get_equp_info(each_eaup_name, '模型编号', 'str'),
                update_method='update',
                model_info=_build_model_update_info(monitor_point),
                MODEL_FILE_PATH=MODEL_FILE_PATH,
                MODEL_FUNC_PATH=MODEL_FUNC_PATH,
            )
        except Exception as E:
            errors['Save'][each_eaup_name] = E
            continue

    pprint(all_result)


def save_data(dir, file: str, data: pd.DataFrame):
    """Write *data* into directory *dir* as *file*, creating *dir* if needed.

    The format is chosen from the file suffix: '.csv' (index included) or
    '.pkl'.

    Raises:
        ValueError: If *file* has any other suffix.
    """
    # NOTE: the parameter name `dir` shadows the builtin; it is kept so that
    # keyword callers keep working.
    Path(dir).mkdir(parents=True, exist_ok=True)
    target = os.path.join(dir, file)
    if file.endswith('.csv'):
        data.to_csv(target, index=True)
    elif file.endswith('.pkl'):
        data.to_pickle(target)
    else:
        raise ValueError('file type error')


def load_data(each_eaup_name, each_equp_type, config_reader, config_reader_path, data_URL):
    """Download the training history of one equipment and return it.

    The raw data is also pickled under
    ``<config_reader_path>/data/train/data_his_raw``.

    Args:
        each_eaup_name: Equipment name as used in the configuration.
        each_equp_type: Equipment type (not used by this function).
        config_reader: Configuration reader providing equipment/app settings.
        config_reader_path: Root working directory.
        data_URL: URL handed to the data loader.
    """
    # Some equipment does not need every point listed in the point table.
    rm_point_name = []
    if not config_reader.get_equp_info(each_eaup_name, '存在回风口', 'bool'):
        rm_point_name += ['mixed_1_TinM', 'mixed_1_DinM']
    if not config_reader.get_equp_info(each_eaup_name, '存在补风口', 'bool'):
        rm_point_name += ['mixed_2_TinM', 'mixed_2_DinM']

    def train_setting(key, info_type):
        return config_reader.get_app_info(
            each_eaup_name, app_type='模型训练', key=key, info_type=info_type
        )

    # Fetch the history data.
    data_loader = DataLoader(
        path=f'{config_reader_path}/data/train/data_his/',
        start_time=train_setting('开始时间', 'datetime'),
        end_time=train_setting('结束时间', 'datetime'),
        print_process=train_setting('打印取数日志', 'bool'),
    )
    data_loader.download_equp_data(
        equp_name=each_eaup_name,
        point=config_reader.get_equp_point(each_eaup_name, equp_class=['A', 'B', 'C']),
        url=data_URL,
        clean_cache=False,
        rm_point_name=rm_point_name,
    )
    equp_data = data_loader.get_equp_data(each_eaup_name)
    save_data(f'{config_reader_path}/data/train/data_his_raw', f'{each_eaup_name}.pkl', equp_data)
    return equp_data


def train_equp_model(each_eaup_name, each_equp_type, equp_data, config_reader, config_reader_path):
    """Clean the data of one equipment and fit its model.

    The cleaned data (resampled to 15-minute means, rows with NaN dropped) is
    always saved. Fitting, model saving and the TVP exports only happen when
    the '训练设备模型' setting is enabled.

    Returns:
        ``(equp_model, equp_data_clean)``, or ``(None, None)`` when equipment
        training is disabled.

    Raises:
        NotImplementedError: If *each_equp_type* is not a DHU or SDHU type.
    """
    is_dhu = each_equp_type in _DHU_TYPES
    if is_dhu:
        equp_model = DHU_AB(
            DHU_type=each_equp_type,
            exist_Fa_H=config_reader.get_equp_info(each_eaup_name, '存在回风口', 'bool'),
            exist_Fa_B=config_reader.get_equp_info(each_eaup_name, '存在补风口', 'bool'),
        )
    elif each_equp_type in _SDHU_TYPES:
        equp_model = SDHU_AB(
            DHU_type=each_equp_type,
            exist_Fa_H=config_reader.get_equp_info(each_eaup_name, '存在回风口', 'bool'),
        )
    else:
        raise NotImplementedError

    # Clean the data.
    Path(f'{config_reader_path}/data/train/clean_log/').mkdir(parents=True, exist_ok=True)
    equp_data_clean = equp_model.clean_data(
        data=equp_data,
        data_type=['input', 'observed'],
        print_process=True,
        fill_zero=False,
        save_log=f'{config_reader_path}/data/train/clean_log/{each_eaup_name}.txt',
    )
    equp_data_clean = equp_data_clean.resample('15min').mean().dropna()
    save_data(
        f'{config_reader_path}/data/train/data_his_clean',
        f'{each_eaup_name}.pkl',
        equp_data_clean,
    )

    if not config_reader.get_app_info(each_eaup_name, '模型训练', '训练设备模型', 'bool'):
        return None, None

    # The type was validated above, so only the DHU / SDHU cases remain.
    if is_dhu:
        equp_model.fit(
            input_data=equp_data_clean,
            observed_data=equp_data_clean,
            plot_TVP=False,
            rw_FA_val=config_reader.get_app_info(each_eaup_name, '模型训练', '新风阀门开度参数', 'bool'),
        )
    else:
        equp_model.fit(
            input_data=equp_data_clean,
            observed_data=equp_data_clean,
            plot_TVP=False,
        )

    Path(f'{config_reader_path}/model').mkdir(parents=True, exist_ok=True)
    equp_model.save(f'{config_reader_path}/model/{each_eaup_name}.pkl')
    save_data(f'{config_reader_path}/data/train/data_TVP', f'{each_eaup_name}.csv', equp_model.TVP_data)
    save_data(
        f'{config_reader_path}/data/train/data_metric',
        f'{each_eaup_name}.csv',
        equp_model.TVP_metric.round(2),
    )
    return equp_model, equp_data_clean


def train_room_model(each_eaup_name, each_equp_type, equp_data, config_reader: ConfigReader, config_reader_path):
    """Fit and save one room model per room of the given equipment.

    The equipment model is loaded from
    ``<config_reader_path>/model/<each_eaup_name>.pkl`` and its prediction on
    the last 1000 rows of *equp_data* is used as the ``Dout`` input of every
    room fit.

    Returns:
        The model of the last room, or ``None`` when room training is
        disabled or the configured room count is 0.

    Raises:
        NotImplementedError: If *each_equp_type* is not a DHU or SDHU type.
    """
    if not config_reader.get_app_info(each_eaup_name, '模型训练', '训练房间模型', 'bool'):
        return None

    N_fit = 1000
    fit_data = equp_data.iloc[-N_fit:, :]
    equp_model_path = f'{config_reader_path}/model/{each_eaup_name}.pkl'
    if each_equp_type in _DHU_TYPES:
        equp_model = DHU_AB.load(equp_model_path)
        Dout = equp_model.predict(fit_data)['coil_3']['DoutA']
    elif each_equp_type in _SDHU_TYPES:
        equp_model = SDHU_AB.load(equp_model_path)
        Dout = equp_model.predict(fit_data)['wheel_1']['DoutP']
    else:
        raise NotImplementedError

    N_room = config_reader.get_equp_info(each_eaup_name, '房间数量', 'int')
    path_diffdata = f'{config_reader_path}/plot/plot_room_diffdata/'
    path_lagcorr = f'{config_reader_path}/plot/plot_room_lagcorr/'
    Path(path_diffdata).mkdir(parents=True, exist_ok=True)
    Path(path_lagcorr).mkdir(parents=True, exist_ok=True)

    # Stays None when there is no room, so the return below never hits an
    # unbound local.
    room_model = None
    for i in range(1, N_room + 1):
        room_tag = f'{each_eaup_name}_room_{i}_Dpv'
        Droom = fit_data.loc[:, f'room_{i}_Dpv'].values
        room_model = RoomDewPredictor().fit_Droom(Dout=Dout, Droom=Droom)
        room_model.save(f'{config_reader_path}/model/{room_tag}.pkl')
        # Plots are diagnostics only: a failure must not abort training, but
        # it is reported rather than silently swallowed.
        try:
            room_model.plot_diffdata(Dout, Droom).save(filename=f'{path_diffdata}/{room_tag}.png')
            room_model.plot_diffdata_lagcorr(Dout, Droom).save(filename=f'{path_lagcorr}/{room_tag}.png')
        except Exception as E:
            print(f'{room_tag}: room plot skipped ({E!r})')
    return room_model


def save_train_info(equp_model, equp_data, config_reader_path, each_eaup_name):
    """Save the check plots and the TVP plot of a fitted equipment model.

    Every plot returned by ``equp_model.plot_check(equp_data)`` is written to
    ``<config_reader_path>/plot/<plot_name>/<each_eaup_name>.png``; the TVP
    plot goes to ``<config_reader_path>/plot/TVP/<each_eaup_name>.png``.
    """
    for plot_name, plot in equp_model.plot_check(equp_data).items():
        path = f'{config_reader_path}/plot/{plot_name}/'
        Path(path).mkdir(parents=True, exist_ok=True)
        plot.save(filename=f'{path}/{each_eaup_name}.png')

    path = f'{config_reader_path}/plot/TVP'
    Path(path).mkdir(parents=True, exist_ok=True)
    equp_model.plot_TVP(equp_model.TVP_data, save_path=f'{path}/{each_eaup_name}.png')