"""Point-data loading component.

Resolves the query time window for the current runtime environment, loads the
configured output points from the data service and/or a local cache file, and
returns one single-column DataFrame per point.
"""
import os
from datetime import datetime, timedelta
from pathlib import Path

import numpy as np
import pandas as pd

from .._utils.point_io import PointReader
from .._utils.config_info import ConfigInfo
from .._utils.data_summary import summary_dataframe
from .cache_data import CacheData

# Endpoint used when int_time == 'S'.
_REALTIME_URL = 'http://basedataportal-svc:8080/data/get_points_real_value'
# Default endpoint for every other interval type (overridable via the 'url' property).
_DEFAULT_HISTORY_URL = 'http://basedataportal-svc:8080/data/getpointsdata'


def main(set_time_start=None, set_time_end=None, config=None):
    """Component entry point: load the data of every output point.

    Args:
        set_time_start: Optional ``datetime`` overriding the configured start.
            Only honoured when ``set_time_end`` is a ``datetime`` as well.
        set_time_end: Optional ``datetime`` overriding the configured end.
        config: Component configuration mapping. ``config['_PORTS_OUT']`` must
            be an iterable of mappings carrying a ``'point_id'`` key;
            ``config.get('_CODE')`` names the cache file.

    Returns:
        The single result element when exactly one was produced, otherwise the
        whole result list (see ``get_result``).
    """
    config_info = ConfigInfo(config)
    int_time = config_info.get_property('int_time', default='M')
    from_cache = config_info.get_property('from_cache', default=False)
    clean_cache = config_info.get_property('clean_cache', default=False)
    na_behave = config_info.get_property('na_behave', default='Exception')
    output_id = config_info.get_io_id(io='out')

    time_start, time_end = get_time(config_info, set_time_start, set_time_end, int_time)
    point_ids = [point['point_id'] for point in config['_PORTS_OUT']]

    file_name = config.get('_CODE', 'DEFAULT')
    # '_CODE' in the component config is occasionally present but set to None.
    file_name = 'DEFAULT' if file_name is None else file_name
    cache_path = Path(f'/mnt/workflow_data/{file_name}_File.pkl')

    if int_time == 'S':
        url = _REALTIME_URL
    else:
        url = config_info.get_property('url', _DEFAULT_HISTORY_URL)

    # Drop the cached data before loading when requested.
    if clean_cache:
        CacheData.clean_cache(PATH=cache_path)

    data = get_data(point_ids, time_start, time_end, int_time, url, from_cache, cache_path)
    result = get_result(data, point_ids, output_id, na_behave)
    return result[0] if len(result) == 1 else result


def parse_str_time(str_time):
    """Parse a ``'%Y-%m-%d %H:%M:%S'`` string into a naive ``datetime``."""
    return datetime.strptime(str_time, '%Y-%m-%d %H:%M:%S')


def get_result(data, point_ids, output_id, na_behave):
    """Split ``data`` into one single-column DataFrame per requested point.

    Args:
        data: DataFrame whose columns are point ids.
        point_ids: Point ids to extract, in output order.
        output_id: Output port ids; when empty nothing is extracted.
        na_behave: Policy for a point whose column contains NaN or is missing
            from ``data``: ``'Exception'`` raises, ``'Zero'`` fills with 0.
            Only consulted when NaN values are actually present.

    Returns:
        ``[None]`` when ``output_id`` is empty, otherwise a list of
        single-column DataFrames in ``point_ids`` order.

    Raises:
        Exception: A point has NaN/missing data and ``na_behave`` is
            ``'Exception'``, or ``na_behave`` is not a recognised policy.
    """
    result = []
    if len(output_id) == 0:
        return [None]

    for col in point_ids:
        if col in data.columns:
            df = data.loc[:, [col]]
        else:
            # Placeholder all-NaN column for a point that was not returned.
            # The array length must be the number of rows; the previous
            # ``np.ones(shape=data.index)`` passed the index *values* as shape.
            df = pd.DataFrame({col: np.full(len(data.index), np.nan)}, index=data.index)

        if not df.isna().any().any():
            result.append(df)
        elif na_behave == 'Exception':
            raise Exception(f'{col}未获取到对应数据')
        elif na_behave == 'Zero':
            result.append(df.fillna(0))
        else:
            raise Exception('WRONG')

    # Print the data unless this is an optimisation back-calculation task.
    if 'OPTIM_CALC_LOG_ID' not in os.environ:
        print(result)
    return result


