| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241 |
- import os
- from datetime import datetime,timedelta
- from pathlib import Path
- import numpy as np
- import pandas as pd
- from .._utils.point_io import PointReader
- from .._utils.config_info import ConfigInfo
- from .._utils.data_summary import summary_dataframe
- from .cache_data import CacheData
def main(set_time_start=None,set_time_end=None,config=None):
    """Component entry point: load the configured output points as DataFrames.

    Args:
        set_time_start: Optional start datetime supplied by the caller; only
            used by ``get_time`` when both start and end are datetimes.
        set_time_end: Optional end datetime supplied by the caller.
        config: Component configuration mapping. ``_PORTS_OUT`` is read
            directly and ``_CODE`` is read with a fallback of ``'DEFAULT'``.

    Returns:
        The single element of the result list when exactly one entry was
        produced, otherwise the whole list.
    """
    settings = ConfigInfo(config)

    # Component properties, each with its fallback value.
    int_time    = settings.get_property('int_time',default='M')
    from_cache  = settings.get_property('from_cache',default=False)
    clean_cache = settings.get_property('clean_cache',default=False)
    na_behave   = settings.get_property('na_behave',default='Exception')
    output_id   = settings.get_io_id(io='out')

    time_start,time_end = get_time(settings,set_time_start,set_time_end,int_time)

    point_ids = [port['point_id'] for port in config['_PORTS_OUT']]

    # _CODE in the component config can occasionally be present but None.
    file_name = config.get('_CODE','DEFAULT')
    if file_name is None:
        file_name = 'DEFAULT'
    cache_path = Path(f'/mnt/workflow_data/{file_name}_File.pkl')

    # 'S' always uses the fixed real-value endpoint; every other interval
    # may override the endpoint through the 'url' property.
    if int_time == 'S':
        url = 'http://basedataportal-svc:8080/data/get_points_real_value'
    else:
        url = settings.get_property('url','http://basedataportal-svc:8080/data/getpointsdata')

    # Drop previously cached data when requested.
    if clean_cache:
        CacheData.clean_cache(PATH=cache_path)

    data   = get_data(point_ids,time_start,time_end,int_time,url,from_cache,cache_path)
    result = get_result(data,point_ids,output_id,na_behave)

    if len(result) == 1:
        return result[0]
    return result
def parse_str_time(str_time):
    """Parse a ``'YYYY-MM-DD HH:MM:SS'`` string into a ``datetime``."""
    time_format = '%Y-%m-%d %H:%M:%S'
    parsed = datetime.strptime(str_time, time_format)
    return parsed
def get_result(data,point_ids,output_id,na_behave):
    """Split the loaded data into one single-column DataFrame per point.

    Args:
        data: DataFrame whose columns are looked up by point id.
        point_ids: Point ids to extract, in output order.
        output_id: Output ids of the component; when it is empty nothing is
            extracted.
        na_behave: Handling of points that contain missing values (including
            points that have no column in ``data`` at all). ``'Exception'``
            raises, ``'Zero'`` replaces the missing values with 0.

    Returns:
        ``[None]`` when ``output_id`` is empty, otherwise a list with one
        single-column DataFrame per entry of ``point_ids``.

    Raises:
        Exception: If a point has missing values and ``na_behave`` is
            ``'Exception'``, or if ``na_behave`` is not a supported value.
    """
    if len(output_id) == 0:
        return [None]

    result = []

    for col in point_ids:
        if col in data.columns:
            df = data.loc[:,[col]]
        else:
            # Point missing from the data: build an all-NaN placeholder with
            # one row per index entry. The array is sized by the row count;
            # passing the index itself as the shape breaks for non-integer
            # indexes and yields the wrong dimensions for integer ones.
            df = pd.DataFrame({col:np.full(len(data.index),np.nan)},index=data.index)

        if not df.isna().any().any():
            result.append(df)

        elif na_behave == 'Exception':
            raise Exception(f'{col}未获取到对应数据')

        elif na_behave == 'Zero':
            result.append(df.fillna(0))

        else:
            raise Exception('WRONG')

    # Print the data unless this is an optimisation back-calculation task.
    if 'OPTIM_CALC_LOG_ID' not in os.environ:
        print(result)

    return result
