import os from datetime import datetime,timedelta from pathlib import Path from pprint import pprint import numpy as np import pandas as pd from ..DHU.config_reader import ConfigReader from ..._data.main import get_data from ..DHU.optimize import load_model from ...tools.data_loader import DataLoader from ..._utils.data_summary import print_dataframe,summary_dataframe from ...model.Room.room import RoomDewPredictor from ..DHU.optimize import load_dhu_State PATH = os.path.dirname(os.path.realpath(__file__)).replace('\\','/') MODEL_FUNC_PATH = f'{PATH}/model_func.py' MODEL_FILE_PATH = f'./model.pkl' def predict(*inputs,config=None): if '__LOCAL' in config.keys(): config_reader_path = config['__LOCAL'] data_URL = config['__URL'] else: config_reader_path = '/mnt/workflow_data' data_URL = 'http://basedataportal-svc:8080/data/getpointsdata' config_reader = ConfigReader(path=f'{config_reader_path}/DHU配置.xlsx') for each_eaup_name in config_reader.all_equp_names: each_equp_type = config_reader.get_equp_info(each_eaup_name,key='设备类型',info_type='str') NOW = config_reader.get_app_info(each_eaup_name,'露点预测',key='开始时间',info_type='datetime') dhu_State = load_dhu_State( each_eaup_name = each_eaup_name, config_reader = config_reader, config_reader_path = config_reader_path, data_URL = data_URL, NOW = NOW ) print(f'{each_eaup_name}设备状态为{dhu_State}') if dhu_State < 1: print('设备处于非运行状态,跳过') continue equp_data = load_data( cur_time = NOW, each_eaup_name = each_eaup_name, config_reader = config_reader, config_reader_path = config_reader_path, data_URL = data_URL, ) equp_data_sm = equp_data.rolling(10,min_periods=0).mean() equp_model = load_model( each_eaup_name = each_eaup_name, each_equp_type = each_equp_type, config_reader = config_reader, config_reader_path = config_reader_path, use_adj_name = False ) room_SP_point = config_reader.get_equp_point( equp_name = each_eaup_name, equp_class = ['C','D'] ) if each_equp_type in ['DHU_A','DHU_B']: Dout_pred = equp_model.predict(equp_data_sm)['coil_3']['DoutA'] elif each_equp_type in ['SDHU_A','SDHU_B']: Dout_pred = equp_model.predict(equp_data_sm)['wheel_1']['DoutP'] else: raise NotImplementedError for each_room_num in range( 1,config_reader.get_equp_info(each_eaup_name,'房间数量','int') + 1 ): room_model = RoomDewPredictor.load( f'{config_reader_path}/model/{each_eaup_name}_room_{each_room_num}_Dpv.pkl' ) Droom_cur = equp_data.loc[:,f'room_{each_room_num}_Dpv'].values[-1] Droom_pred = room_model.predict_Droom(Dout=Dout_pred,Droom_cur=Droom_cur) pred_lag = room_model.model_info['model_Droom']['lag'] print(f'{each_eaup_name}房间{each_room_num}在{NOW}的预测值') print(f'完整预测时序:{Droom_pred}') print(f'{pred_lag}分钟后的预测值为{Droom_pred[-1]}') index = pd.Index( pd.date_range( start = NOW+timedelta(minutes=1), end = NOW+timedelta(minutes=1)+timedelta(minutes=int(pred_lag)), freq = '1min' ) ) # 完整时序预测 write_ts_predict( point_id = f"{each_eaup_name}_{room_SP_point[f'room_{each_room_num}_Dpv']}", value = Droom_pred, ts = index ) # 最远预测点 write_lag_predict( point_id = f"{each_eaup_name}_{room_SP_point[f'room_{each_room_num}_Dpd']}", value = Droom_pred[-1] ) # 最远预测步长 write_lag_predict( point_id = f"{each_eaup_name}_{room_SP_point[f'room_{each_room_num}_Dpdlag']}", value = pred_lag ) def load_data( cur_time, each_eaup_name, config_reader, config_reader_path, data_URL, ): data = ( DataLoader( path = f'{config_reader_path}/data/room_predict/data_cur/', start_time = cur_time - timedelta(minutes=120), end_time = cur_time, print_process = False ) .download_equp_data( equp_name = each_eaup_name, point = config_reader.get_equp_point(each_eaup_name,equp_class=['A','C']), url = data_URL, clean_cache = True ) .get_equp_data( equp_name = each_eaup_name, ) ) summary_dataframe(data,f'{each_eaup_name}除湿机数据') return data def write_ts_predict(point_id,value,ts): ts_and_value = [] for ts_i,value_i in zip(ts,value): each_ts_and_value = [] each_ts_and_value.append(ts_i.to_pydatetime().timestamp()) each_ts_and_value.append(value_i) ts_and_value.append(each_ts_and_value) try: from workflowlib import requests data = {'point_id': point_id, "json_data": ts_and_value} res = requests.post( 'http://m2-backend-svc:8000/api/ai/predict_data/add', json = data ) print(res.json()) # if res['errcode'] != 0: # print(res.json()) # raise except Exception as e: print(f'{point_id}写入预测时序失败,{e}') def write_lag_predict(point_id,value): try: from workflowlib import requests res = requests.post( 'http://basedataportal-svc:8080/value/set_value', json={'point_id': point_id, "value": float(value)} ) print(res.json()) print(f'写入实时点位{point_id}={value}') except Exception as e: print(f'{point_id}写入预测值失败,{e}')