def get_data(
    points_id: list,
    time_start: datetime,
    time_end: datetime,
    int_time: str,
    url: str,
    from_cache: bool,
    PATH
) -> pd.DataFrame:
    """Load point data from the data service, the cache file, or both.

    Source selection, in order:
      1. ``from_cache`` is false -> service only.
      2. The cache lacks some requested point -> service only.
      3. The cache lacks part of the time range -> service for the missing
         range, overlaid on the cached rows.
      4. Otherwise -> cache only.

    Values missing from service responses are stored as ``-9999``. The cache
    is updated with the resulting frame in every case.

    Args:
        points_id: Point ids to load.
        time_start: Start of the requested window.
        time_end: End of the requested window.
        int_time: Interval type (``'M'``, ``'H'``, ``'D'`` or ``'S'``).
        url: Data service endpoint.
        from_cache: Whether the cache may be used at all.
        PATH: Cache file location handed to ``CacheData``.

    Returns:
        The loaded DataFrame.
    """
    cache_data = CacheData(PATH, points_id, time_start, time_end, int_time)

    if not from_cache:
        print('Data Source : DB (Set)')
        data = get_data_from_db(points_id, time_start, time_end, int_time, url)
        data = data.fillna(-9999)
    elif cache_data.is_missing_point():
        print('Data Source : DB (Missing Point)')
        data = get_data_from_db(points_id, time_start, time_end, int_time, url)
        data = data.fillna(-9999)
    elif cache_data.is_missing_time():
        # NOTE(review): the result is unpacked as (end, start); confirm this
        # matches the order returned by CacheData.get_missing_dt().
        dt_end, dt_start = cache_data.get_missing_dt()
        print(f'Data Source : DB+CACHE (Missing Time {dt_start}~{dt_end})')

        data_db = get_data_from_db(points_id, dt_start, dt_end, int_time, url)
        db_columns = data_db.columns.to_list()
        # Keep only the last row for each duplicated timestamp.
        data_db = data_db.loc[lambda dt: ~dt.index.duplicated(keep='last'), db_columns]

        data_cache = cache_data.need_cache_data
        data_cache = data_cache.loc[lambda dt: ~dt.index.duplicated(keep='last'), db_columns]

        # NOTE(review): DataFrame.update only overwrites labels already present
        # in ``data``; this presumably relies on need_cache_data already
        # spanning the full requested index - confirm against CacheData.
        data = data_cache.copy()
        data.update(data_db.fillna(value=-9999))
        data = data.loc[lambda dt: ~dt.index.duplicated(keep='last'), :].sort_index()
    else:
        print('Data Source : CACHE')
        data = cache_data.need_cache_data

    cache_data.update_cache(data)
    print(f'URL : {url}')
    print(f'Int : {int_time}')
    summary_dataframe(data, '点位加载数据')
    return data


def get_data_from_db(
    points_id: list,
    time_start: datetime,
    time_end: datetime,
    int_time: str,
    url: str
) -> pd.DataFrame:
    """Read point data from the data service through ``PointReader``.

    Args:
        points_id: Point ids to read.
        time_start: Start of the window passed to the reader.
        time_end: End of the window passed to the reader.
        int_time: ``'M'`` -> ``read_interval()``; ``'H'``/``'D'`` ->
            ``read_int_interval(freq=int_time)``; ``'S'`` -> ``read_current()``.
        url: Data service endpoint.

    Returns:
        Whatever the selected reader method returns.

    Raises:
        Exception: ``int_time`` is not one of the supported values.
    """
    point_reader = PointReader(point_ids=points_id, dt_begin=time_start, dt_end=time_end, url=url)

    if int_time == 'M':
        return point_reader.read_interval()
    if int_time in ('H', 'D'):
        return point_reader.read_int_interval(freq=int_time)
    if int_time == 'S':
        return point_reader.read_current()
    raise Exception('WRONG PARAM')


