| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217 |
- import os
- from datetime import datetime,timedelta
- from pathlib import Path
- from pprint import pprint
- import numpy as np
- import pandas as pd
- from ..DHU.config_reader import ConfigReader
- from ..._data.main import get_data
- from ..DHU.optimize import load_model
- from ...tools.data_loader import DataLoader
- from ..._utils.data_summary import print_dataframe,summary_dataframe
- from ...model.Room.room import RoomDewPredictor
- from ..DHU.optimize import load_dhu_State
- PATH = os.path.dirname(os.path.realpath(__file__)).replace('\\','/')
- MODEL_FUNC_PATH = f'{PATH}/model_func.py'
- MODEL_FILE_PATH = f'./model.pkl'
- def predict(*inputs,config=None):
- if '__LOCAL' in config.keys():
- config_reader_path = config['__LOCAL']
- data_URL = config['__URL']
- else:
- config_reader_path = '/mnt/workflow_data'
- data_URL = 'http://basedataportal-svc:8080/data/getpointsdata'
-
- config_reader = ConfigReader(path=f'{config_reader_path}/DHU配置.xlsx')
-
- for each_eaup_name in config_reader.all_equp_names:
- each_equp_type = config_reader.get_equp_info(each_eaup_name,key='设备类型',info_type='str')
-
- NOW = config_reader.get_app_info(each_eaup_name,'露点预测',key='开始时间',info_type='datetime')
-
- use_DHU_model = config_reader.get_app_info(each_eaup_name,'露点预测',key='模型方法',info_type='str')=='除湿机模型'
- dhu_State = load_dhu_State(
- each_eaup_name = each_eaup_name,
- config_reader = config_reader,
- config_reader_path = config_reader_path,
- data_URL = data_URL,
- NOW = NOW
- )
- print(f'{each_eaup_name}设备状态为{dhu_State}')
- if dhu_State < 1:
- print('设备处于非运行状态,跳过')
- continue
-
- if use_DHU_model:
- print('基于DHU模型进行预测')
- equp_data = load_data(
- cur_time = NOW,
- each_eaup_name = each_eaup_name,
- config_reader = config_reader,
- config_reader_path = config_reader_path,
- data_URL = data_URL,
- )
- equp_data_sm = equp_data.rolling(10,min_periods=0).mean()
- equp_model = load_model(
- each_eaup_name = each_eaup_name,
- each_equp_type = each_equp_type,
- config_reader = config_reader,
- config_reader_path = config_reader_path,
- use_adj_name = False
- )
- if each_equp_type in ['DHU_A','DHU_B']:
- Dout_pred = equp_model.predict(equp_data_sm)['coil_3']['DoutA']
- elif each_equp_type in ['SDHU_A','SDHU_B']:
- Dout_pred = equp_model.predict(equp_data_sm)['wheel_1']['DoutP']
- else:
- raise NotImplementedError
- else:
- print('基于再生加热温度进行预测')
- room_model_name = 'TinR'
- point_A = config_reader.get_equp_point(each_eaup_name,equp_class=['A'])
- point = config_reader.get_equp_point(each_eaup_name,equp_class=['C'])
- if each_equp_type in ['DHU_A','DHU_B']:
- TinR_name = ['wheel_1_TinR','wheel_2_TinR']
- point['wheel_1_TinR'] = point_A['wheel_1_TinR']
- point['wheel_2_TinR'] = point_A['wheel_2_TinR']
- elif each_equp_type in ['SDHU_A','SDHU_B']:
- TinR_name = ['wheel_1_TinR']
- point['wheel_1_TinR'] = point_A['wheel_1_TinR']
- else:
- raise NotImplementedError
- equp_data = (
- DataLoader(
- path = f'{config_reader_path}/data/room_predict/data_cur/',
- start_time = NOW - timedelta(minutes=60),
- end_time = NOW,
- print_process = False
- )
- .download_equp_data(
- equp_name = each_eaup_name,
- point = point,
- url = data_URL,
- clean_cache = True
- )
- .get_equp_data(
- equp_name = each_eaup_name,
- )
- )
- equp_data_sm = equp_data.rolling(10,min_periods=0).mean()
- wheel_TinR = equp_data_sm.loc[:,TinR_name].mean(axis=1).values
- room_SP_point = config_reader.get_equp_point(equp_name = each_eaup_name,equp_class = ['C','D'])
-
- for each_room_num in range(1,config_reader.get_equp_info(each_eaup_name,'房间数量','int') + 1):
- try:
- Droom_cur = equp_data.loc[:,f'room_{each_room_num}_Dpv'].values[-1]
- if use_DHU_model:
- room_model = RoomDewPredictor.load(f'{config_reader_path}/model/{each_eaup_name}_room_{each_room_num}_Dpv.pkl')
- Droom_pred = room_model.predict_Droom(Dout=Dout_pred,Droom_cur=Droom_cur)
- else:
- room_model = RoomDewPredictor.load(f'{config_reader_path}/model/{each_eaup_name}_room_{each_room_num}_Dpv_{room_model_name}.pkl')
- Droom_pred = room_model.predict_Droom(Dout=wheel_TinR,Droom_cur=Droom_cur,sm_frac=0.5)
- pred_lag = room_model.model_info['model_Droom']['lag']
-
- print(f'{each_eaup_name}房间{each_room_num}在{NOW}的预测值')
- print(f'完整预测时序:{Droom_pred}')
- index = pd.Index(
- pd.date_range(
- start = NOW + timedelta(minutes=1),
- end = NOW + timedelta(minutes=1)+timedelta(minutes=int(pred_lag)),
- freq = '1min'
- )
- )
- # 完整时序预测
- write_ts_predict(
- point_id = f"{each_eaup_name}_{room_SP_point[f'room_{each_room_num}_Dpv']}",
- value = Droom_pred,
- ts = index
- )
- if config_reader.get_equp_info(each_eaup_name,'房间露点预测用当前值',info_type='bool') or len(Droom_pred)==0:
- print(f'{each_eaup_name}房间{each_room_num}仅输出当前露点')
- Droom_pred_last = Droom_cur
- pred_lag = 1
- else:
- Droom_pred_last = Droom_pred[-1]
- print(f'{pred_lag}分钟后的预测值为{Droom_pred_last}')
- # 最远预测点
- write_lag_predict(
- point_id = f"{each_eaup_name}_{room_SP_point[f'room_{each_room_num}_Dpd']}",
- value = Droom_pred_last
- )
- # 最远预测步长
- write_lag_predict(
- point_id = f"{each_eaup_name}_{room_SP_point[f'room_{each_room_num}_Dpdlag']}",
- value = pred_lag
- )
- except Exception as e:
- print(e)
- continue
-
- def load_data(
- cur_time,
- each_eaup_name,
- config_reader,
- config_reader_path,
- data_URL,
- ):
- data = (
- DataLoader(
- path = f'{config_reader_path}/data/room_predict/data_cur/',
- start_time = cur_time - timedelta(minutes=120),
- end_time = cur_time,
- print_process = False
- )
- .download_equp_data(
- equp_name = each_eaup_name,
- point = config_reader.get_equp_point(each_eaup_name,equp_class=['A','C']),
- url = data_URL,
- clean_cache = True
- )
- .get_equp_data(
- equp_name = each_eaup_name,
- )
- )
- summary_dataframe(data,f'{each_eaup_name}除湿机数据')
- return data
- def write_ts_predict(point_id,value,ts):
- ts_and_value = []
- for ts_i,value_i in zip(ts,value):
- each_ts_and_value = []
- each_ts_and_value.append(ts_i.to_pydatetime().timestamp())
- each_ts_and_value.append(value_i)
- ts_and_value.append(each_ts_and_value)
-
- try:
- from workflowlib import requests
- data = {'point_id': point_id, "json_data": ts_and_value}
- res = requests.post(
- 'http://m2-backend-svc:8000/api/ai/predict_data/add',
- json = data
- )
- print(res.json())
- # if res['errcode'] != 0:
- # print(res.json())
- # raise
- except Exception as e:
- print(f'{point_id}写入预测时序失败,{e}')
- def write_lag_predict(point_id,value):
-
- try:
- from workflowlib import requests
- res = requests.post(
- 'http://basedataportal-svc:8080/value/set_value',
- json={'point_id': point_id, "value": float(value)}
- )
- print(res.json())
- print(f'写入实时点位{point_id}={value}')
- except Exception as e:
- print(f'{point_id}写入预测值失败,{e}')
|