-
def get_data(
    points_id : list,
    time_start: datetime,
    time_end  : datetime,
    int_time  : str,
    url       : str,
    from_cache: bool,
    PATH
) -> pd.DataFrame:
    """Load point data from the DB, the cache, or a combination of both.

    The source is chosen in this order: a full DB read when ``from_cache`` is
    false or when the cache reports a missing point; a DB read of only the
    missing time range merged into the cached data when the cache reports
    missing time; otherwise the cached data alone. NaN values coming from the
    DB are replaced with -9999. The cache is updated with the final frame in
    every case.

    Args:
        points_id: Point ids to load.
        time_start: Start of the requested window.
        time_end: End of the requested window.
        int_time: Interval code forwarded to ``get_data_from_db``.
        url: Data service endpoint.
        from_cache: Whether cached data may be used.
        PATH: Cache file location handed to ``CacheData``.

    Returns:
        The loaded DataFrame.
    """
    cache_data = CacheData(PATH,points_id,time_start,time_end,int_time)

    def _last_per_index(frame, columns):
        # Keep only the last row for every duplicated index label.
        return frame.loc[~frame.index.duplicated(keep='last'),columns]

    # Decide whether a full DB reload is needed; the cache is only consulted
    # when reading from it is allowed.
    if not from_cache:
        reload_banner = 'Data Source : DB (Set)'
    elif cache_data.is_missing_point():
        reload_banner = 'Data Source : DB (Missing Point)'
    else:
        reload_banner = None

    if reload_banner is not None:
        print(reload_banner)
        data = get_data_from_db(points_id,time_start,time_end,int_time,url).fillna(-9999)

    elif cache_data.is_missing_time():
        # NOTE(review): the pair is unpacked as (end, start) - confirm this
        # matches the return order of CacheData.get_missing_dt.
        dt_end,dt_start = cache_data.get_missing_dt()
        print(f'Data Source : DB+CACHE (Missing Time {dt_start}~{dt_end})')

        fresh  = get_data_from_db(points_id,dt_start,dt_end,int_time,url)
        fresh  = _last_per_index(fresh,fresh.columns.to_list())
        cached = _last_per_index(cache_data.need_cache_data,fresh.columns.to_list())

        # Overlay the freshly read values onto a copy of the cached frame.
        data = cached.copy()
        data.update(fresh.fillna(value=-9999))
        data = _last_per_index(data,slice(None)).sort_index()

    else:
        print('Data Source : CACHE')
        data = cache_data.need_cache_data

    cache_data.update_cache(data)

    print(f'URL : {url}')
    print(f'Int : {int_time}')
    summary_dataframe(data,'点位加载数据')

    return data
def get_data_from_db(
    points_id : list,
    time_start: datetime,
    time_end  : datetime,
    int_time  : str,
    url       : str
) -> pd.DataFrame:
    """Read point data through ``PointReader`` using the reader method for ``int_time``.

    Args:
        points_id: Point ids to read.
        time_start: Start of the window passed to the reader.
        time_end: End of the window passed to the reader.
        int_time: ``'M'`` uses ``read_interval``, ``'H'``/``'D'`` use
            ``read_int_interval`` with that frequency, ``'S'`` uses
            ``read_current``.
        url: Data service endpoint.

    Returns:
        Whatever the selected reader method returns.

    Raises:
        Exception: If ``int_time`` is not one of the supported codes.
    """
    reader = PointReader(point_ids=points_id,dt_begin=time_start,dt_end=time_end,url=url)

    if int_time == 'M':
        return reader.read_interval()

    if int_time in ('H','D'):
        return reader.read_int_interval(freq=int_time)

    if int_time == 'S':
        return reader.read_current()

    raise Exception('WRONG PARAM')