def get_time(
    config_info: ConfigInfo,
    set_time_start,
    set_time_end,
    int_time: str
) -> tuple:
    """Resolve the query window for the current runtime environment.

    A candidate window is taken from the component inputs (when both are
    ``datetime`` objects) or from the component configuration (``time_end``,
    ``time_end_shift``, ``time_start1``, ``time_start2``). The environment
    variables then decide which window is actually used, checked in this
    order: ``SIMULATOR_JOB_ID``, ``MONITOR_TYPE``, ``OPTIM_CALC_LOG_ID``,
    ``OPTIM_JOB_ID``, ``JOB_SCHEDULE_CALC_TIME_NOW``, otherwise debugging.
    Finally both bounds are truncated according to ``int_time``.

    Args:
        config_info: Accessor for the component properties.
        set_time_start: Start supplied as component input, or anything else
            to fall back to the configuration.
        set_time_end: End supplied as component input.
        int_time: Interval type; ``'H'`` truncates to the hour, ``'D'`` to the
            day, ``'S'`` replaces the window with the current minute.

    Returns:
        ``(time_start, time_end)`` with seconds and microseconds zeroed.

    Raises:
        ValueError: Monitoring mode is active but ``MONITOR_BEGIN`` or
            ``MONITOR_END`` is not set, or a time string is malformed.
    """
    # The reference "now" is pushed back one minute: data for the current
    # minute may not be available yet during its first few seconds.
    if 'JOB_SCHEDULE_CALC_TIME_NOW' in os.environ:
        # Scheduled jobs provide their own notion of the current time.
        now = parse_str_time(os.environ.get('JOB_SCHEDULE_CALC_TIME_NOW')) - timedelta(minutes=1)
    else:
        now = datetime.now() - timedelta(minutes=1)

    # Start/end as requested by the component (inputs win over configuration).
    if isinstance(set_time_start, datetime) and isinstance(set_time_end, datetime):
        time_type = '组件输入'
        config_time_end = set_time_end
        config_time_start = set_time_start
    else:
        time_type = '组件配置'
        config_time_end = config_info.get_property('time_end', None)
        config_time_end = parse_str_time(config_time_end) if config_time_end is not None else now
        time_end_shift = config_info.get_property('time_end_shift', 0)
        config_time_end -= timedelta(minutes=time_end_shift)

        if config_info.get_property('time_start1', None) is not None:
            # Absolute start time given as a string.
            config_time_start = parse_str_time(config_info.get_property('time_start1'))
        elif config_info.get_property('time_start2', None) is not None:
            # Start given as a number of minutes before the end.
            config_time_start = config_time_end - timedelta(
                minutes=config_info.get_property('time_start2')
            )
        else:
            config_time_start = config_time_end

    # Pick the effective window according to the runtime environment.
    if 'SIMULATOR_JOB_ID' in os.environ:
        # Simulation environment
        time_type = '仿真环境:' + time_type
        time_end = now
        time_start = time_end
    elif 'MONITOR_TYPE' in os.environ:
        # Monitoring environment
        time_type = '监控环境:' + time_type
        monitor_begin = os.environ.get('MONITOR_BEGIN', None)
        monitor_end = os.environ.get('MONITOR_END', None)
        if monitor_begin is None or monitor_end is None:
            # Fail with a readable message instead of strptime's TypeError.
            raise ValueError('MONITOR_BEGIN and MONITOR_END must be set when MONITOR_TYPE is set')
        time_end = parse_str_time(monitor_end)
        time_start = parse_str_time(monitor_begin) - timedelta(hours=1)
    elif 'OPTIM_CALC_LOG_ID' in os.environ:
        # Optimisation back-calculation
        time_type = '优化回算:' + time_type
        time_end = parse_str_time(os.environ['OPTIM_TIME'])
        time_start = time_end - timedelta(minutes=10)
        print(f"os.environ['OPTIM_TIME']: {os.environ['OPTIM_TIME']}")
    elif 'OPTIM_JOB_ID' in os.environ:
        # Real-time optimisation environment
        time_type = '实时优化:' + time_type
        time_end = config_time_end
        time_start = config_time_start
    elif 'JOB_SCHEDULE_CALC_TIME_NOW' in os.environ:
        # Scheduled job
        time_type = '任务计划:' + time_type
        time_end = config_time_end
        time_start = config_time_start
    else:
        # Debugging environment (WORKFLOW_DEBUG)
        time_type = '调试环境:' + time_type
        time_end = config_time_end
        time_start = config_time_start

    # Align the window to the interval granularity.
    if int_time == 'H':
        time_start = time_start.replace(minute=0)
        time_end = time_end.replace(minute=0)
    elif int_time == 'D':
        time_start = time_start.replace(hour=0, minute=0)
        time_end = time_end.replace(hour=0, minute=0)
    elif int_time == 'S':
        # Read the clock once so both bounds fall in the same minute even when
        # the call happens right at a minute boundary.
        current = datetime.now()
        time_start = current
        time_end = current

    time_start = time_start.replace(second=0, microsecond=0)
    time_end = time_end.replace(second=0, microsecond=0)

    print(f'time_type : {time_type}')
    print(f'time_start : {time_start}')
    print(f'time_end : {time_end}')
    return time_start, time_end