def get_time(
    config_info:ConfigInfo,
    set_time_start,
    set_time_end,
    int_time:str
) -> tuple:
    """Work out the (start, end) window to load, based on inputs and environment.

    A candidate window is taken from the caller-supplied datetimes when both
    are datetimes, otherwise from the component configuration. Environment
    variables then decide whether that candidate is used or replaced, and the
    result is truncated according to ``int_time``.

    Args:
        config_info: Component configuration accessor; only consulted when
            the caller did not supply both datetimes.
        set_time_start: Caller-supplied start, or any non-datetime value.
        set_time_end: Caller-supplied end, or any non-datetime value.
        int_time: Interval code; ``'H'``, ``'D'`` and ``'S'`` get special
            handling, and seconds/microseconds are always zeroed.

    Returns:
        Tuple ``(time_start, time_end)``.
    """
    one_minute = timedelta(minutes=1)

    if 'JOB_SCHEDULE_CALC_TIME_NOW' in os.environ:
        # Scheduled jobs supply their own notion of "now".
        now = parse_str_time(os.environ.get('JOB_SCHEDULE_CALC_TIME_NOW')) - one_minute
    else:
        # Data for the current minute may not be available during its first
        # few seconds, so step back one minute.
        now = datetime.now() - one_minute

    # Candidate window from the component input or the component config.
    if isinstance(set_time_start,datetime) and isinstance(set_time_end,datetime):
        origin = '组件输入'
        candidate_start,candidate_end = set_time_start,set_time_end
    else:
        origin = '组件配置'

        raw_end = config_info.get_property('time_end',None)
        candidate_end = now if raw_end is None else parse_str_time(raw_end)
        candidate_end -= timedelta(minutes=config_info.get_property('time_end_shift',0))

        if config_info.get_property('time_start1',None) is not None:
            # time_start1 is a time string.
            candidate_start = parse_str_time(config_info.get_property('time_start1'))
        elif config_info.get_property('time_start2',None) is not None:
            # time_start2 is a number of minutes before the end.
            candidate_start = candidate_end - timedelta(minutes=config_info.get_property('time_start2'))
        else:
            candidate_start = candidate_end

    # Pick the effective window according to the runtime environment.
    env = os.environ
    if 'SIMULATOR_JOB_ID' in env:
        # Simulation environment.
        prefix = '仿真环境:'
        time_start = time_end = now

    elif 'MONITOR_TYPE' in env:
        # Monitoring environment.
        prefix = '监控环境:'
        monitor_begin = env.get('MONITOR_BEGIN', None)
        monitor_end   = env.get('MONITOR_END', None)
        time_end   = parse_str_time(monitor_end)
        time_start = parse_str_time(monitor_begin) - timedelta(hours=1)

    elif 'OPTIM_CALC_LOG_ID' in env:
        # Optimisation back-calculation.
        prefix = '优化回算:'
        time_end   = parse_str_time(env['OPTIM_TIME'])
        time_start = time_end - timedelta(minutes=10)
        print(f"os.environ['OPTIM_TIME']: {os.environ['OPTIM_TIME']}")

    else:
        # Real-time optimisation, scheduled job and debug (WORKFLOW_DEBUG)
        # all use the candidate window; only the label differs.
        if 'OPTIM_JOB_ID' in env:
            prefix = '实时优化:'
        elif 'JOB_SCHEDULE_CALC_TIME_NOW' in env:
            prefix = '任务计划:'
        else:
            prefix = '调试环境:'
        time_start,time_end = candidate_start,candidate_end

    time_type = prefix + origin

    # Truncation: seconds and microseconds are always dropped; coarser
    # intervals also drop minutes (and hours for 'D').
    zeroed = {'second':0,'microsecond':0}
    if int_time == 'H':
        zeroed['minute'] = 0
    elif int_time == 'D':
        zeroed['hour'] = 0
        zeroed['minute'] = 0
    elif int_time == 'S':
        # 'S' ignores the window chosen above and uses the current time.
        time_start = datetime.now()
        time_end   = datetime.now()

    time_start = time_start.replace(**zeroed)
    time_end   = time_end.replace(**zeroed)

    print(f'time_type : {time_type}')
    print(f'time_start : {time_start}')
    print(f'time_end : {time_end}')

    return time_start,time_end
|